使用 RabbitMQ 调度消息
一段时间以来,人们一直在寻找使用 RabbitMQ 实现延迟消息的方法。到目前为止,公认的解决方案是使用 James Carr 这里提出的消息 TTL 和死信交换的组合。我们已经考虑为这个问题提供开箱即用的解决方案,并且在过去的一个月中,我们有时间将其作为一个插件实现。请输入 RabbitMQ 延迟消息插件。
**RabbitMQ 延迟消息插件** 在 RabbitMQ 中添加了一种新的交换类型,通过该交换类型路由的消息可以在用户选择的情况下被延迟。让我们看看它是如何工作的。
安装插件
要安装插件,请访问我们的 社区插件页面 并下载与您的 RabbitMQ 安装相对应的 .ez 文件。将插件复制到 RabbitMQ 的插件文件夹中,然后通过运行以下命令启用它
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
插件启用后,我们就可以开始使用它了。
使用交换机
要使用延迟消息交换,您只需要声明一个交换机,提供 "x-delayed-message"
交换类型,如下所示
// ... elided code ...
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...
稍后我们将解释我们在交换机声明中提供的特殊参数 "x-delayed-type"
的含义。
延迟消息
要延迟消息,用户必须使用名为 x-delay
的特殊标头发布消息,该标头接受一个整数,表示 RabbitMQ 应该延迟消息的毫秒数。值得注意的是,这里的“延迟”意味着:延迟将消息路由到队列或其他交换机。交换机没有消费者概念。因此,一旦延迟时间到期,插件将尝试将消息路由到匹配交换机路由规则和分配给消息的队列。请注意,如果消息无法路由到任何队列,则将被丢弃,如 AMQP 对无法路由的消息所规定。以下是一些示例代码,这些代码将 x-delay
标头添加到消息中,并发布到我们的交换机。
// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes();
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
props.headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
在前面的示例中,消息将在五秒钟后被插件路由。该示例假设您已建立与 RabbitMQ 的连接并获得了通道。
灵活路由
当我们声明上面的交换机时,我们提供了一个 x-delayed-type
参数,该参数设置为 direct
。这样做的作用是告诉交换机我们希望它在路由消息、创建绑定等方面的行为。在示例中,我们的交换机将像 direct 交换机一样工作,但我们可以传递主题、扇出或其他插件提供的自定义交换机类型。通过这样做,我们不会限制用户在延迟消息插件提供的路由行为方面的选择。
检查消息是否被延迟
一旦我们在消费者端收到消息,我们如何才能判断消息是否被延迟了呢?插件将保留 x-delay
消息标头,但会将传递的值取反。因此,如果您发布了一条延迟 5000
毫秒的消息,则接收该消息的消费者将发现 x-delay
标头设置为 -5000
我们需要反馈
我们已将该插件作为实验性插件发布,以收集社区的反馈。请使用它并向我们报告该插件的 问题页面 或我们的 官方邮件列表。