发布者确认的介绍
在许多消息传递场景中,您必须不能丢失消息。由于 AMQP 对消息持久性/处理提供很少保证,因此传统的做法是使用事务,这可能慢到无法接受。为了解决此问题,我们以轻量级发布者确认的形式引入了 AMQP 的扩展。
使用 Tx 保证消息传递
在 RabbitMQ 中,持久消息是指在代理重启后应该能够存活的消息。这里需要注意的是应该,因为如果代理在有机会将消息写入磁盘之前就宕机了,消息仍然可能丢失。在某些情况下,这还不够,发布者需要知道消息是否被正确处理。简单的解决方案是使用事务,即提交每条消息。
发布者将使用类似以下代码:
ch.txSelect();
for (int i = 0; i < MSG_COUNT; ++i) {
ch.basicPublish("", QUEUE_NAME,
MessageProperties.PERSISTENT_BASIC,
"nop".getBytes());
ch.txCommit();
}
而消费者将执行类似以下操作:
QueueingConsumer qc = new QueueingConsumer(ch);
ch.basicConsume(QUEUE_NAME, true, qc);
for (int i = 0; i < MSG_COUNT; ++i) {
qc.nextDelivery();
System.out.printf("Consumed %d\n", i);
}
包含一些计时代码的完整程序可在此处获得。发布 10000 条消息需要 4 分多钟。
流式轻量级发布者确认
在这种情况下,使用事务有两个问题。第一个问题是它们是阻塞的:发布者必须等待代理处理每条消息。知道除最后一条消息之外的所有消息都已成功处理,通常来说,保证过于强烈;如果发布者知道在代理死亡时哪些消息尚未处理就足够了。第二个问题是事务不必要地繁重:每次提交都需要执行 fsync(),这需要很长时间才能完成。
引入确认:一旦通道进入确认模式,代理就会在处理消息时确认消息。由于这是异步完成的,因此生产者可以流式发布而无需等待代理,并且代理可以有效地批量写入磁盘。
以下是上面的示例,但使用确认:
private volatile SortedSet<Long> unconfirmedSet =
Collections.synchronizedSortedSet(new TreeSet());
...
ch.setConfirmListener(new ConfirmListener() {
public void handleAck(long seqNo, boolean multiple) {
if (multiple) {
unconfirmedSet.headSet(seqNo+1).clear();
} else {
unconfirmedSet.remove(seqNo);
}
}
public void handleNack(long seqNo, boolean multiple) {
// handle the lost messages somehow
}
});
ch.confirmSelect();
for (long i = 0; i < MSG_COUNT; ++i) {
unconfirmedSet.add(ch.getNextPublishSeqNo());
ch.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC,
"nop".getBytes());
}
while (unconfirmedSet.size() > 0)
Thread.sleep(10);
完整的代码可在此处获得。在继续之前,值得一提的是,运行此代码大约需要 2 秒。它比事务代码快 100 多倍。
代码做了什么?它首先声明一个集合,该集合将保存迄今为止未确认的消息的 ID。然后,它将通道设置为确认模式并将 AckListener 附加到通道。当发布消息时,它会将消息添加到集合中;同时,AckListener 在收到确认时会从集合中删除消息。最后,生产者等待所有消息都得到确认。该集合始终保存需要在发生故障时重新传输的消息。
确认的工作原理
确认通过添加 confirm 类来扩展标准 AMQP。此类仅包含两种方法,confirm.select 和 confirm.select-ok。此外,basic.ack 方法可以发送到客户端。
confirm.select 方法在通道上启用发布者确认。请注意,事务通道无法置于确认模式,而确认模式通道无法设置为事务模式。
当发送/接收 confirm.select 方法时,发布者/代理开始对发布进行编号(confirm.select 之后的第一个发布为 1)。一旦通道处于确认模式,发布者应该期望收到 basic.ack 方法。delivery-tag 字段指示已确认消息的编号。
当代理确认消息时,它会承担该消息的责任,并通知发布者该消息已成功处理;“成功处理”的含义取决于上下文。
基本规则如下:
- 不可路由的强制或立即消息在 basic.return 之后立即确认;
- 否则,瞬态消息在入队时立即确认;并且,
- 持久消息在持久化到磁盘或在每个队列上使用时确认。
请注意,要确认持久消息,必须将其写入磁盘或在所有传递到的队列上进行确认。关于确认,传递到非持久队列的持久消息的行为类似于瞬态消息。队列删除、队列清除和basic.reject{requeue=false} 模拟消费者确认。关于每个队列的 ttl,消息过期模拟消费者确认。
如果满足上述多个条件,则只有第一个条件会导致发送确认。每个发布的消息迟早都会得到确认,并且不会对任何消息确认多次。由于 basic.return 在 basic.ack 之前发送,因此一旦发布者收到 basic.ack,它就知道永远不会再收到该消息。
代理始终可以在 basic.ack 中设置multiple 位。设置了 multiple 的 basic.ack 意味着所有消息(包括 delivery-tag 在内)都已确认。
关于确认有一些需要注意的地方。首先,代理不保证何时确认消息,只保证会确认消息。其次,未确认的消息堆积会降低消息处理速度:代理对每个确认模式发布执行多次 O(log(number-of-unconfirmed-messages)) 操作。第三,如果发布者和代理之间的连接在存在未完成确认的情况下断开,并不一定意味着消息丢失了,因此重新发布可能会导致重复消息。最后,如果代理内部发生某些错误导致消息丢失,它将 basic.nack 这些消息(因此,ConfirmHandler 中的 handleNack())。
总之,确认使客户端能够以轻量级的方式跟踪哪些消息已由代理处理,以及在代理关闭或网络故障时哪些消息需要重新发布。