RabbitMQ 教程 - 远程过程调用 (RPC)
远程过程调用 (RPC)
(使用 .NET 客户端)
先决条件
本教程假设 RabbitMQ 已安装并在 localhost
上的标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则连接设置需要进行调整。
获取帮助
如果您在学习本教程时遇到问题,可以通过 GitHub 讨论区 或 RabbitMQ 社区 Discord 联系我们。
在.NET 第二个教程中,我们学习了如何使用工作队列在多个工作进程之间分配耗时任务。
但是,如果我们需要在远程计算机上运行一个函数并等待结果呢?好吧,这是一个不同的故事。这种模式通常称为远程过程调用或RPC。
在本教程中,我们将使用 RabbitMQ 来构建一个 RPC 系统:一个客户端和一个可扩展的 RPC 服务器。由于我们没有任何值得分配的耗时任务,因此我们将创建一个返回斐波那契数的虚拟 RPC 服务。
客户端接口
为了说明如何使用 RPC 服务,我们将创建一个简单的客户端类。它将公开一个名为 CallAsync
的方法,该方法发送 RPC 请求并阻塞,直到收到答案。
using var rpcClient = new RpcClient();
Console.WriteLine(" [x] Requesting fib({0})", n);
var response = await rpcClient.CallAsync(n);
Console.WriteLine(" [.] Got '{0}'", response);
关于 RPC 的说明
尽管 RPC 在计算中是一种非常常见的模式,但它经常受到批评。问题出现在程序员不知道函数调用是本地的还是缓慢的 RPC 时。这种混淆会导致系统不可预测,并增加调试的复杂性。滥用 RPC 不会简化软件,反而会导致难以维护的意大利面条代码。
考虑到这一点,请考虑以下建议
- 确保很明显哪个函数调用是本地的,哪个是远程的。
- 记录您的系统。使组件之间的依赖关系清晰明了。
- 处理错误情况。当 RPC 服务器长时间宕机时,客户端应该如何反应?
如有疑问,请避免使用 RPC。如果可以,您应该使用异步管道 - 与 RPC 阻塞相反,结果被异步推送到下一个计算阶段。
回调队列
通常,通过 RabbitMQ 进行 RPC 很容易。客户端发送请求消息,服务器回复响应消息。为了接收响应,我们需要在请求中发送“回调”队列地址。
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: string.Empty,
routingKey: "rpc_queue",
basicProperties: props,
body: messageBytes);
// ... then code to read a response message from the callback_queue ...
消息属性
AMQP 0-9-1 协议预定义了一组 14 个与消息一起使用的属性。大多数属性很少使用,但以下除外
Persistent
:将消息标记为持久性 (值为true
) 或瞬态 (任何其他值)。请查看.NET 第二个教程。DeliveryMode
:熟悉该协议的人可以选择使用此属性而不是Persistent
。它们控制相同的事物。ContentType
:用于描述编码的 MIME 类型。例如,对于常用的 JSON 编码,最好将此属性设置为:application/json
。ReplyTo
:通常用于命名回调队列。CorrelationId
:用于将 RPC 响应与请求相关联。
关联 ID
在上面介绍的方法中,我们建议为每个 RPC 请求创建一个回调队列。这效率很低,但幸运的是,有一种更好的方法 - 让我们为每个客户端创建一个回调队列。
这引发了一个新问题,在该队列中收到响应后,不清楚响应属于哪个请求。这时 CorrelationId
属性就派上用场了。我们将它设置为每个请求的唯一值。稍后,当我们在回调队列中收到消息时,我们将查看此属性,并据此能够将响应与请求匹配。如果我们看到一个未知的 CorrelationId
值,我们可以安全地丢弃该消息 - 它不属于我们的请求。
您可能会问,为什么我们应该忽略回调队列中的未知消息,而不是报错?这是因为服务器端可能存在竞争条件。虽然不太可能,但 RPC 服务器在发送答案给我们后但在发送请求确认消息之前死亡是可能的。如果发生这种情况,重新启动的 RPC 服务器将再次处理该请求。因此,客户端必须优雅地处理重复响应,并且 RPC 应该在理想情况下是幂等的。
总结
我们的 RPC 将按以下方式工作
- 客户端启动时,它会创建一个匿名的排他回调队列。
- 对于 RPC 请求,客户端会发送一条消息,其中包含两个属性:
ReplyTo
,设置为回调队列,以及CorrelationId
,设置为每个请求的唯一值。 - 请求被发送到
rpc_queue
队列。 - RPC 工作进程(又名:服务器)正在等待该队列上的请求。当出现请求时,它会执行任务并将带有结果的消息发送回客户端,使用
ReplyTo
属性中的队列。 - 客户端等待回调队列上的数据。当出现消息时,它会检查
CorrelationId
属性。如果它与请求中的值匹配,则它会将响应返回给应用程序。
综合示例
斐波那契任务
static int Fib(int n)
{
if (n is 0 or 1)
{
return n;
}
return Fib(n - 1) + Fib(n - 2);
}
我们声明了我们的斐波那契函数。它只假设有效的正整数输入。(不要期望它对大数字有效,它可能是最慢的递归实现。)
我们的 RPC 服务器代码 RPCServer.cs 如下所示
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "rpc_queue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: "rpc_queue",
autoAck: false,
consumer: consumer);
Console.WriteLine(" [x] Awaiting RPC requests");
consumer.Received += (model, ea) =>
{
string response = string.Empty;
var body = ea.Body.ToArray();
var props = ea.BasicProperties;
var replyProps = channel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId;
try
{
var message = Encoding.UTF8.GetString(body);
int n = int.Parse(message);
Console.WriteLine($" [.] Fib({message})");
response = Fib(n).ToString();
}
catch (Exception e)
{
Console.WriteLine($" [.] {e.Message}");
response = string.Empty;
}
finally
{
var responseBytes = Encoding.UTF8.GetBytes(response);
channel.BasicPublish(exchange: string.Empty,
routingKey: props.ReplyTo,
basicProperties: replyProps,
body: responseBytes);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
};
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
// Assumes only valid positive integer input.
// Don't expect this one to work for big numbers, and it's probably the slowest recursive implementation possible.
static int Fib(int n)
{
if (n is 0 or 1)
{
return n;
}
return Fib(n - 1) + Fib(n - 2);
}
服务器代码非常简单
- 像往常一样,我们首先建立连接、通道并声明队列。
- 我们可能希望运行多个服务器进程。为了在多个服务器上平均分配负载,我们需要在
channel.BasicQos
中设置prefetchCount
设置。 - 我们使用
BasicConsume
访问队列。然后我们注册一个传递处理程序,在其中我们执行工作并将响应发送回去。
我们的 RPC 客户端代码 RPCClient.cs
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
public class RpcClient : IDisposable
{
private const string QUEUE_NAME = "rpc_queue";
private readonly IConnection connection;
private readonly IModel channel;
private readonly string replyQueueName;
private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> callbackMapper = new();
public RpcClient()
{
var factory = new ConnectionFactory { HostName = "localhost" };
connection = factory.CreateConnection();
channel = connection.CreateModel();
// declare a server-named queue
replyQueueName = channel.QueueDeclare().QueueName;
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
if (!callbackMapper.TryRemove(ea.BasicProperties.CorrelationId, out var tcs))
return;
var body = ea.Body.ToArray();
var response = Encoding.UTF8.GetString(body);
tcs.TrySetResult(response);
};
channel.BasicConsume(consumer: consumer,
queue: replyQueueName,
autoAck: true);
}
public Task<string> CallAsync(string message, CancellationToken cancellationToken = default)
{
IBasicProperties props = channel.CreateBasicProperties();
var correlationId = Guid.NewGuid().ToString();
props.CorrelationId = correlationId;
props.ReplyTo = replyQueueName;
var messageBytes = Encoding.UTF8.GetBytes(message);
var tcs = new TaskCompletionSource<string>();
callbackMapper.TryAdd(correlationId, tcs);
channel.BasicPublish(exchange: string.Empty,
routingKey: QUEUE_NAME,
basicProperties: props,
body: messageBytes);
cancellationToken.Register(() => callbackMapper.TryRemove(correlationId, out _));
return tcs.Task;
}
public void Dispose()
{
// closing a connection will also close all channels on it
connection.Close();
}
}
public class Rpc
{
public static async Task Main(string[] args)
{
Console.WriteLine("RPC Client");
string n = args.Length > 0 ? args[0] : "30";
await InvokeAsync(n);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
private static async Task InvokeAsync(string n)
{
using var rpcClient = new RpcClient();
Console.WriteLine(" [x] Requesting fib({0})", n);
var response = await rpcClient.CallAsync(n);
Console.WriteLine(" [.] Got '{0}'", response);
}
}
客户端代码稍微复杂一些
- 我们建立一个连接和通道,并为回复声明一个独占的“回调”队列。
- 我们订阅“回调”队列,以便我们可以接收 RPC 响应。
- 我们的
Call
方法执行实际的 RPC 请求。 - 在这里,我们首先生成一个唯一的
CorrelationId
数字并将其保存以在响应到达时识别相应的响应。 - 接下来,我们发布请求消息,其中包含两个属性:
ReplyTo
和CorrelationId
。 - 此时,我们可以坐下来等待合适的响应到达。
- 对于每个响应消息,客户端都会检查
CorrelationId
是否是我们正在寻找的那个。如果是,则保存响应。 - 最后,我们将响应返回给用户。
发出客户端请求
using var rpcClient = new RpcClient();
Console.WriteLine(" [x] Requesting fib({0})", n);
var response = await rpcClient.CallAsync(n);
Console.WriteLine(" [.] Got '{0}'", response);
现在是查看我们的完整示例源代码(包括基本异常处理)的好时机,请参见 RPCClient.cs 和 RPCServer.cs。
像往常一样设置(请参阅.NET 第一个教程)
我们的 RPC 服务现在已准备就绪。我们可以启动服务器
cd RPCServer
dotnet run
# => [x] Awaiting RPC requests
要请求斐波那契数,请运行客户端
cd RPCClient
dotnet run
# => [x] Requesting fib(30)
此处提供的设计不是 RPC 服务的唯一可能实现,但它具有一些重要的优势
- 如果 RPC 服务器太慢,您可以通过运行另一个服务器来扩展它。尝试在新控制台中运行第二个
RPCServer
。 - 在客户端,RPC 只需要发送和接收一条消息。不需要像
QueueDeclare
这样的同步调用。因此,RPC 客户端只需要一个网络往返即可完成单个 RPC 请求。
我们的代码仍然非常简单,没有尝试解决更复杂(但很重要)的问题,例如
- 如果没有任何服务器正在运行,客户端应该如何反应?
- 客户端是否应该为 RPC 设置某种超时?
- 如果服务器出现故障并引发异常,是否应将其转发到客户端?
- 在处理之前保护免受无效的传入消息(例如检查边界、类型)。
如果您想尝试,可能会发现管理 UI对于查看队列很有用。