跳到主要内容

RabbitMQ 教程 - 远程过程调用 (RPC)

远程过程调用 (RPC)

(使用 .NET 客户端)

信息

先决条件

本教程假设 RabbitMQ 已安装并在 localhost 上的标准端口 (5672) 上运行。 如果您使用不同的主机、端口或凭据,则需要调整连接设置。

在哪里获得帮助

如果您在学习本教程时遇到问题,可以通过 GitHub DiscussionsRabbitMQ 社区 Discord 联系我们。

第二个教程中,我们学习了如何使用工作队列在多个工作者之间分配耗时的任务。

但是,如果我们需要在远程计算机上运行一个函数并等待结果呢? 嗯,那是另一个故事了。 这种模式通常被称为远程过程调用RPC

在本教程中,我们将使用 RabbitMQ 构建一个 RPC 系统:一个客户端和一个可扩展的 RPC 服务器。 由于我们没有任何值得分配的耗时任务,我们将创建一个虚拟 RPC 服务,该服务返回斐波那契数。

客户端接口

为了说明如何使用 RPC 服务,我们将创建一个简单的客户端类。 它将公开一个名为 CallAsync 的方法,该方法发送 RPC 请求并等待答案

dotnet/RPCClient/RPCClient.cs
loading...

关于 RPC 的说明

尽管 RPC 在计算中是一种非常常见的模式,但它经常受到批评。 当程序员不知道函数调用是本地调用还是慢速 RPC 时,问题就出现了。 这样的混淆会导致系统不可预测,并增加不必要的调试复杂性。 误用的 RPC 不但不能简化软件,反而可能导致难以维护的意大利面条式代码。

考虑到这一点,请考虑以下建议

  • 确保清楚地知道哪个函数调用是本地调用,哪个是远程调用。
  • 记录您的系统。 明确组件之间的依赖关系。
  • 处理错误情况。 当 RPC 服务器长时间宕机时,客户端应如何反应?

如有疑问,请避免使用 RPC。 如果可以,则应使用异步管道 - 结果会异步推送到下一个计算阶段,而不是类似 RPC 的阻塞。

回调队列

通常,通过 RabbitMQ 进行 RPC 很简单。 客户端发送请求消息,服务器回复响应消息。 为了接收响应,我们需要在请求中发送“回调”队列地址

dotnet/RPCClient/RPCClient.cs
loading...

消息属性

AMQP 0-9-1 协议预定义了一组 14 个与消息一起发送的属性。 大多数属性很少使用,但以下属性除外

  • Persistent:将消息标记为持久性(值为 true)或瞬态(任何其他值)。 请参阅第二个教程
  • DeliveryMode:熟悉该协议的人可能会选择使用此属性而不是 Persistent。 它们控制的是同一件事。
  • ContentType:用于描述编码的 mime 类型。 例如,对于常用的 JSON 编码,最好将此属性设置为:application/json
  • ReplyTo:通常用于命名回调队列。
  • CorrelationId:用于将 RPC 响应与请求关联起来。

关联 ID

为每个 RPC 请求创建一个回调队列效率低下。 更好的方法是为每个客户端创建一个单独的回调队列。

这提出了一个新问题,即在队列中收到响应后,不清楚该响应属于哪个请求。 这时可以使用 CorrelationId 属性。 我们将为每个请求将其设置为唯一值。 稍后,当我们在回调队列中收到消息时,我们将查看此属性,并根据此属性将响应与请求匹配。 如果我们看到未知的 CorrelationId 值,我们可以安全地丢弃该消息 - 它不属于我们的请求。

您可能会问,为什么我们应该忽略回调队列中的未知消息,而不是因错误而失败? 这是由于服务器端存在竞争条件的可能性。 尽管不太可能,但 RPC 服务器可能会在我们收到答案后立即死机,但在发送请求的确认消息之前。 如果发生这种情况,重新启动的 RPC 服务器将再次处理该请求。 这就是为什么在客户端我们必须优雅地处理重复的响应,并且 RPC 理想情况下应该是幂等的。

总结

我们的 RPC 将像这样工作

  • 当客户端启动时,它会创建一个独占回调队列。
  • 对于 RPC 请求,客户端发送一条消息,其中包含两个属性:ReplyTo(设置为回调队列)和 CorrelationId(设置为每个请求的唯一值)。
  • 请求被发送到 rpc_queue 队列。
  • RPC 工作者(又名:服务器)正在该队列上等待请求。 当请求出现时,它会完成工作并将包含结果的消息发送回客户端,使用来自 ReplyTo 属性的队列。
  • 客户端在回调队列上等待数据。 当消息出现时,它会检查 CorrelationId 属性。 如果它与请求中的值匹配,则将响应返回给应用程序。

将它们放在一起

斐波那契任务

dotnet/RPCServer/RPCServer.cs
loading...

我们声明了斐波那契函数。 它仅接受有效的正整数输入。 (不要指望它能处理大数字,这可能是最慢的递归实现)。

我们的 RPC 服务器的代码如下所示

dotnet/RPCServer/RPCServer.cs
loading...

服务器代码相当简单明了

  • 像往常一样,我们首先建立连接、通道并声明队列。
  • 我们可能希望运行多个服务器进程。 为了在多个服务器之间平均分配负载,我们需要在 channel.BasicQosAsync 中设置 prefetchCount 设置。
  • 我们使用 BasicConsumeAsync 来访问队列。 然后我们注册一个传递处理程序,在其中完成工作并将响应发送回去。

我们的 RPC 客户端的代码

dotnet/RPCClient/RPCClient.cs
loading...

客户端代码稍微复杂一些

  • 我们建立连接和通道,并声明一个独占的“回调”队列用于回复。
  • 我们订阅“回调”队列,以便我们可以接收 RPC 响应。
  • 我们的 CallAsync 方法发出实际的 RPC 请求。
  • 在这里,我们首先生成唯一的 CorrelationId 数字并保存它,以便在响应到达时识别合适的响应。
  • 接下来,我们发布请求消息,其中包含两个属性:ReplyToCorrelationId
  • 此时,我们可以坐下来等待,直到合适的响应到达。
  • 对于每个响应消息,客户端都会检查 CorrelationId 是否是我们正在寻找的那个。 如果是,则保存响应。
  • 最后,我们将响应返回给用户。

发出客户端请求

dotnet/RPCClient/RPCClient.cs
loading...

像往常一样设置(请参阅教程一

我们的 RPC 服务现在已准备就绪。 我们可以启动服务器

cd RPCServer
dotnet run
# => [x] Awaiting RPC requests

要请求斐波那契数,请运行客户端

cd RPCClient
dotnet run
# => [x] Requesting fib(30)

此处介绍的设计不是 RPC 服务的唯一可能实现,但它具有一些重要的优势

  • 如果 RPC 服务器太慢,您可以只需运行另一个服务器来扩展。 尝试在新控制台中运行第二个 RPCServer
  • 在客户端,RPC 只需要发送和接收一条消息。 不需要像 QueueDeclareAsync 这样的同步调用。 因此,RPC 客户端对于单个 RPC 请求只需要一次网络往返。

我们的代码仍然非常简单,并没有尝试解决更复杂(但重要)的问题,例如

  • 如果没有服务器正在运行,客户端应该如何反应?
  • 客户端是否应该为 RPC 设置某种超时?
  • 如果服务器发生故障并引发异常,是否应该将其转发给客户端?
  • 在处理之前防止无效的传入消息(例如,检查边界、类型)。

如果您想进行实验,您可能会发现 管理 UI 对于查看队列很有用。

© . All rights reserved.