RabbitMQ Stream 教程 - “Hello World!”
简介
先决条件
本教程假定 RabbitMQ 已安装,正在localhost上运行,并且stream 插件已启用。标准的 stream 端口是 5552。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
使用 Docker
如果您没有安装 RabbitMQ,可以在 Docker 容器中运行它
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:4-management
等待服务器启动,然后启用 stream 和 stream management 插件
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management
哪里寻求帮助
如果您在学习本教程时遇到困难,可以通过邮件列表或Discord 社区服务器与我们联系。
RabbitMQ Streams 在 RabbitMQ 3.9 中引入。更多信息请参见此处。
"Hello World"
(使用 Java Stream Client)
在本教程的这一部分中,我们将编写两个 Java 程序:一个是发送单条消息的生产者,另一个是接收并打印消息的消费者。为了方便入门,我们将略过 Java 客户端 API 的一些细节,重点关注这个非常简单的功能。这就是 RabbitMQ Streams 的“Hello World”。
Java Stream 客户端库
RabbitMQ 支持多种协议。本教程使用 RabbitMQ 流协议,这是专为 RabbitMQ 流设计的协议。RabbitMQ 有多种不同语言的客户端,请参阅各语言的流客户端库。我们将使用 RabbitMQ 提供的 Java 流客户端。
RabbitMQ Java 客户端 0.16.0 及更高版本通过 Maven 仓库发布。
本教程假设您在 Windows 上使用 PowerShell。在 MacOS 和 Linux 上,几乎任何 shell 都可以工作。
设置
本教程要求 java 命令在 PATH 环境变量中。要验证这一点,请运行
java --help
本教程将使用 Maven 来管理依赖项并构建项目。由于教程使用了 Maven Wrapper,因此无需安装 Maven。本教程的可执行版本可以在 RabbitMQ 教程仓库中找到。
接下来,让我们验证 Maven 是否正常工作
./mvnw --version
接下来,创建一个 pom.xml 文件,并将 RabbitMQ Stream Java 客户端作为依赖项添加进去
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>stream-client</artifactId>
<version>0.16.0</version>
</dependency>
发送
接下来,让我们为本教程的消息生产者(发送者)和消息消费者(接收者)创建两个文件。它们分别命名为 Send.java 和 Receiver.java。
生产者将连接到 RabbitMQ,发送一条消息,然后退出。消费者将消费该消息并将其打印到标准输出。
在 Send.java 的顶部,导入本教程使用的一些关键类
import com.rabbitmq.stream.*;
import java.io.IOException;
导入这些类后,现在可以实例化一个 Environment
Environment environment = Environment.builder().build();
Stream Java 客户端的入口点是 Environment。它用于配置 RabbitMQ 流发布者、流消费者以及流本身。
它抽象了 TCP 或 TLS 套接字连接,并为我们处理协议版本协商、身份验证等工作。
本教程假设流发布者和消费者连接到本地运行的 RabbitMQ 节点,即 localhost。若要连接到不同机器上的节点,只需使用 Environment.builder() 返回的构建器指定目标主机名或 IP 地址即可。
接下来,让我们创建一个生产者。
生产者还将声明一个它将发布消息的 stream,然后发布一条消息
String stream = "hello-java-stream";
environment.streamCreator().stream(stream).maxLengthBytes(ByteCapacity.GB(5)).create();
Producer producer = environment.producerBuilder().stream(stream).build();
producer.send(producer.messageBuilder().addData("Hello, World!".getBytes()).build(), null);
System.out.println(" [x] 'Hello, World!' message sent");
stream 声明操作是幂等的:只有当 stream 不存在时才会创建它。
stream 是一个追加式日志抽象,允许在消息过期之前反复消费。定义保留策略是一个好习惯。在上面的示例中,stream 的大小限制为 5 GiB。
消息内容是字节数组。应用程序可以使用任何合适的格式(如 JSON、MessagePack 等)来编码需要传输的数据。
当上面的代码运行完毕后,生产者连接和 stream-system 连接将被关闭。我们的生产者就完成了。
每次运行生产者时,它都会向服务器发送一条消息,该消息将被追加到 stream 中。
完整的 Send.java 文件可在 GitHub 上找到。
发送不起作用!
如果这是您第一次使用 RabbitMQ,但没有看到“Sent”消息,您可能会感到困惑,不知道哪里出了问题。也许代理启动时磁盘空间不足(默认需要至少 50 MB 可用空间),因此拒绝接收消息。检查代理的日志文件,看看是否有资源警报已记录,并在必要时降低可用磁盘空间阈值。配置指南将展示如何设置
disk_free_limit。另一个原因可能是程序在消息发送到代理之前就退出了。在某些客户端库中,发送是异步的:函数立即返回,但消息在发送到网络之前被放入 IO 层进行排队。发送程序要求用户按键完成进程:这样消息就有充足的时间到达代理。Stream 协议提供了一个确认机制来确保代理接收到出站消息,但为了简单起见,本教程不使用此机制。
接收
本教程的另一部分,即消费者,将连接到 RabbitMQ 节点并等待消息被推送到它。与本教程中的生产者不同,生产者发送一条消息然后停止,消费者将持续运行,消费 RabbitMQ 推送给它的消息,并打印收到的负载。
与 Send.java 类似,Receive.java 首先也需要导入一些类
import com.rabbitmq.stream.ByteCapacity;
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.OffsetSpecification;
在初始设置方面,消费者部分与生产者部分非常相似;我们使用默认的连接设置,并声明消费者将从中消费的 stream。
请注意,流名称必须与生产者使用的名称一致。
Environment environment = Environment.builder().build();
String stream = "hello-java-stream";
environment.streamCreator().stream(stream).maxLengthBytes(ByteCapacity.GB(5)).create();
请注意,消费者部分也声明了 stream。这是为了允许任何一方先启动,无论是生产者还是消费者。
Consumer 类用于实例化流消费者,environment.consumerBuilder() 提供了一个用于配置它的构建器对象。最后,.messageHandler 方法接受一个用于处理已送达消息的处理器。
offset 参数定义了消费者的起始位置。在本例中,消费者从流中可用的第一条消息开始消费。
Consumer consumer = environment.consumerBuilder()
.stream(stream)
.offset(OffsetSpecification.first())
.messageHandler((unused, message) -> {
System.out.println("Received message: " + new String(message.getBodyAsBinary()));
}).build();
完整的 Receive.java 文件可在 GitHub 上找到。
整合
要运行这两个示例,请打开两个终端(shell)标签页。
本教程的两个部分可以按任何顺序运行,因为它们都声明了 stream。让我们先运行消费者,这样当第一个发布者启动时,消费者就会打印出它
./mvnw -q compile exec:java '-Dexec.mainClass=Receive'
然后运行发布者
./mvnw -q compile exec:java '-Dexec.mainClass=Send'
消费者将打印它通过 RabbitMQ 从发布者那里收到的消息。消费者将继续运行,等待新的交付。尝试多次重新运行发布者来观察这一点。
Streams 与队列不同,它们是消息的追加式日志,可以被反复消费。当多个消费者从一个 stream 消费时,它们将从第一条可用消息开始。