AMQP 1.0 流量控制的十个优势
这篇博文概述了 AMQP 1.0 流量控制相对于 AMQP 0.9.1 的十个优势,并通过两个基准测试证明了显著的性能提升。此外,我们还深入探讨了功能强大的 AMQP 1.0 流量控制原语及其在 RabbitMQ 中的用法。
流量控制 是管理两个节点之间数据传输速率的过程,以防止快速发送方压垮缓慢接收方。
AMQP 1.0 协议在两个不同的层面上定义了流量控制
链路流量控制
在 AMQP 1.0 中,消息通过一个链路 发送。一个链路连接发送方客户端应用程序到 RabbitMQ 中的交换机,或连接 RabbitMQ 中的队列 到接收方客户端应用程序。
链路信用
AMQP 1.0 链路流量控制背后的核心思想很简单:**要接收消息,消费方必须向发送队列授予信用。**
一个信用对应一条消息。例如,当消费方授予 10 个信用时,RabbitMQ 允许发送 10 条消息。这个简单的原则,即接收方向发送方提供**反馈**,确保发送方永远不会压垮接收方。
接收方和发送方都维护着自己“链路状态”。这种状态的一部分是当前链路信用。每次传输消息时,链路信用就会减少 1。具体来说,发送方在发送消息时减少链路信用 1,接收方在接收消息时减少链路信用 1。当发送方的链路信用达到 0 时,它必须停止发送消息。
消息在传输 帧中发送。
信用在流量 帧中授予
<field name="link-credit" type="uint"/>
正如你可能已经猜到的那样,它们被称为“流量”帧,因为这些帧承载着流量控制信息。类型uint 代表无符号整数,一个介于 0 和一个大数(2^32 - 1)之间的值。
即使链路成功建立(在 AMQP 1.0 术语中是“附加”),RabbitMQ 也不允许在消费方发送第一个 flow
帧之前开始向消费方发送消息,这个 flow
帧会向发送队列授予链路信用。
在最简单的情况下,当客户端(接收方)向队列(发送方)授予单个信用时,队列将发送一条消息,如 AMQP 1.0 规范的图 2.43:同步获取 中所示。
Receiver Sender
=================================================================
...
flow(link-credit=1) ---------->
+---- transfer(...)
*block until transfer arrives* /
<---+
...
-----------------------------------------------------------------
同步地每次获取一条消息将导致低吞吐量。因此,客户端通常会向队列授予多个信用,如 AMQP 1.0 规范的图 2.45:异步通知 中所示。
Receiver Sender
=====================================================================
...
<---------- transfer(...)
<---------- transfer(...)
flow(link-credit=delta) ---+ +--- transfer(...)
\ /
x
/ \
<--+ +-->
<---------- transfer(...)
<---------- transfer(...)
flow(link-credit=delta) ---+ +--- transfer(...)
\ /
x
/ \
<--+ +-->
...
---------------------------------------------------------------------
如果接收方授予 N 个信用,并在等待接收所有 N 条消息后再授予接下来的 N 个信用,那么与图 2.43 中的 N=1
相比,吞吐量会更高。但是,如果你仔细观察图 2.45,你会发现接收方在接收完所有之前消息之前授予了更多的信用。这种方法可以实现最高吞吐量。例如,在图 2.45 中,接收方可能最初授予了 6 个信用,然后每当它收到 3 条消息时,它就会向 RabbitMQ 发送另一个 link-credit = 6
的 flow
帧。
授予链路信用不是累积的。
当接收方发送一个 link-credit = N
的 flow
帧时,接收方会将当前信用**设置为** N,而不是增加 N 个信用。例如,如果接收方发送两个 link-credit = 50
的 flow
帧,期间没有任何消息被传输,那么接收方将拥有 50 个信用,而不是 100 个。
接收方知道自己的当前处理能力,因此始终是接收方(而不是发送方)决定当前链路信用。发送方只通过发送更多消息来“消耗”接收方授予的链路信用。
接收方可以根据自己的当前处理能力动态地增加或减少链路信用的数量。
消费方客户端应用程序可以动态调整它希望从特定队列接收多少消息。
这是 AMQP 1.0 中的链路流量控制相对于 AMQP 0.9.1 中的消费方预取 的一个巨大优势。在 AMQP 0.9.1 中,basic.qos 方法适用于给定AMQP 0.9.1 通道 上的**所有**消费方。此外,动态更新消费方预取是不可能或不方便的,如#10174 和#11955 中所述。
消费方客户端应用程序可以动态优先考虑它希望从中接收消息的多个队列中的哪个队列。
这是 AMQP 1.0 中的链路流量控制相对于 AMQP 0.9.1 中的消费方预取 的另一个优势。一旦 AMQP 0.9.1 客户端对多个队列调用 basic.consume
,它将持续地从所有这些队列接收消息,直到它调用 basic.cancel
。
你可能想知道:链路信用的好值是多少,客户端应该多久补充一次链路信用?正如通常情况一样,答案是你需要用不同的值对你的特定工作负载进行基准测试,以找出答案。
我建议从简单的地方开始,而不是实现花哨的算法:例如,客户端可以最初授予 200 个链路信用,并在剩余链路信用低于 100 时发送一个 link-credit = 200
的流量。
事实上,这就是 RabbitMQ 的反向做法:RabbitMQ AMQP 1.0 会话进程 最初向发布方授予了170 个链路信用,并在剩余链路信用低于一半(即 85)**并且**未确认消息的数量少于 170 时再次授予 170 个链路信用。(在代理内部,发布方确认始终在 AMQP 1.0 会话进程和目标队列之间启用,即使没有确认发送到发布方客户端。这意味着如果目标队列没有足够快地确认,RabbitMQ 将停止向发送应用程序授予链路信用。)请注意,这些 RabbitMQ 实现细节可能会随时改变。
170
的值可以通过advanced.config 设置 rabbit.max_link_credit
来配置。
当一个目标队列过载时,发布方可以继续高速向所有其他目标队列发布消息。
应用程序可以向同一个 AMQP 1.0 连接或会话上的多个队列发送消息(通过附加多个链路)。假设一个简单的场景,客户端打开两个链路
- 链路 1 向一个经典队列发送消息。
- 链路 2 向一个 5 副本的仲裁队列发送消息。
在确认一条消息之前,仲裁队列必须将消息复制到大多数副本,每个副本都fsync 消息到其本地磁盘。
相反,经典队列不会复制消息。此外,当一条消息被消费并及时确认时,经典队列可以在(此后)将消息确认回发布方,而无需将消息写入磁盘。因此,在这种情况下,经典队列的吞吐量将远远高于仲裁队列。
AMQP 1.0 链接流量控制的优势在于,RabbitMQ 可以减慢对链接 2 的信用授权速度,同时继续以高频率对链接 1 授权信用。因此,即使当 5 个副本的仲裁队列没有像(单副本)经典队列那样快地处理消息时,客户端也可以继续以全速向经典队列发送消息。
以下图片来自 之前 的 AMQP 0.9.1 博客文章
图片中的“信用”指的是 RabbitMQ 用于 AMQP 0.9.1 连接的内部流量控制,与 AMQP 1.0 中的链接信用无关。
上图中的 reader
是从套接字读取 AMQP 0.9.1 帧的 Erlang 进程。图片显示,对于 AMQP 0.9.1 连接,RabbitMQ 会阻塞 reader
,导致 TCP 反压被应用到客户端。因此,当一个目标队列过载时,RabbitMQ 会限制 AMQP 0.9.1 连接,影响对所有其他目标队列的发布。
以下基准测试显示,当一个连接发送到多个目标队列时,AMQP 1.0 与 AMQP 0.9.1 相比,可以提供高出数倍的吞吐量。
基准测试:两个发送者
为了将我们刚刚讨论的理论付诸实践,two_senders 程序模拟了类似场景。
该程序打开一个 AMQP 0.9.1 连接和通道,以及一个 AMQP 1.0 连接和会话。
在 AMQP 0.9.1 通道和 AMQP 1.0 会话上,都有两个协程尽可能快地发布到经典队列和仲裁队列中。这总共导致了四个目标队列。
main.go RabbitMQ
+-------------+ +----------------------------------+
| | AMQP 0.9.1 connection | |
| |#####################################| |
| +---+ |-------------------------------------| +------------------------+ |
| | P | | classic-queue-amqp-091 | |
| +---+ +------------------------+ |
| AMQP 0.9.1 channel |
| +---+ +------------------------+ |
| | P | | quorum-queue-amqp-091 | |
| +---+ |-------------------------------------| +------------------------+ |
| |#####################################| |
| | | |
| | | |
| | | |
| |#####################################| |
| +---+ |-------------------------------------| +-----------------------+ |
| | P |O============================================>O| classic-queue-amqp-10 | |
| +---+ +-----------------------+ |
| AMQP 1.0 session |
| +---+ +-----------------------+ |
| | P |O======================================+=====>O| quorum-queue-amqp-10 | |
| +-+-+ |----------------------------------|--| +-----------------------+ |
| | |##################################|##| |
| | | AMQP 1.0 connection | | |
+------|------+ | +----------------------------------+
| |
| |
Publisher AMQP 1.0 link
goroutine
按照以下步骤运行基准测试
- 使用 Ubuntu 系统上的
make run-broker
启动 RabbitMQ 服务器 v4.0.0-beta.6。(在 macOS 上,fsync
不会物理写入磁盘。) - 使用
go run two_senders/main.go
执行 Go 程序。10 秒后,Go 程序将完成。 - 列出每个队列中的消息数量
./sbin/rabbitmqctl --silent list_queues name type messages --formatter=pretty_table
┌────────────────────────┬─────────┬──────────┐
│ name │ type │ messages │
├────────────────────────┼─────────┼──────────┤
│ classic-queue-amqp-091 │ classic │ 159077 │
├────────────────────────┼─────────┼──────────┤
│ quorum-queue-amqp-091 │ quorum │ 155782 │
├────────────────────────┼─────────┼──────────┤
│ classic-queue-amqp-10 │ classic │ 1089075 │
├────────────────────────┼─────────┼──────────┤
│ quorum-queue-amqp-10 │ quorum │ 148580 │
└────────────────────────┴─────────┴──────────┘
正如 AMQP 1.0 基准测试 博客文章中所解释的,仲裁队列 fsync(准确地说是 fdatasync
),而经典队列则不执行。因此,即使没有复制,仲裁队列也比经典队列慢得多,因为我使用的是消费级磁盘,每次 fsync
至少需要 5 毫秒。对于生产环境集群,建议使用高端、企业级的磁盘,这些磁盘可以更快地执行 fsync
操作。
结果显示,单个 AMQP 0.9.1 连接向目标经典队列和目标仲裁队列发送了大致相同数量的消息。这是因为 quorum-queue-amqp-091
导致整个 AMQP 0.9.1 连接在我的基准测试中每秒被阻塞(和解除阻塞)约 80 次。因此,单个 AMQP 0.9.1 连接上对多个目标队列(classic-queue-amqp-091
和 quorum-queue-amqp-091
)的发布速率受到最慢目标队列(quorum-queue-amqp-091
)的限制。总共,AMQP 0.9.1 连接发送了 159,077 + 155,782 = 314,859
条消息。
相反,由于链接流量控制,RabbitMQ 只限制了到 quorum-queue-amqp-10
目标 的链接,允许 AMQP 1.0 客户端继续以全速向 classic-queue-amqp-10
目标发布消息。总共,AMQP 1.0 连接发送了 1,089,075 + 148,580 = 1,237,655
条消息。
因此,在我们简单的基准测试中,AMQP 1.0 的总发送吞吐量是 AMQP 0.9.1 的四倍(!)。
当一个目标队列过载时,客户端可以继续以高速从所有源队列消费消息。因此,AMQP 1.0 客户端可以使用单个连接来以高吞吐量进行发布和消费。
优势 #3 描述了如何让单个过载的目标队列导致 RabbitMQ 阻止阅读器进程读取任何 AMQP 0.9.1 帧。这意味着客户端不仅不能再发布消息,而且也 **不能消费** 消息。这是因为客户端的消息 确认 没有被 RabbitMQ 处理,当消费者的 预取 限制达到时,会阻止新消息传递给消费者。
虽然这种消费方面的限制是临时的(AMQP 0.9.1 reader
进程每秒会被阻塞和解除阻塞很多次),但它会显著降低消费速率。
RabbitMQ AMQP 0.9.1 的 文档 建议
因此,建议发布者和消费者尽可能使用独立的连接,以便将消费者与可能应用于发布连接的潜在流量控制隔离,从而影响手动消费者确认。
这导致了整个 AMQP 0.9.1 客户端库生态系统都采用了这种使用独立连接进行发布和消费的“最佳实践”。例如,RabbitMQ AMQP 0.9.1 C++ 库 说明
发布和消费发生在不同的连接上
常见的应用程序陷阱是在同一个连接上进行消费和生产。这会导致消费速率变慢,因为 RabbitMQ 会对快速发布者施加反压 - 根据消费/发布的具体队列,这会导致恶性循环。
相反,AMQP 1.0 链接流量控制允许只减慢客户端应用程序中的链接发送方。所有其他链接(无论是发送还是消费)都可以继续以全速运行。
因此,在 AMQP 1.0 中,客户端可以使用单个连接进行发布和消费。
基准测试:一个发送者,一个接收者
程序 one_sender_one_receiver 模拟了客户端打开两个链接的场景
- 链接 1 从经典队列接收消息。
- 链接 2 向仲裁队列发送消息。
该程序打开一个 AMQP 0.9.1 连接和通道,以及一个 AMQP 1.0 连接和会话。
为了准备基准测试,程序将一百万条消息写入每个经典队列。
在 AMQP 0.9.1 通道和 AMQP 1.0 会话上,都有两个协程
- 一个协程(链接 1)以 200 的预取率从经典队列接收消息,并确认每个消息。
- 一个协程(链接 2)以 10,000 消息的批次向仲裁队列发布消息。(收到所有 10,000 条确认后,将发布下一个批次。)
main.go RabbitMQ
+-------------+ +----------------------------------+
| | AMQP 0.9.1 connection | |
| |#####################################| |
| +---+ |-------------------------------------| +------------------------+ |
| | C | | classic-queue-amqp-091 | |
| +---+ +------------------------+ |
| AMQP 0.9.1 channel |
| +---+ +------------------------+ |
| | P | | quorum-queue-amqp-091 | |
| +---+ |-------------------------------------| +------------------------+ |
| |#####################################| |
| | | |
| | | |
| | | |
| |#####################################| |
| +---+ |-------------------------------------| +-----------------------+ |
| | C |O<============================================O| classic-queue-amqp-10 | |
| +---+ +-----------------------+ |
| AMQP 1.0 session |
| +---+ +-----------------------+ |
| | P |O======================================+=====>O| quorum-queue-amqp-10 | |
| +-+-+ |----------------------------------|--| +-----------------------+ |
| | |##################################|##| |
| | | AMQP 1.0 connection | | |
+------|------+ | +----------------------------------+
| |
| |
Publisher or Consumer AMQP 1.0 link
goroutine
按照以下步骤运行基准测试
- 使用 Ubuntu 系统上的
make run-broker
启动 RabbitMQ 服务器 v4.0.0-beta.6。 - 使用
go run one_sender_one_receiver/main.go
执行 Go 程序 - 程序完成后,列出每个队列中的消息数量
./sbin/rabbitmqctl --silent list_queues name type messages --formatter=pretty_table
┌────────────────────────┬─────────┬──────────┐
│ name │ type │ messages │
├────────────────────────┼─────────┼──────────┤
│ classic-queue-amqp-091 │ classic │ 990932 │
├────────────────────────┼─────────┼──────────┤
│ quorum-queue-amqp-091 │ quorum │ 172800 │
├────────────────────────┼─────────┼──────────┤
│ classic-queue-amqp-10 │ classic │ 336229 │
├────────────────────────┼─────────┼──────────┤
│ quorum-queue-amqp-10 │ quorum │ 130000 │
└────────────────────────┴─────────┴──────────┘
虽然 AMQP 0.9.1 客户端只消费了 1,000,000 - 990,932 = 9,068
条消息,但 AMQP 1.0 客户端消费了 1,000,000 - 336,229 = 663,771
条消息。
因此,在这个基准测试中,AMQP 1.0 客户端接收的消息数量是 AMQP 0.9.1 客户端的 73 倍(!)。
交付计数
到目前为止,我们只了解 flow 帧的一个字段:link-credit
。
以下场景会发生什么?
Receiver Sender
=======================================================================
...
link state: link state:
link-credit = 3 link-credit = 3
flow(link-credit = 6) ---+ +--- transfer(...)
\ /
x
/ \
<--+ +-->
link state: link state:
link-credit = 5 link-credit = 6
最初,link-credit
为 3。接收者决定发送一个 flow
帧,将新的 link-credit
设置为 6。同时,发送者发送一个 transfer
帧。
由于接收者在发送 flow
帧后收到了 transfer
帧,它会将其新的链接信用计算为 6 - 1 = 5
。但是,因为发送者在发送 transfer
帧后收到了 flow
帧,它会将信用设置为 6。因此,状态(以及链接剩余多少信用的视图)就会变得不一致。这会导致问题,因为发送者可能导致接收者溢出。
为了防止这种不一致,链接状态和 flow
帧都需要第二个字段
<field name="delivery-count" type="sequence-no"/>
每次消息被传递时,交付计数都会减少 1。具体来说,发送者在每次发送消息时将交付计数减少 1,接收者在每次接收消息时将交付计数减少 1。
当发送者收到一个 flow
帧(包含链接信用和交付计数)时,它会根据此 公式 设置其链接信用
link-credit(snd) := delivery-count(rcv) + link-credit(rcv) - delivery-count(snd).
(snd)
指的是发送者处的链接状态,而接收者处的链接状态 (rcv)
会通过 flow
帧发送给发送者。
在发送者处,这个公式意味着:“将新的链接信用设置为我在 flow
帧中收到的链接信用减去任何正在传输的交付。”
交付计数的目的是在以下事件序列中建立顺序:
- 发送者发送消息
- 接收者接收消息
- 接收者授予链接信用
- 发送者计算接收者的链接信用
使用交付计数可以解决我们之前讨论的不一致问题。
假设交付计数最初设置为 20
Receiver Sender
========================================================================================
...
link state: link state:
delivery-count = 20 delivery-count = 20
link-credit = 3 link-credit = 3
flow(delivery-count = 20,
link-credit = 6) ---+ +--- transfer(...)
\ /
x
/ \
<--+ +-->
link state: link state:
delivery-count = 21 delivery-count = 21
link-credit = 5 link-credit = 20+6-21 = 5 (above formula)
一些 AMQP 1.0 字段(包括交付计数)的类型为 sequence-no。这些是 32 位 RFC-1982 序列号,范围为 [0 .. 4,294,967,295]
,并且会循环:对 4,294,967,295 加 1 会得到 0。
交付计数由发送者初始化,发送者在 attach 帧的 initial-delivery-count
字段中发送其选择的数值。
发送者可以将交付计数初始化为它选择的任何数值,例如 0、10 或 4,294,967,295。这个数值除了我们之前讨论的目的以外,没有其他内在含义:将接收者的交付计数与发送者的交付计数进行比较,以确定有多少消息正在传输。
据我所知,AMQP 1.0 的链接流量控制似乎基于 1995 年的论文 基于信用的 ATM 网络流量控制。
公式
link-credit(snd) := delivery-count(rcv) + link-credit(rcv) - delivery-count(snd).
来自 AMQP 1.0 规范第 2.6.7 节的公式与论文中的公式
Crd_Bal = Buf_Alloc - (Tx_Cnt - Fwd_Cnt)(1)
一致。
AMQP 1.0 规范甚至采用了论文中类似的术语,例如“链接”和“节点”。此外,规范可能指的是“delivery-count”,因为它在论文中也被称为“count”(尽管规范明确说明在 AMQP 中它不是一个计数,而是一个序列号)。
论文中的图 2 很好地说明了这个概念
在此图中
Tx_Cnt
对应于delivery-count(snd)
Fwd_Cnt
对应于delivery-count(rcv)
Crd_Bal
对应于link-credit(snd)
Buf_Alloc
对应于link-credit(rcv)
论文详细解释了接收者应该以多高的频率和多少数量来补充链接信用。如果您想成为 AMQP 1.0 流量控制的专家,我建议您阅读 AMQP 1.0 规范和论文。
Drain
了解了 flow 帧的两个最重要的链接控制字段(link-credit
和 delivery-count
)之后,让我们继续讨论 drain
字段
<field name="drain" type="boolean" default="false"/>
默认情况下,drain
字段设置为 false
。接收者决定是否启用排水模式,发送者将 drain
设置为接收者最后已知的值。
排水意味着发送者应该通过发送可用消息来使用接收者所有的链接信用。如果没有足够的邮件发送,发送者仍然必须用以下方式耗尽所有链接信用
- 将 delivery-count 增加剩余的 link-credit。
- 将 link-credit 设置为 0。
- 向接收者发送一个
flow
帧。
通过将 drain
字段设置为 true
,消费者请求 RabbitMQ “发送 transfer 帧或 flow 帧”。如果源队列为空,RabbitMQ 将立即回复只有 flow
帧。
因此,drain
字段允许消费者在同步获取消息时设置超时,如 AMQP 1.0 规范的 图 2.44:带有超时的同步获取 所示
Receiver Sender
=================================================================
...
flow(link-credit=1) ---------->
*wait for link-credit <= 0*
flow(drain=True) ---+ +--- transfer(...)
\ /
x
/ \
(1) <--+ +-->
(2) <---------- flow(...)
...
-----------------------------------------------------------------
(1) If a message is available within the timeout, it will
arrive at this point.
(2) If a message is not available within the timeout, the
drain flag will ensure that the sender promptly advances the
delivery-count until link-credit is consumed.
由于链接信用被快速消耗,消费者可以明确地确定消息是否被接收或操作是否超时。
Echo
与 drain
字段类似,echo
字段是
- 默认设置为
false
- 由消费者决定,因为 RabbitMQ 在其当前实现中不设置此字段
<field name="echo" type="boolean" default="false"/>
消费者可以设置 echo
字段来请求 RabbitMQ 回复 flow
帧。一个用例在 AMQP 1.0 规范的 图 2.46:停止传入消息 中描述
Receiver Sender
================================================================
...
<---------- transfer(...)
flow(..., ---+ +--- transfer(...)
link-credit=0, \ /
echo=True) x
/ \
(1) <--+ +-->
(2) <---------- flow(...)
...
----------------------------------------------------------------
(1) In-flight transfers can still arrive until the flow state
is updated at the sender.
(2) At this point no further transfers will arrive.
在 AMQP 1.0 中,可以停止/暂停消费者,并在稍后恢复。
在 AMQP 0.9.1 中,消费者不能被暂停和恢复。相反,消费者必须使用 basic.cancel
方法取消,然后使用 basic.consume
注册一个新的消费者。
AMQP 1.0 允许从一个 单个活动消费者 到下一个消费者进行平滑的移交,同时保持消息顺序。
在 AMQP 1.0 中,消费者可以在 分离 链接之前先停止链接并确认消息(首选),或者直接分离链接。直接分离链接将重新排队所有未确认的消息。无论哪种方式,分离链接都会导致下一个消费者被激活并按原始顺序接收消息。
相反,AMQP 0.9.1 中的单个活动消费者不能平滑安全地移交给下一个消费者。当 AMQP 0.9.1 消费者通过 basic.cancel
取消消费但仍有未确认的消息时,队列将激活下一个消费者。如果 AMQP 0.9.1 客户端在短时间后崩溃,签出到旧消费者的消息将被重新排队,这可能会违反消息顺序。为了保持消息顺序,AMQP 0.9.1 客户端必须关闭整个通道(不先调用 basic.cancel
),以便在激活下一个消费者之前重新排队消息。
Available
RabbitMQ 在 flow 帧中设置 available
字段,以告知消费者有多少消息可用
<field name="available" type="uint"/>
available
值只是一个近似值,因为从队列发出此信息到 flow
帧到达消费者,此信息可能已经过时,例如当其他客户端发布消息到此队列或从该队列消费消息时。
对于 经典队列 和 仲裁队列,available
表示已准备好交付的消息数量(即队列长度,不包括签出到消费者的消息)。
对于 流,available
表示提交偏移量和最后消费的 偏移量 之间的差值。大致来说,提交偏移量是 RabbitMQ 集群中不同的流 副本 同意作为流的结束位置。最后消费的偏移量可能与提交偏移量相同,或者落后于提交偏移量。available
值只是一个估计值,因为 流偏移量不一定代表一条消息。
如果启用了 单个活动消费者 功能,仲裁队列将为所有非活动(等待)消费者返回 available = 0
。这很有道理,因为无论非活动消费者补充了多少信用,都没有可用的消息。
在 上一节 中,我们了解到消费者设置 echo
字段的一个用例是停止链接。另一个用例是当消费者想要了解它所消费的队列中可用的消息数量时。
AMQP 1.0 会通知消费者队列中可用的消息数量。
在 RabbitMQ 发送给消费者的每个 flow
帧中包含此信息,是相对于 AMQP 0.9.1 的优势。在 AMQP 0.9.1 中,查询可用消息的方法更加繁琐且效率低下
queue.declare
withpassive=true
:queue.declare_ok
回复将包含message_count
字段。basic.get
:basic.get_ok
回复将包含message_count
字段。
available
字段也可以由发布者设置,以告诉 RabbitMQ 发布者有多少消息可以发送。目前,RabbitMQ 忽略此信息。
Properties
flow
帧的 properties
字段可以承载与应用程序相关的链接状态属性
<field name="properties" type="fields"/>
目前,RabbitMQ 不使用此字段。
AMQP 1.0 链接流量控制是可扩展的。
假设 RabbitMQ 想要启用发布者直接将消息发送到仲裁队列领导者。由于所有 Ra 命令(包括排队消息)都必须先通过领导者,因此客户端直接连接到托管仲裁队列领导者的 RabbitMQ 节点是有意义的。这种“队列本地性”将减少 RabbitMQ 集群内流量,从而提高延迟和吞吐量。当领导者发生变化时,RabbitMQ 可以通过 properties
字段向发布者推送领导者更改通知,包括托管仲裁队列领导者的新 RabbitMQ 节点,而不是导致 RabbitMQ 分离 链接(这可能会对发布应用程序造成破坏)。然后,应用程序可以决定何时方便分离链接并在不同的连接上连接一个新链接,以继续“本地”发布。
或者,RabbitMQ 可以为每个 flow
帧的 properties
字段中的 local
键发送一个布尔值,以指示链接当前是否正在本地发布或消费。如果 local
字段从 true
变为 false
,客户端可以通过 AMQP 1.0 上的 HTTP 查询新的队列拓扑和领导者。
这些只是假设的例子,说明了 RabbitMQ 如何在未来利用链接流量控制的可扩展性,这是相对于 AMQP 0.9.1 的优势。
Summary
我们了解到 AMQP 1.0 链接流量控制可以保护单个消费者或队列免受消息过载。接收者定期向发送者提供反馈,告知它当前可以处理多少消息。
每个链接端点(发送者和接收者)维护完整的链接流量控制状态,并通过 flow 帧与另一方交换此状态。
以下链接流量控制字段由 **接收者** 独立决定
link-credit
drain
以下链接流量控制字段由 **发送者** 独立决定
delivery-count
可用
会话流控制
会话
客户端库为每个AMQP 连接创建一个 TCP 连接。使用open 帧建立 AMQP 1.0 连接。
在 AMQP 1.0 连接中,客户端可以启动多个 AMQP 1.0 会话。这与 AMQP 0.9.1 客户端如何在 AMQP 0.9.1 连接中打开多个AMQP 0.9.1 通道类似。使用begin 帧启动 AMQP 1.0 会话。
Client App RabbitMQ
+-------------+ +-------------+
| |################################| |
| +---+ |--------------------------------| +---+ |
| | C |O<===============================+========O| Q | |
| +-+-+ \ |-------------------+-------|----| |+-+-+ |
| | \ |#######+###########|#######|####| | | |
+-----|-----\-+ | | | +---|--|------+
| \ | | | | |
| Target | | | Source |
| | | | |
Consumer | | Link Queue
| Session
Connection
例如,客户端应用程序可以创建
- 单个连接,包含单个会话,或
- 单个连接,包含多个会话,或
- 多个连接,每个连接包含一个或多个会话
通常,单个连接,包含单个会话就足够了。
打开 AMQP 连接会带来一些开销:必须建立 TCP 连接,在客户端和服务器中分配操作系统资源(如套接字和 TCP 缓冲区),并为 TCP 或 TLS 握手带来延迟。此外,在 RabbitMQ 节点上,会为每个传入的 AMQP 连接创建一个监控树。
可以将会话视为“轻量级”连接。每个 AMQP 1.0 会话目前都作为自己的Erlang 进程实现。因此,如果连接已建立,则创建会话的成本很低。
在 AMQP 连接中创建第二个会话可能在以下情况下很有用
- 快速且廉价地创建另一个“虚拟连接”,而不会产生上述 AMQP 连接设置开销。
- 确保高优先级transfer 帧(需要低延迟)不会被其他 transfer 帧阻塞。
- 当 RabbitMQ会话进程变得非常繁忙时(例如,将消息路由到队列),提高并行性。
流字段
会话提供一种基于传输的 transfer 帧数量的流控制方案。由于对于给定的连接,帧具有最大大小,因此这提供了基于传输的字节数的流控制。
请记住,大型消息将拆分为多个 transfer 帧。
会话流控制在 AMQP 1.0 中的操作层级高于链接流控制。链接流控制保护单个消费者和队列,而会话流控制旨在保护整个客户端应用程序和 RabbitMQ。
每个链接端点维护链接流控制状态并通过 flow
帧交换该状态,每个会话端点维护会话流控制状态并在相同的 flow
帧中交换该状态。因此,flow 帧中的其余字段用于会话流控制
<field name="next-incoming-id" type="transfer-number"/>
<field name="incoming-window" type="uint" mandatory="true"/>
<field name="next-outgoing-id" type="transfer-number" mandatory="true"/>
<field name="outgoing-window" type="uint" mandatory="true"/>
incoming-window
与link-credit 字段类似,接收方通过该字段告知发送方它可以容忍接收多少个“单元”。不同之处在于,对于 link-credit
,单元是可能很大的应用程序消息,而对于 incoming-window
,单元是 transfer
帧。
为了更好地理解,举一个极端的例子:如果连接上协商的 max-frame-size
非常低,而一条消息非常大,如果接收方授予发送方一个链接信用额度,则发送方可能仍然会被会话流控制阻止,无法完整发送此消息。
next-incoming-id
和 next-outgoing-id
是序列号,与链接流控制中的delivery-count 目的相同:它们确保在 transfer
帧与 flow
帧并行发送时,窗口在另一端正确计算。会话流控制需要两个序列号,而链接流控制只需要一个,因为会话是双向的,而链接是单向的。
最初,RabbitMQ 允许发布者发送400 个 transfer 帧。每当 RabbitMQ 会话进程处理完一半数量的帧(200 个 transfer 帧)时,RabbitMQ 就会通过向发布者发送包含 incoming_window = 400
的 flow
帧来扩展此窗口。
400
的值可以通过advanced.config 设置 rabbit.max_incoming_window
进行配置。
RabbitMQ 报警
此规则的唯一例外是触发内存或磁盘报警时。为了保护 RabbitMQ 不至于耗尽内存或磁盘空间,每个会话都会通过发送包含 incoming_window = 0
的 flow
帧来关闭其传入窗口,从而有效地阻止发布者发送任何进一步的 transfer
帧。
如果发生集群范围的内存或磁盘报警,RabbitMQ 仅会阻止 AMQP 1.0 客户端发布 transfer
帧。其他操作,例如 AMQP 1.0 客户端在同一会话上消费或创建仅消费的新连接以清空 RabbitMQ 中的队列(从而减少内存和磁盘使用量),仍然允许。
在 AMQP 0.9.1 中,RabbitMQ 会完全阻止从连接套接字读取数据,并阻止打开新连接,直到报警清除。AMQP 0.9.1 客户端必须使用单独的连接来发布和消费,以便在报警期间继续消费。
正如优点 #4 和 #9 中所述,由于客户端可以安全有效地使用单个 AMQP 1.0 连接进行发布和消费,因此 RabbitMQ 中的内存使用量会减少。
AMQP 0.9.1 实际上需要的连接数量是 AMQP 1.0 的两倍。在AMQP 1.0 基准测试中,可以了解到可以节省多少内存。
Incoming-Window
AMQP 1.0 流控制的缺点是它的复杂性。虽然链接流控制和会话流控制背后的理念和动机都很合理,但在每个场景中正确有效地实现一层(会话流控制)在另一层(链接流控制)之上,可能会很困难。请考虑客户端可以
- 独立修改会话流控制和链接流控制(在同一个
flow
帧中或在单独的flow
帧中)。 - 随时以及以不同的频率发送
flow
帧。 - 动态地增加或减少会话窗口或链接信用额度。
- 授予 0 的链接信用额度(导致队列停止发送)或授予大量的链接信用额度(最多 40 亿)。
- 关闭其会话
incoming-window
(导致服务器会话停止发送)或将其打开(最多 40 亿)。 - 停止从套接字读取数据,对服务器应用 TCP 反压。
- 将其他特殊逻辑(如
drain=true
)添加到混合中。
在 AMQP 1.0 程序中,并发性是理想的。根据编程语言的不同,客户端或代理的不同部分由不同的线程或进程实现。
RabbitMQ 通过在不同的Erlang 进程中运行连接读取器进程(解析帧)、连接写入器进程(序列化帧)、会话进程(例如路由消息)和队列进程(存储和转发消息)来实现并发性。
在客户端授予发送队列大量 link-credit
但保持非常小的会话 incoming-window
的情况下,允许队列进程传递消息,但不允许会话进程将消息转发到客户端。在这种情况下,消息将在会话进程中缓冲,直到客户端的 incoming-window
允许将它们转发。
为了防止这种情况(大量消息可能在会话进程中缓冲),RabbitMQ 在其当前实现中,在内部以最多 256 个链接信用额度的批次,从会话进程向队列进程授予链接信用额度。即使客户端授予大量链接信用额度,队列也只会看到给定消费者最多 256 个信用额度。一旦服务器会话进程发送完这 256 条消息,服务器会话进程就会向队列授予下一批 256 个信用额度。
256
的值可以通过advanced.config 设置 rabbit.max_queue_credit
进行配置。
总结
这篇博客文章介绍了 AMQP 1.0 链接流控制和会话流控制的工作原理。
我们了解了 RabbitMQ 中的各个进程如何保护自己免受过载
- 队列进程受到链路流量控制的保护。
- 会话进程和 RabbitMQ 整体受到会话流量控制的保护。
- 连接读取进程受到 TCP 回压的保护,当它们无法快速读取帧时(这种情况应该很少见)。
这篇文章还强调了 AMQP 1.0 中的流量控制相较于 AMQP 0.9.1 中的流量控制的十个优势。主要优势包括
- 对消费者进行细粒度控制,允许它们在任何时间点确定它们想要从特定队列中消费的确切消息数量。
- 当目标队列达到其限制时,在单个 AMQP 连接上发布和消费的吞吐量更高。我们运行了两个基准测试
- 当发送到两个不同的目标队列时,AMQP 1.0 的总发送吞吐量是 AMQP 0.9.1 的四倍。
- 在极端情况下,当从一个队列接收消息时,同时快速发送消息到另一个队列,AMQP 1.0 的消费速度是 AMQP 0.9.1 的 73 倍。
- 安全高效地使用单个连接进行发布和消费。
- 消费者能够在任何时间点停止和恢复。
- 可扩展性,以适应未来用例。