跳至主内容

Publisher Confirms 介绍

·6 分钟阅读
Alexandru Scvortov

在许多消息传递场景中,您不能丢失消息。  由于 AMQP 在消息持久化/处理方面提供的保证很少,因此传统的实现方式是使用 事务,但这可能会慢得无法接受。  为了解决这个问题,我们引入了一个 AMQP 的扩展,即轻量级 Publisher Confirms。

通过事务保证消息送达

在 RabbitMQ 中,持久化消息是指“应该”能在 Broker 重启后存活的消息。  这里的关键是“应该”,因为如果 Broker 在将消息写入磁盘之前崩溃,消息仍可能丢失。  在某些情况下,这还不够,发布者需要知道消息是否被正确处理。  最直接的解决方案是使用事务,即提交每条消息。

发布者会使用类似这样的方式

ch.txSelect();
for (int i = 0; i < MSG_COUNT; ++i) {
ch.basicPublish("", QUEUE_NAME,
MessageProperties.PERSISTENT_BASIC,
"nop".getBytes());
ch.txCommit();
}

消费者会做类似这样的事情

QueueingConsumer qc = new QueueingConsumer(ch);
ch.basicConsume(QUEUE_NAME, true, qc);
for (int i = 0; i < MSG_COUNT; ++i) {
qc.nextDelivery();
System.out.printf("Consumed %d\n", i);
}

完整的程序,包括一些计时代码,可以在这里找到。  发布 10000 条消息大约需要 4 分钟多。

流式轻量级 Publisher Confirms

在这种情况下使用事务存在两个问题。  第一个问题是它们是阻塞的:发布者必须等待 Broker 处理每条消息。  通常,知道所有消息(除了最后一条)都已成功处理,这个保证过于强大;如果发布者知道 Broker 在死机时哪些消息尚未处理,那就足够了。  第二个问题是事务开销太大:每次提交都需要一次 fsync(),这需要很长时间才能完成。

这时就轮到 Confirms 了:一旦通道进入确认模式,Broker 就会在处理消息时确认它们。  由于这是异步完成的,发布者可以流式发布消息而不必等待 Broker,而 Broker 可以有效地批处理磁盘写入。

下面是上面的示例,但使用了 Confirms

private volatile SortedSet<Long> unconfirmedSet =
Collections.synchronizedSortedSet(new TreeSet());

...

ch.setConfirmListener(new ConfirmListener() {
public void handleAck(long seqNo, boolean multiple) {
if (multiple) {
unconfirmedSet.headSet(seqNo+1).clear();
} else {
unconfirmedSet.remove(seqNo);
}
}
public void handleNack(long seqNo, boolean multiple) {
// handle the lost messages somehow
}
});
ch.confirmSelect();
for (long i = 0; i < MSG_COUNT; ++i) {
unconfirmedSet.add(ch.getNextPublishSeqNo());
ch.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC,
"nop".getBytes());
}
while (unconfirmedSet.size() > 0)
Thread.sleep(10);

完整的代码可以在这里找到。  在继续之前,值得一提的是,运行这个代码大约需要 2 秒钟。  它的速度比事务代码快一百多倍。

代码做了什么?  它首先声明一个集合,用于存储迄今为止未确认消息的 ID。  然后,它将通道设置为确认模式,并向通道附加一个 AckListener。  在发布消息时,它将消息添加到集合中;同时,AckListener 在收到确认时从集合中移除消息。  最后,发布者等待所有消息被确认。  该集合始终包含在发生故障时需要重新传输的消息。

Confirms 如何工作

Confirms 通过添加 confirm 类来扩展标准的 AMQP。  此类仅包含两个方法:confirm.selectconfirm.select-ok。  此外,还可以向客户端发送 basic.ack 方法。

confirm.select 方法在通道上启用 Publisher Confirms。  请注意,事务性通道不能设置为确认模式,而确认模式通道也不能设置为事务性。

当发送/接收 confirm.select 方法时,发布者/Broker 开始为发布的消息编号(confirm.select 之后的第一次发布是 1)。  一旦通道处于确认模式,发布者就应该期望收到 basic.ack 方法。  delivery-tag 字段指示已确认消息的编号。

当 Broker 确认一条消息时,它就承担了处理该消息的责任,并通知发布者该消息已成功处理;“成功处理”的含义取决于具体场景。

基本规则如下:

  • 无法路由的 mandatory 或 immediate 消息在 basic.return 之后立即被确认;
  • 否则,瞬态消息在入队时被确认;
  • 持久化消息在持久化到磁盘时,或在被所有队列消费后被确认。

请注意,对于持久化消息,必须将其写入磁盘或被投递到的所有队列确认后才能被确认。  关于 Confirms,投递到非持久化队列的持久化消息的行为与瞬态消息类似。  队列删除、队列清空和basic.reject{requeue=false}模拟消费者确认。  关于每个队列的 TTL,消息过期模拟消费者确认。

如果满足多个条件,只有第一个条件会导致发送确认。  每一条已发布的消息最终都会被确认,并且不会有消息被确认多次。  由于 basic.returnbasic.ack 之前发送,一旦发布者收到 basic.ack,它就知道再也不会收到关于该消息的信息了。

Broker 可以在 basic.ack 中设置 multiple 位。  带有 multiple 位的 basic.ack 意味着所有截至 delivery-tag(包括该标签)的消息都已被确认。

关于 Confirms 有一些需要注意的地方。  首先,Broker 不保证消息何时会被确认,只保证它会被确认。  其次,随着未确认消息的堆积,消息处理速度会减慢:Broker 会为每条确认模式的发布执行几次 O(log(未确认消息数量)) 的操作。  第三,如果发布者和 Broker 之间的连接断开,并且有未完成的确认,这并不一定意味着消息丢失了,所以重新发布可能会导致重复消息。  最后,如果 Broker 内部发生意外情况导致消息丢失,它会 basic.nack 这些消息(因此,ConfirmHandler 中的 handleNack())。

总而言之,Confirms 为客户端提供了一种轻量级的方式来跟踪哪些消息已被 Broker 处理,以及在 Broker 关机或网络故障时哪些消息需要重新发布。

© . This site is unofficial and not affiliated with VMware.