跳至主要内容

RabbitMQ 教程 - 远程过程调用 (RPC)

远程过程调用 (RPC)

(使用 Pika Python 客户端)

信息

先决条件

本教程假设 RabbitMQ 已安装并在 localhost 上的标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则连接设置需要进行调整。

获取帮助

如果您在学习本教程时遇到问题,可以通过GitHub 讨论RabbitMQ 社区 Discord联系我们。

先决条件

与其他 Python 教程一样,我们将使用Pika RabbitMQ 客户端版本 1.0.0

本教程的重点

第二个教程中,我们学习了如何使用工作队列在多个工作进程之间分配耗时的任务。

但是,如果我们需要在远程计算机上运行一个函数并等待结果呢?好吧,这是一个不同的故事。这种模式通常称为远程过程调用RPC

在本教程中,我们将使用 RabbitMQ 来构建一个 RPC 系统:一个客户端和一个可扩展的 RPC 服务器。由于我们没有任何值得分配的耗时任务,因此我们将创建一个返回斐波那契数的虚拟 RPC 服务。

客户端接口

为了说明如何使用 RPC 服务,我们将创建一个简单的客户端类。它将公开一个名为 call 的方法,该方法发送 RPC 请求并阻塞直到收到答案。

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print(f"fib(4) is {result}")

关于 RPC 的说明

尽管 RPC 在计算中是一种相当常见的模式,但它也经常受到批评。问题出现在程序员没有意识到函数调用是本地调用还是缓慢的 RPC 调用时。这样的混淆会导致系统不可预测,并增加调试的复杂性。滥用 RPC 不会简化软件,反而会导致难以维护的意大利面条式代码。

请记住以下建议

  • 确保清楚哪个函数调用是本地调用,哪个是远程调用。
  • 记录您的系统。使组件之间的依赖关系清晰明了。
  • 处理错误情况。当 RPC 服务器长时间停机时,客户端应该如何反应?

如有疑问,请避免使用 RPC。如果可以,您应该使用异步管道 - 而不是类似 RPC 的阻塞,结果会异步推送到下一个计算阶段。

回调队列

通常,通过 RabbitMQ 进行 RPC 很容易。客户端发送请求消息,服务器回复响应消息。为了接收响应,客户端需要在请求中发送“回调”队列地址。让我们试试吧

result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = callback_queue,
),
body=request)

# ... and some code to read a response message from the callback_queue ...

消息属性

AMQP 0-9-1 协议预定义了一组 14 个与消息一起使用的属性。大多数属性很少使用,但以下除外

  • delivery_mode:将消息标记为持久性(值为 2)或瞬态(任何其他值)。您可能还记得第二个教程中的此属性。
  • content_type:用于描述编码的 mime 类型。例如,对于常用的 JSON 编码,最好将此属性设置为:application/json
  • reply_to:通常用于命名回调队列。
  • correlation_id:有助于将 RPC 响应与请求相关联。

相关 ID

在上面介绍的方法中,我们建议为每个 RPC 请求创建一个回调队列。这效率很低,但幸运的是,有一种更好的方法 - 让我们为每个客户端创建一个回调队列。

这引发了一个新问题,在该队列中收到响应后,不清楚该响应属于哪个请求。这时就会使用 correlation_id 属性。我们将它设置为每个请求的唯一值。稍后,当我们在回调队列中收到消息时,我们将查看此属性,并根据此属性,我们将能够将响应与请求匹配。如果我们看到未知的 correlation_id 值,我们可以安全地丢弃该消息 - 它不属于我们的请求。

您可能会问,为什么我们应该忽略回调队列中的未知消息,而不是引发错误?这是由于服务器端可能存在竞争条件。虽然不太可能,但 RPC 服务器可能在向我们发送答案后立即死亡,但在发送请求的确认消息之前死亡。如果发生这种情况,重新启动的 RPC 服务器将再次处理该请求。这就是为什么在客户端上我们必须优雅地处理重复响应,并且 RPC 理想情况下应该是幂等的。

总结

我们的 RPC 将按如下方式工作

  • 当客户端启动时,它会创建一个匿名的独占回调队列。
  • 对于 RPC 请求,客户端会发送一条消息,其中包含两个属性:reply_to,设置为回调队列,以及 correlation_id,设置为每个请求的唯一值。
  • 该请求被发送到 rpc_queue 队列。
  • RPC 工作进程(又名:服务器)正在等待该队列上的请求。当出现请求时,它会完成工作并将带有结果的消息发送回客户端,使用 reply_to 字段中的队列。
  • 客户端等待回调队列上的数据。当出现消息时,它会检查 correlation_id 属性。如果它与请求中的值匹配,它会将响应返回给应用程序。

整合在一起

rpc_server.py (源代码)

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
n = int(body)

print(f" [.] fib({n})")
response = fib(n)

ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

服务器代码相当简单

  • 像往常一样,我们首先建立连接并声明队列 rpc_queue
  • 我们声明了我们的斐波那契函数。它只假设有效的正整数输入。(不要期望它对大数起作用,它可能是最慢的递归实现。)
  • 我们为 basic_consume 声明了一个回调 on_request,这是 RPC 服务器的核心。当收到请求时,它就会被执行。它完成工作并将响应发送回去。
  • 我们可能希望运行多个服务器进程。为了在多个服务器之间平均分配负载,我们需要设置 prefetch_count 设置。

rpc_client.py (源代码)

#!/usr/bin/env python
import pika
import uuid


class FibonacciRpcClient(object):

def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))

self.channel = self.connection.channel()

result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue

self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True)

self.response = None
self.corr_id = None

def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body

def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events(time_limit=None)
return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(f" [.] Got {response}")

客户端代码稍微复杂一些

  • 我们建立连接、通道并声明一个用于回复的独占 callback_queue
  • 我们订阅了 callback_queue,以便我们可以接收 RPC 响应。
  • 在每个响应上执行的 on_response 回调执行一项非常简单的任务,对于每个响应消息,它都会检查 correlation_id 是否是我们正在寻找的那个。如果是,它会将响应保存在 self.response 中并中断使用循环。
  • 接下来,我们定义我们的主要 call 方法 - 它执行实际的 RPC 请求。
  • call 方法中,我们生成一个唯一的 correlation_id 数字并保存它 - on_response 回调函数将使用此值来捕获相应的响应。
  • 同样在 call 方法中,我们发布请求消息,并包含两个属性:reply_tocorrelation_id
  • 最后,我们等待正确的响应到达并将响应返回给用户。

我们的 RPC 服务现在已经准备就绪。我们可以启动服务器

python rpc_server.py
# => [x] Awaiting RPC requests

要请求斐波那契数,请运行客户端

python rpc_client.py
# => [x] Requesting fib(30)

所呈现的设计并不是 RPC 服务的唯一可能的实现,但它具有一些重要的优势

  • 如果 RPC 服务器太慢,您可以通过简单地运行另一个服务器来进行扩展。尝试在新控制台中运行第二个 rpc_server.py
  • 在客户端,RPC 只需要发送和接收一条消息。不需要像 queue_declare 这样的同步调用。因此,RPC 客户端只需要一个网络往返就可以完成单个 RPC 请求。

我们的代码仍然非常简单,并且没有尝试解决更复杂(但很重要)的问题,例如

  • 如果没有任何服务器运行,客户端应该如何反应?
  • 客户端是否应该对 RPC 设置某种超时?
  • 如果服务器发生故障并引发异常,是否应该将其转发给客户端?
  • 在处理之前,保护免受无效的传入消息(例如检查边界)。

如果您想尝试,可以使用 管理 UI 查看队列。

© 2024 RabbitMQ. All rights reserved.