RabbitMQ Streams 的互操作性
RabbitMQ streams 允许应用程序借助其强大的消息格式传递详细信息。Streams 本身就是一个功能,但它们也与 RabbitMQ 支持的现有资源和协议完全集成。这篇博文介绍了 RabbitMQ 中 streams 的互操作性,并探讨了它所解锁的场景。
RabbitMQ Streams 中的消息
我们在 RabbitMQ Streams First Application 博文中看到,stream 消息具有二进制正文、属性,并支持不同的类型。例如
Message message = producer.messageBuilder()
.properties()
.creationTime(System.currentTimeMillis()) // Unix time, with a precision of milliseconds
.messageId(i) // long, string, binary, or UUID
.messageBuilder()
.addData("hello world".getBytes(StandardCharsets.UTF_8))
.build();
RabbitMQ Stream 使用 AMQP 1.0 消息格式来编码消息。它是一种灵活、强大且高效的格式,具有复杂的类型系统。它也受到各种平台的支持,例如 Java、Go、.NET 和许多其他平台。我们的示例使用 Java,但消息也可以由 不同的 平台发布和消费。
请注意,RabbitMQ Streams 不使用 AMQP 1.0 协议,因为它有自己的 二进制协议,该协议恰好在其某些帧中传递以 AMQP 1.0 格式编码的消息。stream Java 客户端文档包含 有关 AMQP 1.0 消息的更多详细信息。
其他协议呢?
因此,stream 应用程序可以发布和消费复杂消息,到目前为止一切都很好。但 streams 在 RabbitMQ 中并非孤立存在:它们可以与 RabbitMQ 支持的其他协议互操作,即 AMQP 0.9.1、AMQP 1.0、STOMP 和 MQTT。
因此,如果使用给定协议的应用程序可以发布最终进入队列的消息,则 stream 应用程序可以消费这些消息,只要队列是一个 stream。反之亦然:如果 stream 应用程序将消息发布到 stream,则使用其他协议的应用程序会将该 stream 视为队列,并可以从中消费消息。
这种互操作性与 RabbitMQ 的路由功能相结合,带来了很大的灵活性,并且在许多可能涉及各种平台和编程语言的场景中都很有帮助。这也意味着,即使 streams 是 RabbitMQ 3.9 中的一项新功能,您也不必仅在新应用程序中使用它们(使用 stream 发布者和 stream 消费者),而是可以将它们包含在您现有的 RabbitMQ 架构中。
现有系统中的 Streams
想象一个现有系统,其中发布者将消息发送到主题交换机。根据其路由键,消息被路由到给定的队列,匹配世界的给定区域。每个队列都有自己的处理,这取决于区域。请注意,发布者不必使用 AMQP 0.9.1,他们可以使用 RabbitMQ 支持的任何协议,只要他们使用适当的路由键发布到交换机即可。
现在想象一下,我们想保持现有的处理方式不变,但对所有消息进行一些额外的处理,例如分析。我们只需要创建一个 stream 并使用通配符将其绑定到主题交换机
stream 适用于分析,这归功于其非破坏性的消费者语义,并且由于 stream 协议非常快,因此每天都可以从头开始重新计算结果。这也可以使任何其他需要消息的全部或部分历史记录的应用程序受益,因为任何数量的消费者都可以从任何时间点读取和重新读取 stream。
从队列完成的区域处理可以保持完全相同,只要队列语义符合需求(破坏性消费、竞争消费者等)。
这是一个很好的例子,说明如何轻松且无风险地将 streams 引入现有的 RabbitMQ 架构中,以立即提供附加值。
使用 AMQP 0.9.1 发布,使用 Stream 协议消费
让我们看一些代码来演示这个例子。
下一个代码片段显示了一个使用 RabbitMQ 0.9.1 AMQP Java 客户端的发布者
channel.basicPublish(
"events", // exchange
REGIONS[i % REGIONS.length], // routing key, round robin across regions
new AMQP.BasicProperties.Builder()
.messageId(String.valueOf(i)) // message ID
.timestamp(new Date()) // creation time
.contentType("text/plain") // content type
.build(),
("message " + i).getBytes(StandardCharsets.UTF_8) // body
);
发布者设置了一些属性,以说明 AMQP 0.9.1 和 stream 消息之间的互操作性。
这是一个消费者,它使用 stream Java 客户端来处理来自 world-wide stream 的消息
environment.consumerBuilder()
.stream("world")
.offset(OffsetSpecification.first())
.messageHandler((context, message) -> {
String body = new String(message.getBodyAsBinary());
log(
"Message #%s, creation time %tF %tT, content type '%s', from exchange %s with routing key %s",
message.getProperties().getMessageId(),
message.getProperties().getCreationTime(),
message.getProperties().getCreationTime(),
message.getProperties().getContentType(),
message.getMessageAnnotations().get("x-exchange"),
message.getMessageAnnotations().get("x-routing-key"));
})
.build();
即使 stream 使用 AMQP 1.0 消息格式,消费者也可以从原始 AMQP 0.9.1 消息中检索属性。也可以在消息注解(AMQP 1.0 概念)中检索消息的交换机和路由键。
互操作性实践
让我们在具有上述拓扑结构的项目中运行之前的代码。
设置示例项目
运行示例需要安装 Docker、Git 和 Java 8 或更高版本。您可以使用以下命令启动 broker
docker run -it --rm --name rabbitmq -p 5552:5552 -p 5672:5672 -p 15672:15672 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.9-management
然后您需要启用 stream 插件
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
代码托管在 GitHub 上。以下是如何克隆存储库
git clone https://github.com/acogoluegnes/rabbitmq-streams-blog-posts.git
cd rabbitmq-streams-blog-posts
在示例项目中,我们将
- 创建上图所示的拓扑结构
- 使用 AMQP 0.9.1 发布者发布消息
- 使用 stream 协议从 stream 消费消息
创建拓扑结构
以下命令创建交换机、队列、stream 及其各自的绑定
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Interoperability$CreateTopology'
您应该在控制台上看到以下内容
Connecting...
Connected
Creating 'events' topic exchange...
Creating 'amer' queue and binding it to 'events' exchange...
Creating 'emea' queue and binding it to 'events' exchange...
Creating 'apac' queue and binding it to 'events' exchange...
Creating 'world' stream and binding it to 'events' exchange...
Closing connection
可以在 管理 UI 中检查资源是否已创建(用户 guest
,密码 guest
。)
发布消息
以下命令运行一个发送 100 条消息的发布者
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Interoperability$Publish'
您应该获得以下控制台输出
Connecting...
Connected
Sending 100 messages
Messages sent, waiting for confirmation...
Messages confirmed
Closing connection
如上面的代码片段所示,发布者设置了一些消息属性,并以轮询方式更改路由键以发布到每个区域。您可以返回 管理 UI 以检查队列和 stream 的内容。请注意,stream 包含一条额外的消息:这是一条毒丸消息,用于停止我们接下来要运行的消费者。
从 Stream 消费消息
现在是时候消费所有最终进入 stream 的消息了
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Interoperability$Consume'
您应该得到类似以下的内容
Connecting...
Connected
Start consumer...
Message #0, creation time 2021-10-06 14:19:55, content type 'text/plain', from exchange events with routing key amer
Message #1, creation time 2021-10-06 14:19:55, content type 'text/plain', from exchange events with routing key emea
Message #2, creation time 2021-10-06 14:19:55, content type 'text/plain', from exchange events with routing key apac
Message #3, creation time 2021-10-06 14:19:55, content type 'text/plain', from exchange events with routing key amer
...
Message #97, creation time 2021-10-06 14:19:55, content type 'text/plain', from exchange events with routing key emea
Message #98, creation time 2021-10-06 14:19:55, content type 'text/plain', from exchange events with routing key apac
Message #99, creation time 2021-10-06 14:19:55, content type 'text/plain', from exchange events with routing key amer
Received poison message, stopping...
Closing environment...
Environment closed
stream 消费者获取所有消息,而不会在 AMQP 0.9.1 到 AMQP 1.0 消息格式转换之间丢失信息。请注意,交换机和路由键信息也作为消息注解的一部分提供。
总结
我们在本博文中看到,RabbitMQ 中的 streams 可以传递具有复杂格式的消息(二进制正文,但也包括具有复杂类型的属性)。应用程序可以使用快速高效的 stream 协议访问 streams,也可以使用 RabbitMQ 支持的其他协议访问 streams。这使得 streams 成为需要互操作性的系统中的一等公民,这是 RabbitMQ 的优势之一。您可以将 streams 用于全新的应用程序,但也不要犹豫将它们顺利集成到您现有的系统中,如上述场景所示。