RabbitMQ 教程 - 使用发布者确认实现可靠发布
发布者确认
先决条件
本教程假设 RabbitMQ 已安装并在 localhost
上的标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则连接设置需要进行调整。
获取帮助
如果您在学习本教程时遇到问题,可以通过GitHub 讨论区或RabbitMQ 社区 Discord联系我们。
发布者确认是 RabbitMQ 用于实现可靠发布的扩展。当在通道上启用发布者确认时,客户端发布的消息会由代理异步确认,这意味着消息已在服务器端得到处理。
(使用php-amqplib)
概述
在本教程中,我们将使用发布者确认来确保已发布的消息安全到达代理。我们将介绍使用发布者确认的几种策略,并解释其优缺点。
在通道上启用发布者确认
发布者确认是 AMQP 0.9.1 协议的 RabbitMQ 扩展,因此默认情况下未启用。发布者确认在通道级别使用 confirm_select
方法启用。
$channel = $connection->channel();
$channel->confirm_select();
此方法必须在您期望使用发布者确认的每个通道上调用。确认应该只启用一次,而不是为发布的每条消息启用。
策略 #1:单独发布消息
让我们从使用确认发布的最简单方法开始,即发布一条消息并同步等待其确认。
while (thereAreMessagesToPublish()) {
$data = "Hello World!";
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'exchange');
// uses a 5 second timeout
$channel->wait_for_pending_acks(5.000);
}
在前面的示例中,我们像往常一样发布消息,并使用 $channel::wait_for_pending_acks(int|float)
方法等待其确认。该方法在消息得到确认后立即返回。如果消息在超时时间内未得到确认,或者如果它被 nack-ed(表示代理由于某种原因无法处理它),该方法将抛出异常。异常处理通常包括记录错误消息和/或重试发送消息。
不同的客户端库有不同的方法来同步处理发布者确认,因此请务必仔细阅读您正在使用的客户端的文档。
此技术非常简单,但也存在一个主要缺点:它显着降低了发布速度,因为消息的确认会阻塞所有后续消息的发布。这种方法的吞吐量不会超过每秒几百条已发布消息。尽管如此,对于某些应用程序来说,这可能已经足够了。
发布者确认是异步的吗?
我们在开头提到代理异步确认已发布的消息,但在第一个示例中,代码同步等待直到消息得到确认。客户端实际上异步接收确认,并相应地取消阻塞对
wait_for_pending_acks
的调用。可以将wait_for_pending_acks
视为一个同步辅助函数,它依赖于底层的异步通知。
策略 #2:批量发布消息
为了改进之前的示例,我们可以发布一批消息并等待这整批消息得到确认。以下示例使用一批 100 条消息。
$batch_size = 100;
$outstanding_message_count = 0;
while (thereAreMessagesToPublish()) {
$data = ...;
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'exchange');
$outstanding_message_count++;
if ($outstanding_message_count === $batch_size) {
$channel->wait_for_pending_acks(5.000);
$outstanding_message_count = 0;
}
}
if ($outstanding_message_count > 0) {
$channel->wait_for_pending_acks(5.000);
}
等待一批消息得到确认与等待单个消息的确认相比,可以大大提高吞吐量(使用远程 RabbitMQ 节点时可提高 20-30 倍)。一个缺点是我们不知道在发生故障时究竟出了什么问题,因此我们可能需要将整批消息保存在内存中以记录有意义的信息或重新发布消息。并且此解决方案仍然是同步的,因此它会阻塞消息的发布。
策略 #3:异步处理发布者确认
代理异步确认已发布的消息,只需在客户端注册一个回调即可收到这些确认。
$channel = $connection->channel();
$channel->confirm_select();
$channel->set_ack_handler(
function (AMQPMessage $message){
// code when message is confirmed
}
);
$channel->set_nack_handler(
function (AMQPMessage $message){
// code when message is nack-ed
}
);
有两个回调:一个用于已确认的消息,一个用于 nack-ed 消息(代理认为已丢失的消息)。每个回调都有 AMQPMessage $message
参数,其中包含返回的消息,因此您无需处理序列号(传递标签)即可了解此回调属于哪条消息。
总结
在某些应用程序中,确保已发布的消息已到达代理至关重要。发布者确认是 RabbitMQ 的一项功能,有助于满足此要求。发布者确认本质上是异步的,但也可以同步处理它们。实现发布者确认没有明确的方法,这通常取决于应用程序和整个系统的约束条件。典型技术包括
- 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
- 批量发布消息,同步等待批次的确认:简单,吞吐量合理,但在出现问题时难以判断原因。
- 异步处理:最佳性能和资源利用率,在发生错误时可以很好地控制,但实现起来可能比较复杂。