跳至主要内容

使用 RabbitMQ Streams 进行偏移量跟踪

·阅读 8 分钟

RabbitMQ Streams 为消费者提供服务器端偏移量跟踪功能。此功能允许消费应用程序从上次运行中断的地方重新开始消费。本文介绍了偏移量跟踪的语义以及如何在流式 Java 客户端中实现它。

什么是偏移量跟踪?

偏移量跟踪是消费应用程序存储其在流中的位置的过程。如果消费应用程序在流中的位置是 1,000,则表示它已处理了直到此位置的所有消息。如果应用程序停止并重新启动,它将从其上次存储的位置(在我们的示例中为 1,001)之后重新连接到流,并从那里重新开始消费。

我们使用偏移量的概念来指定流中的精确位置。

首先,什么是偏移量?

如果我们将流建模为一个数组,其中每个元素都是一条消息,那么偏移量就是此数组中给定消息的索引。

A stream can be represented as an array. The offset is the index of an element in the array.
流可以表示为一个数组。偏移量是数组中元素的索引。

这种思维方式足以让我们理解我们这种情况下的偏移量跟踪,但它并不完全准确。让我们假设此大型数组中的元素并不总是消息,它可能是流维护所需的一些信息。结果是,两条连续的消息可能并不总是有两个连续的偏移量值,但这在本文的上下文中并不重要。

流中不同的偏移量规范

绝对偏移量没有任何意义,它只是一个技术细节。因此,当应用程序想要首次连接到流时,它不太可能使用偏移量,而是更倾向于使用更高级别的概念,例如流的开头结尾,甚至流中的某个时间点

幸运的是,除了绝对偏移量之外,RabbitMQ Streams 还支持不同的偏移量规范firstlastnexttimestamp

有两种偏移量规范用于流的“结尾”:next 表示下一个要写入的偏移量。如果消费者以 next 连接到流并且没有人发布消息,则消费者将不会收到任何消息。当有新消息进来时,它将开始接收。

last 表示“来自最后的消息块”。块是 RabbitMQ Streams 中用于“消息批次”的术语,因为出于性能原因,消息是批量处理的。

下图显示了流中的偏移量规范。

Different offset specifications in a stream with 2 chunks.
包含 2 个块的流中的不同偏移量规范。

偏移量跟踪的不同步骤

因此,应用程序通常会在首次启动时指定 firstnext 之类的偏移量,处理消息,并在服务器上定期(例如每 10,000 条消息)存储偏移量值。

应用程序可能会因某些原因停止(例如升级)。当它重新启动时,它将检索其上次存储的偏移量,并在该位置之后重新开始消费。

RabbitMQ Stream 协议提供命令来为给定流上的给定应用程序存储和查找偏移量,因此偏移量跟踪主要是客户端关注的问题。

偏移量跟踪应用程序要求

对于想要在流中使用偏移量跟踪的应用程序,有一个简单的要求:它必须使用一个在应用程序重启之间保持不变的跟踪引用

指定此引用的方法取决于客户端库,更一般地说,客户端库可以有不同的功能或方法来处理偏移量跟踪。在此处阅读有关流式 Java 客户端中偏移量跟踪的更多信息 此处

服务器端偏移量跟踪的阴暗面

RabbitMQ Streams 中的服务器端偏移量跟踪非常简洁,但应谨慎使用。偏移量跟踪信息存储在流日志中。还记得流的数组表示形式吗?假设数组的大多数元素都是消息,但有些包含偏移量跟踪信息,例如“应用程序“my-application”的偏移量 1000”。

因此,每次客户端请求为给定消费者存储偏移量时,都会在日志中创建一个小的条目。您不希望创建太多这样的条目,因此我们建议每隔几千条消息存储一次偏移量,并避免消费者为每条消息都存储偏移量。再次强调,在使用服务器端偏移量跟踪时,请谨慎和合理。

还要记住,服务器端偏移量跟踪只是一个工具,绝不是允许消费者从中断处重新开始的唯一解决方案。假设消息处理是在事务内的数据库操作。将偏移量存储在数据库中作为同一事务的一部分是一个好主意,因为它确保数据更改和偏移量存储以原子方式发生。

服务器端偏移量跟踪的实际应用

现在让我们看看如何使用流式 Java 客户端设置偏移量跟踪。

跟踪消费者

以下是一些启动带有偏移量跟踪的消费者的代码

AtomicInteger messageConsumed = new AtomicInteger(0);
Consumer consumer = environment.consumerBuilder()
.stream("offset-tracking-stream") // the stream to consume from
.offset(OffsetSpecification.first()) // start consuming at the beginning
.name("my-application") // the name (reference) of the consumer
.manualTrackingStrategy() // tracking is done in application code
.builder()
.messageHandler((context, message) -> {
// ... message processing ...

// condition to store the offset: every 10,000 messages
if (messageConsumed.incrementAndGet() % 10_000 == 0) {
context.storeOffset(); // store the message offset
}
// ...
})
.build();

如果您想提醒一下流式 Java 客户端 API,您可以阅读 RabbitMQ Streams 首个应用程序

此代码段中的关键点如下

  • 消费者**必须**具有名称才能启用偏移量跟踪。它在存储偏移量值时使用其名称作为跟踪引用。
  • 消费者从流的开头开始消费,使用 OffsetSpecification.first()。当消费者重新启动并且存在其存储的偏移量值时,此规范将被忽略。
  • 应用程序代码使用 Context#storeOffset() 显式处理跟踪。Consumer#store(long) 方法是存储偏移量的另一种可能性。

您可以看到偏移量跟踪如何依赖于客户端库。如果您不想在代码中处理偏移量跟踪,流式 Java 客户端还提供了一种 自动偏移量跟踪策略

如您所见,在理解了要求和语义之后,偏移量跟踪非常简单。现在让我们运行示例。

设置示例项目

运行示例需要安装 Docker、Git 和 Java 8 或更高版本。您可以使用以下命令启动代理

docker run -it --rm --name rabbitmq -p 5552:5552 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.9

然后,您需要启用流插件

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream

代码托管在 GitHub 上。以下是如何克隆存储库

git clone https://github.com/acogoluegnes/rabbitmq-streams-blog-posts.git
cd rabbitmq-streams-blog-posts

在示例项目中,我们将

  • 发布第一波消息
  • 启动消费者,它将消费消息,定期存储偏移量,并在第一波结束时停止(感谢毒药消息)
  • 发布第二波消息
  • 再次启动消费者,并确保它从中断处重新开始(在第一波结束时),只消费第二波的消息

发布第一波消息

使用以下命令发布第一批消息

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.OffsetTracking$PublishFirstWave'

您应该在控制台上看到以下内容

Connecting...
Connected
Creating stream...
Stream created
Creating producer...
Producer created
Sending 500,000 messages
Messages sent, waiting for confirmation...
All messages confirmed? yes (928 ms)
Closing environment...
Environment closed

该程序发布了 500,000 条具有相同 first wave 主体的消息,除了最后一条消息,它是一条毒药消息,用于停止消费。

消费第一波消息

启动消费者

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.OffsetTracking$Consume'

您将看到

Connecting...
Connected
Start consumer...
Consumed 500,000 messages in 364 ms (bodies: poison, first wave)
Closing environment...
Environment closed

消费者的行为符合预期:它从开头开始,因为它消费了 500,000 条消息。消费者还将其获取的所有主体记录在一个集合中,它获取了 first wave 消息和 poison 消息,很好。

发布第二波消息

让我们发布另一波消息

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.OffsetTracking$PublishSecondWave'

这次我们得到

Connecting...
Connected
Creating producer...
Producer created
Sending 100,000 messages
Messages sent, waiting for confirmation...
All messages confirmed? yes (312 ms)
Closing environment...
Environment closed

好的,流中又增加了 100,000 条消息。

消费第二波消息

让我们重新启动消费者

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.OffsetTracking$Consume'          

我们得到

Connecting...
Connected
Start consumer...
Consumed 100,000 messages in 215 ms (bodies: poison, second wave)
Closing environment...
Environment closed

不错!即使消费者应该从流的开头开始,它也检测到存在存储的偏移量,因此它使用它来正确地重新启动。它只消费了第二波消息。

总结

RabbitMQ Streams 为消费者提供服务器端偏移量跟踪功能。消费者实例必须具有一个在重启之间保持不变的名称(或跟踪引用)。客户端应用程序定期存储偏移量,存储方式取决于客户端库。

偏移量跟踪对于需要从中断处重新开始的应用程序至关重要。

Stream Java 客户端对偏移量跟踪提供了极佳的支持,其文档深入介绍了该功能。

© 2024 RabbitMQ. All rights reserved.