RabbitMQ Stream 教程 - 偏移量跟踪
简介
先决条件
本教程假定 RabbitMQ 已安装,正在localhost上运行,并且stream 插件已启用。标准的 stream 端口是 5552。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
使用 Docker
如果您没有安装 RabbitMQ,可以在 Docker 容器中运行它
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:4-management
等待服务器启动,然后启用 stream 和 stream management 插件
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management
哪里寻求帮助
如果您在学习本教程时遇到困难,可以通过邮件列表或Discord 社区服务器与我们联系。
RabbitMQ Streams 在 RabbitMQ 3.9 中引入。更多信息请参见此处。
偏移量跟踪
设置
本教程的这一部分将编写两个 Javascript 程序:一个生产者,它发送一连串带有标记消息的消息,还有一个消费者,它接收消息并在收到标记消息时停止。它展示了消费者如何导航消息流,甚至可以在之前的执行中断处重新开始。
本教程使用 流 Javascript 客户端。请务必按照第一个教程中的 设置步骤 进行操作。
本教程的可执行版本可在 RabbitMQ 教程仓库 中找到。发送程序名为 offset_tracking_send.js,接收程序名为 offset_tracking_receive.js。本教程侧重于客户端库的使用,因此应使用仓库中的最终代码来创建文件的框架(例如,导入、主函数等)。
发送
在 offset_tracking_send.js 中,我们需要添加客户端。
const rabbit = require("rabbitmq-stream-js-client");
然后我们可以创建一个到服务器的连接和一个流。
const client = await rabbit.connect({
hostname: "localhost",
port: 5552,
username: "guest",
password: "guest",
vhost: "/",
});
console.log("Making sure the stream exists...");
const streamSizeRetention = 5 * 1e9;
const streamName = "stream-offset-tracking-javascript";
await client.createStream({ stream: streamName, arguments: { "max-length-bytes": streamSizeRetention } });
该程序随后创建一个 producer 并发布 100 条消息。最后一条消息的正文设置为 marker;这是 consumer 停止消耗的标记。
console.log("Creating the publisher...");
const publisher = await client.declarePublisher({ stream: streamName });
const messageCount = 100;
console.log(`Publishing ${messageCount} messages`);
for (let i = 0; i < messageCount; i++) {
const body = i === messageCount - 1 ? "marker" : `hello ${i}`;
await publisher.send(Buffer.from(body));
}
对于您在发送消息时遇到的任何问题,我建议您在此处 查看 一些解决方案。
现在,让我们创建接收程序。
接收
接收程序 offset_tracking_receive.js 添加了客户端,也创建了到服务器的连接和流。这部分代码与发送程序相同,因此为简洁起见,在接下来的代码片段中省略了。
接收程序启动一个消费者,该消费者附加到流的开头(rabbit.Offset.first())。它使用变量在程序结束时输出接收到的第一条和最后一条消息的偏移量。
当收到标记消息时,消费者停止:它将偏移量分配给一个变量,并将消息偏移量存储在服务器上。与发送方一样,当消费者完成其工作时,通道会告知程序继续进行。
const startFrom = rabbit.Offset.first();
let firstOffset = startFrom.value;
let lastOffset = startFrom.value;
let messageCount = 0;
const consumer = await client.declareConsumer({ stream: streamName, offset: startFrom }, (message) => {
messageCount++;
if (messageCount === 1) {
console.log("First message received");
firstOffset = message.offset;
}
if (message.content.toString() === "marker") {
console.log("Marker found");
lastOffset = message.offset;
console.log(`Done consuming, first offset was ${firstOffset}, last offset was ${lastOffset}`);
}
});
console.log(`Start consuming...`);
await sleep(2000);
探索流
要运行这两个示例,请打开两个终端(shell)标签页。
在第一个选项卡中,运行发送程序以发布一系列消息。
npm run offset-tracking-publish
输出如下:
Connecting...
Making sure the stream exists...
Creating the publisher...
Publishing 100 messages
Closing the connection...
done!
现在让我们运行接收程序。打开一个新标签页。请记住,由于 first 偏移量规范,它应该从流的开头开始。
npm run offset-tracking-receive
这是输出:
Connecting...
First message received
Start consuming...
Marker found
Done consuming, first offset was 0, last offset was 99
流可以看作是一个包含消息的数组。偏移量是数组中给定消息的索引。
流与队列不同:消费者可以读取和重读相同的消息,并且消息会保留在流中。
让我们尝试使用 offset(bigint) 规范附加到给定偏移量来使用此功能。将 startFrom 变量从 rabbit.Offset.first() 设置为 rabbit.Offset.offset(42)。
const startFrom = rabbit.Offset.offset(42n);
偏移量 42 是任意的,它可以是 0 到 99 之间的任何数字。再次运行接收程序。
npm run offset-tracking-receive
输出如下:
Connecting...
Start consuming...
First message received
Marker found
Done consuming, first offset was 42, last offset was 99
还有一种方法可以附加到流的末尾,以便在创建消费者时只查看新消息。这就是 next 偏移量规范。让我们试试。
const startFrom = rabbit.Offset.next();
运行接收程序。
npm run offset-tracking-receive
这次消费者没有收到任何消息。
Connecting...
Start consuming...
它正在等待流中的新消息。通过再次运行发送程序来发布一些消息。回到第一个选项卡。
npm run offset-tracking-publish
等待程序退出,然后切换回接收程序选项卡。消费者收到了新消息。
First message received
Marker found
Done consuming, first offset was 100, last offset was 199
接收程序因为发送程序将其放在流末尾的新标记消息而停止。
本节展示了如何“浏览”流:从开头、从任何偏移量,甚至对于新消息。下一节将介绍如何利用服务器端偏移量跟踪,以便从消费者前一次执行的中断处恢复。
服务器端偏移量跟踪
RabbitMQ Streams 提供服务器端偏移量跟踪,用于存储流中给定消费者的进度。如果消费者因任何原因停止(崩溃、升级等),它将能够从先前停止的位置重新连接,以避免处理相同的消息。
RabbitMQ Streams 提供了偏移量跟踪的 API,但也可以使用其他解决方案来存储正在消耗的应用程序的进度。这可能取决于用例,但关系型数据库也是一个不错的解决方案。
让我们修改接收程序以存储已处理消息的偏移量。已更新的行用注释标出。
// start consuming at the beginning of the stream
const consumerRef = "offset-tracking-tutorial"; // the consumer must a have name
let firstOffset = undefined;
let offsetSpecification = rabbit.Offset.first();
try {
const offset = await client.queryOffset({ reference: consumerRef, stream: streamName }); // take the offset stored on the server if it exists
offsetSpecification = rabbit.Offset.offset(offset + 1n); // start from the message after 'marker'
} catch (e) {}
let lastOffset = offsetSpecification.value;
let messageCount = 0;
const consumer = await client.declareConsumer(
{ stream: streamName, offset: offsetSpecification, consumerRef },
async (message) => {
messageCount++;
if (!firstOffset && messageCount === 1) {
firstOffset = message.offset;
console.log("First message received");
}
if (messageCount % 10 === 0) {
await consumer.storeOffset(message.offset); // store offset every 10 messages
}
if (message.content.toString() === "marker") {
console.log("Marker found");
lastOffset = message.offset;
await consumer.storeOffset(message.offset); // store the offset on consumer closing
await consumer.close(true);
}
}
);
console.log(`Start consuming...`);
await sleep(2000);
console.log(`Done consuming, first offset was ${firstOffset}, last offset was ${lastOffset}`);
process.exit(0);
最重要的更改是
- 首次启动时,消费者使用
rabbit.Offset.first()附加到流的开头。 - 消费者必须有一个名称。它是存储和检索最后一个存储的偏移量值的关键。
- 偏移量每 10 条消息存储一次。对于偏移量存储频率而言,这是一个异常低的值,但对于本教程来说没问题。实际世界中的值通常是几百或几千。
- 偏移量在关闭消费者之前存储,就在收到标记消息之后。
现在让我们运行更新后的接收程序。
npm run offset-tracking-receive
这是输出:
Connecting...
Start consuming...
First message received
Marker found
Done consuming, first offset was 0, last offset was 99
这没什么令人惊讶的:消费者从流的开头获取了消息,并在到达标记消息时停止。
让我们再发布一批 100 条消息,然后再次启动接收器。
npm run offset-tracking-publish
npm run offset-tracking-receive
这是输出:
Connecting...
Start consuming...
Marker found
Done consuming, first offset was 100, last offset was 201
消费者正好从上次中断的地方恢复:第一次运行的最后一个偏移量是 99,这次第二次运行的第一个偏移量是 100。请注意,第二次运行时,偏移量规范是通过 queryOffset 方法获取的。消费者在第一次运行时存储了偏移量跟踪信息,因此客户端库提供了检索这些信息以在第二次运行时恢复到正确位置消费的可能性。
本教程关于 RabbitMQ Streams 中的消费语义的内容到此结束。它涵盖了消费者如何附加到流中的任何位置。消费应用程序很可能需要跟踪它们在流中达到的点。它们可以使用本教程中演示的内置服务器端偏移量跟踪功能。它们也可以自由使用任何其他数据存储解决方案来完成此任务。
有关偏移量跟踪的更多信息,请参阅 RabbitMQ 博客 和 流 Javascript nodejs 客户端文档。