RabbitMQ 教程 - 主题
主题
(使用 amqp.node 客户端)
在上一教程中,我们改进了日志系统。我们不再使用只能进行虚拟广播的 fanout
交换机,而是使用 direct
交换机,并获得了选择性接收日志的可能性。
虽然使用 direct
交换机改进了我们的系统,但它仍然存在局限性 - 它无法根据多个条件进行路由。
在我们的日志系统中,我们可能不仅想要订阅基于严重性的日志,还要订阅基于发出日志的来源的日志。您可能在 syslog
unix 工具中了解过这个概念,它根据严重性(info/warn/crit…)和设施(auth/cron/kern…)对日志进行路由。
这将给我们带来很大的灵活性 - 我们可能想要监听来自 'cron' 的所有严重错误,以及来自 'kern' 的所有日志。
为了在我们的日志系统中实现这一点,我们需要了解更复杂的 topic
交换机。
主题交换机
发送到 topic
交换机的消息不能有任意 routing_key
- 它必须是点分隔的单词列表。这些词可以是任何东西,但通常它们指定与消息相关的某些特征。一些有效的路由键示例:stock.usd.nyse
、nyse.vmw
、quick.orange.rabbit
。路由键中的单词数量可以是任意多个,最多 255 个字节。
绑定键也必须采用相同的形式。topic
交换机背后的逻辑类似于 direct
交换机 - 使用特定路由键发送的消息将被传递到所有使用匹配绑定键绑定的队列。但是,绑定键有两个重要的特殊情况
*
(星号) 可以替换一个单词。#
(井号) 可以替换零个或多个单词。
用一个例子来解释是最容易的
在这个例子中,我们将发送关于动物的消息。发送的消息将使用由三个单词(两个点)组成的路由键。路由键中的第一个单词将描述速度,第二个单词描述颜色,第三个单词描述物种:<speed>.<colour>.<species>
。
我们创建了三个绑定:Q1 使用绑定键 *.orange.*
绑定,Q2 使用绑定键 *.*.rabbit
和 lazy.#
绑定。
这些绑定可以总结为
- Q1 对所有橙色的动物感兴趣。
- Q2 想要听到关于兔子的一切,以及关于所有懒惰动物的一切。
路由键设置为 quick.orange.rabbit
的消息将被传递到两个队列。消息 lazy.orange.elephant
也将被传递到这两个队列。另一方面,quick.orange.fox
将只被传递到第一个队列,lazy.brown.fox
将只被传递到第二个队列。lazy.pink.rabbit
将只被传递到第二个队列一次,即使它匹配两个绑定。quick.brown.fox
不匹配任何绑定,因此将被丢弃。
如果我们违反了我们的约定,发送了一个只有一个或四个单词的消息,例如 orange
或 quick.orange.new.rabbit
会怎么样?好吧,这些消息将不会匹配任何绑定,并将被丢失。
另一方面,lazy.orange.new.rabbit
即使有四个单词,也会匹配最后一个绑定,并将被传递到第二个队列。
主题交换机
主题交换机功能强大,可以像其他交换机一样工作。
当一个队列使用
#
(井号) 绑定键绑定时 - 它将接收所有消息,无论路由键是什么 - 就像fanout
交换机一样。当在绑定中未使用特殊字符
*
(星号) 和#
(井号) 时,主题交换机将像direct
交换机一样工作。
综合起来
我们将在日志系统中使用 topic
交换机。我们将从一个工作假设开始,即日志的路由键将有两个单词:<facility>.<severity>
。
代码几乎与上一教程中相同。
emit_log_topic.js
的代码
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://127.0.0.1', function(error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
var exchange = 'topic_logs';
var args = process.argv.slice(2);
var key = (args.length > 0) ? args[0] : 'anonymous.info';
var msg = args.slice(1).join(' ') || 'Hello World!';
channel.assertExchange(exchange, 'topic', {
durable: false
});
channel.publish(exchange, key, Buffer.from(msg));
console.log(" [x] Sent %s:'%s'", key, msg);
});
setTimeout(function() {
connection.close();
process.exit(0)
}, 500);
});
receive_logs_topic.js
的代码
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
var args = process.argv.slice(2);
if (args.length == 0) {
console.log("Usage: receive_logs_topic.js <facility>.<severity>");
process.exit(1);
}
amqp.connect('amqp://127.0.0.1', function(error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
var exchange = 'topic_logs';
channel.assertExchange(exchange, 'topic', {
durable: false
});
channel.assertQueue('', {
exclusive: true
}, function(error2, q) {
if (error2) {
throw error2;
}
console.log(' [*] Waiting for logs. To exit press CTRL+C');
args.forEach(function(key) {
channel.bindQueue(q.queue, exchange, key);
});
channel.consume(q.queue, function(msg) {
console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
}, {
noAck: true
});
});
});
});
接收所有日志
./receive_logs_topic.js "#"
接收来自设施 kern
的所有日志
./receive_logs_topic.js "kern.*"
或者,如果您只想听到 critical
日志
./receive_logs_topic.js "*.critical"
您可以创建多个绑定
./receive_logs_topic.js "kern.*" "*.critical"
要使用路由键 kern.critical
类型的日志,请输入
./emit_log_topic.js "kern.critical" "A critical kernel error"
尽情玩这些程序。请注意,代码没有对路由键或绑定键进行任何假设,您可能想要使用两个以上的路由键参数来玩。
(emit_log_topic.js 和 receive_logs_topic.js 的完整源代码)
接下来,在教程 6中了解如何将往返消息作为远程过程调用。