跳到主要内容

.NET/C# 客户端 API 指南

概述

本指南涵盖了 RabbitMQ .NET/C# 客户端 7.0 版本及其公共 API。它假设使用了 客户端的最新主要版本,并且读者熟悉 基础知识

本指南的主要章节是

API 参考 可单独获取。

.NET 版本要求

此库的 7.0.x 和 6.8.x 发布系列需要 .NET 4.6.1+ 或 .NET Standard 2.0+ 实现

许可证

该库是开源的,在 GitHub 上 开发,并根据以下双重许可获得许可

这意味着用户可以将该库视为根据上述列表中的任何许可证获得许可。例如,用户可以选择 Apache 公共许可证 2.0 并将此客户端包含到商业产品中。

主要命名空间、接口和类

客户端 API 紧密地建模在 AMQP 0-9-1 协议模型 上,并增加了易用性的抽象。

API 参考 可单独获取。

核心 API 接口和类在 RabbitMQ.Client 命名空间中定义

using RabbitMQ.Client;

核心 API 接口和类是

  • IChannel: 表示 AMQP 0-9-1 通道,并提供大多数操作(协议方法)
  • IConnection: 表示 AMQP 0-9-1 连接
  • ConnectionFactory: 构造 IConnection 实例
  • IAsyncBasicConsumer: 表示消息消费者

其他有用的接口和类包括

  • AsyncDefaultBasicConsumer: 常用的消费者基类

RabbitMQ.Client 之外的公共命名空间包括

  • RabbitMQ.Client.Events: 客户端库的各种事件和事件处理程序,包括 AsyncEventingBasicConsumer,这是一个围绕 C# 事件处理程序构建的消费者实现。
  • RabbitMQ.Client.Exceptions: 用户可见的异常。

所有其他命名空间都保留用于库的私有实现细节,尽管私有命名空间的成员通常可供使用该库的应用程序使用,以便开发人员能够为他们在库实现中发现的错误和缺陷实施解决方法。应用程序不能依赖于出现在私有命名空间中的任何类、接口、成员变量等在库的发布版本之间保持稳定。

局限性

此客户端不支持无符号 64 位整数,在类型 ulong 中表示。尝试编码 ulong 值将抛出异常。请注意,支持有符号 64 位整数。

这部分是由于 AMQP 0-9-1 规范中的类型标记 歧义,部分是由于 其他流行的客户端支持的类型列表

连接到 RabbitMQ

在应用程序可以使用 RabbitMQ 之前,它必须打开与 RabbitMQ 节点的连接。然后,该连接将用于执行所有后续操作。连接旨在长期存在。为每个操作(例如,发布消息)打开连接将非常低效,并且强烈不建议这样做

要使用 .NET 客户端打开连接,首先实例化一个 ConnectionFactory 并将其配置为使用所需的主机名、虚拟主机、凭据、TLS 设置以及任何其他需要的参数。

然后等待 ConnectionFactory.CreateConnectionAsync() 方法打开连接。成功和不成功的客户端连接事件可以在服务器日志中观察到

以下两个代码片段使用使用 hostName 属性配置的主机名连接到 RabbitMQ 节点

ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.UserName = user;
factory.Password = pass;
factory.VirtualHost = vhost;
factory.HostName = hostName;

IConnection conn = await factory.CreateConnectionAsync();
ConnectionFactory factory = new ConnectionFactory();
factory.Uri = new Uri("amqp://user:pass@hostName:port/vhost");

IConnection conn = await factory.CreateConnectionAsync();

使用端点列表

可以指定连接时要使用的端点列表。将使用第一个可访问的端点。在连接故障的情况下,使用端点列表使应用程序可以在原始节点关闭时连接到不同的节点。

要使用多个端点,请将 AmqpTcpEndpoint 的列表提供给 ConnectionFactory#CreateConnectionAmqpTcpEndpoint 表示主机名和端口对。

ConnectionFactory factory = new ConnectionFactory();
factory.UserName = "username";
factory.Password = "s3Kre7";

var endpoints = new System.Collections.Generic.List<AmqpTcpEndpoint> {
new AmqpTcpEndpoint("hostname"),
new AmqpTcpEndpoint("localhost")
};
IConnection conn = await factory.CreateConnectionAsync(endpoints);

由于 .NET 客户端使用比其他客户端更严格的 AMQP 0-9-1 URI 规范 解释,因此在使用 URI 时必须小心。特别是,主机部分不得省略,并且具有空名称的虚拟主机不可寻址。

所有工厂属性都有默认值。如果属性在创建连接之前保持未分配状态,则将使用该属性的默认值

属性默认值
用户名"guest"
密码"guest"
虚拟主机"/"
主机名"localhost"
端口

5672 用于常规(“纯 TCP”)连接,5671 用于启用 TLS 的连接

请注意,默认情况下,用户 guest 只能从 localhost 连接。这是为了限制生产系统中众所周知的凭据的使用。

然后,可以使用 IConnection 接口打开通道

IChannel channel = await conn.CreateChannelAsync();

现在可以使用该通道发送和接收消息,如后续章节所述。

就像连接一样,通道也旨在长期存在。为每个操作打开一个新通道将非常低效,并且强烈不建议这样做。但是,通道的生命周期可能比连接短。例如,某些协议错误会自动关闭通道。如果应用程序可以从中恢复,则可以打开一个新通道并重试操作。

这在通道指南以及其他指南(如消费者确认)中进行了更详细的介绍。

断开与 RabbitMQ 的连接

要断开连接,只需关闭通道和连接即可

await channel.CloseAsync();
await conn.CloseAsync();
await channel.DisposeAsync();
await conn.DisposeAsync();

虽然处置通道和连接对象就足够了,但最佳实践是先显式关闭它们。

客户端断开连接事件可以在服务器节点日志中观察到

连接和通道生命周期

连接旨在长期存在。底层协议是为长期运行的连接而设计和优化的。这意味着每个操作(例如,发布的消息)打开一个新连接是不必要的,并且强烈不建议这样做,因为它会引入大量的网络往返和开销。

通道也旨在长期存在,但由于许多可恢复的协议错误会导致通道关闭,因此通道的生命周期可能比其连接的生命周期短。每个操作关闭和打开新通道通常是不必要的,但可能是合适的。如有疑问,请首先考虑重用通道。

通道级异常,例如尝试从不存在的队列进行消费,将导致通道关闭。关闭的通道不能再使用,并且不会再接收来自服务器的任何事件(例如消息传递)。通道级异常将由 RabbitMQ 记录,并将启动通道的关闭序列(见下文)。

客户端提供的连接名称

RabbitMQ 节点具有关于其客户端的有限信息

  • 它们的 TCP 端点(源 IP 地址和端口)
  • 使用的凭据

仅此信息就可能使识别应用程序和实例变得成问题,尤其是在凭据可以共享并且客户端通过负载均衡器连接但无法启用 Proxy 协议 的情况下。

为了更容易在服务器日志管理 UI 中识别客户端,AMQP 0-9-1 客户端连接(包括 RabbitMQ .NET 客户端)可以提供自定义标识符。如果设置了,则该标识符将在日志条目和管理 UI 中提及。该标识符被称为客户端提供的连接名称。该名称可用于识别应用程序或应用程序中的特定组件。该名称是可选的;但是,强烈建议开发人员提供一个名称,因为它将大大简化某些操作任务。

RabbitMQ .NET 客户端提供了一个连接工厂属性 ConnectionFactory.ClientProvidedName,如果设置了该属性,则控制由此工厂打开的所有新连接的客户端提供的连接名称。

这是一个上面使用的修改后的连接示例,它提供了这样一个名称

ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.UserName = user;
factory.Password = pass;
factory.VirtualHost = vhost;
factory.HostName = hostName;

// this name will be shared by all connections instantiated by
// this factory
factory.ClientProvidedName = "app:audit component:event-consumer";

IConnection conn = await factory.CreateConnectionAsync();

使用交换器和队列

客户端应用程序使用交换器和队列,即协议的高级构建块。这些必须在使用前“声明”。声明任何类型的对象只是确保存在该名称的对象,并在必要时创建它。

继续前面的示例,以下代码声明了一个交换器和一个队列,然后将它们绑定在一起。

await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct);
await channel.QueueDeclareAsync(queueName, false, false, false, null);
await channel.QueueBindAsync(queueName, exchangeName, routingKey, null);

这将主动声明以下对象

  • 一个非持久、非自动删除的 “direct” 类型交换器
  • 一个非持久、非自动删除、非独占队列

可以通过使用其他参数来自定义交换器。上面的代码然后使用给定的路由键将队列绑定到交换器。

许多通道 API (IChannel) 方法都是重载的。ExchangeDeclare 的便捷简短形式使用合理的默认值。还有更长的形式,带有更多参数,让您可以根据需要覆盖这些默认值,在需要时提供完全控制。

这种“简短版本,长版本”模式在整个 API 中使用。

被动声明

队列和交换器可以“被动地”声明。被动声明只是检查具有提供的名称的实体是否存在。如果存在,则该操作为空操作。对于队列,成功的被动声明将返回与非被动声明相同的信息,即消费者数量和队列中就绪状态的消息数量。

如果实体不存在,则操作将失败并出现通道级异常。之后无法使用该通道。应该打开一个新通道。通常使用一次性(临时)通道进行被动声明。

IChannel#QueueDeclarePassiveAsyncIChannel#ExchangeDeclarePassiveAsync 是用于被动声明的方法。以下示例演示了 IChannel#QueueDeclarePassive

var response = await channel.QueueDeclarePassiveAsync("queue-name");
// returns the number of messages in Ready state in the queue
response.MessageCount;
// returns the number of consumers the queue has
response.ConsumerCount;

IChannel#ExchangeDeclarePassiveAsync 的返回值不包含有用的信息。因此,如果该方法返回并且没有发生通道异常,则意味着交换器确实存在。

具有可选响应的操作

一些常见的操作也有“no wait”版本,它不会等待服务器响应。例如,要声明一个队列并指示服务器不发送任何响应,请使用

await channel.QueueDeclareAsync(queueName, true, false, false, null, noWait: true);

“no wait”版本更有效,但提供的安全保证较低,例如,它们更依赖于 心跳机制 来检测失败的操作。如有疑问,请从标准版本开始。“no wait”版本仅在具有高拓扑(队列、绑定) churn 的场景中才需要。

删除实体和清除消息

队列或交换器可以显式删除

await channel.QueueDeleteAsync("queue-name", false, false);

只有当队列为空时才能删除队列

await channel.QueueDeleteAsync("queue-name", false, true);

或者如果它未使用(没有任何消费者)

await channel.QueueDeleteAsync("queue-name", true, false);

可以清除队列(删除其所有消息)

await channel.QueuePurgeAsync("queue-name");

发布消息

要将消息发布到交换器,请按如下方式使用 IChannel.BasicPublishAsync

byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
var props = new BasicProperties();
await channel.BasicPublishAsync(exchangeName, routingKey, false, props, messageBodyBytes);

为了进行精细控制,您可以使用重载的变体来指定 mandatory 标志,或指定消息属性

byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
var props = new BasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;
await channel.BasicPublishAsync(exchangeName, routingKey,
mandatory: true, basicProperties: props, body: messageBodyBytes);

这发送了一个 mandatory:true消息,其传递模式为 2(持久),内容类型为 “text/plain”。有关可用消息属性的更多信息,请参阅 IBasicProperties 接口的定义。

在以下示例中,我们发布带有自定义标头的消息

byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");

var props = new BasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;
props.Headers = new Dictionary<string, object>();
props.Headers.Add("latitude", 51.5252949);
props.Headers.Add("longitude", -0.0905493);

await channel.BasicPublishAsync(exchangeName, routingKey, true, props, messageBodyBytes);

以下代码示例设置了消息过期时间

byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");

var props = new BasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;
props.Expiration = "36000000";

await channel.BasicPublishAsync(exchangeName, routingKey, true, props, messageBodyBytes);

通过订阅检索消息(“push API”)

接收消息的推荐且最方便的方法是使用 IAsyncBasicConsumer 接口设置订阅。然后,消息将在到达时自动传递,而无需主动请求。

实现消费者的一种方法是使用便捷类 AsyncEventingBasicConsumer,它将传递和其他消费者生命周期事件作为 C# 事件分派

var consumer = new EventingBasicConsumer(channel);
consumer.Received += async (ch, ea) =>
{
var body = ea.Body.ToArray();
// copy or deserialise the payload
// and process the message
// ...
await channel.BasicAckAsync(ea.DeliveryTag, false);
};
// this consumer tag identifies the subscription
// when it has to be cancelled
string consumerTag = await channel.BasicConsumeAsync(queueName, false, consumer);

另一种选择是子类化 AsyncDefaultBasicConsumer,根据需要重写方法,或直接实现 IAsyncBasicConsumer。您通常需要实现核心方法 IAsyncBasicConsumer.HandleBasicDeliverAsync

更复杂的消费者将需要实现更多方法。特别是,HandleChannelShutdown 捕获通道/连接关闭。消费者还可以实现 HandleBasicCancelOk 以接收取消通知。

在未向原始 IChannel.BasicConsumeAsync 调用提供消费者标签的情况下,可以使用 AsyncDefaultBasicConsumerConsumerTag 属性来检索服务器生成的消费者标签。

您可以使用 IChannel.BasicCancelAsync 取消活动消费者

await channel.BasicCancelAsync(consumerTag);

在调用 API 方法时,您始终通过消费者标签引用消费者,消费者标签可以是客户端生成的,也可以是服务器生成的,如 AMQP 0-9-1 规范文档中所述。

消费者内存安全要求

从 .NET 客户端的 7.0 版本 开始,消息有效负载使用 System.ReadOnlyMemory<byte> 类型表示,该类型来自 System.Memory

RabbitMQ.Client 库对应用程序何时可以访问只读内存跨度施加了某些限制。

重要提示:消费者接口实现必须在传递处理程序方法返回之前反序列化或复制传递有效负载。保留对有效负载的引用是不安全的:分配给它的内存可以在处理程序返回后的任何时刻被释放。

获取单个消息(轮询或 “pull API”)

也可以按需检索单个消息(“pull API”,也称为轮询)。这种消费方法非常低效,因为它实际上是轮询,即使绝大多数请求没有产生任何结果,应用程序也必须重复请求结果。因此,强烈不建议使用此方法。

要 “pull” 消息,请使用 IChannel.BasicGetAsync 方法。返回的值是 BasicGetResult 的实例,可以从中提取标头信息(属性)和消息体

bool autoAck = false;
BasicGetResult result = await channel.BasicGetAsync(queueName, autoAck);
if (result == null) {
// No message available at this time.
} else {
var props = result.BasicProperties;
ReadOnlyMemory<byte> body = result.Body;
...

上面的示例使用手动确认 (autoAck = false),因此应用程序还必须调用 IChannel.BasicAckAsync 以在处理后确认传递

    ...
// acknowledge receipt of the message
await channel.BasicAckAsync(result.DeliveryTag, false);
}

请注意,使用此 API 获取消息效率相对较低。如果您希望 RabbitMQ 将消息推送到客户端,请参阅下一节。

消费者的并发注意事项

库用户需要考虑许多与并发相关的主题。

在线程之间共享通道

应避免多个线程同时使用 IChannel 实例。应用程序代码应保持 IChannel 实例的线程所有权的概念清晰。

对于发布者来说,这是一个硬性要求:共享通道(IChannel 实例)以进行并发发布将导致协议级别的帧交错不正确。线程不得共享发布在其上的通道实例。

如果多个线程需要访问特定的 IChannel 实例,则应用程序应强制执行互斥。实现此目的的一种方法是让 IChannel 的所有用户 lock 实例本身

IChannel ch = RetrieveSomeSharedIChannelInstance();
await _channelSemaphore.WaitAsync();
try
{
ch.BasicPublishAsync(...);
}
finally
{
_channelSemaphore.Release();
}

IChannel 操作序列化不正确的症状包括但不限于

  • 由于线路上的帧交错无效而导致的连接级异常。RabbitMQ 服务器日志将在这种情况下包含意外的帧错误。
  • 客户端抛出的管道和延续异常

应尽可能避免涉及在线程之间共享通道的消费,但可以安全地完成。

可以多线程或在内部使用线程池的消费者(包括基于 TPL 的消费者)必须对共享通道上的确认操作使用互斥。

每个连接的任务使用

在当前的实现中,每个 IConnection 实例都由一个 Task 支持,该任务从套接字读取并将结果事件分派给应用程序。如果启用了心跳,它们将为每个连接使用一对 .NET 计时器。

因此,通常在使用此库的应用程序中,至少会有两个 Task 实例处于活动状态

应用程序线程(“main” Task

包含应用程序逻辑,并在 IChannel 方法上进行调用以执行协议操作。

I/O 活动 Task 实例

隐藏起来并完全由 IConnection 实例管理。

线程模型的性质对应用程序可见的一个地方是应用程序向库注册的任何回调中。此类回调包括

  • 任何 IAsyncBasicConsumer 方法
  • IChannel 上的 BasicReturn 事件
  • IConnectionIChannel 等上的各种关闭事件。

消费者回调、并发和操作顺序

消费者操作分派是否并发?

默认情况下,IAsyncBasicConsumer 回调是顺序调用的(并发度为 1)。

对于入站消费者传递的并发分派,请将 ConnectionFactory.ConsumerDispatchConcurrency 设置为大于 1 的值。

消息顺序保证

保证同一通道上的消费者事件以它们接收到的相同顺序分派。

例如,如果消息 A 和 B 以此顺序在同一通道上传递,则它们将以此顺序分派给消费者(特定的 IAsyncBasicConsumer 实例)。

如果消息 A 和 B 在不同的通道上传递,则它们可以以任何顺序(或并行)分派给消费者。

当并发度为 1 时,同一通道上的传递将按顺序处理。当并发度较高时,它们的分派将以相同的顺序发生,但实际处理可以并行发生(取决于可用核心数和应用程序运行时),这可能会导致并发危害。

一次确认多个传递

消费者可以一次确认多个传递。当消费者分派并发度高于 1 时,这可能会导致双重确认,这被认为是 协议中的错误

因此,对于并发消费者分派,消费者应一次仅确认一个传递。

同一通道上的消费者和操作

消费者事件处理程序可以调用同一通道上的操作(例如 IChannel.QueueDeclareAsyncIChannel.BasicCancelAsync),而不会发生死锁。

处理无法路由的消息

如果发布消息时设置了 “mandatory” 标志,但无法传递,则代理将通过 (basic.return AMQP 0-9-1 命令) 将其返回给发送客户端。

要收到此类返回的通知,客户端可以订阅 IChannel.BasicReturn 事件。如果没有侦听器附加到该事件,则返回的消息将被静默丢弃。

channel.BasicReturn += (sender, ea) => {
...
};

例如,如果客户端将 “mandatory” 标志设置为 “direct” 类型的交换器(未绑定到队列),则将触发 BasicReturn 事件。

从网络故障自动恢复

连接恢复

客户端和 RabbitMQ 节点之间的网络连接可能会失败。RabbitMQ .NET/C# 客户端支持连接和拓扑(队列、交换器、绑定和消费者)的自动恢复。该功能具有本指南稍后介绍的某些限制。

自动恢复过程执行以下步骤

  • 重新连接
  • 恢复连接侦听器
  • 重新打开通道
  • 恢复通道侦听器
  • 恢复通道 basic.qos 设置、发布者确认和事务设置

拓扑恢复在完成上述操作后开始。为连接失败时已知处于打开状态的每个通道执行以下步骤

  • 重新声明交换器(预定义的交换器除外)
  • 重新声明队列
  • 恢复所有绑定
  • 恢复所有消费者

要启用自动连接恢复,请将 ConnectionFactory.AutomaticRecoveryEnabled 设置为 true

ConnectionFactory factory = new ConnectionFactory();
factory.AutomaticRecoveryEnabled = true;
// connection that will recover automatically
IConnection conn = await factory.CreateConnectionAsync();

如果由于异常导致恢复失败(例如,RabbitMQ 节点仍然无法访问),则将在固定的时间间隔后重试(默认为 5 秒)。可以配置该间隔

ConnectionFactory factory = new ConnectionFactory();
// attempt recovery every 10 seconds
factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);

何时触发连接恢复?

如果启用自动连接恢复,则将由以下事件触发

  • 在连接的 I/O 循环中抛出 I/O 异常
  • 套接字读取操作超时
  • 检测到丢失的服务器心跳
  • 在连接的 I/O 循环中抛出任何其他意外异常

以先发生者为准。

如果客户端与 RabbitMQ 节点的初始连接失败,则自动连接恢复将不会启动。应用程序开发人员负责重试此类连接、记录失败的尝试、实施重试次数限制等等。这是一个非常基本的示例

ConnectionFactory factory = new ConnectionFactory();
// configure various connection settings

try {
IConnection conn = await factory.CreateConnectionAsync();
} catch (RabbitMQ.Client.Exceptions.BrokerUnreachableException e) {
await Task.Delay(5000);
// apply retry logic
}

当应用程序通过 Connection.CloseAsync 方法关闭连接时,将不会启动连接恢复。

通道级异常不会触发任何类型的恢复,因为它们通常指示应用程序中的语义问题(例如,尝试从不存在的队列进行消费)。

对发布的影响

当连接断开时,使用 IChannel.BasicPublishAsync 发布的消息将丢失。客户端不会将它们排队以在连接恢复后传递。为了确保发布的消息到达 RabbitMQ 应用程序,需要使用发布者确认并考虑连接故障。

拓扑恢复

拓扑恢复涉及交换器、队列、绑定和消费者的恢复。默认情况下启用,但可以禁用

ConnectionFactory factory = new ConnectionFactory();
factory.AutomaticRecoveryEnabled = true;
factory.TopologyRecoveryEnabled = false;

IConnection conn = await factory.CreateConnectionAsync();

故障检测和恢复限制

自动连接恢复有许多限制和有意的设计决策,应用程序开发人员需要注意。

当连接断开或丢失时,检测需要时间。因此,存在一个时间窗口,在此期间库和应用程序都不知道有效的连接故障。在此时间范围内发布的任何消息都将像往常一样序列化并写入 TCP 套接字。它们传递到代理的保证只能通过发布者确认:AMQP 0-9-1 中的发布在设计上是完全异步的。

当具有自动恢复功能的连接检测到套接字或 I/O 操作错误时,恢复会在可配置的延迟(默认为 5 秒)后开始。此设计假设即使许多网络故障是短暂的且通常是短暂的,它们也不会立即消失。连接恢复尝试将以相同的时间间隔继续,直到成功打开新连接。

当连接处于恢复状态时,在其通道上尝试进行的任何发布都将被异常拒绝。客户端当前不执行任何此类传出消息的内部缓冲。应用程序开发人员有责任跟踪此类消息,并在恢复成功时重新发布它们。发布者确认是应该由无法承受消息丢失的发布者使用的协议扩展。

当通道由于通道级异常而关闭时,连接恢复不会启动。

此类异常通常指示应用程序级问题。库无法确定是否是这种情况,也无法就如何恢复做出明智的决定。

即使在连接恢复启动后,已关闭的通道也不会恢复。这包括显式关闭的通道和上述通道级异常情况。

手动确认和自动恢复

当使用手动确认时,在消息传递和确认之间,与 RabbitMQ 节点的网络连接可能会失败。连接恢复后,RabbitMQ 将重置所有通道上的传递标签。

这意味着具有旧传递标签的 basic.ackbasic.nackbasic.reject 将导致通道异常。为了避免这种情况,RabbitMQ .NET 客户端会跟踪并更新传递标签,以使其在恢复之间单调增长。

然后,IChannel.BasicAckAsyncIChannel.BasicNackAsyncIChannel.BasicRejectAsync 将调整后的传递标签转换为 RabbitMQ 使用的标签。

将不会发送具有过时传递标签的确认。使用手动确认和自动恢复的应用程序必须能够处理重新传递。

OAuth 2 支持

客户端可以针对 OAuth 2 服务器(如 UAA)进行身份验证。必须在服务器端启用 OAuth 2 插件,并将其配置为使用与客户端相同的 OAuth 2 服务器。本节假设使用了 OAuth2 客户端库的最新主要版本

获取 OAuth 2 Token

.NET 客户端提供了 OAuth2ClientCredentialsProvider 类,用于使用 OAuth 2 客户端凭据流程 获取 JWT token。客户端在打开连接时在密码字段中发送访问令牌。然后,broker 会验证访问令牌的签名、有效性和权限,然后授权连接并授予对请求的虚拟主机的访问权限。

using RabbitMQ.Client.OAuth2;

var tokenEndpointUri = new Uri("http://somedomain.com/token");
var oAuth2Client = new OAuth2ClientBuilder("client_id", "client_secret", tokenEndpointUri).Build();
ICredentialsProvider credentialProvider = new OAuth2ClientCredentialsProvider("prod-uaa-1", oAuth2Client);

var connectionFactory = new ConnectionFactory
{
CredentialsProvider = credentialsProvider
};
var connection = await connectionFactory.CreateConnectionAsync();

在生产环境中,请务必对 token 端点 URI 使用 HTTPS,并为 HttpClient 适当配置 HttpClientHandler

HttpClientHandler httpClientHandler = buildHttpClientHandlerWithTLSEnabled();

var tokenEndpointUri = new Uri("https://somedomain.com/token");

var oAuth2ClientBuilder = new OAuth2ClientBuilder("client_id", "client_secret", tokenEndpointUri)
oAuth2ClientBuilder.SetHttpClientHandler(httpClientHandler);
var oAuth2Client = await oAuth2ClientBuilder.BuildAsync();

ICredentialsProvider credentialsProvider = new OAuth2ClientCredentialsProvider("prod-uaa-1", oAuth2Client);

var connectionFactory = new ConnectionFactory
{
CredentialsProvider = credentialsProvider
};
var connection = await connectionFactory.CreateConnectionAsync();

注意:如果您的授权服务器需要超出规范要求的额外请求参数,您可以将 <key, value> 对添加到 Dictionary 中,并将其传递给 OAuth2ClientCredentialsProvider 构造函数,而不是像上面所示的 EMPTY

刷新 Token

当 token 过期时,broker 将拒绝在该连接上进行进一步操作。可以在过期之前调用 ICredentialsProvider#GetCredentialsAsync() 并将新 token 发送到服务器。这对应用程序来说不太方便,因此 .NET 客户端提供了 CredentialsRefresher 来提供帮助。

请参阅 TestOAuth2,了解如何使用 CredentialsRefresher 类。

CredentialsRefresher 会在 token 有效期的 2/3 后安排刷新。例如,如果 token 在 60 分钟后过期,则会在 40 分钟后刷新。

© . All rights reserved.