跳至主要内容

使用消费者优先级与 RabbitMQ

·阅读 4 分钟
Alvaro Videla

在 RabbitMQ 3.2.0 中,我们引入了 消费者优先级,顾名思义,它允许我们为消费者设置优先级。这为我们提供了一些控制权,可以控制 RabbitMQ 如何将消息传递给消费者,从而获得可能对我们的应用程序有益的不同类型的调度。

在您的代码中何时需要使用消费者优先级?

异构集群

假设我们的工作集群并非在完全相同的硬件上运行。某些机器具有一些硬件特性,根据我们正在运行的任务类型,使它们在集群中的其他机器中具有优势。例如,某些机器具有 SSD,而我们的任务需要大量的 I/O;或者任务需要更快的 CPU 来执行计算;或者需要更多 RAM 来缓存未来计算的结果。无论哪种情况,如果我们有两个消费者准备接收更多消息,并且其中一个在更好的机器上,那么 RabbitMQ 应该选择更好的机器上的消费者并将消息传递给它,而不是选择较差机器上的另一个消费者,这将很有趣。请记住,消费者优先级**仅**对准备好接收消息的消费者生效。因此,如果我们较差机器中的一个消费者已准备就绪,并且较好机器中没有消费者已准备就绪,那么 RabbitMQ 将直接将消息发送到该特定消费者,而无需等待更快的消费者可用。

数据本地性

消费者优先级的另一个用途是利用数据本地性。在 RabbitMQ 中,队列内容位于最初声明队列的节点上,并且在 镜像队列 的情况下,将有一个主节点来协调队列,因此,虽然消费者可以连接到集群中的各个节点并从镜像中获取消息,但最终,有关谁消费了哪些消息的信息将传回主节点。在这种情况下,我们可以使用消费者优先级来告诉 RabbitMQ 首先将消息传递给连接到主节点的消费者。为此,连接到主节点的消费者在发出 basic.consume 命令时将为自己设置更高的优先级(前提是它有办法知道它已连接到主节点)。

声明消费者优先级

您可以在下面找到示例代码,该代码显示了如何使用 RabbitMQ Java 客户端 声明消费者优先级。

import java.util.*;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Consumer {

private final static String EXCHANGE_NAME = "my_exchange";
private final static String QUEUE_NAME = "my_queue";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
System.out.println("Waiting for messages. To exit press CTRL+C");

QueueingConsumer consumer = new QueueingConsumer(channel);

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-priority", 10);
channel.basicConsume(QUEUE_NAME, false, "", false, false, args, consumer);

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("Received '" + message + "'");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}

此代码基于 教程 1 中的示例实现了一个非常简单的消费者。有趣的部分是从第 25 行到第 27 行,我们首先创建一个 HashMap 来保存传递给 basicConsume 的参数。我们创建一个名为 x-priority 的参数,其值为 10(值越高,优先级越高)。当我们调用 basicConsume 时,我们将这些参数传递给 RabbitMQ,就是这样!一个非常强大的功能,使用起来相当简单。像往常一样,最好运行性能测试以确定什么才是我们消费者最佳的优先级策略。

© 2024 RabbitMQ. All rights reserved.