跳至主要内容

Java 客户端 API 指南

概述

本指南涵盖了RabbitMQ Java 客户端及其公共 API。它假设使用了客户端的最新主要版本,并且读者熟悉基础知识

指南的关键部分是

An API 参考 (JavaDoc) 单独提供。

支持时间线

请参见RabbitMQ Java 库支持页面以了解支持时间线。

JDK 和 Android 版本支持

此库的 5.x 版本系列需要 JDK 8,用于编译和运行时。在 Android 上,这意味着仅支持Android 7.0 或更高版本

4.x 版本系列支持 JDK 6 和 Android 7.0 之前的版本。

许可

该库是开源的,在GitHub 上开发,并采用三项许可:

这意味着用户可以认为该库是根据上述列表中的任何许可证授权的。例如,用户可以选择 Apache 公共许可证 2.0 并将此客户端包含到商业产品中。根据 GPLv2 许可的代码库可以选择 GPLv2,依此类推。

概述

客户端 API 公开了AMQP 0-9-1 协议模型中的关键实体,并提供了额外的抽象以方便使用。

RabbitMQ Java 客户端使用 com.rabbitmq.client 作为其顶级包。关键类和接口是

  • Channel:表示 AMQP 0-9-1 通道,并提供大多数操作(协议方法)。
  • Connection:表示 AMQP 0-9-1 连接
  • ConnectionFactory:构造 Connection 实例
  • Consumer:表示消息消费者
  • DefaultConsumer:常用的消费者基类
  • BasicProperties:消息属性(元数据)
  • BasicProperties.Builder:BasicProperties 的构建器

协议操作可通过 Channel 接口获得。Connection 用于打开通道、注册连接生命周期事件处理程序和关闭不再需要的连接。Connection 实例通过 ConnectionFactory 实例化,这是配置各种连接设置(例如虚拟主机或用户名)的方式。

连接和通道

核心 API 类是 ConnectionChannel,分别表示 AMQP 0-9-1 连接和通道。它们通常在使用之前导入

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

连接到 RabbitMQ

以下代码使用给定的参数(主机名、端口号等)连接到 RabbitMQ 节点

ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);

Connection conn = factory.newConnection();

所有这些参数对于在本地运行的 RabbitMQ 节点都有合理的默认值。

如果属性在创建连接之前未分配,则将使用该属性的默认值。

属性默认值
用户名"guest"
密码"guest"
虚拟主机"/"
主机名"localhost"
端口

对于常规连接为 5672,对于使用 TLS 的连接5671

使用 URI 连接

或者,可以使用URI

ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();

所有这些参数对于在本地运行的默认 RabbitMQ 服务器都有合理的默认值。

成功和不成功的客户端连接事件可在服务器节点日志中观察到

请注意,用户 guest 默认只能从 localhost 连接。这是为了限制在生产系统中使用众所周知的凭据。

应用程序开发人员可以为连接分配自定义名称。如果设置了,该名称将在 RabbitMQ 节点日志以及管理 UI中提及。

然后可以使用 Connection 接口打开一个通道

Channel channel = conn.createChannel();

现在可以使用该通道来发送和接收消息,如后续部分所述。

使用端点列表

可以指定一个端点列表,在连接时使用它们。将使用第一个可达的端点。在发生连接故障的情况下,使用端点列表使应用程序能够在原始节点关闭时连接到其他节点。

要使用多个端点,请向 ConnectionFactory#newConnection 提供一个 Address 列表。Address 表示主机名和端口对。

Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)
, new Address(hostname2, portnumber2)};
Connection conn = factory.newConnection(addrArr);

将尝试连接到 hostname1:portnumber1,如果失败,则连接到 hostname2:portnumber2。返回的连接是数组中第一个成功的连接(没有抛出 IOException)。这与在工厂上反复设置主机和端口、每次调用 factory.newConnection() 直到其中一个成功完全等效。

如果还提供了 ExecutorService(使用 factory.newConnection(es, addrArr) 形式),则线程池将与(第一个)成功的连接相关联。

如果你想要更多地控制要连接的主机,请参见对服务发现的支持

断开与 RabbitMQ 的连接

要断开连接,只需关闭通道和连接

channel.close();
conn.close();

请注意,关闭通道可能被认为是最佳实践,但这里并不是严格必要的 - 在底层连接关闭时会自动完成。

客户端断开连接事件可在服务器节点日志中观察到

连接和通道生命周期

客户端连接应该长寿。底层协议是为长运行连接而设计和优化的。这意味着每操作打开一个新连接(例如发布一条消息)是没必要的,而且强烈反对,因为它会引入很多网络往返和开销。

通道也应该长寿,但由于许多可恢复的协议错误会导致通道关闭,因此通道生命周期可能比其连接短。每操作关闭和打开新通道通常没有必要,但可能是合适的。如有疑问,请先考虑重用通道。

通道级别异常(例如尝试从不存在的队列中消费)会导致通道关闭。关闭的通道不再可以使用,也不会从服务器接收任何事件(例如消息传递)。通道级别异常将由 RabbitMQ 记录,并将启动该通道的关闭序列(见下文)。

客户端提供的连接名称

RabbitMQ 节点对其实验的了解有限

  • 它们的 TCP 端点(源 IP 地址和端口)
  • 使用的凭据

仅凭这些信息,识别应用程序和实例可能很困难,特别是在凭据可以共享,并且客户端通过负载均衡器连接但代理协议无法启用时。

为了便于在服务器日志管理 UI中识别客户端,AMQP 0-9-1 客户端连接(包括 RabbitMQ Java 客户端)可以提供自定义标识符。如果设置了,该标识符将在日志条目和管理 UI 中提及。该标识符称为客户端提供的连接名称。该名称可用于识别应用程序或应用程序中的特定组件。该名称是可选的;但是,强烈建议开发人员提供一个,因为它将极大地简化某些操作任务。

RabbitMQ Java 客户端的ConnectionFactory#newConnection 方法重写接受客户端提供的连接名称。以下是上面使用的修改后的连接示例,它提供了此类名称

ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
// provides a custom connection name
Connection conn = factory.newConnection("app:audit component:event-consumer");

使用交换机和队列

客户端应用程序使用[交换机]和队列,即协议的高级构建块。必须在使用它们之前声明它们。声明这两种类型的对象只是确保存在一个同名的对象,如果必要则创建它。

继续前面的示例,以下代码声明了一个交换机和一个服务器命名的队列,然后将它们绑定在一起。

channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);

这将积极声明以下对象,它们都可以通过使用其他参数进行自定义。这里它们都没有任何特殊参数。

  • 一个持久、非自动删除的类型为 "direct" 的交换机
  • 一个非持久、独占、自动删除的队列,具有生成的名称

上面的函数调用然后使用给定的路由键将队列绑定到交换机。

请注意,这将是声明队列的典型方式,当只有一个客户端希望使用它时:它不需要一个众所周知的名称,没有其他客户端可以使用它(独占的)并且会自动清除(自动删除)。如果多个客户端希望与具有众所周知的名称的队列共享,则此代码将适用。

channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

这将主动声明

  • 一个持久、非自动删除的类型为 "direct" 的交换机
  • 一个持久、非独占、非自动删除的队列,具有众所周知的名称。

许多Channel API 方法是重载的。这些方便的exchangeDeclarequeueDeclarequeueBind 的简短形式使用合理的默认值。还存在具有更多参数的更长形式,以允许您根据需要覆盖这些默认值,在需要时提供完全控制。

这种“简短形式、长形式”模式贯穿整个客户端 API 使用。

被动声明

队列和交换机可以被“被动地”声明。被动声明只是检查具有提供名称的实体是否存在。如果存在,则操作将是无操作。对于队列,成功的被动声明将返回与非被动声明相同的信息,即队列中就绪状态的消费者和消息数量。

如果实体不存在,则操作将失败,并出现通道级异常。此后无法使用该通道。应打开一个新的通道。通常使用一次性(临时)通道进行被动声明。

Channel#queueDeclarePassiveChannel#exchangeDeclarePassive 是用于被动声明的方法。以下示例演示了Channel#queueDeclarePassive

Queue.DeclareOk response = channel.queueDeclarePassive("queue-name");
// returns the number of messages in Ready state in the queue
response.getMessageCount();
// returns the number of consumers the queue has
response.getConsumerCount();

Channel#exchangeDeclarePassive 的返回值不包含任何有用的信息。因此,如果方法返回并且没有发生通道异常,则表示交换机确实存在。

具有可选响应的操作

一些常见操作也具有“不等待”版本,它不会等待服务器响应。例如,要声明一个队列并指示服务器不要发送任何响应,请使用

channel.queueDeclareNoWait(queueName, true, false, false, null);

“不等待”版本效率更高,但提供更低的安全性保证,例如,它们更多地依赖于心跳机制来检测失败的操作。如有疑问,请从标准版本开始。“不等待”版本仅在拓扑结构(队列、绑定)变化率很高的场景中需要。

删除实体和清除消息

可以显式删除队列或交换机。

channel.queueDelete("queue-name")

只有在队列为空时才能删除队列

channel.queueDelete("queue-name", false, true)

或者,如果它没有被使用(没有消费者)。

channel.queueDelete("queue-name", true, false)

可以清除队列(删除其所有消息)。

channel.queuePurge("queue-name")

发布消息

要向交换机发布消息,请使用Channel.basicPublish,如下所示。

byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

为了精细控制,请使用重载变体来指定mandatory 标志,或发送带有预设消息属性的消息(有关详细信息,请参阅发布者指南)。

channel.basicPublish(exchangeName, routingKey, mandatory,
MessageProperties.PERSISTENT_TEXT_PLAIN,
messageBodyBytes);

这会发送一条传递模式为 2(持久)、优先级为 1 且内容类型为“text/plain”的消息。使用Builder 类构建一个消息属性对象,其中包含尽可能多的属性,例如。

channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2)
.priority(1)
.userId("bob")
.build(),
messageBodyBytes);

此示例发布带有自定义标头的消息。

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("latitude", 51.5252949);
headers.put("longitude", -0.0905493);

channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.headers(headers)
.build(),
messageBodyBytes);

此示例发布带有过期时间的消息。

channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.expiration("60000")
.build(),
messageBodyBytes);

这只是一组简短的示例,并未演示所有支持的属性。

请注意,BasicProperties 是外部类AMQP 的内部类。

如果资源驱动的警报生效,则Channel#basicPublish 的调用最终会阻塞。

通道和并发注意事项(线程安全性)

应避免在线程之间共享Channel 实例。应用程序应为每个线程使用一个Channel,而不是在多个线程之间共享同一个Channel

虽然对通道的一些操作可以安全地并发调用,但有些操作不能并发调用,会导致网络上的帧交错不正确、双重确认等。

在共享通道上并发发布会导致网络上的帧交错不正确,从而触发连接级协议异常,并由代理立即关闭连接。因此,它需要应用程序代码中的显式同步(Channel#basicPublish 必须在临界区内调用)。在线程之间共享通道还会干扰发布者确认。最好完全避免在共享通道上并发发布,例如,通过为每个线程使用一个通道。

可以使用通道池来避免在共享通道上并发发布:线程完成对通道的操作后,将其返回给池,使通道可供另一个线程使用。通道池可以被认为是一种特定的同步解决方案。建议使用现有的池库,而不是自己开发的解决方案。例如,Spring AMQP 带有现成的通道池功能。

通道会消耗资源,在大多数情况下,应用程序很少需要在同一个 JVM 进程中打开数百个通道。如果我们假设应用程序为每个通道都有一个线程(因为通道不应该并发使用),那么单个 JVM 的数千个线程已经是一个相当大的开销,很可能可以避免。此外,一些快速发布者可以轻松地使网络接口和代理节点饱和:发布涉及的工作量少于路由、存储和传递消息。

一个经典的要避免的反模式是为每个发布的消息打开一个通道。通道应该具有相当长的生存期,打开一个新的通道是一个网络往返,这使得这种模式效率极低。

在一个线程中消费,在另一个线程中发布到共享通道上是安全的。

服务器推送的传递(见下文)与保证每个通道排序的并行调度。调度机制使用java.util.concurrent.ExecutorService,每个连接一个。可以使用ConnectionFactory#setSharedExecutor 设置器提供一个自定义执行器,该执行器将由单个ConnectionFactory 生成的所有连接共享。

当使用手动确认 时,重要的是要考虑哪个线程进行确认。如果它与接收传递的线程不同(例如,Consumer#handleDelivery 将传递处理委托给另一个线程),则使用multiple 参数设置为true 进行确认是不安全的,会导致双重确认,因此会导致关闭通道的通道级协议异常。一次确认一条消息是安全的。

通过订阅接收消息(“推送 API”)

import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

接收消息最有效的方式是使用Consumer 接口设置订阅。然后,消息将随着它们的到达自动传递,而不是必须显式请求。

当调用与Consumer 相关的 API 方法时,单个订阅总是由其消费者标签引用。消费者标签是消费者标识符,可以由客户端或服务器生成。要让 RabbitMQ 生成一个节点范围唯一的标签,请使用不带消费者标签参数的Channel#basicConsume 重载,或为消费者标签传递空字符串,并使用Channel#basicConsume 返回的值。消费者标签用于取消消费者。

不同的Consumer 实例必须具有不同的消费者标签。连接上的重复消费者标签强烈建议避免,会导致自动连接恢复出现问题,以及在监控消费者时混淆监控数据。

实现Consumer 最简单的方法是子类化便利类DefaultConsumer。可以将此子类的对象传递给basicConsume 调用,以设置订阅。

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
<i>// (process the message components here ...)</i>
channel.basicAck(deliveryTag, false);
}
});

在这里,由于我们指定了autoAck = false,因此有必要确认传递给Consumer 的消息,最方便的方式是在handleDelivery 方法中完成,如例所示。

更复杂的Consumer 需要进一步覆盖方法。特别是,当通道和连接关闭时,会调用handleShutdownSignal,并且handleConsumeOk 会在调用该Consumer 的任何其他回调之前传递消费者标签。

Consumer 还可以实现handleCancelOkhandleCancel 方法,以分别接收显式和隐式取消的通知。

可以使用Channel.basicCancel 显式取消特定的Consumer

channel.basicCancel(consumerTag);

传递消费者标签。

与发布者一样,重要的是要考虑消费者的并发危害安全性。

Consumer 的回调是在与实例化其Channel 的线程分开的线程池中调度的。这意味着Consumer 可以安全地调用ConnectionChannel 上的阻塞方法,例如Channel#queueDeclareChannel#basicCancel

每个Channel 会按 RabbitMQ 发送的顺序,在其上按顺序调度所有传递到其Consumer 处理程序方法。通道之间传递的顺序不保证:这些传递可以并行调度。

对于最常见的每个Channel 一个Consumer 的用例,这意味着Consumer 不会阻碍其他Consumer。如果每个Channel 多个Consumer,请注意,长时间运行的Consumer 可能会阻碍对该Channel 上其他Consumer 的回调调度。

请参考并发注意事项(线程安全性)部分,以了解有关并发和并发危害安全性的其他主题。

检索单个消息(“拉取 API”)

还可以按需检索单个消息(“拉取 API”又名轮询)。这种消费方法效率极低,因为它实际上是轮询,应用程序必须反复请求结果,即使绝大多数请求没有产生结果。因此,强烈建议不要使用这种方法。

要“拉取”消息,请使用Channel.basicGet 方法。返回值是GetResponse 的实例,可以从中提取标题信息(属性)和消息主体。

boolean autoAck = false;
GetResponse response = channel.basicGet(queueName, autoAck);
if (response == null) {
// No message retrieved.
} else {
AMQP.BasicProperties props = response.getProps();
byte[] body = response.getBody();
long deliveryTag = response.getEnvelope().getDeliveryTag();
// ...

并且由于此示例使用手动确认(上面的autoAck = false),因此您还必须调用Channel.basicAck 来确认您已成功接收消息。

// ...
channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message
}

处理不可路由消息

如果发布的消息设置了“强制”标志,但无法路由,则代理将将其返回给发送客户端(通过AMQP.Basic.Return 命令)。

为了接收此类返回消息,客户端可以实现 ReturnListener 接口并调用 Channel.addReturnListener 方法。如果客户端没有为特定通道配置返回监听器,则关联的返回消息将被静默丢弃。

channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode,
String replyText,
String exchange,
String routingKey,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
...
}
});

例如,如果客户端发布带有 "mandatory" 标志设置为 "direct" 类型且未绑定到队列的交换机的消息,则将调用返回监听器。

关闭协议

客户端关闭过程概述

AMQP 0-9-1 连接和通道在管理网络故障、内部故障和显式本地关闭方面采用了相同的一般方法。

AMQP 0-9-1 连接和通道具有以下生命周期状态

  • open: 对象已准备好使用

  • closing: 对象已明确收到本地关闭通知,已向任何支持的下层对象发出关闭请求,并正在等待它们完成关闭过程。

  • closed: 对象已从任何下层对象接收到所有关闭完成通知,因此已自行关闭。

这些对象最终都会进入关闭状态,无论导致关闭的原因是什么,例如应用程序请求、内部客户端库故障、远程网络请求或网络故障。

连接和通道对象拥有以下与关闭相关的 方法

  • addShutdownListener(ShutdownListener listener)

  • removeShutdownListener(ShutdownListener listener),用于管理任何监听器,这些监听器将在对象过渡到 closed 状态时触发。请注意,向已关闭的对象添加 ShutdownListener 将立即触发监听器。

  • getCloseReason(),允许调查对象关闭的原因。

  • isOpen(),用于测试对象是否处于打开状态。

  • close(int closeCode, String closeMessage),用于明确通知对象关闭。

监听器的简单用法如下所示

import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.ShutdownListener;

connection.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException cause)
{
...
}
});

有关关闭情况的信息

可以通过显式调用 getCloseReason() 方法或使用 ShutdownListener 类中的 service(ShutdownSignalException cause) 方法中的 cause 参数来检索包含有关关闭原因的所有可用信息的 ShutdownSignalException

ShutdownSignalException 类提供用于分析关闭原因的方法。通过调用 isHardError() 方法,我们可以获取有关它是连接错误还是通道错误的信息,getReason() 返回有关原因的信息,形式为 AMQP 方法 - AMQP.Channel.CloseAMQP.Connection.Close(如果原因是库中的某些异常,例如网络通信故障,则为 null,在这种情况下,可以使用 getCause() 检索该异常)。

public void shutdownCompleted(ShutdownSignalException cause)
{
if (cause.isHardError())
{
Connection conn = (Connection)cause.getReference();
if (!cause.isInitiatedByApplication())
{
Method reason = cause.getReason();
...
}
...
} else {
Channel ch = (Channel)cause.getReference();
...
}
}

原子性和 isOpen() 方法的使用

不建议在生产代码中使用通道和连接对象的 isOpen() 方法,因为方法返回的值取决于关闭原因的存在。以下代码说明了竞态条件的可能性

public void brokenMethod(Channel channel)
{
if (channel.isOpen())
{
// The following code depends on the channel being in open state.
// However there is a possibility of the change in the channel state
// between isOpen() and basicQos(1) call
...
channel.basicQos(1);
}
}

相反,我们通常应该忽略这种检查,而只是尝试执行所需的操作。如果在代码执行期间通道或连接关闭,则会抛出 ShutdownSignalException,指示对象处于无效状态。我们还应该捕获由 SocketException(当代理意外关闭连接时)或 ShutdownSignalException(当代理发起干净关闭时)引起的 IOException

public void validMethod(Channel channel)
{
try {
...
channel.basicQos(1);
} catch (ShutdownSignalException sse) {
// possibly check if channel was closed
// by the time we started action and reasons for
// closing it
...
} catch (IOException ioe) {
// check why connection was closed
...
}
}

高级连接选项

消费者操作线程池

Consumer 线程(见下文的 接收)默认情况下会在新的 ExecutorService 线程池中自动分配。如果需要更细粒度的控制,请在 newConnection() 方法中提供 ExecutorService,以便使用此线程池。以下示例提供了一个比通常分配的更大的线程池。

ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);

ExecutorsExecutorService 类都位于 java.util.concurrent 包中。

当连接关闭时,默认的 ExecutorService 将被 shutdown(),但用户提供的 ExecutorService(如上面的 es不会shutdown()。提供自定义 ExecutorService 的客户端必须确保最终关闭它(通过调用其 shutdown() 方法),否则池的线程可能会阻止 JVM 终止。

相同的执行器服务可以在多个连接之间共享,或者在重新连接时串行重复使用,但它在被 shutdown() 之后不能使用。

只有在有证据表明处理 Consumer 回调存在严重瓶颈时才应考虑使用此功能。如果没有执行 Consumer 回调,或者很少执行,默认分配就足够了。即使偶尔发生消费者活动的突发,开销最初也是最小的,分配的总线程资源也是有限的。

使用 AddressResolver 接口进行服务发现

可以使用 AddressResolver 的实现来更改连接时使用的端点解析算法。

Connection conn = factory.newConnection(addressResolver);

AddressResolver 接口类似于以下内容

public interface AddressResolver {

List<Address> getAddresses() throws IOException;

}

端点列表 一样,将首先尝试返回的第一个 Address,如果客户端无法连接到第一个,则尝试第二个,依此类推。

如果还提供了 ExecutorService(使用 factory.newConnection(es, addressResolver) 形式),则线程池将与(第一个)成功连接相关联。

AddressResolver 是实现自定义服务发现逻辑的理想场所,这在动态基础设施中特别有用。结合 自动恢复,客户端可以自动连接到启动时甚至不存在的节点。亲和性和负载均衡是自定义 AddressResolver 可能有用的其他场景。

Java 客户端附带以下实现(有关详细信息,请参阅 javadoc)

  • DnsRecordIpAddressResolver: 给定主机的名称,返回其 IP 地址(针对平台 DNS 服务器进行解析)。这对于简单的基于 DNS 的负载均衡或故障转移很有用。

  • DnsSrvRecordAddressResolver: 给定服务的名称,返回主机名/端口对。搜索通过 DNS SRV 请求实现。这在使用服务注册表(如 HashiCorp Consul)时很有用。

心跳超时

有关心跳以及如何在 Java 客户端中配置心跳的更多信息,请参阅 心跳指南

自定义线程工厂

Google App Engine (GAE) 等环境可以 限制直接线程实例化。要在这些环境中使用 RabbitMQ Java 客户端,需要配置一个自定义 ThreadFactory,该工厂使用适当的方法实例化线程,例如 GAE 的 ThreadManager

以下是 Google App Engine 的示例。

import com.google.appengine.api.ThreadManager;

ConnectionFactory cf = new ConnectionFactory();
cf.setThreadFactory(ThreadManager.backgroundThreadFactory());

支持 Java 非阻塞 IO

Java 客户端的 4.0 版本带来了对 Java 非阻塞 IO(也称为 Java NIO)的支持。NIO 不应该比阻塞 IO 更快,它只是允许更轻松地控制资源(在本例中为线程)。

在默认阻塞 IO 模式下,每个连接使用一个线程从网络套接字中读取数据。在 NIO 模式下,您可以控制从/向网络套接字读写数据的线程数量。

如果您的 Java 进程使用多个连接(数十个或数百个),请使用 NIO 模式。您应该使用比默认阻塞模式更少的线程。使用适当数量的线程设置,您不应该遇到任何性能下降,尤其是在连接不太繁忙的情况下。

必须显式启用 NIO。

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.useNio();

可以通过 NioParams 类配置 NIO 模式。

  connectionFactory.setNioParams(new NioParams().setNbIoThreads(4));

NIO 模式使用合理的默认值,但您可能需要根据自己的工作负载更改它们。一些设置包括:使用的总 IO 线程数量、缓冲区的大小、用于 IO 循环的服务执行器、内存中写入队列的参数(写入请求在发送到网络之前排队)。有关详细信息和默认值,请阅读 Javadoc。

从网络故障中自动恢复

连接恢复

客户端和 RabbitMQ 节点之间的网络连接可能会失败。RabbitMQ Java 客户端支持连接和拓扑(队列、交换机、绑定和消费者)的自动恢复。

对于许多应用程序,自动恢复过程遵循以下步骤

  • 重新连接
  • 恢复连接监听器
  • 重新打开通道
  • 恢复通道监听器
  • 恢复通道 basic.qos 设置、发布者确认和事务设置

拓扑恢复包括以下操作,这些操作针对每个通道执行

  • 重新声明交换机(预定义交换机除外)
  • 重新声明队列
  • 恢复所有绑定
  • 恢复所有消费者

从 Java 客户端的 4.0.0 版本开始,默认情况下启用自动恢复(因此也启用拓扑恢复)。

拓扑恢复依赖于每个连接的实体(队列、交换机、绑定、消费者)缓存。例如,当在连接上声明一个队列时,它将被添加到缓存中。当它被删除或计划删除时(例如,因为它被 自动删除),它将被删除。此模型有一些局限性,将在下面介绍。

要禁用或启用自动连接恢复,请使用 factory.setAutomaticRecoveryEnabled(boolean) 方法。以下代码段显示了如何显式启用自动恢复(例如,对于 4.0.0 之前的 Java 客户端)

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
factory.setAutomaticRecoveryEnabled(true);
// connection that will recover automatically
Connection conn = factory.newConnection();

如果恢复因异常而失败(例如,RabbitMQ 节点仍然不可达),它将在固定时间间隔后(默认值为 5 秒)重试。间隔时间可以配置。

ConnectionFactory factory = new ConnectionFactory();
// attempt recovery every 10 seconds
factory.setNetworkRecoveryInterval(10000);

当提供地址列表时,该列表将被随机排列,并将依次尝试所有地址。

ConnectionFactory factory = new ConnectionFactory();

Address[] addresses = {new Address("192.168.1.4"), new Address("192.168.1.5")};
factory.newConnection(addresses);

何时会触发连接恢复?

如果启用自动连接恢复,则以下事件将触发它

  • 连接的 IO 循环中抛出 IO 异常
  • 套接字读取操作超时
  • 检测到丢失的服务器 心跳
  • 连接的 I/O 循环中抛出任何其他意外异常

以先发生者为准。

如果客户端初始连接到 RabbitMQ 节点失败,则不会启动自动连接恢复。应用程序开发人员负责重试此类连接,记录失败的尝试,实施重试次数限制等。以下是一个非常基本的示例

ConnectionFactory factory = new ConnectionFactory();
// configure various connection settings

try {
Connection conn = factory.newConnection();
} catch (java.net.ConnectException e) {
Thread.sleep(5000);
// apply retry logic
}

当连接通过 Connection.Close 方法由应用程序关闭时,不会启动连接恢复。

通道级异常不会触发任何类型的恢复,因为它们通常表明应用程序中的语义问题(例如尝试从不存在的队列中消费)。

恢复监听器

可以在可恢复的连接和通道上注册一个或多个恢复监听器。当启用连接恢复时,ConnectionFactory#newConnectionConnection#createChannel 返回的连接实现 com.rabbitmq.client.Recoverable,提供两个具有相当描述性名称的方法

  • addRecoveryListener
  • removeRecoveryListener

请注意,您目前需要将连接和通道强制转换为 Recoverable 才能使用这些方法。

对发布的影响

使用 Channel.basicPublish 在连接关闭时发布的消息将丢失。客户端不会在连接恢复后将它们排队以进行传递。为了确保已发布的消息到达 RabbitMQ,应用程序需要使用 发布确认 并考虑连接故障。

拓扑恢复

拓扑恢复涉及交换机、队列、绑定和消费者的恢复。当启用自动恢复时,默认情况下启用它。在现代版本的客户端中,默认情况下启用拓扑恢复。

如果需要,可以显式禁用拓扑恢复

ConnectionFactory factory = new ConnectionFactory();

Connection conn = factory.newConnection();
// enable automatic recovery (e.g. Java client prior 4.0.0)
factory.setAutomaticRecoveryEnabled(true);
// disable topology recovery
factory.setTopologyRecoveryEnabled(false);

故障检测和恢复限制

自动连接恢复具有一些应用程序开发人员需要注意的限制和有意设计决策。

拓扑恢复依赖于每个连接的实体(队列、交换机、绑定、消费者)缓存。例如,当在连接上声明队列时,它将被添加到缓存中。当它被删除或计划删除时(例如,因为它 自动删除),它将被删除。这使得能够在不同的通道上声明和删除实体,而不会产生意外的结果。这也意味着消费者标签(特定于通道的标识符)必须在使用自动连接恢复的连接的所有通道上唯一。

当连接关闭或丢失时,需要时间才能检测到。因此,存在一个时间窗口,在此期间,库和应用程序都无法识别有效的连接故障。在此时间范围内发布的任何消息都会像往常一样被序列化并写入 TCP 套接字。只有通过 发布确认 才能保证它们传递到代理:根据设计,AMQP 0-9-1 中的发布完全是异步的。

当启用自动恢复的连接检测到套接字或 I/O 操作错误时,恢复将在可配置的延迟后开始,默认情况下为 5 秒。此设计假设即使许多网络故障是短暂的,并且通常持续时间很短,但它们不会立即消失。存在延迟还可以避免服务器端资源清理(例如 独占或自动删除队列 删除)与在相同资源上新打开的连接上执行的操作之间固有的竞争条件。

默认情况下,连接恢复尝试将以相同的时间间隔继续,直到成功打开新的连接。可以通过将 RecoveryDelayHandler 实现实例提供给 ConnectionFactory#setRecoveryDelayHandler 来使恢复延迟变得动态。使用动态计算延迟间隔的实现应避免过低的值(例如低于 2 秒的值)。

当连接处于恢复状态时,对其通道上的任何发布尝试都将被拒绝并抛出异常。客户端当前不会执行此类传出消息的任何内部缓冲。跟踪此类消息并在恢复成功后重新发布它们是应用程序开发人员的责任。发布确认 是一个协议扩展,应该由无法承受消息丢失的发布者使用。

当通道由于通道级异常关闭时,连接恢复将不会启动。此类异常通常表明应用程序级问题。库无法对何时发生这种情况做出明智的决定。

即使连接恢复启动,关闭的通道也不会恢复。这包括显式关闭的通道和上面的通道级异常情况。

手动确认和自动恢复

当使用手动确认时,可能在消息传递和确认之间与 RabbitMQ 节点的网络连接失败。连接恢复后,RabbitMQ 将重置所有通道上的传递标签。

这意味着具有旧传递标签的 basic.ackbasic.nackbasic.reject 将导致通道异常。为了避免这种情况,RabbitMQ Java 客户端会跟踪并更新传递标签,以使它们在恢复之间单调递增。

然后,Channel.basicAckChannel.basicNackChannel.basicReject 将调整后的传递标签转换为 RabbitMQ 使用的那些标签。

使用过时的传递标签的确认不会发送。使用手动确认和自动恢复的应用程序必须能够处理重新传递。

通道生命周期和拓扑恢复

自动连接恢复旨在尽可能透明地对应用程序开发人员,这就是为什么即使在幕后有多个连接失败并恢复,Channel 实例仍然保持相同的原因。从技术上讲,当启用自动恢复时,Channel 实例充当代理或装饰器:它们将 AMQP 业务委托给实际的 AMQP 通道实现,并在其周围实现一些恢复机制。这就是为什么您不应该在通道创建了一些资源(队列、交换机、绑定)之后关闭通道,否则这些资源的拓扑恢复将稍后失败,因为通道已关闭。相反,请将创建通道打开以保持应用程序的生命周期。

未处理的异常

与连接、通道、恢复和消费者生命周期相关的未处理异常将委托给异常处理程序。异常处理程序是任何实现 ExceptionHandler 接口的对象。默认情况下,将使用 DefaultExceptionHandler 的实例。它将异常详细信息打印到标准输出。

可以使用 ConnectionFactory#setExceptionHandler 覆盖处理程序。它将用于工厂创建的所有连接

ConnectionFactory factory = new ConnectionFactory();
cf.setExceptionHandler(customHandler);

异常处理程序应用于异常记录。

指标和监控

客户端会收集活动连接的运行时指标(例如已发布消息的数量)。指标收集是可选功能,应在 ConnectionFactory 级别设置,使用 setMetricsCollector(metricsCollector) 方法。此方法需要一个 MetricsCollector 实例,该实例在客户端代码的多个地方调用。

客户端开箱即用地支持 MicrometerDropwizard MetricsOpenTelemetry

以下是收集的指标

  • 打开连接的数量
  • 打开通道的数量
  • 已发布消息的数量
  • 已确认消息的数量
  • 已否定确认(否定确认)的传出消息的数量
  • 返回的不可路由传出消息的数量
  • 传出消息失败的数量
  • 已消费消息的数量
  • 已确认消息的数量
  • 已拒绝消息的数量

Micrometer 和 Dropwizard Metrics 都提供计数,但也提供消息相关指标的平均速率、最近五分钟的速率等。它们还支持用于监控和报告的常用工具(JMX、Graphite、Ganglia、Datadog 等)。有关更多详细信息,请参见下面的专用部分。

开发人员在启用指标收集时应牢记以下几点。

  • 使用 Micrometer 或 Dropwizard Metrics 时,不要忘记将适当的依赖项(在 Maven、Gradle 或甚至作为 JAR 文件)添加到 JVM 类路径。这些是可选的依赖项,不会自动与 Java 客户端一起拉取。您可能还需要根据使用的报告后端添加其他依赖项。
  • 指标收集是可扩展的。建议为特定需求实现自定义 MetricsCollector
  • MetricsCollectorConnectionFactory 级别设置,但可以在不同的实例之间共享。
  • 指标收集不支持事务。例如,如果在事务中发送确认,然后回滚事务,则确认将在客户端指标中计入(但显然不会计入代理中)。请注意,确认实际上是发送到代理的,然后由事务回滚取消,因此客户端指标在发送的确认方面是正确的。总之,不要将客户端指标用于关键的业务逻辑,它们不能保证完全准确。它们旨在用于简化对运行系统推理并提高操作效率。

Micrometer 支持

指标收集必须首先启用

Micrometer 以下方式

ConnectionFactory connectionFactory = new ConnectionFactory();
MicrometerMetricsCollector metrics = new MicrometerMetricsCollector();
connectionFactory.setMetricsCollector(metrics);
...
metrics.getPublishedMessages(); // get Micrometer's Counter object

Micrometer 支持 多个报告后端:Netflix Atlas、Prometheus、Datadog、Influx、JMX 等。

您通常会将 MeterRegistry 的实例传递给 MicrometerMetricsCollector。以下是用 JMX 的示例

JmxMeterRegistry registry = new JmxMeterRegistry();
MicrometerMetricsCollector metrics = new MicrometerMetricsCollector(registry);
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setMetricsCollector(metrics);

Dropwizard Metrics 支持

使用 Dropwizard 启用指标收集,如下所示

ConnectionFactory connectionFactory = new ConnectionFactory();
StandardMetricsCollector metrics = new StandardMetricsCollector();
connectionFactory.setMetricsCollector(metrics);
...
metrics.getPublishedMessages(); // get Metrics' Meter object

Dropwizard Metrics 支持 多个报告后端:控制台、JMX、HTTP、Graphite、Ganglia 等。

您通常会将 MetricsRegistry 的实例传递给 StandardMetricsCollector。以下是用 JMX 的示例

MetricRegistry registry = new MetricRegistry();
StandardMetricsCollector metrics = new StandardMetricsCollector(registry);

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setMetricsCollector(metrics);

JmxReporter reporter = JmxReporter
.forRegistry(registry)
.inDomain("com.rabbitmq.client.jmx")
.build();
reporter.start();

Google App Engine 上的 RabbitMQ Java 客户端

在 Google App Engine (GAE) 上使用 RabbitMQ Java 客户端需要使用自定义线程工厂,该工厂使用 GAE 的 ThreadManager 实例化线程(参见上文)。此外,有必要设置较低的心跳间隔(4-5 秒)以避免在 GAE 上遇到较低的 InputStream 读取超时

ConnectionFactory factory = new ConnectionFactory();
cf.setRequestedHeartbeat(5);

注意事项和限制

为了使拓扑恢复成为可能,RabbitMQ Java 客户端维护着一个已声明队列、交换机和绑定的缓存。该缓存是针对每个连接的。某些 RabbitMQ 功能使客户端无法观察到某些拓扑更改,例如当队列因 TTL 而被删除时。RabbitMQ Java 客户端会尝试在最常见的情况下使缓存条目失效。

  • 当队列被删除时。
  • 当交换机被删除时。
  • 当绑定被删除时。
  • 当自动删除队列上的消费者被取消时。
  • 当队列或交换机从自动删除的交换机中解绑时。

但是,客户端无法跟踪超出单个连接的这些拓扑更改。依赖于自动删除队列或交换机,以及队列 TTL(注意:不是消息 TTL!)并使用 自动连接恢复 的应用程序,应显式删除已知为未使用或已删除的实体,以清除客户端的拓扑缓存。这可以通过在 RabbitMQ 3.3.x 中使用 Channel#queueDeleteChannel#exchangeDeleteChannel#queueUnbindChannel#exchangeUnbind 来实现,因为它们是幂等的(删除不存在的东西不会导致异常)。

RPC(请求/响应)模式:示例

为了编程方便,Java 客户端 API 提供了一个名为 RpcClient 的类,它使用一个临时的回复队列,通过 AMQP 0-9-1 提供简单的 RPC 风格的通信 功能。

此类不会对 RPC 参数和返回值施加任何特定的格式。它仅仅提供了一种机制,用于将消息发送到具有特定路由键的特定交换机,并等待回复队列上的响应。

import com.rabbitmq.client.RpcClient;

RpcClient rpc = new RpcClient(channel, exchangeName, routingKey);

(此类使用 AMQP 0-9-1 的实现细节如下:请求消息使用 basic.correlation_id 字段发送,该字段的值对于此 RpcClient 实例是唯一的,并且使用 basic.reply_to 设置为回复队列的名称。)

创建此类的实例后,可以使用以下任一方法来发送 RPC 请求。

byte[] primitiveCall(byte[] message);
String stringCall(String message)
Map mapCall(Map message)
Map mapCall(Object[] keyValuePairs)

primitiveCall 方法将原始字节数组作为请求和响应主体进行传输。stringCall 方法是 primitiveCall 的一个简单的便捷包装器,将消息主体视为默认字符编码中的 String 实例。

mapCall 变体更为复杂:它们将包含普通 Java 值的 java.util.Map 编码为 AMQP 0-9-1 二进制表格表示形式,并以相同的方式解码响应。(请注意,这里对可以使用的值类型有一些限制,请参阅 javadoc 了解详细信息。)

所有编组/解组便捷方法都使用 primitiveCall 作为传输机制,并在其之上提供一个包装层。

TLS 支持

可以使用 TLS 加密客户端和代理之间的通信。客户端和服务器身份验证(也称为对等验证)也受支持。以下是最简单、最朴素的使用 Java 客户端加密的方法。

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5671);

// Only suitable for development.
// This code will not perform peer certificate chain verification and prone
// to man-in-the-middle attacks.
// See the main TLS guide to learn about peer verification and how to enable it.
factory.useSslProtocol();

请注意,客户端在上述示例中没有执行任何服务器身份验证(对等证书链验证),因为默认情况下使用的是“信任所有证书”的 TrustManager。这对于本地开发很方便,但容易受到 中间人攻击,因此 不建议在生产环境中使用

要详细了解 RabbitMQ 中的 TLS 支持,请参阅 TLS 指南。如果您只想配置 Java 客户端(尤其是对等验证和信任管理器部分),请阅读 TLS 指南的相应部分

OAuth 2 支持

客户端可以针对像 UAA 这样的 OAuth 2 服务器进行身份验证。服务器端必须启用 OAuth 2 插件,并配置为使用与客户端相同的 OAuth 2 服务器。

获取 OAuth 2 令牌

Java 客户端提供 OAuth2ClientCredentialsGrantCredentialsProvider 类,以使用 OAuth 2 客户端凭据流 获取 JWT 令牌。客户端将在打开连接时在密码字段中发送 JWT 令牌。然后,代理将验证 JWT 令牌签名、有效性和权限,然后授权连接并授予对请求的虚拟主机的访问权限。

优先使用 OAuth2ClientCredentialsGrantCredentialsProviderBuilder 创建 OAuth2ClientCredentialsGrantCredentialsProvider 实例,然后将其设置在 ConnectionFactory 上。以下代码段显示了如何为 OAuth 2 插件的示例设置 配置和创建 OAuth 2 凭据提供程序的实例。

import com.rabbitmq.client.impl.OAuth2ClientCredentialsGrantCredentialsProvider.
OAuth2ClientCredentialsGrantCredentialsProviderBuilder;
...
CredentialsProvider credentialsProvider =
new OAuth2ClientCredentialsGrantCredentialsProviderBuilder()
.tokenEndpointUri("https://127.0.0.1:8080/uaa/oauth/token/")
.clientId("rabbit_client").clientSecret("rabbit_secret")
.grantType("password")
.parameter("username", "rabbit_super")
.parameter("password", "rabbit_super")
.build();

connectionFactory.setCredentialsProvider(credentialsProvider);

在生产环境中,请确保为令牌端点 URI 使用 HTTPS,并在必要时为 HTTPS 请求配置 SSLContext(以验证和信任 OAuth 2 服务器的身份)。以下代码段通过使用 OAuth2ClientCredentialsGrantCredentialsProviderBuilder 中的 tls().sslContext() 方法来实现。

SSLContext sslContext = ... // create and initialise SSLContext

CredentialsProvider credentialsProvider =
new OAuth2ClientCredentialsGrantCredentialsProviderBuilder()
.tokenEndpointUri("https://127.0.0.1:8080/uaa/oauth/token/")
.clientId("rabbit_client").clientSecret("rabbit_secret")
.grantType("password")
.parameter("username", "rabbit_super")
.parameter("password", "rabbit_super")
.tls() // configure TLS
.sslContext(sslContext) // set SSLContext
.builder() // back to main configuration
.build();

请查阅 Javadoc 以查看所有可用选项。

刷新令牌

令牌会过期,代理将拒绝对使用过期令牌的连接进行操作。为了避免这种情况,可以在过期之前调用 CredentialsProvider#refresh(),并将新的令牌发送到服务器。从应用程序的角度来看,这很麻烦,因此 Java 客户端提供了 DefaultCredentialsRefreshService 的帮助。此实用程序跟踪使用的令牌,在它们过期之前刷新它们,并将新的令牌发送到它负责的连接。

以下代码段显示了如何创建一个 DefaultCredentialsRefreshService 实例,并将其设置在 ConnectionFactory 上。

import com.rabbitmq.client.impl.DefaultCredentialsRefreshService.
DefaultCredentialsRefreshServiceBuilder;
...
CredentialsRefreshService refreshService =
new DefaultCredentialsRefreshServiceBuilder().build();
cf.setCredentialsRefreshService(refreshService);

DefaultCredentialsRefreshService 在令牌有效期时间 80% 后安排一次刷新,例如,如果令牌在 60 分钟后过期,它将在 48 分钟后刷新。这是默认行为,请查阅 Javadoc 了解详细信息。

© 2024 RabbitMQ. All rights reserved.