RabbitMQ 教程 - “Hello World!”
简介
RabbitMQ 是一个消息代理:它接收并转发消息。您可以将其视为邮局:当您将要寄出的邮件投入邮箱时,您可以确信邮递员最终会将邮件投递给您的收件人。在这个比喻中,RabbitMQ 是邮箱、邮局和邮递员。
RabbitMQ 与邮局的主要区别在于它不处理纸质邮件,而是接收、存储和转发二进制数据块——消息。
RabbitMQ 和一般意义上的消息传递使用了一些术语。
-
生产仅仅意味着发送。发送消息的程序称为生产者
-
队列是 RabbitMQ 中邮箱的名称。尽管消息流经 RabbitMQ 和您的应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制,它本质上是一个大型消息缓冲区。
许多生产者可以发送消息到一个队列,并且许多消费者可以尝试从一个队列接收数据。
这是我们表示队列的方式
-
消费与接收具有相似的含义。消费者是一个程序,它主要等待接收消息
请注意,生产者、消费者和代理不必位于同一主机上;实际上,在大多数应用程序中它们并不在同一主机上。应用程序也可以同时是生产者和消费者。
“Hello World”
(使用 Spring AMQP)
在本教程的这一部分中,我们将使用 spring-amqp 库编写两个程序;一个生产者发送单个消息,一个消费者接收消息并打印出来。我们将略过 Spring AMQP API 中的一些细节,专注于这个非常简单的事情,以便开始入门。这是消息传递的“Hello World”。
在下图中,“P” 是我们的生产者,“C” 是我们的消费者。中间的框是一个队列——RabbitMQ 代表消费者保留的消息缓冲区。
Spring AMQP 框架
RabbitMQ 支持多种协议。本教程使用 AMQP 0-9-1,这是一种用于消息传递的开放式通用协议。许多不同语言中都有许多 RabbitMQ 客户端。
我们将使用 Spring Boot 来引导和配置我们的 Spring AMQP 项目。我们选择 Maven 来构建项目,但我们也可以使用 Gradle。
项目的源代码可在网上获取,但您也可以从头开始进行教程。
如果您选择后者,请打开Spring Initializr并提供:组 ID(例如 org.springframework.amqp.tutorials
)工件 ID(例如 rabbitmq-amqp-tutorials
)搜索 RabbitMQ 依赖项并选择 RabbitMQ 依赖项。
生成项目并将生成的项目解压缩到您选择的任何位置。现在可以将其导入您最喜欢的 IDE 中。或者,您也可以在您喜欢的编辑器中对其进行操作。
配置项目
Spring Boot 提供了许多功能,但我们这里只突出显示一些。首先,Spring Boot 应用程序可以选择通过 application.properties
或 application.yml
文件提供其属性(还有更多选项,但这将使我们开始)。您会在生成的项目中找到一个 application.properties
文件,其中没有任何内容。将 application.properties 重命名为 application.yml
文件,并包含以下属性
spring:
profiles:
active: usage_message
logging:
level:
org: ERROR
tutorial:
client:
duration: 10000
创建一个新包 tut1
,我们可以在其中放置教程代码。我们现在将创建一个 Java 配置文件 Tut1Config.java
,以以下方式描述我们的 Spring bean
package org.springframework.amqp.tutorials.tut1;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
@Profile({"tut1","hello-world"})
@Configuration
public class Tut1Config {
@Bean
public Queue hello() {
return new Queue("hello");
}
@Profile("receiver")
@Bean
public Tut1Receiver receiver() {
return new Tut1Receiver();
}
@Profile("sender")
@Bean
public Tut1Sender sender() {
return new Tut1Sender();
}
}
请注意,我们已将第一个教程配置文件定义为 tut1
、包名或 hello-world
。我们使用 @Configuration
注解让 Spring 知道这是一个 Java 配置,并在其中创建我们队列 (“hello”) 的定义,并定义我们的 Sender
和 Receiver
bean。
我们现在将通过 Boot 应用程序运行所有教程,只需传入我们正在使用的配置文件即可。为了启用此功能,我们将使用以下内容修改生成的 RabbitAmqpTutorialsApplication
类
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class RabbitAmqpTutorialsApplication {
@Profile("usage_message")
@Bean
public CommandLineRunner usage() {
return args -> {
System.out.println("This app uses Spring Profiles to
control its behavior.\n");
System.out.println("Sample usage: java -jar
rabbit-tutorials.jar
--spring.profiles.active=hello-world,sender");
};
}
@Profile("!usage_message")
@Bean
public CommandLineRunner tutorial() {
return new RabbitAmqpTutorialsRunner();
}
public static void main(String[] args) throws Exception {
SpringApplication.run(RabbitAmqpTutorialsApplication.class, args);
}
}
并添加 RabbitAmqpTutorialsRunner
类,如下所示
package org.springframework.amqp.tutorials.tut1;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ConfigurableApplicationContext;
public class RabbitAmqpTutorialsRunner implements CommandLineRunner {
@Value("${tutorial.client.duration:0}")
private int duration;
@Autowired
private ConfigurableApplicationContext ctx;
@Override
public void run(String... arg0) throws Exception {
System.out.println("Ready ... running for " + duration + "ms");
Thread.sleep(duration);
ctx.close();
}
}
发送
现在,发送者和接收者类中需要很少的代码。让我们分别称它们为 Tut1Receiver
和 Tut1Sender
。发送者利用我们的配置和 RabbitTemplate
来发送消息。
// Sender
package org.springframework.amqp.tutorials.tut1;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
public class Tut1Sender {
@Autowired
private RabbitTemplate template;
@Autowired
private Queue queue;
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
String message = "Hello World!";
this.template.convertAndSend(queue.getName(), message);
System.out.println(" [x] Sent '" + message + "'");
}
}
您会注意到 Spring AMQP 移除了样板代码,只让您关注消息传递的逻辑。我们在 Tut1Config
类中定义的 bean 定义中自动连接了队列,并且像许多 Spring 连接抽象一样,我们将样板 RabbitMQ 客户端类包装在一个 RabbitTemplate
中,该模板可以自动连接到发送者。剩下的就是创建一个消息并调用模板的 convertAndSend
方法,传入我们定义的 bean 中的队列名称以及我们刚刚创建的消息。
发送失败!
如果这是您第一次使用 RabbitMQ,并且您没有看到“已发送”消息,那么您可能会挠头想知道可能出了什么问题。也许代理在启动时没有足够的可用磁盘空间(默认情况下,它需要至少 50 MB 的可用空间),因此拒绝接收消息。检查代理日志文件以查看是否记录了资源警报,并在必要时降低可用磁盘空间阈值。配置指南将向您展示如何设置
disk_free_limit
。
接收
接收器同样简单。我们使用 @RabbitListener
注解我们的接收器类,并传入队列的名称。然后,我们使用 @RabbitHandler
注解我们的 receive
方法,传入已推送到队列的有效负载。
package org.springframework.amqp.tutorials.tut1;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
@RabbitListener(queues = "hello")
public class Tut1Receiver {
@RabbitHandler
public void receive(String in) {
System.out.println(" [x] Received '" + in + "'");
}
}
整合
我们现在必须构建 JAR 文件
./mvnw clean package
应用程序使用 Spring Profiles 来控制它正在运行的教程以及它是否为发送者或接收者。要运行接收器,请执行以下命令
# consumer
java -jar target/rabbitmq-tutorials.jar --spring.profiles.active=hello-world,receiver
打开另一个 shell 以运行发送者
# sender
java -jar target/rabbitmq-tutorials.jar --spring.profiles.active=hello-world,sender
列出队列
您可能希望查看 RabbitMQ 有哪些队列以及每个队列中有多少消息。您可以(作为特权用户)使用
rabbitmqctl
工具执行此操作sudo rabbitmqctl list_queues
在 Windows 上,省略 sudo
rabbitmqctl.bat list_queues
是时候继续学习第 2 部分并构建一个简单的工作队列了。