Puka - 重新思考 AMQP 客户端
我从根本上不同意我们当前 AMQP 客户端库公开的 API。
它们不完美是有原因的:我们从一开始就故意避免在 API 上进行创新。我们的客户端库的目的是公开通用的 AMQP,而不是任何一种消息传递的视图。但是,在我看来,试图将 AMQP 直接映射到客户端库的 API 是错误的,会导致过度复杂化和难以使用的抽象。
没有共同点:盲目遵循 AMQP 模型设计的客户端库将是复杂的;易于使用的客户端库必须是有主见的(opinionated)。
1. 通道 (Channels)
客户端库遵循协议的主要问题是由 AMQP 通道的性质引起的。通道通常被解释为与操作系统线程相匹配的抽象——你可以有很多这样的通道,并且每个通道都是同步的。
这都很好,但 AMQP 通道的功能不仅仅是线程——它远不止于此:错误范围、事务范围、顺序保证和确认(acks)范围。
程序员可以决定在一个线程中使用多个通道,或者相反:多个线程可能需要在单个通道上工作。
第一种情况的例子:在两个队列之间转发消息(一个通道用于发布,一个通道用于消费)。第二种情况:将一个通道的工作分配给多个工作线程(以便为工作线程共享 basic.qos 配额)。
不可避免地,客户端库的作者必须决定通道和线程之间的关系。如果你来自 .NET/Java 背景,这听起来可能很无聊——这些框架对多线程有自己的看法。但在某些语言中,例如 C 和 Python,假设第三方库的任何关于多线程模型的事情都是一个非常糟糕的做法。
对于处理多个连接的问题,我们可以进行几乎相同的讨论。例如,一个线程可能需要与两个连接进行通信。
每个客户端库的作者都必须回答以下两个问题:
- 是否可以同时在多个通道上运行多个同步方法?
- 是否可以从单个线程运行多个连接?
两个问题——四种可能的选择
| 在多个通道上阻塞 | 从单个线程处理多个连接 | |
|---|---|---|
| 否 | 否 | 简单的阻塞客户端 (pyamqplib) |
| 否 | 是 | 半异步客户端 (pika 0.5.2) |
| 是 | 否 | 多线程客户端 (rabbitmq-java, rabbitmq-dotnet) |
| 是 | 是 | 全异步客户端 (puka) |
2. 错误处理 (Error handling)
下一个问题是错误处理。使用一些客户端库,几乎不可能在不重新启动整个程序的情况下捕获 AMQP 错误并从中恢复。这通常是由于用户不理解通道作为错误范围的性质。但是库并没有让处理错误变得容易:你得到了一个通道错误,接下来怎么办?例如,执行 basic.publish 可能会在任何时候破坏你的通道。
3. 同步发布 (Synchronous publish)
最后一个缺陷是缺乏对同步发布的支持。在 RabbitMQ 扩展 AMQP 支持“确认”(confirms)之前,实际上无法确保消息已成功传递到代理。唯一的解决方案是使用事务,这会极大地减慢发布速度。现在,有了“确认”,这是可能的,但相当困难——除了编写回调函数,用户还需要在库线程和用户线程之间维护一个锁,这需要对库的多线程模型有所了解。
诞生 (The birth)
正是这种挫败感催生了一个新的实验性 Python 客户端:Puka。
Puka 试图为底层 AMQP 协议提供简单的 API 和合理的错误处理。Puka 的主要特点是:
- 单线程。它不对底层多线程模型做任何假设;如果需要,用户可以在 Puka 之上编写一个轻量级多线程层。
- 可以混合使用同步和异步编程风格。
- AMQP 错误是可预测且可恢复的。
- Basic.publish 可以是同步的也可以是异步的,随你选择。
Puka 的非特性:
- AMQP 通道不暴露给用户。
- 移除了对某些 AMQP 功能的支持,最值得注意的是心跳(heartbeats)。
代码片段 (Code snippets)
作为预告,这里有一些代码片段。
逐个声明 1000 个队列
for i in range(1000):
promise = client.queue_declare(queue='a%04i' % i)
client.wait(promise)
并行声明 1000 个队列
promises = [client.queue_declare(queue='a%04i' % i) for i in range(1000)]
for promise in promises:
client.wait(promise)
异步发布
client.basic_publish(exchange='', routing_key='test',
body="Hello world!")
同步发布
promise = client.basic_publish(exchange='', routing_key='test',
body="Hello world!")
client.wait(promise)
AMQP 错误不会影响你程序的其他部分(发布、消费等)。例如,如果一个 'test' 队列已经被声明为 'durable',而你尝试在没有正确标志的情况下重新声明它,你将收到一个错误。
> promise = client.queue_declare(queue='test')
> client.wait(promise)
Traceback (most recent call last):
[...]
puka.spec_exceptions.PreconditionFailed: {'class_id': 50, 'method_id': 10,
'reply_code': 406, 'reply_text': "PRECONDITION_FAILED - parameters for queue
'test' in vhost '/' not equivalent"}
在 Puka 中,你可以轻松地捕获这个异常并继续。
try:
promise = client.queue_declare(queue='test')
client.wait(promise)
except puka.PreconditionFailed:
# Oh, sorry. Forgot it was durable.
promise = client.queue_declare(queue='test', durable=True)
client.wait(promise)
你可以查看 RabbitMQ 教程的 Puka 代码,以及 Puka 的 示例 和 测试。
总结
总之,Puka 提供了更简单的 API、灵活的编程模型、正确的错误处理,并且不假定任何关于多线程的决策。它让使用 AMQP 重新变得有趣。