RabbitMQ 3.5.5 的新信用流设置
为了防止快速发布者在任何特定时刻向代理发送超出其处理能力的消息,RabbitMQ 实现了一种称为信用流的内部机制,该机制将由 RabbitMQ 内部的各种系统用来限制发布者,同时允许消息消费者赶上进度。在这篇博文中,我们将了解信用流的工作原理,以及我们可以做些什么来调整其配置以获得最佳行为。
RabbitMQ 的最新版本包含几个新的配置值,允许用户调整内部信用流设置。根据您的特定工作负载了解这些值如何工作,可以帮助您在性能方面充分利用 RabbitMQ,但请注意,仅仅为了看看会发生什么而增加这些值可能会对 RabbitMQ 响应消息突发的方式产生不利影响,影响 RabbitMQ 用于处理内存压力的内部策略。请谨慎操作。
要了解新的信用流设置,首先我们需要了解 RabbitMQ 在消息发布和将消息分页到磁盘方面内部的工作原理。让我们首先看看 RabbitMQ 中的消息发布是如何工作的。
消息发布
要了解credit_flow
及其设置如何影响发布,让我们看看 RabbitMQ 中内部消息的流动方式。请记住,RabbitMQ 是用 Erlang 实现的,其中进程通过相互发送消息进行通信。
每当 RabbitMQ 实例运行时,可能会有数百个 Erlang 进程交换消息以相互通信。例如,我们有一个读取器进程,它从网络读取 AMQP 帧。这些帧被转换为 AMQP 命令,然后转发到 AMQP 通道进程。如果此通道正在处理发布,则需要向特定交换机请求此消息最终应发送到的队列列表,这意味着通道将向每个队列传递消息。最后,如果需要持久化 AMQP 消息,则msg_store进程将接收它并将其写入磁盘。因此,每当我们将 AMQP 消息发布到 RabbitMQ 时,我们都会有以下 Erlang 消息流1
reader -> channel -> queue process -> message store.
为了防止任何这些进程淹没链中的下一个进程,我们设置了信用流机制。每个进程最初都会向发送消息的进程授予一定数量的信用。一旦某个进程能够处理 N 条消息,它就会向发送这些消息的进程授予更多信用。在默认的信用流设置(rabbitmq.config
下的credit_flow_default_credit
)下,这些值为 200 条初始信用消息,并且在接收进程处理 50 条消息后,发送消息的进程将被授予 50 条更多信用。
假设我们正在向 RabbitMQ 发布消息,这意味着读取器将为每个接收到的 AMQP basic.publish
向通道进程发送一条 Erlang 消息。每条消息将消耗通道的一个信用。一旦通道能够处理 50 条消息,它就会向读取器授予更多信用。到目前为止一切顺利。
反过来,通道将消息发送到与消息路由规则匹配的队列进程。这将消耗队列进程授予通道的信用的一个信用。在队列进程设法处理 50 次传递后,它将向通道授予 50 个更多信用。
最后,如果消息被认为是持久的(它是持久的并发布到持久队列),它将被发送到消息存储,在这种情况下,它也将消耗消息存储授予队列进程的信用的一个信用。在这种情况下,初始值不同,由msg_store_credit_disc_bound
设置处理:初始信用为2000条消息,并且在消息存储处理 500 条消息后,再授予500个更多信用。
因此,我们知道内部消息如何在 RabbitMQ 中流动以及何时向消息流中上游的进程授予信用。棘手的是在进程之间授予信用时。在正常情况下,通道将处理读取器发来的 50 条消息,然后向读取器授予 50 个更多信用,但请记住,通道不仅仅处理发布,它还向消费者发送消息,将消息路由到队列等等。
如果读取器以高于通道处理速度的速度向通道发送消息会发生什么?如果我们遇到这种情况,则通道将阻塞读取器进程,这将导致 RabbitMQ 限制生产者。在默认设置下,一旦读取器向通道发送 200 条消息,但通道无法处理至少 50 条消息以向读取器授予信用,读取器就会被阻塞。
同样,在正常情况下,一旦通道设法处理完消息积压,它将向读取器授予更多信用,但有一个问题。如果由于类似原因,通道进程被队列进程阻塞会怎样?然后,原本应该发送给读取器进程的新信用将被延迟。读取器进程将保持阻塞状态。
一旦队列进程设法处理完来自通道的传递积压,它将向通道授予更多信用,从而解除通道的阻塞,这将导致通道向读取器授予更多信用,从而解除读取器的阻塞。再次强调,这是在正常情况下,但是,你猜对了,如果消息存储阻塞了队列进程会怎样?然后,对通道的信用将被延迟,通道将保持阻塞状态,延迟对读取器的信用,导致读取器保持阻塞状态。在某个时刻,消息存储将向队列进程授予消息,队列进程将向通道授予消息,然后通道最终将向读取器授予消息并解除读取器的阻塞
reader <--[grant]-- channel <--[grant]-- queue process <--[grant]-- message store.
有一个通道和一个队列进程使事情更容易理解,但它可能无法反映现实。RabbitMQ 用户通常有多个通道在同一个连接上发布消息。更常见的是一条消息被路由到多个队列。我们刚刚解释的信用流方案中发生的事情是,如果其中一个队列阻塞了通道,则读取器也将被阻塞。
问题在于,从读取器的角度来看,当我们从网络读取帧时,我们甚至不知道它属于哪个通道。请记住,通道是 AMQP 连接之上的一种逻辑概念。因此,即使新的 AMQP 命令最终将进入一个没有阻塞读取器的通道,读取器也无法知道这一点。请注意,我们只阻塞发布连接,消费者连接不受影响,因为我们希望消费者从队列中提取消息。这是一个很好的理由,说明为什么最好将连接专用于发布消息,并将连接专用于消费者。
以类似的方式,每当通道处理消息发布时,它都不知道消息最终将发送到哪里,直到它执行路由。因此,通道可能正在接收一条消息,该消息应最终发送到一个没有阻塞通道的队列中。由于在入口时我们不知道任何这些信息,因此现有的信用流策略是阻塞读取器,直到链中的进程能够处理新消息。
RabbitMQ 3.5.5 中引入的新设置之一是能够修改credit_flow_default_credit
的值。此设置采用{InitialCredit, MoreCreditAfter}
形式的元组。InitialCredit 默认设置为200,MoreCreditAfter 设置为50。根据您的特定工作流程,您需要决定是否值得增加这些值。让我们再次看看消息流方案
reader -> channel -> queue process -> message store.
增加{InitialCredit, MoreCreditAfter}
的值意味着在该链中的任何点,我们最终都可能拥有比代理在该特定时间点能够处理的更多消息。更多消息意味着更多的 RAM 使用量。对于msg_store_credit_disc_bound
也可以这么说,但请记住,每个 RabbitMQ 实例只有一个消息存储2,并且可能有许多通道向同一个队列进程发送消息。因此,虽然队列进程从消息存储中获得 2000 的InitialCredit
值,但该队列可以从不同的通道/连接源中获取该值的许多倍。因此,200 个初始credit_flow_default_credit
值可能被视为过于保守,但您需要了解根据您的工作流程,这是否足够好。
消息分页
让我们看看 RabbitMQ 队列如何存储消息。当消息进入队列时,队列需要确定是否应持久化该消息。如果必须持久化消息,则 RabbitMQ 将立即执行此操作3。现在,即使消息已持久化到磁盘,也不意味着消息已从 RAM 中删除,因为 RabbitMQ 会在 RAM 中缓存消息,以便在将消息传递给消费者时快速访问。每当我们谈论将消息分页到磁盘时,我们都在谈论 RabbitMQ 在必须将此缓存中的消息发送到文件系统时所做的事情。
当 RabbitMQ 决定需要将消息分页到磁盘时,它将在内部队列实现上调用函数reduce_memory_use
以将消息发送到文件系统。消息将批量分页;这些批次的大小取决于当前的内存压力状态。它基本上是这样工作的
函数reduce_memory_use
将接收一个名为target ram count
的数字,该数字告诉 RabbitMQ 它应该尝试分页消息,直到 RAM 中仅保留这么多消息。请记住,无论消息是否持久化,它们都仍然保存在 RAM 中,以便快速传递给消费者。只有当内存压力出现时,内存中的消息才会被分页到磁盘。引用我们代码中的注释:“消息是否在 RAM 中以及消息是否持久化是正交的”。
在此块计算过程中考虑的消息数量包括 RAM 中的消息(在前面提到的缓存中),以及保存在 RAM 中的待确认消息数量(即:已传递给消费者且正在等待确认的消息)。如果 RAM 中有 20000 条消息(缓存 + 待确认消息),然后将target ram count
设置为 8000,则我们将必须分页 12000 条消息。这意味着分页将收到 12000 条消息的配额。每条分页到磁盘的消息将消耗该配额的一个单位,无论它是待确认的消息还是缓存中的实际消息。
一旦我们知道需要分页多少条消息,我们就需要决定应该从哪里首先分页它们:待确认的消息还是消息缓存。如果待确认的消息增长速度快于缓存中的消息,即:传递给消费者的消息多于被摄取的消息,这意味着算法将首先尝试分页待确认的消息,然后尝试将缓存中的消息推送到文件系统。如果缓存增长速度快于待确认的消息,则将首先推送缓存中的消息。
这里的陷阱是,从待确认的消息(或如果先出现则从缓存)中分页消息可能会导致流程的第一部分消耗所有需要推送到磁盘的消息配额。因此,如果待确认的消息像我们的示例一样将 12000 个确认推送到磁盘,这意味着我们不会从缓存中分页消息,反之亦然。
此分页过程的第一部分将一定数量的消息发送到磁盘(在确认 + 从缓存分页的消息之间)。刚刚分页的消息仅将其内容分页,但它们在队列中的位置仍保留在 RAM 中。现在,队列需要确定是否也需要将保存在 RAM 中的这些额外信息分页出去,以进一步减少内存使用。这就是msg_store_io_batch_size
最终发挥作用的地方(也与msg_store_credit_disc_bound
结合使用)。让我们尝试了解它们的工作原理。
msg_store_credit_disc_bound
的设置会影响将消息发送到磁盘时内部信用流的处理方式。rabbitmq_msg_store
模块实现了一个数据库,负责将消息持久化到磁盘。有关此实现原因的一些详细信息,请参阅此处:RabbitMQ、后备存储、数据库和磁盘。
消息存储为每个向其发送写入的客户端提供了一个信用系统。每个 RabbitMQ 队列都将是此存储的读/写客户端。消息存储具有一个信用机制,以防止特定写入器用消息淹没其收件箱。假设当前默认值,当写入器开始与消息存储通信时,它将收到2000条消息的初始信用,并在处理500条消息后将收到更多信用。那么何时会消耗此信用呢?每当我们写入消息存储时,都会消耗信用,但这并非对每条消息都适用。情节变得复杂了。
从 3.5.0 版本开始,可以将小消息嵌入到队列索引中,而不必为该消息访问消息存储。小于可配置设置(当前为 4096 字节)的消息将在持久化时进入队列索引,因此这些消息不会消耗此信用。现在,让我们看看需要转到消息存储的消息会发生什么。
每当我们发布确定为持久的消息(发布到持久队列的持久消息)时,该消息将消耗这些信用之一。如果必须从上面提到的缓存中将消息分页到磁盘,它也将消耗一个信用。因此,如果在消息分页期间,我们消耗的信用多于当前队列可用的信用,则分页过程的第一部分可能会停止,因为当消息存储不接受写入时,将其发送到消息存储没有意义。这意味着,在我们必须分页的初始 12000 个配额中,我们仅设法处理了其中的 2000 个(假设所有这些都需要转到消息存储)。
因此,我们设法分页了 2000 条消息,但我们仍然保留了它们在 RAM 中的队列位置。现在,分页过程将确定是否还需要将这些消息位置中的任何一个也分页到磁盘。RabbitMQ 将计算其中有多少个可以保留在 RAM 中,然后尝试将其余的分页到磁盘。为了进行第二次分页,必须分页到磁盘的消息数量必须大于msg_store_io_batch_size
。此数字越大,RabbitMQ 将在 RAM 中保留的消息位置就越多,因此,根据您的特定工作负载,您也需要调整此参数。
我们在 3.5.5 中显着改进的另一件事是将队列索引内容分页到磁盘的性能。如果您的消息通常小于queue_index_embed_msgs_below
,那么您将看到这些更改带来的好处。这些更改还会影响消息位置如何分页到磁盘,因此您也应该看到这方面的改进。因此,虽然msg_store_io_batch_size
较低可能意味着队列索引将有更多工作要分页到磁盘,但请记住此过程已得到优化。
队列镜像
为了使上述描述更简单一些,我们避免将队列镜像纳入考虑范围。信用流也会从通道的角度影响镜像。当通道将 AMQP 消息传递到队列时,它会将消息发送到每个镜像,并从每个镜像进程中消耗一个信用。如果任何镜像处理消息的速度很慢,则该特定镜像可能是导致通道阻塞的原因。如果通道被镜像阻塞,并且该队列镜像与网络断开连接,则只有在 RabbitMQ 检测到镜像死亡后,通道才会被解除阻塞。
信用流在同步镜像队列时也起作用,但这并不是您需要过分关注的事情,主要是因为您对此无能为力,因为镜像同步完全由 RabbitMQ 处理。
结论
无论如何,我们希望这篇博文能提供信息,并帮助您进行 RabbitMQ 调优。如果您对新的信用流设置有任何意见或疑问,请随时联系我们,发送邮件至 RabbitMQ 邮件列表:rabbitmq-users。