跳至主内容
版本:4.2

通道 (Channels)

概述

本指南涵盖了与通道相关的各种主题,通道是 AMQP 0-9-1 特有的抽象。通道无法独立于连接而存在,因此强烈建议您先熟悉 连接指南

本指南涵盖

以及与连接相关的其他主题。

基础知识

某些应用程序需要与代理建立多个逻辑连接。然而,同时保持许多 TCP 连接是不理想的,因为这样做会消耗系统资源,并使防火墙配置更加困难。AMQP 0-9-1 连接通过 通道 进行多路复用,通道可以被视为“共享单个 TCP 连接的轻量级连接”。

客户端执行的每个协议操作都在通道上进行。特定通道上的通信与另一通道上的通信完全分开,因此每个协议方法都携带一个通道 ID(又名通道号),这是一个整数,代理和客户端都使用它来确定该方法属于哪个通道。

通道仅在连接的上下文中存在,从不独立存在。当连接关闭时,其上的所有通道也会关闭。

对于使用多线程/多进程进行处理的应用程序,通常会为每个线程/进程打开一个新通道,而不是在它们之间共享通道。

通道生命周期

打开通道

应用程序在成功打开 连接 后立即打开一个通道。

这是一个 Java 客户端示例,它在打开新连接后,使用自动分配的通道 ID 打开一个新通道。

ConnectionFactory cf = new ConnectionFactory();
Connection conn = cf.createConnection();

Channel ch = conn.createChannel();

// ... use the channel to declare topology, publish, consume

在 .NET 客户端中,通道使用 IModel 接口表示,因此 API 中的名称不同。

var cf = new ConnectionFactory();
var conn = cf.newConnection();

// the .NET client calls channels "models"
var ch = conn.CreateModel();

// ... use the channel to declare topology, publish, consume

与连接一样,通道的目的是长久存在的。也就是说,没有必要为每个操作打开一个通道,这样做会非常低效,因为打开通道需要一次网络往返。

关闭通道

当不再需要通道时,应将其关闭。关闭通道将使其无法使用,并安排回收其资源。

Channel ch = conn.createChannel();

// do some work

// close the channel when it is no longer needed

ch.close();

使用 .NET 客户端的相同示例。

var ch = await conn.CreateChannelAsync();

// do some work

// close the channel when it is no longer needed

ch.Close();

如上所述,已关闭的通道不能使用。尝试在已关闭的通道上执行操作将导致一个异常,说明该通道已关闭。

当通道的连接关闭时,通道也会关闭。

如果通道在使用者 确认了其中的多条消息 后立即关闭,则这些确认消息可能在通道终止之前到达或未到达目标队列。在这种情况下,通道上待确认的消息将在通道关闭后自动重新入队。

这种情况通常适用于生命周期短暂的通道。使用长生命周期通道并设计能够处理重传的消费者可以缓解上述行为。长生命周期通道通常与更好的性能相关。请注意,重传的消息将 明确标记为如此

通道与错误处理

在上一节中,通道由应用程序关闭。通道还可以通过另一种方式关闭:由于协议异常。

某些场景被假定为可恢复的(“软”)协议错误。它们会关闭通道,但应用程序可以打开另一个通道并尝试恢复或重试多次。最常见的例子是:

  • 使用不匹配属性 重新声明现有队列 或交换器将导致 406 PRECONDITION_FAILED 错误。
  • 访问用户无权访问的 资源 将导致 403 ACCESS_REFUSED 错误。
  • 绑定不存在的队列或不存在的交换器将导致 404 NOT_FOUND 错误。
  • 从不存在的队列消费将导致 404 NOT_FOUND 错误。
  • 发布到不存在的交换器将导致 404 NOT_FOUND 错误。
  • 从非声明连接访问 独占队列 将导致 405 RESOURCE_LOCKED

客户端库提供了观察和响应通道异常的方法。例如,在 Java 客户端中,有一个 注册错误处理程序 并访问通道关闭(closure)原因的方法。

对已关闭通道的任何操作尝试都将导致异常。请注意,当 RabbitMQ 关闭通道时,它会使用异步协议方法通知客户端。换句话说,导致通道异常的操作不会立即失败,但通道关闭事件处理程序将在稍后触发。

某些客户端库可能使用阻塞操作来等待响应。在这种情况下,它们可能会以不同的方式传达通道异常,例如使用运行时异常、错误类型或其他适合该语言的手段。

有关错误代码的更完整列表,请参阅 AMQP 0-9-1 参考

资源使用

每个通道在客户端上消耗的内存量相对较少。根据客户端库的实现细节,它还可以使用专用的线程池(或其他类似机制)来分派使用者操作,因此会占用一个或多个线程(或其他类似资源)。

每个通道还在客户端连接到的节点上消耗相对较少的内存,以及几个 Erlang 进程。由于一个节点通常服务于多个通道连接,过量使用通道或通道泄漏的影响将主要体现在 RabbitMQ 节点的 指标 中,而不是客户端的指标。

鉴于这两个因素,强烈建议限制每个连接使用的通道数量。作为指导,大多数应用程序每个连接可以使用个位数数量的通道。那些并发率特别高(通常是 消费者)的应用程序可以从每个线程/进程/协程一个通道开始,当指标表明原始模型不再可持续(例如,因为它消耗过多内存)时,再切换到通道池。

请参阅 监控、指标和诊断 部分,了解如何检查通道、连接上的通道数量、通道的更换率等。

每个连接的最大通道数

在连接期间,客户端和服务器会协商连接上同时打开的最大通道数。此值对于 RabbitMQ 和客户端库均可配置。

在服务器端,限制由 channel_max 控制。

# no more 100 channels can be opened on a connection at the same time
channel_max = 100

如果超过配置的限制,连接将以致命错误关闭。

2019-02-11 16:04:06.296 [error] <0.887.0> Error on AMQP connection <0.887.0> (127.0.0.1:49956 -> 127.0.0.1:5672, vhost: '/', user: 'guest', state: running), channel 23:
operation none caused a connection exception not_allowed: "number of channels opened (22) has reached the negotiated channel_max (22)"

客户端可以配置为允许每个连接的通道数较少。使用 RabbitMQ Java 客户端ConnectionFactory#setRequestedChannelMax 是控制该限制的方法。

ConnectionFactory cf = new ConnectionFactory();
// Ask for up to 32 channels per connection. Will have an effect as long as the server is configured
// to use a higher limit, otherwise the server's limit will be used.
cf.setRequestedChannelMax(32);

使用 RabbitMQ .NET 客户端,请使用 ConnectionFactory#RequestedChannelMax 属性。

var cf = new ConnectionFactory();
// Ask for up to 32 channels per connection. Will have an effect as long as the server is configured
// to use a higher limit, otherwise the server's limit will be used.
cf.RequestedChannelMax = 32;

两者中较低的值将被使用:客户端无法配置为允许比服务器配置的最大值更多的通道。尝试这样做的客户端将在日志中遇到类似如下的错误:

2019-02-11 16:03:16.543 [error] <0.882.0> closing AMQP connection <0.882.0> (127.0.0.1:49911 -> 127.0.0.1:5672):
failed to negotiate connection parameters: negotiated channel_max = 2047 is higher than the maximum allowed value (32)

每个节点的最多通道数

可以使用配置参数 channel_max_per_node 来配置集群中每个节点允许打开的最大通道数。

# no more than 500 channels can be opened on each node at the same time
channel_max_per_node = 500

监控、指标和诊断

由于它们会影响节点资源使用,当前打开的通道数量以及通道的打开/关闭速率是系统的重要指标,应予以 监控。监控它们将有助于检测许多常见问题:

  • 通道泄漏
  • 高通道更换率

这两种问题最终都会导致节点 资源 耗尽。

单个通道的指标,例如 未确认消息 的数量或 basic.get 操作速率,有助于识别应用程序行为中的异常和低效率。

内存使用

监控系统 和运营商可能需要检查通道在节点上占用的内存量,节点上的总通道数,然后确定每个连接上有多少通道。

通道数量显示在 管理 UI 的“概述”选项卡中,连接数 也显示在那里。通过将通道数除以连接数,操作员可以确定每个连接的平均通道数。

要找出节点上通道占用了多少内存,请使用 rabbitmq-diagnostics memory_breakdown

rabbitmq-diagnostics memory_breakdown -q --unit mb
# => [elided for brevity]
# ...
# => connection_channels: 3.596 mb (2.27%)
# ...
# => [elided for brevity]

有关详细信息,请参阅 RabbitMQ 内存使用分析指南

通道泄漏

通道泄漏是指应用程序反复打开通道而不关闭它们,或者至少只关闭其中一部分。

通道泄漏最终会导致目标节点(或多个目标节点)的 RAM 和 CPU 资源耗尽。

相关指标

管理 UI 的“概述”选项卡列出了当前用户有权访问的所有虚拟主机中的总通道数。

mgmt-ui-global-channel-count.png

要检查连接上的当前通道数以及每个连接的通道限制,请导航到“连接”选项卡,并在需要时启用相关列。

Per connection channel count in management UI

RabbitMQ 3.7.9 起,概述和各个节点页面提供了一个通道更换率图表。如果通道打开操作的速率持续高于通道关闭操作的速率,则表明其中一个应用程序存在通道泄漏。

Channel count growth in management UI

要找出哪个连接泄漏了通道,请按本指南所示检查每个连接的通道计数。

高通道更换率

当一个系统的持续新开通道速率很高且持续关闭通道速率也很高时,就被称为高通道更换率。这通常意味着应用程序使用了生命周期短暂的通道,或者通道经常因通道级异常而关闭。

虽然对于某些工作负载来说这是系统的自然状态,但如果可能,应使用长生命周期通道。

管理 UI 提供了一个通道更换率图表。下图演示了一个通道更换率相当低的情况,在给定时间内通道的打开和关闭数量几乎相同。

Node channel churn in management UI

虽然连接和断开连接的速率是系统特定的,但持续高于 100 次/秒的速率很可能表明一个或多个应用程序存在次优的连接管理,并且通常值得调查。

High channel churn in management UI

请注意,某些客户端和运行时(尤其是 PHP)不使用长连接,并且预期会产生高连接更换率,除非 使用了专门的代理

在管理 UI 中检查通道及其状态

要在管理 UI 中检查通道,请导航到“通道”选项卡,并根据需要添加或删除列。

High channel churn in management UI

使用 CLI 工具检查通道及其状态

rabbitmqctl list_connectionsrabbitmqctl list_channels 是检查每个连接的通道计数和通道详细信息(如使用者数量、未确认消息预取 等)的主要命令。

rabbitmqctl list_connections name channels -q
# => name channels
# => 127.0.0.1:52956 -> 127.0.0.1:5672 10
# => 127.0.0.1:52964 -> 127.0.0.1:5672 33

最右边的列包含连接上的通道计数。

可以隐藏表头。

rabbitmqctl list_connections name channels -q --no-table-headers
# => 127.0.0.1:52956 -> 127.0.0.1:5672 10
# => 127.0.0.1:52964 -> 127.0.0.1:5672 33

要检查单个通道,请使用 rabbitmqctl list_channels

rabbitmqctl list_channels -q
# => pid user consumer_count messages_unacknowledged
# => <rabbit@mercurio.3.815.0> guest 0 0
# => <rabbit@mercurio.3.820.0> guest 0 0
# => <rabbit@mercurio.3.824.0> guest 0 0
# => <rabbit@mercurio.3.828.0> guest 0 0
# => <rabbit@mercurio.3.832.0> guest 0 0
# => <rabbit@mercurio.3.839.0> guest 0 0
# => <rabbit@mercurio.3.840.0> guest 0 0

可以隐藏表头。

rabbitmqctl list_channels -q --no-table-headers
# => <rabbit@mercurio.3.815.0> guest 0 0
# => <rabbit@mercurio.3.820.0> guest 0 0
# => <rabbit@mercurio.3.824.0> guest 0 0
# => <rabbit@mercurio.3.828.0> guest 0 0
# => <rabbit@mercurio.3.832.0> guest 0 0
# => <rabbit@mercurio.3.839.0> guest 0 0
# => <rabbit@mercurio.3.840.0> guest 0 0

可以显示不同的列集。

rabbitmqctl list_channels -q --no-table-headers vhost connection number  prefetch_count messages_unconfirmed
# => / <rabbit@mercurio.3.799.0> 1 0 0
# => / <rabbit@mercurio.3.802.0> 1 0 0
# => / <rabbit@mercurio.3.799.0> 2 0 0
# => / <rabbit@mercurio.3.799.0> 3 0 0
# => / <rabbit@mercurio.3.802.0> 2 0 0
# => / <rabbit@mercurio.3.802.0> 3 0 0
# => / <rabbit@mercurio.3.799.0> 4 0 0
# => / <rabbit@mercurio.3.802.0> 4 0 0
# => / <rabbit@mercurio.3.799.0> 5 0 0
# => / <rabbit@mercurio.3.799.0> 6 0 0
rabbitmqctl list_channels -s vhost connection number confirm
# => / <rabbit@mercurio.3.799.0> 1 false
# => / <rabbit@mercurio.3.802.0> 1 false
# => / <rabbit@mercurio.3.799.0> 2 false
# => / <rabbit@mercurio.3.799.0> 3 false
# => / <rabbit@mercurio.3.802.0> 2 false
# => / <rabbit@mercurio.3.802.0> 3 false
# => / <rabbit@mercurio.3.799.0> 4 false
# => / <rabbit@mercurio.3.802.0> 4 false
# => / <rabbit@mercurio.3.799.0> 5 false

发布者流量控制

发布消息的通道可能会比系统的其他部分(最可能是繁忙的队列和执行复制的队列)更快。发生这种情况时,流量控制 将应用于发布通道,进而应用于连接。仅消耗消息的通道和连接不受影响。

对于使用 自动确认模式 的较慢的消费者,当写入 TCP 套接字时,连接和通道极有可能经历流量控制。

监控 系统可以收集处于流量状态的连接数量的指标。经常遇到流量控制的应用程序可以考虑使用单独的连接进行发布和消耗,以避免流量控制对非发布操作(例如队列管理)的影响。

© . This site is unofficial and not affiliated with VMware.