RabbitMQ 教程 - 使用发布者确认实现可靠发布
发布者确认
先决条件
本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
哪里寻求帮助
如果您在学习本教程时遇到困难,可以通过 GitHub Discussions 或 RabbitMQ 社区 Discord 联系我们。
发布者确认 是 RabbitMQ 的一项扩展,用于实现可靠发布。当通道上启用了发布者确认时,客户端发布的这些消息将由代理异步确认,这意味着这些消息在服务器端已被处理。
(使用 php-amqplib)
概述
在本教程中,我们将使用发布者确认来确保已发布的消息已安全到达代理。我们将介绍几种使用发布者确认的策略,并解释它们的优缺点。
在通道上启用发布者确认
发布者确认是 RabbitMQ 对 AMQP 0.9.1 协议的一项扩展,因此默认情况下未启用。发布者确认在通道级别使用 confirm_select 方法启用。
$channel = $connection->channel();
$channel->confirm_select();
此方法必须在您期望使用发布者确认的每个通道上调用。确认应仅启用一次,而不是为发布的每条消息启用。
策略 #1:单独发布消息
让我们从最简单的带确认发布的途径开始,即发布一条消息并同步等待其确认。
while (thereAreMessagesToPublish()) {
$data = "Hello World!";
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'exchange');
// uses a 5 second timeout
$channel->wait_for_pending_acks(5.000);
}
在上一个示例中,我们像往常一样发布了一条消息,并通过 $channel::wait_for_pending_acks(int|float) 方法等待其确认。该方法在消息确认后立即返回。如果在超时时间内未确认消息,或者消息被 nack(意味着代理因某种原因未能处理它),则该方法将抛出异常。异常处理通常包括记录错误消息和/或重试发送消息。
不同的客户端库处理发布者确认同步的方式不同,因此请务必仔细阅读您所使用的客户端的文档。
此技术非常直接,但也有一个主要缺点:它会 **显著减慢发布速度**,因为一条消息的确认会阻塞所有后续消息的发布。这种方法的吞吐量不会超过每秒几百条已发布消息。尽管如此,对于某些应用程序来说,这已经足够好了。
发布者确认是异步的吗?
我们在开头提到,代理会异步确认已发布的消息,但在第一个示例中,代码会同步等待消息被确认。客户端实际上是异步接收确认,并相应地解除
wait_for_pending_acks的阻塞。可以将wait_for_pending_acks视为一个同步辅助函数,它底层依赖于异步通知。
策略 #2:批量发布消息
为了改进我们之前的示例,我们可以发布一批消息,并等待整批消息被确认。以下示例使用了一批 100 条消息。
$batch_size = 100;
$outstanding_message_count = 0;
while (thereAreMessagesToPublish()) {
$data = ...;
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'exchange');
$outstanding_message_count++;
if ($outstanding_message_count === $batch_size) {
$channel->wait_for_pending_acks(5.000);
$outstanding_message_count = 0;
}
}
if ($outstanding_message_count > 0) {
$channel->wait_for_pending_acks(5.000);
}
等待一批消息被确认,与等待单条消息确认相比,吞吐量得到了大幅提高(与远程 RabbitMQ 节点相比,最高可提高 20-30 倍)。一个缺点是,在发生故障时,我们不确切知道出了什么问题,因此我们可能需要将整个批次保留在内存中以记录有意义的信息或重新发布这些消息。而且这个解决方案仍然是同步的,所以它会阻塞消息的发布。
策略 #3:异步处理发布者确认
代理会异步确认已发布的消息,只需在客户端注册一个回调函数即可获得这些确认的通知。
$channel = $connection->channel();
$channel->confirm_select();
$channel->set_ack_handler(
function (AMQPMessage $message){
// code when message is confirmed
}
);
$channel->set_nack_handler(
function (AMQPMessage $message){
// code when message is nack-ed
}
);
有两个回调函数:一个用于已确认的消息,一个用于 nack 的消息(代理可视为已丢失的消息)。每个回调函数都有一个 AMQPMessage $message 参数,其中包含返回的消息,因此您无需处理序列号(delivery tag)即可了解此回调属于哪个消息。
总结
确保已发布的消息到达代理对于某些应用程序至关重要。发布者确认是 RabbitMQ 的一项功能,有助于满足这一要求。发布者确认本质上是异步的,但也可以同步处理。没有一种绝对的实现发布者确认的方法,这通常取决于应用程序和整个系统的约束。典型的技术包括:
- 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
- 批量发布消息,同步等待一批消息的确认:简单,吞吐量合理,但出现问题时难以处理。
- 异步处理:最佳性能和资源利用率,错误情况下控制良好,但正确实现可能比较复杂。