流过滤
流过滤是 RabbitMQ 3.13 中的一项新功能。当消费应用程序只需要流中的一部分消息时,它可以节省代理和消费应用程序之间的带宽。
继续阅读以了解流过滤的工作原理并查看其实际应用。
流过滤的概念
设想你有一个包含来自世界各地数据的流,而你的应用程序只需要处理其中的一个子集,比如某个特定区域的消息。应用程序可以读取整个流并过滤出它感兴趣的消息。这虽然可行,但意味着整个流的内容都需要通过网络传输。
流过滤在代理(Broker)端提供了第一层高效过滤,且无需代理去解析消息内容。在某些使用场景下,这可以显著减少网络交换的数据量。让我们一起来了解这项令人兴奋的新功能背后的语义。
在发布端
流过滤基于“过滤值”(filter value):发布应用程序可以将每个消息与一个字符串值关联。过滤值可以是任何内容,但为了使过滤正常工作,它们应当满足一定的标准。跨消息共享的一组预定义值是理想的选择,例如:地理位置(如国家、省份)、存储文档信息流中的文档类型(如工资单、发票、订单)、产品类别(如书籍、行李、玩具)。
如何将消息与过滤值关联取决于所使用的客户端库。以下是流 Java 客户端的一个示例。
Producer producer = environment.producerBuilder()
.stream("invoices")
.filterValue(msg -> msg.getApplicationProperties().get("region").toString())
.build();
在此示例中,应用程序开发人员提供了从消息应用程序属性中提取过滤值的逻辑。使用过滤非常简单:无需更改实际的消息发布代码,只需在创建 Producer 时提供过滤值逻辑即可。
现在让我们看看消费者端是如何工作的。
在消费者端
以下是一个 Java 代码片段,用于声明一个只对 emea 区域的消息感兴趣的消费者。
Consumer consumer = environment.consumerBuilder()
.stream("invoices")
.filter()
.values("emea")
.postFilter(msg -> "emea".equals(msg.getApplicationProperties().get("region")))
.builder()
.messageHandler((ctx, msg) -> {
// message processing code
})
.build();
过滤在以下两个地方进行配置:
filter().values(String... filterValues):告知代理我们对关联了这些值的消息感兴趣(可以指定多个值,而不仅限于一个)。filter().postFilter(Predicate<Message> filter):提供客户端逻辑,以过滤掉那些与预期过滤值不匹配的消息。
为什么需要这种客户端过滤逻辑?这是因为代理端的过滤逻辑使用了布隆过滤器(Bloom filter)。
布隆过滤器是一种空间效率极高的概率型数据结构,用于测试某个元素是否属于一个集合。
布隆过滤器在存储和速度方面非常高效,但它具有概率性:可能会产生“假阳性”(false positives)。因此,代理可能会发送它认为匹配但实际上不匹配的消息。这就是为什么必须具备一些客户端过滤逻辑的原因。
这是一个需要注意的地方,但与流过滤带来的好处相比,这只是一个小小的瑕疵。
对于有兴趣了解技术细节的用户,可以查看后续发布的博客文章,其中涵盖了流过滤的内部机制。你也可以参阅流 Java 客户端关于过滤的文档以获取更多信息。该文档还涵盖了其他事项,例如消息并非必须关联过滤值,消费者可以选择接收具有特定过滤值的消息,以及接收“无”过滤值的消息(使用 filter().matchUnfiltered())。
试用一下
让我们看看流过滤的实际操作。启动一个 RabbitMQ 3.13+ 节点。
docker run -it --rm --name rabbitmq -p 5552:5552 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.13
启用流插件。
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
下载 Stream PerfTest(需要 Java 11 或更高版本)。
cd /tmp
wget -O stream-perf-test.jar \
https://github.com/rabbitmq/rabbitmq-java-tools-binaries-dev/releases/download/v-stream-perf-test-latest/stream-perf-test-latest.jar
让我们发布 10 秒的消息。
java -jar stream-perf-test.jar --producers 1 --consumers 0 --rate 100 --filter-value-set 1..50 --size 10000 --time 10
消息长度为 10 KB,--filter-value-set 1..50 表示为每条消息关联一个 "1" 到 "50" 之间的随机过滤值。
让我们消费所有消息(不进行任何过滤)。
java -jar stream-perf-test.jar --producers 0 --consumers 1 --offset first --prometheus
当消费者到达流的末尾时,输出应该会在几秒钟后停止。不要停止应用程序,打开另一个终端标签页,查询 Stream PerfTest 指标以查看它读取了多少数据。
curl --silent localhost:8080/metrics | grep rabbitmq_stream_read_bytes_total
你应该会得到类似下面的结果。
# HELP rabbitmq_stream_read_bytes_total
# TYPE rabbitmq_stream_read_bytes_total counter
rabbitmq_stream_read_bytes_total 1.0046894E7
这大约是 10 MB。客户端不得不传输整个流。
现在停止 Stream PerfTest (Ctrl-C) 并重新启动它,这次开启过滤功能。
java -jar stream-perf-test.jar --producers 0 --consumers 1 --offset first --prometheus --filter-values 5
在这里,我们要求仅获取带有 "5" 过滤值的消息 (--filter-values 5)。同样,等待输出停止并检查读取的字节数。
curl --silent localhost:8080/metrics | grep rabbitmq_stream_read_bytes_total
你应该会得到类似的结果。
# HELP rabbitmq_stream_read_bytes_total
# TYPE rabbitmq_stream_read_bytes_total counter
rabbitmq_stream_read_bytes_total 1957641.0
这不到 2 MB。节省了 8 MB 的带宽,大约 80%,还不错!
当然,这在某种程度上是人工模拟的:Stream PerfTest 并不是一个真实的应用程序,它分发消息和过滤值的方式可能与实际应用不同。但即便如此,它也让我们对流过滤带来的带宽节省有了一个直观的认识。
总结
我们简要概述了 RabbitMQ 3.13 中的流过滤功能。当消息从代理分发到消费应用程序时,它可以节省带宽。并非所有场景都能从流过滤中获益,但对于那些能受益的场景,带宽节省的效果是非常可观的。
请持续关注后续发布的博客文章,我们将涵盖流过滤的内部细节,帮助你以最优方式使用和配置它。
