RabbitMQ 3.5.5 的新信用流设置
这篇博文是为 2015 年发布的 RabbitMQ 3.5 编写的。虽然有些部分仍然适用,但有很多信息已过时。例如,RabbitMQ 4.0 不再支持队列镜像,“将消息分页到磁盘”已不再是 RabbitMQ 需要做的事情,因为消息几乎总是立即持久化到磁盘。
为了防止快速的发布者使代理(broker)超载,超出其当前处理能力,RabbitMQ 实现了一个内部机制,称为*信用流(credit flow)*。RabbitMQ 内部的各种系统将使用此机制来降低发布者的速度,同时允许消息使用者追赶。在这篇博文中,我们将了解*信用流*的工作原理,以及我们可以做什么来调整其配置以获得最佳性能。
RabbitMQ 的最新版本包含了一些新的配置值,允许用户调整内部流控设置。理解这些设置如何根据你的具体工作负载运行,可以帮助你充分发挥 RabbitMQ 的性能。但请注意,随意增加这些值“看看会发生什么”可能会对 RabbitMQ 响应消息突发的能力产生不利影响,并影响 RabbitMQ 处理内存压力的内部策略。请谨慎操作。
要理解新的流控设置,我们首先需要了解 RabbitMQ 在消息发布和分页到磁盘方面的内部工作原理。我们先来看看消息发布是如何在 RabbitMQ 中工作的。
消息发布
为了了解 credit_flow 及其设置如何影响发布,让我们来看看 RabbitMQ 内部消息是如何流动的。请记住,RabbitMQ 是用 Erlang 实现的,其中的进程通过相互发送消息进行通信。
每当 RabbitMQ 实例运行时,可能都有数百个 Erlang 进程在交换消息以相互通信。例如,我们有一个读取器进程,它从网络读取 AMQP 帧。这些帧会被转换为 AMQP 命令,然后转发给 AMQP 通道进程。如果该通道正在处理发布,它需要询问特定的交换机消息应该去往哪些队列,这意味着通道会将消息传递给每个队列。最后,如果 AMQP 消息需要持久化,msg_store 进程将接收它并将其写入磁盘。因此,每当我们向 RabbitMQ 发布 AMQP 消息时,都会有以下 Erlang 消息流1。
reader -> channel -> queue process -> message store.
为了防止链中的任何一个进程溢出下一个进程,我们有一个“流控”机制。每个进程最初会向发送消息的进程授予一定数量的信用额度。一旦一个进程能够处理 N 条消息,它就会向发送消息的进程授予更多的信用额度。在默认的流控设置(rabbitmq.config 下的 credit_flow_default_credit)下,初始信用额度为 200 条消息,在接收进程处理了 50 条消息后,发送消息的进程将获得另外 50 条信用额度。
假设我们正在向 RabbitMQ 发布消息,这意味着读取器将为收到的每个 AMQP basic.publish 向通道进程发送一条 Erlang 消息。每条消息将消耗通道的信用额度中的一条。一旦通道能够处理 50 条消息,它将向读取器授予更多信用额度。到目前为止一切顺利。
反过来,通道会将消息发送到匹配消息路由规则的队列进程。这将消耗队列进程授予通道的信用额度中的一条。在队列进程成功处理 50 次传递后,它将向通道授予另外 50 条信用额度。
最后,如果一条消息被认为是持久化的(它是持久化的并且发布到了持久化队列),它将被发送到消息存储,在这种情况下,它也将消耗消息存储授予队列进程的信用额度。在这种情况下,初始值不同,由 msg_store_credit_disc_bound 设置处理:初始信用额度为2000条消息,处理 500 条消息后额外增加500条信用额度。
所以我们知道了内部消息在 RabbitMQ 中是如何流动的,以及何时向消息流上游的进程授予信用额度。棘手的部分在于信用额度如何在进程之间授予。在正常情况下,通道会处理来自读取器的 50 条消息,然后授予读取器另外 50 条信用额度,但请记住,通道不仅仅处理发布,它还要向消费者发送消息、将消息路由到队列等等。
如果读取器以比通道处理速度更快的方式向通道发送消息会发生什么?如果我们遇到这种情况,通道就会阻塞读取器进程,这将导致生产者被 RabbitMQ 节流。在默认设置下,一旦读取器向通道发送了 200 条消息,但通道无法至少处理其中的 50 条以授予读取器信用额度,读取器就会被阻塞。
同样,在正常情况下,一旦通道成功处理了消息积压,它就会向读取器授予更多的信用额度,但这里有一个问题。如果通道进程因为类似的原因被队列进程阻塞了呢?那么本应流向读取器进程的新信用额度将被延迟。读取器进程将保持阻塞状态。
一旦队列进程成功处理了通道传递的消息积压,它就会向通道授予更多信用额度,从而解除通道的阻塞,进而导致通道向读取器授予更多信用额度,解除读取器的阻塞。再次强调,这是在正常情况下的情况,但你猜对了,如果消息存储阻塞了队列进程呢?那么流向通道的信用额度将被延迟,通道将保持阻塞,进而延迟流向读取器的信用额度,导致读取器被阻塞。在某个时候,消息存储会向队列进程授予消息,队列进程会向通道传回消息,然后通道最终会向读取器授予消息并解除读取器的阻塞。
reader <--[grant]-- channel <--[grant]-- queue process <--[grant]-- message store.
有一个通道和一个队列进程可以使事情更容易理解,但这可能无法反映现实。RabbitMQ 用户通常会在同一连接上拥有多个通道发布消息。更常见的是一条消息被路由到多个队列。我们刚才解释的流控方案会发生什么?如果其中一个队列阻塞了通道,那么读取器也会被阻塞。
问题在于,从读取器的角度来看,当我们从网络读取一个帧时,我们甚至不知道它属于哪个通道。请记住,通道是 AMQP 连接之上的一个逻辑概念。因此,即使一条新的 AMQP 命令会进入一个没有阻塞读取器的通道,读取器也无法得知。请注意,我们只阻塞发布连接,消费者连接不受影响,因为我们希望消费者将消息从队列中清空。因此,拥有专门用于发布消息的连接,以及专门用于消费者的连接可能更好,这是一个很好的理由。
同样,每当通道处理消息发布时,它在执行路由之前都不知道消息将去往何处。因此,通道可能正在接收一条本应去往一个没有阻塞通道的队列的消息。由于在入口时我们不知道这一点,因此现有的流控策略是阻塞读取器,直到链下游的进程能够处理新消息。
RabbitMQ 3.5.5 中引入的新设置之一是修改 credit_flow_default_credit 值的能力。此设置接受一个形式为 {InitialCredit, MoreCreditAfter} 的元组。默认情况下,InitialCredit 设置为200,MoreCreditAfter 设置为50。根据你的具体工作流程,你需要决定是否值得提高这些值。让我们再次查看消息流图。
reader -> channel -> queue process -> message store.
提高 {InitialCredit, MoreCreditAfter} 的值意味着在链中的任何一点,我们最终可能会有比当前时间点可以被代理处理的消息更多的消息。更多的消息意味着更多的 RAM 使用量。对于 msg_store_credit_disc_bound 也是如此,但请记住,每个 RabbitMQ 实例只有一个消息存储2,并且可以有多个通道向同一个队列进程发送消息。因此,虽然一个队列进程从消息存储中获得的 InitialCredit 值为 2000,但该队列可以从不同的通道/连接源接收到该值很多倍的消息。因此,200 条信用额度作为初始 credit_flow_default_credit 值可能被视为过于保守,但你需要了解根据你的工作流程,这是否仍然足够好。
消息分页
让我们看看 RabbitMQ 队列是如何存储消息的。当一条消息进入队列时,队列需要确定消息是否应该被持久化。如果消息需要持久化,那么 RabbitMQ 将立即这样做3。现在,即使消息已持久化到磁盘,这也不意味着消息已从 RAM 中删除,因为 RabbitMQ 会将消息缓存到 RAM 中以便在将消息传递给消费者时快速访问。当我们谈论“将消息分页到磁盘”时,我们指的是 RabbitMQ 在必须将这些缓存中的消息发送到文件系统时所做的操作。
当 RabbitMQ 决定需要将消息分页到磁盘时,它会调用内部队列实现中的 reduce_memory_use 函数,以便将消息发送到文件系统。消息将分批分页;这些批次的大小取决于当前的内存压力状态。基本工作原理如下:
reduce_memory_use 函数会接收一个名为 target ram count 的数字,它告诉 RabbitMQ 应该尝试分页出消息,直到只剩下这个数量的消息。请记住,无论消息是否持久化,它们都保留在 RAM 中以便快速传递给消费者。只有当内存压力出现时,内存中的消息才会被分页到磁盘。引用我们的代码注释:“消息是否在 RAM 中以及消息是否持久化是正交的。”
在此批次计算中计数的意的数量是 RAM 中的消息(在上述缓存中)加上 RAM 中待确认的数量(即:已传递给消费者但尚未确认的消息)。如果我们 RAM 中有 20000 条消息(缓存 + 待确认)并且 target ram count 设置为 8000,那么我们将不得不分页出 12000 条消息。这意味着分页将获得 12000 条消息的配额。每分页一条消息到磁盘将消耗该配额中的一个单位,无论是待确认消息,还是来自缓存的实际消息。
一旦我们知道需要分页出多少消息,我们就需要决定首先从哪里分页:待确认消息,还是消息缓存。如果待确认消息的增长速度快于消息缓存,即:传递给消费者的消息多于摄入的消息,那么算法将尝试先分页出待确认消息,然后尝试将消息从缓存推送到文件系统。如果缓存的增长速度快于待确认消息,那么将首先推送缓存中的消息。
这里的问题是,从待确认消息(或优先的缓存)分页消息可能会导致该过程的第一部分消耗掉所有需要分页到磁盘的消息配额。因此,如果待确认消息将 12000 条待确认消息分页到磁盘(如我们的示例),这意味着我们不会从缓存中分页出消息,反之亦然。
分页过程的第一部分是将一定数量的消息(来自待确认 + 从缓存中分页的消息)发送到磁盘。已分页的消息仅将其内容分页出去,但它们在队列中的位置仍在 RAM 中。现在队列需要决定是否还需要将 RAM 中保留的这些额外信息分页出去,以进一步减少内存使用。这里 msg_store_io_batch_size(与 msg_store_credit_disc_bound 一起)最终发挥作用。让我们尝试理解它们是如何工作的。
msg_store_credit_disc_bound 的设置影响将消息发送到磁盘时内部信用流的_处理方式。rabbitmq_msg_store 模块实现了一个负责将消息持久化到磁盘的数据库。有关此实现原因的一些细节可以在这里找到:RabbitMQ,后端存储,数据库和磁盘。
消息存储为向其写入的每个客户端都有一个信用系统。每个 RabbitMQ 队列都是此存储的读/写客户端。消息存储具有信用机制,以防止特定写入器向其收件箱中注入过多消息。假设当前默认值,当一个写入器开始与消息存储通信时,它会获得2000条消息的初始信用额度,并且在处理了500条消息后会获得更多信用额度。那么这个信用额度是什么时候被消耗的呢?信用额度在写入消息存储时被消耗,但这并不是每次消息都会发生。事情变得复杂了。
从 3.5.0 版本开始,可以将小消息嵌入到队列索引中,而无需访问消息存储。小于可配置设置(目前为 4096 字节)的消息在持久化时会进入队列索引,因此这些消息不会消耗此信用额度。现在,让我们看看需要进入消息存储的消息。
每当我们发布一条被确定为持久化的消息(发布到持久化队列的持久化消息)时,该消息将消耗这些信用额度中的一条。如果一条消息需要从上述缓存分页到磁盘,它也将消耗一条信用额度。因此,如果在消息分页过程中消耗的信用额度超过了当前为我们的队列提供的信用额度,分页过程的第一部分可能会停止,因为当消息存储无法接受时,将写入发送到消息存储是没有意义的。这意味着,从我们原本需要分页的 12000 条消息的初始配额中,我们只能处理其中的 2000 条(假设所有这些消息都需要进入消息存储)。
因此,我们成功地分页出了 2000 条消息,但我们仍然在 RAM 中保留它们在队列中的位置。现在分页过程将确定是否还需要将这些消息的位置分页到磁盘。RabbitMQ 将计算有多少可以保留在 RAM 中,然后尝试将剩余的消息分页到磁盘。要进行第二次分页,必须分页到磁盘的消息数量大于 msg_store_io_batch_size。这个数字越大,RabbitMQ 将在 RAM 中保留的消息位置就越多,所以同样,根据你的具体工作负载,你也需要调整这个参数。
我们在 3.5.5 版本中还显著改进了将队列索引内容分页到磁盘的性能。如果你的消息通常小于 queue_index_embed_msgs_below,那么你将看到这些更改的好处。这些更改还会影响消息位置如何分页到磁盘,因此你应该在这个领域看到改进。所以,虽然较低的 msg_store_io_batch_size 可能意味着队列索引在分页到磁盘时需要更多工作,但请记住,这个过程已经过优化。
队列镜像
为了简化上述描述,我们没有涉及队列镜像。从通道的角度来看,流控也会影响镜像。当通道将 AMQP 消息传递给队列时,它会将消息发送给每个镜像,并消耗每个镜像进程的一个信用额度。如果任何一个镜像处理消息的速度很慢,那么该特定镜像可能导致通道被阻塞。如果通道被镜像阻塞,并且该队列镜像与网络断开连接,那么只有在 RabbitMQ 检测到镜像死亡后,通道才会被解除阻塞。
流控在同步镜像队列时也起作用,但这并不是你太应该关心的事情,主要是因为你对此无能为力,因为镜像同步完全由 RabbitMQ 处理。
结论
无论如何,我们希望这篇博文内容丰富,能够帮助你优化 RabbitMQ。如果你对新的流控设置有任何评论或问题,请随时通过 RabbitMQ 邮件列表与我们联系: rabbitmq-users。