RabbitMQ 教程 - 使用发布者确认实现可靠发布
发布者确认
先决条件
本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
哪里寻求帮助
如果您在学习本教程时遇到困难,可以通过 GitHub Discussions 或 RabbitMQ 社区 Discord 联系我们。
发布者确认 是 RabbitMQ 的一项扩展,用于实现可靠发布。当通道上启用了发布者确认时,客户端发布的这些消息将由代理异步确认,这意味着这些消息在服务器端已被处理。
概述
在本教程中,我们将使用发布者确认来确保已发布的消息已安全到达代理。我们将介绍几种使用发布者确认的策略,并解释它们的优缺点。
在通道上启用发布者确认
发布者确认(Publisher confirms)是 AMQP 0.9.1 协议的 RabbitMQ 扩展,因此默认情况下不会启用。发布者确认需要在通道级别通过 CreateChannelOptions 类进行启用。
loading...
这些选项必须传递给每一个你期望使用发布者确认功能的通道。
策略 #1:单独发布消息
让我们从最简单的发布确认方式开始,即:发送一条消息并等待其确认。
loading...
在前一个示例中,我们像往常一样发布消息,并通过 await 等待 BasicPublishAsync 返回的任务来获取确认。一旦消息被确认,await 就会立即返回。如果消息被拒绝(nack-ed)或返回(意味着代理由于某种原因无法处理它),该方法将抛出异常。处理该异常通常包括记录错误消息和/或重试发送消息。
策略 #2:批量发布消息
为了改进上一个示例,我们可以批量发布消息并等待整个批次被确认。以下示例使用的批处理大小等于允许的未确认消息总数的一半。
loading...
此方法负责等待给定批次消息的发布者确认。
loading...
策略 #3:在应用程序中处理发布者确认
代理会异步确认已发布的消息,只需在客户端注册一个回调函数即可获得这些确认的通知。
loading...
共有 3 种回调:一种用于已确认的消息,一种用于被拒绝(nack-ed)的消息,还有一种用于被返回的消息。所有回调都有一个对应的 EventArgs 参数 (ea)。对于确认(ack)和拒绝(nack),该参数包含:
- 投递标签(delivery tag):用于标识被确认或被拒绝消息的序列号。我们稍后会看到如何将其与已发布的消息关联起来。
- multiple(多重确认):这是一个布尔值。如果为 false,则仅确认/拒绝一条消息;如果为 true,则所有序列号小于或等于该值的消息均被确认/拒绝。
序列号可以在发布前通过 IChannel#GetNextPublishSequenceNumberAsync 获取。
var sequenceNumber = await channel.GetNextPublishSequenceNumberAsync();
await channel.BasicPublishAsync(exchange, queue, properties, body);
将消息与序列号关联的一种高性能方法是使用链表。假设我们要发布字符串,因为它们很容易转换为字节数组进行发布。
现在的发布代码使用链表跟踪出站消息。当确认到达时,我们需要清理此列表;当消息被拒绝时,我们需要执行诸如记录警告之类的操作。
loading...
前一个示例包含一个回调,用于在确认、拒绝或返回到达时清理链表。请注意,此回调同时处理单个确认和多重确认。对于被拒绝或返回的消息,回调会发出警告。然后,它会复用之前的回调来清理未确认消息的链表。
总之,异步处理发布者确认通常需要以下步骤:
- 提供一种将发布序列号与消息关联的方法。
- 在通道上注册确认监听器,以便在收到发布者确认/拒绝时得到通知,从而执行适当的操作,例如记录日志或重新发布被拒绝的消息。序列号到消息的关联机制在这一步可能也需要进行一些清理。
- 在发布消息前追踪发布序列号。
重新发布被拒绝的消息?
在对应的回调中重新发布被拒绝的消息是很诱人的做法,但这应该避免,因为确认回调是在 I/O 线程中调度的,而通道不应该在这些线程中执行操作。一个更好的解决方案是将消息放入内存队列中,由发布线程进行轮询。像
ConcurrentQueue这样的类非常适合在确认回调和发布线程之间传输消息。
总结
确保已发布的消息到达代理对于某些应用程序至关重要。发布者确认是 RabbitMQ 的一项功能,有助于满足这一要求。发布者确认本质上是异步的,但也可以同步处理。没有一种绝对的实现发布者确认的方法,这通常取决于应用程序和整个系统的约束。典型的技术包括:
- 逐个发布消息,通过
await等待确认:简单。 - 批量发布消息,等待一批消息的确认:简单,吞吐量合理,但在出错时很难分析原因。
- 异步处理:最佳性能和资源利用率,错误情况下控制良好,但正确实现可能比较复杂。
整合
PublisherConfirms.cs 类包含了我们所涵盖技术的代码。我们可以编译并按原样执行它,看看它们的表现如何。
dotnet run
# => 11/6/2024 10:36:22 AM [INFO] publishing 50,000 messages and handling confirms per-message
# => 11/6/2024 10:36:28 AM [INFO] published 50,000 messages individually in 5,699 ms
# => 11/6/2024 10:36:28 AM [INFO] publishing 50,000 messages and handling confirms in batches
# => 11/6/2024 10:36:29 AM [INFO] published 50,000 messages in batch in 1,085 ms
# => 11/6/2024 10:36:29 AM [INFO] publishing 50,000 messages and handling confirms asynchronously
# => 11/6/2024 10:36:29 AM [WARNING] message sequence number 50000 has been basic.return-ed
# => 11/6/2024 10:36:29 AM [WARNING] message sequence number 50000 has been basic.return-ed
# => 11/6/2024 10:36:29 AM [WARNING] message sequence number 50000 has been basic.return-ed
# => ...
# => ...
# => ...
# => 11/6/2024 10:36:30 AM [WARNING] message sequence number 50000 has been basic.return-ed
# => 11/6/2024 10:36:30 AM [INFO] published 50,000 messages and handled confirm asynchronously 878 ms
如果客户端和服务器位于同一台机器上,你在计算机上看到的输出应该类似。
发布者确认非常依赖网络,因此最好尝试使用远程节点,这更符合实际情况,因为在生产环境中,客户端和服务器通常不在同一台机器上。可以很容易地修改 PublisherConfirms.cs 以使用非本地节点。
private static Task<IConnection> CreateConnection()
{
var factory = new ConnectionFactory
{
HostName = "remote-host",
UserName = "remote-host",
Password = "remote-password"
};
return factory.CreateConnectionAsync();
}
请记住,批量发布易于实现,但无法轻易得知在收到否定确认时到底是哪些消息未能到达代理。异步处理发布者确认的实现较为复杂,但提供了更好的粒度和对消息被拒绝时所需执行操作的更好控制。