MQTT 插件
概述
RabbitMQ 通过核心分发版中附带的插件支持 MQTT 版本 3.1、3.1.1 和 5.0。
本指南涵盖以下主题
- 如何启用插件
- 支持的 MQTT 功能 和 限制
- MQTT 插件实现概述
- 何时(不)使用 仲裁队列
- MQTT QoS 0 队列类型
- 用户和身份验证 和 远程连接
- 插件的关键可配置设置
- TLS 支持
- 虚拟主机
- 指标
- 性能和可扩展性检查清单
- 代理协议
- Sparkplug 支持
启用插件
MQTT 插件包含在 RabbitMQ 分发版中。在客户端可以成功连接之前,必须使用 rabbitmq-plugins 在所有集群节点上启用它。
rabbitmq-plugins enable rabbitmq_mqtt
支持的 MQTT 功能
RabbitMQ 支持大多数 MQTT 5.0 功能,包括以下功能
- QoS 0(最多一次) 和 QoS 1(至少一次) 发布和订阅
- TLS、OAuth 2.0
- 清洁 和非清洁会话
- 消息过期时间间隔
- 订阅标识符 和 订阅选项
- 遗嘱消息,包括 遗嘱延迟时间间隔
- 请求/响应,包括与其他协议(如 AMQP 0.9.1 和 AMQP 1.0)的互操作性
- 主题别名
- 保留 消息,但存在第 保留消息和存储 节中描述的限制。
- MQTT 通过 WebSocket,通过 RabbitMQ Web MQTT 插件。
MQTT 5.0 博客文章 提供了支持的 MQTT 5.0 功能的完整列表,包括其用法和实现细节。
MQTT 客户端可以与其他协议互操作。例如,如果这些消费者从绑定到 MQTT 主题交换(通过 mqtt.exchange
配置,默认为 amq.topic
)的队列中消费,则 MQTT 发布者可以将消息发送到 AMQP 0.9.1 或 AMQP 1.0 消费者。类似地,AMQP 0.9.1、AMQP 1.0 或 STOMP 发布者可以将消息发送到 MQTT 订阅者,如果发布者发布到 MQTT 主题交换。
限制
以下 MQTT 功能不受支持
- QoS 2(完全一次)
- 共享订阅
- 既 延迟 又 保留 的 遗嘱消息 不会被保留。
以下 MQTT 功能在支持时存在限制
- 保留消息仅存储和查询节点本地。请参见 保留消息和存储。
- 具有不同 QoS 级别 的重叠订阅可能会导致重复消息被传递。应用程序需要考虑到这一点。例如,当同一个 MQTT 客户端为主题过滤器
/sports/football/epl/#
创建 QoS 0 订阅,并为主题过滤器/sports/football/#
创建 QoS 1 订阅时,它将收到重复消息。
MQTT 插件的工作原理
RabbitMQ MQTT 插件针对 MQTT 3.1、3.1.1 和 5.0,支持广泛的 MQTT 客户端。它还使 MQTT 客户端能够与 AMQP 0-9-1、AMQP 1.0 和 STOMP 客户端互操作。还支持多租户。
将 MQTT 映射到 AMQP 0.9.1 模型
RabbitMQ 核心实现了 AMQP 0.9.1 协议。该插件建立在 AMQP 0.9.1 实体的基础上:交换、队列 和绑定。发布到 MQTT 主题的消息由 AMQP 0.9.1 主题交换路由。MQTT 订阅者从绑定到主题交换的队列中消费。
MQTT 插件为每个 MQTT 订阅者创建一个专用队列。更准确地说,每个 MQTT 会话可能存在 0、1 或 2 个队列
- 如果 MQTT 客户端从未发送 SUBSCRIBE 数据包,则 MQTT 会话有 0 个队列。MQTT 客户端仅发布消息。
- 如果 MQTT 客户端使用相同的 QoS 级别创建了一个或多个订阅,则 MQTT 会话有 1 个队列。
- 如果 MQTT 客户端使用 QoS 0 和 QoS 1 两种 QoS 级别创建了订阅,则 MQTT 会话有 2 个队列。
列出队列时,您将看到队列命名模式 mqtt-subscription-<MQTT 客户端 ID>qos[0|1]
,其中 <MQTT 客户端 ID>
是 MQTT 客户端标识符,[0|1]
对于 QoS 0 订阅为 0
,对于 QoS 1 订阅为 1
。每个 MQTT 订阅者拥有一个单独的队列,使每个 MQTT 订阅者都能接收其应用程序消息的副本。
该插件为 MQTT 订阅客户端透明地创建队列。MQTT 规范没有定义队列的概念,MQTT 客户端不知道这些队列的存在。队列是 RabbitMQ 如何实现 MQTT 协议的实现细节。
队列类型
MQTT 客户端可以将消息发布到任何队列类型。为此,经典队列、仲裁队列 或 流 必须绑定到主题交换,其绑定键与 PUBLISH 数据包的主题匹配。
MQTT 插件为每个 MQTT 订阅者创建经典队列、仲裁队列或 MQTT QoS 0 队列。默认情况下,该插件创建经典队列。
该插件可以配置为为订阅者的 MQTT 会话持续时间比其 MQTT 网络连接时间长的订阅者创建仲裁队列(而不是经典队列)。这将在 仲裁队列 节中说明。
如果启用 功能标志 rabbit_mqtt_qos0_queue
,该插件将为 QoS 0 订阅者创建 MQTT QoS 0 队列,其 MQTT 会话持续时间与其 MQTT 网络连接时间相同。这将在 MQTT QoS 0 队列类型 节中说明。
队列属性 和 参数
从 RabbitMQ 3.12 开始,MQTT 插件创建的所有队列
- 是 持久的,即队列元数据存储在磁盘上。
- 如果 MQTT 会话持续时间与其 MQTT 网络连接时间相同,则为 排他的。在这种情况下,RabbitMQ 将在网络连接(和会话)结束时删除 MQTT 客户端的所有状态,包括其队列。只有订阅的 MQTT 客户端才能从其队列中消费。
- 不是
auto-delete
。例如,如果 MQTT 客户端订阅了一个主题,然后取消订阅,该队列不会被删除。但是,该队列将在 MQTT 会话结束时被删除。 - 如果 MQTT 会话最终过期(即会话过期未被 RabbitMQ 操作员禁用,见下文)并持续时间超过 MQTT 网络连接时间,则具有设置的 队列 TTL(队列参数
x-expires
)。队列 TTL(以毫秒为单位)由 MQTT 客户端在 CONNECT 数据包中请求的 会话过期时间间隔(以秒为单位)和服务器端配置的mqtt.max_session_expiry_interval_seconds
之间的最小值确定。
mqtt.max_session_expiry_interval_seconds
的默认值为 86400(1 天)。RabbitMQ 操作员可以通过将此参数设置为 0
来强制所有 MQTT 会话在网络连接结束时立即结束。
RabbitMQ 操作员可以通过将此参数设置为 infinity
来允许 MQTT 会话永久持续。这样做存在风险:寿命短的客户端如果未使用清洁会话,可能会留下队列和消息,这将消耗资源并需要手动清理。
RabbitMQ 在 MQTT 会话结束时删除 MQTT 客户端的任何状态。此状态包括客户端的队列(包括 QoS 0 和 QoS 1 消息)和队列绑定(即客户端的订阅)。
主题级别分隔符和通配符
MQTT 协议规范将斜杠 ("/") 定义为 主题级别分隔符,而 AMQP 0-9-1 将点 (".") 定义为主题级别分隔符。此插件在幕后翻译模式,以连接两者。
例如,MQTT 主题 cities/london
变成 AMQP 0.9.1 主题 cities.london
,反之亦然。因此,当 MQTT 客户端使用主题 cities/london
发布消息时,如果 AMQP 0.9.1 客户端想要接收该消息,则应从其队列到主题交换机创建绑定,并使用绑定键 cities.london
。
反之,当 AMQP 0.9.1 客户端使用主题 cities.london
发布消息时,如果 MQTT 客户端想要接收该消息,则应使用主题过滤器 cities/london
创建 MQTT 订阅。
这有一个重要的限制:MQTT 主题中包含点号将无法按预期工作,应避免使用,同样,包含斜杠的 AMQP 0-9-1 路由和绑定键也应避免使用。
此外,MQTT 将加号 ("+") 定义为 单级通配符,而 AMQP 0.9.1 将星号 ("*") 定义为匹配单个词。
MQTT | AMQP 0.9.1 | 描述 |
---|---|---|
/ (斜杠) | . (点) | 主题级分隔符 |
+ (加号) | * (星号) | 单级通配符(匹配单个词) |
# (井号) | # (井号) | 多级通配符(匹配零个或多个词) |
使用 Quorum 队列
使用 mqtt.durable_queue_type
选项,可以选择使用 Quorum 队列 用于 MQTT 会话持续时间超过其 MQTT 网络连接的订阅者。
此值只能在任何客户端声明持久订阅之前为新集群启用。由于队列类型在声明后无法更改,如果此设置的值为现有集群更改,则具有现有持久状态的客户端将遇到队列类型不匹配错误,并且无法订阅。
下面是选择使用 Quorum 队列的 rabbitmq.conf 示例。
# must ONLY be enabled for new clusters before any clients declare durable subscriptions
mqtt.durable_queue_type = quorum
目前,此设置适用于使用
- QoS 1 订阅的所有 MQTT 客户端,以及
- 使用 会话过期间隔 大于 0(MQTT 5.0)或将 CleanSession 设置为 0(MQTT 3.1.1)连接的客户端。
第二个条件意味着 MQTT 会话持续时间超过 MQTT 网络连接。
虽然 Quorum 队列专为数据安全性和从副本故障中可预测的有效恢复而设计,但它们也有一些缺点。根据定义,Quorum 队列需要集群中至少三个副本。因此,Quorum 队列声明和删除需要更长时间,并且不适合具有 高客户端连接抖动 或具有大量(数十万)订阅者的环境。
对于少数(数百个)寿命更长的客户端来说,Quorum 队列非常适合,这些客户端实际上非常关心数据安全。
MQTT QoS 0 队列类型
如果满足以下三个条件,MQTT 插件将创建 MQTT QoS 0 队列。
- 特性标志
rabbit_mqtt_qos0_queue
已启用。 - MQTT 客户端使用 QoS 0 订阅。
- MQTT 5.0 客户端使用 会话过期间隔 0 连接,或 MQTT 3.1.1 客户端使用 CleanSession 设置为 1 连接。
第三个条件意味着 MQTT 会话仅持续网络连接时间。
MQTT QoS 0 队列类型可以看作是“伪”或“虚拟”队列:它与其他队列类型(经典队列、Quorum 队列和流)非常不同,因为这种新的队列类型既不是单独的 Erlang 进程,也不在磁盘上存储消息。相反,此队列类型是 Erlang 进程邮箱的子集。MQTT 消息直接发送到订阅客户端的 MQTT 连接进程。换句话说,MQTT 消息将发送到任何“在线”MQTT 订阅者。
更准确地说,应该认为队列被“跳过”。将直接将消息发送到 MQTT 连接进程实现为队列类型是为了简化消息路由和协议互操作性,这样消息不仅可以从 MQTT 发布连接进程发送,还可以从 AMQP 0.9.1 通道 进程发送。后者使从 AMQP 0.9.1、AMQP 1.0 或 STOMP 客户端直接将消息发送到 MQTT 订阅者连接进程成为可能,跳过专用队列进程。
使用 MQTT QoS 0 队列类型的优势是
- 支持更大扇出,例如从“云”(RabbitMQ)向所有设备(MQTT 客户端)发送消息。
- 降低 内存使用量
- 降低发布者确认延迟
- 降低端到端延迟
由于 MQTT QoS 0 队列类型没有流量控制,因此 MQTT 消息可能会在 MQTT 连接进程邮箱中比从 MQTT 连接进程传递到 MQTT 订阅客户端更快地到达。当 MQTT 订阅客户端与 RabbitMQ 之间的网络连接很差或在许多发布者过载单个 MQTT 订阅客户端的大扇入场景中时,可能会发生这种情况。
过载保护
为了防止由于 MQTT QoS 0 消息在 MQTT 连接进程邮箱中堆积而导致的 内存使用量 过高,RabbitMQ 会故意从 MQTT QoS 0 队列中删除 QoS 0 消息,前提是两个条件都为真。
- MQTT 连接进程邮箱中的消息数量超过配置的
mqtt.mailbox_soft_limit
(默认值为 200),并且 - 发送到 MQTT 客户端的套接字处于繁忙状态(由于 TCP 反压而没有足够快地发送)。
请注意,进程邮箱中可能存在其他消息(例如从 MQTT 订阅客户端发送到 RabbitMQ 的应用程序消息或来自另一个队列类型到 MQTT 连接进程的确认),这些消息显然不会被删除。但是,这些其他消息也会影响 mqtt.mailbox_soft_limit
。
将 mqtt.mailbox_soft_limit
设置为 0 将禁用过载保护机制,这意味着 RabbitMQ 绝不会故意删除 QoS 0 消息。将 mqtt.mailbox_soft_limit
设置为非常高的值将降低故意删除 QoS 0 消息的可能性,同时增加导致集群范围内存警报的风险(尤其是当消息有效负载很大或存在许多类型的 rabbit_mqtt_qos0_queue
过载队列时)。
mqtt.mailbox_soft_limit
可以看作是 队列长度限制(虽然不完全相同,因为如前所述,Erlang 进程邮箱可以包含除 MQTT 应用程序消息之外的其他消息)。这就是为什么配置键 mqtt.mailbox_soft_limit
包含单词 soft
。描述的过载保护机制大致对应于经典队列和 Quorum 队列中存在的 溢出行为 drop-head
。
给定 RabbitMQ 节点报告的以下 Prometheus 指标显示了在该节点的整个生命周期中,在所有类型的 rabbit_mqtt_qos0_queue
队列中总共删除了多少 QoS 0 消息。
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_mqtt_qos0_queue",dead_letter_strategy="disabled"} 0
本机 MQTT 博客文章更详细地介绍了 MQTT QoS 0 队列类型。
用户和身份验证
只要 MQTT 客户端具有现有用户的凭据集,该用户具有相应的权限,它们就可以连接。
要使 MQTT 连接成功,它必须成功进行身份验证,并且用户必须对插件使用的虚拟主机具有 适当的权限(见下文)。
MQTT 客户端可以在连接时(通常会)指定一组凭据。凭据可以是用户名和密码对,也可以是 x.509 证书(见下文)。
插件支持匿名身份验证,但强烈建议不要使用匿名身份验证,并且它会受到某些限制(列于下方),这些限制默认情况下是为了确保合理的安全性而实施的。
可以使用 rabbitmqctl、管理 UI 或 HTTP API 管理用户及其权限。
例如,以下命令为具有对此插件使用的默认 虚拟主机 的完全访问权限的 MQTT 连接创建一个新用户。
# username and password are both "mqtt-test"
rabbitmqctl add_user mqtt-test mqtt-test
rabbitmqctl set_permissions -p "/" mqtt-test ".*" ".*" ".*"
rabbitmqctl set_user_tags mqtt-test management
请注意,用户名中不能包含冒号。
本地与远程客户端连接
当 MQTT 客户端不提供任何登录凭据时,插件默认使用 guest
帐户,该帐户不允许非 localhost
连接。从远程主机连接时,以下选项可确保远程客户端可以成功连接。
- 创建一个或多个新用户,授予他们对 MQTT 插件使用的虚拟主机的完全权限,并使从远程主机连接的客户端使用这些凭据。这是推荐的选项。
- 将
anonymous_login_user
和anonymous_login_pass
设置为具有 适当权限 的非guest
用户。
匿名连接
MQTT 支持可选身份验证(客户端可能不提供任何凭据)。因此,对于匿名连接,将使用一组默认凭据。
anonymous_login_user
和 anonymous_login_pass
配置键用于指定凭据。
anonymous_login_user = some-user
anonymous_login_pass = s3kRe7
可以禁用匿名连接。
mqtt.allow_anonymous = false
anonymous_login_user = none
如果将 mqtt.allow_anonymous
键设置为 false
,则客户端必须提供凭据。
强烈建议不要使用匿名连接,并且它会受到某些限制(见上文),这些限制默认情况下是为了确保合理的安全性而实施的。
插件配置
这是一个示例 配置,它演示了多个 MQTT 插件设置。
mqtt.listeners.tcp.default = 1883
## Default MQTT with TLS port is 8883
# mqtt.listeners.ssl.default = 8883
# anonymous connections, if allowed, will use the default
# credentials specified here
mqtt.allow_anonymous = true
mqtt.vhost = /
mqtt.exchange = amq.topic
mqtt.prefetch = 10
# 24 hours by default
mqtt.max_session_expiry_interval_seconds = 86400
TCP 监听器
当没有指定配置时,MQTT 插件将在端口 1883 上的所有接口上监听,并且默认用户登录/密码为 guest
/guest
。
要更改监听器端口,请编辑您的 配置文件,使其包含 rabbitmq_mqtt
应用程序的 tcp_listeners
变量。
例如,一个将监听器端口更改为 12345 的最小配置将如下所示。
mqtt.listeners.tcp.1 = 12345
而将监听器更改为仅在本地主机(对于 IPv4 和 IPv6)上监听的配置将如下所示。
mqtt.listeners.tcp.1 = 127.0.0.1:1883
mqtt.listeners.tcp.2 = ::1:1883
TCP 监听器选项
插件支持 TCP 监听器选项配置。
这些设置使用公共前缀 mqtt.tcp_listen_options
,并控制诸如 TCP 缓冲区大小、入站 TCP 连接队列长度、是否启用 TCP 保持活动 等内容。有关详细信息,请参阅 网络指南。
mqtt.listeners.tcp.1 = 127.0.0.1:1883
mqtt.listeners.tcp.2 = ::1:1883
mqtt.tcp_listen_options.backlog = 4096
mqtt.tcp_listen_options.buffer = 131072
mqtt.tcp_listen_options.recbuf = 131072
mqtt.tcp_listen_options.sndbuf = 131072
mqtt.tcp_listen_options.keepalive = true
mqtt.tcp_listen_options.nodelay = true
mqtt.tcp_listen_options.exit_on_close = true
mqtt.tcp_listen_options.send_timeout = 120
TLS 支持
要使用 TLS 进行 MQTT 连接,需要在代理中配置 TLS。要启用支持 TLS 的 MQTT 连接,请使用mqtt.listeners.ssl.*
配置键添加用于 MQTT 的 TLS 侦听器。
该插件将使用核心 RabbitMQ 服务器证书和密钥(与 AMQP 0-9-1 和 AMQP 1.0 侦听器相同)。
ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile = /path/to/server_certificate.pem
ssl_options.keyfile = /path/to/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
# default TLS-enabled port for MQTT connections
mqtt.listeners.ssl.default = 8883
mqtt.listeners.tcp.default = 1883
请注意,RabbitMQ 默认情况下拒绝 SSLv3 连接,因为该协议已知存在安全漏洞。
有关详细信息,请参阅TLS 配置指南。
虚拟主机
RabbitMQ 在核心上是一个多租户系统,每个连接都属于一个虚拟主机。一些消息传递协议具有 vhost 的概念,而另一些则没有。MQTT 属于后者。因此,MQTT 插件需要提供一种将连接映射到 vhost 的方法。
vhost
选项控制适配器默认连接到的 RabbitMQ vhost。仅当在建立连接期间未提供 vhost 时才会参考vhost
配置。有多种(可选)方法可以指定客户端将连接到的 vhost。
端口到虚拟主机映射
第一个方法是将 MQTT 插件(TCP 或 TLS)侦听器端口映射到 vhost。通过mqtt_port_to_vhost_mapping
全局运行时参数指定映射。让我们看一下以下插件配置
mqtt.listeners.tcp.1 = 1883
mqtt.listeners.tcp.2 = 1884
mqtt.listeners.ssl.1 = 8883
mqtt.listeners.ssl.2 = 8884
# (other TLS settings are omitted for brevity)
mqtt.vhost = /
请注意,插件侦听端口 1883、1884、8883 和 8884。假设您希望连接到端口 1883 和 8883 的客户端连接到vhost1
虚拟主机,而连接到端口 1884 和 8884 的客户端连接到vhost2
虚拟主机。可以通过使用rabbitmqctl
设置mqtt_port_to_vhost_mapping
全局参数来创建端口到 vhost 的映射。
- bash
- PowerShell
- HTTP API
rabbitmqctl set_global_parameter mqtt_port_to_vhost_mapping \
'{"1883":"vhost1", "8883":"vhost1", "1884":"vhost2", "8884":"vhost2"}'
rabbitmqctl.bat set_global_parameter mqtt_port_to_vhost_mapping ^
"{""1883"":""vhost1"", ""8883"":""vhost1"", ""1884"":""vhost2"", ""8884"":""vhost2""}"
PUT /api/global-parameters/mqtt_port_to_vhost_mapping
# => {"value": {"1883":"vhost1", "8883":"vhost1", "1884":"vhost2", "8884":"vhost2"}}
如果对于给定端口没有映射(因为在mqtt_port_to_vhost_mapping
全局参数 JSON 文档中找不到该端口,或者根本没有设置全局参数),则插件将尝试从用户名中提取虚拟主机(见下文),并最终使用vhost
插件配置选项。
代理在连接时查询mqtt_port_to_vhost_mapping
全局参数值。如果该值发生更改,连接的客户端不会收到通知或断开连接。他们需要重新连接以切换到新的虚拟主机。
虚拟主机作为用户名的一部分
另一种更具体的方式是在连接时将 vhost 添加到用户名前面,并用冒号分隔。
例如,使用以下方式连接
/:guest
等效于默认 vhost 和用户名,而
mqtt-vhost:mqtt-username
表示连接到具有用户名mqtt-username
的 vhostmqtt-host
。
在用户名中指定虚拟主机优先于使用mqtt_port_to_vhost_mapping
全局参数指定的端口到 vhost 的映射。
使用 TLS/x509 客户端证书进行身份验证
该插件可以通过从客户端的 TLS (x509) 证书中提取名称来对支持 TLS 的连接进行身份验证,而无需使用密码。
出于安全考虑,服务器必须配置 TLS 选项,将fail_if_no_peer_cert
设置为true
,并将verify
设置为verify_peer
,以强制所有 TLS 客户端都具有可验证的客户端证书。
要启用此功能,请为rabbitmq_mqtt
应用程序将ssl_cert_login
设置为true
。例如
mqtt.ssl_cert_login = true
默认情况下,这会将用户名设置为证书主体区分名称的 RFC4514 式字符串形式,类似于 OpenSSL 的“-nameopt RFC2253”选项生成的字符串。
要使用通用名称,请添加
ssl_cert_login_from = common_name
到您的配置中。
请注意
- 已验证的用户必须存在于已配置的身份验证/授权后端中。
- 客户端**不能**提供用户名和密码。
您可以使用mqtt_default_vhosts
全局运行时参数为客户端证书可选地指定虚拟主机。此全局参数的值必须包含一个 JSON 文档,该文档将证书主体区分名称映射到其目标虚拟主机。让我们看看如何将 2 个证书O=client,CN=guest
和O=client,CN=rabbit
分别映射到vhost1
和vhost2
虚拟主机。
全局参数可以使用以下方法设置
- bash
- PowerShell
- HTTP API
rabbitmqctl set_global_parameter mqtt_default_vhosts \
'{"O=client,CN=guest": "vhost1", "O=client,CN=rabbit": "vhost2"}'
rabbitmqctl set_global_parameter mqtt_default_vhosts ^
"{""O=client,CN=guest"": ""vhost1"", ""O=client,CN=rabbit"": ""vhost2""}'
PUT /api/global-parameters/mqtt_default_vhosts
# => {"value": {"O=client,CN=guest": "vhost1", "O=client,CN=rabbit": "vhost2"}}
请注意
- 如果找不到证书的虚拟主机(因为在
mqtt_default_vhosts
全局参数 JSON 文档中找不到证书主体 DN,或者根本没有设置全局参数),则将使用vhost
插件配置选项指定的虚拟主机。 - 代理在连接时查询
mqtt_default_vhosts
全局参数值。如果该值发生更改,连接的客户端不会收到通知或断开连接。他们需要重新连接以切换到新的虚拟主机。 - 使用
mqtt_default_vhosts
全局参数的证书到 vhost 的映射被认为比使用mqtt_port_to_vhost_mapping
全局参数的端口到 vhost 的映射更具体,因此优先于后者。
使用从客户端证书中提取的 client_id 进行身份验证
可以配置节点以验证 MQTT 连接上设置的client_id
是否与客户端证书中找到的client_id
匹配。
如果匹配,RabbitMQ 将通过将用户的身份以及client_id
传递给已配置的身份验证后端来继续用户的身份验证。一些身份验证后端(例如rabbitmq_auth_backend_http
)可能会除了username
之外还使用client_id
凭据来进行身份验证和/或授权决策。如果client_id
不匹配,RabbitMQ 将关闭连接,原因代码为2
,表示“服务器不允许使用客户端标识符”。
为此,RabbitMQ 必须首先获得有关如何从证书中获取client_id
的指示。这是通过mqtt.ssl_cert_client_id_from
配置键完成的。可接受的值为
distinguished_name
:这是证书的 DN 或区分名称subject_alternative_name
:主题备用名称或 SAN,来自证书的扩展部分。由于存在不同类型的主题备用名称,因此可能需要指定类型
如果mqtt.ssl_cert_client_id_from
设置为subject_alternative_name
,则可以使用mqtt.ssl_cert_client_id_san_type
配置备用名称的类型。默认类型(如果省略设置,则使用)为dns
。可接受的值为
dns
ip
email
uri
other_name
证书可以包含相同类型的多个 SAN 字段,例如两个备用 DNS 名称。如果是这种情况,请使用mqtt.ssl_cert_client_id_san_index
配置键指定要使用的索引。默认情况下,RabbitMQ 将选择第一个值,即mqtt.ssl_cert_client_id_san_index
的默认值为0
。
以下示例演示了从证书区分名称中提取用户名,并从类型为uri
的第一个 SAN(主题备用名称)中提取client_id
的情况
ssl_cert_login_from = distinguished_name
mqtt.ssl_cert_client_id_from = subject_alternative_name
mqtt.ssl_cert_client_id_san_type = uri
流量控制
prefetch
选项控制将要传送的具有 QoS=1 的未确认 PUBLISH 数据包的最大数量。此选项的解释方式与消费者预取相同。
MQTT 5.0 客户端可以通过在CONNECT
数据包中设置接收最大值来定义更小的数字。
自定义交换机
exchange
选项确定将 MQTT 客户端发送的消息发布到哪个交换机。必须在客户端发布任何消息之前创建该交换机。该交换机应为主题交换机。
默认主题交换机amq.topic
已预先声明:因此,它在 RabbitMQ 启动时存在。
保留的消息和存储
该插件支持保留消息,但存在本节中描述的限制。消息存储实现是可插拔的,该插件开箱即用地提供两个实现
- ETS 为基础(在内存中),在模块
rabbit_mqtt_retained_msg_store_ets
中实现 - DETS 为基础(在磁盘上),在模块
rabbit_mqtt_retained_msg_store_dets
中实现
两种实现都具有限制和权衡。对于第一种,可以保留的消息数量上限由 RAM 限制。对于第二种,每个 vhost 的限制为 2 GB。两者都是**节点本地**:保留的消息既不会复制到远程集群节点,也不会从远程集群节点查询。
一个有效的示例如下:MQTT 客户端将保留消息发布到具有主题topic/1
的节点 A。之后,另一个客户端在节点 A 上使用主题过滤器topic/1
订阅。新的订阅者将收到保留消息。
但是,如果客户端在节点 A 上发布保留消息,而另一个客户端随后在节点 B 上订阅,则该订阅客户端将不会收到存储在节点 A 上的任何保留消息。
此外,如果主题过滤器包含通配符(多级通配符字符“#”或单级通配符字符“+”),则不会发送任何保留消息。
要配置存储,请使用mqtt.retained_message_store
配置键
## use DETS (disk-based) store for retained messages
mqtt.retained_message_store = rabbit_mqtt_retained_msg_store_dets
## only used by DETS store (in milliseconds)
mqtt.retained_message_store_dets_sync_interval = 2000
该值必须是实现存储的模块
rabbit_mqtt_retained_msg_store_ets
用于基于 RAM 的存储rabbit_mqtt_retained_msg_store_dets
用于基于磁盘的存储(这是默认值)。
这些实现适用于开发,但有时不适用于生产环境。MQTT 规范没有定义保留消息存储的一致性或复制要求,因此 RabbitMQ 允许使用自定义存储来满足特定环境的一致性和可用性需求。例如,基于Riak 和Cassandra 的存储将适合大多数生产环境,因为这些数据存储提供可调整的一致性。
消息存储必须实现rabbit_mqtt_retained_msg_store
行为。
指标
Prometheus
此插件会发出全局计数器中列出的 Prometheus 指标。
对于 Prometheus 标签protocol
,其值为mqtt310
、mqtt311
和mqtt50
,具体取决于 MQTT 客户端使用的是 MQTT 3.1、MQTT 3.1.1 还是 MQTT 5.0。
Prometheus 标签 queue_type
的值根据 MQTT 客户端消费的队列类型而定,分别是 rabbit_classic_queue
、rabbit_quorum_queue
和 rabbit_mqtt_qos0_queue
。 (请注意,尽管 MQTT 客户端可以将消息发布到 流,但它们永远不会直接从流中消费。)
RabbitMQ 管理 API
管理 API 提供了 MQTT 连接的指标(例如,来自/到客户端的网络流量)以及经典队列和仲裁队列的指标(例如,它们包含多少消息)。 但是,从 3.12 版本开始,与 AMQP 0.9.1 通道 相关的管理 API 指标(例如,消息速率)不再可用。
性能和可扩展性检查清单
MQTT 是物联网 (IoT) 的标准协议。 常见的 IoT 工作负载是许多 MQTT 设备定期将传感器数据发布到 MQTT 代理。 可能有数十万甚至数百万个 IoT 设备连接到 MQTT 代理。 博客文章 原生 MQTT 演示了此类工作负载的性能基准。
本节旨在提供一个非详尽的清单,其中包含一些提示和技巧,以将 RabbitMQ 配置为支持大量客户端连接的高效 MQTT 代理。
- 在 管理插件 中设置
management_agent.disable_metrics_collector = true
以禁用指标收集。 RabbitMQ 管理插件并非为过度收集指标而设计。 事实上,通过管理 API 传递指标已 弃用。 相反,请使用为收集和查询大量指标而设计的工具:Prometheus。 - 与 QoS 1 相比,具有 QoS 0 的 MQTT 数据包和订阅提供更好的性能。 与 AMQP 0.9.1 和 AMQP 1.0 不同,MQTT 并非为最大限度地提高吞吐量而设计:例如,没有 多确认。 每个具有 QoS 1 的 PUBLISH 数据包都需要单独确认。
- 如 TCP 监听器选项 部分所述,减小 TCP 缓冲区大小。
这大大减少了在具有大量并发连接的客户端的环境中的 内存使用量。
- 主题级别较少(在 MQTT 主题和 MQTT 主题过滤器中)比主题级别较多执行得更好。 例如,如果可能,最好将您的主题结构化为
city/name
而不是continent/country/city/name
。 主题过滤器中的每个主题级别当前都会在 RabbitMQ 使用的数据库中创建自己的条目。 因此,当主题级别较少时,创建和删除许多订阅会更快。 此外,路由具有较少主题级别的消息速度更快。 - 在订阅更新频繁的工作负载中,增加 Mnesia 配置参数
dump_log_write_threshold
(例如,RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="-mnesia dump_log_write_threshold 20000"
)。 - 连接许多客户端时,增加最大 Erlang 进程数(例如,
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 10000000
)和最大打开端口数(例如,ERL_MAX_PORTS=10000000
)。
代理协议
MQTT 插件支持 代理协议。 此功能默认情况下处于禁用状态,要为 MQTT 客户端启用它,请执行以下操作
mqtt.proxy_protocol = true
有关代理协议的更多信息,请参阅 网络指南。
Sparkplug 支持
Sparkplug 是一种规范,它为 MQTT 系统的设计提供指导。 在 Sparkplug 中,MQTT 主题必须以 spAvM.N
或 spBvM.N
开头,其中 M
和 N
是整数。 这不幸地与 RabbitMQ MQTT 插件 将 MQTT 主题转换为 AMQP 0.9.1 路由键 的方式冲突。
为了解决这个问题,可以将 sparkplug
配置项设置为 true
mqtt.sparkplug = true
启用 Sparkplug 支持后,MQTT 插件不会转换主题名称中的 spAvM.N
/spBvM.N
部分。
限制
不支持 QoS 2
QoS 2 订阅将被视为 QoS 1 订阅。
重叠订阅
来自同一客户端的重叠订阅(例如,/sports/football/epl/#
和 /sports/football/#
)可能会导致重复消息被传递。 应用程序需要考虑这一点。
保留消息存储
请参阅上面的保留消息。 不同的保留消息存储具有不同的优势、权衡和局限性。
禁用插件
在节点上禁用插件或从集群中删除节点之前,必须使用 rabbitmqctl
对其进行退役。
rabbitmqctl decommission_mqtt_node <node>;
保留的消息和存储
该插件支持保留消息。 消息存储实现是可插拔的,该插件开箱即用地提供两种实现
- 基于 ETS(内存中),在
rabbit_mqtt_retained_msg_store_ets
模块中实现 - 基于 DETS(磁盘上),在
rabbit_mqtt_retained_msg_store_dets
中实现
这两种实现都有局限性和权衡。 对于第一个,可以保留的消息的最大数量受 RAM 的限制。 对于第二个,每个 vhost 的限制为 2 GB。 两者都是节点本地的(在一个代理节点上保留的消息不会复制到集群中的其他节点)。
要配置存储,请使用 rabbitmq_mqtt.retained_message_store
配置键
mqtt.allow_anonymous = true
mqtt.vhost = /
mqtt.exchange = amq.topic
mqtt.max_session_expiry_interval_seconds = 1800
mqtt.prefetch = 10
## use DETS (disk-based) store for retained messages
mqtt.retained_message_store = rabbit_mqtt_retained_msg_store_dets
## only used by DETS store
mqtt.retained_message_store_dets_sync_interval = 2000
mqtt.listeners.ssl = none
mqtt.listeners.tcp.default = 1883
该值必须是实现存储的模块
rabbit_mqtt_retained_msg_store_ets
用于基于 RAM 的存储rabbit_mqtt_retained_msg_store_dets
用于基于磁盘的
这些实现适合开发,但有时不适合生产需求。 MQTT 3.1 规范没有定义保留消息存储的一致性或复制要求,因此 RabbitMQ 允许使用自定义存储来满足特定环境的一致性和可用性需求。 例如,基于 Riak 和 Cassandra 的存储适合大多数生产环境,因为这些数据存储提供 可调一致性。
消息存储必须实现rabbit_mqtt_retained_msg_store
行为。