跳到主要内容
版本: 4.1

配置静态 Shovel

概述

本指南重点介绍静态配置的 shovel。它假定您熟悉Shovel 插件背后的关键概念。

动态 shovel不同,静态 shovel 使用高级配置文件进行配置。 它们在节点启动时启动,主要用于永久运行的工作负载。 对静态 shovel 配置的任何更改都需要节点重启,这使得它们非常不灵活。

大多数用户应优先选择动态 shovel而不是静态 shovel,因为它们具有灵活性且易于自动化。 生成动态 shovel 定义(JSON 文档)通常比静态 shovel 定义(使用 Erlang 术语)更容易。

配置

Shovel 插件的配置必须在高级配置文件中定义。

它由一个 shovels 子句组成,该子句列出了应在节点启动时启动的 shovel

{rabbit, [
%% ...
]},

{rabbitmq_shovel, [
{shovels, [
{shovel_one, [
%% shovel_one properties ...
]},
%% ...
]}
]}

下面可以找到一个(特意详细的)配置示例

shovels 子句的每个元素都是一个命名的静态 shovel。 列表中的名称必须不同。

shovel 定义在顶层看起来像这样

{shovel_name, [
{source, [
%% protocol-specific source configuration goes here
]},
{destination, [
%% protocol-specific destination configuration goes here
]},
%% 'confirm' is the default acknowledgement mode
{ack_mode, confirm},
%% reconnect with a 5 second delay
{reconnect_delay, 5}
]}

其中 shovel_name 是 shovel 的名称(Erlang 原子)。 如果名称不是以小写字母开头,或者包含字母数字字符、下划线 (_) 或 @ 以外的其他字符,则应将名称用单引号 (') 括起来。

源和目标

shovel 将消息从源传输到目标。

sourcedestination 键是强制性的,并且包含嵌套的协议特定键。 目前,AMQP 0.9.1 和 AMQP 1.0 是两种受支持的协议。 源和目标不必使用相同的协议。 所有其他属性都是可选的。

source 是一个强制键,并且对于不同的协议具有不同的键属性。 所有协议都通用两个属性:protocolurisprotocol 支持两个值:amqp091amqp10,分别用于 AMQP 0-9-1 和 AMQP 1.0

%% for AMQP 0-9-1
{protocol, amqp091}

urisAMQP 连接 URI 的列表

{uris, [
"amqp://fred:secret@host1.domain/my_vhost",
"amqp://john:secret@host2.domain/my_vhost"
]}

URI 语法已扩展为包含查询部分,以允许配置其他连接参数。 请参阅 查询参数参考,静态 shovel 可以使用这些参数,例如 TLS 证书和私钥。

通用源键

AMQP 0-9-1 和 AMQP 1.0 源都支持某些键。 下表描述了这些键。

通用(协议无关)静态 Shovel 键(属性)
描述
reconnect-delay

在任一端断开连接后,重新连接到 Broker 之前等待的持续时间(以秒为单位)。 默认为 1。

{reconnect_delay, 5}

在失败后重新连接之前将延迟五秒钟。 值 0 表示不重新连接:shovel 将在首次失败或连接尝试不成功后停止。

ack-mode

确定 shovel 应如何确认已消费的消息。 有效值为 on-confirmon-publishno-ack。 默认使用 on-confirm

如果设置为 on-confirm(默认值),则在目标确认消息后,消息将确认给源 Broker。 这可以处理网络错误和 Broker 故障而不会丢失消息,并且是最慢的选项。

如果设置为 on-publish,则在消息已在目标发布(但尚未确认)后,消息将确认给源 Broker。 在发生网络或 Broker 故障时,消息可能会丢失。

如果设置为 no-ack,将使用自动消息确认。 此选项将提供最高的吞吐量,但不安全(在发生网络或 Broker 故障时会丢失消息)。

AMQP 0-9-1 源键

AMQP 0-9-1 特定的源键在单独的表中介绍

AMQP 0-9-1 源键(属性)
描述
declarations

在 Shovel 开始传输消息之前,要由 Shovel 执行的可选 AMQP 0-9-1 操作列表。 它们通常用于设置拓扑。

  {declarations, [
%% declaration list
]}

声明遵循RabbitMQ Erlang 客户端使用的方法和属性名称。

一个最简化的声明示例

  {declarations, [
'queue.declare',
{'queue.bind', [
{exchange, <<"my_exchange">>},
{queue, <<>>}
]}
]}

将首先声明一个匿名队列,然后将其绑定到名为 "my_exchange" 的交换机。 方法 queue.bind<<>> 的队列名称表示“使用此通道上最后声明的队列”。

声明列表的每个元素要么是作为单引号原子给出的 AMQP 0-9-1 方法,例如 'queue.declare',要么是第一个元素为方法原子,第二个元素为参数属性列表的元组。

如果仅使用方法名称,则所有参数都采用其默认值(如上面的 'queue.declare' 所示)。

如果提供元组和属性列表,则列表中的属性显式指定部分或全部参数。

这是另一个示例

{'exchange.declare', [
{exchange, <<"my_exchange">>},
{type, <<"direct">>},
durable
]}

将声明一个持久的、直接交换机,名为 “my_exchange”。

queue

源队列的名称,作为 Erlang 二进制值。 此属性是强制性的

{queue, <<"queue.1">>}

queue.1 是从中 shovel 消息的队列的名称,作为二进制字符串。

此队列必须存在。 使用上面介绍的资源 declarations 来声明队列或确保其存在。 如果值为 <<>>(空二进制字符串),则使用 declarations最近声明的队列。 这允许声明和使用匿名队列。

另请参阅下面的预声明拓扑部分

prefetch-count

在任何给定时间通过 shovel 复制的最大未确认消息数。 默认为 1000

{prefetch_count, 1000}

预声明拓扑

declarations 属性通常用于设置拓扑。 至少,它必须设置源队列。

在某些部署场景中,拓扑会自动从启动时的定义文件导入。 在这些场景中,我们可以配置插件以等待队列可用,方法是将以下行添加到 rabbitmq.conf 文件中

shovel.topology.predeclared = true

通过上述配置,如果静态 shovel 没有 declarations 属性或该属性为空,则插件将等待直到最终声明源的 queue

AMQP 1.0 源键

AMQP 1.0 源设置与 AMQP 0-9-1 源的设置不同。

AMQP 1.0 源键(属性)
描述
source_address

这表示 AMQP 1.0 链接的源地址。 此键是强制性的

{source_address, <<"my-address">>}
prefetch-count

此可选键设置将授予接收链接的链接信用额度。 当信用额度降至该值的 1/10 以下时,信用额度将自动续订。 默认值为 1000。 它采用以下形式

  {prefetch_count, 10}

目标

destination 是一个强制键,并且对于不同的协议具有不同的键属性。 所有协议都通用两个属性:protocolurisprotocol 支持两个值:amqp091amqp10,分别用于 AMQP 0-9-1 和 AMQP 1.0

%% for AMQP 0-9-1
{protocol, amqp091}

urisAMQP 连接 URI 的列表

{uris, [
"amqp://fred:secret@host1.domain/my_vhost",
"amqp://john:secret@host2.domain/my_vhost"
]}

URI 语法已扩展为包含查询部分,以允许配置其他连接参数。 请参阅 查询参数参考,静态 shovel 可以使用这些参数,例如 TLS 证书和私钥。

通用目标键

通用目标键(属性)
描述
reconnect-delay

在任一端断开连接后,重新连接到 Broker 之前等待的持续时间(以秒为单位)。 默认为 1。

{reconnect_delay, 5}

在失败后重新连接之前将延迟五秒钟。 值 0 表示不重新连接:shovel 将在首次失败或连接尝试不成功后停止。

AMQP 0-9-1 目标键

AMQP 0-9-1 目标键(属性)
描述
publish_properties

此可选键控制 shovel 设置或覆盖的消息属性。 它采用以下形式

{publish_properties, [
{delivery_mode, 2}
]}

其中列表中的属性在重新发布之前设置在每个消息的基本属性上。

此特定示例会将所有重新发布的消息标记为持久性

{publish_properties, [
{delivery_mode, 2}
]}

默认情况下,消息的原始属性将保留,但此子句可用于更改或设置任何已知属性

  • content_type
  • content_encoding
  • headers
  • delivery_mode
  • priority
  • correlation_id
  • reply_to
  • expiration
  • message_id
  • timestamp
  • type
  • user_id
  • app_id
  • cluster_id

publish_fields

此可选键类似于 publish_properties,但控制发布设置而不是消费者可以访问的消息属性。 它采用以下形式

{publish_fields, [
{exchange, <<"my_exchange">>},
{routing_key, <<"from_shovel">>}
]}

其中列表中的属性用于设置用于重新发布消息的 basic.publish 方法上的字段

默认情况下,消息使用原始交换机名称和路由键重新发布。 通过指定

{publish_fields, [
{exchange, <<"my_exchange">>},
{routing_key, <<"from_shovel">>}
]}

消息将被重新发布到具有显式固定路由键的显式交换机名称。

add_timestamp_header

此布尔键控制是否在重新发布消息之前将自定义标头 x-shovelled-timestamp 添加到消息中

{add_timestamp_header, true}

此标头值是 shovel 消息时的时间戳(自纪元以来的秒数)。 默认情况下,不添加标头。

add_forward_headers

设置为 true 时,shovel 将添加多个自定义消息标头:shovelled-byshovel-typeshovel-name,以提供有关传输的一些其他元数据。

{add_forward_headers, true}

AMQP 1.0 目标键

AMQP 1.0 目标键(属性)
描述
target_address

这表示发送 AMQP 1.0 链接的目标地址

{target_address, <<"some-address">>}

properties

此可选键控制重新发布消息时将添加哪些其他属性。 它采用以下形式

{properties, [
{content_typle, <<"application/json">>}
]}

可用键包括 message_iduser_idtosubjectreply_tocorrelation_idcontent_typecontent_encodingabsolute_expiry_timecreation_time。 有关所有可用键和值,请参阅 AMQP 1.0 规范 (§3.2.4)。

application_properties

此可选键声明在重新发布消息时要添加的任何其他应用程序属性。 它采用以下形式

{application_properties, [
{<<"application-key-1">>, <<"value-1">>},
{<<"application-key-2">>, <<"value-2">>}
]}

键和值应为二进制字符串,如下例所示。

add_timestamp_header

此布尔键控制是否在重新发布消息之前在消息上设置 creation_time 属性

{add_timestamp_header, true}

此值是 shovel 消息时的时间戳(自纪元以来的秒数)。 默认情况下,不设置该属性。

add_forward_headers

设置为 true 时,shovel 将为以下键添加应用程序属性:shovelled-byshovel-typeshovel-name,以提供有关传输的一些其他元数据。

{add_forward_headers, true}

配置示例

AMQP 0.9.1 端点之间的一个相当完整的静态 shovel 配置可能如下所示

{rabbitmq_shovel,
[ {shovels, [ {my_first_shovel,
[ {source,
[ {protocol, amqp091},
{uris, [ "amqp://fred:secret@host1.domain/my_vhost",
"amqp://john:secret@host2.domain/my_vhost" ]},
{declarations, [ {'exchange.declare',
[ {exchange, <<"my_fanout">>},
{type, <<"fanout">>},
durable
]},
{'queue.declare',
[{arguments,
[{<<"x-message-ttl">>, long, 60000}]}]},
{'queue.bind',
[ {exchange, <<"my_fanout">>},
{queue, <<>>}
]}
]},
{queue, <<>>},
{prefetch_count, 10}
]},
{destination,
[ {protocol, amqp091},
{uris, ["amqp://"]},
{declarations, [ {'exchange.declare',
[ {exchange, <<"my_direct">>},
{type, <<"direct">>},
durable
]}
]},
{publish_properties, [ {delivery_mode, 2} ]},
{add_forward_headers, true},
{publish_fields, [ {exchange, <<"my_direct">>},
{routing_key, <<"from_shovel">>}
]}
]},
{ack_mode, on_confirm},
{reconnect_delay, 5}
]}
]}
]}

上面的配置定义了一个名为 'my_first_shovel' 的 shovel。

'my_first_shovel' 将连接到 host1host2 上的 Broker(作为源),并直接连接到本地 Broker 作为目标。 它将在失败后延迟 5 秒重新连接到另一个源 Broker。

连接到源时,它将声明一个直接的扇出交换机,名为 "my_fanout",一个具有每个队列消息 TTL 的匿名队列,并将队列绑定到交换机。

连接到目标(本地 Broker)时,它将声明一个持久的直接交换机,名为 "my_direct"

此 shovel 将把发送到源上匿名队列的消息重新发布到具有固定路由键 "from_shovel" 的本地交换机。 消息将是持久性的,并且仅在收到来自本地 Broker 的发布确认后才会被确认。

如果任何时候至少有十个未确认的消息,则 shovel 消费者将不会获得更多交付。

配置示例(1.0 源 - 0.9.1 目标)

AMQP 1.0 源和 AMQP 0.9.1 目标之间的一个相当完整的 shovel 配置可能如下所示

{rabbitmq_shovel,
[ {shovels, [ {my_first_shovel,
[ {source,
[ {protocol, amqp10},
{uris, [ "amqp://fred:secret@host1.domain/my_vhost",
]},
{source_address, <<"my-source">>},
{prefetch_count, 10}
]},
{destination,
[ {protocol, amqp091},
{uris, ["amqp://"]},
{declarations, [ {'exchange.declare',
[ {exchange, <<"my_direct">>},
{type, <<"direct">>},
durable
]}
]},
{publish_properties, [ {delivery_mode, 2} ]},
{add_forward_headers, true},
{publish_fields, [ {exchange, <<"my_direct">>},
{routing_key, <<"from_shovel">>}
]}
]},
{ack_mode, on_confirm},
{reconnect_delay, 5}
]}
]}
]}

配置示例(0.9.1 源 — 1.0 目标)

AMQP 0.9.1 源和 AMQP 1.0 目标之间的一个更广泛的 shovel 配置可能如下所示

{rabbitmq_shovel,
[{shovels, [{my_first_shovel,
{source,
[{protocol, amqp091},
{uris, ["amqp://fred:secret@host1.domain/my_vhost",
"amqp://john:secret@host2.domain/my_vhost"]},
{declarations, [{'exchange.declare',
[{exchange, <<"my_fanout">>},
{type, <<"fanout">>},
durable]},
{'queue.declare',
[{arguments,
[{<<"x-message-ttl">>, long, 60000}]}]},
{'queue.bind',
[{exchange, <<"my_fanout">>},
{queue, <<>>}
]}
]},
{queue, <<>>},
{prefetch_count, 10}
]},
{destination,
[{protocol, amqp10},
%% Note: for plain text SASL authentication, use
% {uris, ["amqp://user:pass@host:5672?sasl=plain"]},
%% Note: this relies on default user credentials
%% which has remote access restrictions, see
%% ./access-control to learn more
{uris, ["amqp://host:5672"]},
{properties, [{user_id, <<"my-user">>}]},
{application_properties, [{<<"my-prop">>, <<"my-prop-value">>}]},
{add_forward_headers, true},
{target_address, <<"destination-queue">>}
]},
{ack_mode, on_confirm},
{reconnect_delay, 5}
}]}
]}
}
© . All rights reserved.