RabbitMQ Stream 教程 - “Hello World!”
简介
先决条件
本教程假设 RabbitMQ 已安装,并在 localhost
上运行,并且流插件已启用。标准流端口为 5552。如果您使用不同的主机、端口或凭据,则连接设置需要进行调整。
使用 Docker
如果您尚未安装 RabbitMQ,则可以在 Docker 容器中运行它
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.13
等待服务器启动,然后启用流和流管理插件
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management
获取帮助
如果您在学习本教程时遇到问题,可以通过邮件列表或Discord 社区服务器联系我们。
RabbitMQ Streams 在 RabbitMQ 3.9 中引入。更多信息请参见此处。
"Hello World"
(使用 Java Stream 客户端)
在本教程的这一部分,我们将用 Java 编写两个程序;一个生产者,发送一条消息,以及一个消费者,接收消息并打印出来。我们将略过 Java 客户端 API 中的一些细节,专注于这个非常简单的事情,以便开始。这是 RabbitMQ Streams 的“Hello World”。
Java 流客户端库
RabbitMQ 支持多种协议。本教程使用 RabbitMQ 流协议,该协议是专为RabbitMQ 流设计的协议。多种不同语言中都有许多 RabbitMQ 客户端,请参阅每种语言的流客户端库。我们将使用 RabbitMQ 提供的Java 流客户端。
RabbitMQ Java 客户端 0.16.0 及更高版本通过Maven 存储库分发。
本教程假设您在 Windows 上使用 PowerShell。在 macOS 和 Linux 上,几乎任何 Shell 都可以工作。
设置
本教程需要将 java
添加到 PATH
中。要验证它,请运行
java --help
本教程将使用Maven 来管理依赖项并构建项目。无需安装 Maven,因为本教程使用Maven Wrapper。本教程的可执行版本可在RabbitMQ 教程存储库中找到。
接下来,让我们验证 Maven 是否正常工作
./mvnw --version
接下来,创建一个包含 RabbitMQ Stream Java 客户端作为依赖项的 pom.xml
文件
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>stream-client</artifactId>
<version>0.16.0</version>
</dependency>
发送
接下来,让我们为本教程的消息生产者(发送方)和消息消费者(接收方)部分创建两个文件。它们将分别称为Send.java
和Receiver.java
。
生产者将连接到 RabbitMQ,发送一条消息,然后退出。消费者将接收消息并将其打印到标准输出。
在 Send.java
的顶部,导入本教程使用的一些关键类
import com.rabbitmq.stream.*;
import java.io.IOException;
导入这些类后,现在可以实例化一个 Environment
Environment environment = Environment.builder().build();
流 Java 客户端的入口点是 Environment
。它用于配置 RabbitMQ 流发布者、流消费者和流本身。
它抽象了 TCP 或 TLS 套接字连接,并为我们处理协议版本协商、身份验证等。
本教程假设流发布者和消费者连接到本地运行的 RabbitMQ 节点,即 localhost。要连接到其他机器上的节点,只需使用 Environment.builder()
返回的构建器指定目标主机名或 IP 地址即可。
接下来,让我们创建一个生产者。
生产者还将声明一个它将向其发布消息的流,然后发布一条消息
String stream = "hello-java-stream";
environment.streamCreator().stream(stream).maxLengthBytes(ByteCapacity.GB(5)).create();
Producer producer = environment.producerBuilder().stream(stream).build();
producer.send(producer.messageBuilder().addData("Hello, World!".getBytes()).build(), null);
System.out.println(" [x] 'Hello, World!' message sent");
流声明操作是幂等的:只有在流不存在时才会创建它。
流是一种追加式日志抽象,允许重复使用消息,直到它们过期。始终定义保留策略是一个好习惯。在上面的示例中,流的大小限制为 5 GiB。
消息内容是一个字节数组。应用程序可以使用任何合适的格式(如 JSON、MessagePack 等)对它们需要传输的数据进行编码。
当上面的代码执行完毕时,生产者连接和流系统连接将关闭。这就是我们的生产者。
每次运行生产者时,它都会向服务器发送一条消息,并将该消息追加到流中。
完整的Send.java
文件可以在 GitHub 上找到。
发送不起作用!
如果这是您第一次使用 RabbitMQ,并且您没有看到“已发送”消息,那么您可能会挠头,想知道哪里出了问题。也许代理在启动时没有足够的可用磁盘空间(默认情况下需要至少 50 MB 的可用空间),因此拒绝接受消息。检查代理日志文件,查看是否记录了资源警报,并在必要时降低可用磁盘空间阈值。配置指南将向您展示如何设置
disk_free_limit
。另一个原因可能是程序在消息到达代理之前就退出了。在某些客户端库中,发送是异步的:函数立即返回,但消息在通过网络传输之前会排队到 IO 层。发送程序要求用户按一个键来完成该过程:消息有足够的时间到达代理。流协议提供了一种确认机制来确保代理接收出站消息,但出于简单起见,本教程不使用此机制。
接收
本教程的另一部分,消费者,将连接到 RabbitMQ 节点并等待消息被推送到它。与本教程中发布一条消息并停止的生产者不同,消费者将持续运行,接收 RabbitMQ 推送到它的消息,并将接收到的有效负载打印出来。
与 Send.java
类似,Receive.java
首先需要导入一些类
import com.rabbitmq.stream.ByteCapacity;
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.OffsetSpecification;
在初始设置方面,消费者部分与生产者部分非常相似;我们使用默认连接设置并声明消费者将从中接收消息的流。
请注意,流名称必须与生产者使用的名称匹配。
Environment environment = Environment.builder().build();
String stream = "hello-java-stream";
environment.streamCreator().stream(stream).maxLengthBytes(ByteCapacity.GB(5)).create();
请注意,消费者部分也声明了流。这样做是为了允许生产者或消费者中的任何一方先启动。
Consumer
类用于实例化流消费者,environment.consumerBuilder()
提供一个构建器对象来配置它。最后,.messageHandler
方法接受已传递消息的处理程序。
offset
参数定义了消费者的起点。在这种情况下,消费者从流中第一个可用的消息开始。
Consumer consumer = environment.consumerBuilder()
.stream(stream)
.offset(OffsetSpecification.first())
.messageHandler((unused, message) -> {
System.out.println("Received message: " + new String(message.getBodyAsBinary()));
}).build();
完整的Receive.java
文件可以在 GitHub 上找到。
综合示例
为了运行这两个示例,请打开两个终端(Shell)选项卡。
本教程的这两个部分可以按任何顺序运行,因为它们都声明了流。让我们先运行消费者,以便在第一个发布者启动时,消费者会打印它
./mvnw -q compile exec:java '-Dexec.mainClass=Receive'
然后运行生产者
./mvnw -q compile exec:java '-Dexec.mainClass=Send'
消费者将打印它通过 RabbitMQ 从发布者接收到的消息。消费者将继续运行,等待新的传递。尝试重新运行发布者几次以观察这一点。
流与队列的不同之处在于,它们是可重复使用的消息的追加式日志。当多个消费者从流中消费时,它们将从第一个可用消息开始。