RabbitMQ 4.0:新的 Quorum Queue 功能
RabbitMQ 4.0(目前为 beta 版)包含新的 Quorum Queue 功能
- 消息优先级
- 消费者优先级与单活跃消费者结合
- 默认投递限制现为 20(破坏性更改!)
- 长队列的更快恢复
消息优先级
RabbitMQ 4.0 中引入并在此部分描述的双级相对优先级系统已被取代。RabbitMQ 4.3 引入了 32 个严格的优先级级别。
对消息优先级的支持可能是仲裁队列中最受期待的功能,主要是因为现有的经典镜像队列用户希望迁移到仲裁队列(请记住,4.0 版本已移除对经典队列镜像的支持)。
然而,仲裁队列支持优先级的方式与经典队列处理优先级的方式有显著不同。经典队列需要使用 x-max-priority 参数来定义给定队列的最大优先级数量(如果未提供此参数,队列将平等对待所有消息)。技术上允许 0 到 255 之间的值,尽管实际上每个队列不应使用超过少数几个优先级。仲裁队列不需要任何预先声明(无需为给定队列启用优先级),但每个队列正好有两种优先级:普通(normal)和高(high)。其行为与 AMQP 1.0 规范一致(请参见 AMQP 1.0 规范第 3.2.1 章)。
- 0 到 4(含)之间的优先级值被视为普通优先级
- 任何大于 4 的值都被视为高优先级
- 如果发布者未指定消息的优先级,则默认值为
4(普通优先级)
如果仲裁队列中既有普通优先级消息又有高优先级消息,消费者将收到两者的混合,比例为每 1 条普通优先级消息对应 2 条高优先级消息。这种方法避免了饥饿(Starvation)现象,因为无论高优先级消息的数量有多少,系统都会对普通优先级消息的处理取得进展。这与经典队列的实现形成了对比,经典队列总是优先传递高优先级消息(如果存在),因此普通优先级消息可能永远无法被传递(或者更常见的情况是,它们的传递延迟会非常高)。
以下是其工作原理的直观表示。为准备此测试,我们首先发布了 10 万条普通优先级消息,然后发布了 10 万条高优先级消息。由于仲裁队列在 4.0 之前并不具备优先级感知能力,如果我们旧版本中执行此操作并启动消费者,它会简单地先接收普通优先级消息(因为它们更旧),然后是所有高优先级消息。而在 4.0 中,我们可以看到消费者立即开始以每秒约 1500 条普通优先级消息和两倍数量的高优先级消息的比例接收,总计每秒约 4500 条消息(实际传输速率在这里并不重要,它们取决于多种因素;在优先级的背景下,2:1 的高/普通优先级比例才是关键)。一旦队列传递了所有高优先级消息,消费者就开始以每秒约 4500 条的速度接收普通优先级消息——即在此测试场景中它所能处理的最大速度。蓝色虚线(右侧轴刻度)是队列中的就绪消息数量(两种优先级的总和)——我们可以看到它从 20 万开始,最终降至零。

让我们考虑相反的场景——如果我们先发布所有高优先级消息,然后再发布所有普通优先级消息会怎样?在这种情况下,消费者将按照发布顺序接收消息。普通优先级消息没有理由越过高优先级消息。

此测试是如何执行的?
对于此测试,我们使用了 omq,这是一个用于 AMQP 1.0、MQTT 和 STOMP 的测试客户端。仲裁队列的行为并不取决于所使用的协议——之所以使用 AMQP 1.0,仅仅是因为 omq 可以按消息优先级输出消息消费指标。
# declare a quorum queue (you can use the Management UI or any other method)
rabbitmqadmin declare queue name=qq queue_type=quorum
# publish normal priority messages (10 publishers, 10k messages each)
omq amqp --publishers 10 --consumers 0 --publish-to /queues/qq --message-priority 1 --pmessages 10000
# publish high priority messages
omq amqp --publishers 10 --consumers 0 --publish-to /queues/qq --message-priority 10 --pmessages 10000
# consume all messages from the queue
omq amqp --publishers 0 --consumers 1 --consume-from /queues/qq --consumer-credits 100
对于第二个场景,只需以相反的顺序运行发布命令即可。
如果我需要更多的控制权怎么办?
如果 2:1 传递比例的两种优先级不能满足您的需求,我们建议采取以下两点:
- 重新考虑您的需求。😄推理具有多种优先级的消息传递顺序是非常困难的。确保所有消息都能足够快地传递,并仅在偶尔出现大量积压时利用优先级让重要消息跳过队列,这可能更容易实现。
- 如果您确实需要更多的优先级和/或对不同优先级处理方式的更多控制,使用多个队列是最好的选择。您可以开发一个订阅多个队列的消费者,然后决定从哪个队列进行消费。
消费者优先级与单一活跃消费者(Single Active Consumer)结合
从 RabbitMQ 4.0 开始,仲裁队列在选择单一活跃消费者时会考虑消费者优先级。如果出现更高优先级的消费者(订阅),仲裁队列将切换到该消费者。如果您有多个队列,且每个队列都应有一个单一消费者,但您又不希望应用程序的单个实例成为所有队列的消费者(当启动的第一个应用实例订阅所有单一活跃消费者队列时,这种情况很可能会发生),那么此功能特别有用。现在,您可以在订阅不同队列时选择不同的优先级,以确保每个实例仅从其“首选”队列进行消费,并仅作为其他队列的备份消费者。
为了更好地解释此功能,让我们回顾一下所有相关组件。单一活跃消费者是一个队列参数,它防止队列向多个消费者发送消息,无论有多少消费者订阅了该队列。只有一个消费者处于活跃状态,所有其他消费者都不活跃。如果活跃消费者断开连接,其中一个非活跃消费者将被激活。此功能用于需要维护严格消息处理顺序的场景。
消费者优先级允许您指定应优先考虑某个特定的消费者,而不是以公平的轮询方式向所有订阅的消费者发送消息(这是经典队列和仲裁队列的默认行为)。
在 4.0 版本之前,这些功能实际上是互斥的——如果启用了单一活跃消费者,只要前一个消费者保持活跃,无论优先级如何,新消费者都不会变为活跃状态。从 4.0 开始,如果新消费者的优先级高于当前活跃消费者的优先级,仲裁队列将切换到更高优先级的消费者:它将停止向当前消费者发送消息,等待所有消息被确认,然后停用旧消费者,并激活更高优先级的消费者。
下图展示了此行为。该图中有三个指标:
- 绿色线条显示了第一个(默认优先级)消费者消费的消息数量(该消费者被配置为以 10 条/秒的速度消费)
- 黄色线条显示了第二个、更高优先级消费者的相同数值
- 蓝色线条显示了未确认消息的数量(右侧轴刻度)

最初,我们只有一个消费者,如预期的那样,它以 9-10 条/秒的速度消费(这些在 9 到 10 之间的跳动仅仅是指标发射和显示方式的结果)。该消费者配置了 1000 条消息的预取(prefetch),由于队列中有很多消息,预取缓冲区被最大限度地利用。然后黄色线条出现,初始为 0 条/秒。这是更高优先级的消费者,它已经连接但尚未活跃。从它连接的那一刻起,我们可以看到未确认消息的数量在下降,因为队列不再向原始消费者发送消息。一旦所有消息都被确认,新消费者就成为单一活跃消费者并接收 1000 条消息(因为这是它的预取值)。然后它愉快地按照配置以大约 10 条/秒的速度进行消费。
此测试是如何执行的?
对于此测试,我们使用了 perf-test,这是一个用于 AMQP 0.9.1 的测试客户端。
# Publish 5000 messages to have a backlog (perf-test will declare a quorum queue `qq-sac`)
perf-test --quorum-queue --queue qq-sac --pmessages 5000 --confirm 100 -qa x-single-active-consumer=true --consumers 0
# Start a consumer with the default priority and prefetch of 1000; consume ~10 msgs/s
perf-test --producers 0 --predeclared --queue qq-sac --consumer-latency 100000 --qos 1000
# In another window, some time after starting the first consumer, start a higher priority consumer
perf-test --producers 0 --predeclared --queue qq-sac --consumer-latency 100000 --qos 1000 --consumer-args x-priority=10
过一段时间,您应该会看到第一个消费者停止接收消息(perf-test 不再有输出),而第二个消费者开始接收消息。
此示例中使用的设置是为了突出切换过程,并不适合实际场景。如果消费者只能处理 10 条/秒,通常没有理由将预取值配置得高达 1000。
传递限制现在默认为 20
这对某些应用程序来说可能是一个破坏性变更。
仲裁队列现在默认将传递限制(delivery limit)设置为 20。过去,此限制未设置,因此仲裁队列会无限期地尝试传递,直到消息被确认或被消费者丢弃。这可能导致消息卡在队列中且永远无法被传递的情况。
此更改的缺点是,如果未配置死信队列,消息将在 20 次尝试后被丢弃。因此,强烈建议为所有仲裁队列配置死信机制。
长队列的更快恢复
与其说这是一个功能,不如说是一个内部改进,但非常值得一提。到目前为止,如果 RabbitMQ 节点重启,该节点上的所有仲裁队列都必须读取自上次快照以来的所有数据(Raft 日志)以重建其内存状态。例如,如果您现在向仲裁队列发布数百万条消息,然后重启节点,您会发现节点启动后,队列会在相当长的时间内(至少几秒钟)报告 0 条就绪消息,并且您无法开始消费这些消息。队列只是尚未准备好服务流量——它仍在从磁盘读取数据(注意:这并不意味着所有这些数据都会保留在内存中,绝大部分都不会,但会保留一个队列数据的索引/摘要)。从 RabbitMQ 4.0 开始,仲裁队列会创建包含队列在特定时间点状态的检查点文件。启动时,队列可以读取最新的检查点,仅读取从该时间点开始的部分 Raft 日志。这意味着仲裁队列的启动时间大大缩短。
例如,我的机器上一个包含 1000 万条 12 字节消息的 RabbitMQ 节点,需要大约 30 秒才能启动。而在 RabbitMQ 4.0 中,只需不到一秒。
您可能想知道快照(snapshot)和检查点(checkpoint)之间的区别。在许多方面,它们是相同的——它们实际上共享写入磁盘的代码。区别在于快照仅在 Raft 日志被截断时创建。对于许多常见的队列用例,这就是所需要的——旧消息被消费,我们创建一个不再包含它们的新快照并截断日志。此时,队列不再记得那些消息曾经存在过。另一方面,当无法截断日志时,会定期创建检查点。测试用例场景就是一个很好的例子——由于我们没有消费任何消息,最旧的消息仍然存在,我们不能简单地忘记它们。但检查点仍然允许队列更快地启动。当日志被截断时(在此示例中为消费掉一些旧消息后),检查点可以提升为快照。
我该如何尝试这些功能?
我们将再次使用 perf-test 来声明队列并发布消息。
# Publish 10 million 12-byte messages (feel free to play with other values)
perf-test --quorum-queue --queue qq --consumers 0 --pmessages 5000000 --confirm 1000 --producers 2
# restart the node
rabbitmqctl stop_app && rabbitmqctl start_app
# list the queues (repeat this command until the number of messages is 10 million instead of 0)
rabbitmqctl list_queues
总结
RabbitMQ 4.0 是 RabbitMQ 的一个重要里程碑。随着经典队列镜像的移除,仲裁队列成为高可用、复制队列的唯一选择(注意:流(streams)也是高可用和复制的,但从技术上讲它们不是队列;尽管如此,对于过去使用经典镜像队列的某些用例,它们可能仍然是一个不错的选择)。多年来,仲裁队列提供了比镜像队列更高的数据安全保障和更好的性能,随着这些最新的改进,它们在更广泛的场景中变得更加稳健和高效。
您现在可以尝试 RabbitMQ 4.0 beta 版本: https://github.com/rabbitmq/rabbitmq-server/releases/tag/v4.0.0-beta.5
