使用 RabbitMQ 阻止无界缓冲区
我们架构中的不同服务需要一定量的资源来运行,无论是 CPU、RAM 还是磁盘空间,我们都需要确保有足够的资源。如果我们不限制服务器要使用的资源数量,那么在某个时刻我们会遇到麻烦。这会发生在您的数据库用完文件系统空间时,或者您的媒体存储空间被图片填满而您从未将其转移到其他地方时,或者您的 JVM 用完 RAM 时。即使您的备份解决方案也会有问题,如果您没有过期/删除旧备份的策略。嗯,队列也不例外。我们必须确保我们的应用程序不会让队列永远增长。我们需要有一些策略来删除/驱逐/迁移旧消息。
为什么会出现这个问题?
我们的队列可能会因为很多原因被消息填满。第一个原因是数据生产者超过了消费者的处理速度。幸运的是,解决方案很简单:添加更多消费者。
如果我们的应用程序仍然无法处理负载怎么办?例如,您的消费者处理每条消息需要很长时间,而您无法添加更多消费者,因为您已经用完服务器了。然后您的队列将开始被消息填满。RabbitMQ 已针对快速消息传递进行了优化,队列中包含的尽可能少的 消息。虽然 RabbitMQ 带有各种流量控制机制,但您可能希望有一种方法来防止进入流量控制被激活的情况。让我们看看 RabbitMQ 如何帮助我们。
每个队列消息 TTL
RabbitMQ 允许设置每个队列消息 TTL,这将使服务器不传递在队列中停留时间超过定义的每个队列 TTL 的消息。此外,服务器将尽快尝试过期或死信这些消息。
当您拥有仅对生产者在指定时间内到达时才相关的的数据时,这非常有效。如果您的数据不能丢弃,但您仍然希望队列尽可能为空,请参阅下面的“死信”部分。
有两种方法可以设置队列 TTL,一种是通过在 queue.declare
期间传递一些额外的参数,如下所示
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);
上面的代码将告诉 RabbitMQ 在 60 秒后使队列 myqueue
上的消息过期。
可以通过向队列添加策略来设置相同的配置
rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues
此策略将匹配默认虚拟主机中的所有队列,并将使消息在 60 秒后过期。请注意,Windows 命令有点不同。当然,您可以使该策略仅匹配一个队列。有关此方面的更多详细信息,请参阅:参数和策略。
如果我们想要对哪些消息将过期进行更细粒度的控制怎么办?
每条消息 TTL
RabbitMQ 还支持设置每条消息 TTL。我们可以通过在 basic.publish
方法调用中设置 expiration
字段来设置消息的 TTL。与前面的情况一样,该值应以毫秒为单位。以下代码将发布将在 60 秒后过期的消息
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);
如果我们将每条消息 TTL 与每个队列 TTL 结合使用,那么最短的 TTL 将优先。RabbitMQ 将确保消费者永远不会收到过期消息,但在每条消息 TTL 的情况下,这些消息将不会过期,直到它们到达队列的头部。
队列 TTL
使用 RabbitMQ,我们还可以让整个队列过期,也就是说,在队列在一定时间内未被使用后由 RabbitMQ 删除。假设我们设置了在 1 小时后过期队列。如果在一个小时内,该队列上没有消费者,没有发出 basic.get 命令,或者队列没有重新声明,那么 RabbitMQ 将认为它未被使用并将删除它。
例如,如果您为在线用户创建队列,但在 15 分钟的闲置时间后您想删除这些队列,您可能需要使用此功能。假设一个聊天应用程序为每个连接用户保留一个队列。您可以声明一个在用户关闭频道后就会消失的 auto_delete
队列,但这可能对某些场景有用,如果用户实际上断开连接是因为他们在移动网络中,连接质量很差怎么办?当然,您不希望在他们断开连接后立即删除他们的所有消息。使用此功能,您可以让这些队列存活更长时间。
以下是使用 Java 客户端设置 15 分钟队列过期时间的方法
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 900000);
channel.queueDeclare("myqueue", false, false, false, args);
以及通过策略的方法
rabbitmqctl set_policy expiry ".*" '{"expires":900000}' --apply-to queues
队列长度限制
如果我们希望队列中的消息数量不超过某个阈值,我们可以通过在声明队列时使用 x-max-length
参数来配置它。这是一种非常简单有效的方法来控制容量;如果我们的队列达到阈值,并且到达了新的消息,那么队列前端的“旧消息”将被丢弃,为新到达的消息腾出空间。这种行为的原因之一是,旧消息可能与您的应用程序无关,因此允许新消息进入队列。
请记住,队列长度仅考虑准备交付的消息。未确认的消息不会计入计数。具有适当的 basic.qos
设置将在此处帮助您的应用程序,因为默认情况下,RabbitMQ 会尽可能多地将消息发送给消费者,从而导致队列看起来为空,但实际上您有很多未确认的消息,这些消息也会占用资源。
设置队列长度限制非常容易,以下是一个在 Java 中设置 10 条消息限制的示例
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-length", 10);
channel.queueDeclare("myqueue", false, false, false, args);
以及通过策略的方法
rabbitmqctl set_policy Ten ".*" '{"max-length":10}' --apply-to queues
混合策略
请记住,最多 只有一个策略在任何给定时间应用于一个队列。因此,如果您依次运行前面的 set_policy
命令,那么只有最后一个命令会生效。使多个策略应用于同一资源的诀窍是将所有策略一起传递到同一个 JSON 对象中,例如
rabbitmqctl set_policy capped_queues "^capped\." \
'{"max-length":10, "expires":900000, "message-ttl":60000}' --apply-to queues
完全不排队
等等,我读对了吗?是的。完全不排队。
假设您在一个非常繁忙的日子里,您来到邮局,却发现每个柜台都忙着。由于您没有时间浪费在排队上,您只是转身继续做您之前正在做的事情。换句话说:您有一个需要立即处理的请求,也就是:无需排队。嗯,RabbitMQ 可以对您的应用程序消息和队列执行类似的操作。
诀窍在于将 per-queue-TTL
设置为 0
(零)。如果消息无法立即传递给消费者,那么它们将立即过期。如果您设置了一个死信交换机,那么您可以让消息被死信到一个单独的队列中。
死信
我们之前已经多次提到过死信。此功能的作用是,您可以为其中一个队列设置一个死信交换机 (DLX),然后当该队列上的消息过期或队列限制超过时,消息将发布到 DLX。您可以将一个单独的队列绑定到该交换机,然后稍后处理发送到那里的消息。
以下是一个设置 DLX 的 queue.declare
示例
channel.exchangeDeclare("some.exchange.name", "direct");
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);
死信消息将使您的队列保持正确的大小和预期的消息数量,但这并不能防止节点被消息填满。如果这些消息在同一节点上的另一个队列中排队,那么在某个时候,这个新的死信队列可能会出现问题。在这种情况下,您可以使用交换机联合将这些消息发送到一个单独的节点,并与应用程序的主流程分开处理它们。
结论
关于到达我们系统的请求的排队理论的基本问题之一可以表述如下1
λ = 平均到达时间
µ = 平均服务速率
如果 λ > µ 会发生什么?
队列长度随着时间的推移趋于无穷大。
我们知道,如果我们在架构中的任何地方遇到此问题,我们的应用程序迟早会遇到麻烦。幸运的是,RabbitMQ 提供了许多功能,例如队列和消息 TTL、队列过期和队列长度,专门用来避免这个问题。更有趣的是,我们不必因为使用这些功能而丢失消息。死信交换机可以帮助我们将消息重新路由到更合适的位置。现在是时候将这些技术纳入我们的排队和消息传递工具中了。