跳至主要内容

RabbitMQ 3.13 将支持 MQTT 5.0

·阅读 27 分钟

RabbitMQ 3.12 中发布的原生 MQTT为物联网用例带来了巨大的可扩展性和性能改进。

RabbitMQ 3.13 将支持MQTT 5.0,因此将是我们努力将 RabbitMQ 打造成领先的 MQTT 代理的下一步重要举措。

这篇博文解释了如何在 RabbitMQ 中使用新的 MQTT 5.0 功能。

MQTT 概述

MQTT 是物联网 (IoT) 的标准协议。

连接到代理的物联网远程设备可能网络质量较差。因此,MQTT 非常轻量级:MQTT 协议头很小,以节省网络带宽。

由于物联网设备可能经常断开连接并重新连接 - 想象一下汽车穿过隧道 - MQTT 也非常高效:与其他消息传递协议相比,客户端通过更短的握手进行连接和身份验证。

MQTT 协议已经存在很多年了。如下表所示,最新的 MQTT 协议版本为 5.0。

MQTT 版本CONNECT 数据包中的协议版本MQTT 规范发布年份RabbitMQ 自哪一年开始支持(版本)
3.1320102012 (3.0)
3.1.1420142014 (3.3)
5.0520192024 (3.13)

值得一提的是,用户界面协议版本和“内部”协议版本(也称为协议级别)之间存在差异。后者从客户端发送到服务器CONNECT 数据包中。由于用户界面协议版本 3.1.1 映射到内部协议版本 4,为了避免进一步混淆,MQTT 委员会决定跳过用户界面版本 4.0,以便用户界面版本 5.0 映射到内部协议版本 5。

MQTT 5.0 功能

附录 C. MQTT v5.0 中新功能摘要 提供了 MQTT 5.0 新功能的完整列表。

由于您可以在网上找到很多优秀的 MQTT 5.0 资源,包括说明性图表和用法模式,因此这篇博文仅关注 RabbitMQ 的具体细节。本节解释了 PR #7263 中实现的最重要功能。对于每个功能,我们都会提供一个示例说明如何在 RabbitMQ 中使用它,或概述如何在 RabbitMQ 中实现它的高级描述。

要自己运行这些示例,请启动 RabbitMQ 服务器 3.13,例如,使用此 Docker 镜像标签

docker run -it --rm --name rabbitmq -p 1883:1883 -p 15672:15672 -p 15692:15692 rabbitmq:3.13.0-management

在另一个终端窗口中,启用MQTT 插件

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_mqtt

由于 MQTT 插件是动态启用的,因此MQTT 插件定义的功能标志 已禁用。启用所有功能标志,包括功能标志 mqtt_v5

docker exec rabbitmq rabbitmqctl enable_feature_flag all

列出功能标志现在应该显示所有功能标志都已启用

docker exec rabbitmq rabbitmqctl list_feature_flags --formatter=pretty_table

以下示例使用MQTTX CLI 版本 1.9.4。我们使用 CLI 而不是图形用户界面,以便您可以通过复制粘贴命令轻松运行示例。

所有新功能也适用于RabbitMQ Web MQTT 插件

以下是本博文中介绍的 MQTT 5.0 功能列表

功能 1:消息过期

描述

可以为发布到代理的每条消息设置一个以秒为单位的过期间隔。如果在该过期间隔内未使用该消息,则会将其丢弃或放入死信队列。

示例

为主题 t/1 创建订阅。这会在 RabbitMQ 中创建一个队列。通过在终端中键入 Ctrl+C 断开客户端连接。由于我们使用的是会话过期间隔 600 秒,因此此队列将存在 10 分钟。

mqttx sub --client-id sub-1 --topic t/1 --session-expiry-interval 600 --qos 1
… Connecting...
✔ Connected
… Subscribing to t/1...
✔ Subscribed to t/1
^C

以 30 秒的消息过期间隔将消息发布到同一主题

mqttx pub --topic t/1 --message m1 --message-expiry-interval 30 --qos 1
… Connecting...
✔ Connected
… Message publishing...
✔ Message published

在接下来的 30 秒内,列出队列

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues name type messages
┌─────────────────────────────┬─────────┬──────────┐
│ name │ type │ messages │
├─────────────────────────────┼─────────┼──────────┤
│ mqtt-subscription-sub-1qos1 │ classic │ 1
└─────────────────────────────┴─────────┴──────────┘

等待 30 秒,然后再次列出队列

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues
┌─────────────────────────────┬─────────┬──────────┐
│ name │ type │ messages │
├─────────────────────────────┼─────────┼──────────┤
│ mqtt-subscription-sub-1qos1 │ classic │ 0
└─────────────────────────────┴─────────┴──────────┘

由于客户端 sub-1 未连接到代理以使用该消息,因此该消息已过期。如果设置了死信队列 策略,则该消息将被放入死信队列交换机。在本例中,死信队列已禁用。查询 Prometheus 端点证明经典队列中 1 条消息已过期。

curl --silent localhost:15692/metrics | grep rabbitmq_global_messages_dead_lettered_expired_total
# TYPE rabbitmq_global_messages_dead_lettered_expired_total counter
# HELP rabbitmq_global_messages_dead_lettered_expired_total Total number of messages dead-lettered due to message TTL exceeded
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 1
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

另一个有趣的特性是以下要求

服务器发送到客户端的 PUBLISH 数据包**必须**包含一个消息过期间隔,该间隔设置为接收到的值减去应用程序消息在服务器中等待的时间。

向代理发送第二条消息,消息过期间隔为 60 秒

mqttx pub --topic t/1 --message m2 --message-expiry-interval 60 --qos 1

在重新连接订阅客户端之前等待 20 秒

mqttx sub --client-id sub-1 --topic t/1 --no-clean --session-expiry-interval 0  --qos 1 --output-mode clean
{
"topic": "t/1",
"payload": "m2",
"packet": {
...
"properties": {
"messageExpiryInterval": 40
}
}
}

根据 MQTT 5.0 协议规范的要求,客户端接收到的第二条消息的消息过期间隔设置为 40 秒:代理接收到的 60 秒减去消息在代理中等待的 20 秒。

实现

RabbitMQ 使用每消息 TTL(类似于 AMQP 0.9.1 发布者中的 expiration 字段)实现了 MQTT 5.0 消息过期。

功能 2:订阅标识符

描述

客户端可以在SUBSCRIBE 数据包中设置订阅标识符。如果由于该订阅而向客户端发送消息,则代理会将该订阅标识符包含到PUBLISH 数据包中。

订阅标识符的用例列在第SUBSCRIBE 操作 节中。

示例

从同一客户端向服务器发送 3 个单独的 SUBSCRIBE 数据包,每个数据包都有不同的主题过滤器和不同的订阅标识符

mqttx sub --client-id sub-2 --topic t/1 --subscription-identifier 1 --session-expiry-interval 600
^C
mqttx sub --client-id sub-2 --topic t/2 --subscription-identifier 2 --session-expiry-interval 600 --no-clean
^C
mqttx sub --client-id sub-2 --topic "t/#" --subscription-identifier 3 --session-expiry-interval 0 --no-clean --output-mode clean

在第二个终端窗口中,我们看到来自同一队列到同一主题交换机的 3 个绑定,每个绑定都有不同的路由键。

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_bindings \
source_name source_kind destination_name destination_kind routing_key
┌─────────────┬─────────────┬─────────────────────────────┬──────────────────┬─────────────────────────────┐
│ source_name │ source_kind │ destination_name │ destination_kind │ routing_key │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ │ exchange │ mqtt-subscription-sub-2qos0 │ queue │ mqtt-subscription-sub-2qos0 │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ amq.topic │ exchange │ mqtt-subscription-sub-2qos0 │ queue │ t.# │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ amq.topic │ exchange │ mqtt-subscription-sub-2qos0 │ queue │ t.1 │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ amq.topic │ exchange │ mqtt-subscription-sub-2qos0 │ queue │ t.2 │
└─────────────┴─────────────┴─────────────────────────────┴──────────────────┴─────────────────────────────┘

第一个条目是到默认交换机的隐式绑定。

每个 MQTT 订阅及其 MQTT 主题过滤器对应于一个具有绑定键的 AMQP 0.9.1 绑定。准确地说,表列 routing_key 的命名不正确:它应该称为 binding_key。MQTT 中的主题分隔符是“/”字符,而 AMQP 0.9.1 主题交换机中的主题分隔符是“.”字符。

再次在第二个终端窗口中,向主题 t/1 发送消息

mqttx pub --topic t/1 --message m1

第一个终端窗口(订阅客户端)收到以下 PUBLISH 数据包

{
"topic": "t/1",
"payload": "m1",
"packet": {
...
"properties": {
"subscriptionIdentifier": [
1,
3
]
}
}
}

它包含订阅标识符 1 和 3,因为主题过滤器 t/1t/# 都匹配主题 t/1

同样,如果您向主题 t/2 发送第二条消息,则订阅客户端将收到一个包含订阅标识符 2 和 3 的 PUBLISH 数据包。

实现

订阅标识符是 MQTT 会话状态的一部分。因此,在客户端断开连接以及 MQTT 会话结束之前,必须将订阅标识符持久化到服务器的数据库中。RabbitMQ 将订阅标识符存储在绑定参数中

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_bindings routing_key arguments
┌─────────────────────────────┬───────────────────────────────────────────────────────────────────────────────────┐
│ routing_key │ arguments │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ mqtt-subscription-sub-2qos0 │ │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ t.# │ {mqtt_subscription_opts,0,false,false,0,3}{<<"x-binding-key">>,longstr,<<"t.#">>} │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ t.1 │ {mqtt_subscription_opts,0,false,false,0,1}{<<"x-binding-key">>,longstr,<<"t.1">>}
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ t.2 │ {mqtt_subscription_opts,0,false,false,0,2}{<<"x-binding-key">>,longstr,<<"t.2">>}
└─────────────────────────────┴───────────────────────────────────────────────────────────────────────────────────┘

绑定参数的确切结构并不重要,并且可能会在未来的 RabbitMQ 版本中发生变化。但是,您可以在绑定参数中看到整数 1、2 和 3,它们对应于订阅标识符。

当主题交换机路由消息时,发布 Erlang 进程会将所有匹配的绑定键包含到消息中。订阅 MQTT 客户端的 Erlang 进程会将匹配的绑定键与其了解的 MQTT 主题过滤器进行比较,并将订阅标识符包含到发送到 MQTT 客户端的 PUBLISH 数据包中。

发布 Erlang 进程可以是 MQTT 连接进程或 AMQP 0.9.1 通道进程。与往常一样,RabbitMQ 在跨协议互操作性方面表现出色:当 AMQP 0.9.1(或 STOMP 或 AMQP 1.0)客户端向主题交换机发送消息时,正确的订阅标识符将包含在发送到 MQTT 客户端的 PUBLISH 数据包中。

功能 3:订阅选项

描述

MQTT 5.0 带有 3 个新的订阅选项

  1. 无本地
  2. 保留为已发布
  3. 保留处理

RabbitMQ 已实现所有订阅选项。在这里,我们只关注保留处理选项

此选项指定在建立订阅时是否发送保留消息。
其值为
0 = 在订阅时发送保留消息
1 = 仅当订阅当前不存在时,在订阅时发送保留消息
2 = 在订阅时不发送保留消息

示例

发送保留消息

mqttx pub --topic mytopic --message m --retain

保留处理值 0 将接收保留消息,而值 2 则不会

mqttx sub --topic mytopic --retain-handling 0
… Connecting...
✔ Connected
… Subscribing to mytopic...
✔ Subscribed to mytopic
payload: m
retain: true
^C

mqttx sub --topic mytopic --retain-handling 2
… Connecting...
✔ Connected
… Subscribing to mytopic...
✔ Subscribed to mytopic

特性 4:所有 ACK 上的 Reason Code

描述

CONNACK、PUBACK、SUBACK、UNSUBACK 和 DISCONNECT 数据包包含 Reason Code。

实现

一个实现示例是,如果消息未路由到任何队列,RabbitMQ 将在 PUBACK 数据包中回复 Reason Code No matching subscribers。MQTT 5.0 Reason Code No matching subscribers 在概念上对应于 AMQP 0.9.1 中的 mandatory 消息属性和 BasicReturn 处理程序。

特性 5:用户属性

描述

大多数 MQTT 数据包可以包含用户属性。MQTT 规范未定义用户属性的含义。

示例 PUBLISH 数据包

PUBLISH 数据包中的用户属性由客户端应用程序定义,并由服务器原样转发。

在第一个终端窗口中订阅

mqttx sub --topic t/5

在第二个终端窗口中发布带有用户属性的消息

mqttx pub --topic t/5 --message m --user-properties "key1: value1"

第一个终端窗口将接收未更改的用户属性

payload: m
userProperties: [ { key: 'key1', value: 'value1' } ]

MQTT 5.0 PUBLISH 数据包中的用户属性类似于 AMQP 0.9.1 中的 headers 消息属性。

示例 CONNECT 数据包

使用用户属性连接

mqttx conn --client-id myclient --user-properties "connecting-from: London"

在浏览器中打开管理 UI https://127.0.0.1:15672/#/connections(用户名和密码均为 guest),然后单击 MQTT 连接

Figure 1: User Property in the CONNECT packet
图 1:CONNECT 数据包中的用户属性

RabbitMQ 将在管理 UI 中显示来自 CONNECT 数据包的用户属性。

特性 6:有效负载格式内容类型

描述

发布者可以指定 MIME 内容类型。它还可以设置有效负载格式指示器,指示有效负载是否由 UTF-8 编码的字符数据或未指定的二进制数据组成。

示例

在第一个终端窗口中,订阅主题

mqttx sub --topic t/6 --output-mode clean

在第二个终端窗口中,发送带有内容类型和有效负载格式指示器消息

mqttx pub --topic t/6 --message "my UTF-8 encoded data 🙂" --content-type text/plain --payload-format-indicator

第一个终端窗口将接收未更改的内容类型和有效负载格式指示器

{
"topic": "t/6",
"payload": "my UTF-8 encoded data 🙂",
"packet": {
...
"properties": {
"payloadFormatIndicator": true,
"contentType": "text/plain"
}
}
}

特性 7:请求/响应

描述

MQTT 5.0 将请求/响应模式形式化。

在发布消息之前,MQTT 客户端(请求方)将订阅响应主题。请求方将响应主题和一些相关数据包含在请求消息中。

另一个 MQTT 客户端(响应方)接收请求消息,执行某些操作,并将带有相同相关数据的响应消息发布到响应主题。

MQTT 5.0 请求/响应特性对应于 AMQP 0.9.1 中的远程过程调用。但是,在 AMQP 0.9.1 中,请求方将在 AMQP 0.9.1 消息属性 reply_to 中包含回调队列的名称。MQTT 协议未定义队列的概念。因此,在 MQTT 中,“回复到的地址”是主题名称。

尽管协议规范之间存在不兼容性,但 RabbitMQ 在协议互操作性方面表现出色:因此,RabbitMQ 在跨协议支持请求/响应交互。

例如,MQTT 客户端可以在请求消息中包含响应主题和相关数据。如果 AMQP 0.9.1 客户端创建了一个绑定到主题交换机 amq.topic 的队列,并使用与请求消息的主题匹配的绑定键,它将接收一个 AMQP 0.9.1 消息,该消息的属性 correlation_id 设置为 MQTT 客户端发送的相关数据,并且有一个名为 x-opt-reply-to-topic 的标头。然后,AMQP 0.9.1 客户端可以使用相同的 correlation_id 并将响应消息发布到主题交换机 amq.topic,其中包含 x-opt-reply-to-topic 标头中存在的主题,从而回复 MQTT 5.0 客户端。

示例

此示例仅关注 MQTT 客户端。

在第一个终端窗口中,响应 MQTT 客户端订阅主题 t/7

mqttx sub --client-id responder --topic t/7 --session-expiry-interval 600 --output-mode clean --qos 1

在第二个终端窗口中,请求 MQTT 客户端订阅名为 my/response/topic 的主题

mqttx sub --client-id requester --topic my/response/topic --session-expiry-interval 600 --qos 1
… Connecting...
✔ Connected
… Subscribing to my/response/topic...
✔ Subscribed to my/response/topic
^C

在第二个终端窗口中,请求方然后发布请求消息

mqttx pub --client-id requester --topic t/7 --message "my request" \
--correlation-data abc-123 --response-topic my/response/topic \
--session-expiry-interval 600 --no-clean

在第一个终端窗口中,响应方接收请求消息

{
"topic": "t/7",
"payload": "my request",
"packet": {
...
"properties": {
"responseTopic": "my/response/topic",
"correlationData": {
"type": "Buffer",
"data": [
97,
98,
99,
45,
49,
50,
51
]
}
}
}
}
^C

在第一个终端窗口中,响应方通过复制相关数据并发布到响应主题来响应请求方

mqttx pub --client-id responder --topic my/response/topic --message "my response" --correlation-data abc-123

在第二个终端窗口中,请求方接收响应。

mqttx sub --client-id requester --topic my/response/topic --no-clean --qos 1 --output-mode clean
{
"topic": "my/response/topic",
"payload": "my response",
"packet": {
...
"properties": {
"correlationData": {
"type": "Buffer",
"data": [
97,
98,
99,
45,
49,
50,
51
]
}
}
}
}

相关数据用于将响应与请求关联起来。请求方通常为其发布的每个请求选择唯一相关数据。

特性 8:分配的客户端标识符

描述

如果客户端使用零长度客户端标识符连接,则服务器必须响应包含分配的客户端标识符的 CONNACK。

与 MQTT 3.1.1 相比,这消除了服务器分配的客户端 ID 只能与 Clean Session = 1 连接一起使用的限制。

实现

RabbitMQ 将生成一些随机客户端 ID(例如 dcGB2kSwS0JlXnaBa1A6QA)并在 CONNACK 数据包中返回它。

特性 9:主题别名

描述

主题别名是一个整数,用于标识主题,而不是使用主题名称。这减少了 PUBLISH 数据包的大小,并且在主题名称很长并且在网络连接中重复使用相同的主题名称时很有用。

实现

RabbitMQ 中的默认 主题别名最大值 为 16。您可以在 rabbitmq.conf 中配置此值,例如

mqtt.topic_alias_maximum = 32

此配置值映射到 RabbitMQ 发送到客户端的 CONNACK 数据包中的主题别名最大值。它限制了两个方向上的主题别名数量,即从客户端到 RabbitMQ 和 RabbitMQ 到客户端。如果客户端发送到或接收来自许多不同主题的消息,则设置更高的值将需要 RabbitMQ 使用更多内存。

RabbitMQ 操作员可以通过设置来禁止使用主题别名

mqtt.topic_alias_maximum = 0

特性 10:流量控制

描述

MQTT 5.0 属性 Receive Maximum 定义了未确认的 QoS 1 PUBLISH 数据包的上限。

实现

从 RabbitMQ 发送到客户端的未确认 QoS 1 PUBLISH 数据包的最大数量由客户端在 CONNECT 数据包中发送到 RabbitMQ 的 Receive Maximum 和配置的 mqtt.prefetch 值的最小值决定

mqtt.prefetch = 10

mqtt.prefetch 的默认值为 10。

mqtt.prefetch 值在 RabbitMQ 3.13 之前已存在于 MQTT 3.1 和 3.1.1 中。它映射到 RabbitMQ 中的 消费者预取。换句话说,它定义了队列向其 MQTT 连接进程发送的未处理消息的数量。

特性 11:最大数据包大小

描述

客户端和服务器可以独立指定其支持的最大数据包大小。

示例

此示例演示如何限制从客户端发送到 RabbitMQ 的最大 MQTT 数据包大小。

假设在成功进行身份验证后,RabbitMQ 操作员不希望 RabbitMQ 接受任何大于 1 KiB 的 MQTT 数据包。将以下配置写入 rabbitmq.conf(在您当前的工作目录中)

mqtt.max_packet_size_authenticated = 1024

停止 RabbitMQ 服务器后,使用应用了新配置的 RabbitMQ 服务器启动 RabbitMQ 服务器

docker run -it --rm --name rabbitmq -p 1883:1883 -p 15672:15672 -p 15692:15692 \
--mount type=bind,source="$(pwd)"/rabbitmq.conf,target=/etc/rabbitmq/conf.d/11-blog-post.conf \
rabbitmq:3.13.0-beta.2-management
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_mqtt
docker exec rabbitmq rabbitmqctl enable_feature_flag all

在第一个终端窗口中,订阅主题

mqttx sub --topic t/11

在第二个终端窗口中,向该主题发送有效负载为 3 字节的消息

payload=$(head --bytes 3 < /dev/zero | tr '\0' x)
mqttx pub --topic t/11 -m "$payload"

第一行从特殊文件 /dev/zero 读取 3 个字节(3 个空字符),将每个空字符转换为 ASCII 字符 x 并将结果 xxx 保存到变量 payload 中。

第一个终端窗口将接收该消息

payload: xxx

接下来,在第二个终端窗口中,发送有效负载为 2,000 字节的消息

payload=$(head --bytes 2000 < /dev/zero | tr '\0' x)
mqttx pub --topic t/11 -m "$payload"

这次,第一个终端窗口没有收到消息,因为从客户端发送到 RabbitMQ 的 PUBLISH 数据包大于配置的最大数据包大小 1024 字节。

相反,RabbitMQ 记录了一条描述性错误消息

[error] <0.836.0> MQTT packet size (2007 bytes, type 3) exceeds mqtt.max_packet_size_authenticated (1024 bytes)

日志消息显示 2,007 字节,因为 PUBLISH 数据包的固定和可变标头需要 7 字节(其中 4 字节用于主题名称 t/11)。

特性 12:服务器发起的 DISCONNECT

描述

在 MQTT 5.0 中,DISCONNECT 数据包不仅可以从客户端发送到服务器,还可以从服务器发送到客户端。

实现

在以下情况下,RabbitMQ 会在终止连接之前向客户端发送 DISCONNECT 数据包

断开连接原因代码名称情况
会话被接管另一个客户端使用相同的客户端 ID 连接。
服务器正在关闭RabbitMQ 进入维护模式
保持活动超时客户端在保持活动时间内未能通信。
数据包过大RabbitMQ 收到大小超过 mqtt.max_packet_size_authenticated 的数据包。

功能 13:会话过期

描述

在 MQTT 5.0 中,客户端可以在 CONNECT 数据包中向服务器建议一个会话过期间隔。服务器可以接受建议的会话过期间隔,或者在 CONNACK 数据包中强制使用不同的间隔。

会话可以在一系列网络连接中继续。它持续到最新的网络连接加上会话过期间隔。

当会话过期间隔到期时,客户端和服务器都会删除任何会话状态。

实现

客户端和服务器在会话持续期间保持会话状态。

服务器中的会话状态包括已发送到客户端但尚未确认的消息、待发送到客户端的消息以及客户端的订阅。RabbitMQ 以队列和绑定的形式对这种 MQTT 会话状态进行建模。

因此,会话过期间隔映射到 RabbitMQ 中的队列 TTL。当 MQTT 会话过期时,队列及其消息和绑定将被删除。

示例

默认情况下,服务器允许的最大会话过期间隔为 1 天。如果 MQTT 客户端在 1 天内未重新连接,则其会话状态将在 RabbitMQ 中被删除。

此值是可配置的。出于本示例的目的,让我们在 rabbitmq.conf 中设置一个非常低的会话过期间隔,即 1 分钟。

mqtt.max_session_expiry_interval_seconds = 60

设置名称包含前缀 max,因为 MQTT 5.0 客户端可以通过在 CONNECT 数据包中发送会话过期间隔来选择较低的值。如最大数据包大小示例中所做的那样,重新启动 RabbitMQ 节点以应用新设置。

使用 20 秒的会话过期间隔连接到 RabbitMQ 并创建订阅。

mqttx sub --client-id sub-13 --topic t/13 --session-expiry-interval 20 --qos 1
… Connecting...
✔ Connected
… Subscribing to t/13...
✔ Subscribed to t/13
^C

在终端中键入 Ctrl+C 以断开客户端连接。

在接下来的 20 秒内,列出队列和绑定。

docker exec rabbitmq rabbitmqctl list_queues name
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name
mqtt-subscription-sub-13qos1

docker exec rabbitmq rabbitmqctl list_bindings source_name destination_name routing_key --formatter=pretty_table
Listing bindings for vhost /...
┌─────────────┬──────────────────────────────┬──────────────────────────────┐
│ source_name │ destination_name │ routing_key │
├─────────────┼──────────────────────────────┼──────────────────────────────┤
│ │ mqtt-subscription-sub-13qos1 │ mqtt-subscription-sub-13qos1 │
├─────────────┼──────────────────────────────┼──────────────────────────────┤
│ amq.topic │ mqtt-subscription-sub-13qos1 │ t.13 │
└─────────────┴──────────────────────────────┴──────────────────────────────┘

20 秒后,再次列出队列和绑定。

docker exec rabbitmq rabbitmqctl list_queues name
Timeout: 60.0 seconds ...
Listing queues for vhost / ...

docker exec rabbitmq rabbitmqctl list_bindings source_name destination_name routing_key --formatter=pretty_table
Listing bindings for vhost /...

RabbitMQ 删除了队列及其绑定,因为我们的客户端在 20 秒的会话过期间隔内未使用 Clean Session = 0 连接到 RabbitMQ。

接下来,执行相同的测试,但使用较高的会话过期间隔,例如 1 小时。

mqttx sub --client-id sub-13 --topic t/13 --session-expiry-interval 3600 --qos 1
… Connecting...
✔ Connected
… Subscribing to t/13...
✔ Subscribed to t/13
^C

您应该会观察到,队列及其绑定将在 1 分钟后被删除,因为有效的会话过期间隔是客户端请求的值(1 小时)和 RabbitMQ 中配置的 mqtt.max_session_expiry_interval_seconds 值(1 分钟)的最小值。

功能 14:遗嘱延迟

描述

客户端可以在 CONNECT 数据包中定义遗嘱延迟间隔。

服务器延迟发布客户端的遗嘱消息,直到遗嘱延迟间隔过去或会话结束,以先发生者为准。如果在遗嘱延迟间隔过去之前建立了与此会话的新网络连接,则服务器必须不发送遗嘱消息。这样做的一个用途是在发生临时网络断开连接并且客户端成功重新连接并在遗嘱消息发布之前继续其会话时避免发布遗嘱消息。

遗嘱延迟间隔的另一个用例是通知会话过期。

客户端可以通过将遗嘱延迟间隔设置为长于会话过期间隔并在使用原因代码 0x04(带有遗嘱消息的断开连接)断开连接时安排遗嘱消息通知会话过期已发生。

实现

虽然遗嘱消息有效负载通常很小,但 MQTT 规范允许遗嘱消息有效负载大小最大为 64 KiB。

为了避免在Khepri(RabbitMQ 的未来元数据存储)中存储大型二进制数据,RabbitMQ 创建了一个包含此单个遗嘱消息的经典队列。我们称此队列为遗嘱队列。此消息设置了每消息 TTL,该 TTL 以毫秒为单位定义,并对应于以秒为单位的遗嘱延迟间隔。此外,遗嘱队列还设置了队列 TTL,该 TTL 以毫秒为单位定义,并对应于以秒为单位的会话过期间隔。有效的每消息 TTL 至少比队列 TTL 低几毫秒,以便在队列(会话)过期前不久发布消息。

遗嘱队列还定义 amq.topic(MQTT 插件使用的默认主题交换机)作为死信交换机,并将遗嘱主题作为死信路由键

如果 MQTT 客户端在其遗嘱延迟间隔内未重新连接,则遗嘱队列中的消息将被死信发送到主题交换机。

让我们用一个示例来说明这一点。

示例

在第一个终端窗口中,创建一个将使用遗嘱消息的订阅。

mqttx sub --client-id sub-14 --topic t/14

在第二个终端窗口中,创建一个具有 20 秒遗嘱延迟间隔的连接。

mqttx conn --client-id conn-14 --will-topic t/14 --will-message my-will-message --will-delay-interval 20 --session-expiry-interval 40

在第三个终端窗口中,我们看到到目前为止,订阅的 MQTT 客户端创建了一个队列。

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues name type messages arguments
┌──────────────────────────────┬────────────┬──────────┬───────────┐
│ name │ type │ messages │ arguments │
├──────────────────────────────┼────────────┼──────────┼───────────┤
│ mqtt-subscription-sub-14qos0 │ MQTT QoS 00 │ │
└──────────────────────────────┴────────────┴──────────┴───────────┘

在第二个终端窗口中,键入 Ctrl+C 以断开客户端 ID 为 conn-14 的 MQTT 连接。

这次,列出队列显示已创建遗嘱队列。

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues name type messages arguments
┌──────────────────────────────┬────────────┬──────────┬────────────────────────────────────────────────────────────┐
│ name │ type │ messages │ arguments │
├──────────────────────────────┼────────────┼──────────┼────────────────────────────────────────────────────────────┤
│ mqtt-subscription-sub-14qos0 │ MQTT QoS 00 │ │
├──────────────────────────────┼────────────┼──────────┼────────────────────────────────────────────────────────────┤
│ mqtt-will-conn-14 │ classic │ 1{<<"x-expires">>,long,40000}
│ │ │ │ {<<"x-dead-letter-exchange">>,longstr,<<"amq.topic">>}
│ │ │ │ {<<"x-dead-letter-routing-key">>,longstr,<<"t.14">>}
└──────────────────────────────┴────────────┴──────────┴────────────────────────────────────────────────────────────┘

遗嘱队列的命名模式为 mqtt-will-<MQTT Client ID>。它包含一条消息:遗嘱消息。

如上一节所述,队列 TTL(x-expires)为 40,000 毫秒,因此与我们上面命令中的 40 秒会话过期间隔匹配。如果您等待 20 秒,则您的第一个终端窗口应该会收到遗嘱消息,因为我们的客户端在遗嘱延迟间隔内未重新连接。

› payload: my-will-message

功能 15:可选服务器功能可用性

描述

定义一组服务器不允许的功能,并提供一种机制供服务器将其指定给客户端。可以通过这种方式指定的功能包括:

  • 最大 QoS
  • 保留可用
  • 通配符订阅可用
  • 订阅标识符可用
  • 共享订阅可用

如果客户端使用服务器已声明不可用的功能,则会发生错误。

实现

RabbitMQ 3.13 在CONNACK 属性中包含最大 QoS = 1 和共享订阅可用 = 0。

QoS 2 未被 RabbitMQ 支持。

如下一节所述,共享订阅将在未来的 RabbitMQ 版本中得到支持。

限制

本节列出了 RabbitMQ MQTT 实现的限制。

MQTT 5.0 特定限制

共享订阅

共享订阅将在未来的 RabbitMQ 版本中添加。尽管此功能很好地映射到 RabbitMQ 中的队列,但共享订阅是会话状态的一部分,并且需要一些 RabbitMQ 数据库迁移才能有效地查询给定 MQTT 客户端 ID 的共享订阅。

延迟和保留的遗嘱消息

延迟保留的遗嘱消息将不会被保留。这是因为延迟的遗嘱消息将被死信发送到主题交换机,但保留过程当前不会从队列中使用。此限制将来可以通过新的保留消息存储来解决。

非 MQTT 5.0 特定限制

为了完整起见,本节列出了在 RabbitMQ 3.13 中支持 MQTT 5.0 之前以及在 RabbitMQ 3.12 中原生 MQTT 发布之前存在的限制。

保留的消息

保留消息的功能在 RabbitMQ 中有限

保留的消息仅在节点本地存储和查询。

一个有效的示例如下:一个 MQTT 客户端将保留消息发布到节点 A,主题为 topic/1。此后,另一个客户端在节点 A 上使用主题过滤器 topic/1 订阅。新的订阅者将收到保留的消息。

但是,如果主题过滤器包含通配符(多级通配符字符“#”或单级通配符字符“+”),则不会发送任何保留的消息(问题 #8824)。

此外,如果客户端在节点 A 上发布保留消息,另一个客户端随后在节点 B 上订阅,则该订阅客户端将不会收到存储在节点 A 上的任何保留消息(问题 #8096)。

未来的 RabbitMQ 版本将在集群中复制保留的消息,并发送与包含通配符的主题过滤器匹配的保留消息。

总结

总而言之,RabbitMQ

  • 是领先的 AMQP 0.9.1 代理
  • 是一个 流式 代理
  • 在跨协议互操作性方面表现出色
  • 由于支持 3.13 中发布的 MQTT 5.0 和 3.12 中发布的 原生 MQTT,正在成为领先的 MQTT 代理之一

我们将 RabbitMQ 打造为成熟的物联网代理的旅程尚未结束,未来几个月和几年内计划进行更多开发工作。敬请期待!

© 2024 RabbitMQ. All rights reserved.