跳至主要内容

RabbitMQ Streams 的互操作性

·阅读时长:8分钟

RabbitMQ Streams 允许应用程序通过其强大的消息格式传递详细的信息。Streams 本身就是一个功能,但它也与 RabbitMQ 支持的现有资源和协议完全集成。这篇博文介绍了 RabbitMQ 中 Streams 的互操作性,并探讨了它解锁的场景。

RabbitMQ Streams 中的消息

我们在 RabbitMQ Streams 首个应用程序 博文中看到,流消息具有二进制主体和属性,并支持不同的类型。例如

Message message = producer.messageBuilder()
.properties()
.creationTime(System.currentTimeMillis()) // Unix time, with a precision of milliseconds
.messageId(i) // long, string, binary, or UUID
.messageBuilder()
.addData("hello world".getBytes(StandardCharsets.UTF_8))
.build();

RabbitMQ Stream 使用 AMQP 1.0 消息格式 来编码消息。这是一种灵活、强大且高效的格式,具有复杂的类型系统。它还受到各种平台的支持,如 Java、Go、.NET 等等。我们的示例使用 Java,但消息可以由 不同的 平台 发布和消费。

注意 RabbitMQ Streams 不使用 AMQP 1.0 协议,因为它有自己的 二进制协议,它恰好在其某些帧中传递使用 AMQP 1.0 格式编码的消息。Stream Java 客户端文档包含 有关 AMQP 1.0 消息的更多详细信息

其他协议呢?

因此,流应用程序可以发布和消费复杂的消息,到目前为止一切顺利。但 Streams 并非 RabbitMQ 中的孤岛:它们可以与 RabbitMQ 支持的其他协议互操作,即 AMQP 0.9.1、AMQP 1.0、STOMP 和 MQTT。

因此,如果使用给定协议的应用程序可以发布最终进入队列的消息,那么只要队列是流,流应用程序就可以消费这些消息。反之亦然:如果流应用程序将消息发布到流中,则使用其他协议的应用程序会将流视为队列,并可以从中消费消息。

这种互操作性结合 RabbitMQ 的路由功能带来了很大的灵活性,并且在可能涉及各种平台和编程语言的许多场景中很有帮助。这也意味着,即使 Streams 是 RabbitMQ 3.9 中的新功能,您也不必只为具有流发布者和流消费者的新应用程序开始使用它们,而是可以将它们包含在您现有的 RabbitMQ 架构中。

现有系统中的 Streams

假设一个现有系统,其中发布者将消息发送到主题交换机。根据其路由键,消息会被路由到给定的队列,匹配给定的世界区域。每个队列都有自己的处理过程,这取决于区域。请注意,发布者不必使用 AMQP 0.9.1,它们可以使用 RabbitMQ 支持的任何协议,只要它们使用适当的路由键发布到交换机即可。

A classic RabbitMQ architecture where messages from an exchange are routed to different queues for a specific processing.
经典的 RabbitMQ 架构,其中来自交换机的消息被路由到不同的队列以进行特定处理。

现在假设我们希望保持现有的处理方式不变,但对所有消息进行一些额外的处理,例如分析。我们只需创建一个流并使用通配符将其绑定到主题交换机即可

The same system with an additional stream that receives all the messages thanks to the wildcard binding.
具有附加流的相同系统,该流通过通配符绑定接收所有消息。

Streams 由于其非破坏性消费者语义而适合于分析,并且由于流协议非常快,因此可以每天从头开始重新计算结果。这也可以使任何其他需要消息的全部或部分历史记录的应用程序受益,因为任意数量的消费者可以从任何时间点读取和重新读取流。

只要队列语义适合需求(破坏性消费、竞争性消费者等),从队列中完成的区域处理就可以保持完全相同。

这是一个极好的例子,说明如何在现有 RabbitMQ 架构中轻松且无风险地引入 Streams,以立即提供增值。

使用 AMQP 0.9.1 发布,使用流协议消费

让我们看一些说明此示例的代码。

下一段代码显示了一个使用 RabbitMQ 0.9.1 AMQP Java 客户端的发布者

channel.basicPublish(
"events", // exchange
REGIONS[i % REGIONS.length], // routing key, round robin across regions
new AMQP.BasicProperties.Builder()
.messageId(String.valueOf(i)) // message ID
.timestamp(new Date()) // creation time
.contentType("text/plain") // content type
.build(),
("message " + i).getBytes(StandardCharsets.UTF_8) // body
);

发布者设置了一些属性来说明 AMQP 0.9.1 和流消息之间的互操作性。

这是一个使用流 Java 客户端处理来自全球流的消息的消费者

environment.consumerBuilder()
.stream("world")
.offset(OffsetSpecification.first())
.messageHandler((context, message) -> {
String body = new String(message.getBodyAsBinary());
log(
"Message #%s, creation time %tF %tT, content type '%s', from exchange %s with routing key %s",
message.getProperties().getMessageId(),
message.getProperties().getCreationTime(),
message.getProperties().getCreationTime(),
message.getProperties().getContentType(),
message.getMessageAnnotations().get("x-exchange"),
message.getMessageAnnotations().get("x-routing-key"));
})
.build();

即使流使用 AMQP 1.0 消息格式,消费者也可以从原始 AMQP 0.9.1 消息中检索属性。也可以在消息注释(AMQP 1.0 概念)中检索消息的交换机和路由键。

互操作性实战

让我们在具有上述拓扑结构的项目中运行前面的代码。

设置示例项目

运行示例需要安装 Docker、Git 和 Java 8 或更高版本。您可以使用以下命令启动代理

docker run -it --rm --name rabbitmq -p 5552:5552 -p 5672:5672 -p 15672:15672 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.9-management

然后,您需要启用流插件

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream

代码托管在 GitHub 上。以下是克隆存储库的方法

git clone https://github.com/acogoluegnes/rabbitmq-streams-blog-posts.git
cd rabbitmq-streams-blog-posts

在示例项目中,我们将

  • 创建上图所示的拓扑结构
  • 使用 AMQP 0.9.1 发布者发布消息
  • 使用流协议从流中消费消息

创建拓扑结构

以下命令创建交换机、队列、流及其各自的绑定

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Interoperability$CreateTopology'

您应该在控制台上看到以下内容

Connecting...
Connected
Creating 'events' topic exchange...
Creating 'amer' queue and binding it to 'events' exchange...
Creating 'emea' queue and binding it to 'events' exchange...
Creating 'apac' queue and binding it to 'events' exchange...
Creating 'world' stream and binding it to 'events' exchange...
Closing connection

可以在 管理 UI 中检查资源是否已创建(用户 guest,密码 guest)。

发布消息

以下运行一个发布者,发送 100 条消息

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Interoperability$Publish'

您应该获得以下控制台输出

Connecting...
Connected
Sending 100 messages
Messages sent, waiting for confirmation...
Messages confirmed
Closing connection

如上所示,发布者设置了一些消息属性,并以循环方式更改路由键以发布到每个区域。您可以返回到 管理 UI 以检查队列和流的内容。请注意,流包含一条额外的消息:它是一条毒性消息,用于停止我们接下来要运行的消费者。

从流中消费消息

现在是时候消费所有最终进入流的消息了

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Interoperability$Consume'

您应该获得类似以下内容

Connecting...
Connected
Start consumer...
Message #0, creation time 2021-10-06 14:19:55, content type 'text/plain', from exchange events with routing key amer
Message #1, creation time 2021-10-06 14:19:55, content type 'text/plain', from exchange events with routing key emea
Message #2, creation time 2021-10-06 14:19:55, content type 'text/plain', from exchange events with routing key apac
Message #3, creation time 2021-10-06 14:19:55, content type 'text/plain', from exchange events with routing key amer
...
Message #97, creation time 2021-10-06 14:19:55, content type 'text/plain', from exchange events with routing key emea
Message #98, creation time 2021-10-06 14:19:55, content type 'text/plain', from exchange events with routing key apac
Message #99, creation time 2021-10-06 14:19:55, content type 'text/plain', from exchange events with routing key amer
Received poison message, stopping...
Closing environment...
Environment closed

流消费者获取所有消息,而不会在 AMQP 0.9.1 到 AMQP 1.0 消息格式转换之间丢失信息。请注意,交换机和路由键信息也作为消息注释的一部分可用。

总结

我们在本文中看到,RabbitMQ 中的 Streams 可以使用复杂的格式(二进制主体,以及属性,以及复杂类型)传递消息。应用程序可以使用流协议访问 Streams,该协议快速高效,也可以使用 RabbitMQ 支持的其他协议。这使得 Streams 成为需要互操作性的系统中的一等公民,这是 RabbitMQ 的一大优势。您可以将 Streams 用于全新的应用程序,但不要犹豫,如上文所述的场景中建议的那样,将它们平滑地集成到您的现有系统中。

© 2024 RabbitMQ. All rights reserved.