跳至主内容

RabbitMQ 教程 - Topic

Topic

(使用 Spring AMQP)

信息

先决条件

本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。

哪里寻求帮助

如果您在学习本教程时遇到困难,可以通过 GitHub DiscussionsRabbitMQ 社区 Discord 联系我们。

上一个教程中,我们提高了消息的灵活性。我们没有使用只能进行虚拟广播的fanout交换机,而是使用了一个direct交换机,从而可以根据路由键选择性地接收消息。

虽然使用 direct 交换机改进了我们的系统,但它仍然存在局限性——它无法基于多个标准进行路由。

在我们的消息系统中,我们可能不仅想基于路由键订阅队列,还想基于生成消息的源订阅队列。您可能从syslog Unix工具中知道这个概念,它根据严重性(info/warn/crit...)和设施(auth/cron/kern...)来路由日志。我们的例子比这要简单一些。

这个例子将为我们提供很大的灵活性——我们可能只想监听来自“cron”的关键错误,但也想监听来自“kern”的所有日志。

为了在我们的日志系统中实现这种灵活性,我们需要学习一个更复杂的topic交换机。

Topic 交换机

发送到 topic 交换机的消息不能具有任意的 routing_key——它必须是由点分隔的单词列表。单词可以是任何内容,但通常它们指定与消息相关的一些特征。一些有效的路由键示例:stock.usd.nysenyse.vmwquick.orange.rabbit。路由键中的单词数量不限,最多 255 字节。

绑定键也必须采用相同的形式。topic 交换机背后的逻辑与 direct 交换机类似——具有特定路由键的消息将发送到所有与匹配绑定键绑定的队列。但是,绑定键有两个重要的特殊情况:

  • * (星号) 可以替代正好一个单词。
  • # (哈希) 可以替代零个或多个单词。

用一个例子来解释最简单

在这个例子中,我们将发送描述动物的消息。消息将使用一个包含三个单词(两个点)的路由键发送。路由键中的第一个单词将描述速度,第二个单词描述颜色,第三个单词描述物种:<speed>.<colour>.<species>

我们创建了三个绑定:Q1 绑定了绑定键 *.orange.*,Q2 绑定了 *.*.rabbitlazy.#

这些绑定可以总结为:

  • Q1 对所有橙色的动物感兴趣。
  • Q2 想听到关于兔子的一切,以及关于懒惰动物的一切。

路由键设置为 quick.orange.rabbit 的消息将发送到两个队列。消息 lazy.orange.elephant 也将发送到这两个队列。另一方面,quick.orange.fox 只会发送到第一个队列,而 lazy.brown.fox 只会发送到第二个队列。lazy.pink.rabbit 将只发送到第二个队列一次,即使它匹配两个绑定。quick.brown.fox 不匹配任何绑定,因此将被丢弃。

如果我们破坏了约定并发送一个单词或四个单词的消息,例如 orangequick.orange.new.rabbit,会发生什么?好吧,这些消息将不匹配任何绑定,并且将被丢失。

另一方面,lazy.orange.new.rabbit 尽管有四个单词,但将匹配最后一个绑定,并将被发送到第二个队列。

Topic 交换机

Topic 交换机功能强大,并且可以像其他交换机一样工作。

当一个队列用 # (哈希) 绑定键绑定时——它将接收所有消息,而不管路由键是什么——就像 fanout 交换机一样。

当绑定中不使用特殊字符 * (星号) 和 # (哈希) 时,topic 交换机将像 direct 交换机一样工作。

总而言之

我们将在消息系统中使用的topic交换机。我们将从一个工作假设开始,即路由键将利用通配符和哈希标签。

代码与上一个教程中的几乎相同。

首先,让我们在tut5包的Tut5Config类中配置一些配置文件和bean。

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

@Profile({"tut5","topics"})
@Configuration
public class Tut5Config {

@Bean
public TopicExchange topic() {
return new TopicExchange("tut.topic");
}

@Profile("receiver")
private static class ReceiverConfig {

@Bean
public Tut5Receiver receiver() {
return new Tut5Receiver();
}

@Bean
public Queue autoDeleteQueue1() {
return new AnonymousQueue();
}

@Bean
public Queue autoDeleteQueue2() {
return new AnonymousQueue();
}

@Bean
public Binding binding1a(TopicExchange topic,
Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1)
.to(topic)
.with("*.orange.*");
}

@Bean
public Binding binding1b(TopicExchange topic,
Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1)
.to(topic)
.with("*.*.rabbit");
}

@Bean
public Binding binding2a(TopicExchange topic,
Queue autoDeleteQueue2) {
return BindingBuilder.bind(autoDeleteQueue2)
.to(topic)
.with("lazy.#");
}

}

@Profile("sender")
@Bean
public Tut5Sender sender() {
return new Tut5Sender();
}

}

我们为tut5topics的选择设置了执行主题的配置文件。然后我们创建了TopicExchange的bean。receiver配置文件是定义我们的接收器ReceiverConfig类,以及与上一个教程中相同的两个AnonymousQueue,以及利用主题语法的为主题创建的绑定。我们还创建了sender配置文件,即Tut5Sender类的创建。

Tut5Receiver再次使用@RabbitListener注解从各自的主题接收消息。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;

public class Tut5Receiver {

@RabbitListener(queues = "#{autoDeleteQueue1.name}")
public void receive1(String in) throws InterruptedException {
receive(in, 1);
}

@RabbitListener(queues = "#{autoDeleteQueue2.name}")
public void receive2(String in) throws InterruptedException {
receive(in, 2);
}

public void receive(String in, int receiver) throws
InterruptedException {
StopWatch watch = new StopWatch();
watch.start();
System.out.println("instance " + receiver + " [x] Received '"
+ in + "'");
doWork(in);
watch.stop();
System.out.println("instance " + receiver + " [x] Done in "
+ watch.getTotalTimeSeconds() + "s");
}

private void doWork(String in) throws InterruptedException {
for (char ch : in.toCharArray()) {
if (ch == '.') {
Thread.sleep(1000);
}
}
}
}

Tut5Sender.java的代码

package org.springframework.amqp.tutorials.tut5;

import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.concurrent.atomic.AtomicInteger;

public class Tut5Sender {

@Autowired
private RabbitTemplate template;

@Autowired
private TopicExchange topic;

AtomicInteger index = new AtomicInteger(0);

AtomicInteger count = new AtomicInteger(0);

private final String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
"lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};

@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
StringBuilder builder = new StringBuilder("Hello to ");
if (this.index.incrementAndGet() == keys.length) {
this.index.set(0);
}
String key = keys[this.index.get()];
builder.append(key).append(' ');
builder.append(this.count.incrementAndGet());
String message = builder.toString();
template.convertAndSend(topic.getName(), key, message);
System.out.println(" [x] Sent '" + message + "'");
}

}

按照教程1中的说明编译并运行示例。或者,如果您一直跟着教程进行,只需要执行以下操作:

构建项目

./mvnw clean package

使用正确的配置文件执行发送者和接收者,通过正确的参数执行jar文件。

# shell 1
java -jar target/rabbitmq-tutorials.jar \
--spring.profiles.active=topics,receiver \
--tutorial.client.duration=60000
# shell 2
java -jar target/rabbitmq-tutorials.jar \
--spring.profiles.active=topics,sender \
--tutorial.client.duration=60000

发送者的输出将类似于:

Ready ... running for 60000ms
[x] Sent 'Hello to lazy.orange.elephant 1'
[x] Sent 'Hello to quick.orange.fox 2'
[x] Sent 'Hello to lazy.brown.fox 3'
[x] Sent 'Hello to lazy.pink.rabbit 4'
[x] Sent 'Hello to quick.brown.fox 5'
[x] Sent 'Hello to quick.orange.rabbit 6'
[x] Sent 'Hello to lazy.orange.elephant 7'
[x] Sent 'Hello to quick.orange.fox 8'
[x] Sent 'Hello to lazy.brown.fox 9'
[x] Sent 'Hello to lazy.pink.rabbit 10'

接收者的响应将是以下输出:

instance 1 [x] Received 'Hello to lazy.orange.elephant 1'
instance 2 [x] Received 'Hello to lazy.orange.elephant 1'
instance 2 [x] Done in 2.005s
instance 1 [x] Done in 2.005s
instance 1 [x] Received 'Hello to quick.orange.fox 2'
instance 2 [x] Received 'Hello to lazy.brown.fox 3'
instance 1 [x] Done in 2.003s
instance 2 [x] Done in 2.003s
instance 1 [x] Received 'Hello to lazy.pink.rabbit 4'
instance 2 [x] Received 'Hello to lazy.pink.rabbit 4'
instance 1 [x] Done in 2.006s
instance 2 [x] Done in 2.006s

尽情玩转这些程序吧。请注意,代码不会对路由键或绑定键做任何假设,您可能需要尝试使用两个以上的路由键参数。

Tut5Receiver.java 源代码Tut5Sender.java 源代码的完整源代码。配置位于Tut5Config.java 源代码。)

接下来,在教程6中了解如何作为远程过程调用执行往返消息。

© . This site is unofficial and not affiliated with VMware.