跳至主要内容

Erlang RabbitMQ 客户端库

概述

本指南介绍了 RabbitMQ 的 Erlang 客户端 (AMQP 0-9-1).

本用户指南假设读者熟悉 AMQP 0-9-1 的基本概念.

请参考有关 连接通道队列发布者消费者 的指南,以详细了解这些关键的 RabbitMQ 概念。

本指南中涉及的一些主题包括

等等。

依赖关系

客户端库名为 amqp_client,并 通过 Hex.pm 发布,以及它的主要依赖项 rabbit-common

以下是将与流行的构建工具一起使用的依赖项代码片段:Mix、Rebar 3erlang.mk

Mix

{:rabbit_common, "~> 3.8"}

Rebar 3

{rabbit_common, "&version-erlang-client;"}

erlang.mk

dep_rabbit_common = hex &version-erlang-client;

基础知识

客户端的基本用法遵循以下几个步骤

  1. 确保 amqp_client Erlang 应用程序已启动
  2. 建立与 RabbitMQ 节点的 连接
  3. 在连接上打开一个新通道
  4. 使用通道执行 AMQP 0-9-1 命令,例如声明交换机和队列、定义它们之间的绑定、发布消息、注册消费者(订阅)等等
  5. 注册可选的事件处理程序,例如 返回消息处理程序
  6. 不再需要时,关闭通道和连接

amqp_client 应用程序

RabbitMQ Erlang 客户端是一个名为 amqp_client 的 Erlang 应用程序。

与任何 Erlang 应用程序一样,要开始使用客户端,首先需要确保它已启动

application:ensure_started(amqp_client).

关键模块和概念

客户端库中的两个主要模块是

  • amqp_connection,用于打开与 RabbitMQ 节点的连接并在其上打开通道
  • amqp_channel,它公开了大多数 AMQP 0-9-1 操作,例如队列声明或消费者注册

一旦连接建立并成功 认证,并且通道已打开,应用程序通常会将 amqp_channel:call/{2,3}amqp_channel:cast/{2,3} 函数与 AMQP 0-9-1 协议方法记录一起使用来执行大多数操作。

几个额外的模块使应用程序能够对某些事件做出反应。这些模块将在本指南的后面介绍。

该库由两层组成

  • 遵循 AMQP 0-9-1 协议和操作执行模型的高级逻辑层
  • 负责与 RabbitMQ 节点通信的低级协议实现层

网络连接类型

AMQP 0-9-1 客户端使用 TCP 连接到 RabbitMQ。一个 AMQP 0-9-1 连接在幕后使用一个 TCP 连接。但是,Erlang 客户端的独特之处在于它提供了另一种与 RabbitMQ 节点通信的方式。

网络客户端

与其他客户端类似,该库提供了一个基于 TCP 的客户端,它使用 TCP 连接将序列化协议帧传输到服务器。此客户端称为网络客户端,大多数应用程序都应该使用它。

要使用网络客户端,请使用 amqp_connection:start/1 启动连接,并将参数设置为 #amqp_params_network 记录。

直接(Erlang 分布)客户端

或者,可以使用 Erlang 分布连接代替单独的 TCP 连接。此通信方法假设使用客户端的应用程序与 RabbitMQ 节点运行在同一个 Erlang 集群中。

直接客户端的使用应仅限于与 RabbitMQ 并排部署的应用程序。 ShovelFederation 插件是此类应用程序的两个示例。

在大多数其他情况下,开发人员应该更喜欢上面介绍的更传统的网络客户端。对于不熟悉 Erlang 的操作人员和开发人员来说,它更容易理解。

要使用直接驱动程序,请使用 amqp_connection:start/1 启动连接,并将参数设置为 #amqp_params_direct 记录。

包含头文件

Erlang 客户端使用许多记录定义,您将在本指南中遇到这些定义。这些记录大致分为两类

  • 生成的 AMQP 0-9-1 方法定义
  • 客户端中常用的数据结构的定义

要访问这些记录,您需要在每个使用 Erlang 客户端的模块中包含 amqp_client.hrl

-include("amqp_client.hrl").

连接到 RabbitMQ

amqp_connection 模块用于启动与 RabbitMQ 节点的 连接。在本例中,我们将使用网络连接,这是大多数用例的推荐选项

{ok, Connection} = amqp_connection:start(#amqp_params_network{})

此函数返回一个 {ok, Connection} 对,其中 Connection 是一个进程的 pid,该进程维护永久连接。此 pid 将用于在连接上打开通道以及 关闭连接

如果发生错误,上述调用将返回一个 {error, Error} 对。

#amqp_params_network 记录设置以下默认值

参数默认值
用户名guest
密码guest
虚拟主机/
主机localhost
端口5672
通道最大值2047
帧最大值0
心跳0
SSL 选项
认证机制[fun amqp_auth_mechanisms:plain/3, fun amqp_auth_mechanisms:amqplain/3]
客户端属性[]

这些值只是默认值,这些值将在运行在同一主机上的开箱即用的 RabbitMQ 节点上起作用。如果目标节点或环境已进行不同的配置,则可以覆盖这些值以匹配实际的部署场景。

TLS 选项也可以使用 amqp_client 应用程序的 ssl_options 环境键在全局范围内指定。它们将与来自 URI 的 TLS 参数合并(后者将优先)。

直接(Erlang 分布)客户端

在与 RabbitMQ 相同的 Erlang 集群中部署的应用程序(例如 RabbitMQ 插件)可以启动直接连接,该连接绕过网络序列化并依赖于 Erlang 分布进行数据传输。

要启动直接连接,请使用 amqp_connection:start/1,并将参数设置为 #amqp_params_direct 记录

{ok, Connection} = amqp_connection:start(#amqp_params_direct{})

对于直接连接,凭据是可选的,因为 Erlang 分布依赖于 共享密钥(Erlang cookie)进行认证。

如果提供了用户名和密码,则它们将用于认证并提供给认证后端。

如果只提供用户名,则该用户被视为可信用户并无条件登录。

如果既没有提供用户名也没有提供密码,则该连接将被视为来自完全可信用户,该用户可以连接到任何虚拟主机并拥有完全的 权限

#amqp_params_direct 记录设置以下默认值

参数默认值
用户名
密码
虚拟主机/
节点node()
客户端属性[]

使用 AMQP URI 连接到 RabbitMQ

无需直接使用 #amqp_params_network 等记录,可以使用 AMQP URI

为此目的提供了 amqp_uri:parse/1 函数。它会解析 URI 并返回等效的 #amqp_params_network#amqp_params_direct 记录。

与规范不同的是,如果省略了主机名,则该连接被假定为直接连接,并且将返回 #amqp_params_direct{} 记录。除了标准的主机、端口、用户、密码和虚拟主机参数外,还可以通过查询字符串指定额外的参数(例如,“?heartbeat=5”用于配置 心跳超时)。

创建通道

一旦连接建立,请使用 amqp_connection 模块打开一个或多个 通道,这些通道将用于定义拓扑结构、发布和消费消息

{ok, Channel} = amqp_connection:open_channel(Connection)

此函数接收连接进程的 pid 并返回一个 {ok, Channel} 对,其中 Channel 是一个代表通道的 pid,将用于执行协议命令。

使用 AMQP 0-9-1 方法(协议操作)

客户端库与 RabbitMQ 节点交互的主要方式是发送和处理 AMQP 0-9-1 方法(在本指南中也称为“命令”),这些方法由记录表示。

客户端尝试为每个记录使用合理的默认值。例如,当使用 #'exchange.declare'{} 方法声明一个瞬态交换机时,只需指定一个名称就足够了

#'exchange.declare'{exchange = <<"my_exchange">>}

上面的示例等效于以下内容

#'exchange.declare'{exchange    = <<"my_exchange">>,
type = <<"direct">>,
passive = false,
durable = false,
auto_delete = false,
internal = false,
nowait = false,
arguments = []}

定义拓扑:交换机、队列、绑定

建立连接后,amqp_channel 模块可用于管理 AMQP 中的基本对象,即交换机和队列。以下函数创建一个名为 my_exchange 的交换机,默认情况下,它是一个直接交换机

Declare = #'exchange.declare'{exchange = <<"my_exchange">>},
#'exchange.declare_ok'{} = amqp_channel:call(Channel, Declare)

类似地,这段代码创建了一个名为 my_queue瞬时队列

Declare = #'queue.declare'{queue = <<"my_queue">>},
#'queue.declare_ok'{} = amqp_channel:call(Channel, Declare)

要声明一个持久队列

Declare = #'queue.declare'{
queue = <<"my_queue">>,
durable = true
},
#'queue.declare_ok'{} = amqp_channel:call(Channel, Declare)

在某些情况下,应用程序希望使用瞬时队列,并且对队列的实际名称不感兴趣。在这种情况下,可以允许代理为队列生成一个随机名称。为此,请使用 #'queue.declare'{} 方法并将队列属性留空。为队列名称指定一个空字符串将具有相同的效果。

#'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, #'queue.declare'{})

服务器将生成一个在此集群中唯一的队列名称,并将此名称作为确认的一部分返回。

绑定

要创建从交换机到队列的路由规则,请使用 #'queue.bind'{} 命令

Binding = #'queue.bind'{queue       = Queue,
exchange = Exchange,
routing_key = RoutingKey},
#'queue.bind_ok'{} = amqp_channel:call(Channel, Binding)

当不再需要此路由规则时,可以使用 #'queue.unbind'{} 命令删除此路由

Binding = #'queue.unbind'{queue       = Queue,
exchange = Exchange,
routing_key = RoutingKey},
#'queue.unbind_ok'{} = amqp_channel:call(Channel, Binding)

删除实体

可以使用 #'exchange.delete'{} 命令删除交换机

Delete = #'exchange.delete'{exchange = <<"my_exchange">>},
#'exchange.delete_ok'{} = amqp_channel:call(Channel, Delete)

类似地,可以使用 #'queue.delete'{} 命令删除队列

Delete = #'queue.delete'{queue = <<"my_queue">>},
#'queue.delete_ok'{} = amqp_channel:call(Channel, Delete)

同步和异步协议方法、调用和广播

请注意,以上示例使用 amqp_channel:call/2。这是因为它们使用同步 AMQP 0-9-1 方法,这些方法会产生响应(与称为异步方法的一组方法不同)。

通常建议对同步方法使用 amqp_channel:call/{2,3},而不是 amqp_channel:cast/{2,3},尽管这两个函数都适用于同步和异步方法。

这两个函数之间的区别之一是,amqp_channel:call/{2,3} 会阻塞调用进程,直到从服务器收到回复(对于同步方法)或方法已在网络上发送(对于异步方法),而 amqp_channel:cast/{2,3} 会立即返回“ok”。

因此,只有使用 amqp_channel:call/{2,3},我们才能验证服务器是否已确认我们的命令。

发布消息

要将消息发布到具有特定路由键的交换机,请使用 #'basic.publish'{} 方法。消息使用 #amqp_msg{} 记录表示

Payload = <<"foobar">>,
Publish = #'basic.publish'{exchange = X, routing_key = Key},
amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload})

默认情况下,#amqp_msg{} 记录的属性字段包含一组最小的消息属性,作为 #'P_basic'{} 属性记录。

如果应用程序需要覆盖任何默认值,例如,要发送持久消息,则需要相应地构造 #amqp_msg{}

Payload = <<"foobar">>,
Publish = #'basic.publish'{exchange = X, routing_key = Key},
Props = #'P_basic'{delivery_mode = 2}, %% persistent message
Msg = #amqp_msg{props = Props, payload = Payload},
amqp_channel:cast(Channel, Publish, Msg)

可以在发布者指南中找到消息属性的完整列表。

AMQP 0-9-1 #'basic.publish' 方法是异步的:服务器不会向其发送响应。但是,客户端可以选择接收不可路由消息。这在关于返回消息处理程序的部分中有所描述。

上面的示例不使用发布者确认。要等待发布一批消息后所有未完成的发布都得到确认,请使用 amqp_channel:wait_for_confirms/2 函数。如果所有未完成的发布都成功确认,它将返回 true;如果超时,它将返回 timeout

请注意,在每条发布的消息后等待效率极低且没有必要。更优化的方法是发布一批消息并等待其确认。如果某些发布未及时确认,则可以重新发布整个最后一批。

消费者:使用“推送 API”订阅队列

应用程序可以订阅以接收路由到队列的消息。这种“推送 API”是推荐的消费消息方式(另一种方式是轮询,应尽可能避免)。

要向队列添加消费者(订阅队列),请使用 #'basic.consume'{} 方法,可以使用以下两种方式之一

#'basic.consume_ok'{consumer_tag = Tag} =
amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q}, Consumer)

或者

%% A consumer process is not provided so the calling
%% process (`self()`) will be the consumer
#'basic.consume_ok'{consumer_tag = Tag} =
amqp_channel:call(Channel, #'basic.consume'{queue = Q})

消费者参数是客户端库将向其传递消息的进程的 pid。这可以是任意 Erlang 进程,包括启动连接的进程。#'basic.consume_ok'{} 返回值包含一个消费者标签。标签是消费者(订阅)标识符,用于取消消费者。

这在以后用于取消消费者。此通知将发送到创建订阅的进程(作为 amqp_channel:subscribe/3 的返回值),并作为消息发送到消费者进程。

当消费者进程订阅队列时,它将在其邮箱中接收消息。一个接收循环示例如下所示

loop(Channel) ->
receive
%% This is the first message received
#'basic.consume_ok'{} ->
loop(Channel);

%% This is received when the subscription is cancelled
#'basic.cancel_ok'{} ->
ok;

%% A delivery
{#'basic.deliver'{delivery_tag = Tag}, Content} ->
%% Do something with the message payload
%% (some work here)

%% Ack the message
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),

%% Loop
loop(Channel)
end.

在上面的示例中,进程消耗消费者注册(订阅)通知,然后继续等待传递的消息到达其进程邮箱。

接收消息时,循环会对消息执行一些有用的操作,并将确认发送回服务器。如果消费者被取消,取消通知将发送到消费者进程。在这种情况下,接收循环只是退出。如果应用程序不希望显式确认消息接收,则可以使用自动确认模式。为此,请将 #'basic.consume' 记录的 no_ack 属性设置为 true。在自动确认模式下,消费者不会确认传递:RabbitMQ 将在将它们发送到连接后立即将其视为已传递。

取消消费者

要取消消费者,请使用 #'basic.consume_ok'{} 响应返回的消费者标签

amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = Tag})

已取消的消费者仍然可能接收“正在传输”的传递,例如,在消费者取消时当前位于 TCP 缓冲区中的传递。但是,最终(通常在之后不久)——并且通常在消费者取消后不久——不会再向其处理进程传递消息。

消费者的实现

通道使用一个实现 amqp_gen_consumer 行为的模块来确定它应该如何处理消费者事件。实际上,此模块处理客户端的消费者注册并确保将传递的消息路由到相应的消费者。

例如,默认消费者模块 amqp_selective_consumer 会跟踪哪些进程订阅了哪些队列,并将传递的消息路由到相应的队列;此外,如果通道为其提供未知消费者的传递消息,它会将其传递给默认消费者(如果已注册)。

相比之下,amqp_direct_consumer 只会将它从通道接收到的所有消息转发给它唯一注册的消费者。

通道的消费者模块是在通过将第二个参数设置为 amqp_connection:open_channel/2 打开通道时选择的。

消费者模块实现 amqp_gen_consumer 行为,因此实现了用于处理接收 basic.consumebasic.consume_okbasic.cancelbasic.cancel_ok 方法以及发布消息传递的功能。

关闭通道和连接

当不再需要通道时,客户端应将其关闭。这是使用 amqp_channel:close/1 完成的

amqp_channel:close(Channel)

要关闭连接,请使用 amqp_connection:close/1

amqp_connection:close(Connection)

关闭连接会自动隐式关闭该连接上的所有通道。

两者 #'channel.close'和 #'connection.close'命令都接受 reply_code(整数)和 reply_text(二进制)参数,这些参数可以由客户端根据关闭通道或连接的原因进行设置。

在大多数情况下,reply_code 应设置为 200,以指示正常关闭。reply_text 属性只是一个任意字符串,服务器可能会或可能不会记录该字符串。如果客户端希望设置为不同的回复代码和/或文本,它可以使用重载函数 amqp_channel:close/3amqp_connection:close/3(分别)。

传递流控制

默认情况下,除了正常的 TCP 背压之外,通道内没有流控制。消费者可以设置代理将为单个通道上未确认的未完成消息维护的预取缓冲区的大小。这是使用 #'basic.qos'命令

amqp_channel:call(Channel, #'basic.qos'{prefetch_count = Prefetch})

建议应用程序使用预取。在发布者确认和消费者确认指南中了解更多信息。

阻塞连接

当节点检测到其低于某个可用资源阈值时,它可能会选择停止从发布者的网络套接字读取数据

RabbitMQ 支持一种机制,允许客户端获悉这种情况

使用 amqp_connection:register_blocked_handler/2,给出应向其发送 #'connection.blocked'{}#'connection.unblocked'{} 的进程的 pid。

处理返回的消息

代理会将不可传递的消息返回给原始客户端。这些是使用 immediate 或 mandatory 标志发布的消息。为了使应用程序收到返回通知,它必须注册一个回调进程,该进程可以处理 #'basic.return'{} 帧。

这是一个不可路由消息处理的示例

amqp_channel:register_return_handler(Channel, self()),
amqp_channel:call(Channel, #'exchange.declare'{exchange = X}),
Publish = #'basic.publish'{exchange = X, routing_key = SomeKey,
mandatory = true},
amqp_channel:call(Channel, Publish, #amqp_msg{payload = Payload}),
receive
{BasicReturn, Content} ->
#'basic.return'{reply_text = <<"unroutable">>, exchange = X} = BasicReturn
%% Do something with the returned message
end

使用“获取 API”接收消息

也可以按需检索单个消息(“拉取 API”又称轮询)。这种消费方法效率极低,因为它是实际的轮询,应用程序必须反复请求结果,即使绝大多数请求没有产生结果。因此,强烈建议不要使用这种方法

这是使用 #'basic.get'{} 命令实现的

Get = #'basic.get'{queue = Q, no_ack = true},
{#'basic.get_ok'{}, Content} = amqp_channel:call(Channel, Get),
#amqp_msg{payload = Payload} = Content

返回的有效负载是 Erlang 二进制,应用程序需要自行对其进行解码,因为该内容的结构对客户端库和服务器都是不透明的。

如果在调用 #'basic.get'{} 命令时队列为空,则通道将返回 #'basic.get_empty' 结果,如以下示例所示

#'basic.get_empty'{} = amqp_channel:call(Channel, Get)

请注意,前面的示例在 #'basic.get'{} 命令上设置了 no_ack 标志。这告诉代理接收方不会发送消息确认。这样一来,代理就可以免除其交付责任 - 一旦它认为已交付一条消息,它就可以自由地假设正在消费的应用程序已对此负责。通常,许多应用程序都不希望使用这些语义,而是希望明确地确认已收到消息。这可以通过 #'basic.ack' 命令来实现,其中 no_ack 字段默认情况下处于关闭状态。命令,其中 no_ack 字段默认情况下处于关闭状态。

Get = #'basic.get'{queue = Q},
{#'basic.get_ok'{delivery_tag = Tag}, Content}
= amqp_channel:call(Channel, Get),
%% Do something with the message payload.......and then ack it
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag})

请注意,#'basic.ack'{} 方法是使用 amqp_channel:cast/2 而不是 amqp_channel:call/2 发送的。这是因为确认是完全异步的,服务器不会为此产生响应。

基本示例

以下是库的基本使用示例。为了简单起见,它没有使用 发布者确认,而是使用 轮询消费者,该消费者执行 手动确认

-module(amqp_example).

-include("amqp_client.hrl").

-compile([export_all]).

test() ->
%% Start a network connection
{ok, Connection} = amqp_connection:start(#amqp_params_network{}),
%% Open a channel on the connection
{ok, Channel} = amqp_connection:open_channel(Connection),

%% Declare a queue
#'queue.declare_ok'{queue = Q}
= amqp_channel:call(Channel, #'queue.declare'{}),

%% Publish a message
Payload = <<"foobar">>,
Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload}),

%% Poll for a message
Get = #'basic.get'{queue = Q},
{#'basic.get_ok'{delivery_tag = Tag}, Content}
= amqp_channel:call(Channel, Get),

%% Do something with the message payload
%% (some work here)

%% Ack the message
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),

%% Close the channel
amqp_channel:close(Channel),
%% Close the connection
amqp_connection:close(Connection),

ok.

在本示例中,使用服务器生成的名称创建一个队列,并将一条消息直接发布到该队列。这利用了每个队列都通过其自己的队列名称绑定到默认交换的事实。然后将消息出队并确认。

使用客户端作为依赖项编译代码

客户端构建过程会生成两个部署存档

  • amqp_client.ez,其中包含所有客户端库模块
  • rabbit_common.ez,其中包含服务器中运行时所需的通用模块

可以使用构建工具(例如 Rebar 3erlang.mk)来提供这两个依赖项。

为了举例说明,假设使用的依赖项管理工具在 ./deps 目录下编译依赖项。

然后,要手动编译示例代码,可以使用 erlc,并让 ERL_LIBS 指向 ./deps 目录

ERL_LIBS=deps erlc -o ebin amqp_example.erl

然后,要运行您的应用程序,您可以像这样设置 Erlang 运行时

ERL_LIBS=deps erl -pa ebin
# => Erlang/OTP 23 [erts-11.0] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:16]
# =>
# => Eshell V11.0 (abort with ^G)
# => 1> amqp_example:test().
# => ok
# => 2>
© 2024 RabbitMQ. All rights reserved.