跳至主要内容

Puka - 重思 AMQP 客户端

·阅读时长:5 分钟
Marek Majkowski

我从根本上不同意我们当前 AMQP 客户端库所公开的 API。

它们之所以不完美是有原因的:从一开始,我们就有意避免在 API 中进行创新。我们的客户端库的目的是公开通用的 AMQP,而不是任何一种消息传递视图。但在我看来,试图将 AMQP 直接映射到客户端库 API 是错误的,会导致过度复杂化和难以使用的抽象。

没有共同点:盲目遵循 AMQP 模型的客户端库将很复杂;易于使用的客户端库必须有自己的观点。

1. 通道

遵循协议的客户端库的主要问题是由 AMQP 通道的性质造成的。通道通常被解释为匹配操作系统线程的抽象 - 你可能有很多这样的线程,并且每个线程都是同步的。

这很好,但 AMQP 通道不仅限于线程 - 它远不止这些:错误范围、事务范围、排序保证和确认范围。

程序员可以选择在一个线程中使用多个通道,或者相反:多个线程可能需要在一个通道上工作。

第一个情况的例子:在两个队列之间转发消息(一个通道用于发布,一个通道用于消费)。第二种情况:将一个通道中的工作拆分到多个工作线程之间(以便在工作线程之间共享 basic.qos 配额)。

不可避免地,客户端库的作者必须决定通道与线程之间的关系。如果你来自 .NET/Java 背景,这听起来可能很无聊 - 这些框架对线程有自己的看法。但在某些语言中,例如 C 和 Python,在第三方库中假设任何关于线程模型的内容都是非常不好的做法。

我们可以针对处理多个连接的问题重复几乎相同的讨论。例如,单个线程可能需要与两个连接通信。

每个客户端库的作者都必须回答以下两个问题

  • 是否可以在多个通道上同时运行多个同步方法?
  • 是否可以从单个线程运行多个连接?

两个问题 - 四种可能的答案

在多个通道上阻塞从单个线程处理多个连接
简单的阻塞客户端(pyamqplib)
半异步客户端(pika 0.5.2)
线程化客户端(rabbitmq-java、rabbitmq-dotnet)
完全异步客户端(puka)

2. 错误处理

下一个问题是错误处理。使用某些客户端库,实际上不可能捕获 AMQP 错误并从中恢复,而无需重新启动整个程序。这通常是由于用户不了解通道作为错误范围的性质造成的。但这些库并没有让处理错误变得容易:你遇到了通道错误,现在怎么办?例如,执行 basic.publish 可能会在理论上的任何时候终止你的通道。

3. 同步发布

最后一个问题是缺乏对同步发布的支持。在 RabbitMQ 扩展 AMQP 以支持“确认”之前,实际上无法确保消息在发送到代理之前已成功发送。唯一的解决方案是使用事务,这会极大地降低发布速度。现在,有了“确认”,这是可能的,但相当困难 - 除了编写回调,用户还需要维护库线程和用户线程之间的锁,这需要理解库的线程模型。

诞生

正是这种挫败感催生了新的实验性 Python 客户端:Puka

Puka 试图为底层 AMQP 协议提供简单的 API 和合理的错误处理。Puka 的主要功能

  • 单线程。它不针对底层线程模型做出任何假设;如果需要,用户可以在 Puka 之上编写一个简单的线程层。
  • 可以混合同步和异步编程风格。
  • AMQP 错误是可预测的并且可以恢复。
  • Basic.publish 可以根据需要同步或异步执行。

Puka 的反功能

  • AMQP 通道不会公开给用户。
  • 移除了对某些 AMQP 功能的支持,最显著的是心跳。

代码片段

作为预告,这里有一些代码片段。

逐个声明 1000 个队列

for i in range(1000):
promise = client.queue_declare(queue='a%04i' % i)
client.wait(promise)

并行声明 1000 个队列

promises = [client.queue_declare(queue='a%04i' % i) for i in range(1000)]
for promise in promises:
client.wait(promise)

异步发布

client.basic_publish(exchange='', routing_key='test',
body="Hello world!")

同步发布

promise = client.basic_publish(exchange='', routing_key='test',
body="Hello world!")
client.wait(promise)

AMQP 错误不会影响程序的其他部分(发布、消费等)。例如,如果“test”队列已被声明为“持久化”,并且你尝试在没有适当标志的情况下重新声明它,你将收到错误

> promise = client.queue_declare(queue='test')
> client.wait(promise)
Traceback (most recent call last):
[...]
puka.spec_exceptions.PreconditionFailed: {'class_id': 50, 'method_id': 10,
'reply_code': 406, 'reply_text': "PRECONDITION_FAILED - parameters for queue
'test' in vhost '/' not equivalent"}

在 Puka 中,你可以简单地捕获此异常并继续

try:
promise = client.queue_declare(queue='test')
client.wait(promise)
except puka.PreconditionFailed:
# Oh, sorry. Forgot it was durable.
promise = client.queue_declare(queue='test', durable=True)
client.wait(promise)

你可以查看Puka 的 RabbitMQ 教程代码、Puka 的示例测试

总结

总之,Puka 提供了更简单的 API、灵活的编程模型、适当的错误处理,并且不针对线程做出任何决策。它让使用 AMQP 再次变得有趣。

© 2024 RabbitMQ. All rights reserved.