一些排队理论:吞吐量、延迟和带宽
在 Rabbit 中,你有一个队列,有一些客户端从该队列中消费消息。如果你没有设置任何 QoS 设置(basic.qos
),那么 Rabbit 会尽可能快地将队列中的所有消息推送到客户端,只要网络和客户端允许。当消费者缓冲所有消息到自己的 RAM 中时,它们的内存会膨胀。如果你向 Rabbit 查询,队列可能显示为空,但可能会有数百万条未确认的消息在客户端中等待处理。如果你添加一个新的消费者,队列中没有消息可以发送给新的消费者。消息只是被缓冲在现有的客户端中,并且可能在很长一段时间内保持在那里,即使有其他消费者可以更快地处理这些消息。这是相当不理想的。
因此,默认的 QoS prefetch
设置给了客户端一个无限的缓冲区,这会导致性能低下。那么你应该将 QoS prefetch
缓冲区大小设置为多少呢?目标是让消费者始终保持工作状态,但同时也要尽量减少客户端的缓冲区大小,以便更多消息保留在 Rabbit 的队列中,从而可以被新的消费者使用,或者当消费者空闲时被发送出去。
假设 Rabbit 从队列中取出一条消息,将其放到网络上,并将其传送到消费者需要 50 毫秒。客户端处理消息需要 4 毫秒。消费者处理完消息后,它会向 Rabbit 发送一个 ack
,这又需要 50 毫秒才能发送到 Rabbit 并被 Rabbit 处理。因此,总的往返时间为 104 毫秒。如果我们的 QoS prefetch
设置为 1 条消息,那么 Rabbit 在这个往返时间完成之前不会发送下一条消息。因此,客户端在每 104 毫秒中只有 4 毫秒处于忙碌状态,或者说只有 3.8% 的时间处于忙碌状态。我们希望它一直处于忙碌状态。
如果我们用总往返时间除以每个消息在客户端上的处理时间,得到104 / 4 = 26
。如果我们的 QoS prefetch
为 26 条消息,那么就解决了我们的问题:假设客户端有 26 条消息被缓冲,准备并等待处理。(这是一个合理的假设:一旦你设置了 basic.qos
并从队列中 consume
,Rabbit 会将你订阅的队列中尽可能多的消息发送给客户端,直到达到 QoS 限额。如果你假设消息不是很大,并且带宽很高,那么 Rabbit 很可能能够比你的客户端处理消息的速度更快地将消息发送给你的消费者客户端。因此,从客户端缓冲区已满的假设开始进行所有计算是合理的(也更简单)。)如果每条消息的处理时间为 4 毫秒,那么处理整个缓冲区需要26 * 4 = 104ms
。前 4 毫秒是客户端处理第一条消息的时间。然后客户端发出一个 ack
,并继续处理缓冲区中的下一条消息。该 ack
需要 50 毫秒才能到达代理。代理然后向客户端发送一个新消息,这又需要 50 毫秒才能到达,因此当 104 毫秒过去,客户端完成处理缓冲区时,来自代理的下一条消息已经到达,并且准备并等待客户端进行处理。因此,客户端始终保持忙碌状态:设置更大的 QoS prefetch
不会使其运行得更快;但我们最小化了缓冲区大小,从而降低了客户端中消息的延迟:消息在客户端的缓冲区中停留的时间不超过为了让客户端始终保持工作状态所需的时长。事实上,客户端能够在下一条消息到达之前完全清空缓冲区,因此缓冲区实际上一直保持为空。
只要处理时间和网络行为保持不变,这个解决方案就绝对没问题。但是,如果网络速度突然下降一半会发生什么?你的 prefetch
缓冲区不再足够大,现在客户端会处于空闲状态,等待新消息到达,因为客户端能够比 Rabbit 提供新消息的速度更快地处理消息。
为了解决这个问题,我们可能只是决定将 QoS prefetch
大小翻倍(或接近翻倍)。如果我们将它从 26 增加到 51,那么如果客户端的处理时间仍然是每条消息 4 毫秒,那么现在缓冲区中有51 * 4 = 204ms
的消息,其中 4 毫秒用于处理消息,剩下的 200 毫秒用于发送 ack
回到 Rabbit 以及接收下一条消息。因此,我们现在可以应对网络速度下降一半的情况。
但是,如果网络正常运行,将我们的 QoS prefetch
翻倍意味着每条消息都会在客户端缓冲区中停留一段时间,而不是在到达客户端后立即被处理。同样地,从现在 51 条消息的完整缓冲区开始,我们知道新消息将在客户端完成处理第一条消息后的 100 毫秒开始出现在客户端。但在那 100 毫秒内,客户端将已经处理了100 / 4 = 25
条消息,而不是 50 条可用消息中的 25 条。这意味着当新消息到达客户端时,它会被添加到缓冲区的末尾,因为客户端从缓冲区的头部删除消息。因此,缓冲区将始终保持50 - 25 = 25
条消息,因此每条消息在缓冲区中停留的时间将从 50 毫秒增加到 150 毫秒,即25 * 4 = 100ms
,从而增加了 Rabbit 将消息发送到客户端和客户端开始处理消息之间的延迟。
因此,我们看到增加 prefetch
缓冲区,以便客户端能够应对网络性能下降,同时保持客户端忙碌,会显著增加网络正常运行时的延迟。
同样地,除了网络性能下降,如果客户端开始每条消息处理时间从 4 毫秒增加到 40 毫秒会发生什么?如果 Rabbit 中的队列以前处于稳定长度(即,入站和出站速率相同),那么现在它将开始快速增长,因为出站速率下降到原来的十分之一。你可能会决定通过添加更多消费者来尝试解决不断增长的积压问题,但现在有一些消息被现有的客户端缓冲。假设原始缓冲区大小为 26 条消息,客户端将花费 40 毫秒处理第一条消息,然后将 ack
发送回 Rabbit,并继续处理下一条消息。该 ack
仍然需要 50 毫秒才能到达 Rabbit,还需要 50 毫秒才能让 Rabbit 发送出新消息,但在那 100 毫秒内,客户端只处理了100 / 40 = 2.5
条消息,而不是剩下的 25 条消息。因此,缓冲区此时长为25 - 3 = 22
条消息。来自 Rabbit 的新消息,而不是立即被处理,现在排在第 23 位,位于 22 条等待处理的消息之后,并且在接下来的22 * 40 = 880ms
内不会被客户端触碰。鉴于从 Rabbit 到客户端的网络延迟只有 50 毫秒,这个额外的 880 毫秒延迟现在占延迟的 95%(880 / (880 + 50) = 0.946
)。
更糟糕的是,如果我们将缓冲区大小翻倍到 51 条消息,以应对网络性能下降会发生什么?处理完第一条消息后,客户端将缓冲 50 条消息。100 毫秒后(假设网络正常运行),一条新消息将从 Rabbit 到达,客户端将处理完这 50 条消息中的第 3 条消息的一半(缓冲区现在将有 47 条消息),因此新消息将排在第 48 位,并且在接下来的47 * 40 = 1880ms
内不会被触碰。同样地,鉴于网络延迟(将消息传送到客户端)只有 50 毫秒,这个额外的 1880 毫秒延迟现在意味着客户端缓冲占延迟的 97% 以上(1880 / (1880 + 50) = 0.974
)。这很可能无法接受:数据可能只有在被及时处理时才是有效的和有用的,而不是在客户端收到数据后 2 秒才处理!如果其他消费客户端处于空闲状态,它们无能为力:一旦 Rabbit 将消息发送到客户端,该消息就由客户端负责,直到它确认或拒绝消息。客户端无法在消息被发送到客户端后互相窃取消息。你想要的是让客户端保持忙碌状态,但让客户端尽可能少地缓冲消息,这样消息就不会因为客户端缓冲而延迟,从而新的消费客户端可以快速从 Rabbit 的队列中获取消息。
因此,缓冲区太小会导致客户端在网络变慢时处于空闲状态,但缓冲区太大会导致网络正常运行时出现大量额外的延迟,以及客户端突然开始处理每条消息的时间比平时长时出现大量额外的延迟。很明显,你真正想要的是一个可变的缓冲区大小。这些问题在网络设备中很常见,并且一直是许多研究的主题。主动队列管理算法试图尝试丢弃或拒绝消息,以便避免消息在缓冲区中停留很长时间。当缓冲区保持为空时(每条消息只经历网络延迟,并且不会在缓冲区中停留),可以实现最低延迟,缓冲区是用来吸收峰值的。 Jim Gettys 从网络路由器的角度研究了这个问题:LAN 和 WAN 之间的性能差异会遇到完全相同的这类问题。事实上,无论何时你有一个缓冲区位于生产者(在我们的例子中是 Rabbit)和消费者(客户端应用程序逻辑)之间,并且两者的性能都可以动态变化,你都会遇到这类问题。最近,一种名为 受控延迟 的新算法被发布,它 似乎可以很好地解决 这些问题。
作者声称他们的CoDel(“coddle”)算法是一种“无旋钮”算法。这有点谎言:有两个旋钮,它们确实需要适当设置。但他们不需要每次性能变化时就改变,这是一个巨大的优势。我已经 实现了这个算法 作为 QueueingConsumer 的变体,用于我们的 AMQP Java 客户端。虽然原始算法的目标是 TCP 层,在那里简单丢弃数据包是有效的(TCP 本身会处理丢失数据包的重新传输),但在 AMQP 中,这样做是不礼貌的!因此,我的实现使用 Rabbit 的 basic.nack
扩展显式地将消息返回到队列,以便其他客户端可以处理它们。
使用它与 普通 QueueingConsumer 大致相同,只是您应该在构造函数中提供三个额外的参数以获得最佳性能。
- 第一个是
requeue
,它表示当消息被拒绝时,是否应该将它们重新排队还是丢弃。如果为 false,它们将被丢弃,如果设置了死信交换机制,这可能会触发该机制。 - 第二个是
targetDelay
,它表示消息在客户端 QoSprefetch
缓冲区中等待的允许时间(毫秒)。 - 第三个是
interval
,它表示一条消息的预期最坏情况处理时间(毫秒)。它不必精确,但处于数量级内肯定会有所帮助。
您仍然应该适当地设置 QoS prefetch
大小。如果您没有这样做,客户端可能会收到大量消息,并且如果这些消息在缓冲区中停留的时间过长,该算法将不得不将它们返回给 Rabbit。当消息返回给 Rabbit 时,很容易导致大量额外的网络流量。CoDel 算法旨在仅在性能偏离正常值时才开始丢弃(或拒绝)消息,因此一个实际的示例可能会有所帮助。
同样,假设每个方向的网络遍历时间为 50 毫秒,并且我们期望客户端平均花费 4 毫秒来处理每条消息,但这可能会飙升至 20 毫秒。因此,我们将 CoDel 的 interval
参数设置为 20。有时网络速度会减半,因此每个方向的遍历时间可以为 100 毫秒。为了解决这个问题,我们将 basic.qos prefetch
设置为 204 / 4 = 51
。是的,这意味着当网络正常运行时,缓冲区大部分时间将保持 25 条消息的长度(参见前面的计算结果),但我们认为这没关系。因此,每条消息将在缓冲区中停留的预期时间为 25 * 4 = 100 毫秒
,因此我们将 CoDel 的 targetDelay
设置为 100。
当一切正常运行时,CoDel 应该不会造成任何干扰,并且很少有消息会被拒绝。但是,如果客户端开始比平时更慢地处理消息,CoDel 会发现消息在客户端中被缓冲的时间过长,并将这些消息返回到队列。如果这些消息被重新排队,那么它们将可供交付给其他客户端。
目前这仍然是实验性的,并且您可以看到 CoDel 在处理 AMQP 消息方面不如处理纯 IP 消息那样合适的原因。还值得记住的是,通过拒绝消息进行重新排队是一个相当昂贵的操作,因此最好设置 CoDel 的参数,以确保在正常操作中很少有消息被拒绝。管理插件是检查被拒绝消息数量的简便方法。与往常一样,欢迎您提出评论、反馈和改进建议!