跳至主内容

使用 RabbitMQ 调度消息

·阅读时长4分钟
Álvaro Videla

一段时间以来,人们一直在寻找在 RabbitMQ 中实现延迟消息的方法。到目前为止,公认的解决方案是使用 消息 TTL死信交换器 的组合,NServiceBus 在此 实现。在考虑了开箱即用的解决方案一段时间后,我们有机会将其实现为一个插件。引入 RabbitMQ 延迟消息插件

RabbitMQ 延迟消息插件 为 RabbitMQ 添加了一种新的交换器类型,在这种交换器中,由该交换器路由的消息可以在用户选择延迟的情况下延迟。让我们看看它是如何工作的。

安装插件

要安装插件,请访问我们的 社区插件页面,并下载适用于您的 RabbitMQ 安装的相应 .ez 文件。将插件复制到 RabbitMQ 的插件文件夹中,然后运行以下命令进行启用

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

插件启用后,我们就可以开始使用它了。

使用交换器

要使用延迟消息交换器,您只需声明一个交换器,并提供 "x-delayed-message" 交换器类型,如下所示

// ... elided code ...
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...

稍后我们将解释我们在交换器声明中提供的特殊参数 "x-delayed-type" 的含义。

延迟消息

要延迟消息,用户必须发布带有名为 x-delay 的特殊标头的消息,该标头接受一个整数,表示 RabbitMQ 应延迟消息的毫秒数。值得注意的是,这里的*延迟*意味着:延迟消息路由到队列或其他交换器。交换器没有消费者概念。因此,一旦延迟过期,插件将尝试将消息路由到与交换器的路由规则匹配的队列,并将消息分配给它们。请注意,如果消息无法路由到任何队列,则会像 AMQP 对无法路由的消息指定的那样被丢弃。下面是一些向消息添加 x-delay 标头并发布到我们的交换器的示例代码。

// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes();
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
props.headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);

在上面的示例中,消息将在插件路由之前延迟五秒。该示例假定您已建立与 RabbitMQ 的连接并获得了通道。

灵活路由

当我们上面声明交换器时,我们提供了 x-delayed-type 参数,其值为 direct。这告诉交换器在路由消息、创建绑定等时我们希望它具有何种行为。在此示例中,我们的交换器将表现得像*direct*交换器,但我们可以传入 topic、fanout 或其他插件提供的自定义交换器类型。通过这样做,我们不会限制用户对延迟消息插件提供的路由行为。

检查消息是否被延迟

一旦我们在消费者端收到一条消息,我们如何判断该消息是否被延迟了?插件将保留 x-delay 消息标头,但会对其值取反。因此,如果您发布了一条延迟 5000 毫秒的消息,收到该消息的消费者将发现 x-delay 标头设置为 -5000

我们需要反馈

我们已将该插件作为实验版本发布,以收集社区的反馈。请使用它,并在插件的 问题页面 或我们的 官方邮件列表 上向我们报告。

了解更多

© . This site is unofficial and not affiliated with VMware.