配置动态 Shovel
概述
本指南重点介绍动态配置的 Shovel。它假设您已熟悉 Shovel 插件背后的关键概念。
与静态 Shovel 不同,动态 Shovel 是使用运行时参数进行配置的。它们可以随时启动和停止,包括通过编程方式。动态 Shovel 可用于瞬态(一次性)和永久运行的工作负载。
关于动态 Shovel 的信息存储在 RabbitMQ 的模式数据库中,与用户、权限、队列等存储在一起。因此,它们可以与其它模式定义一起导出,并结合预声明拓扑模式使用。
配置
动态 Shovel 可以使用 rabbitmqadmin、rabbitmqctl、通过 HTTP 管理 API 进行定义,或者(在启用 rabbitmq_shovel_management 插件的情况下)通过管理界面的管理部分进行定义。
Shovel 通过一个定义主体(JSON 对象)进行声明。某些键是强制性的,有些是可选的。它们控制连接参数、使用的协议、消息传输源和目的地、数据安全协议特性等。
每个 Shovel 都属于一个虚拟主机(vhost)。请注意,Shovel 不仅可以从不同的虚拟主机消费消息并发布到不同的虚拟主机,还可以跨越完全不同的集群,因此虚拟主机的选择主要起组织 Shovel 和管理访问权限的作用,这与 RabbitMQ 中其余的权限控制方式非常相似。
每个 Shovel 都有一个名称。该名称用于在检查状态、删除或重启 Shovel 时识别它们。
声明动态 Shovel
在此示例中,我们将设置一个动态 Shovel,它将使用 AMQP 0-9-1 将消息从本地 RabbitMQ 集群中的队列 "source-queue" 移动到远程 RabbitMQ 节点上的队列 "target-queue"。
使用 CLI 工具
Shovel 可以使用 CLI 工具或 HTTP API 进行声明。
使用 rabbitmqctl 时,Shovel 通过 set_parameter 命令声明,该命令会声明一个[运行时参数](./parameters)。rabbitmqadmin 包含在 shovels 组下声明 Shovel 的专用命令,即 shovels declare_amqp091 和 shovels declare_amqp10。
以下是使用 CLI 工具声明动态 Shovel 的示例
- 使用 rabbitmqctl (bash)
- 使用 rabbitmqadmin (bash)
- 使用 rabbitmqctl (PowerShell)
- 使用 PowerShell 的 rabbitmqadmin.exe
# my-shovel here is the name of the shovel
rabbitmqctl set_parameter shovel my-shovel \
'{"src-protocol": "amqp091", "src-uri": "amqp://", "src-queue": "source-queue", "dest-protocol": "amqp091", "dest-uri": "amqp://remote-server", "dest-queue": "target-queue", "dest-queue-args": {"x-queue-type": "quorum"}}'
# Declare an AMQP 0-9-1 shovel using rabbitmqadmin
rabbitmqadmin shovels declare_amqp091 --name my-shovel \
--source-uri "amqp://" \
--destination-uri "amqp://remote-server" \
--source-queue "source-queue" \
--destination-queue "target-queue"
rabbitmqctl.bat set_parameter shovel my-shovel ^
"{""src-protocol"": ""amqp091"", ""src-uri"":""amqp://"", ""src-queue"": ""source-queue"", ^
""dest-protocol"": ""amqp091"", ""dest-uri"": ""amqp://remote.rabbitmq.local"", ^
""dest-queue"": ""target-queue"", ""dest-queue-args"": {""x-queue-type"": ""quorum""}}"
# Declare an AMQP 0-9-1 shovel using rabbitmqadmin
rabbitmqadmin.exe shovels declare_amqp091 --name my-shovel ^
--source-uri "amqp://" ^
--destination-uri "amqp://remote-server" ^
--source-queue "source-queue" ^
--destination-queue "target-queue"
以下是声明 AMQP 1.0 Shovel 的一些示例
- 使用 rabbitmqctl (bash)
- 使用 rabbitmqadmin (bash)
- 使用 rabbitmqctl (PowerShell)
- 使用 PowerShell 的 rabbitmqadmin.exe
# AMQP 1.0 shovel using rabbitmqctl set_parameter
rabbitmqctl set_parameter shovel my-amqp10-shovel \
'{"src-protocol": "amqp10", "src-uri": "amqp://username:password@source-server", "src-address": "/queues/source-queue", "dest-protocol": "amqp10", "dest-uri": "amqp://username:password@dest-server", "dest-address": "/queues/target-queue"}'
# Declare an AMQP 1.0 shovel using rabbitmqadmin
rabbitmqadmin shovels declare_amqp10 --name my-amqp10-shovel \
--source-uri "amqp://username:password@source-server" \
--destination-uri "amqp://username:password@dest-server" \
--source-address "/queues/source-queue" \
--destination-address "/queues/target-queue"
# AMQP 1.0 shovel using rabbitmqctl set_parameter
rabbitmqctl.bat set_parameter shovel my-amqp10-shovel ^
"{""src-protocol"": ""amqp10"", ""src-uri"": ""amqp://username:password@source-server"", ""src-address"": ""/queues/source-queue"", ""dest-protocol"": ""amqp10"", ""dest-uri"": ""amqp://username:password@dest-server"", ""dest-address"": ""/queues/target-queue""}"
# Declare an AMQP 1.0 shovel using rabbitmqadmin
rabbitmqadmin.exe shovels declare_amqp10 --name my-amqp10-shovel ^
--source-uri "amqp://username:password@source-server" ^
--destination-uri "amqp://username:password@dest-server" ^
--source-address "/queues/source-queue" ^
--destination-address "/queues/target-queue"
这些示例中的运行时参数定义(主体)包含几个键
| 键 | 描述 |
| src-uri | 源连接 URI。强制项。有关 RabbitMQ 如何处理 AMQP URI 的一般信息,请参阅 AMQP URI 参考。
此参数的值可以是字符串,也可以是字符串列表。如果提供了多个字符串,Shovel 将从列表中随机选择**一个** URI,直到其中一个端点连接成功。 |
| src-protocol | 连接到源时使用的协议。可以是 |
| src-queue | Shovel 将从中消费的源队列。这是必须消费的队列。此项或 如果源队列在目标虚拟主机上不存在,且未提供 Shovel 可以使用预声明拓扑,而不是声明源。请参阅下方的预声明拓扑部分。 |
| src-queue-args |
|
| src-consumer-name | 可选的消费者名称。当 如果未指定,则使用随机值。 |
| dest-uri | 与上述 |
| dest-protocol | 连接到目的地时使用的协议。可以是 |
| dest-queue | 消息应发布到的队列。此项或 如果目标队列在目标虚拟主机中不存在,且未提供 Shovel 可以使用预声明拓扑,而不是声明目的地。请参阅下方的预声明拓扑部分。 |
| dest-queue-args |
|
本指南稍后将介绍其它 Shovel 定义键。
预声明拓扑
Shovel 可以使用预声明拓扑,而不是声明其源和目的地。
例如,当拓扑在引导时从定义文件导入时,这可能是必要的。使用预声明拓扑可以避免定义导入与 Shovel 插件启动之间的“先有鸡还是先有蛋”的问题:必须启用该插件,定义才能通过(运行时参数部分的)验证。
插件可以通过使用 rabbitmq.conf 配置为等待直到源可用
# all shovels started on this node will use pre-declared topology
shovel.topology.predeclared = true
如果只有部分 Shovel 需要使用预声明拓扑,则可以使用以下 Shovel 属性为特定 Shovel 配置相同的行为
| 键 | 描述 |
| src-predeclared | 当设置为 |
| dest-predeclared | 当设置为 |
使用 HTTP API
要使用 HTTP API 声明 Shovel,请确保已启用 管理插件,然后使用以下端点
PUT /api/parameters/shovel/{vhost}/{name}
其中 {vhost} 是应启动 Shovel 的虚拟主机,{name} 是新 Shovel 的名称。该端点要求调用它的用户拥有 policymaker 特权(标签)。
请求主体是一个 JSON 文档,其结构与本指南前面描述的类似
{
"value": {
"src-protocol": "amqp091",
"src-uri": "amqp://",
"src-queue": "source-queue",
"dest-protocol": "amqp091",
"dest-uri": "amqp://remote.rabbitmq.local",
"dest-queue": "destination-queue"
}
}
下面是一个使用 curl 通过默认用户凭据在本地节点上声明 Shovel 的示例。该 Shovel 将在默认虚拟主机中的两个队列 "source-queue" 和 "destination-queue" 之间传输消息。
请注意,如果针对远程节点调用,此确切命令将会失败。请添加一个标记为 policymaker 的新用户来进行实验。
# Note: this user's access is limited to localhost!
curl -v -u guest:guest -X PUT https://:15672/api/parameters/shovel/%2f/my-shovel \
-H "content-type: application/json" \
-d @- <<EOF
{
"value": {
"src-protocol": "amqp091",
"src-uri": "amqp://",
"src-queue": "source-queue",
"dest-protocol": "amqp091",
"dest-uri": "amqp://",
"dest-queue": "destination-queue"
}
}
EOF
使用管理 UI
要使用管理 UI 声明 Shovel,请首先确保已启用 管理插件。
然后
- 导航至
Admin>Shovel Management>Add a new shovel - 使用本指南前面介绍的 Shovel 参数填写表单
- 点击 Add shovel
检查动态 Shovel 的状态
使用 CLI 工具
使用 rabbitmqctl shovel_status 来检查集群中的动态 Shovel。在执行此命令的主机上,必须启用 rabbitmq_shovel 插件。
rabbitmqctl shovel_status --formatter=pretty_table
输出可以格式化为 JSON 并重定向到诸如 jq 的工具
rabbitmqctl shovel_status --formatter=json | jq
使用 HTTP API
GET /api/shovels 是一个可用于列出集群中动态 Shovel 的端点。该端点由 rabbitmq_shovel_management 插件提供,该插件必须在目标节点上启用。
# Note: this user's access is limited to localhost!
curl -v -u guest:guest -X GET https://:15672/api/shovels/
要检查特定虚拟主机中的 Shovel,请使用 GET /api/shovels/{vhost},{vhost} 为虚拟主机名称。该值必须进行百分号编码。
# Note: this user's access is limited to localhost!
curl -v -u guest:guest -X GET https://:15672/api/shovels/%2f
要检查特定 Shovel 的状态,请使用 GET /api/shovels/vhost/{vhost}/{name},{vhost} 是 Shovel 运行所在的虚拟主机,{name} 是 Shovel 的名称。两个值都必须进行百分号编码。
# Note: this user's access is limited to localhost!
curl -v -u guest:guest -X GET https://:15672/api/shovels/vhost/%2f/my-shovel
使用管理 UI
- 导航至
Admin>Shovel Status - 在表格中找到感兴趣的 Shovel
重启 Shovel
动态 Shovel 可以重启。重启 Shovel 会短暂中断其操作,并使其重新连接到源和目的地。当 Shovel 使用适当的确认模式时,这种中断是安全的:任何从源消费或发布到目的地但尚未确认(“传输中”)的消息,将在 Shovel 停止时自动重新入队,并在重启后再次消费。
使用 CLI 工具
使用 rabbitmqctl restart_shovel 并提供名称来重启 Shovel。在执行此命令的主机上,必须启用 rabbitmq_shovel 插件。
rabbitmqctl restart_shovel "my-shovel"
使用 HTTP API
DELETE /api/shovels/vhost/{vhost}/{name}/restart 是一个重启动态 Shovel 的端点。该端点由 rabbitmq_shovel_management 插件提供,该插件必须在目标节点上启用。
{vhost} 是 Shovel 运行所在的虚拟主机,{name} 是要重启的 Shovel 的名称。两个值都必须进行百分号编码。
# Note: this user's access is limited to localhost!
curl -v -u guest:guest -X DELETE https://:15672/api/shovels/vhost/%2f/my-shovel/restart
使用管理 UI
- 导航至
Admin>Shovel Status - 在表格中找到感兴趣的 Shovel
- 点击 Restart 并等待下一次 UI 刷新
删除 Shovel
使用 CLI 工具
要使用 CLI 工具删除 Shovel,请使用 rabbitmqctl clear_parameter,并将 shovel 作为组件名称,以及要删除的 Shovel 的名称
rabbitmqctl clear_parameter shovel "my-shovel"
使用 HTTP API
DELETE /api/parameters/shovel/{vhost}/{name} 是可用于删除 Shovel 的端点。
{vhost} 是 Shovel 运行所在的虚拟主机,{name} 是要删除的 Shovel 的名称。两个值都必须进行百分号编码。
# Note: this user's access is limited to localhost!
curl -v -u guest:guest -X DELETE https://:15672/api/parameters/shovel/%2f/my-shovel
AMQP 0-9-1 Shovel 定义参考
上述示例中还有一些未涵盖的 Shovel 属性。它们不会从根本上改变动态 Shovel 的工作方式,也不会改变声明过程。
| 键 | 描述 |
| reconnect-delay | 在任一端断开连接后,重新连接到代理之前等待的持续时间(以秒为单位)。默认值为 1。 |
| ack-mode | 确定 Shovel 应如何确认已消费的消息。有效值为 如果设置为 如果设置为 如果设置为 |
| src-delete-after | 确定 Shovel 何时(如果有的话)应自行删除。如果将 Shovel 视为一种移动操作(即用于按需将消息从一个队列移动到另一个队列),这非常有用。 默认值为 如果设置为 如果设置为一个整数,则 Shovel 将在传输该数量的消息后自行删除。此选项不能与设置为 |
| src-prefetch-count | 任何时候通过 Shovel 复制的未确认消息的最大数量。默认值为 |
| src-exchange | 要从中消费的交换机。此项或 Shovel 将声明一个独占队列,并使用 如果源交换机在源代理上不存在,它将不会被声明;Shovel 将无法启动。 |
| src-exchange-key | 使用 |
| src-consumer-args | 消费者参数,例如 |
| dest-exchange | 消息应发布到的交换机。此项或 如果目标交换机在目标代理上不存在,它将不会被声明;Shovel 将无法启动。 |
| dest-exchange-key | 使用 |
| dest-publish-properties | 在移动消息时要覆盖的属性映射(JSON 对象)。目前不支持通过这种方式设置头信息。默认值为 |
| dest-add-forward-headers | 是否向移动的消息添加 |
| dest-add-timestamp-header | 是否向移动的消息添加 |
AMQP 1.0 Shovel 定义参考
AMQP 1.0 源和目标属性与其 AMQP 0-9-1 对等项有一些不同。
| 键 | 描述 |
| src-uri | 源的 AMQP URI。强制项。AMQP 1.0 URI 实现了 AMQP URI 参考中所述内容的一个子集。AMQP 1.0 中没有虚拟主机概念,因此不支持 URI 路径段。它支持的查询参数集与 AMQP 0.9.1 URI 不同。
|
| src-address | AMQP 1.0 链接地址。强制项。 |
| dest-address | AMQP 1.0 链接地址。强制项。 |
| src-prefetch-count | 任何时候通过 Shovel 复制的未确认消息的最大数量。默认值为 |
| dest-properties | 在移动消息时要覆盖的属性。有关所有可能属性的详细信息,请参阅 AMQP 1.0 规范第 3.2.4 节。 |
| dest-application-properties | 在移动消息时要设置的应用程序属性。 |
| dest-add-forward-headers | 是否向移动的消息添加 |
| dest-add-timestamp-header | 是否将 |
| reconnect-delay | 在任一端断开连接后,重新连接到代理之前等待的持续时间(以秒为单位)。默认值为 1。 |
| ack-mode | 确定 Shovel 应如何确认已消费的消息。有效值为 如果设置为 如果设置为 如果设置为 |
| src-delete-after | 确定 Shovel 何时(如果有的话)应自行删除。如果将 Shovel 视为一种移动操作(即用于按需将消息从一个队列移动到另一个队列),这非常有用。 默认值为 如果设置为一个整数,则 Shovel 将在传输该数量的消息后自行删除。此选项不能与 |
本地 Shovel 定义参考
上述示例中还有一些未涵盖的 Shovel 属性。它们不会从根本上改变动态 Shovel 的工作方式,也不会改变声明过程。
| 键 | 描述 |
| reconnect-delay | 在任一端断开连接后,重新连接到代理之前等待的持续时间(以秒为单位)。默认值为 1。 |
| ack-mode | 确定 Shovel 应如何确认已消费的消息。有效值为 如果设置为 如果设置为 如果设置为 |
| src-delete-after | 确定 Shovel 何时(如果有的话)应自行删除。如果将 Shovel 视为一种移动操作(即用于按需将消息从一个队列移动到另一个队列),这非常有用。 默认值为 如果设置为 如果设置为一个整数,则 Shovel 将在传输该数量的消息后自行删除。此选项不能与 |
| src-prefetch-count | 任何时候通过 Shovel 复制的未确认消息的最大数量。默认值为 |
| src-exchange | 要从中消费的交换机。此项或 Shovel 将声明一个独占队列,并使用 如果源交换机在源代理上不存在,它将不会被声明;Shovel 将无法启动。 |
| src-exchange-key | 使用 |
| src-consumer-args | 消费者参数,例如 |
| dest-exchange | 消息应发布到的交换机。此项或 如果目标交换机在目标代理上不存在,它将不会被声明;Shovel 将无法启动。 |
| dest-exchange-key | 使用 |
| dest-add-forward-headers | 是否向移动的消息添加 |
| dest-add-timestamp-header | 是否向移动的消息添加 |
监控 Shovel
请参阅 Shovel 插件总览指南中的 监控 Shovel。