跳至主要内容

RabbitMQ 流教程 - 偏移量跟踪

简介

信息

先决条件

本教程假设 RabbitMQ 已安装,并在 localhost 上运行,并且已启用流插件标准流端口 为 5552。如果您使用不同的主机、端口或凭据,则连接设置需要进行调整。

使用 Docker

如果您没有安装 RabbitMQ,可以在 Docker 容器中运行它

docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672  \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.13

等待服务器启动,然后启用流和流管理插件

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management 

获取帮助

如果您在学习本教程时遇到问题,可以通过邮件列表Discord 社区服务器 联系我们。

RabbitMQ Streams 在 RabbitMQ 3.9 中引入。更多信息请访问此处

偏移量跟踪

设置

本教程的一部分包括用 JavaScript 编写两个程序;一个生产者,它发送一系列消息,并在末尾添加一个标记消息;一个消费者,它接收消息,并在收到标记消息时停止。它展示了消费者如何浏览流,甚至可以在之前的执行中从中断的地方重新开始。

本教程使用流 JavaScript 客户端。请确保按照第一个教程中的设置步骤 操作。

本教程的可执行版本可以在RabbitMQ 教程存储库 中找到。发送程序称为 offset_tracking_send.js,接收程序称为 offset_tracking_receive.js。本教程重点介绍客户端库的使用,因此应使用存储库中的最终代码来创建文件的脚手架(例如导入、主函数等)。

发送

offset_tracking_send.js 中,我们需要添加客户端

const rabbit = require("rabbitmq-stream-js-client");

然后我们可以创建到服务器的连接和一个流

const client = await rabbit.connect({
hostname: "localhost",
port: 5552,
username: "guest",
password: "guest",
vhost: "/",
});

console.log("Making sure the stream exists...");
const streamSizeRetention = 5 * 1e9;
const streamName = "stream-offset-tracking-javascript";
await client.createStream({ stream: streamName, arguments: { "max-length-bytes": streamSizeRetention } });

程序然后创建一个 producer 并发布 100 条消息。最后一条消息的主体值设置为 marker;这是 consumer 停止消费的标记。

console.log("Creating the publisher...");
const publisher = await client.declarePublisher({ stream: streamName });

const messageCount = 100;
console.log(`Publishing ${messageCount} messages`);
for (let i = 0; i < messageCount; i++) {
const body = i === messageCount - 1 ? "marker" : `hello ${i}`;
await publisher.send(Buffer.from(body));
}

如果您在发送消息时遇到任何问题,建议您查看一些解决方案此处

现在让我们创建接收程序。

接收

接收程序offset_tracking_receive.js 添加客户端,创建到服务器的连接和一个流。此代码部分与发送程序相同,因此为了简洁起见,在接下来的代码片段中省略了它。

接收程序启动一个消费者,该消费者从流的开头(rabbit.Offset.first())开始附加。它使用变量在程序结束时输出接收到的第一条和最后一条消息的偏移量。

消费者在收到标记消息时停止:它将偏移量分配给一个变量,并将消息偏移量存储在服务器上。与发送者一样,通道告诉程序在消费者完成其工作时继续执行。

const startFrom = rabbit.Offset.first();
let firstOffset = startFrom.value;
let lastOffset = startFrom.value;
let messageCount = 0;
const consumer = await client.declareConsumer({ stream: streamName, offset: startFrom }, (message) => {
messageCount++;
if (messageCount === 1) {
console.log("First message received");
firstOffset = message.offset;
}
if (message.content.toString() === "marker") {
console.log("Marker found");
lastOffset = message.offset;
console.log(`Done consuming, first offset was ${firstOffset}, last offset was ${lastOffset}`);
}
});

console.log(`Start consuming...`);
await sleep(2000);

探索流

为了运行这两个示例,请打开两个终端(shell)选项卡。

在第一个选项卡中,运行发送者以发布一系列消息

npm run offset-tracking-publish

输出如下

Connecting...
Making sure the stream exists...
Creating the publisher...
Publishing 100 messages
Closing the connection...
done!

现在让我们运行接收者。打开一个新选项卡。请记住,它应该从流的开头开始,因为使用了 first 偏移量规范。

npm run offset-tracking-receive

以下是输出

Connecting...
First message received
Start consuming...
Marker found
Done consuming, first offset was 0, last offset was 99
什么是偏移量?

流可以看作一个数组,其中元素是消息。偏移量是给定消息在数组中的索引。

流与队列不同:消费者可以读取和重新读取相同的消息,并且消息保留在流中。

让我们尝试使用 offset(bigint) 规范在给定偏移量处附加来使用此功能。将 startFrom 变量从 rabbit.Offset.first() 设置为 rabbit.Offset.offset(42)

const startFrom = rabbit.Offset.offset(42n);

偏移量 42 是任意的,它可以是 0 到 99 之间的任何数字。再次运行接收者

npm run offset-tracking-receive

输出如下

Connecting...
Start consuming...
First message received
Marker found
Done consuming, first offset was 42, last offset was 99

还有一种方法可以附加到流的末尾,以便仅在消费者创建时查看新消息。这是 next 偏移量规范。让我们尝试一下

const startFrom = rabbit.Offset.next();

运行接收者

npm run offset-tracking-receive

这次消费者没有收到任何消息

Connecting...
Start consuming...

它正在等待流中的新消息。让我们通过再次运行发送者来发布一些消息。返回到第一个选项卡

npm run offset-tracking-publish

等待程序退出,然后切换回接收者选项卡。消费者收到了新消息

First message received
Marker found
Done consuming, first offset was 100, last offset was 199

接收者停止是因为发送者在流的末尾放置了新的标记消息。

本节展示了如何“浏览”流:从开头、从任何偏移量,甚至查看新消息。下一节介绍如何利用服务器端偏移量跟踪来恢复消费者在之前执行中中断的地方。

服务器端偏移量跟踪

RabbitMQ Streams 提供服务器端偏移量跟踪,以在流中存储给定消费者的进度。如果消费者因任何原因停止(崩溃、升级等),它将能够从之前停止的地方重新附加,以避免处理相同的消息。

RabbitMQ Streams 提供了用于偏移量跟踪的 API,但可以使用其他解决方案来存储消费应用程序的进度。这可能取决于用例,但关系数据库也可以是一个不错的解决方案。

让我们修改接收者以存储已处理消息的偏移量。更新的行用注释突出显示

// start consuming at the beginning of the stream
const consumerRef = "offset-tracking-tutorial"; // the consumer must a have name
let firstOffset = undefined;
let offsetSpecification = rabbit.Offset.first();
try {
const offset = await client.queryOffset({ reference: consumerRef, stream: streamName }); // take the offset stored on the server if it exists
offsetSpecification = rabbit.Offset.offset(offset + 1n); // start from the message after 'marker'
} catch (e) {}

let lastOffset = offsetSpecification.value;
let messageCount = 0;
const consumer = await client.declareConsumer(
{ stream: streamName, offset: offsetSpecification, consumerRef },
async (message) => {
messageCount++;
if (!firstOffset && messageCount === 1) {
firstOffset = message.offset;
console.log("First message received");
}
if (messageCount % 10 === 0) {
await consumer.storeOffset(message.offset); // store offset every 10 messages
}
if (message.content.toString() === "marker") {
console.log("Marker found");
lastOffset = message.offset;
await consumer.storeOffset(message.offset); // store the offset on consumer closing
await consumer.close(true);
}
}
);

console.log(`Start consuming...`);
await sleep(2000);
console.log(`Done consuming, first offset was ${firstOffset}, last offset was ${lastOffset}`);
process.exit(0);

最相关的更改是

  • 消费者在第一次启动时使用 rabbit.Offset.first() 从流的开头附加。
  • 消费者必须有一个名称。它是存储和检索上次存储的偏移量值的键。
  • 每 10 条消息存储一次偏移量。对于偏移量存储频率来说,这是一个异常低的数值,但对于本教程来说是可以的。现实世界中的值通常在数百或数千范围内。
  • 在关闭消费者之前存储偏移量,就在获取标记消息之后。

让我们运行更新后的接收者

npm run offset-tracking-receive

以下是输出

Connecting...
Start consuming...
First message received
Marker found
Done consuming, first offset was 0, last offset was 99

这里没有令人惊讶的事情:消费者从流的开头获取消息,并在到达标记消息时停止。

让我们发布另外 100 条消息,并再次启动接收者

npm run offset-tracking-publish
npm run offset-tracking-receive

以下是输出

Connecting...
Start consuming...
Marker found
Done consuming, first offset was 100, last offset was 201

消费者从中断的地方重新开始:第一次运行中的最后一个偏移量为 99,第二次运行中的第一个偏移量为 100。请注意,在第二次运行中,偏移量规范是通过 queryOffset 方法获取的。消费者在第一次运行中存储了偏移量跟踪信息,因此客户端库提供了检索它的可能性,以便在第二次运行中从正确的位置恢复消费。

这结束了本关于 RabbitMQ Streams 中消费语义的教程。它介绍了消费者如何在流中的任何位置附加。消费应用程序可能会跟踪他们在流中到达的点。他们可以使用本教程中演示的内置服务器端偏移量跟踪功能。他们还可以自由地使用任何其他数据存储解决方案来完成此任务。

有关偏移量跟踪的更多信息,请参阅RabbitMQ 博客流 JavaScript Node.js 客户端文档

© 2024 RabbitMQ. All rights reserved.