跳至主内容

RabbitMQ 教程 - 使用发布者确认实现可靠发布

发布者确认

信息

先决条件

本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。

哪里寻求帮助

如果您在学习本教程时遇到困难,可以通过 GitHub DiscussionsRabbitMQ 社区 Discord 联系我们。

发布者确认 是 RabbitMQ 的一项扩展,用于实现可靠发布。当通道上启用了发布者确认时,客户端发布的这些消息将由代理异步确认,这意味着这些消息在服务器端已被处理。

(使用 php-amqplib)

概述

在本教程中,我们将使用发布者确认来确保已发布的消息已安全到达代理。我们将介绍几种使用发布者确认的策略,并解释它们的优缺点。

在通道上启用发布者确认

发布者确认(Publisher confirms)是 RabbitMQ 对 AMQP 0.9.1 协议的一种扩展,因此默认情况下不会启用。发布者确认需要在通道(channel)级别通过 confirm_select 方法启用。

$channel = $connection->channel();
$channel->confirm_select();

此方法必须在每一个你打算使用发布确认的通道上调用。确认机制只需启用一次,而不是为每条发布的消息都启用。

策略 #1:单独发布消息

让我们从最简单的发布确认方式开始,即发布一条消息并同步等待其确认。

while (thereAreMessagesToPublish()) {
$data = "Hello World!";
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'exchange');
// uses a 5 second timeout
$channel->wait_for_pending_acks(5.000);
}

在前面的示例中,我们像往常一样发布消息,并使用 $channel::wait_for_pending_acks(int|float) 方法等待确认。一旦消息被确认,该方法就会立即返回。如果消息在超时时间内未被确认,或者收到 nack(表示代理由于某种原因无法处理该消息),该方法将抛出异常。处理异常的方法通常是记录错误日志和/或重试发送消息。

不同的客户端库处理同步发布者确认的方式各不相同,因此请务必仔细阅读你所使用的客户端文档。

这种技术非常直观,但也有一个主要缺点:它**显著降低了发布速度**,因为确认一条消息会阻塞后续所有消息的发布。这种方法每秒无法实现超过几百条消息的吞吐量。尽管如此,这对某些应用来说已经足够了。

发布者确认是异步的吗?

我们在开头提到过,代理是异步确认已发布消息的,但在第一个示例中,代码是同步等待直到消息被确认的。客户端实际上是异步接收确认信息,并相应地解除对 wait_for_pending_acks 调用的阻塞。可以将 wait_for_pending_acks 看作是一个依赖底层异步通知的同步辅助工具。

策略 #2:批量发布消息

为了改进我们之前的示例,我们可以批量发布消息并等待整个批次被确认。下面的示例使用 100 条消息作为一个批次。

$batch_size = 100;
$outstanding_message_count = 0;
while (thereAreMessagesToPublish()) {
$data = ...;
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'exchange');
$outstanding_message_count++;
if ($outstanding_message_count === $batch_size) {
$channel->wait_for_pending_acks(5.000);
$outstanding_message_count = 0;
}
}
if ($outstanding_message_count > 0) {
$channel->wait_for_pending_acks(5.000);
}

与等待每条消息的确认相比,等待一批消息的确认可以显著提高吞吐量(在远程 RabbitMQ 节点上可提高 20-30 倍)。一个缺点是,如果发生故障,我们无法确切知道哪里出了问题,因此可能必须将整个批次保存在内存中,以便记录有意义的日志或重新发布消息。此外,该方案仍然是同步的,因此它会阻塞消息的发布。

策略 3:异步处理发布者确认

代理会异步确认已发布的消息,只需在客户端注册一个回调函数即可获得这些确认的通知。

$channel = $connection->channel();
$channel->confirm_select();

$channel->set_ack_handler(
function (AMQPMessage $message){
// code when message is confirmed
}
);

$channel->set_nack_handler(
function (AMQPMessage $message){
// code when message is nack-ed
}
);

这里有两种回调:一种用于已确认的消息,另一种用于 nack 的消息(即代理认为已丢失的消息)。每个回调都有一个 AMQPMessage $message 参数,其中包含返回的消息,因此你无需处理序列号(投递标签/delivery tag)来确定该回调属于哪条消息。

总结

确保已发布的消息到达代理对于某些应用程序至关重要。发布者确认是 RabbitMQ 的一项功能,有助于满足这一要求。发布者确认本质上是异步的,但也可以同步处理。没有一种绝对的实现发布者确认的方法,这通常取决于应用程序和整个系统的约束。典型的技术包括:

  • 逐条发布消息,并同步等待确认:实现简单,但吞吐量非常有限。
  • 批量发布消息,并同步等待一批消息的确认:实现简单,吞吐量合理,但在出错时难以进行逻辑分析。
  • 异步处理:最佳性能和资源利用率,错误情况下控制良好,但正确实现可能比较复杂。
© . This site is unofficial and not affiliated with VMware.