跳转到主要内容
版本: 4.1

配置动态 Shovel

概述

本指南重点介绍动态配置的 Shovel。它假设您熟悉 Shovel 插件 背后的关键概念。

与静态 Shovel 不同,动态 Shovel 使用 运行时参数 进行配置。 它们可以随时启动和停止,包括以编程方式。 动态 Shovel 可用于瞬态(一次性)和永久运行的工作负载。

有关动态 Shovel 的信息存储在 RabbitMQ 的模式数据库中,以及用户、权限、队列等。 因此,它们可以与其他 模式定义 结合 预声明拓扑模式 一起导出。

配置

可以使用 rabbitmqctl、通过 管理 HTTP API 或(在 启用 rabbitmq_shovel_management 插件的情况下)通过管理 UI 的管理部分来定义参数。

Shovel 通过定义主体(JSON 对象)声明。 有些键是强制性的,有些是可选的。 它们控制连接参数、使用的协议、消息传输源和目标、数据安全 协议功能等等。

每个 Shovel 都属于一个虚拟主机。 请注意,Shovel 不仅可以从不同的虚拟主机消费和发布到不同的虚拟主机,还可以消费和发布到完全不同的集群,因此虚拟主机的选择主要用作组织 Shovel 和访问它们的一种方式,就像 RabbitMQ 中其余的 权限控制 一样。

每个 Shovel 也被命名。 该名称用于在 检查其状态删除它们重启它们 时识别 Shovel。

声明动态 Shovel

在本示例中,我们将设置一个动态 Shovel,它将使用 AMQP 0-9-1 将消息从本地 RabbitMQ 集群中的队列 "source-queue" 移动到远程 RabbitMQ 节点上的队列 "target-queue"

使用 CLI 工具

Shovel 使用 rabbitmqctl set_parameter 命令声明,组件名称为 shovel,Shovel 名称和一个定义主体(JSON 文档)

# my-shovel here is the name of the shovel
rabbitmqctl set_parameter shovel my-shovel \
'{"src-protocol": "amqp091", "src-uri": "amqp://", "src-queue": "source-queue", "dest-protocol": "amqp091", "dest-uri": "amqp://remote-server", "dest-queue": "target-queue", "dest-queue-args": {"x-queue-type": "quorum"}}'

在 Windows 上,rabbitmqctl 命名为 rabbitmqctl.bat,并且命令行值转义会有所不同

rabbitmqctl.bat set_parameter shovel my-shovel ^
"{""src-protocol"": ""amqp091"", ""src-uri"":""amqp://localhost"", ""src-queue"": ""source-queue"", ^
""dest-protocol"": ""amqp091"", ""dest-uri"": ""amqp://remote.rabbitmq.local"", ^
""dest-queue"": ""target-queue"", ""dest-queue-args"": {""x-queue-type"": ""quorum""}}"

本示例中的主体包含几个键

必要的动态 Shovel 定义设置
描述
src-uri

源连接 URI。 强制性。 有关 RabbitMQ 如何处理 AMQP URI 的信息,请参阅 AMQP URI 参考

cacertfile, certfile, keyfile

客户端 TLS 证书和私钥路径。 有关详细信息,请参阅 TLS 指南。 仅当 URI 方案为 amqps 时才有用。

verify, fail_if_no_peer_cert

用于启用或禁用服务器 TLS 证书的对等方验证。 请参阅 使用 TLS 保护 Shovel 连接 和通用的 TLS 指南 以了解更多信息。 仅当 URI 方案为 amqps 时才有用。

重要提示

请注意,从 Erlang 26 开始,默认情况下为 TLS 客户端(例如 Shovel)启用对等方验证。

此参数的值可以是字符串或字符串列表。 如果提供多个字符串,则 Shovel 将随机从列表中选择 一个 URI,直到其中一个端点成功。

src-protocol

连接到源时要使用的协议。 可以是 amqp091amqp10。 如果省略,则默认为 amqp091。 请参阅下面的协议特定属性。

src-queue

Shovel 将从中消费的源队列。 要从中消费的队列。 必须设置此项或 src-exchange(但不能同时设置)。

如果目标虚拟主机上不存在源队列,并且未提供 src-queue-args 参数,则 Shovel 将声明一个没有可选参数的经典持久队列。

Shovel 可以使用预声明的拓扑而不是声明源。 请参阅下面的 预声明拓扑 部分。

src-queue-args

src-queue 声明的可选参数,例如队列类型。

dest-uri

与上面的 src-uri 相同,但用于目标连接。

dest-protocol

连接到目标时要使用的协议。 可以是 amqp091amqp10。 如果省略,则默认为 amqp091。 请参阅下面的协议特定属性。

dest-queue

消息应发布到的队列。 可以设置此项或 dest-exchange(但不能同时设置)。 如果两者都未设置,则消息将使用其原始交换机和路由键重新发布。

如果目标虚拟主机中不存在目标队列,并且未提供 dest-queue-args 参数,则 Shovel 将声明一个没有可选参数的经典持久队列。

Shovel 可以使用预声明的拓扑而不是声明目标。 请参阅下面的 预声明拓扑 部分。

dest-queue-args

dest-queue 声明的可选参数,例如队列类型。

本指南稍后将介绍其他 Shovel 定义键。

预声明拓扑

Shovel 可以使用预声明的拓扑而不是声明其源和目标。

例如,当拓扑在 启动时从定义文件导入 时,这可能是必要的。 使用预声明的拓扑避免了定义导入和 Shovel 插件启动之间的先有鸡还是先有蛋的问题:必须启用插件才能使定义通过验证(运行时参数部分)。

以下是如何配置插件以等待源可用 使用 rabbitmq.conf

# all shovels started on this node will use pre-declared topology
shovel.topology.predeclared = true

如果只有一些 Shovel 需要使用预声明的拓扑,则可以使用以下 Shovel 属性为特定 Shovel 配置相同的行为

其他动态 Shovel 定义设置
描述
src-predeclared

设置为 true 时,插件会等待 src-queue 可用,而不是使用 src-queue-args 声明拓扑本身。

dest-predeclared

设置为 true 时,插件会等待 dest-queue 可用,而不是使用 dest-queue-args 声明拓扑本身。

使用 HTTP API

要使用 HTTP API 声明 Shovel,请确保 管理 插件已启用,然后使用以下端点

PUT /api/parameters/shovel/{vhost}/{name}

其中 {vhost} 是应在其中启动 Shovel 的虚拟主机,{name} 是新 Shovel 的名称。 该端点要求调用它的用户具有 policymaker 权限(标签)。

请求正文是一个 JSON 文档,其结构与本指南前面描述的结构类似

{
"value": {
"src-protocol": "amqp091",
"src-uri": "amqp://localhost",
"src-queue": "source-queue",
"dest-protocol": "amqp091",
"dest-uri": "amqp://remote.rabbitmq.local",
"dest-queue": "destination-queue"
}
}

下面是一个示例,该示例使用 curl 在本地节点上使用 默认用户凭据 声明 Shovel。 Shovel 将在默认虚拟主机中的两个队列 "source-queue""destination-queue" 之间传输消息。

请注意,如果针对远程节点调用此确切命令,则会失败。 请添加一个标记为 policymaker 的新用户 以进行您自己的实验。

# Note: this user's access is limited to localhost!
curl -v -u guest:guest -X PUT http://localhost:15672/api/parameters/shovel/%2f/my-shovel \
-H "content-type: application/json" \
-d @- <<EOF
{
"value": {
"src-protocol": "amqp091",
"src-uri": "amqp://localhost",
"src-queue": "source-queue",
"dest-protocol": "amqp091",
"dest-uri": "amqp://localhost",
"dest-queue": "destination-queue"
}
}
EOF

使用管理 UI

要使用管理 UI 声明 Shovel,首先请确保 管理 插件已启用。

然后

  • 导航到 Admin > Shovel 管理 > 添加新的 shovel
  • 使用本指南前面介绍的 Shovel 参数填写表格
  • 单击 添加 shovel

检查动态 Shovel 的状态

使用 CLI 工具

使用 rabbitmqctl shovel_status 检查集群中的动态 Shovel。 必须在执行此命令的主机上启用 rabbitmq_shovel 插件。

rabbitmqctl shovel_status --formatter=pretty_table

输出可以格式化为 JSON 并重定向到诸如 jq 之类的工具

rabbitmqctl shovel_status --formatter=json | jq

使用 HTTP API

GET /api/shovels 是一个可用于列出集群中动态 Shovel 的端点。 该端点由 rabbitmq_shovel_management 插件提供,该插件必须在目标节点上启用。

# Note: this user's access is limited to localhost!
curl -v -u guest:guest -X GET http://localhost:15672/api/shovels/

要检查特定虚拟主机中的 Shovel,请使用 GET /api/shovels/{vhost} {vhost} 是虚拟主机名称。 该值必须进行百分比编码。

# Note: this user's access is limited to localhost!
curl -v -u guest:guest -X GET http://localhost:15672/api/shovels/%2f

要检查特定 Shovel 的状态,请使用 GET /api/shovels/vhost/{vhost}/{name} {vhost} 是 Shovel 正在运行的虚拟主机,{name} 是 Shovel 的名称。 两个值都必须进行百分比编码。

# Note: this user's access is limited to localhost!
curl -v -u guest:guest -X GET http://localhost:15672/api/shovels/vhost/%2f/my-shovel

使用管理 UI

  • 导航到 Admin > Shovel 状态
  • 在表格中找到感兴趣的 Shovel

重启 Shovel

可以重启动态 Shovel。 重启 Shovel 会短暂中断其操作,并使其重新连接到源和目标。 当 Shovel 使用适当的 确认模式 时,中断是安全的:当 Shovel 停止时,从源消费或发布到目标的任何未确认或未确认(“正在传输中”)的消息将自动重新排队,并在重启后再次消费。

使用 CLI 工具

使用 rabbitmqctl restart_shovel 使用其名称重启 Shovel。 必须在执行此命令的主机上启用 rabbitmq_shovel 插件。

rabbitmqctl restart_shovel "my-shovel"

使用 HTTP API

DELETE /api/shovels/vhost/{vhost}/{name}/restart 是重启动态 Shovel 的端点。 该端点由 rabbitmq_shovel_management 插件提供,该插件必须在目标节点上启用。

{vhost} 是 Shovel 正在运行的虚拟主机,{name} 是要重启的 Shovel 的名称。 两个值都必须进行百分比编码。

# Note: this user's access is limited to localhost!
curl -v -u guest:guest -X DELETE http://localhost:15672/api/shovels/vhost/%2f/my-shovel/restart

使用管理 UI

  • 导航到 Admin > Shovel 状态
  • 在表格中找到感兴趣的 Shovel
  • 单击 重启 并等待下一次 UI 刷新

删除 Shovel

使用 CLI 工具

要使用 CLI 工具删除 Shovel,请使用 rabbitmqctl clear_parameter 并传递 shovel 作为组件名称和应删除的 Shovel 的名称

rabbitmqctl clear_parameter shovel "my-shovel"

使用 HTTP API

DELETE /api/parameters/shovel/{vhost}/{name} 是可用于删除 Shovel 的端点。

{vhost} 是 Shovel 正在运行的虚拟主机,{name} 是要删除的 Shovel 的名称。 两个值都必须进行百分比编码。

# Note: this user's access is limited to localhost!
curl -v -u guest:guest -X DELETE http://localhost:15672/api/parameters/shovel/%2f/my-shovel

AMQP 0-9-1 Shovel 定义参考

以上示例中未介绍几个 Shovel 属性。 它们不会从根本上改变动态 Shovel 的工作方式,也不会改变声明过程。

可选的动态 Shovel 定义设置 (AMQP 0-9-1)
描述
reconnect-delay

在任一端断开连接后,重新连接到 Broker 之前等待的持续时间(以秒为单位)。 默认为 1。

ack-mode

确定 Shovel 应如何 确认 消费的消息。 有效值为 on-confirmon-publishno-ack。 默认使用 on-confirm

如果设置为 on-confirm(默认值),则在目标确认消息后,将向源 Broker 确认 消息。 这可以处理网络错误和 Broker 故障而不会丢失消息,并且是最慢的选项。

如果设置为 on-publish,则在消息已在目标发布(但尚未确认)后,将向源 Broker 确认 消息。 在发生网络或 Broker 故障时,消息可能会丢失。

如果设置为 no-ack,将使用 自动消息确认。 此选项将提供最高的吞吐量,但不安全(在发生网络或 Broker 故障时会丢失消息)。

src-delete-after

确定 Shovel 何时(如果曾经)应删除自身。 如果将 Shovel 视为更像移动操作(即,用于临时将消息从一个队列移动到另一个队列),这将非常有用。

默认值为 never,表示 Shovel 永远不应删除自身。

如果设置为 queue-length,则 Shovel 将在启动时测量源队列的长度,并在传输了这么多消息后删除自身。

如果设置为整数,则 Shovel 将在删除自身之前传输该数量的消息。

src-prefetch-count

任何时候通过 Shovel 复制的最大未确认消息数。 默认为 1000

src-exchange

要从中消费的交换机。 必须设置此项或 src-queue(但不能同时设置)。

Shovel 将声明一个独占队列,并使用 src-exchange-key 将其绑定到命名的交换机,然后再从队列中消费。

如果源交换机在源 Broker 上不存在,则不会声明它; Shovel 将无法启动。

src-exchange-key

使用 src-exchange 时的路由键。

src-consumer-args

消费者参数,例如 x-single-active-consumerx-stream-offset

dest-exchange

消息应发布到的交换机。 可以设置此项或 dest-queue(但不能同时设置)。

如果目标交换机在目标 Broker 上不存在,则不会声明它; Shovel 将无法启动。

dest-exchange-key

使用 dest-exchange 时的路由键。 如果未设置,将使用原始消息的路由键。

dest-publish-properties

覆盖 Shovel 消息时要覆盖的属性的映射(JSON 对象)。 当前不支持以此方式设置标头。 默认为

dest-add-forward-headers

是否将 x-shovelled 标头添加到 Shovel 消息,指示它们从何处 Shovel 以及 Shovel 到何处。 默认为 false。

dest-add-timestamp-header

是否将 x-shovelled-timestamp 标头添加到 Shovel 消息,其中包含消息被 Shovel 时的 Unix 时间戳(以秒为单位)。 默认为 false。

AMQP 1.0 Shovel 定义参考

AMQP 1.0 源和目标属性与其 AMQP 0-9-1 对应项有一些差异。

可选的动态 Shovel 定义设置 (AMQP 1.0)
描述
src-uri

源的 AMQP URI。 强制性。 AMQP 1.0 URI 实现了 AMQP URI 参考 中描述的子集。 AMQP 1.0 中没有 虚拟主机 概念,因此不支持 URI 路径段。 它支持的查询参数集与 AMQP 0.9.1 URI(s) 不同

idle_time_out
心跳间隔
hostname

此字段指示连接目标。 这到底意味着什么取决于目标 AMQP 1.0 Broker。

例如,即使 Azure ServiceBus 与 URI 中的主机段相同,也需要设置此项。

对于 RabbitMQ,如果目标 虚拟主机 与默认虚拟主机不同,则可以使用此字段指定目标虚拟主机。 为此,请将此查询参数设置为 vhost:{name},例如 vhost:example-vhost

sasl

anonnoneplain 默认为:none。 使用 plain 时,需要设置 URI 的用户和密码段。

cacertfile, certfile, keyfile

客户端 TLS 证书和私钥路径。 有关详细信息,请参阅 TLS 指南。 仅当 URI 方案为 amqps 时才有用。

verify, fail_if_no_peer_cert

用于启用或禁用服务器 TLS 证书的对等方验证。 请参阅 TLS 指南 以了解更多信息。 仅当 URI 方案为 amqps 时才有用。

重要提示

请注意,从 Erlang 26 开始,默认情况下为 TLS 客户端(例如 Shovel)启用对等方验证。

src-address

AMQP 1.0 链接地址。 强制性。

dest-address

AMQP 1.0 链接地址。 强制性。

src-prefetch-count

任何时候通过 Shovel 复制的最大未确认消息数。 默认为 1000

dest-properties

Shovel 消息时要覆盖的属性。 有关所有可能的属性的详细信息,请参阅 AMQP 1.0 规范 §3.2.4。

dest-application-properties

Shovel 消息时要设置的应用程序属性。

dest-add-forward-headers

是否将 x-shovelled 应用程序属性添加到 Shovel 消息,指示它们从何处 Shovel 以及 Shovel 到何处。 默认为 false。

dest-add-timestamp-header

是否将 creation_time 标头设置为消息重新发布时的 Unix 时间戳(以毫秒为单位)。 默认为 false。

reconnect-delay

在任一端断开连接后,重新连接到 Broker 之前等待的持续时间(以秒为单位)。 默认为 1。

ack-mode

确定 Shovel 应如何 确认 消费的消息。 有效值为 on-confirmon-publishno-ack。 默认使用 on-confirm

如果设置为 on-confirm(默认值),则在目标确认消息后,将向源 Broker 确认 消息。 这可以处理网络错误和 Broker 故障而不会丢失消息,并且是最慢的选项。

如果设置为 on-publish,则在消息已在目标发布(但尚未确认)后,将向源 Broker 确认 消息。 在发生网络或 Broker 故障时,消息可能会丢失。

如果设置为 no-ack,将使用 自动消息确认。 此选项将提供最高的吞吐量,但不安全(在发生网络或 Broker 故障时会丢失消息)。

src-delete-after

确定 Shovel 何时(如果曾经)应删除自身。 如果将 Shovel 视为更像移动操作(即,用于临时将消息从一个队列移动到另一个队列),这将非常有用。

默认值为 never,表示 Shovel 永远不应删除自身。

如果设置为 queue-length,则 Shovel 将在启动时测量源队列的长度,并在传输了这么多消息后删除自身。

如果设置为整数,则 Shovel 将在删除自身之前传输该数量的消息。

监控 Shovel

请参阅 Shovel 插件概述指南中的 监控 Shovel

© . All rights reserved.