跳到主要内容

流过滤

·6 分钟阅读

流过滤是 RabbitMQ 3.13 中的一项新功能。当应用程序只需要流消息的子集时,它允许节省 broker 和消费应用程序之间的带宽。

继续阅读以了解流过滤的工作原理并查看实际应用。

流过滤的概念

假设你有一个包含来自世界各地数据的流,并且一个应用程序只需要处理此数据的子集,例如特定区域的消息。应用程序可以读取整个流并过滤掉数据,只处理它感兴趣的消息。这可以工作,但这意味着整个流内容将通过网络传输。

流过滤在 broker 端提供第一级高效过滤,无需 broker 解释消息。对于某些用例,它可以显着减少网络上交换的数据量。让我们了解一下这个令人兴奋的新功能的语义。

在发布端

流过滤基于过滤器值:发布应用程序可以将每个消息与一个字符串值关联。过滤器值可以是任何内容,但它们应满足某些标准才能使过滤正常工作。跨消息共享的已定义值集是一个很好的选择:地理位置(例如,国家、州)、存储文档信息的流中的文档类型(例如,工资单、发票、订单)、产品类别(例如,书籍、行李、玩具)。

消息如何与过滤器值关联取决于客户端库。这是一个使用 stream Java 客户端的示例

Producer producer = environment.producerBuilder()
.stream("invoices")
.filterValue(msg -> msg.getApplicationProperties().get("region").toString())
.build();

在此示例中,应用程序开发人员提供了一些逻辑来从消息应用程序属性中提取过滤器值。使用过滤就像这样简单:无需更改实际的消息发布代码,只需在创建 Producer 时提供过滤器值逻辑即可。

现在让我们看看它在消费者端是如何工作的。

在消费者端

这是一个 Java 代码片段,用于声明一个仅对来自 emea 区域的消息感兴趣的消费者

Consumer consumer = environment.consumerBuilder()
.stream("invoices")
.filter()
.values("emea")
.postFilter(msg -> "emea".equals(msg.getApplicationProperties().get("region")))
.builder()
.messageHandler((ctx, msg) -> {
// message processing code
})
.build();

过滤在两个地方配置

  • filter().values(String... filterValues) 告诉 broker 我们对与这些值关联的消息感兴趣(我们可以指定多个值,而不仅仅是一个)
  • filter().postFilter(Predicate<Message> filter) 提供一些客户端逻辑来过滤掉与预期过滤器值关联的消息

为什么需要这种客户端过滤逻辑?broker 端过滤逻辑使用 布隆过滤器

布隆过滤器是一种节省空间的概率数据结构,用于测试元素是否是集合的成员。

布隆过滤器在存储和速度方面非常高效,但它是概率性的:它可能会返回误报。因此,broker 可能会发送它认为与预期过滤器值匹配的消息,但实际上它们并不匹配。这就是为什么需要一些客户端过滤逻辑。

这是需要注意的事情,但与流过滤带来的好处相比,这只是一个小小的缺点。

后续博客文章涵盖了对技术细节感兴趣的人的流过滤内部原理。您还可以查看 stream Java 客户端关于过滤的文档以获取更多信息。它涵盖了消息不一定总是需要与过滤器值关联,并且消费者可以选择接收具有给定过滤器值和没有过滤器值的消息(使用 filter().matchUnfiltered())。

尝试一下

让我们看看流过滤的实际应用。启动一个 RabbitMQ 3.13+ 节点

docker run -it --rm --name rabbitmq -p 5552:5552 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.13

启用 stream 插件

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream

下载 Stream PerfTest(它需要 Java 11 或更高版本才能运行)

cd /tmp
wget -O stream-perf-test.jar \
https://github.com/rabbitmq/rabbitmq-java-tools-binaries-dev/releases/download/v-stream-perf-test-latest/stream-perf-test-latest.jar

让我们发布消息 10 秒钟

java -jar stream-perf-test.jar --producers 1 --consumers 0 --rate 100 --filter-value-set 1..50 --size 10000 --time 10

消息长度为 10 KB,--filter-value-set 1..50 表示每个消息都关联一个介于 "1""50" 之间的随机过滤器值。

让我们消费所有消息(没有任何过滤)

java -jar stream-perf-test.jar --producers 0 --consumers 1 --offset first --prometheus

输出应在几秒钟后停止,当消费者到达流的末尾时。不要停止应用程序,而是打开另一个终端选项卡,并查询 Stream PerfTest 指标以查看它读取了多少数据

curl --silent localhost:8080/metrics | grep rabbitmq_stream_read_bytes_total

您应该得到类似以下内容

# HELP rabbitmq_stream_read_bytes_total
# TYPE rabbitmq_stream_read_bytes_total counter
rabbitmq_stream_read_bytes_total 1.0046894E7

大约 10 MB。客户端必须传输整个流。

现在停止 Stream PerfTest (Ctrl-C) 并再次启动它,这次启用过滤

java -jar stream-perf-test.jar --producers 0 --consumers 1 --offset first --prometheus --filter-values 5

这里我们要求只获取过滤器值为 "5" 的消息 (--filter-values 5)。再次,等待输出停止并检查读取的字节数

curl --silent localhost:8080/metrics | grep rabbitmq_stream_read_bytes_total

您应该得到类似

# HELP rabbitmq_stream_read_bytes_total
# TYPE rabbitmq_stream_read_bytes_total counter
rabbitmq_stream_read_bytes_total 1957641.0

这不到 2 MB。节省了 8 MB 的带宽,大约 80%,还不错!

当然,这有点人为:Stream PerfTest 不是一个真实的应用程序,并且它不太可能以真实应用程序的方式分发消息和过滤器值。但即便如此,它还是给出了流过滤可以节省多少带宽的概念。

总结

我们快速概述了 RabbitMQ 3.13 中的流过滤。它允许在消息从 broker 分发到消费应用程序时节省带宽。并非所有用例都可以从流过滤中受益,但对于那些可以受益的用例来说,带宽方面的优势非常引人注目。

请继续关注后续博客文章,其中将介绍流过滤的内部细节,以帮助您以最佳方式使用和配置它。

© . All rights reserved.