跳至主内容

RabbitMQ 教程 - 工作队列

工作队列

(使用 Java 客户端)

信息

先决条件

本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。

哪里寻求帮助

如果您在学习本教程时遇到困难,可以通过 GitHub DiscussionsRabbitMQ 社区 Discord 联系我们。

第一个教程中,我们编写了从命名队列发送和接收消息的程序。在这一篇中,我们将创建一个工作队列(Work Queue),用于在多个工作者(workers)之间分发耗时的任务。

工作队列(又称:任务队列)背后的主要思想是避免立即执行一个资源密集型任务并等待其完成。相反,我们安排任务稍后执行。我们将一个任务封装成一条消息并将其发送到队列。一个在后台运行的工作进程会接收任务并最终执行工作。当您运行多个工作进程时,任务将在它们之间共享。

这个概念对于 Web 应用程序尤其有用,在这些应用程序中,不可能在短暂的 HTTP 请求窗口内处理一个复杂的任务。

准备工作

在本教程的前一部分,我们发送了一条包含“Hello World!”的消息。现在,我们将发送代表复杂任务的字符串。由于我们没有实际的任务(例如调整图像大小或渲染 PDF 文件),我们通过使用 Thread.sleep() 函数来模拟忙碌状态。我们将字符串中的点号数量视为任务的复杂度;每一个点号代表一秒的“工作时间”。例如,Hello... 所代表的虚构任务将耗时三秒。

我们将对之前示例中的 Send.java 代码进行微调,以允许从命令行发送任意消息。这个程序将向我们的工作队列调度任务,所以我们将其命名为 NewTask.java

String message = String.join(" ", argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

我们旧的 Recv.java 程序也需要进行一些修改:它需要为消息体中的每一个点号模拟一秒的工作时间。它将处理已投递的消息并执行任务,所以我们将其命名为 Worker.java

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");

System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
}
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

我们用于模拟执行时间的虚构任务

private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}

像第一个教程那样编译它们(将 jar 文件放在工作目录中,并设置环境变量 CP)。

javac -cp $CP NewTask.java Worker.java

轮询分发

使用任务队列的一个优点是能够轻松地并行化工作。如果我们正在积累工作积压,我们可以添加更多的工作进程,从而轻松扩展。

首先,让我们尝试同时运行两个 worker 实例。它们都会从队列中获取消息,但具体是如何分配的呢?让我们来看看。

你需要打开三个控制台。其中两个将运行工作者程序。这些控制台将充当我们的两个消费者——C1 和 C2。

# shell 1
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C

在第三个中,我们将发布新任务。一旦您启动了消费者,您就可以发布几条消息

# shell 3
java -cp $CP NewTask First message.
# => [x] Sent 'First message.'
java -cp $CP NewTask Second message..
# => [x] Sent 'Second message..'
java -cp $CP NewTask Third message...
# => [x] Sent 'Third message...'
java -cp $CP NewTask Fourth message....
# => [x] Sent 'Fourth message....'
java -cp $CP NewTask Fifth message.....
# => [x] Sent 'Fifth message.....'

让我们看看分发给我们的工作进程的内容

java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

默认情况下,RabbitMQ 会按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为轮询。尝试使用三个或更多工作进程进行此操作。

消息确认

完成一项任务可能需要几秒钟,您可能会想,如果一个消费者开始一项长时间任务然后终止了怎么办?使用我们当前的代码,一旦 RabbitMQ 将消息传递给消费者,它会立即将其标记为删除。在这种情况下,如果您终止一个工作者,它正在处理的消息就会丢失。分派给该特定工作者但尚未处理的消息也会丢失。

但是我们不想丢失任何任务。如果一个工作进程死亡,我们希望将任务分发给另一个工作进程。

为了确保消息永不丢失,RabbitMQ 支持消息确认(acknowledgments)。消费者会回传一个确认信息,告诉 RabbitMQ 某个特定的消息已经被接收、处理,并且 RabbitMQ 可以将其删除。

如果消费者死亡(其通道关闭、连接关闭或 TCP 连接丢失)而未发送 ack,RabbitMQ 将理解消息未完全处理,并会将其重新排队。如果当时有其他消费者在线,它将迅速将其重新分发给另一个消费者。这样,您可以确信没有消息会丢失,即使工作进程偶尔会死亡。

强制执行消费者交付确认的超时(默认为 30 分钟)。这有助于检测从未确认交付的错误(卡住)的消费者。您可以按照交付确认超时中的说明增加此超时。

手动消息确认默认是开启的。在之前的示例中,我们通过 autoAck=true 标志显式地将其关闭。现在是时候将此标志设置为 false,并在工作者完成任务后发送正式的确认信息了。

channel.basicQos(1); // accept only one unack-ed message at a time (see below)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");

System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

使用这段代码,你可以确保即使你在使用CTRL+C终止一个正在处理消息的工作进程时,也不会丢失任何东西。工作进程终止后不久,所有未确认的消息都会被重新分发。

确认必须在接收到交付的同一通道上发送。尝试使用不同通道进行确认将导致通道级协议异常。请参阅确认文档指南以了解更多信息。

遗忘的确认

漏掉 basicAck 是一个常见的错误。虽然这很容易出错,但后果很严重。当你的客户端退出时,消息会被重新投递(看起来像是随机重新投递),但由于 RabbitMQ 无法释放任何未确认的消息,它会消耗越来越多的内存。

为了调试此类错误,您可以使用 rabbitmqctl 打印 messages_unacknowledged 字段

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在 Windows 上,删除 sudo

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久性

我们已经学习了如何确保即使工作进程死亡,任务也不会丢失。但是,如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。

当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非您告诉它不要这样做。需要两件事来确保消息不会丢失:我们需要将队列和消息都标记为持久。

首先,我们需要确保队列能够承受 RabbitMQ 节点重启。为此,我们需要将其声明为持久

boolean durable = true;
Map<String, Object> args = Map.of("x-queue-type", "quorum");
channel.queueDeclare("hello", durable, false, false, args);

虽然此命令本身是正确的,但它在我们当前的设置中不起作用。这是因为我们已经定义了一个名为 hello 的队列,该队列不是持久的。RabbitMQ 不允许您使用不同的参数重新定义现有队列,并且会向尝试这样做的任何程序返回错误。但是有一个快速的解决方法——让我们声明一个不同的队列名称,例如 task_queue

boolean durable = true;
Map<String, Object> args = Map.of("x-queue-type", "quorum");
channel.queueDeclare("task_queue", durable, false, false, args);

这个 queueDeclare 的更改需要同时应用于生产者和消费者的代码中。

此时,我们确信即使 RabbitMQ 重启,task_queue 队列也不会丢失。现在我们需要将我们的消息标记为持久化。

  • 通过将 MessageProperties(实现了 BasicProperties)设置为 PERSISTENT_TEXT_PLAIN 值。
import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());

关于消息持久化的注意事项

将消息标记为持久并不能完全保证消息不会丢失。尽管它告诉 RabbitMQ 将消息保存到磁盘,但在 RabbitMQ 接受消息但尚未保存的短时间窗口内仍然存在风险。此外,RabbitMQ 不会为每条消息执行 fsync(2) ——它可能只保存在缓存中而未真正写入磁盘。持久性保证不强,但对于我们简单的任务队列来说已绰绰有余。如果您需要更强的保证,可以使用发布者确认

公平分派

您可能已经注意到调度仍然不能完全按我们想要的方式工作。例如,在有两个工作进程的情况下,当所有奇数消息都很重而偶数消息都很轻时,一个工作进程将持续忙碌,而另一个工作进程几乎不做任何工作。好吧,RabbitMQ 对此一无所知,它仍然会均匀地分发消息。

这是因为 RabbitMQ 在消息进入队列时就会分发消息。它不会查看消费者未确认消息的数量。它只是盲目地将第 n 条消息分发给第 n 个消费者。

为了解决这个问题,我们可以使用 basicQos 方法并设置 prefetchCount = 1。这告诉 RabbitMQ 不要同时给一个工作者分配多于一条的消息。换句话说,在工作者处理并确认上一条消息之前,不要给它分发新的消息。取而代之的是,它会将消息分发给下一个尚未忙碌的工作者。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

关于队列大小的说明

如果所有工作进程都忙碌,您的队列可能会填满。您需要密切关注这一点,并可能添加更多工作进程,或采取其他策略。

总而言之

我们的 NewTask.java 类的最终代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.util.Map;

public class NewTask {

private static final String TASK_QUEUE_NAME = "task_queue";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
Map<String, Object> args = Map.of("x-queue-type", "quorum");
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, args);

String message = String.join(" ", argv);

channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}

}

(NewTask.java 源码)

以及我们的 Worker.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.util.Map;

public class Worker {

private static final String TASK_QUEUE_NAME = "task_queue";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();

Map<String, Object> args = Map.of("x-queue-type", "quorum");
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, args);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

channel.basicQos(1);

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");

System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}

private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}

(Worker.java 源码)

使用消息确认和 prefetchCount,你可以建立一个工作队列。持久化选项即使在 RabbitMQ 重启时也能让任务存活下来。

有关 Channel 方法和 MessageProperties 的更多信息,你可以浏览 在线 JavaDocs

现在我们可以继续学习教程 3,了解如何将同一条消息发送给多个消费者。

© . This site is unofficial and not affiliated with VMware.