流插件
概述
流是一种持久化且复制的数据结构,它模拟了具有非破坏性消费者语义的追加日志。
此功能在所有当前维护的发布系列中可用。
流可以用作常规的 AMQP 0.9.1 队列,也可以通过专用二进制协议 插件和相关客户端使用。请查看流核心和流插件比较页面以获取功能矩阵。
本页面介绍流插件,该插件允许使用此新二进制协议与流进行交互。有关概念和操作流方式的概述,请参阅有关 RabbitMQ 流的指南。
流协议的客户端库在多个平台上可用。
注意:带有对勾(✓)的项目由 VMware 的 RabbitMQ 团队正式支持。
- ✓ RabbitMQ Java 流客户端
- ✓ RabbitMQ Golang 流客户端
- ✓ RabbitMQ .NET 流客户端
- ✓ RabbitMQ Rust 流客户端
- ✓ RabbitMQ Python 流客户端 (rstream)
- RabbitMQ Python 流客户端 (rbfly)
- RabbitMQ NodeJS 流客户端
- RabbitMQ Erlang 流客户端 (lake)
- RabbitMQ Elixir 流客户端
- RabbitMQ C 流客户端
使用流性能测试来模拟工作负载并衡量 RabbitMQ 流系统的性能。
启用插件
流插件包含在 RabbitMQ 分发版中。在客户端成功连接之前,必须使用rabbitmq-plugins启用它。
rabbitmq-plugins enable rabbitmq_stream
插件配置
TCP 侦听器
如果没有指定配置,流适配器将在端口 5552 上的所有接口上侦听,并具有默认的 guest
/guest
用户登录名/密码。
流侦听器将侦听的端口可以通过rabbitmq.conf
更改。
下面是一个最小配置文
stream.listeners.tcp.1 = 12345
而将侦听器更改为仅侦听本地主机(对于 IPv4 和 IPv6)的配置将如下所示
stream.listeners.tcp.1 = 127.0.0.1:5552
stream.listeners.tcp.2 = ::1:5552
TCP 侦听器选项
该插件支持 TCP 侦听器选项配置。
这些设置使用一个通用前缀,stream.tcp_listen_options
,并控制 TCP 缓冲区大小、入站 TCP 连接队列长度、是否启用TCP 保活等。有关详细信息,请参阅网络指南。
stream.listeners.tcp.1 = 127.0.0.1:5552
stream.listeners.tcp.2 = ::1:5552
stream.tcp_listen_options.backlog = 4096
stream.tcp_listen_options.recbuf = 131072
stream.tcp_listen_options.sndbuf = 131072
stream.tcp_listen_options.keepalive = true
stream.tcp_listen_options.nodelay = true
stream.tcp_listen_options.exit_on_close = true
stream.tcp_listen_options.send_timeout = 120
心跳超时
心跳超时
值定义在多长时间后,RabbitMQ 和客户端库应将对等 TCP 连接视为无法访问(关闭)。
RabbitMQ 支持的消息传递协议使用类似的机制。
流协议连接的默认值为 60 秒。
# use a lower heartbeat timeout value
stream.heartbeat = 20
将心跳超时值设置得太低会导致误报(在对等方实际上可用时将其视为不可用),因为网络拥塞、短暂的服务器流量控制等因素。
在选择超时值时应考虑这一点。
多年来来自用户和客户端库维护者的反馈表明,低于 5 秒的值很可能导致误报,而 1 秒或更低的值很可能导致误报。5 到 20 秒范围内的值对于大多数环境来说是最佳的。
流量控制
如果代理无法跟上写入和复制入站消息,快速发布者可能会压倒代理。因此,每个连接在被阻止之前都允许一定数量的未确认消息(initial_credits
,默认值为 50,000)。当确认一定数量的消息时,连接将解除阻止(credits_required_for_unblocking
,默认值为 12,500)。您可以根据工作负载更改这些值。
stream.initial_credits = 100000
stream.credits_required_for_unblocking = 25000
这些设置的高值可以提高发布吞吐量,但会增加内存消耗(这可能导致代理崩溃)。低值可以帮助应对大量中等速度的发布连接。
此设置仅适用于发布者,不适用于消费者。
消费者信用流量
本节介绍流协议信用流量机制,该机制允许消费者控制代理如何调度消息。
消费者在其创建订阅时提供一定数量的初始信用。信用代表代理允许发送给消费者的消息块。
块是一批消息。这是 RabbitMQ Stream 中使用的存储和传输单元,即消息连续存储在一个块中,并且它们作为块的一部分传递。块可以由一到几千条消息组成,具体取决于入口。
因此,如果消费者创建了一个具有 5 个初始信用的订阅,代理将发送 5 个消息块。每次传递块时,代理都会减去一个信用。当订阅没有剩余信用时,代理将停止发送消息。所以在我们的示例中,代理将在传递完 5 个块后停止为该订阅发送消息。这通常不是我们想要的,因此消费者可以为其订阅提供信用以获取更多消息。
这取决于消费者(即客户端库和/或应用程序)来提供信用,具体取决于它处理消息的速度。我们希望消息连续流动,因此一个好的经验法则是用至少 2 个信用创建订阅,并在每个新的消息块上提供一个信用。通过这样做,网络上应该始终有一些消息在流动,消费者应该一直处于繁忙状态,而不是处于空闲状态。
消费者可以使用此信用流量机制选择代理如何将消息传递给他们。这有助于避免消费者超载或空闲。消费者信用流量如何公开给应用程序取决于客户端库,没有服务器端设置可以更改其行为。
公布的主机和端口
流协议允许发现流的拓扑结构,即给定一组流的领导者和副本在集群中的位置。这样,客户端可以选择连接到适当的节点以与流进行交互:领导者节点用于发布,副本用于消费。默认情况下,节点会返回其主机名和侦听器端口,这对于大多数情况来说可能足够好,但并非总是如此(代理位于集群节点和客户端之间,集群节点和/或客户端运行在容器中,等等)。
advertised_host
和 advertised_port
键允许指定代理节点在被询问流的拓扑结构时返回哪些信息。您可以根据基础设施设置这些设置,以便客户端可以连接到集群节点。
stream.advertised_host = rabbitmq-1
stream.advertised_port = 12345
连接到流 博客文章介绍了为什么在某些部署中需要 advertised_host
和 advertised_port
设置。
最大帧大小
RabbitMQ Stream 协议使用最大帧大小限制。默认值为 1 MiB,如果需要,可以增加该值。
# in bytes
stream.frame_max = 2097152
TLS 支持
要对流连接使用 TLS,必须在代理中配置 TLS。要启用支持 TLS 的流连接,请使用 stream.listeners.ssl.*
配置键添加流的 TLS 侦听器。
该插件将使用核心 RabbitMQ 服务器证书和密钥(与 AMQP 0-9-1 和 AMQP 1.0 侦听器一样)。
ssl_options.cacertfile = /path/to/tls/ca_certificate.pem
ssl_options.certfile = /path/to/tls/server_certificate.pem
ssl_options.keyfile = /path/to/tls/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
stream.listeners.tcp.1 = 5552
# default TLS-enabled port for stream connections
stream.listeners.ssl.1 = 5551
此配置在端口 5552 上创建一个标准 TCP 侦听器,并在端口 5551 上创建一个 TLS 侦听器。
当设置 TLS 侦听器时,您可能希望停用所有非 TLS 侦听器。这可以通过以下方式配置。
stream.listeners.tcp = none
stream.listeners.ssl.1 = 5551
就像普通连接一样,可以配置公布的 TLS 主机和端口。当使用 TLS 时,插件将返回以下元数据。
- 主机名:如果已设置,则为
advertised_host
,如果未设置advertised_host
,则为主机名 - 端口:当前 TLS 端口
可以通过一起设置或单独设置 advertised_tls_host
和 advertised_tls_port
配置项来覆盖此行为。
stream.advertised_host = private-rabbitmq-1
stream.advertised_port = 12345
stream.advertised_tls_host = public-rabbitmq-1
stream.advertised_tls_port = 12344