RabbitMQ 3.11 功能预览:Stream 单活动消费者
RabbitMQ 3.11 将为 Stream 带来一个值得关注的功能:单活动消费者。单活动消费者为 Stream 提供独占消费和消费连续性。它对于充分利用我们用于分区的解决方案Super Streams 也至关重要,该解决方案为 Stream 提供可扩展性。
请继续阅读以了解有关 Stream 单活动消费者的更多信息,并且不要犹豫尝试已经可用的功能:试用,破坏它,告诉我们您喜欢和不喜欢什么,还缺少什么。您的反馈对于使此功能尽善尽美至关重要。
概述
那么,排他消费、消费连续性,这些意味着什么?想象一下,您希望确保流处理不会停止,即使处理此任务的应用程序实例不断启动和停止。您可以启动同一个处理应用程序的多个实例,并在流消费者上启用“单一活跃消费者”标志。代理(Broker)将确保在任何给定时间只有一个实例接收消息:即活跃实例。
当活跃消费者离开(关闭或崩溃)时,代理会自动切换到队列中的下一个实例,并开始向其分发消息。
因此,开启“单一活跃消费者”后,您不必担心多个实例并行执行相同的处理,代理会处理好一切。
请注意,在给定的流上可以有任意数量的“单一活跃消费者组”:应用程序必须为消费者提供一个名称,该名称充当消费者组的标识符。单一活跃消费者语义将适用于共享同一名称的消费者实例。
让我们通过一个演示来看看流的单一活跃消费者是如何工作的。
流的单一活跃消费者实战
我们将启动 3 个消费者来模拟同一个应用程序的实例。我们将看到只有一个消费者实例被激活并接收消息,另外 2 个实例将保持空闲。我们将关闭活跃消费者的连接,并观察另一个实例如何接管工作。
为了简单起见,我们将在同一个进程中启动这 3 个消费者。以下是使用流 Java 客户端定义消费者的代码
System.out.println("Starting consumer instance " + i);
Consumer consumer = environment.consumerBuilder()
.stream(stream)
.name(reference)
.singleActiveConsumer()
.autoTrackingStrategy()
.messageCountBeforeStorage(10)
.builder()
.messageHandler((context, message) -> {
System.out.printf(
"Consumer instance %d received a message (%d).%n",
i, sequence.incrementAndGet()
);
})
.build();
声明单一活跃消费者的代码与普通消费者几乎相同:您必须提供一个名称并使用 ConsumerBuilder#singleActiveConsumer()。变量 i 和 sequence 只是为了帮助理解正在发生的事情。
如果您想在阅读时运行代码,可以继续阅读下一节。请注意,您可以在不运行任何内容的情况下阅读文章的其余部分,因此如果您不想尝试代码,可以跳过下一节。
设置示例项目
运行示例需要安装 Docker、Git 以及 Java 11 或更高版本。如果本地已有 Docker 镜像,请将其移除,以确保稍后拉取最新的镜像
docker rmi pivotalrabbitmq/rabbitmq-stream
然后启动代理
docker run -it --rm --name rabbitmq -p 5552:5552 -p 5672:5672 -p 15672:15672 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
pivotalrabbitmq/rabbitmq-stream
该容器开始监听多个端口:5552 用于流插件,5672 用于 AMQP,15672 用于 HTTP 管理插件。
示例项目代码托管在 GitHub 上。在本地克隆项目并启动模拟 3 个消费者实例的应用程序
cd /tmp
git clone https://github.com/acogoluegnes/rabbitmq-stream-single-active-consumer.git
cd rabbitmq-stream-single-active-consumer
设置已完成,让我们启动单一活跃消费者。
启动消费者
使用以下命令启动消费者实例
./mvnw -q compile exec:java -Dexec.mainClass=com.rabbitmq.stream.SingleActiveConsumer
应用程序确认它已注册 3 个消费者
Created stream single-active-consumer
Starting consumer instance 0
Starting consumer instance 1
Starting consumer instance 2
在真实的系统中,这些消费者将运行在不同的主机上。但在同一个进程中运行它们对于我们的演示来说更简单,同样也能说明问题。
一切准备就绪,让我们向流发布消息。
发布消息
我们运行了一个应用程序,它注册了 3 个相同的消费者实例,并启用了单一活跃消费者功能。我们正在使用 流性能工具 每秒发布一条消息
cd /tmp
wget -O stream-perf-test.jar \
https://github.com/rabbitmq/rabbitmq-java-tools-binaries-dev/releases/download/v-stream-perf-test-latest/stream-perf-test-latest.jar
java -jar /tmp/stream-perf-test.jar --rate 1 -x 1 -y 0 --streams single-active-consumer
性能工具开始发布(它会发出关于流创建的警告,这是预期之中的)
Starting producer
1, published 0 msg/s, confirmed 0 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
2, published 1 msg/s, confirmed 1 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
3, published 1 msg/s, confirmed 1 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
4, published 1 msg/s, confirmed 1 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
5, published 1 msg/s, confirmed 1 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
...
第一个消费者实例从流中获取消息
Created stream single-active-consumer
Starting consumer 0
Starting consumer 1
Starting consumer 2
Consumer instance 0 received a message (1).
Consumer instance 0 received a message (2).
Consumer instance 0 received a message (3).
Consumer instance 0 received a message (4).
...
如果没有启用单一活跃消费者,每个消费者实例都会获取流中的消息,您会看到每条消息有 3 行输出。因此,我们的应用程序正如预期那样工作,只有一个消费者处于活跃状态。让我们看看能否了解更多关于消费者的信息……
检查消费者
list_stream_consumers CLI 命令列出了虚拟主机中所有流的消费者
docker exec rabbitmq rabbitmqctl list_stream_consumers \
connection_pid,stream,messages_consumed,active,activity_status
我们可以看到消费者及其各自的状态
Listing stream consumers ...
┌────────────────┬────────────────────────┬───────────────────┬────────┬─────────────────┐
│ connection_pid │ stream │ messages_consumed │ active │ activity_status │
├────────────────┼────────────────────────┼───────────────────┼────────┼─────────────────┤
│ <11771.870.0> │ single-active-consumer │ 259 │ true │ single_active │
├────────────────┼────────────────────────┼───────────────────┼────────┼─────────────────┤
│ <11771.882.0> │ single-active-consumer │ 0 │ false │ waiting │
├────────────────┼────────────────────────┼───────────────────┼────────┼─────────────────┤
│ <11771.894.0> │ single-active-consumer │ 0 │ false │ waiting │
└────────────────┴────────────────────────┴───────────────────┴────────┴─────────────────┘
想象一下,我们有很多流,每个流都有许多消费者组。我们需要一个简洁的视图。让我们使用 list_stream_consumer_groups 命令
docker exec rabbitmq rabbitmqctl list_stream_consumer_groups stream,reference,consumers
我们的 my-app 组显示出来了,并带有已注册的消费者数量
Listing stream consumer groups ...
┌────────────────────────┬───────────┬───────────┐
│ stream │ reference │ consumers │
├────────────────────────┼───────────┼───────────┤
│ single-active-consumer │ my-app │ 3 │
└────────────────────────┴───────────┴───────────┘
我们可以使用 list_stream_group_consumers 命令深入了解我们的组
docker exec rabbitmq rabbitmqctl list_stream_group_consumers --stream single-active-consumer --reference my-app
该组的消费者显示出来了
Listing group consumers ...
┌─────────────────┬─────────────────────────────────────┬──────────┐
│ subscription_id │ connection_name │ state │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58370 -> 172.17.0.2:5552 │ active │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58376 -> 172.17.0.2:5552 │ inactive │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58380 -> 172.17.0.2:5552 │ inactive │
└─────────────────┴─────────────────────────────────────┴──────────┘
太好了,我们有 CLI 命令来深入了解启用了单一活跃消费者标志的消费者。
查看管理界面
除了 CLI,我们还有更直观的图形化界面,即 管理界面中的流页面 (用户名/密码: guest/guest)

消费者的 channel 列显示的并不是严格意义上的 AMQP 通道:我们的示例使用的是流协议,其中不存在通道概念。管理界面会自动适应并显示消费者的连接。如果我们点击活跃消费者通道/连接的链接,可以获得更多详细信息

现在是时候体验单一活跃消费者语义了。
终止活跃消费者
让我们通过连接页面底部的 Force Close(强制关闭)按钮来关闭活跃消费者的连接。
我们从消费应用程序的控制台输出中看到,活跃消费者已经离开,下一个消费者接管了工作
...
Consumer instance 0 received a message (130).
Consumer instance 0 received a message (131).
Consumer instance 0 received a message (132).
Consumer instance 1 received a message (133). <---- Instance #1 got activated
Consumer instance 1 received a message (134).
Consumer instance 1 received a message (135).
...
故障转移如预期般工作:活跃消费者死亡,队列中的下一个被激活。在现实世界中,活跃消费者实例可能在 OS 升级后重新启动,或者其连接可能在网络故障后关闭。在这个示例中,我们只是故意关闭了连接。
让我们列出组中的消费者来确认我们的发现
docker exec rabbitmq rabbitmqctl list_stream_group_consumers --stream single-active-consumer --reference my-app
我们仍然有 3 个消费者,因为客户端库自动恢复了已关闭的连接
Listing group consumers ...
┌─────────────────┬─────────────────────────────────────┬──────────┐
│ subscription_id │ connection_name │ state │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58376 -> 172.17.0.2:5552 │ active │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58380 -> 172.17.0.2:5552 │ inactive │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58388 -> 172.17.0.2:5552 │ inactive │
└─────────────────┴─────────────────────────────────────┴──────────┘
来自 172.17.0.1:58376 -> 172.17.0.2:5552 连接的消费者现在是活跃的。在我们在关闭第一个活跃消费者之前,它是队列中的第二个,现在它被提升了。172.17.0.1:58388 -> 172.17.0.2:5552 连接是客户端库恢复的连接,它现在位于列表的最下方。
演示结束了,您现在可以使用 Ctrl-C 关闭正在运行的程序:stream-perf-test、消费者以及 Docker 容器代理。
总结
在本文中,我们介绍了流的单一活跃消费者,这是即将发布的 RabbitMQ 3.11 版本中的一项新功能。它允许将多个消费者实例附加到同一个流,并确保在同一时间只有一个实例处于活跃状态。如果活跃消费者实例宕机,下一个实例将自动接管,从而以最小的干扰持续进行处理。
欢迎运行演示,源代码是公开的。您也可以自行尝试,流 Java 客户端对单一活跃消费者的支持已有相关文档,并且可用的持续构建 Docker 镜像 也可以获取。我们很高兴与 RabbitMQ 社区分享这一新功能,并且期待在 RabbitMQ 3.11 于今年晚些时候正式发布 (GA) 之前听到大家的反馈。
我们将很快跟进另一篇关于 RabbitMQ 3.11 功能的博客文章:超级流 (Super Streams),这是我们扩展流的解决方案。敬请期待!
