配置动态 Shovels
概述
本指南重点介绍动态配置的 Shovels。它假定您熟悉 Shovel 插件背后的关键概念。
与静态 Shovels 不同,动态 Shovels 是使用 运行时参数 配置的。它们可以随时启动和停止,包括以编程方式。动态 Shovels 可用于瞬时(一次性)和永久运行的工作负载。
动态 Shovels 的信息与用户、权限、队列等一起存储在 RabbitMQ 的 schema 数据库中。因此,它们可以与其他 schema 定义一起导出,并结合 预声明拓扑模式。
配置
动态 Shovels 可以使用 rabbitmqadmin、rabbitmqctl、管理 HTTP API 或(在 启用 rabbitmq_shovel_management 插件后)通过管理 UI 的管理部分来定义。
Shovel 使用定义体(一个 JSON 对象)来声明。其中一些键是必需的,另一些是可选的。它们控制连接参数、使用的协议、消息传输的源和目标、数据安全协议功能等。
每个 Shovel 都属于一个虚拟主机。请注意,Shovel 不仅可以从不同虚拟主机消费并发布到不同虚拟主机,甚至可以消费和发布到完全不同的集群,因此虚拟主机的选择主要充当组织 Shovels 和访问它们的手段,就像 RabbitMQ 中其他 权限一样。
每个 Shovel 都有一个名称。当 检查其状态、删除它们或 重启它们时,可以使用该名称来标识 Shovels。
声明一个动态 Shovel
在此示例中,我们将设置一个动态 Shovel,它将使用 AMQP 0-9-1 将消息从本地 RabbitMQ 集群中的 "source-queue" 队列移动到远程 RabbitMQ 节点上的 "target-queue" 队列。
使用 CLI 工具
可以使用 CLI 工具或 HTTP API 来声明 Shovel。
使用 rabbitmqctl 时,使用 set_parameter 命令声明 Shovel,该命令声明一个 [运行时参数] (./parameters)。rabbitmqadmin 包含用于在 shovels 组下声明 Shovels 的专用命令,即 shovels declare_amqp091 和 shovels declare_amqp10。
以下是使用 CLI 工具声明动态 Shovel 的示例
- 使用 rabbitmqctl (bash)
- 使用 rabbitmqadmin (bash)
- 使用 rabbitmqctl (PowerShell)
- 使用 PowerShell 的 rabbitmqadmin.exe
# my-shovel here is the name of the shovel
rabbitmqctl set_parameter shovel my-shovel \
'{"src-protocol": "amqp091", "src-uri": "amqp://", "src-queue": "source-queue", "dest-protocol": "amqp091", "dest-uri": "amqp://remote-server", "dest-queue": "target-queue", "dest-queue-args": {"x-queue-type": "quorum"}}'
# Declare an AMQP 0-9-1 shovel using rabbitmqadmin
rabbitmqadmin shovels declare_amqp091 --name my-shovel \
--source-uri "amqp://" \
--destination-uri "amqp://remote-server" \
--source-queue "source-queue" \
--destination-queue "target-queue"
rabbitmqctl.bat set_parameter shovel my-shovel ^
"{""src-protocol"": ""amqp091"", ""src-uri"":""amqp://"", ""src-queue"": ""source-queue"", ^
""dest-protocol"": ""amqp091"", ""dest-uri"": ""amqp://remote.rabbitmq.local"", ^
""dest-queue"": ""target-queue"", ""dest-queue-args"": {""x-queue-type"": ""quorum""}}"
# Declare an AMQP 0-9-1 shovel using rabbitmqadmin
rabbitmqadmin.exe shovels declare_amqp091 --name my-shovel ^
--source-uri "amqp://" ^
--destination-uri "amqp://remote-server" ^
--source-queue "source-queue" ^
--destination-queue "target-queue"
以下是一些声明 AMQP 1.0 Shovels 的示例
- 使用 rabbitmqctl (bash)
- 使用 rabbitmqadmin (bash)
- 使用 rabbitmqctl (PowerShell)
- 使用 PowerShell 的 rabbitmqadmin.exe
# AMQP 1.0 shovel using rabbitmqctl set_parameter
rabbitmqctl set_parameter shovel my-amqp10-shovel \
'{"src-protocol": "amqp10", "src-uri": "amqp://username:password@source-server", "src-address": "/queues/source-queue", "dest-protocol": "amqp10", "dest-uri": "amqp://username:password@dest-server", "dest-address": "/queues/target-queue"}'
# Declare an AMQP 1.0 shovel using rabbitmqadmin
rabbitmqadmin shovels declare_amqp10 --name my-amqp10-shovel \
--source-uri "amqp://username:password@source-server" \
--destination-uri "amqp://username:password@dest-server" \
--source-address "/queues/source-queue" \
--destination-address "/queues/target-queue"
# AMQP 1.0 shovel using rabbitmqctl set_parameter
rabbitmqctl.bat set_parameter shovel my-amqp10-shovel ^
"{""src-protocol"": ""amqp10"", ""src-uri"": ""amqp://username:password@source-server"", ""src-address"": ""/queues/source-queue"", ""dest-protocol"": ""amqp10"", ""dest-uri"": ""amqp://username:password@dest-server"", ""dest-address"": ""/queues/target-queue""}"
# Declare an AMQP 1.0 shovel using rabbitmqadmin
rabbitmqadmin.exe shovels declare_amqp10 --name my-amqp10-shovel ^
--source-uri "amqp://username:password@source-server" ^
--destination-uri "amqp://username:password@dest-server" ^
--source-address "/queues/source-queue" ^
--destination-address "/queues/target-queue"
这些示例中的运行时参数定义(body)包含几个键
| 键 | 描述 |
| src-uri | 源连接 URI。必需。有关 RabbitMQ 如何处理 AMQP URI 的一般信息,请参阅 AMQP URI 参考。
此参数的值可以是字符串,也可以是字符串列表。如果提供了多个字符串,Shovel 将从列表中随机选择一个 URI,直到其中一个端点成功。 |
| src-protocol | 连接源时使用的协议。可以是 |
| src-queue | Shovel 将从中消耗的源队列。必须设置此项或 如果源队列在目标虚拟主机中不存在,并且未提供 Shovels 可以使用预声明的拓扑,而不是声明源。请参阅下面的 预声明拓扑 部分。 |
| src-queue-args |
|
| dest-uri | 与上面的 |
| dest-protocol | 连接目标时使用的协议。可以是 |
| dest-queue | 应将消息发布到的队列。必须设置此项或 如果目标队列在目标虚拟主机中不存在,并且未提供 Shovels 可以使用预声明的拓扑,而不是声明目标。请参阅下面的 预声明拓扑 部分。 |
| dest-queue-args |
|
还有其他 Shovel 定义键将在本指南的后面介绍。
预声明拓扑
Shovels 可以使用预声明的拓扑,而不是声明其源和目标。
例如,当拓扑 在启动时从定义文件导入时,这可能是必需的。使用预声明的拓扑可以避免定义导入和 Shovel 插件启动之间的先有鸡还是先有蛋的问题:必须启用插件才能使定义通过验证(针对运行时参数部分)。
以下是插件如何配置为 使用 rabbitmq.conf 等待源可用
# all shovels started on this node will use pre-declared topology
shovel.topology.predeclared = true
如果只有某些 Shovels 需要使用预声明的拓扑,可以使用以下 Shovel 属性为特定 Shovels 配置相同的行为
| 键 | 描述 |
| src-predeclared | 当设置为 |
| dest-predeclared | 当设置为 |
使用 HTTP API
要使用 HTTP API 声明 Shovel,请确保已 启用管理 插件,然后使用以下端点
PUT /api/parameters/shovel/{vhost}/{name}
其中 {vhost} 是应启动 Shovel 的虚拟主机,{name} 是新 Shovel 的名称。该端点要求调用它的用户具有 policymaker 权限(标签)。
请求体是一个 JSON 文档,其结构与本指南前面描述的类似
{
"value": {
"src-protocol": "amqp091",
"src-uri": "amqp://",
"src-queue": "source-queue",
"dest-protocol": "amqp091",
"dest-uri": "amqp://remote.rabbitmq.local",
"dest-queue": "destination-queue"
}
}
下面是一个使用 curl 在本地节点上声明 Shovel 的示例,使用 默认用户凭据。Shovel 将在默认虚拟主机中的 "source-queue" 和 "destination-queue" 两个队列之间传输消息。
请注意,如果针对远程节点调用此确切命令,将会失败。请 添加一个标记为 policymaker 的新用户以供您自己进行实验。
# Note: this user's access is limited to localhost!
curl -v -u guest:guest -X PUT https://:15672/api/parameters/shovel/%2f/my-shovel \
-H "content-type: application/json" \
-d @- <<EOF
{
"value": {
"src-protocol": "amqp091",
"src-uri": "amqp://",
"src-queue": "source-queue",
"dest-protocol": "amqp091",
"dest-uri": "amqp://",
"dest-queue": "destination-queue"
}
}
EOF
使用管理 UI
要使用管理 UI 声明 Shovel,请首先确保已 启用管理 插件。
然后
- 导航到
Admin>Shovel Management>Add a new shovel - 使用本指南前面介绍的 Shovel 参数填写表单
- 点击 Add shovel
检查动态 Shovels 的状态
使用 CLI 工具
使用 rabbitmqctl shovel_status 检查集群中的动态 Shovels。必须在执行此命令的主机上启用 rabbitmq_shovel 插件。
rabbitmqctl shovel_status --formatter=pretty_table
输出可以格式化为 JSON,并重定向到像 jq 这样的工具
rabbitmqctl shovel_status --formatter=json | jq
使用 HTTP API
GET /api/shovels 是一个端点,可用于列出集群中的动态 Shovels。该端点由 rabbitmq_shovel_management 插件提供,该插件必须在目标节点上启用。
# Note: this user's access is limited to localhost!
curl -v -u guest:guest -X GET https://:15672/api/shovels/
要检查特定虚拟主机中的 Shovels,请使用 GET /api/shovels/{vhost}。{vhost} 是虚拟主机名。该值必须进行百分比编码。
# Note: this user's access is limited to localhost!
curl -v -u guest:guest -X GET https://:15672/api/shovels/%2f
要检查特定 Shovel 的状态,请使用 GET /api/shovels/vhost/{vhost}/{name}。{vhost} 是 Shovel 运行所在的虚拟主机,{name} 是 Shovel 的名称。两个值都必须进行百分比编码。
# Note: this user's access is limited to localhost!
curl -v -u guest:guest -X GET https://:15672/api/shovels/vhost/%2f/my-shovel
使用管理 UI
- 导航到
Admin>Shovel Status - 在表中找到感兴趣的 Shovel
重启 Shovel
动态 Shovel 可以重启。重启 Shovel 会短暂中断其操作,并使其重新连接到源和目标。当 Shovel 使用适当的 确认模式时,中断是安全的:从源消费的或发布到目标的任何未确认或未确认(“飞行中”)的消息将在 Shovel 停止时自动重新排队,并在重启后再次消费。
使用 CLI 工具
使用 rabbitmqctl restart_shovel 通过名称重启 Shovel。必须在执行此命令的主机上启用 rabbitmq_shovel 插件。
rabbitmqctl restart_shovel "my-shovel"
使用 HTTP API
DELETE /api/shovels/vhost/{vhost}/{name}/restart 是一个重启动态 Shovel 的端点。该端点由 rabbitmq_shovel_management 插件提供,该插件必须在目标节点上启用。
{vhost} 是 Shovel 运行所在的虚拟主机,{name} 是要重启的 Shovel 的名称。两个值都必须进行百分比编码。
# Note: this user's access is limited to localhost!
curl -v -u guest:guest -X DELETE https://:15672/api/shovels/vhost/%2f/my-shovel/restart
使用管理 UI
- 导航到
Admin>Shovel Status - 在表中找到感兴趣的 Shovel
- 点击 Restart 并等待 UI 下次刷新
删除 Shovel
使用 CLI 工具
要使用 CLI 工具删除 Shovel,请使用 rabbitmqctl clear_parameter,并将 shovel 作为组件名称,将要删除的 Shovel 的名称传递进去
rabbitmqctl clear_parameter shovel "my-shovel"
使用 HTTP API
DELETE /api/parameters/shovel/{vhost}/{name} 是可用于删除 Shovel 的端点。
{vhost} 是 Shovel 运行所在的虚拟主机,{name} 是要删除的 Shovel 的名称。两个值都必须进行百分比编码。
# Note: this user's access is limited to localhost!
curl -v -u guest:guest -X DELETE https://:15672/api/parameters/shovel/%2f/my-shovel
AMQP 0-9-1 Shovel 定义参考
有几个 Shovel 属性未在上述示例中涵盖。它们不从根本上改变动态 Shovels 的工作方式,也不改变声明过程。
| 键 | 描述 |
| reconnect-delay | 在任一端断开连接后,在重新连接到代理之前等待的秒数。默认为 1。 |
| ack-mode | 确定 Shovel 应如何确认已消费的消息。有效值为 如果设置为 如果设置为 如果设置为 |
| src-delete-after | 确定 Shovel 何时(如果根本不)应删除自身。如果 Shovel 被视为一种移动操作——即,用于临时将消息从一个队列移动到另一个队列——这可能会很有用。 默认值为 如果设置为 如果设置为整数,则 Shovel 将在删除自身之前传输该数量的消息。此选项不能与 |
| src-prefetch-count | 一次通过 Shovel 复制的未确认消息的最大数量。默认值为 |
| src-exchange | 应从中消耗的 exchange。必须设置此项或 Shovel 将声明一个独占队列,并将其与 如果源代理上不存在源 exchange,则不会声明它;Shovel 将无法启动。 |
| src-exchange-key | 使用 |
| src-consumer-args | 消费者参数,例如 |
| dest-exchange | 应将消息发布到的 exchange。必须设置此项或 如果目标代理上不存在目标 exchange,则不会声明它;Shovel 将无法启动。 |
| dest-exchange-key | 使用 |
| dest-publish-properties | 在 Shovel 消息时要覆盖的属性的映射(JSON 对象)。目前不支持通过此方式设置标头。默认值是 |
| dest-add-forward-headers | 是否将 |
| dest-add-timestamp-header | 是否将 |
AMQP 1.0 Shovel 定义参考
AMQP 1.0 的源和目标属性与其 AMQP 0-9-1 的对应属性存在一些差异。
| 键 | 描述 |
| src-uri | 源的 AMQP URI。必需。AMQP 1.0 URI 实现的是 AMQP URI 参考 中所述内容的一个子集。AMQP 1.0 中不存在虚拟主机概念,因此不支持 URI 路径段。它支持的查询参数集与 AMQP 0.9.1 URI 不同。
|
| src-address | AMQP 1.0 链接地址。必需。 |
| dest-address | AMQP 1.0 链接地址。必需。 |
| src-prefetch-count | 一次通过 Shovel 复制的未确认消息的最大数量。默认值为 |
| dest-properties | 在 Shovel 消息时要覆盖的属性。有关所有可能属性的详细信息,请参阅 AMQP 1.0 规范 §3.2.4。 |
| dest-application-properties | 在 Shovel 消息时要设置的应用程序属性。 |
| dest-add-forward-headers | 是否将 |
| dest-add-timestamp-header | 是否将 |
| reconnect-delay | 在任一端断开连接后,在重新连接到代理之前等待的秒数。默认为 1。 |
| ack-mode | 确定 Shovel 应如何确认已消费的消息。有效值为 如果设置为 如果设置为 如果设置为 |
| src-delete-after | 确定 Shovel 何时(如果根本不)应删除自身。如果 Shovel 被视为一种移动操作——即,用于临时将消息从一个队列移动到另一个队列——这可能会很有用。 默认值为 如果设置为整数,则 Shovel 将在删除自身之前传输该数量的消息。此选项不能与 |
本地 Shovel 定义参考
有几个 Shovel 属性未在上述示例中涵盖。它们不从根本上改变动态 Shovels 的工作方式,也不改变声明过程。
| 键 | 描述 |
| reconnect-delay | 在任一端断开连接后,在重新连接到代理之前等待的秒数。默认为 1。 |
| ack-mode | 确定 Shovel 应如何确认已消费的消息。有效值为 如果设置为 如果设置为 如果设置为 |
| src-delete-after | 确定 Shovel 何时(如果根本不)应删除自身。如果 Shovel 被视为一种移动操作——即,用于临时将消息从一个队列移动到另一个队列——这可能会很有用。 默认值为 如果设置为 如果设置为整数,则 Shovel 将在删除自身之前传输该数量的消息。此选项不能与 |
| src-prefetch-count | 一次通过 Shovel 复制的未确认消息的最大数量。默认值为 |
| src-exchange | 应从中消耗的 exchange。必须设置此项或 Shovel 将声明一个独占队列,并将其与 如果源代理上不存在源 exchange,则不会声明它;Shovel 将无法启动。 |
| src-exchange-key | 使用 |
| src-consumer-args | 消费者参数,例如 |
| dest-exchange | 应将消息发布到的 exchange。必须设置此项或 如果目标代理上不存在目标 exchange,则不会声明它;Shovel 将无法启动。 |
| dest-exchange-key | 使用 |
| dest-add-forward-headers | 是否将 |
| dest-add-timestamp-header | 是否将 |
监控 Shovels
请参阅 Shovel 插件概述指南中的 监控 Shovels。