跳至主内容
版本:4.2

消费者预取

概述

消费者预取是 通道预取机制 的一个扩展。

AMQP 0-9-1 规范指定了 basic.qos 方法,以便在通道(或连接)上消费时 限制未确认消息的数量(也称为“预取计数”)。不幸的是,通道并非此功能的理想作用域——因为单个通道可能从多个队列中消费,所以通道和队列需要与每条发送的消息进行协调,以确保它们不会超出限制。这在一个单机上效率较低,而在跨集群消费时则效率非常低。

此外,对于许多用例而言,为每个消费者指定预取计数更为自然。

因此,在预取如何应用于通道上的多个消费者方面,RabbitMQ 略微偏离了 AMQP 0-9-1 规范。

AMQP 0-9-1 中 prefetch_count 的含义RabbitMQ 中 prefetch_count 的含义
在通道上的所有消费者之间共享应用于通道上的每个新消费者

单个消费者

以下 Java 基本示例将一次接收最多 10 条未确认的消息。

Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue", false, consumer);

值为 0 被视为无限,允许任意数量的未确认消息。

Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(0); // No limit for this consumer
channel.basicConsume("my-queue", false, consumer);

独立消费者

此示例在同一通道上启动两个消费者,每个消费者将独立接收最多 10 条未确认的消息。

Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

多个消费者共享限制

AMQP 0-9-1 规范并未解释使用不同的 global 值多次调用 basic.qos 会发生什么。RabbitMQ 将此解释为两个预取限制将独立执行;只有当未确认消息的任一限制未达到时,消费者才会接收新消息。

例如:

Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true); // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

这两个消费者之间最多只有 15 条未确认的消息,每个消费者最多 10 条。由于在通道和队列之间协调以执行全局限制的额外开销,这将比上述示例慢。

可配置的默认预取

RabbitMQ 可以使用默认预取,如果消费者未指定,则将应用此默认预取。该值可以在 高级配置文件 中配置为 rabbit.default_consumer_prefetch

%% advanced.config file
[
{rabbit, [
{default_consumer_prefetch, {false,250}}
]
}
].
© . This site is unofficial and not affiliated with VMware.