RabbitMQ 教程 - 使用发布者确认实现可靠发布
发布者确认
先决条件
本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
哪里寻求帮助
如果您在学习本教程时遇到困难,可以通过 GitHub Discussions 或 RabbitMQ 社区 Discord 联系我们。
发布者确认 (Publisher confirms) 是 RabbitMQ 的一项扩展,用于实现可靠发布。当通道上启用发布者确认后,客户端发布的每条消息都会由代理进行异步确认,这意味着消息已在服务器端得到处理。
概述
在本教程中,我们将使用发布者确认来确保已发布的消息安全送达代理。我们将介绍几种使用发布者确认的策略,并解释它们的优缺点。
在通道上启用发布者确认
发布者确认是 RabbitMQ 对 AMQP 0.9.1 协议的扩展,因此默认情况下不启用。发布者确认是在通道级别通过 CreateChannelOptions 类启用的。
loading...
必须将这些选项传递给您期望使用发布者确认的每个通道。
策略 #1:单独发布消息
让我们从最简单的带确认的发布方法开始,即发布一条消息并等待其确认。
loading...
在上一个示例中,我们像平常一样发布消息,并通过 await 等待 BasicPublishAsync 返回的任务。await 一旦消息被确认就会返回。如果消息被拒绝 (nack-ed) 或被退回 (表示代理因某种原因无法处理它),该方法将抛出异常。异常处理通常包括记录错误消息和/或重试发送消息。
策略 #2:批量发布消息
为了改进我们先前的示例,我们可以发布一批消息并等待整个批次被确认。以下示例使用了一个批次大小,该大小等于允许的未确认确认数的一半。
loading...
此方法负责等待给定消息批次的发布者确认。
loading...
策略 #3:在应用程序中处理发布者确认
代理异步确认已发布的消息,客户端只需注册一个回调即可收到这些确认的通知。
loading...
有 3 个回调:一个用于已确认的消息,一个用于已拒绝的消息,还有一个用于已退回的消息。所有回调都有一个对应的 EventArgs 参数 (ea)。对于 ack 和 nack,这包含:
- delivery tag:用于标识已确认或已拒绝消息的序列号。我们稍后将看到如何将其与已发布的消息关联起来。
- multiple:这是一个布尔值。如果为 false,则只确认/拒绝一条消息;如果为 true,则确认/拒绝所有序列号小于或等于该序列号的消息。
在发布之前,可以使用 IChannel#GetNextPublishSequenceNumberAsync 获取序列号。
var sequenceNumber = await channel.GetNextPublishSequenceNumberAsync();
await channel.BasicPublishAsync(exchange, queue, properties, body);
一种高效地将消息与序列号关联的方法是使用链表。假设我们要发布字符串,因为它们很容易转换为字节数组进行发布。
发布代码现在使用链表跟踪出站消息。当收到确认时,我们需要清理此列表,并在消息被拒绝时执行类似记录警告的操作。
loading...
前面的示例包含一个回调,当收到确认、拒绝或退回时会清理链表。请注意,此回调同时处理单个和多个确认。对于被拒绝或退回的消息的回调会发出警告。然后它会重新使用前面的回调来清理未确认确认的链表。
总而言之,异步处理发布者确认通常需要以下步骤:
- 提供一种将发布序列号与消息相关联的方法。
- 在通道上注册确认监听器,以便在发布者 ack/nack 到达时收到通知,并执行适当的操作,例如记录或重新发布被拒绝的消息。序列号到消息的关联机制在此步骤中也可能需要一些清理。
- 在发布消息之前跟踪发布序列号。
重新发布被拒绝的消息?
当收到拒绝消息的回调时,可能会想立即重新发布该消息,但这应该避免,因为确认回调是在 I/O 线程上分发的,而通道不应在此线程上执行操作。更好的解决方案是将消息放入一个内存队列中,然后由发布线程轮询。像
ConcurrentQueue这样的类将是传递消息在确认回调和发布线程之间的好选择。
总结
确保已发布的消息已到达代理在某些应用程序中可能至关重要。发布者确认是 RabbitMQ 的一项功能,有助于满足这一要求。发布者确认本质上是异步的,但也可以同步处理。没有一种绝对的方法可以实现发布者确认,这通常取决于应用程序和整个系统的约束。典型技术包括:
- 单独发布消息,通过
await等待确认:简单。 - 批量发布消息,等待一批消息的确认:简单,吞吐量合理,但在出现问题时难以排查。
- 异步处理:性能和资源利用率最佳,错误处理控制良好,但正确实现可能比较复杂。
整合
PublisherConfirms.cs 类包含了我们介绍的这些技术的代码。我们可以编译并按原样执行它,看看它们各自的性能如何。
dotnet run
# => 11/6/2024 10:36:22 AM [INFO] publishing 50,000 messages and handling confirms per-message
# => 11/6/2024 10:36:28 AM [INFO] published 50,000 messages individually in 5,699 ms
# => 11/6/2024 10:36:28 AM [INFO] publishing 50,000 messages and handling confirms in batches
# => 11/6/2024 10:36:29 AM [INFO] published 50,000 messages in batch in 1,085 ms
# => 11/6/2024 10:36:29 AM [INFO] publishing 50,000 messages and handling confirms asynchronously
# => 11/6/2024 10:36:29 AM [WARNING] message sequence number 50000 has been basic.return-ed
# => 11/6/2024 10:36:29 AM [WARNING] message sequence number 50000 has been basic.return-ed
# => 11/6/2024 10:36:29 AM [WARNING] message sequence number 50000 has been basic.return-ed
# => ...
# => ...
# => ...
# => 11/6/2024 10:36:30 AM [WARNING] message sequence number 50000 has been basic.return-ed
# => 11/6/2024 10:36:30 AM [INFO] published 50,000 messages and handled confirm asynchronously 878 ms
如果客户端和服务器位于同一台机器上,您计算机上的输出应该与此类似。
发布者确认非常依赖网络,因此我们最好尝试连接到远程节点,这更符合实际情况,因为在生产环境中客户端和服务器通常不在同一台机器上。PublisherConfirms.cs 可以轻松地进行更改以使用非本地节点。
private static Task<IConnection> CreateConnection()
{
var factory = new ConnectionFactory
{
HostName = "remote-host",
UserName = "remote-host",
Password = "remote-password"
};
return factory.CreateConnectionAsync();
}
请记住,批量发布简单易实现,但在发布者收到否定确认的情况下,很难知道哪些消息未能到达代理。异步处理发布者确认的实现更复杂,但提供了更好的粒度和对发布消息被拒绝时要执行的操作的更好控制。