跳至主要内容
版本:4.0

消费者预取

概述

消费者预取是 通道预取机制 的扩展。

AMQP 0-9-1 指定了 basic.qos 方法,以便在消费时(也称为“预取计数”)限制通道(或连接)上未确认消息的数量。不幸的是,通道不是此操作的理想范围 - 因为单个通道可以从多个队列中消费,所以通道和队列需要相互协调才能确保每个发送的消息都不会超过限制。这在单台机器上很慢,在跨集群消费时非常慢。

此外,对于许多用途来说,指定适用于每个消费者的预取计数更为自然。

因此,RabbitMQ 重新定义了 basic.qos 方法中 global 标志的含义

全局AMQP 0-9-1 中 prefetch_count 的含义RabbitMQ 中 prefetch_count 的含义
false在通道上的所有消费者之间共享分别应用于通道上的每个新消费者
true在连接上的所有消费者之间共享在通道上的所有消费者之间共享

请注意,大多数 API 中 global 标志的默认值为 false

单个消费者

以下 Java 中的基本示例将一次最多接收 10 条未确认的消息

Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue", false, consumer);

值为 0 被视为无限,允许任意数量的未确认消息。

Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(0); // No limit for this consumer
channel.basicConsume("my-queue", false, consumer);

独立消费者

此示例在同一通道上启动两个消费者,每个消费者将独立地一次最多接收 10 条未确认的消息

Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

多个消费者共享限制

AMQP 0-9-1 规范没有解释如果多次使用不同的 global 值调用 basic.qos 会发生什么。RabbitMQ 将此解释为意味着两个预取限制应彼此独立地执行;只有在未达到未确认消息的任何限制时,消费者才会接收新消息。

例如

Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true); // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

这两个消费者总共最多只有 15 条未确认的消息,每个消费者最多 10 条消息。由于需要在通道和队列之间进行额外的协调以执行全局限制,因此这将比以上示例慢。

可配置的默认预取

RabbitMQ 可以使用默认预取,如果消费者未指定,则将应用该预取。该值可以在 高级配置文件 中配置为 rabbit.default_consumer_prefetch

%% advanced.config file
[
{rabbit, [
{default_consumer_prefetch, {false,250}}
]
}
].
© 2024 RabbitMQ. All rights reserved.