一些队列理论:吞吐量、延迟和带宽
你在 RabbitMQ 中有一个队列。有一些客户端正在从该队列消费。如果你根本不设置 QoS 设置(basic.qos),那么 RabbitMQ 会尽可能快地将队列中的所有消息推送到客户端,只要网络和客户端允许。消费者将在内存中膨胀,因为它们会在自己的 RAM 中缓冲所有消息。如果你询问 RabbitMQ,队列可能看起来是空的,但可能有数百万条消息未被确认,因为它们位于客户端中,随时准备被客户端应用程序处理。如果你添加一个新的消费者,队列中就没有消息可以发送给新消费者了。消息只是被现有客户端缓冲,并且可能需要很长时间才能被处理,即使有其他消费者变得可用,可以更快地处理这些消息。这很不理想。
因此,默认的 QoS prefetch 设置为客户端提供了一个无限缓冲区,这可能导致行为和性能不佳。但是,QoS prefetch 缓冲区大小应该设置为多少?目标是让消费者保持饱和工作,但要最小化客户端的缓冲区大小,以便更多消息保留在 RabbitMQ 的队列中,从而可供新消费者使用,或者在消费者空闲时发送给它们。
假设 RabbitMQ 从这个队列中获取一条消息,将其放到网络上,然后发送到消费者需要 50 毫秒。客户端处理消息需要 4 毫秒。一旦消费者处理完消息,它就会向 RabbitMQ 发送一个 ack,这需要另外 50 毫秒才能发送并由 RabbitMQ 处理。因此,我们总的往返时间是 104 毫秒。如果我们的 QoS prefetch 设置为 1 条消息,那么 RabbitMQ 要到此往返完成后才会发送下一条消息。因此,客户端在每 104 毫秒中只忙碌 4 毫秒,即 3.8% 的时间。我们希望它 100% 的时间都在忙碌。
如果我们进行总往返时间 / 客户端对每条消息的处理时间,我们得到 104 / 4 = 26。如果我们设置 QoS prefetch 为 26 条消息,这就能解决我们的问题:假设客户端有 26 条消息缓冲,已准备好并等待处理。(这是一个合理的假设:一旦你设置了 basic.qos 然后从队列中 consume,RabbitMQ 就会将你订阅的队列中尽可能多的消息发送给客户端,直到达到 QoS 限制。如果你假设消息不是很大并且带宽很高,那么 RabbitMQ 很可能能够比客户端处理它们的速度更快地将消息发送到你的消费客户端。因此,从客户端缓冲区已满的假设进行所有计算是合理的(而且更简单)。)如果每条消息需要 4 毫秒的处理时间,那么处理整个缓冲区总共需要 26 * 4 = 104 毫秒。最初的 4 毫秒是客户端处理第一条消息的时间。然后客户端发出 ack 并继续处理缓冲区中的下一条消息。这个 ack 需要 50 毫秒才能到达代理。代理然后向客户端发出一条新消息,这条消息需要 50 毫秒才能到达,所以在 104 毫秒过去并且客户端完成处理其缓冲区时,来自代理的下一条消息已经到达并已准备好等待客户端处理。因此,客户端一直保持忙碌:拥有更大的 QoS prefetch 不会使其运行更快;但我们最小化了缓冲区大小,从而最小化了客户端中消息的延迟:消息在客户端中缓冲的时间不会超过它们保持客户端饱和所必需的时间。事实上,客户端能够在下一条消息到达之前完全清空缓冲区,因此缓冲区实际上保持为空。
只要处理时间和网络行为保持不变,这个解决方案就绝对没问题。但请考虑一下,如果网络速度突然减半会发生什么:你的 prefetch 缓冲区不再够大,现在客户端会空闲下来,等待新消息到达,因为客户端处理消息的速度比 RabbitMQ 提供新消息的速度快。
为了解决这个问题,我们可能会决定将 QoS prefetch 大小加倍(或接近加倍)。如果我们将其从 26 增加到 51,那么如果客户端处理速度保持在每条消息 4 毫秒,我们现在缓冲区中有 51 * 4 = 204 毫秒的消息,其中 4 毫秒将用于处理一条消息,剩下 200 毫秒用于将 ack 发送回 RabbitMQ 并接收下一条消息。因此,我们现在可以应对网络速度减半的情况。
但是,如果网络运行正常,将我们的 QoS prefetch 加倍现在意味着每条消息会在客户端缓冲区中停留一段时间,而不是在到达客户端后立即处理。再次,从现在 51 条消息的完整缓冲区开始,我们知道在客户端处理完第一条消息后的 100 毫秒内,新消息将开始到达客户端。但在 100 毫秒内,客户端将处理完 50 条可用消息中的 100 / 4 = 25 条。这意味着当一条新消息到达客户端时,它将被添加到缓冲区的末尾,而客户端则从缓冲区的头部移除。因此,缓冲区将始终保持 50 - 25 = 25 条消息的长度,每条消息将在缓冲区中停留 25 * 4 = 100 毫秒,从而将 RabbitMQ 发送给客户端的消息与客户端开始处理它之间的延迟从 50 毫秒增加到 150 毫秒。
因此,我们看到增加 prefetch 缓冲区以使客户端能够应对网络性能下降并保持客户端忙碌,会显著增加网络正常运行时期的延迟。
同样,而不是网络性能下降,如果客户端开始每条消息处理 40 毫秒而不是 4 毫秒会发生什么?如果 RabbitMQ 中的队列以前保持稳定长度(即入口和出口速率相同),它现在将开始迅速增长,因为出口速率已降至之前的十分之一。你可能会决定通过添加更多消费者来尝试处理这个不断增长的积压,但现有客户端已经缓冲了一些消息。假设原始缓冲区大小为 26 条消息,客户端将花费 40 毫秒处理第一条消息,然后将 ack 发送回 RabbitMQ 并继续处理下一条消息。ack 到达 RabbitMQ 仍需 50 毫秒,RabbitMQ 发送新消息再需要 50 毫秒,但在 100 毫秒内,客户端只处理了 100 / 40 = 2.5 条额外的消息,而不是剩余的 25 条消息。因此,此时缓冲区长度为 25 - 3 = 22 条消息。来自 RabbitMQ 的新消息,而不是立即处理,现在将排在第 23 位,落后于仍在等待处理的 22 条其他消息,并且在接下来的 22 * 40 = 880 毫秒内不会被客户端触及。考虑到 RabbitMQ 到客户端的网络延迟仅为 50 毫秒,这额外的 880 毫秒延迟现在是延迟的 95%(880 / (880 + 50) = 0.946)。
更糟糕的是,如果我们为了应对网络性能下降而将缓冲区大小加倍到 51 条消息会发生什么?在处理完第一条消息后,客户端中将有另外 50 条消息被缓冲。100 毫秒后(假设网络正常运行),RabbitMQ 将发送一条新消息,此时客户端将处理完这 50 条消息中的第 3 条(缓冲区现在将有 47 条消息),因此新消息将排在缓冲区的第 48 位,并且在接下来的 47 * 40 = 1880 毫秒内不会被处理。再次,考虑到将消息发送到客户端的网络延迟仅为 50 毫秒,这额外的 1880 毫秒延迟现在意味着客户端缓冲占延迟的 97% 以上(1880 / (1880 + 50) = 0.974)。这很可能无法接受:数据只有在及时处理时才可能有效且有用,而不是在客户端接收到数据后大约 2 秒才处理!如果其他消费客户端空闲,它们也无能为力:一旦 RabbitMQ 将消息发送给客户端,在客户端确认或拒绝消息之前,该消息就由客户端负责。一旦消息发送给某个客户端,客户端就不能互相窃取消息。你希望的是让客户端保持忙碌,但让客户端缓冲的消息尽可能少,这样消息就不会因为客户端缓冲区而延迟,从而新消费客户端可以快速从 RabbitMQ 的队列中获得消息。
因此,缓冲区太小会导致网络变慢时客户端空闲,但缓冲区太大则会导致网络正常运行时产生大量额外延迟,并且当客户端处理每条消息的时间突然变长时会产生巨大的额外延迟。很明显,你真正想要的是一个可变的缓冲区大小。这些问题在网络设备中很常见,并且已经过大量研究。*主动队列管理*算法旨在尝试丢弃或拒绝消息,以避免消息在缓冲区中停留过长时间。当缓冲区保持为空时(每条消息仅承受网络延迟,根本不会在缓冲区中停留),并且缓冲区用于吸收峰值时,会实现最低延迟。 Jim Gettys 一直在从网络路由器的角度研究这个问题:局域网和广域网性能之间的差异遭受着完全相同的问题。事实上,只要你在生产者(在我们的例子中是 RabbitMQ)和消费者(客户端应用程序逻辑)之间有一个缓冲区,并且双方的性能都可以动态变化,你就会遇到这些问题。最近,一项名为 Controlled Delay 的新算法已经发表,它 似乎在解决这些问题方面效果很好。
作者声称他们的*CoDel*(“coddle”)算法是一种“无调节旋钮”的算法。这有点撒谎:有两个调节旋钮,并且需要将它们适当地设置。但它们不需要在每次性能变化时都进行更改,这是一个巨大的优势。我已经为我们的 AMQP Java 客户端 实现了这个算法,作为 QueueingConsumer 的一个变体。虽然原始算法针对 TCP 层,在那里丢弃数据包是有效的(TCP 本身会处理丢失数据包的重传),但在 AMQP 中这样做不太礼貌!因此,我的实现使用 RabbitMQ 的 basic.nack 扩展来显式地将消息返回给队列,以便其他客户端可以处理它们。
使用方法几乎与 普通的 QueueingConsumer 相同,只是你应该向构造函数提供三个额外的参数以获得最佳性能。
- 第一个是
requeue,它表示当消息被 nack 时,应该重新排队还是丢弃。如果为 false,它们将被丢弃,如果设置了死信交换,可能会触发死信交换机制。 - 第二个是
targetDelay,这是消息在客户端 QoSprefetch缓冲区中等待的可接受时间(以毫秒为单位)。 - 第三个是
interval,它是每条消息的预期最坏处理时间(以毫秒为单位)。不必精确,但至少在一个数量级内会有所帮助。
你仍然应该适当地设置 QoS prefetch 大小。如果这样做,很可能会向客户端发送大量消息,然后算法不得不将它们返回给 RabbitMQ,如果它们在缓冲区中停留时间过长。很容易导致大量的额外网络流量,因为消息被退回给 RabbitMQ。CoDel 算法旨在仅在性能偏离正常值时才开始丢弃(或拒绝)消息,因此一个工作示例可能会有帮助。
再次假设每方向的网络传输时间为 50 毫秒,并且我们预计客户端平均处理每条消息需要 4 毫秒,但可能飙升至 20 毫秒。因此,我们将 CoDel 的 interval 参数设置为 20。有时网络速度会减半,因此传输时间每方向可能为 100 毫秒。为了应对这种情况,我们将 basic.qos prefetch 设置为 204 / 4 = 51。是的,这意味着在网络正常运行时,缓冲区在大多数时间将保持 25 条消息的长度(见前面的计算),但我们认为没关系。每条消息因此将在缓冲区中等待 25 * 4 = 100 毫秒的预期时间,所以我们将 CoDel 的 targetDelay 设置为 100。
当一切正常运行时,CoDel 不应该碍事,很少有消息(甚至没有)应该被 nack。但如果客户端开始比正常情况处理消息慢,CoDel 将会发现消息在客户端中缓冲的时间过长,并将这些消息退回队列。如果消息被重新排队,它们将变得可供其他客户端使用。
这目前非常实验性,并且有可能看出 CoDel 不适合处理 AMQP 消息的原因,就像它适合处理普通 IP 一样。还值得记住的是,通过 nack 重新排队消息是一项相当昂贵的操作,因此最好将 CoDel 的参数设置好,以确保在正常运行中很少有消息(甚至没有)被 nack。管理插件是检查 nack 消息数量的简便方法。一如既往,欢迎评论、反馈和改进!