RabbitMQ Streams 的代理端 SQL 过滤
RabbitMQ 4.2 为 流 引入了 SQL 过滤器表达式,支持强大的代理端消息过滤。
在我们的基准测试中,将 SQL 过滤器与布隆过滤器结合使用,在具有高入口速率的高度选择性场景中,过滤速率超过了每秒 400 万条消息。这意味着只有您的消费者真正关心的消息才会离开代理,大大降低了网络流量和客户端处理开销。
动机
高吞吐量的事件流通常会向消费者发送海量数据,其中大部分数据对消费者而言可能并不相关。在实际系统中,可能存在成千上万个主题(事件类型、租户、区域、SKU 等),这使得为每个主题创建一个专用流变得不切实际且难以扩展。
RabbitMQ Streams 通过 Broker 端 过滤 功能解决了这个问题。
布隆过滤器 (Bloom filters) 可以跳过不包含目标值的整个数据块,而 SQL 过滤表达式 则能够评估精确的单条消息谓词,从而确保只有匹配的消息才会通过网络传输。这减少了网络流量,降低了客户端的 CPU 和内存使用率,并简化了应用程序代码。
对 Broker 端过滤的需求由来已久 —— Kafka 用户多年来一直有此请求(参见 KAFKA-6020),但 Kafka 至今仍缺乏此功能。RabbitMQ 的“布隆过滤器 + SQL 过滤”组合使得在大规模场景下进行选择性消费成为可能。
让我们通过一个动手实践的示例来了解一下。
运行示例应用程序
要在您的环境中运行此示例
- 使用单个调度程序线程启动 RabbitMQ
docker run -it --rm --name rabbitmq -p 5672:5672 -e ERL_AFLAGS="+S 1" rabbitmq:4.2.0-beta.3
- 从示例应用程序的根目录,运行
mvn clean compile exec:java
该示例应用程序使用了 RabbitMQ AMQP 1.0 Java 客户端,因为 SQL 过滤表达式是 AMQP 1.0 的一项功能。
发布事件
考虑一个典型的电子商务平台,它会生成一个持续的客户事件流
product.searchproduct.viewcart.addcart.removeorder.created- 以及其他许多事件
我们的示例应用程序向流中发布了 1000 万个此类事件,其中 order.created 事件每 10 万条消息出现一次 —— 仅占总量的 0.001%。
每条消息都包含一个指向其事件类型的布隆过滤器注解,从而实现了高效的块级过滤
publisher
.message(body.getBytes(StandardCharsets.UTF_8))
.priority(priority)
// set the Bloom filter value
.annotation("x-stream-filter-value", eventType)
.subject(eventType)
.creationTime(creationTime)
// set application properties, e.g. region, price, or premium_customer
.property("region", region);
定义您的过滤器
假设您只想处理满足以下所有条件的高价值订单
- 事件类型为
order.created - 订单创建于最近一小时内
- 订单来自 AMER、EMEA 或 APJ 区域
- 并且至少满足以下条件之一
- 优先级 > 4
- 价格 ≥ 99.99
- 高级客户
在我们的演示中,1000 万条消息中只有 10 条满足这些条件 —— 这是一个在现实应用中很常见的高选择性过滤场景。
传统方法需要消费所有 1000 万条消息并在客户端进行过滤,这将导致巨大的网络开销和资源浪费。
SQL 过滤表达式通过在 Broker 端执行所有过滤操作,优雅地解决了这个问题
String SQL =
"properties.subject = 'order.created' AND " +
"properties.creation_time > UTC() - 3600000 AND " +
"region IN ('AMER', 'EMEA', 'APJ') AND " +
"(header.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)";
消费者实现变得非常简单
ConsumerBuilder.StreamOptions builder = connection.consumerBuilder()
.queue(STREAM_NAME)
.stream()
.offset(FIRST);
if (useBloomFilter) {
// Stage 1: Bloom filter - quickly skip chunks without order.created events
builder = builder.filterValues("order.created");
}
Consumer consumer = builder
// Stage 2: SQL filter - precise broker-side per-message filtering
.filter()
.sql(SQL)
.stream()
.builder()
.messageHandler((ctx, msg) -> {
System.out.printf(" [%s] Received: %s\n",
consumerType, new String(msg.body(), StandardCharsets.UTF_8));
latch.countDown();
ctx.accept();
})
.build();
性能结果
仅使用 SQL 过滤
Received 10 messages in 24.71 seconds using SQL filter only
Broker-side filtering rate: 404,645 messages/second
消费者仅收到符合条件的 10 条消息。所有过滤均在 Broker 端完成,处理速度超过每秒 40 万条消息,同时仅将相关数据通过网络进行传输。
布隆过滤器 + SQL 过滤
Received 10 messages in 2.05 seconds using Bloom + SQL filters
Broker-side filtering rate: 4,868,549 messages/second
通过结合两个过滤阶段,性能提高了一个数量级。布隆过滤器(第 1 阶段)在数据从磁盘读取之前,就剔除了不包含 order.created 事件的整个数据块;而 SQL 过滤器(第 2 阶段)则对剩余的消息应用精确的业务逻辑。
通过将布隆过滤器与 SQL 过滤表达式相结合,RabbitMQ 兼具两者的优势:在第 1 阶段进行高效的块级过滤以跳过不必要的磁盘 I/O、CPU 和内存占用,随后在第 2 阶段进行精确的消息级过滤以处理复杂业务逻辑 —— 所有这些都在 Broker 端完成。
请注意,实际的布隆过滤性能取决于每个块的消息数量,这会随着消息摄入速率的变化而变化。
了解更多
请查阅我们的新版 流过滤 指南,获取最佳实践、示例和配置技巧。
