配置静态 Shovel
概述
本指南重点介绍静态配置的 Shovel。它假设您熟悉Shovel 插件背后的关键概念。
与动态 Shovel不同,静态 Shovel 使用高级配置文件进行配置。它们在节点启动时启动,主要用于永久运行的工作负载。任何对静态 Shovel 配置的更改都需要重新启动节点,这使得它们极具局限性。
大多数用户应该优先选择动态 Shovel 而不是静态 Shovel,因为它们更灵活,并且易于自动化。与静态 Shovel 定义(使用 Erlang 项)相比,生成动态 Shovel 定义(JSON 文档)通常更容易。
配置
Shovel 插件的配置必须在高级配置文件中定义。
它包含一个名为 shovels
的单一子句,该子句列出了应在节点启动时启动的 Shovel。
{rabbit, [
%% ...
]},
{rabbitmq_shovel, [
{shovels, [
{shovel_one, [
%% shovel_one properties ...
]},
%% ...
]}
]}
下面可以找到一个(故意冗长的)示例配置。
shovels
子句的每个元素都是一个命名的静态 Shovel。列表中的名称必须是唯一的。
Shovel 定义在顶层看起来像这样
{shovel_name, [
{source, [
%% protocol-specific source configuration goes here
]},
{destination, [
%% protocol-specific destination configuration goes here
]},
%% 'confirm' is the default acknowledgement mode
{ack_mode, confirm},
%% reconnect with a 5 second delay
{reconnect_delay, 5}
]}
其中 shovel_name
是 Shovel 的名称(一个 Erlang 原子)。如果名称不是以小写字母开头或包含除字母数字字符、下划线 (_
) 或 @
之外的其他字符,则应将其括在单引号 ('
) 中。
源和目标
Shovel 将消息从源传输到目标。
source
和 destination
密钥是必须的,并且包含嵌套的协议特定密钥。目前,AMQP 0.9.1 和 AMQP 1.0 是两种受支持的协议。源和目标不必使用相同的协议。所有其他属性都是可选的。
source
是一个必须的密钥,并且针对不同的协议具有不同的密钥属性。所有协议都有两个公共属性:protocol
和 uris
。protocol
支持两个值:amqp091
和 amqp10
,分别表示 AMQP 0-9-1 和 AMQP 1.0。
%% for AMQP 0-9-1
{protocol, amqp091}
uris
是AMQP 连接 URI的列表。
{uris, [
"amqp://fred:[email protected]/my_vhost",
"amqp://john:[email protected]/my_vhost"
]}
URI 语法扩展为包含查询部分,以允许配置其他连接参数。请参阅查询参数参考,这些参数可用于静态 Shovel,例如 TLS 证书和私钥。
常规源密钥
某些密钥受 AMQP 0-9-1 和 AMQP 1.0 源支持。它们在下面的表格中进行了描述。
密钥 | 描述 |
reconnect-delay | 在断开连接任一端后,等待重新连接到代理之前的时间间隔(以秒为单位)。默认为 1。
将在故障后延迟五秒钟重新连接。值为 |
ack-mode | 确定 Shovel 如何确认已消费的消息。有效值为 如果设置为 如果设置为 如果设置为 |
AMQP 0-9-1 源密钥
AMQP 0-9-1 特定的源密钥在单独的表格中介绍。
密钥 | 描述 |
declarations | Shovel 开始传输消息之前要执行的 AMQP 0-9-1 操作的可选列表。它们通常用于设置拓扑结构。
声明遵循RabbitMQ Erlang 客户端使用的 Method 和属性名称。 一个最小的声明示例
将首先声明一个匿名队列,然后将其绑定到名为 声明列表的每个元素要么是作为单引号原子给出的 AMQP 0-9-1 方法,例如 如果只使用方法名称,则所有参数都采用其默认值(如上面的 如果提供了元组和属性列表,则列表中的属性会明确指定某些或所有参数。 这是另一个示例
将声明一个名为“ |
queue | 源队列的名称,作为 Erlang 二进制值。此属性是必须的。
此队列必须存在。使用上面介绍的 另请参阅下面的预声明拓扑部分。 |
prefetch-count | 任何时候通过 Shovel 复制的未确认消息的最大数量。默认为
|
预声明拓扑
declarations
属性通常用于设置拓扑结构。至少,它必须设置源队列。
在某些部署场景中,拓扑结构会在启动时从定义文件自动导入。在这些场景中,我们可以配置插件等待队列可用,方法是在 rabbitmq.conf
文件中添加以下行
shovel.topology.predeclared = true
使用上述配置,如果静态 Shovel 没有 declarations
属性或其为空,则插件将等到源的 queue
最终声明。
AMQP 1.0 源密钥
AMQP 1.0 源设置与 AMQP 0-9-1 源设置不同。
密钥 | 描述 |
source_address | 这表示 AMQP 1.0 链接的源地址。此密钥是必须的。
|
prefetch-count | 此可选密钥设置将授予接收链接的链接信用额度。当信用额度降至此值的十分之一以下时,将自动续订。默认为 1000。它采用以下形式
|
目标
destination
是一个必须的密钥,并且针对不同的协议具有不同的密钥属性。所有协议都有两个公共属性:protocol
和 uris
。protocol
支持两个值:amqp091
和 amqp10
,分别表示 AMQP 0-9-1 和 AMQP 1.0。
%% for AMQP 0-9-1
{protocol, amqp091}
uris
是AMQP 连接 URI的列表。
{uris, [
"amqp://fred:[email protected]/my_vhost",
"amqp://john:[email protected]/my_vhost"
]}
URI 语法扩展为包含查询部分,以允许配置其他连接参数。请参阅查询参数参考,这些参数可用于静态 Shovel,例如 TLS 证书和私钥。
常规目标密钥
密钥 | 描述 |
reconnect-delay | 在断开连接任一端后,等待重新连接到代理之前的时间间隔(以秒为单位)。默认为 1。
将在故障后延迟五秒钟重新连接。值为 |
AMQP 0-9-1 目标密钥
密钥 | 描述 |
publish_properties | 此可选密钥控制 Shovel 设置或覆盖的消息属性。它采用以下形式
其中列表中的属性在重新发布之前设置在每条消息的基本属性上。 此特定示例将所有重新发布的消息标记为持久性。
默认情况下,消息的原始属性将保留,但此子句可用于更改或设置任何已知属性。
|
publish_fields | 此可选密钥类似于
其中列表中的属性用于设置用于重新发布消息的 默认情况下,消息使用原始交换机名称和路由密钥重新发布。通过指定
消息将重新发布到显式交换机名称和显式、固定路由密钥。 |
add_timestamp_header | 此布尔密钥控制是否会在重新发布消息之前向消息添加自定义标头
此标头值是 Shovel 消息时的事件戳(以自纪元以来的秒数为单位)。默认情况下,不会添加此标头。 |
add_forward_headers | 设置为 true 时,Shovel 将添加一些自定义消息标头:
|
AMQP 1.0 目标密钥
密钥 | 描述 |
target_address | 这表示发送 AMQP 1.0 链接的目标地址。
|
属性 | 此可选键控制在重新发布消息时将添加哪些其他属性。它采用以下形式:
可用的键包括 |
应用程序属性 | 此可选键声明在重新发布消息时要添加的任何其他应用程序属性。它采用以下形式:
键和值应为二进制字符串,如下例所示。 |
add_timestamp_header | 此布尔键控制是否在重新发布消息之前在其上设置
此值为消息被转储时的时间戳(以自纪元以来的秒数表示)。默认情况下,不会设置此属性。 |
add_forward_headers | 设置为 true 时,转储将为以下键添加应用程序属性:
|
示例配置
在 AMQP 0.9.1 端点之间的一个相当完整的静态转储配置可能如下所示
{rabbitmq_shovel,
[ {shovels, [ {my_first_shovel,
[ {source,
[ {protocol, amqp091},
{uris, [ "amqp://fred:[email protected]/my_vhost",
"amqp://john:[email protected]/my_vhost" ]},
{declarations, [ {'exchange.declare',
[ {exchange, <<"my_fanout">>},
{type, <<"fanout">>},
durable
]},
{'queue.declare',
[{arguments,
[{<<"x-message-ttl">>, long, 60000}]}]},
{'queue.bind',
[ {exchange, <<"my_fanout">>},
{queue, <<>>}
]}
]},
{queue, <<>>},
{prefetch_count, 10}
]},
{destination,
[ {protocol, amqp091},
{uris, ["amqp://"]},
{declarations, [ {'exchange.declare',
[ {exchange, <<"my_direct">>},
{type, <<"direct">>},
durable
]}
]},
{publish_properties, [ {delivery_mode, 2} ]},
{add_forward_headers, true},
{publish_fields, [ {exchange, <<"my_direct">>},
{routing_key, <<"from_shovel">>}
]}
]},
{ack_mode, on_confirm},
{reconnect_delay, 5}
]}
]}
]}
以上配置定义了一个名为'my_first_shovel'
的单个转储。
'my_first_shovel'
将连接到host1
或host2
上的代理(作为源),并直接连接到本地代理作为目标。它将在发生故障后延迟 5 秒后重新连接到另一个源代理。
连接到源后,它将声明一个名为"my_fanout"
的直接扇出交换机,一个具有每个队列消息 TTL的匿名队列,并将队列绑定到交换机。
连接到目标(本地代理)后,它将声明一个名为"my_direct"
的持久直接交换机。
此转储将重新发布发送到源上匿名队列的消息到本地交换机,并使用固定路由键"from_shovel"
。消息将是持久的,并且仅在收到本地代理的发布确认后才确认。
如果在任何时刻至少有十条未确认的消息,则转储消费者将不会获得更多传递。
示例配置(1.0 源 - 0.9.1 目标)
在 AMQP 1.0 源和 AMQP 0.9.1 目标之间的一个相当完整的转储配置可能如下所示
{rabbitmq_shovel,
[ {shovels, [ {my_first_shovel,
[ {source,
[ {protocol, amqp10},
{uris, [ "amqp://fred:[email protected]/my_vhost",
]},
{source_address, <<"my-source">>},
{prefetch_count, 10}
]},
{destination,
[ {protocol, amqp091},
{uris, ["amqp://"]},
{declarations, [ {'exchange.declare',
[ {exchange, <<"my_direct">>},
{type, <<"direct">>},
durable
]}
]},
{publish_properties, [ {delivery_mode, 2} ]},
{add_forward_headers, true},
{publish_fields, [ {exchange, <<"my_direct">>},
{routing_key, <<"from_shovel">>}
]}
]},
{ack_mode, on_confirm},
{reconnect_delay, 5}
]}
]}
]}
示例配置(0.9.1 源 - 1.0 目标)
在 AMQP 0.9.1 源和 AMQP 1.0 目标之间更广泛的转储配置可能如下所示
{rabbitmq_shovel,
[{shovels, [{my_first_shovel,
{source,
[{protocol, amqp091},
{uris, ["amqp://fred:[email protected]/my_vhost",
"amqp://john:[email protected]/my_vhost"]},
{declarations, [{'exchange.declare',
[{exchange, <<"my_fanout">>},
{type, <<"fanout">>},
durable]},
{'queue.declare',
[{arguments,
[{<<"x-message-ttl">>, long, 60000}]}]},
{'queue.bind',
[{exchange, <<"my_fanout">>},
{queue, <<>>}
]}
]},
{queue, <<>>},
{prefetch_count, 10}
]},
{destination,
[{protocol, amqp10},
%% Note: for plain text SASL authentication, use
% {uris, ["amqp://user:pass@host:5672?sasl=plain"]},
%% Note: this relies on default user credentials
%% which has remote access restrictions, see
%% ./access-control to learn more
{uris, ["amqp://host:5672"]},
{properties, [{user_id, <<"my-user">>}]},
{application_properties, [{<<"my-prop">>, <<"my-prop-value">>}]},
{add_forward_headers, true},
{target_address, <<"destination-queue">>}
]},
{ack_mode, on_confirm},
{reconnect_delay, 5}
}]}
]}
}