RabbitMQ 教程 - 主题
主题
(使用 Go RabbitMQ 客户端)
在 上一教程 中,我们改进了日志记录系统。我们没有使用只能进行虚拟广播的 fanout
交换机,而是使用 direct
交换机,获得了选择性接收日志的可能性。
虽然使用 direct
交换机改善了我们的系统,但它仍然存在局限性 - 它无法根据多个条件进行路由。
在我们的日志记录系统中,我们可能不仅想订阅基于严重性的日志,还想订阅基于发出日志的源的日志。您可能在 syslog
unix 工具中了解过这个概念,它根据严重性 (info/warn/crit...) 和设施 (auth/cron/kern...) 路由日志。
这将为我们提供很大的灵活性 - 我们可能想监听来自 'cron' 的所有严重错误,以及来自 'kern' 的所有日志。
为了在我们的日志记录系统中实现这一点,我们需要了解更复杂的 topic
交换机。
主题交换机
发送到 topic
交换机的消息不能具有任意 routing_key
- 它必须是点分隔的单词列表。这些单词可以是任何东西,但通常它们指定与消息相关的某些特征。一些有效的路由键示例:stock.usd.nyse
、nyse.vmw
、quick.orange.rabbit
。路由键中的单词可以是任意多个,最多 255 字节。
绑定键也必须采用相同的形式。topic
交换机背后的逻辑类似于 direct
交换机 - 使用特定路由键发送的消息将传递到所有使用匹配绑定键绑定的队列。但是,绑定键有两个重要的特殊情况
*
(星号) 可以替换正好一个单词。#
(井号) 可以替换零个或多个单词。
通过示例解释这一点最容易
在这个示例中,我们将发送描述动物的所有消息。这些消息将使用包含三个单词 (两个点) 的路由键发送。路由键中的第一个单词将描述速度,第二个描述颜色,第三个描述物种:<speed>.<colour>.<species>
。
我们创建了三个绑定:Q1 使用绑定键 *.orange.*
绑定,Q2 使用 *.*.rabbit
和 lazy.#
绑定。
这些绑定可以总结为
- Q1 对所有橙色的动物感兴趣。
- Q2 想听所有关于兔子的消息,以及所有关于懒惰动物的消息。
具有设置为 quick.orange.rabbit
的路由键的消息将被传递到两个队列。消息 lazy.orange.elephant
也将被传递到这两个队列。另一方面,quick.orange.fox
仅传递到第一个队列,而 lazy.brown.fox
仅传递到第二个队列。lazy.pink.rabbit
仅传递到第二个队列一次,即使它匹配两个绑定。quick.brown.fox
不匹配任何绑定,因此将被丢弃。
如果我们违反协议并发送只有一个或四个单词的消息,例如 orange
或 quick.orange.new.rabbit
会发生什么?好吧,这些消息将不匹配任何绑定,并将丢失。
另一方面,lazy.orange.new.rabbit
尽管有四个单词,但将匹配最后一个绑定,并将传递到第二个队列。
主题交换机
主题交换机功能强大,可以像其他交换机一样工作。
当队列使用
#
(井号) 绑定键绑定时 - 它将接收所有消息,无论路由键如何 - 就像在fanout
交换机中一样。当在绑定中不使用特殊字符
*
(星号) 和#
(井号) 时,主题交换机将像direct
交换机一样工作。
将所有内容整合在一起
我们将在我们的日志记录系统中使用 topic
交换机。我们将从一个工作假设开始,即日志的路由键将有两个单词:<facility>.<severity>
。
代码几乎与 上一教程 中的代码相同。
emit_log_topic.go
的代码
package main
import (
"context"
"log"
"os"
"strings"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs_topic", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
body := bodyFrom(os.Args)
err = ch.PublishWithContext(ctx,
"logs_topic", // exchange
severityFrom(os.Args), // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 3) || os.Args[2] == "" {
s = "hello"
} else {
s = strings.Join(args[2:], " ")
}
return s
}
func severityFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "anonymous.info"
} else {
s = os.Args[1]
}
return s
}
receive_logs_topic.go
的代码
package main
import (
"log"
"os"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs_topic", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
if len(os.Args) < 2 {
log.Printf("Usage: %s [binding_key]...", os.Args[0])
os.Exit(0)
}
for _, s := range os.Args[1:] {
log.Printf("Binding queue %s to exchange %s with routing key %s",
q.Name, "logs_topic", s)
err = ch.QueueBind(
q.Name, // queue name
s, // routing key
"logs_topic", // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
接收所有日志
go run receive_logs_topic.go "#"
接收来自设施 kern
的所有日志
go run receive_logs_topic.go "kern.*"
或者,如果您只想听取有关 critical
日志的消息
go run receive_logs_topic.go "*.critical"
您可以创建多个绑定
go run receive_logs_topic.go "kern.*" "*.critical"
以及使用路由键 kern.critical
类型发出日志
go run emit_log_topic.go "kern.critical" "A critical kernel error"
玩这些程序玩得开心。请注意,代码没有对路由或绑定键进行任何假设,您可能想玩更多两个路由键参数。
(emit_log_topic.go 和 receive_logs_topic.go 的完整源代码)
接下来,了解如何在 教程 6 中将往返消息作为远程过程调用进行。