跳至主内容

RabbitMQ 3.11 功能预览:Stream 单活动消费者

·10 分钟阅读

RabbitMQ 3.11 将为 Stream 带来一个值得关注的功能:单活动消费者。单活动消费者为 Stream 提供独占消费消费连续性。它对于充分利用我们用于分区的解决方案Super Streams 也至关重要,该解决方案为 Stream 提供可扩展性。

请继续阅读以了解有关 Stream 单活动消费者的更多信息,并且不要犹豫尝试已经可用的功能:试用,破坏它,告诉我们您喜欢和不喜欢什么,还缺少什么。您的反馈对于使此功能尽善尽美至关重要。

概述

那么,排他消费消费连续性,这些意味着什么?想象一下,您希望确保流处理不会停止,即使处理此任务的应用程序实例不断启动和停止。您可以启动同一个处理应用程序的多个实例,并在流消费者上启用“单一活跃消费者”标志。代理(Broker)将确保在任何给定时间只有一个实例接收消息:即活跃实例。

Only one instance receives messages in a group of consumers when single active consumer is enabled.
启用“单一活跃消费者”后,消费者组中只有一个实例会接收消息。

当活跃消费者离开(关闭或崩溃)时,代理会自动切换到队列中的下一个实例,并开始向其分发消息。

The broker dispatches messages to the next consumer when the active one goes away.
当活跃消费者离开时,代理会将消息分发给下一个消费者。

因此,开启“单一活跃消费者”后,您不必担心多个实例并行执行相同的处理,代理会处理好一切。

请注意,在给定的流上可以有任意数量的“单一活跃消费者组”:应用程序必须为消费者提供一个名称,该名称充当消费者组的标识符。单一活跃消费者语义将适用于共享同一名称的消费者实例。

让我们通过一个演示来看看流的单一活跃消费者是如何工作的。

流的单一活跃消费者实战

我们将启动 3 个消费者来模拟同一个应用程序的实例。我们将看到只有一个消费者实例被激活并接收消息,另外 2 个实例将保持空闲。我们将关闭活跃消费者的连接,并观察另一个实例如何接管工作。

为了简单起见,我们将在同一个进程中启动这 3 个消费者。以下是使用流 Java 客户端定义消费者的代码

System.out.println("Starting consumer instance " + i);
Consumer consumer = environment.consumerBuilder()
.stream(stream)
.name(reference)
.singleActiveConsumer()
.autoTrackingStrategy()
.messageCountBeforeStorage(10)
.builder()
.messageHandler((context, message) -> {
System.out.printf(
"Consumer instance %d received a message (%d).%n",
i, sequence.incrementAndGet()
);
})
.build();

声明单一活跃消费者的代码与普通消费者几乎相同:您必须提供一个名称并使用 ConsumerBuilder#singleActiveConsumer()。变量 isequence 只是为了帮助理解正在发生的事情。

如果您想在阅读时运行代码,可以继续阅读下一节。请注意,您可以在不运行任何内容的情况下阅读文章的其余部分,因此如果您不想尝试代码,可以跳过下一节。

设置示例项目

运行示例需要安装 Docker、Git 以及 Java 11 或更高版本。如果本地已有 Docker 镜像,请将其移除,以确保稍后拉取最新的镜像

docker rmi pivotalrabbitmq/rabbitmq-stream

然后启动代理

docker run -it --rm --name rabbitmq -p 5552:5552 -p 5672:5672 -p 15672:15672 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
pivotalrabbitmq/rabbitmq-stream

该容器开始监听多个端口:5552 用于流插件,5672 用于 AMQP,15672 用于 HTTP 管理插件。

示例项目代码托管在 GitHub 上。在本地克隆项目并启动模拟 3 个消费者实例的应用程序

cd /tmp
git clone https://github.com/acogoluegnes/rabbitmq-stream-single-active-consumer.git
cd rabbitmq-stream-single-active-consumer

设置已完成,让我们启动单一活跃消费者。

启动消费者

使用以下命令启动消费者实例

./mvnw -q compile exec:java -Dexec.mainClass=com.rabbitmq.stream.SingleActiveConsumer

应用程序确认它已注册 3 个消费者

Created stream single-active-consumer
Starting consumer instance 0
Starting consumer instance 1
Starting consumer instance 2

在真实的系统中,这些消费者将运行在不同的主机上。但在同一个进程中运行它们对于我们的演示来说更简单,同样也能说明问题。

一切准备就绪,让我们向流发布消息。

发布消息

我们运行了一个应用程序,它注册了 3 个相同的消费者实例,并启用了单一活跃消费者功能。我们正在使用 流性能工具 每秒发布一条消息

cd /tmp
wget -O stream-perf-test.jar \
https://github.com/rabbitmq/rabbitmq-java-tools-binaries-dev/releases/download/v-stream-perf-test-latest/stream-perf-test-latest.jar
java -jar /tmp/stream-perf-test.jar --rate 1 -x 1 -y 0 --streams single-active-consumer

性能工具开始发布(它会发出关于流创建的警告,这是预期之中的)

Starting producer
1, published 0 msg/s, confirmed 0 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
2, published 1 msg/s, confirmed 1 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
3, published 1 msg/s, confirmed 1 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
4, published 1 msg/s, confirmed 1 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
5, published 1 msg/s, confirmed 1 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
...

第一个消费者实例从流中获取消息

Created stream single-active-consumer
Starting consumer 0
Starting consumer 1
Starting consumer 2
Consumer instance 0 received a message (1).
Consumer instance 0 received a message (2).
Consumer instance 0 received a message (3).
Consumer instance 0 received a message (4).
...

如果没有启用单一活跃消费者,每个消费者实例都会获取流中的消息,您会看到每条消息有 3 行输出。因此,我们的应用程序正如预期那样工作,只有一个消费者处于活跃状态。让我们看看能否了解更多关于消费者的信息……

检查消费者

list_stream_consumers CLI 命令列出了虚拟主机中所有流的消费者

docker exec rabbitmq rabbitmqctl list_stream_consumers \
connection_pid,stream,messages_consumed,active,activity_status

我们可以看到消费者及其各自的状态

Listing stream consumers ...
┌────────────────┬────────────────────────┬───────────────────┬────────┬─────────────────┐
│ connection_pid │ stream │ messages_consumed │ active │ activity_status │
├────────────────┼────────────────────────┼───────────────────┼────────┼─────────────────┤
│ <11771.870.0> │ single-active-consumer │ 259 │ true │ single_active │
├────────────────┼────────────────────────┼───────────────────┼────────┼─────────────────┤
│ <11771.882.0> │ single-active-consumer │ 0 │ false │ waiting │
├────────────────┼────────────────────────┼───────────────────┼────────┼─────────────────┤
│ <11771.894.0> │ single-active-consumer │ 0 │ false │ waiting │
└────────────────┴────────────────────────┴───────────────────┴────────┴─────────────────┘

想象一下,我们有很多流,每个流都有许多消费者组。我们需要一个简洁的视图。让我们使用 list_stream_consumer_groups 命令

docker exec rabbitmq rabbitmqctl list_stream_consumer_groups stream,reference,consumers

我们的 my-app 组显示出来了,并带有已注册的消费者数量

Listing stream consumer groups ...
┌────────────────────────┬───────────┬───────────┐
│ stream │ reference │ consumers │
├────────────────────────┼───────────┼───────────┤
│ single-active-consumer │ my-app │ 3 │
└────────────────────────┴───────────┴───────────┘

我们可以使用 list_stream_group_consumers 命令深入了解我们的组

docker exec rabbitmq rabbitmqctl list_stream_group_consumers --stream single-active-consumer --reference my-app

该组的消费者显示出来了

Listing group consumers ...
┌─────────────────┬─────────────────────────────────────┬──────────┐
│ subscription_id │ connection_name │ state │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58370 -> 172.17.0.2:5552 │ active │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58376 -> 172.17.0.2:5552 │ inactive │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58380 -> 172.17.0.2:5552 │ inactive │
└─────────────────┴─────────────────────────────────────┴──────────┘

太好了,我们有 CLI 命令来深入了解启用了单一活跃消费者标志的消费者。

查看管理界面

除了 CLI,我们还有更直观的图形化界面,即 管理界面中的流页面 (用户名/密码: guest/guest)

The stream page in the management UI lists the consumers with their status.
管理界面中的流页面列出了消费者及其状态。

消费者的 channel 列显示的并不是严格意义上的 AMQP 通道:我们的示例使用的是流协议,其中不存在通道概念。管理界面会自动适应并显示消费者的连接。如果我们点击活跃消费者通道/连接的链接,可以获得更多详细信息

The connection page shows details about the active consumer.
连接页面显示了有关活跃消费者的详细信息。

现在是时候体验单一活跃消费者语义了。

终止活跃消费者

让我们通过连接页面底部的 Force Close(强制关闭)按钮来关闭活跃消费者的连接。

我们从消费应用程序的控制台输出中看到,活跃消费者已经离开,下一个消费者接管了工作

...
Consumer instance 0 received a message (130).
Consumer instance 0 received a message (131).
Consumer instance 0 received a message (132).
Consumer instance 1 received a message (133). <---- Instance #1 got activated
Consumer instance 1 received a message (134).
Consumer instance 1 received a message (135).
...

故障转移如预期般工作:活跃消费者死亡,队列中的下一个被激活。在现实世界中,活跃消费者实例可能在 OS 升级后重新启动,或者其连接可能在网络故障后关闭。在这个示例中,我们只是故意关闭了连接。

让我们列出组中的消费者来确认我们的发现

docker exec rabbitmq rabbitmqctl list_stream_group_consumers --stream single-active-consumer --reference my-app

我们仍然有 3 个消费者,因为客户端库自动恢复了已关闭的连接

Listing group consumers ...
┌─────────────────┬─────────────────────────────────────┬──────────┐
│ subscription_id │ connection_name │ state │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58376 -> 172.17.0.2:5552 │ active │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58380 -> 172.17.0.2:5552 │ inactive │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58388 -> 172.17.0.2:5552 │ inactive │
└─────────────────┴─────────────────────────────────────┴──────────┘

来自 172.17.0.1:58376 -> 172.17.0.2:5552 连接的消费者现在是活跃的。在我们在关闭第一个活跃消费者之前,它是队列中的第二个,现在它被提升了。172.17.0.1:58388 -> 172.17.0.2:5552 连接是客户端库恢复的连接,它现在位于列表的最下方。

演示结束了,您现在可以使用 Ctrl-C 关闭正在运行的程序:stream-perf-test、消费者以及 Docker 容器代理。

总结

在本文中,我们介绍了流的单一活跃消费者,这是即将发布的 RabbitMQ 3.11 版本中的一项新功能。它允许将多个消费者实例附加到同一个流,并确保在同一时间只有一个实例处于活跃状态。如果活跃消费者实例宕机,下一个实例将自动接管,从而以最小的干扰持续进行处理。

欢迎运行演示源代码是公开的。您也可以自行尝试,流 Java 客户端对单一活跃消费者的支持已有相关文档,并且可用的持续构建 Docker 镜像 也可以获取。我们很高兴与 RabbitMQ 社区分享这一新功能,并且期待在 RabbitMQ 3.11 于今年晚些时候正式发布 (GA) 之前听到大家的反馈

我们将很快跟进另一篇关于 RabbitMQ 3.11 功能的博客文章:超级流 (Super Streams),这是我们扩展流的解决方案。敬请期待!

© . This site is unofficial and not affiliated with VMware.