跳至主要内容

RabbitMQ Stream 教程 - "Hello World!"

简介

info

先决条件

本教程假设 RabbitMQ 已 安装,运行在 localhost 上,并且 流插件 已启用。 标准流端口 为 5552。如果您使用不同的主机、端口或凭据,则连接设置需要调整。

使用 docker

如果您没有安装 RabbitMQ,您可以在 Docker 容器中运行它

docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672  \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.13

等待服务器启动,然后启用流和流管理插件

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management 

哪里可以获得帮助

如果您在学习本教程时遇到困难,您可以通过 邮件列表discord 社区服务器 联系我们。

RabbitMQ Streams 在 RabbitMQ 3.9 中推出。更多信息请访问 这里

"Hello World"

(使用 Go Stream 客户端)

在本教程的这一部分,我们将用 Go 编写两个程序;一个发送单个消息的生产者,以及一个接收消息并打印出来的消费者。我们将略过 Go 客户端 API 中的一些细节,专注于这个非常简单的事情,以开始入门。这是 RabbitMQ Streams 的“Hello World”。

Go Stream 客户端库

RabbitMQ 支持多种协议。本教程使用 RabbitMQ 流协议,该协议是专为 RabbitMQ Streams 设计的协议。许多不同的语言都有许多用于 RabbitMQ 的客户端,请参阅每种语言的流客户端库。我们将使用 RabbitMQ 提供的 Go Stream 客户端

RabbitMQ Go 客户端 1.4 及更高版本通过 go get 分发。

本教程假设您在 Windows 上使用 powershell。在 MacOS 和 Linux 上,几乎任何 shell 都可以工作。

设置

首先,让我们验证您是否在 PATH 中拥有 Go 工具链

go help

运行该命令应该会生成一个帮助消息。

本教程的可执行版本可以在 RabbitMQ 教程存储库 中找到。

现在让我们创建项目

mkdir go-stream
cd go-stream
go mod init github.com/rabbitmq/rabbitmq-tutorials
go get -u github.com/rabbitmq/rabbitmq-stream-go-client

现在我们已经设置了 Go 项目,我们可以编写一些代码。

发送

我们将消息生产者(发送者)称为 send.go,消息消费者(接收者)称为 receive.go。生产者将连接到 RabbitMQ,发送单个消息,然后退出。

send.go 中,我们需要导入一些包

import (
"bufio"
"fmt"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
"log"
"os"
)

然后我们可以创建与服务器的连接

env, err := stream.NewEnvironment(
stream.NewEnvironmentOptions())

Go Stream 客户端的入口点是 Environment。它用于配置 RabbitMQ 流发布者、流消费者和流本身。

它抽象了套接字连接,并为我们处理协议版本协商、身份验证等等。

本教程假设流发布者和消费者连接到本地运行的 RabbitMQ 节点,即 localhost。要连接到其他机器上的节点,只需在 EnvironmentOptions 上指定目标主机名或 IP 地址。

接下来,让我们创建一个生产者。

生产者还将声明它将向其发布消息的流,然后发布消息

streamName := "hello-go-stream"
env.DeclareStream(streamName,
&stream.StreamOptions{
MaxLengthBytes: stream.ByteCapacity{}.GB(2),
},
)

producer, err := env.NewProducer(streamName, stream.NewProducerOptions())
if err != nil {
log.Fatalf("Failed to create producer: %v", err)
}

err = producer.Send(amqp.NewMessage([]byte("Hello world")))
if err != nil {
log.Fatalf("Failed to send message: %v", err)
}

流声明操作是幂等的:只有在流不存在时才会创建它。

流是一种追加式日志抽象,允许重复使用消息,直到它们过期。始终定义保留策略是一个好习惯。在上面的示例中,流的大小限制为 5 GiB。

消息内容是字节数组。应用程序可以使用任何合适的格式(例如 JSON、MessagePack 等)对它们需要传输的数据进行编码。

当上面的代码运行完毕时,生产者连接和流系统连接将关闭。这就是我们生产者的全部内容。

每次运行生产者时,它都会向服务器发送一条消息,并且该消息将被追加到流中。

完整的 send.go 文件 可以在 GitHub 上找到。

发送不起作用!

如果这是您第一次使用 RabbitMQ 并且没有看到“已发送”消息,那么您可能会挠头,想知道问题出在哪里。也许代理在启动时没有足够的可用磁盘空间(默认情况下,它至少需要 50 MB 可用空间),因此拒绝接受消息。检查代理 日志文件,查看是否记录了 资源警报,并根据需要降低可用磁盘空间阈值。 配置指南 将向您展示如何设置 disk_free_limit

另一个原因可能是程序在消息到达代理之前就退出了。在某些客户端库中,发送是异步的:函数立即返回,但消息在通过网络传输之前会在 IO 层中排队。发送程序要求用户按一个键来完成该过程:消息有足够的时间到达代理。流协议提供了一个确认机制,以确保代理接收到了出站消息,但为了简单起见,本教程没有使用此机制。

接收

本教程的另一部分(消费者)将连接到 RabbitMQ 节点,并等待将消息推送到它。与本教程中仅发布一条消息并停止的生产者不同,消费者将持续运行,使用 RabbitMQ 推送到它的消息,并将接收到的有效负载打印出来。

send.go 类似,receive.go 需要导入一些包

import (
"bufio"
"fmt"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
"log"
"os"
)

在初始设置方面,消费者部分与生产者部分非常相似;我们使用默认连接设置,并声明消费者将从中消费的流。

env, err := stream.NewEnvironment(stream.NewEnvironmentOptions())
if err != nil {
log.Fatalf("Failed to create environment: %v", err)
}

streamName := "hello-go-stream"
env.DeclareStream(streamName,
&stream.StreamOptions{
MaxLengthBytes: stream.ByteCapacity{}.GB(2),
},
)

请注意,消费者部分也声明了流。这是为了允许生产者或消费者中的任何一方先启动。

我们使用 Consumer 结构来实例化消费者,并使用 ConsumerOptions 结构来配置它。我们提供一个 messageHandler 回调来处理已传递的消息。

SetOffset 定义了消费者的起始点。在本例中,消费者从流中可用的第一条消息开始。

messagesHandler := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
fmt.Printf("Stream: %s - Received message: %s\n", consumerContext.Consumer.GetStreamName(),
message.Data)}

consumer, err := env.NewConsumer(streamName, messagesHandler,
stream.NewConsumerOptions().SetOffset(stream.OffsetSpecification{}.First()))
if err != nil {
log.Fatalf("Failed to create consumer: %v", err)
}

完整的 receive.go 文件 可以在 GitHub 上找到。

整合

为了运行这两个示例,请打开两个终端(shell)选项卡。

我们需要先拉取依赖项

go get -u

本教程的两个部分都可以以任何顺序运行,因为它们都声明了流。让我们先运行消费者,以便当第一个发布者启动时,消费者会打印它

 go run receive.go

然后运行生产者

go run send.go

消费者将打印它通过 RabbitMQ 从发布者获取的消息。消费者将保持运行,等待新的传递。尝试多次重新运行发布者,以观察这一点。

流与队列的不同之处在于它们是消息的追加式日志,可以重复消费。当多个消费者从一个流中消费时,它们将从第一个可用的消息开始。

© 2024 RabbitMQ. All rights reserved.