RabbitMQ 流教程 - “Hello World!”
简介
先决条件
本教程假设 RabbitMQ 已安装,并在 localhost
上运行,并且已启用流插件。 标准流端口 为 5552。如果您使用不同的主机、端口或凭据,则连接设置需要进行调整。
使用 Docker
如果您尚未安装 RabbitMQ,则可以在 Docker 容器中运行它
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.13
等待服务器启动,然后启用流和流管理插件
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management
获取帮助
如果您在学习本教程时遇到问题,可以通过邮件列表或Discord 社区服务器联系我们。
RabbitMQ Streams 在 RabbitMQ 3.9 中引入。更多信息请访问此处。
"Hello World"
(使用 NodeJs Stream 客户端)
在本教程的这一部分,我们将用 JavaScript 编写两个程序;一个生产者,发送一条消息,一个消费者,接收消息并打印出来。我们将略过 JavaScript 客户端 API 中的一些细节,专注于这个非常简单的事情,以便开始。这是 RabbitMQ Streams 的“Hello World”。
Node.js 流客户端库
RabbitMQ 支持多种协议。本教程使用 RabbitMQ 流协议,该协议是专为RabbitMQ 流设计的协议。在多种不同的语言中,有许多 RabbitMQ 客户端,请参阅每种语言的流客户端库。我们将使用由 Coders51 构建和支持的Node.js 流客户端。
该客户端支持Node.js >= 16.x。本教程将使用 Node.js 流客户端 0.3.1 版本。客户端 0.3.1 及更高版本通过npm分发。
本教程假设您在 Windows 上使用 PowerShell。在 MacOS 和 Linux 上,几乎任何 shell 都可以使用。
设置
首先,让我们验证 PATH
中是否有 Node.js 工具链
npm --help
运行该命令应该会显示一条帮助消息。
现在让我们创建一个项目
npm init
然后安装客户端
npm install rabbitmq-stream-js-client
package.json
应如下所示
{
"name": "rabbitmq-stream-node-tutorial",
"version": "1.0.0",
"description": "Tutorial for the nodejs RabbitMQ stream client",
"scripts": {
"send": "node send.js",
"receive": "node receive.js"
},
"dependencies": {
"rabbitmq-stream-js-client": "^0.3.1"
}
}
现在创建名为 receive.js
和 send.js
的新文件。现在我们已经设置了 Node.js 项目,我们可以编写一些代码了。
发送
我们将消息生产者(发送方)称为 send.js
,消息消费者(接收方)称为 receive.js
。生产者将连接到 RabbitMQ,发送一条消息,然后退出。
在send.js
中,我们需要添加客户端
const rabbit = require("rabbitmq-stream-js-client")
然后我们可以创建到服务器的连接
const client = await rabbit.connect({
hostname: "localhost",
port: 5552,
username: "guest",
password: "guest",
vhost: "/",
})
客户端的入口点是 Client
类。它用于流管理和发布者实例的创建。
它抽象了套接字连接,并为我们处理协议版本协商、身份验证等。
本教程假设流发布者和消费者连接到本地运行的 RabbitMQ 节点,即在 localhost 上。要连接到其他机器上的节点,只需在 Client
参数中指定目标主机名或 IP 地址即可。
接下来,让我们创建一个生产者。
生产者还将声明一个它将向其发布消息的流,然后发布一条消息
const streamName = "hello-nodejs-stream";
console.log("Connecting...");
const client = await rabbit.connect({
vhost: "/",
port: 5552,
hostname: "localhost",
username: "guest",
password: "guest",
});
console.log("Making sure the stream exists...");
const streamSizeRetention = 5 * 1e9
await client.createStream({ stream: streamName, arguments: { "max-length-bytes": streamSizeRetention } });
const publisher = await client.declarePublisher({ stream: streamName });
console.log("Sending a message...");
await publisher.send(Buffer.from("Test message"));
流声明操作是幂等的:只有在流不存在时才会创建它。
流是一种追加式日志抽象,允许重复使用消息,直到它们过期。始终定义保留策略是一个好习惯。在上面的示例中,流的大小限制为 5 GiB。
消息内容是一个字节数组。应用程序可以使用任何合适的格式(例如 JSON、MessagePack 等)对它们需要传输的数据进行编码。
当上面的代码运行完成后,生产者连接和流系统连接将关闭。这就是我们的生产者。
每次运行生产者时,它都会向服务器发送一条消息,并将该消息追加到流中。
完整的send.js 文件可以在 GitHub 上找到。
发送不起作用
如果这是您第一次使用 RabbitMQ 并且您没有看到“已发送”消息,那么您可能会挠头想知道哪里出了问题。也许代理在启动时没有足够的可用磁盘空间(默认情况下需要至少 50 MB 的可用空间),因此拒绝接受消息。检查代理日志文件以查看是否记录了资源警报,并在必要时降低可用磁盘空间阈值。配置指南将向您展示如何设置
disk_free_limit
。另一个原因可能是程序在消息到达代理之前就退出了。在某些客户端库中,发送是异步的:函数立即返回,但消息在通过网络传输之前先被排队到 IO 层。发送程序要求用户按一个键来结束进程:消息有足够的时间到达代理。流协议提供了一种确认机制来确保代理接收出站消息,但出于简单起见,本教程没有使用这种机制。
接收
本教程的另一部分,消费者,将连接到 RabbitMQ 节点并等待消息被推送到它。与生产者不同(在本教程中,生产者发布一条消息并停止),消费者将持续运行,使用 RabbitMQ 推送给它的消息,并打印收到的有效负载。
与 send.js
类似,receive.js
将需要使用客户端
const rabbit = require("rabbitmq-stream-js-client")
在初始设置方面,消费者部分与生产者部分非常相似;我们使用默认连接设置并声明消费者将从中消费的流。
const client = await rabbit.connect({
hostname: "localhost",
port: 5552,
username: "guest",
password: "guest",
vhost: "/",
})
const streamSizeRetention = 5 * 1e9
await client.createStream({ stream: streamName, arguments: { "max-length-bytes": streamSizeRetention } });
请注意,消费者部分也声明了流。这是为了允许生产者或消费者先启动。
我们使用 declareConsumer
方法创建消费者。我们提供一个回调来处理传递的消息。
offset
定义了消费者的起始点。在这种情况下,消费者从流中第一个可用的消息开始。
await client.declareConsumer({ stream: streamName, offset: rabbit.Offset.first() }, (message) => {
console.log(`Received message ${message.content.toString()}`)
})
完整的receive.js 文件可以在 GitHub 上找到。
综合示例
为了运行这两个示例,请打开两个终端(shell)选项卡。
本教程的两个部分都可以以任何顺序运行,因为它们都声明了流。让我们先运行消费者,以便在第一个发布者启动时,消费者会打印它
npm run receive
然后运行生产者
npm run send
消费者将打印它通过 RabbitMQ 从发布者获取的消息。消费者将继续运行,等待新的传递。尝试多次重新运行发布者以观察这一点。
流与队列的不同之处在于,它们是可重复使用的消息的追加式日志。当多个消费者从流中消费时,它们将从第一个可用消息开始。