AMQP 1.0 客户端库
本页介绍如何使用 AMQP 1.0 客户端库与 **RabbitMQ 4.0 或更高版本** 进行交互。
RabbitMQ 团队支持以下库
应用程序开发者可以在这里找到如何使用这些库进行最常见的用例。有关许可、下载、依赖关系管理、高级和特定用法以及配置等其他信息,请参阅相应库存储库中的 README 页面。
概述
RabbitMQ 团队维护着一套 AMQP 1.0 客户端库,经过精心设计并针对 RabbitMQ 进行了优化。它们在 AMQP 1.0 之上提供了一个简单、安全且功能强大的 API。应用程序可以使用这些库发布和使用消息,并在编程语言之间以一致的方式管理服务器拓扑。这些库还提供高级功能,例如自动连接和拓扑恢复,以及与队列的连接亲和性。
RabbitMQ 与任何符合 AMQP-1.0 标准的客户端库兼容。使用 RabbitMQ AMQP 1.0 客户端库与 RabbitMQ 并非强制性要求,但强烈建议应用程序这样做,以获得最佳体验。
安全性
RabbitMQ AMQP 1.0 客户端库默认情况下是安全的,它们始终创建 持久实体 并始终发布持久消息。
保证
RabbitMQ AMQP 1.0 客户端库提供至少一次的保证。
代理始终 确认 已正确处理已发布的消息。发布者通过使用 unsettled
发送方确认模式 和 first
接收方确认模式(在创建时)来实现这一点。
消费者必须始终 指示 消息处理结果给代理。消费者在创建时使用与发布者相同的设置(first
接收方确认模式 和 unsettled
发送方确认模式)。
客户端 API
本节介绍如何使用 RabbitMQ AMQP 1.0 客户端库连接到集群,以及发布和使用消息。
连接
库提供了一个连接到节点或节点集群的入口点。它的名称是“环境”。环境允许创建连接。它可以包含连接之间共享的基础架构相关配置设置(例如,Java 的线程池)。以下是创建环境的方法
- Java
- C#
import com.rabbitmq.client.amqp.*;
import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder;
// ...
// create the environment instance
Environment environment = new AmqpEnvironmentBuilder()
.build();
// ...
// close the environment when the application stops
environment.close();
using RabbitMQ.AMQP.Client;
using RabbitMQ.AMQP.Client.Impl;
// ...
// create the environment instance
IEnvironment environment = await AmqpEnvironment.CreateAsync(
ConnectionSettingBuilder.Create().Build());
// ...
// close the environment when the application stops
await environment.CloseAsync();
通常,应用程序进程只有一个环境实例。应用程序必须在退出时关闭环境以释放其资源。
应用程序从环境中打开连接。它们必须指定适当的设置才能连接到集群节点(URI、凭据)。
- Java
- C#
// open a connection from the environment
Connection connection = environment.connectionBuilder()
.uri("amqp://admin:admin@localhost:5672/%2f")
.build();
// ...
// close the connection when it is no longer necessary
connection.close();
// open a connection from the environment setting
IConnection connection = await environment.CreateConnectionAsync();
//open a connection from the environment with different settings
ConnectionSettingBuilder otherSettingBuilder = ConnectionSettingBuilder.Create()
.ContainerId("my_containerId")
.Host("localhost");
IConnection connection = await environment.CreateConnectionAsync(otherSettingBuilder.Build());
// ...
// close the connection when it is no longer necessary
await connection.CloseAsync();
库默认使用 ANONYMOUS
SASL 身份验证机制。预期连接是长生命周期的对象,应用程序应避免连接频繁更换。在不再需要时,必须关闭它们。
发布
必须创建一个发布者来发布消息。发布者发布消息的目标通常在创建时设置,但也可以在每条消息的基础上设置。
以下是如何在创建时设置目标来声明发布者的示例
- Java
- C#
Publisher publisher = connection.publisherBuilder()
.exchange("foo").key("bar")
.build();
// ...
// close the publisher when it is no longer necessary
publisher.close();
// The publisher can use exchange (optionally with a key) or queue to publish messages.
IPublisher publisher = await connection.PublisherBuilder().Exchange("foo").Key("bar")
.BuildAsync();
// ...
// close the publisher when it is no longer necessary
await publisher.CloseAsync();
publisher.Dispose();
在上面的示例中,使用发布者发布的每条消息都将发送到 foo
交换机,路由键为 bar
。
RabbitMQ 使用 AMQ 0.9.1 模型,该模型包含交换机、队列和绑定。
消息是从发布者实例创建的。它们遵循 AMQP 1.0 消息格式。可以定义主体(作为字节数组)、标准属性和应用程序属性。
发布消息时,代理会以异步回调的方式指示它如何处理消息。客户端应用程序会根据代理为消息返回的状态(AMQP 术语中的“结果”)采取适当的措施(例如,如果消息未被 接受
,则将消息存储在其他位置)。
以下代码片段展示了如何创建消息、发布消息以及处理来自代理的响应
- Java
- C#
// create the message
Message message = publisher
.message("hello".getBytes(StandardCharsets.UTF_8))
.messageId(1L);
// publish the message and deal with broker feedback
publisher.publish(message, context -> {
// asynchronous feedback from the broker
if (context.status() == Publisher.Status.ACCEPTED) {
// the broker accepted (confirmed) the message
} else {
// deal with possible failure
}
});
// create the message
var message = new AmqpMessage("Hello");
// publish the message and deal with broker feedback
// The result is synchronous, use a `List<Task<PublishResult>>` to increase the performances
PublishResult pr = await publisher.PublishAsync(message);
switch (pr.Outcome.State)
{
case OutcomeState.Accepted:
// the broker accepted (confirmed) the message
break;
case OutcomeState.Released:
// the broker could not route the message anywhere
break;
case OutcomeState.Rejected:
// at least one queue rejected the message
break;
}
上面的发布者示例将消息发送到具有指定路由键的指定交换机,但这不是发布者支持的唯一目标。以下是发布者支持的非空目标
- Java
- C#
// publish to an exchange with a routing key
Publisher publisher1 = connection.publisherBuilder()
.exchange("foo").key("bar") // /exchanges/foo/bar
.build();
// publish to an exchange without a routing key
Publisher publisher2 = connection.publisherBuilder()
.exchange("foo") // /exchanges/foo
.build();
// publish to a queue
Publisher publisher3 = connection.publisherBuilder()
.queue("some-queue") // /queues/some-queue
.build();
// publish to an exchange with a routing key
Publisher publisher = await connection.PublisherBuilder()
.Exchange("foo")
.Key("bar")
.BuildAsync();
// publish to an exchange without a routing key
Publisher publisher = await connection.PublisherBuilder()
.Exchange("foo") // /exchanges/foo
.BuildAsync();
// publish to a queue
IPublisher publisher = await _connection.PublisherBuilder()
.Queue("some-queue")// /queues/some-queue
.BuildAsync();
库将 API 调用转换为 地址格式 v2。
也可以在每条消息的基础上定义目标。发布者必须在没有目标的情况下定义,并且每条消息在其属性部分的 to
字段中定义其目标。库在消息创建 API 中提供了辅助函数来定义消息目标,从而避免处理 地址格式。
以下代码片段展示了如何在没有目标的情况下创建发布者,以及如何为消息定义不同的目标类型
- Java
- C#
// no target defined on publisher creation
Publisher publisher = connection.publisherBuilder()
.build();
// publish to an exchange with a routing key
Message message1 = publisher.message()
.toAddress().exchange("foo").key("bar")
.message();
// publish to an exchange without a routing key
Message message2 = publisher.message()
.toAddress().exchange("foo")
.message();
// publish to a queue
Message message3 = publisher.message()
.toAddress().queue("my-queue")
.message();
// Not Implemented yet
使用
消费者创建
创建消费者包括指定要使用到的队列以及处理消息的回调函数
- Java
- C#
Consumer consumer = connection.consumerBuilder()
.queue("some-queue")
.messageHandler((context, message) -> {
byte[] body = message.body();
// ...
context.accept(); // settle the message
})
.build(); // do not forget to build the instance!
IConsumer consumer = await connection.ConsumerBuilder()
.Queue("some-queue")
.MessageHandler(async (context, message) =>
{
// deal with the message
await context.AcceptAsync();// settle the message
}
).BuildAndStartAsync();
应用程序处理完一条消息后,必须对其进行确认。这将指示代理处理结果以及它应该如何处理消息(例如,删除消息)。应用程序必须确认消息,否则它们会用尽 信誉,并且代理将停止向它们分发消息。
下一节介绍消息确认的语义。
消息处理结果(结果)
库允许应用程序以不同的方式确认消息。它们使用在消息应用程序上下文中尽可能明确的术语。每个术语都映射到 AMQP 规范中的 给定结果。
accept
:应用程序成功处理了消息,可以将其从队列中删除(accepted
结果)discard
:应用程序无法处理消息,因为它无效,代理可以将其丢弃,或者如果已配置,则可以将其 死信(rejected
结果)requeue
:应用程序未处理消息,代理可以将其重新排队,并将其传递给相同或不同的消费者(released
结果)
discard
和 requeue
具有可选的消息注释参数,可以将其与消息头部分中已有的注释合并。此类消息注释可用于提供有关 discard
或 requeue
原因的详细信息。应用程序特定的注释键必须以 x-opt-
前缀开头,而代理理解的注释键只以 x-
开头。discard
和 requeue
都使用带有消息注释参数的 modified
结果。
**只有队列组** 支持使用 modified
结果 修改消息注释。
消费者优雅关闭
消费者通过接受、丢弃或重新排队消息来对其进行确认。
未确认的消息在消费者关闭时会被重新排队。这会导致消息重复处理。
以下是一个示例
- 消费者针对给定消息执行数据库操作。
- 消费者在接受(确认)消息之前被关闭。
- 消息被重新排队。
- 另一个消费者获取消息并再次执行数据库操作。
很难完全避免重复消息,这就是为什么处理应该是幂等的。消费者 API 提供了一种方法,可在消费者关闭时避免重复消息。它包括暂停消息的传递,获取未确认消息的数量,以确保在某个时刻达到 0,然后关闭消费者。这确保了消费者最终已静止,并且已处理所有接收到的消息。
以下是如何优雅关闭消费者的示例
- Java
- C#
// pause the delivery of messages
consumer.pause();
// ensure the number of unsettled messages reaches 0
long unsettledMessageCount = consumer.unsettledMessageCount();
// close the consumer
consumer.close();
// pause the delivery of messages
consumer.pause();
// ensure the number of unsettled messages reaches 0
long unsettledMessageCount = consumer.UnsettledMessageCount();
// close the consumer
consumer.close();
应用程序仍然可以在不暂停的情况下关闭消费者,但存在处理相同消息多次的风险。
对流的支持
库在消费者配置中默认支持 流。
可以在 使用 流时设置要附加到流的位置
- Java
- C#
Consumer consumer = connection.consumerBuilder()
.queue("some-stream")
.stream()
.offset(ConsumerBuilder.StreamOffsetSpecification.FIRST)
.builder()
.messageHandler((context, message) -> {
// message processing
})
.build();
IConsumer consumer = await connection.ConsumerBuilder()
.Queue("some-stream")
.Stream()
.Offset(StreamOffsetSpecification.First)
.Builder()
.MessageHandler( async (context, message) => {
// message processing
})
.BuildAndStartAsync();
还支持 流过滤 配置
- Java
- C#
Consumer consumer = connection.consumerBuilder()
.queue("some-stream")
.stream()
.filterValues("invoices", "orders")
.filterMatchUnfiltered(true)
.builder()
.messageHandler((context, message) -> {
// message processing
})
.build();
IConsumer consumer = await connection.ConsumerBuilder()
.Queue("some-stream")
.Stream()
.FilterValues(["invoices", "order"])
.FilterMatchUnfiltered(true)
.Builder()
.MessageHandler(async (context, message) => {
// message processing
}
).BuildAndStartAsync();
在处理流时,请考虑使用您首选编程语言的 原生流协议 和流客户端库。
拓扑管理
应用程序可以管理 RabbitMQ 的 AMQ 0.9.1 模型:声明和删除交换器、队列和绑定。
为此,它们需要从连接中获取管理 API。
- Java
- C#
Management management = connection.management();
// ...
// close the management instance when it is no longer needed
management.close();
IManagement management = connection.Management();
// ...
// close the management instance when it is no longer needed
await management.CloseAsync()
管理 API 应该在不再需要时立即关闭。应用程序通常在启动时创建其所需的拓扑结构,因此管理对象可以在此步骤之后关闭。
交换器
以下是创建内置类型 交换器 的方法。
- Java
- C#
management.exchange()
.name("my-exchange")
.type(Management.ExchangeType.FANOUT) // enum for built-in type
.declare();
IExchangeSpecification exchangeSpec = management
.Exchange(exchangeName)
.Type(ExchangeType.TOPIC);
await exchangeSpec.DeclareAsync();
也可以将交换器类型指定为字符串(对于非内置类型交换器)。
- Java
- C#
management.exchange()
.name("my-exchange")
.type("x-delayed-message") // non-built-in type
.autoDelete(false)
.argument("x-delayed-type", "direct")
.declare();
// Not Implemented yet
以下是删除交换器的方法。
- Java
- C#
management.exchangeDeletion().delete("my-exchange");
await management.Exchange("my-exchange").DeleteAsync();
队列
- Java
- C#
management.queue()
.name("my-queue")
.exclusive(true)
.autoDelete(false)
.declare();
IQueueSpecification queueSpec = management
.Queue("myqueue")
.Exclusive(true)
.AutoDelete(false)
await queueSpec.DeclareAsync();
管理 API 明确支持 队列参数。
- Java
- C#
management.queue()
.name("my-queue")
.type(Management.QueueType.CLASSIC)
.messageTtl(Duration.ofMinutes(10))
.maxLengthBytes(ByteCapacity.MB(100))
.declare();
IQueueSpecification queueSpec = management
.Queue("my-queue")
.Type(QueueType.CLASSIC)
.MessageTtl(TimeSpan.FromMinutes(10))
.MaxLengthBytes(ByteCapacity.Mb(100));
await queueSpec.DeclareAsync();
管理 API 还区分所有队列类型共享的参数和仅对特定类型有效的参数。以下是如何创建 仲裁队列 的示例。
- Java
- C#
management
.queue()
.name("my-quorum-queue")
.quorum() // set queue type to 'quorum'
.quorumInitialGroupSize(3) // specific to quorum queues
.deliveryLimit(3) // specific to quorum queues
.queue()
.declare();
IQueueSpecification queueSpec = management
.Queue("my-quorum-queue")
.Quorum() // set queue type to 'quorum'
.QuorumInitialGroupSize(3) // specific to quorum queues
.DeliveryLimit(3) // specific to quorum queues
.Queue();
await queueSpec.DeclareAsync();
可以查询有关队列的信息。
- Java
- C#
Management.QueueInfo info = management.queueInfo("my-queue");
long messageCount = info.messageCount();
int consumerCount = info.consumerCount();
String leaderNode = info.leader();
IQueueInfo queueInfo = await management.GetQueueInfoAsync("my-queue");
ulong messageCount = queueInfo.MessageCount();
uint consumerCount = queueInfo.ConsumerCount();
string leader = queueInfo.Leader();
此 API 还可以用于检查队列是否存在。
以下是删除队列的方法。
- Java
- C#
management.queueDeletion().delete("my-queue");
await management.Queue("myqueue").DeleteAsync();
绑定
管理 API 支持将 绑定 到交换器。
- Java
- C#
management.binding()
.sourceExchange("my-exchange")
.destinationQueue("my-queue")
.key("foo")
.bind();
IBindingSpecification bindingSpec = management.Binding()
.SourceExchange("my-exchange")
.DestinationQueue("my-queue")
.Key("foo");
await bindingSpec.BindAsync();
还支持 交换器到交换器绑定。
- Java
- C#
management.binding()
.sourceExchange("my-exchange")
.destinationExchange("my-other-exchange")
.key("foo")
.bind();
IBindingSpecification bindingSpec = management.Binding()
.SourceExchange("my-exchange")
.DestinationExchange("my-other-exchange")
.Key("foo");
await bindingSpec.BindAsync();
还可以取消绑定实体。
- Java
- C#
management.unbind()
.sourceExchange("my-exchange")
.destinationQueue("my-queue")
.key("foo")
.unbind();
IBindingSpecification bindingSpec = management.Binding()
.SourceExchange("my-exchange")
.DestinationQueue("my-queue")
.Key("foo");
await bindingSpec.UnbindAsync();
高级用法
生命周期监听器
应用程序可以通过添加监听器来对某些 API 组件的状态更改做出反应。应用程序可以向连接添加监听器,以便在连接在连接恢复后停止发布消息。然后,应用程序可以在连接恢复并再次打开时恢复发布。
以下是如何在连接上设置监听器。
- Java
Connection connection = environment.connectionBuilder()
.listeners(context -> { // set one or several listeners
context.previousState(); // the previous state
context.currentState(); // the current (new) state
context.failureCause(); // the cause of the failure (in case of failure)
context.resource(); // the connection
}).build();
还可以为发布者实例设置监听器。
- Java
Publisher publisher = connection.publisherBuilder()
.listeners(context -> {
// ...
})
.exchange("foo").key("bar")
.build();
以及在消费者实例上。
- Java
Consumer consumer = connection.consumerBuilder()
.listeners(context -> {
// ...
})
.queue("my-queue")
.build();
自动连接恢复
自动连接恢复默认情况下处于激活状态:客户端库将在意外关闭(例如网络故障、节点重启等)后自动恢复连接。自动拓扑恢复也在连接恢复后立即激活:客户端库将重新创建 AMQP 实体,以及恢复连接的发布者和消费者。开发人员不必担心网络稳定性和节点重启,因为客户端库会处理这些问题。
客户端尝试每 5 秒重新连接一次,直到成功。可以通过自定义退避延迟策略来更改此行为。
- Java
Connection connection = environment.connectionBuilder()
.recovery()
.backOffDelayPolicy(BackOffDelayPolicy.fixed(Duration.ofSeconds(2)))
.connectionBuilder().build();
如果拓扑恢复不适合特定应用程序,还可以停用它。应用程序通常会注册一个连接 生命周期监听器,以便在连接恢复时了解这一点,并相应地恢复其自身状态。
- Java
Connection connection = environment.connectionBuilder()
.recovery()
.topology(false) // deactivate topology recovery
.connectionBuilder()
.listeners(context -> {
// set listener that restores application state when connection is recovered
})
.build();
还可以完全停用恢复。
- Java
Connection connection = environment.connectionBuilder()
.recovery()
.activated(false) // deactivate recovery
.connectionBuilder().build();