AMQP 1.0 流控制的十大优势
这篇博文概述了 AMQP 1.0 流控制相对于 AMQP 0.9.1 的十个优势,并提供了两个基准测试,证明了显著的性能提升。此外,我们深入研究了强大的 AMQP 1.0 流控制原语以及它们在 RabbitMQ 中的使用方式。
流控制是管理两个节点之间数据传输速率的过程,以防止快速发送者压垮慢速接收者。
AMQP 1.0 协议在两个不同的级别定义了流控制
链路流控制
在 AMQP 1.0 中,消息通过链路发送。链路连接发送客户端应用程序到 RabbitMQ 中的交换机,或连接 RabbitMQ 中的队列到消费客户端应用程序。
链路信用
AMQP 1.0 链路流控制背后的核心思想很简单:要接收消息,消费者必须向发送队列授予信用。
一个信用对应一条消息。例如,当消费者授予 10 个信用时,RabbitMQ 被允许发送 10 条消息。这个简单的原则,即接收者向发送者提供反馈,确保发送者永远不会压垮接收者。
接收者和发送者都维护自己的“链路状态”。此状态的一部分是当前的链路信用。每次传输消息时,链路信用都会减少 1。具体来说,发送者在发送消息时链路信用减少 1,接收者在接收消息时链路信用减少 1。当发送者的链路信用达到 0 时,它必须停止发送消息。
消息在 transfer 帧中发送。
信用在 flow 帧中授予
<field name="link-credit" type="uint"/>
您可能已经猜到,它们被称为“flow”帧是因为这些帧携带流控制信息。类型 uint 代表无符号整数,一个介于 0 和一个大数(2^32 - 1)之间的值。
即使在链路成功建立(在 AMQP 1.0 术语中为“attached”)之后,在消费者发送其第一个 flow
帧,向发送队列授予链路信用之前,RabbitMQ 也不允许开始向消费者发送消息。
在其最简单的形式中,当客户端(接收者)向队列(发送者)授予一个信用时,队列将发送一条消息,如 AMQP 1.0 规范的 图 2.43:同步获取 所示
Receiver Sender
=================================================================
...
flow(link-credit=1) ---------->
+---- transfer(...)
*block until transfer arrives* /
<---+
...
-----------------------------------------------------------------
一次同步获取一条消息将导致低吞吐量。因此,客户端通常向队列授予多个信用,如 AMQP 1.0 规范的 图 2.45:异步通知 所示
Receiver Sender
=====================================================================
...
<---------- transfer(...)
<---------- transfer(...)
flow(link-credit=delta) ---+ +--- transfer(...)
\ /
x
/ \
<--+ +-->
<---------- transfer(...)
<---------- transfer(...)
flow(link-credit=delta) ---+ +--- transfer(...)
\ /
x
/ \
<--+ +-->
...
---------------------------------------------------------------------
如果接收者授予 N 个信用,并等待所有 N 条消息到达后再授予接下来的 N 个信用,则吞吐量将高于图 2.43 中 N=1
的情况。但是,如果您仔细观察图 2.45,您会发现接收者在接收到所有之前的消息之前授予了更多信用。这种方法可以实现最高的吞吐量。例如,在图 2.45 中,接收者最初可能授予了 6 个信用,然后在每收到 3 条消息时向 RabbitMQ 发送另一个 flow
帧,其中 link-credit = 6
。
授予链路信用不是累积的。
当接收者发送一个 flow
帧,其中 link-credit = N
时,接收者将当前信用设置为 N,而不是增加 N 个信用。例如,如果接收者发送两个 flow
帧,其中 link-credit = 50
,并且在两者之间没有任何消息传输,则接收者将拥有 50 个信用,而不是 100 个。
接收者知道其当前处理能力,因此始终是接收者(而不是发送者)决定当前的链路信用。发送者仅通过发送更多消息来“消耗”接收者授予的链路信用。
接收者可以根据其当前处理能力动态增加或减少链路信用量。
消费客户端应用程序可以动态调整它希望从特定队列接收多少消息。
这是 AMQP 1.0 中链路流控制相对于 AMQP 0.9.1 中消费者预取的一个巨大优势。在 AMQP 0.9.1 中,basic.qos 方法应用于给定 AMQP 0.9.1 通道上的所有消费者。此外,动态更新消费者预取是不可能或不方便的,如 #10174 和 #11955 中讨论的那样。
消费客户端应用程序可以动态地优先处理它希望从同一会话上的多个队列中的哪个队列接收消息。
这是 AMQP 1.0 中链路流控制相对于 AMQP 0.9.1 中消费者预取的另一个优势。一旦 AMQP 0.9.1 客户端在多个队列上调用 basic.consume
,它将持续从所有这些队列接收消息,直到它调用 basic.cancel
。
您可能想知道:链路信用的良好值是多少?客户端应该多久补充一次链路信用?通常情况下,答案是您需要使用不同的值对您的特定工作负载进行基准测试才能找到答案。
与其实现花哨的算法,我建议从简单开始:例如,客户端可以最初授予 200 个链路信用,并在剩余链路信用降至 100 以下时发送一个 link-credit = 200
的 flow。
实际上,这正是 RabbitMQ 反过来所做的:RabbitMQ AMQP 1.0 会话进程最初向发布者授予 170 个链路信用,并在剩余链路信用降至一半以下(即 85)且未确认消息的数量少于 170 时再次授予 170 个链路信用。(在 Broker 内部,即使没有向发布客户端发送确认,发布者确认也始终在 AMQP 1.0 会话进程和目标队列之间启用。这意味着如果目标队列确认速度不够快,RabbitMQ 将停止向发送应用程序授予链路信用。)请注意,这些 RabbitMQ 实现细节可能随时更改。
值 170
可通过 advanced.config 设置 rabbit.max_link_credit
进行配置。
当一个目标队列过载时,发布者可以继续以高速率向所有其他目标队列发布。
应用程序可以在同一 AMQP 1.0 连接或会话上发送到多个队列(通过附加多个链路)。让我们假设一个简单的场景,其中客户端打开两个链路
- 链路 1 发送到经典队列。
- 链路 2 发送到 5 副本仲裁队列。
在确认消息之前,仲裁队列必须已将消息复制到大多数副本,每个副本都 fsync 将消息刷新到其本地磁盘。
相比之下,经典队列不复制消息。此外,当消息被消费并足够快地确认时,经典队列可以(之后)将消息确认回发布者,而无需将其写入磁盘。因此,在这种情况下,经典队列的吞吐量将远高于仲裁队列。
AMQP 1.0 链路流控制的优点在于,RabbitMQ 可以减慢链路 2 上信用的授予速度,同时继续以高频率在链路 1 上授予信用。因此,即使 5 副本仲裁队列处理消息的速度不如(单副本)经典队列快,客户端也可以继续以全速向经典队列发送消息。
下图是从一篇之前的 AMQP 0.9.1 博客文章中复制的

此图中的“信用”一词是指 RabbitMQ 用于 AMQP 0.9.1 连接的内部流控制,与 AMQP 1.0 中的链路信用无关。
上图中的 reader
是从套接字读取 AMQP 0.9.1 帧的 Erlang 进程。该图显示,对于 AMQP 0.9.1 连接,RabbitMQ 将阻止 reader
,导致 TCP 背压应用于客户端。因此,当单个目标队列过载时,RabbitMQ 会限制 AMQP 0.9.1 连接,从而影响向所有其他目标队列的发布。
以下基准测试显示,当连接发送到多个目标队列时,AMQP 1.0 可以提供比 AMQP 0.9.1 高出数倍的吞吐量。
基准测试:两个发送者
为了将我们刚刚讨论的理论付诸实践,two_senders 程序模拟了一个类似的场景。
该程序打开一个 AMQP 0.9.1 连接和通道,以及一个 AMQP 1.0 连接和会话。
在 AMQP 0.9.1 通道和 AMQP 1.0 会话上,都有两个 Goroutine 尽可能快地发布到经典队列和仲裁队列。这总共导致四个目标队列
main.go RabbitMQ
+-------------+ +----------------------------------+
| | AMQP 0.9.1 connection | |
| |#####################################| |
| +---+ |-------------------------------------| +------------------------+ |
| | P | | classic-queue-amqp-091 | |
| +---+ +------------------------+ |
| AMQP 0.9.1 channel |
| +---+ +------------------------+ |
| | P | | quorum-queue-amqp-091 | |
| +---+ |-------------------------------------| +------------------------+ |
| |#####################################| |
| | | |
| | | |
| | | |
| |#####################################| |
| +---+ |-------------------------------------| +-----------------------+ |
| | P |O============================================>O| classic-queue-amqp-10 | |
| +---+ +-----------------------+ |
| AMQP 1.0 session |
| +---+ +-----------------------+ |
| | P |O======================================+=====>O| quorum-queue-amqp-10 | |
| +-+-+ |----------------------------------|--| +-----------------------+ |
| | |##################################|##| |
| | | AMQP 1.0 connection | | |
+------|------+ | +----------------------------------+
| |
| |
Publisher AMQP 1.0 link
goroutine
按如下方式运行基准测试
- 在 Ubuntu 机器上使用
make run-broker
启动 RabbitMQ 服务器 v4.0.0-beta.6。(在 macOS 上,fsync
不会物理写入磁盘)。 - 使用
go run two_senders/main.go
执行 Go 程序。10 秒后,Go 程序将完成。 - 列出每个队列中的消息数
./sbin/rabbitmqctl --silent list_queues name type messages --formatter=pretty_table
┌────────────────────────┬─────────┬──────────┐
│ name │ type │ messages │
├────────────────────────┼─────────┼──────────┤
│ classic-queue-amqp-091 │ classic │ 159077 │
├────────────────────────┼─────────┼──────────┤
│ quorum-queue-amqp-091 │ quorum │ 155782 │
├────────────────────────┼─────────┼──────────┤
│ classic-queue-amqp-10 │ classic │ 1089075 │
├────────────────────────┼─────────┼──────────┤
│ quorum-queue-amqp-10 │ quorum │ 148580 │
└────────────────────────┴─────────┴──────────┘
正如 AMQP 1.0 基准测试 博客文章中所解释的那样,仲裁队列 fsync(准确地说是 fdatasync
),而经典队列则不这样做。因此,即使没有复制,仲裁队列也比经典队列慢得多,因为我使用的是消费级磁盘,每个 fsync
至少需要 5 毫秒。对于生产集群,建议使用高端企业级磁盘,其 fsync
速度更快。
结果表明,单个 AMQP 0.9.1 连接向目标经典队列和目标仲裁队列发送的消息数量大致相同。这是因为在我的基准测试中,quorum-queue-amqp-091
导致整个 AMQP 0.9.1 连接每秒大约被阻塞(和解除阻塞)80 次。因此,单个 AMQP 0.9.1 连接上向多个目标队列(classic-queue-amqp-091
和 quorum-queue-amqp-091
)的发布速率受到最慢的目标队列(quorum-queue-amqp-091
)的限制。总而言之,AMQP 0.9.1 连接发送了 159,077 + 155,782 = 314,859
条消息。
相比之下,由于链路流控制,RabbitMQ 仅限制了到 quorum-queue-amqp-10
目标的链路,从而允许 AMQP 1.0 客户端继续以全速发布到 classic-queue-amqp-10
目标。总而言之,AMQP 1.0 连接发送了 1,089,075 + 148,580 = 1,237,655
条消息。
因此,在我们的简单基准测试中,AMQP 1.0 的总发送吞吐量是 AMQP 0.9.1 的四倍(!)。
当一个目标队列过载时,客户端可以继续以高速率从所有源队列消费。因此,AMQP 1.0 客户端可以使用单个连接进行高吞吐量的发布和消费。
优势 #3 描述了单个过载的目标队列如何导致 RabbitMQ 阻止读取器进程读取任何 AMQP 0.9.1 帧。这意味着客户端不仅无法再发布消息,而且也无法消费消息。这是因为客户端的消息确认不再被 RabbitMQ 处理,一旦达到其预取限制,就会阻止向消费者传递新消息。
虽然这种消费限制是暂时的(AMQP 0.9.1 reader
进程每秒被阻塞和解除阻塞多次),但它会显著降低消费速率。
RabbitMQ AMQP 0.9.1 文档建议
因此,建议尽可能让发布者和消费者使用单独的连接,以便消费者与可能应用于发布连接的潜在流控制隔离,从而影响手动消费者确认。
这导致了整个 AMQP 0.9.1 客户端库生态系统采用这种“最佳实践”,即为发布和消费使用单独的连接。例如,RabbitMQ AMQP 0.9.1 C++ 库 声明
发布和消费发生在不同的连接上
一个常见的应用程序缺陷是在同一连接上消费和生产。这可能会导致消费速率降低,因为 RabbitMQ 会对快速发布者施加背压 - 具体取决于要消费/发布的队列,这可能会导致恶性循环。
相比之下,AMQP 1.0 链路流控制允许仅减慢客户端应用程序中的链路发送者。所有其他链路(无论是发送还是消费)都可以继续以全速运行。
因此,在 AMQP 1.0 中,客户端可以使用单个连接进行发布和消费。
基准测试:一个发送者,一个接收者
程序 one_sender_one_receiver 模拟了一个场景,其中客户端打开两个链路
- 链路 1 从经典队列接收。
- 链路 2 发送到仲裁队列。
该程序打开一个 AMQP 0.9.1 连接和通道,以及一个 AMQP 1.0 连接和会话。
为了准备基准测试,该程序将一百万条消息写入每个经典队列。
在 AMQP 0.9.1 通道和 AMQP 1.0 会话上,都有两个 Goroutine
- 一个 Goroutine(链路 1)以 200 的预取从经典队列接收消息并确认每条消息。
- 一个 Goroutine(链路 2)以 10,000 条消息的批次发布到仲裁队列。(收到所有 10,000 次确认后,发布下一批)。
main.go RabbitMQ
+-------------+ +----------------------------------+
| | AMQP 0.9.1 connection | |
| |#####################################| |
| +---+ |-------------------------------------| +------------------------+ |
| | C | | classic-queue-amqp-091 | |
| +---+ +------------------------+ |
| AMQP 0.9.1 channel |
| +---+ +------------------------+ |
| | P | | quorum-queue-amqp-091 | |
| +---+ |-------------------------------------| +------------------------+ |
| |#####################################| |
| | | |
| | | |
| | | |
| |#####################################| |
| +---+ |-------------------------------------| +-----------------------+ |
| | C |O<============================================O| classic-queue-amqp-10 | |
| +---+ +-----------------------+ |
| AMQP 1.0 session |
| +---+ +-----------------------+ |
| | P |O======================================+=====>O| quorum-queue-amqp-10 | |
| +-+-+ |----------------------------------|--| +-----------------------+ |
| | |##################################|##| |
| | | AMQP 1.0 connection | | |
+------|------+ | +----------------------------------+
| |
| |
Publisher or Consumer AMQP 1.0 link
goroutine
按如下方式运行基准测试
- 在 Ubuntu 机器上使用
make run-broker
启动 RabbitMQ 服务器 v4.0.0-beta.6。 - 使用
go run one_sender_one_receiver/main.go
执行 Go 程序 - 程序完成后,列出每个队列中的消息数
./sbin/rabbitmqctl --silent list_queues name type messages --formatter=pretty_table
┌────────────────────────┬─────────┬──────────┐
│ name │ type │ messages │
├────────────────────────┼─────────┼──────────┤
│ classic-queue-amqp-091 │ classic │ 990932 │
├────────────────────────┼─────────┼──────────┤
│ quorum-queue-amqp-091 │ quorum │ 172800 │
├────────────────────────┼─────────┼──────────┤
│ classic-queue-amqp-10 │ classic │ 336229 │
├────────────────────────┼─────────┼──────────┤
│ quorum-queue-amqp-10 │ quorum │ 130000 │
└────────────────────────┴─────────┴──────────┘
虽然 AMQP 0.9.1 客户端仅消费了 1,000,000 - 990,932 = 9,068
条消息,但 AMQP 1.0 客户端消费了 1,000,000 - 336,229 = 663,771
条消息。
因此,在此基准测试中,AMQP 1.0 客户端接收的消息比 AMQP 0.9.1 客户端多 73 倍(!)。
在 AMQP 0.9.1 中,消费者预取限制了未确认消息的数量。当消费者通过发送 basic.ack
帧来确认消息时,RabbitMQ 会传递更多消息。
在 AMQP 1.0 中,消息确认独立于链路流控制。消费者通过发送 disposition 帧来确认消息不会提示 RabbitMQ 传递更多消息。相反,客户端必须通过发送 flow
帧来补充链路信用,以便 RabbitMQ 继续发送消息。为了方便起见,某些 AMQP 1.0 客户端库在您的应用程序确认消息时自动发送 disposition
和 flow
帧。
交付计数
到目前为止,我们只了解了 flow 帧的一个字段:link-credit
。
在以下情况下会发生什么?
Receiver Sender
=======================================================================
...
link state: link state:
link-credit = 3 link-credit = 3
flow(link-credit = 6) ---+ +--- transfer(...)
\ /
x
/ \
<--+ +-->
link state: link state:
link-credit = 5 link-credit = 6
最初,link-credit
为 3。接收者决定发送一个 flow
帧,将新的 link-credit
设置为 6。同时,发送者发送一个 transfer
帧。
由于接收者在发送 flow
帧之后接收到 transfer
帧,因此它将计算其新的链路信用为 6 - 1 = 5
。但是,由于发送者在发送 transfer
帧之后接收到 flow
帧,因此它将信用设置为 6。结果,状态 - 以及链路剩余多少信用的视图 - 变得错位。这是有问题的,因为发送者可能会溢出接收者。
为了防止此类错位,在链路状态和 flow
帧中都需要第二个字段
<field name="delivery-count" type="sequence-no"/>
每次传输消息时,交付计数都会增加 1。具体来说,发送者在发送消息时递增交付计数,接收者在接收消息时递增交付计数。
当发送者接收到 flow
帧(其中包含链路信用和交付计数)时,发送者根据此 公式 设置其链路信用
link-credit(snd) := delivery-count(rcv) + link-credit(rcv) - delivery-count(snd).
(snd)
指的是发送者的链路状态,而接收者的链路状态 (rcv)
在 flow
帧中发送给发送者。
在发送者处,此公式表示:“将新的链路信用设置为我刚刚在 flow 帧中收到的链路信用减去任何正在传输的交付。”
交付计数的目的是在事件序列中建立顺序,这些事件是
- 发送者发送消息
- 接收者接收消息
- 接收者授予链路信用
- 发送者计算接收者的链路信用
使用交付计数可以解决我们之前讨论的错位问题。
让我们假设交付计数最初设置为 20
Receiver Sender
========================================================================================
...
link state: link state:
delivery-count = 20 delivery-count = 20
link-credit = 3 link-credit = 3
flow(delivery-count = 20,
link-credit = 6) ---+ +--- transfer(...)
\ /
x
/ \
<--+ +-->
link state: link state:
delivery-count = 21 delivery-count = 21
link-credit = 5 link-credit = 20+6-21 = 5 (above formula)
一些 AMQP 1.0 字段,包括交付计数,类型为 sequence-no。这些是 32 位 RFC-1982 序列号,范围从 [0 .. 4,294,967,295]
并环绕:将 1 加到 4,294,967,295 会得到 0。
交付计数由发送者初始化,发送者在 attach 帧的 initial-delivery-count
字段中发送其选择的值。
发送者可以将交付计数初始化为它选择的任何值,例如 0、10 或 4,294,967,295。除了我们之前讨论的目的之外,此值没有内在含义:将接收者的交付计数与发送者的交付计数进行比较,以确定有多少消息正在传输中。
据我所知,AMQP 1.0 链路流控制似乎基于 1995 年的论文 Credit-Based Flow Control for ATM Networks。
公式
link-credit(snd) := delivery-count(rcv) + link-credit(rcv) - delivery-count(snd).
来自 AMQP 1.0 规范的 2.6.7 节与公式匹配
Crd_Bal = Buf_Alloc - (Tx_Cnt - Fwd_Cnt)(1)
在论文中。
AMQP 1.0 规范甚至采用了论文中类似的术语,例如“链路”和“节点”。此外,规范可能指的是“交付计数”,因为它在论文中也被称为“计数”(即使规范澄清说在 AMQP 中它实际上不是计数,而是序列号)。
论文中的图 2 很好地说明了这个概念

在此图中
Tx_Cnt
对应于delivery-count(snd)
Fwd_Cnt
对应于delivery-count(rcv)
Crd_Bal
对应于link-credit(snd)
Buf_Alloc
对应于link-credit(rcv)
该论文详细解释了接收者应该多久以及以多大的幅度补充链路信用。如果您想成为 AMQP 1.0 流控制方面的专家,我建议您阅读 AMQP 1.0 规范和该论文。
Drain
在理解了 flow 帧的两个最重要的链路控制字段(link-credit
和 delivery-count
)之后,让我们继续讨论 drain
字段
<field name="drain" type="boolean" default="false"/>
默认情况下,drain
字段设置为 false
。接收者决定是否启用 Drain 模式,发送者将 drain
设置为接收者的最后一个已知值。
Drain 意味着发送者应通过发送可用消息来使用接收者的所有链路信用。如果消息不足以发送,发送者仍然必须按如下方式耗尽所有链路信用
- 将交付计数提前剩余的链路信用。
- 将链路信用设置为 0。
- 向接收者发送一个
flow
帧。
通过将 drain
字段设置为 true
,消费者请求 RabbitMQ “发送一个 transfer 或一个 flow 帧。” 如果源队列为空,RabbitMQ 将立即仅回复一个 flow
帧。
因此,drain
字段允许消费者在同步获取消息时设置超时,如 AMQP 1.0 规范的 图 2.44:具有超时的同步获取 所示
Receiver Sender
=================================================================
...
flow(link-credit=1) ---------->
*wait for link-credit <= 0*
flow(drain=True) ---+ +--- transfer(...)
\ /
x
/ \
(1) <--+ +-->
(2) <---------- flow(...)
...
-----------------------------------------------------------------
(1) If a message is available within the timeout, it will
arrive at this point.
(2) If a message is not available within the timeout, the
drain flag will ensure that the sender promptly advances the
delivery-count until link-credit is consumed.
由于链路信用消耗很快,消费者可以明确地确定是否收到了消息,或者操作是否超时。
Echo
与 drain
字段类似,echo
字段是
- 默认设置为
false
- 由消费者决定,因为 RabbitMQ 在其当前实现中未设置此字段
<field name="echo" type="boolean" default="false"/>
消费者可以设置 echo
字段以请求 RabbitMQ 回复一个 flow
帧。一个用例在 AMQP 1.0 规范的 图 2.46:停止传入消息 中描述
Receiver Sender
================================================================
...
<---------- transfer(...)
flow(..., ---+ +--- transfer(...)
link-credit=0, \ /
echo=True) x
/ \
(1) <--+ +-->
(2) <---------- flow(...)
...
----------------------------------------------------------------
(1) In-flight transfers can still arrive until the flow state
is updated at the sender.
(2) At this point no further transfers will arrive.
在 AMQP 1.0 中,消费者可以被停止/暂停,并在稍后恢复。
在 AMQP 0.9.1 中,消费者无法暂停和恢复。相反,消费者必须在使用 basic.consume
注册新消费者之前使用 basic.cancel
方法取消。
AMQP 1.0 允许从一个单一活动消费者到下一个消费者的平滑切换,同时保持消息顺序。
在 AMQP 1.0 中,消费者可以先停止链接并确认消息,然后再分离链接(首选方式),或者直接分离链接。直接分离链接将会重新排队任何未确认的消息。无论哪种方式,分离链接都会导致下一个消费者被激活,并按原始顺序接收消息。
相比之下,AMQP 0.9.1 中的单个活跃消费者无法优雅且安全地移交给下一个消费者。当 AMQP 0.9.1 消费者通过 basic.cancel
取消消费但仍有未确认的消息时,队列将激活下一个消费者。如果 AMQP 0.9.1 客户端在此后不久崩溃,则签出给旧消费者的消息将被重新排队,这可能会违反消息顺序。为了保持消息顺序,AMQP 0.9.1 客户端必须关闭整个通道(不先调用 basic.cancel
),以便在下一个消费者激活之前重新排队消息。
可用
RabbitMQ 在 flow 帧中设置 available
字段,以告知消费者有多少消息可用
<field name="available" type="uint"/>
available
值只是一个近似值,因为从队列发出此信息到 flow
帧到达消费者之间存在时间差,在此期间信息可能已过时,例如当其他客户端发布消息到此队列或从此队列消费消息时。
对于经典队列和仲裁队列,available
表示准备好交付的消息数量(即队列长度,不包括已签出给消费者的消息)。
对于流,available
表示已提交偏移量与上次消费的偏移量之间的差值。粗略地说,已提交偏移量是 RabbitMQ 集群中不同的流副本一致认为是流的结尾的位置。上次消费的偏移量可能与已提交偏移量相同或落后于它。available
值仅是一个估计值,因为流偏移量不一定代表一条消息。
如果启用了单活动消费者功能,仲裁队列将为所有非活动(等待中)消费者返回 available = 0
。这是有道理的,因为无论非活动消费者有多少信用额度,都没有消息可供他们使用。
在上一节中,我们了解到消费者设置 echo
字段的一个用例是停止链接。另一个用例是当消费者想要了解其消费的队列中可用的消息数量时。
AMQP 1.0 通知消费者队列中可用的消息数量。
在 RabbitMQ 发送到消费者的每个 flow
帧中包含此信息是优于 AMQP 0.9.1 的一个优势。在 AMQP 0.9.1 中,查询可用消息的方法更加繁琐且效率较低
- 使用
passive=true
的queue.declare
:queue.declare_ok
回复将包含message_count
字段。 basic.get
:basic.get_ok
回复将包含message_count
字段。
available
字段也可以由发布者设置,以告知 RabbitMQ 发布者有多少消息可发送。目前,RabbitMQ 忽略此信息。
属性
flow 帧的 properties
字段可以携带应用程序特定的链接状态属性
<field name="properties" type="fields"/>
目前,RabbitMQ 没有使用此字段。
AMQP 1.0 链接流控制是可扩展的。
假设 RabbitMQ 想要使发布者能够将消息直接发送到仲裁队列领导者。由于所有 Ra 命令,包括将消息入队,都必须首先经过领导者,因此客户端直接连接到托管仲裁队列领导者的 RabbitMQ 节点是有意义的。这种“队列局部性”将减少 RabbitMQ 集群内流量,从而提高延迟和吞吐量。当领导者更改时,RabbitMQ 可以通过 properties
字段向发布者推送领导者更改通知(包括托管仲裁队列领导者的新 RabbitMQ 节点),而不是导致 RabbitMQ 分离链接 - 这可能会对发布应用程序造成破坏。然后,应用程序可以决定何时方便地分离链接并在不同的连接上附加新链接,以继续“本地”发布。
或者,RabbitMQ 可以在每个 flow
帧的 properties
字段中为键 local
发送一个布尔值,以指示链接当前是否正在本地发布或消费。如果 local
字段从 true
翻转为 false
,客户端可以通过 HTTP over AMQP 1.0 查询新的队列拓扑和领导者。
这些只是假设性的示例,展示了 RabbitMQ 未来如何利用链接流控制的可扩展性,这是优于 AMQP 0.9.1 的一个优势。
总结
我们了解到 AMQP 1.0 链接流控制可以保护单个消费者或队列免受消息过载的影响。接收方定期向发送方提供反馈,说明当前可以处理多少消息。
每个链接端点(发送方和接收方)都维护完整的链接流控制状态,并通过 flow 帧与另一方交换此状态。
以下链接流控制字段由接收方独立确定
link-credit
drain
以下链接流控制字段由发送方独立确定
delivery-count
available
会话流控制
在深入研究 会话流控制之前,让我们回顾一下会话实际上是什么。
会话
客户端库为每个 AMQP 连接创建一个 TCP 连接。AMQP 1.0 连接是使用 open 帧建立的。
在 AMQP 1.0 连接中,客户端可以启动多个 AMQP 1.0 会话。这与 AMQP 0.9.1 客户端如何在 AMQP 0.9.1 连接中打开多个 AMQP 0.9.1 通道是类似的。AMQP 1.0 会话是使用 begin 帧启动的。
Client App RabbitMQ
+-------------+ +-------------+
| |################################| |
| +---+ |--------------------------------| +---+ |
| | C |O<===============================+========O| Q | |
| +-+-+ \ |-------------------+-------|----| |+-+-+ |
| | \ |#######+###########|#######|####| | | |
+-----|-----\-+ | | | +---|--|------+
| \ | | | | |
| Target | | | Source |
| | | | |
Consumer | | Link Queue
| Session
Connection
客户端应用程序可以创建,例如
- 具有单个会话的单个连接,或
- 具有多个会话的单个连接,或
- 每个连接具有一个或多个会话的多个连接
通常,具有单个会话的单个连接就足够了。
打开 AMQP 连接会带来一些开销:必须建立 TCP 连接,在客户端和服务器中分配操作系统资源(如套接字和 TCP 缓冲区),并产生 TCP 或 TLS 握手的延迟。此外,在 RabbitMQ 节点上,会为每个传入的 AMQP 连接创建一个监督树。
会话可以被认为是“轻量级”连接。目前,每个 AMQP 1.0 会话都实现为其自己的Erlang 进程。因此,如果连接已建立,则创建会话的成本不高。
在以下情况下,在 AMQP 连接中创建第二个会话可能很有用
- 快速且廉价地创建另一个“虚拟连接”,而不会产生上述 AMQP 连接设置开销。
- 确保高优先级 transfer 帧(需要低延迟)不会被其他 transfer 帧阻塞。
- 当 RabbitMQ 会话进程变得非常繁忙时(例如,将消息路由到队列),提高并行性。
流字段
会话提供了一种基于传输的 transfer 帧数量的流控制方案。由于帧对于给定的连接具有最大大小,因此这提供了基于传输的字节数的流控制。
请记住,大型消息被拆分为多个 transfer 帧。
AMQP 1.0 中的会话流控制比链接流控制在更高的层级上运行。虽然链接流控制保护单个消费者和队列,但会话流控制旨在保护整个客户端应用程序和整个 RabbitMQ。
正如每个链接端点维护链接流控制状态并通过 flow
帧交换此状态一样,每个会话端点也维护会话流控制状态,并在相同的 flow
帧中交换此状态。flow 帧中的其余字段因此用于会话流控制
<field name="next-incoming-id" type="transfer-number"/>
<field name="incoming-window" type="uint" mandatory="true"/>
<field name="next-outgoing-id" type="transfer-number" mandatory="true"/>
<field name="outgoing-window" type="uint" mandatory="true"/>
incoming-window
类似于link-credit 字段,接收方告知发送方它可以容忍接收多少“单元”。不同之处在于,对于 link-credit
,单位是潜在的大型应用程序消息,而对于 incoming-window
,单位是 transfer
帧。
为了提供一个极端的例子以便更好地理解:如果连接上协商的 max-frame-size
非常低,并且单个消息非常大,如果接收方授予发送方一个链接信用,则发送方仍然可能因会话流控制而无法完整发送此消息。
next-incoming-id
和 next-outgoing-id
是序列号,其作用与链接流控制中的 delivery-count 相同:它们确保当 transfer
帧与 flow
帧并行发送时,窗口在另一侧被正确计算。会话流控制需要两个序列号,而不是链接流控制的一个序列号,因为会话是双向的,而链接是单向的。
最初,RabbitMQ 允许发布者发送 400 个 transfer 帧。每当 RabbitMQ 会话进程处理了该数量的一半(200 个 transfer 帧)时,RabbitMQ 都会通过向发布者发送包含 incoming_window = 400
的 flow
帧来扩展此窗口。
400
的值可以通过 advanced.config 设置 rabbit.max_incoming_window
进行配置。
RabbitMQ 警报
此规则的唯一例外是触发内存或磁盘警报时。为了保护 RabbitMQ 免于耗尽内存或磁盘空间,每个会话都会通过向发布者发送 incoming_window = 0
的 flow
帧来关闭其传入窗口,从而有效地阻止他们发送任何进一步的 transfer
帧。
如果发生集群范围的内存或磁盘警报,RabbitMQ 将仅阻止 AMQP 1.0 客户端发布 transfer
帧。其他操作,例如 AMQP 1.0 客户端在同一会话上消费或创建仅用于消费以清空 RabbitMQ 中的队列的新连接(从而减少内存和磁盘使用量),仍将被允许。
在 AMQP 0.9.1 中,RabbitMQ 将完全阻止从连接套接字读取,并阻止打开新连接,直到警报清除。AMQP 0.9.1 客户端必须使用单独的连接进行发布和消费,以便在警报期间继续消费。
如优势 #4 和 #9 中所述,由于客户端可以安全有效地使用单个 AMQP 1.0 连接进行发布和消费,因此降低了 RabbitMQ 中的内存使用量。
与 AMQP 1.0 相比,AMQP 0.9.1 实际上需要两倍的连接数。AMQP 1.0 基准测试提供了关于可以节省多少内存的见解。
Incoming-Window
AMQP 1.0 流控制的缺点是其复杂性。虽然链接流控制和会话流控制背后的思想和动机是合理的,但在一个层(链接流控制)之上实现另一层(会话流控制)在每种情况下都难以正确且高效地实现。请考虑客户端可以
- 独立修改会话流控制和链接流控制(在同一个
flow
帧中或在不同的flow
帧中)。 - 随时以不同的频率发送
flow
帧。 - 动态增加或减少会话窗口或链接信用。
- 授予 0 链接信用(导致队列停止发送)或大量的链接信用(高达 40 亿)。
- 关闭其会话
incoming-window
(导致服务器会话停止发送)或广泛打开它(高达 40 亿)。 - 停止从其套接字读取,对服务器应用 TCP 反压。
- 在组合中添加其他特殊逻辑,例如
drain=true
。
在 AMQP 1.0 程序中,并发是可取的。根据编程语言,客户端或 broker 的不同部分由不同的线程或进程实现。
RabbitMQ 通过在不同的 Erlang 进程中运行连接读取器进程(解析帧)、连接写入器进程(序列化帧)、会话进程(例如,路由消息)和 队列进程(存储和转发消息)来实现并发。
在客户端向发送队列授予大量 link-credit
但保持非常小的会话 incoming-window
的情况下,队列进程被允许传递消息,而会话进程不允许将它们转发到客户端。在这种情况下,消息在会话进程中缓冲,直到客户端的 incoming-window
允许它们被转发。
为了防止这种情况 - 大量消息可能在会话进程中缓冲 - RabbitMQ 在其当前实现中,在会话进程和队列进程之间内部批量授予链接信用,每次最多 256 个链接信用。即使客户端授予大量的链接信用,对于给定的消费者,队列也只会看到最多 256 个信用。一旦服务器会话进程发送出这 256 条消息,服务器会话进程将向队列授予下一批 256 个信用。
256
的值可以通过 advanced.config 设置 rabbit.max_queue_credit
进行配置。
总结
这篇博文解释了 AMQP 1.0 链接流控制和会话流控制的工作原理。
我们了解了 RabbitMQ 内的各个进程如何保护自己免受过载
- 队列进程受到链接流控制的保护。
- 会话进程和整个 RabbitMQ 受到会话流控制的保护。
- 当连接读取器进程无法足够快地读取帧时(这种情况应该很少见),通过应用 TCP 反压来保护连接读取器进程。
这篇博文还重点介绍了 AMQP 1.0 中流控制相对于 AMQP 0.9.1 中流控制的十个优势。主要优势包括
- 对消费者的细粒度控制,允许他们随时确定他们想要从特定队列消费的确切消息数量。
- 当目标队列达到其限制时,在单个 AMQP 连接上发布和消费的更高吞吐量。我们运行了两个基准测试
- 当发送到两个不同的目标队列时,AMQP 1.0 的总发送吞吐量是 AMQP 0.9.1 的四倍。
- 在极端情况下,当从一个队列接收消息而向另一个队列发送消息速度过快时,AMQP 1.0 的消费率是 AMQP 0.9.1 的 73 倍。
- 安全有效地使用单个连接进行发布和消费。
- 消费者随时停止和恢复的能力。
- 未来用例的可扩展性。