RabbitMQ AMQP 0.9.1 Erlang 客户端库
概述
本指南介绍了用于 RabbitMQ (AMQP 0-9-1) 的 Erlang 客户端。
本用户指南假定读者熟悉 AMQP 0-9-1 的基本概念。
请参阅有关 连接、通道、队列、发布者 和 消费者 的指南,以更详细地了解这些关键的 RabbitMQ 概念。
本指南涵盖的部分主题包括
等等。
依赖项
该客户端库命名为 amqp_client,并与核心依赖项 rabbit-common 一起通过 Hex.pm 分发。
以下是可用于主流构建工具(Mix、Rebar 3 和 erlang.mk)的依赖代码片段。
Mix
{:rabbit_common, "~> 3.8"}
Rebar 3
{rabbit_common, "&version-erlang-client;"}
erlang.mk
dep_rabbit_common = hex &version-erlang-client;
基础知识
客户端的基本用法遵循以下大致步骤
- 确保
amqp_clientErlang 应用程序已启动 - 建立到 RabbitMQ 节点的连接
- 在连接上打开一个新通道
- 使用通道执行 AMQP 0-9-1 命令,例如声明交换器和队列、定义它们之间的绑定、发布消息、注册消费者(订阅)等
- 注册可选的事件处理程序,例如退回消息处理程序
- 不再需要时,关闭通道和连接
amqp_client 应用程序
RabbitMQ Erlang 客户端是一个名为 amqp_client 的 Erlang 应用程序。
与任何 Erlang 应用程序一样,开始使用客户端之前,必须先确保其已启动
application:ensure_started(amqp_client).
关键模块与概念
客户端库中的两个主要模块是
amqp_connection:用于打开到 RabbitMQ 节点的连接并在其上打开通道amqp_channel:公开了大多数 AMQP 0-9-1 操作,例如队列声明或消费者注册
一旦连接建立并成功认证,且通道已打开,应用程序通常会使用 amqp_channel:call/{2,3} 和 amqp_channel:cast/{2,3} 函数,配合 AMQP 0-9-1 协议方法记录来执行大多数操作。
一些附加模块使应用程序能够对特定事件做出反应。它们将在本指南后面进行介绍。
该库由两层组成
- 一个遵循 AMQP 0-9-1 协议和操作执行模型的高层逻辑层
- 一个负责与 RabbitMQ 节点通信的底层协议实现层
网络连接类型
AMQP 0-9-1 客户端使用 TCP 连接到 RabbitMQ。一个 AMQP 0-9-1 连接在底层使用一个 TCP 连接。然而,Erlang 客户端的独特之处在于它提供了一种与 RabbitMQ 节点通信的替代方式。
网络客户端
与其他客户端非常相似,该库提供了一个基于 TCP 的客户端,它使用 TCP 连接将序列化的协议帧传输到服务器。此客户端称为网络客户端,大多数应用程序应该使用它。
要使用网络客户端,请使用参数设置为 #amqp_params_network 记录的 amqp_connection:start/1 启动连接。
直接(Erlang 分布式)客户端
或者,可以使用 Erlang 分布式连接来代替单独的 TCP 连接。这种通信方法假设使用该客户端的应用程序与 RabbitMQ 节点运行在同一个 Erlang 集群中。
直接客户端的使用应仅限于与 RabbitMQ 并行部署的应用程序。Shovel 和 Federation 插件就是这类应用程序的两个示例。
在大多数其他情况下,开发人员应优先选择上面介绍的更传统的网络客户端。对于不熟悉 Erlang 的运维人员和开发人员来说,这样更容易理解。
要使用直接驱动程序,请使用参数设置为 #amqp_params_direct 记录的 amqp_connection:start/1 启动连接。
包含头文件
Erlang 客户端使用许多记录定义,您将在本指南中遇到它们。这些记录分为两大类
- 生成的 AMQP 0-9-1 方法定义
- 在整个客户端中常用的数据结构定义
要访问这些记录,您需要在每个使用 Erlang 客户端的模块中包含 amqp_client.hrl
-include("amqp_client.hrl").
连接到 RabbitMQ
amqp_connection 模块用于启动到 RabbitMQ 节点的连接。在本例中,我们将使用网络连接,这是大多数用例推荐的选项
{ok, Connection} = amqp_connection:start(#amqp_params_network{})
此函数返回一个 {ok, Connection} 对,其中 Connection 是维护持久连接的进程的 pid。此 pid 将用于在连接上打开通道和关闭连接。
如果出现错误,上述调用将返回一个 {error, Error} 对。
#amqp_params_network 记录设置了以下默认值
| 参数 | 默认值 |
| 用户名 | guest |
| password | guest |
| 虚拟主机 (vhost) | / |
| 主机 | localhost |
| 端口 | 5672 |
| channel_max | 2047 |
| frame_max | 0 |
| heartbeat | 0 |
| SSL 选项 | none |
| auth_mechanisms | [fun amqp_auth_mechanisms:plain/3, fun amqp_auth_mechanisms:amqplain/3] |
| 客户端属性 | [] |
这些值仅适用于在同一主机上运行的开箱即用的 RabbitMQ 节点的默认值。如果目标节点或环境配置不同,则可以覆盖这些值以匹配实际的部署场景。
TLS 选项也可以通过 amqp_client 应用程序的 ssl_options 环境变量全局指定。它们将与 URI 中的 TLS 参数合并(后者优先)。
直接(Erlang 分布式)客户端
部署在与 RabbitMQ 同一个 Erlang 集群内的应用程序(例如 RabbitMQ 插件)可以启动直接连接,该连接绕过网络序列化并依赖 Erlang 分布式进行数据传输。
要启动直接连接,请使用参数设置为 #amqp_params_direct 记录的 amqp_connection:start/1
{ok, Connection} = amqp_connection:start(#amqp_params_direct{})
直接连接的凭据是可选的,因为 Erlang 分布式依赖于共享密钥(即 Erlang cookie)进行身份验证。
如果提供了用户名和密码,它们将用于身份验证并提供给身份验证后端。
如果仅提供了用户名,则该用户被视为受信任的,并无条件登录。
如果既未提供用户名也未提供密码,则该连接将被视为来自完全受信任的用户,该用户可以连接到任何虚拟主机并拥有完全的权限。
#amqp_params_direct 记录设置了以下默认值
| 参数 | 默认值 |
| 用户名 | none |
| password | none |
| 虚拟主机 (vhost) | / |
| 节点 | node() |
| 客户端属性 | [] |
使用 AMQP URI 连接到 RabbitMQ
除了直接使用 #amqp_params_network 等记录外,还可以使用 AMQP URI。
amqp_uri:parse/1 函数用于此目的。它解析 URI 并返回等效的 #amqp_params_network 或 #amqp_params_direct 记录。
与规范不同的是,如果省略了主机名,则假定连接是直接的,并返回 #amqp_params_direct{} 记录。除了标准的主机、端口、用户、密码和虚拟主机参数外,还可以通过查询字符串指定额外参数(例如 "?heartbeat=5" 以配置心跳超时)。
创建通道
建立连接后,使用 amqp_connection 模块打开一个或多个通道,这些通道将用于定义拓扑、发布和消费消息
{ok, Channel} = amqp_connection:open_channel(Connection)
此函数接受连接进程的 pid,并返回一个 {ok, Channel} 对,其中 Channel 是代表通道并用于执行协议命令的 pid。
使用 AMQP 0-9-1 方法(协议操作)
客户端库与 RabbitMQ 节点交互的主要方式是发送和处理由记录表示的 AMQP 0-9-1 方法(在本指南中也称为“命令”)。
客户端尝试为每个记录使用合理的默认值。例如,在使用 #'exchange.declare'{} 方法声明瞬态交换器时,只需指定名称即可
#'exchange.declare'{exchange = <<"my_exchange">>}
上面的示例等同于此
#'exchange.declare'{exchange = <<"my_exchange">>,
type = <<"direct">>,
passive = false,
durable = false,
auto_delete = false,
internal = false,
nowait = false,
arguments = []}
定义拓扑:交换器、队列、绑定
建立通道后,可以使用 amqp_channel 模块来管理 AMQP 中的基本对象,即交换器和队列。以下函数创建了一个名为 my_exchange 的交换器,默认情况下它是直连交换器 (direct exchange)
Declare = #'exchange.declare'{exchange = <<"my_exchange">>},
#'exchange.declare_ok'{} = amqp_channel:call(Channel, Declare)
类似地,此代码创建了一个名为 my_queue 的瞬态队列
Declare = #'queue.declare'{queue = <<"my_queue">>},
#'queue.declare_ok'{} = amqp_channel:call(Channel, Declare)
要声明持久队列
Declare = #'queue.declare'{
queue = <<"my_queue">>,
durable = true
},
#'queue.declare_ok'{} = amqp_channel:call(Channel, Declare)
在某些情况下,应用程序希望使用瞬态队列,但并不关心队列的实际名称。在这种情况下,可以让代理为队列生成一个随机名称。为此,请使用 #'queue.declare'{} 方法并将 queue 属性保留为未定义。为队列名称指定空字符串具有相同的效果。
#'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, #'queue.declare'{})
服务器将在此集群中生成一个唯一的队列名称,并将此名称作为确认的一部分返回。
绑定
要创建从交换器到队列的路由规则,请使用 #'queue.bind'{} 命令
Binding = #'queue.bind'{queue = Queue,
exchange = Exchange,
routing_key = RoutingKey},
#'queue.bind_ok'{} = amqp_channel:call(Channel, Binding)
当不再需要此路由规则时,可以使用 #'queue.unbind'{} 命令删除此路由
Binding = #'queue.unbind'{queue = Queue,
exchange = Exchange,
routing_key = RoutingKey},
#'queue.unbind_ok'{} = amqp_channel:call(Channel, Binding)
删除实体
可以使用 #'exchange.delete'{} 命令删除交换器
Delete = #'exchange.delete'{exchange = <<"my_exchange">>},
#'exchange.delete_ok'{} = amqp_channel:call(Channel, Delete)
类似地,使用 #'queue.delete'{} 命令删除队列
Delete = #'queue.delete'{queue = <<"my_queue">>},
#'queue.delete_ok'{} = amqp_channel:call(Channel, Delete)
同步与异步协议方法、调用 (Calls) 与推送 (Casts)
请注意,上面的示例使用了 amqp_channel:call/2。这是因为它们使用了会产生响应的同步 AMQP 0-9-1 方法(与称为异步方法的一组方法不同)。
通常建议对同步方法使用 amqp_channel:call/{2,3},而不是 amqp_channel:cast/{2,3},即使这两个函数都可以处理同步和异步方法。
这两个函数之间的一个区别是,amqp_channel:call/{2,3} 会阻塞调用进程,直到收到服务器的回复(对于同步方法)或该方法已在网络上发送(对于异步方法),而 amqp_channel:cast/{2,3} 会立即返回 'ok'。
因此,只有通过使用 amqp_channel:call/{2,3},我们才能验证服务器是否已确认我们的命令。
发布消息
要将消息发布到具有特定路由键的交换器,可以使用 #'basic.publish'{} 方法。消息使用 #amqp_msg{} 记录表示
Payload = <<"foobar">>,
Publish = #'basic.publish'{exchange = X, routing_key = Key},
amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload})
默认情况下,#amqp_msg{} 记录的 properties 字段包含一组最小的消息属性,作为一个 #'P_basic'{} 属性记录。
如果应用程序需要覆盖任何默认值,例如发送持久化消息,则需要相应地构造 #amqp_msg{}
Payload = <<"foobar">>,
Publish = #'basic.publish'{exchange = X, routing_key = Key},
Props = #'P_basic'{delivery_mode = 2}, %% persistent message
Msg = #amqp_msg{props = Props, payload = Payload},
amqp_channel:cast(Channel, Publish, Msg)
有关消息属性的完整列表,请参阅发布者指南。
AMQP 0-9-1 #'basic.publish' 方法是异步的:服务器不会对其发送响应。但是,客户端可以选择让不可路由的消息返回给它们。这在关于退回消息处理程序的部分中进行了描述。
上面的示例未使用发布者确认 (Publisher Confirms)。要在发布一批消息后等待所有未完成的发布得到确认,请使用 amqp_channel:wait_for_confirms/2 函数。如果所有未完成的发布都成功确认,它将返回 true;如果发生超时,则返回 timeout。
请注意,在每条已发布的消息后等待是极其低效且不必要的。更优化的方法是发布一批消息并等待它们的确认。如果某些发布未及时得到确认,则可以重新发布整批最后的内容。
消费者:使用“推送 API”订阅队列
应用程序可以订阅以接收路由到队列的消息。此“推送 API”是消费消息的推荐方式(另一种方式是轮询,应尽可能避免)。
要向队列添加消费者(订阅队列),需要以两种方式之一使用 #'basic.consume'{} 方法
#'basic.consume_ok'{consumer_tag = Tag} =
amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q}, Consumer)
或
%% A consumer process is not provided so the calling
%% process (`self()`) will be the consumer
#'basic.consume_ok'{consumer_tag = Tag} =
amqp_channel:call(Channel, #'basic.consume'{queue = Q})
consumer 参数是客户端库将向其发送消息的进程的 pid。这可以是任何 Erlang 进程,包括启动连接的进程。#'basic.consume_ok'{} 返回值包含一个消费者标签 (consumer tag)。该标签是一个消费者(订阅)标识符,用于取消消费者。
此标签稍后用于取消消费者。此通知既发送给创建订阅的进程(作为 amqp_channel:subscribe/3 的返回值),也作为消息发送给消费者进程。
当消费者进程订阅队列时,它将在其邮箱中接收消息。一个示例接收循环如下所示
loop(Channel) ->
receive
%% This is the first message received
#'basic.consume_ok'{} ->
loop(Channel);
%% This is received when the subscription is cancelled
#'basic.cancel_ok'{} ->
ok;
%% A delivery
{#'basic.deliver'{delivery_tag = Tag}, Content} ->
%% Do something with the message payload
%% (some work here)
%% Ack the message
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
%% Loop
loop(Channel)
end.
在上面的示例中,进程消耗了消费者注册(订阅)通知,然后继续等待传递的消息到达其进程邮箱。
当收到消息时,循环会对消息执行有用的操作,并将确认发送回服务器。如果消费者被取消,将向消费者进程发送取消通知。在这种情况下,接收循环直接退出。如果应用程序不希望显式确认消息接收,可以使用自动确认模式。为此,将 #'basic.consume' 记录的 no_ack 属性设置为 true。在自动确认模式下,消费者不会确认交付:RabbitMQ 会在将它们发送到连接后立即将其视为已交付。
取消消费者
要取消消费者,请使用随 #'basic.consume_ok'{} 响应返回的消费者标签
amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = Tag})
已取消的消费者可能仍会收到“传输中”的交付消息,例如在消费者取消时当前在 TCP 缓冲区中的那些消息。然而,最终——通常在消费者取消后不久——将不会再有进一步的交付发送到其处理进程。
消费者的实现
通道使用实现 amqp_gen_consumer 行为的模块来确定它应该如何处理消费者事件。实际上,该模块处理客户端消费者注册,并确保将交付内容路由到适当的消费者。
例如,默认消费者模块 amqp_selective_consumer 会跟踪哪些进程订阅了哪些队列并相应地路由交付内容;此外,如果通道给了它一个未知消费者的交付内容,它会将其传递给默认消费者(如果有注册的话)。
相比之下,amqp_direct_consumer 只是将它从通道接收到的所有消息转发给它唯一注册的消费者。
通道的消费者模块是在打开通道时通过设置 amqp_connection:open_channel/2 的第二个参数来选择的。
消费者模块实现了 amqp_gen_consumer 行为,因此实现了处理接收 basic.consume、basic.consume_ok、basic.cancel、basic.cancel_ok 方法以及已发布消息交付的函数。
关闭通道和连接
当不再需要通道时,客户端应关闭它。这是通过 amqp_channel:close/1 实现的
amqp_channel:close(Channel)
要关闭连接,请使用 amqp_connection:close/1
amqp_connection:close(Connection)
关闭连接将自动隐含地关闭该连接上的所有通道。
#'channel.close'和 #'connection.close'命令都带有 reply_code(整数)和 reply_text(二进制)参数,客户端可以根据关闭通道或连接的原因来设置这些参数。
在大多数情况下,reply_code 应设置为 200 以指示正常关闭。reply_text 属性只是一个任意字符串,服务器可能会也可能不会记录它。如果客户端想要设置不同的回复代码和/或文本,它可以分别使用重载函数 amqp_channel:close/3 和 amqp_connection:close/3。
交付流控制
默认情况下,除了正常的 TCP 反压外,通道内没有流控制。消费者可以设置预取缓冲区 (prefetch buffer) 的大小,代理将为单个通道上未完成的未确认消息维护该大小。这是通过 #'basic.qos'命令实现的
amqp_channel:call(Channel, #'basic.qos'{prefetch_count = Prefetch})
建议应用程序使用预取功能。在发布者确认与消费者确认指南中了解更多信息。
连接被阻塞
当节点检测到其低于某个可用资源阈值时,它可能会选择停止从发布者的网络套接字读取数据。
RabbitMQ 支持一种允许告知客户端已发生此情况的机制。
使用 amqp_connection:register_blocked_handler/2 并提供一个进程的 pid,该进程应接收 #'connection.blocked'{} 和 #'connection.unblocked'{} 帧。
处理退回的消息
代理会将无法交付的消息退回给原始客户端。这些消息是在设置了 immediate 或 mandatory 标志的情况下发布的。为了使应用程序获得退回通知,它必须注册一个可以处理 #'basic.return'{} 帧的回调进程。
这是一个不可路由消息处理的示例
amqp_channel:register_return_handler(Channel, self()),
amqp_channel:call(Channel, #'exchange.declare'{exchange = X}),
Publish = #'basic.publish'{exchange = X, routing_key = SomeKey,
mandatory = true},
amqp_channel:call(Channel, Publish, #amqp_msg{payload = Payload}),
receive
{BasicReturn, Content} ->
#'basic.return'{reply_text = <<"unroutable">>, exchange = X} = BasicReturn
%% Do something with the returned message
end
使用“获取 API”接收消息
还可以按需检索单条消息(“拉取 API”,即轮询)。这种消费方法效率极低,因为它本质上是轮询,应用程序必须重复询问结果,即使绝大多数请求都没有结果。因此,强烈不建议使用这种方法。
这是通过 #'basic.get'{} 命令实现的
Get = #'basic.get'{queue = Q, no_ack = true},
{#'basic.get_ok'{}, Content} = amqp_channel:call(Channel, Get),
#amqp_msg{payload = Payload} = Content
返回的有效载荷是一个 Erlang 二进制数据,由应用程序负责解码,因为此内容的结构对客户端库和服务器都是不透明的。
如果调用 #'basic.get'{} 命令时队列为空,则通道将返回 #'basic.get_empty' 结果,如图所示
#'basic.get_empty'{} = amqp_channel:call(Channel, Get)
请注意,上一个示例在 #'basic.get'{} 命令上设置了 no_ack 标志。这告诉代理接收者不会发送消息的确认。这样,代理就可以免除交付责任——一旦它认为已交付了消息,它就可以自由地假设消费应用程序已承担了责任。通常,许多应用程序不希望使用这些语义,而是希望显式确认消息的接收。这是通过 #'basic.ack'命令完成的,默认情况下 no_ack 字段被关闭
Get = #'basic.get'{queue = Q},
{#'basic.get_ok'{delivery_tag = Tag}, Content}
= amqp_channel:call(Channel, Get),
%% Do something with the message payload.......and then ack it
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag})
请注意,#'basic.ack'{} 方法是使用 amqp_channel:cast/2 而不是 amqp_channel:call/2 发送的。这是因为确认是完全异步的,服务器不会为它们产生响应。
基本示例
下面是该库基本用法的完整示例。为简单起见,它没有使用发布者确认,而是使用了执行手动确认的轮询消费者。
-module(amqp_example).
-include("amqp_client.hrl").
-compile([export_all]).
test() ->
%% Start a network connection
{ok, Connection} = amqp_connection:start(#amqp_params_network{}),
%% Open a channel on the connection
{ok, Channel} = amqp_connection:open_channel(Connection),
%% Declare a queue
#'queue.declare_ok'{queue = Q}
= amqp_channel:call(Channel, #'queue.declare'{}),
%% Publish a message
Payload = <<"foobar">>,
Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload}),
%% Poll for a message
Get = #'basic.get'{queue = Q},
{#'basic.get_ok'{delivery_tag = Tag}, Content}
= amqp_channel:call(Channel, Get),
%% Do something with the message payload
%% (some work here)
%% Ack the message
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
%% Close the channel
amqp_channel:close(Channel),
%% Close the connection
amqp_connection:close(Connection),
ok.
在此示例中,创建了一个服务器生成名称的队列,并直接将消息发布到该队列。这利用了每个队列都通过其自身队列名称绑定到默认交换器这一事实。然后将消息取出并确认。
将客户端作为依赖项编译代码
客户端构建过程会产生两个部署存档
- amqp_client.ez,包含所有客户端库模块
- rabbit_common.ez,包含运行环境所需的服务器公共模块
这两个依赖项都可以使用 Rebar 3 或 erlang.mk 等构建工具进行配置。
举个例子,假设所使用的依赖管理工具在 ./deps 目录下编译依赖项。
那么手动编译示例代码时,可以将 erlc 与指向 ./deps 目录的 ERL_LIBS 一起使用
ERL_LIBS=deps erlc -o ebin amqp_example.erl
然后,要运行您的应用程序,您可以像这样设置 Erlang 运行时
ERL_LIBS=deps erl -pa ebin
# => Erlang/OTP 23 [erts-11.0] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:16]
# =>
# => Eshell V11.0 (abort with ^G)
# => 1> amqp_example:test().
# => ok
# => 2>