RabbitMQ 教程 - 主题
主题
(使用 Spring AMQP)
先决条件
本教程假设 RabbitMQ 已安装并在 localhost
上运行,使用标准端口 (5672)。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
获取帮助
如果您在学习本教程时遇到问题,可以通过 GitHub Discussions 或 RabbitMQ 社区 Discord 联系我们。
在上一个教程中,我们提高了消息传递的灵活性。我们没有使用只能进行虚拟广播的 fanout
交换机,而是使用了 direct
交换机,并获得了基于路由键选择性接收消息的可能性。
虽然使用 direct
交换机改进了我们的系统,但它仍然存在局限性 - 它无法基于多个标准进行路由。
在我们的消息传递系统中,我们可能不仅希望根据路由键订阅队列,还希望根据生成消息的来源进行订阅。您可能从 syslog
unix 工具中了解了这个概念,该工具根据严重性(info/warn/crit...)和设施(auth/cron/kern...)路由日志。我们的示例比这稍微简单一些。
该示例将为我们提供很大的灵活性 - 我们可能希望仅监听来自“cron”的关键错误,但也监听来自“kern”的所有日志。
为了在我们的日志记录系统中实现这种灵活性,我们需要了解更复杂的 topic
交换机。
主题交换机
发送到 topic
交换机的消息不能有任意的 routing_key
- 它必须是由点分隔的单词列表。单词可以是任何内容,但通常它们指定与消息相关的某些特征。一些有效的路由键示例:stock.usd.nyse
、nyse.vmw
、quick.orange.rabbit
。路由键中可以包含任意数量的单词,最多 255 字节的限制。
绑定键也必须采用相同的形式。 topic
交换机背后的逻辑与 direct
交换机类似 - 使用特定路由键发送的消息将传递到所有绑定了匹配绑定键的队列。但是,绑定键有两个重要的特殊情况
*
(星号)可以替代正好一个单词。#
(井号)可以替代零个或多个单词。
通过一个例子最容易解释这一点
在本例中,我们将发送描述动物的消息。消息将使用由三个单词(两个点)组成的路由键发送。路由键中的第一个单词将描述速度,第二个单词描述颜色,第三个单词描述物种:<speed>.<colour>.<species>
。
我们创建了三个绑定:Q1 绑定了绑定键 *.orange.*
,Q2 绑定了 *.*.rabbit
和 lazy.#
。
这些绑定可以概括为
- Q1 对所有橙色动物感兴趣。
- Q2 想要听到关于兔子的一切,以及关于懒惰动物的一切。
路由键设置为 quick.orange.rabbit
的消息将传递到两个队列。消息 lazy.orange.elephant
也将发送到它们两个。另一方面,quick.orange.fox
将仅发送到第一个队列,而 lazy.brown.fox
仅发送到第二个队列。 lazy.pink.rabbit
将仅传递到第二个队列一次,即使它匹配两个绑定。 quick.brown.fox
不匹配任何绑定,因此将被丢弃。
如果我们违反约定并发送一个或四个单词的消息,例如 orange
或 quick.orange.new.rabbit
会发生什么?嗯,这些消息将不匹配任何绑定,并且会丢失。
另一方面,lazy.orange.new.rabbit
,即使它有四个单词,也将匹配最后一个绑定,并将传递到第二个队列。
主题交换机
主题交换机功能强大,可以像其他交换机一样运行。
当队列绑定了
#
(井号)绑定键时 - 它将接收所有消息,而与路由键无关 - 就像在fanout
交换机中一样。当绑定中未使用特殊字符
*
(星号)和#
(井号)时,主题交换机的行为将与direct
交换机完全相同。
整合在一起
我们将在我们的消息传递系统中使用 topic
交换机。我们将从一个工作假设开始,即路由键将利用通配符和井号标签。
代码与上一个教程中的代码几乎相同。
首先,在 tut5
包的 Tut5Config
类中配置一些 profile 和 bean
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
@Profile({"tut5","topics"})
@Configuration
public class Tut5Config {
@Bean
public TopicExchange topic() {
return new TopicExchange("tut.topic");
}
@Profile("receiver")
private static class ReceiverConfig {
@Bean
public Tut5Receiver receiver() {
return new Tut5Receiver();
}
@Bean
public Queue autoDeleteQueue1() {
return new AnonymousQueue();
}
@Bean
public Queue autoDeleteQueue2() {
return new AnonymousQueue();
}
@Bean
public Binding binding1a(TopicExchange topic,
Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1)
.to(topic)
.with("*.orange.*");
}
@Bean
public Binding binding1b(TopicExchange topic,
Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1)
.to(topic)
.with("*.*.rabbit");
}
@Bean
public Binding binding2a(TopicExchange topic,
Queue autoDeleteQueue2) {
return BindingBuilder.bind(autoDeleteQueue2)
.to(topic)
.with("lazy.#");
}
}
@Profile("sender")
@Bean
public Tut5Sender sender() {
return new Tut5Sender();
}
}
我们设置了 profile 以将主题作为 tut5
或 topics
的选择来执行。然后,我们为 TopicExchange
创建了 bean。 receiver
profile 是 ReceiverConfig
类,它定义了我们的接收器、两个与上一个教程中相同的 AnonymousQueue
以及利用主题语法的主题绑定。我们还创建了 sender
profile 作为 Tut5Sender
类的创建。
Tut5Receiver
再次使用 @RabbitListener
注解来接收来自相应主题的消息。
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;
public class Tut5Receiver {
@RabbitListener(queues = "#{autoDeleteQueue1.name}")
public void receive1(String in) throws InterruptedException {
receive(in, 1);
}
@RabbitListener(queues = "#{autoDeleteQueue2.name}")
public void receive2(String in) throws InterruptedException {
receive(in, 2);
}
public void receive(String in, int receiver) throws
InterruptedException {
StopWatch watch = new StopWatch();
watch.start();
System.out.println("instance " + receiver + " [x] Received '"
+ in + "'");
doWork(in);
watch.stop();
System.out.println("instance " + receiver + " [x] Done in "
+ watch.getTotalTimeSeconds() + "s");
}
private void doWork(String in) throws InterruptedException {
for (char ch : in.toCharArray()) {
if (ch == '.') {
Thread.sleep(1000);
}
}
}
}
Tut5Sender.java
的代码
package org.springframework.amqp.tutorials.tut5;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.concurrent.atomic.AtomicInteger;
public class Tut5Sender {
@Autowired
private RabbitTemplate template;
@Autowired
private TopicExchange topic;
AtomicInteger index = new AtomicInteger(0);
AtomicInteger count = new AtomicInteger(0);
private final String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
"lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
StringBuilder builder = new StringBuilder("Hello to ");
if (this.index.incrementAndGet() == keys.length) {
this.index.set(0);
}
String key = keys[this.index.get()];
builder.append(key).append(' ');
builder.append(this.count.incrementAndGet());
String message = builder.toString();
template.convertAndSend(topic.getName(), key, message);
System.out.println(" [x] Sent '" + message + "'");
}
}
按照教程 1 中描述的方式编译并运行示例。或者,如果您一直在学习这些教程,您只需要执行以下操作
构建项目
./mvnw clean package
要使用正确的 profile 执行发送者和接收者,请使用正确的参数执行 jar
# shell 1
java -jar target/rabbitmq-tutorials.jar \
--spring.profiles.active=topics,receiver \
--tutorial.client.duration=60000
# shell 2
java -jar target/rabbitmq-tutorials.jar \
--spring.profiles.active=topics,sender \
--tutorial.client.duration=60000
来自发送者的输出将如下所示
Ready ... running for 60000ms
[x] Sent 'Hello to lazy.orange.elephant 1'
[x] Sent 'Hello to quick.orange.fox 2'
[x] Sent 'Hello to lazy.brown.fox 3'
[x] Sent 'Hello to lazy.pink.rabbit 4'
[x] Sent 'Hello to quick.brown.fox 5'
[x] Sent 'Hello to quick.orange.rabbit 6'
[x] Sent 'Hello to lazy.orange.elephant 7'
[x] Sent 'Hello to quick.orange.fox 8'
[x] Sent 'Hello to lazy.brown.fox 9'
[x] Sent 'Hello to lazy.pink.rabbit 10'
接收者将响应以下输出
instance 1 [x] Received 'Hello to lazy.orange.elephant 1'
instance 2 [x] Received 'Hello to lazy.orange.elephant 1'
instance 2 [x] Done in 2.005s
instance 1 [x] Done in 2.005s
instance 1 [x] Received 'Hello to quick.orange.fox 2'
instance 2 [x] Received 'Hello to lazy.brown.fox 3'
instance 1 [x] Done in 2.003s
instance 2 [x] Done in 2.003s
instance 1 [x] Received 'Hello to lazy.pink.rabbit 4'
instance 2 [x] Received 'Hello to lazy.pink.rabbit 4'
instance 1 [x] Done in 2.006s
instance 2 [x] Done in 2.006s
尽情享受这些程序。请注意,代码没有对路由或绑定键做任何假设,您可能希望使用超过两个路由键参数进行尝试。
(Tut5Receiver.java 源代码 和 Tut5Sender.java 源代码 的完整源代码。配置位于 Tut5Config.java 源代码。)
接下来,了解如何在教程 6中将往返消息作为远程过程调用执行