发布者
概述
本指南涵盖了与发布者相关的各种主题
- 基础知识
- 发布者生命周期
- 协议差异
- 消息属性 和传递元数据
- 发布者端数据安全 主题(连接恢复、发布者确认)
- 异常处理
- 资源警报的影响
- 不可路由消息处理
- 指标 与发布者相关
- 并发考虑
- 如何临时阻止所有发布者
- 如何排查发布者常见问题
等等。
本指南侧重于 AMQP 0-9-1,并提到了 RabbitMQ 支持的其他协议(AMQP 1.0、MQTT 和 STOMP)的关键协议特定差异。
术语
术语“发布者”在不同的上下文中含义不同。一般来说,在消息传递中,发布者(也称为“生产者”)是发布(产生)消息的应用程序(或应用程序实例)。同一个应用程序也可以消费消息,因此可以同时充当消费者。
消息传递协议也包含消息传递的持久订阅的概念。订阅是常用的一种描述此类实体的术语。消费者是另一种。RabbitMQ 支持的消息传递协议都使用这两个术语,但 RabbitMQ 文档倾向于使用后者。
基础知识
RabbitMQ 是一个消息代理。它接收来自发布者的消息,路由这些消息,如果存在要路由到的队列,则存储这些消息以供消费,或者立即将消息传递给消费者(如果有)。
发布者发布到不同的目标,具体取决于协议。在 AMQP 0-9-1 中,发布者发布到交换机。在 AMQP 1.0 中,发布发生在链接上。在MQTT 中,发布者发布到主题。最后,STOMP 支持多种目标类型:主题、队列、AMQP 0-9-1 交换机。这将在协议特定差异部分中详细介绍。
发布的消息必须路由到队列(主题等)。队列(主题)可能具有在线消费者。当消息成功路由到队列并且存在可以接受更多传递的在线消费者时,消息将发送到消费者。
尝试发布到不存在的队列(主题)将导致通道级别的异常,代码为 404 Not Found
,并导致尝试发布的通道关闭。
发布者生命周期
发布者通常是长生命周期的:也就是说,在发布者的整个生命周期中,它都会发布多条消息。打开连接或通道(会话)来发布单个消息不是最佳做法。
发布者通常在应用程序启动期间打开其连接。它们通常会与其连接甚至应用程序运行的时间一样长。
发布者可以更动态,并响应系统事件开始发布,并在不再需要时停止发布。这在通过Web STOMP 和Web MQTT 插件使用的 WebSocket 客户端、移动客户端等中很常见。
协议差异
发布消息的过程在 RabbitMQ 支持的每个协议中都非常相似。所有四种协议都允许用户发布一条消息,该消息具有有效负载(正文)和一个或多个消息属性(标头)。
所有四种协议还支持发布者的确认机制,该机制允许发布应用程序跟踪代理已成功接受或未成功接受的消息,并继续发布下一批消息或重试发布当前消息。
差异通常与使用的术语而非语义相关。消息属性 也因协议而异。
AMQP 0-9-1
在 AMQP 0-9-1 中,发布发生在通道 上到交换机。交换机使用通过定义一个或多个队列和交换机之间的绑定或源交换机和目标交换机设置的路由拓扑。成功路由的消息存储在队列中。
每个实体的作用在AMQP 0-9-1 概念指南中进行了介绍。
发布者确认 是发布者确认机制。
有几种常见的发布者错误类型使用不同的协议功能进行处理
- 发布到不存在的交换机将导致通道错误,这将关闭通道,因此不允许在其上进行进一步的发布(或任何其他操作)。
- 当已发布的消息无法路由到任何队列(例如,因为目标交换机没有定义任何绑定)并且发布者将
mandatory
消息属性设置为false
(这是默认值)时,消息将被丢弃或重新发布到备用交换机(如果有)。 - 当已发布的消息无法路由到任何队列并且发布者将
mandatory
消息属性设置为true
时,消息将返回给它。发布者必须设置一个返回的消息处理程序才能处理返回(例如,通过记录错误或使用不同的交换机重试)
AMQP 1.0
在 AMQP 1.0 中,发布发生在链接的上下文中。
MQTT
在 MQTT 中,消息在连接到主题上发布。服务器端 MQTT 连接过程通过主题交换机将消息路由到队列。
当发布者选择使用 QoS 1 时,RabbitMQ 使用PUBACK 数据包确认已发布的消息。
发布者可以向服务器提供提示,表明主题上的已发布消息必须保留(存储以供将来传递给新订阅者)。每个主题仅保留最新发布的消息。
MQTT 5.0 PUBACK 数据包包含一个原因代码,用于告知发布者发布是否成功。RabbitMQ 返回的原因代码包括
0 - 成功
:消息路由到的所有队列都成功接受了消息。16 - 没有匹配的订阅者
:RabbitMQ 无法将消息路由到任何队列(因为主题交换机没有定义任何绑定)。131 - 实现特定错误
:RabbitMQ 拒绝了消息(例如,当目标经典队列不可用时)。
在 MQTT 3.1 和 3.1.1 中,除了关闭连接之外,服务器无法通过任何机制将发布错误传达给客户端。
请参阅MQTT 和MQTT-over-WebSockets 指南以了解更多信息。
STOMP
STOMP 客户端在连接到一个或多个目标上发布,在 RabbitMQ 的情况下,这些目标可能具有不同的语义。
STOMP 提供了一种方法,使服务器可以将消息处理中的错误传达回发布者。其发布者确认的变体称为回执,这是客户端在发布时启用的功能。
请参阅STOMP 指南、STOMP-over-WebSockets 和STOMP 1.2 规范以了解更多信息。
路由
AMQP 0-9-1
AMQP 0-9-1 中的路由由交换机执行。交换机是命名的路由表。表条目称为绑定。这在AMQP 0-9-1 概念指南中进行了更详细的介绍。
有几种内置的交换机类型
- 主题
- 扇出
- 直接(包括默认交换机)
- 标头
前三种类型在教程中提供了示例。
插件可以提供更多交换机类型。一致哈希交换机、随机路由交换机、内部事件交换机 和延迟消息交换机 是随 RabbitMQ 一起提供的交换机插件。与所有插件一样,必须先启用它们才能使用。
不可路由消息处理
客户端可能会尝试向不存在的目标(交换机、主题、队列)发布消息。本节介绍不同协议在处理此类情况时的差异。
RabbitMQ 收集并公开 指标,可用于检测发布不可路由消息的发布者。
AMQP 0-9-1
当已发布的消息无法路由到任何队列(例如,因为目标交换机没有定义任何绑定)并且发布者将 mandatory
消息属性设置为 false
(这是默认值)时,消息将被丢弃或重新发布到备用交换机(如果有)。
当已发布的消息无法路由到任何队列,并且发布者将 mandatory
消息属性设置为 true
时,该消息将被返回给发布者。发布者必须设置返回消息处理程序才能处理返回(例如,通过记录错误或使用不同的交换机重试)。
备用交换机 是 AMQP 0-9-1 交换机的一个特性,允许客户端处理交换机无法路由的消息(即,由于没有绑定队列或没有匹配的绑定)。这方面的典型示例包括检测客户端意外或恶意发布无法路由的消息,或者“否则”路由语义,其中某些消息将被特殊处理,其余消息由通用处理程序处理。
MQTT
发布到新主题将为其设置一个队列。不同的主题/QoS 级别组合将使用具有不同属性的不同队列。因此,发布者和消费者必须使用相同的 QoS 级别。
STOMP
STOMP 支持多个不同的目标,包括那些假设预先存在的拓扑结构的目标。
/topic
:发布到尚未有消费者的主题将导致消息丢失。第一个订阅该主题的消费者将为其声明一个队列。/exchange
:目标交换机必须存在,否则服务器将报告错误。/amq/queue
:目标队列必须存在,否则服务器将报告错误。/queue
:发布到不存在的队列将设置该队列。/temp-queue
:发布到不存在的临时队列将设置该队列。
指标
有一个用于不可路由已丢弃消息的指标。
在上面的示例中,所有发布的消息都作为不可路由(且非强制)消息被丢弃。
消息属性
AMQP 0-9-1
每次传递都结合了消息元数据和传递信息。不同的客户端库使用略微不同的方式来提供对这些属性的访问。通常,传递处理程序可以访问传递数据结构。
以下属性是传递和路由详细信息;它们本身并非消息属性,而是在路由和传递时由 RabbitMQ 设置的。
属性 | 类型 | 描述 |
传递标签 | 正整数 | 传递标识符,请参阅 确认。 |
重新传递 | 布尔值 | 如果此消息之前 已传递并重新入队,则设置为 true 。 |
交换机 | 字符串 | 路由此消息的交换机 |
路由键 | 字符串 | 发布者使用的路由键 |
消费者标签 | 字符串 | 消费者(订阅)标识符 |
以下是消息属性。大多数属性是可选的。它们由发布者在发布时设置。
属性 | 类型 | 描述 | 必需? |
传递模式 | 枚举(1 或 2) | 2 表示“持久”,1 表示“瞬态”。某些客户端库将此属性公开为布尔值或枚举。 | 是 |
类型 | 字符串 | 特定于应用程序的消息类型,例如“orders.created”。 | 否 |
标头 | 映射(字符串 => 任何) | 带有字符串标头名称的标头的任意映射。 | 否 |
内容类型 | 字符串 | 内容类型,例如“application/json”。由应用程序使用,而非核心 RabbitMQ。 | 否 |
内容编码 | 字符串 | 内容编码,例如“gzip”。由应用程序使用,而非核心 RabbitMQ。 | 否 |
消息 ID | 字符串 | 任意消息 ID | 否 |
相关 ID | 字符串 | 帮助将请求与响应相关联,请参阅 教程 6。 | 否 |
回复到 | 字符串 | 携带响应队列名称,请参阅 教程 6。 | 否 |
过期时间 | 字符串 | 每条消息的 TTL。 | 否 |
时间戳 | 时间戳 | 应用程序提供的时间戳。 | 否 |
用户 ID | 字符串 | 用户 ID,如果设置,则 已验证。 | 否 |
应用 ID | 字符串 | 应用程序名称 | 否 |
消息类型
消息上的 type 属性是一个任意字符串,有助于应用程序传达消息的类型。它由发布者在发布时设置。该值可以是发布者和消费者商定的任何特定于域的字符串。
RabbitMQ 不会验证或使用此字段,它供应用程序和插件使用和解释。
实践中的消息类型自然会分成组,点分隔命名约定很常见(但 RabbitMQ 或客户端不要求),例如 orders.created
或 logs.line
或 profiles.image.changed
。
如果消费者收到未知类型的传递,强烈建议记录此类事件,以便于故障排除。
内容类型和编码
内容(MIME 媒体)类型和内容编码字段允许发布者传达消费者应如何反序列化和解码消息有效负载。
RabbitMQ 不会验证或使用这些字段,它供应用程序和插件使用和解释。
例如,具有 JSON 有效负载的消息 应使用 application/json
。如果有效负载使用 LZ77(GZip)算法压缩,则其内容编码应为 gzip
。
可以通过逗号分隔多个编码。
发布者确认(确认)和数据安全
确保数据安全是应用程序、客户端库和 RabbitMQ 集群节点共同的责任。本节介绍了一些与数据安全相关的话题。
网络可能会以不明显的方式发生故障,并且检测某些故障 需要时间。因此,已将其协议帧或一组帧(例如已发布的消息)写入其套接字的客户端不能假设该消息已到达服务器并已成功处理。它可能在传输过程中丢失,或者其传递可能会被显着延迟。
为了解决此问题,开发了一种 发布者端确认机制。它模仿了协议中已存在的 消费者确认机制。
使用发布者确认的策略
发布者确认 为应用程序开发人员提供了一种机制来跟踪哪些消息已成功被 RabbitMQ 接受。使用发布者确认有几种常用的策略。
- 逐条发布消息并使用流式确认(异步 API 元素:确认事件处理程序、期货/承诺等)。
- 发布一批消息并等待所有未完成的确认。
- 逐条发布消息并在继续发布之前等待确认。由于此选项会对发布者吞吐量产生强烈的负面影响,因此强烈建议不要使用此选项。
它们的吞吐量影响和易用性各不相同。
流式确认
大多数客户端库通常提供一种方法,允许开发人员在确认从服务器到达时处理单个确认。确认将异步到达。由于发布在 AMQP 0-9-1 中也本质上是异步的,因此此选项允许以很少的开销安全发布。该算法通常类似于以下内容。
- 在通道上启用发布者确认。
- 对于每条已发布的消息,添加一个映射条目,将当前序列号映射到该消息。
- 当收到肯定确认时,删除该条目。
- 当收到否定确认时,删除该条目并安排重新发布其消息(或其他合适的操作)。
在 RabbitMQ Java 客户端中,确认处理程序通过 ConfirmCallback 和 ConfirmListener 接口公开。必须将一个或多个侦听器 添加到通道。
批量发布
此策略涉及发布消息批次并等待整个批次得到确认。对批次执行重试。
- 在通道上启用发布者确认。
- 对于每批已发布的消息,等待所有未完成的确认。
- 当所有确认都为肯定时,发布下一批。
- 如果存在否定确认或超时命中,则重新发布整个批次或仅重新发布相关消息。
某些客户端提供用于等待所有未完成确认的便捷 API 元素。例如,在 Java 客户端中,有 Channel#waitForConfirms(timeout)。
由于此方法涉及等待确认,因此它会对发布者吞吐量产生负面影响。批次越大,影响越小。
发布并等待
此策略可以被认为是一种反模式,主要出于完整性考虑而记录。它涉及发布消息并立即等待到达未完成的确认。可以将其视为上述策略,其中批次发布的批次大小等于 1。
此方法将对吞吐量产生非常大的负面影响,不建议使用。
从连接故障中恢复
客户端和 RabbitMQ 节点之间的网络连接可能会发生故障。应用程序如何处理此类故障直接关系到整个系统的数据安全。
几个 RabbitMQ 客户端支持自动恢复连接和拓扑结构(队列、交换机、绑定和消费者):Java、.NET、Bunny 就是一些示例。
其他客户端没有将自动恢复作为一项功能提供,但提供了有关应用程序开发人员如何实现恢复的示例。
许多应用程序的自动恢复过程遵循以下步骤。
- 重新连接到可访问的节点。
- 恢复连接侦听器。
- 重新打开通道。
- 恢复通道侦听器。
- 恢复通道
basic.qos
设置、发布者确认和事务设置。
连接和信道恢复后,拓扑恢复即可开始。拓扑恢复包括以下操作,针对每个信道执行
- 重新声明交换机(预定义的交换机除外)
- 重新声明队列
- 恢复所有绑定
- 恢复所有消费者
异常处理
发布者通常会遇到两种类型的异常
- 由于写入失败或超时导致的网络 I/O 异常
- 确认确认传递超时
请注意,这里的“异常”指的是一般意义上的错误;某些编程语言根本没有异常,因此那里的客户端会以不同的方式传达错误。本节中的讨论和建议应同样适用于大多数客户端库和编程语言。
第一种类型的异常可能在写入过程中立即发生,也可能在一段时间后发生。这是因为某些类型的 I/O 故障(例如网络拥塞过高或数据包丢失率)可能需要一段时间才能检测到。连接恢复后,发布可以继续进行,但如果连接因警报而被阻止,则所有后续尝试都将失败,直到警报清除。这将在下面的资源警报的影响部分中详细介绍。
后一种类型的异常仅在应用程序开发人员提供超时时才会发生。对于给定应用程序而言,合理的超时值由开发人员决定。它不应低于有效的心跳超时。
资源警报的影响
当集群节点存在资源警报时,集群中尝试发布消息的所有连接都将被阻止,直到集群中的所有警报都清除。
当连接被阻止时,通过此连接发送的更多数据将不会在连接上读取、解析或处理。当连接被解除阻止时,所有客户端流量处理将恢复。
兼容的 AMQP 0-9-1 客户端将在被阻止和解除阻止时收到通知。
被阻止连接上的写入将超时或因 I/O 写入异常而失败。
指标
指标收集和监控对于发布者而言与应用程序或应用程序中的任何其他组件一样重要。RabbitMQ 收集的几个指标在发布者方面特别令人关注
发布和确认速率大多是不言而喻的。波动速率非常重要,因为它们有助于检测未以最佳方式使用连接或信道的应用程序,从而提供次优的发布速率并浪费资源。
不可路由消息速率可以帮助检测发布无法路由到任何队列的消息的应用程序。例如,这可能表明配置错误。
客户端库也可以收集指标。RabbitMQ Java 客户端就是一个例子。这些指标可以深入了解特定于应用程序的架构(例如,哪个发布组件发布不可路由的消息),而 RabbitMQ 节点无法推断这些信息。
并发注意事项
并发主题完全与客户端库实现细节有关,但可以提供一些通用建议。一般而言,应避免在共享“发布上下文”(AMQP 0-9-1 中的信道、STOMP 中的连接、AMQP 1.0 中的会话等)上发布,并将其视为不安全。
这样做会导致网络上传输的数据帧格式不正确。这会导致连接关闭。
对于单个应用程序中的少量并发发布者,每个发布者使用一个线程(或类似方法)是最佳解决方案。对于大量发布者(例如数百或数千个),请使用线程池。
临时阻止发布
可以通过将内存高水位标记设置为0
来有效地阻止集群中的所有发布,从而使资源警报立即触发
rabbitmqctl set_vm_memory_high_watermark 0
发布者故障排除
本节介绍了发布者的一些常见问题,以及如何识别和解决这些问题。分布式系统中的故障有多种形式,因此此列表绝非详尽无遗。
连接故障
与任何客户端一样,发布者必须首先成功连接并成功进行身份验证。
潜在的连接问题数量非常广泛,并且有一个专门的指南。
身份验证和授权
与任何客户端一样,发布者可能无法进行身份验证,或者没有权限访问其目标虚拟主机或发布到目标交换机。
RabbitMQ 会将此类故障记录为错误。
连接波动
某些应用程序为每个发布的消息打开一个新的连接。这效率极低,并且不是消息协议设计的使用方式。可以使用连接指标检测此类情况。
在可能的情况下,优先使用长期存在的连接。
连接中断
网络连接可能会失败。某些客户端库支持自动连接和拓扑恢复,其他库使在应用程序代码中实现连接恢复变得容易。
当连接断开时,不会有发布通过或由客户端内部排队(延迟)。此外,先前已序列化并写入套接字的消息不能保证到达目标节点。因此,对于需要可靠发布和数据安全的发布者而言,至关重要的是使用发布者确认来跟踪 RabbitMQ 已确认哪些发布。在一段时间后,应将未确认的消息视为未传递。如果对应用程序来说这样做是安全的,则可以重新发布这些消息。这在教程 7和本指南中的数据安全部分中进行了介绍。
有关详细信息,请参阅从网络连接故障中恢复。
路由问题
发布者可以成功连接、进行身份验证并被授予发布到交换机(主题、目标)的权限。但是,这些消息可能无法路由到任何队列或消费者。这可能是由于
- 应用程序之间的配置不匹配,例如发布者和消费者使用的主题不匹配
- 发布者配置错误(交换机、主题、路由密钥不正确)
- 对于 AMQP 0-9-1,目标交换机上缺少绑定
- 存在资源警报:请参阅以下部分
- 网络连接已失败,客户端未恢复:请参阅上一节
检查拓扑和指标通常有助于快速缩小问题范围。例如,管理 UI中的单个交换机页面可用于确认是否存在入站消息活动(入口速率高于零)以及绑定是什么。
在以下示例中,交换机没有绑定,因此不会将任何消息路由到任何位置
还可以使用rabbitmq-diagnostics列出绑定
# note that the implicit default exchange bindings won't
# be listed as of RabbitMQ 3.8
rabbitmq-diagnostics list_bindings --vhost "/"
=> Listing bindings for vhost /...
在上面的示例中,该命令没有产生任何结果。
从 RabbitMQ 3.8 开始,有一个新的指标用于不可路由的已丢弃消息
在上面的示例中,所有发布的消息都作为不可路由(且非强制性)消息被丢弃。请参阅本指南中的不可路由消息处理部分。
集群范围和连接指标以及服务器日志将有助于发现有效的资源警报。
资源警报
当存在资源警报时,所有发布的连接都将被阻止,直到警报清除。客户端可以选择在被阻止时接收通知。在资源警报指南中了解更多信息。
协议异常
对于某些协议(例如 AMQP 0-9-1 和 STOMP),发布者可能会遇到称为协议错误(异常)的情况。例如,发布到不存在的交换机或将交换机绑定到不存在的交换机将导致信道异常,并使信道关闭。无法在已关闭的信道上发布。RabbitMQ 节点(发布者已连接到的节点)会记录此类事件。根据使用的客户端库,发布尝试失败也会导致客户端异常或返回错误。
在共享信道上并发发布
客户端库不支持在共享信道上并发发布。在并发注意事项部分中了解更多信息。