跳至主内容

RabbitMQ 3.11 功能预览:Stream 单活动消费者

·10 分钟阅读

RabbitMQ 3.11 将为 Stream 带来一个值得关注的功能:单活动消费者。单活动消费者为 Stream 提供独占消费消费连续性。它对于充分利用我们用于分区的解决方案Super Streams 也至关重要,该解决方案为 Stream 提供可扩展性。

请继续阅读以了解有关 Stream 单活动消费者的更多信息,并且不要犹豫尝试已经可用的功能:试用,破坏它,告诉我们您喜欢和不喜欢什么,还缺少什么。您的反馈对于使此功能尽善尽美至关重要。

概述

那么,独占消费消费连续性,这意味着什么?想象一下,您想确保您的 Stream 处理不会停止,即使处理这些的应用程序实例会时有时无。您可以启动同一处理应用程序的多个实例,并在您的 Stream 消费者上启用单活动消费者标志。代理会确保一次只有一个实例接收消息:活动实例。

Only one instance receives messages in a group of consumers when single active consumer is enabled.
当启用了单活动消费者时,一组消费者中只有一个实例接收消息。

当活动消费者消失(关闭或崩溃)时,代理会自动回退到下一个实例,并开始向其分发消息。

The broker dispatches messages to the next consumer when the active one goes away.
当活动消费者消失时,代理会向下一个消费者分发消息。

因此,启用单活动消费者后,您无需担心有多个实例并行执行相同的处理,代理会负责所有事情。

请注意,给定 Stream 上可以有任意数量的单活动消费者组:应用程序必须为消费者提供一个名称,该名称用作消费者组的标识符。单活动消费者语义将应用于共享相同名称的消费者实例。

让我们通过一个演示来了解 Stream 的单活动消费者是如何工作的。

Stream 单活动消费者实战

我们将启动 3 个模拟同一应用程序实例的消费者。我们将看到只有一个消费者实例获得激活并接收消息,另外 2 个实例将保持空闲。我们将关闭活动消费者的连接,然后看到另一个实例接管。

为了简单起见,我们将启动同一个进程中的 3 个消费者。这是 Stream Java 客户端的消费者定义。

System.out.println("Starting consumer instance " + i);
Consumer consumer = environment.consumerBuilder()
.stream(stream)
.name(reference)
.singleActiveConsumer()
.autoTrackingStrategy()
.messageCountBeforeStorage(10)
.builder()
.messageHandler((context, message) -> {
System.out.printf(
"Consumer instance %d received a message (%d).%n",
i, sequence.incrementAndGet()
);
})
.build();

声明单活动消费者的代码与声明常规消费者的代码几乎相同:您必须提供一个名称并使用 ConsumerBuilder#singleActiveConsumer()isequence 变量只是为了帮助理解发生了什么。

如果您想在阅读时运行代码,可以继续下一节。请注意,您可以不运行任何内容而继续阅读帖子其余部分,因此如果您不想尝试代码,可以跳过下一节。

设置示例项目

运行示例需要安装 Docker、Git 和 Java 11 或更高版本。如果 Docker 镜像已在本地存在,请将其删除,以确保稍后拉取最新镜像。

docker rmi pivotalrabbitmq/rabbitmq-stream

然后启动代理

docker run -it --rm --name rabbitmq -p 5552:5552 -p 5672:5672 -p 15672:15672 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
pivotalrabbitmq/rabbitmq-stream

容器开始监听多个端口:Stream 插件的 5552、AMQP 的 5672、HTTP 管理插件的 15672。

示例项目的代码托管在 GitHub 上。将项目克隆到本地,然后启动模拟 3 个消费者实例的应用程序。

cd /tmp
git clone https://github.com/acogoluegnes/rabbitmq-stream-single-active-consumer.git
cd rabbitmq-stream-single-active-consumer

设置完成,让我们启动我们的单活动消费者。

启动消费者

使用以下命令启动消费者实例

./mvnw -q compile exec:java -Dexec.mainClass=com.rabbitmq.stream.SingleActiveConsumer

应用程序确认已注册 3 个消费者

Created stream single-active-consumer
Starting consumer instance 0
Starting consumer instance 1
Starting consumer instance 2

在真实的系统中,这些消费者将在不同的主机上运行。但为了我们的演示,将它们运行在同一个进程中更简单,并且同样能说明问题。

一切就绪,让我们发布到 Stream。

发布消息

我们有一个正在运行的应用程序,它注册了 3 个相同的消费者实例,并启用了单活动消费者。我们使用Stream 性能工具每秒发布一条消息。

cd /tmp
wget -O stream-perf-test.jar \
https://github.com/rabbitmq/rabbitmq-java-tools-binaries-dev/releases/download/v-stream-perf-test-latest/stream-perf-test-latest.jar
java -jar /tmp/stream-perf-test.jar --rate 1 -x 1 -y 0 --streams single-active-consumer

性能工具开始发布(它会发出关于 Stream 创建的警告,这是预期的)

Starting producer
1, published 0 msg/s, confirmed 0 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
2, published 1 msg/s, confirmed 1 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
3, published 1 msg/s, confirmed 1 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
4, published 1 msg/s, confirmed 1 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
5, published 1 msg/s, confirmed 1 msg/s, consumed 0 msg/s, latency min/median/75th/95th/99th 0/0/0/0/0 ms, chunk size 0
...

第一个消费者实例从 Stream 获取消息

Created stream single-active-consumer
Starting consumer 0
Starting consumer 1
Starting consumer 2
Consumer instance 0 received a message (1).
Consumer instance 0 received a message (2).
Consumer instance 0 received a message (3).
Consumer instance 0 received a message (4).
...

如果没有启用单活动消费者,每个消费者实例都会获取 Stream 中的消息,您会看到每条消息有 3 行。所以我们的应用程序工作正常,只有一个消费者是活动的。让我们看看是否能了解更多关于我们的消费者...

检查消费者

list_stream_consumers CLI 命令列出了虚拟主机中所有 Stream 的消费者。

docker exec rabbitmq rabbitmqctl list_stream_consumers \
connection_pid,stream,messages_consumed,active,activity_status

我们看到我们的消费者及其各自的状态。

Listing stream consumers ...
┌────────────────┬────────────────────────┬───────────────────┬────────┬─────────────────┐
│ connection_pid │ stream │ messages_consumed │ active │ activity_status │
├────────────────┼────────────────────────┼───────────────────┼────────┼─────────────────┤
│ <11771.870.0> │ single-active-consumer │ 259 │ true │ single_active │
├────────────────┼────────────────────────┼───────────────────┼────────┼─────────────────┤
│ <11771.882.0> │ single-active-consumer │ 0 │ false │ waiting │
├────────────────┼────────────────────────┼───────────────────┼────────┼─────────────────┤
│ <11771.894.0> │ single-active-consumer │ 0 │ false │ waiting │
└────────────────┴────────────────────────┴───────────────────┴────────┴─────────────────┘

假设我们有很多 Stream,每个 Stream 都有很多消费者组。一个简洁的视图将很受欢迎。让我们使用 list_stream_consumer_groups 命令。

docker exec rabbitmq rabbitmqctl list_stream_consumer_groups stream,reference,consumers

我们的 my-app 组显示出来,并列出了注册的消费者数量。

Listing stream consumer groups ...
┌────────────────────────┬───────────┬───────────┐
│ stream │ reference │ consumers │
├────────────────────────┼───────────┼───────────┤
│ single-active-consumer │ my-app │ 3 │
└────────────────────────┴───────────┴───────────┘

我们可以使用 list_stream_group_consumers 命令深入了解我们的组。

docker exec rabbitmq rabbitmqctl list_stream_group_consumers --stream single-active-consumer --reference my-app

然后显示组的消费者。

Listing group consumers ...
┌─────────────────┬─────────────────────────────────────┬──────────┐
│ subscription_id │ connection_name │ state │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58370 -> 172.17.0.2:5552 │ active │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58376 -> 172.17.0.2:5552 │ inactive │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58380 -> 172.17.0.2:5552 │ inactive │
└─────────────────┴─────────────────────────────────────┴──────────┘

很好,我们有 CLI 命令可以了解启用单活动消费者标志的消费者。

查看管理 UI

我们还有比 CLI 更图形化的东西,即管理 UI 中的 Stream 页面(用户名/密码:guest/guest)。

The stream page in the management UI lists the consumers with their status.
Stream 页面在管理 UI 中列出了消费者及其状态。

channel 列并不完全显示 AMQP 通道:我们的示例使用了 Stream 协议,其中没有通道的概念。管理 UI 会进行适应,并显示消费者的连接。如果我们点击活动消费者通道/连接的链接,我们可以获得更多详细信息。

The connection page shows details about the active consumer.
连接页面显示了活动消费者的详细信息。

是时候尝试单活动消费者语义了。

终止活动消费者

让我们使用连接页面底部的 Force Close 按钮关闭活动消费者的连接。

从我们的消费应用程序控制台输出中可以看到,活动消费者已消失,下一个消费者已接管。

...
Consumer instance 0 received a message (130).
Consumer instance 0 received a message (131).
Consumer instance 0 received a message (132).
Consumer instance 1 received a message (133). <---- Instance #1 got activated
Consumer instance 1 received a message (134).
Consumer instance 1 received a message (135).
...

故障转移按预期工作:活动消费者崩溃,下一个排队中的消费者被激活。在现实世界中,活动消费者实例可能在操作系统升级后重启,或者其连接可能在网络故障后关闭。在此示例中,我们只是故意关闭了连接。

让我们列出组的消费者来确认我们的发现。

docker exec rabbitmq rabbitmqctl list_stream_group_consumers --stream single-active-consumer --reference my-app

我们仍然有 3 个消费者,因为客户端库自动恢复了关闭的连接。

Listing group consumers ...
┌─────────────────┬─────────────────────────────────────┬──────────┐
│ subscription_id │ connection_name │ state │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58376 -> 172.17.0.2:5552 │ active │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58380 -> 172.17.0.2:5552 │ inactive │
├─────────────────┼─────────────────────────────────────┼──────────┤
│ 0 │ 172.17.0.1:58388 -> 172.17.0.2:5552 │ inactive │
└─────────────────┴─────────────────────────────────────┴──────────┘

来自 172.17.0.1:58376 -> 172.17.0.2:5552 连接的消费者现在是活动的。在我们关闭第一个活动消费者之前,它是排在第二位的,现在它被提升了。172.17.0.1:58388 -> 172.17.0.2:5552 连接是客户端库恢复的连接,现在它位于列表的底部。

演示结束,您现在可以使用 Ctrl-C 关闭正在运行的程序:stream-perf-test、消费者和代理 Docker 容器。

总结

在这篇博文中,我们介绍了即将发布的 RabbitMQ 3.11 版本中的一项新功能:Stream 单活动消费者。它允许将多个消费者实例附加到一个 Stream,并一次只让一个实例处于活动状态。如果活动消费者实例发生故障,下一个实例将自动接管,从而使处理尽可能中断地进行。

不要犹豫运行演示源代码是可用的。您也可以自己进行实验,Stream Java 客户端中对单活动消费者的支持有文档记录,并且持续构建的Docker 镜像也是可用的。我们很高兴与 RabbitMQ 社区分享这项新功能,我们迫不及待地想在 RabbitMQ 3.11 今年晚些时候发布 GA 之前听到一些反馈

我们很快将发布另一篇关于 RabbitMQ 3.11 另一项功能的博文:Super Streams,这是我们用于扩展 Stream 规模的解决方案。敬请关注!

© . This site is unofficial and not affiliated with VMware.