跳转到主要内容

RabbitMQ Stream 教程 - 偏移量跟踪

简介

信息

先决条件

本教程假定您已安装 RabbitMQ,并在 localhost 上运行,并且已启用 stream 插件标准 stream 端口为 5552。如果您使用不同的主机、端口或凭据,则需要调整连接设置。

使用 Docker

如果您没有安装 RabbitMQ,可以在 Docker 容器中运行它

docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672  \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:4-management

等待服务器启动,然后启用 stream 和 stream management 插件

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management 

在哪里获得帮助

如果您在学习本教程时遇到问题,可以通过邮件列表Discord 社区服务器联系我们。

RabbitMQ Streams 在 RabbitMQ 3.9 中引入。更多信息请参见此处

偏移量跟踪

设置

本教程的这部分内容是编写两个 Java 程序;一个生产者发送一连串消息,并在末尾添加一个标记消息,以及一个消费者接收消息并在收到标记消息时停止。它展示了消费者如何浏览 stream,甚至可以从上次执行停止的位置重新开始。

本教程使用stream Java 客户端。请务必按照第一个教程中的设置步骤进行操作。

本教程的可执行版本可以在RabbitMQ tutorials 仓库中找到。发送程序名为 OffsetTrackingSend.java,接收程序名为 OffsetTrackingReceive.java。本教程重点介绍客户端库的用法,因此应使用仓库中的最终代码来创建文件的框架(例如,导入、main 函数等)。

发送

发送程序首先实例化 Environment 并创建 stream

try (Environment environment = Environment.builder().build()) {
String stream = "stream-offset-tracking-java";
environment.streamCreator()
.stream(stream)
.maxLengthBytes(ByteCapacity.GB(1))
.create();

// publishing code to come

}

然后,该程序创建一个 Producer 实例并发布 100 条消息。最后一条消息的主体值设置为 marker;这是消费者停止消费的标记。

请注意 CountDownLatch 的使用:在每个消息确认回调中,它都会通过 countDown 递减。这确保了 Broker 在关闭程序之前收到了所有消息。

Producer producer = environment.producerBuilder()
.stream(stream)
.build();

int messageCount = 100;
CountDownLatch confirmedLatch = new CountDownLatch(messageCount);
System.out.printf("Publishing %d messages%n", messageCount);
IntStream.range(0, messageCount).forEach(i -> {
String body = i == messageCount - 1 ? "marker" : "hello";
producer.send(producer.messageBuilder()
.addData(body.getBytes(UTF_8))
.build(),
ctx -> {
if (ctx.isConfirmed()) {
confirmedLatch.countDown();
}
}
);
});

boolean completed = confirmedLatch.await(60, TimeUnit.SECONDS);
System.out.printf("Messages confirmed: %b.%n", completed);

现在让我们创建接收程序。

接收

接收程序创建一个 Environment 实例,并确保也创建了 stream。这部分代码与发送程序中的代码相同,因此为了简洁起见,在接下来的代码片段中跳过了这部分代码。

接收程序启动一个消费者,该消费者附加到 stream 的开头 (OffsetSpecification.first())。它使用变量在程序结束时输出第一个和最后一个接收到的消息的偏移量。

当消费者收到标记消息时停止:它将偏移量分配给变量,关闭消费者,并递减 latch 计数。与发送者一样,当消费者完成其工作时,CountDownLatch 会告诉程序继续执行。

OffsetSpecification offsetSpecification = OffsetSpecification.first();
AtomicLong firstOffset = new AtomicLong(-1);
AtomicLong lastOffset = new AtomicLong(0);
CountDownLatch consumedLatch = new CountDownLatch(1);
environment.consumerBuilder()
.stream(stream)
.offset(offsetSpecification)
.messageHandler((ctx, msg) -> {
if (firstOffset.compareAndSet(-1, ctx.offset())) {
System.out.println("First message received.");
}
String body = new String(msg.getBodyAsBinary(), StandardCharsets.UTF_8);
if ("marker".equals(body)) {
lastOffset.set(ctx.offset());
ctx.consumer().close();
consumedLatch.countDown();
}
})
.build();
System.out.println("Started consuming...");

consumedLatch.await(60, TimeUnit.MINUTES);

System.out.printf("Done consuming, first offset %d, last offset %d.%n",
firstOffset.get(), lastOffset.get());

探索 Stream

为了运行这两个示例,请打开两个终端(shell)标签页。

在第一个标签页中,运行发送者以发布一连串消息

./mvnw -q compile exec:java '-Dexec.mainClass=OffsetTrackingSend'

输出如下

Publishing 100 messages...
Messages confirmed: true.

现在让我们运行接收者。打开一个新的标签页。请记住,由于 first 偏移量规范,它应该从 stream 的开头开始。

./mvnw -q compile exec:java '-Dexec.mainClass=OffsetTrackingReceive'

这是输出

Started consuming...
First message received.
Done consuming, first offset 0, last offset 99.
什么是偏移量?

Stream 可以看作是一个数组,其中的元素是消息。偏移量是给定消息在数组中的索引。

Stream 与队列不同:消费者可以读取和重新读取相同的消息,并且消息会保留在 stream 中。

让我们通过使用 offset(long) 规范附加到给定的偏移量来尝试此功能。将 offsetSpecification 变量从 OffsetSpecification.first() 设置为 OffsetSpecification.offset(42)

OffsetSpecification offsetSpecification = OffsetSpecification.offset(42);

偏移量 42 是任意的,它可以是 0 到 99 之间的任何数字。再次运行接收者

./mvnw -q compile exec:java '-Dexec.mainClass=OffsetTrackingReceive'

输出如下

Started consuming...
First message received.
Done consuming, first offset 42, last offset 99.

还有一种方法可以附加到 stream 的末尾,以便仅查看消费者创建时的新消息。这就是 next 偏移量规范。让我们尝试一下

OffsetSpecification offsetSpecification = OffsetSpecification.next();

运行接收者

./mvnw -q compile exec:java '-Dexec.mainClass=OffsetTrackingReceive'

这次消费者没有收到任何消息

Started consuming...

它正在等待 stream 中的新消息。让我们通过再次运行发送者来发布一些消息。返回到第一个标签页

./mvnw -q compile exec:java '-Dexec.mainClass=OffsetTrackingSend'

等待程序退出,然后切换回接收者标签页。消费者收到了新消息

Started consuming...
First message received.
Done consuming, first offset 100, last offset 199.

由于发送者放在 stream 末尾的新标记消息,接收者停止了。

本节展示了如何“浏览” stream:从开头、从任何偏移量,甚至对于新消息。下一节介绍如何利用服务器端偏移量跟踪来恢复消费者在上次执行中停止的位置。

服务器端偏移量跟踪

RabbitMQ Streams 提供服务器端偏移量跟踪,以存储给定消费者在 stream 中的进度。如果消费者因任何原因(崩溃、升级等)停止,它将能够重新附加到之前停止的位置,以避免处理相同的消息。

RabbitMQ Streams 提供了用于偏移量跟踪的 API,但也可以使用其他解决方案来存储消费应用程序的进度。这可能取决于用例,但关系数据库也可以是一个不错的解决方案。

让我们修改接收者以存储已处理消息的偏移量。更新的行用注释标出

// start consuming at the beginning of the stream
OffsetSpecification offsetSpecification = OffsetSpecification.first();
AtomicLong messageCount = new AtomicLong(0);
environment.consumerBuilder()
.stream(stream)
.offset(offsetSpecification)
.name("offset-tracking-tutorial") // the consumer must a have name
.manualTrackingStrategy().builder() // activate manual offset tracking
.messageHandler((ctx, msg) -> {
if (firstOffset.compareAndSet(-1, ctx.offset())) {
System.out.println("First message received.");
}
if (messageCount.incrementAndGet() % 10 == 0) {
ctx.storeOffset(); // store offset every 10 messages
}
String body = new String(msg.getBodyAsBinary(), StandardCharsets.UTF_8);
if (body.equals("marker")) {
lastOffset.set(ctx.offset());
ctx.storeOffset(); // store the offset on consumer closing
ctx.consumer().close();
consumedLatch.countDown();
}
})
.build();

最相关的更改是

  • 消费者使用 OffsetSpecification.first() 附加到 stream 的开头。
  • 消费者必须有一个名称。它是存储和检索上次存储的偏移量值的键。
  • 手动跟踪策略被激活,这意味着显式调用来存储偏移量。
  • 每 10 条消息存储一次偏移量。对于偏移量存储频率来说,这是一个异常低的值,但这对于本教程来说是可以的。实际应用中的值通常在数百或数千。
  • 偏移量在关闭消费者之前存储,就在收到标记消息之后。

让我们运行更新后的接收者

./mvnw -q compile exec:java '-Dexec.mainClass=OffsetTrackingReceive'

这是输出

Started consuming...
First message received.
Done consuming, first offset 0, last offset 99.

没有什么令人惊讶的:消费者从 stream 的开头获取了消息,并在到达标记消息时停止。

让我们再次启动它

./mvnw -q compile exec:java '-Dexec.mainClass=OffsetTrackingReceive'

这是输出

Started consuming...
First message received.
Done consuming, first offset 100, last offset 199.

消费者从上次停止的位置准确地重新启动:第一次运行中的最后一个偏移量是 99,第二次运行中的第一个偏移量是 100。请注意,first 偏移量规范被忽略:存储的偏移量优先于偏移量规范参数。消费者在第一次运行中存储了偏移量跟踪信息,因此客户端库使用它来恢复在第二次运行中正确位置的消费。

这结束了关于 RabbitMQ Streams 中消费语义的教程。它涵盖了消费者如何在 stream 中的任何位置附加。消费应用程序很可能需要跟踪它们在 stream 中到达的点。他们可以使用本教程中演示的内置服务器端偏移量跟踪功能。他们也可以自由使用任何其他数据存储解决方案来完成此任务。

有关偏移量跟踪的更多信息,请参阅RabbitMQ 博客stream Java 客户端文档

© . All rights reserved.