至少一次死信
RabbitMQ 3.10 中的仲裁队列 (Quorum queues) 提供了更安全的死信机制,该机制使用至少一次的保证来传输队列之间的消息。这篇博文将解释您开始使用至少一次死信所需了解的一切。
这篇帖子还介绍了 RabbitMQ 3.10 的另外两项功能:仲裁队列的消息生存时间 (TTL) 以及死信消息的 Prometheus 指标。
概述
存储在 RabbitMQ 队列中的一些消息会过期或被消费者否定确认。RabbitMQ 可以配置为将它们“死信”,而不是悄悄地将它们丢弃,即将这些消息重新发布到一个特殊用途的交换器。
在 RabbitMQ 3.10 之前,死信不安全。从队列(“源队列”)死信的消息不能保证会被传递到配置在dead-letter-exchange策略中的交换器所路由到的队列(以下称为“目标队列”)。
这是因为消息在死信时内部没有启用发布者确认。我们称之为“最多一次”死信策略。死信的消息可能会到达目标队列。它们也可能因为各种原因丢失。
- 目标队列不可用。例如,经典队列的主机节点已关闭或正在升级,或者仲裁队列暂时失去了大多数节点。
- 目标队列的长度限制已达到,而其溢出行为设置为
reject-publish,拒绝任何入站消息。 - 网络分区阻止了源队列和目标队列之间的通信。
- 死信路由拓扑配置不正确。例如,配置的
dead-letter-exchange不存在,或者没有目标队列绑定到dead-letter-exchange。
RabbitMQ 3.10 引入了一个名为“至少一次”死信的新功能。这是一个选择加入的功能,适用于作为仲裁队列的源队列。这项新功能确保了在源仲裁队列中死信的所有消息最终都会到达目标队列(经典队列、仲裁队列或流),即使在上述消息会因“最多一次”策略而丢失的情况下也是如此。
这篇博文介绍了如何启用至少一次死信的说明,提供了一个详细的示例,并描述了此新功能的注意事项和最佳实践。
用法
要为源仲裁队列启用at-least-once死信,我们需要应用以下策略(或其等效的以x-开头的队列参数):
dead-letter-strategy设置为at-least-once。默认值为at-most-once。overflow设置为reject-publish。默认值为drop-head。- 已配置
dead-letter-exchange。
此外,还必须启用功能标志stream_queue。默认情况下,此功能标志对 3.9 版本后创建的 RabbitMQ 集群是启用的。尽管流在至少一次死信中(除非目标队列恰好是一个流)不被使用,但stream_queue功能标志是必需的,因为至少一次死信依赖于该功能标志附带的一些实现细节。
示例
遵循此示例需要安装kubectl客户端,并将其指向任何正在运行的 Kubernetes 集群 v1.19 或更高版本。
如果您没有可用的 Kubernetes 集群,最快的方法是安装kind以在 Docker 中启动本地 Kubernetes 集群。
> kind create cluster
> kubectl apply -f https://github.com/rabbitmq/cluster-operator/releases/latest/download/cluster-operator.yml
部署一个 3 节点 RabbitMQ 集群。
> cat <<EOF | kubectl apply -f -
---
apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
name: my-rabbit
spec:
replicas: 3
image: rabbitmq:3.10.0-management
EOF
一旦所有 3 个 pod 都就绪(这需要不到 1 分钟),我们就创建一个源队列和一个目标队列。
> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqadmin declare queue name=my-source-queue \
durable=true queue_type=quorum arguments='{"x-dead-letter-exchange" : "",
"x-dead-letter-routing-key" : "my-target-queue" , "x-overflow" : "reject-publish"}'
> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqadmin declare queue name=my-target-queue \
durable=true queue_type=classic
最后两个命令通过在 pod my-rabbit-server-0的 RabbitMQ 容器中执行rabbitmqadmin命令来声明一个队列。
rabbitmqadmin命令是一个与 RabbitMQ Management API 对话的 Python 脚本。rabbitmqadmin命令不是声明队列和发送消息的推荐方式。我们在此博文中将其用于此目的,因为它是您遵循示例的最简单方法。
rabbitmq/cluster-operator 以<rabbitmq-cluster-name>-server-<index>的格式创建 pod 名称。在上面的 YAML 中,我们将<rabbitmq-cluster-name>定义为my-rabbit。
第一个命令创建源队列。为了使至少一次死信生效,它必须是queue_type=quorum。对于源队列,我们进一步定义了以 JSON 格式编码的队列参数(以x-开头)。
x-dead-letter-exchange设置为字符串("")表示源队列死信的消息将被发布到默认交换器。(虽然我们可以创建一个新的死信交换器,但死信到默认交换器可以使此示例更简单。)x-dead-letter-routing-key设置为my-target-queue表示死信消息将以路由键my-target-queue发布。由于此路由键与目标队列的队列名称(由第二个命令创建)匹配,因此死信消息将通过默认交换器路由到目标队列,而无需创建任何进一步的绑定。- 如上所述,
x-overflow必须设置为reject-publish,这是至少一次死信的先决条件。
第二个命令创建目标队列。它可以是任何队列类型。在本例中,我们选择了一个经典队列。请注意,与在所有 3 个节点上具有 3 个副本的源仲裁队列相比,目标经典队列不是高可用的,并且驻留在单个节点上。
让我们发布我们的第一条消息msg1。
> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqadmin publish exchange=amq.default routing_key=my-source-queue \
payload=msg1 properties='{"expiration" : "1000", "delivery_mode" : 2}'
此命令演示了 RabbitMQ 3.10 的另一项新功能:仲裁队列支持消息 TTL。下图说明了消息流。
- 我们将一条消息发布到默认交换器。
- 它被路由到源仲裁队列,并在 1 秒(1000 毫秒)后过期。
- 过期导致消息被死信到默认交换器。
- 它被路由到目标经典队列。
请注意,我们将delivery_mode设置为整数2,表示消息已持久化。最初发布消息到源仲裁队列时,该标志无关紧要,因为仲裁队列中的所有消息都会写入磁盘。但是,当消息被死信到目标队列(可能不是仲裁队列)时,此持久化标志变得很重要。
我们可以验证消息是否已到达目标队列。
> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqctl list_queues --formatter=pretty_table --quiet \
name type messages messages_ready messages_unacknowledged
┌─────────────────┬─────────┬──────────┬────────────────┬─────────────────────────┐
│ name │ type │ messages │ messages_ready │ messages_unacknowledged │
├─────────────────┼─────────┼──────────┼────────────────┼─────────────────────────┤
│ my-target-queue │ classic │ 1 │ 1 │ 0 │
├─────────────────┼─────────┼──────────┼────────────────┼─────────────────────────┤
│ my-source-queue │ quorum │ 0 │ 0 │ 0 │
└─────────────────┴─────────┴──────────┴────────────────┴─────────────────────────┘
接下来,让我们尝试一下当目标队列变得不可用时会发生什么。确定目标经典队列主机节点的一种方法是列出队列的进程标识符(PID)。
> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqctl list_queues --quiet name pid
name pid
my-target-queue <rabbit@my-rabbit-server-0.my-rabbit-nodes.default.1646297039.856.0>
my-source-queue <rabbit@my-rabbit-server-0.my-rabbit-nodes.default.1646297039.821.0>
PID 显示目标经典队列进程和源仲裁队列领导者进程都位于 pod my-rabbit-server-0。让我们停止那个 RabbitMQ 服务器。
> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqctl stop_app
Stopping rabbit application on node rabbit@my-rabbit-server-0.my-rabbit-nodes.default ...
源仲裁队列仍然可用,因为大多数节点(3 个中的 2 个)可用,并且另一个节点成为新的领导者。
与之前一样,我们再次将一条消息发送到源队列,并让它在一秒后过期。由于 pod my-rabbit-server-0中的 RabbitMQ 节点已关闭,我们在my-rabbit-server-1中执行以下命令。
> kubectl exec my-rabbit-server-1 -c rabbitmq -- rabbitmqadmin publish exchange=amq.default routing_key=my-source-queue \
payload=msg2 properties='{"expiration" : "1000", "delivery_mode" : 2}'
由于目标队列已关闭,它不会报告其统计信息。
> kubectl exec my-rabbit-server-1 -c rabbitmq -- rabbitmqctl list_queues --formatter=pretty_table --quiet \
name type messages messages_ready messages_unacknowledged state
┌─────────────────┬──────────────────────┬──────────┬────────────────┬─────────────────────────┬─────────┐
│ name │ type │ messages │ messages_ready │ messages_unacknowledged │ state │
├─────────────────┼──────────────────────┼──────────┼────────────────┼─────────────────────────┼─────────┤
│ my-target-queue │ rabbit_classic_queue │ │ │ │ down │
├─────────────────┼──────────────────────┼──────────┼────────────────┼─────────────────────────┼─────────┤
│ my-source-queue │ quorum │ 0 │ 0 │ 0 │ running │
└─────────────────┴──────────────────────┴──────────┴────────────────┴─────────────────────────┴─────────┘
但是,因为目标队列已关闭并且源队列不包含任何消息,所以我们知道第二条消息在死信时丢失了!
由于我们在上面声明源队列时尚未将dead-letter-strategy定义为at-least-once,因此源队列使用默认策略at-most-once。我们可以做得更好。在此示例中,我们通过应用策略将死信策略动态切换到at-least-once。
> kubectl exec my-rabbit-server-1 -c rabbitmq -- rabbitmqctl set_policy --apply-to queues \
my-policy my-source-queue '{"dead-letter-strategy" : "at-least-once"}'
Setting policy "my-policy" for pattern "my-source-queue" to "{"dead-letter-strategy" : "at-least-once"}"
with priority "0" for vhost "/" ...
让我们发送第三条消息。
> kubectl exec my-rabbit-server-1 -c rabbitmq -- rabbitmqadmin publish exchange=amq.default routing_key=my-source-queue \
payload=msg3 properties='{"expiration" : "1000", "delivery_mode" : 2}'
使用新的at-least-once策略,当第三条消息过期并被死信时,由于目标队列不可用,它将被源队列存储。
> kubectl exec my-rabbit-server-1 -c rabbitmq -- rabbitmqctl list_queues --formatter=pretty_table --quiet \
name type messages messages_ready messages_unacknowledged state
┌─────────────────┬──────────────────────┬──────────┬────────────────┬─────────────────────────┬─────────┐
│ name │ type │ messages │ messages_ready │ messages_unacknowledged │ state │
├─────────────────┼──────────────────────┼──────────┼────────────────┼─────────────────────────┼─────────┤
│ my-target-queue │ rabbit_classic_queue │ │ │ │ down │
├─────────────────┼──────────────────────┼──────────┼────────────────┼─────────────────────────┼─────────┤
│ my-source-queue │ quorum │ 1 │ 0 │ 0 │ running │
└─────────────────┴──────────────────────┴──────────┴────────────────┴─────────────────────────┴─────────┘
该消息既不是“就绪”(即可供常规队列使用者使用),也不是“未确认”(即已被常规队列使用者使用但尚未确认)。但是,消息安全地存储在源仲裁队列中,使用一个单独的数据结构,该结构只能由特殊的 RabbitMQ 内部死信消费者进程进行消耗。
让我们输出该死信消费者进程的日志。死信消费者进程与仲裁队列领导者节点位于同一位置。我们首先需要弄清楚哪个节点成为了新的领导者。
> kubectl exec my-rabbit-server-1 -c rabbitmq -- rabbitmqctl list_queues --formatter=pretty_table --quiet name leader
┌─────────────────┬───────────────────────────────────────────────────┐
│ name │ leader │
├─────────────────┼───────────────────────────────────────────────────┤
│ my-target-queue │ │
├─────────────────┼───────────────────────────────────────────────────┤
│ my-source-queue │ rabbit@my-rabbit-server-1.my-rabbit-nodes.default │
└─────────────────┴───────────────────────────────────────────────────┘
在我们的示例中,新领导者恰好在 pod my-rabbit-server-1上。当您运行此示例时,它也可能是my-rabbit-server-2,在这种情况下,您需要在以下命令中将1替换为2。
日志显示了描述性的警告消息。
> kubectl logs my-rabbit-server-1 -c rabbitmq | grep dead-letter
[warn] <0.4156.0> Cannot forward any dead-letter messages from source quorum queue 'my-source-queue'
in vhost '/' with configured dead-letter-exchange exchange '' in vhost '/' and configured
dead-letter-routing-key 'my-target-queue'. This can happen either if the dead-letter routing topology is misconfigured
(for example no queue bound to dead-letter-exchange or wrong dead-letter-routing-key configured)
or if non-mirrored classic queues are bound whose host node is down.
Fix this issue to prevent dead-lettered messages from piling up in the source quorum queue.
This message will not be logged again.
我们通过重新启动目标经典队列的主机节点来修复此问题。
> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqctl start_app
Starting node rabbit@my-rabbit-server-0.my-rabbit-nodes.default ...
内部死信消费者进程会定期重试发送消息。当前的默认重试间隔为 3 分钟。最晚在 3 分钟后,第三条消息应该已经到达目标队列。
> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqctl list_queues --formatter=pretty_table --quiet \
name type messages messages_ready messages_unacknowledged state
┌─────────────────┬─────────┬──────────┬────────────────┬─────────────────────────┬─────────┐
│ name │ type │ messages │ messages_ready │ messages_unacknowledged │ state │
├─────────────────┼─────────┼──────────┼────────────────┼─────────────────────────┼─────────┤
│ my-source-queue │ quorum │ 0 │ 0 │ 0 │ running │
├─────────────────┼─────────┼──────────┼────────────────┼─────────────────────────┼─────────┤
│ my-target-queue │ classic │ 2 │ 2 │ 0 │ running │
└─────────────────┴─────────┴──────────┴────────────────┴─────────────────────────┴─────────┘
我们的理解是,第一条和第三条消息在目标队列中,而第二条消息丢失了,因为它在使用at-most-once死信时目标队列是关闭的。
> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqadmin get queue=my-target-queue count=2
+-----------------+----------+---------------+---------+---------------+------------------+-------------+
| routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | redelivered |
+-----------------+----------+---------------+---------+---------------+------------------+-------------+
| my-target-queue | | 1 | msg1 | 4 | string | False |
| my-target-queue | | 0 | msg3 | 4 | string | False |
+-----------------+----------+---------------+---------+---------------+------------------+-------------+
payload列验证了我们的理解是正确的,并且at-least-once死信按预期工作。尽管目标队列不可用,但当它再次可用时,死信消息还是到达了目标队列。第一条消息仍然存储在目标队列中,因为我们在发布到源仲裁队列时设置了持久化标志。如果我们没有设置持久化标志,第一条消息也会丢失。
下图总结了第三条消息的流。
- 消息被发布到默认交换器。
- 消息被路由到源仲裁队列。仲裁队列是 Raft 共识算法中的复制状态机。仲裁队列的状态不仅仅包含消息从发布者入队的队列数据结构:状态还包括关于发布者、消费者以及发送到(但尚未被消费者确认)的消息以及其他一些统计信息。
At-least-once死信向仲裁队列的状态添加了另一个队列数据结构:一个只包含死信消息的队列。因此,当消息在一秒后过期时,它会从“普通”消息队列移动到死信消息队列。消息安全地存储在那里,直到它被步骤 7 确认。 - 有一个(RabbitMQ 内部)死信消费者进程与仲裁队列领导者节点位于同一位置。它的工作是从单个源仲裁队列的死信消息队列中消费消息,将它们转发到所有目标队列,等待接收到**所有**发布者确认(步骤 6),最后将死信消息确认回源仲裁队列(步骤 7)。
- 死信消费者通过配置的
dead-letter-exchange路由死信消息。在我们的示例中,我们将默认交换器配置为死信交换器。如果路由不存在,死信消费者将在一段时间后尝试再次路由。 - 如果路由存在,消息将被发送到目标队列。
- 目标队列向死信消费者发送发布者确认。
- 死信消费者将消费者确认发送回源仲裁队列,其中的死信消息将被删除。
Prometheus 指标
RabbitMQ 3.10 附带了另一个新功能:用于死信消息的 Prometheus 指标。节点全局计数器将返回死信消息的数量,并按以下维度细分:
-
死信原因
expired:消息 TTL 超时(如我们的示例)。rejected:消费者发送了basic.reject或basic.nack且未设置 requeue 选项。maxlen:队列长度超过限制,且overflow设置为drop-head或reject-publish-dlx。(后者仅适用于经典队列。)delivery_limit:投递次数超过限制。(仅适用于仲裁队列)。消息被重新排队次数过多,例如因为消费者发送了带 requeue 选项的basic.reject或basic.nack,或者消费者与仲裁队列领导者断开连接。
-
源队列类型。即消息被死信的队列类型**来自**。
rabbit_classic_queuerabbit_quorum_queue- (流不执行死信,因为它们是仅附加日志,消息根据保留策略被截断。)
-
死信策略
disabled:队列没有配置dead-letter-exchange,或者配置的dead-letter-exchange不存在,这意味着消息将被丢弃。at_most_once:队列配置的死信交换器存在。at_least_once:队列类型为rabbit_quorum_queue,已配置dead-letter-exchange,dead-letter-strategy设置为at-least-once,overflow设置为reject-publish。
遵循我们的示例,让我们输出这些指标。在一个 shell 窗口中,将 RabbitMQ 的 Prometheus 端口15692进行端口转发。
> kubectl port-forward pod/my-rabbit-server-1 15692
在另一个 shell 窗口中,抓取 Prometheus 端点。
> curl --silent localhost:15692/metrics/ | grep rabbitmq_global_messages_dead_lettered
# TYPE rabbitmq_global_messages_dead_lettered_confirmed_total counter
# HELP rabbitmq_global_messages_dead_lettered_confirmed_total Total number of messages dead-lettered and confirmed by target queues
rabbitmq_global_messages_dead_lettered_confirmed_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 1
# TYPE rabbitmq_global_messages_dead_lettered_delivery_limit_total counter
# HELP rabbitmq_global_messages_dead_lettered_delivery_limit_total Total number of messages dead-lettered due to
# delivery-limit exceeded
rabbitmq_global_messages_dead_lettered_delivery_limit_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_delivery_limit_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_delivery_limit_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0
# TYPE rabbitmq_global_messages_dead_lettered_expired_total counter
# HELP rabbitmq_global_messages_dead_lettered_expired_total Total number of messages dead-lettered due to message TTL exceeded
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 1
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 1
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0
# TYPE rabbitmq_global_messages_dead_lettered_maxlen_total counter
# HELP rabbitmq_global_messages_dead_lettered_maxlen_total Total number of messages dead-lettered due to overflow drop-head
# or reject-publish-dlx
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0
# TYPE rabbitmq_global_messages_dead_lettered_rejected_total counter
# HELP rabbitmq_global_messages_dead_lettered_rejected_total Total number of messages dead-lettered due to basic.reject or basic.nack
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0
我们只抓取了 pod my-rabbit-server-1的 Prometheus 指标。由于这些计数器是“节点全局”的,这意味着上面的列表只显示了节点my-rabbit-server-1观察到的指标(但对于该节点上的所有队列都是全局的)。
我们发送的第一条消息在停止节点之前发送到了 pod my-rabbit-server-0。之后,我们的示例中的仲裁队列领导者从my-rabbit-server-0更改为my-rabbit-server-1。然后我们使用at-most-once死信策略发送了第二条消息,使用at-least-once死信策略发送了第三条消息。死信的第三条消息最终被死信消费者确认(换句话说,被目标队列确认)。这就是为什么以下计数器的值为1。
rabbitmq_global_messages_dead_lettered_confirmed_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 1
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 1
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 1
如果您好奇,可以抓取 pod my-rabbit-server-0的 Prometheus 指标。您会期望看到什么?输出是否符合您的预期?提示:“Prometheus 计数器是一个累积指标,它表示一个单一的单调递增的计数器,其值只能增加或在重启时重置为零。”
注意事项
我们已经看到,即使目标队列暂时不可用,死信的消息最终也能从死信源队列到达死信目标队列。那么,为什么至少一次死信不是新的默认死信策略呢?
启用至少一次死信时,有一些注意事项需要牢记。
注意事项 1 - 源仲裁队列中的消息堆积
至少一次死信在确保消息不会在目标队列暂时不可用或路由拓扑配置不正确时丢失方面做得很好。但是,如果死信消费者进程长时间无法从**所有**目标队列获得发布者确认,并且越来越多的消息在源队列中死信,则可能导致源队列中的消息过度堆积。在最坏的情况下,源仲裁队列将仅包含死信消息。为了防止消息过度堆积,请为源队列设置队列长度限制(max-length或max-length-bytes)。
注意事项 2 - 死信吞吐量
死信消费者有一个可配置的设置,名为dead_letter_worker_consumer_prefetch,其当前默认值为32。这意味着死信消费者进程将在等待来自目标队列的发布者确认时,最多预取和缓冲 32 条消息。
自 RabbitMQ 3.10 起,仲裁队列始终将所有消息体/负载存储在磁盘上。由于仲裁队列在内存中保留每条消息的某些元数据(例如 Raft 索引和消息负载大小),因此仲裁队列中每条消息仍然有非常小的内存开销。
另一方面,死信消费者进程将消息体保存在内存中。为了防止最坏的情况,即数百个仲裁队列启用了至少一次死信并且未收到发布者确认,此预取值设置为适度的默认值32,以避免死信消费者造成高内存使用。
然而,较低的预取值会导致吞吐量降低。如果您有一个需要每秒数千条消息的持续死信吞吐量(例如,每秒数千条消息过期或被拒绝)的场景,您可以在高级配置文件中增加预取设置。
以下是如何在 Kubernetes 中增加预取的示例。
---
apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
name: my-rabbit
spec:
replicas: 3
rabbitmq:
advancedConfig: |
[
{rabbit, [
{dead_letter_worker_consumer_prefetch, 512}
]}
].
注意事项 3 - 资源使用增加
为每个仲裁队列启用至少一次死信将增加资源使用。将消耗更多的内存和更多的 CPU。比较图 1(最多一次死信)和图 2(至少一次死信),我们观察到至少一次死信需要发送更多的消息(包括确认)。
注意事项 4 - 溢出drop-head
如用法部分所述,启用at-least-once死信需要将overflow设置为reject-publish。将overflow设置为drop-head将使死信策略回退到at-most-once。不支持drop-head,因为从源仲裁队列中删除死信消息会违反at-least-once语义。
注意事项 5 - 切换死信策略
对于仲裁队列,可以通过策略将死信策略从at-most-once切换到at-least-once,反之亦然。如果死信策略被更改,无论是直接从at-least-once更改为at-most-once,还是间接更改(例如,通过将overflow从reject-publish更改为drop-head或取消设置dead-letter-exchange),任何尚未被所有目标队列确认的死信消息都将在源仲裁队列中被永久删除。
最佳实践
基于我们上面学到的内容,至少一次死信的最佳实践包括:
最佳实践 1
在源仲裁队列中设置max-length或max-length-bytes以防止消息过度堆积。
最佳实践 2
将目标仲裁队列或流绑定到死信交换器。这比目标经典队列提供了更高的可用性。对于目标仲裁队列或目标流,死信消息的重传也将比目标经典队列更快。这是因为仲裁队列和流有自己的客户端、投递协议和重试机制。请记住,经典镜像队列已被弃用。
最佳实践 3
在发布到源仲裁队列的所有消息上设置持久化标志。如果未设置持久化标志,死信消息也不会设置。当死信消息路由到目标经典队列时,这一点会变得很重要。
总结
在 RabbitMQ 3.10 之前,RabbitMQ 中的死信是不安全的。死信的消息可能因为各种原因丢失——尤其是在多节点 RabbitMQ 集群中。
至少一次死信可确保死信消息最终到达目标队列,即使在滚动升级和临时故障(如网络分区或路由拓扑配置错误)的情况下也是如此。
除了仲裁队列的至少一次死信,我们还学习了 RabbitMQ 3.10 中的另外两个新功能:仲裁队列中的消息 TTL 和死信消息的 Prometheus 指标。
由于至少一次死信会增加资源使用,因此只有当死信消息不是“死”的(在原始意义上),而是对您的业务逻辑至关重要时,才应启用它。如果在您的用例中,死信消息仅具有信息性质,则应使用最多一次死信。
仲裁队列的至少一次死信功能为新的用例铺平了道路,这些用例适用于您知道可能被否定确认但仍需要处理的消息,或者您无法丢失具有过期 TTL 的消息。以前,这些场景在 RabbitMQ 中是不安全的或难以实现的。

