跳至主要内容

RabbitMQ 教程 - “Hello World!”

简介

信息

先决条件

本教程假设 RabbitMQ 已安装并在 localhost 上的标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则连接设置需要进行调整。

获取帮助

如果您在学习本教程时遇到问题,可以通过GitHub 讨论RabbitMQ 社区 Discord联系我们。

RabbitMQ 是一个消息代理:它接收并转发消息。您可以将其视为邮局:当您将要寄出的邮件投入邮箱时,您可以确信邮递员最终会将邮件投递给您的收件人。在这个比喻中,RabbitMQ 是邮箱、邮局和邮递员。

RabbitMQ 与邮局的主要区别在于它不处理纸质邮件,而是接收、存储和转发二进制数据块——消息

RabbitMQ 和一般意义上的消息传递使用了一些术语。

  • 生产仅仅意味着发送。发送消息的程序称为生产者

  • 队列是 RabbitMQ 中邮箱的名称。尽管消息流经 RabbitMQ 和您的应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制,它本质上是一个大型消息缓冲区。

    许多生产者可以发送消息到一个队列,并且许多消费者可以尝试从一个队列接收数据。

    这是我们表示队列的方式

  • 消费与接收具有相似的含义。消费者是一个程序,它主要等待接收消息

请注意,生产者、消费者和代理不必位于同一主机上;实际上,在大多数应用程序中它们并不在同一主机上。应用程序也可以同时是生产者和消费者。

“Hello World”

(使用 Spring AMQP)

在本教程的这一部分中,我们将使用 spring-amqp 库编写两个程序;一个生产者发送单个消息,一个消费者接收消息并打印出来。我们将略过 Spring AMQP API 中的一些细节,专注于这个非常简单的事情,以便开始入门。这是消息传递的“Hello World”。

在下图中,“P” 是我们的生产者,“C” 是我们的消费者。中间的框是一个队列——RabbitMQ 代表消费者保留的消息缓冲区。

Spring AMQP 框架

RabbitMQ 支持多种协议。本教程使用 AMQP 0-9-1,这是一种用于消息传递的开放式通用协议。许多不同语言中都有许多 RabbitMQ 客户端。

我们将使用 Spring Boot 来引导和配置我们的 Spring AMQP 项目。我们选择 Maven 来构建项目,但我们也可以使用 Gradle。

项目的源代码可在网上获取,但您也可以从头开始进行教程。

如果您选择后者,请打开Spring Initializr并提供:组 ID(例如 org.springframework.amqp.tutorials)工件 ID(例如 rabbitmq-amqp-tutorials)搜索 RabbitMQ 依赖项并选择 RabbitMQ 依赖项。

生成项目并将生成的项目解压缩到您选择的任何位置。现在可以将其导入您最喜欢的 IDE 中。或者,您也可以在您喜欢的编辑器中对其进行操作。

配置项目

Spring Boot 提供了许多功能,但我们这里只突出显示一些。首先,Spring Boot 应用程序可以选择通过 application.propertiesapplication.yml 文件提供其属性(还有更多选项,但这将使我们开始)。您会在生成的项目中找到一个 application.properties 文件,其中没有任何内容。将 application.properties 重命名为 application.yml 文件,并包含以下属性

spring:
profiles:
active: usage_message

logging:
level:
org: ERROR

tutorial:
client:
duration: 10000

创建一个新包 tut1,我们可以在其中放置教程代码。我们现在将创建一个 Java 配置文件 Tut1Config.java,以以下方式描述我们的 Spring bean

package org.springframework.amqp.tutorials.tut1;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

@Profile({"tut1","hello-world"})
@Configuration
public class Tut1Config {

@Bean
public Queue hello() {
return new Queue("hello");
}

@Profile("receiver")
@Bean
public Tut1Receiver receiver() {
return new Tut1Receiver();
}

@Profile("sender")
@Bean
public Tut1Sender sender() {
return new Tut1Sender();
}
}

请注意,我们已将第一个教程配置文件定义为 tut1、包名或 hello-world。我们使用 @Configuration 注解让 Spring 知道这是一个 Java 配置,并在其中创建我们队列 (“hello”) 的定义,并定义我们的 SenderReceiver bean。

我们现在将通过 Boot 应用程序运行所有教程,只需传入我们正在使用的配置文件即可。为了启用此功能,我们将使用以下内容修改生成的 RabbitAmqpTutorialsApplication

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class RabbitAmqpTutorialsApplication {

@Profile("usage_message")
@Bean
public CommandLineRunner usage() {
return args -> {
System.out.println("This app uses Spring Profiles to
control its behavior.\n");
System.out.println("Sample usage: java -jar
rabbit-tutorials.jar
--spring.profiles.active=hello-world,sender");
};
}

@Profile("!usage_message")
@Bean
public CommandLineRunner tutorial() {
return new RabbitAmqpTutorialsRunner();
}

public static void main(String[] args) throws Exception {
SpringApplication.run(RabbitAmqpTutorialsApplication.class, args);
}
}

并添加 RabbitAmqpTutorialsRunner 类,如下所示

package org.springframework.amqp.tutorials.tut1;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ConfigurableApplicationContext;

public class RabbitAmqpTutorialsRunner implements CommandLineRunner {

@Value("${tutorial.client.duration:0}")
private int duration;

@Autowired
private ConfigurableApplicationContext ctx;

@Override
public void run(String... arg0) throws Exception {
System.out.println("Ready ... running for " + duration + "ms");
Thread.sleep(duration);
ctx.close();
}
}

发送

现在,发送者和接收者类中需要很少的代码。让我们分别称它们为 Tut1ReceiverTut1Sender。发送者利用我们的配置和 RabbitTemplate 来发送消息。

// Sender
package org.springframework.amqp.tutorials.tut1;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;

public class Tut1Sender {

@Autowired
private RabbitTemplate template;

@Autowired
private Queue queue;

@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
String message = "Hello World!";
this.template.convertAndSend(queue.getName(), message);
System.out.println(" [x] Sent '" + message + "'");
}
}

您会注意到 Spring AMQP 移除了样板代码,只让您关注消息传递的逻辑。我们在 Tut1Config 类中定义的 bean 定义中自动连接了队列,并且像许多 Spring 连接抽象一样,我们将样板 RabbitMQ 客户端类包装在一个 RabbitTemplate 中,该模板可以自动连接到发送者。剩下的就是创建一个消息并调用模板的 convertAndSend 方法,传入我们定义的 bean 中的队列名称以及我们刚刚创建的消息。

发送失败!

如果这是您第一次使用 RabbitMQ,并且您没有看到“已发送”消息,那么您可能会挠头想知道可能出了什么问题。也许代理在启动时没有足够的可用磁盘空间(默认情况下,它需要至少 50 MB 的可用空间),因此拒绝接收消息。检查代理日志文件以查看是否记录了资源警报,并在必要时降低可用磁盘空间阈值。配置指南将向您展示如何设置 disk_free_limit

接收

接收器同样简单。我们使用 @RabbitListener 注解我们的接收器类,并传入队列的名称。然后,我们使用 @RabbitHandler 注解我们的 receive 方法,传入已推送到队列的有效负载。

package org.springframework.amqp.tutorials.tut1;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;

@RabbitListener(queues = "hello")
public class Tut1Receiver {

@RabbitHandler
public void receive(String in) {
System.out.println(" [x] Received '" + in + "'");
}
}

整合

我们现在必须构建 JAR 文件

./mvnw clean package

应用程序使用 Spring Profiles 来控制它正在运行的教程以及它是否为发送者或接收者。要运行接收器,请执行以下命令

# consumer
java -jar target/rabbitmq-tutorials.jar --spring.profiles.active=hello-world,receiver

打开另一个 shell 以运行发送者

# sender
java -jar target/rabbitmq-tutorials.jar --spring.profiles.active=hello-world,sender

列出队列

您可能希望查看 RabbitMQ 有哪些队列以及每个队列中有多少消息。您可以(作为特权用户)使用 rabbitmqctl 工具执行此操作

sudo rabbitmqctl list_queues

在 Windows 上,省略 sudo

rabbitmqctl.bat list_queues

是时候继续学习第 2 部分并构建一个简单的工作队列了。

© 2024 RabbitMQ. All rights reserved.