跳至主要内容
版本:4.0

流插件

概述

流是一种持久化且复制的数据结构,它模拟了具有非破坏性消费者语义的追加日志。

此功能在所有当前维护的发布系列中可用。

流可以用作常规的 AMQP 0.9.1 队列,也可以通过专用二进制协议 插件和相关客户端使用。请查看流核心和流插件比较页面以获取功能矩阵。

本页面介绍流插件,该插件允许使用此新二进制协议与流进行交互。有关概念和操作流方式的概述,请参阅有关 RabbitMQ 流的指南

流协议的客户端库在多个平台上可用。

注意:带有对勾(✓)的项目由 VMware 的 RabbitMQ 团队正式支持。

使用流性能测试来模拟工作负载并衡量 RabbitMQ 流系统的性能。

启用插件

流插件包含在 RabbitMQ 分发版中。在客户端成功连接之前,必须使用rabbitmq-plugins启用它。

rabbitmq-plugins enable rabbitmq_stream

插件配置

TCP 侦听器

如果没有指定配置,流适配器将在端口 5552 上的所有接口上侦听,并具有默认的 guest/guest 用户登录名/密码。

流侦听器将侦听的端口可以通过rabbitmq.conf更改。

下面是一个最小配置文

stream.listeners.tcp.1 = 12345

而将侦听器更改为仅侦听本地主机(对于 IPv4 和 IPv6)的配置将如下所示

stream.listeners.tcp.1 = 127.0.0.1:5552
stream.listeners.tcp.2 = ::1:5552

TCP 侦听器选项

该插件支持 TCP 侦听器选项配置。

这些设置使用一个通用前缀,stream.tcp_listen_options,并控制 TCP 缓冲区大小、入站 TCP 连接队列长度、是否启用TCP 保活等。有关详细信息,请参阅网络指南

stream.listeners.tcp.1 = 127.0.0.1:5552
stream.listeners.tcp.2 = ::1:5552

stream.tcp_listen_options.backlog = 4096
stream.tcp_listen_options.recbuf = 131072
stream.tcp_listen_options.sndbuf = 131072

stream.tcp_listen_options.keepalive = true
stream.tcp_listen_options.nodelay = true

stream.tcp_listen_options.exit_on_close = true
stream.tcp_listen_options.send_timeout = 120

心跳超时

心跳超时值定义在多长时间后,RabbitMQ 和客户端库应将对等 TCP 连接视为无法访问(关闭)。

RabbitMQ 支持的消息传递协议使用类似的机制

流协议连接的默认值为 60 秒。

# use a lower heartbeat timeout value
stream.heartbeat = 20

将心跳超时值设置得太低会导致误报(在对等方实际上可用时将其视为不可用),因为网络拥塞、短暂的服务器流量控制等因素。

在选择超时值时应考虑这一点。

多年来来自用户和客户端库维护者的反馈表明,低于 5 秒的值很可能导致误报,而 1 秒或更低的值很可能导致误报。5 到 20 秒范围内的值对于大多数环境来说是最佳的。

流量控制

如果代理无法跟上写入和复制入站消息,快速发布者可能会压倒代理。因此,每个连接在被阻止之前都允许一定数量的未确认消息(initial_credits,默认值为 50,000)。当确认一定数量的消息时,连接将解除阻止(credits_required_for_unblocking,默认值为 12,500)。您可以根据工作负载更改这些值。

stream.initial_credits = 100000
stream.credits_required_for_unblocking = 25000

这些设置的高值可以提高发布吞吐量,但会增加内存消耗(这可能导致代理崩溃)。低值可以帮助应对大量中等速度的发布连接。

此设置仅适用于发布者,不适用于消费者。

消费者信用流量

本节介绍流协议信用流量机制,该机制允许消费者控制代理如何调度消息。

消费者在其创建订阅时提供一定数量的初始信用。信用代表代理允许发送给消费者的消息

是一批消息。这是 RabbitMQ Stream 中使用的存储和传输单元,即消息连续存储在一个块中,并且它们作为块的一部分传递。块可以由一到几千条消息组成,具体取决于入口。

因此,如果消费者创建了一个具有 5 个初始信用的订阅,代理将发送 5 个消息块。每次传递块时,代理都会减去一个信用。当订阅没有剩余信用时,代理将停止发送消息。所以在我们的示例中,代理将在传递完 5 个块后停止为该订阅发送消息。这通常不是我们想要的,因此消费者可以为其订阅提供信用以获取更多消息。

这取决于消费者(即客户端库和/或应用程序)来提供信用,具体取决于它处理消息的速度。我们希望消息连续流动,因此一个好的经验法则是用至少 2 个信用创建订阅,并在每个新的消息块上提供一个信用。通过这样做,网络上应该始终有一些消息在流动,消费者应该一直处于繁忙状态,而不是处于空闲状态。

消费者可以使用此信用流量机制选择代理如何将消息传递给他们。这有助于避免消费者超载或空闲。消费者信用流量如何公开给应用程序取决于客户端库,没有服务器端设置可以更改其行为。

公布的主机和端口

流协议允许发现流的拓扑结构,即给定一组流的领导者和副本在集群中的位置。这样,客户端可以选择连接到适当的节点以与流进行交互:领导者节点用于发布,副本用于消费。默认情况下,节点会返回其主机名和侦听器端口,这对于大多数情况来说可能足够好,但并非总是如此(代理位于集群节点和客户端之间,集群节点和/或客户端运行在容器中,等等)。

advertised_hostadvertised_port 键允许指定代理节点在被询问流的拓扑结构时返回哪些信息。您可以根据基础设施设置这些设置,以便客户端可以连接到集群节点。

stream.advertised_host = rabbitmq-1
stream.advertised_port = 12345

连接到流 博客文章介绍了为什么在某些部署中需要 advertised_hostadvertised_port 设置。

最大帧大小

RabbitMQ Stream 协议使用最大帧大小限制。默认值为 1 MiB,如果需要,可以增加该值。

# in bytes
stream.frame_max = 2097152

TLS 支持

要对流连接使用 TLS,必须在代理中配置 TLS。要启用支持 TLS 的流连接,请使用 stream.listeners.ssl.* 配置键添加流的 TLS 侦听器。

该插件将使用核心 RabbitMQ 服务器证书和密钥(与 AMQP 0-9-1 和 AMQP 1.0 侦听器一样)。

ssl_options.cacertfile = /path/to/tls/ca_certificate.pem
ssl_options.certfile = /path/to/tls/server_certificate.pem
ssl_options.keyfile = /path/to/tls/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true

stream.listeners.tcp.1 = 5552
# default TLS-enabled port for stream connections
stream.listeners.ssl.1 = 5551

此配置在端口 5552 上创建一个标准 TCP 侦听器,并在端口 5551 上创建一个 TLS 侦听器。

当设置 TLS 侦听器时,您可能希望停用所有非 TLS 侦听器。这可以通过以下方式配置。

stream.listeners.tcp   = none
stream.listeners.ssl.1 = 5551

就像普通连接一样,可以配置公布的 TLS 主机和端口。当使用 TLS 时,插件将返回以下元数据。

  • 主机名:如果已设置,则为 advertised_host,如果未设置 advertised_host,则为主机名
  • 端口:当前 TLS 端口

可以通过一起设置或单独设置 advertised_tls_hostadvertised_tls_port 配置项来覆盖此行为。

stream.advertised_host = private-rabbitmq-1
stream.advertised_port = 12345
stream.advertised_tls_host = public-rabbitmq-1
stream.advertised_tls_port = 12344
© 2024 RabbitMQ. All rights reserved.