消费者预取
概述
消费者预取是 通道预取机制 的一个扩展。
AMQP 0-9-1 规范指定了 basic.qos 方法,以便在通道(或连接)上消费时 限制未确认消息的数量(也称为“预取计数”)。不幸的是,通道并非此功能的理想作用域——因为单个通道可能从多个队列中消费,所以通道和队列需要与每条发送的消息进行协调,以确保它们不会超出限制。这在一个单机上效率较低,而在跨集群消费时则效率非常低。
此外,对于许多用例而言,为每个消费者指定预取计数更为自然。
因此,在预取如何应用于通道上的多个消费者方面,RabbitMQ 略微偏离了 AMQP 0-9-1 规范。
AMQP 0-9-1 中 prefetch_count 的含义 | RabbitMQ 中 prefetch_count 的含义 |
|---|---|
| 在通道上的所有消费者之间共享 | 应用于通道上的每个新消费者 |
单个消费者
以下 Java 基本示例将一次接收最多 10 条未确认的消息。
Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue", false, consumer);
值为 0 被视为无限,允许任意数量的未确认消息。
Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(0); // No limit for this consumer
channel.basicConsume("my-queue", false, consumer);
独立消费者
此示例在同一通道上启动两个消费者,每个消费者将独立接收最多 10 条未确认的消息。
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
多个消费者共享限制
AMQP 0-9-1 规范并未解释使用不同的 global 值多次调用 basic.qos 会发生什么。RabbitMQ 将此解释为两个预取限制将独立执行;只有当未确认消息的任一限制未达到时,消费者才会接收新消息。
例如:
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true); // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
这两个消费者之间最多只有 15 条未确认的消息,每个消费者最多 10 条。由于在通道和队列之间协调以执行全局限制的额外开销,这将比上述示例慢。
可配置的默认预取
RabbitMQ 可以使用默认预取,如果消费者未指定,则将应用此默认预取。该值可以在 高级配置文件 中配置为 rabbit.default_consumer_prefetch。
%% advanced.config file
[
{rabbit, [
{default_consumer_prefetch, {false,250}}
]
}
].