流过滤内部原理
一篇之前的文章介绍了流过滤,这是 RabbitMQ 3.13 中一个令人兴奋的新功能。在这篇文章中,我们将介绍流过滤的内部原理。了解其设计和实现将帮助您以最适合您用例的方式配置和使用流过滤。
概念
流过滤的想法是在代理端提供第一层高效的过滤,而无需代理解释消息。 这样,只需要流的一部分的消费者无需获取所有数据并自行处理所有过滤。 这可以大大减少传输给消费者的数据量。
通过过滤,可以将过滤器值与每条消息关联起来。 它可以是地理信息,例如每条消息来自的世界区域,如下图所示
所以我们的流有 1 条 AMER(绿色)消息,1 条 APAC(深蓝色)消息,2 条 EMEA(紫色)消息,然后是 2 条 AMER 消息。
消息发布
发布者可以将每个出站消息与其过滤器值关联起来
在上图中,发布者发布了 1 条 AMER(绿色)消息和 2 条 EMEA(紫色)消息,这些消息将被添加到流中。
消息消费
当消费者订阅时,它可以指定一个或多个过滤器值,并且期望代理仅分发具有此过滤器值或这些过滤器值的消息。 我们很快就会看到这在实践中有点不同,但这足以理解这些概念。
在下图中,顶部的消费者指定它只想要 AMER(绿色)消息,并且代理只分发这些消息。 中间的消费者使用 EMEA 消息,底部的消费者使用 APAC 消息也是如此。
概念就到这里,现在让我们了解一下实现细节。
流的结构
我们需要了解流是如何构造的,才能理解流过滤的内部原理。 流是一个目录,其中包含段文件。 每个段文件都有一个关联的索引文件(用于知道在段文件中的给定偏移量处附加消费者的位置等)。 拥有多个“小”段文件比为整个流拥有一个大型单体文件更好:例如,删除“旧”段文件以截断流比删除大型文件的开头更有效和更安全。
段文件由包含消息的块组成。 块中消息的数量取决于入口速率(高入口速率意味着一个块中有许多消息,低入口速率意味着一个块中的消息较少)。 一个块中的消息数量从几个(甚至 1 个)到几千个不等。
块有什么作用? 块是流中的工作单元:它们用于复制,更重要的是对于我们的主题,用于消费者交付。 代理一次一个地将块发送给消费者,使用 sendfile
系统调用(将整个块从文件系统发送到网络套接字,而无需将数据复制到用户空间)。
下图说明了流的结构
有了这些基础知识,让我们看看代理如何知道是否要分发块。
代理端过滤
假设我们有一个只想要 AMER(绿色)消息的消费者。 当代理即将分发一个块时,它需要知道该块是否包含 AMER 消息。 如果包含,它可以将该块发送给消费者;如果不包含,代理可以跳过该块,移动到下一个块,然后重新迭代。
每个块都有一个标头,其中可以包含一个 Bloom 过滤器,该过滤器告诉代理该块是否包含具有给定过滤器值的消息。 Bloom 过滤器 是一种节省空间的概率数据结构,用于测试元素是否是集合的成员。 在我们的示例中,集合包含 AMER、EMEA 和 APAC,元素是 AMER。
下图说明了我们 3 个块的代理端过滤过程
如上图所示,过滤器可能会返回误报,即不包含具有预期过滤器值的消息的块。 这是正常的,因为 Bloom 过滤器是概率性的。 但是,它们不会返回假阴性:如果过滤器说没有 AMER(绿色)消息,我们可以确定它是真的。 我们必须接受这种不确定性:有时我们可能会无缘无故地分发一些块,但这比分发所有块要好。
可以肯定的是,消费者可能会收到它不想要的消息:看看我们左边的第一个块,它包含消费者要求的 AMER(绿色)消息,但也包含 EMEA(紫色)和 APAC(深蓝色)消息。 这就是为什么也必须在客户端进行过滤的原因。
客户端过滤
代理在传递消息时处理第一级过滤,但是由于传递单元是块,因此消费者仍然可能收到它不想要的消息。 因此,客户端也必须进行一些过滤,这显然必须与订阅时设置的过滤器值一致。
下图说明了一个只想要 AMER(绿色)消息并且必须执行最后一步过滤的消费者
让我们看看这如何转化为应用程序代码。
API 示例
过滤是非侵入性的,可以作为横切关注点来处理,从而最大限度地减少对应用程序代码的影响。 以下是如何在使用 stream Java 客户端 (filterValue(Function<Message,String>)
方法) 声明生产者时,设置从消息中提取过滤器值的逻辑
Producer producer = environment.producerBuilder()
.stream("invoices")
.filterValue(msg -> msg.getApplicationProperties().get("region").toString())
.build();
在消费端,stream Java 客户端提供了 filter().values(String... filterValues)
方法来设置过滤器值,以及 filter().postFilter(Predicate<Message> filter)
方法来设置客户端过滤逻辑。 这两种方法都必须在声明消费者时调用
Consumer consumer = environment.consumerBuilder()
.stream("invoices")
.filter()
.values("AMER")
.postFilter(msg -> "AMER".equals(msg.getApplicationProperties().get("region")))
.builder()
.messageHandler((ctx, msg) -> {
// message processing code
})
.build();
如您所见,过滤不会更改发布和消费代码,只会更改生产者和消费者的声明。
现在让我们看看如何以最适合用例的方式配置流过滤。
流过滤配置
关于流过滤的第一篇文章提供了一些数字(与不进行过滤相比,过滤大约节省 80% 的带宽)。 流过滤的好处在很大程度上取决于用例:入口速率、过滤器值的基数和分布,以及过滤器大小。 过滤器越大越好(错误率越小)。 可以为块中使用的过滤器大小设置 16 到 255 字节之间的值,默认值为 16 字节。
stream Java 客户端提供了 filterSize(int)
方法,用于在创建流时设置过滤器大小(它在内部设置 stream-filter-size-bytes
参数)
environment.streamCreator()
.stream("invoices")
.filterSize(32)
.create()
如何估计过滤器的大小? 有许多在线 Bloom 过滤器计算器可用。 参数是哈希函数的数量(RabbitMQ 流过滤为 2)、预期元素的数量、错误率和大小。 您通常对元素的数量有一个大致的了解,因此您需要在错误率和过滤器大小之间找到一个权衡。
这里有一些例子
- 10 个值,16 字节 => 2% 错误率
- 30 个值,16 字节 => 14% 错误率
- 200 个值,128 字节 => 10% 错误率
那么,过滤器越大越好吗? 不完全是:即使 Bloom 过滤器在存储方面非常高效,因为它不存储元素,只存储元素是否在集合中,但过滤器大小是预先分配的。 如果您将过滤器大小设置为 255,并且每个块都包含至少一条带有过滤器值的消息,则每个块标头中将分配 255 字节。 如果块包含许多大型消息,这很好,因为过滤器大小与块大小相比可以忽略不计。 但是,对于像包含 10 字节消息和 10 字节过滤器值的单消息块这样的退化情况,您最终会得到一个比实际数据更大的过滤器。
您需要根据自己的用例进行实验,以估计过滤器大小对流大小的影响。 关于流过滤的第一篇文章提供了一个技巧,可以使用 Stream PerfTest 估算流的大小(读取整个流而不进行过滤,并查阅 rabbitmq_stream_read_bytes_total
指标)。
奖励:AMQP 上的流过滤
即使访问流的首选方式是流协议,但也支持其他协议,例如 AMQP。 任何 AMQP 客户端库也支持流过滤
- 声明:将
x-queue-type
参数设置为stream
,并在声明流时使用x-stream-filter-size-bytes
设置过滤器大小。 - 发布:使用
x-stream-filter-value
标头为出站消息设置过滤器值。 - 消费:使用
x-stream-filter
消费者参数设置预期的过滤器值(字符串或字符串数组),并可选择使用x-stream-match-unfiltered
消费者参数也接收没有任何过滤器值的消息(默认为false
)。 客户端过滤仍然是必要的!
总结
这篇博文深入描述了 RabbitMQ 3.13 中的流过滤。 它补充了 第一篇文章,该文章介绍了流过滤的用法和演示。
流过滤易于使用且易于从中受益,但了解一些内部原理对于优化其使用非常有用,尤其是在棘手的用例中。 请记住,客户端过滤是必要的,并且必须与配置的过滤器值保持一致。 这通常很容易实现。 也可以为给定的用例以最合适的方式设置过滤器大小。