跳至主要内容

RabbitMQ 3.11 功能预览:流的单个活动消费者

·阅读时间:10 分钟

RabbitMQ 3.11 将为流带来一项值得注意的功能:单个活动消费者。单个活动消费者为流提供了独占消费消费连续性。它对于充分利用 超级流(我们用于分区,以提供流的可扩展性的解决方案)也至关重要。

继续阅读以了解更多关于流的单个活动消费者的信息,并不要犹豫,体验一下现有的功能:试试,打破它,告诉我们你喜欢什么,不喜欢什么,缺少什么。您的反馈对于使此功能发挥最佳效果至关重要。

概述

那么,独占消费消费连续性,这意味着什么呢?想象一下,您希望确保您的流处理不会停止,即使处理此处理的应用程序实例来来往往。您启动了同一处理应用程序的多个实例,并在您的流消费者上启用了单个活动消费者标志。代理将确保一次只有一个实例获取消息:活动实例。

Only one instance receives messages in a group of consumers when single active consumer is enabled.
启用单个活动消费者后,消费者组中只有一个实例会收到消息。

当活动消费者消失(关闭或崩溃)时,代理将自动回退到下一个实例并开始向其分发消息。

The broker dispatches messages to the next consumer when the active one goes away.
当活动消费者消失时,代理会将消息分发给下一个消费者。

因此,在启用单个活动消费者的情况下,您不必担心有多个实例并行执行相同的处理,代理会处理一切。

请注意,在一个给定的流上可以有任意多个单个活动消费者组:应用程序必须为消费者提供一个名称,该名称充当消费者组的标识符。单个活动消费者语义将应用于共享相同名称的消费者实例。

让我们通过一个演示来了解流的单个活动消费者如何工作。

流的单个活动消费者在实践中

我们将启动 3 个消费者,模拟相同应用程序的实例。我们将看到,只有一个消费者实例被激活并接收消息,另外 2 个实例将保持空闲状态。我们将关闭活动消费者的连接,并观察另一个实例接管。

为了简单起见,我们将在这同一个进程中启动 3 个消费者。以下是使用流 Java 客户端定义消费者的代码

System.out.println("Starting consumer instance " + i);
Consumer consumer = environment.consumerBuilder()
.stream(stream)
.name(reference)
.singleActiveConsumer()
.autoTrackingStrategy()
.messageCountBeforeStorage(10)
.builder()
.messageHandler((context, message) -> {
System.out.printf(
"Consumer instance %d received a message (%d).%n",
i, sequence.incrementAndGet()
);
})
.build();

声明单个活动消费者的代码与常规消费者的代码几乎相同:您必须提供一个名称并使用 ConsumerBuilder#singleActiveConsumer()isequence 变量只是为了帮助您了解发生了什么。

如果您想在阅读时运行代码,您可以继续下一节。请注意,您可以按照本文的其余部分进行操作,而无需运行任何内容,因此如果您不想尝试代码,可以跳过下一节。

设置示例项目

运行示例需要安装 Docker、Git 和 Java 11 或更高版本。如果本地已经存在 Docker 镜像,请将其删除,以确保稍后拉取最新镜像

docker rmi pivotalrabbitmq/rabbitmq-stream

然后启动代理

docker run -it --rm --name rabbitmq -p 5552:5552 -p 5672:5672 -p 15672:15672 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
pivotalrabbitmq/rabbitmq-stream

容器开始在多个端口上监听:5552 用于流插件,5672 用于 AMQP,15672 用于 HTTP 管理插件。

示例项目代码托管在 GitHub 上。在本地克隆项目并启动模拟 3 个消费者实例的应用程序

cd /tmp
git clone https://github.com/acogoluegnes/rabbitmq-stream-single-active-consumer.git
cd rabbitmq-stream-single-active-consumer

设置已完成,让我们启动我们的单个活动消费者。

启动消费者

使用以下命令启动消费者实例

./mvnw -q compile exec:java -Dexec.mainClass=com.rabbitmq.stream.SingleActiveConsumer

应用程序确认它已注册 3 个消费者

Created stream single-active-consumer
Starting consumer instance 0
Starting consumer instance 1
Starting consumer instance 2

这些消费者在真实的系统中将在不同的主机上运行。但是,在同一个进程中运行它们对于我们的演示来说更简单,并且也证明了我们的观点。

一切都已准备就绪,让我们发布到流中。

发布消息

我们有一个正在运行的应用程序,它注册了 3 个相同的消费者实例,并启用了单个活动消费者。我们正在使用 流性能工具 每秒发布一条消息

cd /tmp
wget -O stream-perf-test.jar \
https://github.com/rabbitmq/rabbitmq-java-tools-binaries-dev/releases/download/v-stream-perf-test-latest/stream-perf-test-latest.jar
java -jar /tmp/stream-perf-test.jar --rate 1 -x 1 -y 0 --streams single-active-consumer

性能工具开始发布(它会发出有关流创建的警告,这是预期的)

Starting producer
1, published 0 msg/s, confirmed 0 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
2, published 1 msg/s, confirmed 1 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
3, published 1 msg/s, confirmed 1 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
4, published 1 msg/s, confirmed 1 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
5, published 1 msg/s, confirmed 1 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
...

第一个消费者实例从流中获取消息

Created stream single-active-consumer
Starting consumer 0
Starting consumer 1
Starting consumer 2
Consumer instance 0 received a message (1).
Consumer instance 0 received a message (2).
Consumer instance 0 received a message (3).
Consumer instance 0 received a message (4).
...

如果没有启用单个活动消费者,每个消费者实例都会从流中获取消息,您会看到每条消息 3 行。因此,我们的应用程序按预期工作,只有一个消费者处于活动状态。让我们看看是否可以了解更多关于消费者的信息…

检查消费者

list_stream_consumers CLI 命令列出虚拟主机中所有流的消费者

docker exec rabbitmq rabbitmqctl list_stream_consumers \
connection_pid,stream,messages_consumed,active,activity_status

我们看到了消费者及其各自的状态

Listing stream consumers ...
┌────────────────┬────────────────────────┬───────────────────┬────────┬─────────────────┐
│ connection_pid │ stream │ messages_consumed │ active │ activity_status │
├────────────────┼────────────────────────┼───────────────────┼────────┼─────────────────┤
│ <11771.870.0> │ single-active-consumer │ 259 │ true │ single_active │
├────────────────┼────────────────────────┼───────────────────┼────────┼─────────────────┤
│ <11771.882.0> │ single-active-consumer │ 0 │ false │ waiting │
├────────────────┼────────────────────────┼───────────────────┼────────┼─────────────────┤
│ <11771.894.0> │ single-active-consumer │ 0 │ false │ waiting │
└────────────────┴────────────────────────┴───────────────────┴────────┴─────────────────┘

假设我们有很多流,每个流都有很多消费者组。希望有一个简洁的视图。让我们使用 list_stream_consumer_groups 命令

docker exec rabbitmq rabbitmqctl list_stream_consumer_groups stream,reference,consumers

我们的 my-app 组显示出来,并显示已注册的消费者数量

Listing stream consumer groups ...
┌────────────────────────┬───────────┬───────────┐
│ stream │ reference │ consumers │
├────────────────────────┼───────────┼───────────┤
│ single-active-consumer │ my-app │ 3 │
└────────────────────────┴───────────┴───────────┘

我们可以使用 list_stream_group_consumers 命令深入了解我们的组

docker exec rabbitmq rabbitmqctl list_stream_group_consumers --stream single-active-consumer --reference my-app

并显示该组的消费者

Listing group consumers ...
┌─────────────────┬─────────────────────────────────────┬──────────┐
│ subscription_id │ connection_name │ state │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58370 -> 172.17.0.2:5552 │ active │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58376 -> 172.17.0.2:5552 │ inactive │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58380 -> 172.17.0.2:5552 │ inactive │
└─────────────────┴─────────────────────────────────────┴──────────┘

不错,我们有 CLI 命令可以了解启用了单个活动消费者标志的消费者。

查看管理界面

我们还有比 CLI 更图形化的东西,即管理界面的 流页面(用户名/密码:guest/guest

The stream page in the management UI lists the consumers with their status.
管理界面的流页面列出了消费者及其状态。

消费者的 channel 列并没有完全显示 AMQP 通道:我们的示例使用流协议,其中不存在通道的概念。管理界面进行了调整,显示了消费者的连接。如果我们单击指向活动消费者通道/连接的链接,我们可以获得更多详细信息

The connection page shows details about the active consumer.
连接页面显示了有关活动消费者的详细信息。

现在是时候体验一下单个活动消费者语义了。

终止活动消费者

让我们使用连接页面底部的 强制关闭 按钮关闭活动消费者的连接。

我们从我们的消费应用程序控制台输出中看到,活动消费者消失了,下一个消费者接管了

...
Consumer instance 0 received a message (130).
Consumer instance 0 received a message (131).
Consumer instance 0 received a message (132).
Consumer instance 1 received a message (133). <---- Instance #1 got activated
Consumer instance 1 received a message (134).
Consumer instance 1 received a message (135).
...

故障转移按预期工作:活动消费者死了,下一个消费者被激活。在现实世界中,活动消费者实例可能在操作系统升级后重新启动,或者其连接可能在网络故障后关闭。在本示例中,我们只是故意关闭了连接。

让我们列出该组的消费者以确认我们的发现

docker exec rabbitmq rabbitmqctl list_stream_group_consumers --stream single-active-consumer --reference my-app

我们仍然有 3 个消费者,因为客户端库自动恢复了关闭的连接

Listing group consumers ...
┌─────────────────┬─────────────────────────────────────┬──────────┐
│ subscription_id │ connection_name │ state │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58376 -> 172.17.0.2:5552 │ active │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58380 -> 172.17.0.2:5552 │ inactive │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58388 -> 172.17.0.2:5552 │ inactive │
└─────────────────┴─────────────────────────────────────┴──────────┘

来自 172.17.0.1:58376 -> 172.17.0.2:5552 连接的消费者现在是活动消费者。在我们关闭第一个活动消费者之前,它是第二个消费者,现在被提升了。172.17.0.1:58388 -> 172.17.0.2:5552 连接是客户端库恢复的连接,现在位于列表底部。

演示结束,您现在可以使用 Ctrl-C 关闭正在运行的程序:stream-perf-test、消费者和代理 Docker 容器。

总结

我们在本文中介绍了流的单个活动消费者,这是即将发布的 RabbitMQ 3.11 版本中的一项新功能。它允许将多个消费者实例附加到流中,并一次只使一个实例处于活动状态。如果活动消费者实例停止运行,下一个实例会自动接管,从而尽可能减少处理中断。

不要犹豫,运行演示源代码可用。您也可以自行尝试,流 Java 客户端中对单个活动消费者的支持已 记录,并且可以 使用持续构建的 Docker 镜像。我们很高兴与 RabbitMQ 社区分享这项新功能,并迫不及待地想在今年晚些时候 RabbitMQ 3.11 正式发布之前听到您的反馈

我们将在不久后发布一篇关于另一项 RabbitMQ 3.11 功能的博客文章:超级流,这是我们用于扩展流的解决方案。敬请关注!

© 2024 RabbitMQ. All rights reserved.