跳至主要内容
版本:4.0

消费者

概述

本指南涵盖了与消费者相关的各种主题

等等。

术语

术语“消费者”在不同的上下文中含义不同。通常,在消息传递和流式传输的上下文中,消费者是一个使用并确认消息的应用程序(或应用程序实例)。同一个应用程序也可以发布消息,因此同时也是发布者。

消息传递协议也具有用于消息传递的持久订阅的概念。订阅是通常用于描述此类实体的一个术语。消费者是另一个。RabbitMQ 支持的消息传递协议都使用这两个术语,但 RabbitMQ 文档倾向于使用后者。

从这个意义上说,消费者是在开始传递之前必须注册的消息传递订阅,并且可以由应用程序取消。

基础知识

RabbitMQ 是一个消息代理。它接受来自发布者的消息,对其进行路由,如果存在要路由到的队列,则将其存储以供消费,或者立即传递给消费者(如果有)。

消费者从队列中消费。为了消费消息,必须存在一个队列。当添加新的消费者时,假设队列中已经有准备好的消息,则传递将立即开始。

目标队列在消费者注册时可能是空的。在这种情况下,当新消息入队时,将发生第一次传递。

尝试从不存在的队列中消费将导致通道级异常,代码为 404 Not Found,并使尝试使用的通道关闭。

消费者标签

每个消费者都有一个标识符,客户端库使用该标识符来确定为给定的传递调用哪个处理程序。它们的名称因协议而异。消费者标签和订阅 ID 是两个最常用的术语。RabbitMQ 文档倾向于使用前者。

消费者标签也用于取消消费者。

消费者生命周期

消费者应该具有较长的生命周期:也就是说,在消费者的整个生命周期中,它都会收到多个传递。注册一个消费者来消费一条消息不是最佳做法。

消费者通常在应用程序启动期间注册。它们通常会与其连接甚至应用程序运行的时间一样长。

消费者可以更具动态性,并对系统事件做出反应进行注册,并在不再需要时取消订阅。这在通过 Web STOMPWeb MQTT 插件使用的 WebSocket 客户端、移动客户端等中很常见。

连接恢复

客户端可能会丢失与 RabbitMQ 的连接。当检测到连接丢失时,消息传递将停止。

一些客户端库提供自动连接恢复功能,其中涉及消费者恢复。Java.NETBunny 是此类库的示例。虽然连接恢复无法涵盖 100% 的场景和工作负载,但它通常对消费应用程序非常有效,建议使用。

对于其他客户端库,应用程序开发人员负责执行连接恢复。通常,以下恢复顺序效果很好

  • 恢复连接
  • 恢复通道
  • 恢复队列
  • 恢复交换机
  • 恢复绑定
  • 恢复消费者

换句话说,消费者通常最后恢复,在其目标队列和这些队列的绑定到位之后。

重要

请注意,自动恢复使用自动删除和排他队列的连接应确保这些队列是服务器命名的

注册消费者(订阅,“推送 API”)

应用程序可以订阅以让 RabbitMQ 将入队消息(传递)推送到它们。这是通过在队列上注册消费者(订阅)来完成的。订阅到位后,RabbitMQ 将开始传递消息。对于每次传递,都会调用用户提供的处理程序。根据使用的客户端库,这可以是用户提供的函数或对象,该函数或对象符合特定接口。

成功的订阅操作将返回一个订阅标识符(消费者标签)。稍后可用于取消消费者。

Java 客户端

有关示例,请参阅 Java 客户端指南

.NET 客户端

有关示例,请参阅 .NET 客户端指南

消息属性和传递元数据

每次传递都结合了消息元数据和传递信息。不同的客户端库使用略微不同的方法来提供对这些属性的访问。通常,传递处理程序可以访问传递数据结构。

以下属性是传递和路由详细信息;它们本身并不是消息属性,而是在路由和传递时由 RabbitMQ 设置的

属性类型描述
传递标签正整数

传递标识符,请参阅 确认

重新传递布尔值如果此消息之前传递并重新入队,则设置为 true
交换机字符串路由此消息的交换机
路由键字符串发布者使用的路由键
消费者标签字符串消费者(订阅)标识符

以下是消息属性。大多数是可选的。它们由发布者在发布时设置

属性类型描述必需?
传递模式枚举(1 或 2)

2 表示“持久”,1 表示“瞬态”。一些客户端库将此属性公开为布尔值或枚举。

类型字符串特定于应用程序的消息类型,例如“orders.created”
标头映射(字符串 => 任何)带有字符串标头名称的任意标头映射
内容类型字符串内容类型,例如“application/json”。由应用程序使用,而不是核心 RabbitMQ
内容编码字符串内容编码,例如“gzip”。由应用程序使用,而不是核心 RabbitMQ
消息 ID字符串任意消息 ID
相关 ID字符串帮助将请求与响应关联,请参阅 教程 6
回复到字符串承载响应队列名称,请参阅 教程 6
过期时间字符串每条消息的 TTL
时间戳时间戳应用程序提供的时间戳
用户 ID字符串用户 ID,如果设置,则已验证
应用程序 ID字符串应用程序名称

消息类型

消息上的 type 属性是帮助应用程序传达消息类型的任意字符串。它由发布者在发布时设置。该值可以是发布者和消费者商定的任何特定于域的字符串。

RabbitMQ 不会验证或使用此字段,它供应用程序和插件使用和解释。

实践中的消息类型自然会分为组,点分隔命名约定很常见(但 RabbitMQ 或客户端不要求),例如 orders.createdlogs.lineprofiles.image.changed

如果消费者收到无法处理的类型的传递,则强烈建议记录此类事件以方便故障排除。

内容类型和编码

内容(MIME 媒体)类型和内容编码字段允许发布者传达消费者应如何反序列化和解码消息有效负载。

RabbitMQ 不会验证或使用这些字段,它供应用程序和插件使用和解释。

例如,具有 JSON 有效负载的消息应使用 application/json。如果有效负载使用 LZ77(GZip)算法压缩,则其内容编码应为 gzip

可以通过逗号分隔来指定多个编码。

确认模式

在注册消费者时,应用程序可以选择两种传递模式之一

  • 自动(传递不需要确认,也称为“即发即弃”)
  • 手动(传递需要客户端确认)

消费者确认是单独的文档指南的主题,以及发布者确认,这是发布者密切相关的概念。

使用预取限制同时传递

在手动确认模式下,消费者可以限制“正在传输”的消息数量(网络传输中或已传递但未确认)。这可以避免消费者过载。

此功能以及消费者确认是单独的文档指南的主题。

消费者容量指标

RabbitMQ 管理 UI 以及 监控数据 端点(例如 Prometheus 采集 端点)会为各个队列显示一个名为消费者容量(以前称为消费者利用率)的指标。

该指标计算的是队列能够立即将消息传递给消费者的时间的比例。它可以帮助操作员发现可能值得向队列添加更多消费者(应用程序实例)的情况。

如果此数字小于 100%,则队列领导者副本如果满足以下条件,则可能能够更快地传递消息:

  • 有更多消费者,或者
  • 消费者花费更少的时间处理传递,或者
  • 消费者通道使用了更高的预取值

对于没有消费者的队列,消费者容量将为 0%。对于有在线消费者但没有消息流的队列,该值将为 100%:其含义是任何数量的消费者都可以维持这种传递速率。

请注意,消费者容量只是一个提示。消费者应用程序可以并且应该收集有关其操作的更具体的指标,以帮助进行调整和任何可能的容量更改。

取消消费者(取消订阅)

要取消消费者,必须知道其标识符(消费者标签)。

取消消费者后,将不会再向其分发未来的传递。请注意,仍然可能存在之前分发的“正在传输”的传递。取消消费者不会丢弃或重新排队它们。

取消的消费者不会观察到任何新的传递,除了在 RabbitMQ 处理 basic.cancel 方法时正在传输的传递。所有之前未确认的传递都不会受到任何影响。要重新排队正在传输的传递,应用程序必须关闭通道。

Java 客户端

有关示例,请参阅 Java 客户端指南

.NET 客户端

有关示例,请参阅 .NET 客户端指南

获取单个消息(“拉取 API”)

使用 AMQP 0-9-1,可以使用 basic.get 协议方法逐条获取消息。消息按 FIFO 顺序获取。可以像消费者(订阅)一样使用自动或手动确认。

强烈不建议逐条获取消息,因为与常规的长期存在的消费者相比,它非常低效。与任何基于轮询的算法一样,在消息发布零散且队列可能长时间保持为空的系统中,它将极其浪费资源

如有疑问,请优先使用常规的长期存在的消费者。

Java 客户端

有关示例,请参阅Java 客户端指南

.NET 客户端

有关示例,请参阅.NET 客户端指南

传递确认超时

RabbitMQ 对消费者传递确认实施超时。这是一种保护机制,有助于检测从不确认传递的有问题的(卡住的)消费者。此类消费者会影响节点的磁盘数据压缩,并可能导致节点耗尽磁盘空间。

工作原理

如果消费者在超过超时值(默认情况下为 30 分钟)后仍未确认其传递,则其通道将关闭,并显示 PRECONDITION_FAILED 通道异常。

消费者连接到的节点将记录该错误。日志该消费者连接到的节点将记录该错误。该通道上所有来自所有消费者的未完成传递都将重新排队

是否应实施超时会定期评估,间隔为一分钟。不支持小于一分钟的值,也不建议使用小于五分钟的值。

每个节点配置

超时值可在rabbitmq.conf(以毫秒为单位)中配置。

# 30 minutes in milliseconds
consumer_timeout = 1800000
# one hour in milliseconds
consumer_timeout = 3600000

可以使用advanced.config停用超时。不建议这样做。

%% advanced.config
[
{rabbit, [
{consumer_timeout, undefined}
]}
].

与其完全禁用超时,不如考虑使用较高的值(例如,几小时)。

每个队列配置

从 RabbitMQ 3.12 开始,还可以为每个队列配置超时值。

使用策略的每个队列传递超时

设置 consumer-timeout 策略键。

该值必须以毫秒为单位。是否应实施超时会定期评估,间隔为一分钟。

# override consumer timeout for a group of queues using a policy
rabbitmqctl set_policy queue_consumer_timeout "with_delivery_timeout\.*" '{"consumer-timeout":3600000}' --apply-to classic_queues

使用可选队列参数的每个队列传递超时

在声明队列时,为队列设置 x-consumer-timeout 可选队列参数。超时以毫秒为单位指定。是否应实施超时会定期评估,间隔为一分钟。

限制每个通道的消费者数量

在某些可能发生消费者泄漏的场景中,最好限制每个通道上可以处于活动状态的消费者数量。这可以在rabbitmq.conf 中使用设置 consumer_max_per_channel 进行配置。

consumer_max_per_channel = 100

排他性

使用 AMQP 0-9-1 客户端注册消费者时,可以将exclusive 标志设置为 true 以请求消费者成为目标队列上的唯一消费者。只有在此时没有其他消费者已注册到队列时,此调用才会成功。这可以确保一次只有一个消费者从队列中消费。

如果排他性消费者被取消或死亡,则应用程序负责注册一个新的消费者以继续从队列中消费。

如果需要排他性消费消费连续性,则单个活动消费者可能更合适。

单个活动消费者

单个活动消费者允许一次只有一个消费者从队列中消费,并在活动消费者被取消或死亡时故障转移到另一个已注册的消费者。当必须按消息到达队列的相同顺序消费和处理消息时,仅使用一个消费者进行消费非常有用。

典型的事件顺序如下:

  • 声明一个队列,并且一些消费者几乎同时注册到该队列。
  • 第一个注册的消费者成为单个活动消费者:消息将分发给它,而其他消费者将被忽略。
  • 如果队列是仲裁队列,并且一个新的消费者以更高的优先级注册,则队列将停止将消息传递给当前活动消费者。当所有消息都得到确认时,新的消费者将成为活动消费者。
  • 当单个活动消费者因某种原因被取消或死亡时,将选择另一个消费者作为活动消费者。换句话说,队列会自动故障转移到另一个消费者。有关如何选择新消费者的更多详细信息,请参阅SAC 行为

请注意,在未启用单个活动消费者功能的情况下,消息将使用循环方式分发给所有消费者。

请注意:本节涵盖适用于经典和仲裁队列上 AMQP 0-9-1 和 AMQP 1.0 客户端的单个活动消费者。它与流上的单个活动消费者无关。

尝试在流上使用 AMQP 0-9-1 客户端启用 SAC 将不起作用。要在流上使用 SAC,必须使用本机 RabbitMQ 流协议客户端

在仲裁和经典队列上启用单个活动消费者

在声明队列时可以启用单个活动消费者,方法是将 x-single-active-consumer 参数设置为 true,例如,使用 Java 客户端

Channel ch = ...;
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-single-active-consumer", true);
ch.queueDeclare("my-queue", false, false, false, arguments);

与排他性消费者的区别

AMQP 0-9-1 排他性消费者相比,单个活动消费者减少了应用程序方面维持消费连续性的压力。消费者只需注册,故障转移就会自动处理,无需检测活动消费者故障并注册新的消费者。

确定当前活动的是哪个消费者

管理 UICLI 可以报告在启用了该功能的队列上哪个消费者是当前活动消费者。

初始 SAC 选择

与经典队列一起使用时,即使正在使用消费者优先级,也会随机选择初始活动消费者。

如果队列是仲裁队列,并且一个新的消费者以更高的优先级注册,则队列将停止将消息传递给当前活动消费者。当所有消息都得到确认时,新的消费者将成为活动消费者。

有关此行为的更多信息,请参阅宣布此功能的博文

SAC 和排他性消费者是互斥的

尝试使用 SAC 注册排他性消费者将导致错误。SAC 根据定义假设将有多个消费者在线。

SAC 和通道预取

消息始终传递给活动消费者,即使它在某些时候太忙。当使用手动确认和basic.qos时,可能会发生这种情况,消费者可能忙于处理它使用basic.qos请求的最大未确认消息数。在这种情况下,其他消费者将被忽略,并且传递将返回到队列。

SAC 无法通过策略启用

单一活动消费者功能无法通过策略启用。由于 RabbitMQ 中的策略本质上是动态的,因此它们可以出现和消失,启用和禁用它们声明的功能。想象一下突然禁用队列上的单一活动消费者:代理将开始向非活动消费者发送消息,并且消息将并行处理,这与单一活动消费者试图实现的目标完全相反。由于单一活动消费者的语义与策略的动态特性不兼容,因此此功能仅在声明队列时,使用队列参数才能启用。

消费者活动

管理 UIlist_consumers CLI 命令会为消费者报告一个active标志。此标志的值取决于多个参数。

  • 对于经典队列,当未启用单一活动消费者时,该标志始终为true
  • 对于仲裁队列以及当未启用单一活动消费者时,默认情况下该标志为true,如果消费者连接到的节点被怀疑已关闭,则将其设置为false
  • 如果启用了单一活动消费者,则仅为当前的单一活动消费者设置该标志为true,队列上的其他消费者正在等待在活动消费者消失时被提升,因此它们的活动设置为false

优先级

通常,连接到队列的活动消费者以循环方式接收来自该队列的消息。

消费者优先级允许您确保高优先级消费者在处于活动状态时接收消息,并且仅当高优先级消费者被阻塞(例如,通过有效的预取设置)时,消息才会传递给低优先级消费者。

当使用消费者优先级时,如果多个活动消费者具有相同的最高优先级,则以循环方式传递消息。

消费者优先级在单独指南中进行了介绍。

异常处理

预计消费者将在处理传递或任何其他消费者操作期间出现的任何异常。应记录、收集和忽略此类异常。

如果消费者由于依赖项不可用或类似原因而无法处理传递,则应清楚地记录并取消自身,直到它能够再次处理传递。这将使消费者的不可用性对 RabbitMQ 和监控系统可见。

并发注意事项

消费者并发主要取决于客户端库实现细节和应用程序配置。对于大多数客户端库(例如 Java、.NET、Go、Erlang),传递将分派到处理所有异步消费者操作的线程池(或类似)。该池通常具有可控的并发度。

Java 和 .NET 客户端保证在单个通道上的传递将按照接收到的顺序分派,无论并发度如何。请注意,一旦分派,传递的并发处理将导致执行处理的线程之间出现自然的竞争条件。

某些客户端(例如 Bunny)和框架可能会选择将消费者分派池限制为单个线程(或类似),以避免在并发处理传递时出现自然的竞争条件。某些应用程序依赖于传递的严格顺序处理,因此必须使用并发因子 1 或在自己的代码中处理同步。可以并发处理传递的应用程序可以使用并发度,直到它们可用的核心数量。

队列并行性注意事项

单个 RabbitMQ 队列绑定到单个核心。使用多个队列来提高节点上的 CPU 利用率。诸如分片一致哈希交换之类的插件可以帮助提高并行性。

© 2024 RabbitMQ. All rights reserved.