RabbitMQ Streams 中的互操作性
RabbitMQ Streams 允许应用程序通过其使用的强大消息格式传递详细信息。Streams 本身是一项功能,但它们也与 RabbitMQ 支持的现有资源和协议完全集成。这篇博文涵盖了 RabbitMQ Streams 的互操作性,并探讨了它带来的场景。
RabbitMQ Streams 中的消息
在《RabbitMQ Streams 初体验》博客文章中,我们了解到流消息具有二进制正文和属性,并支持不同类型。例如:
Message message = producer.messageBuilder()
.properties()
.creationTime(System.currentTimeMillis()) // Unix time, with a precision of milliseconds
.messageId(i) // long, string, binary, or UUID
.messageBuilder()
.addData("hello world".getBytes(StandardCharsets.UTF_8))
.build();
RabbitMQ Stream 使用 AMQP 1.0 消息格式 来编码消息。它是一种灵活、强大且高效的格式,拥有复杂的类型系统。它还得到了 Java、Go、.NET 等多种平台的支持。我们的示例使用了 Java,但消息也可以由 不同 平台 发布和消费。
请注意,RabbitMQ Streams 不使用 AMQP 1.0 协议,因为它有自己的 二进制协议,该协议碰巧在其某些帧中承载了以 AMQP 1.0 格式编码的消息。Stream Java 客户端文档包含 有关 AMQP 1.0 消息的更多详细信息。
其他协议怎么办?
因此,流应用程序可以发布和消费复杂的 Messa ges,目前为止一切顺利。但 Streams 在 RabbitMQ 中并非孤岛:它们与其他 RabbitMQ 支持的协议(即 AMQP 0.9.1、AMQP 1.0、STOMP 和 MQTT)具有互操作性。
因此,如果一个使用给定协议的应用程序可以发布最终进入队列的消息,那么流应用程序就可以消费这些消息,只要该队列是一个流。反之亦然:如果一个流应用程序将消息发布到一个流,使用其他协议的应用程序会将该流视为一个队列,并可以从中消费消息。
这种互操作性结合 RabbitMQ 的路由功能带来了极大的灵活性,并在涉及多种平台和编程语言的许多场景中非常有用。这也意味着,尽管 Streams 是 RabbitMQ 3.9 的一项新功能,但您不必只为具有流发布者和流消费者的全新应用程序开始使用它们,而是可以将它们包含到您现有的 RabbitMQ 体系结构中。
现有系统中的 Streams
想象一个现有的系统,发布者将消息发送到主题交换器。根据它们的路由键,消息会被路由到给定的队列,对应于给定的世界区域。每个队列都有自己的处理,取决于区域。请注意,发布者不必使用 AMQP 0.9.1,它们可以使用 RabbitMQ 支持的任何协议,只要它们将消息发布到具有适当路由键的交换器即可。
现在想象一下,我们想保持现有处理不变,但要对所有消息进行一些额外的处理,例如分析。我们只需要创建一个流,并使用通配符将其绑定到主题交换器。
流的非破坏性消费者语义使其非常适合分析,而且由于流协议非常快,因此结果可以每天从头开始重新计算。这也可以使任何需要全部或部分消息历史记录的应用程序受益,因为任何数量的消费者都可以从任何时间点读取和重新读取流。
队列中的区域处理可以保持完全不变,只要队列语义符合需求(例如,破坏性消费、竞争性消费者等)。
这是一个绝佳的示例,说明了如何轻松地将 Streams 引入现有 RabbitMQ 体系结构而无需承担风险,从而立即提供增值。
使用 AMQP 0.9.1 发布,使用 Stream 协议消费
让我们看一段代码来演示这个示例。
以下代码片段展示了一个使用 RabbitMQ 0.9.1 AMQP Java 客户端的发布者。
channel.basicPublish(
"events", // exchange
REGIONS[i % REGIONS.length], // routing key, round robin across regions
new AMQP.BasicProperties.Builder()
.messageId(String.valueOf(i)) // message ID
.timestamp(new Date()) // creation time
.contentType("text/plain") // content type
.build(),
("message " + i).getBytes(StandardCharsets.UTF_8) // body
);
发布者设置了一些属性,以说明 AMQP 0.9.1 和流消息之间的互操作性。
这是使用 Stream Java 客户端处理来自全球流的消息的消费者。
environment.consumerBuilder()
.stream("world")
.offset(OffsetSpecification.first())
.messageHandler((context, message) -> {
String body = new String(message.getBodyAsBinary());
log(
"Message #%s, creation time %tF %tT, content type '%s', from exchange %s with routing key %s",
message.getProperties().getMessageId(),
message.getProperties().getCreationTime(),
message.getProperties().getCreationTime(),
message.getProperties().getContentType(),
message.getMessageAnnotations().get("x-exchange"),
message.getMessageAnnotations().get("x-routing-key"));
})
.build();
消费者可以检索原始 AMQP 0.9.1 消息中的属性,即使 Streams 使用 AMQP 1.0 消息格式。还可以将消息的交换器和路由键作为消息注释(AMQP 1.0 的概念)的一部分进行检索。
互操作性实战
让我们在一个具有上述拓扑的项目中运行前面的代码。
设置示例项目
运行示例需要安装 Docker、Git 和 Java 8 或更高版本。您可以使用以下命令启动代理:
docker run -it --rm --name rabbitmq -p 5552:5552 -p 5672:5672 -p 15672:15672 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.9-management
然后您需要启用 Streams 插件:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
代码 托管在 GitHub 上。以下是克隆存储库的方法:
git clone https://github.com/acogoluegnes/rabbitmq-streams-blog-posts.git
cd rabbitmq-streams-blog-posts
在示例项目中,我们将:
- 创建上面图表中所示的拓扑
- 使用 AMQP 0.9.1 发布者发布消息
- 使用 Stream 协议从流中消费消息
创建拓扑
以下命令将创建交换器、队列、流以及它们各自的绑定。
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Interoperability$CreateTopology'
您应该会在控制台上看到以下内容:
Connecting...
Connected
Creating 'events' topic exchange...
Creating 'amer' queue and binding it to 'events' exchange...
Creating 'emea' queue and binding it to 'events' exchange...
Creating 'apac' queue and binding it to 'events' exchange...
Creating 'world' stream and binding it to 'events' exchange...
Closing connection
可以在 管理 UI 中(用户 `guest`,密码 `guest`)检查资源是否已创建。
发布消息
以下命令运行一个发送 100 条消息的发布者。
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Interoperability$Publish'
您应该会看到以下控制台输出:
Connecting...
Connected
Sending 100 messages
Messages sent, waiting for confirmation...
Messages confirmed
Closing connection
如上面的代码片段所示,发布者设置了一些消息属性,并以轮循方式更改路由键以发布到每个区域。您可以回到 管理 UI 来检查队列和流的内容。请注意,流中包含一条额外的消息:这是一条毒丸消息,用于停止我们接下来运行的消费者。
从流中消费消息
现在是时候消费所有最终进入流的消息了。
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Interoperability$Consume'
您应该会看到类似以下的输出:
Connecting...
Connected
Start consumer...
Message #0, creation time 2021-10-06 14:19:55, content type 'text/plain', from exchange events with routing key amer
Message #1, creation time 2021-10-06 14:19:55, content type 'text/plain', from exchange events with routing key emea
Message #2, creation time 2021-10-06 14:19:55, content type 'text/plain', from exchange events with routing key apac
Message #3, creation time 2021-10-06 14:19:55, content type 'text/plain', from exchange events with routing key amer
...
Message #97, creation time 2021-10-06 14:19:55, content type 'text/plain', from exchange events with routing key emea
Message #98, creation time 2021-10-06 14:19:55, content type 'text/plain', from exchange events with routing key apac
Message #99, creation time 2021-10-06 14:19:55, content type 'text/plain', from exchange events with routing key amer
Received poison message, stopping...
Closing environment...
Environment closed
Stream 消费者会获取所有消息,在 AMQP 0.9.1 到 AMQP 1.0 消息格式转换过程中不会丢失信息。请注意,交换器和路由键信息也作为消息注释的一部分提供。
总结
在本篇博客文章中,我们看到 RabbitMQ 中的 Streams 可以承载具有复杂格式(二进制正文、属性以及复杂类型)的消息。应用程序可以使用快速高效的 Stream 协议访问 Streams,也可以使用 RabbitMQ 支持的其他协议访问。这使得 Streams 成为需要互操作性的系统中的一等公民,而互操作性是 RabbitMQ 的强项之一。您可以将 Streams 用于全新的应用程序,但也可以毫不犹豫地将它们平滑地集成到您现有的系统中,正如上述场景所示。
