跳到主要内容

RabbitMQ Stream 教程 - Offset Tracking

介绍

信息

前提条件

本教程假设 RabbitMQ 已安装,运行在 localhost 上,并且已启用 stream 插件标准 stream 端口是 5552。如果您使用不同的主机、端口或凭据,则需要调整连接设置。

使用 docker

如果您没有安装 RabbitMQ,您可以在 Docker 容器中运行它

docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672  \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:4-management

等待服务器启动,然后启用 stream 和 stream 管理插件

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management 

在哪里获得帮助

如果您在学习本教程时遇到问题,可以通过邮件列表Discord 社区服务器联系我们。

RabbitMQ Streams 在 RabbitMQ 3.9 中引入。更多信息请访问 此处

Offset Tracking

设置

本教程的这一部分包括用 C# 编写两个程序;一个生产者,它发送一连串消息,并在末尾带有一个标记消息;一个消费者,它接收消息并在收到标记消息时停止。它展示了消费者如何浏览 stream,甚至可以在之前的执行中停止的位置重新开始。

本教程使用 stream .NET 客户端。请务必按照第一个教程中的 设置步骤进行操作。

本教程的可执行版本可以在 RabbitMQ tutorials 存储库中找到。发送程序名为 OffsetTrackingSend.cs,接收程序名为 OffsetTrackingReceive.cs

让我们为每个程序创建一个项目

dotnet new console --name OffsetTrackingSend
mv OffsetTrackingSend/Program.cs OffsetTrackingSend/OffsetTrackingSend.cs
dotnet new console --name OffsetTrackingReceive
mv OffsetTrackingReceive/Program.cs OffsetTrackingReceive/OffsetTrackingReceive.cs

然后添加客户端依赖项。

cd OffsetTrackingSend
dotnet add package RabbitMQ.Stream.Client
cd ../OffsetTrackingReceive
dotnet add package RabbitMQ.Stream.Client
cd ..

本教程侧重于客户端库的用法,因此应使用存储库中的最终代码来创建文件的脚手架(例如,导入)。

发送

发送程序首先创建 StreamSystem 并声明 stream

var streamSystem = await StreamSystem.Create(new StreamSystemConfig());

var stream = "stream-offset-tracking-dotnet";
await streamSystem.CreateStream(new StreamSpec(stream));

然后,程序创建一个 Producer 实例并发布 100 条消息。最后一条消息的主体值设置为 marker;这是消费者停止消费的标记。

请注意 CountdownEvent 的使用:它在每个消息确认回调中使用 Signal() 递减。这确保了 broker 在关闭程序之前接收到所有消息。

var messageCount = 100;
var confirmedCde = new CountdownEvent(messageCount);
var producer = await Producer.Create(new ProducerConfig(streamSystem, stream) {
ConfirmationHandler = async confirmation => {
if (confirmation.Status == ConfirmationStatus.Confirmed) {
confirmedCde.Signal();
}
await Task.CompletedTask.ConfigureAwait(false);
}
});

Console.WriteLine("Publishing {0} messages...", messageCount);
for (int i = 0; i < messageCount; i++) {
var body = i == messageCount - 1 ? "marker" : "hello";
await producer.Send(new Message(Encoding.UTF8.GetBytes(body)));
}

confirmedCde.Wait();
Console.WriteLine("Messages confirmed.");
await producer.Close();
await streamSystem.Close();

现在让我们创建接收程序。

接收

接收程序创建一个 StreamSystem 实例,并确保 stream 也被创建。这部分代码与发送程序中的代码相同,因此为了简洁起见,在接下来的代码片段中跳过。

接收程序启动一个消费者,该消费者从 stream 的开头开始附加 (new OffsetTypeFirst())。它使用变量在程序结束时输出第一个和最后一个接收消息的 offset。

当消费者收到标记消息时停止:它将 offset 分配给一个变量,关闭消费者,并递减 CountdownEvent。与发送者一样,当消费者完成其工作时,CountdownEvent 告诉程序继续执行。

IOffsetType offsetSpecification = new OffsetTypeFirst();
ulong initialValue = UInt64.MaxValue;
ulong firstOffset = initialValue;
ulong lastOffset = initialValue;
var consumedCde = new CountdownEvent(1);
var consumer = await Consumer.Create(new ConsumerConfig(streamSystem, stream)
{
OffsetSpec = offsetSpecification,
MessageHandler = async (_, consumer, context, message) => {
if (Interlocked.CompareExchange(ref firstOffset, context.Offset, initialValue) == initialValue) {
Console.WriteLine("First message received.");
}
if ("marker".Equals(Encoding.UTF8.GetString(message.Data.Contents))) {
Interlocked.Exchange(ref lastOffset, context.Offset);
await consumer.Close();
consumedCde.Signal();
}
await Task.CompletedTask;
}
});
Console.WriteLine("Started consuming...");

consumedCde.Wait();
Console.WriteLine("Done consuming, first offset {0}, last offset {1}.", firstOffset, lastOffset);
await streamSystem.Close();

探索 Stream

为了运行这两个示例,请打开两个终端(shell)选项卡。

在第一个选项卡中,cd 进入 OffsetTrackingSend 目录,并运行发送者以发布一连串消息

dotnet run

输出如下

Publishing 100 messages...
Messages confirmed.

现在让我们运行接收器。打开一个新的选项卡,cd 进入 OffsetTrackingSend 目录。请记住,由于 first offset 规范,它应该从 stream 的开头开始。

dotnet run

这是输出

Started consuming...
First message received.
Done consuming, first offset 0, last offset 99
什么是 offset?

可以将 stream 视为一个数组,其中元素是消息。Offset 是数组中给定消息的索引。

Stream 与队列不同:消费者可以读取和重新读取相同的消息,并且消息保留在 stream 中。

让我们通过使用 offset(long) 规范在给定的 offset 处附加来尝试此功能。将 offsetSpecification 变量从 OffsetTypeFirst() 设置为 OffsetTypeOffset(42)

IOffsetType offsetSpecification = new OffsetTypeOffset(42);

Offset 42 是任意的,它可以是 0 到 99 之间的任何数字。再次运行接收器

dotnet run

输出如下

Started consuming...
First message received.
Done consuming, first offset 42, last offset 99.

还有一种方法可以在 stream 的末尾附加,以便仅查看消费者创建时的新消息。这是 next offset 规范。让我们尝试一下

IOffsetType offsetSpecification = new OffsetTypeNext();

运行接收器

dotnet run

这次消费者没有收到任何消息

Started consuming...

它正在等待 stream 中的新消息。让我们通过再次运行发送器来发布一些消息。回到第一个选项卡

dotnet run

等待程序退出并切换回接收器选项卡。消费者收到了新消息

Started consuming...
First message received.
Done consuming, first offset 100, last offset 199.

接收器由于发送器放在 stream 末尾的新标记消息而停止。

本节展示了如何“浏览” stream:从头开始,从任何 offset,甚至对于新消息。下一节介绍如何利用服务器端 offset tracking 来恢复消费者在之前的执行中停止的位置。

服务器端 Offset Tracking

RabbitMQ Streams 提供服务器端 offset tracking,以存储给定消费者在 stream 中的进度。如果消费者因任何原因(崩溃、升级等)停止,它将能够重新附加到之前停止的位置,以避免处理相同的消息。

RabbitMQ Streams 提供了用于 offset tracking 的 API,但可以使用其他解决方案来存储消费应用程序的进度。这可能取决于用例,但关系数据库也可能是一个不错的解决方案。

让我们修改接收器以存储已处理消息的 offset。更新的行用注释标出

var consumerName = "offset-tracking-tutorial"; // name of the consumer
IOffsetType offsetSpecification;
try {
// get last stored offset
ulong storedOffset = await streamSystem.QueryOffset(consumerName, stream).ConfigureAwait(false);
// start just after the last stored offset
offsetSpecification = new OffsetTypeOffset(storedOffset + 1);
} catch (OffsetNotFoundException) {
// start consuming at the beginning of the stream if no stored offset
offsetSpecification = new OffsetTypeFirst();
}
ulong initialValue = UInt64.MaxValue;
ulong firstOffset = initialValue;
int messageCount = 0; // number of received messages
ulong lastOffset = initialValue;
var consumedCde = new CountdownEvent(1);
var consumer = await Consumer.Create(new ConsumerConfig(streamSystem, stream)
{
OffsetSpec = offsetSpecification,
Reference = consumerName, // the consumer must a have name
MessageHandler = async (_, consumer, context, message) => {
if (Interlocked.CompareExchange(ref firstOffset, context.Offset, initialValue) == initialValue) {
Console.WriteLine("First message received.");
}
if (Interlocked.Increment(ref messageCount) % 10 == 0) {
// store offset every 10 messages
await consumer.StoreOffset(context.Offset).ConfigureAwait(false);
}
if ("marker".Equals(Encoding.UTF8.GetString(message.Data.Contents))) {
Interlocked.Exchange(ref lastOffset, context.Offset);
// store the offset on consumer closing
await consumer.StoreOffset(context.Offset).ConfigureAwait(false);
await consumer.Close();
consumedCde.Signal();
}
await Task.CompletedTask;
}
});
Console.WriteLine("Started consuming...");

consumedCde.Wait();
Console.WriteLine("Done consuming, first offset {0}, last offset {1}.", firstOffset, lastOffset);
await streamSystem.Close();

最相关的更改是

  • 程序在创建消费者之前查找上次存储的 offset。如果没有存储的 offset(很可能是此消费者第一次启动),则使用 first。如果存在存储的 offset,则使用 offset 规范在之后立即开始 (stored offset + 1),这假设具有存储 offset 的消息已在应用程序的先前实例中处理。
  • 消费者必须有一个名称。它是存储和检索上次存储的 offset 值的关键。
  • offset 每 10 条消息存储一次。对于 offset 存储频率来说,这是一个异常低的值,但这对于本教程来说是可以的。现实世界中的值通常在数百或数千个。
  • offset 在关闭消费者之前存储,就在获取标记消息之后。

让我们运行更新后的接收器

dotnet run

这是输出

Started consuming...
First message received.
Done consuming, first offset 0, last offset 99.

那里没有什么令人惊讶的:消费者从 stream 的开头获取消息,并在到达标记消息时停止。

让我们再次启动它

dotnet run

这是输出

Started consuming...
First message received.
Done consuming, first offset 100, last offset 199.

消费者在它停止的位置精确地重新启动:第一次运行中的最后一个 offset 是 99,第二次运行中的第一个 offset 是 100。消费者在第一次运行中存储了 offset tracking 信息,因此客户端库使用它来恢复在第二次运行中正确位置的消费。

本教程总结了 RabbitMQ Streams 中关于消费语义的内容。它涵盖了消费者如何在 stream 中的任何位置附加。消费应用程序很可能跟踪它们在 stream 中到达的点。他们可以使用本教程中演示的内置服务器端 offset tracking 功能。他们也可以自由地使用任何其他数据存储解决方案来完成此任务。

有关 offset tracking 的更多信息,请参阅 RabbitMQ 博客stream .NET 客户端文档

© . All rights reserved.