跳至主要内容

RabbitMQ 教程 - 主题

主题

(使用 Java 客户端)

信息

先决条件

本教程假设 RabbitMQ 已安装并在 localhost 上的标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则连接设置需要进行调整。

获取帮助

如果您在学习本教程时遇到问题,可以通过GitHub 讨论区RabbitMQ 社区 Discord联系我们。

上一教程中,我们改进了我们的日志系统。我们没有只使用只能进行虚拟广播的 fanout 交换机,而是使用了 direct 交换机,从而获得了选择性接收日志的可能性。

尽管使用 direct 交换机改进了我们的系统,但它仍然存在局限性 - 它无法根据多个条件进行路由。

在我们的日志系统中,我们可能不仅希望订阅基于严重性的日志,还希望订阅基于发出日志的源的日志。您可能从syslog Unix 工具中了解过这个概念,该工具根据严重性(info/warn/crit...)和设备(auth/cron/kern...)对日志进行路由。

这将为我们提供很大的灵活性 - 我们可能希望监听来自 'cron' 的所有严重错误,以及来自 'kern' 的所有日志。

为了在我们的日志系统中实现这一点,我们需要了解更复杂的 topic 交换机。

主题交换机

发送到 topic 交换机的消息不能具有任意的 routing_key - 它必须是一个由点分隔的单词列表。这些单词可以是任何内容,但通常它们指定与消息相关的某些特征。一些有效的路由键示例:stock.usd.nysenyse.vmwquick.orange.rabbit。路由键中的单词数量可以任意多,最多可达 255 个字节。

绑定键也必须采用相同的格式。topic 交换机背后的逻辑类似于 direct 交换机 - 使用特定路由键发送的消息将传递到所有使用匹配绑定键绑定的队列。但是,绑定键有两个重要的特殊情况

  • *(星号)可以替换恰好一个单词。
  • #(井号)可以替换零个或多个单词。

最简单的解释方法是举个例子

在这个例子中,我们将发送描述动物的所有消息。消息将使用由三个单词(两个点)组成的路由键发送。路由键中的第一个单词将描述速度,第二个单词描述颜色,第三个单词描述物种:<speed>.<colour>.<species>

我们创建了三个绑定:Q1 使用绑定键 *.orange.* 绑定,Q2 使用 *.*.rabbitlazy.# 绑定。

这些绑定可以概括为

  • Q1 对所有橙色动物感兴趣。
  • Q2 想要了解关于兔子的所有信息,以及关于懒惰动物的所有信息。

路由键设置为 quick.orange.rabbit 的消息将传递到这两个队列。消息 lazy.orange.elephant 也将传递到这两个队列。另一方面,quick.orange.fox 仅传递到第一个队列,而 lazy.brown.fox 仅传递到第二个队列。lazy.pink.rabbit 将仅传递到第二个队列一次,即使它匹配两个绑定。quick.brown.fox 与任何绑定都不匹配,因此将被丢弃。

如果我们违反约定并发送一个或四个单词的消息,例如 orangequick.orange.new.rabbit 会发生什么?嗯,这些消息将与任何绑定都不匹配,并将丢失。

另一方面,lazy.orange.new.rabbit 即使有四个单词,也会匹配最后一个绑定,并将传递到第二个队列。

主题交换机

主题交换机功能强大,可以像其他交换机一样工作。

当队列使用 #(井号)绑定键绑定时 - 它将接收所有消息,而不管路由键是什么 - 就像 fanout 交换机一样。

当在绑定中未使用特殊字符 *(星号)和 #(井号)时,主题交换机将像 direct 交换机一样工作。

综合示例

我们将在日志系统中使用 topic 交换机。我们将从一个工作假设开始,即日志的路由键将有两个单词:<facility>.<severity>

代码与上一教程中的代码几乎相同。

EmitLogTopic.java 的代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {

private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {

channel.exchangeDeclare(EXCHANGE_NAME, "topic");

String routingKey = getRouting(argv);
String message = getMessage(argv);

channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
}
//..
}

ReceiveLogsTopic.java 的代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsTopic {

private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();

if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
System.exit(1);
}

for (String bindingKey : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}

编译并运行这些示例,包括类路径,如教程 1中所示 - 在 Windows 上,使用 %CP%。

要编译

javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java

要接收所有日志

java -cp $CP ReceiveLogsTopic "#"

要接收来自设备 kern 的所有日志

java -cp $CP ReceiveLogsTopic "kern.*"

或者,如果您只想了解 critical 日志

java -cp $CP ReceiveLogsTopic "*.critical"

您可以创建多个绑定

java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"

并使用路由键 kern.critical 发出日志

java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"

玩这些程序玩得开心。请注意,代码没有对路由或绑定键做出任何假设,您可能希望使用两个以上的路由键参数。

EmitLogTopic.javaReceiveLogsTopic.java 的完整源代码)

接下来,了解如何在教程 6中将往返消息作为远程过程调用。

© 2024 RabbitMQ. All rights reserved.