跳至主内容
版本:4.2

队列 (Queues)

什么是队列?

RabbitMQ 中的队列是消息的有序集合。消息以(FIFO(“先进先出”))方式入队和出队(传递给消费者)。

从通用角度定义一个队列,它是一个顺序数据结构,具有两个主要操作:一项可以被入队(添加)到尾部,并从头部出队(消费)。

队列在消息传递技术领域扮演着重要角色。许多消息传递协议和工具都假定发布者消费者使用类似队列的存储机制进行通信。

消息传递系统中的许多功能都与队列有关。一些 RabbitMQ 队列功能,例如优先级和消费者重新排队,会影响消费者观察到的顺序。

此主题中的信息包括 RabbitMQ 中队列的概述,以及指向其他主题的链接,以便您可以了解更多关于在 RabbitMQ 中使用队列的信息。

信息

除了队列之外,现代 RabbitMQ 版本还支持两种称为流和超流的替代数据结构。

本指南主要在AMQP 0-9-1 协议的上下文中介绍队列,但大部分内容适用于其他支持的协议。

一些协议(例如:STOMP 和 MQTT)围绕主题的概念。对于这些协议,队列充当消费者的一个数据累积缓冲区。然而,理解队列的作用仍然很重要,因为即使对于这些协议,许多功能仍然在队列级别运行。

是 RabbitMQ 中可用的替代消息数据结构。流提供与队列不同的功能。

本主题中涵盖的 RabbitMQ 队列信息包括:

有关消费者相关主题,请参阅消费者指南经典队列仲裁队列也有专门的指南。

队列名称

队列有名称,以便应用程序可以引用它们。

应用程序可以指定队列名称,或者要求代理生成一个名称。队列名称最多可以包含 255 字节的 UTF-8 字符。

以“amq.”开头的队列名称保留给代理内部使用。尝试声明违反此规则的队列名称将导致通道级异常,回复代码为 403 (ACCESS_REFUSED)。

服务器命名的队列

在 AMQP 0-9-1 中,代理可以代表应用程序生成唯一的队列名称。要使用此功能,请将空字符串作为队列名称参数传递:同一生成的名称可以通过同一通道中的后续方法使用空字符串(在需要队列名称的地方)来获取。这是可行的,因为通道会记住最后一个由服务器生成的队列名称。

服务器命名的队列旨在用于本质上是暂时的且特定于某个消费者的状态(应用程序实例)。应用程序可以在消息元数据中共享这些名称,以便其他应用程序可以响应它们(如教程六中所演示)。否则,服务器命名队列的名称应由声明应用程序实例知道并仅由该实例使用。该实例还应为队列设置适当的绑定(路由),以便发布者可以使用已知的交换器,而不是直接使用服务器生成的队列名称。

队列属性

队列具有定义其行为的属性。有一组强制属性和一个可选属性的映射。

  • 名称
  • 持久化(队列将在代理重启后继续存在)
  • 独占(仅供一个连接使用,并且当该连接关闭时队列将被删除)
  • 自动删除(至少有一个消费者的队列在最后一个消费者取消订阅时被删除)
  • 参数(可选;由插件和特定于代理的功能使用,例如消息 TTL、队列长度限制等)

请注意,并非所有属性组合在实践中都有意义。例如,自动删除和独占队列应由服务器命名。这类队列应仅用于特定于客户端或特定于连接(会话)的数据。

当自动删除或独占队列使用已知的(静态)名称时,在客户端断开连接并立即重新连接的情况下,RabbitMQ 节点之间会存在自然竞争条件,这些节点会删除这些队列,而恢复中的客户端会尝试重新声明它们。这可能导致客户端连接恢复失败或异常,并造成不必要的混淆或影响应用程序可用性。

声明和属性等效性

提示

特别是对于队列类型属性,属性等效性检查可以放宽。或者,可以配置默认队列类型(DQT)。

在使用队列之前必须声明它。声明队列会创建它(如果它尚不存在)。如果队列已存在且其属性与声明中的属性相同,则声明无效。当现有队列属性与声明中的属性不同时,将引发代码为 406 (PRECONDITION_FAILED) 的通道级异常。

特别是对于队列类型属性,属性等效性检查可以放宽或配置为使用默认值。

请参阅虚拟主机指南了解更多信息。

可选参数

可选队列参数,也称为“x-arguments”(因为它们在 AMQP 0-9-1 协议中的字段名称),是一个可以由客户端在声明队列时提供的任意键/值对的映射(字典)。

该映射用于各种功能和插件,例如

等等。

同样的概念也用于其他协议操作,例如,在注册消费者时

一些可选参数在队列声明时设置,并在队列的整个生命周期内保持不变。另一些则可以在队列声明后通过策略动态更改。

提示

对于可以通过策略设置的键,始终优先考虑使用策略而不是在应用程序代码中设置这些值。

例如,队列类型x-queue-type)和最大队列优先级数(x-max-priority)必须在队列声明时设置,之后无法更改。

可选队列参数可以设置得不同

  • 通过策略分组到队列(推荐)
  • 当客户端声明队列时,按每个队列进行设置
  • 对于 x-queue-type 参数,使用默认队列类型

前一种选择更灵活,侵入性小,不需要修改和重新部署应用程序。因此,它强烈推荐给大多数用户。请注意,一些可选参数,例如队列类型或最大优先级数,只能由客户端提供,因为它们不能动态更改,并且必须在声明时可知。

客户端提供可选参数的方式因客户端库而异,但通常是声明队列的函数(方法)中 durableauto_delete 和其他参数旁边的参数。

可选参数和策略定义的键优先级

当客户端提供的 x-arguments策略都提供了相同的键时,前者具有优先权。

然而,如果也使用了操作员策略,那么它将优先于客户端提供的参数。

对于数值,例如最大队列长度TTL,将使用两者中的较小值。如果应用程序需要或选择使用较低值,则操作员策略将允许这样做。但是,无法使用高于操作员策略中定义的值。

使用操作员策略为与资源使用相关的应用程序控制参数(例如,磁盘空间峰值使用量)引入护栏。

消息排序

当消息之间存在因果关系时,消息排序很重要。RabbitMQ 试图保留消息的顺序。

队列是提供FIFO语义的消息的有序集合。当在单个通道上发布时,消息会按发布顺序排入它们被路由到的每个队列。当在多个连接或通道上发布时,它们的[消息]序列将并发地路由和交错。从队列到消费者的传递按入队顺序进行(除非发生以下事件)。

当消息可以重新排序时

即使 RabbitMQ 旨在保持顺序,以下情况也会改变实际的传递顺序

  1. 消息优先级:高优先级消息可能在低优先级消息之前传递。
  2. 同一队列上的多个活动消费者:代理仍然按 FIFO 出队,但任何重新传递都可能改变顺序。当消费者否定确认并重新排队,或者通道/会话在未确认消息的情况下关闭时,会发生重新传递。重新传递的消息会被标记(AMQP 1.0:first-acquirer=false,AMQP 0-9-1:redelivered=true)。

保留消息顺序

要在 RabbitMQ 中保留消息顺序,您有两种选择。

1.)使用

流是一个不可变的追加式日志。每条消息在发布时都会分配其偏移量。此偏移量永不更改。

多个消费者可以并发地处理同一流中的消息,而不会影响顺序。通过流过滤,您可以分割工作,以便不同的消费者处理流的不相交子集,同时在每个子集内保持顺序。

2.)使用具有单个活动消费者的队列

要使用队列保持顺序

  • 启用单个活动消费者,这样一次只有一个消费者接收消息。(或者,每个队列运行一个消费者。)
  • 如果您的消费者将消息返回队列,请确保它们按接收顺序返回这些消息。
  • 对于仲裁队列,设置传递限制。这可确保消息被重新排入队列的前端。
  • AMQP 0-9-1:不要使用 basic.get。停止消费者时,优先关闭通道而不是 basic.cancel

持久性

队列可以是持久的或临时的。持久队列的元数据存储在磁盘上,而临时队列的元数据则尽可能存储在内存中。在某些协议(例如 AMQP 0-9-1 和 MQTT)下,发布消息时也会做出相同的区分。

在需要持久性的环境和用例中,应用程序必须使用持久队列确保发布者将已发布的消息标记为持久化。

临时队列将在节点启动时被删除。因此,它们不会在节点重启后继续存在(这是设计使然)。临时队列中的消息也将被丢弃。

持久队列将在节点启动时恢复,包括其中已发布为持久化的消息。已发布为临时消息的消息将在恢复期间被丢弃,即使它们存储在持久队列中。

如何选择

在大多数其他情况下,推荐使用持久队列。对于复制队列,唯一合理的选择是使用持久队列。

在大多数情况下,队列的吞吐量和延迟不受队列是否持久的影响。只有在队列或绑定更改非常频繁的环境中——即,队列每秒删除和重新声明数百次或更多次——才会看到某些操作(尤其是绑定)的延迟改进。因此,在持久队列和临时队列之间进行选择取决于用例的语义。

临时队列对于具有临时客户端的工作负载来说是一个合理选择,例如,用户界面中的临时 WebSocket 连接、预期会离线或切换身份的移动应用程序和设备。这类客户端通常具有本质上是临时的状态,在客户端重新连接时应替换。

某些队列类型不支持临时队列。例如,仲裁队列由于底层复制协议的假设和要求,必须是持久的。

临时队列

对于某些工作负载,队列应该是短命的。虽然客户端可以删除它们声明的队列(在断开连接之前),但这并不总是方便。此外,客户端连接可能会失败,可能留下未使用的资源(队列)。

RabbitMQ 支持许多对本质上是临时或特定于客户端的数据有意义的队列属性。其中一些设置可以应用于持久队列,但并非所有组合都有意义。

提示

考虑为临时队列使用服务器生成的名称。由于这类队列不打算在多个消费者之间共享,因此使用唯一的名称是有意义的。

共享临时队列可能导致 RabbitMQ 节点操作和恢复中的客户端之间发生自然竞争条件

有三种方法可以使队列自动删除

  • 独占队列(下文介绍)
  • TTL(下文也介绍)
  • 自动删除队列

当自动删除队列的最后一个消费者被取消(例如,在 AMQP 0-9-1 中使用 basic.cancel)或消失(通道或连接关闭,或 TCP 连接丢失)时,它将被删除。

如果队列从未有过消费者,例如,当所有消费使用轮询时,它将不会被自动删除。对于这种情况,请使用独占队列或队列 TTL。

警告

临时(非持久)非独占经典队列已被弃用。请改用持久队列非持久独占队列

队列 TTL 可用于清理未使用的持久队列。

当 RabbitMQ 检测到非持久且非独占队列时,它会在管理界面中显示弃用警告。

独占(客户端连接特定)队列

独占队列只能由声明它的连接使用(从中消费、清除、删除等)。这类队列本质上是临时的,在持久队列上设置 exclusive 属性在逻辑上没有意义,因为这样的队列不能比声明它的连接存活更久,因此在节点重启时无法满足其持久性属性。

声明为独占的队列始终被声明为经典队列:独占的仲裁队列在逻辑上没有意义,因为它们的生命周期将绑定到特定客户端连接的生命周期,因此绑定到单个节点(或应用程序实例)。

提示

考虑为独占队列使用服务器生成的名称。由于这类队列不能在多个消费者之间共享,因此使用服务器生成的名称最合适。

尝试从不同的连接使用独占队列将导致一个通道级异常 RESOURCE_LOCKED,错误消息为 cannot obtain exclusive access to locked queue

独占队列在其声明连接关闭或消失时(例如,由于底层 TCP 连接丢失)被删除。因此,它们仅适用于特定于客户端的临时状态。

通常将独占队列设置为服务器命名。

独占队列在“客户端本地”节点(声明队列的客户端连接到的节点)上声明,而与 queue_leader_locator 值无关。

复制和分布式队列

仲裁队列是一种复制的、面向数据安全和一致性的队列类型。经典队列历史上支持复制,但在 RabbitMQ 4.x 版本中已移除此功能。

任何客户端连接都可以使用任何队列,无论它是否被复制,而不管队列副本托管在哪个节点上,或者客户端连接到哪个节点。RabbitMQ 将为客户端透明地将操作路由到相应的节点。

例如,在具有节点 A、B 和 C 的集群中,连接到节点 A 的客户端可以从托管在 B 上的队列 Q 中消费,而连接到节点 C 的客户端可以发布消息,将消息路由到队列 Q。

客户端库或应用程序可以选择连接到托管特定队列当前领导者副本的节点,以提高数据局部性。

这个通用规则适用于 RabbitMQ 支持的所有消息传递数据类型,只有一个例外。是此规则的例外,它要求客户端(无论使用何种协议)连接到托管目标流副本(滚动领导者)的节点。因此,RabbitMQ 流协议客户端将并行连接到多个节点

队列也可以联合到松散耦合的节点或集群。

请注意,集群内复制和联合是正交的功能,不应被视为直接替代品。

是 RabbitMQ 支持的另一种复制数据结构,具有一套不同的支持操作和功能。

非复制队列和客户端操作

任何客户端连接都可以使用任何队列,包括非复制(单副本)队列,而不管队列副本托管在哪个节点上,或者客户端连接到哪个节点。RabbitMQ 将为客户端透明地将操作路由到相应的节点。

例如,在具有节点 A、B 和 C 的集群中,连接到节点 A 的客户端可以从托管在 B 上的队列 Q 中消费,而连接到节点 C 的客户端可以发布消息,将消息路由到队列 Q。

客户端库或应用程序可以选择连接到托管特定队列当前领导者副本的节点,以提高数据局部性。

这个通用规则适用于 RabbitMQ 支持的所有消息传递数据类型,只有一个例外。是此规则的例外,它要求客户端(无论使用何种协议)连接到托管目标流副本(滚动领导者)的节点。因此,RabbitMQ 流协议客户端将并行连接到多个节点

生存时间 (TTL) 和长度限制

队列的长度可以限制。队列和消息可以有TTL

这两种功能都可以用于数据过期,并且可以限制队列最多可以使用多少资源(RAM、磁盘空间),例如,当消费者离线或其吞吐量落后于发布者时。

在持久和内存存储中

在现代 RabbitMQ 版本中,仲裁队列和经典队列 v2 都积极地将数据移动到磁盘,并且只将相对较小的活动数据集保留在内存中。

在某些协议(例如 AMQP 0-9-1)中,客户端可以将消息发布为持久或临时的。临时消息仍会存储在磁盘上,但在下一个节点重启时将被丢弃。

在 AMQP 0-9-1 中,这是通过消息属性(delivery_mode 或在某些客户端中为 persistent)完成的。

有关此主题的其他相关指南包括仲裁队列关于内存使用量的推理警报内存警报可用磁盘空间警报部署指南以及消息存储配置

优先级

队列可以有 0 个或多个优先级。此功能是选择加入的:只有通过可选参数(见上文)配置了最大优先级数的队列才会进行优先级排序。

发布者使用消息属性中的 priority 字段指定消息优先级。

如果需要优先级队列,建议使用 1 到 10 之间。目前使用更多优先级会消耗更多资源(Erlang 进程)。

CPU 利用率和并行性考虑

目前,单个队列副本(无论是领导者还是追随者)在其热代码路径上仅限于单个 CPU 核心。因此,此设计假定大多数系统在实践中使用多个队列。

危险

单个队列通常被认为是一种反模式,不仅仅是由于资源利用率的原因。

对于将队列吞吐量推向极限的工作负载,请考虑使用流或分区流,并配合RabbitMQ 流协议客户端

指标和监控

RabbitMQ 收集有关队列的多个指标。其中大多数可通过RabbitMQ HTTP API 和管理 UI 获得,这些 API 和 UI 专为监控而设计。这包括队列长度、入站和出站速率、消费者数量、各种状态下的消息数量(例如,准备传递或未确认)、内存中与磁盘上的消息数量等等。

rabbitmqctl 可以列出队列和一些基本指标。

可以使用 rabbitmq-top 插件以及管理 UI 中的单个队列页面来访问运行时指标,例如 VM 调度程序使用情况、队列(Erlang)进程 GC 活动、队列进程使用的内存量、队列进程邮箱长度。

消费者和确认

可以通过注册一个消费者(订阅)来消费消息,这意味着 RabbitMQ 会将消息推送到客户端,或者对于支持此功能的协议(例如 AMQP 0-9-1 的 basic.get 方法)单独获取消息,类似于 HTTP GET。

已传递的消息可以由消费者显式确认,或者在传递写入连接套接字后自动确认。

自动确认模式通常会提供更高的吞吐量,并使用更少的网络带宽。然而,在故障方面,它的保证最少。经验法则是,首先考虑使用手动确认模式。

预取和消费者过载

自动确认模式还可能导致消费者过载,因为它们无法像消息传递那样快地处理消息。这可能导致消费者进程的内存使用量永久增长和/或操作系统交换。

手动确认模式提供了一种限制未完成(未确认)传递数量的方法:通道 QoS(预取)。

使用较高(数千或更多)预取级别的消费者可能会遇到与使用自动确认的消费者相同的过载问题。

大量未确认消息将导致代理的内存使用量增加。

消息状态

因此,入队的消息可以处于以下两种状态之一:

可以在管理 UI 中找到按状态划分的消息明细。

确定队列长度

可以通过多种方式确定队列长度

  • 使用 AMQP 0-9-1,利用 queue.declare 方法响应(queue.declare-ok)上的一个属性。字段名称为 message_count。其访问方式因客户端库而异。
  • 使用RabbitMQ HTTP API
  • 使用 rabbitmqctllist_queues 命令。

队列长度定义为准备传递的消息数量。

避免使用具有已知名称的临时队列

一个非独占的临时队列可以由客户端命名并在多个消费者之间共享。但是,不推荐这样做,并且可能导致 RabbitMQ 节点操作和客户端恢复之间的竞争条件。

考虑以下场景

  • 消费者使用一个具有已知名称的自动删除队列
  • 客户端连接失败
  • 客户端检测到并启动连接恢复

由于具有自动删除队列上唯一消费者的连接失败,RabbitMQ 必须删除该队列。此操作需要一些时间,在此期间消费者可能会恢复。

然后,根据操作的时间安排,队列可能会

  1. 被恢复的客户端声明然后删除
  2. 被删除然后重新声明

在第一种情况下,客户端将尝试在一个已被并发删除的队列上重新注册其消费者,这将导致通道异常。

这个根本的竞争条件有两个解决方案

  1. 引入连接恢复延迟。例如,一些 RabbitMQ 客户端库默认使用 5 秒的连接恢复延迟。
  2. 使用服务器命名的队列,这可以完全绕过问题,因为新的客户端连接将使用与其前身不同的队列名称。
© . This site is unofficial and not affiliated with VMware.