RabbitMQ 教程 - 工作队列
工作队列
(使用 Java 客户端)
在第一个教程中,我们编写了程序来发送和接收来自命名队列的消息。在本教程中,我们将创建一个工作队列,用于在多个工作进程之间分配耗时的任务。
工作队列 (又称:任务队列) 背后的主要思想是避免立即执行资源密集型任务,并等待其完成。相反,我们将任务安排在稍后完成。我们将任务封装为消息,并将其发送到队列。后台运行的工作进程将弹出任务并最终执行作业。当您运行多个工作进程时,任务将在它们之间共享。
此概念在 Web 应用程序中特别有用,在 Web 应用程序中,在短 HTTP 请求窗口期间处理复杂任务是不可能的。
准备
在本教程的上一部分中,我们发送了一条包含“Hello World!”的消息。现在,我们将发送代表复杂任务的字符串。我们没有真实的任务,比如要调整大小的图像或要渲染的 pdf 文件,所以让我们通过假装很忙来模拟它 - 通过使用Thread.sleep()
函数。我们将使用字符串中的点号数量作为其复杂度;每个点号将代表一秒钟的“工作”。例如,由Hello...
描述的假任务将花费三秒钟。
我们将略微修改我们之前示例中的Send.java 代码,以允许从命令行发送任意消息。此程序将安排任务到我们的工作队列,因此让我们将其命名为NewTask.java
String message = String.join(" ", argv);
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
我们旧的Recv.java 程序也需要进行一些更改:它需要为消息主体中的每个点号模拟一秒钟的工作。它将处理已传递的消息并执行任务,因此让我们将其命名为Worker.java
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
}
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
我们用来模拟执行时间的假任务
private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}
像教程一中那样编译它们(使用工作目录中的 jar 文件和环境变量CP
)
javac -cp $CP NewTask.java Worker.java
循环调度
使用任务队列的优势之一是能够轻松地并行化工作。如果我们正在积压工作,我们可以简单地添加更多工作进程,并以此方式轻松地进行扩展。
首先,让我们尝试同时运行两个工作进程实例。它们都将从队列中获取消息,但究竟如何呢?让我们看看。
您需要打开三个控制台。两个将运行 worker 程序。这些控制台将成为我们的两个消费者 - C1 和 C2。
# shell 1
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
在第三个控制台中,我们将发布新任务。启动消费者后,您可以发布几条消息
# shell 3
java -cp $CP NewTask First message.
# => [x] Sent 'First message.'
java -cp $CP NewTask Second message..
# => [x] Sent 'Second message..'
java -cp $CP NewTask Third message...
# => [x] Sent 'Third message...'
java -cp $CP NewTask Fourth message....
# => [x] Sent 'Fourth message....'
java -cp $CP NewTask Fifth message.....
# => [x] Sent 'Fifth message.....'
让我们看看发送到我们的工作进程的内容
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
默认情况下,RabbitMQ 会将每条消息依次发送到下一个消费者。平均而言,每个消费者将收到相同数量的消息。这种消息分发方式称为循环调度。尝试使用三个或更多工作进程来测试这一点。
消息确认
执行任务可能需要几秒钟,您可能想知道如果消费者开始一个长时间的任务并在完成之前终止会发生什么。在我们当前的代码中,一旦 RabbitMQ 将消息传递给消费者,它会立即将其标记为已删除。在这种情况下,如果您终止一个工作进程,它正在处理的消息就会丢失。已调度到该特定工作进程但尚未处理的消息也会丢失。
但是我们不想丢失任何任务。如果工作进程死亡,我们希望该任务被传递到另一个工作进程。
为了确保消息永远不会丢失,RabbitMQ 支持消息确认。确认由消费者发送回,以告诉 RabbitMQ 特定消息已收到、处理,并且 RabbitMQ 可以将其删除。
如果消费者死亡(其通道关闭、连接关闭或 TCP 连接丢失)但没有发送确认,RabbitMQ 会理解消息没有完全处理,并将将其重新排队。如果有其他消费者同时在线,它会立即将其重新传递给另一个消费者。这样,您可以确保没有消息丢失,即使工作进程偶尔会死亡。
对消费者传递确认实施了超时(默认值为 30 分钟)。这有助于检测从未确认传递的错误(卡住)消费者。您可以按照传递确认超时中的说明增加此超时。
手动消息确认默认情况下处于开启状态。在之前的示例中,我们通过autoAck=true
标志显式地将其关闭。现在是将此标志设置为false
并从工作进程发送适当的确认的时候了,一旦我们完成了任务。
channel.basicQos(1); // accept only one unack-ed message at a time (see below)
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
使用此代码,您可以确保即使您在工作进程处理消息时使用 CTRL+C 终止它,也不会丢失任何内容。工作进程终止后不久,所有未确认的消息将被重新传递。
确认必须在接收传递的同一通道上发送。尝试使用其他通道确认会导致通道级别的协议异常。请参阅关于确认的文档指南以了解更多信息。
遗漏的确认
遗漏
basicAck
是一个常见的错误。这是一个容易犯的错误,但后果很严重。当您的客户端退出时,消息将被重新传递(这可能看起来像随机重新传递),但 RabbitMQ 将占用越来越多的内存,因为它无法释放任何未确认的消息。为了调试这种错误,您可以使用
rabbitmqctl
打印messages_unacknowledged
字段sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在 Windows 上,请删除 sudo
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久性
我们已经了解了如何确保即使消费者死亡,任务也不会丢失。但是,如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。
当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非您告诉它不要这样做。为了确保消息不会丢失,需要两件事:我们需要将队列和消息都标记为持久性。
首先,我们需要确保队列能够在 RabbitMQ 节点重启后继续存在。为此,我们需要将其声明为持久性
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
虽然此命令本身是正确的,但它在当前设置中无法正常工作。这是因为我们已经定义了一个名为hello
的队列,它不是持久性的。RabbitMQ 不允许您使用不同的参数重新定义现有的队列,并且会向尝试执行此操作的任何程序返回错误。但有一个快速解决方法 - 让我们声明一个名称不同的队列,例如task_queue
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
此queueDeclare
更改需要应用于生产者和消费者代码。
此时,我们确定task_queue
队列即使在 RabbitMQ 重启后也不会丢失。现在我们需要将我们的消息标记为持久性
- 通过将
MessageProperties
(实现BasicProperties
)设置为PERSISTENT_TEXT_PLAIN
值。
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
关于消息持久性的说明
将消息标记为持久性并不能完全保证消息不会丢失。虽然它告诉 RabbitMQ 将消息保存到磁盘,但仍然存在一个短暂的时间窗口,RabbitMQ 已接受消息但尚未保存它。此外,RabbitMQ 不会为每条消息执行
fsync(2)
- 它可能只保存到缓存中,而实际上没有写入磁盘。持久性保证并不牢固,但这对于我们简单的任务队列来说已经足够了。如果您需要更牢固的保证,可以使用发布者确认。
公平调度
您可能已经注意到,消息分发仍然没有完全按我们预期的方式工作。例如,在有两个工作进程的情况下,如果所有奇数消息都很重,而偶数消息很轻,那么一个工作进程会一直很忙,而另一个工作进程几乎不会做任何工作。然而,RabbitMQ 并不知道这一点,仍然会平均分发消息。
这是因为 RabbitMQ 只是在消息进入队列时分发消息。它不会查看消费者未确认消息的数量。它只是盲目地将每第 n 个消息分发给第 n 个消费者。
为了解决这个问题,我们可以使用 `basicQos` 方法,并设置 `prefetchCount` = `1`。这告诉 RabbitMQ 每次只向一个工作进程发送一条消息。换句话说,在工作进程处理并确认前一条消息之前,不要向其分发新的消息。相反,它会将消息分发给下一个未处于繁忙状态的工作进程。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
关于队列大小的说明
如果所有工作进程都处于繁忙状态,您的队列可能会填满。您需要关注这一点,并可能添加更多工作进程,或采取其他策略。
整合在一起
我们 `NewTask.java` 类最终的代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = String.join(" ", argv);
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
以及我们的 `Worker.java`
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
使用消息确认和 `prefetchCount`,您可以设置一个工作队列。持久化选项可以让任务即使在 RabbitMQ 重新启动后也能存活下来。
有关 `Channel` 方法和 `MessageProperties` 的更多信息,您可以浏览 在线 JavaDocs。
现在我们可以继续学习 教程 3,并学习如何将同一消息传递给多个消费者。