跳到主要内容

AMQP 1.0 客户端库

此页面记录了 RabbitMQ 4.0 或更高版本AMQP 1.0 客户端库的用法。

RabbitMQ 团队支持以下库

应用程序开发者将在此处找到如何将这些库用于最常见的用例。有关许可、下载、依赖管理、高级和特定用法以及配置等其他信息,请参阅各个库的存储库中的 README 页面。

概述

RabbitMQ 团队维护了一组 设计和优化 用于 RabbitMQ 的 AMQP 1.0 客户端库。它们在 AMQP 1.0 之上提供了简单、安全且功能强大的 API。应用程序可以使用这些库发布和消费消息,并以跨编程语言的一致方式管理服务器拓扑。这些库还提供高级功能,例如自动连接和拓扑恢复,以及与队列的连接亲和性。

注意

RabbitMQ 与任何 AMQP-1.0 兼容的客户端库兼容。并非强制要求将 RabbitMQ AMQP 1.0 客户端库与 RabbitMQ 一起使用,但强烈建议应用程序这样做以获得最佳体验。

安全性

RabbitMQ AMQP 1.0 客户端库默认是安全的,它们始终创建持久实体,并且始终发布持久消息。

保证

RabbitMQ AMQP 1.0 客户端库提供至少一次的保证。

Broker 始终确认已正确处理已发布的消息。发布者通过在使用 unsettled 发送方结算模式first 接收方结算模式 创建它们时实现这一点。

消费者必须始终指示消息处理结果给 Broker。消费者在创建时使用与发布者相同的设置(first 接收方结算模式unsettled 发送方结算模式)。

客户端 API

本节介绍如何使用 RabbitMQ AMQP 1.0 客户端库连接到集群、发布和消费消息。

连接

库提供了一个节点或节点集群的入口点。它的名称是“环境”。环境允许创建连接。它可以包含在连接之间共享的基础设施相关配置设置(例如,Java 的线程池)。以下是如何创建环境

创建环境
import com.rabbitmq.client.amqp.*;
import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder;

// ...

// create the environment instance
Environment environment = new AmqpEnvironmentBuilder()
.build();
// ...
// close the environment when the application stops
environment.close();

一个应用程序进程通常只有一个环境实例。应用程序必须在退出时关闭环境以释放其资源。

应用程序从环境中打开连接。它们必须指定适当的设置以连接到集群节点(URI、凭据)。

打开连接
// open a connection from the environment
Connection connection = environment.connectionBuilder()
.uri("amqp://admin:admin@localhost:5672/%2f")
.build();
// ...
// close the connection when it is no longer necessary
connection.close();

库默认使用 ANONYMOUS SASL 身份验证机制。连接应为长期存在的对象,应用程序应避免连接抖动。当不再需要它们时,必须关闭它们。

发布

必须创建一个发布者才能发布消息。发布者将发布消息的目标通常在创建时设置,但也可以在每条消息的基础上设置。

以下是如何在创建时设置目标的情况下声明发布者

创建发布者
Publisher publisher = connection.publisherBuilder()
.exchange("foo").key("bar")
.build();
// ...
// close the publisher when it is no longer necessary
publisher.close();

在前面的示例中,使用发布者发布的每条消息都将发送到具有 bar 路由键的 foo 交换机。

信息

RabbitMQ 使用包含交换机、队列和绑定的 AMQ 0.9.1 模型

消息从发布者实例创建。它们遵循 AMQP 1.0 消息格式。可以定义正文(作为字节数组)、标准属性和应用程序属性。

当消息发布时,Broker 会在异步回调中指示如何处理该消息。客户端应用程序根据 Broker 为消息返回的状态(AMQP 术语中的“结果”)采取适当的措施(例如,如果消息未被 accepted,则将消息存储在另一个位置)。

以下代码片段显示了如何创建消息、发布消息以及处理来自 Broker 的响应

发布消息
// create the message
Message message = publisher
.message("hello".getBytes(StandardCharsets.UTF_8))
.messageId(1L);

// publish the message and deal with broker feedback
publisher.publish(message, context -> {
// asynchronous feedback from the broker
if (context.status() == Publisher.Status.ACCEPTED) {
// the broker accepted (confirmed) the message
} else {
// deal with possible failure
}
});

上面的发布者示例将消息发送到给定的交换机以及给定的路由键,但这并非发布者唯一支持的目标。以下是发布者支持的非空目标

创建具有不同目标的发布者
// publish to an exchange with a routing key
Publisher publisher1 = connection.publisherBuilder()
.exchange("foo").key("bar") // /exchanges/foo/bar
.build();

// publish to an exchange without a routing key
Publisher publisher2 = connection.publisherBuilder()
.exchange("foo") // /exchanges/foo
.build();

// publish to a queue
Publisher publisher3 = connection.publisherBuilder()
.queue("some-queue") // /queues/some-queue
.build();
信息

库将 API 调用转换为 地址格式 v2

也可以在每条消息的基础上定义目标。必须定义没有任何目标的发布者,并且每条消息都在属性部分的 to 字段中定义其目标。库在消息创建 API 中提供帮助程序来定义消息目标,从而避免处理 地址格式

以下代码片段显示了如何创建没有目标的发布者并定义具有不同目标类型的消息

在消息中设置目标
// no target defined on publisher creation
Publisher publisher = connection.publisherBuilder()
.build();

// publish to an exchange with a routing key
Message message1 = publisher.message()
.toAddress().exchange("foo").key("bar")
.message();

// publish to an exchange without a routing key
Message message2 = publisher.message()
.toAddress().exchange("foo")
.message();

// publish to a queue
Message message3 = publisher.message()
.toAddress().queue("my-queue")
.message();

消费

消费者创建

创建消费者包括指定要从中消费的队列和处理消息的回调

创建消费者
Consumer consumer = connection.consumerBuilder()
.queue("some-queue")
.messageHandler((context, message) -> {
byte[] body = message.body();
// ...
context.accept(); // settle the message
})
.build(); // do not forget to build the instance!

一旦应用程序完成消息处理,它必须结算它。这向 Broker 指示处理结果以及应该如何处理消息(例如,删除消息)。应用程序必须结算消息,否则它们的 信用 将耗尽,Broker 将停止向它们分发消息。

下一节介绍消息结算的语义。

消息处理结果(结果)

库允许应用程序以不同的方式结算消息。它们在消息应用程序的上下文中使用了尽可能明确的术语。每个术语都映射到 AMQP 规范中的给定结果

  • accept:应用程序成功处理了消息,可以从队列中删除它(accepted 结果)
  • discard:应用程序无法处理消息,因为它无效,Broker 可以丢弃它或在配置的情况下死信它(rejected 结果)
  • requeue:应用程序未处理消息,Broker 可以重新排队并将其传递给相同或不同的消费者(released 结果)

discardrequeue 具有可选的消息注释参数,可以与消息标头部分中已有的注释参数组合使用。此类消息注释可用于提供有关 discardrequeue 原因的详细信息。应用程序特定的注释键必须以 x-opt- 前缀开头,而 Broker 理解的注释键仅以 x- 开头。discardrequeue 都使用带有消息注释参数的 modified 结果。

只有仲裁队列支持使用 modified 结果修改消息注释

消费者优雅关闭

消费者通过接受、丢弃或重新排队消息来结算消息。

当消费者关闭时,未结算的消息将被重新排队。这可能导致消息的重复处理。

这是一个例子

  • 消费者为给定消息执行数据库操作。
  • 消费者在接受(结算)消息之前被关闭。
  • 消息被重新排队。
  • 另一个消费者获取消息并再次执行数据库操作。

完全避免重复消息很困难,这就是为什么处理应该是幂等的。当消费者关闭时,消费者 API 提供了一种避免重复消息的方法。它包括暂停消息的传递,获取未结算消息的数量以确保它在某个时候达到 0,然后关闭消费者。这确保了消费者最终静止,并且所有接收到的消息都已处理。

这是一个消费者优雅关闭的示例

优雅地关闭消费者
// pause the delivery of messages
consumer.pause();
// ensure the number of unsettled messages reaches 0
long unsettledMessageCount = consumer.unsettledMessageCount();
// close the consumer
consumer.close();

应用程序仍然可以在不暂停消费者的情况下关闭消费者,但存在多次处理同一消息的风险。

对流的支持

库对消费者配置中的提供开箱即用的支持。

可以设置在从流消费时附加到哪里

附加到流的开头
Consumer consumer = connection.consumerBuilder()
.queue("some-stream")
.stream()
.offset(ConsumerBuilder.StreamOffsetSpecification.FIRST)
.builder()
.messageHandler((context, message) -> {
// message processing
})
.build();

还支持流过滤配置

配置流过滤
Consumer consumer = connection.consumerBuilder()
.queue("some-stream")
.stream()
.filterValues("invoices", "orders")
.filterMatchUnfiltered(true)
.builder()
.messageHandler((context, message) -> {
// message processing
})
.build();

在处理流时,还可以考虑将原生流协议与您首选编程语言的流客户端库一起使用。

拓扑管理

应用程序可以管理 RabbitMQ 的 AMQ 0.9.1 模型:声明和删除交换机、队列和绑定。

为此,他们需要从连接中获取管理 API

从环境中获取管理对象
Management management = connection.management();
// ...
// close the management instance when it is no longer needed
management.close();

一旦不再需要管理 API,就应立即关闭它。应用程序通常在启动时创建它需要的拓扑,因此管理对象可以在此步骤之后关闭。

交换机

以下是如何创建内置类型的交换机

创建内置类型的交换机
management.exchange()
.name("my-exchange")
.type(Management.ExchangeType.FANOUT) // enum for built-in type
.declare();

也可以将交换机类型指定为字符串(对于非内置类型交换机)

创建非内置类型的交换机
management.exchange()
.name("my-exchange")
.type("x-delayed-message") // non-built-in type
.autoDelete(false)
.argument("x-delayed-type", "direct")
.declare();

以下是如何删除交换机

删除交换机
management.exchangeDelete("my-exchange");

队列

以下是如何使用默认队列类型创建队列

创建经典队列
management.queue()
.name("my-queue")
.exclusive(true)
.autoDelete(false)
.declare();

管理 API 显式支持队列参数

创建带有参数的队列
management.queue()
.name("my-queue")
.type(Management.QueueType.CLASSIC)
.messageTtl(Duration.ofMinutes(10))
.maxLengthBytes(ByteCapacity.MB(100))
.declare();

管理 API 还区分了所有队列类型共享的参数和仅对给定类型有效的参数。以下是创建仲裁队列的示例

创建仲裁队列
management
.queue()
.name("my-quorum-queue")
.quorum() // set queue type to 'quorum'
.quorumInitialGroupSize(3) // specific to quorum queues
.deliveryLimit(3) // specific to quorum queues
.queue()
.declare();

可以查询有关队列的信息

获取队列信息
Management.QueueInfo info = management.queueInfo("my-queue");
long messageCount = info.messageCount();
int consumerCount = info.consumerCount();
String leaderNode = info.leader();

此 API 也可用于检查队列是否存在。

以下是如何删除队列

删除队列
management.queueDelete("my-queue");

绑定

管理 API 支持将队列绑定到交换机

将队列绑定到交换机
management.binding()
.sourceExchange("my-exchange")
.destinationQueue("my-queue")
.key("foo")
.bind();

还支持交换机到交换机的绑定

将交换机绑定到另一个交换机
management.binding()
.sourceExchange("my-exchange")
.destinationExchange("my-other-exchange")
.key("foo")
.bind();

也可以取消绑定实体

删除交换机和队列之间的绑定
management.unbind()
.sourceExchange("my-exchange")
.destinationQueue("my-queue")
.key("foo")
.unbind();

高级用法

生命周期监听器

应用程序可以通过添加监听器来响应某些 API 组件的状态更改。应用程序可以向连接添加监听器,以便在连接在连接后恢复时停止发布消息。然后,应用程序可以在连接恢复并再次打开时恢复发布。

以下是如何在连接上设置监听器

在连接上设置监听器
Connection connection = environment.connectionBuilder()
.listeners(context -> { // set one or several listeners
context.previousState(); // the previous state
context.currentState(); // the current (new) state
context.failureCause(); // the cause of the failure (in case of failure)
context.resource(); // the connection
}).build();

也可以在发布者实例上设置监听器

在发布者上设置监听器
Publisher publisher = connection.publisherBuilder()
.listeners(context -> {
// ...
})
.exchange("foo").key("bar")
.build();

以及在消费者实例上

在消费者上设置监听器
Consumer consumer = connection.consumerBuilder()
.listeners(context -> {
// ...
})
.queue("my-queue")
.build();

自动连接恢复

默认情况下激活自动连接恢复:客户端库将在意外关闭后自动恢复连接(例如,网络故障、节点重启等)。一旦连接恢复,自动拓扑恢复也会激活:客户端库将为恢复的连接重新创建 AMQP 实体以及发布者和消费者。开发人员不必太担心网络稳定性和节点重启,因为客户端库会处理它。

客户端每 5 秒尝试重新连接,直到成功为止。可以通过自定义退避延迟策略来更改此行为

为连接恢复设置退避策略
Connection connection = environment.connectionBuilder()
.recovery()
.backOffDelayPolicy(BackOffDelayPolicy.fixed(Duration.ofSeconds(2)))
.connectionBuilder().build();

如果拓扑恢复不适用于给定的应用程序,也可以停用拓扑恢复。应用程序通常会注册连接生命周期监听器,以了解连接何时恢复并相应地恢复其自身状态。

停用拓扑恢复
Connection connection = environment.connectionBuilder()
.recovery()
.topology(false) // deactivate topology recovery
.connectionBuilder()
.listeners(context -> {
// set listener that restores application state when connection is recovered
})
.build();

也可以完全停用恢复

停用恢复
Connection connection = environment.connectionBuilder()
.recovery()
.activated(false) // deactivate recovery
.connectionBuilder().build();
© . All rights reserved.