直接回复到
概述
Direct Reply-To (直接回复目标) 允许您在不创建专用回复 队列 的情况下实现类似 教程 6 中的 RPC (请求/回复) 模式。
动机
RPC (请求/回复) 是 RabbitMQ 等消息代理中一种常见的模式。 教程 6 展示了多种客户端实现。通常,请求者 (RPC 客户端) 发送被路由到长期存在的、已知的请求队列的消息。响应者 (RPC 服务器) 从该队列消费消息,并使用请求消息 reply-to 属性中提供的队列名称发送回复。
请求者的回复队列从何而来?请求者可以为每个请求-回复对声明一个一次性队列,但这效率低下:即使是非复制队列,与接收回复的成本相比,创建和删除它的成本也相对较高。在集群中,开销更高,因为所有节点都必须就创建、类型、复制参数和其他元数据达成一致。
更好的方法是为每个请求者创建一个回复队列并跨请求重用它。该队列的 属性 取决于用例
- 独占队列 在单个客户端消费回复且队列在断开连接时被删除时很常见。
- 非独占、长期存在的队列适用于长时间运行的任务,其中回复应在短暂的客户端断开连接后仍然存在。
Direct Reply-To 是 RabbitMQ 特有的替代方案,它完全消除了回复队列。这意味着
- 不会向 元数据存储 (Khepri) 写入队列元数据。
- 没有队列会缓冲或持久化回复消息。
- 回复队列没有单独的 Erlang 进程。
主要优点
- 降低 元数据存储 的负载 (无需插入/删除队列元数据)。
- 降低 管理 HTTP API 的负载:在 管理 UI 中需要列出的队列更少,需要发出的指标也更少。
- 代理上的 Erlang 进程更少。
使用 Direct Reply-To 时,在代理端,响应者的 AMQP 1.0 会话 或 AMQP 0.9.1 通道 进程直接将回复传递给请求者的会话/通道进程,而无需经过实际队列。
“直接”仍然意味着通过代理;客户端应用程序之间没有点对点网络连接。
何时使用 Direct Reply-To
主要用例是规模化:许多 (数万) 客户端执行请求/回复。
虽然客户端应偏好长期连接,但 Direct Reply-To 也适用于高连接流失,即客户端连接以执行单个 RPC 然后立即断开连接。避免队列创建/删除可以减少开销和延迟。
由于 Direct Reply-To 对回复具有最多一次的传递语义,因此仅在可以容忍丢失回复的情况下使用它。例如,如果在超时内未收到回复,则假定请求者会重新发送请求。
何时避免使用 Direct Reply-To
如果出现以下任何情况,请避免使用 Direct Reply-To
- 您需要对回复有至少一次的保证 (即,丢失回复是不可接受的)。
- 回复必须由代理进行持久化缓冲。
- 您需要对同一请求者的高吞吐量 (例如,每秒数百条消息)。队列用于在消费者跟不上时进行缓冲。
对于具有长期连接和多个 RPC 的工作负载,Direct Reply-To 的优势相对于使用经典队列较小。现代 RabbitMQ 版本对经典队列进行了优化,以实现低延迟和低资源使用,因此在这些场景中它们可以同样高效。使用显式声明的经典队列进行常规的请求-回复同样有效,并且对于长时间运行的任务可能更可取。
代理实现细节
内部,RabbitMQ 使用 rabbit_volatile_queue 队列类型来实现 Direct Reply-To。“Volatile”描述了其语义:非持久化、零缓冲、最多一次、可能丢失,并且不存储在元数据存储中。
您只会看到 rabbit_volatile_queue 在少数几个地方。实例不会出现在管理 UI 或 rabbitmqctl list_queues 中。
其中一个地方是 Prometheus 指标,例如
rabbitmq_global_messages_delivered_total{protocol="amqp10",queue_type="rabbit_volatile_queue"}传递给 AMQP 1.0 请求者的消息 (“直接回复”) 的数量。rabbitmq_global_messages_delivered_total{protocol="amqp091",queue_type="rabbit_volatile_queue"}传递给 AMQP 0.9.1 请求者的消息 (“直接回复”) 的数量。rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_volatile_queue",dead_letter_strategy="disabled"}被 RabbitMQ 丢弃的消息 (“直接回复”) 的数量。
用法
Direct Reply-To 支持 AMQP 1.0 和 AMQP 0.9.1。它也适用于跨协议 (例如,AMQP 1.0 请求者与 AMQP 0.9.1 响应者,反之亦然)。
AMQP 1.0 中的用法
请求者首先附加一个链接以接收回复消息。 attach (带有 source) 必须设置特定的字段。如果您的 AMQP 1.0 客户端库支持 Direct Reply-To,它将为您设置这些字段 (请参阅 示例)。否则
- 将
snd-settle-mode设置为settled,因为 RabbitMQ 发送所有已处理的回复。如果请求者断开连接或拒绝回复,则没有队列可以返回回复。 - 将
address留空;RabbitMQ 将生成地址。 - 将
durable留空或设置为none;RabbitMQ 元数据存储中不保留任何状态。 - 将
expiry-policy设置为link-detach。 - 将
timeout留空或设置为0。 - 在
capabilities中包含rabbitmq:volatile-queue。
RabbitMQ 在 attach 响应的 address 字段中返回一个代理生成的伪队列地址。它看起来像 /queues/amq.rabbitmq.reply-to.<opaque-suffix>,其中 <opaque-suffix> 对客户端没有意义。
在发送第一个请求之前,请求者必须为此伪队列授予链接信用。
对于每个请求,设置以下消息 属性
message-id:一个全局唯一的值。(响应者会将回复的correlation-id设置为此值。)reply-to:在attach响应中收到的地址。
响应者读取请求的 reply-to 并通过以下两种选项之一将回复发送到该地址
- 将发送链接附加到匿名终结点 (null target address),如 Target Address v2 中所述,并在消息的
to属性中设置回复地址。当回复许多不同的请求者时很有用 (没有每个请求者的链接)。 - 直接创建一个发送链接到提供的地址。在这种情况下,RabbitMQ 会检查请求者是否仍然连接;如果不是,RabbitMQ 将拒绝该链接。
如果响应者将执行昂贵的操作,它可以主动检查请求者是否仍然存在,通过 AMQP 发出 HTTP GET 请求。200 状态码表示请求者仍然连接 (请参阅 示例)。
AMQP 1.0 的注意事项和限制
- 请求者必须以已处理 (settled) 的方式接收回复。如果请求者断开连接或拒绝回复,则没有队列可以返回回复。
- 请求者只能在附加了接收链接的同一连接和会话上接收回复。不支持从另一个会话上的此伪队列进行消费。
- 响应者应以已处理 (settled) 的方式发送回复。如果它发送未经处理的回复,RabbitMQ 会立即以 accepted 结果来处理,即使回复随后可能被丢弃。
- 通过 Direct Reply-To 发送的回复不是容错的。由于此伪队列不进行缓冲,因此 RabbitMQ 会在以下情况下丢弃回复
- AMQP 1.0 请求者的链接信用耗尽。请求者有责任授予足够的链接信用。
- AMQP 1.0 会话流控制不允许传递。请求者必须保持其
incoming-window足够大。 - 代理的 AMQP 1.0 写入器进程无法足够快地将回复发送给请求者。
如果消息丢失是不可接受的,请使用经典队列而不是 Direct Reply-To。
示例:AMQP 1.0
- Java
- Erlang
- Go
String requestQueue = "request-queue";
// create the responder
Responder responder = connection.responderBuilder()
.requestQueue(requestQueue)
.handler((ctx, req) -> {
// check whether the requester is still connected (optional)
if (ctx.isRequesterAlive(req)) {
String in = new String(req.body(), UTF_8);
String out = "*** " + in + " ***";
return ctx.message(out.getBytes(UTF_8));
} else {
return null;
}
}).build();
// create the requester, it uses direct reply-to by default
Requester requester = connection.requesterBuilder()
.requestAddress().queue(requestQueue)
.requester()
.build();
// create the request message
Message request = requester.message("hello".getBytes(UTF_8));
// send the request
CompletableFuture<Message> responseFuture = requester.publish(request);
// wait for the response
Message response = responseFuture.get(10, TimeUnit.SECONDS);
%% 1. Requester attaches its receiving link.
OpnConfRequester = OpnConfRequester0#{notify_with_performative => true},
{ok, ConnRequester} = amqp10_client:open_connection(OpnConfRequester),
{ok, SessionRequester} = amqp10_client:begin_session_sync(ConnRequester),
Source = #{address => undefined,
durable => none,
expiry_policy => <<"link-detach">>,
dynamic => true,
capabilities => [<<"rabbitmq:volatile-queue">>]},
AttachArgs = #{name => <<"receiver requester">>,
role => {receiver, Source, self()},
snd_settle_mode => settled,
rcv_settle_mode => first},
{ok, ReceiverRequester} = amqp10_client:attach_link(SessionRequester, AttachArgs),
%% Requester learns the broker-generated reply address.
Addr = receive {amqp10_event, {link, ReceiverRequester, {attached, Attach}}} ->
#'v1_0.attach'{
source = #'v1_0.source'{
address = {utf8, Addr0}}} = Attach,
Addr0
end,
%% Requester must grant link credit before sending the first request.
ok = amqp10_client:flow_link_credit(ReceiverRequester, 1000, 500),
%% 2. Requester sends the request.
ok = amqp10_client:send_msg(
SenderRequester,
amqp10_msg:set_properties(
#{message_id => RpcId,
reply_to => Addr},
amqp10_msg:new(DeliveryTag, RequestPayload, true))),
%% 3. Responder receives the request and reads relevant properties.
...
#{message_id := RpcId,
reply_to := ReplyToAddr} = amqp10_msg:properties(RequestMsg),
%% Optionally, the responder checks whether the requester is still connected.
{ok, #{queue := ReplyQueue}} = rabbitmq_amqp_address:to_map(ReplyToAddr),
case rabbitmq_amqp_client:get_queue(LinkPairResponder, ReplyQueue) of
{ok, #{}} ->
%% requester is still there
ok;
_ ->
throw(requester_absent)
end,
%% 4. Responder replies (attached to the anonymous terminus).
ok = amqp10_client:send_msg(
SenderResponder,
amqp10_msg:set_properties(
#{to => ReplyToAddr,
correlation_id => RpcId},
amqp10_msg:new(Tag, ReplyPayload, true))),
%% 5. Requester receives the reply.
receive {amqp10_msg, ReceiverRequester, ReplyMsg} ->
%% process reply here...
ok
end.
完整的示例可在教程仓库中找到。
// RPC client creates a receiver
receiver, err := session.NewReceiver(ctx, "", &amqp.ReceiverOptions{
SourceCapabilities: []string{"rabbitmq:volatile-queue"},
SourceExpiryPolicy: amqp.ExpiryPolicyLinkDetach,
DynamicAddress: true,
RequestedSenderSettleMode: amqp.SenderSettleModeSettled.Ptr(),
})
// RPC client uses the generated address when sending a request
replyAddress := receiver.Address()
requestMsg := &amqp.Message{
Properties: &amqp.MessageProperties{
MessageID: messageID,
ReplyTo: &replyAddress,
},
Data: ...,
}
// RPC server extracts the message ID and reply-to address
msg, _ := receiver.Receive(ctx, nil)
_ = receiver.AcceptMessage(ctx, msg)
messageID := msg.Properties.MessageID.(string)
replyTo := *msg.Properties.ReplyTo
// RPC server uses the reply-to value and message ID in its response
sender, _ := session.NewSender(ctx, replyTo, nil)
replyMsg := &amqp.Message{
Properties: &amqp.MessageProperties{
CorrelationID: messageID,
},
Data: ...,
}
AMQP 0.9.1 中的用法
要使用 Direct Reply-To,请求者必须
- 以无确认 (no-ack) 模式从伪队列
amq.rabbitmq.reply-to进行消费。无需预先声明此“队列” (尽管客户端可以)。 - 将请求消息的
reply-to设置为amq.rabbitmq.reply-to。
转发请求时,RabbitMQ 会透明地将 reply-to 重写为 amq.rabbitmq.reply-to.<opaque-suffix>,其中 <opaque-suffix> 对客户端没有意义。然后,响应者使用该值作为路由键将回复发布到默认交换机 ("")。
如果响应者将执行昂贵的操作,它可以检查客户端是否已断开连接,方法是在一个一次性通道上被动声明生成的回复队列名称。即使 passive=false,也无法创建它;声明要么成功 (0 条就绪消息,1 个消费者),要么失败。
AMQP 0.9.1 的注意事项和限制
- 请求者必须在自动确认模式下进行消费。如果请求者断开连接或拒绝回复,则没有队列可以返回回复。
- 请求者必须使用同一连接和通道来从
amq.rabbitmq.reply-to进行消费以及发布请求。 - 通过 Direct Reply-To 发送的回复不是容错的;如果发布请求的客户端断开连接,它们将被丢弃。假定请求者会重新连接并重新提交请求。
amq.rabbitmq.reply-to这个名称在basic.consume和reply-to属性中被用作队列;然而它不是。它不能被删除,也不会出现在管理插件或rabbitmqctl list_queues中。- 如果响应者使用
mandatory标志发布消息,amq.rabbitmq.reply-to.*将被视为用于路由的队列。在路由时不会检查请求者是否仍然存在。换句话说,仅路由到此名称的消息被认为是“已路由”,RabbitMQ 不会发送basic.return。 - 请求者每个通道最多可以创建一个 Direct Reply-To 消费者 (
basic.consume)。
示例:AMQP 0.9.1
- Erlang
%% 1. Requester consumes from pseudo-queue in no-ack mode.
amqp_channel:subscribe(RequesterChan,
#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
no_ack = true},
self()),
CTagRequester = receive #'basic.consume_ok'{consumer_tag = CTag} -> CTag
end,
%% 2. Requester sends the request.
amqp_channel:cast(
RequesterChan,
#'basic.publish'{routing_key = RequestQueue},
#amqp_msg{props = #'P_basic'{reply_to = <<"amq.rabbitmq.reply-to">>,
message_id = RpcId},
payload = RequestPayload}),
%% 3. Responder receives the request.
{ReplyTo, RpcId} =
receive {#'basic.deliver'{consumer_tag = CTagResponder},
#amqp_msg{payload = RequestPayload,
props = #'P_basic'{reply_to = ReplyTo0,
message_id = RpcId0}}} ->
{ReplyTo0, RpcId0}
end,
%% 4. Responder replies.
amqp_channel:cast(
ResponderChan,
#'basic.publish'{routing_key = ReplyTo},
#amqp_msg{props = #'P_basic'{correlation_id = RpcId},
payload = ReplyPayload}),
%% 5. Requester receives the reply
receive {#'basic.deliver'{consumer_tag = CTagRequester},
#amqp_msg{payload = ReplyPayload,
props = #'P_basic'{correlation_id = RpcId}}} ->
%% process reply here...
ok
end.