RabbitMQ 教程 - Topic
Topic
(使用 BunnySwift)
先决条件
本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
哪里寻求帮助
如果您在学习本教程时遇到困难,可以通过 GitHub Discussions 或 RabbitMQ 社区 Discord 联系我们。
在上一篇教程中,我们改进了日志系统。我们不再使用只能进行简单广播的 fanout 交换机,而是改用了 direct 交换机,从而实现了有选择地接收日志。
虽然使用 direct 交换机改进了我们的系统,但它仍然存在局限性——它无法基于多个标准进行路由。
在我们的日志系统中,我们可能不仅想根据严重性订阅日志,还想根据发出日志的来源订阅日志。您可能从 syslog unix 工具中了解过这个概念,该工具根据严重性(info/warn/crit...)和设施(auth/cron/kern...)来路由日志。
这将为我们提供极大的灵活性——我们可能只想收听来自“cron”的关键错误,但也想收听来自“kern”的所有日志。
为了在我们的日志系统中实现这一点,我们需要了解一个更复杂的 topic 交换机。
Topic 交换机
发送到 topic(主题)交换机的消息不能使用任意的 routingKey——它必须是一个由点号分隔的单词列表。这些单词可以是任何内容,但通常它们会指定与消息相关的一些特征。以下是一些有效的路由键示例:stock.usd.nyse、nyse.vmw、quick.orange.rabbit。路由键中的单词数量没有限制,只要总长度不超过 255 字节即可。
绑定键也必须采用相同的形式。topic 交换机背后的逻辑与 direct 交换机类似——具有特定路由键的消息将发送到所有与匹配绑定键绑定的队列。但是,绑定键有两个重要的特殊情况:
*(星号) 可以替代正好一个单词。#(哈希) 可以替代零个或多个单词。
用一个例子来解释最简单
在这个例子中,我们将发送描述动物的消息。消息将使用一个包含三个单词(两个点)的路由键发送。路由键中的第一个单词将描述速度,第二个单词描述颜色,第三个单词描述物种:<speed>.<colour>.<species>。
我们创建了三个绑定:Q1 绑定了绑定键 *.orange.*,Q2 绑定了 *.*.rabbit 和 lazy.#。
这些绑定可以总结为:
- Q1 对所有橙色的动物感兴趣。
- Q2 想听到关于兔子的一切,以及关于懒惰动物的一切。
路由键设置为 quick.orange.rabbit 的消息将发送到两个队列。消息 lazy.orange.elephant 也将发送到这两个队列。另一方面,quick.orange.fox 只会发送到第一个队列,而 lazy.brown.fox 只会发送到第二个队列。lazy.pink.rabbit 将只发送到第二个队列一次,即使它匹配两个绑定。quick.brown.fox 不匹配任何绑定,因此将被丢弃。
如果我们破坏了约定并发送一个单词或四个单词的消息,例如 orange 或 quick.orange.new.rabbit,会发生什么?好吧,这些消息将不匹配任何绑定,并且将被丢失。
另一方面,lazy.orange.new.rabbit 尽管有四个单词,但将匹配最后一个绑定,并将被发送到第二个队列。
Topic 交换机
Topic 交换机功能强大,并且可以像其他交换机一样工作。
当一个队列用
#(哈希) 绑定键绑定时——它将接收所有消息,而不管路由键是什么——就像fanout交换机一样。当绑定中不使用特殊字符
*(星号) 和#(哈希) 时,topic 交换机将像direct交换机一样工作。
总而言之
我们将在日志系统中使用 topic 交换机。我们最初的假设是日志的路由键将有两个单词:<facility>.<severity>。
代码与上一篇教程中的几乎相同。
一如既往,我们需要先创建一个交换
let exchange = try await channel.topic("topic_logs")
我们已准备好发送消息
try await channel.basicPublish(
body: Data(message.utf8),
exchange: exchange.name,
routingKey: routingKey
)
EmitLogTopic 的代码
import BunnySwift
import Foundation
@main
struct EmitLogTopic {
static func main() async throws {
let connection = try await Connection.open()
let channel = try await connection.openChannel()
let exchange = try await channel.topic("topic_logs")
var args = Array(CommandLine.arguments.dropFirst())
let routingKey = args.isEmpty ? "anonymous.info" : args.removeFirst()
let message = args.isEmpty ? "Hello World!" : args.joined(separator: " ")
try await channel.basicPublish(
body: Data(message.utf8),
exchange: exchange.name,
routingKey: routingKey
)
print(" [x] Sent '\(routingKey):\(message)'")
try await connection.close()
}
}
ReceiveLogsTopic 的代码
import BunnySwift
@main
struct ReceiveLogsTopic {
static func main() async throws {
let bindingKeys = Array(CommandLine.arguments.dropFirst())
guard !bindingKeys.isEmpty else {
print("Usage: ReceiveLogsTopic [binding_key]...")
return
}
let connection = try await Connection.open()
let channel = try await connection.openChannel()
let exchange = try await channel.topic("topic_logs")
let queue = try await channel.queue("", exclusive: true)
for key in bindingKeys {
try await queue.bind(to: exchange, routingKey: key)
}
print(" [*] Waiting for logs. To exit press CTRL+C")
let consumer = try await channel.basicConsume(
queue: queue.name,
acknowledgementMode: .automatic
)
for try await message in consumer {
print(" [x] \(message.deliveryInfo.routingKey):\(message.bodyString ?? "")")
}
}
}
要接收所有日志,请运行
swift run ReceiveLogsTopic "#"
接收来自 kern 设施的所有日志
swift run ReceiveLogsTopic "kern.*"
或者如果你只想听到关于 critical 日志的信息
swift run ReceiveLogsTopic "*.critical"
您可以创建多个绑定
swift run ReceiveLogsTopic "kern.*" "*.critical"
要发出具有路由键 kern.critical 的日志,请键入:
swift run EmitLogTopic "kern.critical" "A critical kernel error"
尽情玩转这些程序吧。请注意,代码不会对路由键或绑定键做任何假设,您可能需要尝试使用两个以上的路由键参数。
继续学习教程 6 以了解 RPC。