跳至主要内容

RabbitMQ 教程 - 工作队列

工作队列

(使用 amqp.node 客户端)

信息

先决条件

本教程假设 RabbitMQ 已 安装 并在 localhost 上运行,端口为 标准端口 (5672)。如果您使用不同的主机、端口或凭据,则需要调整连接设置。

在哪里寻求帮助

如果您在完成本教程时遇到问题,可以通过 GitHub 讨论RabbitMQ 社区 Discord 与我们联系。

第一个教程 中,我们编写了程序来发送和接收来自命名队列的消息。在本教程中,我们将创建一个工作队列,用于将耗时的任务分配给多个工作者。

工作队列(又称:任务队列)背后的主要思想是避免立即执行资源密集型任务并等待其完成。相反,我们安排任务稍后执行。我们将任务封装为一条消息并将其发送到队列。在后台运行的 worker 进程将弹出任务并最终执行作业。当您运行多个 worker 时,任务将在它们之间共享。

此概念在 Web 应用程序中特别有用,因为在短暂的 HTTP 请求窗口内无法处理复杂的任务。

准备

在本教程的前一部分,我们发送了一条包含 "Hello World!" 的消息。现在,我们将发送代表复杂任务的字符串。我们没有像要调整大小的图像或要渲染的 pdf 文件这样的实际任务,因此让我们通过假装很忙来伪造它 - 使用 setTimeout 方法。我们将使用字符串中的点号数量作为其复杂性;每个点号将代表一秒的“工作”。例如,由 Hello... 描述的假任务将需要三秒钟。

我们将稍微修改我们之前示例中的 send.js 代码,以允许从命令行发送任意消息。此程序将安排任务到我们的工作队列,因此让我们将其命名为 new_task.js

var queue = 'task_queue';
var msg = process.argv.slice(2).join(' ') || "Hello World!";

channel.assertQueue(queue, {
durable: true
});
channel.sendToQueue(queue, Buffer.from(msg), {
persistent: true
});
console.log(" [x] Sent '%s'", msg);

我们旧的 receive.js 脚本也需要一些更改:它需要为消息体中的每个点号模拟一秒的工作。它将从队列中弹出消息并执行任务,因此让我们将其命名为 worker.js

var queue = 'task_queue';

// This makes sure the queue is declared before attempting to consume from it
channel.assertQueue(queue, {
durable: true
});

channel.consume(queue, function(msg) {
var secs = msg.content.toString().split('.').length - 1;

console.log(" [x] Received %s", msg.content.toString());
setTimeout(function() {
console.log(" [x] Done");
}, secs * 1000);
}, {
// automatic acknowledgment mode,
// see /docs/confirms for details
noAck: true
});

请注意,我们的假任务模拟了执行时间。

像教程一那样运行它们

# shell 1
./worker.js
# shell 2
./new_task.js

循环调度

使用任务队列的优势之一是可以轻松地并行化工作。如果我们正在积压工作,我们可以添加更多 worker,这样就可以轻松地扩展。

首先,让我们尝试同时运行两个 worker.js 脚本。它们都将从队列中获取消息,但具体是如何呢?让我们看看。

您需要打开三个控制台。两个将运行 worker.js 脚本。这些控制台将是我们的两个消费者 - C1 和 C2。

# shell 1
./worker.js
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
./worker.js
# => [*] Waiting for messages. To exit press CTRL+C

在第三个控制台中,我们将发布新任务。启动消费者后,您可以发布一些消息

# shell 3
./new_task.js First message.
./new_task.js Second message..
./new_task.js Third message...
./new_task.js Fourth message....
./new_task.js Fifth message.....

让我们看看发送给 worker 的内容

# shell 1
./worker.js
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
./worker.js
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

默认情况下,RabbitMQ 会将每条消息依次发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分配消息的方式称为循环调度。使用三个或更多 worker 尝试一下。

消息确认

执行任务可能需要几秒钟,您可能想知道如果消费者在完成任务之前启动了一个长时间的任务并终止了会发生什么。在当前代码中,一旦 RabbitMQ 将消息传递给消费者,它就会立即将其标记为删除。在这种情况下,如果您终止了一个 worker,您将丢失它正在处理的消息。分配给该特定 worker 但尚未处理的消息也会丢失。

但我们不想丢失任何任务。如果一个 worker 死亡,我们希望将任务传递给另一个 worker。

为了确保消息永不丢失,RabbitMQ 支持 消息确认。消费者会发送一个确认消息,以告诉 RabbitMQ 特定消息已收到、已处理,并且 RabbitMQ 可以将其删除。

如果消费者在没有发送确认消息的情况下死亡(其通道已关闭、连接已关闭或 TCP 连接已丢失),RabbitMQ 将理解消息未完全处理,并将将其重新排队。如果有其他消费者同时在线,它将立即将其重新传递给另一个消费者。这样一来,即使 worker 偶尔死亡,您也可以确保没有消息丢失。

对消费者交付确认施加了一个超时(默认情况下为 30 分钟)。这有助于检测从不确认交付的错误(卡住)消费者。您可以根据 交付确认超时 中的说明增加此超时。

在之前的示例中,手动消费者确认已关闭。现在是时候使用 {noAck: false} 选项将其打开,并在完成任务后从 worker 发送适当的确认消息。

channel.consume(queue, function(msg) {
var secs = msg.content.toString().split('.').length - 1;

console.log(" [x] Received %s", msg.content.toString());
setTimeout(function() {
console.log(" [x] Done");
channel.ack(msg);
}, secs * 1000);
}, {
// manual acknowledgment mode,
// see /docs/confirms for details
noAck: false
});

使用此代码,您可以确保即使您使用 CTRL+C 在 worker 处理消息时终止了它,也不会丢失任何东西。worker 终止后不久,所有未确认的消息将被重新传递。

确认必须在接收交付的同一通道上发送。尝试使用不同的通道进行确认会导致通道级协议异常。请参阅 有关确认的文档指南 以了解更多信息。

忘记确认

错过 ack 是一个常见的错误。这是一个容易犯的错误,但后果很严重。当您的客户端退出时,消息将被重新传递(这可能看起来像随机重新传递),但 RabbitMQ 将消耗越来越多的内存,因为它无法释放任何未确认的消息。

为了调试这种错误,您可以使用 rabbitmqctl 打印 messages_unacknowledged 字段

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在 Windows 上,去掉 sudo

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久性

我们已经了解了如何确保即使消费者死亡,任务也不会丢失。但是,如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。

当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非您告诉它不要这样做。为了确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久。

首先,我们需要确保队列能够在 RabbitMQ 节点重新启动后存活下来。为此,我们需要将其声明为持久

channel.assertQueue('hello', {durable: true});

虽然此命令本身是正确的,但它在当前设置中不起作用。这是因为我们已经定义了一个名为 hello 的队列,它不是持久的。RabbitMQ 不允许您使用不同的参数重新定义现有队列,并且会将错误返回给尝试执行此操作的任何程序。但是,有一个快速的解决方法 - 让我们声明一个不同名称的队列,例如 task_queue

channel.assertQueue('task_queue', {durable: true});

durable 选项更改需要应用于生产者和消费者代码。

此时,我们确信即使 RabbitMQ 重新启动,task_queue 队列也不会丢失。现在我们需要将我们的消息标记为持久

  • 通过使用 persistent 选项 Channel.sendToQueue 接受。
channel.sendToQueue(queue, Buffer.from(msg), {persistent: true});

关于消息持久性的说明

将消息标记为持久并不完全保证消息不会丢失。虽然它告诉 RabbitMQ 将消息保存到磁盘,但在 RabbitMQ 接受消息但尚未保存消息的短暂时间窗口内,消息仍然会丢失。此外,RabbitMQ 不会对每条消息执行 fsync(2) - 它可能只是保存到缓存中,而没有真正写入磁盘。持久性保证并不强,但对于我们简单的任务队列来说已经足够了。如果您需要更强的保证,那么可以使用 发布确认

公平分配

您可能已经注意到,分配仍然没有完全按我们想要的方式工作。例如,在有两个 worker 的情况下,当所有奇数消息都很重而所有偶数消息都很轻时,一个 worker 将一直很忙,而另一个 worker 几乎不会工作。好吧,RabbitMQ 对此一无所知,它仍然会均匀地分配消息。

之所以发生这种情况,是因为 RabbitMQ 只会在消息进入队列时分配消息。它不会查看消费者未确认消息的数量。它只是盲目地将每条第 n 条消息分配给第 n 个消费者。

为了克服这个问题,我们可以使用值为 1prefetch 方法。这告诉 RabbitMQ 每次最多只给一个 worker 一条消息。或者,换句话说,不要在 worker 处理并确认之前的消息之前分配新消息给它。相反,它将分配给下一个尚未忙的 worker。

channel.prefetch(1);

关于队列大小的说明

如果所有工作者都很忙,你的队列可能会填满。你需要注意这一点,也许可以添加更多工作者,或者采取其他策略。

整合在一起

我们 new_task.js 类的最终代码

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://127.0.0.1', function(error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
var queue = 'task_queue';
var msg = process.argv.slice(2).join(' ') || "Hello World!";

channel.assertQueue(queue, {
durable: true
});
channel.sendToQueue(queue, Buffer.from(msg), {
persistent: true
});
console.log(" [x] Sent '%s'", msg);
});
setTimeout(function() {
connection.close();
process.exit(0)
}, 500);
});

(new_task.js 源代码)

以及我们的 worker.js

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://127.0.0.1', function(error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
var queue = 'task_queue';

channel.assertQueue(queue, {
durable: true
});
channel.prefetch(1);
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
channel.consume(queue, function(msg) {
var secs = msg.content.toString().split('.').length - 1;

console.log(" [x] Received %s", msg.content.toString());
setTimeout(function() {
console.log(" [x] Done");
channel.ack(msg);
}, secs * 1000);
}, {
// manual acknowledgment mode,
// see /docs/confirms for details
noAck: false
});
});
});

(worker.js 源代码)

使用消息确认和 prefetch,你可以设置一个工作队列。持久化选项可以使任务即使在 RabbitMQ 重启后也能存活下来。

有关 Channel 方法和消息属性的更多信息,请浏览 amqplib 文档

现在我们可以继续 教程 3,学习如何将同一消息传递给多个消费者。

© 2024 RabbitMQ. All rights reserved.