跳至主要内容

RabbitMQ 教程 - 使用发布者确认实现可靠发布

发布者确认

信息

先决条件

本教程假设 RabbitMQ 已安装并在 localhost 上的标准端口(5672)上运行。如果您使用不同的主机、端口或凭据,则连接设置需要进行调整。

获取帮助

如果您在学习本教程时遇到问题,可以通过GitHub 讨论区RabbitMQ 社区 Discord联系我们。

发布者确认是 RabbitMQ 的一个扩展,用于实现可靠发布。当在通道上启用发布者确认时,客户端发布的消息会由代理异步确认,这意味着服务器端已处理这些消息。

(使用 Java 客户端)

概述

在本教程中,我们将使用发布者确认来确保已发布的消息安全到达代理。我们将介绍使用发布者确认的几种策略,并解释其优缺点。

在通道上启用发布者确认

发布者确认是 AMQP 0.9.1 协议的 RabbitMQ 扩展,因此默认情况下未启用。发布者确认是在通道级别使用 confirmSelect 方法启用的。

Channel channel = connection.createChannel();
channel.confirmSelect();

此方法必须在您期望使用发布者确认的每个通道上调用。确认应该只启用一次,而不是为发布的每条消息都启用。

策略 #1:逐条发布消息

让我们从使用确认发布的最简单方法开始,即发布一条消息并同步等待其确认。

while (thereAreMessagesToPublish()) {
byte[] body = ...;
BasicProperties properties = ...;
channel.basicPublish(exchange, queue, properties, body);
// uses a 5 second timeout
channel.waitForConfirmsOrDie(5_000);
}

在前面的示例中,我们照常发布消息,并使用 Channel#waitForConfirmsOrDie(long) 方法等待其确认。此方法在消息已确认后立即返回。如果在超时时间内未确认消息或消息被拒绝(这意味着代理由于某种原因无法处理消息),则该方法将抛出异常。异常处理通常包括记录错误消息和/或重试发送消息。

不同的客户端库具有不同的同步处理发布者确认的方法,因此请务必仔细阅读您正在使用的客户端的文档。

此技术非常简单,但也存在一个主要缺点:它会显著降低发布速度,因为消息的确认会阻塞所有后续消息的发布。这种方法的吞吐量不会超过每秒几百条已发布的消息。但是,对于某些应用程序来说,这可能已经足够了。

发布者确认是异步的吗?

我们在开头提到,代理异步确认已发布的消息,但在第一个示例中,代码同步等待直到消息得到确认。客户端实际上异步接收确认,并相应地解除对 waitForConfirmsOrDie 调用的阻塞。可以将 waitForConfirmsOrDie 视为一个同步助手,它依赖于底层的异步通知。

策略 #2:批量发布消息

为了改进之前的示例,我们可以发布一批消息并等待这整批消息得到确认。以下示例使用一批 100 条消息。

int batchSize = 100;
int outstandingMessageCount = 0;
while (thereAreMessagesToPublish()) {
byte[] body = ...;
BasicProperties properties = ...;
channel.basicPublish(exchange, queue, properties, body);
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
channel.waitForConfirmsOrDie(5_000);
outstandingMessageCount = 0;
}
}
if (outstandingMessageCount > 0) {
channel.waitForConfirmsOrDie(5_000);
}

等待一批消息得到确认与等待单个消息的确认相比,可以大幅提高吞吐量(使用远程 RabbitMQ 节点时可提高 20-30 倍)。一个缺点是我们不知道在发生故障时究竟出了什么问题,因此我们可能需要将整批消息保存在内存中以记录有意义的信息或重新发布消息。而且此解决方案仍然是同步的,因此它会阻塞消息的发布。

策略 #3:异步处理发布者确认

代理异步确认已发布的消息,只需在客户端上注册一个回调即可收到这些确认。

Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener((sequenceNumber, multiple) -> {
// code when message is confirmed
}, (sequenceNumber, multiple) -> {
// code when message is nack-ed
});

有 2 个回调:一个用于已确认的消息,另一个用于被拒绝的消息(代理认为已丢失的消息)。每个回调都有 2 个参数。

  • 序列号:一个标识已确认或被拒绝消息的数字。我们很快就会看到如何将其与已发布的消息相关联。
  • multiple:这是一个布尔值。如果为 false,则仅确认/拒绝一条消息;如果为 true,则确认/拒绝所有序列号小于或等于它的消息。

序列号可以在发布之前使用 Channel#getNextPublishSeqNo() 获取。

int sequenceNumber = channel.getNextPublishSeqNo());
ch.basicPublish(exchange, queue, properties, body);

关联消息和序列号的一种简单方法是使用映射。假设我们想要发布字符串,因为它们很容易转换为发布的字节数组。以下代码示例使用映射来关联发布序列号与消息的字符串主体。

ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
// ... code for confirm callbacks will come later
String body = "...";
outstandingConfirms.put(channel.getNextPublishSeqNo(), body);
channel.basicPublish(exchange, queue, properties, body.getBytes());

发布代码现在使用映射跟踪出站消息。我们需要注意在确认到达时清理此映射,并在消息被拒绝时执行一些操作(例如记录警告)。

ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
if (multiple) {
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
sequenceNumber, true
);
confirmed.clear();
} else {
outstandingConfirms.remove(sequenceNumber);
}
};

channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
String body = outstandingConfirms.get(sequenceNumber);
System.err.format(
"Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
body, sequenceNumber, multiple
);
cleanOutstandingConfirms.handle(sequenceNumber, multiple);
});
// ... publishing code

前面的示例包含一个在确认到达时清理映射的回调。请注意,此回调处理单个确认和多个确认。此回调用于在确认到达时(作为 Channel#addConfirmListener 的第一个参数)。被拒绝消息的回调检索消息主体并发出警告。然后,它重用以前的回调来清理未完成确认的映射(无论消息是已确认还是被拒绝,都必须删除映射中与其对应的条目)。

如何跟踪未完成的确认?

我们的示例使用 ConcurrentNavigableMap 来跟踪未完成的确认。此数据结构方便出于多种原因。它允许轻松地将序列号与消息相关联(无论消息数据是什么),并轻松地清理直到给定序列 ID 的条目(以处理多个确认/拒绝)。最后,它支持并发访问,因为确认回调在客户端库拥有的线程中调用,该线程应与发布线程保持不同。

除了使用复杂映射实现之外,还有其他方法可以跟踪未完成的确认,例如使用简单的并发哈希映射和一个变量来跟踪发布序列的下限,但它们通常更复杂,不适合教程。

总而言之,异步处理发布者确认通常需要以下步骤。

  • 提供一种将发布序列号与消息相关联的方法。
  • 在通道上注册一个确认侦听器,以便在发布确认/拒绝到达时收到通知,以执行相应的操作,例如记录被拒绝消息或重新发布被拒绝消息。序列号到消息的关联机制在此步骤中也可能需要一些清理。
  • 在发布消息之前跟踪发布序列号。

重新发布被拒绝的消息?

从相应的回调中重新发布被拒绝的消息可能很诱人,但这应该避免,因为确认回调是在 I/O 线程中分派的,在该线程中,通道不应该执行操作。更好的解决方案是将消息排队到内存队列中,该队列由发布线程轮询。像 ConcurrentLinkedQueue 这样的类将是传输确认回调和发布线程之间消息的良好选择。

总结

在某些应用程序中,确保已发布的消息已到达代理至关重要。发布者确认是 RabbitMQ 的一项功能,有助于满足此要求。发布者确认本质上是异步的,但也可以同步处理它们。实现发布者确认没有明确的方法,这通常取决于应用程序和整个系统的约束条件。典型技术包括

  • 逐条发布消息,同步等待确认:简单,但吞吐量非常有限。
  • 批量发布消息,同步等待批次的确认:简单,吞吐量合理,但在出现问题时难以推理。
  • 异步处理:最佳性能和资源利用率,在发生错误时能够很好地控制,但实现起来可能比较复杂。

综合示例

包含我们所讲技术代码的PublisherConfirms.java 类。我们可以编译它,按原样执行它,并查看它们的性能。

javac -cp $CP PublisherConfirms.java
java -cp $CP PublisherConfirms

输出将如下所示

Published 50,000 messages individually in 5,549 ms
Published 50,000 messages in batch in 2,331 ms
Published 50,000 messages and handled confirms asynchronously in 4,054 ms

如果客户端和服务器位于同一台机器上,则您计算机上的输出应该看起来类似。正如预期的那样,单独发布消息的性能很差,但与批量发布相比,异步处理的结果有点令人失望。

发布者确认非常依赖于网络,因此我们最好尝试使用远程节点,这在生产环境中更为现实,因为客户端和服务器通常不在同一台机器上。PublisherConfirms.java 可以轻松更改为使用非本地节点

static Connection createConnection() throws Exception {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("remote-host");
cf.setUsername("remote-user");
cf.setPassword("remote-password");
return cf.newConnection();
}

重新编译类,再次执行它,并等待结果

Published 50,000 messages individually in 231,541 ms
Published 50,000 messages in batch in 7,232 ms
Published 50,000 messages and handled confirms asynchronously in 6,332 ms

我们看到单独发布现在性能很差。但是,在客户端和服务器之间的网络环境下,批量发布和异步处理现在表现相似,异步处理发布者确认略有优势。

请记住,批量发布易于实现,但在出现负面发布者确认的情况下,它并不容易知道哪些消息无法发送到代理。异步处理发布者确认的实现更复杂,但提供了更好的粒度和更好地控制在发布的消息被拒绝时执行的操作。

© 2024 RabbitMQ. All rights reserved.