第一个使用 RabbitMQ Streams 的应用程序
RabbitMQ Streams 概述 介绍了 Streams,这是 RabbitMQ 3.9 中的一项新功能。本文将继续介绍如何使用 Java 客户端处理 Streams。我们将编写我们的第一个应用程序,该应用程序将消息发布到流,然后进行消耗。
启动启用了 Streams 的 RabbitMQ
让我们启动一个 RabbitMQ Docker 容器
docker run -it --rm --name rabbitmq -p 5552:5552 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.9
Streams 作为 RabbitMQ 3.9 的核心插件提供,因此我们必须确保此插件已启用。打开一个新的终端标签页并执行以下命令
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
下一步是将客户端应用程序连接到 stream 插件。
连接到 RabbitMQ Streams
我们将使用 stream Java 客户端与流进行交互。客户端文档涵盖了如何在 Maven 项目和 Gradle 项目中声明相应的依赖项,因此我们可以在本文中专注于代码实现。
Stream Java 客户端的入口点是 Environment。它负责流管理以及创建发布者(publisher)和消费者(consumer)实例。以下是创建 Environment 实例的方法
try (Environment environment = Environment.builder()
.uri("rabbitmq-stream://:5552").build()) {
// ...
}
有了环境,接下来创建一个流。
创建流
环境提供了一个创建流的 API,我们将使用它来创建一个采用所有默认设置的 first-application-stream 流
environment.streamCreator().stream("first-application-stream").create();
流已经创建好了,现在是向其发布消息的时候了。
向流发布消息
我们需要创建一个 Producer 实例来向流发布消息。我们再次使用 Environment 来创建该对象
Producer producer = environment
.producerBuilder()
.stream("first-application-stream") // stream to publish to
.build();
我们将在循环中发布一些消息,让我们逐步构建发布循环的框架
int messageCount = 1_000_000;
CountDownLatch confirmLatch = new CountDownLatch(messageCount);
IntStream.range(0, messageCount).forEach(i -> {
// send one message
});
boolean done = confirmLatch.await(1, TimeUnit.MINUTES);
请注意 CountDownLatch 的使用,以确保仅在获得所有发布确认后才继续进行,稍后会详细说明。
现在我们将重点关注消息的创建。RabbitMQ Streams 使用 AMQP 1.0 消息格式,因为它是一种灵活且强大的格式,具有先进的类型系统。使用 AMQP 1.0 消息格式可以实现互操作性,使流能够兼容 RabbitMQ 支持的其他协议(AMQP 0.9.1 和 1.0、MQTT、STOMP)。
Stream Java 客户端提供了一个消息构建器接口来创建消息,我们使用它来创建一个包含若干属性和二进制有效载荷(payload)的消息
Message message = producer.messageBuilder()
.properties()
.creationTime(System.currentTimeMillis())
.messageId(i)
.messageBuilder()
.addData("hello world".getBytes(StandardCharsets.UTF_8))
.build();
好了,我们有了消息实例,下一步就是发布它。但让我们快速回到这个 AMQP 1.0 消息格式的问题上。我们想强调的是,RabbitMQ Streams 仅使用 AMQP 1.0 消息格式,而不是 AMQP 1.0 协议。RabbitMQ Streams 拥有自己的二进制协议,该协议恰好在某些帧中传送以 AMQP 1.0 格式编码的消息。消息编码实际上是客户端的责任:RabbitMQ Streams 对消息格式是不可知的。消息只是字节数组,例如 [100, 76, 240, ...]。然而,AMQP 1.0 消息格式使流能够与流默认支持的其他协议(如 AMQP 0.9.1、MQTT 等)高度互操作。
现在是发送消息的时候了,我们只需将其传递给 producer
producer.send(message, confirmationStatus -> confirmLatch.countDown());
请注意 send 方法的第二个参数:这是当该消息的发布确认异步到达时的回调。这就是您可以确保消息不会丢失的方法。在这里,我们只是递减 CountDownLatch 的计数。
上述所有内容在代码中看起来是这样的
int messageCount = 1_000_000;
CountDownLatch confirmLatch = new CountDownLatch(messageCount);
IntStream.range(0, messageCount).forEach(i -> {
Message message = producer.messageBuilder()
.properties()
.creationTime(System.currentTimeMillis())
.messageId(i)
.messageBuilder()
.addData("hello world".getBytes(StandardCharsets.UTF_8))
.build();
producer.send(message, confirmationStatus -> confirmLatch.countDown());
});
boolean done = confirmLatch.await(1, TimeUnit.MINUTES);
运行发布者
您可以在本地运行发布者示例,代码托管在 GitHub 上。您只需要安装 JDK 8 或更高版本,并如上所述运行已启用 rabbit_stream 插件的 RabbitMQ 3.9 实例。
git clone https://github.com/acogoluegnes/rabbitmq-streams-blog-posts.git
cd rabbitmq-streams-blog-posts
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.FirstApplication$Publish'
您应该得到如下输出,确认消息已被代理(broker)接收
Connecting...
Connected
Creating stream...
Stream created
Creating producer...
Producer created
Sending 1,000,000 messages
Messages sent, waiting for confirmation...
All messages confirmed? yes (1440 ms)
Closing environment...
Environment closed
rabbitmq-streams stream_status CLI 命令确认消息已到达代理
docker exec rabbitmq rabbitmq-streams stream_status first-application-stream
您应该看到以下输出
Status of stream first-application-stream on node rabbit@ba9dbabe12b8 ...
┌────────┬─────────────────────┬────────┬──────────────────┬──────────────┬─────────┬──────────┐
│ role │ node │ offset │ committed_offset │ first_offset │ readers │ segments │
├────────┼─────────────────────┼────────┼──────────────────┼──────────────┼─────────┼──────────┤
│ writer │ rabbit@ba9dbabe12b8 │ 999999 │ 999938 │ 0 │ 0 │ 1 │
└────────┴─────────────────────┴────────┴──────────────────┴──────────────┴─────────┴──────────┘
我想强调 offset 列,它告诉我们流中最后一条消息的索引,在上面的示例中为 999,999。这确认了该流包含 100 万条消息(偏移量从 0 开始)。
消费消息
消费代码很简单。我们需要从 Environment 创建一个 Consumer 实例。这需要设置一些参数:要消费的流、开始消费的偏移量(这里是 first),以及接收到消息时的行为。这是代码
AtomicInteger messageConsumed = new AtomicInteger(0); // just a counter
Consumer consumer = environment.consumerBuilder()
.stream("first-application-stream") // stream to consume from
.offset(OffsetSpecification.first()) // where to start consuming
.messageHandler((context, message) -> messageConsumed.incrementAndGet()) // behavior
.build();
当收到新消息时,代码只是增加一个计数器。
运行消费者
您可以使用以下命令运行消费者代码
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.FirstApplication$Consume'
您应该在控制台中看到类似以下的内容
Connecting...
Connected
Start consumer...
Consumed 1,000,000 messages in 732 ms
Closing environment...
Environment closed
恭喜!消息已成功传送到消费者。
您可以通过多次运行消费者程序来确保消费者可以读取和重读消息,而不会将它们从流中删除。每次您都会获得相同数量的已消费消息。
总结
至此,我们编写了第一个 RabbitMQ Streams 应用程序。以下是要记住的主要要素
- Stream Java 客户端提供了对 RabbitMQ Streams 的全面支持。
- 主要的 API 是
Environment、Producer和Consumer。 - 消息使用丰富且可互操作的 AMQP 1.0 格式。
- Stream Java 客户端提供高级 API,它处理了样板代码,让开发人员能够专注于应用程序代码。
作为奖励,这是一个涵盖 RabbitMQ Streams 和 stream Go 客户端的视频
RabbitMQ 团队期待听到您对 Streams 以及流客户端库(Java, Go)的反馈。我们计划为流协议编写一个 .NET 客户端,因此如果您具备 .NET 技能,可以提出设计建议甚至原型。
敬请关注有关 Streams 的其他博客文章,我们将涵盖诸如发布去重、偏移量跟踪以及 RabbitMQ 支持的协议之间的互操作性等功能。
