跳至主内容

RabbitMQ 教程 - 使用发布者确认实现可靠发布

发布者确认

信息

先决条件

本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。

哪里寻求帮助

如果您在学习本教程时遇到困难,可以通过 GitHub DiscussionsRabbitMQ 社区 Discord 联系我们。

发布者确认 是 RabbitMQ 的一项扩展,用于实现可靠发布。当通道上启用了发布者确认时,客户端发布的这些消息将由代理异步确认,这意味着这些消息在服务器端已被处理。

概述

在本教程中,我们将使用发布者确认来确保已发布的消息已安全到达代理。我们将介绍几种使用发布者确认的策略,并解释它们的优缺点。

在通道上启用发布者确认

发布者确认(Publisher confirms)是 AMQP 0.9.1 协议的 RabbitMQ 扩展,因此默认情况下不会启用。发布者确认需要在通道级别通过 CreateChannelOptions 类进行启用。

dotnet/PublisherConfirms/PublisherConfirms.cs
loading...

这些选项必须传递给每一个你期望使用发布者确认功能的通道。

策略 #1:单独发布消息

让我们从最简单的发布确认方式开始,即:发送一条消息并等待其确认。

dotnet/PublisherConfirms/PublisherConfirms.cs
loading...

在前一个示例中,我们像往常一样发布消息,并通过 await 等待 BasicPublishAsync 返回的任务来获取确认。一旦消息被确认,await 就会立即返回。如果消息被拒绝(nack-ed)或返回(意味着代理由于某种原因无法处理它),该方法将抛出异常。处理该异常通常包括记录错误消息和/或重试发送消息。

策略 #2:批量发布消息

为了改进上一个示例,我们可以批量发布消息并等待整个批次被确认。以下示例使用的批处理大小等于允许的未确认消息总数的一半。

dotnet/PublisherConfirms/PublisherConfirms.cs
loading...

此方法负责等待给定批次消息的发布者确认。

dotnet/PublisherConfirms/PublisherConfirms.cs
loading...

策略 #3:在应用程序中处理发布者确认

代理会异步确认已发布的消息,只需在客户端注册一个回调函数即可获得这些确认的通知。

dotnet/PublisherConfirms/PublisherConfirms.cs
loading...

共有 3 种回调:一种用于已确认的消息,一种用于被拒绝(nack-ed)的消息,还有一种用于被返回的消息。所有回调都有一个对应的 EventArgs 参数 (ea)。对于确认(ack)和拒绝(nack),该参数包含:

  • 投递标签(delivery tag):用于标识被确认或被拒绝消息的序列号。我们稍后会看到如何将其与已发布的消息关联起来。
  • multiple(多重确认):这是一个布尔值。如果为 false,则仅确认/拒绝一条消息;如果为 true,则所有序列号小于或等于该值的消息均被确认/拒绝。

序列号可以在发布前通过 IChannel#GetNextPublishSequenceNumberAsync 获取。

var sequenceNumber = await channel.GetNextPublishSequenceNumberAsync();
await channel.BasicPublishAsync(exchange, queue, properties, body);

将消息与序列号关联的一种高性能方法是使用链表。假设我们要发布字符串,因为它们很容易转换为字节数组进行发布。

现在的发布代码使用链表跟踪出站消息。当确认到达时,我们需要清理此列表;当消息被拒绝时,我们需要执行诸如记录警告之类的操作。

dotnet/PublisherConfirms/PublisherConfirms.cs
loading...

前一个示例包含一个回调,用于在确认、拒绝或返回到达时清理链表。请注意,此回调同时处理单个确认和多重确认。对于被拒绝或返回的消息,回调会发出警告。然后,它会复用之前的回调来清理未确认消息的链表。

总之,异步处理发布者确认通常需要以下步骤:

  • 提供一种将发布序列号与消息关联的方法。
  • 在通道上注册确认监听器,以便在收到发布者确认/拒绝时得到通知,从而执行适当的操作,例如记录日志或重新发布被拒绝的消息。序列号到消息的关联机制在这一步可能也需要进行一些清理。
  • 在发布消息前追踪发布序列号。

重新发布被拒绝的消息?

在对应的回调中重新发布被拒绝的消息是很诱人的做法,但这应该避免,因为确认回调是在 I/O 线程中调度的,而通道不应该在这些线程中执行操作。一个更好的解决方案是将消息放入内存队列中,由发布线程进行轮询。像 ConcurrentQueue 这样的类非常适合在确认回调和发布线程之间传输消息。

总结

确保已发布的消息到达代理对于某些应用程序至关重要。发布者确认是 RabbitMQ 的一项功能,有助于满足这一要求。发布者确认本质上是异步的,但也可以同步处理。没有一种绝对的实现发布者确认的方法,这通常取决于应用程序和整个系统的约束。典型的技术包括:

  • 逐个发布消息,通过 await 等待确认:简单。
  • 批量发布消息,等待一批消息的确认:简单,吞吐量合理,但在出错时很难分析原因。
  • 异步处理:最佳性能和资源利用率,错误情况下控制良好,但正确实现可能比较复杂。

整合

PublisherConfirms.cs 类包含了我们所涵盖技术的代码。我们可以编译并按原样执行它,看看它们的表现如何。

dotnet run
# => 11/6/2024 10:36:22 AM [INFO] publishing 50,000 messages and handling confirms per-message
# => 11/6/2024 10:36:28 AM [INFO] published 50,000 messages individually in 5,699 ms
# => 11/6/2024 10:36:28 AM [INFO] publishing 50,000 messages and handling confirms in batches
# => 11/6/2024 10:36:29 AM [INFO] published 50,000 messages in batch in 1,085 ms
# => 11/6/2024 10:36:29 AM [INFO] publishing 50,000 messages and handling confirms asynchronously
# => 11/6/2024 10:36:29 AM [WARNING] message sequence number 50000 has been basic.return-ed
# => 11/6/2024 10:36:29 AM [WARNING] message sequence number 50000 has been basic.return-ed
# => 11/6/2024 10:36:29 AM [WARNING] message sequence number 50000 has been basic.return-ed
# => ...
# => ...
# => ...
# => 11/6/2024 10:36:30 AM [WARNING] message sequence number 50000 has been basic.return-ed
# => 11/6/2024 10:36:30 AM [INFO] published 50,000 messages and handled confirm asynchronously 878 ms

如果客户端和服务器位于同一台机器上,你在计算机上看到的输出应该类似。

发布者确认非常依赖网络,因此最好尝试使用远程节点,这更符合实际情况,因为在生产环境中,客户端和服务器通常不在同一台机器上。可以很容易地修改 PublisherConfirms.cs 以使用非本地节点。

private static Task<IConnection> CreateConnection()
{
var factory = new ConnectionFactory
{
HostName = "remote-host",
UserName = "remote-host",
Password = "remote-password"
};

return factory.CreateConnectionAsync();
}

请记住,批量发布易于实现,但无法轻易得知在收到否定确认时到底是哪些消息未能到达代理。异步处理发布者确认的实现较为复杂,但提供了更好的粒度和对消息被拒绝时所需执行操作的更好控制。

© . This site is unofficial and not affiliated with VMware.