使用 RabbitMQ 调度消息
一段时间以来,人们一直在寻找使用 RabbitMQ 实现延迟消息传递的方法。到目前为止,公认的解决方案是混合使用message TTL 和 Dead Letter Exchanges,如 NServiceBus here 实现的那样。在考虑了一段时间的开箱即用解决方案后,我们有机会将其作为插件来实现。RabbitMQ 延迟消息插件由此诞生。
RabbitMQ 延迟消息插件为 RabbitMQ 添加了一种新的交换机类型,用户可以选择延迟通过该交换机路由的消息。让我们看看它是如何工作的。
安装插件
要安装插件,请访问我们的 Community Plugins page 并下载与您的 RabbitMQ 安装相对应的 .ez 文件。将插件复制到 RabbitMQ 的插件文件夹中,然后运行以下命令启用它
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
一旦插件被启用,我们就可以开始使用它了。
使用交换机
要使用延迟消息交换机,您只需声明一个交换机,并提供 "x-delayed-message"
交换机类型,如下所示
// ... elided code ...
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...
稍后我们将解释我们在交换机声明中提供的特殊参数 "x-delayed-type"
的含义。
延迟消息
要延迟消息,用户必须发布带有名为 x-delay
的特殊标头的消息,该标头接受一个整数,表示消息应被 RabbitMQ 延迟的毫秒数。值得注意的是,这里的延迟意味着:延迟消息路由到队列或其他交换机。交换机没有消费者的概念。因此,一旦延迟到期,插件将尝试将消息路由到与交换机的路由规则以及分配给消息的规则匹配的队列。请注意,如果消息无法路由到任何队列,则会根据 AMQP 关于不可路由消息的规定将其丢弃。这是一些示例代码,它将 x-delay
标头添加到消息并发布到我们的交换机。
// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes();
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
props.headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
在前面的示例中,消息将在被插件路由之前延迟五秒钟。该示例假设您已建立与 RabbitMQ 的连接并获得了通道。
灵活路由
当我们在上面声明交换机时,我们提供了一个设置为 direct
的 x-delayed-type
参数。它的作用是告诉交换机,我们希望它在路由消息、创建绑定等时具有什么样的行为。在示例中,我们的交换机将像 direct 交换机一样工作,但我们可以在那里传递 topic、fanout 或由其他插件提供的自定义交换机类型。通过这样做,我们不会限制用户可以从延迟消息插件获得哪种路由行为。
检查消息是否被延迟
一旦我们在消费者端收到消息,我们如何判断消息是否被延迟?插件将保留 x-delay
消息标头,但会否定传递的值。因此,如果您发布了一条延迟 5000
毫秒的消息,则接收消息的消费者将发现 x-delay
标头设置为 -5000
我们需要反馈
我们已将该插件作为实验性插件发布,以收集来自社区的反馈。请使用它并在插件的 issue page 或我们的 official mailing list 上向我们报告。
了解更多
- 网络研讨会: RabbitMQ 3.8 的新功能?
- 网络研讨会: 每个使用 RabbitMQ 的开发人员都应该知道的 10 件事