跳至主内容
版本:4.2

流过滤

RabbitMQ 可以将流中的相同消息传递给多个消费者。由于某些消费者只需要消息的特定子集,RabbitMQ 提供了过滤功能,允许消费者仅接收他们感兴趣的消息。

磁盘上流布局

当消息被追加到流时,RabbitMQ 会为每条消息分配一个单调递增的偏移量。

流在磁盘上的布局如下所示

一个流由许多段文件组成。每个段文件都有一个索引文件。索引文件包含从偏移量和时间戳到段文件中位置的映射。当客户端应用程序提供流偏移量时,索引文件使 RabbitMQ 能够快速在段文件中定位相应的消息。

每个段文件由块组成,每个块由消息组成。一个块中的消息数量取决于入口速率,即消息发布到流的速度。如果入口速率很高,则一个块内会有很多消息。如果入口速率很低,可能会发生每个块只包含一条消息的情况。

过滤阶段概述

流消息可以在三个不同的阶段进行过滤,如下面的绿色突出显示。每个过滤阶段都是可选的。如果未配置任何过滤器,客户端将消耗它们连接到的流中的所有消息。

阶段 1:布隆过滤器

布隆过滤器是一种节省空间的概率性数据结构,用于测试一个元素是否是某个集合的成员。可能存在假阳性,但绝不存在假阴性。

布隆过滤的工作原理如下:发布者可以选择性地为每条消息分配一个过滤值(字符串)。在将消息块写入磁盘之前,RabbitMQ 会将所有这些过滤值收集到一个布隆过滤器数据结构中,并将其存储在块的头部。

消费者在连接到流时可以选择提供一个或多个过滤值。在从磁盘读取包含布隆过滤器的块头部时,RabbitMQ 会有效地评估消费者提供的过滤值是否与块中的任何消息匹配。如果至少有一个匹配项,RabbitMQ 将从磁盘读取该块中的所有消息。

提示

布隆过滤器效率极高。

当布隆过滤器评估结果为 false 时,RabbitMQ 会跳过整个块——它不会从磁盘读取消息,也不会将它们解析到内存中,也不会将它们发送给客户端。这可以节省整个系统的资源:服务器 CPU、内存和磁盘 I/O;网络带宽;以及客户端 CPU 和内存。

布隆过滤器是最有效的过滤方式。然而,由于它们在块级别(而不是消息级别)运行,并且可能出现假阳性,因此布隆过滤器最常与阶段 2:AMQP 过滤表达式阶段 3:客户端过滤结合使用。

默认情况下,当消费者设置布隆过滤器时,RabbitMQ 不会传递缺少过滤值的消息。要更改此行为并接收没有过滤值的消息,请将 AMQP 1.0 过滤器 rabbitmq:stream-match-unfiltered 或 AMQP 0.9.1 消费者参数 x-stream-match-unfiltered 设置为 true

声明 RabbitMQ 流中所述,可以通过 x-stream-filter-size-bytes 参数配置布隆过滤器的大小。16 字节的默认值应足以满足大多数用例。配置更高的值可以降低布隆过滤器的假阳性率,但会导致更多的存储开销。如果您的工作负载需要大量的唯一过滤值(高基数),例如过滤值代表客户 ID 且每条消息都有不同的客户 ID,那么增加布隆过滤器的大小是有意义的。

布隆过滤器可与 AMQP 1.0、AMQP 0.9.1、RabbitMQ 流协议和 STOMP 一起使用。

有关布隆过滤器和 RabbitMQ 流协议的详细说明,请阅读以下博客文章

  1. 流过滤
  2. 流过滤内部机制

示例:布隆过滤器 AMQP 1.0

为了使布隆过滤功能正常工作,消息必须附带一个关联的布隆过滤值发布,该值由 x-stream-filter-value 消息注释指定

Message message = publisher.message(body)
.annotation("x-stream-filter-value", "invoices"); // set Bloom filter value
publisher.publish(message, context -> {
// confirm callback
});

接收方必须使用描述符为 rabbitmq:stream-filter 的过滤器。此过滤器接受一个字符串或字符串列表。如果提供了字符串列表,则它们通过 OR 运算符逻辑组合,即,其中一个或几个字符串适用即可。

下面大多数语言示例将布隆过滤器(阶段 1)与客户端过滤(阶段 3)结合使用。Erlang 示例展示了如何将布隆过滤器(阶段 1)与 AMQP 过滤表达式(阶段 2)结合使用,这样就不需要客户端过滤(阶段 3)。

Consumer consumer = connection.consumerBuilder()
.queue("some-stream")
.stream()
// This Bloom filter will be evaluated server-side per chunk (Stage 1).
.filterValues("invoices", "orders")
.filterMatchUnfiltered(true)
.builder()
.messageHandler((ctx, msg) -> {
String filterValue = (String) msg.annotation("x-stream-filter-value");
// This filter will be evaluted client-side per message (Stage 3).
if ("invoices".equals(filterValue) || "orders".equals(filterValue)) {
// message processing
}
ctx.accept();
})
.build();

示例:布隆过滤器 AMQP 0.9.1

为了使布隆过滤功能正常工作,消息必须附带一个关联的布隆过滤值发布,该值由 x-stream-filter-value 头部指定

channel.basicPublish(
"", // default exchange
"invoices",
new AMQP.BasicProperties.Builder()
.headers(Collections.singletonMap(
// set Bloom filter value
"x-stream-filter-value", "emea"
))
.build(),
body
);

如果消费者想要仅接收给定过滤值(或多个值)的消息,则必须使用 x-stream-filter 消费者参数。此参数接受一个字符串或字符串数组。如果提供字符串数组,则它们通过 OR 运算符逻辑组合,即,其中一个或几个字符串适用即可。

// Consumer prefetch must be specified when consuming from a stream.
channel.basicQos(100);

channel.basicConsume(
"invoices",
false,
// This Bloom filter will be evaluated server-side per chunk (Stage 1).
Collections.singletonMap("x-stream-filter", "emea"),
(consumerTag, message) -> {
Map<String, Object> headers = message.getProperties().getHeaders();
// This filter will be evaluted client-side per message (Stage 3).
if ("emea".equals(headers.get("x-stream-filter-value"))) {
// message processing ...
}
// Ack is required to receive more messages from the stream.
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
},
consumerTag -> { });

如上面的代码片段所示,也应该有一些客户端过滤逻辑,因为服务器端布隆过滤器可能包含假阳性,并且仅在整个块级别执行。

示例:布隆过滤器流协议

Producer producer = environment.producerBuilder()
.stream("invoices")
// The Java library will set the Bloom filter value by extracting
// the value from key "region" in the message's application properties.
.filterValue(msg -> msg.getApplicationProperties().get("region").toString())
.build();
Consumer consumer = environment.consumerBuilder()
.stream("invoices")
.filter()
// This Bloom filter will be evaluated server-side per chunk (Stage 1).
.values("emea")
// This filter will be evaluted client-side per message (Stage 3).
.postFilter(msg -> "emea".equals(msg.getApplicationProperties().get("region")))
.builder()
.messageHandler((ctx, msg) -> {
// message processing ...
})
.build();

阶段 2:AMQP 过滤表达式

AMQP 过滤表达式是消费者在连接到流时提供的逻辑语句。RabbitMQ 节点会根据消息元数据评估这些表达式。如果消息匹配,RabbitMQ 会将消息传递给客户端。

过滤语法定义在 AMQP 1.0 扩展规范 AMQP Filter Expressions Version 1.0 Committee Specification Draft 01 中。RabbitMQ 支持该规范的一个子集,如下所述。

AMQP 过滤表达式是属性过滤表达式SQL过滤表达式

注意

属性过滤表达式和 SQL 过滤表达式是互斥的。消费者可以定义一个属性过滤器或一个 SQL 过滤器,但不能同时定义两者。

AMQP 过滤表达式允许多个客户端同时从同一个流中消费特定子集的消息,同时保持消息顺序。

属性过滤表达式

属性过滤表达式允许 RabbitMQ 将每条消息的元数据与消费者在连接到流时提供的参考模式进行匹配。

RabbitMQ 实现

如规范所述,可以使用 &p:<prefix>&s:<suffix> 修改器支持前缀和后缀匹配。

示例:属性过滤表达式

以下示例会导致 RabbitMQ 仅传递满足以下所有条件的的消息

  • 字段 user-idJohn
  • 字段 subjectOrder 前缀开头
  • 应用程序属性键 regionemea
Consumer consumer = connection.consumerBuilder()
.stream().filter()
.userId("John".getBytes(UTF_8))
.subject("&p:Order")
.property("region", "emea")
.stream().builder()
.queue("my-queue")
.messageHandler((ctx, msg ) -> {
// message processing
})
.build();

SQL 过滤表达式

RabbitMQ 支持使用基于 SQL WHERE 子句语法的复杂表达式过滤消息。消费者在连接到流时提供这些 SQL 语句。

SQL 过滤表达式提供了属性过滤表达式所提供功能的超集。

RabbitMQ 支持SQL 过滤表达式语法的一个子集。下面我们描述了当前支持的和不支持的内容。

保留字和运算符名称必须大写

  • AND OR, NOT
  • IN, IS, NULL, LIKE, ESCAPE
  • TRUE, FALSE
  • UTC

AMQP 1.0 消息部分

✅ 支持的部分
  • Header - headerh(仅 priority 字段)

    • 示例:header.priority = 4
    • 示例:h.priority = 4
  • Properties - propertiesp

    • 示例:properties.message_id = 12345
    • 示例:p.user_id = 0x426F62
    • 示例:p.to = 'orders'
    • 示例:p.subject LIKE 'Order%'
    • 示例:p.reply_to = '/queues/q1'
    • 示例:p.correlation_id IS NOT NULL
    • 示例:p.content_type = 'application/json'
    • 示例:p.content_encoding IN ('gzip', 'zstd', 'deflate')
    • 示例:p.absolute_expiry_time > UTC()
    • 示例:p.creation_time < 1753365622460
    • 示例:p.group_id = 'Group A'
    • 示例:p.group_sequence % 3 = 0
    • 示例:p.reply_to_group_id = 'Group B'
  • Application Properties - application_propertiesa

    • 示例:application_properties.color = 'blue'(完整限定)
    • 示例:a.color = 'blue'(简写)
    • 示例:color = 'blue'(未限定的字段默认为 application properties)
❌ 不支持的部分
  • Header - priority 以外的字段
  • Delivery Annotations - delivery_annotationsd
  • Message Annotations - message_annotationsm
  • Footer - footerf

常量

✅ 支持
  • 整数常量

    • 示例:age = 25
  • 小数和近似数字常量

    • 示例:temperature = -5.5
    • 示例:value = 1.23E6(科学计数法)
  • 布尔常量

    • 示例:active = TRUE
    • 示例:deleted = FALSE
  • 字符串常量(单引号或双引号)

    • 示例:name = 'Alice'
    • 示例:city = "New York"
    • 示例:quote = 'It''s great'(转义引号)
    • 示例:emojis = '😎☀️'(UTF-8 字符)
  • 二进制常量

    • 示例:properties.message_id = 0x0123456789ABCDEF
❌ 不支持
  • 特殊数字常量
    • IEEE 754 无穷大值:INF
    • IEEE 754 非数字值:NAN

标识符

标识符指的是例如属性部分的字段或应用程序属性部分的键。

✅ 支持
  • 常规标识符

    • 示例:properties.subject = 'EMEA'
    • 示例:order_status = 'pending'
  • 分隔标识符(用于特殊字符或保留字)

    • 示例:[order-status] = 'pending'
    • 示例:[order status] = 'pending'
    • 示例:[注文状況] = 'pending'
❌ 不支持
  • 复合类型引用(数组/映射访问)

一元和二元逻辑运算符

✅ 全部支持
  • AND

    • 示例:header.priority > 4 AND properties.subject = 'orders'
  • OR

    • 示例:status = 'new' OR status = 'pending'
  • NOT

    • 示例:NOT cancelled
  • 用于分组的括号

    • 示例:(category = 'books' OR category = 'music') AND price < 20

比较运算符

✅ 全部支持
  • 相等=

    • 示例:customer_id = 12345
  • 不等<>!=

    • 示例:status <> 'cancelled'
    • 示例:region != 'EU'
  • 大于>

    • 示例:age > 18
  • 大于等于>=

    • 示例:p.creation_time >= 1753690918262
  • 小于<

    • 示例:quantity < 10
  • 小于等于<=

    • 示例:discount <= 0.25

算术运算符

✅ 支持(仅适用于数字类型)
  • 加法+

    • 示例:quantity + 5 > stock_level
  • 减法-

    • 示例:price - discount > 10
  • 乘法*

    • 示例:quantity * price > 100
  • 除法/

    • 示例:total / quantity < 50
  • 取模%

    • 示例:p.group_sequence % 2 = 0(偶数组序列)
  • 一元加/减+, -

    • 示例:balance < +100
    • 示例:balance < -100
❌ 不支持
  • 字符串或符号与 + 的连接
    • 示例:firstname + lastname = 'JohnDoe'

其他逻辑谓词

✅ 支持
  • IS NULL

    • 示例:p.reply_to IS NULL
    • 示例:category IS NOT NULL
  • LIKE(带有 %_ 通配符)

    • 示例:name LIKE 'John%'(以 John 开头)
    • 示例:name NOT LIKE 'John%'(不以 John 开头)
    • 示例:product LIKE 'John % Doe'(John Doe,中间有名)
    • 示例:email LIKE '%@example.com'(以 @example.com 结尾)
    • 示例:code LIKE '___ABC'(3 个字符后跟 ABC)
  • LIKE 与 ESCAPE

    • 示例:underscored LIKE '\_%' ESCAPE '\' 对于 '_foo' 为 true,对于 'bar' 为 false
  • IN

    • 示例:status IN ('new', 'pending', 'processing')
    • 示例:environment NOT IN ('DEV', 'STAGING')
❌ 不支持
  • EXISTS 谓词

函数

✅ 支持
  • UTC() - 返回当前 UTC 时间(毫秒)
    • 示例:p.absolute_expiry_time IS NULL OR p.absolute_expiry_time > UTC() 选择未过期的消息
❌ 不支持
  • LOWER - 小写转换
  • UPPER - 大写转换
  • LEFT - 左侧子字符串
  • RIGHT - 右侧子字符串
  • SUBSTRING - 子字符串提取
  • DATE - ISO 8601 日期解析

示例:SQL 过滤表达式

以下示例会导致 RabbitMQ 仅传递满足以下所有条件的的消息

  • 字段 user-idJohn
  • 字段 subjectOrder 前缀开头
  • 应用程序提供的键 regionemea
Consumer consumer = connection.consumerBuilder()
.stream().filter()
.sql("properties.user_id = 'John' AND " +
"properties.subject LIKE 'Order%' AND " +
"region = 'emea'")
.stream().builder()
.queue("my-queue")
.messageHandler((ctx, msg ) -> {
// message processing
})
.build();

错误处理

定义错误

在消费者连接到流时,RabbitMQ 会检查过滤器的语法正确性,例如 SQL 表达式是否有效。如果提供的过滤器无效,RabbitMQ 将在发送给客户端的attach帧中排除此过滤器。此外,RabbitMQ 可能会记录一条警告,说明过滤器无效的原因。客户端库应随后分离链接并将错误返回给客户端应用程序。

评估错误

评估错误发生在 RabbitMQ 在运行时根据消息评估过滤器且评估无法完成时。

此类评估错误的示例包括

  • 比较不同类型的比较:例如,数字 3 是否大于字符串“London”?
  • 算术除以零:例如,3 / 0
  • 整数除法带有浮点数:例如,3 % 1.2

遇到评估错误的过滤器将被视为返回 falseunknown(取决于错误)。RabbitMQ 只会将 SQL 条件表达式评估为 true 的消息传递给客户端。

布隆过滤器与 AMQP 过滤表达式

此表比较了布隆过滤器(阶段 1)和 AMQP 过滤表达式(阶段 2)的特性。

功能布隆过滤器AMQP 过滤表达式
服务器端过滤?
阶段阶段 1:从磁盘读取块头部后,但在从磁盘读取任何消息之前阶段 2:在 RabbitMQ 将消息从磁盘读取到内存后,在将它们传递给客户端之前
粒度发布者为每条消息设置过滤值,但 RabbitMQ 为每个块评估过滤器每条消息
假阳性可能:需要在阶段 2 或阶段 3 进行额外的每条消息过滤。
支持的协议AMQP 1.0, AMQP 0.9.1, RabbitMQ 流协议, STOMPAMQP 1.0
支持多个过滤值(发布者)否:发布者每个消息只能分配一个过滤值。是:发布者可以在 properties 或 application-properties 部分定义多个值。
支持多个过滤表达式(消费者)是:消费者可以指定多个过滤值,如果*任何*过滤值匹配,则会传递消息。是:消费者可以提供多个过滤表达式。
过滤复杂性低:简单的字符串相等匹配高:可能进行复杂的 SQL 查询
消息/秒的评估速度高达数百万高达数十万
Broker 开销最小:布隆过滤器成员资格检查使用恒定时间。如果块匹配并且使用了 RabbitMQ Streams 协议,则sendfile系统调用会优化块传输,而无需消息进入用户空间。如果不匹配,RabbitMQ 甚至不会从磁盘读取消息。主要使用高效的 Erlang 模式匹配实现。但是,每条消息都会为每个消费者读入内存(除非与布隆过滤器结合使用)。
网络和客户端开销较高:即使只有一个消息的过滤值匹配,也会传输整个块。较低:只传输匹配过滤器的消息。

结合布隆过滤器和 AMQP 过滤表达式

提示

通过将布隆过滤器与 AMQP(尤其是 SQL)过滤表达式结合使用,RabbitMQ 实现了两者的优势:阶段 1 高效的块级过滤,以跳过不必要的磁盘 I/O、CPU 和内存使用;然后是阶段 2 精确的消息级过滤,用于复杂的业务逻辑——所有这些都在服务器端完成。

示例:结合布隆过滤器和 SQL 过滤表达式

考虑一个包含各种客户相关事件的流,例如

  • user.login
  • product.view
  • cart.add
  • cart.remove
  • order.created
  • review.submitted
  • 等等。

以下示例提供了一个复杂的 SQL 过滤表达式,用于查询满足以下所有条件的事件

  • 订单在过去一小时内创建
  • 位于 AMEREMEAAJP 区域之一
  • 订单必须是高优先级或高价格,或者由高级客户提交
Consumer consumer = connection.consumerBuilder()
.stream()
// This Bloom filter will be evaluated server-side per chunk (Stage 1).
.filterValues("order.created")
.filter()
// This complex SQL filter expression will be evaluted server-side
// per message at stage 2.
.sql("p.subject = 'order.created' AND " +
"p.creation_time > UTC() - 3600000 AND " +
"region IN ('AMER', 'EMEA', 'APJ') AND " +
"(h.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)")
.stream().builder()
.queue("my-queue")
.messageHandler((ctx, msg ) -> {
// message processing
})
.build();

如果 order.created 事件仅占所有事件的一小部分,RabbitMQ 可以高效地过滤流,因为只有一小部分消息需要解析和在内存中评估。

阶段 3:客户端过滤

在 RabbitMQ 将消息发送给客户端后,可以在客户端库或客户端应用程序本身中进行额外的过滤。

客户端过滤允许最高的灵活性,因为客户端不受服务器端过滤原语的约束。例如,客户端可以通过检查消息正文来进行过滤。

布隆过滤器 AMQP 1.0 示例布隆过滤器 AMQP 0.9.1 示例中提供了如何将客户端过滤(阶段 3)与服务器端布隆过滤(阶段 1)结合使用的示例。

© . This site is unofficial and not affiliated with VMware.