第一个使用 RabbitMQ Streams 的应用程序
RabbitMQ Streams 概述 介绍了 Streams,这是 RabbitMQ 3.9 中的一项新功能。本文将继续介绍如何使用 Java 客户端处理 Streams。我们将编写我们的第一个应用程序,该应用程序将消息发布到流,然后进行消耗。
启用 Streams 的 RabbitMQ 入门
让我们启动一个启用了 Streams 的 RabbitMQ Docker 容器
docker run -it --rm --name rabbitmq -p 5552:5552 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.9
Streams 作为 RabbitMQ 3.9 的核心插件发布,因此我们需要确保此插件已启用。打开一个新的终端标签页并执行以下命令
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
下一步是*连接*一个客户端应用程序到 Streams 插件。
连接到 RabbitMQ Streams
我们将使用 stream Java 客户端 与 streams 进行交互。客户端文档涵盖了如何在 Maven 项目 和 Gradle 项目 中声明适当的依赖项,因此我们可以专注于本文中的代码。
Streams Java 客户端的入口点是 Environment。它负责 streams 的管理以及 publisher 和 consumer 实例的创建。下面是如何创建一个 Environment 实例
try (Environment environment = Environment.builder()
.uri("rabbitmq-stream://:5552").build()) {
// ...
}
我们已经有了 Environment,现在来创建一个 stream。
创建 Stream
Environment 提供了一个创建 streams 的 API,我们将使用它来创建一个 first-application-stream stream,并使用所有默认设置
environment.streamCreator().stream("first-application-stream").create();
Stream 已经创建好了,是时候向其中发布消息了。
发布到 Stream
我们需要创建一个 Producer 实例来发布消息到 stream。我们再次使用 Environment 来创建这个对象
Producer producer = environment
.producerBuilder()
.stream("first-application-stream") // stream to publish to
.build();
我们将循环发布一些消息,让我们一步一步来构建发布循环的骨架
int messageCount = 1_000_000;
CountDownLatch confirmLatch = new CountDownLatch(messageCount);
IntStream.range(0, messageCount).forEach(i -> {
// send one message
});
boolean done = confirmLatch.await(1, TimeUnit.MINUTES);
请注意 CountDownLatch 的使用,以确保我们只在收到所有发布确认后才继续前进,更多内容稍后介绍。
现在我们来关注消息的创建。RabbitMQ Streams 使用 AMQP 1.0 消息格式,因为它是一种灵活且强大的格式,具有高级类型系统。使用 AMQP 1.0 消息格式可以实现互操作性,使 streams 与 RabbitMQ 支持的其他协议(AMQP 0.9.1 和 1.0、MQTT、STOMP)兼容。
Streams Java 客户端提供了一个消息构建器接口来创建消息,我们使用它来创建一个带有几个属性和一个二进制 payload 的消息
Message message = producer.messageBuilder()
.properties()
.creationTime(System.currentTimeMillis())
.messageId(i)
.messageBuilder()
.addData("hello world".getBytes(StandardCharsets.UTF_8))
.build();
好的,我们已经有了消息实例,下一步是发布它。但让我们快速回顾一下 AMQP 1.0 消息格式。我们想强调的是,RabbitMQ Streams 只使用 AMQP 1.0*消息格式*,而不是 AMQP 1.0*协议*。RabbitMQ Streams 有自己的二进制协议,该协议恰好在某些帧中传输以 AMQP 1.0 格式编码的消息。消息编码实际上是客户端的责任:RabbitMQ Streams 与消息格式无关。消息只是字节数组,例如 [100, 76, 240, ...]。然而,AMQP 1.0 消息格式使得 streams 与其他协议(如 AMQP 0.9.1、MQTT 等)高度可操作,而 streams 默认支持这些协议。
现在是时候发送我们的消息了,我们只需将其传递给 producer
producer.send(message, confirmationStatus -> confirmLatch.countDown());
注意 send 方法的第二个参数:这是当此消息的发布确认异步到达时的回调。这就是你可以确保消息不丢失的方式。在这里,我们只是递减 CountDownLatch 的计数。
以上所有内容用代码表示如下
int messageCount = 1_000_000;
CountDownLatch confirmLatch = new CountDownLatch(messageCount);
IntStream.range(0, messageCount).forEach(i -> {
Message message = producer.messageBuilder()
.properties()
.creationTime(System.currentTimeMillis())
.messageId(i)
.messageBuilder()
.addData("hello world".getBytes(StandardCharsets.UTF_8))
.build();
producer.send(message, confirmationStatus -> confirmLatch.countDown());
});
boolean done = confirmLatch.await(1, TimeUnit.MINUTES);
运行 Publisher
你可以在本地运行 publisher 示例,代码托管在 GitHub 上。你只需要安装 JDK 8 或更高版本,以及一个运行中的 RabbitMQ 3.9 实例,并启用 rabbit_stream 插件,如上所述。
git clone https://github.com/acogoluegnes/rabbitmq-streams-blog-posts.git
cd rabbitmq-streams-blog-posts
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.FirstApplication$Publish'
你应该会看到类似以下的输出,确认消息已被 broker 处理
Connecting...
Connected
Creating stream...
Stream created
Creating producer...
Producer created
Sending 1,000,000 messages
Messages sent, waiting for confirmation...
All messages confirmed? yes (1440 ms)
Closing environment...
Environment closed
rabbitmq-streams stream_status CLI 命令确认消息已成功进入 broker
docker exec rabbitmq rabbitmq-streams stream_status first-application-stream
你应该看到以下输出
Status of stream first-application-stream on node rabbit@ba9dbabe12b8 ...
┌────────┬─────────────────────┬────────┬──────────────────┬──────────────┬─────────┬──────────┐
│ role │ node │ offset │ committed_offset │ first_offset │ readers │ segments │
├────────┼─────────────────────┼────────┼──────────────────┼──────────────┼─────────┼──────────┤
│ writer │ rabbit@ba9dbabe12b8 │ 999999 │ 999938 │ 0 │ 0 │ 1 │
└────────┴─────────────────────┴────────┴──────────────────┴──────────────┴─────────┴──────────┘
我想强调一下 offset 列,它告诉我们 stream 中最后一条消息的索引,在上面的示例中是 999,999。这证实了 stream 包含一百万条消息(偏移量从 0 开始)。
消费消息
消费代码非常简单。我们需要从 Environment 创建一个 Consumer 实例。这需要设置几个参数:要从中消费的 stream,开始消费的偏移量——这里是 first——以及接收消息时的行为。这是代码
AtomicInteger messageConsumed = new AtomicInteger(0); // just a counter
Consumer consumer = environment.consumerBuilder()
.stream("first-application-stream") // stream to consume from
.offset(OffsetSpecification.first()) // where to start consuming
.messageHandler((context, message) -> messageConsumed.incrementAndGet()) // behavior
.build();
代码只是在新消息收到时递增计数器。
运行 Consumer
你可以使用以下命令运行 consumer 代码
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.FirstApplication$Consume'
你会在控制台中看到类似以下的内容
Connecting...
Connected
Start consumer...
Consumed 1,000,000 messages in 732 ms
Closing environment...
Environment closed
恭喜!消息已成功到达 consumer。
你可以通过多次运行 consumer 程序来确保 consumer 可以*在不将其从 stream 中移除的情况下*读取和重新读取消息。每次你都会得到相同数量的已消费消息。
总结
至此,我们完成了第一个 RabbitMQ Streams 应用的编写。以下是需要记住的主要内容
- stream Java 客户端 为 RabbitMQ Streams 提供了全面的支持。
- 主要 API 是
Environment、Producer和Consumer。 - 消息使用丰富且可互操作的 AMQP 1.0 格式。
- stream Java 客户端提供了一个高级 API,它处理了样板代码,让开发者可以专注于应用程序代码。
作为奖励,这里有一个视频,涵盖了 RabbitMQ Streams 和 stream Go 客户端
RabbitMQ 团队期待听到你对 streams 和 stream 客户端库(Java、Go)的反馈。我们计划为 stream 协议编写一个 .NET 客户端,所以如果你有 .NET 技能,可以提出设计建议,甚至是一个原型。
请继续关注关于 streams 的其他博客文章,我们将涵盖诸如发布去重、偏移量跟踪以及 RabbitMQ 中支持的协议之间的互操作性等功能。
