RabbitMQ 是一个支持多种协议的消息代理,支持
消息可以很容易地通过一种协议发布,并由其他协议消费。这就需要不同协议格式之间的数据格式转换。在 RabbitMQ 3.13 之前,所有消息在发布到 RabbitMQ 时都会被转换为基于 AMQP 0.9.1 协议的内部格式。这种方法经常导致不必要的转换和/或数据保真度问题。
在当前的 RabbitMQ 版本中,AMQP、AMQP 0.9.1 和 MQTT 协议的方法已有所改变,即消息将始终以其原始格式存储,只有当消息被与发布协议不同的协议消费时,才会被转换*。这样做的好处是,当发布协议和消费协议相同时,不会丢失任何协议特定的信息。
此规则唯一的例外是 streams:无论哪种协议发布,streams 内部都将其消息以 AMQP 编码数据格式存储。因此,例如使用 AMQP 0.9.1 发布时,会进行到 AMQP 的转换。
所有转换都已重写并进行了正式化。本文档旨在捕获实现的转换规则,以便应用程序开发人员可以参考本文档来理解他们在多协议消息应用程序中消息的转换方式。
shortstr:(条件)一个字符串,其长度小于 256 字节,且仅包含有效的 UTF-8 编码数据,不含 NUL(二进制零字节)字符。
*:读作“任意”(字段/类型等)。
条件可以按编写顺序读取并评估。例如,类型为 utf8 且长度超过 255 字节的 AMQP message_id 将首先失败 shortstr 条件,然后继续检查下一行。如果没有匹配的行,则数据将不会被转换(因此,消费应用程序将无法获得该数据)。
AMQP 1.0 -> AMQP 0.9.1
如果消息被 AMQP 0.9.1 消费,并且该消息是:
- 使用 AMQP 1.0 发布,或
- 从 stream 消费。
| AMQP 部分 | AMQP 字段 | AMQP 类型 | 条件 | AMQP 0.9.1 部分 | AMQP 0.9.1 字段 | AMQP 0.9.1 类型 | 注释 |
|---|
| header | durable | boolean | | properties | delivery_mode | ubyte | 2 = durable, 1 = transient |
| header | priority | ubyte | | properties | priority | ubyte | |
| header | ttl | milliseconds (uint) | | properties | expiration | shortstr | 毫秒转换为字符串值 |
| properties | message_id | utf8 | shortstr | properties | message_id | shortstr | |
| properties | message_id | utf8 | > 256 字节 | properties | headers, Key: x-message-id | longstr | |
| properties | message_id | uuid (16 字节二进制) | | properties | message_id | shortstr | 转换为文本,例如 "urn:uuid:550e8400-e29b-41d4-a716-446655440000" |
| properties | message_id | ulong | | properties | message_id | shortstr | 转换为文本 |
| properties | message_id | binary | | properties | headers, Key: x-message-id | binary | |
| properties | user_id | binary | shortstr | properties | user_id | shortstr | |
| properties | 更改为 | address | | | | | |
| properties | subject | utf8 | | | | | |
| properties | reply_to | address | shortstr | properties | reply_to | shortstr | |
| properties | correlation_id | utf8 | shortstr | properties | correlation_id | shortstr | |
| properties | correlation_id | utf8 | > 256 字节 | properties | headers, Key: x-correlation-id | longstr | |
| properties | correlation_id | uuid (16 字节二进制) | | properties | correlation_id | shortstr | 转换为文本表示,例如 "urn:uuid:550e8400-e29b-41d4-a716-446655440000" |
| properties | correlation_id | ulong | | properties | correlation_id | shortstr | 转换为数字的文本表示 |
| properties | correlation_id | binary | | properties | headers, Key: x-correlation-id | binary | |
| properties | content_type | symbol | | properties | content_type | shortstr | |
| properties | content_encoding | symbol | | properties | content_encoding | shortstr | |
| properties | absolute_expiry_time | timestamp | | | | | |
| properties | creation_time | timestamp | | properties | timestamp | timestamp | 转换为秒 |
| properties | group_id | utf8 | shortstr | properties | app_id | shortstr | |
| properties | group_sequence | sequence-no | | | | | |
| properties | reply_to_group_id | utf8 | | | | | |
| application properties | * | * (见类型转换) | Key: shortstr | properties | headers | * | |
| message annotations | * (symbol - x-cc*) | | Key: x-cc | properties | headers, Key: "CC" | longstr | |
| message annotations | * (symbol - x-*) | * (见类型转换) | Key: shortstr & x-* | properties | headers | * | 通常这意味着 x- headers |
| data | | data | 单个数据段 | payload | | binary | 从数据段中提取的纯二进制数据 |
| data (multiple) | * | * | 多个数据段 | payload | | AMQP 1.0 编码的二进制数据 | properties.type 将被设置为 "amqp-1.0" |
| amqp.value | * | * | | payload | | AMQP 1.0 编码的二进制数据 | properties.type 将被设置为 "amqp-1.0" |
| amqp.sequence | * | * | | payload | | AMQP 1.0 编码的二进制数据 | properties.type 将被设置为 "amqp-1.0" |
类型转换
| AMQP 1.0 类型 | 条件 | AMQP 0.9.1 类型 | 注释 |
|---|
| utf8 (string) | | longstr | RabbitMQ 不支持 shortstr 头部值,因此所有 utf8 输入都会被转换为 longstr,除非它是 basic properties 中的字段。 |
| binary | | binary | |
| long | | long | |
| ulong | | long | 溢出风险 |
| ubyte | | unsignedbyte | |
| short | | short | |
| ushort | | usignedshort | |
| uint | | unsignedint | |
| int | | signedint | |
| double | 既不是 NaN 也不是 Inf | double | |
| float | 既不是 NaN 也不是 Inf | float | |
| decimal | | | 类型为 decimal32、decimal64 或 decimal128 的消息注解或应用程序属性不会被转换。 |
| boolean | | bool | |
| timestamp | | timestamp (seconds) | 值除以 1000 |
| byte | | byte | |
| null | | void | |
| list | | array | |
| map | | table | |
| symbol | | longstr | |
AMQP 0.9.1 -> AMQP 1.0
| AMQP 0.9.1 部分 | AMQP 0.9.1 字段 | AMQP 0.9.1 类型 | 条件 | AMQP 1.0 部分 | AMQP 1.0 字段 | AMQP 1.0 类型 | 注释 |
|---|
| basic properties | message_id | shortstr | 有效的 urn uuid | properties | message_id | uuid | |
| basic properties | message_id | shortstr | | properties | message_id | utf8 | |
| basic properties | correlation_id | shortstr | 有效的 urn uuid | properties | correlation_id | uuid | |
| basic properties | correlation_id | shortstr | | properties | correlation_id | utf8 | |
| basic properties | user_id | shortstr | | properties | user_id | binary | |
| basic properties | expiration | shortstr | 如果可转换为数值类型 | header | ttl | uint | |
| basic properties | type | shortstr | | message annotations | x-basic-type | utf8 | |
| basic properties | reply_to | shortstr | | properties | reply_to | utf8 | |
| basic properties | app_id | shortstr | | properties | group-id | utf8 | |
| basic properties | timestamp | timestamp (seconds) | | properties | creation_time | timestamp | 转换为毫秒 |
| basic properties | content-type | shortstr | | properties | content-type | symbol | |
| basic properties | content-encoding | shortstr | | properties | content-encoding | symbol | |
| basic properties | delivery-mode | octet | | header | durable | boolean | 以前在 x-basic-delivery-mode 中 |
| basic properties | priority | octet | | header | priority | ubyte | 以前在 x-basic-priority 中 |
| basic.properties | headers | | Key=x-amqp-1.0-properties | properties | - | - | 旧的 1.0 插件头部 |
| basic.properties | headers | | Key=x-amqp-1.0-application-properties | application properties | - | - | 旧的 1.0 插件头部 |
| basic.properties | headers | | Key=x-amqp-1.0-message-annotations | message annotations | - | - | 旧的 1.0 插件头部 |
| basic properties | headers | * | Key 以 "x-" 开头 | message annotations | 键 | * | 参见类型转换规则 |
| basic properties | headers | table | 值不是数组或表 | application properties | 键 | * | 参见类型转换规则 |
| basic properties | headers | table | 值是数组或表 | - | - | - | 未转换 |
| payload | - | binary | | data | - | data | |
类型转换
| AMQP 0.9.1 类型 | 条件 | AMQP 1.0 类型 | 注释 |
|---|
| longstr | shortstr | utf8 | 性能/精度权衡 |
| longstr | | binary | |
| long | | long | |
| ubyte | | ubyte | |
| short | | short | |
| ushort | | ushort | |
| uint | | uint | |
| int | | int | |
| double | | double | |
| float | | float | |
| bool | | boolean | |
| binary | | binary | |
| timestamp | | timestamp | 从秒转换为毫秒 |
| byte | | byte | |
| void | | null | |
| array | | list | |
| table | | map | |
| table.key | x-* header | symbol | |
| table.key | | utf8 | |
| table.value | | * | 根据此表进行转换 |
MQTT 5.0 -> AMQP 1.0
| MQTT 5.0 部分 | MQTT 字段 | MQTT 5.0 类型 | 条件 | AMQP 1.0 部分 | AMQP 1.0 字段 | AMQP 1.0 类型 | 注释 |
|---|
| Fixed Header | Dup | Bits | | | | | 设置为 header first-acquirer 没有意义,因为 DUP 标志仅适用于从客户端到服务器(从服务器到客户端的消费由 Redelivered 标志决定,该标志从队列发送) |
| Fixed Header | QoS | Bits | | header | durable | boolean | 如果 QoS > 0,则 durable 为 true |
| Fixed Header | Retain | Bits | | | | | |
| Variable Header | Payload Format Indicator | Bits | | | | | 参见“Payload”行,条件为“Payload Format Indicator set” |
| Variable Header | Message Expiry Interval | uint | | header | ttl | milliseconds | 秒转换为毫秒 |
| Variable Header | Topic Alias | ushort | | | | | |
| Variable Header | Response Topic | utf8 | | properties | reply_to | utf8 | 将 MQTT topic 转换为 AMQP routing key (RK)。将 reply-to 地址设置为 "/exchanges/" X "/" RK。 |
| Variable Header | Correlation Data | binary | urn:uuid | properties | correlation_id | uuid | |
| Variable Header | Correlation Data | binary | | properties | correlation_id | binary | |
| Variable Header | User Property | utf8 | Key 以 "x-" 开头且 key 为 ASCII | message annotation | | value: utf8 | Key 为 symbol 类型 |
| | | Key 不以 "x-" 开头 | application properties | | value: utf8 | Key 为 utf8 类型 |
| Variable Header | Subscription Identifier | uint | | | | | |
| Variable Header | Content Type | utf8 | 有效的 ASCII | properties | content_type | symbol | MQTT content type 是 UTF-8,而 AMQP 1.0 content type 仅为 ASCII。 |
| Payload | | | Payload Format Indicator 未设置 | data | | data | |
| Payload | | | Payload Format Indicator 已设置 | amqp.value | | utf8 | 如果设置了 Payload Format Indicator,则将 MQTT payload 转换为字符串(即单个 AMQP value section),因为 AMQP 字符串是 UTF-8 编码的。 |
类型转换
| MQTT 5.0 类型 | 条件 | AMQP 1.0 类型 | 注释 |
|---|
| Bits | | boolean | 仅转换选定的标志 |
| ushort | | ushort | |
| uint | | uint | |
| utf8 | | utf8 | |
| binary data | utf8 | utf8 | |
| binary data | | binary | |
| utf8 string pairs | | map | 去重键,但保持顺序 |
MQTT 5.0 -> AMQP 0.9.1
| MQTT 5.0 部分 | MQTT 5.0 字段 | MQTT 5.0 类型 | 条件 | AMQP 0.9.1 部分 | AMQP 0.9.1 字段 | AMQP 0.9.1 类型 | 注释 |
|---|
| Fixed Header | Dup | Bits | | | | | 设置为 basic.deliver 字段的 redelivered 没有意义,因为 DUP 标志仅适用于从客户端到服务器(从服务器到客户端的消费由 Redelivered 标志决定,该标志从队列发送) |
| Fixed Header | QoS | Bits | | properties | delivery_mode | | QoS 0 映射到 delivery_mode 1,QoS 1 映射到 delivery_mode 2 |
| Fixed Header | Retain | Bits | | | | | |
| Variable Header | Payload Format Indicator | Bits | | | | | AMQP 0.9.1 的 content_encoding 指的是 MIME content encoding,其有效值列在 https://www.iana.org/assignments/http-parameters/http-parameters.xml。因此,AMQP 0.9.1 的 content_encoding 是不同的。最好将其映射到 content_type 的 text/<subtype>; charset="utf-8,但 <subtype> 未定义。此外,MQTT 5.0 的 Content Type 属性已定义。 |
| Variable Header | Message Expiry Interval | uint | | properties | expiration | shortstr | 秒转换为毫秒 |
| Variable Header | Topic Alias | ushort | | | | | |
| Variable Header | Response Topic | utf8 | | properties | headers, Key: x-reply-to-topic | longstr | AMQP 0.9.1 的 reply_to 属性指的是队列名称,而不是 topic 名称。 |
| Variable Header | Correlation Data | binary | shortstr | properties | correlation_id | shortstr | |
| | | | properties | headers, Key: x-correlation-id | longstr | |
| Variable Header | User Property | utf8 string pair | 名称为 shortstr | properties | headers | longstr | RabbitMQ 不能将 shortstr 作为例如头部值。 |
| Variable Header | Subscription Identifier | variable byte integer | | | | | |
| Variable Header | Content Type | utf8 | shortstr | properties | content_type | shortstr | |
| Payload | | | | payload | | | |
类型转换
| MQTT 5.0 类型 | 条件 | AMQP 0.9.1 类型 | 注释 |
|---|
| Bits | | Bits | |
| ushort | | ushort | |
| uint | | uint | |
| utf8 | < 256 字节 且 目标不是 header | shortstr | RabbitMQ 不能将 shortstr 作为例如头部值。 |
| utf8 | >= 256 字节 | longstr | |
| binary | < 256 字节的 UTF-8 编码数据 | shortstr | |
| binary | 无有效 UTF-8 或 >= 256 字节 | longstr | |
| utf8 pairs | | table | AMQP 0.9.1 的 Field Tables 中“重复字段是非法的”。RabbitMQ 按键对字段进行排序。 |
AMQP 1.0 -> MQTT 5.0
| AMQP 1.0 部分 | AMQP 1.0 字段 | AMQP 1.0 类型 | 条件 | MQTT 5.0 部分 | MQTT 5.0 字段 | MQTT 5.0 类型 | 注释 |
|---|
| header | durable | boolean | | Fixed Header | QoS | Bits | durable=true 映射到 QoS 1,durable=false 映射到 QoS 0 |
| properties | message_id | * | | | | | |
| properties | user_id | binary | | | | | |
| properties | 更改为 | address | | | | | |
| properties | subject | utf8 | | | | | |
| properties | reply_to | address | "/exchanges/" X "/" RK,其中 X 匹配 mqtt.exchange | Variable Header | Response Topic | utf8 | 将 AMQP topic 转换为 MQTT topic |
| properties | correlation_id | * | | Variable Header | Correlation Data | binary | 转换为二进制 |
| properties | content_type | symbol | | Variable Header | Content Type | utf8 | |
| properties | content_encoding | symbol | | | | | |
| properties | absolute_expiry_time | timestamp | | | | | |
| properties | creation_time | timestamp | | | | | |
| properties | group_id | utf8 | | | | | |
| properties | group_sequence | sequence-no | | | | | |
| properties | reply_to_group_id | utf8 | | | | | |
| application properties | * | * | 不是 array、list、map 或 binary | User Properties | | utf8 | 值被转换为字符串表示 |
| message annotations | * | * | 仅 x- headers,不包括值是 array、list、map 或 binary 的那些。 | User Properties | | utf8 | |
| data | | data | 一个或多个数据段 | Payload | | binary | |
| data | * | * | 单个 AMQP 二进制值段 | Payload | | binary | |
| data | | amqp.value(utf8) | 可转换为 UTF-8 的单个 AMQP 值段 | Payload | | binary | Payload-Format-Indicator 设置为 1,以便 MQTT 5.0 客户端可以解析消息的文本表示,从而理解消息。 |
| data | | * | 其他单个 AMQP 值段或 amqp-sequence 段 | Payload | | binary | 使用 AMQP 类型系统进行编码,并包含 Content Type message/vnd.rabbitmq.amqp。此 Content Type 未注册。 |
AMQP 0.9.1 -> MQTT 5.0
| AMQP 0.9.1 部分 | AMQP 0.9.1 字段 | AMQP 0.9.1 类型 | 条件 | MQTT 5.0 部分 | MQTT 5.0 字段 | MQTT 5.0 类型 | 注释 |
|---|
| basic properties | message_id | shortstr | | | | | |
| basic properties | correlation_id | shortstr | | Variable Header | Correlation Data | binary | |
| basic properties | user_id | shortstr | | | | | |
| basic properties | expiration | shortstr | | Variable Header | Message Expiry Interval | uint | 从毫秒转换为秒 |
| basic properties | type | shortstr | | | | | |
| basic properties | reply_to | shortstr | | | | | |
| basic properties | app_id | shortstr | | | | | |
| basic properties | timestamp | timestamp (seconds) | | | | | |
| basic properties | content_type | shortstr | | Variable Header | Content Type | utf8 | |
| basic properties | content_encoding | shortstr | | | | | |
| basic properties | delivery_mode | octet | | Fixed Header | QoS | Bits | delivery-mode 2 映射到 QoS 1,delivery-mode 1 映射到 QoS 0 |
| basic properties | priority | octet | | | | | |
| basic.properties | headers | longstr | Key=x-reply-to-topic | Variable Header | Response Topic | utf8 | 将 AMQP topic 转换为 MQTT topic |
| basic.properties | headers | longstr | Key=x-correlation-id | Variable Header | Correlation Data | binary | |
| basic.properties | headers | * | 不是 array,而是 table | User properties | | utf8 | 简单值被转换为其字符串表示 |
| basic properties | headers | table | | | | | |
| payload | | binary | | Payload | | | |