跳至主内容

RabbitMQ Streams 的代理端 SQL 过滤

·5 分钟阅读

RabbitMQ 4.2 为 引入了 SQL 过滤器表达式,支持强大的代理端消息过滤。

在我们的基准测试中,将 SQL 过滤器与布隆过滤器结合使用,在具有高入口速率的高度选择性场景中,过滤速率超过了每秒 400 万条消息。这意味着只有您的消费者真正关心的消息才会离开代理,大大降低了网络流量和客户端处理开销。

动机

高吞吐量的事件流通常会向消费者传递大量数据,其中很多可能与他们无关。在实际系统中,可能存在成千上万个主题(事件类型、租户、区域、SKU 等),为每个主题设置一个单独的流既不实际也不可扩展。

RabbitMQ Streams 通过服务端 过滤 来解决这个问题。

Bloom 过滤器 可以跳过不包含感兴趣值的整个数据块,而 SQL 过滤表达式 则可以评估精确的每条消息谓词,从而只有匹配的消息才能跨越网络。这可以减少网络流量,降低客户端 CPU 和内存使用,并保持应用程序代码的简洁。

长期以来,人们一直要求服务端过滤功能——Kafka 用户多年来一直在要求此功能(请参阅 KAFKA-6020)——但 Kafka 仍然缺乏此功能。RabbitMQ 的 Bloom + SQL 过滤功能使选择性消费在当今大规模应用中变得可行。

让我们通过一个实际示例来演示。

运行示例应用程序

要在您的环境中运行此示例

  1. 使用单个调度器线程启动 RabbitMQ
docker run -it --rm --name rabbitmq -p 5672:5672 -e ERL_AFLAGS="+S 1" rabbitmq:4.2.0-beta.3
  1. 示例应用程序 的根目录运行
mvn clean compile exec:java

示例应用程序使用了 RabbitMQ AMQP 1.0 Java 客户端,因为 SQL 过滤表达式是 AMQP 1.0 的一项功能。

发布事件

考虑一个典型的电子商务平台,它生成一个连续的客户事件流

  • product.search
  • product.view
  • cart.add
  • cart.remove
  • order.created
  • 以及许多其他事件

我们的示例应用程序将 1000 万个此类事件发布到一个流中,其中 order.created 事件每 100,000 条消息发生一次——仅占总量的 0.001%。

每条消息都包含一个设置为其事件类型的 Bloom 过滤器注解,从而实现高效的数据块级过滤

publisher
.message(body.getBytes(StandardCharsets.UTF_8))
.priority(priority)
// set the Bloom filter value
.annotation("x-stream-filter-value", eventType)
.subject(eventType)
.creationTime(creationTime)
// set application properties, e.g. region, price, or premium_customer
.property("region", region);

定义您的过滤器

假设您只想处理满足以下所有条件的高价值订单

  • 事件类型为 order.created
  • 订单创建时间在过去一小时内
  • 订单源自 AMER、EMEA 或 APJ 区域
  • 并且至少满足以下条件之一
    • Priority > 4
    • Price ≥ 99.99
    • Premium customer

在我们的演示中,1000 万条消息中只有 10 条满足这些条件——这是实际应用中常见的、高度选择性的过滤场景。

传统方法需要消费所有 1000 万条消息并在客户端进行过滤,这将导致巨大的网络开销和资源浪费。

SQL 过滤表达式通过在服务端执行所有过滤来优雅地解决此问题

String SQL =
"properties.subject = 'order.created' AND " +
"properties.creation_time > UTC() - 3600000 AND " +
"region IN ('AMER', 'EMEA', 'APJ') AND " +
"(header.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)";

消费者实现变得简单明了

ConsumerBuilder.StreamOptions builder = connection.consumerBuilder()
.queue(STREAM_NAME)
.stream()
.offset(FIRST);

if (useBloomFilter) {
// Stage 1: Bloom filter - quickly skip chunks without order.created events
builder = builder.filterValues("order.created");
}

Consumer consumer = builder
// Stage 2: SQL filter - precise broker-side per-message filtering
.filter()
.sql(SQL)
.stream()
.builder()
.messageHandler((ctx, msg) -> {
System.out.printf(" [%s] Received: %s\n",
consumerType, new String(msg.body(), StandardCharsets.UTF_8));
latch.countDown();
ctx.accept();
})
.build();

性能结果

仅 SQL 过滤

Received 10 messages in 24.71 seconds using SQL filter only
Broker-side filtering rate: 404,645 messages/second

消费者恰好收到了 10 条匹配条件的的消息。所有过滤都在代理上进行,每秒处理超过 400k 条消息,同时仅通过网络传输相关数据。

Bloom + SQL 过滤

Received 10 messages in 2.05 seconds using Bloom + SQL filters
Broker-side filtering rate: 4,868,549 messages/second

通过结合这两种过滤阶段,性能提高了十倍。Bloom 过滤器(第一阶段)在从磁盘读取数据块之前即可丢弃不包含 order.created 事件的整个数据块,而 SQL 过滤器(第二阶段)则对剩余的消息应用精确的业务逻辑。

提示

通过将 Bloom 过滤器与 SQL 过滤表达式结合使用,RabbitMQ 提供了两者的优势:第一阶段高效的数据块级过滤,用于跳过不必要的磁盘 I/O、CPU 和内存使用,然后是第二阶段精确的消息级过滤,用于处理复杂的业务逻辑——所有这些都在代理上完成。

请注意,实际的 Bloom 过滤性能取决于每个数据块的消息数量,这会随着消息的传入速率而变化。

了解更多

请查阅我们新的 Stream Filtering 指南,了解最佳实践、示例和配置技巧。

© . This site is unofficial and not affiliated with VMware.