跳至主要内容

使用 RabbitMQ Streams 的第一个应用

·阅读 7 分钟

RabbitMQ Streams 概述 介绍了流,这是 RabbitMQ 3.9 中的一项新功能。本文将继续介绍如何使用 Java 客户端与流交互。我们将编写第一个应用程序,该应用程序将消息发布到流中,然后使用它们。

启用流的情况下启动 RabbitMQ

让我们启动一个 RabbitMQ Docker 容器

docker run -it --rm --name rabbitmq -p 5552:5552 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.9

流作为 RabbitMQ 3.9 中的核心插件提供,因此我们必须确保启用了此插件。打开一个新的终端标签并执行以下命令

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream

下一步是将客户端应用程序连接到流插件。

连接到 RabbitMQ Streams

我们将使用流 Java 客户端 与流进行交互。客户端文档介绍了如何在Maven 项目Gradle 项目中声明相应的依赖项,因此我们可以在本文中专注于代码。

流 Java 客户端的入口点是Environment。它处理流管理以及发布者和使用者实例的创建。以下是如何创建Environment实例

try (Environment environment = Environment.builder()
.uri("rabbitmq-stream://127.0.0.1:5552").build()) {

// ...

}

我们有了环境,让我们创建一个流。

创建流

环境提供了一个用于创建流的 API,我们将使用它来创建一个使用所有默认值的first-application-stream

environment.streamCreator().stream("first-application-stream").create();

流已存在,是时候发布到它了。

发布到流

我们需要创建一个Producer实例来发布到流。我们再次使用Environment来创建此对象

Producer producer = environment
.producerBuilder()
.stream("first-application-stream") // stream to publish to
.build();

我们将循环发布一些消息,让我们一步一步进行并构建发布循环的框架

int messageCount = 1_000_000;
CountDownLatch confirmLatch = new CountDownLatch(messageCount);
IntStream.range(0, messageCount).forEach(i -> {
// send one message
});
boolean done = confirmLatch.await(1, TimeUnit.MINUTES);

请注意使用CountDownLatch来确保我们仅在获得所有发布确认后才继续,稍后将详细介绍。

我们现在将重点放在创建消息上。RabbitMQ Streams 使用AMQP 1.0 消息格式,因为它是一种灵活且强大的格式,具有高级类型系统。使用 AMQP 1.0 消息格式允许互操作性,使流与 RabbitMQ 支持的其他协议(AMQP 0.9.1 和 1.0、MQTT、STOMP)兼容。

流 Java 客户端提供了一个消息构建器接口来创建消息,我们用它来创建一个带有一些属性和二进制有效负载的消息

Message message = producer.messageBuilder()
.properties()
.creationTime(System.currentTimeMillis())
.messageId(i)
.messageBuilder()
.addData("hello world".getBytes(StandardCharsets.UTF_8))
.build();

好的,我们有了消息实例,下一步是发布它。但是让我们快速回到这个 AMQP 1.0 消息格式的问题上。我们要强调的是,RabbitMQ Streams 仅使用 AMQP 1.0 的消息格式,而不是 AMQP 1.0 的协议。RabbitMQ Streams 有自己的二进制协议,它碰巧在其某些帧中以 AMQP 1.0 格式编码的消息。消息编码实际上是客户端的责任:RabbitMQ Streams 与消息格式无关。消息只是字节数组,例如[100, 76, 240, ...]。但是,AMQP 1.0 消息格式使流能够与其他协议(如 AMQP 0.9.1、MQTT 等)高度互操作,流默认支持这些协议。

现在是时候发送我们的消息了,我们只需将其传递给生产者

producer.send(message, confirmationStatus -> confirmLatch.countDown());

请注意send方法的第二个参数:这是此消息的发布确认异步到达时的回调。这是确保消息不会丢失的方法。在这里,我们只是减少CoundDownLatch的计数。

以下是所有内容在代码中的样子

int messageCount = 1_000_000;
CountDownLatch confirmLatch = new CountDownLatch(messageCount);
IntStream.range(0, messageCount).forEach(i -> {
Message message = producer.messageBuilder()
.properties()
.creationTime(System.currentTimeMillis())
.messageId(i)
.messageBuilder()
.addData("hello world".getBytes(StandardCharsets.UTF_8))
.build();
producer.send(message, confirmationStatus -> confirmLatch.countDown());
});
boolean done = confirmLatch.await(1, TimeUnit.MINUTES);

运行发布者

您可以在本地运行发布者示例,代码托管在 GitHub 上。您只需要安装 JDK 8 或更高版本,以及一个启用了 rabbit_stream 插件的 RabbitMQ 3.9 正在运行的实例,如上所述。

git clone https://github.com/acogoluegnes/rabbitmq-streams-blog-posts.git
cd rabbitmq-streams-blog-posts
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.FirstApplication$Publish'

您应该会看到如下输出,确认代理已考虑这些消息

Connecting...
Connected
Creating stream...
Stream created
Creating producer...
Producer created
Sending 1,000,000 messages
Messages sent, waiting for confirmation...
All messages confirmed? yes (1440 ms)
Closing environment...
Environment closed

rabbitmq-streams stream_status CLI 命令确认消息已到达代理

docker exec rabbitmq rabbitmq-streams stream_status first-application-stream

您应该看到以下输出

Status of stream first-application-stream on node rabbit@ba9dbabe12b8 ...
┌────────┬─────────────────────┬────────┬──────────────────┬──────────────┬─────────┬──────────┐
│ role │ node │ offset │ committed_offset │ first_offset │ readers │ segments │
├────────┼─────────────────────┼────────┼──────────────────┼──────────────┼─────────┼──────────┤
│ writer │ rabbit@ba9dbabe12b8 │ 999999 │ 999938 │ 0 │ 0 │ 1 │
└────────┴─────────────────────┴────────┴──────────────────┴──────────────┴─────────┴──────────┘

我想重点介绍offset列,它告诉我们流中最后一条消息的索引,在上面的示例中为999,999。这确认流包含 100 万条消息(偏移量从0开始)。

使用消息

使用代码很简单。我们需要从Environment创建一个Consumer实例。这需要设置一些参数:要从中使用的流、要从中开始使用的偏移量——此处为first——以及接收消息时的行为。以下是代码

AtomicInteger messageConsumed = new AtomicInteger(0); // just a counter
Consumer consumer = environment.consumerBuilder()
.stream("first-application-stream") // stream to consume from
.offset(OffsetSpecification.first()) // where to start consuming
.messageHandler((context, message) -> messageConsumed.incrementAndGet()) // behavior
.build();

代码在收到新消息时只递增一个计数器。

运行使用者

您可以使用以下命令运行使用者代码

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.FirstApplication$Consume'

您应该在控制台中看到类似以下内容

Connecting...
Connected
Start consumer...
Consumed 1,000,000 messages in 732 ms
Closing environment...
Environment closed

恭喜!消息已发送到使用者。

您可以通过多次运行使用者程序来确保使用者可以读取和重新读取消息而不会将其从流中删除。您每次都会收到相同数量的已使用消息。

总结

这结束了我们第一个 RabbitMQ Streams 应用程序的编写。以下是需要记住的主要内容

  • 流 Java 客户端 为 RabbitMQ Streams 提供了全面的支持。
  • 主要 API 为EnvironmentProducerConsumer
  • 消息使用丰富且可互操作的 AMQP 1.0 格式。
  • 流 Java 客户端提供了一个高级 API,它处理样板代码并让开发人员专注于应用程序代码。

作为奖励,以下视频介绍了 RabbitMQ Streams 和流 Go 客户端

RabbitMQ 团队期待收到您对流和流客户端库的反馈(JavaGo)。我们计划为流协议编写一个.NET 客户端,因此如果您具备 .NET 技能,您可以提出设计建议甚至原型。

敬请关注有关流的其他博文,我们将介绍发布重复数据删除、偏移量跟踪以及 RabbitMQ 支持的协议之间的互操作性等功能。

© 2024 RabbitMQ. All rights reserved.