配置静态 Shovel
概述
本指南重点介绍静态配置的 shovel。它假定您熟悉Shovel 插件背后的关键概念。
与动态 shovel不同,静态 shovel 使用高级配置文件进行配置。 它们在节点启动时启动,主要用于永久运行的工作负载。 对静态 shovel 配置的任何更改都需要节点重启,这使得它们非常不灵活。
大多数用户应优先选择动态 shovel而不是静态 shovel,因为它们具有灵活性且易于自动化。 生成动态 shovel 定义(JSON 文档)通常比静态 shovel 定义(使用 Erlang 术语)更容易。
配置
Shovel 插件的配置必须在高级配置文件中定义。
它由一个 shovels
子句组成,该子句列出了应在节点启动时启动的 shovel
{rabbit, [
%% ...
]},
{rabbitmq_shovel, [
{shovels, [
{shovel_one, [
%% shovel_one properties ...
]},
%% ...
]}
]}
下面可以找到一个(特意详细的)配置示例。
shovels
子句的每个元素都是一个命名的静态 shovel。 列表中的名称必须不同。
shovel 定义在顶层看起来像这样
{shovel_name, [
{source, [
%% protocol-specific source configuration goes here
]},
{destination, [
%% protocol-specific destination configuration goes here
]},
%% 'confirm' is the default acknowledgement mode
{ack_mode, confirm},
%% reconnect with a 5 second delay
{reconnect_delay, 5}
]}
其中 shovel_name
是 shovel 的名称(Erlang 原子)。 如果名称不是以小写字母开头,或者包含字母数字字符、下划线 (_
) 或 @
以外的其他字符,则应将名称用单引号 ('
) 括起来。
源和目标
shovel 将消息从源传输到目标。
source
和 destination
键是强制性的,并且包含嵌套的协议特定键。 目前,AMQP 0.9.1 和 AMQP 1.0 是两种受支持的协议。 源和目标不必使用相同的协议。 所有其他属性都是可选的。
source
是一个强制键,并且对于不同的协议具有不同的键属性。 所有协议都通用两个属性:protocol
和 uris
。 protocol
支持两个值:amqp091
和 amqp10
,分别用于 AMQP 0-9-1 和 AMQP 1.0
%% for AMQP 0-9-1
{protocol, amqp091}
uris
是 AMQP 连接 URI 的列表
{uris, [
"amqp://fred:secret@host1.domain/my_vhost",
"amqp://john:secret@host2.domain/my_vhost"
]}
URI 语法已扩展为包含查询部分,以允许配置其他连接参数。 请参阅 查询参数参考,静态 shovel 可以使用这些参数,例如 TLS 证书和私钥。
通用源键
AMQP 0-9-1 和 AMQP 1.0 源都支持某些键。 下表描述了这些键。
键 | 描述 |
reconnect-delay | 在任一端断开连接后,重新连接到 Broker 之前等待的持续时间(以秒为单位)。 默认为 1。
在失败后重新连接之前将延迟五秒钟。 值 |
ack-mode | 确定 shovel 应如何确认已消费的消息。 有效值为 如果设置为 如果设置为 如果设置为 |
AMQP 0-9-1 源键
AMQP 0-9-1 特定的源键在单独的表中介绍
键 | 描述 |
declarations | 在 Shovel 开始传输消息之前,要由 Shovel 执行的可选 AMQP 0-9-1 操作列表。 它们通常用于设置拓扑。
声明遵循RabbitMQ Erlang 客户端使用的方法和属性名称。 一个最简化的声明示例
将首先声明一个匿名队列,然后将其绑定到名为 声明列表的每个元素要么是作为单引号原子给出的 AMQP 0-9-1 方法,例如 如果仅使用方法名称,则所有参数都采用其默认值(如上面的 如果提供元组和属性列表,则列表中的属性显式指定部分或全部参数。 这是另一个示例
将声明一个持久的、直接交换机,名为 “ |
queue | 源队列的名称,作为 Erlang 二进制值。 此属性是强制性的
此队列必须存在。 使用上面介绍的资源 另请参阅下面的预声明拓扑部分。 |
prefetch-count | 在任何给定时间通过 shovel 复制的最大未确认消息数。 默认为
|
预声明拓扑
declarations
属性通常用于设置拓扑。 至少,它必须设置源队列。
在某些部署场景中,拓扑会自动从启动时的定义文件导入。 在这些场景中,我们可以配置插件以等待队列可用,方法是将以下行添加到 rabbitmq.conf
文件中
shovel.topology.predeclared = true
通过上述配置,如果静态 shovel 没有 declarations
属性或该属性为空,则插件将等待直到最终声明源的 queue
。
AMQP 1.0 源键
AMQP 1.0 源设置与 AMQP 0-9-1 源的设置不同。
键 | 描述 |
source_address | 这表示 AMQP 1.0 链接的源地址。 此键是强制性的
|
prefetch-count | 此可选键设置将授予接收链接的链接信用额度。 当信用额度降至该值的 1/10 以下时,信用额度将自动续订。 默认值为 1000。 它采用以下形式
|
目标
destination
是一个强制键,并且对于不同的协议具有不同的键属性。 所有协议都通用两个属性:protocol
和 uris
。 protocol
支持两个值:amqp091
和 amqp10
,分别用于 AMQP 0-9-1 和 AMQP 1.0
%% for AMQP 0-9-1
{protocol, amqp091}
uris
是 AMQP 连接 URI 的列表
{uris, [
"amqp://fred:secret@host1.domain/my_vhost",
"amqp://john:secret@host2.domain/my_vhost"
]}
URI 语法已扩展为包含查询部分,以允许配置其他连接参数。 请参阅 查询参数参考,静态 shovel 可以使用这些参数,例如 TLS 证书和私钥。
通用目标键
键 | 描述 |
reconnect-delay | 在任一端断开连接后,重新连接到 Broker 之前等待的持续时间(以秒为单位)。 默认为 1。
在失败后重新连接之前将延迟五秒钟。 值 |
AMQP 0-9-1 目标键
键 | 描述 |
publish_properties | 此可选键控制 shovel 设置或覆盖的消息属性。 它采用以下形式
其中列表中的属性在重新发布之前设置在每个消息的基本属性上。 此特定示例会将所有重新发布的消息标记为持久性
默认情况下,消息的原始属性将保留,但此子句可用于更改或设置任何已知属性
|
publish_fields | 此可选键类似于
其中列表中的属性用于设置用于重新发布消息的 默认情况下,消息使用原始交换机名称和路由键重新发布。 通过指定
消息将被重新发布到具有显式固定路由键的显式交换机名称。 |
add_timestamp_header | 此布尔键控制是否在重新发布消息之前将自定义标头
此标头值是 shovel 消息时的时间戳(自纪元以来的秒数)。 默认情况下,不添加标头。 |
add_forward_headers | 设置为 true 时,shovel 将添加多个自定义消息标头:
|
AMQP 1.0 目标键
键 | 描述 |
target_address | 这表示发送 AMQP 1.0 链接的目标地址
|
properties | 此可选键控制重新发布消息时将添加哪些其他属性。 它采用以下形式
可用键包括 |
application_properties | 此可选键声明在重新发布消息时要添加的任何其他应用程序属性。 它采用以下形式
键和值应为二进制字符串,如下例所示。 |
add_timestamp_header | 此布尔键控制是否在重新发布消息之前在消息上设置
此值是 shovel 消息时的时间戳(自纪元以来的秒数)。 默认情况下,不设置该属性。 |
add_forward_headers | 设置为 true 时,shovel 将为以下键添加应用程序属性:
|
配置示例
AMQP 0.9.1 端点之间的一个相当完整的静态 shovel 配置可能如下所示
{rabbitmq_shovel,
[ {shovels, [ {my_first_shovel,
[ {source,
[ {protocol, amqp091},
{uris, [ "amqp://fred:secret@host1.domain/my_vhost",
"amqp://john:secret@host2.domain/my_vhost" ]},
{declarations, [ {'exchange.declare',
[ {exchange, <<"my_fanout">>},
{type, <<"fanout">>},
durable
]},
{'queue.declare',
[{arguments,
[{<<"x-message-ttl">>, long, 60000}]}]},
{'queue.bind',
[ {exchange, <<"my_fanout">>},
{queue, <<>>}
]}
]},
{queue, <<>>},
{prefetch_count, 10}
]},
{destination,
[ {protocol, amqp091},
{uris, ["amqp://"]},
{declarations, [ {'exchange.declare',
[ {exchange, <<"my_direct">>},
{type, <<"direct">>},
durable
]}
]},
{publish_properties, [ {delivery_mode, 2} ]},
{add_forward_headers, true},
{publish_fields, [ {exchange, <<"my_direct">>},
{routing_key, <<"from_shovel">>}
]}
]},
{ack_mode, on_confirm},
{reconnect_delay, 5}
]}
]}
]}
上面的配置定义了一个名为 'my_first_shovel'
的 shovel。
'my_first_shovel'
将连接到 host1
或 host2
上的 Broker(作为源),并直接连接到本地 Broker 作为目标。 它将在失败后延迟 5 秒重新连接到另一个源 Broker。
连接到源时,它将声明一个直接的扇出交换机,名为 "my_fanout"
,一个具有每个队列消息 TTL 的匿名队列,并将队列绑定到交换机。
连接到目标(本地 Broker)时,它将声明一个持久的直接交换机,名为 "my_direct"
。
此 shovel 将把发送到源上匿名队列的消息重新发布到具有固定路由键 "from_shovel"
的本地交换机。 消息将是持久性的,并且仅在收到来自本地 Broker 的发布确认后才会被确认。
如果任何时候至少有十个未确认的消息,则 shovel 消费者将不会获得更多交付。
配置示例(1.0 源 - 0.9.1 目标)
AMQP 1.0 源和 AMQP 0.9.1 目标之间的一个相当完整的 shovel 配置可能如下所示
{rabbitmq_shovel,
[ {shovels, [ {my_first_shovel,
[ {source,
[ {protocol, amqp10},
{uris, [ "amqp://fred:secret@host1.domain/my_vhost",
]},
{source_address, <<"my-source">>},
{prefetch_count, 10}
]},
{destination,
[ {protocol, amqp091},
{uris, ["amqp://"]},
{declarations, [ {'exchange.declare',
[ {exchange, <<"my_direct">>},
{type, <<"direct">>},
durable
]}
]},
{publish_properties, [ {delivery_mode, 2} ]},
{add_forward_headers, true},
{publish_fields, [ {exchange, <<"my_direct">>},
{routing_key, <<"from_shovel">>}
]}
]},
{ack_mode, on_confirm},
{reconnect_delay, 5}
]}
]}
]}
配置示例(0.9.1 源 — 1.0 目标)
AMQP 0.9.1 源和 AMQP 1.0 目标之间的一个更广泛的 shovel 配置可能如下所示
{rabbitmq_shovel,
[{shovels, [{my_first_shovel,
{source,
[{protocol, amqp091},
{uris, ["amqp://fred:secret@host1.domain/my_vhost",
"amqp://john:secret@host2.domain/my_vhost"]},
{declarations, [{'exchange.declare',
[{exchange, <<"my_fanout">>},
{type, <<"fanout">>},
durable]},
{'queue.declare',
[{arguments,
[{<<"x-message-ttl">>, long, 60000}]}]},
{'queue.bind',
[{exchange, <<"my_fanout">>},
{queue, <<>>}
]}
]},
{queue, <<>>},
{prefetch_count, 10}
]},
{destination,
[{protocol, amqp10},
%% Note: for plain text SASL authentication, use
% {uris, ["amqp://user:pass@host:5672?sasl=plain"]},
%% Note: this relies on default user credentials
%% which has remote access restrictions, see
%% ./access-control to learn more
{uris, ["amqp://host:5672"]},
{properties, [{user_id, <<"my-user">>}]},
{application_properties, [{<<"my-prop">>, <<"my-prop-value">>}]},
{add_forward_headers, true},
{target_address, <<"destination-queue">>}
]},
{ack_mode, on_confirm},
{reconnect_delay, 5}
}]}
]}
}