跳到主要内容

RabbitMQ 3.11 功能预览: Streams 的单活动消费者

·10 分钟阅读

RabbitMQ 3.11 将为 streams 带来一个值得关注的功能:单活动消费者。单活动消费者在 stream 上提供独占消费消费连续性。对于充分利用 超级流(super streams)(我们的分区解决方案,为 streams 提供可扩展性)而言,它也至关重要。

请继续阅读以了解有关 streams 的单活动消费者的更多信息,并随时试用已提供的功能:尝试一下,破坏它,告诉我们您喜欢和不喜欢什么,缺少什么。您的反馈对于使此功能尽可能完善至关重要。

概述

那么,独占消费消费连续性,这意味着什么? 假设您要确保您的 stream 处理不会停止,即使处理此操作的应用程序实例来来去去。 您启动同一处理应用程序的多个实例,并在您的 stream 消费者上启用单活动消费者标志。 Broker 将确保一次只有一个实例获取消息:活动的实例。

Only one instance receives messages in a group of consumers when single active consumer is enabled.
启用单活动消费者后,在一组消费者中只有一个实例接收消息。

当活动消费者消失(关闭或崩溃)时,broker 将自动回退到队列中的下一个实例并开始向其分发消息。

The broker dispatches messages to the next consumer when the active one goes away.
当活动消费者消失时,broker 将消息分发给下一个消费者。

因此,启用单活动消费者后,您不必担心多个实例并行执行相同的处理,broker 会处理一切。

请注意,在给定的 stream 上可以有任意数量的单活动消费者组:应用程序必须为消费者提供名称,并且此名称充当消费者组的标识符。 单活动消费者语义将应用于共享相同名称的消费者实例。

让我们来看一个演示,了解 streams 的单活动消费者是如何工作的。

Stream 中的单活动消费者操作

我们将启动 3 个模拟同一应用程序实例的消费者。 我们将看到只有一个消费者实例被激活并接收消息,另外 2 个实例将保持空闲。 我们将关闭活动消费者的连接,并看到另一个实例接管。

为了简单起见,我们将在同一进程中启动 3 个消费者。 这是使用 stream Java 客户端的消费者的定义

System.out.println("Starting consumer instance " + i);
Consumer consumer = environment.consumerBuilder()
.stream(stream)
.name(reference)
.singleActiveConsumer()
.autoTrackingStrategy()
.messageCountBeforeStorage(10)
.builder()
.messageHandler((context, message) -> {
System.out.printf(
"Consumer instance %d received a message (%d).%n",
i, sequence.incrementAndGet()
);
})
.build();

声明单活动消费者的代码与常规消费者的代码几乎相同:您必须提供一个名称并使用 ConsumerBuilder#singleActiveConsumer()isequence 变量仅用于帮助理解正在发生的事情。

如果您想在阅读时运行代码,可以继续下一节。 请注意,您可以按照帖子的其余部分进行操作,而无需运行任何内容,因此如果您不想试用代码,则可以跳过下一节。

设置示例项目

运行示例需要安装 Docker、Git 和 Java 11 或更高版本。 如果 Docker 镜像已在本地存在,请删除它,以确保稍后拉取最新的镜像

docker rmi pivotalrabbitmq/rabbitmq-stream

然后启动 broker

docker run -it --rm --name rabbitmq -p 5552:5552 -p 5672:5672 -p 15672:15672 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
pivotalrabbitmq/rabbitmq-stream

容器开始监听多个端口:stream 插件的 5552 端口,AMQP 的 5672 端口,HTTP 管理插件的 15672 端口。

示例项目代码 托管在 GitHub 上。 在本地克隆项目并启动模拟 3 个消费者实例的应用程序

cd /tmp
git clone https://github.com/acogoluegnes/rabbitmq-stream-single-active-consumer.git
cd rabbitmq-stream-single-active-consumer

设置完成,让我们启动单活动消费者。

启动消费者

使用以下命令启动消费者实例

./mvnw -q compile exec:java -Dexec.mainClass=com.rabbitmq.stream.SingleActiveConsumer

应用程序确认它注册了 3 个消费者

Created stream single-active-consumer
Starting consumer instance 0
Starting consumer instance 1
Starting consumer instance 2

这些消费者将在真实系统的不同主机上运行。 但是在同一进程中运行它们对于我们的演示来说更简单,并且也能说明我们的观点。

我们都准备好了,让我们发布到 stream。

发布消息

我们有一个应用程序正在运行,它注册了 3 个相同的消费者实例,并启用了单活动消费者。 我们正在使用 stream 性能工具 每秒发布一条消息

cd /tmp
wget -O stream-perf-test.jar \
https://github.com/rabbitmq/rabbitmq-java-tools-binaries-dev/releases/download/v-stream-perf-test-latest/stream-perf-test-latest.jar
java -jar /tmp/stream-perf-test.jar --rate 1 -x 1 -y 0 --streams single-active-consumer

性能工具开始发布(它发出关于 stream 创建的警告,这是预期的)

Starting producer
1, published 0 msg/s, confirmed 0 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
2, published 1 msg/s, confirmed 1 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
3, published 1 msg/s, confirmed 1 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
4, published 1 msg/s, confirmed 1 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
5, published 1 msg/s, confirmed 1 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
...

第一个消费者实例从 stream 获取消息

Created stream single-active-consumer
Starting consumer 0
Starting consumer 1
Starting consumer 2
Consumer instance 0 received a message (1).
Consumer instance 0 received a message (2).
Consumer instance 0 received a message (3).
Consumer instance 0 received a message (4).
...

如果不启用单活动消费者,每个消费者实例都会获取 stream 中的消息,并且您会看到每条消息有 3 行。 因此我们的应用程序按预期工作,只有一个消费者处于活动状态。 让我们看看是否可以了解有关我们消费者的更多信息...

检查消费者

list_stream_consumers CLI 命令列出虚拟主机中所有 stream 的消费者

docker exec rabbitmq rabbitmqctl list_stream_consumers \
connection_pid,stream,messages_consumed,active,activity_status

我们看到我们的消费者及其各自的状态

Listing stream consumers ...
┌────────────────┬────────────────────────┬───────────────────┬────────┬─────────────────┐
│ connection_pid │ stream │ messages_consumed │ active │ activity_status │
├────────────────┼────────────────────────┼───────────────────┼────────┼─────────────────┤
│ <11771.870.0> │ single-active-consumer │ 259 │ true │ single_active │
├────────────────┼────────────────────────┼───────────────────┼────────┼─────────────────┤
│ <11771.882.0> │ single-active-consumer │ 0 │ false │ waiting │
├────────────────┼────────────────────────┼───────────────────┼────────┼─────────────────┤
│ <11771.894.0> │ single-active-consumer │ 0 │ false │ waiting │
└────────────────┴────────────────────────┴───────────────────┴────────┴─────────────────┘

假设我们有很多 stream,每个 stream 都有许多消费者组。 如果能有一个简洁的视图,那就太好了。 让我们使用 list_stream_consumer_groups 命令

docker exec rabbitmq rabbitmqctl list_stream_consumer_groups stream,reference,consumers

我们的 my-app 组显示出来了,其中注册了消费者数量

Listing stream consumer groups ...
┌────────────────────────┬───────────┬───────────┐
│ stream │ reference │ consumers │
├────────────────────────┼───────────┼───────────┤
│ single-active-consumer │ my-app │ 3 │
└────────────────────────┴───────────┴───────────┘

我们可以使用 list_stream_group_consumers 命令深入了解我们的组

docker exec rabbitmq rabbitmqctl list_stream_group_consumers --stream single-active-consumer --reference my-app

并且该组的消费者显示出来

Listing group consumers ...
┌─────────────────┬─────────────────────────────────────┬──────────┐
│ subscription_id │ connection_name │ state │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58370 -> 172.17.0.2:5552 │ active │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58376 -> 172.17.0.2:5552 │ inactive │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58380 -> 172.17.0.2:5552 │ inactive │
└─────────────────┴─────────────────────────────────────┴──────────┘

太好了,我们有 CLI 命令来获取有关启用单活动消费者标志的消费者的一些见解。

查看管理 UI

我们还有比 CLI 更图形化的东西,即管理 UI 中 stream 的页面(用户名/密码:guest/guest

The stream page in the management UI lists the consumers with their status.
管理 UI 中的 stream 页面列出了消费者及其状态。

消费者 channel 列并不完全显示 AMQP 通道:我们的示例使用 stream 协议,其中不存在通道概念。 管理 UI 进行了调整并显示了消费者的连接。 如果我们单击活动消费者的通道/连接的链接,我们可以获得更多详细信息

The connection page shows details about the active consumer.
连接页面显示有关活动消费者的详细信息。

现在是时候体验单活动消费者语义了。

终止活动消费者

让我们使用连接页面底部的 强制关闭 按钮关闭活动消费者的连接。

我们从消费应用程序控制台输出中看到,活动消费者已消失,下一个消费者已接管

...
Consumer instance 0 received a message (130).
Consumer instance 0 received a message (131).
Consumer instance 0 received a message (132).
Consumer instance 1 received a message (133). <---- Instance #1 got activated
Consumer instance 1 received a message (134).
Consumer instance 1 received a message (135).
...

故障转移按预期工作:活动消费者死亡,队列中的下一个消费者被激活。 在现实世界中,活动消费者实例可能在操作系统升级后重新启动,或者其连接可能在网络故障后关闭。 在此示例中,我们只是有目的地关闭了连接。

让我们列出该组的消费者以确认我们的发现

docker exec rabbitmq rabbitmqctl list_stream_group_consumers --stream single-active-consumer --reference my-app

我们仍然有 3 个消费者,因为客户端库自动恢复了关闭的连接

Listing group consumers ...
┌─────────────────┬─────────────────────────────────────┬──────────┐
│ subscription_id │ connection_name │ state │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58376 -> 172.17.0.2:5552 │ active │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58380 -> 172.17.0.2:5552 │ inactive │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58388 -> 172.17.0.2:5552 │ inactive │
└─────────────────┴─────────────────────────────────────┴──────────┘

来自 172.17.0.1:58376 -> 172.17.0.2:5552 连接的消费者现在是活动的。 在我们关闭第一个活动消费者之前,它是队列中的第二个,并且它得到了提升。 172.17.0.1:58388 -> 172.17.0.2:5552 连接是客户端库恢复的连接,现在位于列表的底部。

演示结束了,您现在可以使用 Ctrl-C 关闭正在运行的程序:stream-perf-test、消费者和 broker Docker 容器。

总结

在这篇博文中,我们介绍了 streams 的单活动消费者,这是即将发布的 RabbitMQ 3.11 版本中的一项新功能。 它允许将多个消费者实例附加到一个 stream,并且一次只有一个实例处于活动状态。 如果活动消费者实例关闭,则下一个实例将自动接管,这样处理就可以在尽可能少的中断下继续进行。

请随时运行演示源代码可用。 您也可以自行尝试,stream Java 客户端中对单活动消费者的支持已记录在案,并且持续构建的 Docker 镜像可用。 我们很高兴与 RabbitMQ 社区分享这项新功能,我们迫不及待地想在 RabbitMQ 3.11 今年晚些时候 GA 之前听到一些反馈

我们稍后将发布另一篇关于 RabbitMQ 3.11 功能的博文:超级流(super streams),我们扩展 stream 的解决方案。 敬请关注!

© . All rights reserved.