Puka - 重思 AMQP 客户端
我从根本上不同意我们当前 AMQP 客户端库所公开的 API。
它们之所以不完美是有原因的:从一开始,我们就有意避免在 API 中进行创新。我们的客户端库的目的是公开通用的 AMQP,而不是任何一种消息传递视图。但在我看来,试图将 AMQP 直接映射到客户端库 API 是错误的,会导致过度复杂化和难以使用的抽象。
没有共同点:盲目遵循 AMQP 模型的客户端库将很复杂;易于使用的客户端库必须有自己的观点。
1. 通道
遵循协议的客户端库的主要问题是由 AMQP 通道的性质造成的。通道通常被解释为匹配操作系统线程的抽象 - 你可能有很多这样的线程,并且每个线程都是同步的。
这很好,但 AMQP 通道不仅限于线程 - 它远不止这些:错误范围、事务范围、排序保证和确认范围。
程序员可以选择在一个线程中使用多个通道,或者相反:多个线程可能需要在一个通道上工作。
第一个情况的例子:在两个队列之间转发消息(一个通道用于发布,一个通道用于消费)。第二种情况:将一个通道中的工作拆分到多个工作线程之间(以便在工作线程之间共享 basic.qos 配额)。
不可避免地,客户端库的作者必须决定通道与线程之间的关系。如果你来自 .NET/Java 背景,这听起来可能很无聊 - 这些框架对线程有自己的看法。但在某些语言中,例如 C 和 Python,在第三方库中假设任何关于线程模型的内容都是非常不好的做法。
我们可以针对处理多个连接的问题重复几乎相同的讨论。例如,单个线程可能需要与两个连接通信。
每个客户端库的作者都必须回答以下两个问题
- 是否可以在多个通道上同时运行多个同步方法?
- 是否可以从单个线程运行多个连接?
两个问题 - 四种可能的答案
在多个通道上阻塞 | 从单个线程处理多个连接 | |
---|---|---|
否 | 否 | 简单的阻塞客户端(pyamqplib) |
否 | 是 | 半异步客户端(pika 0.5.2) |
是 | 否 | 线程化客户端(rabbitmq-java、rabbitmq-dotnet) |
是 | 是 | 完全异步客户端(puka) |
2. 错误处理
下一个问题是错误处理。使用某些客户端库,实际上不可能捕获 AMQP 错误并从中恢复,而无需重新启动整个程序。这通常是由于用户不了解通道作为错误范围的性质造成的。但这些库并没有让处理错误变得容易:你遇到了通道错误,现在怎么办?例如,执行 basic.publish 可能会在理论上的任何时候终止你的通道。
3. 同步发布
最后一个问题是缺乏对同步发布的支持。在 RabbitMQ 扩展 AMQP 以支持“确认”之前,实际上无法确保消息在发送到代理之前已成功发送。唯一的解决方案是使用事务,这会极大地降低发布速度。现在,有了“确认”,这是可能的,但相当困难 - 除了编写回调,用户还需要维护库线程和用户线程之间的锁,这需要理解库的线程模型。
诞生
正是这种挫败感催生了新的实验性 Python 客户端:Puka。
Puka 试图为底层 AMQP 协议提供简单的 API 和合理的错误处理。Puka 的主要功能
- 单线程。它不针对底层线程模型做出任何假设;如果需要,用户可以在 Puka 之上编写一个简单的线程层。
- 可以混合同步和异步编程风格。
- AMQP 错误是可预测的并且可以恢复。
- Basic.publish 可以根据需要同步或异步执行。
Puka 的反功能
- AMQP 通道不会公开给用户。
- 移除了对某些 AMQP 功能的支持,最显著的是心跳。
代码片段
作为预告,这里有一些代码片段。
逐个声明 1000 个队列
for i in range(1000):
promise = client.queue_declare(queue='a%04i' % i)
client.wait(promise)
并行声明 1000 个队列
promises = [client.queue_declare(queue='a%04i' % i) for i in range(1000)]
for promise in promises:
client.wait(promise)
异步发布
client.basic_publish(exchange='', routing_key='test',
body="Hello world!")
同步发布
promise = client.basic_publish(exchange='', routing_key='test',
body="Hello world!")
client.wait(promise)
AMQP 错误不会影响程序的其他部分(发布、消费等)。例如,如果“test”队列已被声明为“持久化”,并且你尝试在没有适当标志的情况下重新声明它,你将收到错误
> promise = client.queue_declare(queue='test')
> client.wait(promise)
Traceback (most recent call last):
[...]
puka.spec_exceptions.PreconditionFailed: {'class_id': 50, 'method_id': 10,
'reply_code': 406, 'reply_text': "PRECONDITION_FAILED - parameters for queue
'test' in vhost '/' not equivalent"}
在 Puka 中,你可以简单地捕获此异常并继续
try:
promise = client.queue_declare(queue='test')
client.wait(promise)
except puka.PreconditionFailed:
# Oh, sorry. Forgot it was durable.
promise = client.queue_declare(queue='test', durable=True)
client.wait(promise)
你可以查看Puka 的 RabbitMQ 教程代码、Puka 的示例和测试。
总结
总之,Puka 提供了更简单的 API、灵活的编程模型、适当的错误处理,并且不针对线程做出任何决策。它让使用 AMQP 再次变得有趣。