Khepri 的日常操作
即使元数据存储不存储消息,它的行为也会影响使用它的 RabbitMQ 的日常行为和技术操作,至少在这些集群上运行的应用程序仍然想要对队列或流等资源进行身份验证以及声明或删除操作时是如此。
正如在集群注意事项中已经提到的,Khepri 是一个基于 Raft 的系统,与任何基于 Raft 的系统一样,元数据存储需要一定数量的集群成员在线且可用才能接受更新(写入/删除或集群成员更改)。
一致性模型和可见性保证
在执行队列或流声明、绑定声明等模式修改方面,Khepri 与 Mnesia 有一个重要的区别。这些更改对许多工作负载来说不会有明显影响,但可能会影响一些,特别是某些集成测试。
示例场景
考虑两个场景 A 和 B。
只有一个客户端。客户端执行以下步骤
- 它声明一个队列 Q。
- 它将 Q 绑定到交换机 X。
- 它将消息 M 发布到交换机 X。
- 它期望消息被路由到队列 Q。
- 它消费消息。
在此场景中,行为上应无明显差异。客户端的期望将得到满足。
有两个客户端,One 和 Two,分别连接到节点 R1 和 R3,并使用同一个虚拟主机。节点 R2 没有客户端连接。
- 客户端 One 声明一个队列 Q。
- 它将 Q 绑定到交换机 X。
- 它收到了队列和绑定声明的确认。
- 它通知客户端 Two,或者客户端 Two 隐式得知它已完成上述步骤(例如,在集成测试中)。
- 客户端 Two 将消息 M 发布到 X。
- 客户端 One 和 Two 期望消息被路由到 Q。
在此场景中,在第三步,当所有集群节点都已提交更新时,Mnesia 会返回。然而,Khepri 会在多数节点(包括处理客户端 One 操作的节点)返回时就返回。
这可能包括节点 R1 和 R2,但不包括节点 R3,这意味着在上述示例中由连接到节点 R3 的客户端 Two 发布的消息 M不保证会被路由。
一旦所有模式更改都传播到节点 R3,客户端 Two 在节点 R3 上后续的发布将保证被路由。
这是基于 Raft 的系统的权衡,它假定一个被多数节点接受的写入可以被视为成功。
变通策略
为了满足场景 B 中客户端 Two 的期望,Khepri 可以在路由消息时执行一致的(涉及多数副本)绑定查询,但这将对某些协议(如 MQTT)和交换机/目标类型(任何类似于 AMQP 0-9-1 中的主题交换机)的吞吐量产生显著影响。
依赖于依赖于共享拓扑的多个连接的应用程序有几种应对策略。
如果应用程序使用两个或更多连接到不同节点,它可以在启动时声明其拓扑,然后注入一个短暂的暂停(1-2 秒),之后再继续执行其他操作。
依赖于动态拓扑的应用程序可以切换到使用“静态”的交换机和绑定集。
不需要使用共享拓扑的应用程序组件可以各自配置自己的队列/流/绑定。
使用多个连接到不同节点的测试套件可以选择只使用一个连接,或者连接到同一个节点,或者注入暂停,或者等待一个指示拓扑已就绪的特定条件。
集群少数端的客户端资源声明
定义消息如何路由的拓扑存储在元数据存储中。无论活动元数据存储后端是什么,声明资源的方式都保持不变。
但是,使用 Mnesia 时,即使集群的大部分节点宕机或无法访问,仍然可以声明队列;而使用 Khepri 时,相同的操作会超时。这样,客户端可以适当地对问题做出反应,而不是依赖于 Mnesia 上实现的各种网络分区恢复策略来碰运气。
这是一个 PerfTest 工具尝试在只有一个节点(5 个节点中)运行的集群中声明所需交换机和队列的示例
./scripts/PerfTest
# => id: test-161339-979, starting consumer #0
# => id: test-161339-979, starting consumer #0, channel #0
# => Main thread caught exception: java.io.IOException
# => 16:14:10.638 [com.rabbitmq.perf.PerfTest.main()] ERROR com.rabbitmq.perf.PerfTest - Main thread caught exception
# => (...)
同时,RabbitMQ 节点记录了以下消息
[error] <0.1373.0> Error on AMQP connection <0.1373.0> (127.0.0.1:55165 -> 127.0.0.1:5672 - perf-test-consumer-0, vhost: '/', user: 'guest', state: running), channel 1:
[error] <0.1373.0> operation exchange.declare caused a connection exception internal_error: "failed to declare exchange 'direct' in vhost '/' because the operation timed out"