RabbitMQ 教程 - 发布/订阅
发布/订阅
(使用 AMQP 1.0 Go 客户端)
先决条件
本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
哪里寻求帮助
如果您在学习本教程时遇到困难,可以通过 GitHub Discussions 或 RabbitMQ 社区 Discord 联系我们。
在上一篇教程中,我们创建了一个工作队列。我们假设任务是在多个工作者(worker)之间分配的。在本篇教程中,我们将要做一些完全不同的事情——我们将把一条消息发送给多个消费者。这种模式被称为“发布/订阅”。
为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成——第一个程序负责发送日志消息,第二个程序负责接收并打印它们。
在我们的日志系统中,接收者程序的每一个运行副本都会收到消息。这样,我们就可以运行一个接收者将日志写入磁盘,同时在终端运行另一个接收者,并在那里查看打印出来的日志。
本质上,发布的日志消息将被广播给所有接收者。
交换器
在之前的教程中,我们向队列发送消息并从队列接收消息。现在是时候介绍 RabbitMQ 中完整的消息传递模型了。
让我们快速回顾一下我们所学的内容
- 生产者 是发送消息的用户应用程序。
- 队列 是存储消息的缓冲区。
- 消费者 是接收消息的用户应用程序。
RabbitMQ 消息传递模型的核心思想是:生产者永远不会直接向队列发送任何消息。事实上,很多时候生产者甚至不知道消息是否会被发送到某个队列。
相反,生产者只能将消息发送到交换机(exchange)。交换机非常简单。它的一侧从生产者接收消息,另一侧将它们推送到队列中。交换机必须确切知道如何处理它接收到的消息。它是应该追加到特定队列中吗?应该追加到多个队列中吗?还是应该丢弃?这些规则由交换机类型定义。
有几种可用的交换机类型:direct、topic、headers 和 fanout。我们将重点关注最后一种——fanout。让我们创建一个这种类型的交换机
_, err = conn.Management().DeclareExchange(ctx, &rmq.FanOutExchangeSpecification{Name: "logs"})
if err != nil {
log.Panicf("Failed to declare an exchange: %v", err)
}
fanout 交换机会将其接收到的所有消息广播给它知道的所有队列。这正是我们日志系统所需要的。
绑定
我们已经创建了一个 fanout 交换机和一个队列。现在我们需要告诉交换机将消息发送到我们的队列。交换机和队列之间的这种关系称为绑定(binding)。
qInfo, err := conn.Management().DeclareQueue(ctx, &rmq.AutoGeneratedQueueSpecification{
IsExclusive: true,
IsAutoDelete: true,
})
if err != nil {
log.Panicf("Failed to declare a queue: %v", err)
}
_, err = conn.Management().Bind(ctx, &rmq.ExchangeToQueueBindingSpecification{
SourceExchange: "logs",
DestinationQueue: qInfo.Name(),
BindingKey: "",
})
if err != nil {
log.Panicf("Failed to bind a queue: %v", err)
}
从现在开始,logs 交换机将把消息追加到我们的队列中。排他性队列是指只能由声明它的消费者访问,并且在消费者断开连接时会被删除的队列。
让我们运行这段代码。我们将使用 go run 来创建日志消费者
go run receive_logs.go
现在让我们发出一些日志,并将消息发布到 logs 交换机
go run emit_log.go "Here is the first log"
总而言之
生产者 emit_log.go
package main
import (
"context"
"log"
"os"
"strings"
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
)
const brokerURI = "amqp://guest:guest@localhost:5672/"
func main() {
ctx := context.Background()
env := rmq.NewEnvironment(brokerURI, nil)
conn, err := env.NewConnection(ctx)
if err != nil {
log.Panicf("Failed to connect to RabbitMQ: %v", err)
}
defer func() {
_ = env.CloseConnections(context.Background())
}()
_, err = conn.Management().DeclareExchange(ctx, &rmq.FanOutExchangeSpecification{Name: "logs"})
if err != nil {
log.Panicf("Failed to declare an exchange: %v", err)
}
publisher, err := conn.NewPublisher(ctx, &rmq.ExchangeAddress{Exchange: "logs", Key: ""}, nil)
if err != nil {
log.Panicf("Failed to create publisher: %v", err)
}
defer func() { _ = publisher.Close(context.Background()) }()
body := bodyFrom(os.Args)
res, err := publisher.Publish(ctx, rmq.NewMessage([]byte(body)))
if err != nil {
log.Panicf("Failed to publish a message: %v", err)
}
switch res.Outcome.(type) {
case *rmq.StateAccepted:
default:
log.Panicf("Unexpected publish outcome: %v", res.Outcome)
}
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
以及消费者 receive_logs.go
package main
import (
"context"
"errors"
"log"
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
)
const brokerURI = "amqp://guest:guest@localhost:5672/"
func main() {
ctx := context.Background()
env := rmq.NewEnvironment(brokerURI, nil)
conn, err := env.NewConnection(ctx)
if err != nil {
log.Panicf("Failed to connect to RabbitMQ: %v", err)
}
defer func() {
_ = env.CloseConnections(context.Background())
}()
_, err = conn.Management().DeclareExchange(ctx, &rmq.FanOutExchangeSpecification{Name: "logs"})
if err != nil {
log.Panicf("Failed to declare an exchange: %v", err)
}
qInfo, err := conn.Management().DeclareQueue(ctx, &rmq.AutoGeneratedQueueSpecification{
IsExclusive: true,
IsAutoDelete: true,
})
if err != nil {
log.Panicf("Failed to declare a queue: %v", err)
}
_, err = conn.Management().Bind(ctx, &rmq.ExchangeToQueueBindingSpecification{
SourceExchange: "logs",
DestinationQueue: qInfo.Name(),
BindingKey: "",
})
if err != nil {
log.Panicf("Failed to bind a queue: %v", err)
}
consumer, err := conn.NewConsumer(ctx, qInfo.Name(), nil)
if err != nil {
log.Panicf("Failed to create consumer: %v", err)
}
defer func() { _ = consumer.Close(context.Background()) }()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
for {
delivery, err := consumer.Receive(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
log.Panicf("Failed to receive a message: %v", err)
}
msg := delivery.Message()
var body string
if len(msg.Data) > 0 {
body = string(msg.Data[0])
}
log.Printf(" [x] %s", body)
err = delivery.Accept(ctx)
if err != nil {
log.Panicf("Failed to accept message: %v", err)
}
}
}
要进行测试,请打开一个终端并运行接收者
go run receive_logs.go
# => [*] Waiting for logs. To exit press CTRL+C
然后,在其他终端中运行发送者
go run emit_log.go "Here is the first log"
# => [x] Sent Here is the first log
go run emit_log.go "Here is the second log"
# => [x] Sent Here is the second log
消费者会收到发布者发送的每一条消息。你可以根据需要运行任意多个接收者——它们每一个都会获得消息的副本。
接收到的日志将出现在接收终端中,如下所示
go run receive_logs.go
# => [*] Waiting for logs. To exit press CTRL+C
# => [x] Here is the first log
# => [x] Here is the second log
现在我们可以继续学习教程 4,了解如何根据路由键(routing keys)进行路由消息。