跳至主要内容

RabbitMQ Stream 教程 - 偏移量跟踪

简介

信息

先决条件

本教程假设 RabbitMQ 已安装,并在 localhost 上运行,并且已启用流插件标准流端口 为 5552。如果您使用不同的主机、端口或凭据,则需要调整连接设置。

使用 Docker

如果您尚未安装 RabbitMQ,可以在 Docker 容器中运行它

docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672  \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.13

等待服务器启动,然后启用流和流管理插件

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management 

获取帮助

如果您在学习本教程时遇到问题,可以通过邮件列表Discord 社区服务器 联系我们。

RabbitMQ Streams 在 RabbitMQ 3.9 中引入。更多信息请访问此处

偏移量跟踪

设置

本教程的一部分包括用 Python 编写两个程序;一个生产者,发送一系列消息,并在最后发送一个标记消息,以及一个消费者,接收消息并在收到标记消息时停止。它展示了消费者如何浏览流,甚至可以在之前的执行中从中断的地方重新开始。

本教程使用rstream Python 客户端。请确保按照第一个教程中的设置步骤 进行操作。

本教程的可执行版本可以在RabbitMQ 教程仓库 中找到。

请注意,可执行版本已实现本教程末尾解释的服务器端偏移量跟踪 功能,在测试此场景时需要考虑这一点。

发送程序称为 offset_tracking_send.py,接收程序称为 offset_tracking_receive.py。本教程重点介绍客户端库的使用,因此应使用存储库中的最终代码创建文件的脚手架(例如导入、主函数等)。

发送

发送程序创建 Producer 实例并发布 100 条消息。

最后一条消息的消息体值设置为 marker;这是消费者停止消费的标记。

该程序通过 _on_publish_confirm_client 回调处理消息确认。

请注意 asyncio.Condition 的使用:主例程在它上面等待,直到所有消息在 _on_publish_confirm_client 回调中得到确认,然后通知主例程。这确保了代理在关闭程序之前收到了所有消息。

STREAM = "stream-offset-tracking-python"
MESSAGES = 100
# 2GB
STREAM_RETENTION = 2000000000
confirmed_messages = 0
all_confirmed_messages_cond = asyncio.Condition()

async def _on_publish_confirm_client(confirmation: ConfirmationStatus) -> None:
global confirmed_messages
if confirmation.is_confirmed:
confirmed_messages = confirmed_messages + 1
if confirmed_messages == 100:
async with all_confirmed_messages_cond:
all_confirmed_messages_cond.notify()

async def publish():
async with Producer("localhost", username="guest", password="guest") as producer:
# create a stream if it doesn't already exist
await producer.create_stream(
STREAM, exists_ok=True, arguments={"max-length-bytes": STREAM_RETENTION}
)

print("Publishing {} messages".format(MESSAGES))
# Send 99 hello message
for i in range(MESSAGES - 1):
amqp_message = AMQPMessage(
body=bytes("hello: {}".format(i), "utf-8"),
)

await producer.send(
stream=STREAM,
message=amqp_message,
on_publish_confirm=_on_publish_confirm_client,
)
# Send a final marker message
amqp_message = AMQPMessage(
body=bytes("marker: {}".format(i + 1), "utf-8"),
)

await producer.send(
stream=STREAM,
message=amqp_message,
on_publish_confirm=_on_publish_confirm_client,
)

async with all_confirmed_messages_cond:
await all_confirmed_messages_cond.wait()

print("Messages confirmed.")


asyncio.run(publish())

现在让我们创建接收程序。

接收

接收程序启动一个消费者,该消费者从流的开头附加 ConsumerOffsetSpecification(OffsetType.FIRST)。它使用两个变量:first_offsetlast_offset 来输出程序结束时接收到的第一条和最后一条消息的偏移量。on_message 回调处理传入的消息。当消费者接收到标记消息时停止:它将消息偏移量分配给 last_offset 变量并关闭消费者。

message_count = -1
first_offset = -1
last_offset = -1
STREAM_NAME = "stream-offset-tracking-python"
# 2GB
STREAM_RETENTION = 2000000000

async def on_message(msg: AMQPMessage, message_context: MessageContext):
global first_offset
global last_offset

offset = message_context.offset
if first_offset == -1:
print("First message received")
first_offset = offset

consumer = message_context.consumer
stream = message_context.consumer.get_stream(message_context.subscriber_name)

if "marker" in str(msg):
last_offset = offset
await consumer.close()

async def consume():

global first_offset
global last_offset

consumer = Consumer(
host="localhost",
port=5552,
username="guest",
password="guest",
)

await consumer.create_stream(
STREAM_NAME, exists_ok=True, arguments={"max-length-bytes": STREAM_RETENTION}
)

try:
await consumer.start()
print("Starting consuming Press control +C to close")

await consumer.subscribe(
stream=STREAM_NAME,
callback=on_message,
decoder=amqp_decoder,
offset_specification=ConsumerOffsetSpecification(
OffsetType.FIRST
),
)
await consumer.run()

except (KeyboardInterrupt, asyncio.exceptions.CancelledError):
await consumer.close()

# give time to the consumer task to close the consumer
await asyncio.sleep(1)

if first_offset != -1:
print(
"Done consuming first_offset: {} last_offset {} ".format(
first_offset, last_offset
)
)


with asyncio.Runner() as runner:
runner.run(consume())

探索流

要运行这两个示例,请打开两个终端(shell)选项卡。

在第一个选项卡中,运行发送方发布一系列消息

 python3 offset_tracking_send.py

输出如下

Publishing 100 messages...
Messages confirmed: true.

现在让我们运行接收方。打开一个新选项卡。请记住,它应该从流的开头开始,因为使用了 FIRST 偏移量规范。

 python3 offset_tracking_receive.py

以下是输出

Started consuming: Press control +C to close
First message received.
Done consuming, first offset 0, last offset 99.
什么是偏移量?

流可以看作一个数组,其中元素是消息。偏移量是给定消息在数组中的索引。

流与队列不同:消费者可以读取和重新读取相同的消息,并且消息保留在流中。

让我们尝试使用 ConsumerOffsetSpecification(OffsetType.OFFSET, long) 规范从不同的偏移量(非 0)开始附加来使用此功能。将消费者的订阅方法中的 ConsumerOffsetSpecification 变量从

offset_specification=ConsumerOffsetSpecification(
OffsetType.FIRST
),

更改为

offset_specification = ConsumerOffsetSpecification(
OffsetType.OFFSET, 42
)

偏移量 42 是任意的,它可以是 0 到 99 之间的任何数字。再次运行接收方

 python3 offset_tracking_receive.py

输出如下

Started consuming: Press control +C to close
First message received.
Done consuming, first offset 42, last offset 99.

还有一种方法可以附加到流的末尾,以便在消费者创建时仅查看新消息。这是 ConsumerOffsetSpecification(OffsetType.NEXT) 偏移量规范。让我们尝试一下

offset_specification = ConsumerOffsetSpecification(
OffsetType.NEXT)

运行接收方

 python3 offset_tracking_receive.py

这次消费者没有收到任何消息

Started consuming: Press control +C to close

它正在等待流中的新消息。让我们通过再次运行发送方来发布一些消息。回到第一个选项卡

 python3 offset_tracking_send.py

等待程序退出并切换回接收方选项卡。消费者收到了新消息

Started consuming: Press control +C to close
First message received.
Done consuming, first offset 100, last offset 199.

接收方由于发送方在流末尾放置的新标记消息而停止。

本节介绍了如何“浏览”流:从开头、从任何偏移量,甚至获取新消息。下一节介绍如何利用服务器端偏移量跟踪来恢复消费者在之前执行中停止的位置。

服务器端偏移量跟踪

RabbitMQ Streams 提供服务器端偏移量跟踪来存储给定消费者在流中的进度。如果消费者因任何原因停止(崩溃、升级等),它将能够从之前停止的位置重新附加,以避免处理相同的消息。

RabbitMQ Streams 提供了用于偏移量跟踪的 API,但也可以使用其他解决方案来存储消费应用程序的进度。这可能取决于用例,但关系数据库也可能是一个不错的解决方案。

让我们修改接收方以存储已处理消息的偏移量。更新的行用注释标出

async def on_message(msg: AMQPMessage, message_context: MessageContext):
# variable to keep track of the number of received messages
global message_count
global first_offset
global last_offset

offset = message_context.offset
if first_offset == -1:
print("First message received")
first_offset = offset

consumer = message_context.consumer
stream = message_context.consumer.get_stream(message_context.subscriber_name)

# store the offset after every 10 messages received
message_count = message_count + 1

if message_count % 10 == 0:
# store_message needs to take a subscriber_name parameter
await consumer.store_offset(
stream=stream,
offset=offset,
subscriber_name=message_context.subscriber_name,
)

# store the offset after receiving the marker message
if "marker" in str(msg):
await consumer.store_offset(
stream=stream,
offset=offset,
subscriber_name=message_context.subscriber_name,
)
last_offset = offset
await consumer.close()

async def consume():
# the offset to start consuming from
stored_offset = -1
global first_offset
global last_offset

# start a consumer and creates the stream is not exist (same as before...)

try:
await consumer.start()
print("Started consuming: Press control +C to close")
try:
# query_offset must take a subscriber_name as parameter
stored_offset = await consumer.query_offset(
stream=STREAM_NAME, subscriber_name="subscriber_1"
)
except OffsetNotFound as offset_exception:
print(f"Offset not previously stored. {offset_exception}")

except ServerError as server_error:
print(f"Server error: {server_error}")
exit(1)

# if no offset was previously stored start from the first offset
stored_offset = stored_offset + 1
await consumer.subscribe(
stream=STREAM_NAME,
# We explicitely need to assign a name to the consumer
subscriber_name="subscriber_1",
callback=on_message,
decoder=amqp_decoder,
offset_specification=ConsumerOffsetSpecification(
OffsetType.OFFSET, stored_offset
),
)
await consumer.run()

except (KeyboardInterrupt, asyncio.exceptions.CancelledError):
await consumer.close()

最相关的更改是

  • 消费者必须有一个名称。它是存储和检索最后存储的偏移量值的键。
  • 每 10 条消息存储一次偏移量。对于偏移量存储频率来说,这是一个异常低的值,但这对于本教程来说是可以的。现实世界中的值通常在数百或数千范围内。
  • 在关闭消费者之前,也就是在接收到标记消息后立即存储偏移量。

让我们运行接收方

 python3 offset_tracking_receive.py

以下是输出

Started consuming: Press control +C to close
First message received.
Done consuming, first offset 0, last offset 99.

这里没有什么令人惊讶的:消费者从流的开头获取消息,并在到达标记消息时停止。

让我们再次启动它

 python3 offset_tracking_receive.py

以下是输出

Started consuming...
First message received.
Done consuming, first offset 100, last offset 199.

消费者从中断的地方重新开始:第一次运行中的最后一个偏移量是 99,第二次运行中的第一个偏移量是 100。消费者在第一次运行中存储了偏移量跟踪信息,因此客户端库在第二次运行中使用它来恢复到正确的位置。

至此,本教程关于 RabbitMQ Streams 中的消费语义部分结束。它介绍了消费者如何附加到流中的任何位置。消费应用程序可能会跟踪它们在流中到达的点。它们可以使用内置的服务器端偏移量跟踪功能,如本教程中所示。它们也可以自由地使用任何其他数据存储解决方案来完成此任务。

有关偏移量跟踪的更多信息,请参阅RabbitMQ 博客rstream 文档

© 2024 RabbitMQ. All rights reserved.