AMQP 1.0 修改后的结果
这篇博客文章探讨了 AMQP 1.0 修改后的结果 的用例。
修改后的结果是 AMQP 1.0 独有的 功能,在 AMQP 0.9.1 中不可用。它在 仲裁队列 中受支持,但在 经典队列 中不受支持。
此功能使消费者能够在重新排队或 死信 消息之前添加或更新 消息注释。
重新排队
在重新排队消息时包含额外的元数据对于提高消息处理期间的可追溯性和调试非常有价值。
例如,使用 RabbitMQ AMQP 1.0 Java 客户端 的应用程序可以在仲裁队列的头部重新排队消息之前设置特定的消息注释,如下所示
Consumer consumer = connection.consumerBuilder()
.queue(ordersQueue)
.messageHandler((context, message) -> {
Map<String, Object> annotations = new HashMap<>();
annotations.put("x-opt-requeue-reason", "external_service_unavailable");
annotations.put("x-opt-requeue-time", System.currentTimeMillis());
annotations.put("x-opt-requeued-by", "consumer_1");
context.requeue(annotations);
}).build();
这些注释可以使用不同的类型,包括 map、list 或 array。这种灵活性不仅允许设置诸如上次重新排队的原因、时间和消费者等详细信息,还允许跟踪重新排队事件的历史记录。维护这样的历史记录可以揭示模式,例如识别更频繁地重新排队消息的消费者或发现系统中常见的重新排队原因。但是,请记住,仲裁队列在内存中保留修改后的消息注释,这会增加每个重新排队消息的内存开销。
AMQP 0.9.1 不支持在队列头部重新排队消息之前设置自定义标头。
无论是通过 AMQP 1.0 还是 AMQP 0.9.1 将消息重新排队到仲裁队列,x-delivery-count 注释将始终递增。
死信
在死信消息时,消费者可以在消息注释中包含死信的自定义原因
Consumer consumer = connection.consumerBuilder()
.queue(ordersQueue)
.messageHandler((context, message) -> {
Map<String, Object> annotations = new HashMap<>();
annotations.put("x-opt-dead-letter-reason", "Incompatible Message Format");
context.discard(annotations);
}).build();
当死信到 标头交换器 时,消费者甚至可以决定消息将被路由到哪个目标队列
在此示例中,两个死信仲裁队列绑定到死信标头交换器
transient-failures-dlq
business-logic-failures-dlq
不同的死信队列可以由不同的应用或团队处理,根据故障的性质采取不同的操作。例如,transient-failures-dlq
中的所有消息都可以重新发布到原始的 orders
队列,而 business-logic-failures-dlq
中的消息可能需要人工干预。
可以添加更多死信队列,例如
data-integrity-dlq
用于具有未知模式的消息resource-limit-dlq
用于超出速率限制的情况critical-errors-dlq
用于需要管理员注意的情况。
至关重要的是,从 orders
队列死信的所有消息都应该是可路由的。上图中的 备用交换器 提供了“否则”路由语义,确保在未设置 x-opt-dead-letter-category
注释的情况下,消息最终会进入 uncategorised-dlq
。例如,如果发布者设置了 ttl
标头,但没有消费者授予 链接信用,则可能发生这种情况,导致消息过期并被死信。
上面描述的场景在 modified-outcome 示例应用程序 中得到演示。
modified-outcome 示例应用程序
示例应用程序使用 RabbitMQ AMQP 1.0 Java 客户端。
您可以按如下方式运行此示例应用程序
- 通过
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management
启动 RabbitMQ 服务器 - 在 示例应用程序 的根目录中,通过
mvn clean compile exec:java
启动客户端。
在将消息发布到 orders
队列后,客户端应用程序会消费该消息,并在控制台上输出以下内容
publisher: received ACCEPTED outcome
consumer: setting annotations {x-opt-dead-letter-reason=Customer Not Eligible for Discount, x-opt-dead-letter-category=business-logic} and dead lettering...
该消息将被死信到 business-logic-failures-dlq
。
为了防止死信期间的消息丢失,示例应用程序使用了 至少一次死信。
死信 vs. 重新发布
AMQP 0.9.1 消费者无法在死信消息之前设置自定义标头。但是,AMQP 0.9.1 客户端可以采用以下方法,而不是使用 basic.nack
或 basic.reject
以及 requeue=false
来死信消息
- 将消息直接重新发布到具有新自定义标头的特定“死信”队列。
- 等待 RabbitMQ 确认重新发布的消息。
- 通过
basic.ack
确认原始消息。
AMQP 1.0 客户端可以选择使用带有自定义消息注释的死信或重新发布消息。这两种方法都有其优点和缺点
标准 | 带有自定义原因的死信 | 带有自定义原因的重新发布 |
---|---|---|
简易性 | 对于消费者来说更简单。 | 更复杂,因为消费者必须处理重新发布过程。 |
开销 | 低开销。 | 客户端的开销更高:消息负载必须从客户端重新发布到 RabbitMQ,由于额外的发布和确认步骤,导致额外的延迟。 |
客户端和 RabbitMQ 之间在解决消费消息之前的网络故障。 | 消息被重新排队。 | 消息可能既被重新发布又被重新排队,导致一个副本最终进入“死信”队列,另一个副本进入原始队列。 |
灵活性 | 只能修改消息注释并根据死信标头交换器进行路由。 | 允许修改消息的任何部分并重新发布到任何交换器。 |
总结
AMQP 1.0 的修改后的结果功能允许消费者在重新排队或死信之前修改消息注释。
消费者可以自定义死信事件跟踪,甚至可以选择将消息发送到哪个死信队列,而不是仅仅依赖 RabbitMQ 通过 x-opt-deaths 进行的内置死信跟踪。