跳到主要内容

RabbitMQ Stream 教程 - “Hello World!”

简介

信息

前提条件

本教程假设您已安装 RabbitMQ,并在 localhost 上运行,且已启用 stream 插件。标准 stream 端口为 5552。如果您使用不同的主机、端口或凭据,则需要调整连接设置。

使用 Docker

如果您没有安装 RabbitMQ,可以在 Docker 容器中运行它

docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672  \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:4-management

等待服务器启动,然后启用 stream 和 stream management 插件

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management 

在哪里获得帮助

如果您在学习本教程时遇到问题,可以通过邮件列表Discord 社区服务器 联系我们。

RabbitMQ Streams 在 RabbitMQ 3.9 中引入。更多信息请点击 这里

“Hello World”

(使用 Python (rstream) Stream 客户端)

在本教程的这一部分,我们将用 Python 编写两个程序:一个生产者,用于发送单条消息;一个消费者,用于接收消息并打印出来。我们将略过 Python 客户端 API 的一些细节,专注于这个非常简单的示例,以便快速入门。这是 RabbitMQ Streams 的 “Hello World”。

Python (rstream) stream 客户端库

RabbitMQ 支持多种协议。本教程使用 RabbitMQ stream 协议,它是 RabbitMQ streams 的专用协议。RabbitMQ 有许多不同语言的客户端,请查看 每种语言的 stream 客户端库。我们将使用由 George Fortunatov 最初构建,现在由 RabbitMQ 支持的 Python (rstream) stream 客户端

该客户端支持 Python >= 3.9。本教程将使用 rstream 客户端 0.19.1 版本。Python (rstream) 客户端 0.19.1 及更高版本通过 pip 发布。

本教程假设您在 Windows 上使用 powershell。在 MacOS 和 Linux 上,几乎任何 shell 都可以工作。

设置

首先,让我们验证您的 PATH 中是否包含 Python 工具链

python --help

运行该命令应生成帮助消息。

本教程的可执行版本可以在 RabbitMQ tutorials 仓库中找到。

现在,让我们创建一个文件夹项目并安装依赖项

# using pip
mkdir python-rstream
cd python-rstream
pip install rstream

# using Pipenv
mkdir python-rstream
cd python-rstream
pipenv install rstream
pipenv shell

现在创建两个新文件,分别命名为 send.pyreceive.py。现在我们已经设置好了 Python 项目,可以编写一些代码了。

发送

我们将消息生产者(发送者)命名为 send.py,消息消费者(接收者)命名为 receive.py。生产者将连接到 RabbitMQ,发送一条消息,然后退出。

send.py 中,我们需要一些导入

import asyncio
from rstream import Producer

然后我们可以创建到服务器的连接

async with Producer(
host="localhost",
username="guest",
password="guest",
) as producer

生产者的入口点是 Producer 类。它用于配置 RabbitMQ stream 发布者和 stream 本身。

它抽象了套接字连接,并为我们处理协议版本协商和身份验证等。

本教程假设 stream 发布者和消费者连接到本地运行的 RabbitMQ 节点,即 localhost。要连接到不同机器上的节点,只需在 Producers 参数中指定目标主机名或 IP 地址。

接下来,让我们创建一个生产者。

生产者还将声明一个 stream,它将向该 stream 发布消息,然后发布一条消息


STREAM_NAME = "hello-python-stream"
# 5GB
STREAM_RETENTION = 5000000000

await producer.create_stream(
STREAM_NAME, exists_ok=True, arguments={"MaxLengthBytes": STREAM_RETENTION})

await producer.send(stream=STREAM_NAME, message=b"Hello, World!")

stream 声明操作是幂等的:仅当 stream 尚不存在时才会被创建。

stream 是一种仅追加日志的抽象,允许重复消费消息,直到消息过期。始终定义保留策略是一个好习惯。在上面的示例中,stream 的大小限制为 5 GiB。

消息内容是一个字节数组。应用程序可以使用任何适当的格式(例如 JSON、MessagePack 等)对需要传输的数据进行编码。

当上面的代码运行完成后,生产者连接将被关闭。这就是我们的生产者。

每次运行生产者时,它都会向服务器发送一条消息,并且该消息将被追加到 stream 中。

完整的 send.py 文件可以在 GitHub 上找到。

发送不工作!

如果您是第一次使用 RabbitMQ,并且没有看到 “Sent” 消息,那么您可能会挠头想知道哪里出了问题。可能是 broker 启动时没有足够的可用磁盘空间(默认情况下至少需要 50 MB 可用空间),因此拒绝接受消息。检查 broker 日志文件,查看是否有 资源告警 记录,如有必要,降低可用磁盘空间阈值。《配置指南》将向您展示如何设置 disk_free_limit

另一个原因可能是程序在消息到达 broker 之前就退出了。在某些客户端库中,发送是异步的:函数立即返回,但消息在通过网络传输之前会在 IO 层排队。发送程序要求用户按键以完成进程:消息有足够的时间到达 broker。stream 协议提供了一种确认机制,以确保 broker 接收到出站消息,但为了简单起见,本教程未使用此机制。

接收

本教程的另一部分,消费者,将连接到 RabbitMQ 节点并等待消息被推送给它。与本教程中发布单条消息后停止的生产者不同,消费者将持续运行,消费 RabbitMQ 推送给它的消息,并打印接收到的负载。

send.py 类似,receive.py 也需要一些导入

import asyncio
import signal

from rstream import (
AMQPMessage,
Consumer,
MessageContext,
ConsumerOffsetSpecification,
OffsetType
)

在初始设置方面,消费者部分与生产者部分非常相似;我们使用默认连接设置并声明消费者将从中消费的 stream。

consumer = Consumer(host="localhost", username="guest", password="guest")
await consumer.create_stream(
STREAM_NAME, exists_ok=True, arguments={"MaxLengthBytes": STREAM_RETENTION}
)

请注意,消费者部分也声明了 stream。这是为了允许生产者或消费者任何一方首先启动。

我们为 consumer.subscribe 函数提供了一个 on_message 回调。

offset_specification 定义了消费者的起始点。在本例中,消费者从 stream 中可用的第一条消息开始。

async def on_message(msg: AMQPMessage, message_context: MessageContext):
stream = message_context.consumer.get_stream(message_context.subscriber_name)
print("Got message: {} from stream {}".format(msg, stream))

await consumer.start()
await consumer.subscribe(
stream=STREAM_NAME,
callback=on_message,
offset_specification=ConsumerOffsetSpecification(OffsetType.FIRST, None),
)
await consumer.run()

完整的 receive.py 文件可以在 GitHub 上找到。

将它们放在一起

为了运行这两个示例,请打开两个终端(shell)标签页。

本教程的两个部分可以按任意顺序运行,因为它们都声明了 stream。让我们首先运行消费者,这样当第一个发布者启动时,消费者将打印它

python receive.py

然后运行生产者

python send.py

消费者将打印它通过 RabbitMQ 从发布者那里收到的消息。消费者将保持运行,等待新的交付。尝试多次重新运行发布者以观察这一点。

stream 与队列的不同之处在于,它们是可以重复消费的消息的仅追加日志。当多个消费者从一个 stream 消费时,他们将从第一条可用消息开始。

© . All rights reserved.