跳至主内容

RabbitMQ 教程 - Topic

Topic

(使用 Pika Python 客户端)

信息

先决条件

本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。

哪里寻求帮助

如果您在学习本教程时遇到困难,可以通过 GitHub DiscussionsRabbitMQ 社区 Discord 联系我们。

先决条件

与其他的 Python 教程一样,我们将使用 Pika RabbitMQ 客户端 1.0.0 版本

本教程重点介绍

上一个教程中,我们改进了日志系统。我们没有使用只能进行虚拟广播的fanout交换机,而是使用了一个direct交换机,从而获得了选择性接收日志的可能性。

虽然使用 direct 交换机改进了我们的系统,但它仍然存在局限性——它无法基于多个标准进行路由。

在我们的日志系统中,我们可能不仅想根据严重程度订阅日志,还想根据发出日志的来源订阅日志。您可能从syslog Unix工具中了解到这个概念,该工具根据严重程度(info/warn/crit...)和设施(auth/cron/kern...)来路由日志。

这将为我们提供极大的灵活性——我们可能只想收听来自“cron”的关键错误,但也想收听来自“kern”的所有日志。

为了在我们的日志系统中实现这一点,我们需要了解一个更复杂的 topic 交换机。

Topic 交换机

发送到 topic 交换机的消息不能具有任意的 routing_key——它必须是由点分隔的单词列表。单词可以是任何内容,但通常它们指定与消息相关的一些特征。一些有效的路由键示例:stock.usd.nysenyse.vmwquick.orange.rabbit。路由键中的单词数量不限,最多 255 字节。

绑定键也必须采用相同的形式。topic 交换机背后的逻辑与 direct 交换机类似——具有特定路由键的消息将发送到所有与匹配绑定键绑定的队列。但是,绑定键有两个重要的特殊情况:

  • * (星号) 可以替代正好一个单词。
  • # (哈希) 可以替代零个或多个单词。

用一个例子来解释最简单

在这个例子中,我们将发送描述动物的消息。消息将使用包含三个单词(两个点)的路由键发送。路由键中的第一个单词将描述严重程度,第二个单词描述颜色,第三个单词描述物种:<severity>.<color>.<species>

我们创建了三个绑定:Q1 绑定了绑定键 *.orange.*,Q2 绑定了 *.*.rabbitlazy.#

这些绑定可以总结为:

  • Q1 对所有橙色的动物感兴趣。
  • Q2 想听到关于兔子的一切,以及关于懒惰动物的一切。

路由键设置为 quick.orange.rabbit 的消息将发送到两个队列。消息 lazy.orange.elephant 也将发送到这两个队列。另一方面,quick.orange.fox 只会发送到第一个队列,而 lazy.brown.fox 只会发送到第二个队列。lazy.pink.rabbit 将只发送到第二个队列一次,即使它匹配两个绑定。quick.brown.fox 不匹配任何绑定,因此将被丢弃。

如果我们破坏了约定并发送一个单词或四个单词的消息,例如 orangequick.orange.new.rabbit,会发生什么?好吧,这些消息将不匹配任何绑定,并且将被丢失。

另一方面,lazy.orange.new.rabbit 尽管有四个单词,但将匹配最后一个绑定,并将被发送到第二个队列。

Topic 交换机

Topic 交换机功能强大,并且可以像其他交换机一样工作。

当一个队列用 # (哈希) 绑定键绑定时——它将接收所有消息,而不管路由键是什么——就像 fanout 交换机一样。

当绑定中不使用特殊字符 * (星号) 和 # (哈希) 时,topic 交换机将像 direct 交换机一样工作。

总而言之

我们将在日志系统中使用 topic 交换机。我们最初的假设是日志的路由键将有两个单词:<facility>.<severity>

代码与上一个教程中的几乎相同。

emit_log_topic.py (源代码)

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='topic_logs', routing_key=routing_key, body=message)
print(f" [x] Sent {routing_key}:{message}")
connection.close()

receive_logs_topic.py (源代码)

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)

for binding_key in binding_keys:
channel.queue_bind(
exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
print(f" [x] {method.routing_key}:{body}")


channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

要接收所有日志,请运行

python receive_logs_topic.py "#"

接收来自 kern 设施的所有日志

python receive_logs_topic.py "kern.*"

或者如果你只想听到关于 critical 日志的信息

python receive_logs_topic.py "*.critical"

您可以创建多个绑定

python receive_logs_topic.py "kern.*" "*.critical"

要发出具有路由键 kern.critical 的日志,请键入:

python emit_log_topic.py "kern.critical" "A critical kernel error"

尽情玩转这些程序吧。请注意,代码不会对路由键或绑定键做任何假设,您可能需要尝试使用两个以上的路由键参数。

继续学习教程 6,了解RPC

© . This site is unofficial and not affiliated with VMware.