RabbitMQ 教程 - 工作队列
工作队列
(使用 Spring AMQP)
先决条件
本教程假设您已安装 RabbitMQ 并在 localhost
上的标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
哪里可以获得帮助
如果您在学习本教程时遇到问题,可以通过 GitHub Discussions 或 RabbitMQ 社区 Discord 联系我们。
在第一个教程中,我们编写了程序来从命名队列发送和接收消息。在本教程中,我们将创建一个工作队列,用于在多个工作进程之间分配耗时的任务。
工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务并等待其完成。相反,我们安排任务稍后完成。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当您运行多个工作进程时,任务将在它们之间共享。
这个概念在 Web 应用程序中尤其有用,在 Web 应用程序中,在短暂的 HTTP 请求窗口期间处理复杂任务是不可能的。
准备工作
在本教程的前一部分中,我们发送了一条包含“Hello World!”的消息。现在我们将发送代表复杂任务的字符串。我们没有真实的world任务,例如要调整大小的图像或要渲染的 PDF 文件,因此让我们通过假装我们很忙来伪造它 - 通过使用 Thread.sleep()
函数。我们将字符串中点的数量作为其复杂性;每个点将代表一秒钟的“工作”。例如,由 Hello...
描述的虚假任务将花费三秒钟。
如果您尚未设置项目,请参阅第一个教程中的设置。我们将遵循与第一个教程相同的模式:1) 创建一个包 tut2
并创建 Tut2Config
、Tut2Receiver
和 Tut2Sender
类。首先创建一个新的包 tut2
,我们将在其中放置我们的三个类。在配置类中,我们设置了两个配置文件,教程的标签 tut2
和模式的名称 (work-queues
)。我们利用 Spring 将队列公开为 bean。我们将接收器设置为配置文件,并定义两个 bean 以对应于我们上面图表中的工作进程;receiver1
和 receiver2
。最后,我们为发送器定义一个配置文件,并定义发送器 bean。配置现已完成。
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
@Profile({"tut2", "work-queues"})
@Configuration
public class Tut2Config {
@Bean
public Queue hello() {
return new Queue("hello");
}
@Profile("receiver")
private static class ReceiverConfig {
@Bean
public Tut2Receiver receiver1() {
return new Tut2Receiver(1);
}
@Bean
public Tut2Receiver receiver2() {
return new Tut2Receiver(2);
}
}
@Profile("sender")
@Bean
public Tut2Sender sender() {
return new Tut2Sender();
}
}
发送器
我们将修改发送器,以通过使用 RabbitTemplate
上的相同方法 convertAndSend
在消息中附加一个点,以非常人为的方式提供识别它是否是长时间运行任务的方法。文档将其定义为“将 Java 对象转换为消息并将其发送到具有默认路由键的默认交换机。”
package org.springframework.amqp.tutorials.tut2;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.concurrent.atomic.AtomicInteger;
public class Tut2Sender {
@Autowired
private RabbitTemplate template;
@Autowired
private Queue queue;
AtomicInteger dots = new AtomicInteger(0);
AtomicInteger count = new AtomicInteger(0);
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
StringBuilder builder = new StringBuilder("Hello");
if (dots.incrementAndGet() == 4) {
dots.set(1);
}
for (int i = 0; i < dots.get(); i++) {
builder.append('.');
}
builder.append(count.incrementAndGet());
String message = builder.toString();
template.convertAndSend(queue.getName(), message);
System.out.println(" [x] Sent '" + message + "'");
}
}
接收器
我们的接收器 Tut2Receiver
在 doWork()
方法中模拟虚假任务的任意长度,其中点的数量转换为工作将花费的秒数。同样,我们在 hello
队列上利用 @RabbitListener
,并利用 @RabbitHandler
接收消息。消耗消息的实例被添加到我们的监视器中,以显示哪个实例、消息和处理消息的时间长度。
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;
@RabbitListener(queues = "hello")
public class Tut2Receiver {
private final int instance;
public Tut2Receiver(int i) {
this.instance = i;
}
@RabbitHandler
public void receive(String in) throws InterruptedException {
StopWatch watch = new StopWatch();
watch.start();
System.out.println("instance " + this.instance +
" [x] Received '" + in + "'");
doWork(in);
watch.stop();
System.out.println("instance " + this.instance +
" [x] Done in " + watch.getTotalTimeSeconds() + "s");
}
private void doWork(String in) throws InterruptedException {
for (char ch : in.toCharArray()) {
if (ch == '.') {
Thread.sleep(500);
}
}
}
}
将它们放在一起
使用 mvn package 编译它们并使用以下选项运行
./mvnw clean package
# shell 1
java -jar target/rabbitmq-tutorials.jar --spring.profiles.active=work-queues,receiver
# shell 2
java -jar target/rabbitmq-tutorials.jar --spring.profiles.active=work-queues,sender
发送器的输出应如下所示
Ready ... running for 10000ms
[x] Sent 'Hello.1'
[x] Sent 'Hello..2'
[x] Sent 'Hello...3'
[x] Sent 'Hello.4'
[x] Sent 'Hello..5'
[x] Sent 'Hello...6'
[x] Sent 'Hello.7'
[x] Sent 'Hello..8'
[x] Sent 'Hello...9'
[x] Sent 'Hello.10'
工作进程的输出应如下所示
Ready ... running for 10000ms
instance 1 [x] Received 'Hello.1'
instance 2 [x] Received 'Hello..2'
instance 1 [x] Done in 1.001s
instance 1 [x] Received 'Hello...3'
instance 2 [x] Done in 2.004s
instance 2 [x] Received 'Hello.4'
instance 2 [x] Done in 1.0s
instance 2 [x] Received 'Hello..5'
消息确认
执行任务可能需要几秒钟。您可能想知道,如果其中一个消费者启动了一个长时间运行的任务并在任务仅完成一部分时就死机了,会发生什么情况。默认情况下,Spring AMQP 采用保守的方法进行消息确认。如果侦听器抛出异常,则容器调用
channel.basicReject(deliveryTag, requeue)
除非您显式设置,否则 Requeue 默认为 true
defaultRequeueRejected=false
或者侦听器抛出 AmqpRejectAndDontRequeueException
。这通常是您希望从侦听器获得的行为。在这种模式下,无需担心忘记确认。处理消息后,侦听器调用
channel.basicAck()
确认必须在接收到传递的同一通道上发送。尝试使用不同的通道进行确认将导致通道级协议异常。请参阅关于确认的文档指南以了解更多信息。Spring AMQP 通常会处理此问题,但当与直接使用 RabbitMQ Java 客户端的代码结合使用时,这是需要记住的事情。
忘记确认
错过
basicAck
是一个常见的错误,Spring AMQP 通过其默认配置帮助避免这种情况。后果很严重。当您的客户端退出时,消息将被重新传递(这可能看起来像是随机重新传递),但 RabbitMQ 将消耗越来越多的内存,因为它将无法释放任何未确认的消息。为了调试这种错误,您可以使用
rabbitmqctl
打印messages_unacknowledged
字段sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在 Windows 上,删除 sudo
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久性
默认情况下,Spring AMQP 中的消息是持久的。请注意,消息最终将进入的队列也需要是持久的,否则消息将无法在 broker 重启后幸存下来,因为非持久队列本身无法在重启后幸存下来。
为了更好地控制消息持久性或出站消息的各个方面,您需要使用接受 MessagePostProcessor
参数的 RabbitTemplate#convertAndSend(...)
方法。MessagePostProcessor
在实际发送消息之前提供回调,因此这是修改消息负载或标头的好地方。
关于消息持久性的注意事项
将消息标记为持久性并不能完全保证消息不会丢失。尽管它告诉 RabbitMQ 将消息保存到磁盘,但在 RabbitMQ 接受消息但尚未保存消息时,仍然有一个短暂的时间窗口。此外,RabbitMQ 不会对每条消息执行
fsync(2)
-- 它可能只是保存到缓存而不是真正写入磁盘。持久性保证不强,但这对于我们简单的任务队列来说已经足够了。如果您需要更强的保证,那么您可以使用发布者确认。
公平调度与轮询调度
默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为轮询。在这种模式下,调度不一定完全按照我们的意愿工作。例如,在有两个工作进程的情况下,当所有奇数消息都很重而偶数消息很轻时,一个工作进程将一直很忙,而另一个工作进程几乎不执行任何工作。好吧,RabbitMQ 对此一无所知,并且仍然会均匀地调度消息。
发生这种情况是因为 RabbitMQ 只是在消息进入队列时调度消息。它不查看消费者的未确认消息数量。它只是盲目地将每第 n 条消息分派给第 n 个消费者。
但是,“公平调度”是 Spring AMQP 的默认配置。AbstractMessageListenerContainer
将 DEFAULT_PREFETCH_COUNT
的值定义为 250。如果 DEFAULT_PREFETCH_COUNT
设置为 1,则行为将是上面描述的轮询交付。
关于
prefetchCount
= 1 的注意事项在大多数情况下,
prefetchCount
等于 1 会过于保守,并严重限制消费者吞吐量。在 Spring AMQP 消费者文档 中可以找到一些适用此配置的情况有关预取的更多详细信息,请参阅消费者确认指南。
但是,由于 prefetchCount
默认设置为 250,这告诉 RabbitMQ 一次不要给一个工作进程超过 250 条消息。或者,换句话说,在未确认消息的数量为 250 时,不要向工作进程分派新消息。相反,它会将其分派给下一个不忙的工作进程。
所需的 prefetchCount
值可以通过 AbstractMessageListenerContainer.setPrefetchCount(int prefetchCount)
设置。
关于队列大小的注意事项
如果所有工作进程都很忙,您的队列可能会填满。您需要密切关注这一点,也许可以添加更多工作进程,或者采取其他策略。
通过使用 Spring AMQP,您可以获得为消息确认和公平调度配置的合理值。Spring AMQP 提供的队列的默认持久性和消息的持久性允许消息即使在 RabbitMQ 重新启动后也能幸存下来。
有关 Channel
方法和 MessageProperties
的更多信息,您可以浏览javadoc 在线文档。为了理解 Spring AMQP 的底层基础,您可以找到 rabbitmq-java-client。
现在我们可以继续教程 3,学习如何将同一条消息传递给多个消费者。