跳到主要内容
版本:4.1

MQTT 插件

概述

RabbitMQ 通过核心发行版中包含的插件支持 MQTT 版本 3.13.1.15.0

本指南涵盖以下主题

启用插件

MQTT 插件包含在 RabbitMQ 发行版中。在客户端可以成功连接之前,必须在所有集群节点上使用 rabbitmq-plugins 启用它

rabbitmq-plugins enable rabbitmq_mqtt

支持的 MQTT 功能

RabbitMQ 支持大多数 MQTT 5.0 功能,包括以下内容

MQTT 5.0 博客文章 提供了支持的 MQTT 5.0 功能的完整列表,包括其用法和实现细节。

MQTT 客户端可以与其他协议互操作。例如,如果 MQTT 发布者将消息发送到从绑定到 MQTT 主题交换机(通过 mqtt.exchange 配置,默认为 amq.topic)的队列中消费的 AMQP 0.9.1 或 AMQP 1.0 消费者,则可以实现互操作。同样,如果 AMQP 0.9.1、AMQP 1.0 或 STOMP 发布者发布到 MQTT 主题交换机,则可以将消息发送到 MQTT 订阅者。

MQTT 插件的工作原理

RabbitMQ MQTT 插件的目标是 MQTT 3.1、3.1.1 和 5.0,支持广泛的 MQTT 客户端。它还使 MQTT 客户端可以与 AMQP 0-9-1、AMQP 1.0 和 STOMP 客户端互操作。还支持多租户。

将 MQTT 映射到 AMQP 0.9.1 模型

RabbitMQ 核心实现了 AMQP 0.9.1 协议。该插件构建在 AMQP 0.9.1 实体之上:交换机队列 和绑定。发布到 MQTT 主题的消息由 AMQP 0.9.1 主题交换机路由。MQTT 订阅者从绑定到主题交换机的队列中消费。

MQTT 插件为每个 MQTT 订阅者创建一个专用队列。更准确地说,每个 MQTT 会话可能有 0、1 或 2 个队列

  • 如果 MQTT 客户端从不发送 SUBSCRIBE 数据包,则 MQTT 会话有 0 个队列。MQTT 客户端仅发布消息。
  • 如果 MQTT 客户端使用相同的 QoS 级别创建一个或多个订阅,则 MQTT 会话有 1 个队列。
  • 如果 MQTT 客户端使用 QoS 0 和 QoS 1 级别创建订阅,则 MQTT 会话有 2 个队列。

当列出队列时,您将观察到队列命名模式 mqtt-subscription-<MQTT 客户端 ID>qos[0|1],其中 <MQTT 客户端 ID> 是 MQTT 客户端标识符,而 [0|1] 对于 QoS 0 订阅为 0,对于 QoS 1 订阅为 1。为每个 MQTT 订阅者设置单独的队列允许每个 MQTT 订阅者接收其自己的应用程序消息副本。

插件为 MQTT 订阅客户端透明地创建队列。MQTT 规范未定义队列的概念,MQTT 客户端也不知道这些队列的存在。队列是 RabbitMQ 实现 MQTT 协议的实现细节。

队列类型

MQTT 客户端可以将消息发布到任何队列类型。为此,经典队列仲裁队列 必须绑定到主题交换机,绑定键与 PUBLISH 数据包的主题匹配。

MQTT 插件为每个 MQTT 订阅者创建一个经典队列、仲裁队列或 MQTT QoS 0 队列。默认情况下,插件会创建一个经典队列。

可以配置插件为 MQTT 会话持续时间长于 MQTT 网络连接的订阅者创建仲裁队列(而不是经典队列)。这在 仲裁队列 部分中进行了解释。

如果启用了 功能标志 rabbit_mqtt_qos0_queue,则插件为 MQTT 会话持续时间与 MQTT 网络连接时间相同的 QoS 0 订阅者创建 MQTT QoS 0 队列。这在 MQTT QoS 0 队列类型 部分中进行了解释。

队列属性参数

自 RabbitMQ 3.12 起,MQTT 插件创建的所有队列

  • 都是 持久的,即队列元数据存储在磁盘上。
  • 如果 MQTT 会话持续时间与 MQTT 网络连接时间相同,则队列是 独占的。在这种情况下,当网络连接(和会话)结束时,RabbitMQ 将删除 MQTT 客户端的所有状态,包括其队列。只有订阅的 MQTT 客户端可以从其队列中消费。
  • 不是 auto-delete。例如,如果 MQTT 客户端订阅一个主题,然后取消订阅,则队列不会被删除。但是,当 MQTT 会话结束时,队列将被删除。
  • 如果 MQTT 会话最终过期(即,RabbitMQ 运算符未禁用会话过期,请参见下文)并且超过 MQTT 网络连接时间,则队列具有 队列 TTL 设置(队列参数 x-expires)。队列 TTL(以毫秒为单位)由 MQTT 客户端在 CONNECT 数据包中请求的 会话过期间隔(以秒为单位)和服务器端配置的 mqtt.max_session_expiry_interval_seconds 的最小值确定。

mqtt.max_session_expiry_interval_seconds 的默认值为 86400(1 天)。RabbitMQ 运算符可以通过将此参数设置为 0 来强制所有 MQTT 会话在其网络连接结束后立即结束。

RabbitMQ 运算符可以通过将此参数设置为 infinity 来允许 MQTT 会话永久持续。这存在风险:不使用 Clean 会话的短期客户端可能会留下队列和消息,这将消耗资源并需要手动清理。

当 MQTT 会话结束时,RabbitMQ 会删除 MQTT 客户端的任何状态。此状态包括客户端的队列(包括 QoS 0 和 QoS 1 消息)和队列绑定(即客户端的订阅)。

主题级别分隔符和通配符

MQTT 协议规范定义斜杠 (“/”) 为 主题级别分隔符,而 AMQP 0-9-1 定义点 (“.”) 为主题级别分隔符。此插件在后台转换模式以桥接两者。

例如,MQTT 主题 cities/london 变为 AMQP 0.9.1 主题 cities.london,反之亦然。因此,当 MQTT 客户端发布主题为 cities/london 的消息时,如果 AMQP 0.9.1 客户端想要接收该消息,则应从其队列创建到主题交换机的绑定,绑定键为 cities.london

反之亦然,当 AMQP 0.9.1 客户端发布主题为 cities.london 的消息时,如果 MQTT 客户端想要接收该消息,则应创建主题过滤器为 cities/london 的 MQTT 订阅。

这有一个重要的限制:主题中包含点的 MQTT 主题将无法按预期工作,应避免使用,同样适用于包含斜杠的 AMQP 0-9-1 路由和绑定键。

此外,MQTT 定义加号 (“+”) 为 单级别通配符,而 AMQP 0.9.1 定义星号 (“*”) 以匹配单个单词

MQTTAMQP 0.9.1描述
/(斜杠).(点)主题级别分隔符
+(加号)*(星号)单级别通配符(匹配单个单词)
#(井号)#(井号)多级别通配符(匹配零个或多个单词)

使用仲裁队列

使用 mqtt.durable_queue_type 选项,可以为 MQTT 会话持续时间长于 MQTT 网络连接时间的订阅者选择使用 仲裁队列

此值只能为新集群启用,在任何客户端声明持久订阅之前。由于队列类型在声明后无法更改,因此如果为现有集群更改此设置的值,则具有现有持久状态的客户端将遇到队列类型不匹配错误,并且 订阅将失败

以下是 rabbitmq.conf 示例,该示例选择使用仲裁队列

# must ONLY be enabled for new clusters before any clients declare durable subscriptions
mqtt.durable_queue_type = quorum

当前,此设置适用于所有 MQTT 客户端,这些客户端

  1. 以 QoS 1 订阅,并且
  2. 使用大于 0 的 会话过期间隔(MQTT 5.0)或将 CleanSession 设置为 0(MQTT 3.1.1)连接

第二个条件意味着 MQTT 会话超过 MQTT 网络连接时间。

虽然仲裁队列旨在提高数据安全性以及从副本故障中可预测的有效恢复,但它们也存在缺点。根据定义,仲裁队列在集群中至少需要三个副本。因此,仲裁队列声明和删除时间更长,并且不适合 高客户端连接 churn 或具有大量(数十万)订阅者的环境。

仲裁队列非常适合少数(数百个)长期存在的客户端,这些客户端实际上非常关心数据安全。

MQTT QoS 0 队列类型

如果满足以下三个条件,则 MQTT 插件将创建 MQTT QoS 0 队列

  1. 功能标志 rabbit_mqtt_qos0_queue 已启用。
  2. MQTT 客户端以 QoS 0 订阅。
  3. MQTT 5.0 客户端使用 会话过期间隔 0 连接,或者 MQTT 3.1.1 客户端使用 CleanSession 设置为 1 连接。

第三个条件意味着 MQTT 会话的持续时间仅与网络连接的持续时间相同。

MQTT QoS 0 队列类型可以被认为是“伪”或“虚拟”队列:它与其他队列类型(经典队列、仲裁队列和流)非常不同,因为这种新的队列类型既不是单独的 Erlang 进程,也不将消息存储在磁盘上。相反,此队列类型是 Erlang 进程邮箱的子集。MQTT 消息直接发送到订阅客户端的 MQTT 连接进程。换句话说,MQTT 消息被发送到任何“在线” MQTT 订阅者。

将队列视为“跳过”更为准确。将消息直接发送到 MQTT 连接进程的事实被实现为队列类型是为了简化消息路由和协议互操作性,以便消息不仅可以从 MQTT 发布连接进程发送,还可以从 AMQP 0.9.1 通道 进程发送。后者使 AMQP 0.9.1、AMQP 1.0 或 STOMP 客户端能够直接向 MQTT 订阅者连接进程发送消息,从而跳过专用队列进程。

使用 MQTT QoS 0 队列类型的好处是

  1. 支持更大的扇出,例如,将消息从“云”(RabbitMQ)发送到所有设备(MQTT 客户端)。
  2. 更低的 内存使用量
  3. 更低的发布者确认延迟
  4. 更低的端到端延迟

由于 MQTT QoS 0 队列类型没有流量控制,因此 MQTT 消息到达 MQTT 连接进程邮箱的速度可能比从 MQTT 连接进程传递到 MQTT 订阅客户端的速度更快。当 MQTT 订阅客户端和 RabbitMQ 之间的网络连接不良或在大量扇入场景中(其中许多发布者使单个 MQTT 订阅客户端过载)时,可能会发生这种情况。

过载保护

为了防止由于 MQTT QoS 0 消息堆积在 MQTT 连接进程邮箱中而导致 内存使用量 过高,如果以下两个条件都为真,则 RabbitMQ 会有意丢弃 MQTT QoS 0 队列中的 QoS 0 消息

  1. MQTT 连接进程邮箱中的消息数超过配置的 mqtt.mailbox_soft_limit(默认为 200),并且
  2. 发送到 MQTT 客户端的套接字正忙(由于 TCP 背压而发送速度不够快)。

请注意,进程邮箱中可能还有其他消息(例如,从 MQTT 订阅客户端发送到 RabbitMQ 的应用程序消息或从另一种队列类型到 MQTT 连接进程的确认),这些消息显然不会被丢弃。但是,这些其他消息也会计入 mqtt.mailbox_soft_limit

mqtt.mailbox_soft_limit 设置为 0 会禁用过载保护机制,这意味着 RabbitMQ 永远不会有意丢弃 QoS 0 消息。将 mqtt.mailbox_soft_limit 设置为非常高的值会降低有意丢弃 QoS 0 消息的可能性,同时增加导致集群范围内存警报的风险(特别是如果消息负载很大或者存在许多 rabbit_mqtt_qos0_queue 类型的过载队列)。

mqtt.mailbox_soft_limit 可以被视为 队列长度限制(尽管不完全精确,因为如前所述,Erlang 进程邮箱可能包含 MQTT 应用程序消息以外的其他消息)。这就是配置键 mqtt.mailbox_soft_limit 包含单词 soft 的原因。所描述的过载保护机制大致对应于经典队列和仲裁队列中存在的 溢出行为 drop-head

由给定 RabbitMQ 节点报告的以下 Prometheus 指标显示了在该节点生命周期内所有 rabbit_mqtt_qos0_queue 类型的队列中总共丢弃了多少 QoS 0 消息

rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_mqtt_qos0_queue",dead_letter_strategy="disabled"} 0

Native MQTT 博客文章更详细地描述了 MQTT QoS 0 队列类型。

用户和身份验证

MQTT 客户端将能够连接,前提是他们拥有具有适当权限的现有用户的凭据集。

为了使 MQTT 连接成功,它必须成功进行身份验证,并且用户必须具有 适当的权限 才能访问插件使用的虚拟主机(请参见下文)。

MQTT 客户端可以(并且通常会)在连接时指定一组凭据。凭据可以是用户名和密码对,也可以是 x.509 证书(请参见下文)。

该插件支持匿名身份验证,但不建议使用,并且它受到某些限制(在下面列出),这些限制是为了默认情况下的合理安全级别而强制执行的。

可以使用 rabbitmqctl管理 UIHTTP API 管理用户及其权限。

例如,以下命令为 MQTT 连接创建一个新用户,该用户对插件使用的默认 虚拟主机 具有完全访问权限

# username and password are both "mqtt-test"
rabbitmqctl add_user mqtt-test mqtt-test
rabbitmqctl set_permissions -p "/" mqtt-test ".*" ".*" ".*"
rabbitmqctl set_user_tags mqtt-test management

请注意,用户名中可能不会出现冒号。

本地与远程客户端连接

当 MQTT 客户端未提供登录凭据时,插件默认使用 guest 帐户,该帐户不允许非 localhost 连接。从远程主机连接时,以下选项可确保远程客户端可以成功连接

  • 创建一个或多个新用户,授予他们对 MQTT 插件使用的虚拟主机的完全权限,并使从远程主机连接的客户端使用这些凭据。这是推荐的选项。
  • anonymous_login_useranonymous_login_pass 设置为具有 适当权限 的非 guest 用户。

匿名连接

MQTT 支持可选身份验证(客户端可以不提供凭据)。因此,默认凭据集用于匿名连接。

anonymous_login_useranonymous_login_pass 配置键用于指定凭据

anonymous_login_user = some-user
anonymous_login_pass = s3kRe7

可以禁用匿名连接

mqtt.allow_anonymous = false
anonymous_login_user = none

如果 mqtt.allow_anonymous 键设置为 false,则客户端 必须 提供凭据。

强烈建议不要使用匿名连接,并且它受到某些限制(请参见上文),这些限制是为了默认情况下的合理安全级别而强制执行的。

插件配置

这是一个 配置 示例,演示了许多 MQTT 插件设置

mqtt.listeners.tcp.default = 1883
## Default MQTT with TLS port is 8883
# mqtt.listeners.ssl.default = 8883

# anonymous connections, if allowed, will use the default
# credentials specified here
mqtt.allow_anonymous = true

mqtt.vhost = /
mqtt.exchange = amq.topic
mqtt.prefetch = 10
# 24 hours by default
mqtt.max_session_expiry_interval_seconds = 86400

TCP 监听器

当未指定配置时,MQTT 插件将侦听端口 1883 上的所有接口,并具有默认用户登录名/密码 guest/guest

要更改监听器端口,请编辑您的 配置文件,以包含 rabbitmq_mqtt 应用程序的 tcp_listeners 变量。

例如,将监听器端口更改为 12345 的最小配置文件如下所示

mqtt.listeners.tcp.1 = 12345

而将监听器更改为仅在 localhost(对于 IPv4 和 IPv6)上监听的配置文件如下所示

mqtt.listeners.tcp.1 = 127.0.0.1:1883
mqtt.listeners.tcp.2 = ::1:1883

TCP 监听器选项

该插件支持 TCP 监听器选项配置。

这些设置使用通用前缀 mqtt.tcp_listen_options,并控制诸如 TCP 缓冲区大小、入站 TCP 连接队列长度、是否启用 TCP keepalive 等。有关详细信息,请参见 网络指南

mqtt.listeners.tcp.1 = 127.0.0.1:1883
mqtt.listeners.tcp.2 = ::1:1883

mqtt.tcp_listen_options.backlog = 4096
mqtt.tcp_listen_options.buffer = 131072
mqtt.tcp_listen_options.recbuf = 131072
mqtt.tcp_listen_options.sndbuf = 131072

mqtt.tcp_listen_options.keepalive = true
mqtt.tcp_listen_options.nodelay = true

mqtt.tcp_listen_options.exit_on_close = true
mqtt.tcp_listen_options.send_timeout = 120

TLS 支持

要将 TLS 用于 MQTT 连接,必须在 Broker 中 配置 TLS。要启用支持 TLS 的 MQTT 连接,请使用 mqtt.listeners.ssl.* 配置键为 MQTT 添加 TLS 监听器。

插件将使用核心 RabbitMQ 服务器证书和密钥(就像 AMQP 0-9-1 和 AMQP 1.0 监听器一样)

ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile = /path/to/server_certificate.pem
ssl_options.keyfile = /path/to/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true

# default TLS-enabled port for MQTT connections
mqtt.listeners.ssl.default = 8883
mqtt.listeners.tcp.default = 1883

请注意,RabbitMQ 默认拒绝 SSLv3 连接,因为已知该协议已泄露。

有关详细信息,请参见 TLS 配置指南

虚拟主机

RabbitMQ 核心是一个多租户系统,每个连接都属于一个虚拟主机。某些消息传递协议具有 vhost 的概念,而另一些则没有。MQTT 属于后一类。因此,MQTT 插件需要提供一种将连接映射到 vhost 的方法。

vhost 选项控制适配器默认连接到的 RabbitMQ vhost。仅当在连接建立期间未提供 vhost 时,才会咨询 vhost 配置。有几种(可选)方法可以指定客户端将连接到的 vhost。

端口到虚拟主机映射

第一种方法是将 MQTT 插件(TCP 或 TLS)监听器端口映射到 vhost。映射通过 mqtt_port_to_vhost_mapping 全局运行时参数 指定。让我们采用以下插件配置

mqtt.listeners.tcp.1 = 1883
mqtt.listeners.tcp.2 = 1884

mqtt.listeners.ssl.1 = 8883
mqtt.listeners.ssl.2 = 8884

# (other TLS settings are omitted for brevity)

mqtt.vhost = /

请注意,插件侦听端口 1883、1884、8883 和 8884。假设您希望连接到端口 1883 和 8883 的客户端连接到 vhost1 虚拟主机,而连接到端口 1884 和 8884 的客户端连接到 vhost2 虚拟主机。可以通过使用 rabbitmqctl 设置 mqtt_port_to_vhost_mapping 全局参数来创建端口到 vhost 的映射

rabbitmqctl set_global_parameter mqtt_port_to_vhost_mapping \
'{"1883":"vhost1", "8883":"vhost1", "1884":"vhost2", "8884":"vhost2"}'

如果给定端口没有映射(因为在 mqtt_port_to_vhost_mapping 全局参数 JSON 文档中找不到该端口,或者根本未设置全局参数),则插件将尝试从用户名中提取虚拟主机(请参见下文),并最终使用 vhost 插件配置选项。

Broker 在连接时查询 mqtt_port_to_vhost_mapping 全局参数值。如果该值发生更改,则不会通知或断开已连接的客户端。他们需要重新连接才能切换到新的虚拟主机。

虚拟主机作为用户名的一部分

连接时指定 vhost 的另一种更具体的方法是将 vhost 预先添加到用户名,并用冒号分隔。

例如,使用以下方式连接

/:guest

等效于默认 vhost 和用户名,而

mqtt-vhost:mqtt-username

表示连接到 vhost mqtt-host,用户名为 mqtt-username

在用户名中指定虚拟主机优先于使用 mqtt_port_to_vhost_mapping 全局参数指定的端口到 vhost 的映射。

使用 TLS/x509 客户端证书进行身份验证

该插件可以通过从客户端的 TLS (x509) 证书中提取名称来对启用 TLS 的连接进行身份验证,而无需使用密码。

为了安全起见,服务器必须 配置 TLS 选项,将 fail_if_no_peer_cert 设置为 true,并将 verify 设置为 verify_peer,以强制所有 TLS 客户端都具有可验证的客户端证书。

要启用此功能,请为 rabbitmq_mqtt 应用程序将 ssl_cert_login 设置为 true。例如

mqtt.ssl_cert_login = true

默认情况下,这将将用户名设置为证书主题的可分辨名称的 RFC4514-ish 字符串形式,类似于 OpenSSL 的 “-nameopt RFC2253” 选项产生的形式。

要改用通用名称,请添加

ssl_cert_login_from = common_name

到您的配置。

请注意

  • 经过身份验证的用户必须存在于配置的身份验证/授权后端中。
  • 客户端 不得 提供用户名和密码。

您可以选择使用 mqtt_default_vhosts 全局运行时参数 为客户端证书指定虚拟主机。此全局参数的值必须包含一个 JSON 文档,该文档将证书的主题可分辨名称映射到其目标虚拟主机。让我们看看如何将 2 个证书 O=client,CN=guestO=client,CN=rabbit 分别映射到 vhost1vhost2 虚拟主机。

可以使用以下方法设置全局参数

rabbitmqctl set_global_parameter mqtt_default_vhosts \
'{"O=client,CN=guest": "vhost1", "O=client,CN=rabbit": "vhost2"}'

请注意

  • 如果找不到证书的虚拟主机(因为在 mqtt_default_vhosts 全局参数 JSON 文档中找不到证书主题的可分辨名称,或者根本未设置全局参数),则将使用 vhost 插件配置选项指定的虚拟主机。
  • Broker 在连接时查询 mqtt_default_vhosts 全局参数值。如果该值发生更改,则不会通知或断开已连接的客户端。他们需要重新连接才能切换到新的虚拟主机。
  • 使用 mqtt_default_vhosts 全局参数的证书到 vhost 的映射被认为比使用 mqtt_port_to_vhost_mapping 全局参数的端口到 vhost 的映射更具体,因此优先于后者。

使用客户端证书中的 client_id 提取进行身份验证

可以将节点配置为验证 MQTT 连接上设置的 client_id 是否与客户端证书中找到的 client_id 匹配。

如果它们匹配,则 RabbitMQ 通过将用户的身份连同 client_id 一起传递到配置的身份验证后端来继续用户的身份验证。某些身份验证后端(如 rabbitmq_auth_backend_http)可能会使用 client_id 凭据以及 username 来做出身份验证和/或授权决策。如果 client_id 不匹配,则 RabbitMQ 将关闭连接,原因代码为 2,这意味着“服务器不允许客户端标识符”。

为此,必须首先指示 RabbitMQ 如何从证书中获取 client_id。这通过 mqtt.ssl_cert_client_id_from 配置键完成。可接受的值为

  • distinguished_name:这是证书的 DN 或 可分辨名称
  • subject_alternative_name:主题备用名称或 SAN,它来自证书的扩展部分。由于存在不同类型的主题备用名称,因此可能还需要指定类型

如果 mqtt.ssl_cert_client_id_from 设置为 subject_alternative_name,则可以使用 mqtt.ssl_cert_login_san_type 配置备用名称的类型。如果省略该设置,则使用的默认类型为 dns。可接受的值为

  • dns
  • ip
  • email
  • uri
  • other_name

证书可以包含同一类型的多个 SAN 字段,例如,两个备用 DNS 名称。如果是这种情况,请使用 mqtt.ssl_cert_login_san_index 配置键来指定要使用的索引。默认情况下,RabbitMQ 将选择第一个值,即 mqtt.ssl_cert_login_san_index 的默认值为 0

下面是一个示例,其中用户名从证书的可分辨名称中提取,而 client_id 从类型为 uri 的第一个 SAN(主题备用名称)中提取

ssl_cert_login_from = distinguished_name
mqtt.ssl_cert_client_id_from = subject_alternative_name
mqtt.ssl_cert_login_san_type = uri

流量控制

prefetch 选项控制将要传递的最大未确认的 QoS=1 PUBLISH 数据包数量。此选项的解读方式与 consumer prefetch 相同。

MQTT 5.0 客户端可以通过在 CONNECT 数据包中设置 接收最大值(Receive Maximum) 来定义一个更小的数值。

自定义交换机

exchange 选项确定 MQTT 客户端的消息将发布到哪个交换机。交换机必须在客户端发布任何消息之前创建。该交换机预计是一个 主题交换机

默认主题交换机 amq.topic 是预先声明的:因此,当 RabbitMQ 启动时,它就存在。

保留消息和存储

该插件支持保留消息,但存在本节中描述的限制。消息存储实现是可插拔的,并且该插件开箱即用地提供了两种实现。

  • 基于 ETS(内存中),在模块 rabbit_mqtt_retained_msg_store_ets 中实现
  • 基于 DETS(磁盘上),在模块 rabbit_mqtt_retained_msg_store_dets 中实现

这两种实现都有局限性和权衡。对于第一种实现,可以保留的最大消息数受 RAM 限制。对于第二种实现,每个 vhost 限制为 2 GB。两者都是 节点本地的:保留消息既不会复制到远程集群节点,也不会从远程集群节点查询。

一个可行的例子如下:一个 MQTT 客户端向节点 A 发布一个主题为 topic/1 的保留消息。之后,另一个客户端在节点 A 上订阅主题过滤器 topic/1。新的订阅者将收到保留消息。

然而,如果一个客户端在节点 A 上发布一个保留消息,而另一个客户端随后在节点 B 上订阅,那么订阅客户端将不会收到存储在节点 A 上的任何保留消息。

此外,如果主题过滤器包含通配符(多级通配符 “#” 或 单级通配符 “+”),则不会发送保留消息。

要配置存储,请使用 mqtt.retained_message_store 配置键。

## use DETS (disk-based) store for retained messages
mqtt.retained_message_store = rabbit_mqtt_retained_msg_store_dets
## only used by DETS store (in milliseconds)
mqtt.retained_message_store_dets_sync_interval = 2000

该值必须是一个实现存储的模块。

  • rabbit_mqtt_retained_msg_store_ets 用于基于 RAM 的存储
  • rabbit_mqtt_retained_msg_store_dets 用于基于磁盘的存储(这是默认值。)

这些实现适用于开发,但有时不适用于生产需求。MQTT 规范没有定义保留消息存储的一致性或复制要求,因此 RabbitMQ 允许自定义存储以满足特定环境的一致性和可用性需求。例如,基于 RiakCassandra 的存储将适用于大多数生产环境,因为这些数据存储提供 可调整的一致性

消息存储必须实现 rabbit_mqtt_retained_msg_store 行为。

指标

Prometheus

此插件发出 全局计数器 中列出的 Prometheus 指标。

Prometheus 标签 protocol 的值是 mqtt310mqtt311mqtt50,具体取决于 MQTT 客户端使用的是 MQTT 3.1、MQTT 3.1.1 还是 MQTT 5.0。

Prometheus 标签 queue_type 的值是 rabbit_classic_queuerabbit_quorum_queuerabbit_mqtt_qos0_queue,具体取决于 MQTT 客户端使用的队列类型。(请注意,MQTT 客户端从不直接从 streams 消费,尽管他们可以将消息发布到 streams。)

RabbitMQ 管理 API

管理 API 提供 MQTT 连接的指标(例如,来自/到客户端的网络流量)以及经典队列和仲裁队列的指标(例如,它们包含多少消息)。但是,自 3.12 起,与 AMQP 0.9.1 通道 相关的管理 API 指标(例如消息速率)不可用。

性能和可扩展性检查清单

MQTT 是物联网(IoT)的标准协议。一种常见的 IoT 工作负载是许多 MQTT 设备定期向 MQTT 代理发布传感器数据。可能有成千上万,有时甚至数百万的 IoT 设备连接到 MQTT 代理。Native MQTT 这篇博文演示了此类工作负载的性能基准。

本节旨在提供一个非详尽的检查清单,其中包含配置 RabbitMQ 作为高效 MQTT 代理的技巧和窍门,以支持大量客户端连接。

  1. 设置 management_agent.disable_metrics_collector = true 以禁用 管理插件 中的指标收集。RabbitMQ 管理插件并非设计用于过度的指标收集。事实上,通过管理 API 传递指标已被弃用。相反,请使用专为收集和查询大量指标而设计的工具:Prometheus。
  2. QoS 0 的 MQTT 数据包和订阅比 QoS 1 提供更好的性能。与 AMQP 0.9.1 和 AMQP 1.0 不同,MQTT 的设计目的不是最大化吞吐量:例如,没有 多重确认。每个 QoS 1 的 PUBLISH 数据包都需要单独确认。
  3. 按照 TCP 监听器选项 部分的描述,减小 TCP 缓冲区大小。

这大大减少了许多并发连接客户端环境中的 内存使用量

  1. 较少的主题级别(在 MQTT 主题和 MQTT 主题过滤器中)比更多的主题级别表现更好。例如,如果可能,最好将您的主题结构化为 city/name 而不是 continent/country/city/name。主题过滤器中的每个主题级别当前都会在 RabbitMQ 使用的数据库中创建自己的条目。因此,当主题级别较少时,创建和删除许多订阅将更快。此外,路由主题级别较少的消息也更快。
  2. 在订阅变更率高的工作负载中,增加 Mnesia 配置参数 dump_log_write_threshold(例如 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="-mnesia dump_log_write_threshold 20000")。
  3. 当连接大量客户端时,增加 Erlang 进程的最大数量(例如 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 10000000)和打开端口的最大数量(例如 ERL_MAX_PORTS=10000000)。

有关更多信息,请查阅 网络配置 指南。

代理协议

MQTT 插件支持 代理协议。此功能默认禁用,要为 MQTT 客户端启用它,请...

mqtt.proxy_protocol = true

有关代理协议的更多信息,请参阅 网络指南

Sparkplug 支持

Sparkplug 是一个规范,为 MQTT 系统的设计提供指导。在 Sparkplug 中,MQTT 主题必须以 spAvM.NspBvM.N 开头,其中 MN 是整数。不幸的是,这与 RabbitMQ MQTT 插件将 MQTT 主题 转换为 AMQP 0.9.1 路由键 的方式冲突。

为了解决这个问题,可以将 sparkplug 配置条目设置为 true

mqtt.sparkplug = true

启用 Sparkplug 支持后,MQTT 插件将不会转换主题名称的 spAvM.N/spBvM.N 部分。

局限性

RabbitMQ MQTT 插件目前有以下局限性。

QoS 2

不支持 QoS 2:精确一次传递

如果 MQTT 3.0 或 3.1.1 客户端发布 QoS 2 的消息,RabbitMQ 会将 QoS 级别降级为 1。如果 MQTT 5.0 客户端发布 QoS 2 的消息,RabbitMQ 将断开客户端连接,原因代码为 155: QoS not supported(QoS 不受支持)。

重新身份验证

RabbitMQ 不支持 MQTT 5.0 AUTH 数据包,因此不支持重新身份验证

虽然 RabbitMQ 支持 AMQP 1.0、AMQP 0.9.1 和 RabbitMQ 流协议的 OAuth 2.0 令牌续订,但 RabbitMQ 不支持 MQTT 的 OAuth 2.0 令牌续订。如果令牌过期,RabbitMQ 将断开 MQTT 客户端连接,原因代码为 160: Maximum connect time(最大连接时间)。

共享订阅

不支持共享订阅

重叠订阅

具有不同 QoS 级别的重叠订阅可能会导致传递重复消息。应用程序需要考虑到这一点。例如,当同一个 MQTT 客户端为主题过滤器 /sports/football/# 创建 QoS 0 订阅,并为主题过滤器 /sports/# 创建 QoS 1 订阅时,它将收到重复的消息。

保留消息存储

保留消息和存储 中所述,保留消息仅在本地节点存储和查询。此外,如果主题过滤器包含通配符(多级通配符 “#” 或单级通配符 “+”),则不会发送保留消息。

延迟和保留遗嘱消息

延迟保留遗嘱消息 不会被保留。

禁用插件

在节点上禁用插件或从集群中移除节点之前,必须使用 rabbitmqctl 将其停用。

rabbitmqctl decommission_mqtt_node <node>;

保留消息和存储

该插件支持保留消息。消息存储实现是可插拔的,并且该插件开箱即用地提供了两种实现

  • 基于 ETS(内存中),在 rabbit_mqtt_retained_msg_store_ets 模块中实现
  • 基于 DETS(磁盘上),在 rabbit_mqtt_retained_msg_store_dets 中实现

这两种实现都有局限性和权衡。对于第一种实现,可以保留的最大消息数受 RAM 限制。对于第二种实现,每个 vhost 限制为 2 GB。两者都是节点本地的(在一个代理节点上保留的消息不会复制到集群中的其他节点)。

要配置存储,请使用 rabbitmq_mqtt.retained_message_store 配置键。

mqtt.allow_anonymous                     = true
mqtt.vhost = /
mqtt.exchange = amq.topic
mqtt.max_session_expiry_interval_seconds = 1800
mqtt.prefetch = 10

## use DETS (disk-based) store for retained messages
mqtt.retained_message_store = rabbit_mqtt_retained_msg_store_dets
## only used by DETS store
mqtt.retained_message_store_dets_sync_interval = 2000

mqtt.listeners.ssl = none
mqtt.listeners.tcp.default = 1883

该值必须是一个实现存储的模块。

  • rabbit_mqtt_retained_msg_store_ets 用于基于 RAM 的存储
  • rabbit_mqtt_retained_msg_store_dets 用于基于磁盘的存储

这些实现适用于开发,但有时不适用于生产需求。MQTT 3.1 规范没有定义保留消息存储的一致性或复制要求,因此 RabbitMQ 允许自定义存储以满足特定环境的一致性和可用性需求。例如,基于 RiakCassandra 的存储将适用于大多数生产环境,因为这些数据存储提供 可调整的一致性

消息存储必须实现 rabbit_mqtt_retained_msg_store 行为。

© . All rights reserved.