跳到主要内容

RabbitMQ Streams 概述

·5 分钟阅读

RabbitMQ 3.9 引入了一种新型数据结构:streams(流)。流解锁了一系列使用“传统”队列可能难以实现的用例。让我们在这篇文章中了解流如何扩展 RabbitMQ 的功能。

什么是 RabbitMQ Streams

RabbitMQ stream(流)模拟了一个仅追加日志,具有非破坏性消费语义。这意味着——与 RabbitMQ 中的传统队列相反——从流中消费不会删除消息。

RabbitMQ 中的流是持久化和复制的。这转化为数据安全性和可用性(以防节点丢失),以及扩展性(从不同节点读取相同的流)。

与非常通用的队列相比,流看起来可能有点固执己见,但它们在一系列用例中非常方便。它们以一种非常好的方式扩展了 RabbitMQ 的功能。

流适用于什么

RabbitMQ Streams 在以下用例中表现出色

  • 大型扇出:许多应用程序需要读取相同的消息(使用传统队列,这将需要为每个应用程序声明一个队列,并将同一消息的副本传递给每个队列)
  • 大型积压:流将消息存储在磁盘上,而不是内存中,因此唯一的限制是磁盘容量
  • 重放和时间旅行:消费者可以附加到流中的任何位置,使用绝对偏移量或时间戳,并且他们可以读取和重新读取相同的数据
  • 高吞吐量:与传统队列相比,流速度非常快,快几个数量级

由于流作为 RabbitMQ 3.9 中的核心插件发布,因此您可以将它们与所有现有的 RabbitMQ 功能一起使用。

RabbitMQ Streams 概览

让我们更具体地了解流

  • 由于发布者确认和发布者端的消息去重,流提供了至少一次的保证。
  • 流支持服务器端偏移跟踪,以便消费者可以从上次停止的地方重新开始。
  • 由于流具有非破坏性语义,因此它们可以增长很多。RabbitMQ Streams 可以根据基于大小或年龄的保留策略自动截断流。
  • 流可以通过专用的、极快的二进制协议以及 AMQP 0.9.1 和 1.0(速度较慢)访问。
  • stream 协议可以通过stream 插件访问,该插件在 RabbitMQ 3.9 的核心发行版中提供。
  • RabbitMQ Streams 支持客户端-服务器 TLS。
  • 一个现代的、高度优化的Java 客户端可用。它使用 stream 协议以获得更好的性能。它有完整的文档
  • 一个Go 客户端也可用。
  • 还有一个基于 Java 客户端的性能工具。是的,它以 Docker 镜像的形式提供。

如果您想了解更多信息,可以查看下面 RabbitMQ Summit 2021 的流概述演示文稿。如果您时间紧迫,可以跳过它,直接进入下一节的 Docker 快速入门。

废话不多说,让我们开始运行吧。

Docker 快速入门

使用 Docker 练习流非常容易。让我们确保您本地还没有我们要使用的 Docker 镜像

docker rmi rabbitmq:3.9 pivotalrabbitmq/stream-perf-test

如果计算机上没有镜像,您将收到错误消息,但这没关系。

现在让我们为我们的服务器和性能工具容器创建一个网络以进行通信

docker network create rabbitmq-streams

是时候启动 broker 了

docker run -it --rm --network rabbitmq-streams --name rabbitmq rabbitmq:3.9

broker 应该在几秒钟内启动。准备就绪后,启用 stream 插件

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream

现在启动性能工具。它将创建一个流,并尽可能快地发布和消费

docker run -it --rm --network rabbitmq-streams pivotalrabbitmq/stream-perf-test \
--uris rabbitmq-stream://rabbitmq:5552

您可以让性能工具运行一段时间,然后使用 Ctrl+C 停止它

19, published 1180489 msg/s, confirmed 1180145 msg/s, consumed 1180648 msg/s, \
latency min/median/75th/95th/99th 1537/7819/9631/12136/14425 µs, chunk size 2639
20, published 1181929 msg/s, confirmed 1181597 msg/s, consumed 1182074 msg/s, \
latency min/median/75th/95th/99th 1537/7838/9562/11967/14355 µs, chunk size 2657
^C
Summary: published 1205835 msg/s, confirmed 1205435 msg/s, consumed 1205477 msg/s, latency 95th 12158 µs, chunk size 2654

这些是常规 Linux 工作站上的数字,您获得的数字取决于您自己的设置。请注意,在 macOS 和 Windows 上,数字可能会显着降低,因为 Docker 在这些操作系统上的虚拟化环境中运行。

然后您可以使用 Ctrl+C 停止 broker 容器并删除网络

docker network rm rabbitmq-streams

如果您想进一步开始构建应用程序,stream Java 客户端文档是一个很好的起点。

本文总结了我们对 RabbitMQ Streams 的概述,这是一种具有强大功能和工具的新型仅追加日志数据结构。敬请关注后续文章,了解更多关于 stream 的信息!

© . All rights reserved.