跳至主要内容
版本: 4.0

配置静态 Shovel

概述

本指南重点介绍静态配置的 Shovel。它假设您熟悉Shovel 插件背后的关键概念。

动态 Shovel不同,静态 Shovel 使用高级配置文件进行配置。它们在节点启动时启动,主要用于永久运行的工作负载。任何对静态 Shovel 配置的更改都需要重新启动节点,这使得它们极具局限性。

大多数用户应该优先选择动态 Shovel 而不是静态 Shovel,因为它们更灵活,并且易于自动化。与静态 Shovel 定义(使用 Erlang 项)相比,生成动态 Shovel 定义(JSON 文档)通常更容易。

配置

Shovel 插件的配置必须在高级配置文件中定义。

它包含一个名为 shovels 的单一子句,该子句列出了应在节点启动时启动的 Shovel。

{rabbit, [
%% ...
]},

{rabbitmq_shovel, [
{shovels, [
{shovel_one, [
%% shovel_one properties ...
]},
%% ...
]}
]}

下面可以找到一个(故意冗长的)示例配置

shovels 子句的每个元素都是一个命名的静态 Shovel。列表中的名称必须是唯一的。

Shovel 定义在顶层看起来像这样

{shovel_name, [
{source, [
%% protocol-specific source configuration goes here
]},
{destination, [
%% protocol-specific destination configuration goes here
]},
%% 'confirm' is the default acknowledgement mode
{ack_mode, confirm},
%% reconnect with a 5 second delay
{reconnect_delay, 5}
]}

其中 shovel_name 是 Shovel 的名称(一个 Erlang 原子)。如果名称不是以小写字母开头或包含除字母数字字符、下划线 (_) 或 @ 之外的其他字符,则应将其括在单引号 (') 中。

源和目标

Shovel 将消息从源传输到目标。

sourcedestination 密钥是必须的,并且包含嵌套的协议特定密钥。目前,AMQP 0.9.1 和 AMQP 1.0 是两种受支持的协议。源和目标不必使用相同的协议。所有其他属性都是可选的。

source 是一个必须的密钥,并且针对不同的协议具有不同的密钥属性。所有协议都有两个公共属性:protocolurisprotocol 支持两个值:amqp091amqp10,分别表示 AMQP 0-9-1 和 AMQP 1.0。

%% for AMQP 0-9-1
{protocol, amqp091}

urisAMQP 连接 URI的列表。

{uris, [
"amqp://fred:secret@host1.domain/my_vhost",
"amqp://john:secret@host2.domain/my_vhost"
]}

URI 语法扩展为包含查询部分,以允许配置其他连接参数。请参阅查询参数参考,这些参数可用于静态 Shovel,例如 TLS 证书和私钥。

常规源密钥

某些密钥受 AMQP 0-9-1 和 AMQP 1.0 源支持。它们在下面的表格中进行了描述。

通用(协议无关)静态 Shovel 密钥(属性)
密钥描述
reconnect-delay

在断开连接任一端后,等待重新连接到代理之前的时间间隔(以秒为单位)。默认为 1。

{reconnect_delay, 5}

将在故障后延迟五秒钟重新连接。值为 0 表示不重新连接:Shovel 将在第一次故障或连接尝试失败后停止。

ack-mode

确定 Shovel 如何确认已消费的消息。有效值为 on-confirmon-publishno-ack。默认情况下使用 on-confirm

如果设置为 on-confirm(默认值),则在目标确认消息后,将向源代理确认消息。这处理网络错误和代理故障,而不会丢失消息,并且是最慢的选项。

如果设置为 on-publish,则在消息在目标发布(但尚未确认)后,将向源代理确认消息。这处理网络错误,而不会丢失消息,但在代理发生故障时可能会丢失消息。

如果设置为 no-ack,将使用自动消息确认。此选项将提供最高的吞吐量,但并不安全(在网络或代理发生故障时会丢失消息)。

AMQP 0-9-1 源密钥

AMQP 0-9-1 特定的源密钥在单独的表格中介绍。

AMQP 0-9-1 源密钥(属性)
密钥描述
declarations

Shovel 开始传输消息之前要执行的 AMQP 0-9-1 操作的可选列表。它们通常用于设置拓扑结构。

  {declarations, [
%% declaration list
]}

声明遵循RabbitMQ Erlang 客户端使用的 Method 和属性名称。

一个最小的声明示例

  {declarations, [
'queue.declare',
{'queue.bind', [
{exchange, <<"my_exchange">>},
{queue, <<>>}
]}
]}

将首先声明一个匿名队列,然后将其绑定到名为 "my_exchange" 的交换机。queue.bind 方法上的 <<>> 队列名称表示“使用此通道上最后声明的队列”。

声明列表的每个元素要么是作为单引号原子给出的 AMQP 0-9-1 方法,例如 'queue.declare',要么是元组,其第一个元素是方法原子,第二个元素是参数的属性列表。

如果只使用方法名称,则所有参数都采用其默认值(如上面的 'queue.declare' 所示)。

如果提供了元组和属性列表,则列表中的属性会明确指定某些或所有参数。

这是另一个示例

{'exchange.declare', [
{exchange, <<"my_exchange">>},
{type, <<"direct">>},
durable
]}

将声明一个名为“my_exchange”的持久、直接交换机。

queue

源队列的名称,作为 Erlang 二进制值。此属性是必须的。

{queue, <<"queue.1">>}

queue.1 是要从中 Shovel 消息的队列的名称,作为二进制字符串。

此队列必须存在。使用上面介绍的 declarations 资源声明队列或确保其存在。如果值为 <<>>(空二进制字符串),则使用 declarations最近声明的队列。这允许声明和使用匿名队列。

另请参阅下面的预声明拓扑部分

prefetch-count

任何时候通过 Shovel 复制的未确认消息的最大数量。默认为 1000

{prefetch_count, 1000}

预声明拓扑

declarations 属性通常用于设置拓扑结构。至少,它必须设置源队列。

在某些部署场景中,拓扑结构会在启动时从定义文件自动导入。在这些场景中,我们可以配置插件等待队列可用,方法是在 rabbitmq.conf 文件中添加以下行

shovel.topology.predeclared = true

使用上述配置,如果静态 Shovel 没有 declarations 属性或其为空,则插件将等到源的 queue 最终声明。

AMQP 1.0 源密钥

AMQP 1.0 源设置与 AMQP 0-9-1 源设置不同。

AMQP 1.0 源密钥(属性)
密钥描述
source_address

这表示 AMQP 1.0 链接的源地址。此密钥是必须的。

{source_address, <<"my-address">>}
prefetch-count

此可选密钥设置将授予接收链接的链接信用额度。当信用额度降至此值的十分之一以下时,将自动续订。默认为 1000。它采用以下形式

  {prefetch_count, 10}

目标

destination 是一个必须的密钥,并且针对不同的协议具有不同的密钥属性。所有协议都有两个公共属性:protocolurisprotocol 支持两个值:amqp091amqp10,分别表示 AMQP 0-9-1 和 AMQP 1.0。

%% for AMQP 0-9-1
{protocol, amqp091}

urisAMQP 连接 URI的列表。

{uris, [
"amqp://fred:secret@host1.domain/my_vhost",
"amqp://john:secret@host2.domain/my_vhost"
]}

URI 语法扩展为包含查询部分,以允许配置其他连接参数。请参阅查询参数参考,这些参数可用于静态 Shovel,例如 TLS 证书和私钥。

常规目标密钥

常规目标密钥(属性)
密钥描述
reconnect-delay

在断开连接任一端后,等待重新连接到代理之前的时间间隔(以秒为单位)。默认为 1。

{reconnect_delay, 5}

将在故障后延迟五秒钟重新连接。值为 0 表示不重新连接:Shovel 将在第一次故障或连接尝试失败后停止。

AMQP 0-9-1 目标密钥

AMQP 0-9-1 目标密钥(属性)
密钥描述
publish_properties

此可选密钥控制 Shovel 设置或覆盖的消息属性。它采用以下形式

{publish_properties, [
{delivery_mode, 2}
]}

其中列表中的属性在重新发布之前设置在每条消息的基本属性上。

此特定示例将所有重新发布的消息标记为持久性。

{publish_properties, [
{delivery_mode, 2}
]}

默认情况下,消息的原始属性将保留,但此子句可用于更改或设置任何已知属性。

  • content_type
  • content_encoding
  • headers
  • delivery_mode
  • priority
  • correlation_id
  • reply_to
  • expiration
  • message_id
  • timestamp
  • type
  • user_id
  • app_id
  • cluster_id

publish_fields

此可选密钥类似于 publish_properties,但控制发布设置而不是消费者可以访问的消息属性。它采用以下形式

{publish_fields, [
{exchange, <<"my_exchange">>},
{routing_key, <<"from_shovel">>}
]}

其中列表中的属性用于设置用于重新发布消息的 basic.publish 方法上的字段

默认情况下,消息使用原始交换机名称和路由密钥重新发布。通过指定

{publish_fields, [
{exchange, <<"my_exchange">>},
{routing_key, <<"from_shovel">>}
]}

消息将重新发布到显式交换机名称和显式、固定路由密钥。

add_timestamp_header

此布尔密钥控制是否会在重新发布消息之前向消息添加自定义标头 x-shovelled-timestamp

{add_timestamp_header, true}

此标头值是 Shovel 消息时的事件戳(以自纪元以来的秒数为单位)。默认情况下,不会添加此标头。

add_forward_headers

设置为 true 时,Shovel 将添加一些自定义消息标头:shovelled-byshovel-typeshovel-name,以提供有关传输的一些其他元数据。

{add_forward_headers, true}

AMQP 1.0 目标密钥

AMQP 1.0 目标密钥(属性)
密钥描述
target_address

这表示发送 AMQP 1.0 链接的目标地址。

{target_address, <<"some-address">>}

属性

此可选键控制在重新发布消息时将添加哪些其他属性。它采用以下形式:

{properties, [
{content_typle, <<"application/json">>}
]}

可用的键包括message_iduser_idtosubjectreply_tocorrelation_idcontent_typecontent_encodingabsolute_expiry_timecreation_time。有关所有可用键和值,请参阅 AMQP 1.0 规范(第 3.2.4 节)。

应用程序属性

此可选键声明在重新发布消息时要添加的任何其他应用程序属性。它采用以下形式:

{application_properties, [
{<<"application-key-1">>, <<"value-1">>},
{<<"application-key-2">>, <<"value-2">>}
]}

键和值应为二进制字符串,如下例所示。

add_timestamp_header

此布尔键控制是否在重新发布消息之前在其上设置creation_time属性。

{add_timestamp_header, true}

此值为消息被转储时的时间戳(以自纪元以来的秒数表示)。默认情况下,不会设置此属性。

add_forward_headers

设置为 true 时,转储将为以下键添加应用程序属性:shovelled-byshovel-typeshovel-name,以提供有关传输的一些其他元数据。

{add_forward_headers, true}

示例配置

在 AMQP 0.9.1 端点之间的一个相当完整的静态转储配置可能如下所示

{rabbitmq_shovel,
[ {shovels, [ {my_first_shovel,
[ {source,
[ {protocol, amqp091},
{uris, [ "amqp://fred:secret@host1.domain/my_vhost",
"amqp://john:secret@host2.domain/my_vhost" ]},
{declarations, [ {'exchange.declare',
[ {exchange, <<"my_fanout">>},
{type, <<"fanout">>},
durable
]},
{'queue.declare',
[{arguments,
[{<<"x-message-ttl">>, long, 60000}]}]},
{'queue.bind',
[ {exchange, <<"my_fanout">>},
{queue, <<>>}
]}
]},
{queue, <<>>},
{prefetch_count, 10}
]},
{destination,
[ {protocol, amqp091},
{uris, ["amqp://"]},
{declarations, [ {'exchange.declare',
[ {exchange, <<"my_direct">>},
{type, <<"direct">>},
durable
]}
]},
{publish_properties, [ {delivery_mode, 2} ]},
{add_forward_headers, true},
{publish_fields, [ {exchange, <<"my_direct">>},
{routing_key, <<"from_shovel">>}
]}
]},
{ack_mode, on_confirm},
{reconnect_delay, 5}
]}
]}
]}

以上配置定义了一个名为'my_first_shovel'的单个转储。

'my_first_shovel'将连接到host1host2上的代理(作为源),并直接连接到本地代理作为目标。它将在发生故障后延迟 5 秒后重新连接到另一个源代理。

连接到源后,它将声明一个名为"my_fanout"的直接扇出交换机,一个具有每个队列消息 TTL的匿名队列,并将队列绑定到交换机。

连接到目标(本地代理)后,它将声明一个名为"my_direct"的持久直接交换机。

此转储将重新发布发送到源上匿名队列的消息到本地交换机,并使用固定路由键"from_shovel"。消息将是持久的,并且仅在收到本地代理的发布确认后才确认。

如果在任何时刻至少有十条未确认的消息,则转储消费者将不会获得更多传递。

示例配置(1.0 源 - 0.9.1 目标)

在 AMQP 1.0 源和 AMQP 0.9.1 目标之间的一个相当完整的转储配置可能如下所示

{rabbitmq_shovel,
[ {shovels, [ {my_first_shovel,
[ {source,
[ {protocol, amqp10},
{uris, [ "amqp://fred:secret@host1.domain/my_vhost",
]},
{source_address, <<"my-source">>},
{prefetch_count, 10}
]},
{destination,
[ {protocol, amqp091},
{uris, ["amqp://"]},
{declarations, [ {'exchange.declare',
[ {exchange, <<"my_direct">>},
{type, <<"direct">>},
durable
]}
]},
{publish_properties, [ {delivery_mode, 2} ]},
{add_forward_headers, true},
{publish_fields, [ {exchange, <<"my_direct">>},
{routing_key, <<"from_shovel">>}
]}
]},
{ack_mode, on_confirm},
{reconnect_delay, 5}
]}
]}
]}

示例配置(0.9.1 源 - 1.0 目标)

在 AMQP 0.9.1 源和 AMQP 1.0 目标之间更广泛的转储配置可能如下所示

{rabbitmq_shovel,
[{shovels, [{my_first_shovel,
{source,
[{protocol, amqp091},
{uris, ["amqp://fred:secret@host1.domain/my_vhost",
"amqp://john:secret@host2.domain/my_vhost"]},
{declarations, [{'exchange.declare',
[{exchange, <<"my_fanout">>},
{type, <<"fanout">>},
durable]},
{'queue.declare',
[{arguments,
[{<<"x-message-ttl">>, long, 60000}]}]},
{'queue.bind',
[{exchange, <<"my_fanout">>},
{queue, <<>>}
]}
]},
{queue, <<>>},
{prefetch_count, 10}
]},
{destination,
[{protocol, amqp10},
%% Note: for plain text SASL authentication, use
% {uris, ["amqp://user:pass@host:5672?sasl=plain"]},
%% Note: this relies on default user credentials
%% which has remote access restrictions, see
%% ./access-control to learn more
{uris, ["amqp://host:5672"]},
{properties, [{user_id, <<"my-user">>}]},
{application_properties, [{<<"my-prop">>, <<"my-prop-value">>}]},
{add_forward_headers, true},
{target_address, <<"destination-queue">>}
]},
{ack_mode, on_confirm},
{reconnect_delay, 5}
}]}
]}
}
© 2024 RabbitMQ. All rights reserved.