RabbitMQ 教程 - 使用发布者确认实现可靠发布
发布者确认
前提条件
本教程假设您已安装 RabbitMQ 并在 localhost
上的标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
获取帮助
如果您在学习本教程时遇到问题,可以通过 GitHub Discussions 或 RabbitMQ 社区 Discord 联系我们。
发布者确认是 RabbitMQ 扩展,用于实现可靠发布。当通道上启用发布者确认时,客户端发布的消息会由 Broker 异步确认,这意味着服务器端已处理这些消息。
概述
在本教程中,我们将使用发布者确认来确保已发布的消息安全到达 Broker。我们将介绍使用发布者确认的几种策略,并解释它们的优点和缺点。
在通道上启用发布者确认
发布者确认是 AMQP 0.9.1 协议的 RabbitMQ 扩展,因此默认情况下未启用。发布者确认在通道级别通过 CreateChannelOptions
类启用
loading...
这些选项必须传递给您希望使用发布者确认的每个通道。
策略 #1:单独发布消息
让我们从最简单的发布确认方法开始,即发布一条消息并等待其确认
loading...
在前面的示例中,我们像往常一样发布消息,并通过 await
等待 BasicPublishAsync
返回的任务来确认消息。一旦消息被确认,await
就会返回。如果消息被 nack 或退回(意味着 Broker 由于某种原因无法处理),该方法将抛出异常。异常处理通常包括记录错误消息和/或重试发送消息。
策略 #2:批量发布消息
为了改进之前的示例,我们可以批量发布消息并等待整个批次得到确认。以下示例使用的批量大小等于允许的未完成确认计数的一半
loading...
此方法负责等待给定批次消息的发布者确认
loading...
策略 #3:在应用程序中处理发布者确认
Broker 异步确认已发布的消息,只需在客户端上注册一个回调即可接收这些确认的通知
loading...
有 3 个回调:一个用于已确认的消息,一个用于 nack 的消息,一个用于退回的消息。所有回调都有一个对应的 EventArgs
参数 (ea
)。对于 ack 和 nack,这包含
- 投递标签:标识已确认或 nack 消息的序列号。我们稍后将看到如何将其与已发布的消息关联起来。
- multiple:这是一个布尔值。如果为 false,则仅确认/nack 一条消息;如果为 true,则确认/nack 所有序列号小于或等于该序列号的消息。
序列号可以使用 IChannel#GetNextPublishSequenceNumberAsync
在发布之前获得
var sequenceNumber = await channel.GetNextPublishSequenceNumberAsync();
await channel.BasicPublishAsync(exchange, queue, properties, body);
将消息与序列号关联的有效方法是使用链表。假设我们要发布字符串,因为它们很容易转换为字节数组以进行发布。
发布代码现在使用链表跟踪出站消息。当确认到达时,我们需要清理此列表,并在消息被 nack 时执行类似记录警告的操作
loading...
之前的示例包含一个回调,该回调在确认、nack 或退回到达时清理链表。请注意,此回调处理单个和多个确认。nack 或退回消息的回调发出警告。然后,它重用先前的回调来清理未完成确认的链表。
总而言之,异步处理发布者确认通常需要以下步骤
- 提供一种将发布序列号与消息关联起来的方法。
- 在通道上注册确认监听器,以便在发布者 ack/nack 到达时收到通知,以执行适当的操作,例如记录日志或重新发布 nack 的消息。序列号到消息的关联机制也可能需要在此步骤中进行一些清理。
- 在发布消息之前跟踪发布序列号。
重新发布 nack 的消息?
从相应的回调中重新发布 nack 的消息可能很诱人,但这应该避免,因为确认回调是在 I/O 线程中调度的,通道不应在 I/O 线程中执行操作。更好的解决方案是将消息排队到内存队列中,该队列由发布线程轮询。像
ConcurrentQueue
这样的类将是传输确认回调和发布线程之间消息的良好选择。
总结
在某些应用程序中,确保已发布的消息到达 Broker 可能至关重要。发布者确认是 RabbitMQ 的一项功能,有助于满足此要求。发布者确认本质上是异步的,但也可以同步处理它们。没有明确的方法来实现发布者确认,这通常取决于应用程序和整个系统中的约束。典型的技术是
- 单独发布消息,通过
await
等待确认:简单。 - 批量发布消息,等待批次的确认:简单,吞吐量合理,但出现问题时难以推理。
- 异步处理:最佳性能和资源利用率,在发生错误时具有良好的控制,但正确实施可能很复杂。
整合所有内容
PublisherConfirms.cs
类包含我们介绍的技术的代码。我们可以编译它,按原样执行它,看看它们各自的性能如何
dotnet run
# => 11/6/2024 10:36:22 AM [INFO] publishing 50,000 messages and handling confirms per-message
# => 11/6/2024 10:36:28 AM [INFO] published 50,000 messages individually in 5,699 ms
# => 11/6/2024 10:36:28 AM [INFO] publishing 50,000 messages and handling confirms in batches
# => 11/6/2024 10:36:29 AM [INFO] published 50,000 messages in batch in 1,085 ms
# => 11/6/2024 10:36:29 AM [INFO] publishing 50,000 messages and handling confirms asynchronously
# => 11/6/2024 10:36:29 AM [WARNING] message sequence number 50000 has been basic.return-ed
# => 11/6/2024 10:36:29 AM [WARNING] message sequence number 50000 has been basic.return-ed
# => 11/6/2024 10:36:29 AM [WARNING] message sequence number 50000 has been basic.return-ed
# => ...
# => ...
# => ...
# => 11/6/2024 10:36:30 AM [WARNING] message sequence number 50000 has been basic.return-ed
# => 11/6/2024 10:36:30 AM [INFO] published 50,000 messages and handled confirm asynchronously 878 ms
如果客户端和服务器位于同一台机器上,您计算机上的输出应该看起来相似。
发布者确认非常依赖于网络,因此我们最好尝试使用远程节点,这更符合实际情况,因为客户端和服务器在生产环境中通常不在同一台机器上。PublisherConfirms.cs
可以很容易地更改为使用非本地节点
private static Task<IConnection> CreateConnection()
{
var factory = new ConnectionFactory
{
HostName = "remote-host",
UserName = "remote-host",
Password = "remote-password"
};
return factory.CreateConnectionAsync();
}
请记住,批量发布实现起来很简单,但在发布者负面确认的情况下,不容易知道哪些消息无法到达 Broker。异步处理发布者确认实现起来更复杂,但在已发布的消息被 nack 时,可以提供更好的粒度和对要执行的操作的更好控制。