RabbitMQ 教程 - 使用发布者确认的可靠发布
发布者确认
前提条件
本教程假设您已安装并运行 RabbitMQ,并在 标准端口 (5672) 上使用 localhost
。如果您使用不同的主机、端口或凭据,则连接设置可能需要调整。
如何获取帮助
如果您在学习本教程时遇到问题,可以通过 GitHub Discussions 或 RabbitMQ 社区 Discord 联系我们。
发布者确认是 RabbitMQ 的扩展,用于实现可靠发布。当在通道上启用发布者确认后,客户端发布的消息将由代理异步确认,这意味着它们已在服务器端得到处理。
(使用 Java 客户端)
概述
在本教程中,我们将使用发布者确认来确保发布的消息已安全到达代理。我们将介绍使用发布者确认的几种策略,并解释它们的优缺点。
在通道上启用发布者确认
发布者确认是 AMQP 0.9.1 协议的 RabbitMQ 扩展,因此默认情况下未启用。发布者确认在通道级别通过 confirmSelect
方法启用
Channel channel = connection.createChannel();
channel.confirmSelect();
必须在您希望使用发布者确认的每个通道上调用此方法。确认应该只启用一次,而不是为每个发布的消息启用。
策略 #1:单独发布消息
让我们从最简单的使用确认发布的方法开始,即发布一条消息并同步等待其确认
while (thereAreMessagesToPublish()) {
byte[] body = ...;
BasicProperties properties = ...;
channel.basicPublish(exchange, queue, properties, body);
// uses a 5 second timeout
channel.waitForConfirmsOrDie(5_000);
}
在前面的示例中,我们像往常一样发布消息,并使用 Channel#waitForConfirmsOrDie(long)
方法等待其确认。一旦消息被确认,该方法就会返回。如果消息在超时时间内未被确认,或者被 nack-ed(意味着代理由于某种原因无法处理它),该方法将抛出异常。异常处理通常包括记录错误消息和/或重试发送消息。
不同的客户端库有不同的同步处理发布者确认的方法,因此请务必仔细阅读您正在使用的客户端的文档。
这种技术非常直接,但也存在一个主要缺点:它显著降低了发布速度,因为一条消息的确认会阻止所有后续消息的发布。这种方法每秒的吞吐量不会超过几百条已发布的消息。然而,这对于某些应用程序来说可能已经足够了。
发布者确认是异步的吗?
我们在开始时提到,代理异步确认已发布的消息,但在第一个示例中,代码同步等待直到消息被确认。客户端实际上是异步接收确认,并相应地解除对
waitForConfirmsOrDie
的调用阻塞。可以将waitForConfirmsOrDie
视为一个同步助手,它在底层依赖于异步通知。
策略 #2:批量发布消息
为了改进我们之前的示例,我们可以批量发布消息,并等待整个批次被确认。以下示例使用批量为 100 的消息
int batchSize = 100;
int outstandingMessageCount = 0;
while (thereAreMessagesToPublish()) {
byte[] body = ...;
BasicProperties properties = ...;
channel.basicPublish(exchange, queue, properties, body);
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
channel.waitForConfirmsOrDie(5_000);
outstandingMessageCount = 0;
}
}
if (outstandingMessageCount > 0) {
channel.waitForConfirmsOrDie(5_000);
}
等待批量消息被确认比等待单个消息的确认大大提高了吞吐量(对于远程 RabbitMQ 节点,吞吐量提高了 20-30 倍)。一个缺点是,如果发生故障,我们不确切知道哪里出了问题,因此我们可能需要将整个批次保存在内存中以记录一些有意义的内容或重新发布消息。而且这种解决方案仍然是同步的,因此它会阻止消息的发布。
策略 #3:异步处理发布者确认
代理异步确认已发布的消息,只需在客户端注册一个回调,以便在收到这些确认时得到通知
Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener((sequenceNumber, multiple) -> {
// code when message is confirmed
}, (sequenceNumber, multiple) -> {
// code when message is nack-ed
});
有两个回调:一个用于已确认的消息,另一个用于 nack-ed 消息(可以被代理视为丢失的消息)。每个回调都有 2 个参数
- 序列号:一个标识已确认或 nack-ed 消息的数字。我们稍后将看到如何将其与已发布的消息关联起来。
- multiple:这是一个布尔值。如果为 false,则仅确认/nack-ed 一条消息;如果为 true,则确认/nack-ed 所有序列号小于或等于当前序列号的消息。
序列号可以在发布之前通过 Channel#getNextPublishSeqNo()
获取
int sequenceNumber = channel.getNextPublishSeqNo());
ch.basicPublish(exchange, queue, properties, body);
将消息与序列号关联的简单方法是使用 map。假设我们要发布字符串,因为它们很容易转换为字节数组以进行发布。这是一个代码示例,它使用 map 将发布序列号与消息的字符串主体关联起来
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
// ... code for confirm callbacks will come later
String body = "...";
outstandingConfirms.put(channel.getNextPublishSeqNo(), body);
channel.basicPublish(exchange, queue, properties, body.getBytes());
发布代码现在使用 map 跟踪出站消息。当确认到达时,我们需要清理这个 map,并在消息被 nack-ed 时执行类似记录警告的操作
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
if (multiple) {
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
sequenceNumber, true
);
confirmed.clear();
} else {
outstandingConfirms.remove(sequenceNumber);
}
};
channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
String body = outstandingConfirms.get(sequenceNumber);
System.err.format(
"Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
body, sequenceNumber, multiple
);
cleanOutstandingConfirms.handle(sequenceNumber, multiple);
});
// ... publishing code
之前的示例包含一个在确认到达时清理 map 的回调。请注意,此回调处理单个和多个确认。当确认到达时(作为 Channel#addConfirmListener
的第一个参数),将使用此回调。nack-ed 消息的回调检索消息主体并发出警告。然后,它重用之前的回调来清理 map 中未完成的确认(无论消息是已确认还是 nack-ed,都必须删除它们在 map 中对应的条目。)
如何跟踪未完成的确认?
我们的示例使用
ConcurrentNavigableMap
来跟踪未完成的确认。这种数据结构很方便,原因有几个。它允许轻松地将序列号与消息(无论消息数据是什么)关联起来,并轻松地清理到给定序列 ID 的条目(以处理多个确认/nack)。最后,它支持并发访问,因为确认回调是在客户端库拥有的线程中调用的,该线程应与发布线程保持不同。还有其他方法可以跟踪未完成的确认,而不是使用复杂的 map 实现,例如使用简单的并发哈希 map 和一个变量来跟踪发布序列的下限,但它们通常更复杂,不属于教程的范围。
总而言之,异步处理发布者确认通常需要以下步骤
- 提供一种将发布序列号与消息关联的方法。
- 在通道上注册一个确认侦听器,以便在发布者 acks/nacks 到达时收到通知,以执行适当的操作,例如记录或重新发布 nack-ed 消息。序列号到消息的关联机制也可能需要在这一步进行一些清理。
- 在发布消息之前跟踪发布序列号。
重新发布 nack-ed 消息?
从相应的回调中重新发布 nack-ed 消息可能很诱人,但这应该避免,因为确认回调是在 I/O 线程中调度的,而通道不应该在 I/O 线程中执行操作。更好的解决方案是将消息排队到内存队列中,该队列由发布线程轮询。像
ConcurrentLinkedQueue
这样的类将是传输确认回调和发布线程之间消息的一个很好的选择。
总结
在某些应用程序中,确保发布的消息到达代理可能至关重要。发布者确认是 RabbitMQ 的一项功能,有助于满足此要求。发布者确认本质上是异步的,但也可以同步处理它们。没有实现发布者确认的明确方法,这通常取决于应用程序和整个系统中的约束。典型的技术包括
- 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
- 批量发布消息,同步等待一个批次的确认:简单,吞吐量合理,但当出现问题时很难推理。
- 异步处理:最佳性能和资源利用率,在发生错误时具有良好的控制,但正确实现可能很复杂。
整合在一起
PublisherConfirms.java
类包含我们介绍的技术的代码。我们可以编译它,按原样执行它,看看它们的性能如何
javac -cp $CP PublisherConfirms.java
java -cp $CP PublisherConfirms
输出将如下所示
Published 50,000 messages individually in 5,549 ms
Published 50,000 messages in batch in 2,331 ms
Published 50,000 messages and handled confirms asynchronously in 4,054 ms
如果客户端和服务器位于同一台机器上,则您计算机上的输出应看起来相似。正如预期的那样,单独发布消息的性能很差,但异步处理的结果与批量发布相比有点令人失望。
发布者确认非常依赖于网络,因此我们最好尝试使用远程节点,这更现实,因为在生产环境中,客户端和服务器通常不在同一台机器上。PublisherConfirms.java
可以很容易地更改为使用非本地节点
static Connection createConnection() throws Exception {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("remote-host");
cf.setUsername("remote-user");
cf.setPassword("remote-password");
return cf.newConnection();
}
重新编译该类,再次执行它,并等待结果
Published 50,000 messages individually in 231,541 ms
Published 50,000 messages in batch in 7,232 ms
Published 50,000 messages and handled confirms asynchronously in 6,332 ms
我们看到单独发布现在表现非常糟糕。但是,在客户端和服务器之间有网络的情况下,批量发布和异步处理现在的性能相似,异步处理发布者确认略有优势。
请记住,批量发布实现起来很简单,但当发布者收到否定确认时,不容易知道哪些消息未能到达代理。异步处理发布者确认实现起来更复杂,但可以提供更好的粒度,并在发布的消息被 nack-ed 时更好地控制要执行的操作。