AMQP 1.0 过滤器表达式
RabbitMQ 4.1 引入了一个令人兴奋的新功能:用于流的 AMQP 过滤器表达式。
此功能使 RabbitMQ 能够支持多个并发客户端,每个客户端仅消费消息的特定子集,同时保持消息顺序。此外,它通过仅分发与客户端兴趣匹配的消息,最大限度地减少 RabbitMQ 与其客户端之间的网络流量。
在这篇博文中,我们将探讨什么是 AMQP 过滤器表达式,并通过一个简单的 Java 示例来演示如何使用它们。
规范
正如原生 AMQP 1.0 博文中所述,AMQP 1.0 的优势之一是其可扩展性,这得益于众多扩展规范的支持。RabbitMQ 4.1 利用了扩展规范AMQP 过滤器表达式 1.0 版本工作草案 09。
本规范定义了消息过滤器表达式的 AMQP 类型定义。过滤器表达式是对消息进行评估的谓词,返回 true
或 false
。如果谓词评估为 true
,则 broker 将消息分发给消费者。
RabbitMQ 4.1 实现了本规范的子集,包括
- § 4.2.4 properties filter:应用于消息的不可变 properties 部分。
- § 4.2.5 application-properties filter:应用于消息的不可变 application-properties 部分。
示例
假设每条消息都带有指定特定颜色的元数据。不同的消费者可以订阅同一个流,过滤消息以仅接收他们感兴趣的颜色的消息。
第一个消费者接收所有绿色消息。第二个消费者接收所有紫色消息。第三个消费者接收所有蓝色消息。
尝试此示例。
您可以按照以下步骤,使用 amqp-filter-expressions 示例应用程序以及 RabbitMQ AMQP 1.0 Java 客户端 尝试此示例
- 使用以下命令启动 RabbitMQ 服务器
docker run -it --rm --name rabbitmq \
-p 5672:5672 -p 15672:15672 \
rabbitmq:4.1-rc-management
- 导航到示例应用程序的根目录并启动客户端
mvn clean compile exec:java
运行示例应用程序后,您应该在控制台上看到以下输出
publisher sent message 0 with color green
publisher sent message 1 with color blue
publisher sent message 2 with color purple
publisher sent message 3 with color purple
publisher sent message 4 with color green
publisher sent message 5 with color green
consumer (filter green) received message 0
consumer (filter green) received message 4
consumer (filter green) received message 5
consumer (filter purple) received message 2
consumer (filter purple) received message 3
consumer (filter blue) received message 1
consumer (filter &s:e) received message 1
consumer (filter &s:e) received message 2
consumer (filter &s:e) received message 3
在此示例中,发布者发送六条消息,并在 application-properties 部分为每条消息分配特定颜色。
- 第一个消费者应用
color: green
的 application-properties 过滤器,按消息发布到流的顺序接收所有绿色消息。 - 同样,第二个消费者过滤
color: purple
,接收所有紫色消息,第三个消费者过滤color: blue
,接收所有蓝色消息。
此外,此示例应用程序还包含第四个消费者(上图中未显示),其过滤器匹配颜色以字母 e
结尾的消息。(根据规范,过滤器表达式 &s:suffix
匹配以指定后缀结尾的值。)因此,第四个消费者接收颜色为蓝色和紫色的消息。
AMQP 过滤器表达式使多个客户端能够并发地从同一流中消费特定子集的消息,同时保持消息顺序。此功能还通过仅分发与每个客户端的兴趣匹配的消息,最大限度地减少 RabbitMQ 与其客户端之间的网络流量。
流过滤比较
本博文中描述的 AMQP 过滤器表达式 功能不应与 RabbitMQ 3.13 中引入的 基于 Bloom 过滤器的流过滤 相混淆。
这两个功能都服务于相同的目的:从流中过滤消息。但是,它们的实现方式不同,导致了不同的特性
功能 | AMQP 过滤器表达式 | 基于 Bloom 过滤器的流过滤 |
---|---|---|
支持的协议 | AMQP 1.0 | 主要用于 RabbitMQ Streams 协议,但也支持 AMQP 1.0、AMQP 0.9.1 和 STOMP。 |
假阳性 | 无 | 可能:需要在客户端进行额外的每消息过滤。 |
支持要过滤的多个值(发布者) | 是:发布者可以在 properties 或 application-properties 部分中定义多个值。 | 否:发布者每条消息只能分配一个过滤器值。 |
支持多个过滤器表达式(消费者) | 是:消费者可以提供多个过滤器表达式,并且如果所有过滤器都匹配,则会传递消息。 | 是:消费者可以指定多个过滤器值,并且如果任何过滤器匹配,则会传递消息。 |
前缀和后缀匹配 | 是:对于字符串值,消费者可以定义如下表达式:“过滤 subject 以 emea. 开头的消息”或“过滤 application-properties 部分具有键 color 且值以 e 结尾的消息”。 | 否 |
Broker 开销 | 使用高效的 Erlang 模式匹配或术语相等操作实现。但是,每条消息都会为每个消费者读取到内存中(除非与基于 Bloom 过滤器的过滤结合使用)。 | 最小:Bloom 过滤器成员资格检查使用恒定时间。对于 RabbitMQ Streams 协议,sendfile 系统调用优化了块传递,而无需消息进入用户空间。 |
网络开销 | 较低:仅传输与消费者过滤器匹配的消息。 | 较高:即使只有一条消息匹配,也会传输整个块。 |
通过 AMQP 1.0 消费时,这两个功能可以一起使用。
总结
RabbitMQ 4.1 解决了在单个队列/流上启用多个消费者,同时确保某些消息(例如,具有相同主题或 ID 的消息)始终由同一消费者处理,从而保持顺序处理的挑战。
虽然此功能不适用于经典队列或仲裁队列,但 AMQP 过滤器表达式允许消费者在从流中消费消息时进行过滤。由于流是不可变的日志,因此总的消息顺序得以维护。