跳至主要内容
版本:4.0

配置动态铲子

概述

本指南重点介绍动态配置的铲子。假设您熟悉 铲子插件 背后的关键概念。

与静态铲子不同,动态铲子是使用 运行时参数 配置的。它们可以随时启动和停止,包括以编程方式。动态铲子可用于临时(一次性)和永久运行的工作负载。

有关动态铲子的信息存储在 RabbitMQ 的模式数据库中,与用户、权限、队列等一起。因此,它们可以与其他 模式定义 一起导出,并结合使用 预声明拓扑模式

配置

可以使用 rabbitmqctl、通过 管理 HTTP API 或(使用 rabbitmq_shovel_management 插件 启用)通过管理 UI 的管理部分来定义参数。

铲子是使用定义体声明的,定义体是一个 JSON 对象。一些键是强制性的,其他键是可选的。它们控制连接参数、使用的协议、消息传输源和目标、数据安全性 协议功能等等。

每个铲子都属于一个虚拟主机。请注意,铲子不仅可以从不同的虚拟主机消费和发布,还可以从完全不同的集群消费和发布,因此虚拟主机选择主要充当组织铲子和访问它们的工具,就像 RabbitMQ 中的其他 权限 一样。

每个铲子也都有一个名称。在 检查其状态删除它们重新启动它们 时,名称用于识别铲子。

声明动态铲子

在这个例子中,我们将设置一个动态铲子,它将使用 AMQP 0-9-1,将本地 RabbitMQ 集群中的队列 "source-queue" 中的消息移动到远程 RabbitMQ 节点上的队列 "target-queue"

使用 CLI 工具

使用 rabbitmqctl set_parameter 命令声明铲子,该命令具有组件名称 shovel、铲子名称和作为 JSON 文档的定义体

# my-shovel here is the name of the shovel
rabbitmqctl set_parameter shovel my-shovel \
'{"src-protocol": "amqp091", "src-uri": "amqp://", "src-queue": "source-queue", "dest-protocol": "amqp091", "dest-uri": "amqp://remote-server", "dest-queue": "target-queue", "dest-queue-args": {"x-queue-type": "quorum"}}'

在 Windows 上,rabbitmqctl 被命名为 rabbitmqctl.bat,命令行值转义将不同

rabbitmqctl.bat set_parameter shovel my-shovel ^
"{""src-protocol"": ""amqp091"", ""src-uri"":""amqp://127.0.0.1"", ""src-queue"": ""source-queue"", ^
""dest-protocol"": ""amqp091"", ""dest-uri"": ""amqp://remote.rabbitmq.local"", ^
""dest-queue"": ""target-queue"", ""dest-queue-args"": {""x-queue-type"": ""quorum""}}"

本例中的主体包含几个键

基本动态铲子定义设置
描述
src-uri

源连接 URI。必填。有关 RabbitMQ 如何一般处理 AMQP URI 的信息,请参阅 AMQP URI 参考

cacertfilecertfilekeyfile

客户端 TLS 证书和私钥路径。有关详细信息,请参阅 TLS 指南。仅在 URI 方案为 amqps 时有用。

verifyfail_if_no_peer_cert

用于启用或禁用对服务器 TLS 证书的对等验证。有关详细信息,请参阅 TLS 指南。仅在 URI 方案为 amqps 时有用。

重要

请注意,从 Erlang 26 开始,默认情况下为 TLS 客户端(例如铲子)启用对等验证。

此参数的值可以是字符串,也可以是字符串列表。如果提供多个字符串,铲子将随机从列表中选择 **一个** URI,直到其中一个端点成功。

src-protocol

连接到源时要使用的协议。amqp091amqp10 之一。如果省略,则默认为 amqp091。请参阅下面的协议特定属性。

src-queue

铲子将从中消费的源队列。要从中消费的队列。必须设置此值或 src-exchange(但不能同时设置)。

如果源队列在目标虚拟主机上不存在,并且未提供 src-queue-args 参数,铲子将声明一个经典的持久队列,没有可选参数。

铲子可以使用预声明的拓扑,而不是声明源。请参阅下面的 预声明拓扑 部分。

src-queue-args

src-queue 声明的可选参数,例如队列类型。

dest-uri

与上面的 src-uri 相同,但用于目标连接。

dest-protocol

连接到目标时要使用的协议。amqp091amqp10 之一。如果省略,则默认为 amqp091。请参阅下面的协议特定属性。

dest-queue

应将消息发布到的队列。可以设置此值或 dest-exchange(但不能同时设置)。如果两者都没有设置,则消息将使用其原始交换机和路由键重新发布。

如果目标队列在目标虚拟主机上不存在,并且未提供 dest-queue-args 参数,铲子将声明一个经典的持久队列,没有可选参数。

铲子可以使用预声明的拓扑,而不是声明目标。请参阅下面的 预声明拓扑 部分。

dest-queue-args

dest-queue 声明的可选参数,例如队列类型。

本指南稍后将介绍其他铲子定义键。

预声明拓扑

铲子可以使用预声明的拓扑,而不是声明其源和目标。

例如,当拓扑 从启动时导入的定义文件导入 时,这可能是必需的。使用预声明的拓扑可以避免定义导入和铲子插件启动之间的“先有鸡还是先有蛋”问题:插件必须启用才能验证定义(运行时参数部分)。

以下是插件如何配置以等待源可用 使用 rabbitmq.conf

# all shovels started on this node will use pre-declared topology
shovel.topology.predeclared = true

如果只有部分铲子需要使用预声明的拓扑,可以使用以下铲子属性为特定铲子配置相同行为

其他动态铲子定义设置
描述
src-predeclared

当设置为 true 时,插件将等待 src-queue 可用,而不是使用 src-queue-args 自行声明拓扑。

dest-predeclared

当设置为 true 时,插件将等待 dest-queue 可用,而不是使用 dest-queue-args 自行声明拓扑。

使用 HTTP API

要使用 HTTP API 声明铲子,请确保 管理 插件已启用,然后使用以下端点

PUT /api/parameters/shovel/{vhost}/{name}

其中 {vhost} 是铲子应启动的虚拟主机,{name} 是新铲子的名称。该端点要求调用它的用户具有 policymaker 权限(标签)。

请求主体是一个 JSON 文档,其结构类似于本指南前面描述的结构

{
"value": {
"src-protocol": "amqp091",
"src-uri": "amqp://127.0.0.1",
"src-queue": "source-queue",
"dest-protocol": "amqp091",
"dest-uri": "amqp://remote.rabbitmq.local",
"dest-queue": "destination-queue"
}
}

以下是一个使用 curl 在本地节点上使用 默认用户凭据 声明铲子的示例。铲子将在默认虚拟主机中将两个队列 "source-queue""destination-queue" 之间传输消息。

请注意,如果针对远程节点调用,此确切命令将失败。请 添加一个新的用户,并将其标记为 policymaker,以进行自己的实验。

# Note: this user's access is limited to localhost!
curl -v -u guest:guest -X PUT https://127.0.0.1:15672/api/parameters/shovel/%2f/my-shovel \
-H "content-type: application/json" \
-d @- <<EOF
{
"value": {
"src-protocol": "amqp091",
"src-uri": "amqp://127.0.0.1",
"src-queue": "source-queue",
"dest-protocol": "amqp091",
"dest-uri": "amqp://127.0.0.1",
"dest-queue": "destination-queue"
}
}
EOF

使用管理 UI

要使用管理 UI 声明铲子,首先请确保 管理 插件已启用。

然后

  • 导航到 Admin > Shovel Management > Add a new shovel
  • 使用本指南前面介绍的铲子参数填写表单
  • 单击添加铲子

检查动态铲子的状态

使用 CLI 工具

使用 rabbitmqctl shovel_status 检查集群中的动态铲子。rabbitmq_shovel 插件必须在执行此命令的主机上启用。

rabbitmqctl shovel_status --formatter=pretty_table

输出可以格式化为 JSON 并重定向到诸如 jq 之类的工具

rabbitmqctl shovel_status --formatter=json | jq

使用 HTTP API

GET /api/shovels 是一个端点,可用于列出集群中的动态铲子。该端点由 rabbitmq_shovel_management 插件提供,该插件必须在目标节点上启用。

# Note: this user's access is limited to localhost!
curl -v -u guest:guest -X GET https://127.0.0.1:15672/api/shovels/

要检查特定虚拟主机中的铲子,请使用 GET /api/shovels/{vhost} {vhost} 是虚拟主机名称。该值必须进行百分比编码。

# Note: this user's access is limited to localhost!
curl -v -u guest:guest -X GET https://127.0.0.1:15672/api/shovels/%2f

要检查特定铲子的状态,请使用 GET /api/shovels/vhost/{vhost}/{name} {vhost} 是铲子运行的虚拟主机,{name} 是铲子的名称。这两个值都必须进行百分比编码。

# Note: this user's access is limited to localhost!
curl -v -u guest:guest -X GET https://127.0.0.1:15672/api/shovels/vhost/%2f/my-shovel

使用管理 UI

  • 导航到 Admin > Shovel Status
  • 在表格中找到您感兴趣的铲子

重新启动铲子

动态铲斗可以重新启动。重新启动铲斗会短暂地中断其操作,并使其重新连接到源和目标。当铲斗使用适当的 确认模式 时,中断是安全的:从源消费或发布到目标的任何未确认或未确认(“正在传输”)的消息将在铲斗停止时自动重新排队,并在重新启动后再次消费。

使用 CLI 工具

使用 rabbitmqctl restart_shovel 使用铲斗的名称重新启动铲斗。必须在执行此命令的主机上启用 rabbitmq_shovel 插件。

rabbitmqctl restart_shovel "my-shovel"

使用 HTTP API

DELETE /api/shovels/vhost/{vhost}/{name}/restart 是一个重新启动动态铲斗的端点。该端点由 rabbitmq_shovel_management 插件提供,必须在目标节点上启用。

{vhost} 是铲斗运行的虚拟主机,{name} 是要重新启动的铲斗的名称。这两个值都必须进行百分比编码。

# Note: this user's access is limited to localhost!
curl -v -u guest:guest -X DELETE https://127.0.0.1:15672/api/shovels/vhost/%2f/my-shovel/restart

使用管理 UI

  • 导航到 Admin > Shovel Status
  • 在表格中找到您感兴趣的铲子
  • 单击“重新启动”并等待下一个 UI 刷新

删除铲斗

使用 CLI 工具

要使用 CLI 工具删除铲斗,请使用 rabbitmqctl clear_parameter 并为组件名称传递 shovel 以及要删除的铲斗的名称。

rabbitmqctl clear_parameter shovel "my-shovel"

使用 HTTP API

DELETE /api/parameters/shovel/{vhost}/{name} 是一个用于删除铲斗的端点。

{vhost} 是铲斗运行的虚拟主机,{name} 是要删除的铲斗的名称。这两个值都必须进行百分比编码。

# Note: this user's access is limited to localhost!
curl -v -u guest:guest -X DELETE https://127.0.0.1:15672/api/parameters/shovel/%2f/my-shovel

AMQP 0-9-1 铲斗定义参考

在上面的示例中,没有介绍几种铲斗属性。它们不会从根本上改变动态铲斗的工作方式,也不会改变声明过程。

可选的动态铲斗定义设置(AMQP 0-9-1)
描述
reconnect-delay

断开连接后等待重新连接到代理的持续时间(以秒为单位)。默认值为 1。

ack-mode

确定铲斗应如何 确认 消费的消息。有效值为 on-confirmon-publishno-ack。默认情况下使用 on-confirm

如果设置为 on-confirm(默认值),则在目标确认消息后,将向源代理 确认 消息。这处理网络错误和代理故障,而不会丢失消息,并且是速度最慢的选择。

如果设置为 on-publish,则在将消息发布到目标(但尚未确认)后,将向源代理 确认 消息。这处理网络错误,而不会丢失消息,但在代理故障的情况下可能会丢失消息。

如果设置为 no-ack,则将使用 自动消息确认。此选项将提供最高的吞吐量,但并不安全(在网络或代理故障的情况下会丢失消息)。

src-delete-after

确定铲斗何时(如果有)应删除自身。如果将铲斗更多地视为移动操作(例如,用于将消息从一个队列移动到另一个队列),则这将非常有用。

默认值为 never,这意味着铲斗永远不应该删除自身。

如果设置为 queue-length,则铲斗将在启动时测量源队列的长度,并在传输完那么多消息后删除自身。

如果设置为整数,则铲斗将在删除自身之前传输那么多消息。

src-prefetch-count

铲斗在任何时间点复制的未确认消息的最大数量。默认值为 1000

src-exchange

要从中消费的交换机。必须设置此项或 src-queue(但不能同时设置两者)。

铲斗将在消费来自队列之前,声明一个专用的队列并将其绑定到具有 src-exchange-key 的命名交换机。

如果源交换机在源代理上不存在,则不会声明它;铲斗将无法启动。

src-exchange-key

使用 src-exchange 时的路由键。

src-consumer-args

消费者参数,例如 x-single-active-consumerx-stream-offset

dest-exchange

应将消息发布到的交换机。可以设置此项或 dest-queue(但不能同时设置两者)。

如果目标交换机在目标代理上不存在,则不会声明它;铲斗将无法启动。

dest-exchange-key

使用 dest-exchange 时的路由键。如果未设置此项,则将使用原始消息的路由键。

dest-publish-properties

要覆盖铲斗消息时使用的属性映射(JSON 对象)。目前不支持以这种方式设置标头。默认值为

dest-add-forward-headers

是否将 x-shovelled 标头添加到铲斗消息,以指示消息已从何处铲斗到何处。默认值为 false。

dest-add-timestamp-header

是否将 x-shovelled-timestamp 标头添加到铲斗消息中,该标头包含消息铲斗时的时间戳(以自纪元以来的秒数为单位)。默认值为 false。

AMQP 1.0 铲斗定义参考

AMQP 1.0 源和目标属性与 AMQP 0-9-1 对等属性有一些区别。

可选的动态铲斗定义设置(AMQP 1.0)
描述
src-uri

源的 AMQP URI。必需。AMQP 1.0 URI 实现了 AMQP URI 参考 中描述的一部分内容。AMQP 1.0 中没有 虚拟主机 概念,因此不支持 URI 路径段。它支持的查询参数集与 AMQP 0.9.1 URI(s) 不同

idle_time_out
心跳间隔
hostname

目标主机的名称。某些供应商(如 Azure ServiceBus)要求即使主机与 uri 中的主机段相同,也必须设置此项。

sasl

anonnoneplain 默认值为:none。使用 plain 时,需要设置 URI 的用户和密码段。

cacertfilecertfilekeyfile

客户端 TLS 证书和私钥路径。有关详细信息,请参阅 TLS 指南。仅在 URI 方案为 amqps 时有用。

verifyfail_if_no_peer_cert

用于启用或禁用对服务器 TLS 证书的对等验证。有关详细信息,请参阅 TLS 指南。仅在 URI 方案为 amqps 时有用。

::: important

请注意,从 Erlang 26 开始,默认情况下为 TLS 客户端(例如铲子)启用对等验证。

:::

src-address

AMQP 1.0 链接地址。必需。

dest-address

AMQP 1.0 链接地址。必需。

src-prefetch-count

铲斗在任何时间点复制的未确认消息的最大数量。默认值为 1000

dest-properties

铲斗消息时要覆盖的属性。有关所有可能属性的详细信息,请参见 AMQP 1.0 规范 §3.2.4。

dest-application-properties

铲斗消息时要设置的应用程序属性。

dest-add-forward-headers

是否将 x-shovelled 应用程序属性添加到铲斗消息,以指示消息已从何处铲斗到何处。默认值为 false。

dest-add-timestamp-header

是否将 creation_time 标头设置为消息重新发布时的时间戳(以自纪元以来的毫秒数为单位)。默认值为 false。

reconnect-delay

断开连接后等待重新连接到代理的持续时间(以秒为单位)。默认值为 1。

ack-mode

确定铲斗应如何 确认 消费的消息。有效值为 on-confirmon-publishno-ack。默认情况下使用 on-confirm

如果设置为 on-confirm(默认值),则在目标确认消息后,将向源代理 确认 消息。这处理网络错误和代理故障,而不会丢失消息,并且是速度最慢的选择。

如果设置为 on-publish,则在将消息发布到目标(但尚未确认)后,将向源代理 确认 消息。这处理网络错误,而不会丢失消息,但在代理故障的情况下可能会丢失消息。

如果设置为 no-ack,则将使用 自动消息确认。此选项将提供最高的吞吐量,但并不安全(在网络或代理故障的情况下会丢失消息)。

src-delete-after

确定铲斗何时(如果有)应删除自身。如果将铲斗更多地视为移动操作(例如,用于将消息从一个队列移动到另一个队列),则这将非常有用。

默认值为 never,这意味着铲斗永远不应该删除自身。

如果设置为 queue-length,则铲斗将在启动时测量源队列的长度,并在传输完那么多消息后删除自身。

如果设置为整数,则铲斗将在删除自身之前传输那么多消息。

监控铲斗

请参见铲斗插件指南概述中的 监控铲斗

© 2024 RabbitMQ. All rights reserved.