.NET/C# 客户端 API 指南
概述
本指南涵盖了 RabbitMQ .NET/C# 客户端 7.0 版本及其公共 API。本指南假设读者使用的是 该客户端的最新主要版本,并且熟悉 基本概念。
本指南的关键章节如下:
- .NET 版本要求
- 公共 API 中的 重要接口和类
- 限制
- 连接到 RabbitMQ
- 连接和通道的生命周期
- 客户端提供的连接名称
- 使用交换机 (Exchange) 和队列 (Queue)
- 发布消息
- 使用订阅进行消费 和 消费者内存安全
- 并发考量与安全性
- 网络故障自动恢复
- OAuth 2 支持
可单独查阅 API 参考文档。
.NET 版本要求
本库的 7.0.x 和 6.8.x 发行系列 要求 .NET 4.6.1+ 或 .NET Standard 2.0+ 实现。
许可协议
本库是开源的,在 GitHub 上开发,采用双重许可协议:
这意味着用户可以将本库视为上述列表中的任意一种许可协议。例如,用户可以选择 Apache Public License 2.0 并将此客户端包含在商业产品中。
主要命名空间、接口和类
客户端 API 紧密遵循 AMQP 0-9-1 协议模型,并为了易用性增加了额外的抽象。
可单独查阅 API 参考文档。
核心 API 接口和类定义在 RabbitMQ.Client 命名空间中。
using RabbitMQ.Client;
核心 API 接口和类包括:
IChannel:表示 AMQP 0-9-1 通道,并提供大多数操作(协议方法)。IConnection:表示 AMQP 0-9-1 连接。ConnectionFactory:用于构建IConnection实例。IAsyncBasicConsumer:表示消息消费者。
其他有用的接口和类包括:
AsyncDefaultBasicConsumer:常用的消费者基类。
除 RabbitMQ.Client 外,其他公共命名空间包括:
RabbitMQ.Client.Events:客户端库中的各种事件和事件处理程序,包括AsyncEventingBasicConsumer(一种基于 C# 事件处理程序构建的消费者实现)。RabbitMQ.Client.Exceptions:用户可见的异常。
所有其他命名空间保留为库的私有实现细节。尽管私有命名空间的成员通常对使用该库的应用程序开放,以允许开发人员实现针对库实现中发现的缺陷和漏洞的变通方法,但应用程序不能依赖私有命名空间中出现的任何类、接口或成员变量,因为它们在不同版本间可能不稳定。
限制
此客户端不支持 ulong 类型表示的无符号 64 位整数。尝试编码 ulong 值将抛出异常。请注意,支持有符号的 64 位整数。
这部分是由于 AMQP 0-9-1 规范中类型标记的歧义,部分原因是 其他流行客户端所支持的类型列表。
连接到 RabbitMQ
在应用程序使用 RabbitMQ 之前,必须打开一个指向 RabbitMQ 节点的 连接。该连接将用于执行所有后续操作。连接应当是长连接。为每个操作(如发布消息)都打开一个连接是非常低效的,且强烈不推荐。
要使用 .NET 客户端打开连接,首先实例化一个 ConnectionFactory 并进行配置,使用所需的主机名、虚拟主机、凭据、TLS 设置以及其他必要参数。
然后 await ConnectionFactory.CreateConnectionAsync() 方法来打开连接。成功和失败的客户端连接事件可以在 服务器日志中观察到。
以下两个代码片段展示了使用 hostName 属性配置的主机名连接到 RabbitMQ 节点:
ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.UserName = user;
factory.Password = pass;
factory.VirtualHost = vhost;
factory.HostName = hostName;
IConnection conn = await factory.CreateConnectionAsync();
ConnectionFactory factory = new ConnectionFactory();
factory.Uri = new Uri("amqp://user:pass@hostName:port/vhost");
IConnection conn = await factory.CreateConnectionAsync();
使用端点列表
可以指定连接时使用的端点列表。将使用第一个可达的端点。如果发生 连接故障,使用端点列表可以使应用程序在原始节点宕机时连接到另一个节点。
要使用多个端点,请向 ConnectionFactory#CreateConnection 提供一个 AmqpTcpEndpoint 列表。AmqpTcpEndpoint 代表一对主机名和端口。
ConnectionFactory factory = new ConnectionFactory();
factory.UserName = "username";
factory.Password = "s3Kre7";
var endpoints = new System.Collections.Generic.List<AmqpTcpEndpoint> {
new AmqpTcpEndpoint("hostname"),
new AmqpTcpEndpoint("localhost")
};
IConnection conn = await factory.CreateConnectionAsync(endpoints);
由于 .NET 客户端比其他客户端更严格地解释 AMQP 0-9-1 URI 规范,因此在使用 URI 时必须小心。特别是,主机部分不能省略,且空名称的虚拟主机无法寻址。
所有工厂属性都有默认值。如果属性在创建连接之前未赋值,将使用其默认值。
| 属性 | 默认值 |
| 用户名 | "guest" |
| 密码 | "guest" |
| 虚拟主机 | "/" |
| 主机名 | "localhost" |
| 端口 | 普通(“纯 TCP”)连接为 |
请注意,默认情况下 guest 用户只能从 localhost 连接。这是为了限制生产系统中广为人知的凭据的使用。
IConnection 接口可用于打开一个 通道。
IChannel channel = await conn.CreateChannelAsync();
通道现在可以用于发送和接收消息,后续章节将对此进行描述。
与连接一样,通道也应当是长连接。为每个操作打开一个新通道是非常低效的,且强烈不推荐。不过,通道的生命周期可以比连接短。例如,某些协议错误会自动关闭通道。如果应用程序可以从中恢复,它们可以打开一个新的通道并重试该操作。
这在 通道指南 以及 消费者确认 等其他指南中有更详细的说明。
断开与 RabbitMQ 的连接
要断开连接,只需关闭通道和连接即可。
await channel.CloseAsync();
await conn.CloseAsync();
await channel.DisposeAsync();
await conn.DisposeAsync();
虽然处理(dispose)通道和连接对象就足够了,但最佳实践是先显式关闭它们。
客户端断开连接事件可以在 服务器节点日志中观察到。
连接和通道的生命周期
连接应当是长期的。底层协议是为长连接设计和优化的。这意味着每个操作(例如发布一条消息)都打开一个新连接是不必要的,并且强烈不推荐,因为它会引入大量的网络往返和开销。
通道也应当是长期的,但由于许多可恢复的协议错误会导致通道关闭,因此通道的生命周期可能比其连接短。通常不需要每个操作都关闭并打开新通道,但有时可能是合适的。如有疑问,请优先考虑复用通道。
通道级异常(例如尝试从不存在的队列中消费)将导致通道关闭。关闭的通道无法再使用,也不会从服务器接收任何更多事件(如消息投递)。通道级异常将由 RabbitMQ 记录,并启动通道的关闭序列(见下文)。
客户端提供的连接名称
RabbitMQ 节点关于其客户端的信息有限:
- 它们的 TCP 端点(源 IP 地址和端口)
- 使用的凭据
仅凭这些信息就可能在识别应用程序和实例时遇到问题,尤其是在凭据可以共享,客户端通过负载均衡器连接,但 Proxy Protocol 无法启用时。
为了更方便地在 服务器日志 和 管理界面 中识别客户端,包括 RabbitMQ .NET 客户端在内的 AMQP 0-9-1 客户端连接可以提供自定义标识符。如果设置了标识符,它将出现在日志条目和管理界面中。该标识符被称为客户端提供的连接名称。该名称可用于标识应用程序或应用程序内的特定组件。名称是可选的;但是,强烈建议开发人员提供一个名称,因为它将显著简化某些操作任务。
RabbitMQ .NET 客户端提供了一个连接工厂属性 ConnectionFactory.ClientProvidedName,如果设置,它将控制由此工厂打开的所有新连接的客户端提供名称。
以下是上述连接示例的修改版本,其中提供了此类名称:
ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.UserName = user;
factory.Password = pass;
factory.VirtualHost = vhost;
factory.HostName = hostName;
// this name will be shared by all connections instantiated by
// this factory
factory.ClientProvidedName = "app:audit component:event-consumer";
IConnection conn = await factory.CreateConnectionAsync();
使用交换机和队列
客户端应用程序使用交换机和 队列,这是 协议的高级构建块。这些必须在使用前进行“声明”。声明任何类型的对象都仅仅是确保存在该名称的对象,如有必要则创建它。
继续上一个示例,以下代码声明了一个交换机和一个队列,然后将它们绑定在一起。
await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct);
await channel.QueueDeclareAsync(queueName, false, false, false, null);
await channel.QueueBindAsync(queueName, exchangeName, routingKey, null);
这将主动声明以下对象:
- 一个非持久化、非自动删除的“direct”类型交换机
- 一个非持久化、非自动删除、非排他的队列
交换机可以通过使用其他参数进行自定义。上述代码随后使用给定的路由键将队列绑定到交换机。
许多通道 API (IChannel) 方法都有重载版本。ExchangeDeclare 的便捷短形式使用合理的默认值。也有参数较多的长形式,允许您在必要时覆盖这些默认值,从而在需要时提供完全控制。
这种“短版本、长版本”的模式在整个 API 中都有使用。
被动声明
队列和交换机可以“被动”声明。被动声明只是检查提供的名称的实体是否存在。如果存在,操作即为空操作(no-op)。对于队列,成功的被动声明将返回与非被动声明相同的信息,即队列中消费者数量和 就绪状态 的消息数量。
如果实体不存在,操作会以通道级异常失败。此后该通道无法再使用,应打开一个新通道。通常使用一次性(临时)通道进行被动声明。
IChannel#QueueDeclarePassiveAsync 和 IChannel#ExchangeDeclarePassiveAsync 是用于被动声明的方法。以下示例演示了 IChannel#QueueDeclarePassive。
var response = await channel.QueueDeclarePassiveAsync("queue-name");
// returns the number of messages in Ready state in the queue
response.MessageCount;
// returns the number of consumers the queue has
response.ConsumerCount;
IChannel#ExchangeDeclarePassiveAsync 的返回值不包含任何有用信息。因此,如果方法返回且没有发生通道异常,则意味着该交换机确实存在。
具有可选响应的操作
一些常见操作也有“无需等待”(no wait)版本,不会等待服务器响应。例如,要声明一个队列并指示服务器不发送任何响应,请使用:
await channel.QueueDeclareAsync(queueName, true, false, false, null, noWait: true);
“无需等待”版本效率更高,但提供的安全性保证较低,例如,它们更依赖于 心跳机制 来检测失败的操作。如有疑问,请从标准版本开始。“无需等待”版本仅在拓扑(队列、绑定)频繁变更的场景中才需要。
删除实体和清除消息
可以显式删除队列或交换机。
await channel.QueueDeleteAsync("queue-name", false, false);
仅当队列为空时才能删除它,
await channel.QueueDeleteAsync("queue-name", false, true);
或者当它未被使用时(没有消费者)。
await channel.QueueDeleteAsync("queue-name", true, false);
可以清除队列(删除其中所有消息)。
await channel.QueuePurgeAsync("queue-name");
发布消息
要向交换机发布消息,请使用 IChannel.BasicPublishAsync,如下所示:
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
var props = new BasicProperties();
await channel.BasicPublishAsync(exchangeName, routingKey, false, props, messageBodyBytes);
为了精细控制,您可以使用重载变体来指定 mandatory 标志,或指定消息属性。
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
var props = new BasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;
await channel.BasicPublishAsync(exchangeName, routingKey,
mandatory: true, basicProperties: props, body: messageBodyBytes);
这将发送一条 mandatory:true的消息,其投递模式为 2(持久化),内容类型为 "text/plain"。有关可用消息属性的更多信息,请参阅 IBasicProperties 接口的定义。
在以下示例中,我们发布了一条带有自定义标头的消息:
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
var props = new BasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;
props.Headers = new Dictionary<string, object>();
props.Headers.Add("latitude", 51.5252949);
props.Headers.Add("longitude", -0.0905493);
await channel.BasicPublishAsync(exchangeName, routingKey, true, props, messageBodyBytes);
以下代码示例设置了消息过期时间:
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
var props = new BasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;
props.Expiration = "36000000";
await channel.BasicPublishAsync(exchangeName, routingKey, true, props, messageBodyBytes);
通过订阅获取消息(“推送 API”)
接收消息的推荐且最方便的方法是使用 IAsyncBasicConsumer 接口设置订阅。这样,消息到达时会自动推送,而不必主动请求。
实现消费者的一种方法是使用便捷类 AsyncEventingBasicConsumer,它将投递和其他消费者生命周期事件作为 C# 事件进行分发。
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (ch, ea) =>
{
var body = ea.Body.ToArray();
// copy or deserialise the payload
// and process the message
// ...
await channel.BasicAckAsync(ea.DeliveryTag, false);
};
// this consumer tag identifies the subscription
// when it has to be cancelled
string consumerTag = await channel.BasicConsumeAsync(queueName, false, consumer);
另一个选择是继承 AsyncDefaultBasicConsumer 并根据需要重写方法,或者直接实现 IAsyncBasicConsumer。通常您需要实现核心方法 IAsyncBasicConsumer.HandleBasicDeliverAsync。
更复杂的消费者需要实现更多方法。特别是 HandleChannelShutdown 用于捕获通道/连接关闭。消费者还可以实现 HandleBasicCancelOk 以获取取消通知。
当调用原始 IChannel.BasicConsumeAsync 时未提供消费者标签时,可以使用 AsyncDefaultBasicConsumer 的 ConsumerTag 属性来检索服务器生成的消费者标签。
您可以使用 IChannel.BasicCancelAsync 取消活动的消费者。
await channel.BasicCancelAsync(consumerTag);
调用 API 方法时,始终通过其消费者标签引用消费者,标签可以是客户端生成的,也可以是服务器生成的,如 AMQP 0-9-1 规范 文档中所述。
消费者内存安全要求
从 .NET 客户端 7.0 版本 开始,消息负载使用 System.Memory 库 中的 System.ReadOnlyMemory<byte> 类型表示。
RabbitMQ.Client 库对应用程序何时可以访问只读内存跨度(span)施加了某些限制。
重要提示:消费者接口实现必须在投递处理程序方法返回之前反序列化或复制投递负载。保留对负载的引用是不安全的:分配给它的内存在处理程序返回后的任何时刻都可能被释放。
获取单条消息(轮询或“拉取 API”)
也可以按需获取单条消息(“拉取 API”,即轮询)。这种消费方法非常低效,因为它实际上是在进行轮询,即使绝大多数请求都没有结果,应用程序也必须反复请求。因此,强烈不推荐使用此方法。
要“拉取”消息,请使用 IChannel.BasicGetAsync 方法。返回值是 BasicGetResult 的实例,可以从中提取标头信息(属性)和消息体。
bool autoAck = false;
BasicGetResult result = await channel.BasicGetAsync(queueName, autoAck);
if (result == null) {
// No message available at this time.
} else {
var props = result.BasicProperties;
ReadOnlyMemory<byte> body = result.Body;
...
上面的示例使用 手动确认 (autoAck = false),因此应用程序必须在处理后调用 IChannel.BasicAckAsync 来确认投递。
...
// acknowledge receipt of the message
await channel.BasicAckAsync(result.DeliveryTag, false);
}
请注意,使用此 API 获取消息相对低效。如果您希望 RabbitMQ 将消息推送到客户端,请参阅下一节。
消费者的并发考量
库用户需要考虑许多与并发相关的主题。
在线程间共享通道
应避免多线程同时使用 IChannel 实例。应用程序代码应保持对 IChannel 实例的明确线程所有权概念。
这是发布者的硬性要求:共享一个用于并发发布的通道(IChannel 实例)将导致协议层的帧交错错误。线程不得共享用于发布的通道实例。
如果多个线程需要访问特定的 IChannel 实例,应用程序应强制执行互斥。实现这一点的方法之一是让 IChannel 的所有使用者 lock 实例本身。
IChannel ch = RetrieveSomeSharedIChannelInstance();
await _channelSemaphore.WaitAsync();
try
{
ch.BasicPublishAsync(...);
}
finally
{
_channelSemaphore.Release();
}
IChannel 操作不正确序列化的症状包括但不限于:
涉及在线程间共享通道的消费应尽可能避免,但可以安全地完成。
可以是多线程或内部使用线程池的消费者(包括基于 TPL 的消费者),必须在共享通道上对 确认 操作使用互斥。
每连接任务使用
在当前的实现中,每个 IConnection 实例由一个 Task 支持,该任务从 socket 读取并将结果事件分发给应用程序。如果启用了心跳,它们将为每个连接使用一对 .NET 定时器。
因此,在使用此库的应用程序中,通常至少有两个 Task 实例处于活动状态:
- 应用程序线程(“主”
Task) 包含应用程序逻辑,并调用
IChannel方法来执行协议操作。- I/O 活动
Task实例 由
IConnection实例完全隐藏和管理。
线程模型对应用程序可见的唯一地方是应用程序向库注册的任何回调中。此类回调包括:
- 任何
IAsyncBasicConsumer方法 IChannel上的BasicReturn事件IConnection、IChannel等上的任何各种关闭事件。
消费者回调、并发和操作排序
消费者操作分发是并发的吗?
默认情况下,IAsyncBasicConsumer 回调是按顺序(并发度为 1)调用的。
要并发分发入站消费者投递,请将 ConnectionFactory.ConsumerDispatchConcurrency 设置为大于 1 的值。
消息顺序保证
同一通道上的消费者事件保证按接收顺序进行分发。
例如,如果消息 A 和 B 在同一通道上按此顺序投递,它们将按此顺序分发给消费者(特定的 IAsyncBasicConsumer 实例)。
如果消息 A 和 B 在不同通道上投递,它们可以以任何顺序(或并行)分发给消费者。
并发度为 1 时,同一通道上的投递将按顺序处理。并发度较高时,它们的分发顺序相同,但实际处理可以并行进行(取决于可用核心数和应用程序运行时),这可能会导致并发隐患。
一次性确认多个投递
消费者可以一次 确认 多个投递。当消费者分发并发度高于 1 时,这可能会导致 双重确认,这被视为 协议中的错误。
因此,在并发消费者分发时,消费者应一次仅确认一个投递。
消费者和同一通道上的操作
消费者事件处理程序可以调用同一通道上的操作(如 IChannel.QueueDeclareAsync 或 IChannel.BasicCancelAsync)而不会死锁。
处理不可路由消息
如果消息在发布时设置了 "mandatory" 标志,但无法投递,代理将通过 basic.return AMQP 0-9-1 命令将其退还给发送客户端。
要获取此类退回通知,客户端可以订阅 IChannel.BasicReturn 事件。如果没有侦听器附加到该事件,则退回的消息将被静默丢弃。
channel.BasicReturn += (sender, ea) => {
...
};
例如,如果客户端将设置了 "mandatory" 标志的消息发布到未绑定到任何队列的 "direct" 类型交换机,BasicReturn 事件就会触发。
网络故障自动恢复
连接恢复
客户端与 RabbitMQ 节点之间的网络连接可能会失败。RabbitMQ .NET/C# 客户端支持连接和拓扑(队列、交换机、绑定和消费者)的自动恢复。该功能在本指南稍后部分会介绍一些限制。
自动恢复过程执行以下步骤:
- 重新连接
- 恢复连接侦听器
- 重新打开通道
- 恢复通道侦听器
- 恢复通道
basic.qos设置、发布者确认和事务设置
拓扑恢复在上述操作完成后开始。对连接故障时已知处于打开状态的每个通道执行以下步骤:
- 重新声明交换器(预定义交换器除外)
- 重新声明队列
- 恢复所有绑定
- 恢复所有消费者
要启用自动连接恢复,请将 ConnectionFactory.AutomaticRecoveryEnabled 设置为 true。
ConnectionFactory factory = new ConnectionFactory();
factory.AutomaticRecoveryEnabled = true;
// connection that will recover automatically
IConnection conn = await factory.CreateConnectionAsync();
如果恢复由于异常(例如 RabbitMQ 节点仍然不可达)而失败,它将在固定的时间间隔后重试(默认为 5 秒)。该间隔可以配置。
ConnectionFactory factory = new ConnectionFactory();
// attempt recovery every 10 seconds
factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);
何时触发连接恢复?
如果启用,自动连接恢复将由以下事件触发:
- 连接的 I/O 循环中抛出 I/O 异常
- Socket 读取操作超时
- 检测到错过的服务器 心跳
- 连接的 I/O 循环中抛出任何其他意外异常
以先发生者为准。
如果最初连接到 RabbitMQ 节点的客户端连接失败,则不会触发自动连接恢复。应用程序开发人员负责重试此类连接、记录失败尝试、限制重试次数等。这是一个非常基础的示例:
ConnectionFactory factory = new ConnectionFactory();
// configure various connection settings
try {
IConnection conn = await factory.CreateConnectionAsync();
} catch (RabbitMQ.Client.Exceptions.BrokerUnreachableException e) {
await Task.Delay(5000);
// apply retry logic
}
当应用程序通过 Connection.CloseAsync 方法关闭连接时,不会发起连接恢复。
通道级异常不会触发任何类型的恢复,因为它们通常表示应用程序中的语义问题(例如,尝试从不存在的队列中消费)。
对发布的影响
连接断开时使用 IChannel.BasicPublishAsync 发布的消息将会丢失。客户端不会在连接恢复后将它们重新入队以进行投递。为确保发布的消息到达 RabbitMQ,应用程序需要使用 发布者确认 (Publisher Confirms) 并考虑连接故障。
拓扑恢复
拓扑恢复涉及交换机、队列、绑定和消费者的恢复。它默认启用,但可以禁用。
ConnectionFactory factory = new ConnectionFactory();
factory.AutomaticRecoveryEnabled = true;
factory.TopologyRecoveryEnabled = false;
IConnection conn = await factory.CreateConnectionAsync();
故障检测和恢复限制
自动连接恢复有许多限制和有意为之的设计决策,应用程序开发人员需要了解。
当连接中断或丢失时,需要时间进行检测。因此,存在一个时间窗口,在此期间库和应用程序都不知道实际的连接故障。在此期间发布的任何消息都会按通常方式序列化并写入 TCP socket。只有通过 发布者确认 才能保证投递给代理:AMQP 0-9-1 中的发布设计上是完全异步的。
当连接(已启用自动恢复)检测到 socket 或 I/O 操作错误时,恢复会在可配置的延迟后开始(默认为 5 秒)。此设计假设虽然许多网络故障是瞬时的且通常持续时间很短,但它们不会瞬间消失。连接恢复尝试将以固定的时间间隔持续进行,直到成功建立新连接。
当连接处于恢复状态时,在其通道上尝试的任何发布都将因异常而被拒绝。客户端目前不对这些传出消息执行任何内部缓冲。跟踪此类消息并在恢复成功时重新发布是应用程序开发人员的责任。发布者确认 是一种协议扩展,应当被那些无法容忍消息丢失的发布者使用。
当通道因 通道级异常 而关闭时,不会触发连接恢复。
此类异常通常表示应用程序级问题。库无法确定是否如此,也无法对如何恢复做出明智决定。
已关闭的通道即使在连接恢复触发后也不会被恢复。这包括明确关闭的通道和上述通道级异常情况。
手动确认和自动恢复
使用手动确认时,网络连接到 RabbitMQ 节点可能会在消息投递和确认之间发生故障。连接恢复后,RabbitMQ 将重置所有通道上的投递标签。
这意味着使用旧投递标签的 basic.ack、basic.nack 和 basic.reject 将导致通道异常。为了避免这种情况,RabbitMQ .NET 客户端会跟踪并更新投递标签,使它们在恢复之间保持单调增长。
随后,IChannel.BasicAckAsync、IChannel.BasicNackAsync 和 IChannel.BasicRejectAsync 将调整后的投递标签转换为 RabbitMQ 使用的标签。
过期的投递标签确认将不会被发送。使用手动确认和自动恢复的应用程序必须能够处理重复投递。
OAuth 2 支持
客户端可以针对像 UAA 这样的 OAuth 2 服务器进行身份验证。OAuth 2 插件 必须在服务器端开启,并配置为使用与客户端相同的 OAuth 2 服务器。本节假设使用了 最新主要版本的 OAuth2 客户端库。
获取 OAuth 2 令牌
.NET 客户端提供 OAuth2ClientCredentialsProvider 类,用于使用 OAuth 2 客户端凭据流程 (Client Credentials flow) 获取 JWT 令牌。客户端在打开连接时将访问令牌发送到密码字段。随后,代理在授权连接并授予对所请求虚拟主机的访问权限之前,会验证访问令牌的签名、有效性和权限。
using RabbitMQ.Client.OAuth2;
var tokenEndpointUri = new Uri("http://somedomain.com/token");
var oAuth2Client = new OAuth2ClientBuilder("client_id", "client_secret", tokenEndpointUri).Build();
ICredentialsProvider credentialProvider = new OAuth2ClientCredentialsProvider("prod-uaa-1", oAuth2Client);
var connectionFactory = new ConnectionFactory
{
CredentialsProvider = credentialsProvider
};
var connection = await connectionFactory.CreateConnectionAsync();
在生产环境中,请确保对令牌端点 URI 使用 HTTPS,并为 HttpClient 配置适当的 HttpClientHandler。
HttpClientHandler httpClientHandler = buildHttpClientHandlerWithTLSEnabled();
var tokenEndpointUri = new Uri("https://somedomain.com/token");
var oAuth2ClientBuilder = new OAuth2ClientBuilder("client_id", "client_secret", tokenEndpointUri)
oAuth2ClientBuilder.SetHttpClientHandler(httpClientHandler);
var oAuth2Client = await oAuth2ClientBuilder.BuildAsync();
ICredentialsProvider credentialsProvider = new OAuth2ClientCredentialsProvider("prod-uaa-1", oAuth2Client);
var connectionFactory = new ConnectionFactory
{
CredentialsProvider = credentialsProvider
};
var connection = await connectionFactory.CreateConnectionAsync();
注意:如果您的授权服务器需要的额外请求参数超出了规范的要求,您可以将 <key, value> 对添加到 Dictionary 中,并将其传递给 OAuth2ClientCredentialsProvider 构造函数,而不是像上面那样使用 EMPTY 构造函数。
刷新令牌
当令牌过期时,代理会拒绝连接上的进一步操作。可以在过期前调用 ICredentialsProvider#GetCredentialsAsync() 并将新令牌发送到服务器。这对应用程序来说不方便,因此 .NET 客户端通过 CredentialsRefresher 提供帮助。
有关如何使用 CredentialsRefresher 类的信息,请参阅 TestOAuth2 类。
CredentialsRefresher 在令牌有效时间的 2/3 后安排刷新。例如,如果令牌在 60 分钟后过期,它会在 40 分钟后刷新。