RabbitMQ 教程 - 工作队列
工作队列
(使用 .NET 客户端)
先决条件
本教程假设 RabbitMQ 已安装并在 localhost
上的标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
哪里可以获得帮助
如果您在学习本教程时遇到问题,可以通过 GitHub Discussions 或 RabbitMQ 社区 Discord 联系我们。
在第一个教程中,我们编写了程序来发送和接收来自命名队列的消息。在本教程中,我们将创建一个工作队列,它将用于在多个工作者之间分配耗时的任务。
工作队列(也称为:任务队列)背后的主要思想是避免立即执行资源密集型任务,并且不必等待其完成。相反,我们安排任务稍后完成。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当您运行多个工作者时,任务将在它们之间共享。
这个概念在 Web 应用程序中特别有用,在 Web 应用程序中,在短暂的 HTTP 请求窗口期间处理复杂的任务是不可能的。
准备工作
在本教程的前一部分中,我们发送了一条包含“Hello World!”的消息。现在我们将发送代表复杂任务的字符串。我们没有真实的实际任务,例如要调整大小的图像或要渲染的 pdf 文件,所以让我们通过假装我们很忙来伪造它 - 通过使用 Task.Delay()
函数(您需要将 using System.Threading.Tasks;
添加到文件顶部附近以访问 Task
API)。我们将字符串中点的数量作为其复杂性;每个点将代表一秒钟的“工作”。例如,由 Hello...
描述的虚假任务将花费三秒钟。
我们将稍微修改我们之前示例中的Send程序,以允许从命令行发送任意消息。这个程序将调度任务到我们的工作队列,所以让我们将其命名为 NewTask
像教程一一样,我们需要生成两个项目。
dotnet new console --name NewTask
mv NewTask/Program.cs NewTask/NewTask.cs
dotnet new console --name Worker
mv Worker/Program.cs Worker/Worker.cs
cd NewTask
dotnet add package RabbitMQ.Client
cd ../Worker
dotnet add package RabbitMQ.Client
将我们旧的 Send.cs 中的代码复制到 NewTask.cs 并进行以下修改。
更新 message 变量的初始化
loading...
将 GetMessage 方法添加到 NewTask 类的末尾
loading...
我们旧的 Receive.cs 脚本也需要一些更改,以便为消息正文中的每个点伪造一秒钟的工作。它将处理 RabbitMQ 传递的消息并执行任务,所以让我们将其复制到 Worker.cs 并按如下方式修改它。
首先,修改事件处理程序 lambda 为 async
,然后,在我们现有的用于接收消息的 WriteLine 之后,添加伪造任务以模拟执行时间
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] Received {message}");
int dots = message.Split('.').Length - 1;
await Task.Delay(dots * 1000);
Console.WriteLine(" [x] Done");
}
轮询分发
使用任务队列的优势之一是能够轻松地并行化工作。如果我们正在积累工作积压,我们可以简单地添加更多工作者,从而轻松扩展。
首先,让我们尝试同时运行两个 Worker
实例。它们都将从队列中获取消息,但具体如何?让我们看看。
您需要打开三个控制台。两个将运行 Worker
程序。这些控制台将是我们的两个消费者 - C1 和 C2。
# shell 1
cd Worker
dotnet run
# => Press [enter] to exit.
# shell 2
cd Worker
dotnet run
# => Press [enter] to exit.
在第三个控制台中,我们将发布新任务。一旦您启动了消费者,您就可以发布一些消息
# shell 3
cd NewTask
dotnet run "First message."
dotnet run "Second message.."
dotnet run "Third message..."
dotnet run "Fourth message...."
dotnet run "Fifth message....."
让我们看看交付给我们的工作者是什么
# shell 1
# => Press [enter] to exit.
# => [x] Received First message.
# => [x] Done
# => [x] Received Third message...
# => [x] Done
# => [x] Received Fifth message.....
# => [x] Done
# shell 2
# => Press [enter] to exit.
# => [x] Received Second message..
# => [x] Done
# => [x] Received Fourth message....
# => [x] Done
默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为轮询。使用三个或更多工作者尝试一下。
消息确认
完成一项任务可能需要几秒钟。您可能想知道,如果其中一个消费者开始一项长时间的任务,并且在任务只完成一部分时就崩溃了,会发生什么情况。使用我们当前的代码,一旦 RabbitMQ 将消息传递给消费者,它会立即将其标记为删除。在这种情况下,如果您终止一个工作者,我们将丢失它正在处理的消息。我们还将丢失已分派给此特定工作者但尚未处理的所有消息。
但是我们不想丢失任何任务。如果工作者崩溃,我们希望将任务传递给另一个工作者。
为了确保消息永远不会丢失,RabbitMQ 支持消息确认。确认 (acknowledgement) 是由消费者发回的,以告知 RabbitMQ 已收到并处理了特定消息,并且 RabbitMQ 可以自由删除它。
如果消费者崩溃(其通道关闭、连接关闭或 TCP 连接丢失)而没有发送确认,RabbitMQ 将理解消息未被完全处理,并将重新将其放入队列。如果同时有其他消费者在线,它将快速地将其重新传递给另一个消费者。这样,您可以确保即使工作者偶尔崩溃,也不会丢失任何消息。
消费者传递确认有超时时间(默认为 30 分钟)。这有助于检测从不确认交付的有缺陷(卡住)的消费者。您可以按照交付确认超时中所述增加此超时时间。
手动消息确认默认情况下是开启的。在之前的示例中,我们通过将 autoAck(“自动确认模式”)参数设置为 true 来显式关闭它们。现在是时候删除此标志,并在我们完成任务后从工作者手动发送适当的确认。
在现有的 WriteLine 之后,添加对 BasicAckAsync 的调用,并使用 autoAck:false 更新 BasicConsumeAsync
Console.WriteLine(" [x] Done");
// here channel could also be accessed as ((AsyncEventingBasicConsumer)sender).Channel
await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
};
await channel.BasicConsumeAsync("hello", autoAck: false, consumer: consumer);
使用此代码,您可以确保即使您在使用 CTRL+C 终止工作者节点时,它正在处理消息,也不会丢失任何内容。在工作者节点终止后不久,所有未确认的消息都将被重新传递。
确认必须在接收交付的同一通道上发送。尝试使用不同的通道进行确认将导致通道级协议异常。请参阅关于确认的文档指南以了解更多信息。
忘记确认
错过
BasicAck
是一个常见的错误。这是一个容易犯的错误,但后果很严重。当您的客户端退出时,消息将被重新传递(这可能看起来像是随机重新传递),但 RabbitMQ 将消耗越来越多的内存,因为它将无法释放任何未确认的消息。为了调试此类错误,您可以使用
rabbitmqctl
打印messages_unacknowledged
字段sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在 Windows 上,删除 sudo
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久性
我们已经学习了如何确保即使消费者崩溃,任务也不会丢失。但是,如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。
当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非您告诉它不要这样做。要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久。
首先,我们需要确保队列在 RabbitMQ 节点重启后仍然存在。为了做到这一点,我们需要将其声明为持久
await channel.QueueDeclareAsync(queue: "hello",
durable: true, exclusive: false,
autoDelete: false, arguments: null);
尽管此命令本身是正确的,但在我们目前的设置中它不起作用。这是因为我们已经定义了一个名为 hello
的队列,它不是持久的。RabbitMQ 不允许您使用不同的参数重新定义现有队列,并且会向任何尝试这样做的程序返回错误。但是有一个快速的解决方法 - 让我们声明一个具有不同名称的队列,例如 task_queue
loading...
这个 QueueDeclareAsync
更改需要应用于生产者和消费者代码。您还需要更改 BasicConsumeAsync
和 BasicPublishAsync
的队列名称。
此时,我们确信即使 RabbitMQ 重启,task_queue
队列也不会丢失。现在我们需要将我们的消息标记为持久。
在现有的 GetBytes 之后,将 IBasicProperties.Persistent
设置为 true
loading...
关于消息持久性的说明
将消息标记为持久性并不能完全保证消息不会丢失。尽管它告诉 RabbitMQ 将消息保存到磁盘,但在 RabbitMQ 接受消息但尚未保存消息时,仍然存在一个短暂的时间窗口。此外,RabbitMQ 不会对每条消息执行
fsync(2)
——它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证并不强,但对于我们简单的任务队列来说已经足够了。如果您需要更强的保证,那么您可以使用发布者确认。
公平分发
您可能已经注意到,分发仍然没有完全按照我们想要的方式工作。例如,在有两个工作者的情况下,当所有奇数消息都很重而偶数消息很轻时,一个工作者将一直处于忙碌状态,而另一个工作者几乎不做任何工作。好吧,RabbitMQ 对此一无所知,仍然会均匀地分发消息。
发生这种情况是因为 RabbitMQ 仅在消息进入队列时才分发消息。它不查看消费者的未确认消息的数量。它只是盲目地将每第 n 条消息分发给第 n 个消费者。
为了改变这种行为,我们可以使用 BasicQos
方法,并将 prefetchCount
设置为 1
。这告诉 RabbitMQ 一次不要给一个工作者超过一条消息。或者,换句话说,在工作者处理并确认上一条消息之前,不要向其分发新消息。相反,它会将其分发给下一个尚未忙碌的工作者。
在 Worker.cs 中现有的 QueueDeclareAsync 之后,添加对 BasicQos
的调用
loading...
关于队列大小的说明
如果所有工作者都很忙,您的队列可能会被填满。您需要密切关注这一点,并可能添加更多工作者,或者制定其他策略。
将所有内容放在一起
打开两个终端。
首先运行消费者(工作者),以便拓扑(主要是队列)就位。这是它的完整代码
loading...
现在运行任务发布者 (NewTask)。它的最终代码是
loading...
使用消息确认和 BasicQosAsync
,您可以设置工作队列。持久性选项使任务即使在 RabbitMQ 重新启动后也能幸存。
有关 IChannel
方法和 IBasicProperties
的更多信息,您可以浏览 RabbitMQ .NET 客户端 API 在线参考。
现在我们可以继续学习教程 3,并学习如何将同一消息传递给多个消费者。