RabbitMQ 教程 - 工作队列
工作队列
(使用 AMQP 1.0 Java 客户端)
先决条件
本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
哪里寻求帮助
如果您在学习本教程时遇到困难,可以通过 GitHub Discussions 或 RabbitMQ 社区 Discord 联系我们。
在第一个教程中,我们编写了一些程序,用于向指定的队列发送消息和接收消息。在这一篇中,我们将创建一个工作队列 (Work Queue),用于在多个工作进程(Worker)之间分发耗时的任务。
工作队列(又称:任务队列)背后的主要思想是避免立即执行一个资源密集型任务并等待其完成。相反,我们安排任务稍后执行。我们将一个任务封装成一条消息并将其发送到队列。一个在后台运行的工作进程会接收任务并最终执行工作。当您运行多个工作进程时,任务将在它们之间共享。
这个概念对于 Web 应用程序尤其有用,在这些应用程序中,不可能在短暂的 HTTP 请求窗口内处理一个复杂的任务。
本教程使用 RabbitMQ AMQP 1.0 Java 客户端 (com.rabbitmq.client:amqp-client)。它需要 RabbitMQ 4.0 或更高版本。可运行的源代码位于 RabbitMQ 教程代码库中(java-amqp 目录)。
准备工作
在上一篇教程中,我们发送了一条包含“Hello World!”的消息。现在我们将发送代表复杂任务的字符串。由于我们没有图像缩放或 PDF 渲染之类的真实任务,因此我们通过 Thread.sleep() 函数来模拟任务的繁忙状态。我们将字符串中点的数量作为任务的复杂度;每个点代表一秒的“工作量”。例如,由 Hello... 描述的虚拟任务将耗时三秒。
我们将对前一个示例中的 Send.java 模式进行微调,以允许从命令行发送任意消息。该程序将把任务调度到我们的工作队列中,因此我们将其命名为 NewTask.java。
String message = String.join(" ", argv);
// ... obtain a Publisher for the task queue, then:
publisher.publish(
publisher.message(message.getBytes(StandardCharsets.UTF_8)).durable(true),
context -> { /* wait for broker outcome; print on ACCEPTED */ });
我们原来的消费者程序也需要进行一些更改:它需要为消息体中的每个点模拟一秒的工作时间。它将处理已投递的消息并执行任务,因此我们将其命名为 Worker.java。
connection.consumerBuilder()
.queue(TASK_QUEUE_NAME)
.messageHandler((context, message) -> {
String text = new String(message.body(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + text + "'");
try {
doWork(text);
} finally {
System.out.println(" [x] Done");
context.accept();
}
})
.build();
结算(accept)将在下文详细介绍;如果任务必须在工作进程故障后存续,则工作进程不得使用 preSettled() 投递模式。
我们用于模拟执行时间的虚构任务
private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}
像第一个教程中那样使用 Maven 进行构建和运行(使用相同的 com.rabbitmq.client:amqp-client 依赖和 Exec 插件)。
轮询分发
使用任务队列的一个优点是能够轻松地并行化工作。如果我们正在积累工作积压,我们可以添加更多的工作进程,从而轻松扩展。
首先,让我们尝试同时运行两个 worker 实例。它们都会从队列中获取消息,但具体是如何分配的呢?让我们来看看。
你需要打开三个控制台。其中两个将运行工作进程程序。这两个控制台将作为我们的两个消费者——C1 和 C2。
# shell 1
mvn -q compile exec:java -Dexec.mainClass=Worker
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
mvn -q compile exec:java -Dexec.mainClass=Worker
# => [*] Waiting for messages. To exit press CTRL+C
在第三个中,我们将发布新任务。一旦您启动了消费者,您就可以发布几条消息
# shell 3
mvn -q compile exec:java -Dexec.mainClass=NewTask -Dexec.args='First message.'
# => [x] Sent 'First message.'
mvn -q compile exec:java -Dexec.mainClass=NewTask -Dexec.args='Second message..'
# => [x] Sent 'Second message..'
使用 Shell 的引号规则,以便将任务字符串作为程序参数传递给 NewTask(上述示例在 Unix 下使用 -Dexec.args;在 Windows 上,你可能更喜欢从 IDE 或小脚本中运行)。
让我们看看分发给我们的工作进程的内容
mvn -q compile exec:java -Dexec.mainClass=Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
mvn -q compile exec:java -Dexec.mainClass=Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
默认情况下,RabbitMQ 会按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为轮询。尝试使用三个或更多工作进程进行此操作。
消息确认
执行任务可能需要几秒钟,你可能想知道如果消费者启动了一个长任务但在完成之前终止了会发生什么。使用默认的至少一次 (at-least-once) 消费模式,消费者必须对每条消息进行结算(accept、discard 或 requeue)。如果你不进行结算,在工作进程停止时,Broker 可以重新投递该消息。
为了确保消息在工作进程接收后但在完成处理前因崩溃而丢失,请务必仅在任务完成后进行结算。这里我们在 doWork 返回后的 finally 块中调用 context.accept()。
如果消费者在未结算的情况下死亡,RabbitMQ 将重新投递该消息。如果有其他消费者同时在线,它会迅速将消息重新投递给另一个消费者。这样,即使工作进程偶尔崩溃,你也可以确保没有消息丢失。
消费者投递确认会被强制执行超时机制(参见 投递确认超时)。
在本教程中,请不要启用 ConsumerBuilder.preSettled():该模式投递的消息是已结算的,如果工作进程崩溃,它们将无法被重新投递。
connection.consumerBuilder()
.queue(TASK_QUEUE_NAME)
.messageHandler((context, message) -> {
String text = new String(message.body(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + text + "'");
try {
doWork(text);
} finally {
System.out.println(" [x] Done");
context.accept();
}
})
.build();
使用这种模式,如果你在工作进程处理消息时使用 CTRL+C 将其终止,那么在确认超时后,该消息可以被重新投递给另一个消费者。
遗忘结算
省略
context.accept()(或在任务完成前调用它)是一个常见的错误。当你的客户端退出时,消息会被重新投递(这看起来像是随机的重新投递),并且未确认的投递会堆积在 Broker 上。你可以使用
rabbitmqctl来检查队列。sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged在 Windows 上,删除 sudo
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久性
我们已经学会了如何确保即使消费者死亡,任务在结算前也不会丢失。但如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。
当 RabbitMQ 退出或崩溃时,除非你配置了持久化,否则它会遗忘队列和消息。在现代版本的 RabbitMQ(即 4.0 或更高版本)中,瞬态非独占队列已被弃用。在几乎所有用例中,都应该使用持久化队列。仲裁队列 (Quorum Queues) 是始终持久化的。经典队列可以被设置为持久化,这是强烈推荐的做法。
该库默认创建持久化队列。对于消息,在发布时将出站消息标记为持久化 (durable)。在 Message 上设置 durable(true) 以将消息标记为持久化。这对经典队列很重要。在仲裁队列中,消息始终是持久化的。
首先,声明一个名为 task_queue 的仲裁队列(不要使用 hello,以避免与实验中现有的非持久化队列冲突)。
connection.management().queue("task_queue").quorum().queue().declare();
在发布或消费之前,在生产者和消费者中应用相同的声明。
发布消息时,构建一条持久化消息。
publisher.publish(
publisher.message(message.getBytes(StandardCharsets.UTF_8)).durable(true),
callback);
关于消息持久化的注意事项
将消息标记为持久化并不能完全保证消息永远不会丢失。在 Broker 的 I/O 层面仍存在一个小的时间窗口。如需更强的保证,请考虑使用 发布者确认 (publisher confirms) 以及 AMQP 1.0 客户端的逐条发布回调 (
Publisher.Status)。
公平分派
您可能已经注意到调度仍然不能完全按我们想要的方式工作。例如,在有两个工作进程的情况下,当所有奇数消息都很重而偶数消息都很轻时,一个工作进程将持续忙碌,而另一个工作进程几乎不做任何工作。好吧,RabbitMQ 对此一无所知,它仍然会均匀地分发消息。
发生这种情况是因为 Broker 可能会在之前消息结算前分发多条消息。
使用 AMQP 1.0 Java 客户端,可以通过在消费者构建器上将初始信用 (initial credits) 设置为 1,来限制每个消费者正在进行中 (in flight) 的消息数量。这相当于 AMQP 0-9-1 中的 basicQos(1) 或预取 (prefetch) 1。
connection.consumerBuilder()
.queue(TASK_QUEUE_NAME)
.initialCredits(1)
.messageHandler((context, message) -> { ... })
.build();
关于队列大小的说明
如果所有工作进程都忙碌,您的队列可能会填满。您需要密切关注这一点,并可能添加更多工作进程,或采取其他策略。
总而言之
NewTask.java 的最终概要
import com.rabbitmq.client.amqp.Connection;
import com.rabbitmq.client.amqp.Environment;
import com.rabbitmq.client.amqp.Publisher;
import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
try (Environment environment = new AmqpEnvironmentBuilder()
.connectionSettings()
.uri("amqp://guest:guest@localhost:5672/%2f")
.environmentBuilder()
.build();
Connection connection = environment.connectionBuilder().build()) {
connection.management().queue(TASK_QUEUE_NAME).quorum().queue().declare();
String message = String.join(" ", argv);
try (Publisher publisher = connection.publisherBuilder().queue(TASK_QUEUE_NAME).build()) {
CountDownLatch latch = new CountDownLatch(1);
publisher.publish(
publisher.message(message.getBytes(StandardCharsets.UTF_8)).durable(true),
context -> {
if (context.status() == Publisher.Status.ACCEPTED) {
System.out.println(" [x] Sent '" + message + "'");
}
latch.countDown();
});
if (!latch.await(5, TimeUnit.SECONDS)) {
throw new IllegalStateException("Timed out waiting for publish outcome");
}
}
}
}
}
以及 Worker.java
import com.rabbitmq.client.amqp.Connection;
import com.rabbitmq.client.amqp.Consumer;
import com.rabbitmq.client.amqp.Environment;
import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
Environment environment = new AmqpEnvironmentBuilder()
.connectionSettings()
.uri("amqp://guest:guest@localhost:5672/%2f")
.environmentBuilder()
.build();
Connection connection = environment.connectionBuilder().build();
connection.management().queue(TASK_QUEUE_NAME).quorum().queue().declare();
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = connection.consumerBuilder()
.queue(TASK_QUEUE_NAME)
.initialCredits(1)
.messageHandler((context, message) -> {
String text = new String(message.body(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + text + "'");
try {
doWork(text);
} finally {
System.out.println(" [x] Done");
context.accept();
}
})
.build();
new CountDownLatch(1).await();
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
使用显式结算和 initialCredits(1),你可以建立一个工作队列。仲裁队列和持久化消息可以让任务像在 AMQP 0-9-1 教程中那样,在 Broker 重启后依然存续。
有关 API 的详细信息,请参阅 AMQP 1.0 Java 客户端 Javadoc。
现在我们可以进入教程 3,学习如何将同一条消息投递给多个消费者。