跳至主内容

RabbitMQ 4.3 新特性亮点

·阅读 18 分钟

我们很高兴地宣布 RabbitMQ 4.3 正式发布。此版本带来了强大的新功能,旨在帮助您构建更具弹性、可扩展性和可观测性的消息传递架构。

开源消息代理的主要增强功能包括:

我们还在 VMware Tanzu RabbitMQ 中引入了以下新的企业级功能:

开源 RabbitMQ

仲裁队列特性

仲裁队列于七年前在 RabbitMQ 3.8 中引入。它们是确保数据安全的首选队列类型,可将消息持久化到磁盘并跨 RabbitMQ 节点进行复制。

仲裁队列基于 Raft 一致性算法构建,实现了复制状态机。这确保了网络故障期间的数据一致性,只要队列的大多数成员保持在线,即可维持可用性。

几乎每个次要版本都为仲裁队列带来了增强,RabbitMQ 4.3 也不例外,提供了重要的新功能。

支持 32 个严格消息优先级

在 RabbitMQ 4.0 之前,仲裁队列不支持消息优先级。

RabbitMQ 4.0 到 4.2 支持 两个相对优先级:普通和高。如果队列中同时包含这两种优先级,消费者收到的消息比例为每收到一条普通优先级消息,就会收到两条高优先级消息。

RabbitMQ 4.3 引入了对 32 个严格优先级级别的支持。现在,优先级较高的消息将严格在优先级较低的消息之前投递给消费者。

如下图橙色矩形框所示,管理 UI 现在会显示每个仲裁队列中按优先级划分的消息计数,允许操作员按优先级级别监控队列深度。

Management UI - Quorum Queue Details
管理 UI - 仲裁队列详情

红色矩形框重点标注了新的延迟重试功能,我们将在下文介绍。

延迟重试

当特定消息因瞬时故障无法处理时,延迟重试至关重要。常见示例包括:

  • 特定实体的速率限制: API 可能对特定用户 ID 的请求进行速率限制,同时允许其他用户的请求。
  • 数据库锁定: 数据库中的某一行可能被临时锁定。暂停整个消费者将阻止您更新成千上万个未被锁定的其他行。
注意

如果消费者无法处理任何消息(例如,由于下游整个数据库离线),最好临时暂停消费者,而不是延迟每一条消息。一旦数据库恢复,您可以重新启动消费者。

在 RabbitMQ 4.3 之前,实现延迟重试需要两种复杂变通方法之一:

  1. 死信循环:消费者可以拒绝消息,将其路由到 死信队列。该队列配置了 消息 TTL,超时后消息会被重新发送回原始队列。

    +-------------------+ 1) Consume +----------+
    | Original Queue | -------------------> | Consumer |
    +-------------------+ <------------------- +----------+
    | ^ 2) Reject
    | |
    | |
    | 3) Dead-letter | 4) Dead-letter after TTL
    | |
    v |
    +-------------------+
    | Dead-letter Queue |
    | (with TTL) |
    +-------------------+

    缺点是 RabbitMQ 必须对消息进行两次重写。此外,默认的“最多一次”死信策略存在消息丢失风险,而 “至少一次”死信策略 则引入了其自身的 注意事项

  2. 重新发布到消息调度器:或者,消费者可以将失败的消息重新发布到 消息调度器

    +-------------------+ 1) Consume +----------+
    | Original Queue | -------------------> | Consumer |
    +-------------------+ <------------------- +----------+
    ^ 3) Acknowledge |
    | |
    | 4) Publish after delay | 2) Re-publish new message with delay
    | |
    | |
    +-------------------+ |
    | Message Scheduler | <----------------------+
    +-------------------+

    这同样会强制重写消息。此外,如果客户端与 RabbitMQ 之间在重新发布之后、确认之前发生网络故障,消息既会被重新发布也会被重新入队。这会导致重复:调度器中有一份,原始队列中还有一份。

RabbitMQ 4.3 引入了一种更简洁的解决方案:仲裁队列在内部直接将延迟消息搁置,仅在配置的延迟时间过后才将其提供给任何消费者重新投递。

+--------------------------------+
| Quorum Queue |
| | 1) Consume +----------+
| [Message set aside until | -------------------> | Consumer |
| configured delay elapses] | <------------------- +----------+
| | 2) Return
+--------------------------------+

主要优势包括:

  • 性能提升: 消息无需重写,降低了代理的资源开销。
  • 更简单、更安全: 消除了消息丢失或重复的风险。
  • 精细控制: 可以按队列或按消息配置延迟。

要配置延迟重试,请使用以下队列参数或策略:

  • x-delayed-retry-type(队列参数)或 delayed-retry-type策略):定义延迟消息的条件。选项包括 disabled(默认)、allreturnedfailed。(关于 returnedfailed 的区别,请参阅下文的“无限重试”表)。
  • x-delayed-retry-min / delayed-retry-min:以毫秒为单位的最小延迟时间。
  • x-delayed-retry-max / delayed-retry-max:以毫秒为单位的最大延迟时间(默认为 delayed-retry-min)。

延迟时间根据消息的 delivery-count 使用线性退避算法计算得出。

min(delayed-retry-min * delivery-count, delayed-retry-max)

因此,消息被返回的次数越多,延迟时间就越长。例如,设置 delayed-retry-min=30000delayed-retry-max=120000 时:

  • 第 1 次返回:30秒延迟
  • 第 2 次返回:60秒延迟
  • 第 3 次返回:90秒延迟
  • 第 4 次及以后:120秒延迟(上限为最大值)

消费者还可以按每条消息动态覆盖此延迟。使用 AMQP 1.0 时,消费者可以在 modified outcome 中使用 x-opt-delivery-time 注解(以毫秒为单位的 Unix 时间戳)返回消息。此明确指令会覆盖计算出的线性退避延迟,从而精确控制重新投递的时机。

按消息定义延迟重试对于特定实体的速率限制非常有用。假设您的消费者在第三方 SaaS 平台中更新记录。如果 API 拒绝了租户 A 的更新并返回 HTTP 429(可能是达到了他们的特定层级限制)以及 Retry-After 头部信息,您肯定不想暂停整个消费者并阻塞租户 B 和 C 的更新。相反,您的消费者可以解析该头部信息,并仅针对租户 A 的消息设置 x-opt-delivery-time。这确保了消息在 API 准备好处理该特定租户时被重新投递,而系统的其余部分则正常运行。

支持无限重试

在 RabbitMQ 4.2 之前,无论原因如何,每条重新入队的消息都会将其 delivery-count 增加 1。毒丸消息处理会在该计数超过队列的 delivery-limit(默认为 20)时将消息转入死信队列。

从 RabbitMQ 4.3 开始,仲裁队列会跟踪两个不同的计数器:acquired-countdelivery-count

  • acquired-count每次消息重新入队时都会增加。
  • delivery-count:仅在消息因失败的投递尝试而重新入队时才会增加。

由于毒丸消息处理仍然与 delivery-count 挂钩,因此在将尝试标记为失败的情况下返回的消息不再计入限制。简而言之,RabbitMQ 4.3 现在支持无限重试。

根据定义,acquired-count 将始终大于或等于 delivery-count。下表概述了构成投递失败的情况:

触发条件acquired-count 增加delivery-count 增加(投递尝试失败
AMQP 1.0 released 配置
AMQP 1.0 rejected 配置
AMQP 1.0 modified 配置(delivery-failed=false
AMQP 1.0 modified 配置(delivery-failed=true
AMQP 0.9.1 basic.nack
AMQP 0.9.1 basic.reject
客户端崩溃 / 连接丢失
集群内网络分区(怀疑消费者节点宕机)
消费者超时(消费者未及时确认)

消费者超时

消费者超时(也称为“消息锁定”)通过限制消费者持有未确认消息的时间来防止其变慢或卡死,从而在代理干预前起到保护作用。

从历史上看,超时是在所有队列类型中全局评估的,并且超时触发的是破坏性的 AMQP 0.9.1 通道关闭。这会突然终止通道上的所有消费者,而不仅仅是卡住的那一个。

在 RabbitMQ 4.3 中,超时处理已直接转移到仲裁队列和新的 JMS 队列中。由于 经典队列 很少需要此功能,它们不再评估消费者超时。

当仲裁队列发生消费者超时时,超时的消息会被返回到队列(使其可供重新投递),并且服务器会使用更优雅的、特定于协议的机制通知客户端:

  • AMQP 1.0: 超时消息通过 DISPOSITION(state=released) 释放,而不是断开链路。这允许消费者在无需重新连接的情况下恢复。
  • AMQP 0.9.1: 如果客户端支持 consumer_cancel_notify 能力(大多数现代客户端都支持),服务器会发送 basic.cancel 通知,仅取消超时的消费者,从而保持通道和其他消费者的完好无损。如果客户端缺乏此能力,服务器将回退到关闭通道以保持向后兼容性。

消费者超时可以在四个级别上以毫秒为单位进行配置(优先级从高到低):

  1. 消费者参数: x-consumer-timeout(在创建消费者时)。
  2. 队列参数: x-consumer-timeout(在声明队列时)。
  3. 队列策略: consumer-timeout 键。
  4. 全局配置: rabbitmq.conf 中的 consumer_timeout(默认为 1800000 毫秒,即 30 分钟)。
提示

如果您的消费者执行合法的长时间任务(超过 30 分钟),请考虑增加超时时间。

相反,如果您希望快速处理消息的消费者缩短超时时间,那么在消费者卡住时,较短的超时时间可确保消息能更快地重新投递给其他健康的消费者。

内存使用量降低 50%

对于大小不超过 32 KiB 的消息,每条消息的内存开销已减半,从而显著减少了仲裁队列的整体内存占用。

Khepri

RabbitMQ 元数据包括队列、交换机绑定虚拟主机、用户和权限的定义。从历史上看,这些数据存储在 Mnesia 中,即 Erlang/OTP 的内置分布式键值数据库。

然而,Mnesia 有两个主要缺点:

  1. 它在网络分区期间容易产生不一致。
  2. 由于需要相同数据库锁的事务会面临人为延迟,它在高并发操作下的性能表现较差。

为了解决这个问题,RabbitMQ 团队开发了 Khepri,这是一个基于 Raft 构建的开源分布式数据库。Khepri 保证了网络故障期间的一致性,并通过无锁串行执行事务,在高并发客户端操作下提供了卓越的性能。

在 3.13 版本中作为实验性支持并从 4.2 版本起成为新集群的默认存储后,Khepri 现在是 RabbitMQ 4.3 中唯一受支持的元数据存储。 Mnesia 以及分区处理策略(如 pause_if_all_downpause_minorityautoheal)已被完全移除。

注意

我们建议在升级到 4.3 之前启用 khepri_db 功能标志。否则,RabbitMQ 将在 4.3 节点启动过程中将所有元数据从 Mnesia 迁移到 Khepri。

AMQP 1.0 改进

继 4.0 中引入 原生 AMQP 1.0 支持,4.1 中引入 Websocket 上的 AMQP 1.0过滤表达式,以及 4.2 中引入 SQL 过滤表达式 之后,RabbitMQ 4.3 又带来了两项 AMQP 1.0 增强功能。

拒绝方与拒绝原因

当队列拒绝消息时,RabbitMQ 现在会在 Rejected outcome 中将队列名称和具体的拒绝原因(例如:达到最大长度、队列不可用)传递给 AMQP 1.0 发布者。当多个队列绑定到单个交换机时,这非常有用,因为它允许发布者查明到底是哪个队列失败以及失败原因。

消费者活动通知

RabbitMQ 现在通过 AMQP 1.0 流帧 (flow frame) 属性向消费者发送链路状态信息。在启用 单一活动消费者 (Single Active Consumer) 的仲裁队列中消费时,RabbitMQ 将在消费者从非活动(等待)状态转变为活动状态(反之亦然)时立即通知消费者。


VMware Tanzu RabbitMQ

VMware Tanzu RabbitMQ 是 RabbitMQ 的商业版本,在开源核心的基础上提供了企业级的闭源插件。4.3 版本包含了几个强大的新扩展。

JMS 队列类型

VMware Tanzu RabbitMQ 4.3 引入了一种专为 JMS (Java Message Service) 应用程序设计的队列类型。在 Raft 的支持下,新的 JMS 队列类型 提供了与仲裁队列相同的安全保证:消息及其投递状态在节点间进行复制,即使在网络故障期间也能确保持久性和一致性。

这种架构相对于传统的 JMS 代理具有巨大的优势,传统代理通常依赖共享存储 (SAN/NAS) 或脆弱的主/被动故障转移,极易出现脑裂和数据丢失情况。

虽然针对 AMQP 1.0 上的 Qpid JMS 客户端 进行了优化,但该队列在设计上是跨协议的。消费者可以是 JMS 应用程序,而发布者可以是运行在 AMQP 1.0、AMQP 0.9.1、STOMP 或 MQTT 上的非 Java 客户端。

JMS 消息选择器

消息选择器 允许消费者使用 SQL 语法在代理端直接过滤消息。

要启用此功能,必须使用 x-selector-fields 参数或 selector-fields 策略配置队列(例如:selector-fields = ["category", "price", "in_stock"]),这会将这些消息属性保存在内存中以进行快速评估。

// Create a consumer with a message selector
String selector = "category = 'electronics' AND price BETWEEN 100 AND 500 AND in_stock = TRUE";
MessageConsumer consumer = session.createConsumer(queue, selector);

// Receives only messages matching the SQL expression
Message msg = consumer.receive(5000);

队列浏览器

队列浏览器 允许消费者以非破坏性的方式检查队列中的消息,而无需移除它们,这非常适合在不中断主动处理流程的情况下进行调试。浏览器还完全支持消息选择器!

// Create a QueueBrowser with a message selector
String selector = "category = 'electronics' AND price > 1000";
QueueBrowser browser = session.createBrowser(queue, selector);
Enumeration enumeration = browser.getEnumeration();

// Iterate through the matching messages
while (enumeration.hasMoreElements()) {
Message msg = (Message) enumeration.nextElement();
System.out.println("Browsed high-value electronics order: " + msg);
}

browser.close();

消息投递延迟

发布者现在可以指定 投递延迟。队列将接受该消息,但会对其隐藏,直到指定的时间过后才向消费者公开。

MessageProducer producer = session.createProducer(queue);
producer.setDeliveryDelay(10_000); // 10 seconds
producer.send(session.createTextMessage("Delayed payload"));

Apache Spark 连接器

Apache Spark 是一个用于大规模数据分析和机器学习任务的流行开源项目。

VMware Tanzu RabbitMQ 4.3 附带了一个新的 RabbitMQ 流连接器 (Stream Connector),可在 RabbitMQ Streams 和 Apache Spark 集群之间提供高速、并行的传输。它架起了高吞吐量消息处理与大数据分析之间的桥梁。通过结合两者,您可以获得一个强大的端到端流水线,其中 RabbitMQ 作为可靠、高速的摄入层(“神经系统”),而 Spark 则作为分布式计算引擎(“大脑”)。

这解锁了高级的企业级用例:

  • 统一批处理和流处理: 因为流可以持久化存储数据并支持任意偏移量的重放,完全相同的结构化流 (Structured Streaming) 代码即可处理历史批次数据,也能处理实时数据。
  • 实时机器学习 (MLlib): 在历史数据上训练模型,并将其应用于实时流,以实现即时欺诈检测或动态定价。
  • 复杂的时间窗口聚合: 直接针对实时 RabbitMQ 流执行基于时间窗口的 Spark SQL 查询(例如:“5 分钟滚动平均温度”)。
  • 实时扩充: 在将扩充后的数据存入之前,将快速移动的事件流与静态数据库表进行联接。
  • 数据归档: 将历史流数据批量存入长期的冷存储或数据湖中。

主要功能:

  • 超级流 (Super Stream) 支持: 从单个流无缝扩展到分区的超级流。
  • 灵活的偏移量: 从绝对起始、当前末尾、特定偏移量或时间戳开始读取。
  • 优化的字段提取: 仅提取所需的字段以最小化内存消耗。
  • 内置速率限制: 限制每个触发间隔的记录数以获得可预测的性能。
  • 完全 AMQP 访问: 同时读取载荷、属性和时间字段。
  • 确定性分区: 用于将 Spark 数据写回 RabbitMQ 流的基于哈希的分区。

示例:物联网传感器聚合

要了解其实际效果,想象成千上万的物联网传感器正在发布到超级流。使用 Spark Structured Streaming,我们可以解析 JSON、过滤掉噪音并在保存到 Postgres 数据库之前计算滚动平均值。

// 1. Read from the RabbitMQ Super Stream
Dataset<Row> rawStream = spark.readStream()
.format("rabbitmq-stream")
.option("uris", "rabbitmq-stream://guest:guest@localhost:5552/%2f")
.option("super.stream", "iot-sensor-stream")
.option("starting.offsets", "next")
.option("rmq.stream.select.fields", "timestamp,payload")
.load();

// 2. Parse the JSON payload
StructType sensorSchema = new StructType()
.add("device_id", DataTypes.StringType)
.add("temperature", DataTypes.DoubleType)
.add("humidity", DataTypes.DoubleType);

Dataset<Row> parsed = rawStream
.select(col("timestamp"), from_json(col("payload").cast("string"), sensorSchema).as("data"))
.select(col("timestamp"), col("data.*"));

// 3. Filter out invalid readings
Dataset<Row> valid = parsed.filter(
col("temperature").between(-40, 80)
.and(col("humidity").between(0, 100))
.and(col("device_id").isNotNull())
);

// 4. Aggregate metrics over a 30-second tumbling window
Dataset<Row> deviceAgg = valid
.groupBy(
window(col("timestamp"), "30 seconds"),
col("device_id")
)
.agg(
avg("temperature").as("avg_temp"),
avg("humidity").as("avg_humidity"),
count("*").as("reading_count")
);

// 5. Upsert aggregated data to a database
StreamingQuery query = deviceAgg.writeStream()
.outputMode("append")
.trigger(Trigger.ProcessingTime("5 seconds"))
.foreachBatch((batchDF, batchId) -> {
batchDF.write().jdbc("jdbc:postgresql://:5432/iot", "device_stats", connectionProperties);
})
.start();

流浏览器

流浏览器 (Stream Browser) 是一个新的管理 UI 插件,可直接观察流内容。它位于流浏览器选项卡下,提供:

  • 统一搜索: 即时定位流和超级流。
  • 灵活导航: 按绝对偏移量、时间戳或从头部/尾部开始浏览。
  • 深度检查: 以表格格式查看数据或展开单个消息以查看所有 AMQP 1.0 部分。
  • 结构视图: 按物理段文件和块进行导航,以了解底层的磁盘布局。
  • 选择性下载: 选择下载哪些消息部分,从而节省带宽。

以下是流浏览器展示 磁盘上流布局 的效果:

Stream Browser
流浏览器

消息调度器

消息调度器允许生产者指定未来的目标投递时间。代理将消息保留在目标队列之外,直到延迟时间过期,届时它会在内部发布该消息。

这种“在时间 T 发布”的范式非常适合:

  • 业务工作流: “2 小时后发送放弃购物车的电子邮件”。
  • 时间解耦: 确保事件不会被过早处理。
  • 扇出 (Fan-out): 将单条延迟消息路由到多个队列(例如,稍后向任何在线订阅者发送消息)。
警告

从历史上看,调度依赖于社区支持的 rabbitmq-delayed-message-exchange 插件。由于严重的架构限制,该插件已被弃用并归档。

VMware Tanzu RabbitMQ 4.3 将其完全替换为新的、企业级的 延迟队列 (Delayed Queue) 插件:

  • 高可用性: 完全由 Raft 复制提供支持。
  • 标准协议: 使用标准的 AMQP 1.0 注解 (x-opt-delivery-time, x-opt-delivery-delay) 来处理绝对和相对延迟。
  • 发布/订阅集成: 与交换机(例如 topic)无缝协作,确保一条延迟消息可以路由到多个 MQTT 或 JMS 订阅。
  • 有界内存: 无论您调度的是一千条还是一百万条消息,内存占用都保持平稳。
  • 高级操作: 允许浏览和选择性清理已调度的消息。
  • 灾难恢复: 支持跨数据中心的 热备复制

Linter 代码检查器

最后,管理 UI 中内置了一个新的 Linter 工具。它会针对您的配置运行检查,以发现反模式并建议立即、可行的改进措施。

Lints
Lints (代码检查结果)

总结

我们非常期待您体验 RabbitMQ 4.3 和强大的全新 VMware Tanzu RabbitMQ 扩展。欢迎通过 GitHub Discussions 提供反馈。如需支持和商业扩展,请通过 VMware Tanzu RabbitMQ 联系我们的销售团队。

© . This site is unofficial and not affiliated with VMware.