跳到主要内容

RabbitMQ Streams 的第一个应用

·7 分钟阅读

RabbitMQ Streams 概览 介绍了 Streams,这是 RabbitMQ 3.9 中的一项新功能。这篇文章继续展示如何将 Streams 与 Java 客户端一起使用。我们将编写我们的第一个应用程序,该应用程序将消息发布到 Stream,然后消费它们。

启动启用 Streams 的 RabbitMQ

让我们启动一个 RabbitMQ Docker 容器

docker run -it --rm --name rabbitmq -p 5552:5552 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.9

Streams 作为 RabbitMQ 3.9 中的核心插件发布,因此我们必须确保此插件已启用。打开一个新的终端标签页并执行以下命令

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream

下一步是连接客户端应用程序到 Stream 插件。

连接到 RabbitMQ Streams

我们将使用 stream Java 客户端 与 Streams 交互。客户端文档涵盖了如何在 Maven 项目Gradle 项目 中声明适当的依赖项,因此我们可以专注于这篇文章中的代码。

stream Java 客户端的入口点是 Environment。它处理 Stream 管理以及发布者和消费者实例的创建。以下是如何创建 Environment 实例

try (Environment environment = Environment.builder()
.uri("rabbitmq-stream://localhost:5552").build()) {

// ...

}

我们有了环境,让我们创建一个 Stream。

创建 Stream

环境提供了一个 API 来创建 Stream,我们将使用它来创建一个具有所有默认值的 first-application-stream Stream

environment.streamCreator().stream("first-application-stream").create();

Stream 已经创建好了,现在是时候发布到它了。

发布到 Stream

我们需要创建一个 Producer 实例来发布到 Stream。我们再次使用 Environment 来创建此对象

Producer producer = environment
.producerBuilder()
.stream("first-application-stream") // stream to publish to
.build();

我们将在一个循环中发布一些消息,让我们逐步进行并构建发布循环的骨架

int messageCount = 1_000_000;
CountDownLatch confirmLatch = new CountDownLatch(messageCount);
IntStream.range(0, messageCount).forEach(i -> {
// send one message
});
boolean done = confirmLatch.await(1, TimeUnit.MINUTES);

请注意使用 CountDownLatch 来确保我们仅在获得所有发布确认后才继续进行,稍后会详细介绍。

现在我们将重点关注消息的创建。RabbitMQ Streams 使用 AMQP 1.0 消息格式,因为它是一种灵活而强大的格式,具有高级类型系统。使用 AMQP 1.0 消息格式可以实现互操作性,使 Streams 与 RabbitMQ 支持的其他协议(AMQP 0.9.1 和 1.0、MQTT、STOMP)兼容。

stream Java 客户端提供了一个消息构建器接口来创建消息,我们使用它来创建一个带有几个属性和一个二进制负载的消息

Message message = producer.messageBuilder()
.properties()
.creationTime(System.currentTimeMillis())
.messageId(i)
.messageBuilder()
.addData("hello world".getBytes(StandardCharsets.UTF_8))
.build();

好的,我们有了消息实例,下一步是发布它。但是让我们快速回到这个 AMQP 1.0 消息格式的事情。我们要强调的是,RabbitMQ Streams 仅使用 AMQP 1.0 消息格式,而不是 AMQP 1.0 协议。RabbitMQ Streams 有其自己的二进制协议,该协议恰好在其某些帧中传递以 AMQP 1.0 格式编码的消息。消息编码实际上是客户端的责任:RabbitMQ Streams 与消息格式无关。消息只是字节数组,例如 [100, 76, 240, ...]。但是,AMQP 1.0 消息格式使 Streams 与其他协议(例如 AMQP 0.9.1、MQTT 等)高度可操作,Streams 默认情况下支持这些协议。

现在是时候发送我们的消息了,我们只需要将其传递给生产者即可

producer.send(message, confirmationStatus -> confirmLatch.countDown());

请注意 send 方法的第二个参数:这是此消息的发布确认异步到达时的回调。这是您可以确保消息不丢失的方式。这里我们只是减少 CoundDownLatch 的计数。

以下是以上所有内容在代码中的样子

int messageCount = 1_000_000;
CountDownLatch confirmLatch = new CountDownLatch(messageCount);
IntStream.range(0, messageCount).forEach(i -> {
Message message = producer.messageBuilder()
.properties()
.creationTime(System.currentTimeMillis())
.messageId(i)
.messageBuilder()
.addData("hello world".getBytes(StandardCharsets.UTF_8))
.build();
producer.send(message, confirmationStatus -> confirmLatch.countDown());
});
boolean done = confirmLatch.await(1, TimeUnit.MINUTES);

运行发布者

您可以在本地运行发布者示例,代码托管在 GitHub 上。您只需要安装 JDK 8 或更高版本,以及运行 RabbitMQ 3.9 的实例并启用 rabbit_stream 插件,如上所述。

git clone https://github.com/acogoluegnes/rabbitmq-streams-blog-posts.git
cd rabbitmq-streams-blog-posts
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.FirstApplication$Publish'

您应该获得如下所示的输出,确认消息已被 Broker 考虑在内

Connecting...
Connected
Creating stream...
Stream created
Creating producer...
Producer created
Sending 1,000,000 messages
Messages sent, waiting for confirmation...
All messages confirmed? yes (1440 ms)
Closing environment...
Environment closed

rabbitmq-streams stream_status CLI 命令确认消息已落在 Broker 上

docker exec rabbitmq rabbitmq-streams stream_status first-application-stream

您应该看到以下输出

Status of stream first-application-stream on node rabbit@ba9dbabe12b8 ...
┌────────┬─────────────────────┬────────┬──────────────────┬──────────────┬─────────┬──────────┐
│ role │ node │ offset │ committed_offset │ first_offset │ readers │ segments │
├────────┼─────────────────────┼────────┼──────────────────┼──────────────┼─────────┼──────────┤
│ writer │ rabbit@ba9dbabe12b8 │ 999999 │ 999938 │ 0 │ 0 │ 1 │
└────────┴─────────────────────┴────────┴──────────────────┴──────────────┴─────────┴──────────┘

我想强调 offset 列,它告诉我们 Stream 中最后一条消息的索引,在上面的示例中为 999,999。这确认 Stream 包含 100 万条消息(偏移量从 0 开始)。

消费消息

消费代码很简单。我们需要从 Environment 创建一个 Consumer 实例。这需要设置几个参数:要从中消费的 Stream,要从哪个偏移量开始消费 — 此处为 first — 以及接收消息时的行为。这是代码

AtomicInteger messageConsumed = new AtomicInteger(0); // just a counter
Consumer consumer = environment.consumerBuilder()
.stream("first-application-stream") // stream to consume from
.offset(OffsetSpecification.first()) // where to start consuming
.messageHandler((context, message) -> messageConsumed.incrementAndGet()) // behavior
.build();

该代码只是在收到新消息时增加计数器。

运行消费者

您可以使用以下命令运行消费者代码

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.FirstApplication$Consume'

您应该在控制台中看到如下内容

Connecting...
Connected
Start consumer...
Consumed 1,000,000 messages in 732 ms
Closing environment...
Environment closed

恭喜!消息已到达消费者。

您可以多次运行消费者程序,以确保消费者可以读取和重新读取消息,而无需从 Stream 中删除它们。每次您都会获得相同数量的已消费消息。

总结

这结束了我们的第一个 RabbitMQ Streams 应用程序的编写。以下是要记住的主要元素

  • stream Java 客户端 为 RabbitMQ Streams 提供了全面的支持。
  • 主要的 API 是 EnvironmentProducerConsumer
  • 消息使用丰富且可互操作的 AMQP 1.0 格式。
  • stream Java 客户端提供了一个高级 API,它可以处理样板代码,并让开发人员专注于应用程序代码。

作为奖励,这是一个视频,涵盖了 RabbitMQ Streams 和 stream Go 客户端

RabbitMQ 团队期待听到您对 Streams 和 Stream 客户端库(JavaGo)的反馈。我们计划为 stream 协议 编写一个 .NET 客户端,因此如果您具有 .NET 技能,则可以提出设计建议,甚至可以提出原型。

请继续关注有关 Streams 的其他博客文章,我们将在其中介绍诸如发布去重、偏移量跟踪以及 RabbitMQ 中支持的协议之间的互操作性等功能。

© . All rights reserved.