跳至主内容

RabbitMQ 教程 - 工作队列

工作队列

(使用 .NET 客户端)

信息

先决条件

本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。

哪里寻求帮助

如果您在学习本教程时遇到困难,可以通过 GitHub DiscussionsRabbitMQ 社区 Discord 联系我们。

第一个教程中,我们编写了发送和接收命名队列消息的程序。在这个教程中,我们将创建一个工作队列,用于将耗时任务分发给多个工作进程。

工作队列(又称:任务队列)背后的主要思想是避免立即执行一个资源密集型任务并等待其完成。相反,我们安排任务稍后执行。我们将一个任务封装成一条消息并将其发送到队列。一个在后台运行的工作进程会接收任务并最终执行工作。当您运行多个工作进程时,任务将在它们之间共享。

这个概念对于 Web 应用程序尤其有用,在这些应用程序中,不可能在短暂的 HTTP 请求窗口内处理一个复杂的任务。

准备工作

在本教程的前一部分,我们发送了一条包含“Hello World!”的消息。现在,我们将发送代表复杂任务的字符串。我们没有真实世界的任务,比如需要调整大小的图片或需要渲染的 PDF 文件,所以让我们通过假装我们很忙来模拟它——通过使用 Task.Delay() 函数(您需要在文件顶部附近添加 using System.Threading.Tasks; 才能访问 Task API)。我们将字符串中的点数作为其复杂度;每个点代表一秒的“工作”。例如,一个由 Hello... 描述的虚假任务将需要三秒钟。

我们将稍微修改我们之前示例中的发送程序,以便可以从命令行发送任意消息。此程序将为我们的工作队列安排任务,因此让我们将其命名为 NewTask

教程一一样,我们需要生成两个项目。

dotnet new console --name NewTask
mv NewTask/Program.cs NewTask/NewTask.cs
dotnet new console --name Worker
mv Worker/Program.cs Worker/Worker.cs
cd NewTask
dotnet add package RabbitMQ.Client
cd ../Worker
dotnet add package RabbitMQ.Client

将我们旧的 Send.cs 的代码复制到 NewTask.cs 并进行以下修改。

更新 message 变量的初始化

dotnet/NewTask/NewTask.cs
loading...

GetMessage 方法添加到 NewTask 类的末尾

dotnet/NewTask/NewTask.cs
loading...

我们旧的 Receive.cs 脚本也需要一些更改,以模拟消息正文中每个点的秒级工作。它将处理 RabbitMQ 传递的消息并执行任务,因此让我们将其复制到 Worker.cs 并按如下方式修改。

首先,将事件处理程序 lambda 修改为 async,然后,在我们现有的用于接收消息的 WriteLine 之后,添加虚假任务以模拟执行时间

var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] Received {message}");

int dots = message.Split('.').Length - 1;
await Task.Delay(dots * 1000);

Console.WriteLine(" [x] Done");
}

轮询分发

使用任务队列的一个优点是能够轻松地并行化工作。如果我们正在积累工作积压,我们可以添加更多的工作进程,从而轻松扩展。

首先,让我们尝试同时运行两个 Worker 实例。它们都将从队列中获取消息,但具体是如何获取的呢?让我们看看。

您需要打开三个控制台。两个将运行 Worker 程序。这些控制台将是我们的两个消费者 - C1 和 C2。

# shell 1
cd Worker
dotnet run
# => Press [enter] to exit.
# shell 2
cd Worker
dotnet run
# => Press [enter] to exit.

在第三个中,我们将发布新任务。一旦您启动了消费者,您就可以发布几条消息

# shell 3
cd NewTask
dotnet run "First message."
dotnet run "Second message.."
dotnet run "Third message..."
dotnet run "Fourth message...."
dotnet run "Fifth message....."

让我们看看分发给我们的工作进程的内容

# shell 1
# => Press [enter] to exit.
# => [x] Received First message.
# => [x] Done
# => [x] Received Third message...
# => [x] Done
# => [x] Received Fifth message.....
# => [x] Done
# shell 2
# => Press [enter] to exit.
# => [x] Received Second message..
# => [x] Done
# => [x] Received Fourth message....
# => [x] Done

默认情况下,RabbitMQ 会按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为轮询。尝试使用三个或更多工作进程进行此操作。

消息确认

执行任务可能需要几秒钟。您可能会想,如果一个消费者开始一个长时间任务并在中途死亡怎么办?使用我们当前的 K 代码,一旦 RabbitMQ 将消息传递给消费者,它会立即将其标记为删除。在这种情况下,如果您终止一个工作进程,我们将丢失它正在处理的消息。我们还将丢失分发给该特定工作进程但尚未处理的所有消息。

但是我们不想丢失任何任务。如果一个工作进程死亡,我们希望将任务分发给另一个工作进程。

为了确保消息永远不会丢失,RabbitMQ 支持消息确认。ack(确认)由消费者发送回来,告知 RabbitMQ 特定消息已收到、已处理,并且 RabbitMQ 可以删除它。

如果消费者死亡(其通道关闭、连接关闭或 TCP 连接丢失)而未发送 ack,RabbitMQ 将理解消息未完全处理,并会将其重新排队。如果当时有其他消费者在线,它将迅速将其重新分发给另一个消费者。这样,您可以确信没有消息会丢失,即使工作进程偶尔会死亡。

强制执行消费者交付确认的超时(默认为 30 分钟)。这有助于检测从未确认交付的错误(卡住)的消费者。您可以按照交付确认超时中的说明增加此超时。

手动消息确认默认开启。在之前的示例中,我们通过将 autoAck(“自动确认模式”)参数设置为 true 来显式关闭它们。现在是时候删除这个标志,并在我们完成任务后,从工作进程手动发送一个适当的确认了。

在现有的 WriteLine 之后,添加对 BasicAckAsync 的调用,并使用 autoAck:false 更新 BasicConsumeAsync

    Console.WriteLine(" [x] Done");

// here channel could also be accessed as ((AsyncEventingBasicConsumer)sender).Channel
await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
};

await channel.BasicConsumeAsync("hello", autoAck: false, consumer: consumer);

使用此代码,您可以确保即使您在处理消息时使用 CTRL+C 终止了一个工作节点,也不会丢失任何内容。在工作节点终止后不久,所有未确认的消息都将被重新分发。

确认必须在接收到交付的同一通道上发送。尝试使用不同通道进行确认将导致通道级协议异常。请参阅确认文档指南以了解更多信息。

遗忘的确认

错过 BasicAck 是一个常见的错误。这是一个简单的错误,但后果很严重。当您的客户端退出时,消息将被重新分发(这可能看起来像随机重新分发),但 RabbitMQ 将占用越来越多的内存,因为它无法释放任何未确认的消息。

为了调试此类错误,您可以使用 rabbitmqctl 打印 messages_unacknowledged 字段

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在 Windows 上,删除 sudo

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久性

我们已经学习了如何确保即使工作进程死亡,任务也不会丢失。但是,如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。

当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非您告诉它不要这样做。需要两件事来确保消息不会丢失:我们需要将队列和消息都标记为持久。

首先,我们需要确保队列能够承受 RabbitMQ 节点重启。为此,我们需要将其声明为持久

await channel.QueueDeclareAsync(queue: "hello",
durable: true, exclusive: false,
autoDelete: false, arguments: null);

虽然此命令本身是正确的,但它在我们当前的设置中不起作用。这是因为我们已经定义了一个名为 hello 的队列,该队列不是持久的。RabbitMQ 不允许您使用不同的参数重新定义现有队列,并且会向尝试这样做的任何程序返回错误。但是有一个快速的解决方法——让我们声明一个不同的队列名称,例如 task_queue

dotnet/NewTask/NewTask.cs
loading...

QueueDeclareAsync 更改需要应用于生产者和消费者代码。您还需要为 BasicConsumeAsyncBasicPublishAsync 更改队列名称。

此时,我们确信 task_queue 队列即使在 RabbitMQ 重启后也不会丢失。现在我们需要将我们的消息标记为持久。

在现有的 GetBytes 之后,将 IBasicProperties.Persistent 设置为 true

dotnet/NewTask/NewTask.cs
loading...

关于消息持久性的说明

将消息标记为持久并不能完全保证消息不会丢失。尽管它告诉 RabbitMQ 将消息保存到磁盘,但在 RabbitMQ 接受消息但尚未保存的短时间窗口内仍然存在风险。此外,RabbitMQ 不会为每条消息执行 fsync(2) ——它可能只保存在缓存中而未真正写入磁盘。持久性保证不强,但对于我们简单的任务队列来说已绰绰有余。如果您需要更强的保证,可以使用发布者确认

公平调度

您可能已经注意到调度仍然不能完全按我们想要的方式工作。例如,在有两个工作进程的情况下,当所有奇数消息都很重而偶数消息都很轻时,一个工作进程将持续忙碌,而另一个工作进程几乎不做任何工作。好吧,RabbitMQ 对此一无所知,它仍然会均匀地分发消息。

这是因为 RabbitMQ 在消息进入队列时就会分发消息。它不会查看消费者未确认消息的数量。它只是盲目地将第 n 条消息分发给第 n 个消费者。

为了改变这种行为,我们可以使用 BasicQos 方法,并将 prefetchCount 设置为 1。这告诉 RabbitMQ 一次不要给工作进程超过一条消息。换句话说,在工作进程处理并确认上一条消息之前,不要向其分发新消息。相反,它将分发给下一个未忙碌的工作进程。

Worker.cs 中现有的 QueueDeclareAsync 调用之后,添加对 BasicQos 的调用

dotnet/Worker/Worker.cs
loading...

关于队列大小的说明

如果所有工作进程都忙碌,您的队列可能会填满。您需要密切关注这一点,并可能添加更多工作进程,或采取其他策略。

整合

打开两个终端。

首先运行消费者(工作进程),以便拓扑(主要是队列)到位。这是其完整代码

dotnet/Worker/Worker.cs
loading...

现在运行任务发布者 (NewTask)。这是其最终代码

dotnet/NewTask/NewTask.cs
loading...

通过使用消息确认和 BasicQosAsync,您可以设置工作队列。持久性选项使任务即使在 RabbitMQ 重启后也能幸存。

有关 IChannel 方法和 IBasicProperties 的更多信息,您可以浏览在线的 RabbitMQ .NET 客户端 API 参考

现在我们可以继续教程 3,学习如何将同一消息传递给多个消费者。

© . This site is unofficial and not affiliated with VMware.