跳到主要内容

RabbitMQ 教程 - 使用发布者确认实现可靠发布

发布者确认

信息

前提条件

本教程假设您已安装 RabbitMQ 并在 localhost 上的标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。

获取帮助

如果您在学习本教程时遇到问题,可以通过 GitHub DiscussionsRabbitMQ 社区 Discord 联系我们。

发布者确认是 RabbitMQ 扩展,用于实现可靠发布。当通道上启用发布者确认时,客户端发布的消息会由 Broker 异步确认,这意味着服务器端已处理这些消息。

概述

在本教程中,我们将使用发布者确认来确保已发布的消息安全到达 Broker。我们将介绍使用发布者确认的几种策略,并解释它们的优点和缺点。

在通道上启用发布者确认

发布者确认是 AMQP 0.9.1 协议的 RabbitMQ 扩展,因此默认情况下未启用。发布者确认在通道级别通过 CreateChannelOptions 类启用

dotnet/PublisherConfirms/PublisherConfirms.cs
loading...

这些选项必须传递给您希望使用发布者确认的每个通道。

策略 #1:单独发布消息

让我们从最简单的发布确认方法开始,即发布一条消息并等待其确认

dotnet/PublisherConfirms/PublisherConfirms.cs
loading...

在前面的示例中,我们像往常一样发布消息,并通过 await 等待 BasicPublishAsync 返回的任务来确认消息。一旦消息被确认,await 就会返回。如果消息被 nack 或退回(意味着 Broker 由于某种原因无法处理),该方法将抛出异常。异常处理通常包括记录错误消息和/或重试发送消息。

策略 #2:批量发布消息

为了改进之前的示例,我们可以批量发布消息并等待整个批次得到确认。以下示例使用的批量大小等于允许的未完成确认计数的一半

dotnet/PublisherConfirms/PublisherConfirms.cs
loading...

此方法负责等待给定批次消息的发布者确认

dotnet/PublisherConfirms/PublisherConfirms.cs
loading...

策略 #3:在应用程序中处理发布者确认

Broker 异步确认已发布的消息,只需在客户端上注册一个回调即可接收这些确认的通知

dotnet/PublisherConfirms/PublisherConfirms.cs
loading...

有 3 个回调:一个用于已确认的消息,一个用于 nack 的消息,一个用于退回的消息。所有回调都有一个对应的 EventArgs 参数 (ea)。对于 ack 和 nack,这包含

  • 投递标签:标识已确认或 nack 消息的序列号。我们稍后将看到如何将其与已发布的消息关联起来。
  • multiple:这是一个布尔值。如果为 false,则仅确认/nack 一条消息;如果为 true,则确认/nack 所有序列号小于或等于该序列号的消息。

序列号可以使用 IChannel#GetNextPublishSequenceNumberAsync 在发布之前获得

var sequenceNumber = await channel.GetNextPublishSequenceNumberAsync();
await channel.BasicPublishAsync(exchange, queue, properties, body);

将消息与序列号关联的有效方法是使用链表。假设我们要发布字符串,因为它们很容易转换为字节数组以进行发布。

发布代码现在使用链表跟踪出站消息。当确认到达时,我们需要清理此列表,并在消息被 nack 时执行类似记录警告的操作

dotnet/PublisherConfirms/PublisherConfirms.cs
loading...

之前的示例包含一个回调,该回调在确认、nack 或退回到达时清理链表。请注意,此回调处理单个和多个确认。nack 或退回消息的回调发出警告。然后,它重用先前的回调来清理未完成确认的链表。

总而言之,异步处理发布者确认通常需要以下步骤

  • 提供一种将发布序列号与消息关联起来的方法。
  • 在通道上注册确认监听器,以便在发布者 ack/nack 到达时收到通知,以执行适当的操作,例如记录日志或重新发布 nack 的消息。序列号到消息的关联机制也可能需要在此步骤中进行一些清理。
  • 在发布消息之前跟踪发布序列号。

重新发布 nack 的消息?

从相应的回调中重新发布 nack 的消息可能很诱人,但这应该避免,因为确认回调是在 I/O 线程中调度的,通道不应在 I/O 线程中执行操作。更好的解决方案是将消息排队到内存队列中,该队列由发布线程轮询。像 ConcurrentQueue 这样的类将是传输确认回调和发布线程之间消息的良好选择。

总结

在某些应用程序中,确保已发布的消息到达 Broker 可能至关重要。发布者确认是 RabbitMQ 的一项功能,有助于满足此要求。发布者确认本质上是异步的,但也可以同步处理它们。没有明确的方法来实现发布者确认,这通常取决于应用程序和整个系统中的约束。典型的技术是

  • 单独发布消息,通过 await 等待确认:简单。
  • 批量发布消息,等待批次的确认:简单,吞吐量合理,但出现问题时难以推理。
  • 异步处理:最佳性能和资源利用率,在发生错误时具有良好的控制,但正确实施可能很复杂。

整合所有内容

PublisherConfirms.cs 类包含我们介绍的技术的代码。我们可以编译它,按原样执行它,看看它们各自的性能如何

dotnet run
# => 11/6/2024 10:36:22 AM [INFO] publishing 50,000 messages and handling confirms per-message
# => 11/6/2024 10:36:28 AM [INFO] published 50,000 messages individually in 5,699 ms
# => 11/6/2024 10:36:28 AM [INFO] publishing 50,000 messages and handling confirms in batches
# => 11/6/2024 10:36:29 AM [INFO] published 50,000 messages in batch in 1,085 ms
# => 11/6/2024 10:36:29 AM [INFO] publishing 50,000 messages and handling confirms asynchronously
# => 11/6/2024 10:36:29 AM [WARNING] message sequence number 50000 has been basic.return-ed
# => 11/6/2024 10:36:29 AM [WARNING] message sequence number 50000 has been basic.return-ed
# => 11/6/2024 10:36:29 AM [WARNING] message sequence number 50000 has been basic.return-ed
# => ...
# => ...
# => ...
# => 11/6/2024 10:36:30 AM [WARNING] message sequence number 50000 has been basic.return-ed
# => 11/6/2024 10:36:30 AM [INFO] published 50,000 messages and handled confirm asynchronously 878 ms

如果客户端和服务器位于同一台机器上,您计算机上的输出应该看起来相似。

发布者确认非常依赖于网络,因此我们最好尝试使用远程节点,这更符合实际情况,因为客户端和服务器在生产环境中通常不在同一台机器上。PublisherConfirms.cs 可以很容易地更改为使用非本地节点

private static Task<IConnection> CreateConnection()
{
var factory = new ConnectionFactory
{
HostName = "remote-host",
UserName = "remote-host",
Password = "remote-password"
};

return factory.CreateConnectionAsync();
}

请记住,批量发布实现起来很简单,但在发布者负面确认的情况下,不容易知道哪些消息无法到达 Broker。异步处理发布者确认实现起来更复杂,但在已发布的消息被 nack 时,可以提供更好的粒度和对要执行的操作的更好控制。

© . All rights reserved.