跳至主要内容

RabbitMQ 教程 - 工作队列

工作队列

(使用 Spring AMQP)

信息

先决条件

本教程假设 RabbitMQ 已安装并在 localhost 上的标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则连接设置需要进行调整。

获取帮助

如果您在学习本教程时遇到问题,可以通过GitHub 讨论区RabbitMQ 社区 Discord联系我们。

第一个教程中,我们编写了程序来发送和接收来自命名队列的消息。在本教程中,我们将创建一个工作队列,用于在多个工作进程之间分配耗时的任务。

工作队列(也称为:任务队列)背后的主要思想是避免立即执行资源密集型任务并等待其完成。相反,我们安排稍后执行任务。我们将一个任务封装成消息并将其发送到队列。后台运行的工作进程将弹出任务并最终执行作业。当您运行多个工作进程时,任务将在它们之间共享。

此概念在 Web 应用程序中特别有用,在 Web 应用程序中,无法在短暂的 HTTP 请求窗口期间处理复杂的任务。

准备

在本教程的前面部分,我们发送了一条包含“Hello World!”的消息。现在,我们将发送代表复杂任务的字符串。我们没有现实世界中的任务,例如需要调整大小的图像或需要渲染的 PDF 文件,因此让我们通过假装很忙来模拟它 - 通过使用 Thread.sleep() 函数。我们将字符串中点的数量视为其复杂度;每个点将占“工作”一秒钟。例如,由 Hello... 描述的模拟任务将花费三秒钟。

如果您尚未设置项目,请参阅第一个教程中的设置。我们将遵循与第一个教程相同的模式:1) 创建一个包 tut2 并创建 Tut2ConfigTut2ReceiverTut2Sender 类。首先创建一个名为 tut2 的新包,我们将在此包中放置三个类。在配置类中,我们设置了两个配置文件,教程的标签 tut2 和模式的名称 (work-queues)。我们利用 Spring 将队列公开为 Bean。我们将接收器设置为配置文件,并定义两个 Bean 以对应于我们上面图表中的工作进程;receiver1receiver2。最后,我们为发送方定义一个配置文件并定义发送方 Bean。现在配置已完成。

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

@Profile({"tut2", "work-queues"})
@Configuration
public class Tut2Config {

@Bean
public Queue hello() {
return new Queue("hello");
}

@Profile("receiver")
private static class ReceiverConfig {

@Bean
public Tut2Receiver receiver1() {
return new Tut2Receiver(1);
}

@Bean
public Tut2Receiver receiver2() {
return new Tut2Receiver(2);
}
}

@Profile("sender")
@Bean
public Tut2Sender sender() {
return new Tut2Sender();
}
}

发送方

我们将修改发送方以提供一种识别它是否是较长时间运行的任务的方法,方法是在消息中附加一个点,使用 RabbitTemplate 上的相同方法以非常人为的方式发布消息,convertAndSend。文档将此定义为:“将 Java 对象转换为消息并将其发送到具有默认路由键的默认交换机。”

package org.springframework.amqp.tutorials.tut2;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.concurrent.atomic.AtomicInteger;

public class Tut2Sender {

@Autowired
private RabbitTemplate template;

@Autowired
private Queue queue;

AtomicInteger dots = new AtomicInteger(0);

AtomicInteger count = new AtomicInteger(0);

@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
StringBuilder builder = new StringBuilder("Hello");
if (dots.incrementAndGet() == 4) {
dots.set(1);
}
for (int i = 0; i < dots.get(); i++) {
builder.append('.');
}
builder.append(count.incrementAndGet());
String message = builder.toString();
template.convertAndSend(queue.getName(), message);
System.out.println(" [x] Sent '" + message + "'");
}

}

接收方

我们的接收器 Tut2ReceiverdoWork() 方法中模拟了模拟任务的任意长度,其中点的数量转换为工作将花费的秒数。同样,我们利用 hello 队列上的 @RabbitListener@RabbitHandler 来接收消息。正在使用消息的实例将添加到我们的监视器中,以显示哪个实例、消息以及处理消息所需的时间。

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;

@RabbitListener(queues = "hello")
public class Tut2Receiver {

private final int instance;

public Tut2Receiver(int i) {
this.instance = i;
}

@RabbitHandler
public void receive(String in) throws InterruptedException {
StopWatch watch = new StopWatch();
watch.start();
System.out.println("instance " + this.instance +
" [x] Received '" + in + "'");
doWork(in);
watch.stop();
System.out.println("instance " + this.instance +
" [x] Done in " + watch.getTotalTimeSeconds() + "s");
}

private void doWork(String in) throws InterruptedException {
for (char ch : in.toCharArray()) {
if (ch == '.') {
Thread.sleep(500);
}
}
}
}

综合示例

使用 mvn package 编译它们,并使用以下选项运行

./mvnw clean package

# shell 1
java -jar target/rabbitmq-tutorials.jar --spring.profiles.active=work-queues,receiver
# shell 2
java -jar target/rabbitmq-tutorials.jar --spring.profiles.active=work-queues,sender

发送方的输出应类似于

Ready ... running for 10000ms
[x] Sent 'Hello.1'
[x] Sent 'Hello..2'
[x] Sent 'Hello...3'
[x] Sent 'Hello.4'
[x] Sent 'Hello..5'
[x] Sent 'Hello...6'
[x] Sent 'Hello.7'
[x] Sent 'Hello..8'
[x] Sent 'Hello...9'
[x] Sent 'Hello.10'

工作进程的输出应类似于

Ready ... running for 10000ms
instance 1 [x] Received 'Hello.1'
instance 2 [x] Received 'Hello..2'
instance 1 [x] Done in 1.001s
instance 1 [x] Received 'Hello...3'
instance 2 [x] Done in 2.004s
instance 2 [x] Received 'Hello.4'
instance 2 [x] Done in 1.0s
instance 2 [x] Received 'Hello..5'

消息确认

执行任务可能需要几秒钟。您可能想知道如果某个消费者开始执行一项长时间的任务并在完成一半时死亡会发生什么。Spring AMQP 默认采用保守的消息确认方法。如果侦听器抛出异常,容器将调用

channel.basicReject(deliveryTag, requeue)

重新入队默认情况下为 true,除非您显式设置

defaultRequeueRejected=false

或侦听器抛出 AmqpRejectAndDontRequeueException。这通常是您希望侦听器具有的行为。在此模式下,无需担心遗忘的确认。处理完消息后,侦听器将调用

channel.basicAck()

必须在接收传递的同一通道上发送确认。尝试使用不同的通道进行确认会导致通道级别的协议异常。请参阅确认文档指南以了解更多信息。Spring AMQP 通常会处理此问题,但在与直接使用 RabbitMQ Java 客户端的代码结合使用时,需要注意这一点。

遗忘的确认

遗漏 basicAck 是一个常见错误,Spring AMQP 通过其默认配置帮助避免了这种情况。后果很严重。当您的客户端退出时,消息将重新传递(这可能看起来像是随机重新传递),但 RabbitMQ 将消耗越来越多的内存,因为它无法释放任何未确认的消息。

为了调试这种错误,您可以使用 rabbitmqctl 打印 messages_unacknowledged 字段

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在 Windows 上,删除 sudo

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久性

使用 Spring AMQP 时,消息默认情况下是持久化的。请注意,消息最终所在的队列也需要是持久的,否则消息将无法在代理重新启动后存活下来,因为非持久队列本身无法在重新启动后存活下来。

要更好地控制消息持久性或出站消息的各个方面,您需要使用接受 MessagePostProcessor 参数的 RabbitTemplate#convertAndSend(...) 方法。MessagePostProcessor 在消息实际发送之前提供回调,因此这是修改消息有效负载或标头的理想位置。

关于消息持久性的说明

将消息标记为持久化并不能完全保证消息不会丢失。尽管它告诉 RabbitMQ 将消息保存到磁盘,但在 RabbitMQ 接受消息但尚未保存消息的短暂时间窗口内仍然存在。此外,RabbitMQ 不会为每条消息执行 fsync(2) - 它可能只保存到缓存中,而不是真正写入磁盘。持久性保证并不强,但对于我们简单的任务队列来说已经足够了。如果您需要更强的保证,则可以使用发布者确认

公平调度与循环调度

默认情况下,RabbitMQ 会依次将每条消息发送到下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环调度。在这种模式下,调度不一定完全按我们预期的那样工作。例如,在有两个工作进程的情况下,当所有奇数消息都很重而偶数消息很轻时,一个工作进程将始终处于繁忙状态,而另一个工作进程几乎不做任何工作。好吧,RabbitMQ 对此一无所知,并且仍然会均匀地分发消息。

发生这种情况是因为 RabbitMQ 只在消息进入队列时分发消息。它不会查看消费者的未确认消息数量。它只是盲目地将每第 n 条消息分发给第 n 个消费者。

但是,“公平调度”是 Spring AMQP 的默认配置。AbstractMessageListenerContainerDEFAULT_PREFETCH_COUNT 的值定义为 250。如果 DEFAULT_PREFETCH_COUNT 设置为 1,则行为将如上所述为循环传递。

关于 prefetchCount = 1 的说明

在大多数情况下,prefetchCount 等于 1 会过于保守并严重限制消费者的吞吐量。可以在Spring AMQP 消费者文档中找到此配置适用的几个案例

有关预取的更多详细信息,请参阅消费者确认指南

但是,默认情况下,prefetchCount 设置为 250,这意味着 RabbitMQ 每次不会向工作进程发送超过 250 条消息。或者换句话说,当未确认的消息数量为 250 时,不会向工作进程分发新消息。相反,它会将消息分发给下一个未忙碌的工作进程。

可以通过 AbstractMessageListenerContainer.setPrefetchCount(int prefetchCount) 设置所需的 prefetchCount 值。

关于队列大小的说明

如果所有工作进程都处于繁忙状态,您的队列可能会填满。您需要关注这一点,并可能添加更多工作进程,或采用其他策略。

使用 Spring AMQP,您可以获得为消息确认和公平分发配置的合理值。Spring AMQP 提供的队列的默认持久性和消息的持久性允许消息即使在 RabbitMQ 重新启动后也能存活。

有关 Channel 方法和 MessageProperties 的更多信息,您可以浏览 在线 javadocs 为了理解 Spring AMQP 的底层基础,您可以找到 rabbitmq-java-client

现在我们可以继续学习 教程 3,并了解如何将同一消息传递给多个消费者。

© 2024 RabbitMQ. All rights reserved.