跳至主要内容

流过滤

·阅读时长:6 分钟

流过滤是 RabbitMQ 3.13 中的新功能。当应用程序只需要处理流中的一部分消息时,它可以节省代理与应用程序之间的带宽。

继续阅读以了解流过滤的工作原理,并查看实际操作。

流过滤的概念

假设您有一个包含来自世界各地数据的流,以及一个仅需要处理该数据子集的应用程序,比如该地区的消息。该应用程序可以读取整个流并过滤掉数据,以仅处理其感兴趣的消息。这种方法可行,但意味着整个流内容都将通过网络。

流过滤在代理端提供了一级高效的过滤功能,无需 代理解释消息。对于某些用例,它可以显著减少网络上交换的数据量。让我们来了解一下这项激动人心的新功能的语义。

在发布端

流过滤基于一个过滤值:发布应用程序可以将每个消息与一个字符串值关联。过滤值可以是任何值,但它们应满足某些标准才能使过滤正常工作。在消息之间共享的一组定义的值是一个不错的选择:地理位置(例如国家、州)、存储文档信息的流中的文档类型(例如工资单、发票、订单)、产品类别(例如书籍、行李、玩具)。

消息与过滤值关联的方式取决于客户端库。以下是一个使用流 Java 客户端的示例

Producer producer = environment.producerBuilder()
.stream("invoices")
.filterValue(msg -> msg.getApplicationProperties().get("region").toString())
.build();

在此示例中,应用程序开发人员提供了一些逻辑来从消息应用程序属性中提取过滤值。使用过滤功能非常简单:无需更改实际的消息发布代码,您只需要在创建 Producer 时提供过滤值逻辑。

现在让我们看看它在消费者端如何工作。

在消费者端

以下是一个 Java 代码片段,用于声明一个仅对来自 emea 区域的消息感兴趣的消费者

Consumer consumer = environment.consumerBuilder()
.stream("invoices")
.filter()
.values("emea")
.postFilter(msg -> "emea".equals(msg.getApplicationProperties().get("region")))
.builder()
.messageHandler((ctx, msg) -> {
// message processing code
})
.build();

过滤功能在两个地方配置

  • filter().values(String... filterValues) 告诉代理我们对与这些值关联的消息感兴趣(我们可以指定多个值,而不仅仅是一个值)
  • filter().postFilter(Predicate<Message> filter) 提供一些客户端逻辑,以过滤掉 与预期过滤值关联的消息。

为什么需要这种客户端过滤逻辑?代理端过滤逻辑使用布隆过滤器

布隆过滤器是一种空间效率高的概率数据结构,用于测试元素是否为集合中的成员。

布隆过滤器在存储和速度方面非常高效,但它是概率性的:它可能会返回误报。因此,代理可能会发送它认为与预期过滤值匹配的消息,而实际上并非如此。这就是为什么需要一些客户端过滤逻辑。

这是一个需要注意的地方,但与流过滤带来的好处相比,这是一个微不足道的缺点。

一篇后续博客文章将介绍流过滤的内部机制,以供那些对技术细节感兴趣的人参考。您还可以查看流 Java 客户端文档中的过滤功能以获取更多信息。其中包括一条消息不一定必须与过滤值关联,以及消费者可以选择接收具有给定过滤值的消息以及没有 过滤值的消息(使用 filter().matchUnfiltered())。

试用

让我们看看流过滤功能的实际操作。启动 RabbitMQ 3.13+ 节点

docker run -it --rm --name rabbitmq -p 5552:5552 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.13

启用流插件

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream

下载流性能测试(运行它需要 Java 11 或更高版本)

cd /tmp
wget -O stream-perf-test.jar \
https://github.com/rabbitmq/rabbitmq-java-tools-binaries-dev/releases/download/v-stream-perf-test-latest/stream-perf-test-latest.jar

让我们发布 10 秒钟的消息

java -jar stream-perf-test.jar --producers 1 --consumers 0 --rate 100 --filter-value-set 1..50 --size 10000 --time 10

消息长度为 10 KB,--filter-value-set 1..50 表示每个消息都与 "1""50" 之间的随机过滤值相关联。

让我们消费所有消息(没有任何过滤)

java -jar stream-perf-test.jar --producers 0 --consumers 1 --offset first --prometheus

当消费者到达流的末尾时,输出应该在几秒钟后停止。不要停止应用程序,而是打开另一个终端标签并查询流性能测试指标以查看它读取了多少数据

curl --silent localhost:8080/metrics | grep rabbitmq_stream_read_bytes_total

您应该会看到以下类似内容

# HELP rabbitmq_stream_read_bytes_total
# TYPE rabbitmq_stream_read_bytes_total counter
rabbitmq_stream_read_bytes_total 1.0046894E7

大约 10 MB。客户端必须传输整个流。

现在停止流性能测试(Ctrl-C)并重新启动它,这次启用过滤功能

java -jar stream-perf-test.jar --producers 0 --consumers 1 --offset first --prometheus --filter-values 5

在这里,我们请求仅获取具有 "5" 过滤值的 消息(--filter-values 5)。再次等待输出停止,并检查读取的字节数

curl --silent localhost:8080/metrics | grep rabbitmq_stream_read_bytes_total

您应该会看到以下类似内容

# HELP rabbitmq_stream_read_bytes_total
# TYPE rabbitmq_stream_read_bytes_total counter
rabbitmq_stream_read_bytes_total 1957641.0

小于 2 MB。节省了 8 MB 带宽,约 80%,还不错!

当然,这在某种程度上是人为的:流性能测试不是真正的应用程序,它不太可能像真正的应用程序那样分发消息和过滤值。但是,它仍然可以让我们了解使用流过滤功能可以节省多少带宽。

总结

我们简要概述了 RabbitMQ 3.13 中的流过滤功能。当消息从代理分发到消费应用程序时,它可以节省带宽。并非所有用例都能从流过滤功能中获益,但对于那些可以获益的用例,其带宽优势非常明显。

请继续关注后续博客文章,它将介绍流过滤的内部细节,以帮助您以最佳方式使用和配置它。

© 2024 RabbitMQ. All rights reserved.