跳至主要内容

RabbitMQ Stream 教程 - "Hello World!"

简介

信息

先决条件

本教程假设 RabbitMQ 已安装,并在 localhost 上运行,并且流插件已启用。标准流端口为 5552。如果您使用不同的主机、端口或凭据,则连接设置需要进行调整。

使用 Docker

如果您没有安装 RabbitMQ,可以在 Docker 容器中运行它

docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672  \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.13

等待服务器启动,然后启用流和流管理插件

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management 

获取帮助

如果您在学习本教程时遇到问题,可以通过邮件列表Discord 社区服务器联系我们。

RabbitMQ Streams 在 RabbitMQ 3.9 中引入。更多信息请访问此处

"Hello World"

(使用 .NET/C# 流客户端)

在本教程的这一部分,我们将用 C# 编写两个程序;一个生产者,发送一条消息,以及一个消费者,接收消息并打印出来。我们将略过 .NET 客户端 API 中的一些细节,专注于这个非常简单的事情,以便开始。这是 RabbitMQ Streams 的“Hello World”。

.NET 流客户端库

RabbitMQ 支持多种协议。本教程使用 RabbitMQ 流协议,它是专为RabbitMQ 流设计的协议。许多不同的语言都有许多 RabbitMQ 客户端,请参阅每种语言的流客户端库。我们将使用 RabbitMQ 提供的.NET 流客户端

该客户端支持.NET。本教程将使用 RabbitMQ .NET 流客户端 1.8.0 和 .NET,因此请确保您已安装它并在您的 PATH 中。

RabbitMQ .NET 流客户端 1.8 及更高版本通过nuget分发。

本教程假设您在 Windows 上使用 PowerShell。在 MacOS 和 Linux 上,几乎任何 shell 都可以使用。

设置

首先,让我们验证您是否已将 .NET 工具链添加到 PATH

dotnet --help

运行该命令应该会生成一条帮助消息。

本教程的可执行版本可以在RabbitMQ 教程存储库中找到。

现在让我们生成两个项目,一个用于发布者,一个用于消费者

dotnet new console --name Send
mv Send/Program.cs Send/Send.cs
dotnet new console --name Receive
mv Receive/Program.cs Receive/Receive.cs

这将创建两个名为 SendReceive 的新目录。

然后我们添加客户端依赖项。

cd Send
dotnet add package RabbitMQ.Stream.Client
cd ../Receive
dotnet add package RabbitMQ.Stream.Client
cd ..

现在我们已经设置了 .NET 项目,我们可以编写一些代码了。

发送

我们将我们的消息生产者(发送者)称为 Send.cs,我们的消息消费者(接收者)称为 Receive.cs。生产者将连接到 RabbitMQ,发送一条消息,然后退出。

Send.cs中,我们需要使用一些命名空间

using System.Text;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;

然后我们可以创建与服务器的连接

var streamSystem = await StreamSystem.Create(new StreamSystemConfig());

流 .NET 客户端的入口点是 StreamSystem。它用于配置 RabbitMQ 流发布者、流消费者和流本身。

它抽象了套接字连接,并为我们处理协议版本协商和身份验证等。

本教程假设流发布者和消费者连接到本地运行的 RabbitMQ 节点,即 localhost 上。要连接到其他机器上的节点,只需在 StreamSystemConfig 上指定目标主机名或 IP 地址即可。

接下来让我们创建一个生产者。

生产者还将声明一个它将向其发布消息的流,然后发布一条消息

await streamSystem.CreateStream(new StreamSpec("hello-stream")
{
MaxLengthBytes = 5_000_000_000
});

var producer = await Producer.Create(new ProducerConfig(streamSystem, "hello-stream"));

await producer.Send(new Message(Encoding.UTF8.GetBytes($"Hello, World")));

流声明操作是幂等的:只有在流不存在时才会创建它。

流是一种追加式日志抽象,允许重复使用消息,直到它们过期。始终定义保留策略是一个好习惯。在上面的示例中,流的大小限制为 5 GiB。

消息内容是一个字节数组。应用程序可以使用任何合适的格式(如 JSON、MessagePack 等)对它们需要传输的数据进行编码。

当上面的代码完成运行时,生产者连接和流系统连接将关闭。这就是我们的生产者。

每次运行生产者时,它都会向服务器发送一条消息,并将该消息追加到流中。

完整的Send.cs 文件可以在 GitHub 上找到。

发送不起作用!

如果这是您第一次使用 RabbitMQ 并且您没有看到“已发送”消息,那么您可能会挠头想知道哪里出了问题。也许代理在启动时没有足够的可用磁盘空间(默认情况下需要至少 50 MB 的可用空间),因此拒绝接受消息。检查代理日志文件,查看是否记录了资源警报,如有必要,降低可用磁盘空间阈值。配置指南将向您展示如何设置 disk_free_limit

另一个原因可能是程序在消息到达代理之前退出。在某些客户端库中,发送是异步的:函数立即返回,但消息在通过网络发送之前会排队到 IO 层。发送程序要求用户按一个键来完成该过程:消息有足够的时间到达代理。流协议提供了一种确认机制来确保代理收到出站消息,但为了简单起见,本教程没有使用这种机制。

接收

本教程的另一部分,消费者,将连接到 RabbitMQ 节点并等待消息被推送到它。与本教程中发布一条消息并停止的生产者不同,消费者将持续运行,使用 RabbitMQ 推送给它的消息,并打印收到的有效负载。

Send.cs 类似,Receive.cs 将需要使用一些命名空间

using System.Text;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;

在初始设置方面,消费者部分与生产者部分非常相似;我们使用默认连接设置并声明消费者将从中消费的流。

请注意,流名称必须与生产者使用的名称匹配。

var streamSystem = await StreamSystem.Create(new StreamSystemConfig());

await streamSystem.CreateStream(new StreamSpec("hello-stream")
{
MaxLengthBytes = 5_000_000_000
});

请注意,消费者部分也声明了流。这是为了允许任何一方先启动,无论是生产者还是消费者。

Consumer 类用于实例化流消费者,而 ConsumerConfig 记录用于配置它。我们提供了一个 MessageHandler 回调来处理传递的消息。

OffsetSpec 属性定义了消费者的起点。在这种情况下,消费者从流中可用的第一条消息开始。

var consumer = await Consumer.Create(new ConsumerConfig(streamSystem, "hello-stream")
{
OffsetSpec = new OffsetTypeFirst(),
MessageHandler = async (stream, _, _, message) =>
{
Console.WriteLine($"Stream: {stream} - " +
$"Received message: {Encoding.UTF8.GetString(message.Data.Contents)}");
await Task.CompletedTask;
}
});

完整的Receive.cs 文件可以在 GitHub 上找到。

综合示例

为了运行这两个示例,请打开两个终端(shell)选项卡。

本教程的这两个部分可以按任何顺序运行,因为它们都声明了流。让我们先运行消费者,以便在第一个发布者启动时,消费者会打印它

cd Receive
dotnet run

然后运行生产者

cd Send
dotnet run

消费者将打印它从发布者通过 RabbitMQ 获取的消息。消费者将继续运行,等待新的传递。尝试多次重新运行发布者以观察这一点。

流与队列的不同之处在于,它们是可重复使用的消息的追加式日志。当多个消费者从流中消费时,它们将从第一个可用消息开始。

© 2024 RabbitMQ. All rights reserved.