跳至主内容

仲裁队列如何在提供排序保证的同时实现本地交付

·阅读 17 分钟
Jack Vanlightly

团队最近被问到,鉴于仲裁队列在可能的情况下会从本地队列副本(leader 或 follower)交付消息,它们是否以及如何提供与经典队列相同的消息排序保证。镜像队列始终从主节点(leader)交付,因此从任何队列副本交付听起来都可能影响这些保证。

这正是本帖的主题。请注意,本帖是一篇面向好奇者和分布式系统爱好者的技术深度分析。我们将探讨仲裁队列如何在不增加額外协调(Raft 之外)的情况下,从任何队列副本(leader 或 follower)交付消息,同时保持消息排序保证。

摘要

所有队列,包括仲裁队列,都为非重试消息提供按通道的顺序保证。最简单的理解方式是,如果一个队列只有一个消费者,那么该消费者将按 FIFO(先进先出)顺序接收消息。一旦一个队列有两个消费者,那么这些保证将变为对非重试消息的单调排序——也就是说,可能会有间隙(因为消费者现在会竞争),但消费者绝不会在接收到早期消息(非重试)之前接收到后续消息(非重试)。

如果您需要对所有消息(包括重试消息)进行可靠的 FIFO 顺序保证,那么您需要使用单活动消费者功能,并将预取值设置为 1。任何重试消息都会在下次投递发生之前重新添加到队列中,从而保持 FIFO 顺序。

仲裁队列提供与经典队列相同的顺序保证。它们只是碰巧能够从任何本地副本进行投递,即本地到消费者通道。如果您想了解仲裁队列如何实现这一点,请继续阅读!如果您不想了解,则在此处停止,但可以放心,通常的顺序保证仍然得到维持。

代理流量的成本

RabbitMQ 允许任何客户端连接到集群中的任何节点,从而使事情变得简单。如果一个消费者连接到代理 1,而队列存在于代理 2 上,那么流量将从代理 2 代理到代理 1,然后再回来。消费者不知道队列托管在不同的节点上。

这种灵活性和易用性是有代价的。在一个三节点集群中,最糟糕的情况是发布者连接到代理 1,其消息被路由到一个位于代理 2 上的经典非复制队列,而该队列的消费者连接到代理 3。要处理这些消息,所有三个代理都被卷入其中,这当然效率较低。

如果该队列已复制,那么消息将需要传输一次以进行代理,然后再额外传输一次以进行复制。除此之外,我们还介绍了镜像队列算法在每条消息的多次发送中效率低下。 (可在此处阅读)。

Fig 1 shows the mirrored queue replication traffic in blue, with additional traffic for proxying publisher and consumer traffic.
图 1 显示了镜像队列的复制流量(蓝色),以及用于代理发布者和消费者流量的额外流量。

如果消费者能够从它们连接到的地方接收消息,而不是从存在于不同代理上的领导者那里接收消息,那将是很好的——这将节省网络利用率,并减轻队列领导者的压力。

Fig 2 shows a quorum queue with replication traffic in blue and only the publisher traffic being proxied as the consumer consumes directly from a follower.
图 2 显示了一个仲裁队列,其中复制流量为蓝色,只有发布者流量被代理,因为消费者直接从副本那里消费。

协调的成本

使用镜像队列,所有消息都由队列主节点(并可能通过另一个代理发送给消费者)传递。这很简单,并且不需要主节点与其镜像之间的协调。

在仲裁队列中,我们可以添加领导者和副本之间的协调来实现本地投递。领导者和副本之间的通信将协调谁将传递哪条消息——因为我们不能让一条消息被传递两次或根本不传递。不幸的事情可能会发生,消费者可能失败,代理可能失败,网络可能分区等等,并且协调需要处理所有这些。

但协调不利于性能。为了实现本地投递的协调类型可能会对性能产生极大影响,并且也极其复杂。我们需要另一种方法,幸运的是,我们所需的一切都已内置在协议中。

无需协调,只需确定性

在分布式系统中避免协调的一种常用方法是使用确定性。如果集群中的每个节点都以相同的顺序获得相同的数据,并且仅基于该数据做出决策,那么每个节点在该日志点将做出相同的决策。

确定性决策需要每个节点以相同的顺序接收相同的数据。仲裁队列构建在 Raft 之上,Raft 是一个复制提交日志——操作的有序序列。因此,只要执行本地投递所需的所有信息都写入此有序操作日志中,那么每个副本(领导者或跟随者)都将知道谁应该传递每条消息,而无需相互通信。

事实证明,即使对于仅领导者投递,我们也仍然需要消费者在日志中的进出。如果一个代理失败,另一个跟随者被提升为领导者,它将需要了解集群中存在的其他活动的消费者通道,以便能够将消息传递给它们。这些信息也使得无需协调即可实现本地投递。

效果和本地标志

仲裁队列构建在名为 Ra 的 Raft 实现之上(也由 RabbitMQ 团队开发)。 Ra 是一个可编程的状态机,可以复制操作日志。它区分所有副本都应执行的操作(命令)以实现一致性,以及只有领导者应执行的外部操作(效果)。这些命令、状态和效果由开发人员编程。仲裁队列有自己的命令、状态和效果。

命令和效果的一个好例子是键值存储。数据的添加、更新和删除应由所有副本执行。每个副本都需要拥有相同的数据,因此当领导者失败时,跟随者就可以接管,拥有相同的数据。因此,数据修改是命令。但是通知客户端应用程序某个键已更改的操作应只发生一次。如果一个客户端应用程序要求在键更新时得到通知,它不希望收到来自领导者和所有二次副本的通知!因此,只有领导者应该执行效果。

Ra 支持“本地”效果。在仲裁队列的情况下,只有发送消息效果是本地的。它的工作方式是,所有副本都知道存在哪些消费者通道以及在哪些节点上。当消费者注册时,该信息将添加到日志中,同样,当它失败或取消注册时,也会添加到日志中。

每个副本按顺序“应用”日志中已提交(由多数副本确认)的每个命令。应用入队命令会将消息添加到队列,应用消费者下线命令会将该消费者从服务队列(稍后详细介绍)中移除,并将其所有待处理的消息返回给队列以供重试。

消费者被添加到服务队列(SQ)中,该队列是确定性维护的——这意味着所有副本在日志的任何给定时间点都拥有相同的 SQ。每个消费者将评估任何尚未传递的消息,其 SQ 与所有其他副本完全相同,并将从 SQ 中出队一个消费者。如果该消费者是本地的(意味着其通道进程托管在与副本相同的代理上),那么副本将消息发送到该本地通道。然后该通道将其发送给消费者。如果消费者通道不是本地的,那么副本将不会传递它,但会跟踪其状态(它被传递给谁,是否已确认等)。一个例外是,如果不存在与消费者通道本地对应的副本,那么领导者会将其发送到该通道(代理方法)。

如果您仍然觉得这很有趣,但发现它难以理解,我不会责怪您。我们需要图表和事件序列来演示这一点。

带图的示例

我将把一组事件分组到每个图表中,以尽可能减少图表数量。

每个图包含三个队列副本,一个领导者和两个跟随者。我们看到日志的状态、服务队列、队列表示和“应用”操作。每个操作的格式为“command term:offsetdata”。例如,E 1:1 m1 是入队命令,它在第一个任期添加,具有第一个偏移量,并且是消息 m1。任期和偏移量是 Raft 算法的任期,对于理解本地投递来说并不重要(但如果您觉得这很有趣,我建议您阅读 Raft 算法)。

图表指南

第 1 组(事件序列 1)

  • 发布者通道为消息 m1 添加一个*入队 m1* 命令。

第 2 组(事件序列 2-3)

  • 领导者将**入队 m1** 命令复制到跟随者 A。
  • 领导者将**入队 m1** 命令复制到跟随者 C。

第 3 组(事件序列 4-5)

  • 连接到跟随者 A 代理的消费者 1 的通道添加了一个**订阅 c1** 命令。
  • 领导者 B 应用了**入队 m1** 命令(因为它现在已提交)。
    1. 领导者将其添加到其队列。
    2. 领导者通知发布者通道已提交。

第 4 组(事件序列 6-9)

  • 领导者将**订阅 c1** 命令复制到跟随者 A。
  • 领导者将**订阅 c1** 命令复制到跟随者 C。
  • 跟随者 C 应用了**入队 m1** 命令。
    1. 将其消息添加到其队列。
    2. 未发现消费者,因此无需投递。
  • 跟随者 A 应用了**入队 m1** 命令。
    1. 将其消息添加到其队列。
    2. 未发现消费者,因此无需投递。

消费者确实存在,但副本仅在应用其日志中的订阅命令时才了解消费者。它们确实在日志中有这些命令,但尚未应用它们。

第 5 组(事件序列 10-12)

  • 领导者 B 应用了**订阅 c1** 命令。
    1. C1 已添加到其服务队列 (SQ)。
    2. 评估尚未传递的消息 m1。它从 SQ 中出队 C1,但发现它不是本地的,因此不将消息发送给 C1,而是只跟踪 m1 作为由 C1 处理。将 C1 重新排入 SQ。
  • C2 向领导者添加了一个**订阅 c2** 命令。
  • 发布者通道为消息 m2 添加了一个**入队 m2** 命令。

第 6 组(事件序列 13-16)

  • 领导者将**订阅 c2** 命令复制到跟随者 A。
  • 领导者将**订阅 c2** 命令复制到跟随者 C。
  • 跟随者 C 应用了**订阅 c1** 命令。
    1. C1 已添加到其服务队列 (SQ)。
    2. 评估其 SQ 中的第一个消费者与消息 m1,但该通道不是本地的,将 C1 重新排入 SQ。
  • 跟随者 A 应用了**订阅 c1** 命令。
    1. C1 已添加到其服务队列 (SQ)。
    2. 评估其 SQ 中的第一个消费者与消息 m1,并发现通道是本地的,因此将消息发送到该本地通道。将 C1 重新排入 SQ。

第 7 组(事件序列 17-20)

  • 领导者 B 应用了**订阅 c2** 命令。
    1. 将 C2 入队到其 SQ。
  • 领导者将**入队 m2** 命令复制到跟随者 A。
  • 消费者 1 的通道为 m1 添加了一个**确认 m1** 命令。
  • 跟随者 A 应用了**订阅 c2** 命令。
    1. 将 C2 入队到其 SQ。

请注意,跟随者 A 和领导者 B 的日志处于同一位置,并且拥有相同的服务队列。

第 8 组(事件序列 21-23)

  • 领导者 B 将**入队 m2** 命令复制到跟随者 C。
  • 跟随者 C 应用了**订阅 c2** 命令。
    1. 将 C2 入队到其 SQ。
  • 领导者 B 应用了**入队 m2** 命令。
    1. 将其消息 m2 添加到其队列。
    2. 从其 SQ 中出队 C1。发现该通道不是本地的。将 C1 重新排入 SQ。跟踪消息 m1 的状态(C1 将处理它)。
    3. 通知发布者通道该消息已提交。

此时,领导者 B 的 SQ 与跟随者不同,但这仅仅是因为它在日志中领先一个命令。

第 9 组(事件序列 24-26)

  • 领导者 B 应用了**确认 m1** 命令。
    1. 从其队列中移除消息。*跟随者 C 应用了**入队 m2** 命令。
    2. 将其消息 m2 添加到其队列。
    3. 从 SQ 中出队 C1,但 C1 不是本地的。
  • 跟随者 A 应用了**入队 m2** 命令。
    1. 将其消息 m2 添加到其队列。
    2. 从 SQ 中出队 C1,并发现 C1 是本地的,因此将消息 m2 发送给 C1。将 C1 重新排入 SQ。

可以看出,服务队列是匹配的——跟随者处于相同的偏移量,领导者领先一个,但确认不会影响服务队列。

第 10 组(事件序列 27)

  • 托管领导者 B 的代理失败或被关闭。

第 11 组(事件序列 28)

  • 发生领导者选举,跟随者 A 获胜,因为它在日志中拥有最高的纪元:offset在它的日志中的操作。

第 12 组(事件序列 29-30)

  • 发布者通道添加了一个**入队 m3** 命令。
  • 领导者 A 将**确认 m1** 命令复制到跟随者 C。

第 13 组(事件序列 31-33)

  • 领导者 A 将**入队 m3** 命令复制到跟随者 C。
  • 领导者 A 应用了**确认 m1** 命令。
    1. 从其队列中移除 m1。
  • 跟随者 C 应用了**确认 m1** 命令。
    1. 从其队列中移除 m1。

第 14 组(事件序列 34-35)

  • 领导者 A 应用了**入队 m3** 命令。
    1. 将其消息 m3 添加到其队列。
    2. 从其 SQ 中出队 C2。C2 不是本地的,因此将 C2 重新排入队列并跟踪消息 m3 的状态。
  • 跟随者 C 应用了**入队 m3** 命令。
    1. 将其消息 m3 添加到其队列。
    2. 从其 SQ 中出队 C2。C2 是本地的,因此将消息 m3 发送给该本地通道。将 C2 重新排入其 SQ。

因此,我们看到,在副本之间没有额外的协调的情况下,我们实现了本地投递,同时保持了 FIFO 顺序,即使在领导者故障转移期间也是如此。

但是,如果消费者在收到来自副本的消息后失败了呢?这会被检测到并向不同代理上的另一个消费者通道重试消息吗?

替代场景 - 消费者失败

我们将从第 6 组开始,继续我们之前的内容——m1 已交付给 C1 但尚未确认。

第 7 组 - 替代(事件序列 17-20)

  • C1 的通道消失了(无论出于何种原因)。
  • 领导者 B 应用了**订阅 c2** 命令。
    1. 将 C2 入队到 SQ。
  • 跟随者 A 应用了**订阅 c2** 命令。
    1. 将 C2 入队到 SQ。
  • 领导者 B 的监视器看到 C1 已消失。在日志中添加了一个**下线 c1** 命令。

第 8 组 - 替代(事件序列 21-25)

  • 领导者 A 将**下线 c1** 命令复制到跟随者 A。
  • 领导者 A 将**入队 m2** 命令复制到跟随者 C。
  • 跟随者 C 应用了**订阅 c2** 命令。
    1. 将 C2 入队到其 SQ。
  • 领导者 B 应用了**入队 m2** 命令。
    1. 从其 SQ 中出队 C1,但 C1 不是本地的,因此将其重新排入队列。
  • 跟随者 A 应用了**入队 m2** 命令。
    1. 从其 SQ 中出队 C1。C1 是本地的。尝试向其发送消息 m2(但无法发送,因为它已不存在)。将 C1 重新排入其 SQ。

第 9 组 - 替代(事件序列 26-28)

  • 领导者 B 应用了**下线 c1** 命令。
    1. 从其 SQ 中移除 C1。
    2. 将之前已交付给 C1 但未确认的消息 m1 和 m2 退还给队列。
    3. 对于重试消息 m1,从 SQ 中出队 C2,但发现它不是本地的。将 C2 重新排入队列。
  • 跟随者 C 应用了**入队 m2** 命令。
    1. 从其 SQ 中出队 C1,但 C1 不是本地的,因此将其重新排入队列,跟踪 m2 作为由 C1 处理。
  • 跟随者 A 应用了**下线 c1** 命令。
    1. 从其 SQ 中移除 C1。
    2. 将之前已交付给 C1 但未确认的消息 m1 和 m2 退还给队列。
    3. 对于重试消息 m1,从 SQ 中出队 C2,但发现它不是本地的。跟踪作为由 C2 处理。

第 10 组 - 替代(事件序列 29-31)

  • 跟随者 A 获取下一个未传递的消息 m2。从其 SQ 中出队 C2,但 C2 不是本地的。跟踪 m2 作为由 C2 处理,将 C2 重新排入 SQ。
  • 领导者 B 获取下一个未传递的消息 m2。从其 SQ 中出队 C2,但 C2 不是本地的。跟踪 m2 作为由 C2 处理,将 C2 重新排入 SQ。
  • 跟随者 C 应用了**下线 c1** 命令。
    1. 从其 SQ 中移除 C1。
    2. 跟随者 C 获取下一个未传递的消息 m1。从其 SQ 中出队 C2,并发现 C2 是本地的。将 m1 发送给该本地通道,将 C2 重新排入 SQ。

第 11 组 - 替代(事件序列 32)

  • 跟随者 C 获取下一个未传递的消息 m2。从其 SQ 中出队 C2,并发现 C2 是本地的。将 m2 发送给该本地通道。将 C2 重新排入 SQ。

仲裁队列在没有任何问题的情况下处理了消费者 1 的失败,同时在没有额外协调的情况下实现了从本地副本的投递。关键在于确定性决策,这要求每个节点仅使用日志中的数据来指导其决策,并且其已提交条目的日志没有分歧(所有这些都由 Raft 处理)。

最后的想法

仲裁队列具有与任何队列相同的顺序保证,并且还能够从本地副本投递消息。它们如何实现这一点很有趣,但与开发人员或管理员无关。真正有用的是理解这是选择仲裁队列而非镜像队列的另一个原因。我们之前描述了镜像队列背后非常低效的网络算法,现在您已经看到,使用仲裁队列,我们已大大优化了网络利用率。

从副本消费不仅可以提高网络利用率,还可以提高发布者和消费者负载之间的隔离度。发布者会影响消费者,反之亦然,因为它们将争用同一个资源——一个队列。通过允许消费者从不同的代理进行消费,我们可以获得更好的隔离。请参阅最近的集群大小调整案例研究,该研究表明,即使在巨大的队列积压和来自消费者的额外压力下,仲裁队列也能维持很高的发布速率。镜像队列更容易受到影响。

所以……考虑使用仲裁队列!

© . This site is unofficial and not affiliated with VMware.