RabbitMQ 教程 - 主题
主题
(使用 Objective-C 客户端)
在 上一篇教程 中,我们改进了日志系统。我们没有只使用只能进行虚拟广播的 fanout
交换机,而是使用了 direct
交换机,从而获得了选择性接收日志的可能性。
尽管使用 direct
交换机改进了我们的系统,但它仍然存在局限性——它无法基于多个条件进行路由。
在我们的日志系统中,我们可能不仅希望订阅基于严重性的日志,还希望订阅基于发出日志的源的日志。您可能从 syslog
unix 工具中了解过这个概念,该工具根据严重性(信息/警告/严重...)和设备(身份验证/cron/内核...)来路由日志。
这将为我们提供很大的灵活性——我们可能希望监听来自 'cron' 的所有严重错误,以及来自 'kern' 的所有日志。
为了在我们的日志系统中实现这一点,我们需要了解更复杂的 topic
交换机。
主题交换机
发送到 topic
交换机的消息不能具有任意的 routingKey
——它必须是一个用点分隔的单词列表。这些单词可以是任何内容,但通常它们指定与消息相关的一些特征。一些有效的路由键示例:stock.usd.nyse
、nyse.vmw
、quick.orange.rabbit
。路由键中可以包含任意数量的单词,最多 255 字节。
绑定键也必须采用相同的形式。topic
交换机背后的逻辑类似于 direct
交换机——使用特定路由键发送的消息将传递到所有绑定具有匹配绑定键的队列。但是,绑定键有两个重要的特殊情况
*
(星号)可以替换恰好一个单词。#
(井号)可以替换零个或多个单词。
通过示例来解释这一点最简单
在这个示例中,我们将发送描述动物的所有消息。消息将使用由三个单词(两个点)组成的路由键发送。路由键中的第一个单词将描述速度,第二个单词描述颜色,第三个单词描述物种:<speed>.<colour>.<species>
。
我们创建了三个绑定:Q1 使用绑定键 *.orange.*
绑定,Q2 使用 *.*.rabbit
和 lazy.#
绑定。
这些绑定可以总结为
- Q1 对所有橙色动物感兴趣。
- Q2 想听到关于兔子的一切,以及关于懒惰动物的一切。
路由键设置为 quick.orange.rabbit
的消息将传递到这两个队列。消息 lazy.orange.elephant
也将传递到这两个队列。另一方面,quick.orange.fox
仅传递到第一个队列,lazy.brown.fox
仅传递到第二个队列。lazy.pink.rabbit
即使匹配两个绑定,也将只传递到第二个队列一次。quick.brown.fox
不匹配任何绑定,因此将被丢弃。
如果我们违反约定并发送一个或四个单词的消息,例如 orange
或 quick.orange.new.rabbit
会发生什么?好吧,这些消息将不匹配任何绑定,并将丢失。
另一方面,lazy.orange.new.rabbit
即使有四个单词,也将匹配最后一个绑定,并将传递到第二个队列。
主题交换机
主题交换机功能强大,可以像其他交换机一样工作。
当队列使用
#
(井号)绑定键绑定时,它将接收所有消息,而不管路由键是什么——就像fanout
交换机一样。当绑定中未使用特殊字符
*
(星号)和#
(井号)时,主题交换机将像direct
交换机一样工作。
综合示例
我们将在日志系统中使用 topic
交换机。我们将从一个工作假设开始,即日志的路由键将有两个单词:<facility>.<severity>
。
代码与 上一篇教程 中的代码几乎相同。
emitLogTopic
的代码
- (void)emitLogTopic:(NSString *)msg routingKey:(NSString *)routingKey {
RMQConnection *conn = [[RMQConnection alloc] initWithDelegate:[RMQConnectionDelegateLogger new]];
[conn start];
id<RMQChannel> ch = [conn createChannel];
RMQExchange *x = [ch topic:@"topic_logs"];
[x publish:[msg dataUsingEncoding:NSUTF8StringEncoding] routingKey:routingKey];
NSLog(@"Sent '%@'", msg);
[conn close];
}
receiveLogsTopic
的代码
- (void)receiveLogsTopic:(NSArray *)routingKeys {
RMQConnection *conn = [[RMQConnection alloc] initWithDelegate:[RMQConnectionDelegateLogger new]];
[conn start];
id<RMQChannel> ch = [conn createChannel];
RMQExchange *x = [ch topic:@"topic_logs"];
RMQQueue *q = [ch queue:@"" options:RMQQueueDeclareExclusive];
for (NSString *routingKey in routingKeys) {
[q bind:x routingKey:routingKey];
}
NSLog(@"Waiting for logs.");
[q subscribe:^(RMQMessage * _Nonnull message) {
NSLog(@"%@:%@", message.routingKey, [[NSString alloc] initWithData:message.body encoding:NSUTF8StringEncoding]);
}];
}
接收所有日志
[self receiveLogsTopic:@[@"#"]];
接收来自 kern
设备的所有日志
[self receiveLogsTopic:@[@"kern.*"]];
或者如果您只想听到 critical
日志
[self receiveLogsTopic:@[@"*.critical"]];
您可以创建多个绑定
[self receiveLogsTopic:@[@"kern.*", @"*.critical"]];
并使用路由键 kern.critical
类型发出日志
[self emitLogTopic:@"A critical kernel error" routingKey:@"kern.critical"];
尽情使用这些方法。请注意,代码对路由或绑定键没有任何假设,您可能希望使用两个以上的路由键参数。
(源代码)