队列中的优先级支持
先决条件
本指南假设读者熟悉 RabbitMQ 的基础知识
请参考上述指南。
什么是优先级队列
RabbitMQ 中的经典队列和 仲裁队列 支持优先级。队列的标准操作模式是 FIFO(先进先出)。这意味着,暂不考虑预取(prefetch)、竞争消费者、重新入队和重新投递,RabbitMQ 将按照消息进入队列的顺序将消息投递给消费者。
对于配置了优先级支持的队列,这种标准行为会发生改变。为了简洁起见,本指南及后续的 RabbitMQ 文档中将这些队列(包括经典队列和仲裁队列)统称为“优先级队列”。
优先级队列按照消息的优先级顺序投递消息。消息优先级是 发布者 在发布时设置的正整数值。
考虑一个包含三条消息 A、B 和 C 的队列,它们以相等的优先级发布,并按该顺序进入一个“常规”经典队列
| 消息 | 入队顺序 | 优先级 |
|---|---|---|
| A | 1 | 1 |
| B | 2 | 1 |
| C | 3 | 1 |
这些消息将按以下顺序分发(发送)给消费者(或多个消费者):A、B、C。
现在,考虑一个包含相同消息但具有不同优先级的优先级队列
| 消息 | 入队顺序 | 优先级 |
|---|---|---|
| A | 1 | 1 |
| B | 2 | 3 |
| C | 3 | 2 |
与标准的 FIFO 投递行为不同,这些消息将根据其优先级以不同的顺序分发给消费者(或多个消费者):B、C、A。
在实践中,当优先级队列存在竞争消费者、重新投递,或者在消费者连接丢失或消费者应用程序失败时发生 自动重新入队 时,投递顺序可能会略有不同。
采用优先级队列之前:考虑替代方案
优先级队列在消费者投递方面的行为比标准的 FIFO 队列行为更难预测,特别是在消费者经常 重新入队 投递的环境中。因此,考虑简单的替代方案是否更合适非常重要。
通常采用优先级队列是为了避免排队系统中的一个经典问题:队头阻塞(head-of-line blocking)问题。然而,首先应考虑几种可能的替代解决方案:
- 使用多个队列而不是一个。单一巨型队列(Single Giant Queue™)是队列使用中最常见的反模式之一。
- 对于单队列上的竞争消费者,使用单独的通道(channel)配合大于 1 的 预取值,这样耗尽的预取不会阻塞投递流。
- 使用 流(stream) 代替队列。流提供了不同的消费模式并支持重复消费。
- 在少数场景中,消费者优先级 相比优先级队列可能更容易预测。
例如,一组三个队列:priority.low、priority.medium 和 priority.high,可以在保持标准投递行为的同时避免队头阻塞问题,并且作为正面效果,能提供 更好的运行时并行性。
声明和支持的优先级范围
经典队列
经典队列支持 [0, 255] 范围内的优先级。
具体来说,对于经典队列,较高的优先级值会消耗更多的 CPU 和内存资源:RabbitMQ 需要在内部为从 1 到为给定队列配置的最大值之间的每个优先级维护一个子队列。
对于经典队列,强烈建议仅使用个位数的优先级。这对于几乎所有用例都足够,并且每个队列的开销合理。
配置最大优先级数
可以通过客户端提供的 可选参数 来配置经典队列的最大优先级数。
使用策略(policies) 将队列声明为优先级队列在设计上是 不支持的。有关原因,请参考 为什么不支持为优先级队列定义策略。
使用客户端提供的可选参数
要声明优先级队列,请使用 x-max-priority 可选队列参数。该参数应为 [1, 255] 范围内的正整数。然而,如上所述,强烈建议使用 2 到 4 个优先级。
例如,使用 Java 客户端
Channel ch = ...;
Map<String, Object> args = new HashMap<String, Object>();
// Recommendation: use values from 2 to 4 for the maximum number of priorities
args.put("x-max-priority", 4);
ch.queueDeclare("my-priority-queue", true, false, false, args);
发布者随后可以使用 basic.properties 中的 priority 字段发布优先级消息。数字越大表示优先级越高。
仲裁队列
仲裁队列在内部仅支持两种优先级:高和普通。未设置优先级或优先级在 [0, 4] 范围内的消息将被视为普通优先级。
优先级高于 4 的消息将被视为高优先级。
现在,考虑一个包含相同消息但具有不同优先级的优先级仲裁队列
| 消息 | 入队顺序 | 优先级 |
|---|---|---|
| A | 1 | 1 |
| B | 2 | 2 |
| C | 3 | 8 |
这些消息将按以下顺序分发(发送)给消费者(或多个消费者):C、A、B。消息 C 将被视为高优先级消息,而 A 和 B 将被视为普通优先级。
优先级队列行为
AMQP 0-9-1 规范对于优先级的工作方式表述有些模糊。它指出所有队列必须至少支持 2 个优先级,最多可以支持 10 个。它并未定义如何处理没有优先级属性的消息。
默认情况下,RabbitMQ 经典队列不支持优先级。创建优先级队列时,可以根据需要选择最大优先级。在选择优先级值时,需要考虑以下因素:
-
每个队列的每个优先级级别都有一定的内存和磁盘成本。此外还有额外的 CPU 成本,特别是在消费时,因此您可能不希望创建过多的级别。
-
消息
priority字段定义为无符号字节,即其值不能超出 [0, 255] 范围。 -
没有
priority属性的消息将被视为优先级为 0。优先级高于队列最大值的消息将被视为以最大优先级发布。
低消息优先级与资源匮乏问题
基于优先级的消息投递的一个常见问题是 资源匮乏(resource starvation)。在优先级队列的背景下,这意味着只有高优先级的消息被投递,而低优先级的消息永远无法被投递。
RabbitMQ 中的经典队列和仲裁队列都能防止匮乏。
经典队列的优先级实现使用子队列。在每个投递周期中,队列会先投递最高优先级子队列中的消息,然后依次处理下一个高优先级的子队列,直到最低优先级的子队列。
在投递周期中入队的高优先级消息将在下一个周期中投递,从而防止匮乏。
仲裁队列总是为每一批高优先级消息投递一定比例的普通优先级消息,从而防止匮乏。
最大优先级数与资源使用
对于采用优先级发布和优先级队列的环境,强烈建议使用 低个位数的优先级。
对于经典队列,使用更多的优先级会通过增加 Erlang 进程的数量来消耗更多的 CPU 资源。运行时调度 也会受到影响。
优先级队列如何与消费者协同工作
如果消费者连接到一个空的优先级队列,随后消息被发布到该队列,这些消息在被消费者接收之前可能不会在优先级队列中等待(所有消息都会立即被接收)。在这种情况下,优先级队列没有机会对消息进行排序,优先级也就没有必要了。
然而,在大多数情况下,上述情况并非标准情况,因此您应该在消费者上使用手动确认模式下的 basic.qos (预取) 方法,以限制任何时候可以处于投递状态的消息数量,并允许消息被排序。basic.qos 是消费者连接到队列时设置的值。它指示消费者一次可以处理多少条消息。
以下示例试图更详细地解释消费者如何与优先级队列协同工作,并强调有时当优先级队列与消费者工作时,高优先级消息在实践中可能需要等待低优先级消息先被处理。
示例
-
一个新的消费者连接到一个空的经典(非优先级)队列,消费者预取(
basic.qos)值为 10。 -
发布一条消息,并立即发送给消费者进行处理。
-
随后快速发布 5 条消息并立即发送给消费者,因为消费者在 10 个声明的 qos(预取)中只有 1 个处于传输中(未确认)的消息。
-
接下来,快速发布 10 条消息并发送给消费者,只有 10 条消息中的 4 条被发送给消费者(因为最初的 10 个
basic.qos(消费者预取)值现已满),其余 6 条消息必须在队列中等待(就绪消息)。 -
消费者现在确认了 5 条消息,因此上述等待的 6 条消息中有 5 条随后被发送给消费者。
现在添加优先级
-
如上例所示,消费者连接时设置的
basic.qos(消费者预取)值为 10。 -
发布 10 条低优先级消息并立即发送给消费者(
basic.qos(消费者预取)已达到其限制)。 -
此时发布了一条最高优先级消息,但预取限制已达到,因此最高优先级消息需要等待低优先级消息先被处理。
与其他功能的交互
通常,优先级队列具有标准 RabbitMQ 队列的所有功能。开发者应注意以下几点交互。
应该过期的消息 仍然只从队列头部过期。这意味着与普通队列不同,即使是每队列 TTL 也可能导致过期的低优先级消息卡在未过期的优先级更高的消息后面。这些消息永远不会被投递,但它们会出现在队列统计信息中。
设置了最大长度的队列 通常会从队列头部丢弃消息以执行限制。这意味着优先级较高的消息可能会为了给优先级较低的消息腾出空间而被丢弃,这可能不是您预期的行为。
为什么不支持为优先级队列定义策略
为队列定义可选参数最方便的方法是使用 策略。策略是配置 TTL、队列长度限制 和其他 可选队列参数 的推荐方式。
然而,策略不能用于配置优先级,因为策略是动态的,可以在队列声明后更改。优先级队列在队列声明后永远无法更改其支持的优先级数量,因此策略不是一个安全的选择。