跳至主要内容

RabbitMQ 流教程 - "Hello World!"

简介

信息

先决条件

本教程假设 RabbitMQ 已安装,运行在 localhost 上,并且流插件已启用。 标准流端口为 5552。 如果您使用不同的主机、端口或凭据,则连接设置需要调整。

使用 Docker

如果您没有安装 RabbitMQ,可以在 Docker 容器中运行它

docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672  \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.13

等待服务器启动,然后启用流和流管理插件

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management 

在何处获取帮助

如果您在完成本教程时遇到问题,可以通过邮件列表Discord 社区服务器联系我们。

RabbitMQ Streams 在 RabbitMQ 3.9 中引入。 更多信息请参见此处

"Hello World"

(使用 Rust 流客户端)

在本教程的这一部分,我们将用 Rust 编写两个程序;一个发送单个消息的生产者,以及一个接收消息并打印它们的消费者。 我们将略过 Rust 客户端 API 中的一些细节,集中在这个非常简单的事情上,只是为了开始。 这是 RabbitMQ Streams 的“Hello World”。

Rust 流客户端库

RabbitMQ 支持多种协议。 本教程使用 RabbitMQ 流协议,它是一种专用于RabbitMQ 流的协议。 有许多用于 RabbitMQ 的客户端多种不同的语言,请参阅每种语言的流客户端库。 我们将使用 RabbitMQ 提供的Rust 流客户端

RabbitMQ Rust 流客户端 0.4.2 及更高版本通过crates.io分发。

本教程假设您在 Windows 上使用 PowerShell。 在 MacOS 和 Linux 上,几乎任何 Shell 都可以使用。

设置

首先让我们验证您在 PATH 中是否拥有 Rust 工具链

rustc --version
cargo --version

运行该命令应该会产生一条帮助消息。

本教程的可执行版本可以在RabbitMQ 教程存储库中找到。

现在让我们创建项目

cargo new rust-stream
cd rust-stream

从 GitHub 上的Cargo.toml 文件复制所需的依赖项到您自己的 Cargo.toml 文件中。

现在我们已经设置了 Rust 项目,可以编写一些代码了。

发送

我们将消息生产者(发送者)称为 send.rs,消息消费者(接收者)称为 receive.rs。 生产者将连接到 RabbitMQ,发送单个消息,然后退出。

src/bin 目录中创建 send.rs 文件。

send.rs中,我们需要一些 use 声明

use rabbitmq_stream_client::error::StreamCreateError;
use rabbitmq_stream_client::types::{ByteCapacity, Message, ResponseCode};

然后我们可以创建与服务器的连接

use rabbitmq_stream_client::Environment;
let environment = Environment::builder().build().await?;

流 Rust 客户端的入口点是 Environment。 它用于配置 RabbitMQ 流发布者、流消费者和流本身。

它抽象了套接字连接,并为我们处理协议版本协商、身份验证等。

本教程假设流发布者和消费者连接到本地运行的 RabbitMQ 节点,即在 localhost 上。 要连接到其他机器上的节点,只需在 Environment::builder() 上指定目标主机名或 IP 地址。

接下来让我们创建一个生产者。

生产者还将声明一个它将发布消息的流,然后发布一条消息

let stream = "hello-rust-stream";
let create_response = environment
.stream_creator()
.max_length(ByteCapacity::GB(5))
.create(stream)
.await;

if let Err(e) = create_response {
if let StreamCreateError::Create { stream, status } = e {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
}
}
}
let producer = environment.producer().build(stream).await?;
producer
.send_with_confirm(Message::builder().body("Hello, World!").build())
.await?;

流声明操作是幂等的:只有在流不存在时才会创建流。 在这种情况下,我们忽略错误 StreamAlreadyExists,因为我们不关心流是否已经存在。

流是一种追加式日志抽象,允许重复使用消息,直到它们过期。 最好始终定义保留策略。 在上面的示例中,流的大小限制为 5 GiB。

消息内容是一个字节数组。 应用程序可以使用任何合适的格式(如 JSON、MessagePack 等)对它们需要传输的数据进行编码。

上面的代码运行完成后,生产者连接和流系统连接将关闭。 这就是我们的生产者的全部内容。

每次运行生产者时,它都会向服务器发送一条消息,这条消息将被追加到流中。

可以在 GitHub 上找到完整的send.rs 文件

发送不起作用!

如果您是第一次使用 RabbitMQ,并且没有看到“已发送”消息,那么您可能会摸不着头脑,想知道问题出在哪里。 也许代理在启动时没有足够的可用磁盘空间(默认情况下至少需要 50 MB 的可用空间),因此拒绝接受消息。 检查代理日志文件,看看是否有资源警报记录,并根据需要降低可用磁盘空间阈值。 配置指南将向您展示如何设置 disk_free_limit

另一个原因可能是程序在消息到达代理之前退出。 在某些客户端库中,发送是异步的:该函数立即返回,但消息在通过网络传输之前会在 IO 层排队。 发送程序要求用户按一个键来完成该过程:消息有足够的时间到达代理。 流协议提供了一种确认机制来确保代理接收到出站消息,但为了简单起见,本教程没有使用这种机制。

接收

本教程的另一部分,消费者,将连接到 RabbitMQ 节点,并等待消息被推送到它。 与本教程中只发布一条消息就停止的生产者不同,消费者将持续运行,消耗 RabbitMQ 推送给它的消息,并打印收到的有效负载。

src/bin 目录中创建 receive.rs 文件。

send.rs 类似,receive.rs 需要 use 声明

use std::io::stdin;
use rabbitmq_stream_client::error::StreamCreateError;
use rabbitmq_stream_client::types::{ByteCapacity, OffsetSpecification, ResponseCode};
use futures::{StreamExt};
use tokio::task;

在初始设置方面,消费者部分与生产者部分非常相似;我们使用默认连接设置,并声明消费者将从中消耗的流。

use rabbitmq_stream_client::Environment;
let environment = Environment::builder().build().await?;
let stream = "hello-rust-stream";
let create_response = environment
.stream_creator()
.max_length(ByteCapacity::GB(5))
.create(stream)
.await;

if let Err(e) = create_response {
if let StreamCreateError::Create { stream, status } = e {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
}
}
}

请注意,消费者部分也会声明流。 这样是为了允许生产者或消费者中的任何一方先启动。

我们使用 Consumer 结构体来创建消费者。

Consumer 提供了 next() 方法来从流中获取下一条消息。

offset 定义了消费者的起始点。 在这种情况下,消费者从流中可用的第一条消息开始。

let mut consumer = environment
.consumer()
.offset(OffsetSpecification::First)
.build(stream)
.await
.unwrap();

let handle = consumer.handle();
task::spawn(async move {
while let Some(delivery) = consumer.next().await {
let d = delivery.unwrap();
println!("Got message: {:#?} with offset: {}",
d.message().data().map(|data| String::from_utf8(data.to_vec()).unwrap()),
d.offset(),);
}
});

可以在 GitHub 上找到完整的receive.rs 文件

综合运用

为了运行这两个示例,请打开两个终端(Shell)选项卡。

本教程的这两个部分可以按任何顺序运行,因为它们都会声明流。 让我们先运行消费者,这样当第一个发布者启动时,消费者将打印它

cargo run --bin receive

然后运行生产者

cargo run --bin send

消费者将打印它从发布者通过 RabbitMQ 获取的消息。 消费者将继续运行,等待新的传递。 尝试多次重新运行发布者以观察这一点。

流与队列不同,流是消息的追加式日志,可以重复使用。 当多个消费者从一个流中消耗时,它们将从第一个可用消息开始。

© 2024 RabbitMQ. All rights reserved.