跳至主内容
版本:4.2

消费者

概述

本指南涵盖了与消费者相关的各种主题

等等。

术语

“消费者”一词在不同上下文中含义不同。总的来说,在消息传递和流式处理的上下文中,“消费者”是指一个消费并确认消息的应用程序(或应用程序实例)。同一个应用程序也可以发布消息,因此可以同时充当发布者。

消息传递协议也包含持久订阅以传递消息的概念。订阅是常用于描述此类实体的术语之一。消费者是另一个。RabbitMQ 支持的消息传递协议同时使用这两个术语,但 RabbitMQ 文档倾向于后者。

从这个意义上说,消费者是用于消息传递的订阅,需要在传递开始前注册,并且可以由应用程序取消。

基础知识

RabbitMQ 是一个消息代理。它接收来自发布者的消息,进行路由,并在存在要路由到的队列时,将其存储以供消费,或者如果存在消费者,则立即将消息传递给消费者。

消费者从队列消费。要消费消息,必须有一个队列。当添加新消费者时,假设队列中已准备好消息,传递将立即开始。

目标队列在注册消费者时可能是空的。在这种情况下,首次传递将在消息入队时发生。

尝试从不存在的队列消费将导致通道级别的异常,代码为 404 Not Found,并关闭尝试消费的通道。

消费者标签

每个消费者都有一个标识符,客户端库使用该标识符来确定为给定传递调用哪个处理程序。它们的名称因协议而异。消费者标签和订阅 ID 是最常用的两个术语。RabbitMQ 文档倾向于使用前者。

消费者标签也用于取消消费者。

消费者生命周期

消费者旨在长期存在:也就是说,在消费者的整个生命周期中,它会接收多次传递。注册一个消费者来消费单条消息不是最优的。

消费者通常在应用程序启动时注册。它们通常会与其连接甚至应用程序一起运行。

消费者可以更动态,并响应系统事件进行注册,在不再需要时取消订阅。这在使用 Web STOMPWeb MQTT 插件以及移动客户端等时通过 WebSocket 客户端很常见。

连接恢复

客户端可能会丢失与 RabbitMQ 的连接。当检测到连接丢失时,消息传递将停止。

一些客户端库提供自动连接恢复功能,其中包括消费者恢复。Java、.NETBunny 是此类库的示例。虽然连接恢复无法覆盖 100% 的场景和工作负载,但它通常对消费应用程序非常有效,并被推荐使用。

对于其他客户端库,应用程序开发人员负责执行连接恢复。通常,以下恢复顺序效果很好

  • 恢复连接
  • 恢复通道
  • 恢复队列
  • 恢复交换机
  • 恢复绑定
  • 恢复消费者

换句话说,消费者通常在最后恢复,在其目标队列和这些队列的绑定就位之后。

重要

请注意,自动恢复使用自动删除和独占队列的连接应确保这些队列是服务器命名的

注册消费者 (订阅,“推送 API”)

应用程序可以订阅,让 RabbitMQ 将入队的消息(传递)推送到它们。这通过在队列上注册消费者(订阅)来实现。订阅建立后,RabbitMQ 将开始传递消息。对于每次传递,将调用用户提供的处理程序。根据使用的客户端库,这可能是一个用户提供的函数或遵循特定接口的对象。

成功的订阅操作会返回一个订阅标识符(消费者标签)。以后可以使用它来取消消费者。

Java 客户端

请参阅Java 客户端指南获取示例。

.NET 客户端

请参阅.NET 客户端指南获取示例。

消息属性和传递元数据

每次传递都结合了消息元数据和传递信息。不同的客户端库使用略有不同的方式提供对这些属性的访问。通常,传递处理程序可以访问传递数据结构。

以下属性是传递和路由详情;它们本身不是消息属性,由 RabbitMQ 在路由和传递时设置

属性类型描述
传递标签正整数

传递标识符,请参阅确认

已重新传递布尔值如果此消息先前已被传递并重新排队,则设置为 true
交换机字符串路由此消息的交换机
路由键字符串发布者使用的路由键
消费者标签字符串消费者(订阅)标识符

以下是消息属性。其中大部分是可选的。它们由发布者在发布时设置

属性类型描述必需?
传递模式枚举 (1 或 2)

2 表示“持久”,1 表示“瞬时”。一些客户端库将此属性公开为布尔值或枚举。

类型字符串应用程序特定的消息类型,例如“orders.created”
Headers映射 (string => any)任意的头部映射,具有字符串头部名称
内容类型字符串内容类型,例如“application/json”。由应用程序使用,而不是核心 RabbitMQ
内容编码字符串内容编码,例如“gzip”。由应用程序使用,而不是核心 RabbitMQ
消息 ID字符串任意消息 ID
关联 ID字符串帮助关联请求和响应,请参阅教程 6
回复到字符串包含响应队列名称,请参阅教程 6
过期字符串每条消息的 TTL
时间戳时间戳应用程序提供的 timestamp
用户 ID字符串用户 ID,如果设置则已验证
应用 ID字符串应用程序名称

消息类型

消息的 type 属性是一个任意字符串,用于帮助应用程序沟通消息的种类。它由发布者在发布时设置。该值可以是发布者和消费者商定的任何领域特定字符串。

RabbitMQ 不会验证或使用此字段,它仅供应用程序和插件使用和解释。

实际中的消息类型自然会分为几类,点分隔的命名约定很常见(但 RabbitMQ 或客户端不强制要求),例如 orders.createdlogs.lineprofiles.image.changed

如果消费者收到无法处理的消息类型,强烈建议记录这些事件以方便故障排除。

内容类型和编码

内容(MIME 媒体)类型和内容编码字段允许发布者沟通消息载荷应如何被消费者反序列化和解码。

RabbitMQ 不会验证或使用这些字段,它们仅供应用程序和插件使用和解释。

例如,具有 JSON 载荷的消息应使用 application/json。如果载荷使用 LZ77 (GZip) 算法压缩,则其内容编码应为 gzip

可以通过用逗号分隔来指定多个编码。

确认模式

注册消费者时,应用程序可以选择两种传递模式之一

  • 自动(传递无需确认,又称“发送即忘”)
  • 手动(传递需要客户端确认)

消费者确认是单独文档指南的主题,以及与发布者确认(发布者的密切相关概念)一起。

使用 Prefetch 限制并发传递

在手动确认模式下,消费者可以限制多少传递可以“在途”(在网络上传输或已传递但未确认)。这可以避免消费者过载。

此功能与消费者确认一起,是单独文档指南的主题。

消费者容量指标

RabbitMQ 管理 UI 以及监控数据端点,例如用于Prometheus 抓取的端点,会显示每个队列的消费者容量(以前称为消费者利用率)指标。

该指标计算的是队列能够立即向消费者传递消息的时间比例。它有助于操作员注意到可能值得为队列添加更多消费者(应用程序实例)的情况。

如果此数字小于 100%,则队列领导副本可能能够更快地传递消息,如果

  • 有更多消费者,或
  • 消费者花费更少的时间处理传递,或
  • 消费者通道使用了更高的prefetch 值

对于没有消费者的队列,消费者容量将为 0%。对于有在线消费者但没有消息流的队列,该值为 100%:理念是任何数量的消费者都可以维持这种传递速率。

请注意,消费者容量仅仅是一个提示。消费者应用程序可以并且应该收集有关其操作的更具体的指标,以帮助进行容量调整和任何可能的容量更改。

取消消费者(取消订阅)

要取消消费者,必须知道其标识符(消费者标签)。

在消费者被取消后,将不再向其发送未来的传递。请注意,仍然可能存在先前已发送的“在途”传递。取消消费者既不会丢弃它们,也不会重新排队它们。

被取消的消费者将不会观察到除了在 RabbitMQ 处理 basic.cancel 方法时“在途”的传递之外的任何新传递。所有先前未确认的传递都不会受到任何影响。要重新排队“在途”传递,应用程序必须关闭通道。

Java 客户端

请参阅Java 客户端指南获取示例。

.NET 客户端

请参阅.NET 客户端指南获取示例。

轮询单个消息(“拉取 API”)

危险

本节所述机制是一种轮询。与分布式系统中的任何基于轮询的方法一样,它效率非常低下,尤其是在队列可能为空一段时间的情况下。

除了集成测试,强烈反对使用此 AMQP 0-9-1 消费机制。

RabbitMQ 管理Prometheus插件提供了几个有助于检测使用轮询(basic.get)的应用程序的指标。

提示

使用长期存在的消费者而不是轮询。

使用 AMQP 0-9-1,可以通过 basic.get 协议方法逐个获取消息。消息按 FIFO 顺序获取。与消费者(订阅)一样,可以使用自动或手动确认。

逐个获取消息是强烈不推荐的,因为它比常规长期存在的消费者效率非常低。与任何基于轮询的算法一样,在消息发布零星且队列可能长时间为空的情况下,它将非常浪费

如有疑问,请优先使用常规长期存在的消费者。

Java 客户端

请参阅Java 客户端指南获取示例。

.NET 客户端

请参阅.NET 客户端指南获取示例。

传递确认超时

RabbitMQ 对消费者传递确认强制执行超时。这是一个保护机制,用于检测消费者何时未确认消息传递。配置传递确认超时有助于防止磁盘数据压缩和节点磁盘空间耗尽。

工作原理

如果在超时值内消费者未确认其传递,则其通道将以 PRECONDITION_FAILED 通道异常关闭。消息将显示如下

Consumer 'consumer-tag-998754663370' on channel 1 and queue 'qq.1' in vhost '/' has timed out
waiting for a consumer acknowledgement of a delivery with delivery tag = 10. Timeout used: 180000 ms.
This timeout value can be configured, see consumers doc guide to learn more

该错误会由消费者连接到的节点记录。然后,该通道上的所有后续传递(来自所有消费者)将被重新排队。要解决 PRECONDITION_FAILED 通道异常,请重新评估您的消费者并考虑增加超时值。

RabbitMQ 的默认超时值为 30 分钟。是否执行超时会定期(以一分钟为间隔)进行评估。不支持小于一分钟的值,不推荐小于五分钟的值。

每节点配置

超时值可在 rabbitmq.conf 中配置(以毫秒为单位)

# 30 minutes in milliseconds
consumer_timeout = 1800000
# one hour in milliseconds
consumer_timeout = 3600000

可以使用advanced.config 禁用超时。这不推荐

%% advanced.config
[
{rabbit, [
{consumer_timeout, undefined}
]}
].

与其完全禁用超时,不如考虑使用一个较高的值(例如几个小时)。

每队列配置

从 RabbitMQ 3.12 开始,超时值也可以按队列配置。

通过策略进行每队列传递超时

设置 consumer-timeout 策略键。

值必须为毫秒。是否执行超时会定期(以一分钟为间隔)进行评估。

# override consumer timeout for a group of queues using a policy
rabbitmqctl set_policy queue_consumer_timeout "with_delivery_timeout\.*" '{"consumer-timeout":3600000}' --apply-to classic_queues

通过可选队列参数进行每队列传递超时

在声明队列时,将 x-consumer-timeout 可选队列参数设置为该队列。超时以毫秒为单位。是否执行超时会定期(以一分钟为间隔)进行评估。

限制每个通道的消费者数量

在某些可能发生消费者泄漏的场景中,限制每个通道上活跃的消费者数量是个好主意。这可以在 rabbitmq.conf 中使用设置 consumer_max_per_channel 进行配置

consumer_max_per_channel = 100

独占性

对于仅限经典队列,当使用 AMQP 0-9-1 客户端注册消费者时,可以将 basic.consume 方法的 exclusive 标志设置为 true,以请求该消费者成为目标队列上的唯一消费者。仅当当时队列上没有已注册的消费者时,该调用才会成功。这可以确保一次只有一个消费者从队列消费。

如果独占消费者被取消或死亡,应用程序有责任注册一个新的消费者以继续从队列消费。

如果同时需要独占消费消费连续性,请使用单一活跃消费者

重要

Quorum 队列将忽略 basic.consume 帧上的 exclusive 标志。对于 quorum 队列,请改用单一活跃消费者

单一活跃消费者

单一活跃消费者允许一次只有一个消费者从队列消费,并在当前活动消费者被取消或死亡时故障转移到另一个已注册的消费者。仅用一个消费者消费在消息必须按其进入队列的相同顺序被消费和处理时非常有用。

典型的事件顺序如下

  • 声明一个队列,并且一些消费者几乎同时注册到它。
  • 第一个注册的消费者成为单一活跃消费者:消息被分发给它,而其他消费者则被忽略。
  • 如果队列是 quorum 队列,并且新注册的消费者具有更高的优先级,那么队列将停止向当前活跃消费者传递消息。当所有消息都被确认后,新消费者将成为活跃消费者。
  • 当单一活跃消费者因某种原因被取消或死亡时,将选择另一个消费者作为活跃者。换句话说,队列会自动故障转移到另一个消费者。有关如何选择新消费者的更多详细信息,请参阅SAC 行为

请注意,如果没有启用单一活跃消费者功能,消息将以轮询方式分发给所有消费者。

警告

本节介绍了适用于经典队列和 quorum 队列上的 AMQP 0-9-1 和 AMQP 1.0 客户端的单一活跃消费者。它与流的单一活跃消费者功能有显著区别。

尝试使用 AMQP 0-9-1 客户端在流上启用 SAC将不起作用。要在流上使用 SAC,必须使用原生 RabbitMQ 流协议客户端

在 Quorum 和经典队列上启用单一活跃消费者

可以在声明队列时启用单一活跃消费者,将 x-single-active-consumer 参数设置为 true,例如使用 Java 客户端

Channel ch = ...;
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-single-active-consumer", true);
ch.queueDeclare("my-queue", false, false, false, arguments);

与独占消费者的区别

AMQP 0-9-1 独占消费者相比,单一活跃消费者对应用程序端的维护消费连续性的压力较小。消费者只需注册,故障转移会自动处理,无需检测活跃消费者故障并注册新消费者。

确定当前哪个消费者是活跃的

管理 UIlist_consumers CLI 命令可以报告哪个消费者是当前在启用该功能的队列上的活跃消费者。

初始 SAC 选择

当与经典队列一起使用时,初始活跃消费者是随机选择的,即使消费者优先级正在使用。

如果队列是 quorum 队列,并且新注册的消费者具有更高的优先级,那么队列将停止向当前活跃消费者传递消息。当所有消息都被确认后,新消费者将成为活跃消费者。

在此宣布此功能的博客文章中了解有关此行为的更多信息。

SAC 和独占消费者互斥

尝试使用 SAC 注册独占消费者将导致错误。SAC 本身就假定有多个消费者在线。

SAC 和通道 Prefetch

消息总是传递给活跃的消费者,即使它在某个时候太忙。这可能发生在手动确认和 basic.qos 时,消费者可能忙于处理它通过 basic.qos 请求的未确认消息的最大数量。在这种情况下,其他消费者将被忽略,消息将被返回到队列。

SAC 无法通过策略启用

单一活跃消费者功能无法通过策略启用。由于 RabbitMQ 中的策略本质上是动态的,它们可以来来去去,启用和禁用它们声明的功能。想象一下突然在一个队列上禁用单一活跃消费者:代理将开始向非活跃消费者发送消息,消息将被并行处理,这与单一活跃消费者试图实现的目标完全相反。由于单一活跃消费者的语义与策略的动态性不兼容,因此该功能只能在声明队列时通过队列参数启用。

消费者活动

管理 UI 和 list_consumers CLI 命令会报告消费者的 active 标志。此标志的值取决于几个参数。

  • 对于经典队列,当单一活跃消费者未启用时,该标志始终为 true
  • 对于 quorum 队列且单一活跃消费者未启用时,该标志默认为 true,并且在消费者连接到的节点被怀疑宕机时设置为 false
  • 如果单一活跃消费者已启用,则该标志仅设置为当前单一活跃消费者,队列上的其他消费者正在等待,如果活跃消费者消失则被提升,因此其 active 设置为 false

优先级

通常,连接到队列的活跃消费者以轮询方式接收消息。

消费者优先级允许您确保高优先级消费者在活动时接收消息,只有在高优先级消费者被阻塞时(例如,由于有效的prefetch设置)消息才会发送到低优先级消费者。

当使用消费者优先级时,如果存在多个具有相同高优先级的活跃消费者,消息将以轮询方式传递。

消费者优先级在单独指南中涵盖。

异常处理

消费者应负责处理在处理传递或其他消费者操作期间发生的任何异常。这些异常应被记录、收集并忽略。

如果由于依赖项不可用或类似原因,消费者无法处理传递,它应该清楚地记录下来并取消自身,直到它能够再次处理传递。这将使 RabbitMQ 和监控系统可以看到消费者的不可用性。

并发注意事项

消费者并发主要是客户端库实现细节和应用程序配置的问题。对于大多数客户端库(例如 Java、.NET、Go、Erlang),传递会被分发到一个线程池(或类似机制)来处理所有异步消费者操作。该池通常具有可控的并发度。

Java 和 .NET 客户端保证单个通道上的传递将按接收顺序进行分发,而不管并发度如何。请注意,一旦分发,并发处理传递将导致执行线程之间的自然竞态条件。

某些客户端(例如 Bunny)和框架可能会选择将消费者分发池限制为单个线程(或类似机制),以避免在并发处理传递时的自然竞态条件。一些应用程序依赖于严格按顺序处理传递,因此必须使用一个并发因子或在自己的代码中处理同步。可以并发处理传递的应用程序可以使用高达其可用核心数量的并发度。

队列并行性注意事项

单个 RabbitMQ 队列绑定到单个核心。使用多个队列来提高节点上的 CPU 利用率。诸如分片一致性哈希交换之类的插件有助于提高并行性。

© . This site is unofficial and not affiliated with VMware.