跳至主内容

RabbitMQ Streams 的代理端 SQL 过滤

·5 分钟阅读

RabbitMQ 4.2 为 引入了 SQL 过滤器表达式,支持强大的代理端消息过滤。

在我们的基准测试中,将 SQL 过滤器与布隆过滤器结合使用,在具有高入口速率的高度选择性场景中,过滤速率超过了每秒 400 万条消息。这意味着只有您的消费者真正关心的消息才会离开代理,大大降低了网络流量和客户端处理开销。

动机

高吞吐量的事件流通常会向消费者发送海量数据,其中大部分数据对消费者而言可能并不相关。在实际系统中,可能存在成千上万个主题(事件类型、租户、区域、SKU 等),这使得为每个主题创建一个专用流变得不切实际且难以扩展。

RabbitMQ Streams 通过 Broker 端 过滤 功能解决了这个问题。

布隆过滤器 (Bloom filters) 可以跳过不包含目标值的整个数据块,而 SQL 过滤表达式 则能够评估精确的单条消息谓词,从而确保只有匹配的消息才会通过网络传输。这减少了网络流量,降低了客户端的 CPU 和内存使用率,并简化了应用程序代码。

对 Broker 端过滤的需求由来已久 —— Kafka 用户多年来一直有此请求(参见 KAFKA-6020),但 Kafka 至今仍缺乏此功能。RabbitMQ 的“布隆过滤器 + SQL 过滤”组合使得在大规模场景下进行选择性消费成为可能。

让我们通过一个动手实践的示例来了解一下。

运行示例应用程序

要在您的环境中运行此示例

  1. 使用单个调度程序线程启动 RabbitMQ
docker run -it --rm --name rabbitmq -p 5672:5672 -e ERL_AFLAGS="+S 1" rabbitmq:4.2.0-beta.3
  1. 示例应用程序的根目录,运行
mvn clean compile exec:java

该示例应用程序使用了 RabbitMQ AMQP 1.0 Java 客户端,因为 SQL 过滤表达式是 AMQP 1.0 的一项功能。

发布事件

考虑一个典型的电子商务平台,它会生成一个持续的客户事件流

  • product.search
  • product.view
  • cart.add
  • cart.remove
  • order.created
  • 以及其他许多事件

我们的示例应用程序向流中发布了 1000 万个此类事件,其中 order.created 事件每 10 万条消息出现一次 —— 仅占总量的 0.001%。

每条消息都包含一个指向其事件类型的布隆过滤器注解,从而实现了高效的块级过滤

publisher
.message(body.getBytes(StandardCharsets.UTF_8))
.priority(priority)
// set the Bloom filter value
.annotation("x-stream-filter-value", eventType)
.subject(eventType)
.creationTime(creationTime)
// set application properties, e.g. region, price, or premium_customer
.property("region", region);

定义您的过滤器

假设您只想处理满足以下所有条件的高价值订单

  • 事件类型为 order.created
  • 订单创建于最近一小时内
  • 订单来自 AMER、EMEA 或 APJ 区域
  • 并且至少满足以下条件之一
    • 优先级 > 4
    • 价格 ≥ 99.99
    • 高级客户

在我们的演示中,1000 万条消息中只有 10 条满足这些条件 —— 这是一个在现实应用中很常见的高选择性过滤场景。

传统方法需要消费所有 1000 万条消息并在客户端进行过滤,这将导致巨大的网络开销和资源浪费。

SQL 过滤表达式通过在 Broker 端执行所有过滤操作,优雅地解决了这个问题

String SQL =
"properties.subject = 'order.created' AND " +
"properties.creation_time > UTC() - 3600000 AND " +
"region IN ('AMER', 'EMEA', 'APJ') AND " +
"(header.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)";

消费者实现变得非常简单

ConsumerBuilder.StreamOptions builder = connection.consumerBuilder()
.queue(STREAM_NAME)
.stream()
.offset(FIRST);

if (useBloomFilter) {
// Stage 1: Bloom filter - quickly skip chunks without order.created events
builder = builder.filterValues("order.created");
}

Consumer consumer = builder
// Stage 2: SQL filter - precise broker-side per-message filtering
.filter()
.sql(SQL)
.stream()
.builder()
.messageHandler((ctx, msg) -> {
System.out.printf(" [%s] Received: %s\n",
consumerType, new String(msg.body(), StandardCharsets.UTF_8));
latch.countDown();
ctx.accept();
})
.build();

性能结果

仅使用 SQL 过滤

Received 10 messages in 24.71 seconds using SQL filter only
Broker-side filtering rate: 404,645 messages/second

消费者仅收到符合条件的 10 条消息。所有过滤均在 Broker 端完成,处理速度超过每秒 40 万条消息,同时仅将相关数据通过网络进行传输。

布隆过滤器 + SQL 过滤

Received 10 messages in 2.05 seconds using Bloom + SQL filters
Broker-side filtering rate: 4,868,549 messages/second

通过结合两个过滤阶段,性能提高了一个数量级。布隆过滤器(第 1 阶段)在数据从磁盘读取之前,就剔除了不包含 order.created 事件的整个数据块;而 SQL 过滤器(第 2 阶段)则对剩余的消息应用精确的业务逻辑。

提示

通过将布隆过滤器与 SQL 过滤表达式相结合,RabbitMQ 兼具两者的优势:在第 1 阶段进行高效的块级过滤以跳过不必要的磁盘 I/O、CPU 和内存占用,随后在第 2 阶段进行精确的消息级过滤以处理复杂业务逻辑 —— 所有这些都在 Broker 端完成。

请注意,实际的布隆过滤性能取决于每个块的消息数量,这会随着消息摄入速率的变化而变化。

了解更多

请查阅我们的新版 流过滤 指南,获取最佳实践、示例和配置技巧。

© . This site is unofficial and not affiliated with VMware.