跳至主内容

RabbitMQ 教程 - 使用发布者确认实现可靠发布

发布者确认

信息

先决条件

本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。

哪里寻求帮助

如果您在学习本教程时遇到困难,可以通过 GitHub DiscussionsRabbitMQ 社区 Discord 联系我们。

发布者确认 是 RabbitMQ 的一项扩展,用于实现可靠发布。当通道上启用了发布者确认时,客户端发布的这些消息将由代理异步确认,这意味着这些消息在服务器端已被处理。

(使用 Java 客户端)

概述

在本教程中,我们将使用发布者确认来确保已发布的消息已安全到达代理。我们将介绍几种使用发布者确认的策略,并解释它们的优缺点。

在通道上启用发布者确认

发布者确认是 AMQP 0.9.1 协议的 RabbitMQ 扩展,因此默认情况下未启用。发布者确认在通道级别使用 confirmSelect 方法启用。

Channel channel = connection.createChannel();
channel.confirmSelect();

必须在您希望使用发布者确认的每个通道上调用此方法。确认应该只启用一次,而不是为发布的每条消息启用。

策略 #1:单独发布消息

让我们从发布确认的最简单方法开始,即发布一条消息并同步等待其确认。

while (thereAreMessagesToPublish()) {
byte[] body = ...;
BasicProperties properties = ...;
channel.basicPublish(exchange, queue, properties, body);
// uses a 5 second timeout
channel.waitForConfirmsOrDie(5_000);
}

在前面的示例中,我们像往常一样发布一条消息,并使用 Channel#waitForConfirmsOrDie(long) 方法等待其确认。一旦消息被确认,该方法就会返回。如果在超时时间内消息未被确认,或者它被 nack(意味着代理因某种原因无法处理它),该方法将抛出异常。异常处理通常包括记录错误消息和/或重试发送消息。

不同的客户端库处理发布者确认同步的方式不同,因此请务必仔细阅读您正在使用的客户端文档。

这种技术非常直接,但也有一个主要缺点:它会显著减慢发布速度,因为一条消息的确认会阻塞所有后续消息的发布。这种方法无法实现每秒数百条已发布消息的吞吐量。尽管如此,对于某些应用程序来说,这已经足够了。

发布者确认是异步的吗?

我们在开始时提到,代理会异步确认已发布的消息,但在第一个示例中,代码会同步等待消息被确认。客户端实际上是异步接收确认,并相应地解除对 waitForConfirmsOrDie 的调用阻塞。可以将 waitForConfirmsOrDie 视为一种同步辅助工具,它依赖于底层的异步通知。

策略 #2:批量发布消息

为了改进我们之前的示例,我们可以发布一批消息,并等待整个批次被确认。以下示例使用了 100 条消息的批次。

int batchSize = 100;
int outstandingMessageCount = 0;
while (thereAreMessagesToPublish()) {
byte[] body = ...;
BasicProperties properties = ...;
channel.basicPublish(exchange, queue, properties, body);
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
channel.waitForConfirmsOrDie(5_000);
outstandingMessageCount = 0;
}
}
if (outstandingMessageCount > 0) {
channel.waitForConfirmsOrDie(5_000);
}

等待一批消息被确认,与等待每条单独消息的确认相比,吞吐量得到了极大的提高(对于远程 RabbitMQ 节点,最多可提高 20-30 倍)。一个缺点是,在出现故障时,我们不确切知道出了什么问题,因此我们可能需要将整个批次保存在内存中以记录有意义的内容或重新发布消息。而且这个解决方案仍然是同步的,所以它会阻塞消息的发布。

策略 #3:异步处理发布者确认

代理会异步确认已发布的消息,只需在客户端注册一个回调函数即可获得这些确认的通知。

Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener((sequenceNumber, multiple) -> {
// code when message is confirmed
}, (sequenceNumber, multiple) -> {
// code when message is nack-ed
});

有两种回调:一种用于已确认的消息,另一种用于 nack 的消息(代理可以认为这些消息已丢失)。每个回调都有两个参数。

  • 序列号:一个标识已确认或 nack 消息的数字。我们稍后将看到如何将其与已发布的消息关联起来。
  • multiple:这是一个布尔值。如果为 false,则只确认/nack 一条消息;如果为 true,则确认/nack 所有序列号小于或等于该序列号的消息。

在发布之前,可以使用 Channel#getNextPublishSeqNo() 获取序列号。

int sequenceNumber = channel.getNextPublishSeqNo());
ch.basicPublish(exchange, queue, properties, body);

一种将消息与序列号关联的简单方法是使用映射。假设我们想发布字符串,因为它们很容易转换为字节数组进行发布。下面是一个使用映射将发布序列号与消息字符串正文关联起来的代码示例。

ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
// ... code for confirm callbacks will come later
String body = "...";
outstandingConfirms.put(channel.getNextPublishSeqNo(), body);
channel.basicPublish(exchange, queue, properties, body.getBytes());

发布代码现在使用映射来跟踪出站消息。我们需要在收到确认时清除此映射,并在收到 nack 时执行诸如记录警告之类的操作。

ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
if (multiple) {
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
sequenceNumber, true
);
confirmed.clear();
} else {
outstandingConfirms.remove(sequenceNumber);
}
};

channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
String body = outstandingConfirms.get(sequenceNumber);
System.err.format(
"Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
body, sequenceNumber, multiple
);
cleanOutstandingConfirms.handle(sequenceNumber, multiple);
});
// ... publishing code

上一个示例包含一个在收到确认时清除映射的回调。请注意,此回调同时处理单个和多个确认。收到确认时(作为 Channel#addConfirmListener 的第一个参数)会使用此回调。nack 消息的回调会检索消息正文并发出警告。然后,它会重用前面的回调来清除待确认消息的映射(无论消息是已确认还是 nack,其在映射中的对应条目都必须被删除)。

如何跟踪待确认的消息?

我们的示例使用 ConcurrentNavigableMap 来跟踪待确认的消息。这种数据结构很方便,原因如下。它允许轻松地将序列号与消息(无论消息数据是什么)关联起来,并轻松清除到给定序列 ID 的条目(用于处理多个确认/nack)。最后,它支持并发访问,因为确认回调是在客户端库拥有的线程中调用的,而这个线程应该与发布线程不同。

除了使用复杂的映射实现来跟踪待确认的消息之外,还有其他方法,例如使用简单的并发哈希映射和一个变量来跟踪发布序列的下限,但这些方法通常更复杂,不属于本教程的范畴。

总而言之,异步处理发布者确认通常需要以下步骤:

  • 提供一种将发布序列号与消息关联的方法。
  • 在通道上注册确认监听器,以便在收到发布者 ack/nack 时得到通知,并执行适当的操作,例如记录或重新发布 nack 的消息。序列号到消息的关联机制在此步骤中可能也需要进行一些清理。
  • 在发布消息之前跟踪发布序列号。

重新发布 nack 的消息?

从相应的回调中重新发布 nack 的消息可能会很诱人,但这应该避免,因为确认回调是在 I/O 线程中分派的,而在这些线程中不应该执行通道操作。更好的解决方案是将消息排队到内存队列中,然后由发布线程轮询。像 ConcurrentLinkedQueue 这样的类将是连接确认回调和发布线程的好选择。

总结

确保已发布的消息到达代理对于某些应用程序至关重要。发布者确认是 RabbitMQ 的一项功能,有助于满足这一要求。发布者确认本质上是异步的,但也可以同步处理。没有一种绝对的实现发布者确认的方法,这通常取决于应用程序和整个系统的约束。典型的技术包括:

  • 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
  • 批量发布消息,同步等待一批消息的确认:简单,吞吐量尚可,但在出现问题时难以排查。
  • 异步处理:最佳性能和资源利用率,错误情况下控制良好,但正确实现可能比较复杂。

整合

PublisherConfirms.java 类包含了我们介绍的各种技术的代码。我们可以编译它,按原样执行,看看它们各自的表现。

javac -cp $CP PublisherConfirms.java
java -cp $CP PublisherConfirms

输出将如下所示:

Published 50,000 messages individually in 5,549 ms
Published 50,000 messages in batch in 2,331 ms
Published 50,000 messages and handled confirms asynchronously in 4,054 ms

如果客户端和服务器在同一台机器上,您计算机上的输出应该类似。单独发布消息的性能如预期那样很差,但与批量发布相比,异步处理的结果有点令人失望。

发布者确认非常依赖网络,因此最好尝试使用远程节点,这更符合实际情况,因为在生产环境中,客户端和服务器通常不在同一台机器上。可以轻松更改 PublisherConfirms.java 以使用非本地节点。

static Connection createConnection() throws Exception {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("remote-host");
cf.setUsername("remote-user");
cf.setPassword("remote-password");
return cf.newConnection();
}

重新编译该类,再次执行,然后等待结果。

Published 50,000 messages individually in 231,541 ms
Published 50,000 messages in batch in 7,232 ms
Published 50,000 messages and handled confirms asynchronously in 6,332 ms

我们看到单独发布现在表现非常糟糕。但是,在客户端和服务器之间存在网络的情况下,批量发布和异步处理的表现相似,异步处理发布者确认略有优势。

请记住,批量发布易于实现,但在发布者发生否定确认时,很难知道哪些消息未能到达代理。异步处理发布者确认的实现更复杂,但在发布的消息被 nack 时,可以提供更好的粒度和更好的控制。

© . This site is unofficial and not affiliated with VMware.