RabbitMQ 教程 - 使用发布者确认的可靠发布
发布者确认
先决条件
本教程假定 RabbitMQ 已安装并在 localhost
上的标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则连接设置将需要调整。
在哪里获得帮助
如果您在学习本教程时遇到问题,您可以通过 GitHub Discussions 或 RabbitMQ 社区 Discord 联系我们。
发布者确认是 RabbitMQ 的扩展,用于实现可靠发布。当通道上启用发布者确认时,客户端发布的消息将由 Broker 异步确认,这意味着服务器端已处理这些消息。
(使用 php-amqplib)
概述
在本教程中,我们将使用发布者确认来确保已发布的消息已安全到达 Broker。我们将介绍使用发布者确认的几种策略,并解释它们的优缺点。
在通道上启用发布者确认
发布者确认是 AMQP 0.9.1 协议的 RabbitMQ 扩展,因此默认情况下未启用。发布者确认在通道级别使用 confirm_select
方法启用
$channel = $connection->channel();
$channel->confirm_select();
必须在您希望使用发布者确认的每个通道上调用此方法。确认应该只启用一次,而不是为每个发布的消息启用。
策略 #1:单独发布消息
让我们从使用确认发布的最简单方法开始,即发布消息并同步等待其确认
while (thereAreMessagesToPublish()) {
$data = "Hello World!";
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'exchange');
// uses a 5 second timeout
$channel->wait_for_pending_acks(5.000);
}
在前面的示例中,我们像往常一样发布消息,并使用 $channel::wait_for_pending_acks(int|float)
方法等待其确认。该方法在消息被确认后立即返回。如果消息未在超时时间内确认,或者如果消息被 nack-ed(意味着 Broker 由于某种原因无法处理它),则该方法将抛出异常。异常的处理通常包括记录错误消息和/或重试发送消息。
不同的客户端库具有不同的同步处理发布者确认的方式,因此请务必仔细阅读您正在使用的客户端的文档。
此技术非常简单,但也存在一个主要缺点:它显着降低了发布速度,因为消息的确认会阻止所有后续消息的发布。这种方法无法提供每秒超过几百条已发布消息的吞吐量。尽管如此,这对于某些应用程序来说可能已经足够了。
发布者确认是异步的吗?
我们在一开始提到 Broker 异步确认已发布的消息,但在第一个示例中,代码同步等待直到消息被确认。客户端实际上异步接收确认,并相应地取消阻止对
wait_for_pending_acks
的调用。可以将wait_for_pending_acks
视为一个同步助手,它依赖于底层的异步通知。
策略 #2:批量发布消息
为了改进我们之前的示例,我们可以发布一批消息,并等待整个批次得到确认。以下示例使用 100 个批次
$batch_size = 100;
$outstanding_message_count = 0;
while (thereAreMessagesToPublish()) {
$data = ...;
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'exchange');
$outstanding_message_count++;
if ($outstanding_message_count === $batch_size) {
$channel->wait_for_pending_acks(5.000);
$outstanding_message_count = 0;
}
}
if ($outstanding_message_count > 0) {
$channel->wait_for_pending_acks(5.000);
}
等待一批消息得到确认,与等待单个消息的确认相比,吞吐量显着提高(对于远程 RabbitMQ 节点,最多可提高 20-30 倍)。一个缺点是,如果发生故障,我们不确切知道哪里出了问题,因此我们可能需要将整个批次保存在内存中以记录一些有意义的内容或重新发布消息。并且此解决方案仍然是同步的,因此它会阻止消息的发布。
策略 #3:异步处理发布者确认
Broker 异步确认已发布的消息,只需在客户端上注册一个回调以接收这些确认的通知
$channel = $connection->channel();
$channel->confirm_select();
$channel->set_ack_handler(
function (AMQPMessage $message){
// code when message is confirmed
}
);
$channel->set_nack_handler(
function (AMQPMessage $message){
// code when message is nack-ed
}
);
有两个回调:一个用于已确认的消息,另一个用于 nack-ed 消息(Broker 可以认为丢失的消息)。每个回调都有 AMQPMessage $message
参数以及返回的消息,因此您无需处理序列号(传递标签)来了解此回调属于哪个消息。
总结
确保已发布的消息到达 Broker 在某些应用程序中可能至关重要。发布者确认是 RabbitMQ 的一项功能,可帮助满足此要求。发布者确认本质上是异步的,但也可以同步处理它们。实现发布者确认没有明确的方法,这通常取决于应用程序和整个系统中的约束。典型的技术是
- 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
- 批量发布消息,同步等待批次的确认:简单,吞吐量合理,但出现问题时很难推理。
- 异步处理:最佳性能和资源利用率,在发生错误时具有良好的控制,但正确实施可能很复杂。