直接回复到
概述
“直接回复” (Direct Reply-To) 功能允许您实现 RPC(请求/回复)模式,如 教程 6 中所述,且无需创建专用的回复 队列。
动机
RPC(请求/回复)是 RabbitMQ 等代理中常见的模式。教程 6 展示了几种客户端实现。通常,请求方(RPC 客户端)发送请求,这些请求被路由到长期存在的已知请求队列。响应方(RPC 服务器)从该队列中消费消息,并使用请求消息的 reply-to 属性中提供的队列名称发送回复。
请求方的回复队列从何而来?请求方可以为每一对请求-回复声明一个一次性队列,但这样做效率很低:即使是非复制队列,与接收回复的成本相比,创建和删除队列的成本也相对较高。在集群中,这种开销更高,因为所有节点必须就创建、类型、复制参数和其他元数据达成一致。
更好的方法是为每个请求方创建一个单独的回复队列,并在多个请求之间复用它。该队列的 属性 取决于具体用例。
- 排他队列在单个客户端消费回复且队列在断开连接时删除的情况下很常见。
- 非排他的长期存在队列适用于长时间运行的任务,因为在这种情况下,回复需要能够在短暂的客户端断开连接后存活。
直接回复 (Direct Reply-To) 是 RabbitMQ 特有的替代方案,它完全 消除了回复队列。这意味着:
- 无需将队列元数据写入 元数据存储 (Khepri)。
- 没有队列缓冲区或持久化回复消息。
- 不存在用于回复队列的独立 Erlang 进程。
主要优势:
- 降低对 元数据存储 的负载(无需插入/删除队列元数据)。
- 降低对 管理 HTTP API 的负载:在 管理 UI 中需要列出的队列更少,且需要发出的指标更少。
- 代理上的 Erlang 进程更少。
使用“直接回复”时,在代理端,响应方的 AMQP 1.0 会话 (session) 或 AMQP 0.9.1 通道 (channel) 进程会将回复直接传递给请求方的会话/通道进程,而无需经过实际的队列。
“直接”仍然意味着通过代理;两个客户端应用程序之间不存在点对点的网络连接。
何时使用直接回复
主要用例是扩展性:大量(数万个)执行请求/回复的客户端。
虽然客户端应优先使用长连接,但“直接回复”也适用于 高连接流失 (connection churn) 的场景,即客户端为了单个 RPC 进行连接,之后立即断开。避免创建/删除队列可减少开销和延迟。
由于“直接回复”对回复具有 最多一次 (at-most-once) 的传递语义,请仅在丢失回复是可以接受的情况下使用它。例如,如果未在超时时间内收到回复,请求方预计会重新发送请求。
何时避免使用直接回复
如果出现以下任何情况,请避免使用直接回复:
- 您需要回复的 至少一次 (at-least-once) 保证(即丢失回复是不可接受的)。
- 回复必须由代理持久化缓存。
- 您需要对同一请求方有高吞吐量(例如每秒数百条以上消息)。队列的存在是为了在消费者无法跟上时提供缓冲。
对于具有长连接和多次 RPC 的工作负载,直接回复的优势相对于使用 经典队列 较小。现代 RabbitMQ 版本针对低延迟和低资源使用优化了经典队列,因此在这些场景中它们可能同样高效。使用明确声明的经典队列进行常规请求-回复同样有效,且对于长期运行的任务可能更可取。
代理实现细节
在内部,RabbitMQ 使用 rabbit_volatile_queue 队列类型实现直接回复。“易失性 (Volatile)”描述了其语义:非持久、零缓冲、最多一次、可能会丢失,且不存储在元数据存储中。
您仅会在少数地方看到 rabbit_volatile_queue。其实例不会出现在管理 UI 或 rabbitmqctl list_queues 中。
一个地方是在 Prometheus 指标中,例如:
rabbitmq_global_messages_delivered_total{protocol="amqp10",queue_type="rabbit_volatile_queue"}传递给 AMQP 1.0 请求方的消息(“直接回复”)数量。rabbitmq_global_messages_delivered_total{protocol="amqp091",queue_type="rabbit_volatile_queue"}传递给 AMQP 0.9.1 请求方的消息(“直接回复”)数量。rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_volatile_queue",dead_letter_strategy="disabled"}被 RabbitMQ 丢弃的消息(“直接回复”)数量。
用法
直接回复支持 AMQP 1.0 和 AMQP 0.9.1。它也支持跨协议工作(例如 AMQP 1.0 请求方与 AMQP 0.9.1 响应方,反之亦然)。
在 AMQP 1.0 中的使用
请求方首先附加一个链接以接收回复消息。该 attach(带有 source)必须设置特定的字段。如果您的 AMQP 1.0 客户端库 支持直接回复,它将为您设置这些字段(参见 示例)。否则:
- 将
snd-settle-mode设置为settled,因为 RabbitMQ 会发送所有已结算的回复。如果请求方断开连接或拒绝回复,则没有队列可以退回该回复。 - 将
address留空;RabbitMQ 将生成该地址。 - 将
durable留空或设置为none;RabbitMQ 元数据存储中不会保存任何状态。 - 将
expiry-policy设置为link-detach。 - 将
timeout留空或设置为0。 - 在
capabilities中包含rabbitmq:volatile-queue。
RabbitMQ 会在 attach 响应的 address 字段中返回一个代理生成的伪队列地址。它看起来像 /queues/amq.rabbitmq.reply-to.<opaque-suffix>,其中 <opaque-suffix> 对客户端没有意义。
在发送第一个请求之前,请求方必须向该伪队列授予链接信用 (link credit)。
对于每个请求,设置以下消息 属性:
message-id:一个全局唯一值。(响应方会将回复的correlation-id设置为该值。)reply-to:在attach响应中收到的地址。
响应方读取请求的 reply-to 并通过以下两个选项之一将回复发送到该地址:
- 按照 目标地址 v2 中的描述,将发送链接附加到匿名终点(空目标地址),并在消息的
to属性中设置回复地址。这在回复给多个不同请求方(无需每个请求方都有链接)时很有用。 - 直接创建指向所提供地址的发送链接。在这种情况下,RabbitMQ 会检查请求方是否仍已连接;如果没有,RabbitMQ 会拒绝该链接。
如果响应方需要执行昂贵的工作,它可以主动通过 AMQP 发送 HTTP GET 来检查请求方是否仍然存在。200 状态码表示请求方仍已连接(参见 示例)。
AMQP 1.0 注意事项和局限性
- 请求方必须接收已结算 (settled) 的回复。如果请求方断开连接或拒绝回复,没有队列可以退回该回复。
- 请求方只能在附加其接收链接的同一连接和会话上接收回复。不支持在另一个会话上从此伪队列进行消费。
- 响应方应该发送已结算的回复。如果它发送未结算的回复,RabbitMQ 会立即以 accepted 结果结算它们,即使该回复随后可能会被丢弃。
- 通过直接回复发送的回复不具备容错性。由于此伪队列不进行缓冲,RabbitMQ 会在以下情况下丢弃回复:
- AMQP 1.0 请求方的链接信用用尽。授予足够的链接信用是请求方的责任。
- AMQP 1.0 会话流量控制 不允许传递。请求方必须保持其
incoming-window足够大。 - 代理的 AMQP 1.0 写入进程无法足够快地向请求方发送回复。
如果消息丢失是不可接受的,请使用 经典队列 代替直接回复。
示例:AMQP 1.0
- Java
- C#
- Erlang
- Go
String requestQueue = "request-queue";
// create the responder
Responder responder = connection.responderBuilder()
.requestQueue(requestQueue)
.handler((ctx, req) -> {
// check whether the requester is still connected (optional)
if (ctx.isRequesterAlive(req)) {
String in = new String(req.body(), UTF_8);
String out = "*** " + in + " ***";
return ctx.message(out.getBytes(UTF_8));
} else {
return null;
}
}).build();
// create the requester, it uses direct reply-to by default
Requester requester = connection.requesterBuilder()
.requestAddress().queue(requestQueue)
.requester()
.build();
// create the request message
Message request = requester.message("hello".getBytes(UTF_8));
// send the request
CompletableFuture<Message> responseFuture = requester.publish(request);
// wait for the response
Message response = responseFuture.get(10, TimeUnit.SECONDS);
完整示例可在 RabbitMQ Amqp1.0 .NET Client 仓库 中找到。
const string requestQueue = "amqp10.net-request-queue";
// create the responder
IResponder responder = await connection.ResponderBuilder().
RequestQueue(requestQueue).Handler(
(context, message) =>
{
// "message" parameter is the incoming message
Trace.WriteLine(TraceLevel.Information, $"[Responder] Message received: {message.BodyAsString()} ");
// create a reply message
IMessage reply = context.Message("reply message");
return Task.FromResult(reply);
}
).BuildAsync();
// create the requester, it uses direct reply-to by default
IRequester requester = await connection.RequesterBuilder().RequestAddress().
Queue(requestQueue).Requester().BuildAsync();
IMessage response = await requester.PublishAsync(
new AmqpMessage("Hello"));
Trace.WriteLine(TraceLevel.Information, $"[Requester] Response received: {response.BodyAsString()}");
%% 1. Requester attaches its receiving link.
OpnConfRequester = OpnConfRequester0#{notify_with_performative => true},
{ok, ConnRequester} = amqp10_client:open_connection(OpnConfRequester),
{ok, SessionRequester} = amqp10_client:begin_session_sync(ConnRequester),
Source = #{address => undefined,
durable => none,
expiry_policy => <<"link-detach">>,
dynamic => true,
capabilities => [<<"rabbitmq:volatile-queue">>]},
AttachArgs = #{name => <<"receiver requester">>,
role => {receiver, Source, self()},
snd_settle_mode => settled,
rcv_settle_mode => first},
{ok, ReceiverRequester} = amqp10_client:attach_link(SessionRequester, AttachArgs),
%% Requester learns the broker-generated reply address.
Addr = receive {amqp10_event, {link, ReceiverRequester, {attached, Attach}}} ->
#'v1_0.attach'{
source = #'v1_0.source'{
address = {utf8, Addr0}}} = Attach,
Addr0
end,
%% Requester must grant link credit before sending the first request.
ok = amqp10_client:flow_link_credit(ReceiverRequester, 1000, 500),
%% 2. Requester sends the request.
ok = amqp10_client:send_msg(
SenderRequester,
amqp10_msg:set_properties(
#{message_id => RpcId,
reply_to => Addr},
amqp10_msg:new(DeliveryTag, RequestPayload, true))),
%% 3. Responder receives the request and reads relevant properties.
...
#{message_id := RpcId,
reply_to := ReplyToAddr} = amqp10_msg:properties(RequestMsg),
%% Optionally, the responder checks whether the requester is still connected.
{ok, #{queue := ReplyQueue}} = rabbitmq_amqp_address:to_map(ReplyToAddr),
case rabbitmq_amqp_client:get_queue(LinkPairResponder, ReplyQueue) of
{ok, #{}} ->
%% requester is still there
ok;
_ ->
throw(requester_absent)
end,
%% 4. Responder replies (attached to the anonymous terminus).
ok = amqp10_client:send_msg(
SenderResponder,
amqp10_msg:set_properties(
#{to => ReplyToAddr,
correlation_id => RpcId},
amqp10_msg:new(Tag, ReplyPayload, true))),
%% 5. Requester receives the reply.
receive {amqp10_msg, ReceiverRequester, ReplyMsg} ->
%% process reply here...
ok
end.
完整示例可在 客户端仓库 中找到。
使用 Azure amqp 客户端 的完整示例可在 教程仓库 中找到。
const requestQueue = "go-amqp1.0-request-queue"
// RPC client creates a responder
responder, err := conn.NewResponder(context.TODO(), rabbitmqamqp.ResponderOptions{
RequestQueue: requestQueue,
Handler: func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
return request, nil
},
})
requester, err := clientConn.NewRequester(context.TODO(), &rabbitmqamqp.RequesterOptions{
RequestQueueName: requestQueue,
// the option to enable the DirectReplyTo feature
DirectReplyTo: true,
})
resp, err := requester.Publish(context.TODO(), amqp.NewMessage([]byte("hello")))
m, ok := <-resp
if !ok {
fmt.Println("timed out waiting for response")
continue
}
fmt.Printf("response: %s\n", m.GetData())
在 AMQP 0.9.1 中的使用
要使用直接回复,请求方必须:
- 在 no-ack 模式下从伪队列
amq.rabbitmq.reply-to中消费。无需先声明此“队列”(尽管客户端可以这样做)。 - 将请求消息的
reply-to设置为amq.rabbitmq.reply-to。
转发请求时,RabbitMQ 会透明地将 reply-to 重写为 amq.rabbitmq.reply-to.<opaque-suffix>,其中 <opaque-suffix> 对客户端没有意义。随后响应方使用该值作为路由键,将回复发布到默认交换机 ("")。
如果响应方需要执行昂贵的工作,它可以通过在一次性通道上被动声明生成的回复队列名称来检查客户端是否已离开。即使 passive=false,也无法创建它;声明操作要么成功(0 条就绪消息,1 个消费者),要么失败。
AMQP 0.9.1 注意事项和局限性
- 请求方必须以 自动确认模式 进行消费。如果请求方断开连接或拒绝回复,没有队列可以退回该回复。
- 请求方必须使用相同的连接和 通道 来消费
amq.rabbitmq.reply-to并发布请求。 - 通过直接回复发送的回复不具备容错性;如果发布请求的客户端断开连接,它们会被丢弃。请求方预计会重新连接并重新提交请求。
- 名称
amq.rabbitmq.reply-to在basic.consume和reply-to属性中被使用,就像它是一个队列一样;但它并不是。它无法被删除,也不会出现在管理插件或rabbitmqctl list_queues中。 - 如果响应方使用
mandatory标志发布,amq.rabbitmq.reply-to.*将被视为用于路由的队列。路由时不会检查请求方是否仍然存在。换句话说,仅路由到此名称的消息被视为“已路由”,RabbitMQ 不会发送basic.return。 - 请求方每个通道最多只能创建一个直接回复消费者 (
basic.consume)。
示例:AMQP 0.9.1
- Erlang
%% 1. Requester consumes from pseudo-queue in no-ack mode.
amqp_channel:subscribe(RequesterChan,
#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
no_ack = true},
self()),
CTagRequester = receive #'basic.consume_ok'{consumer_tag = CTag} -> CTag
end,
%% 2. Requester sends the request.
amqp_channel:cast(
RequesterChan,
#'basic.publish'{routing_key = RequestQueue},
#amqp_msg{props = #'P_basic'{reply_to = <<"amq.rabbitmq.reply-to">>,
message_id = RpcId},
payload = RequestPayload}),
%% 3. Responder receives the request.
{ReplyTo, RpcId} =
receive {#'basic.deliver'{consumer_tag = CTagResponder},
#amqp_msg{payload = RequestPayload,
props = #'P_basic'{reply_to = ReplyTo0,
message_id = RpcId0}}} ->
{ReplyTo0, RpcId0}
end,
%% 4. Responder replies.
amqp_channel:cast(
ResponderChan,
#'basic.publish'{routing_key = ReplyTo},
#amqp_msg{props = #'P_basic'{correlation_id = RpcId},
payload = ReplyPayload}),
%% 5. Requester receives the reply
receive {#'basic.deliver'{consumer_tag = CTagRequester},
#amqp_msg{payload = ReplyPayload,
props = #'P_basic'{correlation_id = RpcId}}} ->
%% process reply here...
ok
end.