使用 RabbitMQ Streams 进行偏移量跟踪
RabbitMQ Streams 为消费者提供服务器端偏移量跟踪。此功能允许消耗应用程序在上次中断的地方恢复消耗。本文介绍偏移量跟踪的语义以及它在 Streams Java 客户端中的实现方式。
什么是偏移量跟踪?
偏移量跟踪是指消费应用程序存储其在流中的位置的过程。如果一个消费应用程序在流中的位置是 1,000,这意味着它已经处理了直到该位置的所有消息。如果应用程序停止并重新启动,它将从其最后存储的位置之后重新连接到流,也就是我们例子中的 1,001,并从那里重新开始消费。
我们使用偏移量的概念来指定流中的确切位置。
首先,什么是偏移量?
如果我们像一个数组一样模拟一个流,其中每个元素都是一条消息,那么偏移量就是该数组中给定消息的索引。
这种心理模型足以理解我们这里的偏移量跟踪,但它并不完全准确。我们姑且认为这个大数组中的一个元素不总是消息,它可能是一些流维护所需的信息。其结果是,两个连续的消息可能不总是具有两个连续的偏移量值,但这对于本文的讨论很重要。
流中不同的偏移量规格
绝对偏移量没有实际意义,它只是技术性的。因此,当应用程序第一次想要连接到一个流时,它不太可能使用偏移量,而是更喜欢使用像流的beginning(开头)或end(结尾)这样的高级概念,甚至流中的某个point in time(时间点)。
幸运的是,RabbitMQ Stream 除了绝对偏移量之外,还支持不同的偏移量规格:first、last、next 和 timestamp。
对于流的“末尾”,有两种偏移量规格:next 表示下一个将要写入的偏移量。如果一个消费者连接到一个流的 next 位置,并且没有人发布消息,那么该消费者将不会收到任何消息。当新消息进来时,它才会开始接收。
last 表示“从最后一批消息开始”。批次(chunk)是 RabbitMQ Streams 中的一个术语,指的是“消息集合”,因为出于性能原因,消息是批量处理的。
下图显示了流中的偏移量规格。
偏移量跟踪的不同步骤
因此,应用程序通常会在首次启动时指定一个偏移量,例如 first 或 next,然后处理消息,并定期(例如,每 10,000 条消息)在服务器上存储一个偏移量值。
应用程序可能因为某些原因停止(例如,升级)。当它重新启动时,它将检索其最后存储的偏移量,并从该位置之后重新开始消费。
RabbitMQ Stream 协议提供了命令,用于存储和查找给定应用程序在给定流上的偏移量,因此偏移量跟踪主要是一个客户端关注的问题。
偏移量跟踪应用程序要求
希望在流中使用偏移量跟踪的应用程序有一个简单的要求:它必须使用一个在应用程序重启之间保持不变的跟踪引用。
指定此引用的方式取决于客户端库,更普遍地说,客户端库可能有不同的功能或方法来处理偏移量跟踪。在此处 阅读有关流 Java 客户端中偏移量跟踪的更多信息。
服务器端偏移量跟踪的阴暗面
RabbitMQ Streams 中的服务器端偏移量跟踪非常有用,但仍需谨慎使用。偏移量跟踪信息存储在流日志中。还记得流的数组表示吗?想象一下,这个大数组中的大多数元素都是消息,但有些包含偏移量跟踪信息,例如“应用程序 'my-application' 的偏移量 1000”。
因此,每次客户端请求为给定消费者存储偏移量时,都会在日志中创建一个小条目。您不希望创建太多此类条目,这就是我们建议每隔几千条消息存储一次偏移量,并避免消费者为每条消息存储偏移量。再次强调,在使用服务器端偏移量跟踪时,请谨慎并合理。
同时也要记住,服务器端偏移量跟踪只是一种便利功能,绝不是让消费者从上次中断的地方恢复的唯一解决方案。想象一下消息处理是一个数据库事务内的操作。将偏移量作为同一事务的一部分存储在数据库中是一个好主意,因为它确保了数据更改和偏移量存储是原子发生的。
服务器端偏移量跟踪实战
现在让我们看看如何使用 Stream Java 客户端设置偏移量跟踪。
跟踪消费者
以下是一些启动带有偏移量跟踪的消费者的代码
AtomicInteger messageConsumed = new AtomicInteger(0);
Consumer consumer = environment.consumerBuilder()
.stream("offset-tracking-stream") // the stream to consume from
.offset(OffsetSpecification.first()) // start consuming at the beginning
.name("my-application") // the name (reference) of the consumer
.manualTrackingStrategy() // tracking is done in application code
.builder()
.messageHandler((context, message) -> {
// ... message processing ...
// condition to store the offset: every 10,000 messages
if (messageConsumed.incrementAndGet() % 10_000 == 0) {
context.storeOffset(); // store the message offset
}
// ...
})
.build();
如果您想回顾 Stream Java 客户端 API,您可以阅读 RabbitMQ Streams 第一个应用程序。
以下是此代码片段中的关键点
- 消费者必须有一个名称才能启用偏移量跟踪。它使用其名称作为存储偏移量值时的跟踪引用。
- 消费者使用
OffsetSpecification.first()从流的开头开始消费。当消费者重启并且存在为其存储的偏移量值时,此规格将被忽略。 - 应用程序代码通过
Context#storeOffset()显式处理跟踪。Consumer#store(long)方法是存储偏移量的另一种选择。
您可以看到偏移量跟踪如何依赖于客户端库。Stream Java 客户端还提供了一个 自动偏移量跟踪策略,如果您不想在代码中处理偏移量跟踪。
正如您所见,当理解了要求和语义后,偏移量跟踪就变得非常容易。现在让我们运行示例。
设置示例项目
运行示例需要安装 Docker、Git 和 Java 8 或更高版本。您可以使用以下命令启动代理:
docker run -it --rm --name rabbitmq -p 5552:5552 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.9
然后您需要启用 Streams 插件:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
代码 托管在 GitHub 上。以下是如何克隆存储库
git clone https://github.com/acogoluegnes/rabbitmq-streams-blog-posts.git
cd rabbitmq-streams-blog-posts
在示例项目中,我们将
- 发布第一批消息
- 启动消费者,它将消耗这些消息,定期存储偏移量,并在第一批消息结束时停止(感谢毒丸消息)
- 发布第二批消息
- 再次启动消费者,并确保它从上次中断的地方(第一批消息的末尾)重新启动,并且只消耗第二批消息
发布第一批消息
使用以下命令发布第一批消息
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.OffsetTracking$PublishFirstWave'
您应该在控制台中看到以下内容
Connecting...
Connected
Creating stream...
Stream created
Creating producer...
Producer created
Sending 500,000 messages
Messages sent, waiting for confirmation...
All messages confirmed? yes (928 ms)
Closing environment...
Environment closed
该程序发布了 500,000 条消息,它们都具有相同的“first wave”正文,除了最后一条消息是用于停止消费的毒丸消息。
消费第一批消息
启动消费者
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.OffsetTracking$Consume'
您得到
Connecting...
Connected
Start consumer...
Consumed 500,000 messages in 364 ms (bodies: poison, first wave)
Closing environment...
Environment closed
消费者表现符合预期:它从头开始,因为它消费了 500,000 条消息。消费者还将其收到的所有正文记录在一个集合中,它收到了“first wave”消息和“poison”消息,很好。
发布第二批消息
让我们再发布一批消息
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.OffsetTracking$PublishSecondWave'
这次我们得到
Connecting...
Connected
Creating producer...
Producer created
Sending 100,000 messages
Messages sent, waiting for confirmation...
All messages confirmed? yes (312 ms)
Closing environment...
Environment closed
好的,流中增加了 100,000 条消息。
消费第二批消息
让我们重新启动消费者
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.OffsetTracking$Consume'
我们得到
Connecting...
Connected
Start consumer...
Consumed 100,000 messages in 215 ms (bodies: poison, second wave)
Closing environment...
Environment closed
太好了!尽管消费者本应从流的开头开始,但它检测到有已存储的偏移量,因此它利用该偏移量正确地重新启动。它只消费了第二批消息。
总结
RabbitMQ Streams 为消费者提供服务器端偏移量跟踪。消费者实例必须有一个名称(或跟踪引用),该名称在重启之间保持不变。客户端应用程序会定期存储偏移量,其实现方式取决于客户端库。
对于需要从上次中断处恢复的应用程序来说,偏移量跟踪至关重要。
Stream Java 客户端对偏移量跟踪提供了出色的支持,其 文档 深入介绍了它。
