跳至主内容

至少一次死信

·阅读 22 分钟

RabbitMQ 3.10 中的仲裁队列 (Quorum queues) 提供了更安全的死信机制,该机制使用至少一次的保证来传输队列之间的消息。这篇博文将解释您开始使用至少一次死信所需了解的一切。

这篇帖子还介绍了 RabbitMQ 3.10 的另外两项功能:仲裁队列的消息生存时间 (TTL) 以及死信消息的 Prometheus 指标。

概述

存储在 RabbitMQ 队列中的某些消息会过期或被消费者拒绝确认。RabbitMQ 可以配置为将这些消息“死信化”(dead letter),即将它们重新发布到专门的交换机中,而不是直接丢弃它们。

在 RabbitMQ 3.10 之前,死信机制并不安全。从队列(“源队列”)中死信化的消息,不能保证一定能被投递到由 dead-letter-exchange 策略配置的交换机所路由到的队列(即“目标队列”)。

这是因为消息在死信化时内部并未开启发布者确认(publisher confirms)。我们称之为“至多一次”(at-most-once)死信策略。死信消息可能会到达目标队列,但也可能由于多种原因丢失:

  • 目标队列不可用。例如,经典队列的主机节点宕机或正在升级,或者仲裁队列(quorum queue)临时失去了多数节点。
  • 目标队列达到了长度限制,且其溢出行为被设置为 reject-publish,从而拒绝任何传入的消息。
  • 网络分区导致源队列和目标队列之间无法通信。
  • 死信路由拓扑配置错误。例如,配置的 dead-letter-exchange 不存在,或者没有目标队列绑定到该 dead-letter-exchange

RabbitMQ 3.10 引入了一项名为“至少一次”(at-least-once)死信机制的新功能。这是一个针对仲裁队列(作为源队列)的可选功能。此新功能确保源仲裁队列中死信化的所有消息,即使在上述可能导致“至多一次”策略下丢失消息的场景中,最终也能够到达目标队列(无论是经典队列、仲裁队列还是流队列)。

本篇博文介绍了如何启用“至少一次”死信机制、提供了详细的示例,并描述了该功能的注意事项和最佳实践。

用法

要为源仲裁队列启用 at-least-once 死信机制,我们需要应用以下策略(或等效的以 x- 开头的队列参数):

  • dead-letter-strategy 设置为 at-least-once。默认值为 at-most-once
  • overflow 设置为 reject-publish。默认值为 drop-head
  • 配置了 dead-letter-exchange

此外,必须启用功能标志 stream_queue。对于 3.9 版本之后创建的 RabbitMQ 集群,该标志默认已启用。尽管“至少一次”死信机制并不一定使用流(除非目标队列恰好是流队列),但仍需要 stream_queue 功能标志,因为“至少一次”死信机制依赖于该标志引入的一些实现细节。

示例

跟随此示例需要安装 kubectl 客户端,并指向任何运行中的 Kubernetes 1.19 或更高版本的集群。

如果您没有可用的 Kubernetes 集群,最快的方法是安装 kind,以便在 Docker 中启动一个本地 Kubernetes 集群。

> kind create cluster

安装 rabbitmq/cluster-operator

> kubectl apply -f https://github.com/rabbitmq/cluster-operator/releases/latest/download/cluster-operator.yml

部署一个 3 节点的 RabbitMQ 集群

> cat <<EOF | kubectl apply -f -
---
apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
name: my-rabbit
spec:
replicas: 3
image: rabbitmq:3.10.0-management
EOF

一旦所有 3 个 pod 就绪(不到 1 分钟),我们就创建一个源队列和一个目标队列。

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqadmin declare queue name=my-source-queue \
durable=true queue_type=quorum arguments='{"x-dead-letter-exchange" : "",
"x-dead-letter-routing-key" : "my-target-queue" , "x-overflow" : "reject-publish"}'

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqadmin declare queue name=my-target-queue \
durable=true queue_type=classic

最后两个命令通过在 my-rabbit-server-0 pod 的 RabbitMQ 容器中执行 rabbitmqadmin 命令来声明队列。

rabbitmqadmin 命令是一个与 RabbitMQ 管理 API 交互的 Python 脚本。不建议通过 rabbitmqadmin 命令来声明队列和发送消息。我们在本文中使用它,是因为它是让您跟随示例操作的最简单方式。

rabbitmq/cluster-operator 创建的 pod 名称格式为 <rabbitmq-cluster-name>-server-<index>。在上面的 YAML 中,我们将 <rabbitmq-cluster-name> 定义为 my-rabbit

第一个命令创建源队列。要使“至少一次”死信机制生效,其必须为 queue_type=quorum。对于源队列,我们定义了以 JSON 格式编码的额外队列参数(以 x- 开头)。

  • x-dead-letter-exchange 设置为空字符串 ("") 意味着源队列产生的死信消息将发布到默认交换机。(虽然我们可以创建一个新的死信交换机,但使用默认交换机可以简化此示例。)
  • x-dead-letter-routing-key 设置为 my-target-queue 意味着死信消息将以路由键 my-target-queue 发布。由于此路由键与目标队列(由第 2 个命令创建)的名称匹配,死信消息将通过默认交换机自动路由到目标队列,无需创建任何额外的绑定。
  • 如上所述,x-overflow 必须设置为 reject-publish,这是“至少一次”死信机制的先决条件。

第二个命令创建目标队列。它可以是任何队列类型。在此示例中,我们选择了经典队列。请注意,与在所有 3 个节点上拥有 3 个副本的源仲裁队列相比,目标经典队列并非高可用,且驻留在单个节点上。

让我们发布第一条消息 msg1

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqadmin publish exchange=amq.default routing_key=my-source-queue \
payload=msg1 properties='{"expiration" : "1000", "delivery_mode" : 2}'

此命令演示了 RabbitMQ 3.10 的另一个新功能:仲裁队列支持消息 TTL。下图说明了消息的流向:

  1. 我们向默认交换机发布一条消息。
  2. 它被路由到源仲裁队列,并在 1 秒(1000 毫秒)后过期。
  3. 过期导致消息被死信化并发送到默认交换机。
  4. 它被路由到目标经典队列。

Figure 1: Dead letter routing topology (at-most-once)
图 1:死信路由拓扑(至多一次)

请注意,我们将 delivery_mode 设置为整数 2,表示消息已持久化。在最初将消息发布到源仲裁队列时,该标志并不重要,因为仲裁队列中的所有消息都会写入磁盘。然而,一旦消息被死信化到目标队列(可能不是仲裁队列)时,该持久化标志就变得非常重要。

我们可以验证消息是否已到达目标队列。

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqctl list_queues --formatter=pretty_table --quiet \
name type messages messages_ready messages_unacknowledged

┌─────────────────┬─────────┬──────────┬────────────────┬─────────────────────────┐
│ name │ type │ messages │ messages_ready │ messages_unacknowledged │
├─────────────────┼─────────┼──────────┼────────────────┼─────────────────────────┤
│ my-target-queue │ classic │ 1 │ 1 │ 0 │
├─────────────────┼─────────┼──────────┼────────────────┼─────────────────────────┤
│ my-source-queue │ quorum │ 0 │ 0 │ 0 │
└─────────────────┴─────────┴──────────┴────────────────┴─────────────────────────┘

接下来,让我们尝试一下目标队列不可用时会发生什么。确定目标经典队列所在主机节点的一种方法是列出队列的进程标识符 (PID)。

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqctl list_queues --quiet name pid

name pid
my-target-queue <rabbit@my-rabbit-server-0.my-rabbit-nodes.default.1646297039.856.0>
my-source-queue <rabbit@my-rabbit-server-0.my-rabbit-nodes.default.1646297039.821.0>

PID 显示目标经典队列进程和源仲裁队列领导者进程都驻留在 pod my-rabbit-server-0 中。让我们停止该 RabbitMQ 服务器。

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqctl stop_app

Stopping rabbit application on node rabbit@my-rabbit-server-0.my-rabbit-nodes.default ...

源仲裁队列将仍然可用,因为多数节点(3 个中的 2 个)可用,并且另一个节点将成为新的领导者。

像之前一样,我们再次向源队列发送一条消息,让它在 1 秒后过期。由于 pod my-rabbit-server-0 中的 RabbitMQ 节点已宕机,我们在 my-rabbit-server-1 中执行以下命令。

> kubectl exec my-rabbit-server-1 -c rabbitmq -- rabbitmqadmin publish exchange=amq.default routing_key=my-source-queue \
payload=msg2 properties='{"expiration" : "1000", "delivery_mode" : 2}'

由于目标队列已宕机,它将无法报告统计信息。

> kubectl exec my-rabbit-server-1 -c rabbitmq -- rabbitmqctl list_queues --formatter=pretty_table --quiet \
name type messages messages_ready messages_unacknowledged state

┌─────────────────┬──────────────────────┬──────────┬────────────────┬─────────────────────────┬─────────┐
│ name │ type │ messages │ messages_ready │ messages_unacknowledged │ state │
├─────────────────┼──────────────────────┼──────────┼────────────────┼─────────────────────────┼─────────┤
│ my-target-queue │ rabbit_classic_queue │ │ │ │ down │
├─────────────────┼──────────────────────┼──────────┼────────────────┼─────────────────────────┼─────────┤
│ my-source-queue │ quorum │ 0 │ 0 │ 0 │ running │
└─────────────────┴──────────────────────┴──────────┴────────────────┴─────────────────────────┴─────────┘

然而,因为目标队列宕机且源队列中没有任何消息,我们知道第二条消息在死信化时丢失了!

由于我们在上面声明源队列时没有定义 dead-letter-strategyat-least-once,源队列使用了默认的 at-most-once 策略。我们可以改进这一点。在本示例中,我们通过应用策略将死信策略动态切换为 at-least-once

> kubectl exec my-rabbit-server-1 -c rabbitmq -- rabbitmqctl set_policy --apply-to queues \
my-policy my-source-queue '{"dead-letter-strategy" : "at-least-once"}'

Setting policy "my-policy" for pattern "my-source-queue" to "{"dead-letter-strategy" : "at-least-once"}"
with priority "0" for vhost "/" ...

让我们发送第三条消息。

> kubectl exec my-rabbit-server-1 -c rabbitmq -- rabbitmqadmin publish exchange=amq.default routing_key=my-source-queue \
payload=msg3 properties='{"expiration" : "1000", "delivery_mode" : 2}'

使用新的 at-least-once 策略,当第 3 条消息过期并被死信化时,由于目标队列不可用,它将被源队列存储。

> kubectl exec my-rabbit-server-1 -c rabbitmq -- rabbitmqctl list_queues --formatter=pretty_table --quiet \
name type messages messages_ready messages_unacknowledged state

┌─────────────────┬──────────────────────┬──────────┬────────────────┬─────────────────────────┬─────────┐
│ name │ type │ messages │ messages_ready │ messages_unacknowledged │ state │
├─────────────────┼──────────────────────┼──────────┼────────────────┼─────────────────────────┼─────────┤
│ my-target-queue │ rabbit_classic_queue │ │ │ │ down │
├─────────────────┼──────────────────────┼──────────┼────────────────┼─────────────────────────┼─────────┤
│ my-source-queue │ quorum │ 1 │ 0 │ 0 │ running │
└─────────────────┴──────────────────────┴──────────┴────────────────┴─────────────────────────┴─────────┘

该消息既不是“就绪”(Ready,即对普通消费者可用),也不是“未确认”(Unacknowledged,即被普通消费者消费但尚未确认)。相反,该消息被安全地保存在源仲裁队列的一个独立数据结构中,该结构仅供特殊的 RabbitMQ 内部死信消费者进程消费。

让我们输出该死信消费者进程的日志。死信消费者进程与仲裁队列领导者节点托管在一起。我们首先需要找出哪个节点成为了新的领导者。

> kubectl exec my-rabbit-server-1 -c rabbitmq -- rabbitmqctl list_queues --formatter=pretty_table --quiet name leader
┌─────────────────┬───────────────────────────────────────────────────┐
│ name │ leader │
├─────────────────┼───────────────────────────────────────────────────┤
│ my-target-queue │ │
├─────────────────┼───────────────────────────────────────────────────┤
│ my-source-queue │ rabbit@my-rabbit-server-1.my-rabbit-nodes.default │
└─────────────────┴───────────────────────────────────────────────────┘

在我们的示例中,新的领导者恰好在 pod my-rabbit-server-1 上。当您运行此示例时,它也可能是 my-rabbit-server-2,在这种情况下,您需要将下面命令中的 1 替换为 2

日志显示了一条描述性的警告信息。

> kubectl logs my-rabbit-server-1 -c rabbitmq | grep dead-letter

[warn] <0.4156.0> Cannot forward any dead-letter messages from source quorum queue 'my-source-queue'
in vhost '/' with configured dead-letter-exchange exchange '' in vhost '/' and configured
dead-letter-routing-key 'my-target-queue'. This can happen either if the dead-letter routing topology is misconfigured
(for example no queue bound to dead-letter-exchange or wrong dead-letter-routing-key configured)
or if non-mirrored classic queues are bound whose host node is down.
Fix this issue to prevent dead-lettered messages from piling up in the source quorum queue.
This message will not be logged again.

我们通过重启目标经典队列的主机节点来解决此问题。

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqctl start_app

Starting node rabbit@my-rabbit-server-0.my-rabbit-nodes.default ...

内部死信消费者进程会定期重试发送消息。当前的默认重试间隔为 3 分钟。最晚在 3 分钟后,第 3 条消息应该已经到达了目标队列。

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqctl list_queues --formatter=pretty_table --quiet \
name type messages messages_ready messages_unacknowledged state

┌─────────────────┬─────────┬──────────┬────────────────┬─────────────────────────┬─────────┐
│ name │ type │ messages │ messages_ready │ messages_unacknowledged │ state │
├─────────────────┼─────────┼──────────┼────────────────┼─────────────────────────┼─────────┤
│ my-source-queue │ quorum │ 0 │ 0 │ 0 │ running │
├─────────────────┼─────────┼──────────┼────────────────┼─────────────────────────┼─────────┤
│ my-target-queue │ classic │ 2 │ 2 │ 0 │ running │
└─────────────────┴─────────┴──────────┴────────────────┴─────────────────────────┴─────────┘

我们的理解是,第 1 条和第 3 条消息在目标队列中,但第 2 条消息丢失了,因为它在目标队列宕机时使用了 at-most-once 死信机制。

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqadmin get queue=my-target-queue count=2
+-----------------+----------+---------------+---------+---------------+------------------+-------------+
| routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | redelivered |
+-----------------+----------+---------------+---------+---------------+------------------+-------------+
| my-target-queue | | 1 | msg1 | 4 | string | False |
| my-target-queue | | 0 | msg3 | 4 | string | False |
+-----------------+----------+---------------+---------+---------------+------------------+-------------+

payload 列验证了我们的理解是正确的,并且 at-least-once 死信机制如预期般工作。即使目标队列不可用,死信消息在目标队列恢复可用后也成功到达了那里。第 1 条消息仍然存储在目标队列中,因为我们在发布到源仲裁队列时设置了持久化标志。如果我们没有设置持久化标志,第 1 条消息也会丢失。

下图总结了第 3 条消息的流向。

Figure 2: Dead letter routing topology (at-least-once)
图 2:死信路由拓扑(至少一次)

  1. 消息被发布到默认交换机。
  2. 消息被路由到源仲裁队列。仲裁队列是 Raft 共识算法中的复制状态机。仲裁队列的状态不仅包含发布者排队消息的队列数据结构,还包括发布者、消费者、发送给消费者但尚未确认的消息以及其他统计数据。“至少一次”死信机制向仲裁队列的状态中添加了另一个队列数据结构:一个仅包含死信消息的队列。因此,当消息在 1 秒后过期时,它会从“普通”消息队列移动到死信消息队列。消息将安全地存储在那里,直到步骤 7 完成确认。
  3. 有一个(RabbitMQ 内部的)死信消费者进程与仲裁队列领导者节点托管在一起。它的工作是从单个源仲裁队列的死信消息队列中消费消息,将它们转发到所有目标队列,等待直到收到所有发布者确认(步骤 6),最后将死信确认回传给源仲裁队列(步骤 7)。
  4. 死信消费者通过配置的 dead-letter-exchange 路由死信消息。在我们的示例中,我们将默认交换机配置为死信交换机。如果路由不存在,死信消费者将在一段时间后尝试重新路由。
  5. 如果路由存在,消息将被发送到目标队列。
  6. 目标队列将发布者确认回传给死信消费者。
  7. 死信消费者向源仲裁队列发送消费者确认,源仲裁队列中对应的死信消息将被删除。

Prometheus 指标

RabbitMQ 3.10 带来了另一个新功能:用于死信消息的 Prometheus 指标。节点全局计数器将返回死信消息的数量,并按以下维度细分:

  1. 死信原因

    • expired:消息 TTL 过期(如我们的示例)。
    • rejected:消费者发送了 basic.rejectbasic.nack 且未设置 requeue 选项。
    • maxlen:队列长度超过限制,且 overflow 设置为 drop-headreject-publish-dlx。(后者仅适用于经典队列。)
    • delivery_limit:投递限制超过(仅适用于仲裁队列)。消息被重新入队次数过多,例如因为消费者发送了 basic.rejectbasic.nack 且设置了 requeue 选项,或者消费者与仲裁队列领导者断开连接。
  2. 源队列类型:即消息被中死信化的队列类型。

    • rabbit_classic_queue
    • rabbit_quorum_queue
    • (流队列不会死信消息,因为它们是仅追加的日志,消息根据保留策略进行截断。)
  3. 死信策略

    • disabled:队列未配置 dead-letter-exchange 或配置的 dead-letter-exchange 不存在,意味着消息被丢弃。
    • at_most_once:队列配置的 dead-letter-exchange 存在。
    • at_least_once:队列类型为 rabbit_quorum_queue,配置了 dead-letter-exchangedead-letter-strategy 设置为 at-least-once,且 overflow 设置为 reject-publish

遵循我们的示例,让我们输出这些指标。在一个 shell 窗口中,将 RabbitMQ 的 Prometheus 端口 15692 转发到本地。

> kubectl port-forward pod/my-rabbit-server-1 15692

在另一个 shell 窗口中,抓取 Prometheus 端点:

> curl --silent localhost:15692/metrics/ | grep rabbitmq_global_messages_dead_lettered

# TYPE rabbitmq_global_messages_dead_lettered_confirmed_total counter
# HELP rabbitmq_global_messages_dead_lettered_confirmed_total Total number of messages dead-lettered and confirmed by target queues
rabbitmq_global_messages_dead_lettered_confirmed_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 1

# TYPE rabbitmq_global_messages_dead_lettered_delivery_limit_total counter
# HELP rabbitmq_global_messages_dead_lettered_delivery_limit_total Total number of messages dead-lettered due to
# delivery-limit exceeded
rabbitmq_global_messages_dead_lettered_delivery_limit_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_delivery_limit_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_delivery_limit_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

# TYPE rabbitmq_global_messages_dead_lettered_expired_total counter
# HELP rabbitmq_global_messages_dead_lettered_expired_total Total number of messages dead-lettered due to message TTL exceeded
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 1
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 1
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

# TYPE rabbitmq_global_messages_dead_lettered_maxlen_total counter
# HELP rabbitmq_global_messages_dead_lettered_maxlen_total Total number of messages dead-lettered due to overflow drop-head
# or reject-publish-dlx
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

# TYPE rabbitmq_global_messages_dead_lettered_rejected_total counter
# HELP rabbitmq_global_messages_dead_lettered_rejected_total Total number of messages dead-lettered due to basic.reject or basic.nack
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

我们仅抓取了 pod my-rabbit-server-1 的 Prometheus 指标。由于这些计数器是“节点全局”的,这意味着上述列表仅显示节点 my-rabbit-server-1 所观察到的指标(但在该节点的所有队列中是全局的)。

我们在停止节点 my-rabbit-server-0 之前,发送的第一条消息去了该节点。此后,我们的仲裁队列领导者从 my-rabbit-server-0 变为了 my-rabbit-server-1。我们随后使用 at-most-once 死信策略发送了第 2 条消息,并使用 at-least-once 死信策略发送了第 3 条消息。第 3 条被死信化的消息最终被死信消费者确认(换句话说,被目标队列确认)。这就是为什么以下计数器的值为 1

rabbitmq_global_messages_dead_lettered_confirmed_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 1
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 1
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 1

如果您有兴趣,可以抓取 pod my-rabbit-server-0 的 Prometheus 指标。您预期会看到什么?输出是否符合您的预期?提示:“Prometheus 计数器是一种累积指标,代表单个单调递增的计数器,其值只能增加或在重启时重置为零。”

注意事项

我们了解了死信消息即使在目标队列临时不可用的情况下,最终也能从源队列到达目标队列。那么,为什么“至少一次”死信机制不是默认的死信策略呢?

启用“至少一次”死信机制时,需要注意一些注意事项。

注意事项 1 - 源仲裁队列中的消息堆积

“至少一次”死信机制在确保消息在目标队列不可用或路由拓扑配置错误时不丢失方面做得非常好。然而,如果死信消费者进程长时间无法从所有目标队列获得发布者确认,而源队列中不断有更多消息被死信化,则可能导致源队列中出现过多的消息堆积。在最坏的情况下,源仲裁队列将仅包含死信消息。为防止过多的消息堆积,请为源队列设置队列长度限制max-lengthmax-length-bytes)。

注意事项 2 - 死信吞吐量

死信消费者有一个名为 dead_letter_worker_consumer_prefetch 的可配置设置,其当前默认值为 32。这意味着死信消费者进程在等待目标队列的发布者确认时,最多会预取并缓冲 32 条消息。

自 RabbitMQ 3.10 起,仲裁队列总是将所有消息体/载荷存储在磁盘上。仲裁队列中的每条消息仍然有极小的内存开销,因为它在内存中保存了每条消息的一些元数据(例如 Raft 索引和消息载荷大小)。

另一方面,死信消费者进程将消息体保存在内存中。为了防止最坏的情况(例如数百个仲裁队列启用了“至少一次”死信机制且未收到发布者确认),此预取值被设置为 32 这一中等默认值,以避免死信消费者导致高内存占用。

然而,较低的预取值会导致较低的吞吐量。如果您所处的场景需要持续每秒数千条消息的死信吞吐量(例如每秒有数千条消息过期或被拒绝),您可以在高级配置文件中增加预取设置。

以下是如何在 Kubernetes 中增加预取的示例:

---
apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
name: my-rabbit
spec:
replicas: 3
rabbitmq:
advancedConfig: |
[
{rabbit, [
{dead_letter_worker_consumer_prefetch, 512}
]}
].

注意事项 3 - 增加的资源使用

为每个仲裁队列启用“至少一次”死信机制会增加资源使用量,消耗更多的内存和 CPU。比较图 1(至多一次死信机制)和图 2(至少一次死信机制),我们观察到“至少一次”死信机制需要发送更多的消息(包括确认信息)。

注意事项 4 - 溢出策略 drop-head

如“用法”部分所述,启用 at-least-once 死信机制需要将 overflow 设置为 reject-publish。将 overflow 设置为 drop-head 会使死信策略回退到 at-most-once。不支持 drop-head,因为从源仲裁队列中丢弃死信消息将违反 at-least-once 语义。

注意事项 5 - 切换死信策略

对于仲裁队列,可以通过策略将死信策略从 at-most-once 切换到 at-least-once,反之亦然。如果更改了死信策略(无论是直接从 at-least-once 变为 at-most-once,还是间接更改,例如通过将溢出策略从 reject-publish 改为 drop-head,或取消设置 dead-letter-exchange),任何尚未被所有目标队列确认的死信消息都将在源仲裁队列中被永久删除。

最佳实践

基于以上了解,“至少一次”死信机制的最佳实践包括:

最佳实践 1

在源仲裁队列中设置 max-lengthmax-length-bytes,以防止过多的消息堆积。

最佳实践 2

将目标仲裁队列或流队列绑定到死信交换机。这比目标经典队列提供更高的可用性。对于目标仲裁队列或目标流,死信消息的重投递速度也比目标经典队列快。这是因为仲裁队列和流有它们自己的客户端、投递协议和重试机制。请记住,经典的镜像队列已被弃用。

最佳实践 3

对所有发布到源仲裁队列的消息设置持久化标志。如果未设置持久化标志,死信消息也不会设置该标志。这在死信消息被路由到目标经典队列时非常重要。

总结

在 RabbitMQ 3.10 之前,RabbitMQ 中的死信机制并不安全。被死信化的消息可能会因为各种原因而丢失——特别是在多节点 RabbitMQ 集群中。

“至少一次”死信机制确保了死信消息最终能够到达目标队列,即使在滚动升级以及网络分区或路由拓扑配置错误等临时故障的情况下。

除了仲裁队列的“至少一次”死信机制外,我们还了解了 RabbitMQ 3.10 中的另外两个新功能:仲裁队列中的消息 TTL 和死信消息的 Prometheus 指标。

由于“至少一次”死信机制会增加资源使用,因此只有当死信消息并非“已死”(原义),而是“活着”且对您的业务逻辑至关重要时,才应该启用它。如果您的用例中死信消息仅具有参考意义,则应使用“至多一次”死信机制。

仲裁队列的“至少一次”死信功能为新的用例铺平了道路,在这些用例中,您知道可能存在被拒绝确认但仍需处理的消息,或者您不能容忍带有过期 TTL 的消息丢失。这些场景在以前是不安全或难以通过 RabbitMQ 实现的。

© . This site is unofficial and not affiliated with VMware.