RabbitMQ 3.13 将支持 MQTT 5.0
RabbitMQ 3.12 中发布的原生 MQTT 为 IoT 用例带来了显著的可伸缩性和性能改进。
RabbitMQ 3.13 将支持MQTT 5.0,因此将是我们成为领先 MQTT 代理之一的旅程中的下一个重要里程碑。
这篇博文解释了 RabbitMQ 中如何使用新的 MQTT 5.0 功能。
MQTT 概述
MQTT 是物联网 (IoT) 的标准协议。
物联网远程设备在连接到代理(Broker)时网络质量可能较差。因此,MQTT 设计得非常轻量:MQTT 协议头部很小,以节省网络带宽。
由于物联网设备经常会断开连接并重新连接——想象一下汽车穿过隧道的情景——MQTT 也非常高效:客户端通过比其他消息协议更简短的握手过程进行连接和身份验证。
MQTT 协议已经存在多年。如下表所示,最新的 MQTT 协议版本是 5.0。
| MQTT 版本 | CONNECT 数据包中的协议版本 | MQTT 规范发布年份 | RabbitMQ 支持年份(版本) |
|---|---|---|---|
| 3.1 | 3 | 2010 | 2012 (3.0) |
| 3.1.1 | 4 | 2014 | 2014 (3.3) |
| 5.0 | 5 | 2019 | 2024 (3.13) |
值得一提的是,面向用户的协议版本与“内部”协议版本(也称为协议级别)之间存在区别。后者由客户端在 CONNECT 数据包中发送给服务器。由于面向用户的 3.1.1 协议版本对应内部协议版本 4,为了避免进一步混淆,MQTT 委员会决定跳过面向用户的 4.0 版本,这样面向用户的 5.0 版本就对应内部协议版本 5。
MQTT 5.0 特性
附录 C:MQTT v5.0 新特性总结提供了 MQTT 5.0 新特性的完整列表。
由于网络上已经有很多关于 MQTT 5.0 的优秀资源(包括图示和使用模式),本文档仅关注 RabbitMQ 的具体实现。本节解释了 PR #7263 中实现的最重要特性。对于每个特性,我们都提供了其在 RabbitMQ 中的使用示例,或概述了其在 RabbitMQ 中的实现原理。
要亲自运行这些示例,请启动 RabbitMQ 3.13 服务器,例如使用此 Docker 镜像标签
docker run -it --rm --name rabbitmq -p 1883:1883 -p 15672:15672 -p 15692:15692 rabbitmq:3.13.0-management
在另一个终端窗口中,启用 MQTT 插件
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_mqtt
由于 MQTT 插件是动态启用的,MQTT 插件定义的特性标志处于禁用状态。请启用所有特性标志,包括 mqtt_v5 特性标志
docker exec rabbitmq rabbitmqctl enable_feature_flag all
列出特性标志,现在应该显示所有特性标志都已启用
docker exec rabbitmq rabbitmqctl list_feature_flags --formatter=pretty_table
以下示例使用 MQTTX CLI 1.9.4 版本。我们使用命令行界面而不是图形用户界面,这样您可以通过复制粘贴命令轻松运行示例。
所有新特性同样适用于 RabbitMQ Web MQTT 插件。
以下是本文档涵盖的 MQTT 5.0 特性列表
- 特性 1:消息过期 (Message Expiry)
- 特性 2:订阅标识符 (Subscription Identifier)
- 特性 3:订阅选项 (Subscription Options)
- 特性 4:所有确认信息中的原因码 (Reason code on all ACKs)
- 特性 5:用户属性 (User properties)
- 特性 6:负载格式和内容类型 (Payload Format and Content Type)
- 特性 7:请求/响应 (Request / Response)
- 特性 8:分配的客户端标识符 (Assigned Client Identifier)
- 特性 9:主题别名 (Topic Alias)
- 特性 10:流控制 (Flow control)
- 特性 11:最大数据包大小 (Maximum Packet Size)
- 特性 12:服务器发起的 DISCONNECT
- 特性 13:会话过期 (Session Expiry)
- 特性 14:遗嘱延迟 (Will delay)
- 特性 15:可选的服务器特性可用性 (Optional Server feature availability)
特性 1:消息过期
描述
可以为发布到代理的每条消息设置一个以秒为单位的过期时间。如果消息在该过期时间内未被消费,则该消息会被丢弃或发送到死信队列。
示例
为主题 t/1 创建订阅。这会在 RabbitMQ 中创建一个队列。通过在终端输入 Ctrl+C 断开客户端连接。由于我们使用 600 秒的会话过期时间,该队列将继续存在 10 分钟。
mqttx sub --client-id sub-1 --topic t/1 --session-expiry-interval 600 --qos 1
… Connecting...
✔ Connected
… Subscribing to t/1...
✔ Subscribed to t/1
^C
使用 30 秒的消息过期时间向同一主题发布一条消息
mqttx pub --topic t/1 --message m1 --message-expiry-interval 30 --qos 1
… Connecting...
✔ Connected
… Message publishing...
✔ Message published
在接下来的 30 秒内,列出队列
docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues name type messages
┌─────────────────────────────┬─────────┬──────────┐
│ name │ type │ messages │
├─────────────────────────────┼─────────┼──────────┤
│ mqtt-subscription-sub-1qos1 │ classic │ 1 │
└─────────────────────────────┴─────────┴──────────┘
等待 30 秒,然后再次列出队列
docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues
┌─────────────────────────────┬─────────┬──────────┐
│ name │ type │ messages │
├─────────────────────────────┼─────────┼──────────┤
│ mqtt-subscription-sub-1qos1 │ classic │ 0 │
└─────────────────────────────┴─────────┴──────────┘
消息已过期,因为客户端 sub-1 没有连接到代理来消费该消息。如果设置了死信策略,消息将被发送到指定的交换机。在我们的案例中,死信功能已禁用。查询 Prometheus 端点证明了有 1 条消息从经典队列中过期。
curl --silent localhost:15692/metrics | grep rabbitmq_global_messages_dead_lettered_expired_total
# TYPE rabbitmq_global_messages_dead_lettered_expired_total counter
# HELP rabbitmq_global_messages_dead_lettered_expired_total Total number of messages dead-lettered due to message TTL exceeded
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 1
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0
另一个有趣的特性是以下要求
服务器发送给客户端的 PUBLISH 数据包必须包含一个消息过期间隔,该值等于接收到的值减去应用程序消息在服务器中等待的时间。
向代理发送第二条消息,过期时间为 60 秒
mqttx pub --topic t/1 --message m2 --message-expiry-interval 60 --qos 1
在重新连接订阅客户端之前等待 20 秒
mqttx sub --client-id sub-1 --topic t/1 --no-clean --session-expiry-interval 0 --qos 1 --output-mode clean
{
"topic": "t/1",
"payload": "m2",
"packet": {
...
"properties": {
"messageExpiryInterval": 40
}
}
}
根据 MQTT 5.0 协议规范的要求,客户端收到的第二条消息中,消息过期时间设置为 40 秒:即代理接收到的 60 秒减去消息在代理中等待的 20 秒。
实现
RabbitMQ 中的 MQTT 5.0 消息过期功能是使用每条消息的 TTL 实现的,类似于 AMQP 0.9.1 发布者中的 expiration 字段。
特性 2:订阅标识符
描述
客户端可以在 SUBSCRIBE 数据包中设置订阅标识符。如果客户端因为该订阅而收到消息,代理会将该订阅标识符包含在 PUBLISH 数据包中。
订阅标识符的使用案例列在 SUBSCRIBE 操作一节中。
示例
从同一个客户端向服务器发送 3 个单独的 SUBSCRIBE 数据包,每个数据包具有不同的主题过滤器和不同的订阅标识符
mqttx sub --client-id sub-2 --topic t/1 --subscription-identifier 1 --session-expiry-interval 600
^C
mqttx sub --client-id sub-2 --topic t/2 --subscription-identifier 2 --session-expiry-interval 600 --no-clean
^C
mqttx sub --client-id sub-2 --topic "t/#" --subscription-identifier 3 --session-expiry-interval 0 --no-clean --output-mode clean
在第二个终端窗口中,我们看到从同一个队列到同一个主题交换机的 3 个绑定,每个绑定具有不同的路由键
docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_bindings \
source_name source_kind destination_name destination_kind routing_key
┌─────────────┬─────────────┬─────────────────────────────┬──────────────────┬─────────────────────────────┐
│ source_name │ source_kind │ destination_name │ destination_kind │ routing_key │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ │ exchange │ mqtt-subscription-sub-2qos0 │ queue │ mqtt-subscription-sub-2qos0 │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ amq.topic │ exchange │ mqtt-subscription-sub-2qos0 │ queue │ t.# │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ amq.topic │ exchange │ mqtt-subscription-sub-2qos0 │ queue │ t.1 │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ amq.topic │ exchange │ mqtt-subscription-sub-2qos0 │ queue │ t.2 │
└─────────────┴─────────────┴─────────────────────────────┴──────────────────┴─────────────────────────────┘
第一条是到默认交换机的隐式绑定。
每个带有 MQTT 主题过滤器的 MQTT 订阅对应一个带有绑定键的 AMQP 0.9.1 绑定。准确地说,表中的 routing_key 列命名有误:应该称为 binding_key。MQTT 中的主题层级分隔符是“/”字符,而 AMQP 0.9.1 主题交换机中的分隔符是“.”字符。
再次在第二个终端窗口中,向主题 t/1 发送一条消息
mqttx pub --topic t/1 --message m1
第一个终端窗口(订阅客户端)接收到以下 PUBLISH 数据包
{
"topic": "t/1",
"payload": "m1",
"packet": {
...
"properties": {
"subscriptionIdentifier": [
1,
3
]
}
}
}
它包含订阅标识符 1 和 3,因为 t/1 和 t/# 两个主题过滤器都匹配主题 t/1。
同样,如果您向主题 t/2 发送第二条消息,订阅客户端将收到包含订阅标识符 2 和 3 的 PUBLISH 数据包。
实现
订阅标识符是 MQTT 会话状态的一部分。因此,当客户端断开连接直到 MQTT 会话结束期间,订阅标识符必须持久化在服务器的数据库中。RabbitMQ 将订阅标识符存储在绑定参数中
docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_bindings routing_key arguments
┌─────────────────────────────┬───────────────────────────────────────────────────────────────────────────────────┐
│ routing_key │ arguments │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ mqtt-subscription-sub-2qos0 │ │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ t.# │ {mqtt_subscription_opts,0,false,false,0,3}{<<"x-binding-key">>,longstr,<<"t.#">>} │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ t.1 │ {mqtt_subscription_opts,0,false,false,0,1}{<<"x-binding-key">>,longstr,<<"t.1">>} │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ t.2 │ {mqtt_subscription_opts,0,false,false,0,2}{<<"x-binding-key">>,longstr,<<"t.2">>} │
└─────────────────────────────┴───────────────────────────────────────────────────────────────────────────────────┘
绑定参数的确切结构并不重要,将来可能会在 RabbitMQ 版本中更改。但是,您可以在绑定参数中看到数字 1、2 和 3,它们对应于订阅标识符。
当主题交换机路由消息时,发布消息的 Erlang 进程会将所有匹配的绑定键包含在消息中。订阅 MQTT 客户端的 Erlang 进程会将匹配的绑定键与它已知的主题过滤器进行比较,并将订阅标识符包含在发送给 MQTT 客户端的 PUBLISH 数据包中。
发送消息的 Erlang 进程可以是 MQTT 连接进程或 AMQP 0.9.1 通道进程。一如既往,RabbitMQ 在跨协议互操作性方面表现出色:当 AMQP 0.9.1(或 STOMP 或 AMQP 1.0)客户端向主题交换机发送消息时,正确的订阅标识符将被包含在发送给 MQTT 客户端的 PUBLISH 数据包中。
特性 3:订阅选项
描述
MQTT 5.0 提供了 3 种新的订阅选项
- No Local (不接收本地消息)
- Retain as Published (按原样保留)
- Retain Handling (保留处理)
所有订阅选项均由 RabbitMQ 实现。在此,我们仅关注“保留处理”选项
此选项指定在建立订阅时是否发送保留消息。
其取值如下:
0 = 在订阅时发送保留消息
1 = 仅当订阅当前不存在时,才在订阅时发送保留消息
2 = 在订阅时不要发送保留消息
示例
发送一条保留消息
mqttx pub --topic mytopic --message m --retain
保留处理值为 0 的将接收到保留消息,而值为 2 的则不会。
mqttx sub --topic mytopic --retain-handling 0
… Connecting...
✔ Connected
… Subscribing to mytopic...
✔ Subscribed to mytopic
payload: m
retain: true
^C
mqttx sub --topic mytopic --retain-handling 2
… Connecting...
✔ Connected
… Subscribing to mytopic...
✔ Subscribed to mytopic
特性 4:所有 ACK 中的原因码
描述
CONNACK、PUBACK、SUBACK、UNSUBACK 和 DISCONNECT 数据包均包含原因码。
实现
一个实现示例是:如果消息未路由到任何队列,RabbitMQ 将在 PUBACK 数据包中回复原因码 No matching subscribers(无匹配订阅者)。MQTT 5.0 的原因码 No matching subscribers 在概念上对应于 AMQP 0.9.1 中的 mandatory(强制)消息属性和 BasicReturn 处理程序。
特性 5:用户属性
描述
大多数 MQTT 数据包都可以包含用户属性。MQTT 规范并未定义用户属性的含义。
PUBLISH 数据包示例
PUBLISH 数据包中的用户属性由客户端应用程序定义,并由服务器原样转发。
在第一个终端窗口中订阅
mqttx sub --topic t/5
在第二个终端窗口中发布一条带用户属性的消息
mqttx pub --topic t/5 --message m --user-properties "key1: value1"
第一个终端窗口将原样接收到这些用户属性
payload: m
userProperties: [ { key: 'key1', value: 'value1' } ]
MQTT 5.0 PUBLISH 数据包中的用户属性类似于 AMQP 0.9.1 中的 headers(消息头)属性。
CONNECT 数据包示例
使用用户属性进行连接
mqttx conn --client-id myclient --user-properties "connecting-from: London"
在浏览器中打开管理界面 https://:15672/#/connections(用户名和密码均为 guest),然后点击该 MQTT 连接

RabbitMQ 将在管理界面中显示 CONNECT 数据包中的用户属性。
特性 6:负载格式和内容类型
描述
发布者可以指定 MIME 内容类型。它还可以设置负载格式指示符,指示负载是 UTF-8 编码的字符数据还是未指定的二进制数据。
示例
在第一个终端窗口中订阅主题
mqttx sub --topic t/6 --output-mode clean
在第二个终端窗口中,发送一条带有内容类型和负载格式指示符的消息
mqttx pub --topic t/6 --message "my UTF-8 encoded data 🙂" --content-type text/plain --payload-format-indicator
第一个终端窗口将原样接收到内容类型和负载格式指示符
{
"topic": "t/6",
"payload": "my UTF-8 encoded data 🙂",
"packet": {
...
"properties": {
"payloadFormatIndicator": true,
"contentType": "text/plain"
}
}
}
特性 7:请求/响应
描述
MQTT 5.0 正式引入了请求/响应模式。
在发布消息之前,MQTT 客户端(请求者)会订阅一个响应主题。请求者将响应主题和一些关联数据包含在请求消息中。
另一个 MQTT 客户端(响应者)接收请求消息,执行某些操作,并将带有相同关联数据的响应消息发布到响应主题。
MQTT 5.0 的请求/响应特性对应于 AMQP 0.9.1 中的远程过程调用 (RPC)。但是,在 AMQP 0.9.1 中,请求者会将回调队列的名称包含在 AMQP 0.9.1 消息属性 reply_to 中。MQTT 协议没有定义队列的概念。因此,在 MQTT 中,回复的“地址”是一个主题名称。
尽管协议规范之间存在不兼容性,但 RabbitMQ 在协议互操作性方面表现卓越:因此,RabbitMQ 支持跨协议的请求/响应交互。
例如,MQTT 客户端可以在请求消息中包含响应主题和关联数据。如果 AMQP 0.9.1 客户端创建了一个绑定到主题交换机 amq.topic 且绑定键匹配请求消息主题的队列,它将收到一条 AMQP 0.9.1 消息,其 correlation_id 属性设置为 MQTT 客户端发送的关联数据,并带有一个名为 x-opt-reply-to-topic 的头部。然后,AMQP 0.9.1 客户端可以使用相同的 correlation_id 将响应消息发布到主题交换机 amq.topic,并指定 x-opt-reply-to-topic 头部中提供的主题,从而回复 MQTT 5.0 客户端。
示例
此示例仅关注 MQTT 客户端。
在第一个终端窗口中,响应 MQTT 客户端订阅主题 t/7;
mqttx sub --client-id responder --topic t/7 --session-expiry-interval 600 --output-mode clean --qos 1
在第二个终端窗口中,请求 MQTT 客户端订阅一个名为 my/response/topic 的主题
mqttx sub --client-id requester --topic my/response/topic --session-expiry-interval 600 --qos 1
… Connecting...
✔ Connected
… Subscribing to my/response/topic...
✔ Subscribed to my/response/topic
^C
在第二个终端窗口中,请求者随后发布一条请求消息
mqttx pub --client-id requester --topic t/7 --message "my request" \
--correlation-data abc-123 --response-topic my/response/topic \
--session-expiry-interval 600 --no-clean
在第一个终端窗口中,响应者收到请求消息
{
"topic": "t/7",
"payload": "my request",
"packet": {
...
"properties": {
"responseTopic": "my/response/topic",
"correlationData": {
"type": "Buffer",
"data": [
97,
98,
99,
45,
49,
50,
51
]
}
}
}
}
^C
在第一个终端窗口中,响应者通过复制关联数据并发布到响应主题来回复请求者
mqttx pub --client-id responder --topic my/response/topic --message "my response" --correlation-data abc-123
在第二个终端窗口中,请求者收到响应。
mqttx sub --client-id requester --topic my/response/topic --no-clean --qos 1 --output-mode clean
{
"topic": "my/response/topic",
"payload": "my response",
"packet": {
...
"properties": {
"correlationData": {
"type": "Buffer",
"data": [
97,
98,
99,
45,
49,
50,
51
]
}
}
}
}
关联数据有助于将响应与请求相关联。请求者通常为每个发布的请求选择唯一的关联数据。
特性 8:分配的客户端标识符
描述
如果客户端使用长度为零的客户端标识符进行连接,服务器必须回复包含“分配的客户端标识符”的 CONNACK。
与 MQTT 3.1.1 相比,这取消了服务器分配的客户端 ID 只能用于 Clean Session = 1 连接的限制。
实现
RabbitMQ 将生成一个随机的客户端 ID(例如 dcGB2kSwS0JlXnaBa1A6QA)并将其返回在 CONNACK 数据包中。
特性 9:主题别名
描述
主题别名是一个整数值,用于标识主题而不是使用主题名称。这减小了 PUBLISH 数据包的大小,并且在主题名称很长且在网络连接中重复使用相同主题名称时非常有用。
实现
RabbitMQ 中的默认主题别名最大值是 16。您可以在 rabbitmq.conf 中配置此值,例如
mqtt.topic_alias_maximum = 32
此配置值映射到从 RabbitMQ 发送给客户端的 CONNACK 数据包中的“主题别名最大值”。它限制了两个方向(即从客户端到 RabbitMQ 和从 RabbitMQ 到客户端)上的主题别名数量。如果客户端向多个不同主题发送或接收消息,设置更高的值将需要更多的内存。
RabbitMQ 操作员可以通过设置以下内容来禁止使用主题别名
mqtt.topic_alias_maximum = 0
特性 10:流控制
描述
MQTT 5.0 的“接收最大值”属性定义了未确认的 QoS 1 PUBLISH 数据包的上限。
实现
从 RabbitMQ 发送给客户端的未确认 QoS 1 PUBLISH 数据包的最大数量由客户端在 CONNECT 数据包中发送给 RabbitMQ 的“接收最大值”与配置的 mqtt.prefetch 值中的较小者决定
mqtt.prefetch = 10
mqtt.prefetch 的默认值为 10。
mqtt.prefetch 值在 RabbitMQ 3.13 之前就已经存在于 MQTT 3.1 和 3.1.1 中。它映射到 RabbitMQ 中的消费者预取 (consumer prefetch)。换句话说,它定义了队列向其 MQTT 连接进程发送多少条在途消息。
特性 11:最大数据包大小
描述
客户端和服务器可以独立指定它们支持的最大数据包大小。
示例
此示例演示了如何限制从客户端发送给 RabbitMQ 的最大 MQTT 数据包大小。
假设在身份验证成功后,RabbitMQ 操作员不希望 RabbitMQ 接受大于 1 KiB 的 MQTT 数据包。将以下配置写入 rabbitmq.conf(在您当前的工作目录中)
mqtt.max_packet_size_authenticated = 1024
停止 RabbitMQ 服务器后,使用应用了新配置的 RabbitMQ 服务器启动
docker run -it --rm --name rabbitmq -p 1883:1883 -p 15672:15672 -p 15692:15692 \
--mount type=bind,source="$(pwd)"/rabbitmq.conf,target=/etc/rabbitmq/conf.d/11-blog-post.conf \
rabbitmq:3.13.0-beta.2-management
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_mqtt
docker exec rabbitmq rabbitmqctl enable_feature_flag all
在第一个终端窗口中订阅主题
mqttx sub --topic t/11
在第二个终端窗口中,向该主题发送一条有效负载为 3 字节的消息
payload=$(head --bytes 3 < /dev/zero | tr '\0' x)
mqttx pub --topic t/11 -m "$payload"
第一行从特殊文件 /dev/zero 读取 3 个字节(3 个空字符),将每个空字符转换为 ASCII 字符 x,并将结果 xxx 保存到变量 payload 中。
第一个终端窗口将收到该消息
payload: xxx
接下来,在第二个终端窗口中,发送一条有效负载为 2000 字节的消息
payload=$(head --bytes 2000 < /dev/zero | tr '\0' x)
mqttx pub --topic t/11 -m "$payload"
这一次,第一个终端窗口没有收到该消息,因为从客户端发送到 RabbitMQ 的 PUBLISH 数据包大于配置的 1024 字节的最大数据包大小。
相反,RabbitMQ 会记录一条描述性的错误消息
[error] <0.836.0> MQTT packet size (2007 bytes, type 3) exceeds mqtt.max_packet_size_authenticated (1024 bytes)
日志消息显示 2007 字节,因为 PUBLISH 数据包的固定和可变头部需要 7 个字节(其中 4 个字节用于主题名称 t/11)。
特性 12:服务器发起的 DISCONNECT
描述
在 MQTT 5.0 中,DISCONNECT 数据包不仅可以从客户端发送到服务器,还可以从服务器发送到客户端。
实现
在终止连接之前,RabbitMQ 会在以下情况下向客户端发送 DISCONNECT 数据包
| DISCONNECT 原因码名称 | 情况 |
|---|---|
| Session taken over(会话被接管) | 另一个客户端使用相同的客户端 ID 连接。 |
| Server shutting down(服务器正在关闭) | RabbitMQ 进入维护模式。 |
| Keep Alive timeout(心跳超时) | 客户端未能在 Keep Alive 时间内进行通信。 |
| Packet too large(数据包过大) | RabbitMQ 收到大小超过 mqtt.max_packet_size_authenticated 的数据包 |
特性 13:会话过期
描述
在 MQTT 5.0 中,客户端可以在 CONNECT 数据包中向服务器建议会话过期时间。服务器可以在 CONNACK 数据包中接受建议的会话过期时间或强制执行不同的时间。
会话可以在一系列网络连接中持续存在。它持续的时间等于最新的网络连接时长加上会话过期时间。
当会话过期时间结束时,客户端和服务器都将删除任何会话状态。
实现
客户端和服务器保留会话状态的时间与会话持续时间一致。
服务器中的会话状态包括:已发送给客户端但尚未确认的消息、等待发送给客户端的消息以及客户端的订阅。RabbitMQ 以队列和绑定的形式对 MQTT 会话状态进行建模。
因此,会话过期时间映射到 RabbitMQ 中的队列 TTL。当 MQTT 会话过期时,队列及其包含的消息和绑定将被删除。
示例
默认情况下,服务器允许的最大会话过期时间为 1 天。如果 MQTT 客户端在 1 天内未重新连接,其会话状态将在 RabbitMQ 中被删除。
此值是可配置的。为了本示例的目的,让我们在 rabbitmq.conf 中设置一个非常短的 1 分钟会话过期时间
mqtt.max_session_expiry_interval_seconds = 60
设置名称包含前缀 max,因为 MQTT 5.0 客户端可以通过在 CONNECT 数据包中发送会话过期时间来选择更小的值。按照最大数据包大小示例中的操作,重启 RabbitMQ 节点以使新设置生效。
连接到 RabbitMQ,会话过期时间为 20 秒,并创建一个订阅
mqttx sub --client-id sub-13 --topic t/13 --session-expiry-interval 20 --qos 1
… Connecting...
✔ Connected
… Subscribing to t/13...
✔ Subscribed to t/13
^C
在终端中输入 Ctrl+C 以断开客户端连接。
在接下来的 20 秒内,列出队列和绑定
docker exec rabbitmq rabbitmqctl list_queues name
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name
mqtt-subscription-sub-13qos1
docker exec rabbitmq rabbitmqctl list_bindings source_name destination_name routing_key --formatter=pretty_table
Listing bindings for vhost /...
┌─────────────┬──────────────────────────────┬──────────────────────────────┐
│ source_name │ destination_name │ routing_key │
├─────────────┼──────────────────────────────┼──────────────────────────────┤
│ │ mqtt-subscription-sub-13qos1 │ mqtt-subscription-sub-13qos1 │
├─────────────┼──────────────────────────────┼──────────────────────────────┤
│ amq.topic │ mqtt-subscription-sub-13qos1 │ t.13 │
└─────────────┴──────────────────────────────┴──────────────────────────────┘
20 秒后,再次列出队列和绑定
docker exec rabbitmq rabbitmqctl list_queues name
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
docker exec rabbitmq rabbitmqctl list_bindings source_name destination_name routing_key --formatter=pretty_table
Listing bindings for vhost /...
队列及其绑定被 RabbitMQ 删除,因为我们的客户端没有在 20 秒的会话过期时间内以 Clean Session = 0 重新连接到 RabbitMQ。
接下来,进行同样的测试,但设置较长的会话过期时间,例如 1 小时
mqttx sub --client-id sub-13 --topic t/13 --session-expiry-interval 3600 --qos 1
… Connecting...
✔ Connected
… Subscribing to t/13...
✔ Subscribed to t/13
^C
您应该会观察到队列及其绑定在 1 分钟后被删除,因为有效的会话过期时间是客户端请求的时间(1 小时)和 RabbitMQ 中配置的 mqtt.max_session_expiry_interval_seconds 值(1 分钟)中的较小值。
特性 14:遗嘱延迟
描述
客户端可以在 CONNECT 数据包中定义遗嘱延迟间隔。
服务器会延迟发布客户端的遗嘱消息,直到遗嘱延迟间隔结束或会话结束,以先发生者为准。如果在遗嘱延迟间隔结束之前建立了该会话的新网络连接,则服务器不得发送遗嘱消息。其用途之一是避免在出现临时网络断开连接,且客户端在遗嘱消息发布前成功重新连接并继续其会话时发布遗嘱消息。
遗嘱延迟间隔的另一个使用场景是通知会话过期
客户端可以通过将遗嘱延迟间隔设置为比会话过期时间更长,并发送原因码为 0x04(带遗嘱消息的断开连接)的 DISCONNECT 数据包,从而安排遗嘱消息来通知会话已过期。
实现
虽然遗嘱消息负载通常很小,但 MQTT 规范允许遗嘱消息负载大小高达 64 KiB。
为了避免在 Khepri(RabbitMQ 未来的元数据存储)中存储大量二进制数据,RabbitMQ 创建一个包含此条遗嘱消息的经典队列。我们将此队列称为“遗嘱队列”。此消息设置了以毫秒为单位的每条消息 TTL,该值对应于以秒为单位的遗嘱延迟间隔。此外,遗嘱队列还设置了以毫秒为单位的队列 TTL,该值对应于以秒为单位的会话过期时间。有效的每条消息 TTL 至少比队列 TTL 小几毫秒,以便消息在队列(会话)过期前不久发布。
遗嘱队列还将 amq.topic(MQTT 插件使用的默认主题交换机)定义为死信交换机,并将遗嘱主题定义为死信路由键。
如果 MQTT 客户端未能在其遗嘱延迟间隔内重新连接,遗嘱队列中的消息将被死信路由到主题交换机。
让我们通过一个示例来说明这一点。
示例
在第一个终端窗口中,创建一个将消费遗嘱消息的订阅
mqttx sub --client-id sub-14 --topic t/14
在第二个终端窗口中,创建一个具有 20 秒遗嘱延迟间隔的连接
mqttx conn --client-id conn-14 --will-topic t/14 --will-message my-will-message --will-delay-interval 20 --session-expiry-interval 40
在第三个终端窗口中,我们看到到目前为止,由订阅 MQTT 客户端创建了一个队列
docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues name type messages arguments
┌──────────────────────────────┬────────────┬──────────┬───────────┐
│ name │ type │ messages │ arguments │
├──────────────────────────────┼────────────┼──────────┼───────────┤
│ mqtt-subscription-sub-14qos0 │ MQTT QoS 0 │ 0 │ │
└──────────────────────────────┴────────────┴──────────┴───────────┘
在第二个终端窗口中,输入 Ctrl+C 断开客户端 ID 为 conn-14 的 MQTT 连接。
这一次,列出队列显示遗嘱队列已经创建
docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues name type messages arguments
┌──────────────────────────────┬────────────┬──────────┬────────────────────────────────────────────────────────────┐
│ name │ type │ messages │ arguments │
├──────────────────────────────┼────────────┼──────────┼────────────────────────────────────────────────────────────┤
│ mqtt-subscription-sub-14qos0 │ MQTT QoS 0 │ 0 │ │
├──────────────────────────────┼────────────┼──────────┼────────────────────────────────────────────────────────────┤
│ mqtt-will-conn-14 │ classic │ 1 │ {<<"x-expires">>,long,40000} │
│ │ │ │ {<<"x-dead-letter-exchange">>,longstr,<<"amq.topic">>} │
│ │ │ │ {<<"x-dead-letter-routing-key">>,longstr,<<"t.14">>} │
└──────────────────────────────┴────────────┴──────────┴────────────────────────────────────────────────────────────┘
遗嘱队列的命名模式为 mqtt-will-<MQTT Client ID>。它包含一条消息:遗嘱消息。
正如上一节所述,队列 TTL (x-expires) 为 40,000 毫秒,因此与我们上述命令中的 40 秒会话过期时间相匹配。如果您等待 20 秒,您的第一个终端窗口应该会收到遗嘱消息,因为我们的客户端没有在遗嘱延迟间隔内重新连接
› payload: my-will-message
特性 15:可选的服务器特性可用性
描述
定义一组服务器不允许的特性,并提供一种机制供服务器向客户端说明这一点。可以通过这种方式指定的特性包括
- 最大 QoS (Maximum QoS)
- 保留可用 (Retain Available)
- 通配符订阅可用 (Wildcard Subscription Available)
- 订阅标识符可用 (Subscription Identifier Available)
- 共享订阅可用 (Shared Subscription Available)
如果客户端使用了服务器声明为不可用的特性,则视为错误。
实现
RabbitMQ 3.13 在 CONNACK 属性中包含了 Maximum QoS = 1 和 Shared Subscription Available = 0。
RabbitMQ 尚未支持 QoS 2。
正如下一节所述,共享订阅将在未来的 RabbitMQ 版本中得到支持。
限制
本节列出了 RabbitMQ MQTT 实现的限制。
MQTT 5.0 特定限制
共享订阅
共享订阅将在未来的 RabbitMQ 版本中添加。虽然此特性很好地映射到 RabbitMQ 中的队列,但共享订阅是会话状态的一部分,并且需要进行一些 RabbitMQ 数据库迁移,以便为给定的 MQTT 客户端 ID 高效地查询共享订阅。
延迟和保留的遗嘱消息
既延迟又保留的遗嘱消息将不会被保留。这是因为延迟的遗嘱消息将被死信路由到主题交换机,而保留程序目前并不从队列中消费消息。这一限制可以在将来通过为保留消息提供新的存储机制来解决。
非 MQTT 5.0 特定限制
为完整起见,本节列出了在 RabbitMQ 3.13 支持 MQTT 5.0 之前以及在 RabbitMQ 3.12 发布原生 MQTT 之前就已存在的限制。
保留消息
RabbitMQ 中保留消息的功能是有限的。
保留消息仅在节点本地存储和查询。
一个有效的示例是:MQTT 客户端向节点 A 发布一条带有主题 topic/1 的保留消息。此后,另一个客户端在节点 A 上以主题过滤器 topic/1 订阅。新的订阅者将收到该保留消息。
但是,如果主题过滤器包含通配符(多级通配符字符“#”或单级通配符字符“+”),则不会发送保留消息(问题 #8824)。
此外,如果客户端在节点 A 上发布保留消息,而随后另一个客户端在节点 B 上订阅,该订阅客户端将不会收到存储在节点 A 上的任何保留消息(问题 #8096)。
未来的 RabbitMQ 版本将在集群中复制保留消息,并发送匹配包含通配符的主题过滤器的保留消息。
总结
总而言之,RabbitMQ
- 是领先的 AMQP 0.9.1 代理
- 是一个流代理
- 在跨协议互操作性方面表现出色
- 凭借 3.13 中发布的 MQTT 5.0 支持和 3.12 中发布的原生 MQTT,正成为领先的 MQTT 代理之一
我们将 RabbitMQ 打造为功能齐全的物联网代理的旅程尚未结束,未来几个月和几年内计划投入更多开发工作。敬请关注!
