跳到主要内容

Erlang RabbitMQ 客户端库

概述

本指南介绍了 RabbitMQ 的 Erlang 客户端 (AMQP 0-9-1)。

本用户指南假定读者熟悉 AMQP 0-9-1 的基本概念

请参阅关于连接通道队列发布者消费者的指南,以了解关于这些关键 RabbitMQ 概念的更多细节。

本指南涵盖的一些主题包括

等等。

依赖

客户端库名为 amqp_client,并通过 Hex.pm 与其关键依赖项 rabbit-common 一起分发。

以下是与流行的构建工具 Mix、Rebar 3erlang.mk 一起使用的依赖项代码片段。

Mix

{:rabbit_common, "~> 3.8"}

Rebar 3

{rabbit_common, "&version-erlang-client;"}

erlang.mk

dep_rabbit_common = hex &version-erlang-client;

基础

客户端的基本用法遵循以下大致步骤

  1. 确保 amqp_client Erlang 应用程序已启动
  2. 建立与 RabbitMQ 节点的连接
  3. 在连接上打开一个新通道
  4. 使用通道执行 AMQP 0-9-1 命令,例如声明交换机和队列、定义它们之间的绑定、发布消息、注册消费者(订阅)等等
  5. 注册可选的事件处理程序,例如返回消息处理程序
  6. 在不再需要时,关闭通道和连接

amqp_client 应用程序

RabbitMQ Erlang 客户端是一个名为 amqp_client 的 Erlang 应用程序。

与任何 Erlang 应用程序一样,要开始使用客户端,首先必须确保它已启动

application:ensure_started(amqp_client).

关键模块和概念

客户端库中的两个主要模块是

  • amqp_connection,用于打开与 RabbitMQ 节点的连接并在其上打开通道
  • amqp_channel,它公开了大多数 AMQP 0-9-1 操作,例如队列声明或消费者注册

一旦连接建立并成功身份验证,并且通道已打开,应用程序通常将使用 amqp_channel:call/{2,3}amqp_channel:cast/{2,3} 函数以及 AMQP 0-9-1 协议方法记录来执行大多数操作。

几个额外的模块使应用程序可以对某些事件做出反应。这些将在本指南的后面部分介绍。

该库由两层组成

  • 一个高级逻辑层,遵循 AMQP 0-9-1 协议和操作执行模型
  • 一个低级协议实现层,负责与 RabbitMQ 节点通信

网络连接类型

AMQP 0-9-1 客户端使用 TCP 连接到 RabbitMQ。一个 AMQP 0-9-1 连接在底层使用一个 TCP 连接。但是,Erlang 客户端是独一无二的,因为它提供了一种与 RabbitMQ 节点通信的替代方法。

网络客户端

与其他客户端非常相似,此库提供了一个基于 TCP 的客户端,该客户端使用 TCP 连接将序列化的协议帧传输到服务器。此客户端称为网络客户端,大多数应用程序都应使用它。

要使用网络客户端,请使用参数设置为 #amqp_params_network 记录的 amqp_connection:start/1 启动连接

直接(Erlang 分布式)客户端

或者,可以使用 Erlang 分布式连接来代替单独的 TCP 连接。这种通信方法假定使用客户端的应用程序与 RabbitMQ 节点在同一个 Erlang 集群上运行。

直接客户端的使用应限于与 RabbitMQ 并排部署的应用程序。ShovelFederation 插件是此类应用程序的两个示例。

在大多数其他情况下,开发人员应首选上面介绍的更传统的网络客户端。对于不熟悉 Erlang 的操作员和开发人员来说,这将更容易理解。

要使用直接驱动程序,请使用参数设置为 #amqp_params_direct 记录的 amqp_connection:start/1 启动连接

包含头文件

Erlang 客户端使用许多记录定义,您将在本指南中遇到这些定义。这些记录分为两大类

  • 生成的 AMQP 0-9-1 方法定义
  • 客户端中常用的数据结构的定义

要访问这些记录,您需要在每个使用 Erlang 客户端的模块中包含 amqp_client.hrl

-include("amqp_client.hrl").

连接到 RabbitMQ

amqp_connection 模块用于启动与 RabbitMQ 节点的连接。在此示例中,我们将使用网络连接,这是大多数用例的推荐选项

{ok, Connection} = amqp_connection:start(#amqp_params_network{})

此函数返回 {ok, Connection} 对,其中 Connection 是维护持久连接的进程的 pid。此 pid 将用于在连接上打开通道和关闭连接

如果发生错误,则以上调用返回 {error, Error} 对。

#amqp_params_network 记录设置以下默认值

参数默认值
用户名guest
密码guest
虚拟主机/
主机localhost
端口5672
channel_max2047
frame_max0
心跳0
ssl_optionsnone
auth_mechanisms[fun amqp_auth_mechanisms:plain/3, fun amqp_auth_mechanisms:amqplain/3]
client_properties[]

这些值只是默认值,这些默认值适用于在同一主机上运行的开箱即用的 RabbitMQ 节点。如果目标节点或环境已配置为不同,则可以覆盖这些值以匹配实际的部署方案。

TLS 选项也可以使用 amqp_client 应用程序的 ssl_options 环境变量全局指定。它们将与来自 URI 的 TLS 参数合并(后者优先)。

直接(Erlang 分布式)客户端

与 RabbitMQ 部署在同一 Erlang 集群内部的应用程序(例如 RabbitMQ 插件)可以启动直接连接,该连接绕过网络序列化并依赖 Erlang 分布进行数据传输。

要启动直接连接,请使用参数设置为 #amqp_params_direct 记录的 amqp_connection:start/1

{ok, Connection} = amqp_connection:start(#amqp_params_direct{})

凭据对于直接连接是可选的,因为 Erlang 分布依赖于 共享密钥,即 Erlang Cookie,进行身份验证。

如果提供了用户名和密码,则它们将用于身份验证,并提供给身份验证后端。

如果仅提供用户名,则认为用户是受信任的并无条件登录。

如果既未提供用户名也未提供密码,则连接将被视为来自完全受信任的用户,该用户可以连接到任何虚拟主机并具有完全权限

#amqp_params_direct 记录设置以下默认值

参数默认值
用户名none
密码none
虚拟主机/
节点node()
client_properties[]

使用 AMQP URI 连接到 RabbitMQ

除了直接使用 #amqp_params_network 等记录外,还可以使用AMQP URI

为此目的提供了 amqp_uri:parse/1 函数。它解析 URI 并返回等效的 #amqp_params_network#amqp_params_direct 记录。

与规范不同,如果省略了主机名,则假定连接是直接连接,并返回 #amqp_params_direct{} 记录。除了标准的主机、端口、用户、密码和 vhost 参数外,还可以通过查询字符串指定额外的参数(例如,“?heartbeat=5” 以配置心跳超时)。

创建通道

建立连接后,使用 amqp_connection 模块打开一个或多个通道,这些通道将用于定义拓扑、发布和消费消息

{ok, Channel} = amqp_connection:open_channel(Connection)

此函数采用连接进程的 pid,并返回 {ok, Channel} 对,其中 Channel 是表示通道的 pid,并将用于执行协议命令。

使用 AMQP 0-9-1 方法(协议操作)

客户端库与 RabbitMQ 节点交互的主要方式是发送和处理 AMQP 0-9-1 方法(在本指南中也称为“命令”),这些方法由记录表示。

客户端尝试为每个记录使用合理的默认值。例如,当使用 #'exchange.declare'{} 方法声明瞬态交换机时,仅需指定名称就足够了

#'exchange.declare'{exchange = <<"my_exchange">>}

上面的示例等效于此

#'exchange.declare'{exchange    = <<"my_exchange">>,
type = <<"direct">>,
passive = false,
durable = false,
auto_delete = false,
internal = false,
nowait = false,
arguments = []}

定义拓扑:交换机、队列、绑定

建立通道后,可以使用 amqp_channel 模块来管理 AMQP 中的基本对象,即交换机和队列。以下函数创建一个名为 my_exchange 的交换机,默认情况下,它是直接交换机

Declare = #'exchange.declare'{exchange = <<"my_exchange">>},
#'exchange.declare_ok'{} = amqp_channel:call(Channel, Declare)

类似地,此代码创建一个名为 my_queue瞬态队列

Declare = #'queue.declare'{queue = <<"my_queue">>},
#'queue.declare_ok'{} = amqp_channel:call(Channel, Declare)

要声明持久队列

Declare = #'queue.declare'{
queue = <<"my_queue">>,
durable = true
},
#'queue.declare_ok'{} = amqp_channel:call(Channel, Declare)

在某些情况下,应用程序想要使用瞬态队列,并且对队列的实际名称不感兴趣。在这种情况下,可以使 broker 为队列生成随机名称。为此,请使用 #'queue.declare'{} 方法,并将队列属性保留为未定义。为队列名称指定空字符串将具有相同的效果。

#'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, #'queue.declare'{})

服务器将生成在此集群中唯一的队列名称,并将此名称作为确认的一部分返回。

绑定

要创建从交换机到队列的路由规则,请使用 #'queue.bind'{} 命令

Binding = #'queue.bind'{queue       = Queue,
exchange = Exchange,
routing_key = RoutingKey},
#'queue.bind_ok'{} = amqp_channel:call(Channel, Binding)

当不再需要此路由规则时,可以使用 #'queue.unbind'{} 命令删除此路由

Binding = #'queue.unbind'{queue       = Queue,
exchange = Exchange,
routing_key = RoutingKey},
#'queue.unbind_ok'{} = amqp_channel:call(Channel, Binding)

删除实体

可以使用 #'exchange.delete'{} 命令删除交换机

Delete = #'exchange.delete'{exchange = <<"my_exchange">>},
#'exchange.delete_ok'{} = amqp_channel:call(Channel, Delete)

类似地,使用 #'queue.delete'{} 命令删除队列

Delete = #'queue.delete'{queue = <<"my_queue">>},
#'queue.delete_ok'{} = amqp_channel:call(Channel, Delete)

同步和异步协议方法、调用和转换

请注意,以上示例使用 amqp_channel:call/2。这是因为它们使用生成响应的同步 AMQP 0-9-1 方法(与一组称为异步方法的方法不同)。

通常建议对同步方法使用 amqp_channel:call/{2,3},而不是 amqp_channel:cast/{2,3},即使这两个函数都适用于同步和异步方法。

这两个函数之间的一个区别是,amqp_channel:call/{2,3} 会阻止调用进程,直到收到来自服务器的回复(对于同步方法)或方法已在线路上发送(对于异步方法),而 amqp_channel:cast/{2,3} 会立即返回“ok”。

因此,只有使用 amqp_channel:call/{2,3},我们才能验证服务器是否已确认我们的命令。

发布消息

要使用特定的路由密钥将消息发布到交换机,请使用 #'basic.publish'{} 方法。消息使用 #amqp_msg{} 记录表示

Payload = <<"foobar">>,
Publish = #'basic.publish'{exchange = X, routing_key = Key},
amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload})

默认情况下,#amqp_msg{} 记录的 properties 字段包含最少的消息属性,作为 #'P_basic'{} 属性记录。

如果应用程序需要覆盖任何默认值,例如,发送持久消息,则需要相应地构造 #amqp_msg{}

Payload = <<"foobar">>,
Publish = #'basic.publish'{exchange = X, routing_key = Key},
Props = #'P_basic'{delivery_mode = 2}, %% persistent message
Msg = #amqp_msg{props = Props, payload = Payload},
amqp_channel:cast(Channel, Publish, Msg)

完整的消息属性列表可以在发布者指南中找到。

AMQP 0-9-1 #'basic.publish' 方法是异步的:服务器不会向其发送响应。但是,客户端可以选择让无法路由的消息返回给它们。这在关于返回消息处理程序的部分中描述。

以上示例未使用发布者确认。要在发布一批消息后等待所有未完成的发布得到确认,请使用 amqp_channel:wait_for_confirms/2 函数。如果所有未完成的发布都已成功确认,它将返回 true,如果发生超时,则返回 timeout

请注意,在每条发布的消息后等待是非常低效且不必要的。更优化的方法是发布一批消息并等待它们的确认。如果某些发布未及时确认,则可以重新发布整个最后一批消息。

消费者:使用 “推送 API” 订阅队列

应用程序可以订阅以接收路由到队列的消息。这种“推送 API” 是推荐的消息消费方式(另一种是轮询,应尽可能避免使用)。

要向队列添加消费者(订阅队列),请以两种方式之一使用 #'basic.consume'{} 方法

#'basic.consume_ok'{consumer_tag = Tag} =
amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q}, Consumer)

%% A consumer process is not provided so the calling
%% process (`self()`) will be the consumer
#'basic.consume_ok'{consumer_tag = Tag} =
amqp_channel:call(Channel, #'basic.consume'{queue = Q})

consumer 参数是将由客户端库向其传递消息的进程的 pid。这可以是任意 Erlang 进程,包括启动连接的进程。#'basic.consume_ok'{} 返回包含一个消费者标记。该标记是用于取消消费者的消费者(订阅)标识符。

这在稍后的时间点用于取消消费者。此通知同时发送到创建订阅的进程(作为 amqp_channel:subscribe/3 的返回值)和作为消息发送到消费者进程。

当消费者进程订阅队列时,它将在其邮箱中接收消息。一个示例接收循环如下所示

loop(Channel) ->
receive
%% This is the first message received
#'basic.consume_ok'{} ->
loop(Channel);

%% This is received when the subscription is cancelled
#'basic.cancel_ok'{} ->
ok;

%% A delivery
{#'basic.deliver'{delivery_tag = Tag}, Content} ->
%% Do something with the message payload
%% (some work here)

%% Ack the message
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),

%% Loop
loop(Channel)
end.

在以上示例中,进程消费消费者注册(订阅)通知,然后继续等待传递消息到达其进程邮箱。

当收到消息时,循环会对消息执行一些有用的操作,并将确认发送回服务器。如果取消了消费者,则会向消费者进程发送取消通知。在这种情况下,接收循环只是退出。如果应用程序不希望显式确认消息接收,则可以使用自动确认模式。为此,请将 #'basic.consume' 记录的 no_ack 属性设置为 true。在自动确认模式下,消费者不确认交付:RabbitMQ 会在将它们发送到连接后立即将其视为已交付。

取消消费者

要取消消费者,请使用 #'basic.consume_ok'{} 响应返回的消费者标记

amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = Tag})

已取消的消费者可能仍会收到“正在传输中”的交付,例如,在取消消费者时当前位于 TCP 缓冲区中的交付。但是,最终(通常在取消消费者后不久)将不再有进一步的交付到其处理进程。

消费者的实现

通道使用实现 amqp_gen_consumer 行为的模块来确定应如何处理消费者事件。实际上,此模块处理客户端消费者注册,并确保将交付路由到适当的消费者。

例如,默认消费者模块 amqp_selective_consumer 跟踪哪些进程订阅了哪些队列,并适当地路由交付;此外,如果通道为其提供了未知消费者的交付,它将将其传递给默认消费者(如果已注册一个)。

相比之下,amqp_direct_consumer 只是将其从通道接收到的所有消息转发到其唯一注册的消费者。

通道的消费者模块在打开通道时通过将第二个参数设置为 amqp_connection:open_channel/2 来选择。

消费者模块实现 amqp_gen_consumer 行为,因此实现处理接收 basic.consumebasic.consume_okbasic.cancelbasic.cancel_ok 方法以及交付已发布消息的函数。

关闭通道和连接

当不再需要通道时,客户端应将其关闭。这可以使用 amqp_channel:close/1 来实现

amqp_channel:close(Channel)

要关闭连接,请使用 amqp_connection:close/1

amqp_connection:close(Connection)

关闭连接将自动隐式关闭该连接上的所有通道。

#'channel.close' 和 #'connection.close'#'connection.close'命令都采用参数 reply_code(整数)和 reply_text(二进制),客户端可以根据通道或连接关闭的原因来设置这些参数。

在大多数情况下,reply_code 应设置为 200 以指示正常关闭。reply_text 属性只是一个任意字符串,服务器可能会也可能不会记录它。如果客户端想要设置为不同的回复代码和/或文本,则可以使用重载函数 amqp_channel:close/3amqp_connection:close/3

交付流量控制

默认情况下,通道内没有流量控制,除了正常的 TCP 反压。消费者可以设置预取缓冲区的大小,broker 将为单个通道上未完成的未确认消息维护该缓冲区的大小。这可以使用 #'basic.qos'命令来实现

amqp_channel:call(Channel, #'basic.qos'{prefetch_count = Prefetch})

建议应用程序使用预取。在发布者确认和消费者确认指南中了解更多信息。

阻塞的连接

当节点检测到其低于某个可用资源阈值时,它可能会选择停止从发布者的网络套接字读取数据

RabbitMQ 支持一种机制,允许告知客户端已发生这种情况

使用 amqp_connection:register_blocked_handler/2,提供进程的 pid,#'connection.blocked'{}#'connection.unblocked'{} 可能会发送到该进程。

处理返回的消息

broker 会将无法交付的消息返回给原始客户端。这些消息是使用 immediate 或 mandatory 标志设置发布的消息。为了使应用程序收到返回通知,它必须注册一个可以处理 #'basic.return'{} 帧的回调进程。

这是一个无法路由的消息处理示例

amqp_channel:register_return_handler(Channel, self()),
amqp_channel:call(Channel, #'exchange.declare'{exchange = X}),
Publish = #'basic.publish'{exchange = X, routing_key = SomeKey,
mandatory = true},
amqp_channel:call(Channel, Publish, #amqp_msg{payload = Payload}),
receive
{BasicReturn, Content} ->
#'basic.return'{reply_text = <<"unroutable">>, exchange = X} = BasicReturn
%% Do something with the returned message
end

使用 “拉取 API” 接收消息

也可以按需检索单个消息(“拉取 API”,也称为轮询)。这种消费方法效率极低,因为它实际上是轮询,即使绝大多数请求都没有结果,应用程序也必须重复请求结果。因此,强烈不建议使用这种方法。

这可以使用 #'basic.get'{} 命令来实现

Get = #'basic.get'{queue = Q, no_ack = true},
{#'basic.get_ok'{}, Content} = amqp_channel:call(Channel, Get),
#amqp_msg{payload = Payload} = Content

返回的有效负载是 Erlang 二进制文件,由应用程序对其进行解码,因为此内容的结构对于客户端库和服务器都是不透明的。

如果在调用 #'basic.get'{} 命令时队列为空,则通道将返回 #'basic.get_empty' 结果,如此处所示

#'basic.get_empty'{} = amqp_channel:call(Channel, Get)

请注意,前面的示例在 #'basic.get'{} 命令上设置了 no_ack 标志。这告诉 broker 接收者不会发送消息的确认。这样做,broker 可以解除其交付责任 - 一旦它认为它已交付消息,则可以自由地假设消费应用程序已承担责任。通常,许多应用程序不希望使用这些语义,相反,它们希望显式确认消息的接收。这通过 #'basic.ack'命令完成,其中 no_ack 字段默认关闭

Get = #'basic.get'{queue = Q},
{#'basic.get_ok'{delivery_tag = Tag}, Content}
= amqp_channel:call(Channel, Get),
%% Do something with the message payload.......and then ack it
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag})

请注意,#'basic.ack'{} 方法是使用 amqp_channel:cast/2 而不是 amqp_channel:call/2 发送的。这是因为确认是完全异步的,并且服务器不会为它们生成响应。

一个基本示例

以下是库基本用法的完整示例。为了简单起见,它不使用发布者确认,而是使用执行手动确认轮询消费者

-module(amqp_example).

-include("amqp_client.hrl").

-compile([export_all]).

test() ->
%% Start a network connection
{ok, Connection} = amqp_connection:start(#amqp_params_network{}),
%% Open a channel on the connection
{ok, Channel} = amqp_connection:open_channel(Connection),

%% Declare a queue
#'queue.declare_ok'{queue = Q}
= amqp_channel:call(Channel, #'queue.declare'{}),

%% Publish a message
Payload = <<"foobar">>,
Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload}),

%% Poll for a message
Get = #'basic.get'{queue = Q},
{#'basic.get_ok'{delivery_tag = Tag}, Content}
= amqp_channel:call(Channel, Get),

%% Do something with the message payload
%% (some work here)

%% Ack the message
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),

%% Close the channel
amqp_channel:close(Channel),
%% Close the connection
amqp_connection:close(Connection),

ok.

在此示例中,使用服务器生成的名称创建队列,并将消息直接发布到队列。这利用了每个队列都通过其自己的队列名称绑定到默认交换机的事实。然后,消息出队并确认。

编译以客户端作为依赖的代码

客户端构建过程生成两个部署归档文件

  • amqp_client.ez,其中包含所有客户端库模块
  • rabbit_common.ez,其中包含运行时所需的服务器中的通用模块

可以使用构建工具(如 Rebar 3erlang.mk)来配置这两个依赖项。

为了举例说明,让我们假设使用的依赖项管理工具在 ./deps 目录下编译依赖项。

然后,要手动编译示例代码,可以使用 erlc,并将 ERL_LIBS 指向 ./deps 目录

ERL_LIBS=deps erlc -o ebin amqp_example.erl

然后,要运行您的应用程序,您可以像这样设置 Erlang 运行时

ERL_LIBS=deps erl -pa ebin
# => Erlang/OTP 23 [erts-11.0] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:16]
# =>
# => Eshell V11.0 (abort with ^G)
# => 1> amqp_example:test().
# => ok
# => 2>
© . All rights reserved.