跳至主要内容
版本:4.0

通道

概述

本指南涵盖了与通道相关的各种主题,通道是特定于 AMQP 0-9-1 的抽象。通道不能独立存在,必须与连接关联,因此建议先熟悉 连接指南

本指南涵盖以下内容

以及其他与连接相关的主题。

基础知识

某些应用程序需要与代理建立多个逻辑连接。但是,保持多个 TCP 连接同时打开是不可取的,因为这样做会消耗系统资源,并且使配置防火墙变得更加困难。AMQP 0-9-1 连接使用 通道 进行多路复用,通道可以被认为是“共享单个 TCP 连接的轻量级连接”。

客户端执行的每个协议操作都在通道上进行。特定通道上的通信与其他通道上的通信完全独立,因此每个协议方法都包含一个通道 ID(也称为通道编号),这是一个整数,代理和客户端都使用它来确定该方法属于哪个通道。

通道仅存在于连接的上下文中,不能独立存在。当连接关闭时,连接上的所有通道也会关闭。

对于使用多个线程/进程进行处理的应用程序,通常做法是为每个线程/进程打开一个新通道,而不是在它们之间共享通道。

通道生命周期

打开通道

应用程序在成功打开 连接 后立即打开通道。

以下是一个 Java 客户端示例,它在打开新连接后使用自动分配的通道 ID 打开一个新通道

ConnectionFactory cf = new ConnectionFactory();
Connection conn = cf.createConnection();

Channel ch = conn.createChannel();

// ... use the channel to declare topology, publish, consume

在 .NET 客户端中,通道使用 IModel 接口表示,因此 API 中的名称不同

var cf = new ConnectionFactory();
var conn = cf.newConnection();

// the .NET client calls channels "models"
var ch = conn.CreateModel();

// ... use the channel to declare topology, publish, consume

与连接类似,通道旨在长期存在。也就是说,无需为每个操作打开通道,这样做效率非常低,因为打开通道需要网络往返。

关闭通道

不再需要通道时,应将其关闭。关闭通道会使其无法使用,并安排回收其资源

Channel ch = conn.createChannel();

// do some work

// close the channel when it is no longer needed

ch.close();

使用 .NET 客户端的相同示例

// the .NET client calls channels "models"
var ch = conn.CreateModel();

// do some work

// close the channel when it is no longer needed

ch.Close();

如上所述,已关闭的通道无法使用。尝试对已关闭的通道执行操作会导致异常,指出该通道已关闭。

当通道的连接关闭时,通道也会关闭。

如果在通道上 确认了多个传递 后立即关闭通道,则确认可能在通道终止之前到达目标队列,也可能没有到达。在这种情况下,通道上待确认的消息将在通道关闭后自动重新入队。

这种情况通常适用于使用短暂通道的工作负载。使用长期存在的通道,并以允许消费者处理重新传递的方式设计消费者,可以避免此边缘情况行为带来的潜在意外。请注意,重新传递的消息将 明确标记为重新传递

通道和错误处理

在上一节中,通道是由应用程序关闭的。还有一种方法可以关闭通道:由于协议异常。

某些场景被认为是协议中的可恢复(“软”)错误。它们导致通道关闭,但应用程序可以打开另一个通道并尝试恢复或重试多次。最常见的示例包括

  • 重新声明现有队列 或交换机,但属性不匹配,会导致 406 PRECONDITION_FAILED 错误
  • 访问用户无权访问的资源 会导致 403 ACCESS_REFUSED 错误
  • 绑定不存在的队列或不存在的交换机会导致 404 NOT_FOUND 错误
  • 从不存在的队列中消费会导致 404 NOT_FOUND 错误
  • 向不存在的交换机发布消息会导致 404 NOT_FOUND 错误
  • 从与声明该队列的连接不同的连接访问 独占队列 会导致 405 RESOURCE_LOCKED 错误

客户端库提供了一种观察和响应通道异常的方法。例如,在 Java 客户端中,可以注册错误处理程序 并访问通道关闭(关闭)原因。

对已关闭的通道执行任何操作都会导致异常。请注意,当 RabbitMQ 关闭通道时,它会使用异步协议方法通知客户端。换句话说,导致通道异常的操作不会立即失败,但通道关闭事件处理程序将在不久后触发。

某些客户端库可能使用阻塞操作来等待响应。在这种情况下,它们可能以不同的方式传达通道异常,例如使用运行时异常、错误类型或其他适合语言的方法。

有关错误代码的更完整列表,请参阅 AMQP 0-9-1 参考

资源使用

每个通道在客户端上消耗相对较少的内存。根据客户端库的实现细节,它还可以使用专用的线程池(或类似机制),在其中调度消费者操作,因此使用一个或多个线程(或类似机制)。

每个通道在客户端连接到的节点上也会消耗相对较少的内存,以及几个 Erlang 进程。由于节点通常为多个通道连接提供服务,因此过度使用通道或通道泄漏的影响主要体现在 RabbitMQ 节点的 指标 中,而不是客户端的指标中。

鉴于这两个因素,强烈建议限制每个连接使用的通道数量。作为指南,大多数应用程序每个连接可以使用一位数的通道。那些具有特别高并发率的应用程序(通常此类应用程序是 消费者)可以从每个线程/进程/协程一个通道开始,并在指标表明原始模型不再可持续时(例如,因为它消耗了太多内存)切换到通道池。

有关如何检查通道、连接上的通道数量、通道周转率等的详细信息,请参阅 监控、指标和诊断 部分。

每个连接的最大通道数

可以同时在连接上打开的最大通道数量由客户端和服务器在连接时协商。该值可以为 RabbitMQ 和客户端库配置。

在服务器端,限制使用 channel_max 控制

# no more 100 channels can be opened on a connection at the same time
channel_max = 100

如果配置的限制被超过,连接将因致命错误而关闭

2019-02-11 16:04:06.296 [error] <0.887.0> Error on AMQP connection <0.887.0> (127.0.0.1:49956 -> 127.0.0.1:5672, vhost: '/', user: 'guest', state: running), channel 23:
operation none caused a connection exception not_allowed: "number of channels opened (22) has reached the negotiated channel_max (22)"

客户端可以配置为允许每个连接使用更少的通道。使用 RabbitMQ Java 客户端ConnectionFactory#setRequestedChannelMax 是控制限制的方法

ConnectionFactory cf = new ConnectionFactory();
// Ask for up to 32 channels per connection. Will have an effect as long as the server is configured
// to use a higher limit, otherwise the server's limit will be used.
cf.setRequestedChannelMax(32);

使用 RabbitMQ .NET 客户端,使用 ConnectionFactory#RequestedChannelMax 属性

var cf = new ConnectionFactory();
// Ask for up to 32 channels per connection. Will have an effect as long as the server is configured
// to use a higher limit, otherwise the server's limit will be used.
cf.RequestedChannelMax = 32;

使用较小的值:客户端不能配置为允许比服务器配置的最大值更多的通道。尝试此操作的客户端将在日志中遇到类似于以下内容的错误

2019-02-11 16:03:16.543 [error] <0.882.0> closing AMQP connection <0.882.0> (127.0.0.1:49911 -> 127.0.0.1:5672):
failed to negotiate connection parameters: negotiated channel_max = 2047 is higher than the maximum allowed value (32)

每个节点的最大通道数

可以使用配置参数 channel_max_per_node 配置允许在集群中每个节点上打开的最大通道数

# no more than 500 channels can be opened on each node at the same time
channel_max_per_node = 500

监控、指标和诊断

由于它们会影响节点资源的使用,因此当前打开的通道数量和通道打开/关闭速率是系统的重要指标,应进行 监控。监控它们将有助于检测一些常见问题

  • 通道泄漏
  • 通道周转率高

这两个问题最终会导致节点耗尽 资源

单独的通道指标,例如 未确认的消息 数量或 basic.get 操作速率,可以帮助识别应用程序行为中的异常和低效率。

内存使用

监控系统 和操作人员都需要检查通道在节点上消耗了多少内存、节点上的通道总数,然后识别每个连接上的通道数量。

通道数量显示在 管理 UI 的概述选项卡上,以及 连接数量。通过将通道数量除以连接数量,操作人员可以确定每个连接的平均通道数量。

要找出节点上通道使用了多少内存,请使用 rabbitmq-diagnostics memory_breakdown

rabbitmq-diagnostics memory_breakdown -q --unit mb
# => [elided for brevity]
# ...
# => connection_channels: 3.596 mb (2.27%)
# ...
# => [elided for brevity]

有关详细信息,请参阅 RabbitMQ 内存使用分析指南

通道泄漏

通道泄漏是指应用程序重复打开通道而不关闭它们,或者至少只关闭其中一部分通道的情况。

通道泄漏最终会使节点(或多个目标节点)耗尽 RAM 和 CPU 资源。

相关指标

管理 UI 的概览标签列出了当前用户有权访问的所有虚拟主机中的通道总数

mgmt-ui-global-channel-count.png

要检查连接上的当前通道数以及每个连接的通道限制,请导航到“连接”标签,如果相关列未显示,则启用它们

Per connection channel count in management UI

概览和单个节点页面提供了一个通道流失率图表,截至 RabbitMQ 3.7.9。如果通道打开操作的速率始终高于通道关闭操作的速率,则表明应用程序之一存在通道泄漏

Channel count growth in management UI

要找出哪些连接泄漏了通道,请检查每个连接的通道计数,如本指南中所述。

高通道流失率

如果系统的新建通道打开速率始终很高,并且通道关闭速率也始终很高,则该系统被认为具有高通道流失率。这通常意味着应用程序使用的是短暂通道,或者由于通道级异常而经常关闭通道。

虽然在某些工作负载中,这是系统的自然状态,但在可能的情况下,应使用长期通道。

管理 UI 提供了一个通道流失率图表。以下是展示在一个给定的时间段内,通道打开和关闭数量几乎相同的相当低的通道流失率的图表

Node channel churn in management UI

虽然连接和断开连接速率是特定于系统的,但始终高于每秒 100 的速率可能表明一个或多个应用程序的连接管理不佳,通常值得调查。

High channel churn in management UI

请注意,某些客户端和运行时(特别是 PHP)不使用长期连接,并且如果未使用 专用代理,则预计它们会有很高的连接流失率。

在管理 UI 中检查通道及其状态

要在管理 UI 中检查通道,请导航到“通道”标签,并根据需要添加或删除列

High channel churn in management UI

使用 CLI 工具检查通道及其状态

rabbitmqctl list_connectionsrabbitmqctl list_channels 是用于检查每个连接的通道计数和通道详细信息(如消费者数量、未确认消息预取 等)的主要命令。

rabbitmqctl list_connections name channels -q
# => name channels
# => 127.0.0.1:52956 -> 127.0.0.1:5672 10
# => 127.0.0.1:52964 -> 127.0.0.1:5672 33

最右侧的列包含连接上的通道计数。

可以隐藏表头

rabbitmqctl list_connections name channels -q --no-table-headers
# => 127.0.0.1:52956 -> 127.0.0.1:5672 10
# => 127.0.0.1:52964 -> 127.0.0.1:5672 33

要检查单个通道,请使用 rabbitmqctl list_channels

rabbitmqctl list_channels -q
# => pid user consumer_count messages_unacknowledged
# => <rabbit@mercurio.3.815.0> guest 0 0
# => <rabbit@mercurio.3.820.0> guest 0 0
# => <rabbit@mercurio.3.824.0> guest 0 0
# => <rabbit@mercurio.3.828.0> guest 0 0
# => <rabbit@mercurio.3.832.0> guest 0 0
# => <rabbit@mercurio.3.839.0> guest 0 0
# => <rabbit@mercurio.3.840.0> guest 0 0

可以隐藏表头

rabbitmqctl list_channels -q --no-table-headers
# => <rabbit@mercurio.3.815.0> guest 0 0
# => <rabbit@mercurio.3.820.0> guest 0 0
# => <rabbit@mercurio.3.824.0> guest 0 0
# => <rabbit@mercurio.3.828.0> guest 0 0
# => <rabbit@mercurio.3.832.0> guest 0 0
# => <rabbit@mercurio.3.839.0> guest 0 0
# => <rabbit@mercurio.3.840.0> guest 0 0

可以显示不同的列集

rabbitmqctl list_channels -q --no-table-headers vhost connection number  prefetch_count messages_unconfirmed
# => / <rabbit@mercurio.3.799.0> 1 0 0
# => / <rabbit@mercurio.3.802.0> 1 0 0
# => / <rabbit@mercurio.3.799.0> 2 0 0
# => / <rabbit@mercurio.3.799.0> 3 0 0
# => / <rabbit@mercurio.3.802.0> 2 0 0
# => / <rabbit@mercurio.3.802.0> 3 0 0
# => / <rabbit@mercurio.3.799.0> 4 0 0
# => / <rabbit@mercurio.3.802.0> 4 0 0
# => / <rabbit@mercurio.3.799.0> 5 0 0
# => / <rabbit@mercurio.3.799.0> 6 0 0
rabbitmqctl list_channels -s vhost connection number confirm
# => / <rabbit@mercurio.3.799.0> 1 false
# => / <rabbit@mercurio.3.802.0> 1 false
# => / <rabbit@mercurio.3.799.0> 2 false
# => / <rabbit@mercurio.3.799.0> 3 false
# => / <rabbit@mercurio.3.802.0> 2 false
# => / <rabbit@mercurio.3.802.0> 3 false
# => / <rabbit@mercurio.3.799.0> 4 false
# => / <rabbit@mercurio.3.802.0> 4 false
# => / <rabbit@mercurio.3.799.0> 5 false

发布者流控制

发布消息的通道可能会超过系统的其他部分的速度,最有可能的是繁忙的队列和执行复制的队列。发生这种情况时,流控制 将应用于发布通道,进而应用于连接。仅消费消息的通道和连接不会受到影响。

对于使用 自动确认模式 的较慢的消费者,连接和通道在写入 TCP 套接字时很可能遇到流控制。

监控 系统可以收集关于流状态中连接数量的指标。经常遇到流控制的应用程序可以考虑使用单独的连接来发布和消费,以避免流控制对非发布操作(例如队列管理)的影响。

© 2024 RabbitMQ. All rights reserved.