跳至主内容
版本:4.2

发布者

概述

本指南涵盖了与发布者相关的各种主题

等等。

本指南侧重于 AMQP 0-9-1,并提到了与 RabbitMQ 支持的其他协议(AMQP 1.0、MQTT 和 STOMP)的关键协议特定差异。

术语

“发布者”一词在不同上下文中具有不同的含义。总的来说,在消息传递中,“发布者”(也称为“生产者”)是发布(生产)消息的应用程序(或应用程序实例)。同一个应用程序也可以消费消息,因此可以同时成为消费者

消息传递协议还具有用于消息传递的持久订阅的概念。订阅是描述此类实体的常用术语之一。消费者是另一个。RabbitMQ 支持的消息传递协议都使用这两个术语,但 RabbitMQ 文档倾向于使用后者。

基础知识

RabbitMQ 是一个消息代理。它接收来自发布者的消息,将它们路由,如果存在要路由到的队列,则将它们存储以供消费,或者如果存在消费者,则立即将它们传递给消费者。

发布者发布到因协议而异的目标。

在 AMQP 0-9-1 中,发布者发布到交换器。在 AMQP 1.0 中,发布发生在链接上。在MQTT 中,发布者发布到主题。最后,STOMP 支持各种目标类型:主题、队列、AMQP 0-9-1 交换器。

这将在协议特定差异部分更详细地介绍。

已发布的的消息必须路由到队列(主题等)。队列(主题)可能有关联的消费者。当消息成功路由到队列并且存在一个在线的消费者可以接受更多传递时,消息将被发送到消费者。

尝试发布到不存在的队列(主题)将导致一个通道级异常,代码为 404 Not Found,并导致尝试该操作的通道关闭。

发布者生命周期

发布者通常是长期存在的:也就是说,在发布者的整个生命周期中,它会发布多条消息。为单个消息的发布打开连接或通道(会话)不是最优的。

发布者通常在应用程序启动时打开其连接。它们通常会持续到其连接甚至应用程序运行为止。

发布者可以更加动态,并在响应系统事件时开始发布,在不再需要时停止。这在使用Web STOMPWeb MQTT 插件、移动客户端等时很常见。

协议差异

在 RabbitMQ 支持的每种协议中,发布消息的过程都非常相似。所有四种协议都允许用户发布一条具有有效负载(正文)和一条或多条消息属性(头)的消息。

所有四种协议还支持用于发布者的确认机制,该机制允许发布应用程序跟踪已成功被代理接受的消息或未被接受的消息,并继续发布下一批或重试发布当前批次。

差异通常与使用的术语有关,而不是与语义有关。消息属性也因协议而异。

AMQP 0-9-1

在 AMQP 0-9-1 中,发布通过通道进行到交换器。交换器使用通过定义一个或多个队列和交换器之间的绑定,或源交换器和目标交换器来设置的路由拓扑。成功路由的消息被存储在队列中。

每个实体的作用在AMQP 0-9-1 概念指南中有介绍。

发布者确认是发布者确认机制。

存在几种常见的发布者错误类型,它们使用不同的协议功能进行处理

  • 发布到不存在的交换器会导致通道错误,该错误会关闭通道,从而不允许在该通道上进行任何进一步的发布(或其他操作)。
  • 当已发布的消息无法路由到任何队列时(例如,因为没有为目标交换器定义绑定),并且发布者将 mandatory 消息属性设置为 false(这是默认值),则消息将被丢弃或重新发布到备用交换器(如果存在)。
  • 当已发布的消息无法路由到任何队列时,并且发布者将 mandatory 消息属性设置为 true,则消息将被退回给发布者。发布者必须设置已退回的消息处理程序才能处理退回(例如,通过记录错误或使用不同的交换器重试)。

AMQP 1.0

在 AMQP 1.0 中,发布发生在链接的上下文中。

MQTT

在 MQTT 中,消息在与主题的连接上发布。服务器端 MQTT 连接过程通过主题交换器将消息路由到队列

当发布者选择使用 QoS 1 时,已发布的消息由 RabbitMQ 使用PUBACK 数据包进行确认。

发布者可以向服务器提供一个提示,即在主题上发布的保留的消息(存储以供将来传递给新订阅者)。每个主题只能保留最后发布的消息。

MQTT 5.0 PUBACK 数据包包含一个原因码,该原因码告诉发布者发布是否成功。RabbitMQ 返回的原因码包括:

  • 0 - 成功:消息成功路由到的所有队列均已成功接受该消息。
  • 16 - 没有匹配的订阅者:RabbitMQ 无法将消息路由到任何队列(因为没有定义到主题交换器的绑定)。
  • 131 - 实现特定的错误:RabbitMQ 拒绝了该消息(例如,当目标经典队列不可用时)。

在 MQTT 3.1 和 3.1.1 中,除了关闭连接之外,服务器没有机制可以向客户端传达发布错误。

请参阅MQTTMQTT-over-WebSockets 指南以了解更多信息。

STOMP

STOMP 客户端在与一个或多个目标(在 RabbitMQ 的情况下可能具有不同语义)的连接上发布。

STOMP 提供了一种让服务器将消息处理中的错误传达给发布者的方式。其发布者确认的变体称为收据,这是一项客户端在发布时启用的功能。

请参阅STOMP 指南STOMP-over-WebSocketsSTOMP 1.2 规范以了解更多信息。

路由

AMQP 0-9-1

在 AMQP 0-9-1 中,路由由交换器执行。交换器是命名的路由表。表项称为绑定。这在AMQP 0-9-1 概念指南中有更详细的介绍。

有几种内置的交换器类型

  • 主题
  • 扇出
  • 直连(包括默认交换器)
  • Headers

前三种类型在教程中通过示例进行了介绍。

可以通过插件提供更多交换器类型。一致性哈希交换器随机路由交换器内部事件交换器延迟消息交换器是随 RabbitMQ 提供的交换器插件。与所有插件一样,它们必须先启用才能使用。

无法路由的消息处理

客户端可能会尝试将消息发布到不存在的目标(交换器、主题、队列)。本节介绍不同协议如何处理这种情况。

RabbitMQ 收集并公开指标,可用于检测发布无法路由消息的发布者。

AMQP 0-9-1

当已发布的消息无法路由到任何队列时(例如,因为没有为目标交换器定义绑定),并且发布者将 mandatory 消息属性设置为 false(这是默认值),则消息将被丢弃或重新发布到备用交换器(如果存在)。

当已发布的消息无法路由到任何队列时,并且发布者将 mandatory 消息属性设置为 true,则消息将被退回给发布者。发布者必须设置已退回的消息处理程序才能处理退回(例如,通过记录错误或使用不同的交换器重试)。

备用交换器是 AMQP 0-9-1 的交换器功能,它允许客户端处理交换器无法路由的消息(即,因为没有绑定的队列或没有匹配的绑定)。典型示例包括检测客户端何时意外或恶意地发布无法路由的消息,或者“否则”路由语义,其中一些消息由特殊处理,其余消息由通用处理程序处理。

MQTT

发布到新主题将为其设置一个队列。不同的主题/QoS 级别组合将使用具有不同属性的不同队列。因此,发布者和消费者必须使用相同的 QoS 级别。

STOMP

STOMP 支持多种不同的目标,包括那些假定存在预先存在的拓扑的目标。

  • /topic:发布到尚未有消费者的主题将导致消息丢失。主题的第一个订阅者将为此声明一个队列。
  • /exchange:目标交换器必须存在,否则服务器将报告错误
  • /amq/queue:目标队列必须存在,否则服务器将报告错误
  • /queue:发布到不存在的队列将创建它。
  • /temp-queue:发布到不存在的临时队列将创建它。

指标

有一个关于无法路由的已丢弃消息的指标

Unroutable message metrics

在上例中,所有已发布的消息都因无法路由(且非强制)而被丢弃。

消息属性

AMQP 0-9-1

每次传递都结合了消息元数据和传递信息。不同的客户端库使用略有不同的方式提供对这些属性的访问。通常,传递处理程序可以访问传递数据结构。

以下属性是传递和路由详细信息;它们本身不是消息属性,由 RabbitMQ 在路由和传递时设置

属性类型描述
传递标签正整数

传递标识符,请参阅确认

已重传布尔值如果此消息以前被传递并重新排队,则设置为 true
交换器字符串路由此消息的交换器
路由键字符串发布者使用的路由键
消费者标签字符串消费者(订阅)标识符

以下是消息属性。大多数是可选的。它们由发布者在发布时设置

属性类型描述是否必需?
传递模式枚举(1 或 2)

2 表示“持久”,1 表示“瞬时”。一些客户端库将此属性公开为布尔值或枚举。

类型字符串特定于应用程序的消息类型,例如“orders.created”
Headers映射(字符串 => 任意)具有字符串头名称的任意头映射
内容类型字符串内容类型,例如“application/json”。由应用程序使用,而不是核心 RabbitMQ
内容编码字符串内容编码,例如“gzip”。由应用程序使用,而不是核心 RabbitMQ
消息 ID字符串任意消息 ID
关联 ID字符串帮助关联请求和响应,请参阅教程 6
回复至字符串包含响应队列名称,请参阅教程 6
到期时间字符串每条消息的 TTL
时间戳时间戳应用程序提供的时间戳
用户 ID字符串用户 ID,如果设置了已验证
应用 ID字符串应用程序名称

消息类型

消息上的 type 属性是一个任意字符串,它帮助应用程序传达消息的种类。它由发布者在发布时设置。该值可以是发布者和消费者就其达成一致的任何特定于域的字符串。

RabbitMQ 不验证或使用此字段,它供应用程序和插件使用和解释。

实践中的消息类型自然会分为几类,点分隔的命名约定很常见(但 RabbitMQ 或客户端不强制要求),例如 orders.createdlogs.lineprofiles.image.changed

如果消费者收到了未知类型的传递,强烈建议记录此类事件以方便故障排除。

内容类型和编码

内容(MIME 媒体)类型和内容编码字段允许发布者传达消费者应如何反序列化和解码消息有效负载。

RabbitMQ 不验证或使用这些字段,它供应用程序和插件使用和解释。

例如,具有 JSON 有效负载的消息应使用 application/json。如果有效负载已使用 LZ77(GZip)算法压缩,则其内容编码应为 gzip

可以通过用逗号分隔来指定多种编码。

发布者确认(确认)和数据安全

确保数据安全是应用程序、客户端库和 RabbitMQ 集群节点的集体责任。本节介绍了一些与数据安全相关的主题。

网络可能以不明显的方式发生故障,检测某些故障需要时间。因此,写入协议帧或一组帧(例如,已发布的消息)到其套接字的客户端不能假定消息已到达服务器并被成功处理。它可能在途中丢失,或者其传递可能会被严重延迟。

为了解决这个问题,开发了发布者端确认机制。它模仿了协议中已有的消费者确认机制

使用发布者确认的策略

发布者确认提供了一种机制,供应用程序开发人员跟踪哪些消息已被 RabbitMQ 成功接受。有几种常用的发布者确认使用策略:

  • 单独发布消息并使用流式确认(异步 API 元素:确认事件处理程序、未来/承诺等)
  • 发布一批消息并等待所有未完成的确认
  • 单独发布消息,并在继续发布下一条消息之前等待其被确认。此选项强烈不推荐,因为它对发布者吞吐量有严重的负面影响。

它们在吞吐量影响和易用性方面有所不同。

流式确认

大多数客户端库通常提供一种方式供开发人员在收到服务器的单个确认时进行处理。确认会异步到达。由于 AMQP 0-9-1 中的发布本质上也是异步的,因此此选项可以以非常小的开销进行安全发布。算法通常与此类似:

  • 在通道上启用发布者确认
  • 对于每条已发布的消息,添加一个映射条目,将当前序列号映射到消息
  • 当收到积极确认时,删除该条目
  • 当收到消极确认时,删除该条目并安排其消息重发(或适合的其他操作)。

在 RabbitMQ Java 客户端中,确认处理程序通过ConfirmCallbackConfirmListener 接口公开。必须将一个或多个监听器添加到通道

批量发布

此策略涉及发布消息批次并等待整个批次被确认。重试在批次级别进行。

  • 在通道上启用发布者确认
  • 对于每批已发布的消息,等待所有未完成的确认
  • 当所有确认都为肯定时,发布下一批
  • 如果存在否定确认或超时,则重发整个批次或仅重发相关消息。

某些客户端提供了方便的 API 元素来等待所有未完成的确认。例如,在 Java 客户端中,有Channel#waitForConfirms(timeout)

由于此方法涉及等待确认,因此会对发布者吞吐量产生负面影响。批次越大,影响越小。

发布并等待

此策略可以被视为一种反模式,主要为了完整性而记录。它涉及发布一条消息并立即等待未完成的确认。可以将其视为上述批量发布策略,其中批次大小等于一。

这种方法将对吞吐量产生非常显著的负面影响,不推荐使用。

从连接故障中恢复

客户端和 RabbitMQ 节点之间的网络连接可能会失败。应用程序如何处理这些故障直接影响到整个系统的数据安全性。

许多 RabbitMQ 客户端支持连接和拓扑(队列、交换器、绑定和消费者)的自动恢复:Java、.NET、Bunny 是其中的一些例子。

其他客户端不提供自动恢复作为一项功能,但提供了应用程序开发人员如何实现恢复的示例。

许多应用程序的自动恢复过程遵循以下步骤:

  1. 重新连接到可达节点
  2. 恢复连接监听器
  3. 重新打开通道
  4. 恢复通道监听器
  5. 恢复通道 basic.qos 设置、发布者确认和事务设置

在连接和通道恢复后,拓扑恢复可以开始。拓扑恢复包括以下操作,针对每个通道执行:

  1. 重新声明交换器(预定义交换器除外)
  2. 重新声明队列
  3. 恢复所有绑定
  4. 恢复所有消费者

异常处理

发布者通常会遇到两种类型的异常

  • 由于写入失败或超时导致的 I/O 网络异常
  • 确认传递超时

请注意,这里的“异常”是指广义上的错误;某些编程语言根本没有异常,因此那里的客户端会以不同的方式传达错误。本节的讨论和建议应同样适用于大多数客户端库和编程语言。

第一种异常可能立即在写入时发生,也可能在一定延迟后发生。这是因为某些类型的 I/O 故障(例如,网络拥塞严重或丢包率高)可能需要时间才能检测到。在连接恢复后,发布可以继续,但如果连接因告警而被阻止,则在告警清除之前,所有后续尝试都将失败。这将在下面资源告警的影响部分更详细地介绍。

第二种异常仅在应用程序开发人员提供超时时才会发生。对于给定应用程序来说,什么超时值是合理的由开发人员决定。它不应低于有效的心跳超时

资源告警的影响

当集群节点处于资源告警状态时,集群中尝试发布消息的所有连接都将被阻止,直到集群中的所有告警清除。

当连接被阻止时,此连接发送的任何数据都不会在连接上被读取、解析或处理。当连接被解除阻止时,所有客户端流量处理将恢复。

兼容的 AMQP 0-9-1 客户端将在被阻止和解除阻止时收到通知

被阻止连接上的写入将超时或以 I/O 写入异常失败。

指标

指标收集和监控对于发布者来说与应用程序或组件一样重要。在发布者方面,RabbitMQ 收集的几个指标尤其值得关注:

发布和确认速率大多不言自明。耗尽率之所以如此重要,是因为它们有助于检测未最优使用连接或通道的应用程序,从而提供次优的发布速率并浪费资源。

无法路由的消息速率有助于检测发布无法路由到任何队列的消息的应用程序。例如,这可能表明配置错误。

客户端库也可能收集指标。例如RabbitMQ Java 客户端。这些指标可以提供对应用程序特定架构的见解(例如,哪个发布组件发布了无法路由的消息),这是 RabbitMQ 节点无法推断的。

并发考虑

并发主题都是关于客户端库实现细节的,但可以提供一些通用建议。总的来说,应避免在共享的“发布上下文”(AMQP 0-9-1 中的通道,STOMP 中的连接,AMQP 1.0 中的会话等)上发布,并认为其不安全。

这样做可能导致在线数据帧的错误帧。这会导致连接关闭。

对于少量并发发布者在单个应用程序中使用一个线程(或类似)进行发布是最优解决方案。对于大量(例如,数百或数千)的发布者,请使用线程池。

暂时阻止发布

可以通过将内存高水位标记设置为 0 来有效地阻止集群中的所有发布,从而立即触发资源告警

rabbitmqctl set_vm_memory_high_watermark 0

排查发布者问题

本节介绍发布者的一些常见问题,以及如何识别和解决它们。分布式系统中的故障形式多样,因此此列表绝非详尽无遗。

连接故障

像任何客户端一样,发布者必须首先成功连接并成功进行身份验证。

潜在的连接问题数量非常广泛,并且有一个专门的指南

身份验证和授权

像任何客户端一样,发布者可能无法进行身份验证,或者没有权限访问其目标虚拟主机,或者无法发布到目标交换器。

此类故障会被 RabbitMQ 记录为错误。

请参阅访问控制指南中关于身份验证授权的故障排除部分。

连接耗尽

某些应用程序为发布的每条消息打开一个新连接。这非常低效,也不是消息协议的设计用途。这种情况可以通过连接指标进行检测

如果可能,请优先使用长期连接。

连接中断

网络连接可能会失败。一些客户端库支持自动连接和拓扑恢复,另一些则方便在应用程序代码中实现连接恢复。

当连接断开时,将不会有任何发布通过,或者客户端不会在内部排队(延迟)。此外,以前序列化并写入套接字的消息不能保证能到达目标节点。因此,对于需要可靠发布和数据安全的发布者来说,使用发布者确认来跟踪哪些发布被 RabbitMQ 确认是至关重要的。未确认的消息应在一段时间后视为未传递。这些消息如果对应用程序安全,可以被重新发布。这在教程 7 和本指南的数据安全部分有介绍。

有关详细信息,请参阅从网络连接故障中恢复

路由问题

发布者可以成功连接、进行身份验证,并获得发布到交换器(主题、目标)的权限。但是,这些消息可能不会被路由到任何队列或消费者。这可能是由于:

  • 应用程序之间的配置不匹配,例如发布者和消费者使用的主题不匹配
  • 发布者配置错误(交换器、主题、路由键不是预期的)
  • 对于 AMQP 0-9-1,目标交换器上缺少绑定
  • 资源告警生效:参见下面的部分
  • 网络连接失败且客户端未恢复:参见上面的部分

检查拓扑和指标通常有助于快速缩小问题范围。例如,在管理 UI 中,每个交换器页面可用于确认是否有入站消息活动(入站速率大于零)以及绑定的情况。

在以下示例中,交换器没有绑定,因此任何消息都不会被路由到任何地方。

An exchange without bindings

也可以使用rabbitmq-diagnostics 列出绑定。

# note that the implicit default exchange bindings won't
# be listed as of RabbitMQ 3.8
rabbitmq-diagnostics list_bindings --vhost "/"
=> Listing bindings for vhost /...

在上例中,命令没有产生任何结果。

从 RabbitMQ 3.8 开始,有一个新的无法路由的已丢弃消息的指标。

Unroutable message metrics

在上例中,所有已发布的消息都因无法路由(且非强制)而被丢弃。请参阅本指南中的无法路由的消息处理部分。

集群范围和连接指标以及服务器日志将有助于发现生效的资源告警。

资源告警

当存在资源告警时,所有发布连接都将被阻止,直到告警清除。客户端可以选择接收通知,告知它们何时被阻止。有关详细信息,请参阅资源告警指南

协议异常

对于某些协议,例如 AMQP 0-9-1 和 STOMP,发布者可能会遇到称为协议错误(异常)的情况。例如,发布到不存在的交换器或将交换器绑定到不存在的交换器将导致通道异常,并导致通道关闭。在关闭的通道上无法发布。此类事件会由发布者连接到的 RabbitMQ 节点记录。失败的发布尝试也会导致客户端异常或返回错误,具体取决于使用的客户端库。

共享通道上的并发发布

客户端库不支持共享通道上的并发发布。有关详细信息,请参阅并发考虑部分。

© . This site is unofficial and not affiliated with VMware.