跳至主要内容

至少一次死信投递

·阅读时长:20分钟

RabbitMQ 3.10 中,仲裁队列提供了一种更安全的死信投递形式,它使用至少一次的保证来确保消息在队列之间的传输。这篇博文解释了开始使用至少一次死信投递所需了解的一切。

本文还介绍了RabbitMQ 3.10 的另外两个功能:仲裁队列的消息生存时间 (TTL) 和死信消息的 Prometheus 指标。

概述

存储在 RabbitMQ 队列中的一些消息将过期或被消费者否定确认。RabbitMQ 可以配置为将这些消息“死信投递”而不是静默丢弃,也就是说将这些消息重新发布到一个专用交换机。

在 RabbitMQ 3.10 之前,死信投递并不安全。从队列(“源队列”)投递到死信的消息无法保证传递到由 dead-letter-exchange 策略中配置的交换机路由到的队列(以下简称“目标队列”)。

这是因为消息在死信投递时内部没有启用发布确认。我们称之为“最多一次”死信策略。死信投递的消息可能会到达目标队列。它们也可能由于各种原因丢失

  • 目标队列不可用。例如,经典队列的主机节点已关闭或正在升级,或者仲裁队列暂时丢失了大多数节点。
  • 目标队列的长度限制已达到,同时其溢出行为设置为 reject-publish,拒绝所有传入的消息。
  • 网络分区阻止了源队列和目标队列之间的通信。
  • 死信路由拓扑未正确配置。例如,配置的 dead-letter-exchange 不存在,或者没有目标队列绑定到 dead-letter-exchange

RabbitMQ 3.10 引入了一项名为“至少一次”死信投递的新功能。这是一项针对源队列为仲裁队列的可选功能。此新功能可确保所有从源仲裁队列投递到死信的消息最终都会到达目标队列(经典队列、仲裁队列或流),即使在上述“最多一次”策略下消息会丢失的场景中也是如此。

这篇博文介绍了如何启用至少一次死信投递的说明、提供了一个详细的示例,并描述了此新功能的注意事项和最佳实践。

用法

要为源仲裁队列启用 至少一次 死信投递,我们需要应用以下策略(或其等效的以 x- 开头的队列参数)

  • dead-letter-strategy 设置为 at-least-once。默认值为 at-most-once
  • overflow 设置为 reject-publish。默认值为 drop-head
  • dead-letter-exchange 已配置。

此外,必须启用功能标志 stream_queue。默认情况下,该功能标志已为 3.9 之后创建的 RabbitMQ 集群启用。即使在至少一次死信投递中未使用流(除非目标队列碰巧是流),也需要 stream_queue 功能标志,因为至少一次死信投递依赖于该功能标志附带的一些实现细节。

示例

按照此示例需要安装kubectl 客户端并使其指向任何正在运行的 Kubernetes 集群 v1.19 或更高版本。

如果无法使用 Kubernetes 集群,最快的办法是安装kind 以在 Docker 中启动本地 Kubernetes 集群。

> kind create cluster

安装rabbitmq/cluster-operator

> kubectl apply -f https://github.com/rabbitmq/cluster-operator/releases/latest/download/cluster-operator.yml

部署一个 3 节点的 RabbitMQ 集群

> cat <<EOF | kubectl apply -f -
---
apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
name: my-rabbit
spec:
replicas: 3
image: rabbitmq:3.10.0-management
EOF

一旦所有 3 个 Pod 准备就绪(不到 1 分钟),我们就创建一个源队列和一个目标队列

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqadmin declare queue name=my-source-queue \
durable=true queue_type=quorum arguments='{"x-dead-letter-exchange" : "",
"x-dead-letter-routing-key" : "my-target-queue" , "x-overflow" : "reject-publish"}'

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqadmin declare queue name=my-target-queue \
durable=true queue_type=classic

最后两条命令通过在 Pod my-rabbit-server-0 的 RabbitMQ 容器中执行 rabbitmqadmin 命令来声明队列。

rabbitmqadmin 命令是一个与 RabbitMQ 管理 API 通信的 Python 脚本。rabbitmqadmin 命令不是声明队列和发送消息的推荐方法。我们在本博文中使用它,因为它是最简单的方法供您遵循示例。

rabbitmq/cluster-operator 以 <rabbitmq-cluster-name>-server-<index> 的格式创建 Pod 名称。在上面的 YAML 中,我们将 <rabbitmq-cluster-name> 定义为 my-rabbit

第一条命令创建源队列。为了使至少一次死信投递正常工作,它必须是 queue_type=quorum。对于源队列,我们进一步定义了以 JSON 格式编码的队列参数(以 x- 开头)

  • x-dead-letter-exchange 设置为空字符串 ("") 表示源队列投递到死信的消息发布到默认交换机。(虽然我们可以创建一个新的死信交换机,但将死信投递到默认交换机可以使此示例更简单。)
  • x-dead-letter-routing-key 设置为 my-target-queue 表示死信投递的消息将使用路由键 my-target-queue 发布。由于此路由键与目标队列的队列名称(由第 2 条命令创建)匹配,因此死信投递的消息将由默认交换机路由到目标队列,而无需创建任何其他绑定。
  • 如上所述,x-overflow 必须设置为 reject-publish 作为至少一次死信投递的先决条件。

第二条命令创建目标队列。它可以是任何队列类型。在此示例中,我们选择了一个经典队列。请注意,与源仲裁队列在所有 3 个节点上具有 3 个副本相比,目标经典队列不是高可用的,并且驻留在单个节点上。

让我们发布我们的第一条消息 msg1

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqadmin publish exchange=amq.default routing_key=my-source-queue \
payload=msg1 properties='{"expiration" : "1000", "delivery_mode" : 2}'

此命令演示了 RabbitMQ 3.10 的另一个新功能:仲裁队列支持消息 TTL。下图说明了消息的流程

  1. 我们将一条消息发布到默认交换机。
  2. 它被路由到源仲裁队列,在那里它在 1 秒(1000 毫秒)后过期。
  3. 过期导致消息被投递到死信到默认交换机。
  4. 它被路由到目标经典队列。

Figure 1: Dead letter routing topology (at-most-once)
图 1:死信路由拓扑(最多一次)

请注意,我们将 delivery_mode 设置为整数 2,表示消息是持久化的。在最初将消息发布到源仲裁队列时,此标志无关紧要,因为仲裁队列中的所有消息都会写入磁盘。但是,一旦消息被投递到死信到目标队列(可能不是仲裁队列),此持久化标志就变得很重要。

我们可以验证消息是否已到达目标队列

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqctl list_queues --formatter=pretty_table --quiet \
name type messages messages_ready messages_unacknowledged

┌─────────────────┬─────────┬──────────┬────────────────┬─────────────────────────┐
│ name │ type │ messages │ messages_ready │ messages_unacknowledged │
├─────────────────┼─────────┼──────────┼────────────────┼─────────────────────────┤
│ my-target-queue │ classic │ 1 │ 1 │ 0 │
├─────────────────┼─────────┼──────────┼────────────────┼─────────────────────────┤
│ my-source-queue │ quorum │ 0 │ 0 │ 0 │
└─────────────────┴─────────┴──────────┴────────────────┴─────────────────────────┘

接下来,让我们尝试一下目标队列不可用时会发生什么。确定目标经典队列的主机节点的一种方法是列出队列的进程标识符 (PID)

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqctl list_queues --quiet name pid

name pid
my-target-queue <rabbit@my-rabbit-server-0.my-rabbit-nodes.default.1646297039.856.0>
my-source-queue <rabbit@my-rabbit-server-0.my-rabbit-nodes.default.1646297039.821.0>

PID 显示目标经典队列进程和源仲裁队列领导者进程都驻留在 Pod my-rabbit-server-0 中。让我们停止该 RabbitMQ 服务器

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqctl stop_app

Stopping rabbit application on node [email protected] ...

源仲裁队列仍然可用,因为大多数节点(3 个中的 2 个)可用,另一个节点成为新的领导者。

与之前一样,我们再次向源队列发送一条消息,并让它在 1 秒后过期。由于 Pod my-rabbit-server-0 中的 RabbitMQ 节点已关闭,因此我们在 my-rabbit-server-1 中执行以下命令

> kubectl exec my-rabbit-server-1 -c rabbitmq -- rabbitmqadmin publish exchange=amq.default routing_key=my-source-queue \
payload=msg2 properties='{"expiration" : "1000", "delivery_mode" : 2}'

由于目标队列已关闭,它将不会报告其统计信息

>  kubectl exec my-rabbit-server-1 -c rabbitmq -- rabbitmqctl list_queues --formatter=pretty_table --quiet \
name type messages messages_ready messages_unacknowledged state

┌─────────────────┬──────────────────────┬──────────┬────────────────┬─────────────────────────┬─────────┐
│ name │ type │ messages │ messages_ready │ messages_unacknowledged │ state │
├─────────────────┼──────────────────────┼──────────┼────────────────┼─────────────────────────┼─────────┤
│ my-target-queue │ rabbit_classic_queue │ │ │ │ down │
├─────────────────┼──────────────────────┼──────────┼────────────────┼─────────────────────────┼─────────┤
│ my-source-queue │ quorum │ 0 │ 0 │ 0 │ running │
└─────────────────┴──────────────────────┴──────────┴────────────────┴─────────────────────────┴─────────┘

但是,由于目标队列已关闭,并且由于源队列不包含任何消息,因此我们知道第二条消息在死信投递时丢失了!

由于在上面声明源队列时我们尚未将 dead-letter-strategy 定义为 at-least-once,因此源队列使用默认策略 at-most-once。我们可以做得更好。在此示例中,我们通过应用策略动态地将死信策略切换到 at-least-once

> kubectl exec my-rabbit-server-1 -c rabbitmq -- rabbitmqctl set_policy --apply-to queues \
my-policy my-source-queue '{"dead-letter-strategy" : "at-least-once"}'

Setting policy "my-policy" for pattern "my-source-queue" to "{"dead-letter-strategy" : "at-least-once"}"
with priority "0" for vhost "/" ...

让我们发送第三条消息

> kubectl exec my-rabbit-server-1 -c rabbitmq -- rabbitmqadmin publish exchange=amq.default routing_key=my-source-queue \
payload=msg3 properties='{"expiration" : "1000", "delivery_mode" : 2}'

使用新的 at-least-once 策略,当第 3 条消息过期并被投递到死信时,它将被源队列存储,因为目标队列不可用

> kubectl exec my-rabbit-server-1 -c rabbitmq -- rabbitmqctl list_queues --formatter=pretty_table --quiet \
name type messages messages_ready messages_unacknowledged state

┌─────────────────┬──────────────────────┬──────────┬────────────────┬─────────────────────────┬─────────┐
│ name │ type │ messages │ messages_ready │ messages_unacknowledged │ state │
├─────────────────┼──────────────────────┼──────────┼────────────────┼─────────────────────────┼─────────┤
│ my-target-queue │ rabbit_classic_queue │ │ │ │ down │
├─────────────────┼──────────────────────┼──────────┼────────────────┼─────────────────────────┼─────────┤
│ my-source-queue │ quorum │ 1 │ 0 │ 0 │ running │
└─────────────────┴──────────────────────┴──────────┴────────────────┴─────────────────────────┴─────────┘

该消息既不是“就绪”(即可供普通队列使用者使用)也不是“未确认”(即被普通队列使用者使用但尚未确认)。但是,该消息安全地保存在源仲裁队列中一个单独的数据结构中,该数据结构仅可供特殊的 RabbitMQ 内部死信使用者进程使用。

让我们输出该死信使用者进程的日志。死信使用者进程与仲裁队列领导者节点位于同一位置。我们首先需要确定哪个节点成为新的领导者

> kubectl exec my-rabbit-server-1 -c rabbitmq -- rabbitmqctl list_queues --formatter=pretty_table --quiet name leader
┌─────────────────┬───────────────────────────────────────────────────┐
│ name │ leader │
├─────────────────┼───────────────────────────────────────────────────┤
│ my-target-queue │ │
├─────────────────┼───────────────────────────────────────────────────┤
│ my-source-queue │ [email protected]
└─────────────────┴───────────────────────────────────────────────────┘

在我们的示例中,新的领导者碰巧位于 Pod my-rabbit-server-1 上。当您运行此示例时,它也可能是 my-rabbit-server-2,在这种情况下,您需要在下面的命令中将 1 替换为 2

日志显示了一条描述性警告消息

> kubectl logs my-rabbit-server-1 -c rabbitmq | grep dead-letter

[warn] <0.4156.0> Cannot forward any dead-letter messages from source quorum queue 'my-source-queue'
in vhost '/' with configured dead-letter-exchange exchange '' in vhost '/' and configured
dead-letter-routing-key 'my-target-queue'. This can happen either if the dead-letter routing topology is misconfigured
(for example no queue bound to dead-letter-exchange or wrong dead-letter-routing-key configured)
or if non-mirrored classic queues are bound whose host node is down.
Fix this issue to prevent dead-lettered messages from piling up in the source quorum queue.
This message will not be logged again.

我们通过重新启动目标经典队列的主机节点来解决此问题

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqctl start_app

Starting node [email protected] ...

内部死信使用者进程会定期重试发送消息。当前的默认重试间隔为 3 分钟。最晚在 3 分钟后,第 3 条消息应该已到达目标队列

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqctl list_queues --formatter=pretty_table --quiet \
name type messages messages_ready messages_unacknowledged state

┌─────────────────┬─────────┬──────────┬────────────────┬─────────────────────────┬─────────┐
│ name │ type │ messages │ messages_ready │ messages_unacknowledged │ state │
├─────────────────┼─────────┼──────────┼────────────────┼─────────────────────────┼─────────┤
│ my-source-queue │ quorum │ 0 │ 0 │ 0 │ running │
├─────────────────┼─────────┼──────────┼────────────────┼─────────────────────────┼─────────┤
│ my-target-queue │ classic │ 2 │ 2 │ 0 │ running │
└─────────────────┴─────────┴──────────┴────────────────┴─────────────────────────┴─────────┘

我们理解第 1 条和第 3 条消息位于目标队列中,但第 2 条消息丢失了,因为在目标队列关闭时它使用了 最多一次 死信投递

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqadmin get queue=my-target-queue count=2
+-----------------+----------+---------------+---------+---------------+------------------+-------------+
| routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | redelivered |
+-----------------+----------+---------------+---------+---------------+------------------+-------------+
| my-target-queue | | 1 | msg1 | 4 | string | False |
| my-target-queue | | 0 | msg3 | 4 | string | False |
+-----------------+----------+---------------+---------+---------------+------------------+-------------+

payload 列验证了我们的理解是否正确,以及 at-least-once 死信投递是否按预期工作。即使目标队列不可用,死信消息在目标队列再次可用后也成功投递到了目标队列。第一条消息仍然存储在目标队列中,因为我们在发布到源仲裁队列时设置了持久化标志。如果我们没有设置持久化标志,第一条消息也会丢失。

下图总结了第三条消息的流程。

Figure 2: Dead letter routing topology (at-least-once)
图 2:死信路由拓扑(at-least-once)

  1. 消息发布到默认交换机。
  2. 消息被路由到源仲裁队列。仲裁队列是 Raft 共识算法中的一个复制状态机。仲裁队列的状态不仅仅包含一个用于存储发布者消息的队列数据结构:状态还包括有关发布者、消费者以及发送到(但尚未被消费者确认)的消息的信息,以及一些其他统计信息。At-least-once 死信投递在仲裁队列的状态中添加了另一个队列数据结构:一个仅包含死信消息的队列。因此,当消息在 1 秒后过期时,它将从“普通”消息队列移动到死信消息队列。消息安全地存储在那里,直到步骤 7 确认它。
  3. 在仲裁队列领导者节点上,有一个(RabbitMQ 内部)死信消费者进程与其共存。它的工作是从单个源仲裁队列的死信消息队列中消费消息,并将它们转发到所有目标队列,等待所有发布者确认收到(步骤 6),最后向源仲裁队列确认死信消息(步骤 7)。
  4. 死信消费者通过配置的 dead-letter-exchange 路由死信消息。在我们的示例中,我们将默认交换机配置为死信交换机。如果路由不存在,死信消费者将在一段时间后尝试再次路由。
  5. 如果路由存在,则消息将发送到目标队列。
  6. 目标队列向死信消费者发送发布者确认。
  7. 死信消费者向源仲裁队列发送消费者确认,在那里死信消息将被删除。

Prometheus 指标

RabbitMQ 3.10 带来了另一个新功能:死信消息的 Prometheus 指标。节点全局计数器将返回死信消息的数量,并按以下维度细分

  1. 死信原因

    • expired:消息 TTL 超时(如我们的示例)。
    • rejected:消费者发送了 basic.rejectbasic.nack 且未设置重新入队选项。
    • maxlen:队列长度超过限制,并且 overflow 设置为 drop-headreject-publish-dlx。(后者仅适用于经典队列。)
    • delivery_limit:投递限制超过。 (仅适用于仲裁队列)。例如,由于消费者发送了带有重新入队选项的 basic.rejectbasic.nack,或者消费者与仲裁队列领导者断开连接,导致消息被重新入队次数过多。
  2. 源队列类型。即消息来自哪个队列类型

    • rabbit_classic_queue
    • rabbit_quorum_queue
    • (流不会将消息设为死信,因为它们是追加式日志,其中消息根据保留策略被截断。)
  3. 死信策略

    • disabled:队列未配置 dead-letter-exchange 或配置的 dead-letter-exchange 不存在,这意味着消息将被丢弃。
    • at_most_once:队列配置的死信交换机存在。
    • at_least_once:队列类型为 rabbit_quorum_queue,配置了 dead-letter-exchangedead-letter-strategy 设置为 at-least-onceoverflow 设置为 reject-publish

接下来,让我们根据我们的示例输出这些指标。在一个 shell 窗口中,将 RabbitMQ 的 Prometheus 端口 15692 进行端口转发

> kubectl port-forward pod/my-rabbit-server-1 15692

在另一个 shell 窗口中,抓取 Prometheus 端点

> curl --silent localhost:15692/metrics/ | grep rabbitmq_global_messages_dead_lettered

# TYPE rabbitmq_global_messages_dead_lettered_confirmed_total counter
# HELP rabbitmq_global_messages_dead_lettered_confirmed_total Total number of messages dead-lettered and confirmed by target queues
rabbitmq_global_messages_dead_lettered_confirmed_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 1

# TYPE rabbitmq_global_messages_dead_lettered_delivery_limit_total counter
# HELP rabbitmq_global_messages_dead_lettered_delivery_limit_total Total number of messages dead-lettered due to
# delivery-limit exceeded
rabbitmq_global_messages_dead_lettered_delivery_limit_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_delivery_limit_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_delivery_limit_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

# TYPE rabbitmq_global_messages_dead_lettered_expired_total counter
# HELP rabbitmq_global_messages_dead_lettered_expired_total Total number of messages dead-lettered due to message TTL exceeded
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 1
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 1
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

# TYPE rabbitmq_global_messages_dead_lettered_maxlen_total counter
# HELP rabbitmq_global_messages_dead_lettered_maxlen_total Total number of messages dead-lettered due to overflow drop-head
# or reject-publish-dlx
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

# TYPE rabbitmq_global_messages_dead_lettered_rejected_total counter
# HELP rabbitmq_global_messages_dead_lettered_rejected_total Total number of messages dead-lettered due to basic.reject or basic.nack
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

我们仅抓取了 pod my-rabbit-server-1 的 Prometheus 指标。由于这些计数器是“节点全局”的,这意味着上面的列表仅显示了节点 my-rabbit-server-1 观察到的指标(但在该节点上的所有队列中是全局的)。

我们发送的第一条消息在停止该节点之前发送到了 pod my-rabbit-server-0。此后,在我们的示例中,仲裁队列领导者从 my-rabbit-server-0 更改为 my-rabbit-server-1。然后,我们使用 at-most-once 死信策略发送了第二条消息,并使用 at-least-once 死信策略发送了第三条消息。最终,死信消费者确认了被设为死信的第三条消息(或者换句话说,目标队列确认了它)。这就是以下计数器值为 1 的原因

rabbitmq_global_messages_dead_lettered_confirmed_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 1
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 1
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 1

如果您好奇,可以抓取 pod my-rabbit-server-0 的 Prometheus 指标。您期望看到什么?输出是否符合您的预期?提示:“Prometheus 计数器是一个累积指标,表示一个单调递增的计数器,其值只能增加或在重新启动时重置为零。”

注意事项

我们看到了如何将被设为死信的消息最终从死信源队列发送到死信目标队列,即使目标队列暂时不可用也是如此。那么,为什么 at-least-once 死信投递不是新的默认死信策略呢?

启用 at-least-once 死信投递时,需要注意一些事项

注意事项 1 - 源仲裁队列中的消息积压

At-least-once 死信投递在确保目标队列暂时不可用或路由拓扑配置不正确时消息不会丢失方面做得很好。但是,如果死信消费者进程长时间无法从所有目标队列获取发布者确认,而源队列中越来越多的消息不断被设为死信,则会导致源队列中消息过度积压。在最坏的情况下,源仲裁队列将仅包含死信消息。为防止消息过度积压,请为源队列设置队列长度限制max-lengthmax-length-bytes)。

注意事项 2 - 死信吞吐量

死信消费者有一个可配置的设置,称为 dead_letter_worker_consumer_prefetch,其当前默认值为 32。这意味着死信消费者进程将最多预取和缓冲 32 条消息,同时等待来自目标队列的发布者确认。

由于 RabbitMQ 3.10 仲裁队列始终将所有消息正文/有效负载存储在磁盘上。仲裁队列中每条消息仍然存在非常小的每消息内存开销,因为仲裁队列在内存中保存每条消息的一些元数据(例如 Raft 索引和消息有效负载大小)。

另一方面,死信消费者进程将消息正文保存在内存中。为了防止出现数百个仲裁队列启用了 at-least-once 死信投递且未收到发布者确认的最坏情况,将此预取值设置为 32 的适中默认值,以防止死信消费者导致高内存使用率。

但是,较低的预取值会导致较低的吞吐量。如果您遇到需要每秒持续处理数千条死信消息的吞吐量的情况(例如,每秒钟有数千条消息过期或被拒绝),您可以在高级配置文件中增加预取设置。

以下是如何在 Kubernetes 中增加预取的示例

---
apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
name: my-rabbit
spec:
replicas: 3
rabbitmq:
advancedConfig: |
[
{rabbit, [
{dead_letter_worker_consumer_prefetch, 512}
]}
].

注意事项 3 - 资源使用增加

为每个仲裁队列启用 at-least-once 死信投递将增加资源使用量。将消耗更多的内存和 CPU。比较图 1(at-most-once 死信投递)和图 2(at-least-once 死信投递),我们观察到 at-least-once 死信投递将需要发送更多消息(包括确认)。

注意事项 4 - Overflow drop-head

用法部分所述,启用 at-least-once 死信投递需要将 overflow 设置为 reject-publish。将 overflow 设置为 drop-head 将使死信策略回退到 at-most-once。不支持 drop-head,因为从源仲裁队列中删除死信消息将违反 at-least-once 语义。

注意事项 5 - 切换死信策略

对于仲裁队列,可以通过策略在 at-most-onceat-least-once 之间切换死信策略。如果死信策略直接从 at-least-once 更改为 at-most-once 或间接更改,例如通过将 overflow 从 reject-publish 更改为 drop-head 或取消设置 dead-letter-exchange,则源仲裁队列中所有目标队列尚未确认的任何死信消息都将被永久删除。

最佳实践

根据我们上面学到的内容,at-least-once 死信投递的最佳实践包括

最佳实践 1

在源仲裁队列中设置 max-lengthmax-length-bytes 以防止消息过度积压。

最佳实践 2

将目标仲裁队列或流绑定到死信交换机。这比目标经典队列提供了更高的可用性。对于目标仲裁队列或目标流,死信消息的重新投递也将比目标经典队列更快。这是因为仲裁队列和流有自己的客户端、投递协议和重试机制。请记住,经典镜像队列已弃用。

最佳实践 3

在发布到源仲裁队列的所有消息上设置持久化标志。如果未设置持久化标志,则死信消息也不会设置它。当死信消息被路由到目标经典队列时,这变得很重要。

总结

在 RabbitMQ 3.10 之前,RabbitMQ 中的死信投递并不安全。被设为死信的消息可能由于各种原因而丢失——尤其是在多节点 RabbitMQ 集群中。

At-least-once 死信投递确保死信消息最终到达目标队列,即使存在滚动升级和临时故障(如网络分区或路由拓扑配置错误)。

除了用于仲裁队列的至少一次死信投递外,我们还在 RabbitMQ 3.10 中了解了另外两个新特性:仲裁队列中的消息 TTL 和死信消息的 Prometheus 指标。

由于至少一次死信投递会增加资源使用量,因此只有在死信消息并非“真正”的死信,而是“存活”且对您的业务逻辑至关重要时才应启用它。如果在您的用例中,死信消息仅具有信息性质,则应使用至多一次死信投递。

仲裁队列的至少一次死信投递功能为新的用例铺平了道路,在这些用例中,您知道可能会对消息进行负确认,但仍然需要处理这些消息,或者您不能丢失具有过期 TTL 的消息。以前,使用 RabbitMQ 执行此类场景是不安全的或难以实现的。

© 2024 RabbitMQ. All rights reserved.