跳至主内容
版本:4.2

直接回复到

概述

Direct Reply-To (直接回复目标) 允许您在不创建专用回复 队列 的情况下实现类似 教程 6 中的 RPC (请求/回复) 模式。

动机

RPC (请求/回复) 是 RabbitMQ 等消息代理中一种常见的模式。 教程 6 展示了多种客户端实现。通常,请求者 (RPC 客户端) 发送被路由到长期存在的、已知的请求队列的消息。响应者 (RPC 服务器) 从该队列消费消息,并使用请求消息 reply-to 属性中提供的队列名称发送回复。

请求者的回复队列从何而来?请求者可以为每个请求-回复对声明一个一次性队列,但这效率低下:即使是非复制队列,与接收回复的成本相比,创建和删除它的成本也相对较高。在集群中,开销更高,因为所有节点都必须就创建、类型、复制参数和其他元数据达成一致。

更好的方法是为每个请求者创建一个回复队列并跨请求重用它。该队列的 属性 取决于用例

  • 独占队列 在单个客户端消费回复且队列在断开连接时被删除时很常见。
  • 非独占、长期存在的队列适用于长时间运行的任务,其中回复应在短暂的客户端断开连接后仍然存在。

Direct Reply-To 是 RabbitMQ 特有的替代方案,它完全消除了回复队列。这意味着

  • 不会向 元数据存储 (Khepri) 写入队列元数据。
  • 没有队列会缓冲或持久化回复消息。
  • 回复队列没有单独的 Erlang 进程。

主要优点

  • 降低 元数据存储 的负载 (无需插入/删除队列元数据)。
  • 降低 管理 HTTP API 的负载:在 管理 UI 中需要列出的队列更少,需要发出的指标也更少。
  • 代理上的 Erlang 进程更少。

使用 Direct Reply-To 时,在代理端,响应者的 AMQP 1.0 会话 或 AMQP 0.9.1 通道 进程直接将回复传递给请求者的会话/通道进程,而无需经过实际队列。

“直接”仍然意味着通过代理;客户端应用程序之间没有点对点网络连接。

何时使用 Direct Reply-To

主要用例是规模化:许多 (数万) 客户端执行请求/回复。

虽然客户端应偏好长期连接,但 Direct Reply-To 也适用于高连接流失,即客户端连接以执行单个 RPC 然后立即断开连接。避免队列创建/删除可以减少开销和延迟。

由于 Direct Reply-To 对回复具有最多一次的传递语义,因此仅在可以容忍丢失回复的情况下使用它。例如,如果在超时内未收到回复,则假定请求者会重新发送请求。

何时避免使用 Direct Reply-To

如果出现以下任何情况,请避免使用 Direct Reply-To

  • 您需要对回复有至少一次的保证 (即,丢失回复是不可接受的)。
  • 回复必须由代理进行持久化缓冲。
  • 您需要对同一请求者的高吞吐量 (例如,每秒数百条消息)。队列用于在消费者跟不上时进行缓冲。

对于具有长期连接和多个 RPC 的工作负载,Direct Reply-To 的优势相对于使用经典队列较小。现代 RabbitMQ 版本对经典队列进行了优化,以实现低延迟和低资源使用,因此在这些场景中它们可以同样高效。使用显式声明的经典队列进行常规的请求-回复同样有效,并且对于长时间运行的任务可能更可取。

代理实现细节

内部,RabbitMQ 使用 rabbit_volatile_queue 队列类型来实现 Direct Reply-To。“Volatile”描述了其语义:非持久化、零缓冲、最多一次、可能丢失,并且不存储在元数据存储中。

您只会看到 rabbit_volatile_queue 在少数几个地方。实例不会出现在管理 UI 或 rabbitmqctl list_queues 中。

其中一个地方是 Prometheus 指标,例如

  • rabbitmq_global_messages_delivered_total{protocol="amqp10",queue_type="rabbit_volatile_queue"} 传递给 AMQP 1.0 请求者的消息 (“直接回复”) 的数量。
  • rabbitmq_global_messages_delivered_total{protocol="amqp091",queue_type="rabbit_volatile_queue"} 传递给 AMQP 0.9.1 请求者的消息 (“直接回复”) 的数量。
  • rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_volatile_queue",dead_letter_strategy="disabled"} 被 RabbitMQ 丢弃的消息 (“直接回复”) 的数量。

用法

Direct Reply-To 支持 AMQP 1.0 和 AMQP 0.9.1。它也适用于跨协议 (例如,AMQP 1.0 请求者与 AMQP 0.9.1 响应者,反之亦然)。

AMQP 1.0 中的用法

请求者首先附加一个链接以接收回复消息。 attach (带有 source) 必须设置特定的字段。如果您的 AMQP 1.0 客户端库支持 Direct Reply-To,它将为您设置这些字段 (请参阅 示例)。否则

  • snd-settle-mode 设置为 settled,因为 RabbitMQ 发送所有已处理的回复。如果请求者断开连接或拒绝回复,则没有队列可以返回回复。
  • address 留空;RabbitMQ 将生成地址。
  • durable 留空或设置为 none;RabbitMQ 元数据存储中不保留任何状态。
  • expiry-policy 设置为 link-detach
  • timeout 留空或设置为 0
  • capabilities 中包含 rabbitmq:volatile-queue

RabbitMQ 在 attach 响应的 address 字段中返回一个代理生成的伪队列地址。它看起来像 /queues/amq.rabbitmq.reply-to.<opaque-suffix>,其中 <opaque-suffix> 对客户端没有意义。

在发送第一个请求之前,请求者必须为此伪队列授予链接信用。

对于每个请求,设置以下消息 属性

  1. message-id:一个全局唯一的值。(响应者会将回复的 correlation-id 设置为此值。)
  2. reply-to:在 attach 响应中收到的地址。

响应者读取请求的 reply-to 并通过以下两种选项之一将回复发送到该地址

  1. 将发送链接附加到匿名终结点 (null target address),如 Target Address v2 中所述,并在消息的 to 属性中设置回复地址。当回复许多不同的请求者时很有用 (没有每个请求者的链接)。
  2. 直接创建一个发送链接到提供的地址。在这种情况下,RabbitMQ 会检查请求者是否仍然连接;如果不是,RabbitMQ 将拒绝该链接。

如果响应者将执行昂贵的操作,它可以主动检查请求者是否仍然存在,通过 AMQP 发出 HTTP GET 请求。200 状态码表示请求者仍然连接 (请参阅 示例)。

AMQP 1.0 的注意事项和限制

  • 请求者必须以已处理 (settled) 的方式接收回复。如果请求者断开连接或拒绝回复,则没有队列可以返回回复。
  • 请求者只能在附加了接收链接的同一连接和会话上接收回复。不支持从另一个会话上的此伪队列进行消费。
  • 响应者应以已处理 (settled) 的方式发送回复。如果它发送未经处理的回复,RabbitMQ 会立即以 accepted 结果来处理,即使回复随后可能被丢弃。
  • 通过 Direct Reply-To 发送的回复不是容错的。由于此伪队列不进行缓冲,因此 RabbitMQ 会在以下情况下丢弃回复
    • AMQP 1.0 请求者的链接信用耗尽。请求者有责任授予足够的链接信用。
    • AMQP 1.0 会话流控制不允许传递。请求者必须保持其 incoming-window 足够大。
    • 代理的 AMQP 1.0 写入器进程无法足够快地将回复发送给请求者。

如果消息丢失是不可接受的,请使用经典队列而不是 Direct Reply-To。

示例:AMQP 1.0

String requestQueue = "request-queue";

// create the responder
Responder responder = connection.responderBuilder()
.requestQueue(requestQueue)
.handler((ctx, req) -> {
// check whether the requester is still connected (optional)
if (ctx.isRequesterAlive(req)) {
String in = new String(req.body(), UTF_8);
String out = "*** " + in + " ***";
return ctx.message(out.getBytes(UTF_8));
} else {
return null;
}
}).build();

// create the requester, it uses direct reply-to by default
Requester requester = connection.requesterBuilder()
.requestAddress().queue(requestQueue)
.requester()
.build();

// create the request message
Message request = requester.message("hello".getBytes(UTF_8));
// send the request
CompletableFuture<Message> responseFuture = requester.publish(request);
// wait for the response
Message response = responseFuture.get(10, TimeUnit.SECONDS);

AMQP 0.9.1 中的用法

要使用 Direct Reply-To,请求者必须

  1. 以无确认 (no-ack) 模式从伪队列 amq.rabbitmq.reply-to 进行消费。无需预先声明此“队列” (尽管客户端可以)。
  2. 将请求消息的 reply-to 设置为 amq.rabbitmq.reply-to

转发请求时,RabbitMQ 会透明地将 reply-to 重写为 amq.rabbitmq.reply-to.<opaque-suffix>,其中 <opaque-suffix> 对客户端没有意义。然后,响应者使用该值作为路由键将回复发布到默认交换机 ("")。

如果响应者将执行昂贵的操作,它可以检查客户端是否已断开连接,方法是在一个一次性通道上被动声明生成的回复队列名称。即使 passive=false,也无法创建它;声明要么成功 (0 条就绪消息,1 个消费者),要么失败。

AMQP 0.9.1 的注意事项和限制

  • 请求者必须在自动确认模式下进行消费。如果请求者断开连接或拒绝回复,则没有队列可以返回回复。
  • 请求者必须使用同一连接和通道来从 amq.rabbitmq.reply-to 进行消费以及发布请求。
  • 通过 Direct Reply-To 发送的回复不是容错的;如果发布请求的客户端断开连接,它们将被丢弃。假定请求者会重新连接并重新提交请求。
  • amq.rabbitmq.reply-to 这个名称在 basic.consumereply-to 属性中被用作队列;然而它不是。它不能被删除,也不会出现在管理插件或 rabbitmqctl list_queues 中。
  • 如果响应者使用 mandatory 标志发布消息,amq.rabbitmq.reply-to.* 将被视为用于路由的队列。在路由时不会检查请求者是否仍然存在。换句话说,仅路由到此名称的消息被认为是“已路由”,RabbitMQ 不会发送 basic.return
  • 请求者每个通道最多可以创建一个 Direct Reply-To 消费者 (basic.consume)。

示例:AMQP 0.9.1

%% 1. Requester consumes from pseudo-queue in no-ack mode.
amqp_channel:subscribe(RequesterChan,
#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
no_ack = true},
self()),
CTagRequester = receive #'basic.consume_ok'{consumer_tag = CTag} -> CTag
end,

%% 2. Requester sends the request.
amqp_channel:cast(
RequesterChan,
#'basic.publish'{routing_key = RequestQueue},
#amqp_msg{props = #'P_basic'{reply_to = <<"amq.rabbitmq.reply-to">>,
message_id = RpcId},
payload = RequestPayload}),

%% 3. Responder receives the request.
{ReplyTo, RpcId} =
receive {#'basic.deliver'{consumer_tag = CTagResponder},
#amqp_msg{payload = RequestPayload,
props = #'P_basic'{reply_to = ReplyTo0,
message_id = RpcId0}}} ->
{ReplyTo0, RpcId0}
end,

%% 4. Responder replies.
amqp_channel:cast(
ResponderChan,
#'basic.publish'{routing_key = ReplyTo},
#amqp_msg{props = #'P_basic'{correlation_id = RpcId},
payload = ReplyPayload}),

%% 5. Requester receives the reply
receive {#'basic.deliver'{consumer_tag = CTagRequester},
#amqp_msg{payload = ReplyPayload,
props = #'P_basic'{correlation_id = RpcId}}} ->
%% process reply here...
ok
end.
© . This site is unofficial and not affiliated with VMware.