RabbitMQ 教程 - "Hello World!"
简介
先决条件
本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
哪里寻求帮助
如果您在学习本教程时遇到困难,可以通过 GitHub Discussions 或 RabbitMQ 社区 Discord 联系我们。
RabbitMQ 是一个消息代理:它接收并转发消息。你可以把它想象成一个邮局:当你把想要寄出的邮件放进邮箱时,你可以确信邮递员最终会将邮件送到你的收件人手中。在这个比喻中,RabbitMQ 就是邮箱、邮局和邮递员。
RabbitMQ 和邮局的主要区别在于,它不处理纸张,而是接收、存储和转发数据的二进制数据块——消息。
RabbitMQ 和消息传递通常会使用一些行话。
-
发送无非就是发送。发送消息的程序就是生产者。
-
队列 是 RabbitMQ 中邮箱的名称。虽然消息在 RabbitMQ 和你的应用程序之间流动,但它们只能存储在队列 中。队列 的大小仅受主机内存和磁盘空间的限制,它本质上是一个大的消息缓冲区。
许多生产者 可以发送消息到同一个队列,许多消费者 可以尝试从同一个队列 接收数据。
这就是我们表示队列的方式
-
消费 的含义与接收类似。消费者 是一个主要等待接收消息的程序。
请注意,生产者、消费者和代理不一定需要驻留在同一台主机上;事实上,在大多数应用程序中它们都不会。一个应用程序也可以同时是生产者和消费者。
"Hello World"
(使用 Spring AMQP)
在本教程部分,我们将使用 spring-amqp 库编写两个程序:一个生产者用于发送单条消息,一个消费者用于接收消息并打印出来。我们将忽略 Spring AMQP API 中的一些细节,仅专注于这个非常简单的入门内容。这是消息传递的“Hello World”。
在下面的图表中,“P”是我们的生产者,“C”是我们的消费者。中间的框是一个队列——RabbitMQ 代表消费者存储的消息缓冲区。
Spring AMQP 框架
RabbitMQ 支持多种协议。本教程使用 AMQP 0-9-1,这是一种开放的通用消息传递协议。在 多种不同语言 中都有许多 RabbitMQ 客户端。
我们将使用 Spring Boot 来引导和配置我们的 Spring AMQP 项目。我们选择 Maven 来构建项目,但也可以使用 Gradle。
项目的源代码 在线可用,但您也可以从头开始完成教程。
如果选择后者,请打开 Spring Initializr 并提供:组 ID(例如,org.springframework.amqp.tutorials),工件 ID(例如,rabbitmq-amqp-tutorials)。搜索 RabbitMQ 依赖项并选择它。
生成项目并将其解压缩到您选择的位置。现在可以将它导入您喜欢的 IDE。或者,您也可以在您喜欢的编辑器中处理它。
配置项目
Spring Boot 提供了许多功能,但我们在此仅重点介绍几项。首先,Spring Boot 应用程序可以选择通过 application.properties 或 application.yml 文件提供其属性(还有许多其他选项,但这足以让我们开始)。您将在生成的项目中找到一个空的 application.properties 文件。将 application.properties 重命名为 application.yml 文件,并添加以下属性:
spring:
profiles:
active: usage_message
logging:
level:
org: ERROR
tutorial:
client:
duration: 10000
创建一个新的包 tut1,我们将在此处放置教程代码。现在我们将创建一个 Java 配置类 Tut1Config.java 来按以下方式描述我们的 Spring bean:
package org.springframework.amqp.tutorials.tut1;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
@Profile({"tut1","hello-world"})
@Configuration
public class Tut1Config {
@Bean
public Queue hello() {
return new Queue("hello");
}
@Profile("receiver")
@Bean
public Tut1Receiver receiver() {
return new Tut1Receiver();
}
@Profile("sender")
@Bean
public Tut1Sender sender() {
return new Tut1Sender();
}
}
请注意,我们已将第一个教程配置文件定义为 tut1(包名)或 hello-world。我们使用 @Configuration 注解来让 Spring 知道这是一个 Java 配置,并在其中创建我们的 Queue(“hello”)的定义,并定义我们的 Sender 和 Receiver bean。
我们将通过简单地传入我们正在使用的配置文件来运行所有教程。为了实现这一点,我们将修改生成的 RabbitAmqpTutorialsApplication 类,如下所示:
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class RabbitAmqpTutorialsApplication {
@Profile("usage_message")
@Bean
public CommandLineRunner usage() {
return args -> {
System.out.println("This app uses Spring Profiles to
control its behavior.\n");
System.out.println("Sample usage: java -jar
rabbit-tutorials.jar
--spring.profiles.active=hello-world,sender");
};
}
@Profile("!usage_message")
@Bean
public CommandLineRunner tutorial() {
return new RabbitAmqpTutorialsRunner();
}
public static void main(String[] args) throws Exception {
SpringApplication.run(RabbitAmqpTutorialsApplication.class, args);
}
}
并添加 RabbitAmqpTutorialsRunner 类,如下所示:
package org.springframework.amqp.tutorials.tut1;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ConfigurableApplicationContext;
public class RabbitAmqpTutorialsRunner implements CommandLineRunner {
@Value("${tutorial.client.duration:0}")
private int duration;
@Autowired
private ConfigurableApplicationContext ctx;
@Override
public void run(String... arg0) throws Exception {
System.out.println("Ready ... running for " + duration + "ms");
Thread.sleep(duration);
ctx.close();
}
}
发送
现在,在发送者和接收者类中需要很少的代码。我们将它们命名为 Tut1Receiver 和 Tut1Sender。发送者利用我们的配置和 RabbitTemplate 来发送消息。
// Sender
package org.springframework.amqp.tutorials.tut1;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
public class Tut1Sender {
@Autowired
private RabbitTemplate template;
@Autowired
private Queue queue;
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
String message = "Hello World!";
this.template.convertAndSend(queue.getName(), message);
System.out.println(" [x] Sent '" + message + "'");
}
}
您会注意到 Spring AMQP 删除了样板代码,让您只需关注消息传递的逻辑。我们在 Tut1Config 类中自动注入了在我们 bean 定义中配置的队列,并且像许多 Spring 连接抽象一样,我们用一个可以自动注入到发送者中的 RabbitTemplate 来包装样板 RabbitMQ 客户端类。剩下的就是创建一个消息并调用模板的 convertAndSend 方法,传入 bean 中定义的队列名称和我们刚刚创建的消息。
发送不起作用!
如果这是您第一次使用 RabbitMQ,但没有看到“Sent”消息,您可能会感到困惑,不知道哪里出了问题。也许代理启动时磁盘空间不足(默认需要至少 50 MB 可用空间),因此拒绝接收消息。检查代理的日志文件,看看是否有资源警报已记录,并在必要时降低可用磁盘空间阈值。配置指南将展示如何设置
disk_free_limit。
接收
接收者同样简单。我们使用 @RabbitListener 注解我们的接收者类,并传入队列名称。然后,我们使用 @RabbitHandler 注解我们的 receive 方法,传入推送到队列的载荷。
package org.springframework.amqp.tutorials.tut1;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
@RabbitListener(queues = "hello")
public class Tut1Receiver {
@RabbitHandler
public void receive(String in) {
System.out.println(" [x] Received '" + in + "'");
}
}
总而言之
现在我们必须构建 JAR 文件
./mvnw clean package
该应用程序使用 Spring Profiles 来控制它正在运行的教程,以及它是发送者还是接收者。要运行接收者,请执行以下命令:
# consumer
java -jar target/rabbitmq-tutorials.jar --spring.profiles.active=hello-world,receiver
打开另一个 shell 来运行发送者:
# sender
java -jar target/rabbitmq-tutorials.jar --spring.profiles.active=hello-world,sender
列出队列
您可能希望查看 RabbitMQ 有哪些队列以及其中有多少消息。您可以使用
rabbitmqctl工具(作为特权用户)来完成此操作。sudo rabbitmqctl list_queues在 Windows 上,省略 sudo。
rabbitmqctl.bat list_queues
现在是时候进入 第 2 部分,构建一个简单的工作队列了。