RabbitMQ 教程 - 工作队列
工作队列
(使用 Spring AMQP)
先决条件
本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
哪里寻求帮助
如果您在学习本教程时遇到困难,可以通过 GitHub Discussions 或 RabbitMQ 社区 Discord 联系我们。
在第一个教程中,我们编写了从命名队列发送和接收消息的程序。在这一篇中,我们将创建一个工作队列 (Work Queue),用于在多个工作者(Workers)之间分配耗时的任务。
工作队列(又称:任务队列)背后的主要思想是避免立即执行一个资源密集型任务并等待其完成。相反,我们安排任务稍后执行。我们将一个任务封装成一条消息并将其发送到队列。一个在后台运行的工作进程会接收任务并最终执行工作。当您运行多个工作进程时,任务将在它们之间共享。
这个概念对于 Web 应用程序尤其有用,在这些应用程序中,不可能在短暂的 HTTP 请求窗口内处理一个复杂的任务。
准备工作
在本教程的前一部分,我们发送了一条包含“Hello World!”的消息。现在我们将发送代表复杂任务的字符串。我们没有真实的业务场景(如图像缩放或 PDF 渲染),所以我们通过使用 Thread.sleep() 函数来模拟忙碌状态。我们将字符串中点的数量视为任务的复杂度;每一个点代表一秒的“工作”时间。例如,由 Hello... 描述的虚构任务将耗时三秒。
如果您尚未设置项目,请参阅第一个教程中的设置。我们将遵循与第一个教程相同的模式:1) 创建一个包 tut2,并创建 Tut2Config、Tut2Receiver 和 Tut2Sender 类。首先创建一个新的包 tut2,我们将在这里放置这三个类。在配置类中,我们设置了两个 profile:教程标签 tut2 和模式名称 (work-queues)。我们利用 Spring 将队列暴露为 Bean。我们将接收者设置为一个 profile,并定义了两个 Bean 来对应上述图表中的工作者;即 receiver1 和 receiver2。最后,我们为发送者定义了一个 profile,并定义了发送者 Bean。配置至此完成。
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
@Profile({"tut2", "work-queues"})
@Configuration
public class Tut2Config {
@Bean
public Queue hello() {
return QueueBuilder.durable("task_queue").quorum().build();
}
@Profile("receiver")
private static class ReceiverConfig {
@Bean
public Tut2Receiver receiver1() {
return new Tut2Receiver(1);
}
@Bean
public Tut2Receiver receiver2() {
return new Tut2Receiver(2);
}
}
@Profile("sender")
@Bean
public Tut2Sender sender() {
return new Tut2Sender();
}
}
发送者 (Sender)
我们将修改发送者,通过在消息末尾追加点的方式来标识其是否为耗时较长的任务。我们使用 RabbitTemplate 上的相同方法 convertAndSend 来发布消息。文档对其定义为:“将 Java 对象转换为消息,并将其发送到具有默认交换机和默认路由键的目的地。”
package org.springframework.amqp.tutorials.tut2;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.concurrent.atomic.AtomicInteger;
public class Tut2Sender {
@Autowired
private RabbitTemplate template;
@Autowired
private Queue queue;
AtomicInteger dots = new AtomicInteger(0);
AtomicInteger count = new AtomicInteger(0);
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
StringBuilder builder = new StringBuilder("Hello");
if (dots.incrementAndGet() == 4) {
dots.set(1);
}
for (int i = 0; i < dots.get(); i++) {
builder.append('.');
}
builder.append(count.incrementAndGet());
String message = builder.toString();
template.convertAndSend(queue.getName(), message);
System.out.println(" [x] Sent '" + message + "'");
}
}
接收者 (Receiver)
我们的接收者 Tut2Receiver 在 doWork() 方法中模拟了虚构任务的任意长度,其中点的数量转化为工作所需的秒数。同样,我们利用 task_queue 队列上的 @RabbitListener 和用于接收消息的 @RabbitHandler。消费消息的实例被添加到我们的监视器中,以显示是哪个实例在处理消息、消息内容以及处理该消息所需的时间长度。
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;
@RabbitListener(queues = "task_queue")
public class Tut2Receiver {
private final int instance;
public Tut2Receiver(int i) {
this.instance = i;
}
@RabbitHandler
public void receive(String in) throws InterruptedException {
StopWatch watch = new StopWatch();
watch.start();
System.out.println("instance " + this.instance +
" [x] Received '" + in + "'");
doWork(in);
watch.stop();
System.out.println("instance " + this.instance +
" [x] Done in " + watch.getTotalTimeSeconds() + "s");
}
private void doWork(String in) throws InterruptedException {
for (char ch : in.toCharArray()) {
if (ch == '.') {
Thread.sleep(500);
}
}
}
}
总而言之
使用 mvn package 编译它们并使用以下选项运行
./mvnw clean package
# shell 1
java -jar target/rabbitmq-tutorials.jar --spring.profiles.active=work-queues,receiver
# shell 2
java -jar target/rabbitmq-tutorials.jar --spring.profiles.active=work-queues,sender
发送者的输出看起来应该像这样
Ready ... running for 10000ms
[x] Sent 'Hello.1'
[x] Sent 'Hello..2'
[x] Sent 'Hello...3'
[x] Sent 'Hello.4'
[x] Sent 'Hello..5'
[x] Sent 'Hello...6'
[x] Sent 'Hello.7'
[x] Sent 'Hello..8'
[x] Sent 'Hello...9'
[x] Sent 'Hello.10'
工作者的输出看起来应该像这样
Ready ... running for 10000ms
instance 1 [x] Received 'Hello.1'
instance 2 [x] Received 'Hello..2'
instance 1 [x] Done in 1.001s
instance 1 [x] Received 'Hello...3'
instance 2 [x] Done in 2.004s
instance 2 [x] Received 'Hello.4'
instance 2 [x] Done in 1.0s
instance 2 [x] Received 'Hello..5'
消息确认
执行任务可能需要几秒钟。您可能想知道,如果其中一个消费者开始执行一个长任务,但在任务只执行了一部分时意外终止,会发生什么。Spring AMQP 默认采用保守的消息确认 (message acknowledgement) 方法。如果监听器抛出异常,容器会调用
channel.basicReject(deliveryTag, requeue)
除非您明确设置,否则 Requeue(重新入队)默认是开启的
defaultRequeueRejected=false
或者监听器抛出了 AmqpRejectAndDontRequeueException。这通常是您希望从监听器中获得的行为。在这种模式下,无需担心遗忘确认。在处理完消息后,监听器会调用
channel.basicAck()
确认必须在接收到交付消息的同一个通道 (channel) 上发送。尝试使用不同的通道进行确认将导致通道级的协议异常。请参阅确认文档指南以了解更多信息。Spring AMQP 通常会处理这些细节,但在与直接使用 RabbitMQ Java 客户端的代码结合使用时,这一点需要牢记。
遗忘的确认
遗漏
basicAck是一个常见的错误,Spring AMQP 通过其默认配置帮助避免了这种情况。后果是很严重的:当您的客户端退出时,消息会被重新投递(这看起来像是随机的重新投递),并且由于 RabbitMQ 无法释放任何未确认的消息,它会消耗越来越多的内存。为了调试此类错误,您可以使用
rabbitmqctl打印messages_unacknowledged字段sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged在 Windows 上,删除 sudo
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久性 (Message persistence)
使用 Spring AMQP 时,消息默认是持久的。请注意,消息最终进入的队列也需要是持久的(durable),否则如果 RabbitMQ 代理重启,消息将无法存活,因为非持久化队列本身在重启后不会保存。
要更精确地控制消息持久性或出站消息的各个方面,您需要使用接受 MessagePostProcessor 参数的 RabbitTemplate#convertAndSend(...) 方法。MessagePostProcessor 在消息实际发送前提供了一个回调,因此这是修改消息有效载荷或标头的最佳位置。
关于消息持久化的注意事项
将消息标记为持久并不能完全保证消息不会丢失。尽管它告诉 RabbitMQ 将消息保存到磁盘,但在 RabbitMQ 接受消息但尚未保存的短时间窗口内仍然存在风险。此外,RabbitMQ 不会为每条消息执行
fsync(2)——它可能只保存在缓存中而未真正写入磁盘。持久性保证不强,但对于我们简单的任务队列来说已绰绰有余。如果您需要更强的保证,可以使用发布者确认。
公平分发 vs 轮询分发 (Fair dispatch vs Round-robin dispatching)
默认情况下,RabbitMQ 会依次将每条消息发送给下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为轮询 (round-robin)。在这种模式下,分发并不一定完全符合我们的需求。例如,在有两个工作者的情况下,如果所有奇数消息都是繁重的,而偶数消息是轻量的,那么一个工作者将始终处于忙碌状态,而另一个几乎不做任何工作。好吧,RabbitMQ 对此一无所知,仍然会均匀地分发消息。
这是因为 RabbitMQ 在消息进入队列时就会分发消息。它不会查看消费者未确认消息的数量。它只是盲目地将第 n 条消息分发给第 n 个消费者。
然而,“公平分发 (Fair dispatch)”是 Spring AMQP 的默认配置。AbstractMessageListenerContainer 将 DEFAULT_PREFETCH_COUNT 的值定义为 250。如果 DEFAULT_PREFETCH_COUNT 设置为 1,则行为将是上述的轮询分发。
关于
prefetchCount= 1 的说明在大多数情况下,
prefetchCount等于 1 会过于保守,并严重限制消费者的吞吐量。可以在 Spring AMQP 消费者文档中找到几个适用此配置的情况。有关预取的更多详细信息,请参阅消费者确认指南。
然而,由于 prefetchCount 默认设置为 250,这意味着 RabbitMQ 一次不会给一个工作者超过 250 条消息。换句话说,当未确认的消息数量达到 250 时,不要向该工作者分发新消息。相反,它会将消息分发给下一个尚未忙碌的工作者。
期望的 prefetchCount 值可以通过 AbstractMessageListenerContainer.setPrefetchCount(int prefetchCount) 进行设置。
关于队列大小的说明
如果所有工作进程都忙碌,您的队列可能会填满。您需要密切关注这一点,并可能添加更多工作进程,或采取其他策略。
通过使用 Spring AMQP,您可以获得为消息确认和公平分发配置的合理值。Spring AMQP 提供的队列默认持久性和消息默认持久化功能,使得即使 RabbitMQ 重启,消息也能够存活。
有关 Channel 方法和 MessageProperties 的更多信息,您可以浏览 在线 Javadoc。要了解 Spring AMQP 的底层基础,您可以查看 rabbitmq-java-client。
现在我们可以继续学习 教程 3,了解如何将同一条消息发送给多个消费者。