RabbitMQ 教程 - 工作队列
工作队列
(使用 php-amqplib)
先决条件
本教程假设 RabbitMQ 已安装并在 localhost
上的标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则连接设置需要进行调整。
在哪里寻求帮助
如果您在完成本教程时遇到问题,您可以通过GitHub 讨论区或RabbitMQ 社区 Discord联系我们。
在第一个教程中,我们编写了程序来发送和接收来自命名队列的消息。在本教程中,我们将创建一个工作队列,用于在多个工作进程之间分配耗时的任务。
工作队列(也称为:任务队列)背后的主要思想是避免立即执行资源密集型任务并等待其完成。相反,我们安排稍后完成任务。我们将任务封装为消息并将其发送到队列。后台运行的工作进程将弹出任务并最终执行作业。当您运行多个工作进程时,任务将在它们之间共享。
此概念在 Web 应用程序中特别有用,在 Web 应用程序中,在短暂的 HTTP 请求窗口期间无法处理复杂的任务。
准备
在本教程的前面部分,我们发送了一条包含“Hello World!”的消息。现在我们将发送表示复杂任务的字符串。我们没有像要调整大小的图像或要渲染的 pdf 文件这样的现实世界任务,所以让我们通过假装自己很忙来伪造它——通过使用 sleep()
函数。我们将字符串中点的数量作为其复杂性;每个点将占用一秒钟的“工作”。例如,由 Hello...
描述的伪任务将花费三秒钟。
我们将稍微修改我们之前示例中的send.php代码,以允许从命令行发送任意消息。此程序将任务调度到我们的工作队列,因此让我们将其命名为 new_task.php
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
$data = "Hello World!";
}
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, '', 'hello');
echo ' [x] Sent ', $data, "\n";
我们旧的receive.php脚本也需要一些更改:它需要为消息正文中的每个点模拟一秒钟的工作。它将从队列中弹出消息并执行任务,因此让我们将其称为 worker.php
$callback = function ($msg) {
echo ' [x] Received ', $msg->getBody(), "\n";
sleep(substr_count($msg->getBody(), '.'));
echo " [x] Done\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
请注意,我们的伪任务模拟执行时间。
像教程一中那样运行它们
# shell 1
php worker.php
# shell 2
php new_task.php "A very hard task which takes two seconds.."
循环分发
使用任务队列的优势之一是可以轻松地并行化工作。如果我们正在积压工作,我们可以简单地添加更多工作进程,从而轻松地扩展。
首先,让我们尝试同时运行两个 worker.php
脚本。它们都将从队列中获取消息,但究竟是如何获取的呢?让我们看看。
您需要打开三个控制台。两个将运行 worker.php
脚本。这些控制台将是我们的两个消费者 - C1 和 C2。
# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
在第三个控制台中,我们将发布新任务。启动消费者后,您可以发布一些消息
# shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....
让我们看看传递给我们的工作进程的是什么
# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
默认情况下,RabbitMQ 将依次将每条消息发送到下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为循环分发。使用三个或更多工作进程试一下。
消息确认
执行任务可能需要几秒钟,您可能想知道如果消费者启动一个长时间的任务并在其完成之前终止会发生什么。使用我们当前的代码,一旦 RabbitMQ 将消息传递给消费者,它就会立即将其标记为删除。在这种情况下,如果您终止工作进程,则它正在处理的消息将丢失。已调度到此特定工作进程但尚未处理的消息也会丢失。
但我们不想丢失任何任务。如果工作进程死亡,我们希望将任务传递给另一个工作进程。
为了确保消息永远不会丢失,RabbitMQ 支持消息确认。消费者发送回一个确认以告诉 RabbitMQ 特定消息已接收、已处理,并且 RabbitMQ 可以将其删除。
如果消费者在未发送确认的情况下死亡(其通道关闭、连接关闭或 TCP 连接丢失),RabbitMQ 将理解消息未完全处理,并将将其重新排队。如果有其他消费者同时在线,它将很快将其重新传递给另一个消费者。这样,即使工作进程偶尔死亡,您也可以确保没有消息丢失。
消费者传递确认上实施了超时(默认情况下为 30 分钟)。这有助于检测从未确认传递的错误(卡住)消费者。您可以按照传递确认超时中的说明增加此超时。
消息确认之前已由我们自己关闭。现在是时候通过将第四个参数设置为 basic_consume
为 false
(true 表示无确认)并从工作进程发送适当的确认(一旦我们完成任务)来打开它们。
$callback = function ($msg) {
echo ' [x] Received ', $msg->getBody(), "\n";
sleep(substr_count($msg->getBody(), '.'));
echo " [x] Done\n";
$msg->ack();
};
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
使用此代码,您可以确保即使您在工作进程正在处理消息时使用 CTRL+C 终止它,也不会丢失任何内容。工作进程终止后不久,所有未确认的消息都会重新传递。
确认必须在接收传递的同一通道上发送。尝试使用不同的通道进行确认会导致通道级别的协议异常。请参阅关于确认的文档指南以了解更多信息。
忘记确认
遗漏
ack
是一个常见的错误。这是一个容易犯的错误,但后果很严重。当您的客户端退出时,消息将被重新传递(这可能看起来像是随机重新传递),但 RabbitMQ 将消耗越来越多的内存,因为它无法释放任何未确认的消息。为了调试这种错误,您可以使用
rabbitmqctl
打印messages_unacknowledged
字段sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在 Windows 上,删除 sudo
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久性
我们已经了解了如何确保即使消费者死亡,任务也不会丢失。但是,如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。
当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非您告诉它不要这样做。要确保消息不会丢失,需要两件事:我们需要将队列和消息都标记为持久。
首先,我们需要确保队列能够在 RabbitMQ 节点重启后继续存在。为此,我们需要将其声明为持久。为此,我们将第三个参数传递给 queue_declare
作为 true
$channel->queue_declare('hello', false, true, false, false);
尽管此命令本身是正确的,但它在我们的当前设置中不起作用。这是因为我们已经定义了一个名为 hello
的队列,该队列不是持久的。RabbitMQ 不允许您使用不同的参数重新定义现有队列,并且会向尝试执行此操作的任何程序返回错误。但是有一个快速的解决方法——让我们声明一个具有不同名称的队列,例如 task_queue
$channel->queue_declare('task_queue', false, true, false, false);
此标志设置为 true
需要应用于生产者和消费者代码。
此时,我们确定即使 RabbitMQ 重新启动,task_queue
队列也不会丢失。现在我们需要将我们的消息标记为持久
- 通过设置
delivery_mode = 2
消息属性,AMQPMessage
将其作为属性数组的一部分。
$msg = new AMQPMessage(
$data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
关于消息持久性的说明
将消息标记为持久并不完全保证消息不会丢失。虽然它告诉 RabbitMQ 将消息保存到磁盘,但仍然存在一个短暂的时间窗口,RabbitMQ 在该窗口中接受了消息但尚未保存它。此外,RabbitMQ 不会对每条消息执行
fsync(2)
——它可能只保存到缓存中,而不是真正写入磁盘。持久性保证并不强,但对于我们的简单任务队列来说已经足够了。如果您需要更强的保证,则可以使用发布者确认。
公平分发
您可能已经注意到,分发仍然无法完全按照我们想要的方式工作。例如,在有两个工作进程的情况下,当所有奇数消息都很重而偶数消息很轻时,一个工作进程将始终繁忙,而另一个工作进程几乎不做任何工作。好吧,RabbitMQ 对此一无所知,并且仍然会均匀地分发消息。
发生这种情况是因为 RabbitMQ 仅在消息进入队列时分发消息。它不会查看消费者的未确认消息数量。它只是盲目地将每第 n 条消息分发给第 n 个消费者。
为了解决这个问题,我们可以使用 basic_qos
方法,并使用 prefetch_count
= 1
设置。这告诉 RabbitMQ 每次不要向工作进程提供超过一条消息。或者,换句话说,在工作进程处理并确认前一条消息之前,不要向其分发新消息。相反,它将将其分发给下一个不忙的工作进程。
$channel->basic_qos(null, 1, false);
关于队列大小的说明
如果所有 worker 都处于繁忙状态,您的队列可能会填满。您需要关注这一点,并可能添加更多 worker,或者采用其他策略。
综合示例
new_task.php
文件的最终代码
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
$data = "Hello World!";
}
$msg = new AMQPMessage(
$data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$channel->basic_publish($msg, '', 'task_queue');
echo ' [x] Sent ', $data, "\n";
$channel->close();
$connection->close();
以及我们的 worker.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
echo ' [x] Received ', $msg->getBody(), "\n";
sleep(substr_count($msg->getBody(), '.'));
echo " [x] Done\n";
$msg->ack();
};
$channel->basic_qos(null, 1, false);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
try {
$channel->consume();
} catch (\Throwable $exception) {
echo $exception->getMessage();
}
$channel->close();
$connection->close();
使用消息确认和 prefetch
,您可以设置一个工作队列。持久化选项允许任务即使在 RabbitMQ 重新启动后也能幸存。
现在我们可以继续学习 教程 3,了解如何将相同的消息传递给多个消费者。