跳至主内容

RabbitMQ Stream 教程 - “Hello World!”

简介

信息

先决条件

本教程假定 RabbitMQ 已安装,正在localhost上运行,并且stream 插件已启用。标准的 stream 端口是 5552。如果您使用不同的主机、端口或凭据,则需要调整连接设置。

使用 Docker

如果您没有安装 RabbitMQ,可以在 Docker 容器中运行它

docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:4-management

等待服务器启动,然后启用 stream 和 stream management 插件

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management

哪里寻求帮助

如果您在学习本教程时遇到困难,可以通过邮件列表Discord 社区服务器与我们联系。

RabbitMQ Streams 在 RabbitMQ 3.9 中引入。更多信息请参见此处

"Hello World"

(使用 Rust Stream 客户端)

在本教程的这一部分,我们将编写两个 Rust 程序:一个发送单条消息的生产者,以及一个接收并打印消息的消费者。我们将略过 Rust 客户端 API 的一些细节,专注于这个非常简单的入门示例。这就是 RabbitMQ Streams 的“Hello World”。

Rust Stream 客户端库

RabbitMQ 支持多种协议。本教程使用 RabbitMQ 流协议,这是专为 RabbitMQ 流设计的协议。RabbitMQ 在 多种不同语言 中都有相应的客户端,请查看各语言对应的流客户端库。我们将使用 RabbitMQ 提供的 Rust 流客户端

RabbitMQ Rust 流客户端 0.4.2 及更高版本通过 crates.io 分发。

本教程假设您在 Windows 上使用 PowerShell。在 MacOS 和 Linux 上,几乎任何 shell 都可以工作。

设置

首先,请确认您的 PATH 中已配置好 Rust 工具链。

rustc --version
cargo --version

运行该命令应该会产生一个帮助消息。

本教程的可执行版本可以在 RabbitMQ 教程仓库中找到。

现在让我们创建项目。

cargo new rust-stream
cd rust-stream

从 GitHub 上的 Cargo.toml 文件中复制所需的依赖项到您自己的 Cargo.toml 文件中。

现在我们已经设置好了 Rust 项目,可以开始编写代码了。

发送

我们将消息生产者(发送者)命名为 send.rs,将消息消费者(接收者)命名为 receive.rs。生产者将连接到 RabbitMQ,发送单条消息,然后退出。

src/bin 目录下创建 send.rs 文件。

send.rs 中,我们需要一些 use 声明。

use rabbitmq_stream_client::error::StreamCreateError;
use rabbitmq_stream_client::types::{ByteCapacity, Message, ResponseCode};

然后,我们可以创建一个到服务器的连接

use rabbitmq_stream_client::Environment;
let environment = Environment::builder().build().await?;

Stream Rust 客户端的入口点是 Environment。它用于配置 RabbitMQ 流发布者、流消费者以及流本身。

它封装了套接字连接,并为我们处理协议版本协商和身份验证等事宜。

本教程假设流发布者和消费者连接到本地运行的 RabbitMQ 节点,即 localhost。要连接到其他机器上的节点,只需在 Environment::builder() 中指定目标主机名或 IP 地址即可。

接下来,让我们创建一个生产者。

生产者还将声明一个它将发布消息的 stream,然后发布一条消息

let stream = "hello-rust-stream";
let create_response = environment
.stream_creator()
.max_length(ByteCapacity::GB(5))
.create(stream)
.await;

if let Err(e) = create_response {
if let StreamCreateError::Create { stream, status } = e {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
}
}
}
let producer = environment.producer().build(stream).await?;
producer
.send_with_confirm(Message::builder().body("Hello, World!").build())
.await?;

流声明操作是幂等的:只有在流不存在时才会创建流。在此示例中,我们忽略了 StreamAlreadyExists 错误,因为我们并不关心流是否已经存在。

stream 是一个追加式日志抽象,允许在消息过期之前反复消费。定义保留策略是一个好习惯。在上面的示例中,stream 的大小限制为 5 GiB。

消息内容是字节数组。应用程序可以使用任何合适的格式(如 JSON、MessagePack 等)来编码需要传输的数据。

当上面的代码运行完毕后,生产者连接和 stream-system 连接将被关闭。我们的生产者就完成了。

每次运行生产者时,它都会向服务器发送一条消息,该消息将被追加到 stream 中。

完整的 send.rs 文件可以在 GitHub 上找到。

发送不起作用!

如果这是您第一次使用 RabbitMQ,但没有看到“Sent”消息,您可能会感到困惑,不知道哪里出了问题。也许代理启动时磁盘空间不足(默认需要至少 50 MB 可用空间),因此拒绝接收消息。检查代理的日志文件,看看是否有资源警报已记录,并在必要时降低可用磁盘空间阈值。配置指南将展示如何设置 disk_free_limit

另一个原因可能是程序在消息发送到代理之前就退出了。在某些客户端库中,发送是异步的:函数立即返回,但消息在发送到网络之前被放入 IO 层进行排队。发送程序要求用户按键完成进程:这样消息就有充足的时间到达代理。Stream 协议提供了一个确认机制来确保代理接收到出站消息,但为了简单起见,本教程不使用此机制。

接收

本教程的另一部分,即消费者,将连接到 RabbitMQ 节点并等待消息被推送到它。与本教程中的生产者不同,生产者发送一条消息然后停止,消费者将持续运行,消费 RabbitMQ 推送给它的消息,并打印收到的负载。

src/bin 目录下创建 receive.rs 文件。

send.rs 类似,receive.rs 也需要 use 声明。

use std::io::stdin;
use rabbitmq_stream_client::error::StreamCreateError;
use rabbitmq_stream_client::types::{ByteCapacity, OffsetSpecification, ResponseCode};
use futures::{StreamExt};
use tokio::task;

在初始设置方面,消费者部分与生产者部分非常相似;我们使用默认的连接设置,并声明消费者将从中消费的 stream。

use rabbitmq_stream_client::Environment;
let environment = Environment::builder().build().await?;
let stream = "hello-rust-stream";
let create_response = environment
.stream_creator()
.max_length(ByteCapacity::GB(5))
.create(stream)
.await;

if let Err(e) = create_response {
if let StreamCreateError::Create { stream, status } = e {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
}
}
}

请注意,消费者部分也声明了 stream。这是为了允许任何一方先启动,无论是生产者还是消费者。

我们使用 Consumer 结构体来创建消费者。

Consumer 提供了 next() 方法来从流中获取下一条消息。

offset 定义了消费者的起始位置。在本例中,消费者从流中可用的第一条消息开始消费。

let mut consumer = environment
.consumer()
.offset(OffsetSpecification::First)
.build(stream)
.await
.unwrap();

let handle = consumer.handle();
task::spawn(async move {
while let Some(delivery) = consumer.next().await {
let d = delivery.unwrap();
println!("Got message: {:#?} with offset: {}",
d.message().data().map(|data| String::from_utf8(data.to_vec()).unwrap()),
d.offset(),);
}
});

完整的 receive.rs 文件可以在 GitHub 上找到。

整合

要运行这两个示例,请打开两个终端(shell)标签页。

本教程的两个部分可以按任何顺序运行,因为它们都声明了 stream。让我们先运行消费者,这样当第一个发布者启动时,消费者就会打印出它

cargo run --bin receive

然后运行发布者

cargo run --bin send

消费者将打印它通过 RabbitMQ 从发布者那里收到的消息。消费者将继续运行,等待新的交付。尝试多次重新运行发布者来观察这一点。

Streams 与队列不同,它们是消息的追加式日志,可以被反复消费。当多个消费者从一个 stream 消费时,它们将从第一条可用消息开始。

© . This site is unofficial and not affiliated with VMware.