跳至主要内容

使用原生 MQTT 为数百万客户端提供服务

·阅读时间:24分钟

RabbitMQ 的核心协议一直是 AMQP 0.9.1。为了支持 MQTT、STOMP 和 AMQP 1.0,代理通过其核心协议透明地进行代理。虽然这是一种通过支持更多消息传递协议来扩展 RabbitMQ 的简单方法,但它会降低可扩展性和性能。

在过去的 9 个月里,我们重写了 MQTT 插件,不再通过 AMQP 0.9.1 进行代理。相反,MQTT 插件解析 MQTT 消息并将其直接发送到队列。这就是我们所说的 **原生 MQTT**。

结果令人惊叹

  1. 内存使用量最多降低 95%,在大量连接的情况下可降低数百 GB。
  2. RabbitMQ 首次能够处理数百万个连接。
  3. 端到端延迟降低了 50% - 70%。
  4. 吞吐量提高了 30% - 40%。

原生 MQTT 将 RabbitMQ 变成一个 MQTT 代理,为更广泛的物联网用例打开了大门。

原生 MQTT 在 RabbitMQ 3.12 中提供。

概述

如图 1 所示,在 RabbitMQ 3.11 之前,MQTT 插件的工作原理是解析 MQTT 消息并通过 AMQP 0.9.1 协议将其转发到 通道,然后通道将消息路由到 队列。图 1 中的每个蓝色点代表一个 Erlang 进程。每个传入的 MQTT 连接都会创建总共 22 个 Erlang 进程。

Figure 1: RabbitMQ 3.11 - MQTT proxied via AMQP 0.9.1 - 22 Erlang processes per MQTT connection
图 1:RabbitMQ 3.11 - 通过 AMQP 0.9.1 代理的 MQTT - 每个 MQTT 连接 22 个 Erlang 进程

MQTT 插件中为每个 MQTT 连接创建 16 个 Erlang 进程(即 Erlang 应用程序 rabbitmq_mqtt),其中包括一个负责 MQTT 保持活动 的进程以及许多充当 AMQP 0.9.1 客户端的进程。

Erlang 应用程序 rabbit 中为每个 MQTT 连接创建 6 个 Erlang 进程。它们实现了核心 AMQP 0.9.1 服务器的每个客户端部分。

图 2 显示原生 MQTT 每个传入的 MQTT 连接只需要一个 Erlang 进程。

Figure 2: RabbitMQ 3.12 - Native MQTT - 1 Erlang process per MQTT connection
图 2:RabbitMQ 3.12 - 原生 MQTT - 每个 MQTT 连接 1 个 Erlang 进程

该 Erlang 进程负责解析 MQTT 消息、遵守 MQTT 保持活动、执行 身份验证和授权 以及将消息路由到队列。

常见的 MQTT 工作负载包括许多物联网 (IoT) 设备定期向 MQTT 代理发送数据。例如,可能有数十万甚至数百万个设备,每个设备每隔几秒或几分钟发送一次状态更新。

我们在去年的 RabbitMQ 峰会演讲 RabbitMQ 性能改进 中了解到,创建 Erlang 进程的成本很低。(Erlang 进程比 Java 线程轻量级得多。Erlang 进程可以与 Golang 中的 Goroutine 相比。)但是,对于 100 万个传入的 MQTT 客户端连接,RabbitMQ 是创建 2200 万个 Erlang 进程(RabbitMQ 3.11 之前)还是只创建 100 万个 Erlang 进程(RabbitMQ 3.12)对可扩展性有很大影响。这种差异在 内存使用延迟和吞吐量 部分进行了分析。

不仅重写了 rabbitmq_mqtt 插件,还重写了 rabbitmq_web_mqtt 插件。这意味着,从 3.12 开始,每个传入的 通过 WebSockets 的 MQTT 连接也将只有一个 Erlang 进程。因此,本文档中概述的所有性能改进也适用于通过 WebSockets 的 MQTT。

术语“原生 MQTT”不是 MQTT 规范的正式术语。“原生 MQTT”指的是新的 RabbitMQ 3.12 实现,其中 RabbitMQ “原生”地支持 MQTT,即 MQTT 流量不再通过 AMQP 0.9.1 进行代理。

新的 MQTT QoS 0 队列类型

原生 MQTT 附带一种名为 rabbit_mqtt_qos0_queue 的新的 RabbitMQ 队列类型。要使 MQTT 插件创建这种新队列类型的队列,必须启用 3.12 功能标志,其名称与 rabbit_mqtt_qos0_queue 相同。(请记住,功能标志并非旨在用作集群配置的一种形式。在成功完成滚动升级后,您应该启用所有功能标志。每个功能标志将在 RabbitMQ 的未来版本中成为强制性标志。)

在解释新队列类型之前,我们应该首先了解队列如何与 MQTT 订阅者相关联。

在所有 RabbitMQ 版本中,MQTT 插件都会为每个 MQTT 订阅者创建一个专用的队列。更准确地说,每个 MQTT 连接可能会有 0、1 或 2 个队列

  • 如果 MQTT 客户端从未发送 SUBSCRIBE 数据包,则 MQTT 连接会有 0 个队列。MQTT 客户端仅发布消息。
  • 如果 MQTT 客户端使用相同的 服务质量 (QoS) 级别 创建一个或多个订阅,则 MQTT 连接会有 1 个队列。
  • 如果 MQTT 客户端使用 QoS 0(最多一次)QoS 1(至少一次) 创建一个或多个订阅,则 MQTT 连接会有 2 个队列。

列出队列时,您会观察到队列命名模式 mqtt-subscription-<MQTT client ID>qos[0|1],其中 <MQTT client ID>MQTT 客户端标识符,而 [0|1] 则为 QoS 0 订阅的 0 或 QoS 1 订阅的 1。为每个 MQTT 订阅者设置一个单独的队列是有意义的,因为每个 MQTT 订阅者都会收到应用程序消息的副本。

默认情况下,MQTT 插件会创建经典队列。

MQTT 插件会为 MQTT 订阅客户端透明地创建队列。MQTT 规范未定义队列的概念,并且 MQTT 客户端不知道这些队列的存在。队列是 RabbitMQ 如何实现 MQTT 协议的实现细节。

图 3 显示了一个使用 CleanSession=1 连接并使用 QoS 0 订阅的 MQTT 订阅者。清除会话 表示 MQTT 会话仅在客户端和服务器之间的网络连接持续期间有效。当会话结束时,服务器中的所有会话状态都会被删除,这意味着队列会被自动删除。

如图 3 所示,每个经典队列都会产生两个额外的 Erlang 进程:一个 监管程序 进程和一个工作进程。

Figure 3: Native MQTT - Feature flag rabbit_mqtt_qos0_queue disabled
图 3:原生 MQTT - 功能标志 rabbit_mqtt_qos0_queue 未启用

新队列类型优化工作原理如下:如果

  1. 功能标志 rabbit_mqtt_qos0_queue 已启用,并且
  2. MQTT 客户端使用 CleanSession=1 连接,并且
  3. MQTT 客户端使用 QoS 0 订阅

那么,MQTT 插件将创建类型为 rabbit_mqtt_qos0_queue 的队列,而不是经典队列。

图 4 显示新队列类型是“伪”队列或“虚拟”队列:它与您知道的队列类型(经典队列、仲裁队列)非常不同,因为这种新队列类型既不是单独的 Erlang 进程,也不会将消息存储在磁盘上。相反,这种队列类型是 Erlang 进程邮箱的一个子集。MQTT 消息会直接发送到订阅客户端的 MQTT 连接进程。换句话说,MQTT 消息会发送到任何“在线”MQTT 订阅者。

更准确地说,可以认为跳过了队列(如 RabbitMQ 峰会 2022 年演讲的这张幻灯片 中的最后一行 根本没有队列?🤔 所示)。将消息直接发送到 MQTT 连接进程这一事实被实现为一种队列类型是为了简化消息路由和协议互操作性,这样消息不仅可以从 MQTT 发布连接进程发送,还可以从通道进程发送。后者允许从 AMQP 0.9.1、AMQP 1.0 或 STOMP 客户端直接将消息发送到 MQTT 订阅者连接进程,从而跳过专用的队列进程。

Figure 4: Native MQTT - Feature flag rabbit_mqtt_qos0_queue enabled
图 4:原生 MQTT - 功能标志 rabbit_mqtt_qos0_queue 已启用

现在我们了解了这种新队列类型跳过了队列进程。但是,在 MQTT 上下文中这样做有什么优势呢?MQTT 工作负载的特点是许多设备向代理发送数据并从代理接收数据。以下是新队列类型如何提供优化的四个原因,按重要性递减排列

优势 1:大型扇出

启用从“云”(MQTT 代理)向所有设备发送消息的大型扇出。

对于经典队列和仲裁队列,每个队列客户端(即通道进程或 MQTT 连接进程)都会为所有目标队列保留状态,以用于流量控制。例如,通道进程会在其 进程字典 中保存来自每个目标队列的信用额度。(阅读 我们关于基于信用的流量控制的博文 以了解更多信息。)

  • 如果有一个通道进程向 300 万个 MQTT 设备发送消息,则通道进程字典中会保留 300 万个条目(数百 MB 的内存)。
  • 如果有 100 个通道进程,每个进程向 300 万个设备发送消息,则进程字典中总共有 3 亿个条目(最好不要尝试这样做)。
  • 如果有几千个通道或 MQTT 连接进程,每个进程向 300 万个设备发送消息,则 RabbitMQ 会耗尽内存并崩溃。

即使这些巨大的扇出事件非常罕见,例如每天一次,发送 Erlang 进程到所有目标队列的状态也会始终保存在内存中(直到目标队列被删除)。

新队列类型最重要的特性是其队列类型客户端是无状态的。这意味着 MQTT 连接或通道进程可以将消息(“即发即弃”)发送到 300 万个 MQTT 连接进程(这仍然暂时需要大量内存),而无需保留到目标队列的任何状态。一旦垃圾回收启动,队列客户端进程的内存使用量就会降至 0 MB。

优势 2:更低的内存使用

不仅在大扇出场景下,即使在 1:1 的拓扑结构中(每个发布者只发送到一个订阅者),新的队列类型 rabbit_mqtt_qos0_queue 通过跳过队列处理过程,节省了大量的内存。

即使在 3.12 版本的原生 MQTT 中,禁用特性标志 rabbit_mqtt_qos0_queue 时,300 万个使用 QoS 0 订阅的 MQTT 设备会导致 900 万个 Erlang 进程。启用特性标志 rabbit_mqtt_qos0_queue 后,相同的工作负载只会产生 300 万个 Erlang 进程,因为跳过了每个 MQTT 订阅者的额外队列主管和队列工作进程,节省了数 GB 的进程内存。

优势 3:更低的发布者确认延迟

尽管一个 MQTT 客户端使用 QoS 0 订阅,但另一个 MQTT 客户端仍然可以发送 QoS 1 的消息(或者等效地,来自 AMQP 0.9.1 发送客户端的通道可以设置为确认模式)。在这种情况下,发布客户端需要来自代理的发布确认。

新的队列类型的客户端(MQTT 发布者连接过程或通道过程的一部分)直接代表“队列服务器进程”自动确认,因为至多一次的 QoS 0 消息无论如何都可能在从代理到 MQTT 订阅者的过程中丢失。这导致了更低的发布者确认延迟。

在 RabbitMQ 中,只有当**所有**目标队列确认收到消息后,才会将消息确认回发布客户端。因此,更重要的是,发布过程只等待来自可能具有至少一次消费者的队列的确认。使用新的队列类型,在将一条消息路由到一个重要的仲裁队列以及一百万个 MQTT QoS 0 订阅者,而其中一百万个 MQTT 连接进程中的一个过载(因此回复非常缓慢)的情况下,发布过程不会被阻塞发送确认回发布客户端。

优势 4:更低的端到端延迟

由于跳过了队列进程,消息跳数减少了一个,从而导致更低的端到端延迟。

过载保护

由于新的队列类型没有流量控制,MQTT 消息可能比从 MQTT 连接进程传递到 MQTT 订阅客户端的速度更快地到达 MQTT 连接进程的邮箱。当 MQTT 订阅客户端和 RabbitMQ 之间的网络连接较差,或者在许多发布者过载单个 MQTT 订阅客户端的大扇入场景中,可能会发生这种情况。

为了防止由于 MQTT QoS 0 消息在 MQTT 连接进程邮箱中堆积导致的高内存使用,如果以下两个条件都为真,RabbitMQ 会有意丢弃来自 rabbit_mqtt_qos0_queue 的 QoS 0 消息

  1. MQTT 连接进程邮箱中的消息数量超过设置 mqtt.mailbox_soft_limit(默认为 200),并且
  2. 发送到 MQTT 客户端的套接字繁忙(发送速度不够快)。

请注意,进程邮箱中可能还有其他消息(例如,从 MQTT 订阅客户端发送到 RabbitMQ 的应用程序消息或来自其他队列类型到 MQTT 连接进程的确认),这些消息显然不会被丢弃。但是,这些其他消息也会影响 mqtt.mailbox_soft_limit

mqtt.mailbox_soft_limit 设置为 0 会禁用过载保护机制,这意味着 RabbitMQ 永远不会有意丢弃 QoS 0 消息。将 mqtt.mailbox_soft_limit 设置为非常高的值会降低有意丢弃 QoS 0 消息的可能性,同时也会增加导致集群范围内存警报的风险(尤其是在消息有效负载很大或存在许多过载的 rabbit_mqtt_qos0_queue 类型队列时)。

mqtt.mailbox_soft_limit 可以被认为是队列长度限制,尽管不完全是,因为如前所述,Erlang 进程邮箱可以包含除 MQTT 应用程序消息之外的其他消息。这就是配置键 mqtt.mailbox_soft_limit 包含单词 soft 的原因。所描述的过载保护机制大致对应于您已经从经典队列和仲裁队列中了解到的溢出行为 drop-head

队列类型命名

不要依赖此新队列类型的名称。特性标志 rabbit_mqtt_qos0_queue 的名称不会更改。但是,如果我们决定在其他 RabbitMQ 使用案例(例如直接回复)中重用其设计的一部分,我们可能会在将来更改队列类型 rabbit_mqtt_qos0_queue 的名称。

事实上,MQTT 客户端应用程序和 RabbitMQ 核心(Erlang 应用程序 rabbit)都不知道新的队列类型。除了在管理 UI 或通过 rabbitmqctl 列出队列时,最终用户也不会意识到新的队列类型。

鉴于我们现在了解了原生 MQTT 的架构和 MQTT QoS 0 队列类型的意图,我们可以继续进行性能基准测试。

内存使用

本节比较了 RabbitMQ 3.11 中的 MQTT 和 RabbitMQ 3.12 中的原生 MQTT 的内存使用情况。完整的测试设置可以在ansd/rabbitmq-mqtt中找到。

本节中的两个测试针对一个 3 节点集群进行,并使用以下 RabbitMQ 配置子集

mqtt.tcp_listen_options.sndbuf = 1024
mqtt.tcp_listen_options.recbuf = 1024
mqtt.tcp_listen_options.buffer = 1024
management_agent.disable_metrics_collector = true

TCP 缓冲区大小配置为较小值,以避免导致高二进制内存使用。

rabbitmq_management 插件中禁用了指标收集。对于生产环境用例,应使用 Prometheus。rabbitmq_management 插件并非设计用于处理大量统计信息发射对象(如队列和连接)。

100 万个 MQTT 连接

第一个测试使用 1,000,000 个 MQTT 连接,这些连接在建立连接后仅发送 MQTT 保持活动消息。没有发送或接收 MQTT 应用程序消息。

如图 5 所示,3 节点集群在 3.11 版本中需要 108.0 + 100.7 + 92.4 = 301.1 GiB,而在 3.12 版本中仅需要 6.1 + 6.3 + 6.3 = 18.7 GiB 的内存。因此,3.11 版本需要的内存是 3.12 版本的 16 倍(或 282 GiB)。原生 MQTT 将内存使用量降低了 94%。

Figure 5: Memory usage connecting 1 million MQTT clients
图 5:连接 100 万个 MQTT 客户端时的内存使用情况

在 3.11 版本中,导致内存使用量最大的部分是**进程**内存。如“概述”部分所述,3.12 版本的原生 MQTT 每个 MQTT 连接使用 1 个 Erlang 进程,而 3.11 版本每个 MQTT 连接使用 22 个 Erlang 进程。3.11 版本的一些进程会保留大量状态,从而导致高内存使用。

原生 MQTT 的低内存使用量不仅是通过使用单个 Erlang 进程实现的,还通过减少该单个 Erlang 进程的状态实现的。在 PR #5895 中实现了大量内存优化,例如从进程状态中删除长列表和不必要的函数引用。

开发环境中的进一步测试表明,在 3 节点集群中,原生 MQTT 每个节点需要大约 56 GB 的内存,总共 900 万个 MQTT 连接。

10 万个发布者,10 万个订阅者

第二个测试使用 100,000 个发布者和 100,000 个订阅者。它们以 1:1 的拓扑结构发送和接收,这意味着每个发布者每 2 分钟向一个 QoS 0 订阅者发送一条 QoS 0 应用程序消息。

如图 6 所示,3 节点集群在 3.11 版本中需要 21.6 + 21.5 + 21.7 = 64.8 GiB,而在 3.12 版本中仅需要 2.6 + 2.6 + 2.6 = 7.8 GiB 的内存。

Figure 6: Memory usage connecting 100,000 publishers and 100,000 subscribers
图 6:连接 100,000 个发布者和 100,000 个订阅者时的内存使用情况

开发环境中的进一步测试表明,原生 MQTT 在以下场景下每个节点需要大约 47 GB 的内存,3 节点集群

  • 总共 300 万个 MQTT 连接
  • 1:1 拓扑结构
  • 150 万个发布者每 3 分钟发送带有 64 字节有效负载的 QoS 1 消息
  • 150 万个 QoS 1 订阅者(即 150 万个经典队列版本 2

RabbitMQ 3.11.6 中发布的 PR #6684 大大减少了许多经典队列的内存使用量,有利于许多 MQTT QoS 1 或 CleanSession=0 订阅者的使用案例。

延迟和吞吐量

为了比较 3.11 版本中的 MQTT 和 3.12 版本中的原生 MQTT 之间的延迟,我们使用mqtt-bm-latency

以下延迟基准测试在具有 8 个 CPU 和 32 GB RAM 的物理 Ubuntu 22.04 机器上执行。客户端和单节点 RabbitMQ 服务器在同一台机器上运行。

rabbitmq-server的根目录中,使用 4 个调度程序线程启动服务器

make run-broker PLUGINS="rabbitmq_mqtt" RABBITMQ_CONFIG_FILE="rabbitmq.conf" RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 4"

服务器使用以下 rabbitmq.conf 运行

mqtt.mailbox_soft_limit = 0
mqtt.tcp_listen_options.nodelay = true
mqtt.tcp_listen_options.backlog = 128
mqtt.tcp_listen_options.sndbuf = 87380
mqtt.tcp_listen_options.recbuf = 87380
mqtt.tcp_listen_options.buffer = 87380
classic_queue.default_version = 2

第一行仅在 RabbitMQ 3.12 版本中使用,并禁用过载保护,以确保所有 QoS 0 消息都传递到订阅者。

3.11 版本的基准测试使用 Git 标签 v3.11.10。3.12 版本的基准测试使用 Git 标签 v3.12.0-beta.2

QoS 1 的延迟和吞吐量

第一个基准测试比较了发送到 QoS 1 订阅者的 QoS 1 消息的延迟和吞吐量。

./mqtt-bm-latency -clients 100 -count 10000 -pubqos 1 -subqos 1 -size 100 -keepalive 120 -topic t

此基准测试使用 100 个 MQTT 客户端“对”以 1:1 的拓扑结构发送。换句话说,总共有 200 个 MQTT 客户端:100 个发布者和 100 个订阅者。每个发布者向单个订阅者发送 10,000 条消息。

所有 MQTT 客户端同时运行。但是,应用程序消息会从发布客户端同步发送到 RabbitMQ:每个发布者发送一条 QoS 1 消息,并等待收到来自 RabbitMQ 的PUBACK,然后再发送下一条消息。

订阅客户端也会对收到的每个PUBLISH数据包回复一个 PUBACK 数据包。

上述命令中的主题 t 只是一个主题前缀。每个发布客户端都会通过将自己的索引附加到给定主题来发布到不同的主题(例如,第一个发布客户端发布到主题 t-0,第二个发布到 t-1,依此类推)。

3.11 版本的结果

================= TOTAL PUBLISHER (100) =================
Total Publish Success Ratio: 100.000% (1000000/1000000)
Total Runtime (sec): 75.835
Average Runtime (sec): 75.488
Pub time min (ms): 0.167
Pub time max (ms): 101.331
Pub time mean mean (ms): 7.532
Pub time mean std (ms): 0.037
Average Bandwidth (msg/sec): 132.475
Total Bandwidth (msg/sec): 13247.470

================= TOTAL SUBSCRIBER (100) =================
Total Forward Success Ratio: 100.000% (1000000/1000000)
Forward latency min (ms): 0.177
Forward latency max (ms): 94.045
Forward latency mean std (ms): 0.032
Total Mean forward latency (ms): 6.737

3.12 版本的结果

================= TOTAL PUBLISHER (100) =================
Total Publish Success Ratio: 100.000% (1000000/1000000)
Total Runtime (sec): 55.955
Average Runtime (sec): 55.570
Pub time min (ms): 0.175
Pub time max (ms): 96.725
Pub time mean mean (ms): 5.550
Pub time mean std (ms): 0.031
Average Bandwidth (msg/sec): 179.959
Total Bandwidth (msg/sec): 17995.888

================= TOTAL SUBSCRIBER (100) =================
Total Forward Success Ratio: 100.000% (1000000/1000000)
Forward latency min (ms): 0.108
Forward latency max (ms): 51.421
Forward latency mean std (ms): 0.028
Total Mean forward latency (ms): 2.880

Total Publish Success RatioTotal Forward Success Ratio 验证了 RabbitMQ 处理了总共 100 个发布者 * 10,000 条消息 = 1,000,000 条消息

Total RuntimeAverage Runtime 显示了第一个巨大的性能差异:使用原生 MQTT,每个发布者在 55 秒内收到服务器的所有 10,000 个发布确认(PUBACK)。3.11 版本需要 75 秒。

Pub time mean mean 表明,平均发布者确认延迟从 3.11 版本的 7.532 毫秒下降了 1.982 毫秒或 26%,在 3.12 版本中降至 5.550 毫秒。

如图 7 所示,100 个并发客户端分别同步发布 MQTT QoS 1 消息到 RabbitMQ 的吞吐量从 3.11 版本的每秒 13,247 条消息提升到 3.12 版本的每秒 17,995 条消息,提升了 4,748 条消息/秒,即 36%。

Figure 7: Publish Throughput with QoS 1 (3.12 performs better because throughput is higher)
图 7:QoS 1 发布吞吐量(3.12 性能更好,因为吞吐量更高)

forward latency 指标代表端到端延迟,即从发布消息到客户端接收到消息所花费的时间。它是通过让发布者在每个消息负载中包含一个时间戳来测量的。

图 8 说明 总平均转发延迟 从 3.11 版本的 6.737 毫秒下降到 3.12 版本的 2.880 毫秒,下降了 3.857 毫秒,即 57%。

Figure 8: End-to-end latency with QoS 1 (3.12 performs better because latency is lower)
图 8:QoS 1 端到端延迟(3.12 性能更好,因为延迟更低)

QoS 0 延迟

第二个延迟基准测试与第一个非常相似,但使用 QoS 0 发布消息,并且订阅也使用 QoS 0。

./mqtt-bm-latency -clients 100 -count 10000 -pubqos 0 -subqos 0 -size 100 -keepalive 120 -topic t

由于客户端连接时 CleanSession=1,RabbitMQ 为每个订阅者创建一个 rabbit_mqtt_qos0_queue。因此跳过了队列处理过程。

此处省略了 3.11 和 3.12 版本的发布者结果,因为它们非常低(Pub time mean mean 为 5 微秒)。发布客户端将所有 100 万条消息一次性发送到服务器,而无需等待 PUBACK 响应。

3.11 版本的结果

================= TOTAL SUBSCRIBER (100) =================
Total Forward Success Ratio: 100.000% (1000000/1000000)
Forward latency min (ms): 3.907
Forward latency max (ms): 12855.600
Forward latency mean std (ms): 883.343
Total Mean forward latency (ms): 7260.071

3.12 版本的结果

================= TOTAL SUBSCRIBER (100) =================
Total Forward Success Ratio: 100.000% (1000000/1000000)
Forward latency min (ms): 0.461
Forward latency max (ms): 5003.936
Forward latency mean std (ms): 596.133
Total Mean forward latency (ms): 2426.867

总平均转发延迟 从 3.11 版本的 7,260.071 毫秒下降到 3.12 版本的 2,426.867 毫秒,下降了 4,833.204 毫秒,即 66%。

与 QoS 1 基准测试相比,3.11 版本中 7.2 秒和 3.12 版本中 2.4 秒的端到端延迟非常高,因为代理暂时被所有 100 万条 MQTT 消息同时淹没。

禁用信贷流(在 advanced.config 中设置 {credit_flow_default_credit, {0, 0}})无法改善 3.11 版本中的延迟。

留给感兴趣的读者的一项练习是,针对 3.12 版本运行相同的基准测试,同时禁用功能标志 rabbit_mqtt_qos0_queue,以查看跳过队列处理过程的延迟差异。

3.12 版本中的原生 MQTT 还有什么改进?

以下列表包含 3.12 版本中原生 MQTT 发生变化的杂项。

  • 实现了 MQTT 3.1.1 功能,允许 SUBACK 数据包包含错误返回码 (0x80)。
  • 删除了 AMQP 0.9.1 标头 x-mqtt-dup,因为在 3.11 版本之前,它在 MQTT 插件中的用法是错误的。如果您的 AMQP 0.9.1 客户端依赖于此标头,则这是一个重大更改。
  • MQTT 插件创建的所有队列都是持久的。这样做是为了简化将来 RabbitMQ 版本中向 Khepri 的过渡。
  • MQTT 插件为干净会话创建 排他(而不是 自动删除)队列。
  • pg 中实现了 MQTT 客户端 ID 跟踪(如果新客户端使用相同的客户端 ID 连接,则终止现有 MQTT 连接)。当启用功能标志 delete_ra_cluster_mqtt_node 时,将删除以前用于跟踪 MQTT 客户端 ID 的 Ra 集群。旧的 Ra 集群需要大量的内存,并且在一次断开太多客户端连接时会成为瓶颈。
  • 使用 全局计数器 实现了 Prometheus 指标。协议标签的值为 mqtt310mqtt311。此类指标的一个示例是:rabbitmq_global_messages_routed_total{protocol="mqtt311"} 10
  • MQTT 解析器得到了优化。
  • 从未发布过应用程序消息的 MQTT 客户端在 内存和磁盘警报 期间不会被阻塞,以便订阅者可以继续清空队列。

我应该将 RabbitMQ 用作 MQTT 代理吗?

原生 MQTT 启用了许多新的用例,因为它允许数量级更多的物联网设备连接到 RabbitMQ。用例范围广泛。

RabbitMQ 只是市场上众多 MQTT 代理之一。还有其他优秀的 MQTT 代理,其中一些能够处理比 RabbitMQ 更多的 MQTT 客户端连接,因为其他代理专门用于 MQTT。

RabbitMQ 的优势在于它不仅仅是一个 MQTT 代理。它是一个通用的多协议代理。RabbitMQ 的真正强大之处在于协议互操作性,以及灵活的路由和队列类型的选择。

举例来说:作为一家支付处理公司,您可以将分布在世界各地的数十万甚至数百万个现金终端连接到一个中央 RabbitMQ 集群,每个现金终端每隔几分钟(每当客户进行支付时)向 RabbitMQ 发送 MQTT QoS 1 消息。

这些包含支付数据的 MQTT 消息对您的业务至关重要。在任何情况下(节点故障、磁盘故障、网络分区)都不允许消息丢失。它们还需要由一个或多个微服务进行处理。

一种解决方案是将几个仲裁队列绑定到主题交换机,每个仲裁队列使用不同的路由键进行绑定,从而将负载分散到各个队列中。每个仲裁队列在 3 个或 5 个 RabbitMQ 节点上进行复制,通过使用底层的 Raft 共识算法提供高数据安全性。每个仲裁队列中的消息可以使用比 MQTT 更专业的信使协议(如 AMQP 0.9.1 或 AMQP 1.0)由不同的微服务进行处理。

或者,MQTT 插件可以将 MQTT 支付消息转发到一个 超级流,该流在多个 RabbitMQ 节点上复制。然后,其他客户端可以使用 RabbitMQ 流协议 从流中多次消费相同的消息,从而执行不同形式的数据分析。

RabbitMQ 提供的灵活性几乎是无限的,市场上没有其他消息代理能够提供如此广泛的协议互操作性、数据安全性、容错性、可扩展性和性能。

未来工作

3.12 版本中提供的原生 MQTT 是 RabbitMQ 作为 MQTT 代理使用的关键一步。但是,MQTT 之旅并没有就此结束。将来,我们计划(不保证!)添加以下 MQTT 改进

  • 添加对 MQTT 5.0 (v5) 的支持 - 社区强烈要求的功能。您可以在 PR #7263 中跟踪当前进度。截至 3.12 版本,RabbitMQ 仅支持 MQTT 3.1.1 (v4)MQTT 3.1.0 (v3)。原生 MQTT 一直是开发 MQTT 5.0 支持的先决条件。
  • 在升级处理数百万 MQTT 连接的 RabbitMQ 节点时提高弹性,并提高大量客户端断开连接的弹性(例如,由于负载均衡器崩溃)。
  • 支持 QoS 1 的巨大扇出,至少向所有设备发送一次消息。新的队列类型 rabbit_mqtt_qos0_queue 通过不在发送和接收 Erlang 进程中保留任何状态来改进大型扇出。但是,截至今天,发送需要队列确认的消息(MQTT 中的 QoS 1 或 AMQP 0.9.1 中的发布者确认)到数百万个队列时应格外小心:始终使用相同的发布者,并且非常少见地执行此操作(每隔几分钟)。请注意,1:1 拓扑和大型扇入对于 RabbitMQ 来说没有问题。

总结

尽管 RabbitMQ 在 3.11 版本中已支持 MQTT,但关于客户端连接数量的可扩展性一直很差,并且内存使用量过高。

3.12 版本中提供的原生 MQTT 将 RabbitMQ 变成一个 MQTT 代理。它允许将数百万个客户端连接到 RabbitMQ。即使您不打算连接这么多客户端,通过将您的 MQTT 工作负载升级到 3.12 版本,您也将大幅节省基础设施成本,因为内存使用量最多可下降 95%。

下一步,我们计划添加对 MQTT 5.0 的支持。

© 2024 RabbitMQ. All rights reserved.