RabbitMQ 教程 - “Hello World!”
简介
RabbitMQ 是一个消息代理:它接收和转发消息。您可以将其视为邮局:当您将要邮寄的邮件放入邮箱时,您可以确信邮递员最终会将邮件投递给您的收件人。在这个比喻中,RabbitMQ 是邮箱、邮局和邮递员。
RabbitMQ 与邮局的主要区别在于它不处理纸质邮件,而是接收、存储和转发数据二进制块——消息。
RabbitMQ 和一般的消息传递使用一些术语。
-
生产仅仅意味着发送。发送消息的程序是生产者
-
队列是 RabbitMQ 中邮箱的名称。尽管消息流经 RabbitMQ 和您的应用程序,但它们只能存储在队列中。队列仅受主机内存和磁盘限制的约束,它本质上是一个大型消息缓冲区。
许多生产者可以发送消息到一个队列,并且许多消费者可以尝试从一个队列接收数据。
这是我们表示队列的方式
-
消费与接收具有相似的含义。消费者是一个程序,它主要等待接收消息
请注意,生产者、消费者和代理不需要驻留在同一主机上;实际上,在大多数应用程序中它们并不驻留在同一主机上。一个应用程序也可以既是生产者又是消费者。
“Hello World”
(使用 .NET/C# 客户端)
在本教程的这一部分中,我们将用 C# 编写两个程序;一个生产者发送单个消息,一个消费者接收消息并打印出来。我们将略过 .NET 客户端 API 中的一些细节,专注于这个非常简单的事情,以便开始。这是消息传递的“Hello World”。
在下图中,“P” 是我们的生产者,“C” 是我们的消费者。中间的框是一个队列——RabbitMQ 代表消费者保留的消息缓冲区。
.NET 客户端库
RabbitMQ 支持多种协议。本教程使用 AMQP 0-9-1,这是一种开放的、通用的消息传递协议。在许多不同的语言中,都有许多 RabbitMQ 的客户端。我们将使用 RabbitMQ 提供的 .NET 客户端。
该客户端支持.NET Core以及 .NET Framework 4.5.1+。本教程将使用 RabbitMQ .NET 客户端 5.0 和 .NET Core,因此您需要确保已安装它并在您的 PATH 中。
您也可以使用 .NET Framework 完成本教程,但设置步骤会有所不同。
RabbitMQ .NET 客户端 5.0 及更高版本通过nuget分发。
本教程假设您在 Windows 上使用 PowerShell。在 MacOS 和 Linux 上,几乎任何 shell 都可以工作。
设置
首先,让我们验证您的 PATH 中是否包含 .NET Core 工具链
dotnet --help
应产生一条帮助消息。
现在让我们生成两个项目,一个用于发布者,一个用于消费者
dotnet new console --name Send
mv Send/Program.cs Send/Send.cs
dotnet new console --name Receive
mv Receive/Program.cs Receive/Receive.cs
这将创建两个名为 Send
和 Receive
的新目录。
然后添加客户端依赖项。
cd Send
dotnet add package RabbitMQ.Client
cd ../Receive
dotnet add package RabbitMQ.Client
现在我们已经设置了 .NET 项目,我们可以编写一些代码了。
发送
我们将把我们的消息发布者(发送者)称为 Send.cs
,并将我们的消息消费者(接收者)称为 Receive.cs
。发布者将连接到 RabbitMQ,发送一条消息,然后退出。
在Send.cs
中,我们需要使用一些命名空间
using System.Text;
using RabbitMQ.Client;
然后我们可以创建与服务器的连接
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
...
连接抽象了套接字连接,并为我们处理协议版本协商和身份验证等。在这里,我们连接到本地机器上的 RabbitMQ 节点——因此是 localhost。如果我们要连接到其他机器上的节点,我们只需在此处指定其主机名或 IP 地址即可。
接下来,我们创建一个通道,这是大多数 API 用于完成任务的地方。
要发送,我们必须声明一个队列供我们发送;然后我们可以将消息发布到队列
using System.Text;
using RabbitMQ.Client;
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
const string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: string.Empty,
routingKey: "hello",
basicProperties: null,
body: body);
Console.WriteLine($" [x] Sent {message}");
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
声明队列是幂等的——只有在它不存在时才会创建它。消息内容是一个字节数组,因此您可以在其中编码任何您想要的内容。
当上面的代码运行完成后,通道和连接将被释放。这就是我们的发布者。
发送不起作用!
如果这是您第一次使用 RabbitMQ 并且您没有看到“已发送”消息,那么您可能会挠头想知道哪里出了问题。也许代理在启动时没有足够的可用磁盘空间(默认情况下,它至少需要 50 MB 的可用空间),因此拒绝接收消息。检查代理日志文件以查看是否记录了资源警报,并在必要时降低可用磁盘空间阈值。配置指南将向您展示如何设置
disk_free_limit
。
接收
至于消费者,它正在侦听来自 RabbitMQ 的消息。因此,与发布单个消息的发布者不同,我们将使消费者持续运行以侦听消息并将其打印出来。
代码(在Receive.cs
中)与 Send
具有几乎相同的 using
语句
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
设置与发布者相同;我们打开一个连接和一个通道,并声明我们将从中消费的队列。请注意,这与 Send
发布到的队列相匹配。
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
...
请注意,我们也在此处声明了队列。因为我们可能在发布者之前启动消费者,所以我们要确保队列存在,然后再尝试从中消费消息。
我们即将告诉服务器将队列中的消息传递给我们。由于它将异步地向我们推送消息,因此我们提供了一个回调。这就是 EventingBasicConsumer.Received
事件处理程序的作用。
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] Received {message}");
};
channel.BasicConsume(queue: "hello",
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
综合应用
打开两个终端。
您可以按任何顺序运行客户端,因为两者都声明了队列。我们将首先运行消费者,以便您可以看到它正在等待然后接收消息
cd Receive
dotnet run
然后运行生产者
cd Send
dotnet run
消费者将打印它通过 RabbitMQ 从发布者收到的消息。消费者将继续运行,等待消息,因此尝试重新启动发布者几次。
现在是时候继续学习第 2 部分并构建一个简单的工作队列了。