使用 RabbitMQ Streams 进行偏移量跟踪
RabbitMQ Streams 为消费者提供服务器端偏移量跟踪。此功能允许消费应用程序在上次运行停止的位置重新开始消费。这篇文章涵盖了偏移量跟踪的语义以及它在 stream Java 客户端中的实现方式。
什么是偏移量跟踪?
偏移量跟踪是消费应用程序存储其在 stream 中的位置的过程。如果消费应用程序在 stream 中的位置是 1,000,则意味着它已处理完此位置之前的所有消息。如果应用程序停止并重新启动,它将重新连接到 stream,位置紧随其最后存储的位置之后,在本例中为 1,001,并从那里重新开始消费。
我们使用偏移量的概念来指定 stream 中的确切位置。
偏移量首先是什么?
如果我们像数组一样建模 stream,其中每个元素都是一条消息,那么偏移量只是此数组中给定消息的索引。
这种心智模型足以理解我们情况下的偏移量跟踪,但它并非完全准确。 让我们假设这个大数组中的元素并不总是消息,它可能是一些 stream 内部管理所需的信息。 一个结果是 2 条连续的消息可能不总是具有 2 个连续的偏移量值,但这在本文的上下文中并不重要。
stream 中不同的偏移量规范
绝对偏移量没有任何意义,它只是技术性的。 因此,当应用程序首次想要连接到 stream 时,它不太可能使用偏移量,它会更喜欢使用更高级别的概念,例如 stream 的开头或结尾,甚至 stream 中的某个时间点。
幸运的是,除了绝对偏移量之外,RabbitMQ Streams 还支持不同的偏移量规范:first
、last
、next
和 timestamp
。
对于 stream 的“结尾”,有 2 个偏移量规范:next
表示要写入的下一个偏移量。 如果消费者在 next
处连接到 stream 且没有人发布消息,则消费者将不会收到任何消息。 当新消息到达时,它将开始接收。
last
表示“来自最后的消息块”。 块是 RabbitMQ Streams 术语,表示“批量消息”,因为出于性能原因,消息是批量处理的。
下图显示了 stream 中的偏移量规范。
偏移量跟踪中的不同步骤
因此,应用程序通常会在首次启动时指定像 first
或 next
这样的偏移量,处理消息,并在服务器上定期(例如,每 10,000 条消息)存储偏移量值。
应用程序可能会因某些原因(例如,升级)而停止。 当它重新启动时,它将检索其最后存储的偏移量,并在该位置之后立即重新开始消费。
RabbitMQ Stream 协议提供了用于在给定 stream 上为给定应用程序存储和查找偏移量的命令,因此偏移量跟踪主要是一个客户端关注点。
偏移量跟踪应用程序要求
对于想要在 stream 中使用偏移量跟踪的应用程序有一个简单的要求:它必须使用在应用程序重启之间保持不变的跟踪引用。
指定此引用的方式取决于客户端库,更一般而言,客户端库可以具有不同的功能或方法来处理偏移量跟踪。 在此处阅读有关 stream Java 客户端中偏移量跟踪的更多信息。
服务器端偏移量跟踪的阴暗面
RabbitMQ Streams 中的服务器端偏移量跟踪非常简洁,但应谨慎使用。 偏移量跟踪信息存储在 stream 日志中。 还记得 stream 的数组表示吗? 想象一下,大多数数组元素都是消息,但有些元素包含偏移量跟踪信息,例如“应用程序 'my-application' 的偏移量 1000”。
因此,每次客户端请求为给定消费者存储偏移量时,都会在日志中创建一个小条目。 您不希望创建太多此类条目,这就是为什么我们建议每隔几千条消息存储偏移量,并避免消费者为每条消息存储偏移量。 再次强调,对于服务器端偏移量跟踪,请保持谨慎和合理。
还要记住,服务器端偏移量跟踪只是一种便利措施,绝不是允许消费者从上次停止的位置重新启动的唯一解决方案。 想象一下,消息处理是事务内的数据库操作。 将偏移量作为同一事务的一部分存储在数据库中是一个好主意,因为它确保数据更改和偏移量存储以原子方式发生。
服务器端偏移量跟踪实战
现在让我们看看如何使用 stream Java 客户端设置偏移量跟踪。
跟踪消费者
以下是一些代码,用于启动具有偏移量跟踪的消费者
AtomicInteger messageConsumed = new AtomicInteger(0);
Consumer consumer = environment.consumerBuilder()
.stream("offset-tracking-stream") // the stream to consume from
.offset(OffsetSpecification.first()) // start consuming at the beginning
.name("my-application") // the name (reference) of the consumer
.manualTrackingStrategy() // tracking is done in application code
.builder()
.messageHandler((context, message) -> {
// ... message processing ...
// condition to store the offset: every 10,000 messages
if (messageConsumed.incrementAndGet() % 10_000 == 0) {
context.storeOffset(); // store the message offset
}
// ...
})
.build();
如果您想回顾一下 stream Java 客户端 API,您可以阅读RabbitMQ Streams 第一个应用程序。
以下是此代码片段中的关键点
- 消费者必须有一个名称才能启用偏移量跟踪。 它在存储偏移量值时使用其名称作为跟踪引用。
- 当消费者重新启动并且存在为其存储的偏移量值时,将忽略此规范。 消费者使用
OffsetSpecification.first()
从 stream 的开头开始消费。 - 应用程序代码使用
Context#storeOffset()
显式处理跟踪。Consumer#store(long)
方法是存储偏移量的另一种可能性。
您可以看到偏移量跟踪如何依赖于客户端库。 如果您不想在代码中处理偏移量跟踪,stream Java 客户端还提供了一个自动偏移量跟踪策略。
如您所见,当理解了要求和语义时,偏移量跟踪非常容易。 现在让我们运行示例。
设置示例项目
运行示例需要安装 Docker、Git 和 Java 8 或更高版本。 您可以使用以下命令启动 broker
docker run -it --rm --name rabbitmq -p 5552:5552 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.9
然后您需要启用 stream 插件
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
代码托管在 GitHub 上。 以下是如何克隆存储库
git clone https://github.com/acogoluegnes/rabbitmq-streams-blog-posts.git
cd rabbitmq-streams-blog-posts
在示例项目中,我们将
- 发布第一波消息
- 启动消费者,它将消费消息,定期存储偏移量,并在第一波结束时停止(感谢 poison message)
- 发布第二波消息
- 再次启动消费者,并确保它从上次停止的位置(第一波的结尾)重新启动,并且仅消费来自第二波的消息
发布第一波消息
使用以下命令发布第一批消息
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.OffsetTracking$PublishFirstWave'
您应该在控制台上看到以下内容
Connecting...
Connected
Creating stream...
Stream created
Creating producer...
Producer created
Sending 500,000 messages
Messages sent, waiting for confirmation...
All messages confirmed? yes (928 ms)
Closing environment...
Environment closed
该程序发布 500,000 条消息,消息体都相同,都是 first wave
,除了最后一条消息是 poison message,用于停止消费。
消费第一波消息
启动消费者
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.OffsetTracking$Consume'
您将得到
Connecting...
Connected
Start consumer...
Consumed 500,000 messages in 364 ms (bodies: poison, first wave)
Closing environment...
Environment closed
消费者的行为符合预期:它从头开始,因为它消费了 500,000 条消息。 消费者还记录了它在一组中获得的所有消息体,它获得了 first wave
消息和 poison
消息,很好。
发布第二波消息
让我们发布另一波消息
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.OffsetTracking$PublishSecondWave'
这次我们得到
Connecting...
Connected
Creating producer...
Producer created
Sending 100,000 messages
Messages sent, waiting for confirmation...
All messages confirmed? yes (312 ms)
Closing environment...
Environment closed
好的,stream 中增加了 100,000 条消息。
消费第二波消息
让我们重新启动消费者
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.OffsetTracking$Consume'
我们得到
Connecting...
Connected
Start consumer...
Consumed 100,000 messages in 215 ms (bodies: poison, second wave)
Closing environment...
Environment closed
太棒了! 即使消费者应该从 stream 的开头开始,它也检测到存在存储的偏移量,因此它使用它来正确地重新启动。 它只消费了第二波消息。
总结
RabbitMQ Streams 为消费者提供服务器端偏移量跟踪。 消费者实例必须具有在重启之间保持不变的名称(或跟踪引用)。 客户端应用程序定期存储偏移量,其完成方式取决于客户端库。
偏移量跟踪对于需要从上次停止的位置重新启动的应用程序至关重要。
stream Java 客户端对偏移量跟踪具有出色的支持,其文档深入介绍了它。