流过滤
RabbitMQ 可以将流中的同一消息投递给多个消费者。由于某些消费者只需要特定子集的消息,RabbitMQ 提供了过滤功能,允许消费者仅接收他们感兴趣的消息。
流的磁盘布局
当消息被追加到流中时,RabbitMQ 会为每条消息分配一个单调递增的偏移量(offset)。
流的磁盘布局如下所示
一个流由许多分片文件(segment files)组成。每个分片文件对应一个索引文件。索引文件包含从偏移量和时间戳到分片文件中位置的映射。当客户端应用程序提供流偏移量时,索引文件使 RabbitMQ 能够快速定位分片文件中的相应消息。
每个分片文件由多个数据块(chunks)组成,每个数据块由多条消息组成。一个数据块中的消息数量取决于入口速率,即消息发布到流中的速度。如果入口速率很高,一个数据块中会有很多消息。如果入口速率很低,可能会出现每个数据块仅包含一条消息的情况。
过滤阶段概述
流消息可以在三个不同的阶段进行过滤(如下图绿色部分所示)。每个过滤阶段都是可选的。如果没有配置任何过滤器,客户端将接收其所连接流中的所有消息。
阶段 1:布隆过滤器 (Bloom Filter)
布隆过滤器(Bloom filter)是一种空间效率极高的概率型数据结构,用于测试某个元素是否属于一个集合。它可能会产生误报(False Positive),但绝不会产生漏报(False Negative)。
布隆过滤的工作原理如下:发布者可以根据需要为每条消息分配一个过滤值(字符串)。在将消息块写入磁盘之前,RabbitMQ 会将这些过滤值收集到一个布隆过滤器数据结构中,并将其存储在数据块的头部。
消费者在连接到流时,可以选择提供一个或多个过滤值。当从磁盘读取包含布隆过滤器的数据块头部时,RabbitMQ 会高效评估消费者提供的任何过滤值是否与数据块中的消息匹配。如果至少有一个匹配,RabbitMQ 就会从磁盘读取该数据块中的所有消息。
布隆过滤器效率极高。
当布隆过滤器评估结果为假时,RabbitMQ 会跳过整个数据块——它不会从磁盘读取消息、不会将消息解析到内存中,也不会将它们发送给客户端。这节省了整个系统的资源:服务器 CPU、内存和磁盘 I/O;网络带宽;以及客户端的 CPU 和内存。
布隆过滤器是最高效的过滤方式。然而,由于它们是在数据块级别(而非消息级别)操作,且可能会发生误报,因此布隆过滤器通常与阶段 2:AMQP 过滤表达式或阶段 3:客户端过滤结合使用。
默认情况下,当消费者设置了布隆过滤器时,RabbitMQ 不会投递缺乏过滤值的消息。要更改此行为并同时接收没有过滤值的消息,请将 AMQP 1.0 过滤器 rabbitmq:stream-match-unfiltered 或 AMQP 0.9.1 消费者参数 x-stream-match-unfiltered 设置为 true。
如声明 RabbitMQ 流中所述,可以通过配置 x-stream-filter-size-bytes 参数来设置布隆过滤器的大小。默认的 16 字节足以满足大多数用例。配置较大的值可以降低布隆过滤器的误报率,但会导致更多的存储开销。如果您的工作负载需要大量唯一的过滤值(高基数),例如过滤值代表客户 ID 且每条消息都有不同的客户 ID,则增加布隆过滤器的大小是有意义的。
布隆过滤器可用于 AMQP 1.0、AMQP 0.9.1、RabbitMQ 流协议和 STOMP。
有关布隆过滤器和 RabbitMQ 流协议的详细描述,请阅读以下博客文章
示例:布隆过滤器 AMQP 1.0
为了使布隆过滤功能正常工作,发布消息时必须附带关联的布隆过滤器值,该值由 x-stream-filter-value 消息注解指定。
- Java
- C#
- Python
- Go
- Erlang
- JavaScript
Message message = publisher.message(body)
.annotation("x-stream-filter-value", "invoices"); // set Bloom filter value
publisher.publish(message, context -> {
// confirm callback
});
var message = new AmqpMessage(body);
message.Annotation("x-stream-filter-value", "invoices"); // set Bloom filter value
PublishResult pr = await publisher.PublishAsync(message);
publisher.publish(
Message(
Converter.string_to_bytes(body="myBody"),
annotations={"x-stream-filter-value": "invoices"}, # set Bloom filter value
)
message := amqp.NewMessage(body)
message.Annotations = amqp.Annotations{
"x-stream-filter-value": "invoices", // set Bloom filter value
}
publishResult, err := publisher.Publish(context.Background(), message)
Msg = amqp10_msg:set_message_annotations(
%% set Bloom filter value
#{<<"x-stream-filter-value">> => <<"invoices>>},
amqp10_msg:set_application_properties(
#{<<"category">> => <<"invoices">>},
amqp10_msg:new(<<"my-tag">>, <<"my-payload">>))),
amqp10_client:send_msg(Sender, Msg),
const message = createAmqpMessage({
body: "Hello World!",
annotations: { "x-stream-filter-value": "invoices" }, // set Bloom filter value
})
await publisher.publish(message)
接收者必须使用描述符为 rabbitmq:stream-filter 的过滤器。此过滤器接受字符串或字符串列表。如果提供了字符串列表,它们将通过 OR 运算符进行逻辑连接,即只要其中一个或部分字符串匹配即可。
随后的大多数语言示例都将布隆过滤器(阶段 1)与客户端过滤(阶段 3)相结合。Erlang 示例展示了如何将布隆过滤器(阶段 1)与 AMQP 过滤表达式(阶段 2)相结合,从而无需进行客户端过滤(阶段 3)。
- Java
- C#
- Python
- Go
- Erlang
- JavaScript
Consumer consumer = connection.consumerBuilder()
.queue("some-stream")
.stream()
// This Bloom filter will be evaluated server-side per chunk (Stage 1).
.filterValues("invoices", "orders")
.filterMatchUnfiltered(true)
.builder()
.messageHandler((ctx, msg) -> {
String filterValue = (String) msg.annotation("x-stream-filter-value");
// This filter will be evaluted client-side per message (Stage 3).
if ("invoices".equals(filterValue) || "orders".equals(filterValue)) {
// message processing
}
ctx.accept();
})
.build();
IConsumer consumer = await connection.ConsumerBuilder()
.Queue("some-stream")
.Stream()
// This Bloom filter will be evaluated server-side per chunk (Stage 1).
.FilterValues(["invoices", "orders"])
.FilterMatchUnfiltered(true)
.Builder()
.MessageHandler(async (context, message) => {
string filterValue = (string)message.Annotation("x-stream-filter-value");
// This filter will be evaluted client-side per message (Stage 3).
if (filterValue.Equals("invoices")|| filterValue.Equals("orders"))
{
// message processing
}
context.Accept();
}
).BuildAndStartAsync();
class MyMessageHandler(AMQPMessagingHandler):
def __init__(self):
super().__init__()
def on_message(self, event: Event):
filterValue = event.message.annotations["x-stream-filter-value"]
### This filter will be evaluted client-side per message (Stage 3).
if filterValue == "invoices" or filterValue == "orders":
### message processing
self.delivery_context.accept(event)
stream_address = AddressHelper.queue_address("some-stream")
consumer = connection.consumer(
stream_address,
message_handler=MyMessageHandler(),
### This Bloom filter will be evaluated server-side per chunk (Stage 1).
stream_filter_options=StreamOptions(stream_filters=["invoices", "orders"], match_unfiltered=True),
)
consumer, err := connection.NewConsumer(context.Background(), qName, &
StreamConsumerOptions{
Offset: &OffsetFirst{},
// This Bloom filter will be evaluated server-side per chunk (Stage 1).
Filters: []string{"invoices", "orders"},
})
deliveryContext, err := consumer.Receive(context.Background())
var filterValue string
filterValue = deliveryContext.Message().Annotations["x-stream-filter-value"].(string)
// This filter will be evaluted client-side per message (Stage 3).
if filterValue == "orders" || filterValue == "invoices" {
// process message ...
}
err = deliveryContext.Accept(context.Background())
Address = rabbitmq_amqp_address:queue(<<"some-stream">>),
Filter = #{%% This Bloom filter will be evaluated server-side per chunk (Stage 1).
<<"my Bloom filter">> =>
#filter{descriptor = <<"rabbitmq:stream-filter">>,
value = {utf8, <<"invoices">>}},
%% This filter will be evaluted server-side per message (Stage 2).
<<"my AMQP Property filter">> =>
#filter{descriptor = <<"amqp:application-properties-filter">>,
value = {map, [{{utf8, <<"category">>},
{utf8, <<"invoices">>}}]}}},
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session, <<"my receiver">>, Address,
unsettled, none, Filter),
receive {amqp10_event, {link, Receiver, attached}} -> ok
after 5000 -> exit(missing_attached)
end,
%% No need for client-side filtering (Stage 3) because the server will
%% deliver only messages with category=invoices
ok = amqp10_client:flow_link_credit(Receiver, 100, 50),
receive {amqp10_msg, Receiver, Message} ->
%% Process message...
ok = amqp10_client:accept_msg(Receiver, Message)
after 5000 -> exit(missing_msg)
end,
const consumer = await connection.createConsumer({
stream: {
name: "some-stream",
offset: Offset.first(),
matchUnfiltered: true,
filterValues: ["invoices", "orders"], // This Bloom filter will be evaluated server-side per chunk (Stage 1).
},
messageHandler: (context, message) => {
// This filter will be evaluated client-side per message (Stage 3).
if (
message.message_annotations &&
["invoices", "orders"].includes(message.message_annotations["x-stream-filter-value"])
) {
// message processing
}
context.accept()
},
})
consumer.start()
示例:布隆过滤器 AMQP 0.9.1
为了使布隆过滤功能正常工作,发布消息时必须附带关联的布隆过滤器值,该值由 x-stream-filter-value 头部指定。
- Java
channel.basicPublish(
"", // default exchange
"invoices",
new AMQP.BasicProperties.Builder()
.headers(Collections.singletonMap(
// set Bloom filter value
"x-stream-filter-value", "emea"
))
.build(),
body
);
如果消费者只想接收给定过滤值对应的消息,则必须使用 x-stream-filter 消费者参数。此参数接受一个字符串或字符串数组。如果提供了字符串数组,它们将通过 OR 运算符进行逻辑连接,即只要其中一个或部分字符串匹配即可。
- Java
// Consumer prefetch must be specified when consuming from a stream.
channel.basicQos(100);
channel.basicConsume(
"invoices",
false,
// This Bloom filter will be evaluated server-side per chunk (Stage 1).
Collections.singletonMap("x-stream-filter", "emea"),
(consumerTag, message) -> {
Map<String, Object> headers = message.getProperties().getHeaders();
// This filter will be evaluted client-side per message (Stage 3).
if ("emea".equals(headers.get("x-stream-filter-value"))) {
// message processing ...
}
// Ack is required to receive more messages from the stream.
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
},
consumerTag -> { });
如上面的代码片段所示,还需要包含一些客户端过滤逻辑,因为服务器端的布隆过滤器可能包含误报,且仅在整个数据块级别执行。
示例:布隆过滤器流协议
- Java
Producer producer = environment.producerBuilder()
.stream("invoices")
// The Java library will set the Bloom filter value by extracting
// the value from key "region" in the message's application properties.
.filterValue(msg -> msg.getApplicationProperties().get("region").toString())
.build();
- Java
Consumer consumer = environment.consumerBuilder()
.stream("invoices")
.filter()
// This Bloom filter will be evaluated server-side per chunk (Stage 1).
.values("emea")
// This filter will be evaluted client-side per message (Stage 3).
.postFilter(msg -> "emea".equals(msg.getApplicationProperties().get("region")))
.builder()
.messageHandler((ctx, msg) -> {
// message processing ...
})
.build();
阶段 2:AMQP 过滤表达式
AMQP 过滤表达式是消费者在连接到流时提供的逻辑语句。RabbitMQ 节点会根据消息元数据对这些表达式进行评估。如果消息匹配,RabbitMQ 就会将消息投递给客户端。
过滤器语法定义在 AMQP 1.0 扩展规范 AMQP 过滤表达式 1.0 版本委员会草案 01 中。RabbitMQ 支持该规范的一个子集,如下所述。
AMQP 过滤表达式可以是 属性 (Property) 过滤表达式 或 SQL 过滤表达式。
属性过滤表达式和 SQL 过滤表达式是互斥的。消费者可以定义属性过滤器或 SQL 过滤器,但不能同时定义两者。
AMQP 过滤表达式使多个客户端能够并发地消费来自同一流的特定消息子集,同时保持消息顺序。
属性过滤表达式
属性过滤表达式使 RabbitMQ 能够根据消费者在连接到流时提供的参考模式,将每条消息的元数据进行匹配。
RabbitMQ 实现了
- § 4.2.4 属性过滤器 (properties filter):适用于消息的不可变属性部分。
- § 4.2.5 应用程序属性过滤器 (application-properties filter):适用于消息的不可变应用程序属性部分。
如规范中所述,支持使用 &p:<前缀> 和 &s:<后缀> 修饰符进行前缀和后缀匹配。
示例:属性过滤表达式
以下示例使 RabbitMQ 仅投递满足所有以下条件的消息
- 字段
user-id为John - 字段
subject以Order开头 - 应用程序属性键
region为emea
- Java
- C#
- Go
- Python
- Erlang
- JavaScript
Consumer consumer = connection.consumerBuilder()
.stream().filter()
.userId("John".getBytes(UTF_8))
.subject("&p:Order")
.property("region", "emea")
.stream().builder()
.queue("my-queue")
.messageHandler((ctx, msg ) -> {
// message processing
})
.build();
IConsumer consumer = await connection.ConsumerBuilder().Queue("my-queue").
MessageHandler((context, message) =>
{
// process the messages
}
).Stream().Offset(StreamOffsetSpecification.First).Filter()
.UserId("John"u8.ToArray())
.Subject("&p:Order")
.Property("region", "emea")
.Stream().Builder()
.BuildAndStartAsync();
var subjectPrt = "&p:Order"
consumer, err := amqpConnection.NewConsumer(context.Background(), "my-queue",
&rmq.StreamConsumerOptions{
Offset: &rmq.OffsetFirst{},
StreamFilterOptions: &rmq.StreamFilterOptions{
Properties: &amqp.MessageProperties{Subject: &subjectPrt, UserID: []byte("John")},
ApplicationProperties: map[string]interface{}{"region": "emea"},
},
})
consumer = connection.consumer(
"my-queue",
message_handler=MyMessageHandler(),
consumer_options=StreamConsumerOptions(
offset_specification=OffsetSpecification.first,
filter_options=StreamFilterOptions(
message_properties=MessageProperties(
subject="&p:Order",
user_id= "John".encode("utf-8"),
),
application_properties={"region": "emea"},
),
),
)
Filter = #{<<"filter-name-1">> =>
#filter{
descriptor = <<"amqp:properties-filter">>,
value = {map, [{{symbol, <<"user-id">>}, {binary, <<"John">>}},
{{symbol, <<"subject">>}, {utf8, <<"&p:Order">>}}]}},
<<"filter-name-2">> =>
#filter{
descriptor = <<"amqp:application-properties-filter">>,
value = {map, [{{utf8, <<"region">>}, {utf8, <<"emea">>}}]}}},
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session, <<"my receiver">>, Address,
unsettled, none, Filter),
const consumer = await connection.createConsumer({
stream: {
name: "my-queue",
offset: Offset.first(),
messagePropertiesFilter: {
subject: "&p:Order",
user_id: "John"
},
applicationPropertiesFilter: {
region: "emea",
},
},
messageHandler: (context, message) => {
// process the messages
},
})
consumer.start()
SQL 过滤表达式
RabbitMQ 支持使用基于 SQL WHERE 子句语法的复杂表达式来过滤消息。消费者在连接到流时提供这些 SQL 语句。
SQL 过滤表达式提供了属性过滤表达式所具备功能的一个超集。
RabbitMQ 支持 SQL 过滤表达式语法的一个子集。下面我们介绍当前支持和不支持的内容。
保留字和运算符名称必须大写
AND、OR、NOTIN、IS、NULL、LIKE、ESCAPETRUE、FALSEUTC
AMQP 1.0 消息部分
✅ 支持的部分
-
Header (头部) -
header或h(仅限priority字段)- 示例:
header.priority = 4 - 示例:
h.priority = 4
- 示例:
-
Properties (属性) -
properties或p- 示例:
properties.message_id = 12345 - 示例:
p.user_id = 0x426F62 - 示例:
p.to = 'orders' - 示例:
p.subject LIKE 'Order%' - 示例:
p.reply_to = '/queues/q1' - 示例:
p.correlation_id IS NOT NULL - 示例:
p.content_type = 'application/json' - 示例:
p.content_encoding IN ('gzip', 'zstd', 'deflate') - 示例:
p.absolute_expiry_time > UTC() - 示例:
p.creation_time < 1753365622460 - 示例:
p.group_id = 'Group A' - 示例:
p.group_sequence % 3 = 0 - 示例:
p.reply_to_group_id = 'Group B'
- 示例:
-
Application Properties (应用程序属性) -
application_properties或a- 示例:
application_properties.color = 'blue'(全限定名) - 示例:
a.color = 'blue'(简写) - 示例:
color = 'blue'(未指定部分的字段默认为应用程序属性)
- 示例:
❌ 不支持的部分
- Header - 除
priority外的字段 - Delivery Annotations (投递注解) -
delivery_annotations或d - Message Annotations (消息注解) -
message_annotations或m - Footer (页脚) -
footer或f
常量
✅ 支持的常量
-
整数常量
- 示例:
age = 25
- 示例:
-
十进制和近似数值常量
- 示例:
temperature = -5.5 - 示例:
value = 1.23E6(科学计数法)
- 示例:
-
布尔常量
- 示例:
active = TRUE - 示例:
deleted = FALSE
- 示例:
-
字符串常量(使用单引号或双引号)
- 示例:
name = 'Alice' - 示例:
city = "New York" - 示例:
quote = 'It''s great'(转义引号) - 示例:
emojis = '😎☀️'(UTF-8 字符)
- 示例:
-
二进制常量
- 示例:
properties.message_id = 0x0123456789ABCDEF
- 示例:
❌ 不支持的常量
- 特殊数值常量
- IEEE 754 无穷大值:
INF - IEEE 754 非数:
NAN
- IEEE 754 无穷大值:
标识符
标识符例如是指属性部分的字段或应用程序属性部分的键。
✅ 支持的标识符
-
普通标识符
- 示例:
properties.subject = 'EMEA' - 示例:
order_status = 'pending'
- 示例:
-
分隔标识符(用于特殊字符或保留字)
- 示例:
[order-status] = 'pending' - 示例:
[order status] = 'pending' - 示例:
[注文状況] = 'pending'
- 示例:
❌ 不支持的标识符
- 复合类型引用(数组/映射访问)
一元和二元逻辑运算符
✅ 全部支持
-
AND
- 示例:
header.priority > 4 AND properties.subject = 'orders'
- 示例:
-
OR
- 示例:
status = 'new' OR status = 'pending'
- 示例:
-
NOT
- 示例:
NOT cancelled
- 示例:
-
用于分组的括号
- 示例:
(category = 'books' OR category = 'music') AND price < 20
- 示例:
比较运算符
✅ 全部支持
-
相等:
=- 示例:
customer_id = 12345
- 示例:
-
不等:
<>或!=- 示例:
status <> 'cancelled' - 示例:
region != 'EU'
- 示例:
-
大于:
>- 示例:
age > 18
- 示例:
-
大于等于:
>=- 示例:
p.creation_time >= 1753690918262
- 示例:
-
小于:
<- 示例:
quantity < 10
- 示例:
-
小于等于:
<=- 示例:
discount <= 0.25
- 示例:
算术运算符
✅ 支持(仅限数值类型)
-
加法:
+- 示例:
quantity + 5 > stock_level
- 示例:
-
减法:
-- 示例:
price - discount > 10
- 示例:
-
乘法:
*- 示例:
quantity * price > 100
- 示例:
-
除法:
/- 示例:
total / quantity < 50
- 示例:
-
取模:
%- 示例:
p.group_sequence % 2 = 0(偶数组序列)
- 示例:
-
一元加/减:
+,-- 示例:
balance < +100 - 示例:
balance < -100
- 示例:
❌ 不支持
- 使用
+进行字符串或符号连接- 示例:
firstname + lastname = 'JohnDoe'
- 示例:
其他逻辑谓词
✅ 支持
-
IS NULL
- 示例:
p.reply_to IS NULL - 示例:
category IS NOT NULL
- 示例:
-
LIKE (使用
%和_通配符)- 示例:
name LIKE 'John%'(以 John 开头) - 示例:
name NOT LIKE 'John%'(不以 John 开头) - 示例:
product LIKE 'John % Doe'(John Doe 中间包含任何名字) - 示例:
email LIKE '%@example.com'(以 @example.com 结尾) - 示例:
code LIKE '___ABC'(3 个字符后跟 ABC)
- 示例:
-
带有 ESCAPE 的 LIKE
- 示例:
underscored LIKE '\_%' ESCAPE '\'对'_foo'为真,对'bar'为假
- 示例:
-
IN
- 示例:
status IN ('new', 'pending', 'processing') - 示例:
environment NOT IN ('DEV', 'STAGING')
- 示例:
❌ 不支持
- EXISTS 谓词
函数
✅ 支持
- UTC() - 返回当前的 UTC 时间(毫秒)
- 示例:
p.absolute_expiry_time IS NULL OR p.absolute_expiry_time > UTC()选择未过期的消息
- 示例:
❌ 不支持
- LOWER - 小写转换
- UPPER - 大写转换
- LEFT - 左子串
- RIGHT - 右子串
- SUBSTRING - 子串提取
- DATE - ISO 8601 日期解析
示例:SQL 过滤表达式
以下示例使 RabbitMQ 仅投递满足所有以下条件的消息
- 字段
user-id为John - 字段
subject以Order开头 - 应用程序提供的键
region为emea
- Java
- C#
- Go
- Python
- Erlang
- JavaScript
Consumer consumer = connection.consumerBuilder()
.stream().filter()
.sql("properties.user_id = 'John' AND " +
"properties.subject LIKE 'Order%' AND " +
"region = 'emea'")
.stream().builder()
.queue("my-queue")
.messageHandler((ctx, msg ) -> {
// message processing
})
.build();
IConsumer consumer = await connection.ConsumerBuilder().Queue("my-queue").
MessageHandler((context, message) =>
{
// process the messages
}
).Stream().Offset(StreamOffsetSpecification.First).Filter()
.Sql("properties.user_id = 'John' AND"
+ "properties.subject LIKE 'Order%' AND region = 'emea'")
.Stream().Builder()
.Stream().Builder()
.BuildAndStartAsync();
consumer, err := amqpConnection.NewConsumer(context.Background(), "my-queue",
&rmq.StreamConsumerOptions{
Offset: &rmq.OffsetFirst{},
StreamFilterOptions: &rmq.StreamFilterOptions{
SQL: "properties.user_id = 'John' AND "
+ "properties.subject LIKE 'Order%' AND region = 'emea'",
},
})
consumer = connection.consumer(
"my-queue",
message_handler=MyMessageHandler(),
consumer_options=StreamConsumerOptions(
offset_specification=OffsetSpecification.first,
filter_options=StreamFilterOptions(
sql="properties.user_id = 'John' AND "
+ "properties.subject LIKE 'Order%' AND region = 'emea'",
),
),
)
Expression = <<"properties.user_id = 'John' AND "
"properties.subject LIKE 'Order%' AND "
"region = 'emea'">>,
Filter = #{<<"sql-filter">> => #filter{descriptor = <<"amqp:sql-filter">>,
value = {utf8, Expression}}},
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session, <<"my receiver">>, Address,
unsettled, none, Filter),
const consumer = await connection.createConsumer({
stream: {
name: "my-queue",
offset: Offset.first(),
sqlFilter: "properties.user_id = 'John' AND"
+ "properties.subject LIKE 'Order%' AND region = 'emea'"
},
messageHandler: (context, message) => {
// process the messages
},
})
consumer.start()
错误处理
定义错误
在消费者连接到流时,RabbitMQ 会检查过滤器的语法正确性,例如 SQL 表达式是否有效。如果提供的过滤器无效,RabbitMQ 将在发送给客户端的 attach 帧中排除该过滤器。此外,RabbitMQ 可能会记录一条带有过滤器无效原因的警告。客户端库随后应断开链路并将错误返回给客户端应用程序。
评估错误
当 RabbitMQ 在运行时根据消息评估过滤器且无法完成评估时,会发生评估错误。
此类评估错误的示例包括
- 不兼容类型的比较:例如,数字 3 是否大于字符串 "London"?
- 算术除以零:例如
3 / 0 - 整数除法出现浮点数:例如
3 % 1.2
遇到评估错误的过滤器会根据具体错误被视为返回 false 或 unknown。RabbitMQ 将仅向客户端投递那些 SQL 条件表达式评估为 true 的消息。
布隆过滤器 vs. AMQP 过滤表达式
下表比较了布隆过滤器(阶段 1)和 AMQP 过滤表达式(阶段 2)的特性。
| 功能 | 布隆过滤器 | AMQP 过滤表达式 |
|---|---|---|
| 服务器端过滤? | 是 | 是 |
| 阶段 | 阶段 1:从磁盘读取数据块头之后,但在从磁盘读取任何消息之前 | 阶段 2:RabbitMQ 将消息从磁盘读取到内存之后,但在投递给客户端之前 |
| 粒度 | 发布者按每条消息设置过滤值,但 RabbitMQ 按数据块评估过滤器 | 按每条消息 |
| 误报 (False Positives) | 可能:需要阶段 2 或阶段 3 的额外逐条消息过滤。 | 无 |
| 支持的协议 | AMQP 1.0, AMQP 0.9.1, RabbitMQ 流协议, STOMP | AMQP 1.0 |
| 支持多个过滤值 (发布者) | 否:发布者每条消息只能分配一个过滤值。 | 是:发布者可以在属性或应用程序属性部分定义多个值。 |
| 支持多个过滤表达式 (消费者) | 是:消费者可以指定多个过滤值,如果任何过滤值匹配,则投递该消息。 | 是:消费者可以提供多个过滤表达式。 |
| 过滤复杂度 | 低:简单的字符串相等匹配 | 高:可以进行复杂的 SQL 查询 |
| 评估速度 (消息/秒) | 高达数百万 | 高达数十万 |
| 代理开销 | 极小:布隆过滤器成员检查使用固定时间。如果数据块匹配且使用了 RabbitMQ 流协议,sendfile 系统调用会优化数据块投递,而无需消息进入用户空间。如果数据块不匹配,RabbitMQ 甚至不会读取磁盘上的消息。 | 主要通过高效的 Erlang 模式匹配实现。然而,每条消息都会被读取到内存中供每个消费者处理(除非与布隆过滤器结合使用)。 |
| 网络和客户端侧开销 | 较高:即使只有一条消息的过滤值匹配,也会传输整个数据块。 | 较低:仅传输与过滤器匹配的消息。 |
结合布隆过滤器和 AMQP 过滤表达式
通过将布隆过滤器与 AMQP(特别是 SQL)过滤表达式相结合,RabbitMQ 提供了两者的优势:在阶段 1 进行高效的数据块级过滤以跳过不必要的磁盘 I/O、CPU 和内存使用,随后在阶段 2 进行精确的消息级过滤以处理复杂的业务逻辑——所有操作均在服务器端完成。
示例:结合布隆过滤器和 SQL 过滤表达式
假设有一个包含各种客户相关事件的流,例如
user.loginproduct.viewcart.addcart.removeorder.createdreview.submitted- 等。
以下示例提供了一个复杂的 SQL 过滤表达式,查询满足以下所有条件的事件
- 过去一小时内创建的订单
- 位于
AMER、EMEA或AJP区域之一 - 订单必须是高优先级、高价格,或者是高级客户提交的
- Java
- C#
- Go
- Python
- Erlang
- JavaScript
Consumer consumer = connection.consumerBuilder()
.stream()
// This Bloom filter will be evaluated server-side per chunk (Stage 1).
.filterValues("order.created")
.filter()
// This complex SQL filter expression will be evaluted server-side
// per message at stage 2.
.sql("p.subject = 'order.created' AND " +
"p.creation_time > UTC() - 3600000 AND " +
"region IN ('AMER', 'EMEA', 'APJ') AND " +
"(h.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)")
.stream().builder()
.queue("my-queue")
.messageHandler((ctx, msg ) -> {
// message processing
})
.build();
IConsumer consumer = await connection.ConsumerBuilder().Queue("my-queue").
MessageHandler((context, message) =>
// message processing
}).Stream().Offset(StreamOffsetSpecification.First)
// This Bloom filter will be evaluated server-side per chunk (Stage 1).
.FilterValues("order.created")
.Filter()
// This complex SQL filter expression will be evaluted server-side
// per message at stage 2.
.Sql("p.subject = 'order.created' AND " +
"p.creation_time > UTC() - 3600000 AND " +
"region IN ('AMER', 'EMEA', 'APJ') AND " +
"(h.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)").Stream().Builder()
.BuildAndStartAsync().ConfigureAwait(false);
consumer, err := amqpConnection.NewConsumer(context.Background(), "my-queue",
&rmq.StreamConsumerOptions{
Offset: &rmq.OffsetFirst{},
StreamFilterOptions: &rmq.StreamFilterOptions{
// This Bloom filter will be evaluated server-side per chunk (Stage 1).
Values: []string{"order.created"},
// This complex SQL filter expression will be evaluted server-side
// per message at stage 2.
SQL:"p.subject = 'order.created' AND " +
"p.creation_time > UTC() - 3600000 AND " +
"region IN ('AMER', 'EMEA', 'APJ') AND " +
"(h.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)",
},
})
consumer = consumer_connection.consumer(
"my-queue",
message_handler=MyMessageHandler(),
# the consumer will only receive messages with filter value banana and subject yellow
# and application property from = italy
consumer_options=StreamConsumerOptions(
offset_specification=OffsetSpecification.first,
filter_options=StreamFilterOptions(
# This Bloom filter will be evaluated server-side per chunk (Stage 1).
values=["order.created"],
# This complex SQL filter expression will be evaluted server-side
# per message at stage 2.
sql="p.subject = 'order.created' AND " +
"p.creation_time > UTC() - 3600000 AND " +
"region IN ('AMER', 'EMEA', 'APJ') AND " +
"(h.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)",
),
),
)
Expression = <<"p.subject = 'order.created' AND "
"p.creation_time > UTC() - 3600000 AND "
"region IN ('AMER', 'EMEA', 'APJ') AND "
"(h.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)">>,
Filter = #{%% This Bloom filter will be evaluated server-side per chunk at stage 1.
<<"my Bloom filter">> =>
#filter{descriptor = <<"rabbitmq:stream-filter">>,
value = {utf8, <<"order.created">>}},
%% This complex SQL filter expression will be evaluted server-side
%% per message at stage 2.
<<"sql-filter">> => #filter{descriptor = <<"amqp:sql-filter">>,
value = {utf8, Expression}}},
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session, <<"my receiver">>, Address,
unsettled, none, Filter),
const consumer = await connection.createConsumer({
stream: {
name: "my-queue",
offset: Offset.first(),
filterValues: ["order.created"], // This Bloom filter will be evaluated server-side per chunk (Stage 1).
sqlFilter: "p.subject = 'order.created' AND " +
"p.creation_time > UTC() - 3600000 AND " +
"region IN ('AMER', 'EMEA', 'APJ') AND " +
"(h.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)", // This complex SQL filter expression will be evaluted server-side
// per message at stage 2.
},
messageHandler: (context, message) => {
// message processing
},
})
consumer.start()
如果 order.created 事件仅占所有事件的一小部分,RabbitMQ 可以高效地过滤流,因为只有极少部分的消息需要在内存中进行解析和评估。
阶段 3:客户端过滤
在 RabbitMQ 将消息发送给客户端后,可以在客户端库或客户端应用程序本身中进行额外的过滤。
客户端过滤提供了最高的灵活性,因为客户端不受服务器端过滤原语的限制。例如,客户端可以通过检查消息体进行过滤。
关于如何将客户端过滤(阶段 3)与服务器端布隆过滤(阶段 1)相结合的示例,请参见 布隆过滤器 AMQP 1.0 示例 和 布隆过滤器 AMQP 0.9.1 示例。