RabbitMQ 教程 - Topic
Topic
(使用 Kotlin 客户端)
先决条件
本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
哪里寻求帮助
如果您在学习本教程时遇到困难,可以通过 GitHub Discussions 或 RabbitMQ 社区 Discord 联系我们。
在上一篇教程中,我们改进了日志系统。我们不再使用只能进行简单广播的 fanout 交换机,而是改用了 direct 交换机,从而实现了选择性地接收日志。
虽然使用 direct 交换机改进了我们的系统,但它仍然存在局限性——它无法基于多个标准进行路由。
在我们的日志系统中,我们可能不仅想根据严重性订阅日志,还想根据发出日志的来源订阅日志。您可能从 syslog unix 工具中了解过这个概念,该工具根据严重性(info/warn/crit...)和设施(auth/cron/kern...)来路由日志。
这将为我们提供极大的灵活性——我们可能只想收听来自“cron”的关键错误,但也想收听来自“kern”的所有日志。
为了在我们的日志系统中实现这一点,我们需要了解一个更复杂的 topic 交换机。
Topic 交换机
发送到 topic 交换机的消息不能使用任意的 routing_key,它必须是由点号分隔的单词列表。这些单词可以是任何内容,但通常它们指定了与消息相关的一些特征。一些有效的路由键示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。路由键中的单词数量可以任意多,上限为 255 字节。
绑定键也必须采用相同的形式。topic 交换机背后的逻辑与 direct 交换机类似——具有特定路由键的消息将发送到所有与匹配绑定键绑定的队列。但是,绑定键有两个重要的特殊情况:
*(星号) 可以替代正好一个单词。#(哈希) 可以替代零个或多个单词。
用一个例子来解释最简单
在这个例子中,我们将发送描述动物的消息。发送的消息将带有由三个单词(两个点号)组成的路由键。路由键中的第一个单词描述速度,第二个描述颜色,第三个描述物种:“<speed>.<colour>.<species>”。
我们创建了三个绑定:Q1 绑定了绑定键 “*.orange.*”,Q2 绑定了 “*.*.rabbit” 和 “lazy.#”。
这些绑定可以总结为:
- Q1 对所有橙色的动物感兴趣。
- Q2 想听到关于兔子的一切,以及关于懒惰动物的一切。
路由键设置为 “quick.orange.rabbit” 的消息将被传递到两个队列。消息 “lazy.orange.elephant” 也将进入这两个队列。另一方面,“quick.orange.fox” 只会进入第一个队列,“lazy.brown.fox” 只会进入第二个队列。“lazy.pink.rabbit” 虽然匹配两个绑定,但只会被传递到第二个队列一次。“quick.brown.fox” 不匹配任何绑定,因此将被丢弃。
如果我们违反约定,发送了一个包含一个或四个单词的消息(如 “orange” 或 “quick.orange.male.rabbit”)会发生什么?好吧,这些消息将无法匹配任何绑定,并会丢失。
另一方面,“lazy.orange.male.rabbit” 尽管有四个单词,但它将匹配最后一个绑定并被传递到第二个队列。
Topic 交换机
Topic 交换机功能强大,并且可以像其他交换机一样工作。
当队列使用 “
#” (井号) 绑定键时,无论路由键是什么,它都会接收所有消息——就像在fanout交换机中一样。当绑定中不使用特殊字符 “
*” (星号) 和 “#” (井号) 时,topic 交换机的行为就像direct交换机一样。
总而言之
我们将在日志系统中使用 topic 交换机。我们将从一个工作假设开始:日志的路由键将包含两个单词:“<facility>.<severity>”。
代码与上一篇教程几乎相同。
emitLogTopic 的代码
import dev.kourier.amqp.BuiltinExchangeType
import dev.kourier.amqp.Properties
import dev.kourier.amqp.connection.amqpConfig
import dev.kourier.amqp.connection.createAMQPConnection
import kotlinx.coroutines.CoroutineScope
suspend fun emitLogTopic(coroutineScope: CoroutineScope, routingKey: String, message: String) {
val config = amqpConfig {
server {
host = "localhost"
}
}
val connection = createAMQPConnection(coroutineScope, config)
val channel = connection.openChannel()
channel.exchangeDeclare(
"topic_logs",
BuiltinExchangeType.TOPIC,
durable = false,
autoDelete = false,
internal = false,
arguments = emptyMap()
)
channel.basicPublish(
message.toByteArray(),
exchange = "topic_logs",
routingKey = routingKey,
properties = Properties()
)
println(" [x] Sent '$routingKey':'$message'")
channel.close()
connection.close()
}
receiveLogsTopic 的代码
import dev.kourier.amqp.BuiltinExchangeType
import dev.kourier.amqp.connection.amqpConfig
import dev.kourier.amqp.connection.createAMQPConnection
import kotlinx.coroutines.CoroutineScope
suspend fun receiveLogsTopic(
coroutineScope: CoroutineScope,
bindingKeys: List<String>
) {
val config = amqpConfig {
server {
host = "localhost"
}
}
val connection = createAMQPConnection(coroutineScope, config)
val channel = connection.openChannel()
channel.exchangeDeclare(
"topic_logs",
BuiltinExchangeType.TOPIC,
durable = false,
autoDelete = false,
internal = false,
arguments = emptyMap()
)
val queueDeclared = channel.queueDeclare(
name = "",
durable = false,
exclusive = true,
autoDelete = true,
arguments = emptyMap()
)
val queueName = queueDeclared.queueName
if (bindingKeys.isEmpty()) {
System.err.println("Usage: receiveLogsTopic [binding_key]...")
return
}
for (bindingKey in bindingKeys) {
channel.queueBind(
queue = queueName,
exchange = "topic_logs",
routingKey = bindingKey
)
}
println(" [*] Waiting for logs. To exit press CTRL+C")
val consumer = channel.basicConsume(queueName, noAck = true)
for (delivery in consumer) {
val message = delivery.message.body.decodeToString()
val routingKey = delivery.message.routingKey
println(" [x] Received '$routingKey':'$message'")
}
channel.close()
connection.close()
}
接收所有日志
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
receiveLogsTopic(this, listOf("#"))
}
接收来自 “kern” 设施的所有日志
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
receiveLogsTopic(this, listOf("kern.*"))
}
或者如果你只想了解 “critical” 级别的日志
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
receiveLogsTopic(this, listOf("*.critical"))
}
您可以创建多个绑定
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
receiveLogsTopic(this, listOf("kern.*", "*.critical"))
}
发送一个路由键为 “kern.critical” 的日志
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
emitLogTopic(this, "kern.critical", "A critical kernel error")
}
尽情玩转这些程序吧。请注意,代码不会对路由键或绑定键做任何假设,您可能需要尝试使用两个以上的路由键参数。
继续阅读教程 6,了解如何执行作为远程过程调用的往返消息。