.NET/C# 客户端 API 指南
概述
本指南涵盖了 RabbitMQ .NET/C# 客户端 7.0 版本及其公共 API。它假设您使用的是 最新主版本客户端,并且读者已熟悉 基础知识。
本指南的关键部分包括
- .NET 版本要求
- 公共 API 中的重要接口和类
- 限制
- 连接到 RabbitMQ
- 连接和通道的生命周期
- 客户端提供的连接名称
- 使用交换器和队列
- 发布消息
- 使用订阅进行消费 和 消费者内存安全
- 并发注意事项和安全
- 网络故障自动恢复
- OAuth 2 支持
可以单独获取 API 参考。
.NET 版本要求
此库的 7.0.x 和 6.8.x 发布系列需要 .NET 4.6.1+ 或 .NET Standard 2.0+ 实现。
许可证
该库是开源的,在 GitHub 上开发,并根据以下许可证双重许可:
这意味着用户可以将该库视为根据上述列表中的任何许可证许可。例如,用户可以选择 Apache Public License 2.0 并将此客户端包含到商业产品中。
主要命名空间、接口和类
客户端 API 与 AMQP 0-9-1 协议模型 非常相似,并增加了额外的抽象以方便使用。
可以单独获取 API 参考。
核心 API 接口和类定义在 RabbitMQ.Client 命名空间中
using RabbitMQ.Client;
核心 API 接口和类包括
IChannel:表示一个 AMQP 0-9-1 通道,并提供大多数操作(协议方法)IConnection:表示一个 AMQP 0-9-1 连接ConnectionFactory:用于创建IConnection实例IAsyncBasicConsumer:表示一个消息消费者
其他有用的接口和类包括
AsyncDefaultBasicConsumer:常用的消费者基类
除了 RabbitMQ.Client 之外的公共命名空间包括
RabbitMQ.Client.Events:客户端库的各种事件和事件处理程序,包括AsyncEventingBasicConsumer,这是一个围绕 C# 事件处理程序构建的消费者实现。RabbitMQ.Client.Exceptions:用户可见的异常。
所有其他命名空间都保留给库的私有实现细节,尽管私有命名空间中的成员通常可供使用该库的应用程序访问,以便开发人员能够实现他们发现的库实现中的故障和遗漏的解决方法。应用程序不能依赖出现在私有命名空间中的任何类、接口、成员变量等在库版本之间保持稳定。
限制
此客户端不支持无符号 64 位整数,用类型 ulong 表示。尝试编码 ulong 值将引发异常。请注意,支持有符号 64 位整数。
这部分是由于类型标记 AMQP 0-9-1 规范中的歧义,部分原因是 其他流行客户端支持的类型列表。
连接到 RabbitMQ
在应用程序可以使用 RabbitMQ 之前,它必须打开与 RabbitMQ 节点的连接。然后,该连接将用于执行所有后续操作。连接
旨在长时间运行。为每个操作(例如,发布消息)打开连接效率会非常低,
强烈不推荐。
要使用 .NET 客户端打开连接,首先实例化一个 ConnectionFactory 并对其进行配置,使其使用所需的 hostname、虚拟主机、凭据、TLS 设置以及任何其他所需参数。
然后 await ConnectionFactory.CreateConnectionAsync() 方法打开连接。可以在服务器日志中观察到成功的和不成功的客户端连接事件。
以下两个代码片段使用 hostName 属性配置的 hostname 连接到 RabbitMQ 节点
ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.UserName = user;
factory.Password = pass;
factory.VirtualHost = vhost;
factory.HostName = hostName;
IConnection conn = await factory.CreateConnectionAsync();
ConnectionFactory factory = new ConnectionFactory();
factory.Uri = new Uri("amqp://user:pass@hostName:port/vhost");
IConnection conn = await factory.CreateConnectionAsync();
使用端点列表
连接时可以指定端点列表。将使用第一个可访问的端点。在连接失败的情况下,使用端点列表可以使应用程序在原始节点宕机时连接到另一个节点。
要使用多个端点,请将 AmqpTcpEndpoint 列表提供给 ConnectionFactory#CreateConnection。AmqpTcpEndpoint 表示一个 hostname 和端口对。
ConnectionFactory factory = new ConnectionFactory();
factory.UserName = "username";
factory.Password = "s3Kre7";
var endpoints = new System.Collections.Generic.List<AmqpTcpEndpoint> {
new AmqpTcpEndpoint("hostname"),
new AmqpTcpEndpoint("localhost")
};
IConnection conn = await factory.CreateConnectionAsync(endpoints);
由于 .NET 客户端比其他客户端使用更严格的 AMQP 0-9-1 URI 规范 解释,因此在使用 URI 时必须小心。特别是,不能省略主机部分,并且名称为空的虚拟主机是不可寻址的。
所有工厂属性都有默认值。如果属性在创建连接之前未赋值,则将使用属性的默认值
| 属性 | 默认值 |
| 用户名 | "guest" |
| 密码 | "guest" |
| 虚拟主机 | "/" |
| 主机名 | "localhost" |
| 端口 | 常规(“纯 TCP”)连接为 |
请注意,用户 guest 默认只能从 localhost 连接。这是为了限制在生产系统中周知凭证的使用。
然后可以使用 IConnection 接口打开一个通道
IChannel channel = await conn.CreateChannelAsync();
现在可以使用该通道发送和接收消息,如后续章节所述。
与连接一样,通道
旨在长时间运行。为每个操作打开新通道效率非常低,
强烈不推荐。但是,通道的生命周期可以比连接短。例如,某些协议错误会自动关闭通道。如果应用程序可以从中恢复,它们可以打开新通道并重试操作。
这在 通道指南 以及 消费者确认 等其他指南中有更详细的介绍。
断开与 RabbitMQ 的连接
要断开连接,只需关闭通道和连接
await channel.CloseAsync();
await conn.CloseAsync();
await channel.DisposeAsync();
await conn.DisposeAsync();
虽然处置通道和连接对象就足够了,但最佳实践是先显式关闭它们。
可以在服务器节点日志中观察到客户端断开连接事件。
连接和通道的生命周期
连接旨在长时间运行。底层协议是为长时间运行的连接设计的并进行了优化。这意味着为每个操作(例如,发布一条消息)打开新连接是不必要的,并且强烈不推荐,因为它会引入大量的网络往返和开销。
通道也旨在长时间运行,但由于许多可恢复的协议错误会导致通道关闭,因此通道的生命周期可能比其连接短。为每个操作关闭和打开新通道通常是不必要的,但可能是合适的。如有疑问,请考虑先重用通道。
通道级别的异常,例如尝试从不存在的队列进行消费,将导致通道关闭。已关闭的通道将无法再使用,也不会再接收来自服务器的任何事件(如消息传递)。通道级别的异常将在 RabbitMQ 中记录,并将启动通道的关闭序列(见下文)。
客户端提供的连接名称
RabbitMQ 节点关于其客户端的信息有限:
- 它们的 TCP 端点(源 IP 地址和端口)
- 使用的凭据
仅凭这些信息就可能在识别应用程序和实例时遇到问题,尤其是在凭据可以共享,客户端通过负载均衡器连接,但 Proxy Protocol 无法启用时。
为了方便在服务器日志和管理 UI 中识别客户端,AMQP 0-9-1 客户端连接(包括 RabbitMQ .NET 客户端)可以提供自定义标识符。如果设置了标识符,它将在日志条目和管理 UI 中提及。该标识符称为
客户端提供的连接名称。该名称可用于识别应用程序或应用程序中的特定组件。该名称是可选的;但是,强烈建议开发人员提供一个,因为它将大大简化某些操作任务。
RabbitMQ .NET 客户端提供了一个连接工厂属性,ConnectionFactory.ClientProvidedName,如果设置了该属性,它将控制此工厂打开的所有新连接的客户端提供的连接名称。
这是一个修改后的连接示例,其中提供了这样的名称
ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.UserName = user;
factory.Password = pass;
factory.VirtualHost = vhost;
factory.HostName = hostName;
// this name will be shared by all connections instantiated by
// this factory
factory.ClientProvidedName = "app:audit component:event-consumer";
IConnection conn = await factory.CreateConnectionAsync();
使用交换器和队列
客户端应用程序与交换器和队列(协议的高级构建块)进行交互。在使用它们之前必须“声明”它们。声明任何类型的对象只是确保名称相同的对象存在,如果需要则创建它。
继续前面的示例,以下代码声明一个交换器和一个队列,然后将它们绑定在一起。
await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct);
await channel.QueueDeclareAsync(queueName, false, false, false, null);
await channel.QueueBindAsync(queueName, exchangeName, routingKey, null);
这将主动声明以下对象
- 一个非持久、非自动删除的“direct”类型交换器
- 一个非持久、非自动删除、非独占的队列
可以通过使用其他参数来自定义交换器。上面的代码然后将队列与具有给定路由键的交换器绑定。
许多通道 API (IChannel) 方法是重载的。ExchangeDeclare 的方便的短形式使用合理的默认值。还有带有更多参数的长形式,让您可以根据需要覆盖这些默认值,从而在需要时提供完全的控制。
这种“短版本、长版本”模式在 API 中普遍使用。
被动声明
队列和交换器可以“被动”声明。被动声明只是检查具有提供名称的实体是否存在。如果存在,则操作是无操作。对于队列,成功的被动声明将返回与非被动声明相同的信息,即队列中处于就绪状态的消费者数量和消息数量。
如果实体不存在,则操作将因通道级别异常而失败。此后无法使用该通道。应打开新通道。通常使用一次性(临时)通道进行被动声明。
IChannel#QueueDeclarePassiveAsync 和 IChannel#ExchangeDeclarePassiveAsync 是用于被动声明的方法。以下示例演示了 IChannel#QueueDeclarePassive
var response = await channel.QueueDeclarePassiveAsync("queue-name");
// returns the number of messages in Ready state in the queue
response.MessageCount;
// returns the number of consumers the queue has
response.ConsumerCount;
IChannel#ExchangeDeclarePassiveAsync 的返回值不包含任何有用信息。因此,如果该方法返回且未发生通道异常,则表示该交换器确实存在。
具有可选响应的操作
一些常用操作也具有“no wait”版本,它不会等待服务器响应。例如,要声明一个队列并指示服务器不发送任何响应,请使用
await channel.QueueDeclareAsync(queueName, true, false, false, null, noWait: true);
“no wait”版本效率更高,但提供的安全性较低,例如,它们更依赖于心跳机制来检测失败的操作。如有疑问,请先从标准版本开始。仅在拓扑(队列、绑定)频繁变化的场景中才需要“no wait”版本。
删除实体和清除消息
队列或交换器可以显式删除
await channel.QueueDeleteAsync("queue-name", false, false);
仅当队列为空时才能删除它
await channel.QueueDeleteAsync("queue-name", false, true);
或者当它未使用(没有任何消费者)时
await channel.QueueDeleteAsync("queue-name", true, false);
可以清除队列(删除其中的所有消息)
await channel.QueuePurgeAsync("queue-name");
发布消息
要将消息发布到交换器,请使用 IChannel.BasicPublishAsync,如下所示
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
var props = new BasicProperties();
await channel.BasicPublishAsync(exchangeName, routingKey, false, props, messageBodyBytes);
为了进行精细控制,您可以使用重载变体来指定 mandatory 标志,或指定消息属性
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
var props = new BasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;
await channel.BasicPublishAsync(exchangeName, routingKey,
mandatory: true, basicProperties: props, body: messageBodyBytes);
这将发送一个 mandatory:true消息,交付模式为 2(持久化),内容类型为“text/plain”。有关可用消息属性的更多信息,请参阅 IBasicProperties 接口的定义。
在下面的示例中,我们发布带有自定义标头的消息
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
var props = new BasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;
props.Headers = new Dictionary<string, object>();
props.Headers.Add("latitude", 51.5252949);
props.Headers.Add("longitude", -0.0905493);
await channel.BasicPublishAsync(exchangeName, routingKey, true, props, messageBodyBytes);
下面的代码示例设置了消息的过期时间
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
var props = new BasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;
props.Expiration = "36000000";
await channel.BasicPublishAsync(exchangeName, routingKey, true, props, messageBodyBytes);
按订阅检索消息(“推 API” kecepatan 速度)
接收消息推荐且最方便的方式是使用 IAsyncBasicConsumer 接口设置订阅。然后,消息将在到达时自动传递,而不是需要主动请求。
实现消费者的一个方法是使用便捷类 AsyncEventingBasicConsumer,它将传递和其他消费者生命周期事件作为 C# 事件分派
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (ch, ea) =>
{
var body = ea.Body.ToArray();
// copy or deserialise the payload
// and process the message
// ...
await channel.BasicAckAsync(ea.DeliveryTag, false);
};
// this consumer tag identifies the subscription
// when it has to be cancelled
string consumerTag = await channel.BasicConsumeAsync(queueName, false, consumer);
另一个选择是继承 AsyncDefaultBasicConsumer,根据需要重写方法,或者直接实现 IAsyncBasicConsumer。您通常需要实现核心方法 IAsyncBasicConsumer.HandleBasicDeliverAsync。
更复杂的消费者需要实现更多方法。特别是,HandleChannelShutdown 用于捕获通道/连接关闭。消费者还可以实现 HandleBasicCancelOk 以通知取消。
AsyncDefaultBasicConsumer 的 ConsumerTag 属性可用于检索服务器生成的消费者标签,如果原始 IChannel.BasicConsumeAsync 调用中未提供该标签。
您可以使用 IChannel.BasicCancelAsync 取消活动消费者
await channel.BasicCancelAsync(consumerTag);
在调用 API 方法时,您始终通过消费者标签来引用消费者,这些标签可以由客户端或服务器生成,如 AMQP 0-9-1 规范 文档中所述。
消费者内存安全要求
从 .NET 客户端的 7.0 版本开始,消息负载使用来自System.Memory 库的 System.ReadOnlyMemory<byte> 类型表示。
RabbitMQ.Client 库对应用程序访问只读内存跨度的时间施加了某些限制。
重要提示:消费者接口实现
必须在传递处理程序方法返回之前反序列化或复制传递负载。保留对负载的引用是不安全的:在处理程序返回后,分配给它的内存可能会随时被释放。
获取单个消息(轮询或“拉 API”)
也可以按需检索单个消息(“拉 API”又名轮询)。这种消费方法
非常低效,因为它实际上是在轮询,应用程序需要反复请求结果,即使绝大多数请求都没有结果。因此
强烈不推荐使用此方法。
要“拉”一条消息,请使用 IChannel.BasicGetAsync 方法。返回值是 BasicGetResult 的实例,可以从中提取标头信息(属性)和消息正文
bool autoAck = false;
BasicGetResult result = await channel.BasicGetAsync(queueName, autoAck);
if (result == null) {
// No message available at this time.
} else {
var props = result.BasicProperties;
ReadOnlyMemory<byte> body = result.Body;
...
上面的示例使用了手动确认(autoAck = false),因此应用程序还必须在处理后调用 IChannel.BasicAckAsync 来确认传递。
...
// acknowledge receipt of the message
await channel.BasicAckAsync(result.DeliveryTag, false);
}
请注意,使用此 API 获取消息的效率相对较低。如果您希望 RabbitMQ 将消息推送到客户端,请参阅下一节。
消费者的并发注意事项
库用户需要考虑许多与并发相关的方面。
在线程之间共享通道
应避免在多个线程之间同时使用 IChannel 实例。应用程序代码应维护 IChannel 实例的明确线程所有权概念。
这是
发布者的硬性要求:共享一个通道(IChannel 实例)进行并发发布将导致协议级别的不正确的帧交错。通道实例
不得被发布线程共享。
如果多个线程需要访问特定的 IChannel 实例,应用程序应强制执行互斥。实现此目的的一种方法是让 IChannel 的所有用户 lock 实例本身
IChannel ch = RetrieveSomeSharedIChannelInstance();
await _channelSemaphore.WaitAsync();
try
{
ch.BasicPublishAsync(...);
}
finally
{
_channelSemaphore.Release();
}
IChannel 操作串行化不正确的症状包括但不限于
涉及在线程之间共享通道的消费应尽可能避免,但可以安全地进行。
消费者,包括基于 TPL 的消费者,如果可以多线程或在其内部使用线程池,则必须对共享通道上的确认操作强制执行互斥。
每个连接的任务使用
在当前实现中,每个 IConnection 实例都由一个 Task 支持,该任务从套接字读取并将产生的事件分派给应用程序。如果启用了心跳,它们将为每个连接使用一对 .NET 定时器。
因此,通常,使用此库的应用程序中至少会有两个 Task 实例处于活动状态
- 应用程序线程(“主”
Task) 包含应用程序逻辑,并调用
IChannel方法执行协议操作。- I/O 活动
Task实例 隐藏且完全由
IConnection实例管理。
应用程序在注册到库的任何回调中可以看到线程模型性质的一个地方。此类回调包括
- 任何
IAsyncBasicConsumer方法 IChannel上的BasicReturn事件IConnection、IChannel等的各种关闭事件中的任何一个。
消费者回调、并发和操作排序
消费者操作分派是并发的吗?
默认情况下,IAsyncBasicConsumer 回调是按顺序(并发度为一)调用的。
要实现并发分派入站消费者传递,请将 ConnectionFactory.ConsumerDispatchConcurrency 设置为大于一的值。
消息排序保证
同一通道上的消费者事件保证按接收顺序分派。
例如,如果消息 A 和 B 按此顺序在同一通道上传递,它们将按此顺序分派给消费者(特定的 IAsyncBasicConsumer 实例)。
如果消息 A 和 B 在不同的通道上传递,它们可以按任何顺序(或并行)分派给消费者。
当并发度为一时,同一通道上的传递将按顺序处理。当并发度较高时,它们的传递将按相同顺序进行,但实际处理可能并行发生(取决于可用核心数和应用程序运行时),这可能导致并发风险。
一次性确认多个传递
消费者可以一次性确认多个传递。当消费者分派并发度大于一时,这可能导致双重确认,这被认为是协议中的错误。
因此,对于并发消费者分派,消费者应一次只确认一个传递。
同一通道上的消费者和操作
消费者事件处理程序可以调用同一通道上的操作(如 IChannel.QueueDeclareAsync 或 IChannel.BasicCancelAsync),而不会死锁。
处理无法路由的消息
如果消息以设置了“mandatory”标志的方式发布,但无法传递,则代理将将其退回给发送客户端(通过 basic.return AMQP 0-9-1 命令)。
要接收此类退回通知,客户端可以订阅 IChannel.BasicReturn 事件。如果没有附加到事件的监听器,则退回的消息将被静默丢弃。
channel.BasicReturn += (sender, ea) => {
...
};
例如,当客户端将设置了“mandatory”标志的消息发布到未绑定到队列的“direct”类型交换器时,将触发 BasicReturn 事件。
网络故障自动恢复
连接恢复
客户端和 RabbitMQ 节点之间的网络连接可能会失败。RabbitMQ .NET/C# 客户端支持连接和拓扑(队列、交换器、绑定和消费者)的自动恢复。此功能有某些限制,将在本指南后面介绍。
自动恢复过程执行以下步骤
- 重新连接
- 恢复连接侦听器
- 重新打开通道
- 恢复通道侦听器
- 恢复通道
basic.qos设置、发布者确认和事务设置
拓扑恢复在上述操作完成后开始。对于连接失败时已知的每个打开的通道,将执行以下步骤
- 重新声明交换器(预定义交换器除外)
- 重新声明队列
- 恢复所有绑定
- 恢复所有消费者
要启用自动连接恢复,请将 ConnectionFactory.AutomaticRecoveryEnabled 设置为 true
ConnectionFactory factory = new ConnectionFactory();
factory.AutomaticRecoveryEnabled = true;
// connection that will recover automatically
IConnection conn = await factory.CreateConnectionAsync();
如果因异常(例如,RabbitMQ 节点仍无法访问)导致恢复失败,将在固定时间间隔后重试(默认 5 秒)。可以配置间隔
ConnectionFactory factory = new ConnectionFactory();
// attempt recovery every 10 seconds
factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);
何时触发连接恢复?
如果启用了自动连接恢复,它将由以下事件触发
- 连接的 I/O 循环中抛出 I/O 异常
- 套接字读取操作超时
- 检测到丢失的服务器心跳
- 连接的 I/O 循环中抛出任何其他意外异常
以先发生的为准。
如果与 RabbitMQ 节点的初始连接失败,自动连接恢复将不会启动。应用程序开发人员负责重试此类连接、记录失败的尝试、实现重试次数限制等。这是一个非常基本的示例
ConnectionFactory factory = new ConnectionFactory();
// configure various connection settings
try {
IConnection conn = await factory.CreateConnectionAsync();
} catch (RabbitMQ.Client.Exceptions.BrokerUnreachableException e) {
await Task.Delay(5000);
// apply retry logic
}
当应用程序通过 Connection.CloseAsync 方法关闭连接时,将不会启动连接恢复。
通道级别的异常不会触发任何类型的恢复,因为它们通常表示应用程序中的语义问题(例如,尝试从不存在的队列进行消费)。
对发布的影响
当连接断开时使用 IChannel.BasicPublishAsync 发布的消息将丢失。客户端不会在连接恢复后将它们排队进行传递。为了确保发布的消息到达 RabbitMQ,应用程序需要使用发布者确认并考虑连接失败。
拓扑恢复
拓扑恢复包括交换器、队列、绑定和消费者的恢复。它默认启用,但可以禁用
ConnectionFactory factory = new ConnectionFactory();
factory.AutomaticRecoveryEnabled = true;
factory.TopologyRecoveryEnabled = false;
IConnection conn = await factory.CreateConnectionAsync();
故障检测和恢复限制
自动连接恢复有一些应用程序开发人员需要了解的限制和有意设计决策。
当连接断开或丢失时,需要时间来检测。因此,存在一个时间窗口,在此期间库和应用程序都不知道实际的连接失败。在此期间发布的任何消息都将按常规序列化并写入 TCP 套接字。它们的传递到代理只能通过发布者确认来保证:AMQP 0-9-1 中的发布在设计上是完全异步的。
当启用了自动恢复的连接检测到套接字或 I/O 操作错误时,恢复将在可配置的延迟(默认为 5 秒)后开始。此设计假定即使许多网络故障是暂时的且通常短暂的,它们也不会立即消失。连接恢复尝试将以相同的时间间隔继续进行,直到成功建立新连接。
当连接处于恢复状态时,对其实例上的任何发布尝试都将被拒绝并引发异常。客户端目前不执行此类出站消息的内部缓冲。由应用程序开发人员负责跟踪这些消息并在恢复成功后重新发布它们。发布者确认是一个协议扩展,应该被无法承受消息丢失的发布者使用。
当通道因通道级别异常而关闭时,连接恢复将不会启动。
这些异常通常表示应用程序级别的问
题。库无法确定是否属于这种情况,也无法做出关于可以采取哪些措施来恢复的明智决定。
即使在连接恢复启动后,已关闭的通道也不会恢复。这包括显式关闭的通道以及上述通道级别异常的情况。
手动确认和自动恢复
当使用手动确认时,有可能在消息传递和确认之间,到 RabbitMQ 节点的网络连接失败。连接恢复后,RabbitMQ 将重置所有通道上的传递标签。
这意味着使用旧传递标签的 basic.ack、basic.nack 和 basic.reject 将导致通道异常。为避免这种情况,RabbitMQ .NET 客户端会跟踪并更新传递标签,使其在恢复之间单调增长。
IChannel.BasicAckAsync、IChannel.BasicNackAsync 和 IChannel.BasicRejectAsync 然后将调整后的传递标签转换为 RabbitMQ 使用的标签。
带有过时传递标签的确认将不会被发送。使用手动确认和自动恢复的应用程序必须能够处理重新传递。
OAuth 2 支持
客户端可以与 OAuth 2 服务器进行身份验证,例如 UAA。必须在服务器端打开OAuth 2 插件并配置为使用与客户端相同的 OAuth 2 服务器。本节假定您使用的是 最新主版本的 OAuth2 客户端库。
获取 OAuth 2 令牌
.NET 客户端提供了 OAuth2ClientCredentialsProvider 类,使用 OAuth 2 客户端凭据流获取 JWT 令牌。客户端在打开连接时将访问令牌发送到密码字段。然后,代理在授权连接并授予对请求的虚拟主机的访问权限之前,验证访问令牌的签名、有效性和权限。
using RabbitMQ.Client.OAuth2;
var tokenEndpointUri = new Uri("http://somedomain.com/token");
var oAuth2Client = new OAuth2ClientBuilder("client_id", "client_secret", tokenEndpointUri).Build();
ICredentialsProvider credentialProvider = new OAuth2ClientCredentialsProvider("prod-uaa-1", oAuth2Client);
var connectionFactory = new ConnectionFactory
{
CredentialsProvider = credentialsProvider
};
var connection = await connectionFactory.CreateConnectionAsync();
在生产环境中,请确保对令牌端点 URI 使用 HTTPS,并为 HttpClient 适当地配置 HttpClientHandler
HttpClientHandler httpClientHandler = buildHttpClientHandlerWithTLSEnabled();
var tokenEndpointUri = new Uri("https://somedomain.com/token");
var oAuth2ClientBuilder = new OAuth2ClientBuilder("client_id", "client_secret", tokenEndpointUri)
oAuth2ClientBuilder.SetHttpClientHandler(httpClientHandler);
var oAuth2Client = await oAuth2ClientBuilder.BuildAsync();
ICredentialsProvider credentialsProvider = new OAuth2ClientCredentialsProvider("prod-uaa-1", oAuth2Client);
var connectionFactory = new ConnectionFactory
{
CredentialsProvider = credentialsProvider
};
var connection = await connectionFactory.CreateConnectionAsync();
注意:如果您的授权服务器需要除规范要求之外的其他请求参数,您可以将 <key, value> 对添加到 Dictionary 并将其传递给 OAuth2ClientCredentialsProvider 构造函数,而不是像上面那样传递给 EMPTY。
刷新令牌
当令牌过期时,代理将拒绝通过连接进行进一步的操作。可以在令牌过期前调用 ICredentialsProvider#GetCredentialsAsync() 并将新令牌发送到服务器。这对应用程序来说并不方便,因此 .NET 客户端提供了 CredentialsRefresher 来提供帮助。
有关如何使用 CredentialsRefresher 类的信息,请参阅 TestOAuth2 类。
CredentialsRefresher 会在令牌有效时间的 2/3 之后安排一次刷新。例如,如果令牌在 60 分钟后过期,则在 40 分钟后刷新。