跳至主要内容
版本:4.0

消费者确认和发布者确认

概述

本指南涵盖了与数据安全相关的两个功能,即消费者确认和发布者确认。

等等。在使用消息传递的应用程序中,消费者端和发布者端的确认对于数据安全都至关重要。

更多相关主题在发布者消费者指南中介绍。

基础知识

使用像 RabbitMQ 这样的消息代理的系统本质上是分布式的。由于发送的协议方法(消息)无法保证到达对等方或被其成功处理,因此发布者和消费者都需要一种机制来确认传递和处理。RabbitMQ 支持的几种消息传递协议提供了此类功能。本指南涵盖了 AMQP 0-9-1 中的功能,但在其他支持的协议中,其理念基本相同。

来自消费者到 RabbitMQ 的传递处理确认在消息传递协议中称为确认;代理对发布者的确认是称为发布者确认的协议扩展。这两个功能建立在相同的想法之上,并受 TCP 启发。

它们对于从发布者到 RabbitMQ 节点以及从 RabbitMQ 节点到消费者的可靠传递至关重要。换句话说,它们对于数据安全至关重要,应用程序和 RabbitMQ 节点都对此负责。

发布者确认与消费者传递确认有关吗?

发布者确认消费者传递确认是非常相似的功能,它们在不同的上下文中解决了类似的问题。

  1. 顾名思义,消费者确认涵盖了 RabbitMQ 与消费者的通信。
  2. 发布者确认涵盖了发布者与 RabbitMQ 的通信。

但是,这两个功能完全正交且相互独立。

发布者确认不了解消费者:它们仅涵盖发布者与其连接到的节点以及队列(或)领导者副本的交互。

消费者确认不了解发布者:其目标是向 RabbitMQ 节点确认已成功接收和处理了给定的传递,以便可以将已传递的消息标记为将来删除。

有时,发布和消费应用程序需要通过需要对等方明确确认的请求和响应进行通信。RabbitMQ 教程 #6 演示了如何执行此操作的基本知识,而直接回复提供了一种无需声明许多短暂的临时回复队列即可执行此操作的方法。

但是,本指南不涵盖此类通信,仅提及它以将其与本指南中描述的更集中的消息传递协议功能进行对比。

(消费者)传递确认

当 RabbitMQ 将消息传递给消费者时,它需要知道何时将消息视为已成功发送。哪种逻辑是最佳的取决于系统。因此,这主要是一个应用程序决策。在 AMQP 0-9-1 中,当使用 basic.consume 方法注册消费者或使用 basic.get 方法按需获取消息时做出此决定。

如果您更喜欢面向示例和分步的材料,则RabbitMQ 教程 #2中也介绍了消费者确认。

传递标识符:传递标记

在继续讨论其他主题之前,重要的是要解释如何识别传递(以及确认指示其各自的传递)。当注册消费者(订阅)时,RabbitMQ 将使用 basic.deliver 方法传递(推送)消息。该方法带有 delivery tag,它在通道上唯一标识传递。因此,传递标记的作用域是每个通道。

传递标记是单调递增的正整数,并且客户端库将其表示为正整数。确认传递的客户端库方法将传递标记作为参数。

因为传递标记的作用域是每个通道,所以必须在接收传递的同一通道上确认传递。在不同的通道上确认会导致“未知传递标记”协议异常并关闭通道。

消费者确认模式和数据安全注意事项

当节点将消息传递给消费者时,它必须确定是否应将消息视为消费者已处理(或至少已接收)。由于多个事物(客户端连接、消费者应用程序等)可能会失败,因此此决定是数据安全问题。消息传递协议通常提供一种确认机制,允许消费者向其连接到的节点确认传递。是否使用该机制是在消费者订阅时决定的。

根据使用的确认模式,RabbitMQ 可以认为消息是在发送后立即成功传递(写入 TCP 套接字)或在收到显式(“手动”)客户端确认后成功传递。手动发送的确认可以是肯定的或否定的,并使用以下协议方法之一

  • basic.ack 用于肯定确认
  • basic.nack 用于否定确认(注意:这是RabbitMQ 对 AMQP 0-9-1 的扩展
  • basic.reject 用于否定确认,但与 basic.nack 相比有一个限制

下面将讨论如何在客户端库 API 中公开这些方法。

肯定确认只是指示 RabbitMQ 将消息记录为已传递,并且可以将其丢弃。使用 basic.reject 的否定确认具有相同的效果。主要区别在于语义:肯定确认假设消息已成功处理,而其否定的对应物表明传递未处理,但仍应删除。

在自动确认模式下,消息在发送后立即被视为已成功传递。此模式以更高的吞吐量(只要消费者能够跟上)为代价,降低了传递和消费者处理的安全级别。此模式通常称为“即发即弃”。与手动确认模型不同,如果消费者的 TCP 连接或通道在成功传递之前关闭,则服务器发送的消息将丢失。因此,自动消息确认应被视为不安全,不适用于所有工作负载。

在使用自动确认模式时,另一件需要考虑的事情是消费者过载。手动确认模式通常与有界通道预取一起使用,后者限制了通道上未完成(“正在进行”)传递的数量。但是,在自动确认的情况下,根据定义没有这样的限制。因此,消费者可能会被传递速率压垮,可能在内存中累积积压,耗尽堆内存或被操作系统终止进程。一些客户端库将应用 TCP 反压(停止从套接字读取,直到未处理传递的积压量下降到某个特定限制以下)。因此,自动确认模式仅推荐用于能够高效且稳定地处理传递的消费者。

肯定确认传递

用于传递确认的 API 方法通常在客户端库中作为通道上的操作公开。Java 客户端用户将使用 Channel#basicAckChannel#basicNack 分别执行 basic.ackbasic.nack。以下是一个演示肯定确认的 Java 客户端示例

// this example assumes an existing channel instance

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// positively acknowledge a single delivery, the message will
// be discarded
channel.basicAck(deliveryTag, false);
}
});

在 .NET 客户端中,这些方法分别为 IModel#BasicAckIModel#BasicNack。以下是一个使用该客户端演示肯定确认的示例

// this example assumes an existing channel (IModel) instance

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body.ToArray();
// positively acknowledge a single delivery, the message will
// be discarded
channel.BasicAck(ea.DeliveryTag, false);
};
String consumerTag = channel.BasicConsume(queueName, false, consumer);

一次确认多个传递

可以批量处理手动确认以减少网络流量。这可以通过将确认方法(见上文)的 multiple 字段设置为 true 来实现。请注意,basic.reject 从历史上看没有此字段,这就是 RabbitMQ 引入 basic.nack 作为协议扩展的原因。

multiple 字段设置为 true 时,RabbitMQ 将确认所有未完成的传递标记,直到包含确认中指定的标记。与确认相关的所有其他内容一样,这在每个通道中都有作用域。例如,假设通道 Ch 上有 5、6、7 和 8 未确认的传递标记,当确认帧到达该通道时,delivery_tag 设置为 8multiple 设置为 true,则将确认从 5 到 8 的所有标记。如果 multiple 设置为 false,则传递 5、6 和 7 仍将未确认。

要使用 RabbitMQ Java 客户端确认多个传递,请将 true 作为 multiple 参数传递给 Channel#basicAck

// this example assumes an existing channel instance

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// positively acknowledge all deliveries up to
// this delivery tag
channel.basicAck(deliveryTag, true);
}
});

.NET 客户端的思路与此非常相似

// this example assumes an existing channel (IModel) instance

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body.ToArray();
// positively acknowledge all deliveries up to
// this delivery tag
channel.BasicAck(ea.DeliveryTag, true);
};
String consumerTag = channel.BasicConsume(queueName, false, consumer);

消息否定确认和重新入队

有时消费者无法立即处理消息,但其他实例可能可以处理。在这种情况下,可能需要将其重新入队,并让其他消费者接收和处理它。basic.rejectbasic.nack 是用于此目的的两个协议方法。

这些方法通常用于否定确认消息的传递。此类消息可以被代理丢弃、转入死信队列或重新入队。此行为由 requeue 字段控制。当该字段设置为 true 时,代理将使用指定的传递标签重新入队消息(或多条消息,稍后将解释)。或者,当该字段设置为 false 时,如果配置了死信交换机,则消息将路由到死信交换机,否则将被丢弃。

这两种方法通常在客户端库中作为通道上的操作公开。Java 客户端用户将使用 Channel#basicRejectChannel#basicNack 分别执行 basic.rejectbasic.nack

// this example assumes an existing channel instance

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// negatively acknowledge, the message will
// be discarded
channel.basicReject(deliveryTag, false);
}
});
// this example assumes an existing channel instance

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// requeue the delivery
channel.basicReject(deliveryTag, true);
}
});

在 .NET 客户端中,这些方法分别是 IModel#BasicRejectIModel#BasicNack

// this example assumes an existing channel (IModel) instance

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body.ToArray();
// negatively acknowledge, the message will
// be discarded
channel.BasicReject(ea.DeliveryTag, false);
};
String consumerTag = channel.BasicConsume(queueName, false, consumer);
// this example assumes an existing channel (IModel) instance

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body.ToArray();
// requeue the delivery
channel.BasicReject(ea.DeliveryTag, true);
};
String consumerTag = channel.BasicConsume(queueName, false, consumer);

当消息重新入队时,如果可能,它将被放置到队列中的原始位置。如果不可能(由于其他消费者的并发传递和确认,当多个消费者共享一个队列时),则消息将被重新入队到更靠近队列头的某个位置。

重新入队的消息可能会立即准备重新传递,具体取决于它们在队列中的位置以及具有活动消费者的通道使用的预取值。这意味着,如果所有消费者由于瞬态条件而无法处理传递而重新入队,它们将创建一个重新入队/重新传递循环。此类循环在网络带宽和 CPU 资源方面可能代价高昂。消费者实现可以跟踪重新传递次数,并永久拒绝消息(丢弃它们)或在延迟后安排重新入队。

可以使用 basic.nack 方法一次拒绝或重新入队多条消息。这就是它与 basic.reject 的区别所在。它接受一个额外的参数 multiple。以下是一个 Java 客户端示例

// this example assumes an existing channel instance

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// requeue all unacknowledged deliveries up to
// this delivery tag
channel.basicNack(deliveryTag, true, true);
}
});

.NET 客户端的工作方式非常相似

// this example assumes an existing channel (IModel) instance

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body.ToArray();
// requeue all unacknowledged deliveries up to
// this delivery tag
channel.BasicNack(ea.DeliveryTag, true, true);
};
String consumerTag = channel.BasicConsume(queueName, false, consumer);

通道预取设置 (QoS)

消息以异步方式传递(发送)到客户端,并且在任何给定时间,通道上可能有多个消息“正在传输”。来自客户端的手动确认本质上也是异步的,但以相反的方向流动。

这意味着未确认的传递的滑动窗口。

对于大多数消费者来说,限制此窗口的大小是有意义的,以避免消费者端出现无界缓冲区(堆)增长问题。这是通过使用 basic.qos 方法设置“预取计数”值来完成的。该值定义了通道上允许的最大未确认传递数。当数量达到配置的计数时,RabbitMQ 将停止在通道上传递更多消息,直到至少确认其中一个未完成的消息。

值为 0 表示“无限制”,允许任意数量的未确认消息。

例如,假设通道 Ch 上有四个未确认的传递,传递标签分别为 5、6、7 和 8,并且通道 Ch 的预取计数设置为 4,则 RabbitMQ 不会在 Ch 上推送任何更多传递,除非至少确认一个未完成的传递。

当确认帧到达该通道且 delivery_tag 设置为 5(或 678)时,RabbitMQ 将注意到并传递一条消息。一次确认多条消息 将使多条消息可供传递。

值得重申的是,传递和手动客户端确认的流程完全是异步的。因此,如果在传递正在传输时更改了预取值,则会产生自然竞争条件,并且通道上可能暂时存在超过预取计数的未确认消息。

每个通道、每个消费者和全局预取

可以为特定通道或特定消费者配置 QoS 设置。消费者预取 指南解释了此范围的影响。

预取和轮询消费者

QoS 预取设置对使用 basic.get(“拉取 API”)获取的消息没有影响,即使在手动确认模式下也是如此。

消费者确认模式、预取和吞吐量

确认模式和 QoS 预取值对消费者吞吐量有重大影响。通常,增加预取将提高消息传递到消费者的速度。自动确认模式产生最佳的传递速度。但是,在这两种情况下,已传递但尚未处理的消息数量也将增加,从而增加消费者 RAM 的消耗。

应谨慎使用自动确认模式或具有无限预取的手动确认模式。不确认地消费大量消息的消费者会导致其连接到的节点上的内存消耗增长。找到合适的预取值需要反复试验,并且会因工作负载而异。100 到 300 范围内的值通常提供最佳吞吐量,并且不会出现压倒消费者的重大风险。较高的值通常会遇到收益递减

预取值为 1 最为保守。它会显著降低吞吐量,尤其是在消费者连接延迟较高的环境中。对于许多应用程序,较高的值将是合适且最佳的。

消费者故障或连接丢失时:自动重新入队

使用手动确认时,任何未确认的传递(消息)在发生传递的通道(或连接)关闭时都会自动重新入队。这包括客户端的 TCP 连接丢失、消费者应用程序(进程)故障以及通道级协议异常(如下所述)。

请注意,需要一段时间才能检测到不可用的客户端

由于此行为,消费者必须准备好处理重新传递,并且以幂等性为前提进行实现。重新传递将具有一个特殊的布尔属性 redeliver,该属性由 RabbitMQ 设置为 true。对于首次传递,它将设置为 false。请注意,消费者可能会收到之前传递给其他消费者的消息。

客户端错误:重复确认和未知标签

如果客户端多次确认相同的传递标签,RabbitMQ 将导致通道错误,例如 PRECONDITION_FAILED - unknown delivery tag 100。如果使用未知的传递标签,也会引发相同的通道异常。

代理抱怨“未知传递标签”的另一种情况是在与接收传递的通道不同的通道上尝试确认(无论正向还是负向)。必须在同一通道上确认传递。

发布者确认

网络可能会以不明显的方式发生故障,并且检测某些故障需要时间。因此,已将其协议帧或一组帧(例如已发布的消息)写入其套接字的客户端不能假设该消息已到达服务器并已成功处理。它可能在传输过程中丢失,或者其传递可能会被显着延迟。

使用标准 AMQP 0-9-1,保证消息不会丢失的唯一方法是使用事务——使通道成为事务性的,然后对每条消息或一组消息发布进行提交。在这种情况下,事务不必要地繁重,并且会将吞吐量降低 250 倍。为了解决这个问题,引入了一种确认机制。它模仿了协议中已经存在的消息者确认机制。

要启用确认,客户端会发送 confirm.select 方法。根据是否设置了 no-wait,代理可能会回复 confirm.select-ok。一旦在通道上使用 confirm.select 方法,则该通道就被认为处于确认模式。事务性通道不能置于确认模式,并且一旦通道处于确认模式,就不能将其设为事务性。

一旦通道处于确认模式,代理和客户端都会对消息进行计数(在第一个 confirm.select 上从 1 开始计数)。然后,代理在处理消息时通过在同一通道上发送 basic.ack 来确认消息。delivery-tag 字段包含已确认消息的序列号。代理也可能在 basic.ack 中设置 multiple 字段以指示已处理所有直到序列号消息为止的消息。

发布的否定确认

在代理无法成功处理消息的特殊情况下,代理将发送 basic.nack 而不是 basic.ack。在此上下文中,basic.nack 的字段与 basic.ack 中的相应字段具有相同的含义,并且应忽略 requeue 字段。通过拒绝确认一条或多条消息,代理表示它无法处理这些消息,并且拒绝为其承担责任;此时,客户端可以选择重新发布这些消息。

将通道置于确认模式后,所有随后发布的消息都将被确认或拒绝确认一次。不保证消息确认的速度。任何消息都不会同时被确认和拒绝确认。

仅当负责队列的 Erlang 进程中发生内部错误时,才会传递 basic.nack

代理何时会确认已发布的消息?

对于不可路由的消息,一旦交换机验证消息无法路由到任何队列(返回空队列列表),代理将发出确认。如果消息也以强制方式发布,则在basic.ack之前将basic.return发送到客户端。否定确认(basic.nack)也是如此。

对于可路由的消息,当所有队列都已接受消息时,将发送basic.ack。对于路由到持久队列的持久消息,这意味着写入磁盘。对于仲裁队列,这意味着仲裁副本已接受并将消息确认给选出的领导者。

持久消息的确认延迟

路由到持久队列的持久消息的basic.ack将在将消息持久化到磁盘后发送。RabbitMQ消息存储会在一段时间间隔(几百毫秒)后批量将消息持久化到磁盘,以最大限度地减少fsync(2)调用的次数,或者在队列空闲时。

这意味着在持续负载下,basic.ack的延迟可能达到几百毫秒。为了提高吞吐量,强烈建议应用程序异步处理确认(作为流)或发布消息批次并等待未完成的确认。这方面的确切API在不同的客户端库之间有所不同。

发布者确认的排序注意事项

在大多数情况下,RabbitMQ会按照发布消息的顺序向发布者确认消息(这适用于在单个通道上发布的消息)。但是,发布者确认是异步发出的,可以确认单个消息或一组消息。确认发出的确切时间取决于消息的传递模式(持久或瞬态)以及消息路由到的队列的属性(请参见上文)。也就是说,不同的消息可以在不同的时间被认为已准备好进行确认。这意味着确认的到达顺序可能与它们各自的消息不同。应用程序应尽可能避免依赖确认的顺序。

发布者确认和保证交付

如果RabbitMQ节点在将消息写入磁盘之前发生故障,则可能会丢失持久消息。例如,考虑以下场景

  1. 客户端将持久消息发布到持久队列
  2. 客户端从队列中使用消息(注意消息是持久的且队列是持久的),但确认未激活,
  3. 代理节点发生故障并重新启动,以及
  4. 客户端重新连接并开始使用消息

此时,客户端可能会合理地假设消息将再次传递。但事实并非如此:重新启动导致代理丢失了消息。为了保证持久性,客户端应使用确认。如果发布者的通道处于确认模式,则发布者将不会收到已丢失消息的确认(因为消息尚未写入磁盘)。

限制

最大传递标记

传递标记是一个64位长值,因此其最大值为9223372036854775807。由于传递标记的作用域是每个通道,因此在实践中发布者或使用者不太可能超过此值。

© 2024 RabbitMQ. All rights reserved.