RabbitMQ Stream 教程 - Offset Tracking
简介
前提条件
本教程假设您已安装 RabbitMQ,并在 localhost
上运行,且已启用 stream 插件。 标准 stream 端口是 5552。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
使用 Docker
如果您没有安装 RabbitMQ,可以在 Docker 容器中运行它
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:4-management
等待服务器启动,然后启用 stream 和 stream management 插件
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management
在哪里获得帮助
如果您在学习本教程时遇到问题,可以通过邮件列表或 Discord 社区服务器联系我们。
RabbitMQ Streams 在 RabbitMQ 3.9 中引入。更多信息请参见此处。
Offset Tracking
设置
本教程的这一部分包括编写两个 Go 程序;一个生产者,它发送一波消息,并在末尾发送一个标记消息,以及一个消费者,它接收消息并在收到标记消息时停止。它展示了消费者如何浏览 stream,甚至可以在之前的执行中停止的位置重新开始。
本教程使用stream Go 客户端。请务必按照第一个教程中的设置步骤进行操作。
本教程的可执行版本可以在 RabbitMQ 教程仓库中找到。发送程序名为 offset_tracking_send.go
,接收程序名为 offset_tracking_receive.go
。本教程重点介绍客户端库的用法,因此应使用仓库中的最终代码来创建文件的支架(例如,导入、main 函数等)。
发送
发送程序首先创建环境并声明 stream
env, _ := stream.NewEnvironment(
stream.NewEnvironmentOptions().
SetHost("localhost").
SetPort(5552).
SetUser("guest").
SetPassword("guest"))
streamName := "stream-offset-tracking-go"
env.DeclareStream(streamName,
&stream.StreamOptions{
MaxLengthBytes: stream.ByteCapacity{}.GB(2),
},
)
请注意,为了简洁起见,省略了错误处理代码。
然后,程序创建一个生产者并发布 100 条消息。最后一条消息的正文值设置为 marker
;这是消费者停止消费的标记。
该程序使用 handlePublishConfirm
函数和一个 channel 来确保所有消息都到达 broker 后再退出。我们先跳过这部分,先看看发送的主要部分
producer, _ := env.NewProducer(streamName, stream.NewProducerOptions())
messageCount := 100
ch := make(chan bool)
chPublishConfirm := producer.NotifyPublishConfirmation()
handlePublishConfirm(chPublishConfirm, messageCount, ch)
fmt.Printf("Publishing %d messages\n", messageCount)
for i := 0; i < messageCount; i++ {
var body string
if i == messageCount-1 {
body = "marker"
} else {
body = "hello"
}
producer.Send(amqp.NewMessage([]byte(body)))
}
_ = <-ch
fmt.Println("Messages confirmed")
producer.Close()
发送程序使用 2 个 channel 和一个 Go routine,以便在 broker 确认消息后继续并退出。让我们关注这一部分。
生产者 NotifyPublishConfirmation
函数返回第一个 channel。客户端库在此 channel 上发送 broker 确认,而 handlePublishConfirm
中声明的 routine 接收这些确认
messageCount := 100
chPublishConfirm := producer.NotifyPublishConfirmation()
ch := make(chan bool)
handlePublishConfirm(chPublishConfirm, messageCount, ch)
该 routine 处理消息,并在达到预期的确认数量时向第二个 channel 发送 true
func handlePublishConfirm(confirms stream.ChannelPublishConfirm, messageCount int, ch chan bool) {
go func() {
confirmedCount := 0
for confirmed := range confirms {
for _, _ = range confirmed {
if msg.IsConfirmed() {
confirmedCount++
if confirmedCount == messageCount {
ch <- true
}
}
}
}
}()
}
主程序在发送循环之后等待第二个 channel,因此一旦 channel 上有任何内容到达(routine 发送的 true
值),它就会继续执行。
由于这种同步机制,程序不会在所有消息都到达 broker 之前停止,因此不存在风险。
现在让我们创建接收程序。
接收
接收程序创建环境并声明 stream。这部分代码与发送程序中的代码相同,因此为了简洁起见,在接下来的代码片段中跳过。
接收程序启动一个消费者,该消费者从 stream 的开头附加 (stream.OffsetSpecification{}.First()
)。它使用变量在程序结束时输出第一个和最后一个接收到的消息的 offset。
当消费者收到标记消息时停止:它将 offset 分配给一个变量,关闭消费者,并向 channel 发送 true
。与发送者一样,channel 告诉程序在消费者完成其工作后继续执行。
var firstOffset int64 = -1
var lastOffset atomic.Int64
ch := make(chan bool)
messagesHandler := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
if atomic.CompareAndSwapInt64(&firstOffset, -1, consumerContext.Consumer.GetOffset()) {
fmt.Println("First message received.")
}
if string(message.GetData()) == "marker" {
lastOffset.Store(consumerContext.Consumer.GetOffset())
_ = consumerContext.Consumer.Close()
ch <- true
}
}
offsetSpecification := stream.OffsetSpecification{}.First()
_, _ = env.NewConsumer(streamName, messagesHandler,
stream.NewConsumerOptions().
SetOffset(offsetSpecification))
fmt.Println("Started consuming...")
_ = <-ch
fmt.Printf("Done consuming, first offset %d, last offset %d.\n", firstOffset, lastOffset.Load())
探索 Stream
为了运行这两个示例,请打开两个终端(shell)选项卡。
在第一个选项卡中,运行发送者以发布一波消息
go run offset_tracking_send.go
输出如下
Publishing 100 messages
Messages confirmed.
现在运行接收器。打开一个新选项卡。记住,由于 first
offset 规范,它应该从 stream 的开头开始。
go run offset_tracking_receive.go
这是输出
Started consuming...
First message received.
Done consuming, first offset 0, last offset 99.
stream 可以看作是一个数组,其中的元素是消息。offset 是数组中给定消息的索引。
stream 与队列不同:消费者可以读取和重新读取相同的消息,并且消息保留在 stream 中。
让我们通过使用 offset
规范附加到给定的 offset 来尝试此功能。将 offsetSpecification
变量从 stream.OffsetSpecification{}.First()
设置为 stream.OffsetSpecification{}.Offset(42)
offsetSpecification := stream.OffsetSpecification{}.Offset(42)
Offset 42 是任意的,它可以是 0 到 99 之间的任何数字。再次运行接收器
go run offset_tracking_receive.go
输出如下
Started consuming...
First message received.
Done consuming, first offset 42, last offset 99.
还有一种方法可以附加到 stream 的末尾,以便仅查看消费者创建时的新消息。这是 next
offset 规范。让我们试试看
offsetSpecification := stream.OffsetSpecification{}.Next()
运行接收器
go run offset_tracking_receive.go
这次消费者没有收到任何消息
Started consuming...
它正在等待 stream 中的新消息。让我们通过再次运行发送者来发布一些消息。回到第一个选项卡
go run offset_tracking_send.go
等待程序退出,然后切换回接收器选项卡。消费者收到了新消息
Started consuming...
First message received.
Done consuming, first offset 100, last offset 199.
由于发送者放在 stream 末尾的新标记消息,接收器停止了。
本节展示了如何“浏览” stream:从开头、从任何 offset,甚至对于新消息。下一节介绍如何利用服务器端 offset 跟踪来恢复消费者在先前执行中停止的位置。
服务器端 Offset Tracking
RabbitMQ Streams 提供服务器端 offset 跟踪,以存储给定消费者在 stream 中的进度。如果消费者因任何原因(崩溃、升级等)停止,它将能够重新附加到先前停止的位置,以避免处理相同的消息。
RabbitMQ Streams 提供了用于 offset 跟踪的 API,但也可以使用其他解决方案来存储消费应用程序的进度。这可能取决于用例,但关系数据库也可能是一个不错的解决方案。
让我们修改接收器以存储已处理消息的 offset。更新后的行用注释标出
var firstOffset int64 = -1
var messageCount int64 = -1 // number of received messages
var lastOffset atomic.Int64
ch := make(chan bool)
messagesHandler := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
if atomic.CompareAndSwapInt64(&firstOffset, -1, consumerContext.Consumer.GetOffset()) {
fmt.Println("First message received.")
}
if atomic.AddInt64(&messageCount, 1)%10 == 0 {
consumerContext.Consumer.StoreOffset() // store offset every 10 messages
}
if string(message.GetData()) == "marker" {
lastOffset.Store(consumerContext.Consumer.GetOffset())
consumerContext.Consumer.StoreOffset() // store the offset on consumer closing
consumerContext.Consumer.Close()
ch <- true
}
}
var offsetSpecification stream.OffsetSpecification
consumerName := "offset-tracking-tutorial" // name of the consumer
storedOffset, err := env.QueryOffset(consumerName, streamName) // get last stored offset
if errors.Is(err, stream.OffsetNotFoundError) {
// start consuming at the beginning of the stream if no stored offset
offsetSpecification = stream.OffsetSpecification{}.First()
} else {
// start just after the last stored offset
offsetSpecification = stream.OffsetSpecification{}.Offset(storedOffset + 1)
}
_, err = env.NewConsumer(streamName, messagesHandler,
stream.NewConsumerOptions().
SetManualCommit(). // activate manual offset tracking
SetConsumerName(consumerName). // the consumer must a have name
SetOffset(offsetSpecification))
fmt.Println("Started consuming...")
_ = <-ch
fmt.Printf("Done consuming, first offset %d, last offset %d.\n", firstOffset, lastOffset.Load())
最相关的更改是
- 程序在创建消费者之前查找最后存储的 offset。如果没有存储的 offset(很可能是消费者第一次启动),它将使用
first
。如果有存储的 offset,它将使用offset
规范从刚刚之后开始 (stored offset + 1
),这假设具有存储 offset 的消息已在应用程序的先前实例中处理。 - 消费者必须有一个名称。它是存储和检索最后存储的 offset 值的密钥。
- 手动跟踪策略已激活,这意味着显式调用以存储 offset。
- 每 10 条消息存储一次 offset。对于 offset 存储频率来说,这是一个异常低的数值,但对于本教程来说是可以的。实际世界中的值通常在数百或数千。
- offset 在关闭消费者之前存储,就在收到标记消息之后。
让我们运行更新后的接收器
go run offset_tracking_receive.go
这是输出
Started consuming...
First message received.
Done consuming, first offset 0, last offset 99.
没有什么令人惊讶的:消费者从 stream 的开头获取消息,并在到达标记消息时停止。
让我们再次启动它
go run offset_tracking_receive.go
这是输出
Started consuming...
First message received.
Done consuming, first offset 100, last offset 199.
消费者在上次停止的位置重新启动:第一次运行中的最后一个 offset 是 99,而第二次运行中的第一个 offset 是 100。消费者在第一次运行中存储了 offset 跟踪信息,因此客户端库使用它来在第二次运行中恢复到正确的位置进行消费。
本教程结束了关于 RabbitMQ Streams 中消费语义的介绍。它涵盖了消费者如何在 stream 中的任何位置附加。消费应用程序很可能需要跟踪它们在 stream 中到达的点。他们可以使用本教程中演示的内置服务器端 offset 跟踪功能。他们也可以自由使用任何其他数据存储解决方案来完成此任务。
有关 offset 跟踪的更多信息,请参阅 RabbitMQ 博客和 stream Go 客户端文档。