跳至主内容
版本:4.3

Khepri 的日常操作

尽管元数据存储不存储消息,但它的行为会影响使用该存储的 RabbitMQ 的日常行为和技术操作,至少在使用该集群的应用程序需要进行身份验证以及声明或删除队列或流等资源时是这样。

正如在集群注意事项中已经提到的,Khepri 是一个基于 Raft 的系统。与任何基于 Raft 的系统一样,集群成员必须有法定人数(Quorum)在线并可用,元数据存储才能接受更新(写入/删除或集群成员变更)。

一致性模型与可见性保证

在涉及队列或流声明、绑定声明等模式修改时,Khepri 与 Mnesia 存在一个重要的区别。这些变化在许多工作负载中不会被察觉,但可能会影响某些工作负载,特别是某些集成测试。

示例场景

考虑两个场景,A 和 B。

只有一个客户端。客户端执行以下步骤:

  1. 声明一个队列 Q。
  2. 将 Q 绑定到交换机 X。
  3. 向交换机 X 发布一条消息 M。
  4. 期望消息被路由到队列 Q。
  5. 消费该消息。
场景 A

在这种场景下,行为上不应该有可察觉的差异。客户端的预期将会得到满足。

有两个客户端,One 和 Two,分别连接到节点 R1 和 R3,并使用相同的虚拟主机。节点 R2 没有客户端连接。

  1. 客户端 One 声明一个队列 Q。
  2. 将 Q 绑定到交换机 X。
  3. 它收到了队列和绑定声明的确认。
  4. 它通知客户端 Two,或者客户端 Two 隐式地发现它已完成上述步骤(例如,在集成测试中)。
  5. 客户端 Two 向 X 发布一条消息 M。
  6. 客户端 One 和 Two 期望消息被路由到 Q。
场景 B

在这种场景下,在第三步中,当所有集群节点都提交了更新后,Mnesia 才会返回。然而,Khepri 会在大多数节点(包括处理客户端 One 操作的节点)返回后立即返回。

这可能包括节点 R1 和 R2,但不包括节点 R3,这意味着在上述示例中,连接到节点 R3 的客户端 Two 所发布的消息 M 不能保证被路由

一旦所有模式变更同步到节点 R3,客户端 Two 随后在节点 R3 上进行的发布操作将保证被路由

这就是基于 Raft 的系统的权衡,即假设被大多数节点接受的写入可以被视为成功。

变通策略

为了满足场景 B 中客户端 Two 的预期,Khepri 可以在路由消息时对绑定执行一致性查询(涉及大多数副本),但这会对某些协议(如 MQTT)和交换机/目的地类型(任何类似于 AMQP 0-9-1 中主题交换机的类型)的吞吐量产生显著影响。

依赖于共享拓扑且使用多个连接的应用程序有几种应对策略。

如果应用程序使用两个或更多连接连接到不同的节点,它可以在启动时声明其拓扑,然后在进行其他操作之前注入短暂的暂停(1-2 秒)。

依赖于动态拓扑的应用程序可以切换为使用“静态”的交换机和绑定集合。

不需要使用共享拓扑的应用程序组件可以各自配置自己的队列/流/绑定。

使用多个连接到不同节点的测试套件可以选择只使用一个连接、连接到同一个节点、注入暂停,或者等待某种表明拓扑已就绪的条件。

在集群少数派侧进行客户端资源声明

定义消息如何路由的拓扑存储在元数据存储中。无论活动的元数据存储后端是什么,声明资源的方式保持不变。

然而,在使用 Mnesia 时,即使集群中的大多数节点宕机或无法访问,也可以声明队列;而使用 Khepri 时,相同的操作将超时。这样客户端可以适当地对问题做出反应,而不是寄希望于 Mnesia 之上实现的各种网络分区恢复策略。

以下是一个 PerfTest 工具示例,它尝试在一个 5 个节点中仅 1 个节点运行的集群中声明其所需的交换机和队列:

./scripts/PerfTest

# => id: test-161339-979, starting consumer #0
# => id: test-161339-979, starting consumer #0, channel #0
# => Main thread caught exception: java.io.IOException
# => 16:14:10.638 [com.rabbitmq.perf.PerfTest.main()] ERROR com.rabbitmq.perf.PerfTest - Main thread caught exception
# => (...)

同时,RabbitMQ 节点记录了以下消息:

[error] <0.1373.0> Error on AMQP connection <0.1373.0> (127.0.0.1:55165 -> 127.0.0.1:5672 - perf-test-consumer-0, vhost: '/', user: 'guest', state: running), channel 1:
[error] <0.1373.0> operation exchange.declare caused a connection exception internal_error: "failed to declare exchange 'direct' in vhost '/' because the operation timed out"
© . This site is unofficial and not affiliated with VMware.