RabbitMQ Streams 的代理端 SQL 过滤
RabbitMQ 4.2 为 流 引入了 SQL 过滤器表达式,支持强大的代理端消息过滤。
在我们的基准测试中,将 SQL 过滤器与布隆过滤器结合使用,在具有高入口速率的高度选择性场景中,过滤速率超过了每秒 400 万条消息。这意味着只有您的消费者真正关心的消息才会离开代理,大大降低了网络流量和客户端处理开销。
动机
高吞吐量的事件流通常会向消费者传递大量数据,其中很多可能与他们无关。在实际系统中,可能存在成千上万个主题(事件类型、租户、区域、SKU 等),为每个主题设置一个单独的流既不实际也不可扩展。
RabbitMQ Streams 通过服务端 过滤 来解决这个问题。
Bloom 过滤器 可以跳过不包含感兴趣值的整个数据块,而 SQL 过滤表达式 则可以评估精确的每条消息谓词,从而只有匹配的消息才能跨越网络。这可以减少网络流量,降低客户端 CPU 和内存使用,并保持应用程序代码的简洁。
长期以来,人们一直要求服务端过滤功能——Kafka 用户多年来一直在要求此功能(请参阅 KAFKA-6020)——但 Kafka 仍然缺乏此功能。RabbitMQ 的 Bloom + SQL 过滤功能使选择性消费在当今大规模应用中变得可行。
让我们通过一个实际示例来演示。
运行示例应用程序
要在您的环境中运行此示例
- 使用单个调度器线程启动 RabbitMQ
docker run -it --rm --name rabbitmq -p 5672:5672 -e ERL_AFLAGS="+S 1" rabbitmq:4.2.0-beta.3
- 从 示例应用程序 的根目录运行
mvn clean compile exec:java
示例应用程序使用了 RabbitMQ AMQP 1.0 Java 客户端,因为 SQL 过滤表达式是 AMQP 1.0 的一项功能。
发布事件
考虑一个典型的电子商务平台,它生成一个连续的客户事件流
product.searchproduct.viewcart.addcart.removeorder.created- 以及许多其他事件
我们的示例应用程序将 1000 万个此类事件发布到一个流中,其中 order.created 事件每 100,000 条消息发生一次——仅占总量的 0.001%。
每条消息都包含一个设置为其事件类型的 Bloom 过滤器注解,从而实现高效的数据块级过滤
publisher
.message(body.getBytes(StandardCharsets.UTF_8))
.priority(priority)
// set the Bloom filter value
.annotation("x-stream-filter-value", eventType)
.subject(eventType)
.creationTime(creationTime)
// set application properties, e.g. region, price, or premium_customer
.property("region", region);
定义您的过滤器
假设您只想处理满足以下所有条件的高价值订单
- 事件类型为
order.created - 订单创建时间在过去一小时内
- 订单源自 AMER、EMEA 或 APJ 区域
- 并且至少满足以下条件之一
- Priority > 4
- Price ≥ 99.99
- Premium customer
在我们的演示中,1000 万条消息中只有 10 条满足这些条件——这是实际应用中常见的、高度选择性的过滤场景。
传统方法需要消费所有 1000 万条消息并在客户端进行过滤,这将导致巨大的网络开销和资源浪费。
SQL 过滤表达式通过在服务端执行所有过滤来优雅地解决此问题
String SQL =
"properties.subject = 'order.created' AND " +
"properties.creation_time > UTC() - 3600000 AND " +
"region IN ('AMER', 'EMEA', 'APJ') AND " +
"(header.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)";
消费者实现变得简单明了
ConsumerBuilder.StreamOptions builder = connection.consumerBuilder()
.queue(STREAM_NAME)
.stream()
.offset(FIRST);
if (useBloomFilter) {
// Stage 1: Bloom filter - quickly skip chunks without order.created events
builder = builder.filterValues("order.created");
}
Consumer consumer = builder
// Stage 2: SQL filter - precise broker-side per-message filtering
.filter()
.sql(SQL)
.stream()
.builder()
.messageHandler((ctx, msg) -> {
System.out.printf(" [%s] Received: %s\n",
consumerType, new String(msg.body(), StandardCharsets.UTF_8));
latch.countDown();
ctx.accept();
})
.build();
性能结果
仅 SQL 过滤
Received 10 messages in 24.71 seconds using SQL filter only
Broker-side filtering rate: 404,645 messages/second
消费者恰好收到了 10 条匹配条件的的消息。所有过滤都在代理上进行,每秒处理超过 400k 条消息,同时仅通过网络传输相关数据。
Bloom + SQL 过滤
Received 10 messages in 2.05 seconds using Bloom + SQL filters
Broker-side filtering rate: 4,868,549 messages/second
通过结合这两种过滤阶段,性能提高了十倍。Bloom 过滤器(第一阶段)在从磁盘读取数据块之前即可丢弃不包含 order.created 事件的整个数据块,而 SQL 过滤器(第二阶段)则对剩余的消息应用精确的业务逻辑。
通过将 Bloom 过滤器与 SQL 过滤表达式结合使用,RabbitMQ 提供了两者的优势:第一阶段高效的数据块级过滤,用于跳过不必要的磁盘 I/O、CPU 和内存使用,然后是第二阶段精确的消息级过滤,用于处理复杂的业务逻辑——所有这些都在代理上完成。
请注意,实际的 Bloom 过滤性能取决于每个数据块的消息数量,这会随着消息的传入速率而变化。
了解更多
请查阅我们新的 Stream Filtering 指南,了解最佳实践、示例和配置技巧。
