消费者优先级与 RabbitMQ
在 RabbitMQ 3.2.0 中,我们引入了 消费者优先级,顾名思义,它允许我们为消费者设置优先级。这让我们能够在一定程度上控制 RabbitMQ 如何将消息传递给消费者,以实现对应用程序有益的另一种调度方式。
您会在什么时候在代码中使用消费者优先级?
异构集群
假设我们的工作节点集群并非运行在完全相同的硬件上。某些机器拥有一些硬件功能,这些功能使其在处理我们正在运行的任务类型时比集群中的其他机器具有优势。例如,某些机器配备了 SSD,而我们的任务需要大量的 I/O;或者任务需要更快的 CPU 来执行计算;或者需要更多的 RAM 来缓存未来计算的结果。无论如何,如果我们有两个消费者都已准备好接收更多消息,而其中一个位于更好的机器上,那么 RabbitMQ 应该优先选择位于更好机器上的消费者并将其传递消息,而不是选择位于较差机器上的另一个消费者。请记住,消费者优先级 *仅* 对已准备好接收消息的消费者生效。因此,如果位于较差机器上的某个消费者已准备就绪,而更好的机器上没有已准备就绪的消费者,那么 RabbitMQ 将直接将消息发送到该特定消费者,而不会等待更快的消费者可用。
数据本地性
消费者优先级的另一个用途是利用数据本地性。在 RabbitMQ 中,队列内容驻留在最初声明队列的节点上,对于 镜像队列,会有一个主节点协调队列。因此,虽然消费者可以连接到集群中的各种节点并从镜像中获取消息,但归根结底,关于谁消费了哪些消息的信息会发送回主节点。在这种情况下,我们可以使用消费者优先级来指示 RabbitMQ 首先将消息传递给连接到主节点的消费者。为此,连接到主节点的消费者在发出 basic.consume 命令时会为自己设置更高的优先级(前提是它有办法知道自己连接到了主节点)。
声明消费者优先级
下面是使用 RabbitMQ Java 客户端 声明消费者优先级的示例代码。
import java.util.*;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer {
private final static String EXCHANGE_NAME = "my_exchange";
private final static String QUEUE_NAME = "my_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
System.out.println("Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-priority", 10);
channel.basicConsume(QUEUE_NAME, false, "", false, false, args, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("Received '" + message + "'");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
此代码实现了基于 教程 1 示例的一个非常简单的消费者。有趣的部分从第 25 行到第 27 行,在那里我们首先创建一个 HashMap 来保存传递给 basicConsume 的参数。我们创建了一个名为 x-priority 的参数,其值为 10(值越高,优先级越高)。当我们调用 basicConsume 时,我们将这些参数传递给 RabbitMQ,仅此而已!这是一个非常强大的功能,而且相当简单易用。一如既往,运行性能测试以决定最适合我们消费者的优先级策略是明智的。