跳至主要内容

RabbitMQ 教程 - 工作队列

工作队列

(使用 .NET 客户端)

信息

先决条件

本教程假设 RabbitMQ 已安装并在 localhost 上的标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则连接设置需要进行调整。

获取帮助

如果您在学习本教程时遇到问题,您可以通过GitHub 讨论RabbitMQ 社区 Discord联系我们。

第一个教程中,我们编写了程序来发送和接收来自命名队列的消息。在本教程中,我们将创建一个工作队列,用于在多个工作进程之间分配耗时的任务。

工作队列(也称为:任务队列)背后的主要思想是避免立即执行资源密集型任务并等待其完成。相反,我们安排稍后执行任务。我们将一个任务封装为消息并将其发送到队列。后台运行的工作进程将弹出任务并最终执行作业。当您运行多个工作进程时,任务将在它们之间共享。

此概念在 Web 应用程序中特别有用,在 Web 应用程序中,无法在短暂的 HTTP 请求窗口期间处理复杂的任务。

准备

在本教程的前面部分,我们发送了一条包含“Hello World!”的消息。现在我们将发送代表复杂任务的字符串。我们没有现实世界中的任务,例如需要调整大小的图像或需要渲染的 pdf 文件,因此让我们通过假装忙碌来模拟它 - 通过使用 Thread.Sleep() 函数(您需要在文件顶部附近添加 using System.Threading; 以访问线程 API)。我们将字符串中的点号数量作为其复杂度;每个点号将代表一秒钟的“工作”。例如,由 Hello... 描述的伪任务将花费三秒钟。

我们将稍微修改我们之前示例中的发送程序,以允许从命令行发送任意消息。此程序将任务安排到我们的工作队列中,因此让我们将其命名为 NewTask

教程一一样,我们需要生成两个项目。

dotnet new console --name NewTask
mv NewTask/Program.cs NewTask/NewTask.cs
dotnet new console --name Worker
mv Worker/Program.cs Worker/Worker.cs
cd NewTask
dotnet add package RabbitMQ.Client
cd ../Worker
dotnet add package RabbitMQ.Client

将我们旧的 Send.cs 中的代码复制到 NewTask.cs 中,并进行以下修改。

更新message变量的初始化

var message = GetMessage(args);

GetMessage 方法添加到 NewTask 类的末尾

static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}

我们旧的 Receive.cs 脚本也需要进行一些更改,以便为消息正文中的每个点模拟一秒钟的工作。它将处理 RabbitMQ 传递的消息并执行任务,因此让我们将其复制到 Worker.cs 并进行如下修改。

在接收消息的现有 WriteLine 之后,添加模拟执行时间的伪任务

Console.WriteLine($" [x] Received {message}");

int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);

Console.WriteLine(" [x] Done");

循环分发

使用任务队列的优点之一是可以轻松地并行化工作。如果我们正在积压工作,我们可以添加更多工作进程,从而轻松扩展。

首先,让我们尝试同时运行两个 Worker 实例。它们都将从队列中获取消息,但究竟是如何获取的呢?让我们看看。

您需要打开三个控制台。两个将运行 Worker 程序。这些控制台将是我们的两个消费者 - C1 和 C2。

# shell 1
cd Worker
dotnet run
# => Press [enter] to exit.
# shell 2
cd Worker
dotnet run
# => Press [enter] to exit.

在第三个控制台中,我们将发布新任务。启动消费者后,您可以发布一些消息

# shell 3
cd NewTask
dotnet run "First message."
dotnet run "Second message.."
dotnet run "Third message..."
dotnet run "Fourth message...."
dotnet run "Fifth message....."

让我们看看我们的工作进程收到了什么

# shell 1
# => Press [enter] to exit.
# => [x] Received First message.
# => [x] Done
# => [x] Received Third message...
# => [x] Done
# => [x] Received Fifth message.....
# => [x] Done
# shell 2
# => Press [enter] to exit.
# => [x] Received Second message..
# => [x] Done
# => [x] Received Fourth message....
# => [x] Done

默认情况下,RabbitMQ 将依次将每条消息发送到下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为循环分发。尝试使用三个或更多工作进程来体验一下。

消息确认

执行任务可能需要几秒钟。您可能想知道如果某个消费者启动一个长时间的任务并在任务完成之前死亡会发生什么。使用我们当前的代码,一旦 RabbitMQ 将消息传递给消费者,它就会立即将其标记为删除。在这种情况下,如果您终止工作进程,我们将丢失它正在处理的消息。我们还将丢失分配给此特定工作进程但尚未处理的所有消息。

但我们不想丢失任何任务。如果工作进程死亡,我们希望任务被传递给另一个工作进程。

为了确保消息永不丢失,RabbitMQ 支持消息确认。确认由消费者发送回,以告知 RabbitMQ 特定消息已接收、处理以及 RabbitMQ 可以将其删除。

如果消费者在未发送确认的情况下死亡(其通道关闭、连接关闭或 TCP 连接丢失),RabbitMQ 将理解消息未完全处理,并将其重新排队。如果有其他消费者同时在线,它将很快将其重新传递给另一个消费者。这样,您可以确保即使工作进程偶尔死亡,也不会丢失任何消息。

消费者传递确认上强制执行超时(默认为 30 分钟)。这有助于检测从不确认传递的有问题的(卡住的)消费者。您可以按照传递确认超时中的说明增加此超时。

手动消息确认默认情况下处于启用状态。在前面的示例中,我们通过将 autoAck(“自动确认模式”)参数设置为 true 来显式地将其关闭。现在是时候删除此标志并在完成任务后从工作进程手动发送正确的确认了。

在现有 WriteLine 之后,添加对 BasicAck 的调用,并使用 autoAck:false 更新 BasicConsume

    Console.WriteLine(" [x] Done");

// here channel could also be accessed as ((EventingBasicConsumer)sender).Model
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "hello",
autoAck: false,
consumer: consumer);

使用此代码,您可以确保即使您在工作进程节点处理消息时使用 CTRL+C 终止它,也不会丢失任何内容。工作进程节点终止后不久,所有未确认的消息都将重新传递。

必须在接收传递的同一通道上发送确认。尝试使用不同的通道进行确认会导致通道级协议异常。请参阅关于确认的文档指南以了解更多信息。

忘记确认

遗漏 BasicAck 是一个常见的错误。这是一个很容易犯的错误,但后果很严重。当您的客户端退出时,消息将被重新传递(这可能看起来像随机重新传递),但 RabbitMQ 将消耗越来越多的内存,因为它将无法释放任何未确认的消息。

为了调试这种错误,您可以使用 rabbitmqctl 打印 messages_unacknowledged 字段

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在 Windows 上,删除 sudo

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久性

我们已经了解了如何确保即使消费者死亡,任务也不会丢失。但是,如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。

当 RabbitMQ 退出或崩溃时,它将忘记队列和消息,除非您告诉它不要忘记。要确保消息不会丢失,需要两件事:我们需要将队列和消息都标记为持久化。

首先,我们需要确保队列能够在 RabbitMQ 节点重新启动后仍然存在。为此,我们需要将其声明为持久化

channel.QueueDeclare(queue: "hello",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);

虽然此命令本身是正确的,但它在我们当前的设置中不起作用。这是因为我们已经定义了一个名为 hello 的队列,该队列不是持久化的。RabbitMQ 不允许您使用不同的参数重新定义现有队列,并且会向尝试执行此操作的任何程序返回错误。但是有一个快速的解决方法 - 让我们声明一个具有不同名称的队列,例如 task_queue

channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);

QueueDeclare 更改需要应用于生产者和消费者代码。您还需要更改 BasicConsumeBasicPublish 的队列名称。

此时,我们确信即使 RabbitMQ 重新启动,task_queue 队列也不会丢失。现在我们需要将我们的消息标记为持久化。

在现有 GetBytes 之后,将 IBasicProperties.Persistent 设置为 true

var body = Encoding.UTF8.GetBytes(message);

var properties = channel.CreateBasicProperties();
properties.Persistent = true;

关于消息持久性的说明

将消息标记为持久化并不能完全保证消息不会丢失。虽然它告诉 RabbitMQ 将消息保存到磁盘,但仍然存在一个短暂的时间窗口,在此期间 RabbitMQ 已接受消息但尚未保存它。此外,RabbitMQ 不会为每条消息执行 fsync(2) - 它可能只是保存到缓存,而不是真正写入磁盘。持久性保证并不强大,但对于我们的简单任务队列来说已经足够了。如果您需要更强的保证,则可以使用发布者确认

公平分发

您可能已经注意到,分发机制仍然没有完全按照我们想要的方式工作。例如,在有两个工作进程的情况下,如果所有奇数号消息都很重,偶数号消息都很轻,那么一个工作进程将持续繁忙,而另一个工作进程几乎不做任何工作。RabbitMQ 对此一无所知,并且仍然会均匀地分发消息。

发生这种情况的原因是,RabbitMQ 只在消息进入队列时分发消息。它不会查看消费者未确认消息的数量。它只是盲目地将每个第 n 个消息分发给第 n 个消费者。

为了改变这种行为,我们可以使用带有 prefetchCount = 1 设置的 BasicQos 方法。这告诉 RabbitMQ 一次不要给工作进程发送超过一条消息。或者,换句话说,在工作进程处理并确认前一条消息之前,不要向其分发新消息。相反,它会将其分发给下一个尚未繁忙的工作进程。

Worker.cs 中现有的 QueueDeclare 后添加对 BasicQos 的调用。

channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

关于队列大小的说明

如果所有工作进程都很繁忙,您的队列可能会填满。您需要关注这一点,并可能添加更多工作进程,或采用其他策略。

综合示例

打开两个终端。

首先运行消费者(工作进程),以便拓扑结构(主要是队列)就位。以下是其完整代码

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

Console.WriteLine(" [*] Waiting for messages.");

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
byte[] body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] Received {message}");

int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);

Console.WriteLine(" [x] Done");

// here channel could also be accessed as ((EventingBasicConsumer)sender).Model
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue",
autoAck: false,
consumer: consumer);

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

现在运行任务发布者(NewTask)。其最终代码是

using System.Text;
using RabbitMQ.Client;

var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);

var properties = channel.CreateBasicProperties();
properties.Persistent = true;

channel.BasicPublish(exchange: string.Empty,
routingKey: "task_queue",
basicProperties: properties,
body: body);
Console.WriteLine($" [x] Sent {message}");

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}

(NewTask.cs 源代码)

使用消息确认和 BasicQos,您可以设置一个工作队列。持久化选项允许任务即使在 RabbitMQ 重新启动后也能存活。

有关 IModel 方法和 IBasicProperties 的更多信息,您可以浏览 RabbitMQ .NET 客户端 API 参考在线文档

现在我们可以继续学习 教程 3,并了解如何将相同的消息传递给多个消费者。

© 2024 RabbitMQ. All rights reserved.