跳到主要内容
版本: 4.1

死信交换器

什么是死信交换器

当发生以下四个事件中的任何一个时,来自队列的消息可以被“死信化”,这意味着这些消息将被重新发布到一个交换器。

  1. 消息被 AMQP 1.0 接收者使用 rejected 结果否定确认,或者被 AMQP 0.9.1 消费者使用 basic.rejectbasic.nackrequeue 参数设置为 false,或者
  2. 消息由于 单消息 TTL 而过期,或者
  3. 消息因其队列超出 长度限制 而被丢弃,或者
  4. 消息返回到仲裁队列的次数超过 delivery-limit

如果整个 队列过期,则队列中的消息不会被死信化。

死信交换器 (DLX) 是普通的交换器。它们可以是任何常用类型,并且像普通交换器一样声明。

如何配置死信处理

对于任何给定的队列,DLX 可以由客户端使用 策略 定义。有几个与 DLX 相关的策略键,包括一些 仅仲裁队列支持 的策略键,但两个关键的策略键是

  • dead-letter-exchange: 要使用的 DLX 的名称
  • dead-letter-routing-key: 死信消息时要使用的路由键
重要

策略键也可以在队列声明时通过 可选参数 由应用程序设置。

强烈建议不要使用硬编码的 x-arguments,因为它们 无法在不重新部署应用程序的情况下更新,而策略可以随时更新。

如果在策略和参数中都指定了 DLX,则参数中指定的 DLX 将覆盖策略中指定的 DLX。

除了目标 DLX 名称之外,还需要指定在死信消息时使用的路由键。如果未设置路由键,则使用消息自身的路由键。

当指定死信交换器时,除了对声明的队列具有通常的配置权限外,用户还必须对该队列具有读取权限,并对死信交换器具有写入权限。权限在声明队列时进行验证。

使用策略配置死信交换器

要使用策略指定 DLX,请将键 "dead-letter-exchange" 添加到策略定义中

rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues --priority 7

之前的示例声明了一个名为 “DLX” 的策略,该策略应用于所有队列(无论类型如何),并将名为 “my-dlx” 的交换器配置为死信处理目标。这只是一个示例,在实践中,常见的是看到多个策略,每个策略应用于队列的子集。

类似地,可以通过将键 “dead-letter-routing-key” 添加到策略来指定显式路由键

rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx", "dead-letter-routing-key":"my-routing-key"}' --apply-to queues --priority 7

使用可选队列参数配置死信交换器

警告

强烈建议不要使用硬编码的 x-arguments,因为它们 无法在不重新部署应用程序和删除队列(然后才能重新声明队列)的情况下更新,而策略可以随时更新。

要为队列设置 DLX,请在声明队列时指定可选的 x-dead-letter-exchange 参数。该值必须是同一虚拟主机中的交换器名称

channel.exchangeDeclare("some.exchange.name", "direct");

// Important: prefer using policies over hardcoded x-arguments
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);

上面的代码声明了一个名为 some.exchange.name 的新交换器,并将此新交换器设置为新创建队列的死信交换器。请注意,交换器不必在声明队列时声明,但它应该在消息需要被死信处理时存在。如果那时它丢失了,则消息将被静默丢弃。

除了目标 DLX 名称之外,还需要指定在死信消息时使用的路由键。如果未设置路由键,则使用消息自身的路由键。

// Important: prefer using policies over hardcoded x-arguments.
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
args.put("x-dead-letter-routing-key", "some-routing-key");

当指定死信交换器时,除了对声明的队列具有通常的配置权限外,用户还必须对该队列具有读取权限,并对死信交换器具有写入权限。权限在声明队列时进行验证。

路由死信消息

死信消息被路由到其死信交换器,方式为:

  • 使用为其所在队列指定的路由键;或者,如果未设置
  • 使用它们最初发布时使用的相同路由键

例如,如果您使用 foo 路由键将消息发布到交换器,并且该消息被死信处理,则它将使用 foo 路由键发布到其死信交换器。如果消息最初落入的队列在声明时将 x-dead-letter-routing-key 设置为 bar,则该消息将使用 bar 路由键发布到其死信交换器。

请注意,如果未为队列设置特定的路由键,则队列上的消息将使用所有原始路由键进行死信处理。这包括由 CCBCC 标头添加的路由键(有关这两个标头的详细信息,请参阅 发送者选择分发)。

死信循环

有可能形成消息死信循环,其中同一消息两次到达同一队列。例如,当队列将消息“死信”到默认交换器而不指定死信路由键时,可能会发生这种情况。为了防止 RabbitMQ 中自动无限的消息循环,RabbitMQ 将检测循环并丢弃消息,如果在整个循环中没有拒绝

安全性

死信处理是消息发布的一种形式,并且与任何形式的发布一样,它在某些情况下可能会失败。例如,如果死信处理配置为使用没有在线仲裁的仲裁队列,则发布将失败,并且执行死信处理的节点将记录类似于以下内容的消息

Cannot forward any dead-letter messages from source quorum queue 'qq.input' in vhost 'my-vhost'
with configured dead-letter-exchange exchange 'amq.topic' in vhost 'my-vhost'
and configured dead-letter-routing-key 'my-app.events.type.abc'

使用发布者确认重新发布

默认情况下,死信消息在内部重新发布时启用发布者 确认。因此,在集群 RabbitMQ 环境中使用 DLX 不能保证安全。消息在发布到 DLX 目标队列后立即从原始队列中删除。这确保不会出现可能耗尽 broker 资源的过度消息堆积。但是,如果目标队列不可用于接受消息,则消息可能会丢失。

仲裁队列支持 至少一次死信处理,其中消息在内部重新发布时启用发布者确认。

死信处理对消息的影响

死信消息会修改其标头

  • 交换器名称被替换为最新的死信交换器的名称
  • 路由键可能会被替换为在执行死信处理的队列中指定的路由键(即配置的 dead-letter-routing-key),
  • 如果发生上述情况,CC 标头也将被删除,并且
  • BCC 标头将根据 发送者选择分发 删除

单个消息可以被死信处理多次。每次消息被死信处理时,此事件都将记录在消息标头中。为了防止标头无限增长,死信事件历史记录通过 {Queue, Reason} 对进行压缩。

AMQP 1.0 消息将包含一个 消息注释,其符号键为 x-opt-deaths,值为 数组map。AMQP 0.9.1 消息将包含一个 x-death 标头,其值为数组。AMQP 1.0 和 AMQP 0.9.1 中的数组都按时间顺序排列,即最近的死信事件记录在第一个数组元素中。

下表描述了 AMQP 1.0 map 键值对和数组元素的 AMQP 0.9.1 表。所有 AMQP 1.0 键的类型均为 symbol。AMQP 1.0 客户端不得依赖 map 键值对的顺序。

AMQP 1.0 键AMQP 1.0 值类型AMQP 0.9.1 键AMQP 0.9.1 值类型描述
queuestringqueuelongstr此消息从中死信处理的队列的名称。
reasonsymbolreasonlongstr此消息被死信处理的原因(如下所述)。
countulongcountlong此消息因该原因从该队列中死信处理的次数。
first-timetimestamp此消息因该原因首次从该队列中死信处理的时间。
last-timetimestamp此消息因该原因最后一次从该队列中死信处理的时间。
timetimestamp此消息因该原因首次从该队列中死信处理的时间。
exchangestringexchangelongstr此消息首次因该原因从该队列中死信处理之前发布到的交换器。
routing-keysarray of stringrouting-keysarray of longstr此消息首次因该原因从该队列中死信处理之前的路由键(包括 CC 但不包括 BCC)。
ttluintAMQP 1.0 headerttl(生存时间,以毫秒为单位),在此消息首次因该原因从该队列中死信处理之前。
original-expirationlongstr此消息首次因该原因从该队列中死信处理之前的原始 expiration 属性。

记录 AMQP 1.0 ttl 和 AMQP 0.9.1 original-expiration 是可选的,因为原始消息的 TTL 在死信处理时从消息中删除,以防止它在路由到的任何队列中再次过期。

reason 是一个名称,描述了消息被死信处理的原因,并且是以下之一

此外,对于第一次死信处理事件,还会添加以下六个 AMQP 1.0 消息注释或 AMQP 0.9.1 标头

  1. x-first-death-queue: 此消息从中死信处理的第一个队列。
  2. x-first-death-reason: 此消息首次被死信处理的原因。
  3. x-first-death-exchange: 此消息首次被死信处理之前发布到的交换器。
  4. x-last-death-queue: 此消息从中死信处理的最后一个队列。
  5. x-last-death-reason: 此消息最后一次被死信处理的原因。
  6. x-last-death-exchange: 此消息最后一次被死信处理之前发布到的交换器。

x-first-* 注释永远不会被修改。每当消息随后被死信处理时,x-last-* 注释都会被更新。

© . All rights reserved.