跳到主要内容

RabbitMQ 教程 - 主题

主题

(使用 amqp.node 客户端)

信息

前提条件

本教程假设您已安装并运行 RabbitMQ,并且在 标准端口 (5672) 上通过 localhost 访问。如果您使用不同的主机、端口或凭据,则需要调整连接设置。

在哪里获得帮助

如果您在学习本教程时遇到问题,可以通过 GitHub DiscussionsRabbitMQ 社区 Discord 联系我们。

之前的教程中,我们改进了日志系统。我们没有使用只能进行虚拟广播的 fanout 交换机,而是使用了 direct 交换机,并获得了选择性接收日志的可能性。

虽然使用 direct 交换机改进了我们的系统,但它仍然存在局限性 - 它无法基于多个条件进行路由。

在我们的日志系统中,我们可能不仅希望根据严重程度订阅日志,还希望根据发出日志的来源进行订阅。您可能从 syslog unix 工具中了解这个概念,该工具根据严重程度(info/warn/crit...)和设施(auth/cron/kern...)来路由日志。

这将为我们提供很大的灵活性 - 我们可能只想监听来自 'cron' 的关键错误,但也监听来自 'kern' 的所有日志。

为了在我们的日志系统中实现这一点,我们需要学习更复杂的 topic 交换机。

主题交换机

发送到 topic 交换机的消息不能具有任意的 routing_key - 它必须是由点分隔的单词列表。单词可以是任何内容,但通常它们指定与消息相关的某些特征。一些有效的路由键示例:stock.usd.nysenyse.vmwquick.orange.rabbit。路由键中可以包含任意数量的单词,最多 255 字节。

绑定键也必须采用相同的形式。topic 交换机背后的逻辑与 direct 交换机类似 - 使用特定路由键发送的消息将传递到所有绑定了匹配绑定键的队列。但是,绑定键有两个重要的特殊情况

  • *(星号)可以替代一个单词。
  • #(井号)可以替代零个或多个单词。

通过一个例子来解释这一点最容易

在这个例子中,我们将发送描述动物的消息。消息将使用由三个单词(两个点)组成的路由键发送。路由键中的第一个单词将描述速度,第二个单词描述颜色,第三个单词描述物种:<speed>.<colour>.<species>

我们创建了三个绑定:Q1 绑定了绑定键 *.orange.*,Q2 绑定了 *.*.rabbitlazy.#

这些绑定可以总结为

  • Q1 对所有橙色动物感兴趣。
  • Q2 想要听到关于兔子的一切,以及关于懒惰动物的一切。

路由键设置为 quick.orange.rabbit 的消息将传递到两个队列。消息 lazy.orange.elephant 也将发送到这两个队列。另一方面,quick.orange.fox 只会发送到第一个队列,而 lazy.brown.fox 只会发送到第二个队列。lazy.pink.rabbit 将只发送到第二个队列一次,即使它匹配两个绑定。quick.brown.fox 不匹配任何绑定,因此将被丢弃。

如果我们违反约定并发送一个或四个单词的消息,例如 orangequick.orange.new.rabbit 会发生什么?好吧,这些消息将不匹配任何绑定,并且会丢失。

另一方面,lazy.orange.new.rabbit,即使它有四个单词,也会匹配最后一个绑定,并将被传递到第二个队列。

主题交换机

主题交换机功能强大,可以像其他交换机一样工作。

当队列绑定了 #(井号)绑定键时 - 它将接收所有消息,无论路由键是什么 - 就像 fanout 交换机一样。

当绑定中未使用特殊字符 *(星号)和 #(井号)时,主题交换机的行为将与 direct 交换机完全相同。

整合在一起

我们将在日志系统中使用 topic 交换机。我们将从一个工作假设开始,即日志的路由键将有两个单词:<facility>.<severity>

代码与之前的教程中的代码几乎相同。

emit_log_topic.js 的代码

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
var exchange = 'topic_logs';
var args = process.argv.slice(2);
var key = (args.length > 0) ? args[0] : 'anonymous.info';
var msg = args.slice(1).join(' ') || 'Hello World!';

channel.assertExchange(exchange, 'topic', {
durable: false
});
channel.publish(exchange, key, Buffer.from(msg));
console.log(" [x] Sent %s:'%s'", key, msg);
});

setTimeout(function() {
connection.close();
process.exit(0)
}, 500);
});

receive_logs_topic.js 的代码

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

var args = process.argv.slice(2);

if (args.length == 0) {
console.log("Usage: receive_logs_topic.js <facility>.<severity>");
process.exit(1);
}

amqp.connect('amqp://localhost', function(error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
var exchange = 'topic_logs';

channel.assertExchange(exchange, 'topic', {
durable: false
});

channel.assertQueue('', {
exclusive: true
}, function(error2, q) {
if (error2) {
throw error2;
}
console.log(' [*] Waiting for logs. To exit press CTRL+C');

args.forEach(function(key) {
channel.bindQueue(q.queue, exchange, key);
});

channel.consume(q.queue, function(msg) {
console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
}, {
noAck: true
});
});
});
});

接收所有日志

./receive_logs_topic.js "#"

接收来自 facility kern 的所有日志

./receive_logs_topic.js "kern.*"

或者,如果您只想听到关于 critical 日志的信息

./receive_logs_topic.js "*.critical"

您可以创建多个绑定

./receive_logs_topic.js "kern.*" "*.critical"

并发出路由键为 kern.critical 类型的日志

./emit_log_topic.js "kern.critical" "A critical kernel error"

尽情玩这些程序吧。请注意,代码没有对路由键或绑定键做任何假设,您可能希望使用两个以上的路由键参数进行实验。

emit_log_topic.jsreceive_logs_topic.js 的完整源代码)

接下来,在教程 6 中了解如何将往返消息作为远程过程调用执行

© . All rights reserved.