跳至主内容
版本:4.3

流(Streams)与超级流(Superstreams,即分区流)

什么是流

RabbitMQ 流是一种持久化的、复制的数据结构,可以完成与队列相同的任务:它们缓存来自生产者的消息并由消费者读取。然而,流在消息的存储和消费方式上与队列有两个重要区别。

流模拟了只追加(append-only)的消息日志,这些消息可以被反复读取,直到过期为止。流始终是持久化且经过复制的。对这种流行为更技术的描述是“非破坏性消费语义”。

要在 RabbitMQ 中从流中读取消息,一个或多个消费者需要订阅它,并可以根据需要多次读取相同的消息。

流中的数据可以通过 RabbitMQ 客户端库使用,也可以通过专用二进制协议插件及关联客户端使用。后者被**强烈推荐**,因为它提供了对所有流特定功能的访问,并能提供最佳的吞吐量(性能)。

配套指南《流客户端连接》解释了流协议客户端应如何连接到集群节点,以获得最佳的数据局部性和效率(吞吐量、延迟)。

信息

除了流之外,RabbitMQ 还支持名为超级流的分区流。本指南后续将详细介绍它们。

现在,您可能会问以下问题:

  • 那么流会取代队列吗?
  • 我应该放弃使用队列吗?

回答这些问题:引入流并不是为了取代队列,而是为了补充它们。流为 RabbitMQ 开辟了许多新的用例,这些用例在《流的使用场景》中进行了描述。

以下信息详细介绍了流的使用,以及流的管理和维护操作。

您还应该查看流插件信息,以了解更多关于在二进制 RabbitMQ 流协议下使用流的内容,并参阅《RabbitMQ 核心功能与流插件对比页面》查看功能矩阵。

流的使用场景

开发流的初衷是为了覆盖 4 种现有的队列类型无法提供,或者提供时存在缺陷的消息传递用例:

  1. 大规模分发(Fan-outs)

    当希望将同一条消息发送给多个订阅者时,用户目前必须为每个消费者绑定一个专用队列。如果消费者数量庞大,这会变得非常低效,特别是在需要持久化和/或复制的情况下。流将允许任意数量的消费者以非破坏性的方式从同一个队列消费相同的消息,从而无需绑定多个队列。流消费者还可以从副本中读取数据,允许将读取负载分散到整个集群中。

  2. 重放(时间旅行)

    由于所有现有的 RabbitMQ 队列类型都具有破坏性消费行为(即消息在消费者消费完成后会从队列中删除),因此无法重新读取已消费的消息。流将允许消费者在日志的任何位置进行挂载并从该处读取。

  3. 吞吐量性能

    没有任何持久化队列类型能够提供与现有任何基于日志的消息系统相媲美的吞吐量。流在设计时将高性能作为一个主要目标。

  4. 大量积压

    大多数 RabbitMQ 队列设计为趋向于空状态,并以此进行优化,因此在给定队列上有数百万条消息时,性能可能会下降。流旨在以高效的方式存储大量数据,且内存开销极小。

如何使用 RabbitMQ 流

能够指定可选队列和消费者参数的 AMQP 0.9.1 客户端库将能够像使用常规 AMQP 0.9.1 队列一样使用流。

就像队列一样,流也必须先声明。

声明 RabbitMQ 流

要声明流,请将 x-queue-type 队列参数设置为 stream(默认值为 classic)。此参数必须由客户端在声明时提供;它不能使用策略(policy)来设置或更改。这是因为策略定义或适用的策略可以动态更改,但队列类型不能。它必须在声明时指定。

以下代码片段展示了如何使用 AMQP 0.9.1 Java 客户端创建流:

ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(
"my-stream",
true, // durable
false, false, // not exclusive, not auto-delete
Collections.singletonMap("x-queue-type", "stream")
);

x-queue-type 参数设置为 stream 声明队列,将在每个配置的 RabbitMQ 节点上创建一个流副本。流是仲裁系统,因此强烈建议集群大小为奇数。

流仍然是一个 AMQP 0.9.1 队列,因此它可以在创建后绑定到任何交换机,就像任何其他 RabbitMQ 队列一样。

如果使用管理 UI 进行声明,必须在队列类型下拉菜单中指定 stream 类型。

流支持额外的队列参数,这些参数也可以使用策略进行配置:

  • x-max-length-bytes

设置流的最大字节大小。请参见保留策略(retention)。默认:未设置。

  • x-max-age

设置流的最大保留时长。请参见保留策略(retention)。默认:未设置。

  • x-stream-max-segment-size-bytes

流在磁盘上被划分为固定大小的段文件。此设置以字节为单位控制这些段的大小。默认:500,000,000 字节。

  • x-stream-filter-size-bytes

用于过滤的布隆过滤器的大小(以字节为单位)。该值必须介于 16 和 255 之间。默认:16 字节。

虽然 x-stream-max-segment-size-bytesx-stream-filter-size-bytes 参数可以通过策略配置,但它们在流声明时策略已设置(存在)的情况下才会应用于该流。如果为匹配的但已存在的流更改这些参数,它们将不会被更改,即使队列记录的生效策略可能显示已更改。

因此,最好仅通过队列参数来配置这些设置。

下面的 Java 示例展示了如何在应用程序代码中于流声明时设置参数:

Map<String, Object> arguments = new HashMap<>();
arguments.put("x-queue-type", "stream");
// maximum stream size: 20 GB
arguments.put("x-max-length-bytes", 20_000_000_000);
// size of segment files: 100 MB
arguments.put("x-stream-max-segment-size-bytes", 100_000_000);
// size of stream bloom filter: 32
arguments.put("x-stream-filter-size-bytes", 32);

channel.queueDeclare(
"my-stream",
true, // durable
false, false, // not exclusive, not auto-delete
arguments
);

客户端操作

消费

由于流永远不会删除任何消息,因此任何消费者都可以从日志中的任何位置开始读取/消费。这由 x-stream-offset 消费者参数控制。如果未指定,消费者将从消费者启动后写入日志的下一个偏移量(offset)开始读取。支持以下值:

  • first - 从日志中第一个可用的消息开始。
  • last - 从最后写入的消息“块(chunk)”开始读取(块是流中使用的存储和传输单元,简单来说,它是一批消息,根据输入情况,包含几条到几千条消息)
  • next - 与不指定偏移量相同。
  • Offset - 一个数值,指定要挂载到的日志的确切偏移量。如果该偏移量不存在,它将分别被限制在日志的开头或结尾。
  • Timestamp:一个时间戳值,指定挂载到流中的位置,基于消息到达时间(而非应用程序提供的时间戳)。它将限制在最接近的偏移量;如果时间戳超出流的范围,它将分别限制在日志的开头或结尾。在使用 AMQP 0.9.1 时,所使用的时间戳为 POSIX 时间,精度为一秒,即自 1970-01-01 00:00:00 UTC 以来的秒数。由于流被分段为共享单个时间戳的块,消费者在块边界处挂载,并可能接收到在指定时间戳之前不久发布的消息。
  • Interval - 一个字符串值,指定相对于当前时间挂载日志的时间间隔。使用与 x-max-age 相同的规范(请参见保留策略)。

以下代码片段展示了如何使用 first 偏移量规范:

channel.basicQos(100); // QoS must be specified
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-offset", "first"), // "first" offset specification
(consumerTag, message) -> {
// message processing
// ...
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
},
consumerTag -> { });

以下代码片段展示了如何指定特定的消费偏移量:

channel.basicQos(100); // QoS must be specified
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-offset", 5000), // offset value
(consumerTag, message) -> {
// message processing
// ...
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
},
consumerTag -> { });

以下代码片段展示了如何指定特定的消费时间戳:

// an hour ago
Date timestamp = new Date(System.currentTimeMillis() - 60 * 60 * 1_000)
channel.basicQos(100); // QoS must be specified
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-offset", timestamp), // timestamp offset
(consumerTag, message) -> {
// message processing
// ...
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
},
consumerTag -> { });

其他流操作

以下操作可以以类似于经典队列和仲裁队列的方式使用,但某些操作具有特定的队列行为:

流的单活跃消费者(Single Active Consumer)功能

流的单活跃消费者功能是 RabbitMQ 3.11 及更高版本中可用的功能。它在流上提供了独占消费消费连续性。当多个共享相同流和名称的消费者实例启用单活跃消费者时,同一时间只有一个实例处于活跃状态并接收消息。其他实例将处于空闲状态。

单活跃消费者功能带来两个好处:

  • 消息按顺序处理:同一时间只有一个消费者。
  • 保持消费连续性:如果活跃消费者停止或崩溃,组中的另一个消费者将接管工作。

一篇博客文章提供了有关流的单活跃消费者的更多详细信息。

超级流(Super Streams)

超级流是一种通过将大型流分区为较小的流来实现横向扩展的方法。它们与单活跃消费者集成,以保持分区内的消息顺序。超级流从 RabbitMQ 3.11 开始可用。

超级流是由独立的常规流组成的逻辑流。它是使用 RabbitMQ 流扩展发布和消费的一种方式:将一个大的逻辑流划分为分区流,从而将存储和流量分散到多个集群节点上。

超级流仍然是一个逻辑实体:得益于客户端库的智能,应用程序将其视为一个“大型”流。超级流的拓扑结构基于 AMQP 0.9.1 模型,即交换机、队列以及它们之间的绑定。

可以使用任何 AMQP 0.9.1 库或管理插件创建超级流的拓扑结构,这需要创建直接交换机(direct exchange)、“分区”流,并将它们绑定在一起。不过,使用 rabbitmq-streams add_super_stream 命令可能会更简单。以下是如何使用它创建一个具有 3 个分区的 invoices 超级流:

rabbitmq-streams add_super_stream invoices --partitions 3

使用 rabbitmq-streams add_super_stream --help 了解有关该命令的更多信息。

与单个流相比,超级流增加了复杂性,因此不应将其视为所有涉及流的用例的默认解决方案。仅在确保达到单个流的限制时,才考虑使用超级流。

一篇博客文章提供了超级流的概述。

功能对比:常规队列与流

流在传统意义上并不是真正的队列,因此与 AMQP 0.9.1 队列语义的契合度并不高。其他队列类型支持的许多功能在流中不受支持,并且由于队列类型的性质,将来也不会支持。

能够使用常规队列的 AMQP 0.9.1 客户端库,只要使用消费者确认,就能够使用流。

由于流的非破坏性读取语义,许多功能将永远不会在流中得到支持。

功能矩阵

功能经典
非持久化队列
独占性
单条消息持久化单条消息始终
成员变更手动
TTL否(但请参见保留策略
队列长度限制否(但请参见保留策略
将消息保存在内存中参见 经典队列从不
消息优先级
消费者优先级
死信交换机
遵守策略是(参见保留策略
响应内存警报否(使用极少的 RAM)
有害消息处理

非持久化队列

根据其预期的使用场景,流始终是持久化的,它们不能像常规队列那样是非持久化的。

独占性

根据其预期的使用场景,流始终是持久化的,它们不能像常规队列那样是独占的。它们不打算用作临时队列

全局 QoS

流不支持全局 QoS 预取(即通道为该通道上的所有消费者设置单个预取限制)。如果尝试从启用了全局 QoS 的通道消费流,将返回通道错误。

请使用针对消费者的 QoS 预取,这是许多流行客户端的默认设置。

数据保留(Data Retention)

流被实现为不可变的、只追加的磁盘日志。这意味着日志将无限增长,直到磁盘空间耗尽。为避免这种情况,可以为每个流设置保留配置,该配置将根据日志总大小和/或年龄丢弃日志中最旧的数据。

有两个参数可以控制流的保留策略。它们可以组合使用。这些参数可以在声明时使用队列参数设置,也可以作为可以动态更新的策略设置。策略的优先级高于队列参数。

  • max-age:

    有效单位:Y, M, D, h, m, s

    例如 7D 表示一周

  • max-length-bytes:

    最大总大小(字节)

注意:保留策略是按段(segment)评估的,因此还有一个参数会起作用,那就是流的段大小。只要段中包含至少一条消息,流就将始终保留至少一个段。当使用代理提供的偏移量跟踪时,每个消费者的偏移量作为非消息数据持久化存储在流本身中。

性能特征

由于流在执行任何操作之前都会将所有数据持久化到磁盘,因此建议尽可能使用最快的磁盘。

由于流具有磁盘 I/O 密集型的特性,其吞吐量会随消息大小的增加而降低。

就像仲裁队列一样,流也会受到集群大小的影响。流拥有的副本越多,其吞吐量通常就越低,因为需要完成更多的工作来复制数据并达成共识。

控制初始复制因子

x-initial-cluster-size 队列参数控制初始流集群应跨越多少个 Rabbit 节点。

管理流副本

流的副本由操作员显式管理。当集群中添加新节点时,除非操作员明确将其添加到流的副本集中,否则它不会托管任何流副本。

当需要停用某个节点(从集群中永久移除)时,必须将其从当前托管的所有流的副本列表中显式移除。

提供了两个命令行工具 (CLI) 命令来执行上述操作:rabbitmq-streams add_replicarabbitmq-streams delete_replica

rabbitmq-streams add_replica [-p <vhost>] <stream-name> <node>
rabbitmq-streams delete_replica [-p <vhost>] <stream-name> <node>

为了成功添加和删除副本,流协调器必须在集群中可用。

在执行涉及成员变更的维护操作时,需要小心,以免因丢失仲裁(quorum)而意外导致流不可用。

因为流的成员身份信息并未嵌入在流本身中,所以目前无法完全安全地添加副本。因此,如果此时存在不同步的副本,则无法添加另一个副本,并会返回错误。

更换集群节点时,较安全的做法是先添加一个新节点,等待它同步完成,然后再停用它所替换的节点。

可以使用以下命令查询流的复制状态:

rabbitmq-streams stream_status [-p <vhost>] <stream-name>

此外,流可以使用以下命令重启:

rabbitmq-streams restart_stream [-p <vhost>] <stream-name>

流的行为

每个流都有一个主写入器(领导者,leader)和零个或多个副本。

领导者选举与故障处理

声明新流时,将随机选择托管其副本的节点集,但始终会包含声明该流的客户端所连接的节点。

哪个副本成为初始领导者可以通过三种方式控制:即使用 x-queue-leader-locator 可选队列参数、设置 queue-leader-locator 策略键,或在配置文件中定义 queue_leader_locator 键。可能的值如下:

  • client-local:选择声明该流的客户端所连接的节点。这是默认值。
  • balanced:如果队列总数少于 1000 个(包括经典队列、仲裁队列和流),则选择托管最少数量流领导者的节点。如果队列总数超过 1000 个,则随机选择一个节点。

流需要声明节点中的仲裁数量可用才能运行。当托管流领导者的 RabbitMQ 节点发生故障或停止时,托管该流副本的另一个节点将被选为领导者并恢复操作。

发生故障并重新加入的副本将与领导者重新同步(“追赶”)。类似于仲裁队列,临时副本故障不需要从当前选举的领导者进行完全重新同步。如果重新加入的副本落后于领导者,只会传输增量数据。此“追赶”过程不会影响领导者的可用性。

副本必须显式添加。当添加新副本时,它将从领导者处同步整个流状态,类似于新添加的仲裁队列副本。

容错能力与在线副本的最低数量

共识系统可以在数据安全方面提供一定的保证。这些保证意味着在它们生效之前必须满足某些条件,例如需要至少三个集群节点来提供容错能力,并且需要超过半数的成员可用才能工作。

各种大小集群的故障容忍特性可以用下表描述:

集群节点数容忍的节点故障数容忍网络分区
10不适用
20
31
41是(如果其中一边存在多数派)
52
62是(如果其中一边存在多数派)
73
83是(如果其中一边存在多数派)
94

使用流时的数据安全

流在多个节点之间复制数据,并且只有在数据已复制到流副本的仲裁数之后,才会发出发布者确认。

流始终将数据存储在磁盘上,但是,它们不会显式地将数据从操作系统页缓存刷新(fsync)到底层存储介质,而是依赖操作系统在需要时执行。这意味着服务器的非正常关闭可能导致托管在该节点上的副本出现数据丢失。虽然理论上这开启了数据确认后丢失的可能性,但这种情况在正常操作中发生的几率非常小,并且单节点上的数据丢失通常只会从系统中的其他节点重新复制回来。

如果需要更高的数据安全性,请考虑使用仲裁队列,因为在至少有仲裁数节点既写入将数据刷新到磁盘之前,不会发出任何发布者确认。

对于未通过发布者确认机制确认的消息,不提供任何保证。此类消息可能会在“中途”丢失、在操作系统缓冲区中丢失,或者无法到达流领导者。

流的可用性

流应该能够容忍少数流副本不可用,且对可用性几乎没有影响。

请注意,根据所使用的分区处理策略,RabbitMQ 可能会在恢复期间自行重启并重置节点,但只要这种情况不发生,此可用性保证就应成立。

例如,具有三个副本的流可以在不丢失可用性的情况下容忍一个节点故障。具有五个副本的流可以容忍两个,依此类推。

如果无法恢复节点仲裁(例如,永久丢失 3 个 RabbitMQ 节点中的 2 个),则队列将永久不可用,且很可能需要操作员介入才能恢复。

配置流

有关流协议端口、TLS 和其他配置,请参见流插件指南。有关所需的流复制端口,请参见网络指南

流如何使用资源

流通常比仲裁队列具有更低的 CPU 和内存占用。

所有数据都存储在磁盘上,只有未写入的数据存储在内存中。

由于流是磁盘 I/O 密集型的,它们将大量使用内核页缓存,这对容器化部署中的内存监控有重要影响。

使用流时的偏移量跟踪

当使用代理提供的偏移量跟踪功能时(目前仅在使用流插件时可用),偏移量作为非消息数据持久化存储在流本身中。这意味着当请求偏移量持久化时,流将在磁盘上增加少量空间以存储每个偏移量持久化请求。

发布消息的去重

RabbitMQ 流可以基于两个客户端元素检测并过滤掉重复的已发布消息:生产者名称消息发布 ID

客户端应用程序可以选择为给定的生产者激活去重功能。去重的使用取决于所使用的客户端库,本节仅涵盖基础知识。有关 API 的详细信息,请查阅您所使用的客户端库的文档。RabbitMQ 团队维护的流客户端库默认未激活去重功能。

生产者名称

生产者名称在给定的流中必须是唯一的。在给定的流上,同一时间只能有一个给定名称的生产者,因为去重不支持并发发布(发布 ID 可能会与并发的生产者实例交织在一起)。生产者名称在应用程序重启之间应保持稳定,并对人类读者清晰易懂。像 online-shop-orderonline-shop-invoice 这样的名称比像 3d235e79-047a-46a6-8c80-9d159d3e1b05 这样的随机序列更好。

发布 ID

发布 ID 是一个严格递增的序列。发布应用程序必须为每条出站消息递增它。以下是发布 ID 序列的规则:必须严格递增,序列中可以有间隔(例如 0, 1, 2, 3, 6, 7, 9, 10 等),不必从 0 开始。

去重的工作原理

代理会跟踪给定命名生产者在给定流上发送的最高发布 ID(即“限制”)。代理将过滤掉任何发布 ID 小于或等于此限制的出站消息。但它会发送一个确认,告知生产者该消息已被考虑在内,不应再次发送。

大于当前限制值的消息将被存储、确认,并确立为新的限制。

创建命名生产者时,客户端应用程序可以查询代理获取其最后的发布 ID。然后应用程序可以从中断处恢复发布。

一个关于激活去重的命名生产者如何工作的良好心智模型是:将生产者视为在读取文件的行。每一行都是一条消息,发布 ID 就是行号。

示例

让我们通过一些示例来理解去重的工作方式。在示例中,发布例如“消息 2”意味着发布一条内容任意且使用 2 作为发布 ID 的消息。

发布与重启

客户端应用程序执行以下操作:

  • 声明一个带有名称的生产者以激活去重
  • 发布消息 1、消息 2、消息 3
  • 接收来自代理的每条消息的异步确认
  • 发布消息 4
  • 崩溃

我们假设代理收到了消息 4 并发送了确认,但应用程序在收到确认之前崩溃了。

应用程序重启:

  • 使用相同的名称声明发布者
  • 向代理查询最后的发布 ID
  • 从代理收到发布 ID 4
  • “滚动”到消息 4(参见上面的文件类比)
  • 再次发布消息 4
  • 获得消息 4 的确认(代理过滤了该消息,但发送了确认)
  • 发布消息 5,依此类推。

应用程序本可以直接发布消息 5,因为代理返回了它收到的最后一个发布 ID 4。但应用程序选择了重新发布消息 4,因为它从未收到过它的确认。代理还是过滤掉了消息 4 并发送了确认,告诉应用程序消息 4 已安全存储,可以继续处理下一条消息。

滥用去重

客户端应用程序执行以下操作:

  • 声明一个带有名称的生产者以激活去重
  • 发布消息 1、消息 2、消息 3
  • 接收来自代理的每条消息的异步确认
  • 发布消息 10
  • 收到消息 10 的异步确认
  • 发布消息 4
  • 收到消息 4 的异步确认

应用程序收到了消息 4 的确认,但代理过滤掉了此消息,因为当时的限制是 10。代理只跟踪最高的发布 ID,而不是每个单独的发布 ID。消息 4 的发布 ID 小于当前限制(即由消息 10 设置的 10),所以消息 4 被过滤掉了,即使它之前没有被发布或存储过。

有人可能认为代理在误导应用程序,但实际上是应用程序在使用去重功能时犯了错。它先使用了 10,然后又使用 4 作为发布 ID,违反了严格递增序列的要求。如果使用上面的文件类比,应用程序滚动到了第 10 行,然后又回到了第 4 行。去重功能不涵盖这种情况。

深入了解

有关去重的更多信息,请参阅您偏好的流客户端库文档。

另请参阅去重博客文章以获取分步示例。

限制

消息编码

流在内部将其消息存储为 AMQP 1.0 编码数据。这意味着当使用 AMQP 0.9.1 发布时,会发生转换。尽管 AMQP 1.0 数据模型大多能够容纳所有 AMQP 0.9.1 的数据模型,但仍存在一些限制。如果 AMQP 0.9.1 消息包含具有复杂值(如数组或表)的标题条目,这些标题将不会被转换。这是因为标题作为应用程序属性存储在 AMQP 1.0 消息内部,而这些属性只能包含简单类型的值,如字符串和数字。

UI 指标准确性

管理 UI 显示的消息计数可能略微超过流中的实际计数。由于流存储的实现方式,偏移量跟踪信息也被计算为消息,使得消息计数在人工感知上比实际更大。在大多数系统中,这在实际操作中不会有任何影响。

© . This site is unofficial and not affiliated with VMware.