配置静态 Shovel
概述
本指南重点介绍静态配置的 Shovel。它假设您已经熟悉 Shovel 插件背后的关键概念。
与动态 Shovel 不同,静态 Shovel 是使用高级配置文件进行配置的。它们在节点启动时运行,主要适用于永久运行的工作负载。对静态 Shovel 配置的任何更改都需要重启节点,这使得它们非常不灵活。
大多数用户应优先选择动态 Shovel,而非静态 Shovel,因为前者具有灵活性且易于自动化。生成动态 Shovel 定义(JSON 文档)通常比生成静态 Shovel 定义(使用 Erlang 术语)更容易。
配置
Shovel 插件的配置必须定义在高级配置文件中。
它包含一个单独的 shovels 子句,列出了应在节点启动时启动的 Shovel。
{rabbit, [
%% ...
]},
{rabbitmq_shovel, [
{shovels, [
{shovel_one, [
%% shovel_one properties ...
]},
%% ...
]}
]}
下面提供了一个(刻意详细的)示例配置。
shovels 子句中的每个元素都是一个命名静态 Shovel。列表中的名称必须是唯一的。
Shovel 定义在顶层看起来如下所示
{shovel_name, [
{source, [
%% protocol-specific source configuration goes here
]},
{destination, [
%% protocol-specific destination configuration goes here
]},
%% 'confirm' is the default acknowledgement mode
{ack_mode, confirm},
%% reconnect with a 5 second delay
{reconnect_delay, 5}
]}
其中 shovel_name 是 Shovel 的名称(Erlang 原子)。如果名称不是以小写字母开头,或者包含字母数字、下划线 (_) 或 @ 以外的字符,则应使用单引号 (') 将其括起来。
源与目标
Shovel 将消息从源传输到目标。
source 和 destination 键是强制性的,并包含嵌套的协议特定键。目前支持 AMQP 0.9.1 和 AMQP 1.0 两种协议。源和目标不必使用相同的协议。所有其他属性都是可选的。
source 是一个强制键,针对不同协议具有不同的属性键。所有协议共有两个属性:protocol 和 uris。protocol 支持三个值:amqp091、amqp10 和 local,分别用于 AMQP 0-9-1、AMQP 1.0 和本地 Shovel。
%% for AMQP 0-9-1
{protocol, amqp091}
uris 是一个AMQP 连接 URI 列表。
{uris, [
"amqp://fred:secret@host1.domain/my_vhost",
"amqp://john:secret@host2.domain/my_vhost"
]}
URI 语法已扩展为包含查询部分,以允许配置额外的连接参数。请参阅查询参数参考,了解静态 Shovel 可用的参数,例如 TLS 证书和私钥。
常规源键
某些键由 AMQP 0-9-1、AMQP 1.0 和本地源支持。它们在下表中描述。
| 键 | 描述 |
| reconnect-delay | 在任一端断开连接后,等待重新连接到代理的时间(以秒为单位)。默认值为 1。 将在失败后延迟五秒再重新连接。值为 |
| ack-mode | 决定了 Shovel 应如何确认 (acknowledge) 已消费的消息。有效值为 如果设置为 如果设置为 如果设置为 |
AMQP 0-9-1 源键
AMQP 0-9-1 特定源键在单独的表格中介绍。
| 键 | 描述 |
| declarations | Shovel 在开始传输消息之前执行的可选 AMQP 0-9-1 操作列表。它们通常用于设置拓扑结构。 这些声明遵循 RabbitMQ Erlang 客户端使用的方法和属性名称。 一个极简的声明示例 将首先声明一个匿名队列,然后将其绑定到名为 声明列表的每个元素要么是一个以单引号原子表示的 AMQP 0-9-1 方法(例如 如果仅使用方法名称,则所有参数都将采用其默认值(如上面的 如果提供了元组和属性列表,则列表中的属性会显式指定部分或全部参数。 这是另一个示例 将声明一个名为 " |
| queue | 源队列的名称,作为 Erlang 二进制值。此属性是强制性的。
此队列必须存在。使用上述 另请参阅下文的预声明拓扑 (Predeclared Topology) 部分。 |
| prefetch-count | Shovel 同时复制的未确认消息的最大数量。默认值为 |
预声明拓扑
declarations 属性通常用于设置拓扑。至少,它必须设置源队列。
在某些部署场景中,拓扑是在启动时从定义文件自动导入的。在这些场景中,我们可以通过在 rabbitmq.conf 文件中添加以下行来配置插件,使其等待直到队列可用。
shovel.topology.predeclared = true
使用上述配置,如果静态 Shovel 没有 declarations 属性或者该属性为空,插件将一直等待,直到源的 queue 最终被声明。
AMQP 1.0 源键
AMQP 1.0 源设置与 AMQP 0-9-1 源不同。
| 键 | 描述 |
| source_address | 这代表 AMQP 1.0 链路的源地址。此键是强制性的。 |
| prefetch-count | 此可选键设置将授予接收链路的链路信用额度 (link credit)。当信用额度低于该值的 1/10 时,它将自动续订。默认值为 1000。其形式如下 |
本地 Shovel 源键
本地 Shovel 的特定源键在单独的表格中介绍。
| 键 | 描述 |
| declarations | Shovel 在开始传输消息之前执行的可选 AMQP 0-9-1 操作列表。它们通常用于设置拓扑结构。 这些声明遵循 RabbitMQ Erlang 客户端使用的方法和属性名称。 一个极简的声明示例 将首先声明一个匿名队列,然后将其绑定到名为 声明列表的每个元素要么是一个以单引号原子表示的 AMQP 0-9-1 方法(例如 如果仅使用方法名称,则所有参数都将采用其默认值(如上面的 如果提供了元组和属性列表,则列表中的属性会显式指定部分或全部参数。 这是另一个示例 将声明一个名为 " |
| queue | 源队列的名称,作为 Erlang 二进制值。此属性是强制性的。
此队列必须存在。使用上述 另请参阅下文的预声明拓扑部分。 |
| prefetch-count | Shovel 同时复制的未确认消息的最大数量。默认值为 |
预声明拓扑
declarations 属性通常用于设置拓扑。至少,它必须设置源队列。
在某些部署场景中,拓扑是在启动时从定义文件自动导入的。在这些场景中,我们可以通过在 rabbitmq.conf 文件中添加以下行来配置插件,使其等待直到队列可用。
shovel.topology.predeclared = true
使用上述配置,如果静态 Shovel 没有 declarations 属性或者该属性为空,插件将一直等待,直到源的 queue 最终被声明。
目标
destination 是一个强制键,针对不同协议具有不同的属性键。所有协议共有两个属性:protocol 和 uris。protocol 支持三个值:amqp091、amqp10 和 local,分别用于 AMQP 0-9-1、AMQP 1.0 和本地 Shovel。
%% for AMQP 0-9-1
{protocol, amqp091}
uris 是一个AMQP 连接 URI 列表。
{uris, [
"amqp://fred:secret@host1.domain/my_vhost",
"amqp://john:secret@host2.domain/my_vhost"
]}
URI 语法已扩展为包含查询部分,以允许配置额外的连接参数。请参阅查询参数参考,了解静态 Shovel 可用的参数,例如 TLS 证书和私钥。
常规目标键
| 键 | 描述 |
| reconnect-delay | 在任一端断开连接后,等待重新连接到代理的时间(以秒为单位)。默认值为 1。 将在失败后延迟五秒再重新连接。值为 |
AMQP 0-9-1 目标键
| 键 | 描述 |
| publish_properties | 此可选键控制由 Shovel 设置或覆盖的消息属性。其形式如下 其中列表中的属性会在每条消息重新发布之前,设置在消息的基本属性上。 这个特定示例将把所有重新发布的消息标记为持久化。 默认情况下,消息的原始属性会保留,但可以使用此子句更改或设置任何已知属性
|
| publish_fields | 此可选键类似于 其中列表中的属性用于设置用于重新发布消息的 默认情况下,消息使用原始交换机名称和路由键进行重新发布。通过指定 消息将被重新发布到显式的交换机名称,并带有固定的路由键。 |
| add_timestamp_header | 此布尔键控制在重新发布消息之前,是否添加自定义标头 此标头的值是消息被 Shovel 传输时的时间戳(自纪元以来的秒数)。默认情况下不添加此标头。 |
| add_forward_headers | 设置为 true 时,Shovel 将添加多个自定义消息标头: |
AMQP 1.0 目标键
| 键 | 描述 |
| target_address | 这代表发送 AMQP 1.0 链路的目标地址。 |
| properties | 此可选键控制在重新发布消息时将添加哪些额外属性。其形式如下 可用的键包括 |
| application_properties | 此可选键声明在重新发布消息时要添加的任何额外应用程序属性。其形式如下 键和值应为二进制字符串,如下例所示。 |
| add_timestamp_header | 此布尔键控制在重新发布消息之前,是否在消息上设置 此值是消息被 Shovel 传输时的时间戳(自纪元以来的秒数)。默认情况下不设置此属性。 |
| add_forward_headers | 当设置为 true 时,Shovel 将为以下键添加应用程序属性: |
本地 Shovel 目标键
| 键 | 描述 |
| add_timestamp_header | 此布尔键控制在重新发布消息之前,是否添加自定义标头 此标头的值是消息被 Shovel 传输时的时间戳(自纪元以来的秒数)。默认情况下不添加此标头。 |
| add_forward_headers | 设置为 true 时,Shovel 将添加多个自定义消息标头: |
示例配置
在 AMQP 0.9.1 端点之间进行的一个相当完整的静态 Shovel 配置可能如下所示
{rabbitmq_shovel,
[ {shovels, [ {my_first_shovel,
[ {source,
[ {protocol, amqp091},
{uris, [ "amqp://fred:secret@host1.domain/my_vhost",
"amqp://john:secret@host2.domain/my_vhost" ]},
{declarations, [ {'exchange.declare',
[ {exchange, <<"my_fanout">>},
{type, <<"fanout">>},
durable
]},
{'queue.declare',
[{arguments,
[{<<"x-message-ttl">>, long, 60000}]}]},
{'queue.bind',
[ {exchange, <<"my_fanout">>},
{queue, <<>>}
]}
]},
{queue, <<>>},
{prefetch_count, 10}
]},
{destination,
[ {protocol, amqp091},
{uris, ["amqp://"]},
{declarations, [ {'exchange.declare',
[ {exchange, <<"my_direct">>},
{type, <<"direct">>},
durable
]}
]},
{publish_properties, [ {delivery_mode, 2} ]},
{add_forward_headers, true},
{publish_fields, [ {exchange, <<"my_direct">>},
{routing_key, <<"from_shovel">>}
]}
]},
{ack_mode, on_confirm},
{reconnect_delay, 5}
]}
]}
]}
上面的配置定义了一个名为 'my_first_shovel' 的单一 Shovel。
'my_first_shovel' 将连接到 host1 或 host2 上的代理(作为源),并直接连接到本地代理作为目标。在发生故障时,它将在 5 秒的延迟后重新连接到另一个源代理。
连接到源时,它将声明一个直连的扇出 (fanout) 交换机 "my_fanout",一个带有每队列消息 TTL 的匿名队列,并将队列绑定到该交换机。
连接到目标(本地代理)时,它将声明一个名为 "my_direct" 的持久化直连交换机。
此 Shovel 将把发送到源上匿名队列的消息重新发布到本地交换机,并带有固定的路由键 "from_shovel"。消息将是持久化的,并且只有在收到来自本地代理的发布确认后才会被确认。
如果此时有至少十条未确认的消息,Shovel 消费者将不会再接收投递。
示例配置(1.0 源 - 0.9.1 目标)
在 AMQP 1.0 源和 AMQP 0.9.1 目标之间的一个相当完整的 Shovel 配置可能如下所示
{rabbitmq_shovel,
[ {shovels, [ {my_first_shovel,
[ {source,
[ {protocol, amqp10},
{uris, [ "amqp://fred:secret@host1.domain/my_vhost",
]},
{source_address, <<"my-source">>},
{prefetch_count, 10}
]},
{destination,
[ {protocol, amqp091},
{uris, ["amqp://"]},
{declarations, [ {'exchange.declare',
[ {exchange, <<"my_direct">>},
{type, <<"direct">>},
durable
]}
]},
{publish_properties, [ {delivery_mode, 2} ]},
{add_forward_headers, true},
{publish_fields, [ {exchange, <<"my_direct">>},
{routing_key, <<"from_shovel">>}
]}
]},
{ack_mode, on_confirm},
{reconnect_delay, 5}
]}
]}
]}
示例配置(0.9.1 源 — 1.0 目标)
在 AMQP 0.9.1 源和 AMQP 1.0 目标之间的一个更广泛的 Shovel 配置可能如下所示
{rabbitmq_shovel,
[{shovels, [{my_first_shovel,
{source,
[{protocol, amqp091},
{uris, ["amqp://fred:secret@host1.domain/my_vhost",
"amqp://john:secret@host2.domain/my_vhost"]},
{declarations, [{'exchange.declare',
[{exchange, <<"my_fanout">>},
{type, <<"fanout">>},
durable]},
{'queue.declare',
[{arguments,
[{<<"x-message-ttl">>, long, 60000}]}]},
{'queue.bind',
[{exchange, <<"my_fanout">>},
{queue, <<>>}
]}
]},
{queue, <<>>},
{prefetch_count, 10}
]},
{destination,
[{protocol, amqp10},
%% Note: for plain text SASL authentication, use
% {uris, ["amqp://user:pass@host:5672?sasl=plain"]},
%% Note: this relies on default user credentials
%% which has remote access restrictions, see
%% ./access-control to learn more
{uris, ["amqp://host:5672"]},
{properties, [{user_id, <<"my-user">>}]},
{application_properties, [{<<"my-prop">>, <<"my-prop-value">>}]},
{add_forward_headers, true},
{target_address, <<"destination-queue">>}
]},
{ack_mode, on_confirm},
{reconnect_delay, 5}
}]}
]}
}