RabbitMQ 教程 - 主题
主题
(使用 php-amqplib)
先决条件
本教程假设 RabbitMQ 已安装并在 localhost
上标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
在哪里获得帮助
如果您在学习本教程时遇到问题,可以通过 GitHub Discussions 或 RabbitMQ 社区 Discord 联系我们。
在之前的教程中,我们改进了日志系统。我们没有使用只能进行虚拟广播的 fanout
交换器,而是使用了 direct
交换器,并获得了选择性接收日志的可能性。
虽然使用 direct
交换器改进了我们的系统,但它仍然存在局限性 - 它无法基于多个标准进行路由。
在我们的日志系统中,我们可能不仅想订阅基于严重性的日志,还想订阅基于发出日志的来源的日志。您可能从 syslog
unix 工具中了解这个概念,该工具根据严重性(info/warn/crit...)和工具(auth/cron/kern...)路由日志。
这将为我们提供很大的灵活性 - 我们可能只想监听来自“cron”的严重错误,但也监听来自“kern”的所有日志。
为了在我们的日志系统中实现这一点,我们需要了解更复杂的 topic
交换器。
主题交换器
发送到 topic
交换器的消息不能具有任意的 routing_key
- 它必须是由点分隔的单词列表。单词可以是任何内容,但通常它们指定与消息相关的某些特征。一些有效的路由键示例:stock.usd.nyse
、nyse.vmw
、quick.orange.rabbit
。路由键中可以包含任意数量的单词,最多 255 个字节的限制。
绑定键也必须采用相同的形式。topic
交换器背后的逻辑与 direct
交换器类似 - 使用特定路由键发送的消息将传递到所有使用匹配绑定键绑定的队列。但是,绑定键有两个重要的特殊情况
*
(星号)可以替代一个单词。#
(井号)可以替代零个或多个单词。
用一个例子来解释最容易
在本例中,我们将发送所有描述动物的消息。消息将使用由三个单词(两个点)组成的路由键发送。路由键中的第一个单词将描述速度,第二个单词描述颜色,第三个单词描述物种:<speed>.<colour>.<species>
。
我们创建了三个绑定:Q1 绑定了绑定键 *.orange.*
,Q2 绑定了 *.*.rabbit
和 lazy.#
。
这些绑定可以概括为
- Q1 对所有橙色动物感兴趣。
- Q2 想要听到关于兔子的一切,以及关于懒惰动物的一切。
路由键设置为 quick.orange.rabbit
的消息将传递到两个队列。消息 lazy.orange.elephant
也将发送到它们两个。另一方面,quick.orange.fox
将仅发送到第一个队列,而 lazy.brown.fox
仅发送到第二个队列。lazy.pink.rabbit
将仅传递到第二个队列一次,即使它匹配两个绑定。quick.brown.fox
不匹配任何绑定,因此将被丢弃。
如果我们违反约定并发送包含一个或四个单词的消息,例如 orange
或 quick.orange.new.rabbit
会发生什么?好吧,这些消息将不匹配任何绑定,并且会丢失。
另一方面,lazy.orange.new.rabbit
即使有四个单词,也将匹配最后一个绑定,并将传递到第二个队列。
主题交换器
主题交换器功能强大,可以像其他交换器一样工作。
当队列与
#
(井号)绑定键绑定时 - 它将接收所有消息,而与路由键无关 - 就像在fanout
交换器中一样。当绑定中未使用特殊字符
*
(星号)和#
(井号)时,主题交换器的行为将就像direct
交换器一样。
将它们放在一起
我们将在我们的日志系统中使用 topic
交换器。我们将从一个工作假设开始,即日志的路由键将有两个单词:<facility>.<severity>
。
代码几乎与之前的教程中的代码相同。
emit_log_topic.php
的代码
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('topic_logs', 'topic', false, false, false);
$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
$data = "Hello World!";
}
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'topic_logs', $routing_key);
echo ' [x] Sent ', $routing_key, ':', $data, "\n";
$channel->close();
$connection->close();
receive_logs_topic.php
的代码
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('topic_logs', 'topic', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$binding_keys = array_slice($argv, 1);
if (empty($binding_keys)) {
file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
exit(1);
}
foreach ($binding_keys as $binding_key) {
$channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}
echo " [*] Waiting for logs. To exit press CTRL+C\n";
$callback = function ($msg) {
echo ' [x] ', $msg->getRoutingKey(), ':', $msg->getBody(), "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
try {
$channel->consume();
} catch (\Throwable $exception) {
echo $exception->getMessage();
}
$channel->close();
$connection->close();
接收所有日志
php receive_logs_topic.php "#"
接收来自工具 kern
的所有日志
php receive_logs_topic.php "kern.*"
或者,如果您只想听到关于 critical
日志的信息
php receive_logs_topic.php "*.critical"
您可以创建多个绑定
php receive_logs_topic.php "kern.*" "*.critical"
并发出具有路由键 kern.critical
类型的日志
php emit_log_topic.php "kern.critical" "A critical kernel error"
尽情使用这些程序。请注意,代码不对路由键或绑定键做任何假设,您可能想使用两个以上的路由键参数。
(emit_log_topic.php 和 receive_logs_topic.php 的完整源代码)
接下来,了解如何在教程 6 中将往返消息作为远程过程调用执行