RabbitMQ 流教程 - 偏移量跟踪
简介
先决条件
本教程假设 RabbitMQ 已 安装,运行在 localhost
上,并且 流插件 已启用。 标准流端口 为 5552。如果您使用不同的主机、端口或凭据,则连接设置需要调整。
使用 docker
如果您没有安装 RabbitMQ,可以在 Docker 容器中运行它
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.13
等待服务器启动,然后启用流和流管理插件
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management
在哪里寻求帮助
如果您在完成本教程时遇到问题,可以通过 邮件列表 或 Discord 社区服务器 与我们联系。
RabbitMQ Streams 在 RabbitMQ 3.9 中推出。有关更多信息,请访问 此处。
偏移量跟踪
设置
本教程的一部分包括用 C# 编写两个程序;一个生产者,它发送一批消息,并在最后发送一个标记消息;一个消费者,它接收消息,并在接收到标记消息时停止。它展示了消费者如何在一个流中导航,甚至可以从上次执行的位置重新开始。
本教程使用 .NET 流客户端。请确保遵循 第一个教程的设置步骤。
本教程的可执行版本可以在 RabbitMQ 教程仓库 中找到。发送程序称为 OffsetTrackingSend.cs
,接收程序称为 OffsetTrackingReceive.cs
。
让我们为每个程序创建一个项目
dotnet new console --name OffsetTrackingSend
mv OffsetTrackingSend/Program.cs OffsetTrackingSend/OffsetTrackingSend.cs
dotnet new console --name OffsetTrackingReceive
mv OffsetTrackingReceive/Program.cs OffsetTrackingReceive/OffsetTrackingReceive.cs
然后添加客户端依赖项。
cd OffsetTrackingSend
dotnet add package RabbitMQ.Stream.Client
cd ../OffsetTrackingReceive
dotnet add package RabbitMQ.Stream.Client
cd ..
本教程重点介绍客户端库的使用,因此应使用仓库中的最终代码来创建文件的脚手架(例如导入)。
发送
发送程序首先创建 StreamSystem
并声明流
var streamSystem = await StreamSystem.Create(new StreamSystemConfig());
var stream = "stream-offset-tracking-dotnet";
await streamSystem.CreateStream(new StreamSpec(stream));
然后,程序创建 Producer
实例并发布 100 条消息。最后一条消息的主体值为 marker
;这是消费者停止消费的标记。
请注意 CountdownEvent
的使用:它在每个消息确认回调中通过 Signal()
减 1。这确保了在关闭程序之前,代理收到了所有消息。
var messageCount = 100;
var confirmedCde = new CountdownEvent(messageCount);
var producer = await Producer.Create(new ProducerConfig(streamSystem, stream) {
ConfirmationHandler = async confirmation => {
if (confirmation.Status == ConfirmationStatus.Confirmed) {
confirmedCde.Signal();
}
await Task.CompletedTask.ConfigureAwait(false);
}
});
Console.WriteLine("Publishing {0} messages...", messageCount);
for (int i = 0; i < messageCount; i++) {
var body = i == messageCount - 1 ? "marker" : "hello";
await producer.Send(new Message(Encoding.UTF8.GetBytes(body)));
}
confirmedCde.Wait();
Console.WriteLine("Messages confirmed.");
await producer.Close();
await streamSystem.Close();
现在让我们创建接收程序。
接收
接收程序创建 StreamSystem
实例,并确保流已创建。这部分代码与发送程序相同,因此为了简洁起见,将在后面的代码片段中省略。
接收程序启动一个消费者,该消费者从流的开头附加(new OffsetTypeFirst()
)。它使用变量在程序结束时输出第一个和最后一个接收消息的偏移量。
消费者在收到标记消息时停止:它将偏移量分配给一个变量,关闭消费者,并将 CountdownEvent
减 1。与发送程序一样,CountdownEvent
告诉程序在消费者完成工作后继续执行。
IOffsetType offsetSpecification = new OffsetTypeFirst();
ulong initialValue = UInt64.MaxValue;
ulong firstOffset = initialValue;
ulong lastOffset = initialValue;
var consumedCde = new CountdownEvent(1);
var consumer = await Consumer.Create(new ConsumerConfig(streamSystem, stream)
{
OffsetSpec = offsetSpecification,
MessageHandler = async (_, consumer, context, message) => {
if (Interlocked.CompareExchange(ref firstOffset, context.Offset, initialValue) == initialValue) {
Console.WriteLine("First message received.");
}
if ("marker".Equals(Encoding.UTF8.GetString(message.Data.Contents))) {
Interlocked.Exchange(ref lastOffset, context.Offset);
await consumer.Close();
consumedCde.Signal();
}
await Task.CompletedTask;
}
});
Console.WriteLine("Started consuming...");
consumedCde.Wait();
Console.WriteLine("Done consuming, first offset {0}, last offset {1}.", firstOffset, lastOffset);
await streamSystem.Close();
探索流
为了运行这两个示例,请打开两个终端(shell)选项卡。
在第一个选项卡中,cd
到 OffsetTrackingSend
目录,并运行发送程序以发布一批消息
dotnet run
输出如下
Publishing 100 messages...
Messages confirmed.
现在让我们运行接收程序。打开一个新选项卡,并 cd
到 OffsetTrackingSend
目录。请记住,它应该从流的开头开始,因为使用了 first
偏移量规范。
dotnet run
以下是输出
Started consuming...
First message received.
Done consuming, first offset 0, last offset 99
流可以看作一个数组,其中元素是消息。偏移量是给定消息在数组中的索引。
流与队列不同:消费者可以读取和重新读取相同的消息,并且消息会保留在流中。
让我们使用 offset(long)
规范来尝试这个功能,以便在给定偏移量处附加。将 offsetSpecification
变量从 OffsetTypeFirst()
设置为 OffsetTypeOffset(42)
IOffsetType offsetSpecification = new OffsetTypeOffset(42);
偏移量 42 是任意的,它可以是 0 到 99 之间的任何数字。再次运行接收程序
dotnet run
输出如下
Started consuming...
First message received.
Done consuming, first offset 42, last offset 99.
还有一种方法是在流的末尾附加,以便仅在创建消费者时查看新消息。这就是 next
偏移量规范。让我们尝试一下
IOffsetType offsetSpecification = new OffsetTypeNext();
运行接收程序
dotnet run
这次消费者没有收到任何消息
Started consuming...
它正在等待流中的新消息。让我们通过再次运行发送程序来发布一些消息。回到第一个选项卡
dotnet run
等待程序退出,然后切换回接收程序选项卡。消费者收到了新消息
Started consuming...
First message received.
Done consuming, first offset 100, last offset 199.
接收程序由于发送程序在流末尾放置的新标记消息而停止。
本节展示了如何“浏览”流:从开头、从任何偏移量,甚至查看新消息。下一节将介绍如何利用服务器端偏移量跟踪来恢复消费者上次执行停止的位置。
服务器端偏移量跟踪
RabbitMQ Streams 提供服务器端偏移量跟踪来存储给定消费者在流中的进度。如果消费者因任何原因停止(崩溃、升级等),它将能够从上次停止的位置重新附加,以避免处理相同的消息。
RabbitMQ Streams 提供了偏移量跟踪的 API,但可以使用其他解决方案来存储消费应用程序的进度。这可能取决于用例,但关系数据库也是一个不错的解决方案。
让我们修改接收程序以存储已处理消息的偏移量。更新的行用注释标出
var consumerName = "offset-tracking-tutorial"; // name of the consumer
IOffsetType offsetSpecification;
try {
// get last stored offset
ulong storedOffset = await streamSystem.QueryOffset(consumerName, stream).ConfigureAwait(false);
// start just after the last stored offset
offsetSpecification = new OffsetTypeOffset(storedOffset + 1);
} catch (OffsetNotFoundException) {
// start consuming at the beginning of the stream if no stored offset
offsetSpecification = new OffsetTypeFirst();
}
ulong initialValue = UInt64.MaxValue;
ulong firstOffset = initialValue;
int messageCount = 0; // number of received messages
ulong lastOffset = initialValue;
var consumedCde = new CountdownEvent(1);
var consumer = await Consumer.Create(new ConsumerConfig(streamSystem, stream)
{
OffsetSpec = offsetSpecification,
Reference = consumerName, // the consumer must a have name
MessageHandler = async (_, consumer, context, message) => {
if (Interlocked.CompareExchange(ref firstOffset, context.Offset, initialValue) == initialValue) {
Console.WriteLine("First message received.");
}
if (Interlocked.Increment(ref messageCount) % 10 == 0) {
// store offset every 10 messages
await consumer.StoreOffset(context.Offset).ConfigureAwait(false);
}
if ("marker".Equals(Encoding.UTF8.GetString(message.Data.Contents))) {
Interlocked.Exchange(ref lastOffset, context.Offset);
// store the offset on consumer closing
await consumer.StoreOffset(context.Offset).ConfigureAwait(false);
await consumer.Close();
consumedCde.Signal();
}
await Task.CompletedTask;
}
});
Console.WriteLine("Started consuming...");
consumedCde.Wait();
Console.WriteLine("Done consuming, first offset {0}, last offset {1}.", firstOffset, lastOffset);
await streamSystem.Close();
最重要的更改是
- 该程序在创建消费者之前查找最后存储的偏移量。如果没有存储的偏移量(这很可能是该消费者第一次启动),它使用
first
。如果存在存储的偏移量,它使用offset
规范从其后开始(存储的偏移量 + 1
),这假定带有存储的偏移量的那条消息已在应用程序的前一个实例中处理过。 - 消费者必须有一个名称。它是存储和检索最后存储的偏移量值的键。
- 每 10 条消息存储一次偏移量。对于偏移量存储频率来说,这是一个非常低的数值,但对于本教程来说是可以的。现实世界中的数值通常在数百或数千范围内。
- 在关闭消费者之前存储偏移量,就在收到标记消息之后。
让我们运行更新后的接收程序
dotnet run
以下是输出
Started consuming...
First message received.
Done consuming, first offset 0, last offset 99.
这里没有什么令人惊讶的地方:消费者从流的开头获取消息,并在到达标记消息时停止。
让我们再启动一次
dotnet run
以下是输出
Started consuming...
First message received.
Done consuming, first offset 100, last offset 199.
消费者从上次停止的位置精确地重新开始:第一次运行中的最后一个偏移量是 99,第二次运行中的第一个偏移量是 100。消费者在第一次运行中存储了偏移量跟踪信息,因此客户端库在第二次运行中使用它来恢复从正确的位置消费。
本 RabbitMQ Streams 消费语义教程到此结束。它介绍了消费者如何附加到流中的任何位置。消费应用程序可能会跟踪它们在流中到达的位置。它们可以使用本教程中演示的内置服务器端偏移量跟踪功能。它们也可以自由地使用任何其他数据存储解决方案来完成此任务。
有关偏移量跟踪的更多信息,请参阅 RabbitMQ 博客 和 .NET 流客户端文档。