跳至主要内容

RabbitMQ 流教程 - 偏移量跟踪

简介

信息

先决条件

本教程假设 RabbitMQ 已安装,并在 localhost 上运行,并且已启用流插件标准流端口 为 5552。如果您使用不同的主机、端口或凭据,则连接设置需要进行调整。

使用 Docker

如果您没有安装 RabbitMQ,可以在 Docker 容器中运行它

docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672  \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.13

等待服务器启动,然后启用流和流管理插件

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management 

获取帮助

如果您在学习本教程时遇到问题,可以通过邮件列表Discord 社区服务器联系我们。

RabbitMQ 流在 RabbitMQ 3.9 中引入。更多信息请访问此处

偏移量跟踪

设置

本教程的一部分包括使用 Go 编写两个程序;一个生产者,发送一连串的消息,并在最后发送一个标记消息,以及一个消费者,接收消息并在收到标记消息时停止。它展示了消费者如何浏览流,甚至可以在之前的执行中从中断的地方重新开始。

本教程使用Go 流客户端。请确保按照第一个教程中的设置步骤操作。

本教程的可执行版本可以在RabbitMQ 教程仓库中找到。发送程序称为 offset_tracking_send.go,接收程序称为 offset_tracking_receive.go。本教程重点介绍客户端库的使用,因此应使用仓库中的最终代码创建文件的脚手架(例如导入、主函数等)。

发送

发送程序首先创建环境并声明流

env, _ := stream.NewEnvironment(
stream.NewEnvironmentOptions().
SetHost("localhost").
SetPort(5552).
SetUser("guest").
SetPassword("guest"))

streamName := "stream-offset-tracking-go"
env.DeclareStream(streamName,
&stream.StreamOptions{
MaxLengthBytes: stream.ByteCapacity{}.GB(2),
},
)

请注意,为了简洁起见,错误处理代码已省略。

然后,程序创建一个生产者并发布 100 条消息。最后一条消息的消息体设置为 marker;这是消费者停止消费的标记。

程序使用 handlePublishConfirm 函数和一个通道来确保所有消息在退出之前都发送到代理。现在让我们跳过这部分,先看看发送的主要部分。

producer, _ := env.NewProducer(streamName, stream.NewProducerOptions())

messageCount := 100
ch := make(chan bool)
chPublishConfirm := producer.NotifyPublishConfirmation()
handlePublishConfirm(chPublishConfirm, messageCount, ch)

fmt.Printf("Publishing %d messages\n", messageCount)
for i := 0; i < messageCount; i++ {
var body string
if i == messageCount-1 {
body = "marker"
} else {
body = "hello"
}
producer.Send(amqp.NewMessage([]byte(body)))
}
_ = <-ch
fmt.Println("Messages confirmed")

producer.Close()

发送程序使用 2 个通道和一个 Go 协程,在代理确认消息后继续执行并退出。让我们关注这部分。

生产者的 NotifyPublishConfirmation 函数返回第一个通道。客户端库将代理确认信息发送到此通道,并在 handlePublishConfirm 中声明的协程接收这些确认信息。

messageCount := 100
chPublishConfirm := producer.NotifyPublishConfirmation()
ch := make(chan bool)
handlePublishConfirm(chPublishConfirm, messageCount, ch)

该协程处理消息,并在达到预期的确认数量时将 true 发送到第二个通道。

func handlePublishConfirm(confirms stream.ChannelPublishConfirm, messageCount int, ch chan bool) {
go func() {
confirmedCount := 0
for confirmed := range confirms {
for _, _ = range confirmed {
if msg.IsConfirmed() {
confirmedCount++
if confirmedCount == messageCount {
ch <- true
}
}
}
}
}()
}

主程序在发送循环之后等待第二个通道,因此只要通道中有任何内容到达(协程发送的 true 值),它就会继续执行。

由于这种同步机制,程序在所有消息都发送到代理之前停止的风险不存在。

现在让我们创建接收程序。

接收

接收程序也创建环境并声明流。这部分代码与发送程序中的相同,因此为了简洁起见,在接下来的代码片段中将跳过它。

接收程序启动一个消费者,该消费者从流的开头附加(stream.OffsetSpecification{}.First())。它使用变量在程序结束时输出接收到的第一条和最后一条消息的偏移量。

消费者在收到标记消息时停止:它将偏移量分配给一个变量,关闭消费者,并将 true 发送到一个通道。与发送者一样,通道告诉程序在消费者完成其工作时继续执行。

var firstOffset int64 = -1
var lastOffset atomic.Int64
ch := make(chan bool)
messagesHandler := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
if atomic.CompareAndSwapInt64(&firstOffset, -1, consumerContext.Consumer.GetOffset()) {
fmt.Println("First message received.")
}
if string(message.GetData()) == "marker" {
lastOffset.Store(consumerContext.Consumer.GetOffset())
_ = consumerContext.Consumer.Close()
ch <- true
}
}

offsetSpecification := stream.OffsetSpecification{}.First()
_, _ = env.NewConsumer(streamName, messagesHandler,
stream.NewConsumerOptions().
SetOffset(offsetSpecification))

fmt.Println("Started consuming...")
_ = <-ch

fmt.Printf("Done consuming, first offset %d, last offset %d.\n", firstOffset, lastOffset.Load())

探索流

要运行这两个示例,请打开两个终端(shell)选项卡。

在第一个选项卡中,运行发送者以发布一连串的消息

go run offset_tracking_send.go

输出如下

Publishing 100 messages
Messages confirmed.

现在让我们运行接收者。打开一个新选项卡。请记住,它应该从流的开头开始,因为使用了 first 偏移量规范。

go run offset_tracking_receive.go

以下是输出

Started consuming...
First message received.
Done consuming, first offset 0, last offset 99.
什么是偏移量?

流可以看作一个数组,其中元素是消息。偏移量是给定消息在数组中的索引。

流与队列不同:消费者可以读取和重新读取相同的消息,并且消息保留在流中。

让我们尝试使用 offset 规范在给定偏移量处附加来使用此功能。将 offsetSpecification 变量从 stream.OffsetSpecification{}.First() 设置为 stream.OffsetSpecification{}.Offset(42)

offsetSpecification := stream.OffsetSpecification{}.Offset(42)

偏移量 42 是任意的,它可以是 0 到 99 之间的任何数字。再次运行接收者

go run offset_tracking_receive.go

输出如下

Started consuming...
First message received.
Done consuming, first offset 42, last offset 99.

还有一种方法可以附加到流的末尾,以便仅在消费者创建时查看新消息。这是 next 偏移量规范。让我们尝试一下

offsetSpecification := stream.OffsetSpecification{}.Next()

运行接收者

go run offset_tracking_receive.go

这次消费者没有收到任何消息

Started consuming...

它正在等待流中的新消息。让我们通过再次运行发送者来发布一些消息。回到第一个选项卡

go run offset_tracking_send.go

等待程序退出并切换回接收者选项卡。消费者收到了新消息

Started consuming...
First message received.
Done consuming, first offset 100, last offset 199.

接收者停止是因为发送者在流的末尾添加了新的标记消息。

本节展示了如何“浏览”流:从开头、从任何偏移量,甚至获取新消息。下一节介绍如何利用服务器端偏移量跟踪来恢复消费者在先前执行中中断的地方。

服务器端偏移量跟踪

RabbitMQ 流提供服务器端偏移量跟踪以在流中存储给定消费者的进度。如果消费者因任何原因停止(崩溃、升级等),它将能够从之前停止的地方重新附加,以避免处理相同的消息。

RabbitMQ 流提供用于偏移量跟踪的 API,但也可以使用其他解决方案来存储消费应用程序的进度。这可能取决于用例,但关系数据库也可以是一个不错的解决方案。

让我们修改接收者以存储已处理消息的偏移量。已更新的行用注释突出显示

var firstOffset int64 = -1
var messageCount int64 = -1 // number of received messages
var lastOffset atomic.Int64
ch := make(chan bool)
messagesHandler := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
if atomic.CompareAndSwapInt64(&firstOffset, -1, consumerContext.Consumer.GetOffset()) {
fmt.Println("First message received.")
}
if atomic.AddInt64(&messageCount, 1)%10 == 0 {
consumerContext.Consumer.StoreOffset() // store offset every 10 messages
}
if string(message.GetData()) == "marker" {
lastOffset.Store(consumerContext.Consumer.GetOffset())
consumerContext.Consumer.StoreOffset() // store the offset on consumer closing
consumerContext.Consumer.Close()
ch <- true
}
}

var offsetSpecification stream.OffsetSpecification
consumerName := "offset-tracking-tutorial" // name of the consumer
storedOffset, err := env.QueryOffset(consumerName, streamName) // get last stored offset
if errors.Is(err, stream.OffsetNotFoundError) {
// start consuming at the beginning of the stream if no stored offset
offsetSpecification = stream.OffsetSpecification{}.First()
} else {
// start just after the last stored offset
offsetSpecification = stream.OffsetSpecification{}.Offset(storedOffset + 1)
}

_, err = env.NewConsumer(streamName, messagesHandler,
stream.NewConsumerOptions().
SetManualCommit(). // activate manual offset tracking
SetConsumerName(consumerName). // the consumer must a have name
SetOffset(offsetSpecification))
fmt.Println("Started consuming...")
_ = <-ch

fmt.Printf("Done consuming, first offset %d, last offset %d.\n", firstOffset, lastOffset.Load())

最相关的更改是

  • 该程序在创建消费者之前查找最后存储的偏移量。如果没有存储偏移量(这可能是此消费者第一次启动),它将使用 first。如果存在存储的偏移量,它将使用 offset 规范从其后开始(存储的偏移量 + 1),这假定具有存储偏移量的消息已在应用程序的先前实例中处理。
  • 消费者必须有一个名称。它是存储和检索最后存储的偏移量值的键。
  • 手动跟踪策略被激活,这意味着需要显式调用以存储偏移量。
  • 每 10 条消息存储一次偏移量。对于偏移量存储频率来说,这是一个异常低的数值,但对于本教程来说是可以的。现实世界中的值通常在数百或数千范围内。
  • 在关闭消费者之前,在获取标记消息之后存储偏移量。

让我们运行更新后的接收者

go run offset_tracking_receive.go

以下是输出

Started consuming...
First message received.
Done consuming, first offset 0, last offset 99.

这里没有什么令人惊讶的:消费者从流的开头获取消息,并在到达标记消息时停止。

让我们再启动一次

go run offset_tracking_receive.go

以下是输出

Started consuming...
First message received.
Done consuming, first offset 100, last offset 199.

消费者从中断的地方重新开始:第一次运行中的最后一个偏移量是 99,第二次运行中的第一个偏移量是 100。消费者在第一次运行中存储了偏移量跟踪信息,因此客户端库使用它在第二次运行中恢复到正确的位置。

本教程关于 RabbitMQ 流中消费语义的介绍到此结束。它介绍了消费者如何在流中的任何位置附加。消费应用程序可能会跟踪它们在流中到达的点。它们可以使用本教程中演示的内置服务器端偏移量跟踪功能。它们也可以自由地为此任务使用任何其他数据存储解决方案。

有关偏移量跟踪的更多信息,请参阅RabbitMQ 博客流 Go 客户端文档

© 2024 RabbitMQ. All rights reserved.