跳至主内容

RabbitMQ 教程 - 使用发布者确认实现可靠发布

发布者确认

信息

先决条件

本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。

哪里寻求帮助

如果您在学习本教程时遇到困难,可以通过 GitHub DiscussionsRabbitMQ 社区 Discord 联系我们。

发布者确认 是 RabbitMQ 的一项扩展,用于实现可靠发布。当通道上启用了发布者确认时,客户端发布的这些消息将由代理异步确认,这意味着这些消息在服务器端已被处理。

(使用 Java 客户端)

概述

在本教程中,我们将使用发布者确认来确保已发布的消息已安全到达代理。我们将介绍几种使用发布者确认的策略,并解释它们的优缺点。

在通道上启用发布者确认

Publisher confirms(发布确认)是 AMQP 0.9.1 协议的一种 RabbitMQ 扩展,因此默认情况下不会启用。需要通过 confirmSelect 方法在通道(channel)级别启用 Publisher confirms。

Channel channel = connection.createChannel();
channel.confirmSelect();

此方法必须在每一个你打算使用发布确认的通道上调用。确认机制只需启用一次,而不是为每条发布的消息都启用。

策略 #1:单独发布消息

让我们从最简单的发布确认方式开始,即发布一条消息并同步等待其确认。

while (thereAreMessagesToPublish()) {
byte[] body = ...;
BasicProperties properties = ...;
channel.basicPublish(exchange, queue, properties, body);
// uses a 5 second timeout
channel.waitForConfirmsOrDie(5_000);
}

在前面的示例中,我们像往常一样发布消息,并使用 Channel#waitForConfirmsOrDie(long) 方法等待其确认。该方法会在消息被确认后立即返回。如果消息在超时时间内未被确认,或者收到 nack(表示代理由于某种原因无法处理该消息),该方法将抛出异常。对异常的处理通常包括记录错误日志和/或重试发送消息。

不同的客户端库处理同步发布者确认的方式各不相同,因此请务必仔细阅读你所使用的客户端文档。

这种技术非常直观,但也有一个主要缺点:它**显著降低了发布速度**,因为确认一条消息会阻塞后续所有消息的发布。这种方法每秒无法实现超过几百条消息的吞吐量。尽管如此,这对某些应用来说已经足够了。

发布者确认是异步的吗?

我们在开头提到过,代理是异步确认发布的消息的,但在第一个示例中,代码同步等待直到消息被确认。实际上,客户端异步接收确认,并相应地解除对 waitForConfirmsOrDie 的阻塞。可以将 waitForConfirmsOrDie 看作一个同步辅助工具,其底层依赖于异步通知。

策略 #2:批量发布消息

为了改进我们之前的示例,我们可以批量发布消息并等待整个批次被确认。下面的示例使用 100 条消息作为一个批次。

int batchSize = 100;
int outstandingMessageCount = 0;
while (thereAreMessagesToPublish()) {
byte[] body = ...;
BasicProperties properties = ...;
channel.basicPublish(exchange, queue, properties, body);
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
channel.waitForConfirmsOrDie(5_000);
outstandingMessageCount = 0;
}
}
if (outstandingMessageCount > 0) {
channel.waitForConfirmsOrDie(5_000);
}

与等待每条消息的确认相比,等待一批消息的确认可以显著提高吞吐量(在远程 RabbitMQ 节点上可提高 20-30 倍)。一个缺点是,如果发生故障,我们无法确切知道哪里出了问题,因此可能必须将整个批次保存在内存中,以便记录有意义的日志或重新发布消息。此外,该方案仍然是同步的,因此它会阻塞消息的发布。

策略 3:异步处理发布者确认

代理会异步确认已发布的消息,只需在客户端注册一个回调函数即可获得这些确认的通知。

Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener((sequenceNumber, multiple) -> {
// code when message is confirmed
}, (sequenceNumber, multiple) -> {
// code when message is nack-ed
});

这里有两个回调:一个用于已确认的消息,另一个用于 nack 的消息(即代理认为已丢失的消息)。每个回调都有两个参数:

  • 序列号(sequence number):一个标识已确认或 nack 消息的数字。我们很快会看到如何将其与已发布的消息关联起来。
  • multiple:这是一个布尔值。如果为 false,则仅确认/nack 一条消息;如果为 true,则序列号小于或等于该数字的所有消息都被确认/nack。

可以在发布前通过 Channel#getNextPublishSeqNo() 获取序列号。

int sequenceNumber = channel.getNextPublishSeqNo());
ch.basicPublish(exchange, queue, properties, body);

关联消息与序列号的一种简单方法是使用 Map。假设我们要发布字符串,因为它们很容易转换为字节数组进行发布。以下代码示例使用 Map 将发布序列号与消息的字符串正文关联起来:

ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
// ... code for confirm callbacks will come later
String body = "...";
outstandingConfirms.put(channel.getNextPublishSeqNo(), body);
channel.basicPublish(exchange, queue, properties, body.getBytes());

现在,发布代码使用 Map 跟踪传出消息。我们需要在收到确认时清理此 Map,并在消息被 nack 时执行诸如记录警告日志之类的操作。

ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
if (multiple) {
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
sequenceNumber, true
);
confirmed.clear();
} else {
outstandingConfirms.remove(sequenceNumber);
}
};

channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
String body = outstandingConfirms.get(sequenceNumber);
System.err.format(
"Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
body, sequenceNumber, multiple
);
cleanOutstandingConfirms.handle(sequenceNumber, multiple);
});
// ... publishing code

前面的示例包含一个在收到确认时清理 Map 的回调。请注意,此回调处理单个和多个确认。当确认到达时(作为 Channel#addConfirmListener 的第一个参数),会调用此回调。针对 nack 消息的回调会检索消息正文并发出警告。然后,它重用之前的回调来清理未完成确认的 Map(无论消息是被确认还是被 nack,都必须从 Map 中删除其对应的条目)。

如何跟踪未完成的确认?

我们的示例使用 ConcurrentNavigableMap 来跟踪未完成的确认。这种数据结构在多个方面都很方便。它允许轻松地将序列号与消息关联(无论消息数据是什么),并轻松地清除直到给定序列 ID 的条目(以处理多个确认/nack)。最后,它支持并发访问,因为确认回调是在客户端库拥有的线程中调用的,该线程应与发布线程分开。

除了使用复杂的 Map 实现外,还有其他跟踪未完成确认的方法,例如使用简单的 ConcurrentHashMap 和一个变量来跟踪发布序列的下限,但这些方法通常更复杂,不属于本教程的讨论范围。

总之,异步处理发布者确认通常需要以下步骤:

  • 提供一种将发布序列号与消息关联的方法。
  • 在通道上注册一个确认监听器,以便在收到发布者的 ack/nack 时得到通知,从而执行适当的操作,例如记录日志或重新发布 nack 的消息。序列号与消息的关联机制在此步骤中可能也需要进行一些清理。
  • 在发布消息前追踪发布序列号。

重新发布被拒绝的消息?

直接在对应的回调中重新发布 nack 的消息很诱人,但这应该避免,因为确认回调是在 I/O 线程中分发的,而在该线程中通道不应执行此类操作。更好的解决方案是将消息放入内存队列中,由发布线程轮询处理。像 ConcurrentLinkedQueue 这样的类非常适合在确认回调和发布线程之间传输消息。

总结

确保已发布的消息到达代理对于某些应用程序至关重要。发布者确认是 RabbitMQ 的一项功能,有助于满足这一要求。发布者确认本质上是异步的,但也可以同步处理。没有一种绝对的实现发布者确认的方法,这通常取决于应用程序和整个系统的约束。典型的技术包括:

  • 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
  • 批量发布消息,针对一批消息同步等待确认:简单,吞吐量合理,但一旦出错,很难进行逻辑判断。
  • 异步处理:最佳性能和资源利用率,错误情况下控制良好,但正确实现可能比较复杂。

整合

PublisherConfirms.java 类包含了我们所介绍技术的代码。我们可以编译它,按原样执行它,看看它们的表现如何。

javac -cp $CP PublisherConfirms.java
java -cp $CP PublisherConfirms

输出结果如下所示:

Published 50,000 messages individually in 5,549 ms
Published 50,000 messages in batch in 2,331 ms
Published 50,000 messages and handled confirms asynchronously in 4,054 ms

如果客户端和服务器位于同一台机器上,您计算机上的输出应该类似。正如预期的那样,单独发布消息的性能很差,但异步处理的结果与批量发布相比有些令人失望。

Publisher confirms 高度依赖网络,因此我们最好尝试使用远程节点,这更符合实际情况,因为在生产环境中,客户端和服务器通常不在同一台机器上。可以很容易地更改 PublisherConfirms.java 以使用非本地节点。

static Connection createConnection() throws Exception {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("remote-host");
cf.setUsername("remote-user");
cf.setPassword("remote-password");
return cf.newConnection();
}

重新编译该类,再次执行它,并等待结果。

Published 50,000 messages individually in 231,541 ms
Published 50,000 messages in batch in 7,232 ms
Published 50,000 messages and handled confirms asynchronously in 6,332 ms

我们看到单独发布消息的性能现在变得非常糟糕。但在客户端和服务器之间存在网络的情况下,批量发布和异步处理的性能现在表现相似,异步处理发布确认略有优势。

请记住,批量发布易于实现,但无法轻易得知在收到否定确认时到底是哪些消息未能到达代理。异步处理发布者确认的实现较为复杂,但提供了更好的粒度和对消息被拒绝时所需执行操作的更好控制。

© . This site is unofficial and not affiliated with VMware.