使用 RabbitMQ 实现分布式信号量
在这篇博文中,我们将解决在分布式系统中控制对特定资源的访问的问题。解决这个问题的方法在计算机科学中是众所周知的,它被称为信号量(Semaphore),由 Dijkstra 在 1965 年的论文“Cooperating Sequential Processes”中发明。我们将看到如何使用 AMQP 的构建块(如消费者、生产者和队列)来实现它。
信号量的需求
在深入实际解决方案之前,让我们先看看我们可能在什么时候真正需要这样的东西。
假设我们的应用程序有许多进程从队列中获取作业,然后将记录插入数据库,我们可能需要限制同时执行此操作的进程数量。
类似地,工作进程正在调整图像大小,一旦准备好,这些图像就需要存储在远程服务器上。我们希望防止我们的网络链路被图像传输淹没,因此我们也限制了同时传输图像的工作进程数量。这样,虽然我们的工作进程会尽可能快地调整图像大小,但一旦轮到它们使用网络链路,它们就会分批将图像传输到最终目的地。
另一个与 RabbitMQ 相关的例子是,您的应用程序可能只需要一组中的一个生产者向交换机发送消息,但一旦该进程停止,您就希望该组中的下一个生产者开始发送消息。您可能出于多种原因需要这样的功能,容量控制就是其中之一。
另一方面,可能需要消费者竞争访问队列,但尽管 AMQP 提供了创建独占队列和独占消费者的机制,但空闲的消费者无法知道队列访问何时被释放。因此,使用与上述类似的方法,我们可以让消费者轮流访问队列。
值得注意的是,没有什么可以阻止我们让一个以上的进程访问某个特定资源。假设我们有十个生产者,但我们只想让其中五个同时发布消息。使用信号量,我们也可以实现这一点。
先前的例子都有一个共同的额外要求:竞争资源的进程不应该轮询 RabbitMQ 或其他协调器来了解何时可以开始工作。理想情况下,它们将处于空闲状态等待轮到它们,一旦资源被释放,RabbitMQ 就会通知下一个进程,使其能够自动开始工作。
现在我们开始实现。
实现信号量
我们将使用队列和消息来实现我们的信号量。惊喜,惊喜!
我们首先声明一个名为 resource.semaphore 的队列,其中 resource 将是我们信号量将要控制的资源的名称,它可以是“images”、“database”、“file_server”或适合我们特定应用程序的任何名称。
我们将 *一条* 消息发布到 resource.semaphore 队列。然后我们启动将尝试访问该消息的进程。每个进程将从 resource.semaphore 队列进行消费;第一个到达的进程将获取消息,而所有其他进程将处于空闲状态等待。诀窍在于,这些进程将 *从不确认* 消息,但它们将以 ack_mode=on 的方式从 resource.semaphore 队列进行消费。因此,RabbitMQ 将跟踪该消息,如果进程崩溃或退出,消息将返回队列,并将传递给监听我们信号量队列的下一个进程。
通过这种简单的技术,我们将一次只有一个进程可以访问该资源,并且我们可以确保进程在崩溃时不会持有该资源。当然,我们假设所有访问信号量的进程都行为良好,即:它们永远不会确认消息。如果它们确认了,RabbitMQ 将删除该消息,并且该组中的所有其他进程将饿死。
当一个进程想要停止时,我们该怎么办?它如何返回“令牌”?当然,进程可以突然关闭通道,RabbitMQ 会自动处理消息,但也有一个礼貌的方式来做到这一点。进程可以 *基本拒绝* 消息,告知 RabbitMQ 重新排队该消息,使其返回到信号量队列。
让我们在代码中看看实现,我们假设我们已经获得了连接和通道。
这是设置我们信号量的代码。
channel.queueDeclare("resource.semaphore", true, false, false, null);
String message = "resource";
channel.basicPublish("", "resource.semaphore", null, message.getBytes());
我们创建一个名为 “resource.semaphore” 的持久化队列,然后使用 *默认* 交换机向其发布一条消息。
这是进程用于访问信号量的代码。
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicQos(1);
channel.basicConsume("resource.semaphore", false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
// here we access the resource controlled by the semaphore.
if(shouldStopProcessing()) {
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), true);
}
}
在这里,我们创建一个 QueueingConsumer,它正在等待来自 “resource.semaphore” 队列的消息。我们通过在 basicQos 调用中设置 *预取计数* 为 1 来确保我们的进程仅从队列中拾取一条消息。一旦消息到达,进程将开始使用该资源。当满足条件 shouldStopProcessing() 时,进程将 *基本拒绝* 消息,告知 RabbitMQ 重新排队。请记住,消费者是以确认模式启动的,并且它将永远不会确认从信号量队列接收到的消息。如果它这样做,那么它将被视为有缺陷。
信号量的优先级访问
是否可以优先访问信号量?是的,自 3.2.0 版本以来,RabbitMQ 支持消费者优先级。通过使用消费者优先级,我们可以告诉 RabbitMQ 在传递信号量令牌消息时优先考虑哪些进程。
二元信号量与计数信号量
到目前为止,我们已经实现了一个 *二元信号量*,即一次只允许一个进程访问资源的信号量。如果我们允许一个以上的进程同时访问同一资源,但仍然需要限制该操作,那么我们可以实现一个 *计数信号量*。为此,在设置信号量时,我们可以发布与允许同时工作的进程数量相同的消息,而不是发布一条消息。我们需要确保我们的进程将 *预取计数* 值设置为 1,就像我们之前做的那样。
更改计数
请注意,设置信号量队列的进程可以随时添加额外的消息来增加处理能力。如果我们想减少可以同时访问资源的进程数量,那么我们就必须停止正在运行的进程并清除队列。另一种方法是启动一个具有非常高优先级的额外消费者,以便它从信号量队列中获取尽可能多的消息并确认它们,使它们从系统中删除。
一些阅读材料
如您所见,使用 AMQP 基本构造实现信号量非常容易,借助 RabbitMQ,我们还可以优先访问资源。
最后,我想分享一些关于作为并发构造的信号量的文章。首先是 Dijkstra 的开创性论文Cooperating Sequential Processes。最后是维基百科关于信号量的文章,它解释了许多定义:Semaphore。
编辑:2014年2月20日
正如在此处以及与同事的讨论中所述,此设置对网络分区的容错性不强,因此请谨慎使用。感谢@aphyr 和其他人提供博客文章的反馈。在 RabbitMQ 团队,我们始终乐于保持诚实,并告诉用户服务器的功能和局限性。
编辑:2014年3月10日
值得注意的是,此设置通常不能抵抗网络故障。例如,可能会发生工作进程拥有令牌,并且与服务器的连接突然关闭。然后服务器将获取令牌并将其放入队列,以便可以将其传递给另一个工作进程。在此期间,网络连接已关闭的工作进程仍将认为它拥有令牌,因此它将继续访问它不应该访问的资源。