跳至主要内容

使用 RabbitMQ 实现分布式信号量

·阅读时长:7 分钟
Alvaro Videla

在本篇博文中,我们将探讨如何解决分布式系统中控制对特定资源的访问问题。解决这个问题的技术在计算机科学领域早已为人熟知,它被称为信号量,由 Dijkstra 于 1965 年在其名为“协作顺序进程”的论文中提出。我们将了解如何使用 AMQP 的构建块(如消费者、生产者和队列)来实现它。

对信号量的需求

在深入探讨实际解决方案之前,让我们先看看我们什么时候会真正需要这样的东西。

假设我们的应用程序有多个进程从队列中获取任务,然后将记录插入数据库,我们可能需要限制同时执行此操作的进程数量。

类似地,工作者正在调整图像大小,这些图像在准备就绪后需要通过网络存储在远程服务器上。我们希望防止图像传输使我们的网络链接过载,因此我们也限制了工作者同时传输图像的数量。这样,当工作者以最快的速度调整图像大小时,它们将按批次将图像移动到最终目的地,直到轮到它们使用网络链接为止。

另一个与 RabbitMQ 相关的例子是,您的应用程序可能需要从一组生产者中只有一个生产者向交换机发送消息,但一旦该进程停止,您希望该组中的下一个生产者开始发送消息。您可能希望这样做的原因有很多,容量控制就是其中之一。

另一方面,消费者可能需要竞争访问队列,但虽然 AMQP 提供了一种方法来拥有独占队列和独占消费者,但闲置的消费者无法知道队列访问何时释放。因此,使用与上述类似的方法,我们可以让消费者轮流访问队列。

值得注意的是,没有什么能阻止我们有多个进程访问特定的资源。假设我们有十个生产者,但我们只想让其中五个同时发布消息。使用信号量,我们也可以实现这一点。

前面的例子都有一些共同的额外要求:竞争资源的进程不应该轮询 RabbitMQ 或其他协调器以了解何时可以开始工作。理想情况下,它们将处于闲置状态,等待轮到自己,一旦资源释放,RabbitMQ 将通知下一个进程,以便它可以自动开始工作。

现在让我们继续进行实现。

实现信号量

我们的信号量将使用队列和消息来实现。不出所料!

我们首先声明一个名为 resource.semaphore 的队列,其中 resource 将是我们信号量将要控制的资源的名称,它可以是“images”、“database”、“file_server”,或者适合我们特定应用程序的任何名称。

我们将一条消息发布到 resource.semaphore 队列中。然后我们启动将寻求访问该消息的进程。每个进程都将从 resource.semaphore 队列中消费;第一个到达的进程将获得该消息,所有其他进程将处于闲置状态,等待它。诀窍是,这些进程永远不会确认消息,但它们将使用 ack_mode=onresource.semaphore 队列中消费。因此,RabbitMQ 将跟踪消息,如果进程崩溃或退出,消息将返回队列,并将传递给正在监听我们信号量队列的下一个进程。

通过这种简单的技术,我们将一次只允许一个进程访问资源,并且可以确定进程在崩溃时不会持有资源。当然,我们假设所有访问信号量的进程都是行为良好的,即:它们永远不会确认消息。如果这样做,RabbitMQ 将删除该消息,并且该组中的所有其他进程将处于饥饿状态。

当一个进程想要停止时,我们该怎么做?它如何返回“令牌”?当然,进程可以突然关闭通道,RabbitMQ 会自动处理该消息,但也有一种礼貌的方式。进程可以basic.reject该消息,告诉 RabbitMQ 将消息重新排队,以便它返回到信号量队列。

让我们看看如何在代码中实现这一点,我们假设我们已经获得了一个连接和一个通道。

以下是设置信号量的代码。

channel.queueDeclare("resource.semaphore", true, false, false, null);
String message = "resource";
channel.basicPublish("", "resource.semaphore", null, message.getBytes());

我们创建一个名为 "resource.semaphore" 的持久队列,然后使用默认交换机向其发布一条消息。

以下是进程用来访问信号量的代码。

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicQos(1);
channel.basicConsume("resource.semaphore", false, consumer);

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();

// here we access the resource controlled by the semaphore.

if(shouldStopProcessing()) {
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), true);
}
}

在那里,我们创建了一个 QueueingConsumer,它正在等待来自 "resource.semaphore" 队列的消息。我们通过在 basicQos 调用中将prefetch-count 设置为 1 来确保我们的进程只从队列中获取一条消息。一旦消息到达,该进程将开始使用该资源。当条件 shouldStopProcessing() 满足时,该进程将 basicReject 该消息,告诉 RabbitMQ 将其重新排队。请记住,消费者是在 ack-mode 模式下启动的,并且它永远不会确认从信号量队列中收到的消息。如果这样做,则它被认为有错误。

优先访问信号量

是否有可能优先访问信号量?是的,从 RabbitMQ 3.2.0 版本开始,RabbitMQ 支持 消费者优先级。通过使用消费者优先级,我们可以告诉 RabbitMQ 在传递信号量的令牌消息时,哪些进程应该优先。

二进制信号量与计数信号量

到目前为止,我们已经实现了所谓的二进制信号量,即一次只允许一个进程访问资源的信号量。如果我们可以允许多个进程同时访问同一个资源,但我们仍然需要对该操作进行限制,那么我们可以实现一个计数信号量。为此,当我们设置信号量时,我们可以发布与允许同时工作的进程数量一样多的消息,而不是发布一条消息。我们需要确保我们的进程将prefetch-count 值设置为 1,就像之前一样。

更改计数

请注意,设置信号量队列的进程可以随着时间的推移添加额外的消息来提高处理能力。如果我们想减少可以同时访问资源的进程数量,那么我们需要停止正在运行的进程并清除队列。另一种方法是启动一个具有非常 高优先级 的额外消费者,以便它可以从信号量队列中获取尽可能多的消息并确认它们,以便它们从系统中删除。

一些阅读材料

正如您所看到的,使用 AMQP 的基本构造来实现信号量非常容易,而且使用 RabbitMQ,我们还可以优先访问资源。

最后,我想分享一些关于信号量作为并发构造的文章。首先是 Dijkstra 的开创性论文 协作顺序进程。最后是维基百科关于信号量的文章,它解释了许多定义: 信号量

编辑:20.02.2014

正如 这里以及其他地方与我的同事讨论的那样,这种设置不具备对 网络分区 的弹性,因此请谨慎使用。感谢 @aphyr 和其他人对博文的反馈。RabbitMQ 团队一直希望保持诚实,告诉我们的用户服务器能够做什么,以及不能做什么。

编辑:10.03.2014

值得注意的是,这种设置通常不具备对网络故障的弹性。例如,可能发生这种情况:一个工作者拥有一个令牌,但与服务器的连接突然断开。然后,服务器将获取令牌并将其排队,以便可以将其传递给另一个工作者。在此期间,网络连接断开的 worker 仍然认为自己拥有令牌,因此,它将继续访问它不应该访问的资源。

© 2024 RabbitMQ. All rights reserved.