跳至主要内容

AMQP 1.0 客户端库

本页介绍如何使用 AMQP 1.0 客户端库与 **RabbitMQ 4.0 或更高版本** 进行交互。

RabbitMQ 团队支持以下库

应用程序开发者可以在这里找到如何使用这些库进行最常见的用例。有关许可、下载、依赖关系管理、高级和特定用法以及配置等其他信息,请参阅相应库存储库中的 README 页面。

概述

RabbitMQ 团队维护着一套 AMQP 1.0 客户端库,经过精心设计并针对 RabbitMQ 进行了优化。它们在 AMQP 1.0 之上提供了一个简单、安全且功能强大的 API。应用程序可以使用这些库发布和使用消息,并在编程语言之间以一致的方式管理服务器拓扑。这些库还提供高级功能,例如自动连接和拓扑恢复,以及与队列的连接亲和性。

注意

RabbitMQ 与任何符合 AMQP-1.0 标准的客户端库兼容。使用 RabbitMQ AMQP 1.0 客户端库与 RabbitMQ 并非强制性要求,但强烈建议应用程序这样做,以获得最佳体验。

安全性

RabbitMQ AMQP 1.0 客户端库默认情况下是安全的,它们始终创建 持久实体 并始终发布持久消息。

保证

RabbitMQ AMQP 1.0 客户端库提供至少一次的保证。

代理始终 确认 已正确处理已发布的消息。发布者通过使用 unsettled 发送方确认模式first 接收方确认模式(在创建时)来实现这一点。

消费者必须始终 指示 消息处理结果给代理。消费者在创建时使用与发布者相同的设置(first 接收方确认模式unsettled 发送方确认模式)。

客户端 API

本节介绍如何使用 RabbitMQ AMQP 1.0 客户端库连接到集群,以及发布和使用消息。

连接

库提供了一个连接到节点或节点集群的入口点。它的名称是“环境”。环境允许创建连接。它可以包含连接之间共享的基础架构相关配置设置(例如,Java 的线程池)。以下是创建环境的方法

创建环境
import com.rabbitmq.client.amqp.*;
import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder;

// ...

// create the environment instance
Environment environment = new AmqpEnvironmentBuilder()
.build();
// ...
// close the environment when the application stops
environment.close();

通常,应用程序进程只有一个环境实例。应用程序必须在退出时关闭环境以释放其资源。

应用程序从环境中打开连接。它们必须指定适当的设置才能连接到集群节点(URI、凭据)。

打开连接
// open a connection from the environment
Connection connection = environment.connectionBuilder()
.uri("amqp://admin:admin@localhost:5672/%2f")
.build();
// ...
// close the connection when it is no longer necessary
connection.close();

库默认使用 ANONYMOUS SASL 身份验证机制。预期连接是长生命周期的对象,应用程序应避免连接频繁更换。在不再需要时,必须关闭它们。

发布

必须创建一个发布者来发布消息。发布者发布消息的目标通常在创建时设置,但也可以在每条消息的基础上设置。

以下是如何在创建时设置目标来声明发布者的示例

创建发布者
Publisher publisher = connection.publisherBuilder()
.exchange("foo").key("bar")
.build();
// ...
// close the publisher when it is no longer necessary
publisher.close();

在上面的示例中,使用发布者发布的每条消息都将发送到 foo 交换机,路由键为 bar

信息

RabbitMQ 使用 AMQ 0.9.1 模型,该模型包含交换机、队列和绑定。

消息是从发布者实例创建的。它们遵循 AMQP 1.0 消息格式。可以定义主体(作为字节数组)、标准属性和应用程序属性。

发布消息时,代理会以异步回调的方式指示它如何处理消息。客户端应用程序会根据代理为消息返回的状态(AMQP 术语中的“结果”)采取适当的措施(例如,如果消息未被 接受,则将消息存储在其他位置)。

以下代码片段展示了如何创建消息、发布消息以及处理来自代理的响应

发布消息
// create the message
Message message = publisher
.message("hello".getBytes(StandardCharsets.UTF_8))
.messageId(1L);

// publish the message and deal with broker feedback
publisher.publish(message, context -> {
// asynchronous feedback from the broker
if (context.status() == Publisher.Status.ACCEPTED) {
// the broker accepted (confirmed) the message
} else {
// deal with possible failure
}
});

上面的发布者示例将消息发送到具有指定路由键的指定交换机,但这不是发布者支持的唯一目标。以下是发布者支持的非空目标

创建具有不同目标的发布者
// publish to an exchange with a routing key
Publisher publisher1 = connection.publisherBuilder()
.exchange("foo").key("bar") // /exchanges/foo/bar
.build();

// publish to an exchange without a routing key
Publisher publisher2 = connection.publisherBuilder()
.exchange("foo") // /exchanges/foo
.build();

// publish to a queue
Publisher publisher3 = connection.publisherBuilder()
.queue("some-queue") // /queues/some-queue
.build();
信息

库将 API 调用转换为 地址格式 v2

也可以在每条消息的基础上定义目标。发布者必须在没有目标的情况下定义,并且每条消息在其属性部分的 to 字段中定义其目标。库在消息创建 API 中提供了辅助函数来定义消息目标,从而避免处理 地址格式

以下代码片段展示了如何在没有目标的情况下创建发布者,以及如何为消息定义不同的目标类型

在消息中设置目标
// no target defined on publisher creation
Publisher publisher = connection.publisherBuilder()
.build();

// publish to an exchange with a routing key
Message message1 = publisher.message()
.toAddress().exchange("foo").key("bar")
.message();

// publish to an exchange without a routing key
Message message2 = publisher.message()
.toAddress().exchange("foo")
.message();

// publish to a queue
Message message3 = publisher.message()
.toAddress().queue("my-queue")
.message();

使用

消费者创建

创建消费者包括指定要使用到的队列以及处理消息的回调函数

创建消费者
Consumer consumer = connection.consumerBuilder()
.queue("some-queue")
.messageHandler((context, message) -> {
byte[] body = message.body();
// ...
context.accept(); // settle the message
})
.build(); // do not forget to build the instance!

应用程序处理完一条消息后,必须对其进行确认。这将指示代理处理结果以及它应该如何处理消息(例如,删除消息)。应用程序必须确认消息,否则它们会用尽 信誉,并且代理将停止向它们分发消息。

下一节介绍消息确认的语义。

消息处理结果(结果)

库允许应用程序以不同的方式确认消息。它们使用在消息应用程序上下文中尽可能明确的术语。每个术语都映射到 AMQP 规范中的 给定结果

  • accept:应用程序成功处理了消息,可以将其从队列中删除(accepted 结果)
  • discard:应用程序无法处理消息,因为它无效,代理可以将其丢弃,或者如果已配置,则可以将其 死信rejected 结果)
  • requeue:应用程序未处理消息,代理可以将其重新排队,并将其传递给相同或不同的消费者(released 结果)

discardrequeue 具有可选的消息注释参数,可以将其与消息头部分中已有的注释合并。此类消息注释可用于提供有关 discardrequeue 原因的详细信息。应用程序特定的注释键必须以 x-opt- 前缀开头,而代理理解的注释键只以 x- 开头。discardrequeue 都使用带有消息注释参数的 modified 结果。

**只有队列组** 支持使用 modified 结果 修改消息注释

消费者优雅关闭

消费者通过接受、丢弃或重新排队消息来对其进行确认

未确认的消息在消费者关闭时会被重新排队。这会导致消息重复处理。

以下是一个示例

  • 消费者针对给定消息执行数据库操作。
  • 消费者在接受(确认)消息之前被关闭。
  • 消息被重新排队。
  • 另一个消费者获取消息并再次执行数据库操作。

很难完全避免重复消息,这就是为什么处理应该是幂等的。消费者 API 提供了一种方法,可在消费者关闭时避免重复消息。它包括暂停消息的传递,获取未确认消息的数量,以确保在某个时刻达到 0,然后关闭消费者。这确保了消费者最终已静止,并且已处理所有接收到的消息。

以下是如何优雅关闭消费者的示例

优雅地关闭消费者
// pause the delivery of messages
consumer.pause();
// ensure the number of unsettled messages reaches 0
long unsettledMessageCount = consumer.unsettledMessageCount();
// close the consumer
consumer.close();

应用程序仍然可以在不暂停的情况下关闭消费者,但存在处理相同消息多次的风险。

对流的支持

库在消费者配置中默认支持

可以在 使用 流时设置要附加到流的位置

附加到流的开头
Consumer consumer = connection.consumerBuilder()
.queue("some-stream")
.stream()
.offset(ConsumerBuilder.StreamOffsetSpecification.FIRST)
.builder()
.messageHandler((context, message) -> {
// message processing
})
.build();

还支持 流过滤 配置

配置流过滤
Consumer consumer = connection.consumerBuilder()
.queue("some-stream")
.stream()
.filterValues("invoices", "orders")
.filterMatchUnfiltered(true)
.builder()
.messageHandler((context, message) -> {
// message processing
})
.build();

在处理流时,请考虑使用您首选编程语言的 原生流协议 和流客户端库。

拓扑管理

应用程序可以管理 RabbitMQ 的 AMQ 0.9.1 模型:声明和删除交换器、队列和绑定。

为此,它们需要从连接中获取管理 API。

从环境中获取管理对象
Management management = connection.management();
// ...
// close the management instance when it is no longer needed
management.close();

管理 API 应该在不再需要时立即关闭。应用程序通常在启动时创建其所需的拓扑结构,因此管理对象可以在此步骤之后关闭。

交换器

以下是创建内置类型 交换器 的方法。

创建内置类型交换器
management.exchange()
.name("my-exchange")
.type(Management.ExchangeType.FANOUT) // enum for built-in type
.declare();

也可以将交换器类型指定为字符串(对于非内置类型交换器)。

创建非内置类型交换器
management.exchange()
.name("my-exchange")
.type("x-delayed-message") // non-built-in type
.autoDelete(false)
.argument("x-delayed-type", "direct")
.declare();

以下是删除交换器的方法。

删除交换器
management.exchangeDeletion().delete("my-exchange");

队列

以下是创建具有 默认队列类型队列 的方法。

创建经典队列
management.queue()
.name("my-queue")
.exclusive(true)
.autoDelete(false)
.declare();

管理 API 明确支持 队列参数

创建具有参数的队列
management.queue()
.name("my-queue")
.type(Management.QueueType.CLASSIC)
.messageTtl(Duration.ofMinutes(10))
.maxLengthBytes(ByteCapacity.MB(100))
.declare();

管理 API 还区分所有队列类型共享的参数和仅对特定类型有效的参数。以下是如何创建 仲裁队列 的示例。

创建仲裁队列
management
.queue()
.name("my-quorum-queue")
.quorum() // set queue type to 'quorum'
.quorumInitialGroupSize(3) // specific to quorum queues
.deliveryLimit(3) // specific to quorum queues
.queue()
.declare();

可以查询有关队列的信息。

获取队列信息
Management.QueueInfo info = management.queueInfo("my-queue");
long messageCount = info.messageCount();
int consumerCount = info.consumerCount();
String leaderNode = info.leader();

此 API 还可以用于检查队列是否存在。

以下是删除队列的方法。

删除队列
management.queueDeletion().delete("my-queue");

绑定

管理 API 支持将 绑定 到交换器。

将队列绑定到交换器
management.binding()
.sourceExchange("my-exchange")
.destinationQueue("my-queue")
.key("foo")
.bind();

还支持 交换器到交换器绑定

将交换器绑定到另一个交换器
management.binding()
.sourceExchange("my-exchange")
.destinationExchange("my-other-exchange")
.key("foo")
.bind();

还可以取消绑定实体。

删除交换器和队列之间的绑定
management.unbind()
.sourceExchange("my-exchange")
.destinationQueue("my-queue")
.key("foo")
.unbind();

高级用法

生命周期监听器

应用程序可以通过添加监听器来对某些 API 组件的状态更改做出反应。应用程序可以向连接添加监听器,以便在连接在连接恢复后停止发布消息。然后,应用程序可以在连接恢复并再次打开时恢复发布。

以下是如何在连接上设置监听器。

在连接上设置监听器
Connection connection = environment.connectionBuilder()
.listeners(context -> { // set one or several listeners
context.previousState(); // the previous state
context.currentState(); // the current (new) state
context.failureCause(); // the cause of the failure (in case of failure)
context.resource(); // the connection
}).build();

还可以为发布者实例设置监听器。

在发布者上设置监听器
Publisher publisher = connection.publisherBuilder()
.listeners(context -> {
// ...
})
.exchange("foo").key("bar")
.build();

以及在消费者实例上。

在消费者上设置监听器
Consumer consumer = connection.consumerBuilder()
.listeners(context -> {
// ...
})
.queue("my-queue")
.build();

自动连接恢复

自动连接恢复默认情况下处于激活状态:客户端库将在意外关闭(例如网络故障、节点重启等)后自动恢复连接。自动拓扑恢复也在连接恢复后立即激活:客户端库将重新创建 AMQP 实体,以及恢复连接的发布者和消费者。开发人员不必担心网络稳定性和节点重启,因为客户端库会处理这些问题。

客户端尝试每 5 秒重新连接一次,直到成功。可以通过自定义退避延迟策略来更改此行为。

为连接恢复设置退避策略
Connection connection = environment.connectionBuilder()
.recovery()
.backOffDelayPolicy(BackOffDelayPolicy.fixed(Duration.ofSeconds(2)))
.connectionBuilder().build();

如果拓扑恢复不适合特定应用程序,还可以停用它。应用程序通常会注册一个连接 生命周期监听器,以便在连接恢复时了解这一点,并相应地恢复其自身状态。

停用拓扑恢复
Connection connection = environment.connectionBuilder()
.recovery()
.topology(false) // deactivate topology recovery
.connectionBuilder()
.listeners(context -> {
// set listener that restores application state when connection is recovered
})
.build();

还可以完全停用恢复。

停用恢复
Connection connection = environment.connectionBuilder()
.recovery()
.activated(false) // deactivate recovery
.connectionBuilder().build();
© 2024 RabbitMQ. All rights reserved.