.NET/C# 客户端 API 指南
概述
本指南涵盖了RabbitMQ .NET/C# 客户端及其公共 API。它假设使用了客户端的最新主版本,并且读者熟悉基础知识。
本指南的关键部分包括
- 依赖项
- 公共 API 中的重要接口和类
- 限制
- 连接到 RabbitMQ
- 连接和通道生命周期
- 客户端提供的连接名称
- 使用交换机和队列
- 发布消息
- 使用订阅进行消费 和 消费者内存安全
- 异步消费者实现
- 并发注意事项和安全性
- 从网络故障中自动恢复
- OAuth 2 支持
一个API 参考 可单独获得。
.NET 版本要求
该库的 6.x 版本系列需要 .NET 4.6.1+ 或 .NET Standard 2.0+ 实现。对于 5.x 版本,要求是.NET 4.5.1+ 或 .NET Standard 1.5+ 实现。
许可证
该库是开源的,在GitHub 上开发,并根据以下两种许可证双重许可:
这意味着用户可以认为该库已获得上述任何许可证的许可。例如,用户可以选择 Apache 公共许可证 2.0 并将此客户端包含到商业产品中。
依赖项
客户端有一些依赖项
System.Memory
4.5.xSystem.Threading.Channels
4.7.x
使用相同依赖项的不同版本的应用程序应使用程序集版本重定向,自动 或显式重定向。
主要命名空间、接口和类
客户端 API 紧密地基于AMQP 0-9-1 协议模型,并增加了额外的抽象以方便使用。
一个API 参考 可单独获得。
核心 API 接口和类在 RabbitMQ.Client
命名空间中定义
using RabbitMQ.Client;
核心 API 接口和类包括
IModel
:表示 AMQP 0-9-1 通道,并提供大多数操作(协议方法)IConnection
:表示 AMQP 0-9-1 连接ConnectionFactory
:构造IConnection
实例IBasicConsumer
:表示消息消费者
其他有用的接口和类包括
DefaultBasicConsumer
:消费者常用的基类
除了 RabbitMQ.Client
之外的公共命名空间包括
RabbitMQ.Client.Events
:客户端库中各种事件和事件处理程序,包括EventingBasicConsumer
,一个围绕 C# 事件处理程序构建的消费者实现。RabbitMQ.Client.Exceptions
:用户可见的异常。
所有其他命名空间都保留用于库的私有实现细节,尽管私有命名空间的成员通常会提供给使用该库的应用程序,以便开发人员能够为他们在库实现中发现的故障和差距实施解决方法。应用程序不能依赖于私有命名空间中出现的任何类、接口、成员变量等在库的版本之间保持稳定。
限制
此客户端不支持无符号 64 位整数,该整数以 ulong
类型表示。尝试编码 ulong
值将引发异常。请注意,支持有符号 64 位整数。
这部分是由于AMQP 0-9-1 规范中的类型标记歧义,部分是由于其他流行客户端支持的类型列表。
连接到 RabbitMQ
在应用程序可以使用 RabbitMQ 之前,它必须打开到 RabbitMQ 节点的连接。然后将使用该连接执行所有后续操作。连接旨在保持长期存在。为每个操作(例如发布消息)打开一个连接效率非常低,并且强烈不建议这样做。
要使用 .NET 客户端打开连接,首先实例化一个 ConnectionFactory
并将其配置为使用所需的 hostname、虚拟主机、凭据、TLS 设置 和任何其他所需的参数。
然后调用 ConnectionFactory.CreateConnection()
方法打开连接。可以在服务器日志中观察成功的和不成功的客户端连接事件。
以下两个代码片段使用使用 hostName
属性配置的 hostname 连接到 RabbitMQ 节点
ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.UserName = user;
factory.Password = pass;
factory.VirtualHost = vhost;
factory.HostName = hostName;
IConnection conn = factory.CreateConnection();
ConnectionFactory factory = new ConnectionFactory();
factory.Uri = new Uri("amqp://user:pass@hostName:port/vhost");
IConnection conn = factory.CreateConnection();
使用端点列表
可以指定一个端点列表以在连接时使用。将使用第一个可到达的端点。如果发生连接故障,使用端点列表使应用程序能够连接到其他节点(如果原始节点已关闭)。
要使用多个端点,请向 ConnectionFactory#CreateConnection
提供 AmqpTcpEndpoint
列表。AmqpTcpEndpoint
表示主机名和端口对。
ConnectionFactory factory = new ConnectionFactory();
factory.UserName = "username";
factory.Password = "s3Kre7";
var endpoints = new System.Collections.Generic.List<AmqpTcpEndpoint> {
new AmqpTcpEndpoint("hostname"),
new AmqpTcpEndpoint("localhost")
};
IConnection conn = factory.CreateConnection(endpoints);
由于 .NET 客户端对AMQP 0-9-1 URI 规范 的解释比其他客户端更严格,因此在使用 URI 时必须小心。特别是,主机部分不得省略,并且名称为空的虚拟主机不可寻址。
所有工厂属性都具有默认值。如果属性在创建连接之前未分配,则将使用该属性的默认值
属性 | 默认值 |
用户名 | "guest" |
密码 | "guest" |
虚拟主机 | "/" |
主机名 | "localhost" |
端口 | 对于常规(“纯 TCP”)连接为 |
请注意,用户 guest 默认只能从 localhost 连接。这是为了限制在生产系统中使用众所周知的凭据。
然后可以使用 IConnection
接口打开通道
IModel channel = conn.CreateModel();
现在可以使用该通道发送和接收消息,如后续部分所述。
与连接一样,通道也旨在保持长期存在。为每个操作打开一个新通道效率会非常低,并且强烈不建议这样做。但是,通道的寿命可能比连接短。例如,某些协议错误将自动关闭通道。如果应用程序可以从这些错误中恢复,则可以打开一个新通道并重试操作。
这在通道指南以及其他指南(如消费者确认)中进行了更详细的介绍。
断开与 RabbitMQ 的连接
要断开连接,只需关闭通道和连接即可
channel.Close();
conn.Close();
处理通道和连接对象是不够的,必须使用上面示例中的 API 方法显式关闭它们。
请注意,关闭通道可能被认为是最佳实践,但此处并非严格必要 - 当底层连接关闭时,它将自动执行。
可以在服务器节点日志中观察客户端断开连接事件。
连接和通道生命周期
连接旨在保持长期存在。底层协议旨在针对长期运行的连接进行优化。这意味着为每个操作(例如发布的消息)打开一个新连接是不必要的,并且强烈不建议这样做,因为它会引入大量网络往返和开销。
通道也旨在保持长期存在,但由于许多可恢复的协议错误会导致通道关闭,因此通道的寿命可能比其连接短。为每个操作关闭和打开新通道通常是不必要的,但可能是合适的。如有疑问,请首先考虑重用通道。
通道级异常(例如尝试从不存在的队列中消费)将导致通道关闭。关闭的通道无法再使用,也不会再从服务器接收任何事件(例如消息传递)。RabbitMQ 将记录通道级异常,并将启动通道的关闭序列(请参见下文)。
客户端提供的连接名称
RabbitMQ 节点对其客户端的信息有限
- 它们的 TCP 端点(源 IP 地址和端口)
- 使用的凭据
仅凭此信息就可能导致识别应用程序和实例出现问题,尤其是在凭据可以共享且客户端通过负载均衡器连接但无法启用代理协议时。
为了更轻松地在服务器日志和管理 UI中识别客户端,包括 RabbitMQ .NET 客户端在内的 AMQP 0-9-1 客户端连接可以提供自定义标识符。如果设置了标识符,它将出现在日志条目和管理 UI 中。该标识符称为客户端提供的连接名称。该名称可用于识别应用程序或应用程序内的特定组件。该名称是可选的;但是,强烈建议开发人员提供一个名称,因为它可以大大简化某些操作任务。
RabbitMQ .NET 客户端提供了一个连接工厂属性,ConnectionFactory.ClientProvidedName
,如果设置了该属性,则会控制此工厂打开的所有新连接的客户端提供的连接名称。
以下是用作示例的修改后的连接,它提供了这样的名称
ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.UserName = user;
factory.Password = pass;
factory.VirtualHost = vhost;
factory.HostName = hostName;
// this name will be shared by all connections instantiated by
// this factory
factory.ClientProvidedName = "app:audit component:event-consumer";
IConnection conn = factory.CreateConnection();
使用交换机和队列
客户端应用程序使用交换机和队列,这是协议的高级构建块。在使用这些构建块之前,必须先“声明”它们。声明这两种对象类型中的任何一种都只是确保存在一个具有该名称的对象,如果不存在则创建它。
继续前面的示例,以下代码声明了一个交换机和一个队列,然后将它们绑定在一起。
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
channel.QueueDeclare(queueName, false, false, false, null);
channel.QueueBind(queueName, exchangeName, routingKey, null);
这将主动声明以下对象
- 一个非持久、非自动删除的“direct”类型交换机
- 一个非持久、非自动删除、非排他的队列
可以通过使用其他参数来定制交换机。上述代码然后使用给定的路由键将队列绑定到交换机。
许多通道 API(IModel
)方法都已重载。ExchangeDeclare
的便捷简写形式使用合理的默认值。还有一些带有更多参数的较长形式,以便您根据需要覆盖这些默认值,在需要时提供完全控制。
此“简写形式,长写形式”模式在整个 API 中使用。
被动声明
队列和交换机可以“被动”声明。被动声明只是检查具有提供的名称的实体是否存在。如果存在,则操作将为无操作。对于队列,成功的被动声明将返回与非被动声明相同的信息,即队列中处于就绪状态的消费者和消息的数量。
如果实体不存在,则操作将失败并出现通道级异常。此后无法使用该通道。应打开一个新通道。通常使用一次性(临时)通道进行被动声明。
IModel#QueueDeclarePassive
和 IModel#ExchangeDeclarePassive
是用于被动声明的方法。以下示例演示了 IModel#QueueDeclarePassive
var response = channel.QueueDeclarePassive("queue-name");
// returns the number of messages in Ready state in the queue
response.MessageCount;
// returns the number of consumers the queue has
response.ConsumerCount;
IModel#ExchangeDeclarePassive
的返回值不包含任何有用的信息。因此,如果方法返回并且没有发生通道异常,则表示交换机确实存在。
带可选响应的操作
某些常见操作也具有“不等待”版本,该版本不会等待服务器响应。例如,要声明一个队列并指示服务器不发送任何响应,请使用
channel.QueueDeclareNoWait(queueName, true, false, false, null);
“不等待”版本效率更高,但提供的安全保证较低,例如,它们更多地依赖于心跳机制来检测失败的操作。如有疑问,请从标准版本开始。“不等待”版本仅在拓扑(队列、绑定)变化率很高的场景中需要。
删除实体和清除消息
可以显式删除队列或交换机
channel.QueueDelete("queue-name", false, false);
只有在队列为空时才能删除它
channel.QueueDelete("queue-name", false, true);
或者如果它未被使用(没有任何消费者)
channel.QueueDelete("queue-name", true, false);
可以清除队列(删除其所有消息)
channel.QueuePurge("queue-name");
发布消息
要将消息发布到交换机,请使用 IModel.BasicPublish
,如下所示
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
channel.BasicPublish(exchangeName, routingKey, null, messageBodyBytes);
为了进行精细控制,您可以使用重载变体来指定强制标志,或指定消息属性
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
IBasicProperties props = channel.CreateBasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;
channel.BasicPublish(exchangeName, routingKey, props, messageBodyBytes);
这将发送一个传递模式为 2(持久)且内容类型为“text/plain”的消息。有关可用消息属性的更多信息,请参阅 IBasicProperties
接口的定义。
在以下示例中,我们发布了一个带有自定义标头的消息
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
IBasicProperties props = channel.CreateBasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;
props.Headers = new Dictionary<string, object>();
props.Headers.Add("latitude", 51.5252949);
props.Headers.Add("longitude", -0.0905493);
channel.BasicPublish(exchangeName, routingKey, props, messageBodyBytes);
以下代码示例设置了消息过期时间
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
IBasicProperties props = channel.CreateBasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;
props.Expiration = "36000000";
channel.BasicPublish(exchangeName, routingKey, props, messageBodyBytes);
通过订阅检索消息(“推送 API”)
接收消息的推荐方法也是最方便的方法是使用 IBasicConsumer
接口设置订阅。然后,消息将在到达时自动传递,而不是必须主动请求。
实现消费者的其中一种方法是使用便捷类 EventingBasicConsumer
,该类将传递和其他消费者生命周期事件作为 C# 事件分派。
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body.ToArray();
// copy or deserialise the payload
// and process the message
// ...
channel.BasicAck(ea.DeliveryTag, false);
};
// this consumer tag identifies the subscription
// when it has to be cancelled
string consumerTag = channel.BasicConsume(queueName, false, consumer);
另一种选择是子类化 DefaultBasicConsumer
,根据需要覆盖方法,或直接实现 IBasicConsumer
。通常,您需要实现核心方法 IBasicConsumer.HandleBasicDeliver
。
更复杂的消费者需要实现更多方法。特别是,HandleModelShutdown
会捕获通道/连接关闭。消费者还可以实现 HandleBasicCancelOk
以接收取消通知。
DefaultBasicConsumer
的 ConsumerTag
属性可用于检索服务器生成的消费者标签,在原始 IModel.BasicConsume
调用未提供任何标签的情况下。
您可以使用 IModel.BasicCancel
取消活动消费者
channel.BasicCancel(consumerTag);
调用 API 方法时,您始终通过其消费者标签引用消费者,消费者标签可以是客户端生成的或服务器生成的,如AMQP 0-9-1 规范文档中所述。
消费者内存安全要求
从.NET 客户端 6.0 版开始,消息有效负载使用来自System.Memory
库的System.ReadOnlyMemory<byte>
类型表示。
此库对应用程序何时可以访问只读内存跨度施加了某些限制。
重要提示:消费者接口实现必须在传递处理程序方法返回之前反序列化或复制传递有效负载。保留对有效负载的引用是不安全的:分配给它的内存可以在处理程序返回后的任何时刻被释放。
异步消费者实现
客户端提供了一个面向异步的消费者分派实现。此分派程序只能与异步消费者一起使用,即 IAsyncBasicConsumer
实现。
为了使用此分派程序,请将 ConnectionFactory.DispatchConsumersAsync
属性设置为 true
ConnectionFactory factory = new ConnectionFactory();
// ...
// use async-oriented consumer dispatcher. Only compatible with IAsyncBasicConsumer implementations
factory.DispatchConsumersAsync = true;
然后注册一个实现 IAsyncBasicConsumer
的消费者,例如 AsyncEventingBasicConsumer
或 AsyncDefaultBasicConsumer
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += async (ch, ea) =>
{
var body = ea.Body.ToArray();
// copy or deserialise the payload
// and process the message
// ...
channel.BasicAck(ea.DeliveryTag, false);
await Task.Yield();
};
// this consumer tag identifies the subscription
// when it has to be cancelled
string consumerTag = channel.BasicConsume(queueName, false, consumer);
// ensure we get a delivery
bool waitRes = latch.WaitOne(2000);
获取单个消息(轮询或“拉取 API”)
也可以根据需要检索单个消息(“拉取 API”又名轮询)。这种使用方式非常低效,因为它实际上是在轮询,即使绝大多数请求没有产生结果,应用程序也必须重复请求结果。因此,强烈建议不要使用这种方法。
要“拉取”一条消息,请使用 IModel.BasicGet
方法。返回值是 BasicGetResult
的实例,从中可以提取标头信息(属性)和消息正文
bool autoAck = false;
BasicGetResult result = channel.BasicGet(queueName, autoAck);
if (result == null) {
// No message available at this time.
} else {
IBasicProperties props = result.BasicProperties;
ReadOnlyMemory<byte> body = result.Body;
...
以上示例使用手动确认(autoAck = false
),因此应用程序还必须在处理后调用 IModel.BasicAck
来确认传递
...
// acknowledge receipt of the message
channel.BasicAck(result.DeliveryTag, false);
}
请注意,使用此 API 获取消息效率相对较低。如果您希望 RabbitMQ 将消息推送到客户端,请参阅下一节。
消费者的并发注意事项
库用户需要考虑许多与并发相关的主题。
在线程之间共享通道
应避免多个线程同时使用 IModel
实例。应用程序代码应维护 IModel
实例的线程所有权的清晰概念。
这是发布者的硬性要求:共享一个通道(一个 IModel
实例)以进行并发发布会导致协议级别出现不正确的帧交错。发布线程不得共享通道实例。
如果多个线程需要访问特定的 IModel
实例,则应用程序应强制互斥。实现此目的的一种方法是让 IModel
的所有用户都对实例本身进行锁定
IModel ch = RetrieveSomeSharedIModelInstance();
lock (ch) {
ch.BasicPublish(...);
}
IModel
操作序列化不正确的一些症状包括(但不限于):
如果可能,应避免涉及在线程之间共享通道的消费,但可以安全地进行。
可以多线程或在内部使用线程池的消费者(包括基于 TPL 的消费者)必须对共享通道上的确认操作使用互斥。
每个连接的线程使用
在当前实现中,每个 IConnection
实例都由一个读取套接字并将结果事件分派到应用程序的单个后台线程支持。如果启用了心跳,则每个连接将使用一对 .NET 计时器。
因此,通常在使用此库的应用程序中至少有两个线程处于活动状态
- 应用程序线程
包含应用程序逻辑,并对
IModel
方法进行调用以执行协议操作。- I/O 活动线程
隐藏并完全由
IConnection
实例管理。
应用程序可以感知线程模型性质的一个地方是在应用程序向库注册的任何回调中。此类回调包括
- 任何
IBasicConsumer
方法 IModel
上的BasicReturn
事件IConnection
、IModel
等上的各种关闭事件。
消费者回调、并发和操作顺序
消费者操作调度是否并发?
默认情况下,IBasicConsumer
回调按顺序(并发度为 1)被调用。
要并发调度入站消费者传递,请将ConnectionFactory.ConsumerDispatchConcurrency
设置为大于 1 的值。
消息顺序保证
保证在同一通道上的消费者事件按照接收到的顺序进行调度。
例如,如果消息 A 和 B 按此顺序传递到同一通道,则它们将按此顺序调度到消费者(特定的 IBasicConsumer
实例)。
如果消息 A 和 B 在不同的通道上传递,则可以按任何顺序(或并行)将它们调度到消费者。
并发度为 1 时,同一通道上的传递将按顺序处理。使用更高的并发度,它们的调度将按相同的顺序进行,但实际处理可以并行进行(取决于可用内核和应用程序运行时),这可能导致并发风险。
一次确认多个传递
消费者可以确认一次多个传递。当消费者调度并发度大于 1 时,这可能导致双重确认,这被认为是协议中的错误。
因此,在消费者并发调度的情况下,消费者应一次仅确认一个传递。
消费者和同一通道上的阻塞操作
消费者事件处理程序可以在同一通道上调用阻塞操作(例如 IModel.QueueDeclare
或 IModel.BasicCancel
)而不会死锁。
处理不可路由消息
如果发布的消息设置了“mandatory”标志,但无法传递,则代理会将其返回给发送客户端(通过 basic.return
AMQP 0-9-1 命令)。
要收到此类返回通知,客户端可以订阅 IModel.BasicReturn
事件。如果没有侦听器附加到该事件,则返回的消息将被静默丢弃。
channel.BasicReturn += (sender, ea) => {
...
};
例如,如果客户端将设置了“mandatory”标志的消息发布到未绑定到队列的“direct”类型交换机,则 BasicReturn
事件将触发。
自动从网络故障中恢复
连接恢复
客户端和 RabbitMQ 节点之间的网络连接可能会失败。RabbitMQ .NET/C# 客户端支持自动恢复连接和拓扑(队列、交换机、绑定和消费者)。此功能有一些限制,将在本指南的后面部分介绍。
自动恢复过程执行以下步骤
- 重新连接
- 恢复连接侦听器
- 重新打开通道
- 恢复通道侦听器
- 恢复通道
basic.qos
设置、发布者确认和事务设置
在完成上述操作后,拓扑恢复开始。对于连接失败时已知打开的每个通道,都会执行以下步骤
- 重新声明交换机(预定义的交换机除外)
- 重新声明队列
- 恢复所有绑定
- 恢复所有消费者
要启用自动连接恢复,请将 ConnectionFactory.AutomaticRecoveryEnabled
设置为 true
ConnectionFactory factory = new ConnectionFactory();
factory.AutomaticRecoveryEnabled = true;
// connection that will recover automatically
IConnection conn = factory.CreateConnection();
如果由于异常导致恢复失败(例如,RabbitMQ 节点仍无法访问),则将在固定时间间隔(默认为 5 秒)后重试。该间隔可以配置
ConnectionFactory factory = new ConnectionFactory();
// attempt recovery every 10 seconds
factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);
何时触发连接恢复?
如果已启用,则以下事件将触发自动连接恢复
- 连接的 I/O 循环中抛出 I/O 异常
- 套接字读取操作超时
- 检测到服务器心跳丢失
- 连接的 I/O 循环中抛出任何其他意外异常
以先发生的为准。
如果客户端最初连接到 RabbitMQ 节点失败,则不会启动自动连接恢复。应用程序开发人员负责重试此类连接、记录失败的尝试、实施重试次数限制等。以下是一个非常基本的示例
ConnectionFactory factory = new ConnectionFactory();
// configure various connection settings
try {
IConnection conn = factory.CreateConnection();
} catch (RabbitMQ.Client.Exceptions.BrokerUnreachableException e) {
Thread.Sleep(5000);
// apply retry logic
}
通过 Connection.Close
方法由应用程序关闭连接时,不会启动连接恢复。
通道级异常不会触发任何类型的恢复,因为它们通常表示应用程序中的语义问题(例如,尝试从不存在的队列中使用)。
对发布的影响
连接断开时使用 IModel.BasicPublish
发布的消息将丢失。连接恢复后,客户端不会将其排队以进行传递。为了确保已发布的消息到达 RabbitMQ,应用程序需要使用发布者确认并考虑连接故障。
拓扑恢复
拓扑恢复涉及恢复交换机、队列、绑定和消费者。默认情况下启用,但可以禁用
ConnectionFactory factory = new ConnectionFactory();
IConnection conn = factory.CreateConnection();
factory.AutomaticRecoveryEnabled = true;
factory.TopologyRecoveryEnabled = false;
故障检测和恢复限制
自动连接恢复有一些限制和有意设计的决策,应用程序开发人员需要了解。
当连接断开或丢失时,需要一段时间才能检测到。因此,存在一段时间窗口,在此期间库和应用程序都无法感知有效的连接故障。在此时间范围内发布的任何消息都将像往常一样序列化并写入 TCP 套接字。它们的传递到代理只能通过发布者确认来保证:根据设计,AMQP 0-9-1 中的发布完全是异步的。
当启用自动恢复的连接检测到套接字或 I/O 操作错误时,恢复将在可配置的延迟后开始,默认为 5 秒。此设计假设即使许多网络故障是短暂的且通常持续时间很短,它们也不会立即消失。连接恢复尝试将以相同的时间间隔继续,直到成功打开新连接。
当连接处于恢复状态时,在其通道上尝试的任何发布都将被异常拒绝。客户端目前不会对此类传出消息执行任何内部缓冲。跟踪此类消息并在恢复成功时重新发布它们是应用程序开发人员的责任。发布者确认是发布者无法承受消息丢失时应使用的协议扩展。
当由于通道级异常关闭通道时,连接恢复不会启动。此类异常通常表示应用程序级问题。库无法就何时出现这种情况做出明智的决定。
即使连接恢复启动,也不会恢复已关闭的通道。这包括显式关闭的通道和上述通道级异常情况。
手动确认和自动恢复
当使用手动确认时,消息传递和确认之间可能发生到 RabbitMQ 节点的网络连接故障。连接恢复后,RabbitMQ 将重置所有通道上的传递标记。
这意味着使用旧传递标记的 basic.ack
、basic.nack
和 basic.reject
将导致通道异常。为了避免这种情况,RabbitMQ .NET 客户端会跟踪并更新传递标记,使其在恢复之间单调递增。
然后,IModel.BasicAck
、IModel.BasicNack
和 IModel.BasicReject
将调整后的传递标记转换为 RabbitMQ 使用的传递标记。
不会发送带有过时传递标记的确认。使用手动确认和自动恢复的应用程序必须能够处理重新传递。
OAuth 2 支持
客户端可以对像UAA这样的 OAuth 2 服务器进行身份验证。OAuth 2 插件必须在服务器端启用并配置为使用与客户端相同的 OAuth 2 服务器。本节假设使用了OAuth2 客户端库的最新主要版本。
获取 OAuth 2 令牌
.Net 客户端提供 OAuth2ClientCredentialsProvider
类,用于使用OAuth 2 客户端凭据流获取 JWT 令牌。客户端在打开连接时将访问令牌发送到密码字段。然后,代理在授权连接并授予对请求的虚拟主机的访问权限之前,会验证访问令牌的签名、有效性和权限。
using RabbitMQ.Client.OAuth2;
var tokenEndpointUri = new Uri("https://somedomain.com/token");
var oAuth2Client = new OAuth2ClientBuilder("client_id", "client_secret", tokenEndpointUri).Build();
ICredentialsProvider credentialProvider = new OAuth2ClientCredentialsProvider("prod-uaa-1", oAuth2Client);
var connectionFactory = new ConnectionFactory {
CredentialsProvider = credentialsProvider
};
var connection = connectionFactory.CreateConnection();
在生产环境中,请确保对令牌端点 URI 使用 HTTPS,并为 HttpClient
适当地配置 HttpClientHandler
HttpClientHandler httpClientHandler = buildHttpClientHandlerWithTLSEnabled();
var tokenEndpointUri = new Uri("https://somedomain.com/token");
var oAuth2ClientBuilder = new OAuth2ClientBuilder("client_id", "client_secret", tokenEndpointUri)
oAuth2ClientBuilder.SetHttpClientHandler(httpClientHandler);
var oAuth2Client = oAuth2ClientBuilder.Build();
ICredentialsProvider credentialsProvider = new OAuth2ClientCredentialsProvider("prod-uaa-1", oAuth2Client);
var connectionFactory = new ConnectionFactory {
CredentialsProvider = credentialsProvider
};
var connection = connectionFactory.CreateConnection();
注意:如果您的授权服务器需要超出规范要求的其他请求参数,则可以将 <key, value>
对添加到 Dictionary
中,并将其传递给 OAuth2ClientCredentialsProvider
构造函数,而不是如上所示的 EMPTY
。
刷新令牌
当令牌过期时,代理会拒绝通过连接进行进一步操作。可以在过期前调用 ICredentialsProvider#Refresh()
并将新令牌发送到服务器。这对于应用程序来说并不方便,因此 .Net 客户端提供了 TimerBasedCredentialRefresher
的帮助。此实用程序为接收到的每个令牌计划一个计时器。当计时器到期时,它会引发一个事件,在该事件中,连接会调用 ICredentialsProvider#Refresh()
。
以下代码片段显示了如何创建 TimerBasedCredentialRefresher
实例并在 ConnectionFactory
上设置它
using RabbitMQ.Client.OAuth2;
...
ICredentialsRefresher credentialsRefresher = new TimerBasedCredentialRefresher();
var connectionFactory = new ConnectionFactory {
CredentialsProvider = credentialsProvider,
CredentialsRefresher = credentialsRefresher
};
var connection = connectionFactory.CreateConnection();
TimerBasedCredentialRefresher
会在令牌有效期 2/3 的时间后安排刷新。例如,如果令牌在 60 分钟后过期,则会在 40 分钟后刷新。