跳至主内容

AMQP 1.0 过滤表达式

·6 分钟阅读

RabbitMQ 4.1 引入了一项令人兴奋的新功能:用于Streams的 AMQP 过滤表达式。

此功能使 RabbitMQ 能够支持多个并发客户端,每个客户端仅消耗特定子集的消息,同时保留消息顺序。此外,它通过仅分派与客户端兴趣匹配的消息,最大限度地减少了 RabbitMQ 和其客户端之间的网络流量。

在这篇博文中,我们将探讨 AMQP 过滤表达式是什么,并通过一个简单的 Java 示例来演示如何使用它们。

规范

正如在 原生 AMQP 1.0 博客文章中所概述的那样,AMQP 1.0 的优势之一是其可扩展性,并得到了众多扩展规范的支持。RabbitMQ 4.1 利用了扩展规范 AMQP 过滤表达式版本 1.0 工作草案 09

本规范定义了用于消息过滤表达式的 AMQP 类型定义。过滤表达式是针对消息进行评估的谓词,返回 truefalse。如果谓词评估结果为 true,代理(broker)会将消息分发给消费者。

RabbitMQ 4.1 实现了该规范的一个子集,包括:

  • § 4.2.4 properties 过滤器:应用于消息中不可变的 properties 部分。
  • § 4.2.5 application-properties 过滤器:应用于消息中不可变的 application-properties 部分。

示例

想象一下,每条消息都携带指定特定颜色的元数据。不同的消费者可以订阅同一个流,通过过滤消息,仅接收那些符合其感兴趣颜色的消息。

Consumers filtering messages from a stream
从流中过滤消息的消费者

第一个消费者接收所有绿色消息。第二个消费者接收所有紫色消息。第三个消费者接收所有蓝色消息。

尝试此示例。

您可以按照以下步骤,使用 amqp-filter-expressions 示例应用以及 RabbitMQ AMQP 1.0 Java 客户端来尝试此示例。

  1. 使用以下命令启动 RabbitMQ 服务器
docker run -it --rm --name rabbitmq \
-p 5672:5672 -p 15672:15672 \
rabbitmq:4.1-rc-management
  1. 导航到示例应用的根目录并启动客户端
mvn clean compile exec:java

运行示例应用后,您应该会在控制台上看到以下输出

publisher sent message 0 with color green
publisher sent message 1 with color blue
publisher sent message 2 with color purple
publisher sent message 3 with color purple
publisher sent message 4 with color green
publisher sent message 5 with color green
consumer (filter green) received message 0
consumer (filter green) received message 4
consumer (filter green) received message 5
consumer (filter purple) received message 2
consumer (filter purple) received message 3
consumer (filter blue) received message 1
consumer (filter &s:e) received message 1
consumer (filter &s:e) received message 2
consumer (filter &s:e) received message 3

在此示例中,发布者发送了六条消息,并在 application-properties 部分为每条消息分配了特定的颜色。

  • 第一个消费者应用了 application-properties 过滤器 color: green,按发布到流中的顺序接收所有绿色消息。
  • 类似地,第二个消费者过滤 color: purple,接收所有紫色消息;第三个消费者过滤 color: blue,接收所有蓝色消息。

此外,该示例应用还包含第四个消费者(上图中未显示),其过滤器匹配颜色以字母 e 结尾的消息。(根据规范,过滤表达式 &s:suffix 匹配以指定后缀结尾的值。)因此,第四个消费者接收颜色为蓝色和紫色的消息。

AMQP 过滤表达式使多个客户端能够同时从同一个流中消费特定的消息子集,同时保持消息顺序。该功能还通过仅分发符合每个客户端兴趣的消息,最大限度地减少了 RabbitMQ 与其客户端之间的网络流量。

流过滤比较

本博文中介绍的 AMQP 过滤表达式功能不应与 RabbitMQ 3.13 中引入的 基于布隆过滤器的流过滤 混淆。

这两个功能用途相同:从流中过滤消息。然而,它们的实现方式不同,导致了各自独特的特性:

功能AMQP 过滤表达式基于布隆过滤器的流过滤
支持的协议AMQP 1.0主要用于 RabbitMQ Streams 协议,但也支持 AMQP 1.0、AMQP 0.9.1 和 STOMP。
误报 (False Positives)可能存在:需要在客户端进行额外的逐条消息过滤。
支持多值过滤(发布者端)支持:发布者可以在 properties 或 application-properties 部分定义多个值。不支持:发布者每条消息只能分配一个过滤值。
支持多过滤表达式(消费者端)支持:消费者可以提供多个过滤表达式,只有当 所有 过滤器都匹配时,消息才会被传递。支持:消费者可以指定多个过滤值,只要 任何 过滤器匹配,消息就会被传递。
前缀和后缀匹配支持:对于字符串值,消费者可以定义如下表达式:“过滤 subjectemea. 开头的消息”或“过滤 application-properties 部分包含 key color 且值以 e 结尾的消息”。
代理开销使用高效的 Erlang 模式匹配或项相等操作实现。但是,每条消息都会为每个消费者读取到内存中(除非与基于布隆过滤器的过滤结合使用)。最小化:布隆过滤器成员资格检查使用常数时间。通过 RabbitMQ Streams 协议,sendfile 系统调用可以在不将消息进入用户空间的情况下优化块传输。
网络开销较低:仅传输匹配消费者过滤器的消息。较高:即使只有一条消息匹配,也会传输整个 块 (chunks)

通过 AMQP 1.0 消费时,这两个功能可以结合使用。

总结

RabbitMQ 4.1 解决了在单个队列/流上启用多个消费者同时确保特定消息(例如具有相同主题或 ID 的消息)始终由同一消费者处理以保持有序处理的 挑战

虽然此功能不适用于 经典队列仲裁队列,但 AMQP 过滤表达式允许消费者在从流中消费时过滤消息。由于流是不可变的日志,因此保持了消息的总顺序。

© . This site is unofficial and not affiliated with VMware.