使用 RabbitMQ 调度消息
一段时间以来,人们一直在寻找在 RabbitMQ 中实现延迟消息的方法。到目前为止,公认的解决方案是使用 消息 TTL 和 死信交换器 的组合,NServiceBus 在此 处 实现。在考虑了开箱即用的解决方案一段时间后,我们有机会将其实现为一个插件。引入 RabbitMQ 延迟消息插件。
RabbitMQ 延迟消息插件 为 RabbitMQ 添加了一种新的交换器类型,在这种交换器中,由该交换器路由的消息可以在用户选择延迟的情况下延迟。让我们看看它是如何工作的。
安装插件
要安装插件,请访问我们的 社区插件页面,并下载适用于您的 RabbitMQ 安装的相应 .ez 文件。将插件复制到 RabbitMQ 的插件文件夹中,然后运行以下命令进行启用
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
插件启用后,我们就可以开始使用它了。
使用交换器
要使用延迟消息交换器,您只需声明一个交换器,并提供 "x-delayed-message" 交换器类型,如下所示
// ... elided code ...
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...
稍后我们将解释我们在交换器声明中提供的特殊参数 "x-delayed-type" 的含义。
延迟消息
要延迟消息,用户必须发布带有名为 x-delay 的特殊标头的消息,该标头接受一个整数,表示 RabbitMQ 应延迟消息的毫秒数。值得注意的是,这里的*延迟*意味着:延迟消息路由到队列或其他交换器。交换器没有消费者概念。因此,一旦延迟过期,插件将尝试将消息路由到与交换器的路由规则匹配的队列,并将消息分配给它们。请注意,如果消息无法路由到任何队列,则会像 AMQP 对无法路由的消息指定的那样被丢弃。下面是一些向消息添加 x-delay 标头并发布到我们的交换器的示例代码。
// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes();
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
props.headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
在上面的示例中,消息将在插件路由之前延迟五秒。该示例假定您已建立与 RabbitMQ 的连接并获得了通道。
灵活路由
当我们上面声明交换器时,我们提供了 x-delayed-type 参数,其值为 direct。这告诉交换器在路由消息、创建绑定等时我们希望它具有何种行为。在此示例中,我们的交换器将表现得像*direct*交换器,但我们可以传入 topic、fanout 或其他插件提供的自定义交换器类型。通过这样做,我们不会限制用户对延迟消息插件提供的路由行为。
检查消息是否被延迟
一旦我们在消费者端收到一条消息,我们如何判断该消息是否被延迟了?插件将保留 x-delay 消息标头,但会对其值取反。因此,如果您发布了一条延迟 5000 毫秒的消息,收到该消息的消费者将发现 x-delay 标头设置为 -5000。
我们需要反馈
我们已将该插件作为实验版本发布,以收集社区的反馈。请使用它,并在插件的 问题页面 或我们的 官方邮件列表 上向我们报告。
了解更多
- 网络研讨会:RabbitMQ 3.8 有哪些新特性?
- 网络研讨会:使用 RabbitMQ 的开发者应该知道的 10 件事