跳至主内容

使用 RabbitMQ 防止无界缓冲区

·9 分钟阅读
Álvaro Videla

我们架构中的不同服务在运行过程中需要一定量的资源,无论是 CPU、RAM 还是磁盘空间,我们都需要确保有足够的资源。如果我们不对服务器将要使用的资源数量设置限制,迟早会遇到麻烦。当数据库耗尽文件系统空间、媒体存储填满图片且从不转移它们,或者 JVM 耗尽 RAM 时,就会发生这种情况。即使是备份解决方案,如果不设置过期/删除旧备份的策略,也会成为一个问题。嗯,队列也不例外。我们必须确保我们的应用程序不会让队列无休止地增长。我们需要有某种策略来删除/驱逐/迁移旧消息。

为什么可能出现此问题?

队列堆积消息的原因有很多。第一个原因是我们的数据生产者生产速度超过了消费者消费速度。幸运的是,解决方案很简单:添加更多消费者。

如果我们的应用程序仍然无法处理负载,会发生什么?例如,如果您的消费者处理每条消息花费的时间太长,而您又因为用完了服务器而无法添加更多消费者。那么您的队列将开始堆积消息。RabbitMQ 已经针对快速消息传递进行了优化,其队列中的消息数量尽可能少。虽然 RabbitMQ 提供了各种流量控制机制,但您可能希望有一种方法可以防止进入启动流量控制的境地。让我们看看 RabbitMQ 在这方面能提供什么帮助。

每个队列的消息 TTL

RabbitMQ 允许为每个队列设置消息 TTL,这样服务器就不会传递在队列中存活时间超过指定队列 TTL 的消息。此外,服务器将尽快尝试使这些消息过期或发送到死信队列。

这在您的数据只有在按时到达时对生产者才有意义的情况下非常有效。如果您的数据不能被丢弃,但您仍然希望队列尽可能保持为空,那么请参阅下面的“死信”部分

有两种方法可以设置队列 TTL,一种是在 `queue.declare` 期间传递一些额外参数,如下所示:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);

前面的代码将告诉 RabbitMQ 在 60 秒后使 `myqueue` 队列上的消息过期。

同样也可以通过向队列添加策略来设置:

rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues

此策略将匹配默认虚拟主机中的所有队列,并将使消息在 60 秒后过期。请注意,Windows 命令略有不同。当然,您可以让该策略仅匹配一个队列。有关更多详细信息,请参阅:参数和策略

如果我们想要更精细地控制哪些消息会被过期,该怎么办?

每个消息 TTL

RabbitMQ 还支持设置每个消息的 TTL。我们可以通过在 `basic.publish` 方法调用中设置 `expiration` 字段来设置消息的 TTL。与前一种情况一样,该值应以毫秒为单位。以下代码将发布一条将在 60 秒后过期的消息:

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);

如果我们结合使用每个消息 TTL 和每个队列 TTL,那么最短的 TTL 将生效。RabbitMQ 将确保消费者永远不会收到过期消息,但在每个消息 TTL 的情况下,直到这些消息到达队列头部,它们才会被过期。

队列 TTL

使用 RabbitMQ,我们还可以让整个队列过期,即在一段时间不使用后由 RabbitMQ 删除。假设我们设置队列在 1 小时后过期。如果在一小时内,该队列没有消费者,没有发出 `basic.get` 命令,或者队列没有被重新声明,那么 RabbitMQ 将认为它未被使用并将其删除。

您可能希望使用此功能,例如,当您的用户在线时为他们创建队列,但在不活动 15 分钟后希望删除这些队列。可以想象一个为每个连接用户保留一个队列的聊天应用程序。您可以声明一个 `auto_delete` 队列,一旦用户关闭通道就会消失,但这对于某些场景可能很有用,但如果用户因移动网络连接质量差而断开连接,又会怎样呢?当然,您不希望在用户断开连接后立即删除他们的所有消息。有了这个功能,您可以让这些队列再保留一段时间。

这是使用 Java 客户端设置 15 分钟队列过期的方法:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 900000);
channel.queueDeclare("myqueue", false, false, false, args);

以及通过策略

rabbitmqctl set_policy expiry ".*" '{"expires":900000}' --apply-to queues

队列长度限制

如果我们希望我们的队列消息数量不超过某个阈值,我们可以通过在声明队列时使用 `x-max-length` 参数来配置。这是一种相当不错且简单的控制容量的方法;如果我们的队列达到阈值并且有新消息到达,那么队列头部的“旧消息”将被丢弃,为新到达的消息腾出空间。这种行为的原因之一是旧消息可能对您的应用程序来说已无关紧要,因此新消息会被允许进入队列。

请记住,队列长度仅考虑已准备好传递的消息。未确认的消息不会计入计数。拥有正确的 `basic.qos` 设置将有助于您的应用程序,因为默认情况下 RabbitMQ 会尽可能多地向消费者发送消息,从而导致您的队列似乎为空,但实际上您有很多未确认的消息也占用了资源。

设置队列长度限制非常简单,以下是一个设置限制为 10 条消息的 Java 示例:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-length", 10);
channel.queueDeclare("myqueue", false, false, false, args);

以及通过策略

rabbitmqctl set_policy Ten ".*" '{"max-length":10}' --apply-to queues

混合策略

请记住,在任何给定时间,最多只会有一个策略应用于队列。因此,如果您连续运行前面的 `set_policy` 命令,则只有最后一个才会生效。让多个策略应用于同一资源的技巧是将所有策略一起放在同一个 JSON 对象中,例如:

rabbitmqctl set_policy capped_queues "^capped\." \ 
'{"max-length":10, "expires":900000, "message-ttl":60000}' --apply-to queues

根本不排队

等等,我没看错吧?是的。不排队。

想象一下,在一个非常忙碌的日子,您来到邮局,发现所有柜台都已满员。由于您没有时间等待排队,所以您直接离开了,继续做您之前在做的事情。换句话说:您有一个请求需要立即得到服务,即:无需排队。那么,RabbitMQ 可以对您的应用程序消息和队列做类似的事情。

诀窍在于将 `per-queue-TTL` 设置为 `0`(零)。如果消息无法立即传递给消费者,它们将立即过期。如果您设置了死信交换,那么您可以将消息发送到单独的队列。

死信

我们已经提到过死信几次了。这个功能的作用是,您可以为队列设置一个死信交换 (DLX),然后当队列中的消息过期,或者队列长度超过限制时,该消息将被发布到 DLX。您可以将一个单独的队列绑定到该交换,然后稍后处理发送到那里的消息。

以下是设置 DLX 的 `queue.declare` 示例:

channel.exchangeDeclare("some.exchange.name", "direct");

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);

死信消息将使您的队列保持正确的尺寸和预期的消息量,但这并不能防止节点被消息填满。如果这些消息在同一节点上的另一个队列中排队,那么最终这个新的死信队列可能会成为一个问题。在这种情况下,您可以使用交换联邦将这些消息发送到单独的节点,并与应用程序的主流程分开处理。

结论

关于到达我们系统的请求,队列理论的一个基本问题可以表述如下1

λ = 平均到达时间
µ = 平均服务率
如果 λ > µ,会发生什么?
队列长度随时间趋于无穷。

我们知道,如果我们发现我们架构的任何一点遇到这个问题,迟早我们的应用程序都会陷入麻烦。幸运的是,RabbitMQ 提供了许多针对此问题量身定制的功能,如队列和消息 TTL、队列过期和队列长度。更有趣的是,我们不需要因为使用这些功能而丢失消息。死信交换可以帮助我们将消息重新路由到更合适的地方。是时候将这些技术融入我们的队列和消息传递工具箱了。

脚注

  1. Performance Modeling and Design of Computer Systems: Queueing Theory in Action

© . This site is unofficial and not affiliated with VMware.