跳至主内容

RabbitMQ AMQP 0.9.1 Erlang 客户端库

概述

本指南涵盖了 RabbitMQ 的 Erlang 客户端(AMQP 0-9-1)。

本用户指南假定读者熟悉 AMQP 0-9-1 的基本概念

有关更多详细信息,请参阅有关 连接通道队列发布者消费者 的指南,以了解这些关键的 RabbitMQ 概念。

本指南涵盖的一些主题包括

等等。

依赖项

客户端库名为 amqp_client,并通过 Hex.pm 分发,其主要依赖项是 rabbit-common

以下是用于流行构建工具 Mix、Rebar 3erlang.mk 的依赖项代码片段。

Mix

{:rabbit_common, "~> 3.8"}

Rebar 3

{rabbit_common, "&version-erlang-client;"}

erlang.mk

dep_rabbit_common = hex &version-erlang-client;

基本用法

客户端的基本用法遵循以下大致步骤

  1. 确保 amqp_client Erlang 应用程序已启动
  2. 建立到 RabbitMQ 节点的连接
  3. 在连接上打开一个新通道
  4. 使用通道执行AMQP 0-9-1 命令,例如声明交换器和队列、定义它们之间的绑定、发布消息、注册消费者(订阅)等
  5. 注册可选的事件处理程序,例如返回消息处理程序
  6. 不再需要时,关闭通道和连接

amqp_client 应用程序

RabbitMQ Erlang 客户端是一个名为 amqp_client 的 Erlang 应用程序。

与任何 Erlang 应用程序一样,要开始使用客户端,首先需要确保它已启动。

application:ensure_started(amqp_client).

关键模块和概念

客户端库中的两个主要模块是

  • amqp_connection,用于打开到 RabbitMQ 节点的连接并在其上打开通道
  • amqp_channel,它公开了大多数 AMQP 0-9-1 操作,例如队列声明或消费者注册

一旦建立连接并成功认证,并打开了通道,应用程序通常会使用 amqp_channel:call/{2,3}amqp_channel:cast/{2,3} 函数以及 AMQP 0-9-1 协议方法记录来执行大多数操作。

其他几个模块使应用程序能够响应某些事件。这些将在本指南稍后介绍。

该库由两层组成

  • 遵循 AMQP 0-9-1 协议和操作执行模型的高层逻辑层
  • 负责与 RabbitMQ 节点通信的底层协议实现层

网络连接类型

AMQP 0-9-1 客户端使用 TCP 连接到 RabbitMQ。一个 AMQP 0-9-1 连接底层使用一个 TCP 连接。但是,Erlang 客户端独一无二,它提供了一种与 RabbitMQ 节点通信的替代方法。

网络客户端

与其他客户端类似,该库提供了一个基于 TCP 的客户端,它使用 TCP 连接将序列化的协议帧传输到服务器。这个客户端称为网络客户端,大多数应用程序都应该使用。

要使用网络客户端,请使用 amqp_client:start/1 启动连接,并将参数设置为 #amqp_params_network 记录。

直接(Erlang 分布式)客户端

或者,可以使用 Erlang 分布式连接而不是单独的 TCP 连接。此通信方法假定使用该客户端的应用程序运行在与 RabbitMQ 节点相同的 Erlang 集群上。

直接客户端的使用应仅限于与 RabbitMQ 并行部署的应用程序。 ShovelFederation 插件就是此类应用程序的两个示例。

在大多数其他情况下,开发人员应首选上面介绍的更传统的网络客户端。对于不熟悉 Erlang 的操作员和开发人员来说,它更容易理解。

要使用直接驱动程序,请使用 amqp_connection:start/1 启动连接,并将参数设置为 #amqp_params_direct 记录。

包含头文件

Erlang 客户端使用许多记录定义,您将在本指南中遇到它们。这些记录分为两大类

  • 生成的 AMQP 0-9-1 方法定义
  • 客户端中常用的数据结构定义

要访问这些记录,您需要在每个使用 Erlang 客户端的模块中包含 amqp_client.hrl。

-include("amqp_client.hrl").

连接到 RabbitMQ

amqp_connection 模块用于启动到 RabbitMQ 节点的连接。在本例中,我们将使用网络连接,这是大多数用例推荐的选项。

{ok, Connection} = amqp_connection:start(#amqp_params_network{})

此函数返回 {ok, Connection} 对,其中 Connection 是维护永久连接的进程的 pid。此 pid 将用于在连接上打开通道和关闭连接

如果发生错误,上述调用将返回 {error, Error} 对。

#amqp_params_network 记录设置了以下默认值

参数默认值
用户名guest
密码guest
虚拟主机/
主机localhost
端口5672
channel_max2047
frame_max0
heartbeat0
ssl_optionsnone
auth_mechanisms[fun amqp_auth_mechanisms:plain/3, fun amqp_auth_mechanisms:amqplain/3]
client_properties[]

这些值只是默认值,在同一主机上运行的开箱即用的 RabbitMQ 节点上有效。如果目标节点或环境已配置不同,可以覆盖这些值以匹配实际部署场景。

TLS 选项也可以使用 amqp_client 应用程序的 ssl_options 环境键进行全局指定。它们将与 URI 中的 TLS 参数合并(后者具有优先权)。

直接(Erlang 分布式)客户端

部署在与 RabbitMQ 相同 Erlang 集群内的应用程序(例如 RabbitMQ 插件)可以启动一个直接连接,该连接绕过网络序列化,并依赖 Erlang 分布进行数据传输。

要启动直接连接,请使用 amqp_connection:start/1,并将参数设置为 #amqp_params_direct 记录。

{ok, Connection} = amqp_connection:start(#amqp_params_direct{})

对于直接连接,凭据是可选的,因为 Erlang 分布依赖于 共享密钥(Erlang cookie)进行身份验证。

如果提供了用户名和密码,则它们将用于身份验证并提供给身份验证后端。

如果仅提供了用户名,则用户被视为受信任并无条件登录。

如果未提供用户名或密码,则连接将被视为来自完全受信任的用户,该用户可以连接到任何虚拟主机并拥有完整的权限

#amqp_params_direct 记录设置了以下默认值

参数默认值
用户名none
密码none
虚拟主机/
节点node()
client_properties[]

使用 AMQP URI 连接到 RabbitMQ

而不是直接处理 #amqp_params_network 等记录,可以使用 AMQP URI

为此提供了 amqp_uri:parse/1 函数。它解析 URI 并返回等效的 #amqp_params_network#amqp_params_direct 记录。

与规范不同,如果省略主机名,则假定连接是直接的,并返回 #amqp_params_direct{} 记录。除了标准的 host、port、user、password 和 vhost 参数外,还可以通过查询字符串指定额外的参数(例如,“?heartbeat=5”来配置心跳超时)。

创建通道

一旦建立连接,就可以使用 amqp_connection 模块打开一个或多个通道,这些通道将用于定义拓扑、发布和消费消息。

{ok, Channel} = amqp_connection:open_channel(Connection)

此函数接受连接进程的 pid,并返回 {ok, Channel} 对,其中 Channel 是代表通道的 pid,将用于执行协议命令。

使用 AMQP 0-9-1 方法(协议操作)

客户端库与 RabbitMQ 节点交互的主要方式是通过发送和处理AMQP 0-9-1 方法(在本指南中也称为“命令”),这些方法由记录表示。

客户端尝试为每条记录使用合理的默认值。例如,在使用 #'exchange.declare'{} 方法声明瞬态交换器时,仅指定名称就足够了。

#'exchange.declare'{exchange = <<"my_exchange">>}

上面的示例等同于这个

#'exchange.declare'{exchange    = <<"my_exchange">>,
type = <<"direct">>,
passive = false,
durable = false,
auto_delete = false,
internal = false,
nowait = false,
arguments = []}

定义拓扑:交换器、队列、绑定

一旦建立通道,就可以使用 amqp_channel 模块来管理 AMQP 中的基本对象,即交换器和队列。以下函数创建一个名为 my_exchange 的交换器,默认情况下它是直连交换器。

Declare = #'exchange.declare'{exchange = <<"my_exchange">>},
#'exchange.declare_ok'{} = amqp_channel:call(Channel, Declare)

类似地,此代码创建了一个名为 my_queue瞬态队列。

Declare = #'queue.declare'{queue = <<"my_queue">>},
#'queue.declare_ok'{} = amqp_channel:call(Channel, Declare)

声明持久化队列

Declare = #'queue.declare'{
queue = <<"my_queue">>,
durable = true
},
#'queue.declare_ok'{} = amqp_channel:call(Channel, Declare)

在某些情况下,应用程序希望使用瞬态队列,并且不关心队列的实际名称。在这种情况下,可以让代理生成队列的随机名称。为此,请使用 #'queue.declare'{} 方法,并将队列属性留空。指定空字符串作为队列名称效果相同。

#'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, #'queue.declare'{})

服务器将生成一个在此集群中唯一的队列名称,并将此名称作为确认的一部分返回。

绑定

要创建从交换器到队列的路由规则,请使用 #'queue.bind'{} 命令。

Binding = #'queue.bind'{queue       = Queue,
exchange = Exchange,
routing_key = RoutingKey},
#'queue.bind_ok'{} = amqp_channel:call(Channel, Binding)

当不再需要此路由规则时,可以使用 #'queue.unbind'{} 命令删除此路由。

Binding = #'queue.unbind'{queue       = Queue,
exchange = Exchange,
routing_key = RoutingKey},
#'queue.unbind_ok'{} = amqp_channel:call(Channel, Binding)

删除实体

可以使用 #'exchange.delete'{} 命令删除交换器。

Delete = #'exchange.delete'{exchange = <<"my_exchange">>},
#'exchange.delete_ok'{} = amqp_channel:call(Channel, Delete)

类似地,使用 #'queue.delete'{} 命令删除队列。

Delete = #'queue.delete'{queue = <<"my_queue">>},
#'queue.delete_ok'{} = amqp_channel:call(Channel, Delete)

同步和异步协议方法、调用和投递

请注意,上面的示例使用 amqp_channel:call/2。这是因为它们使用了产生响应的同步 AMQP 0-9-1 方法(与称为异步方法的一组方法不同)。

通常建议使用 amqp_channel:call/{2,3} 处理同步方法,而不是 amqp_channel:cast/{2,3},即使两个函数都适用于同步和异步方法。

两个函数之间的一个区别是,amqp_channel:call/{2,3} 会阻止调用进程,直到响应从服务器返回(对于同步方法)或方法已发送到网络(对于异步方法),而 amqp_channel:cast/{2,3} 会立即返回“ok”。

因此,只有通过使用 amqp_channel:call/{2,3},我们才能验证服务器已确认我们的命令。

发布消息

要将消息发布到具有特定路由键的交换器,请使用 #'basic.publish'{} 方法。消息使用 #amqp_msg{} 记录表示。

Payload = <<"foobar">>,
Publish = #'basic.publish'{exchange = X, routing_key = Key},
amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload})

默认情况下,#amqp_msg{} 记录的 properties 字段包含一组最小的消息属性,作为 #'P_basic'{} properties 记录。

如果应用程序需要覆盖任何默认值,例如发送持久化消息,则需要相应地构造 #amqp_msg{}

Payload = <<"foobar">>,
Publish = #'basic.publish'{exchange = X, routing_key = Key},
Props = #'P_basic'{delivery_mode = 2}, %% persistent message
Msg = #amqp_msg{props = Props, payload = Payload},
amqp_channel:cast(Channel, Publish, Msg)

消息属性的完整列表可以在发布者指南中找到。

AMQP 0-9-1 #'basic.publish' 方法是异步的:服务器不会对其进行响应。但是,客户端可以选择让无法路由的消息返回给它们。这在关于返回消息处理程序的部分进行了描述。

上面的示例未使用发布者确认。要等待所有未完成的发布在发布一批消息后得到确认,请使用 amqp_channel:wait_for_confirms/2 函数。它将返回 true(如果所有未完成的发布都已成功确认)或 timeout(如果发生超时)。

请注意,每次发布消息后等待非常低效且不必要。更优化的方法是发布一批消息并等待其确认。如果某些发布未及时确认,则可以重新发布整个最后一批。

消费者:使用“推送 API”订阅队列

应用程序可以订阅以接收路由到队列的消息。这种“推送 API”是推荐的消息消费方式(另一种方式是轮询,应尽量避免)。

要将消费者添加到队列(订阅队列),可以使用 #'basic.consume'{} 方法,有两种方式

#'basic.consume_ok'{consumer_tag = Tag} =
amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q}, Consumer)

%% A consumer process is not provided so the calling
%% process (`self()`) will be the consumer
#'basic.consume_ok'{consumer_tag = Tag} =
amqp_channel:call(Channel, #'basic.consume'{queue = Q})

consumer 参数是要将消息传递到的进程的 pid。这可以是任意 Erlang 进程,包括发起连接的进程。#'basic.consume_ok'{} 返回包含一个 consumer tag。该 tag 是一个消费者(订阅)标识符,用于取消消费者。

这将在稍后用于取消消费者。此通知将发送到创建订阅的进程(作为 amqp_channel:subscribe/3 的返回值)以及发送到消费者进程的消息。

当消费者进程订阅队列时,它会在其邮箱中接收消息。一个示例接收循环如下所示。

loop(Channel) ->
receive
%% This is the first message received
#'basic.consume_ok'{} ->
loop(Channel);

%% This is received when the subscription is cancelled
#'basic.cancel_ok'{} ->
ok;

%% A delivery
{#'basic.deliver'{delivery_tag = Tag}, Content} ->
%% Do something with the message payload
%% (some work here)

%% Ack the message
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),

%% Loop
loop(Channel)
end.

在上面的示例中,进程处理消费者注册(订阅)通知,然后继续等待传递消息到达其进程邮箱。

收到消息后,循环会执行有用的操作并将确认发送回服务器。如果消费者被取消,则会向消费者进程发送取消通知。在这种情况下,接收循环只是退出。如果应用程序不希望显式确认消息接收,可以使用自动确认模式。为此,将 #'basic.consume' 记录的 no_ack 属性设置为 true。在自动确认模式下,消费者不确认传递:RabbitMQ 将在发送它们通过连接后立即认为它们已传递。

取消消费者

要取消消费者,请使用 #'basic.consume_ok'{} 响应返回的 consumer tag。

amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = Tag})

被取消的消费者仍可能收到“正在传输”的传递,例如在消费者取消时位于 TCP 缓冲区中的那些。但是,最终——通常在消费者取消后不久——将不再向其处理进程进行传递。

消费者实现

通道使用实现 amqp_gen_consumer 行为的模块来确定如何处理消费者事件。实际上,此模块负责客户端端的消费者注册,并确保将传递路由到适当的消费者。

例如,默认消费者模块 amqp_selective_consumer 会跟踪哪些进程订阅了哪些队列,并相应地路由传递;此外,如果通道将传递给未知的消费者,它会将传递给默认消费者(如果已注册)。

相比之下,amqp_direct_consumer 只是将它从通道接收到的所有消息转发给它唯一注册的消费者。

通道的消费者模块在打开通道时通过将第二个参数设置为 amqp_connection:open_channel/2 来选择。

消费者模块实现 amqp_gen_consumer 行为,因此实现处理接收 basic.consumebasic.consume_okbasic.cancelbasic.cancel_ok 方法以及传递已发布消息的函数。

关闭通道和连接

当不再需要通道时,客户端应关闭它。这通过 amqp_channel:close/1 实现。

amqp_channel:close(Channel)

要关闭连接,请使用 amqp_connection:close/1

amqp_connection:close(Connection)

关闭连接将自动隐式关闭该连接上的所有通道。

Both the #'channel.close'and #'connection.close'命令接受 reply_code(整数)和 reply_text(二进制)参数,这些参数可以由客户端根据关闭通道或连接的原因进行设置。

在大多数情况下,reply_code 应设置为 200 以表示正常关闭。reply_text 属性只是一个任意字符串,服务器可能会也可能不会记录。如果客户端希望将其设置为不同的回复代码和/或文本,则可以使用分别为 amqp_channel:close/3amqp_connection:close/3 的重载函数。

传递流控制

默认情况下,通道内除了正常的 TCP 反压之外没有其他流控制。消费者可以设置代理为单个通道上的未确认消息维护的预取缓冲区大小。这是使用 #'basic.qos' 实现的。命令

amqp_channel:call(Channel, #'basic.qos'{prefetch_count = Prefetch})

建议应用程序使用预取。在发布者确认和消费者确认指南中了解更多信息。

被阻止的连接

当节点检测到其可用资源低于某个阈值时,它可能会选择停止从发布者的网络套接字读取

RabbitMQ 支持允许客户端被告知这种情况已发生的机制

使用 amqp_connection:register_blocked_handler/2,提供一个进程的 pid,该进程将接收 #'connection.blocked'{}#'connection.unblocked'{} 消息。

处理返回的消息

代理会将无法传递的消息返回给原始客户端。这些是发布时设置了 immediate 或 mandatory 标志的消息。为了让应用程序收到返回通知,它必须注册一个可以处理 #'basic.return'{} 帧的回调进程。

这是不可路由消息处理的一个示例。

amqp_channel:register_return_handler(Channel, self()),
amqp_channel:call(Channel, #'exchange.declare'{exchange = X}),
Publish = #'basic.publish'{exchange = X, routing_key = SomeKey,
mandatory = true},
amqp_channel:call(Channel, Publish, #amqp_msg{payload = Payload}),
receive
{BasicReturn, Content} ->
#'basic.return'{reply_text = <<"unroutable">>, exchange = X} = BasicReturn
%% Do something with the returned message
end

使用“Fetch API”接收消息

也可以按需检索单个消息(“pull API”又称轮询)。这种消费方法效率极低,因为它实际上是在轮询,应用程序需要反复请求结果,即使绝大多数请求都没有产生任何结果。因此,强烈不鼓励使用这种方法。

这是使用 #'basic.get'{} 命令实现的。

Get = #'basic.get'{queue = Q, no_ack = true},
{#'basic.get_ok'{}, Content} = amqp_channel:call(Channel, Get),
#amqp_msg{payload = Payload} = Content

返回的 payload 是一个 Erlang 二进制文件,应用程序需要自行解码,因为此内容的结构对客户端库和服务器都是不透明的。

如果在调用 #'basic.get'{} 命令时队列为空,则通道将返回 #'basic.get_empty' 结果,如下所示。

#'basic.get_empty'{} = amqp_channel:call(Channel, Get)

请注意,前面的示例在 #'basic.get'{} 命令上设置了 no_ack 标志。这告诉代理接收方将不会发送消息确认。这样做,代理可以免除传递的责任——一旦它认为已传递消息,它就可以假定消费应用程序已承担了责任。通常,许多应用程序不会想要这些语义,而是希望显式确认消息的接收。这是通过 #'basic.ack' 实现的。命令,其中 no_ack 字段默认关闭。

Get = #'basic.get'{queue = Q},
{#'basic.get_ok'{delivery_tag = Tag}, Content}
= amqp_channel:call(Channel, Get),
%% Do something with the message payload.......and then ack it
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag})

请注意,#'basic.ack'{} 方法是使用 amqp_channel:cast/2 而不是 amqp_channel:call/2 发送的。这是因为确认是完全异步的,服务器不会对其产生响应。

一个基本示例

下面是库基本用法的一个完整示例。为简单起见,它没有使用发布者确认,而是使用了一个执行手动确认轮询消费者

-module(amqp_example).

-include("amqp_client.hrl").

-compile([export_all]).

test() ->
%% Start a network connection
{ok, Connection} = amqp_connection:start(#amqp_params_network{}),
%% Open a channel on the connection
{ok, Channel} = amqp_connection:open_channel(Connection),

%% Declare a queue
#'queue.declare_ok'{queue = Q}
= amqp_channel:call(Channel, #'queue.declare'{}),

%% Publish a message
Payload = <<"foobar">>,
Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload}),

%% Poll for a message
Get = #'basic.get'{queue = Q},
{#'basic.get_ok'{delivery_tag = Tag}, Content}
= amqp_channel:call(Channel, Get),

%% Do something with the message payload
%% (some work here)

%% Ack the message
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),

%% Close the channel
amqp_channel:close(Channel),
%% Close the connection
amqp_connection:close(Connection),

ok.

在此示例中,创建了一个具有服务器生成名称的队列,并将消息直接发布到该队列。这利用了每个队列都通过其自身的队列名称绑定到默认交换器的这一事实。然后,消息被出队并确认。

将代码编译为依赖项

客户端构建过程会生成两个部署存档

  • amqp_client.ez,包含所有客户端库模块
  • rabbit_common.ez,包含运行时所需的服务器的公共模块

可以使用 Rebar 3erlang.mk 等构建工具来配置这两个依赖项。

为方便举例,我们假设使用的依赖管理工具将依赖项编译在 ./deps 目录下。

然后,可以使用 erlc 手动编译示例代码,并将 ERL_LIBS 指向 ./deps 目录。

ERL_LIBS=deps erlc -o ebin amqp_example.erl

然后,可以通过以下方式设置 Erlang 运行时来运行您的应用程序。

ERL_LIBS=deps erl -pa ebin
# => Erlang/OTP 23 [erts-11.0] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:16]
# =>
# => Eshell V11.0 (abort with ^G)
# => 1> amqp_example:test().
# => ok
# => 2>
© . This site is unofficial and not affiliated with VMware.