RabbitMQ 3.11 功能预览:超级流
RabbitMQ 3.11 将带来一个拥有史上最酷名称的功能:超级流。超级流是一种通过将大型流划分为较小的流来扩展的方法。它们与 单一活动消费者 集成以在分区内保持消息顺序。
本文概述了超级流及其解锁的用例。继续阅读以了解更多信息,我们重视 您的反馈,以便使此功能尽善尽美。
概述
超级流是由单个常规 流 组成的逻辑流。它是一种通过 RabbitMQ Streams 扩展发布和消费 的方式:一个大型逻辑流被划分为分区流,将存储和流量分散到多个集群节点上。
超级流仍然是一个逻辑实体:应用程序将其视为一个“大型”流,这得益于客户端库的智能设计。超级流的拓扑结构基于 AMQP 0.9.1 模型,即交换机、队列以及它们之间的绑定。
下图显示了一个由 3 个分区组成的 invoices
超级流。它由一个与 3 个流绑定的 invoices
交换机定义。
消息不会通过交换机,而是直接发送到分区流。但 客户端库 使用拓扑信息来确定将消息路由到哪里以及从哪里消费。
让我们谈谈大家最关心的问题:它与 Kafka 相比如何?我们可以将超级流与 Kafka 主题进行比较,将流与 Kafka 主题的一个分区进行比较。但是,RabbitMQ 流是一个独立的、具有唯一名称的一级对象,而 Kafka 分区是 Kafka 主题的一个子对象。这个解释遗漏了很多细节,没有真正的 1 对 1 映射,但对于本文的论点来说已经足够准确了。
超级流还利用 单一活动消费者 在消费者处理期间在分区内保持消息顺序。有关此方面的更多信息,请参见下文。
请不要误会,超级流并没有使单个流过时,也没有使其变得毫无用处。它们也不是流 2.0 版,因为我们一开始就把流做错了。超级流位于流之上,它们带来了自己的特性和功能集。您可以继续在适合的地方使用单个流,并探索超级流以应对更苛刻的用例。
发布到超级流
应用程序发布到超级流的消息必须发送到其中一个分区。应用程序可以根据客户端库的帮助选择分区,代理不会处理路由。这样可以提高灵活性,并避免服务器端的瓶颈。
应用程序必须提供至少一个信息:从消息中提取的路由键。以下代码段展示了应用程序如何使用流 Java 客户端库提供此代码
Producer producer = environment.producerBuilder()
.superStream("invoices") // set the super stream name
.routing(message -> message.getProperties().getMessageIdAsString()) // extract routing key
.producerBuilder()
.build();
producer.send(...);
发布与 普通流 相同,只是生产者配置略有不同。发布到流或超级流不会对应用程序的代码造成太大影响。
消息将发送到哪里?在本例中,库使用 MurmurHash3 对路由键进行哈希来选择流分区。此哈希函数提供了良好的均匀性、性能和可移植性,使其成为一个不错的默认选择。在本例中,路由键是一个发票 ID,因此发票应均匀分布在所有分区中。如果路由键是客户 ID,那么我们就能保证给定客户的所有发票都位于同一个分区上。以下是一个可以利用此优势的用例:使用适当的 基于时间戳的偏移量规范,对给定时间段内的每个客户进行报告。
谈到处理,让我们看看如何从超级流中消费。
从超级流中消费
超级流的分区是普通流,因此应用程序可以照常从这些分区中消费。但这意味着需要了解超级流的拓扑结构,而我们把它卖成一个逻辑实体,应用程序将其视为一个独立的流。幸运的是,客户端库可以做到这一点,并且在代理的帮助下,对应用程序来说是完全透明的。
客户端库可以实现一个 众所周知的模式,并提供一个复合消费者,该消费者可以同时从超级流的所有分区中消费
以这种方式实现的复合消费者有一些局限性:如果您启动同一应用程序的多个实例以实现冗余和可扩展性,它们将消费相同的消息,从而导致处理重复。所有这些都需要协调,幸运的是,我们可以将 单一活动消费者 的语义应用于我们的超级流复合消费者。
现在,在启用单一活动消费者后,我们的复合消费者的实例将与代理进行协调,以确保在给定分区上一次只有一个消费者处于活动状态。
好消息是,所有这些都是实现细节。应用程序实例可以启动和停止,消费者将根据单一活动消费者的语义被激活或停用。
它如何转化为代码?以下是一个使用流 Java 客户端的示例
Consumer consumer = environment.consumerBuilder()
.superStream("invoices") // set the super stream name
.name("application-1") // set the consumer name (mandatory)
.singleActiveConsumer() // enable single active consumer
.messageHandler((context, message) -> {
// message processing
})
.build();
这与 普通流的消费者 类似,只是配置发生了变化,更重要的是,消息处理代码保持不变。
总结
本文介绍了超级流,这是即将发布的 RabbitMQ 3.11 版本中的一个新功能。超级流是分区流,它们为 RabbitMQ Stream 带来了可扩展性。与 单一活动消费者 相结合,它们提供了在分区内按发布顺序处理消息的保证。
本文附带了一个 配套示例项目,该项目提供了一个逐步演示,以说明所涵盖的功能。请随时查看它!
我们很高兴与 RabbitMQ 社区分享这一新功能,我们迫不及待地想在今年晚些时候 RabbitMQ 3.11 正式发布之前听到 您的反馈。