跳至主要内容

RabbitMQ 教程 - 使用发布者确认实现可靠发布

发布者确认

信息

先决条件

本教程假设 RabbitMQ 已安装并在 localhost 上的标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则连接设置需要进行调整。

获取帮助

如果您在学习本教程时遇到问题,可以通过GitHub 讨论RabbitMQ 社区 Discord联系我们。

发布者确认是 RabbitMQ 用于实现可靠发布的扩展。当在通道上启用发布者确认时,客户端发布的消息会由代理异步确认,这意味着服务器端已处理这些消息。

(使用 .NET 客户端)

概述

在本教程中,我们将使用发布者确认来确保已发布的消息安全到达代理。我们将介绍几种使用发布者确认的策略,并解释它们的优缺点。

在通道上启用发布者确认

发布者确认是 AMQP 0.9.1 协议的 RabbitMQ 扩展,因此默认情况下未启用。发布者确认是在通道级别使用 ConfirmSelect 方法启用的。

var channel = connection.CreateModel();
channel.ConfirmSelect();

此方法必须在您期望使用发布者确认的每个通道上调用。确认应该只启用一次,而不是为发布的每条消息启用。

策略 #1:单独发布消息

让我们从使用确认发布的最简单方法开始,即发布一条消息并同步等待其确认。

while (ThereAreMessagesToPublish())
{
byte[] body = ...;
IBasicProperties properties = ...;
channel.BasicPublish(exchange, queue, properties, body);
// uses a 5 second timeout
channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
}

在前面的示例中,我们照常发布消息,并使用 Channel#WaitForConfirmsOrDie(TimeSpan) 方法等待其确认。该方法在消息已确认后立即返回。如果在超时时间内未确认消息或消息被拒绝(表示代理由于某种原因无法处理消息),则该方法将抛出异常。异常处理通常包括记录错误消息和/或重试发送消息。

不同的客户端库具有不同的同步处理发布者确认的方式,因此请务必仔细阅读您正在使用的客户端的文档。

此技术非常简单,但也存在一个主要缺点:它会显着降低发布速度,因为消息的确认会阻塞所有后续消息的发布。此方法的吞吐量不会超过每秒几百条已发布的消息。尽管如此,对于某些应用程序来说,这可能已经足够了。

发布者确认是否异步?

我们在开头提到代理异步确认已发布的消息,但在第一个示例中,代码会同步等待直到消息得到确认。客户端实际上异步接收确认,并相应地取消阻塞对 WaitForConfirmsOrDie 的调用。将 WaitForConfirmsOrDie 视为一个同步助手,它依赖于底层的异步通知。

策略 #2:批量发布消息

为了改进我们之前的示例,我们可以发布一批消息并等待整个批次得到确认。以下示例使用 100 条消息的批次。

var batchSize = 100;
var outstandingMessageCount = 0;
while (ThereAreMessagesToPublish())
{
byte[] body = ...;
IBasicProperties properties = ...;
channel.BasicPublish(exchange, queue, properties, body);
outstandingMessageCount++;
if (outstandingMessageCount == batchSize)
{
channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
outstandingMessageCount = 0;
}
}
if (outstandingMessageCount > 0)
{
channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
}

等待一批消息得到确认与等待单个消息的确认相比,可以大幅提高吞吐量(使用远程 RabbitMQ 节点时可提高 20-30 倍)。一个缺点是我们不知道在发生故障时究竟出了什么问题,因此我们可能需要将整个批次保留在内存中才能记录有意义的信息或重新发布消息。而且此解决方案仍然是同步的,因此会阻塞消息的发布。

策略 #3:异步处理发布者确认

代理异步确认已发布的消息,只需在客户端上注册回调即可收到这些确认。

var channel = connection.CreateModel();
channel.ConfirmSelect();
channel.BasicAcks += (sender, ea) =>
{
// code when message is confirmed
};
channel.BasicNacks += (sender, ea) =>
{
//code when message is nack-ed
};

有 2 个回调:一个用于确认的消息,一个用于拒绝的消息(代理认为已丢失的消息)。这两个回调都有一个对应的 EventArgs 参数 (ea),其中包含

  • 传递标记:识别已确认或拒绝消息的序列号。我们很快就会看到如何将其与已发布的消息相关联。
  • 多个:这是一个布尔值。如果为 false,则仅确认/拒绝一条消息,如果为 true,则确认/拒绝所有序列号小于或等于该序列号的消息。

可以在发布前使用 Channel#NextPublishSeqNo 获取序列号。

var sequenceNumber = channel.NextPublishSeqNo;
channel.BasicPublish(exchange, queue, properties, body);

将消息与序列号相关联的一种简单方法是使用字典。假设我们想发布字符串,因为它们很容易转换为字节数组以进行发布。以下是一个使用字典将发布序列号与消息的字符串主体相关联的代码示例。

var outstandingConfirms = new ConcurrentDictionary<ulong, string>();
// ... code for confirm callbacks will come later
var body = "...";
outstandingConfirms.TryAdd(channel.NextPublishSeqNo, body);
channel.BasicPublish(exchange, queue, properties, Encoding.UTF8.GetBytes(body));

发布代码现在使用字典跟踪出站消息。我们需要在确认到达时清理此字典,并在消息被拒绝时执行一些操作(例如记录警告)。

var outstandingConfirms = new ConcurrentDictionary<ulong, string>();

void CleanOutstandingConfirms(ulong sequenceNumber, bool multiple)
{
if (multiple)
{
var confirmed = outstandingConfirms.Where(k => k.Key <= sequenceNumber);
foreach (var entry in confirmed)
{
outstandingConfirms.TryRemove(entry.Key, out _);
}
}
else
{
outstandingConfirms.TryRemove(sequenceNumber, out _);
}
}

channel.BasicAcks += (sender, ea) => CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
channel.BasicNacks += (sender, ea) =>
{
outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body);
Console.WriteLine($"Message with body {body} has been nack-ed. Sequence number: {ea.DeliveryTag}, multiple: {ea.Multiple}");
CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
};

// ... publishing code

前面的示例包含一个回调,在确认到达时清理字典。请注意,此回调处理单个确认和多个确认。此回调用于确认到达时 (Channel#BasicAcks)。拒绝消息的回调检索消息主体并发出警告。然后,它重用先前的回调来清理未完成确认的字典(无论消息是已确认还是已拒绝,都必须删除字典中相应的条目)。

如何跟踪未完成的确认?

我们的示例使用 ConcurrentDictionary 来跟踪未完成的确认。此数据结构由于几个原因而方便。它允许轻松地将序列号与消息相关联(无论消息数据是什么),并轻松地清理直到给定序列 ID 的条目(以处理多个确认/拒绝)。最后,它支持并发访问,因为确认回调是在客户端库拥有的线程中调用的,该线程应与发布线程保持不同。

除了使用复杂的字典实现之外,还有其他方法可以跟踪未完成的确认,例如使用简单的并发哈希表和一个变量来跟踪发布序列的下界,但它们通常更复杂,不属于教程范围。

总而言之,异步处理发布者确认通常需要以下步骤。

  • 提供一种将发布序列号与消息相关联的方法。
  • 在通道上注册确认侦听器,以便在发布确认/拒绝到达时收到通知,以执行相应的操作,例如记录或重新发布被拒绝的消息。序列号到消息的相关机制也可能在此步骤中需要一些清理。
  • 在发布消息之前跟踪发布序列号。

重新发布被拒绝的消息?

从相应的回调中重新发布被拒绝的消息可能很诱人,但应避免这样做,因为确认回调是在 I/O 线程中分派的,在该线程中,通道不应该执行操作。更好的解决方案是将消息排队到内存队列中,该队列由发布线程轮询。像 ConcurrentQueue 这样的类将是用于在确认回调和发布线程之间传输消息的良好候选者。

总结

在某些应用程序中,确保已发布的消息已到达代理至关重要。发布者确认是 RabbitMQ 的一项功能,有助于满足此要求。发布者确认本质上是异步的,但也可以同步处理它们。没有实现发布者确认的明确方法,这通常取决于应用程序和整个系统的约束。典型技术是

  • 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
  • 批量发布消息,同步等待批次的确认:简单,合理的吞吐量,但在出现问题时难以推理。
  • 异步处理:最佳性能和资源利用率,在发生错误时可以很好地控制,但实现起来可能比较复杂。

将所有内容整合在一起

PublisherConfirms.cs 类包含我们介绍的技术的代码。我们可以编译它,按原样执行它,并查看它们各自的性能。

dotnet run

输出将如下所示。

Published 50,000 messages individually in 5,549 ms
Published 50,000 messages in batch in 2,331 ms
Published 50,000 messages and handled confirms asynchronously in 4,054 ms

如果客户端和服务器位于同一台机器上,则您计算机上的输出应该看起来类似。如预期的那样,单独发布消息的性能很差,但与批量发布相比,异步处理的结果有点令人失望。

发布确认高度依赖网络,因此最好尝试使用远程节点,这更符合实际情况,因为在生产环境中,客户端和服务器通常不在同一台机器上。PublisherConfirms.cs 可以轻松修改为使用非本地节点。

private static IConnection CreateConnection()
{
var factory = new ConnectionFactory { HostName = "remote-host", UserName = "remote-host", Password = "remote-password" };
return factory.CreateConnection();
}

重新编译类,再次执行它,然后等待结果。

Published 50,000 messages individually in 231,541 ms
Published 50,000 messages in batch in 7,232 ms
Published 50,000 messages and handled confirms asynchronously in 6,332 ms

我们看到现在单独发布的性能很差。但是,在客户端和服务器之间的网络环境下,批量发布和异步处理的性能现在相似,异步处理发布确认略有优势。

请记住,批量发布实现简单,但在出现负面发布确认的情况下,难以知道哪些消息未能到达代理。异步处理发布确认的实现更复杂,但提供了更好的粒度和对执行操作的更好控制,以便在发布的消息被拒绝时执行。

© 2024 RabbitMQ. All rights reserved.