跳到主要内容

Java 客户端 API 指南

概述

本指南涵盖了 RabbitMQ Java 客户端 及其公共 API。它假设使用了客户端的最新主要版本,并且读者熟悉基础知识

本指南的关键章节是

API 参考 (JavaDoc) 可单独获取。

支持时间线

有关支持时间线,请参阅 RabbitMQ Java 库支持页面

JDK 和 Android 版本支持

此库的 5.x 发布系列需要 JDK 8,无论是编译时还是运行时。在 Android 上,这意味着仅支持 Android 7.0 或更高版本

4.x 发布系列支持 JDK 6 以及 7.0 之前的 Android 版本。

许可

该库是开源的,在 GitHub 上开发,并根据以下三重许可获得许可:

这意味着用户可以将该库视为在以上列表中的任何许可证下获得许可。例如,用户可以选择 Apache 公共许可证 2.0 并将此客户端包含到商业产品中。根据 GPLv2 许可的代码库可以选择 GPLv2,依此类推。

概述

客户端 API 公开了 AMQP 0-9-1 协议模型中的关键实体,并提供了额外的抽象以方便使用。

RabbitMQ Java 客户端使用 com.rabbitmq.client 作为其顶级包。关键类和接口包括:

  • Channel:表示 AMQP 0-9-1 通道,并提供大多数操作(协议方法)。
  • Connection:表示 AMQP 0-9-1 连接
  • ConnectionFactory:构造 Connection 实例
  • Consumer:表示消息消费者
  • DefaultConsumer:常用的消费者基类
  • BasicProperties:消息属性(元数据)
  • BasicProperties.Builder:BasicProperties 的构建器

协议操作通过 Channel 接口可用。Connection 用于打开通道、注册连接生命周期事件处理程序以及关闭不再需要的连接。Connection 通过 ConnectionFactory 实例化,您可以通过它配置各种连接设置,例如虚拟主机或用户名。

连接和通道

核心 API 类是 ConnectionChannel,分别表示 AMQP 0-9-1 连接和通道。它们通常在使用前导入

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

连接到 RabbitMQ

以下代码使用给定的参数(主机名、端口号等)连接到 RabbitMQ 节点

ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);

Connection conn = factory.newConnection();

所有这些参数对于在本地运行的 RabbitMQ 节点都具有合理的默认值。

如果属性在创建连接之前保持未分配状态,则将使用该属性的默认值

属性默认值
用户名"guest"
密码"guest"
虚拟主机"/"
主机名"localhost"
端口

常规连接为 5672,对于使用 TLS 的连接5671

使用 URI 连接

或者,可以使用 URI

ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();

所有这些参数对于在本地运行的默认 RabbitMQ 服务器都具有合理的默认值。

成功的和不成功的客户端连接事件都可以在服务器节点日志中观察到

请注意,默认情况下,用户 guest 只能从 localhost 连接。这是为了限制在生产系统中使用众所周知的凭据。

应用程序开发人员可以为连接分配自定义名称。如果设置了名称,则该名称将在 RabbitMQ 节点日志以及 管理 UI 中提及。

然后可以使用 Connection 接口打开通道

Channel channel = conn.createChannel();

现在可以使用该通道发送和接收消息,如后续章节所述。

使用端点列表

可以指定要连接的端点列表。将使用第一个可访问的端点。在连接故障的情况下,使用端点列表使应用程序可以在原始节点关闭时连接到不同的节点。

要使用多个端点,请将 Address 列表提供给 ConnectionFactory#newConnectionAddress 表示主机名和端口对。

Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)
, new Address(hostname2, portnumber2)};
Connection conn = factory.newConnection(addrArr);

将尝试连接到 hostname1:portnumber1,如果失败,则连接到 hostname2:portnumber2。返回的连接是数组中第一个成功的连接(不抛出 IOException)。这完全等同于在工厂上重复设置主机和端口,每次调用 factory.newConnection(),直到其中一个成功为止。

如果也提供了 ExecutorService(使用 factory.newConnection(es, addrArr) 形式),则线程池与(第一个)成功的连接关联。

如果您想更好地控制要连接的主机,请参阅对服务发现的支持

断开与 RabbitMQ 的连接

要断开连接,只需关闭通道和连接

channel.close();
conn.close();

请注意,关闭通道可能被认为是良好的做法,但在此处并非绝对必要 - 当底层连接关闭时,它无论如何都会自动完成。

客户端断开连接事件可以在服务器节点日志中观察到

连接和通道生命周期

客户端连接旨在长期存在。底层协议是为长时间运行的连接而设计和优化的。这意味着每个操作(例如,发布消息)都打开一个新连接是不必要的,并且强烈建议不要这样做,因为它会引入大量的网络往返和开销。

通道也旨在长期存在,但由于许多可恢复的协议错误将导致通道关闭,因此通道的生命周期可能比其连接的生命周期短。每个操作都关闭和打开新通道通常是不必要的,但可能是合适的。如有疑问,请首先考虑重用通道。

通道级异常,例如尝试从不存在的队列中消费,将导致通道关闭。关闭的通道不能再使用,也不会再接收来自服务器的任何事件(例如消息传递)。通道级异常将由 RabbitMQ 记录,并将启动通道的关闭序列(见下文)。

客户端提供的连接名称

RabbitMQ 节点关于其客户端的信息量有限

  • 它们的 TCP 端点(源 IP 地址和端口)
  • 使用的凭据

仅凭这些信息可能使识别应用程序和实例变得困难,尤其是在凭据可以共享且客户端通过负载均衡器连接但 代理协议 无法启用的情况下。

为了更容易在服务器日志管理 UI 中识别客户端,AMQP 0-9-1 客户端连接(包括 RabbitMQ Java 客户端)可以提供自定义标识符。如果设置,则该标识符将在日志条目和管理 UI 中提及。该标识符称为客户端提供的连接名称。该名称可用于标识应用程序或应用程序中的特定组件。该名称是可选的;但是,强烈建议开发人员提供一个,因为它将大大简化某些操作任务。

RabbitMQ Java 客户端的 ConnectionFactory#newConnection 方法重载 接受客户端提供的连接名称。这是一个修改后的连接示例,在上面使用过,它提供了这样一个名称

ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
// provides a custom connection name
Connection conn = factory.newConnection("app:audit component:event-consumer");

使用交换器和队列

客户端应用程序使用[交换器]和队列,这是协议的高级构建块。这些必须先声明才能使用。声明任一类型的对象只是确保存在该名称的对象,并在必要时创建它。

继续前面的示例,以下代码声明了一个交换器和一个服务器命名队列,然后将它们绑定在一起。

channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);

这将主动声明以下对象,这两个对象都可以通过使用其他参数进行自定义。这里它们都没有任何特殊参数。

  • 一个持久的、非自动删除的“direct”类型的交换器
  • 一个非持久的、独占的、自动删除的、带有生成名称的队列

上面的函数调用然后使用给定的路由键将队列绑定到交换器。

请注意,当只有一个客户端想要使用队列时,这将是声明队列的典型方式:它不需要众所周知的名称,没有其他客户端可以使用它(独占),并且将自动清理(自动删除)。如果多个客户端想要共享一个具有众所周知名称的队列,则此代码将是合适的

channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

这将主动声明

  • 一个持久的、非自动删除的“direct”类型的交换器
  • 一个持久的、非独占的、非自动删除的、带有众所周知名称的队列

许多 Channel API 方法都是重载的。这些方便的 exchangeDeclarequeueDeclarequeueBind 短格式使用合理的默认值。还有更长的形式,带有更多参数,允许您根据需要覆盖这些默认值,在需要时提供完全控制。

这种“短格式,长格式”模式在整个客户端 API 中使用。

被动声明

队列和交换器可以“被动”声明。被动声明只是检查是否已存在具有提供名称的实体。如果存在,则该操作是空操作。对于队列,成功的被动声明将返回与非被动声明相同的信息,即消费者数量和队列中就绪状态的消息数量。

如果实体不存在,则操作失败并出现通道级异常。之后通道将无法使用。应打开新通道。对于被动声明,通常使用一次性(临时)通道。

Channel#queueDeclarePassiveChannel#exchangeDeclarePassive 是用于被动声明的方法。以下示例演示了 Channel#queueDeclarePassive

Queue.DeclareOk response = channel.queueDeclarePassive("queue-name");
// returns the number of messages in Ready state in the queue
response.getMessageCount();
// returns the number of consumers the queue has
response.getConsumerCount();

Channel#exchangeDeclarePassive 的返回值不包含任何有用的信息。因此,如果该方法返回并且没有发生通道异常,则表示交换器确实存在。

带有可选响应的操作

一些常用操作也有“no wait”版本,它不会等待服务器响应。例如,要声明一个队列并指示服务器不发送任何响应,请使用

channel.queueDeclareNoWait(queueName, true, false, false, null);

“no wait”版本效率更高,但安全保证较低,例如,它们更依赖于心跳机制来检测失败的操作。如有疑问,请从标准版本开始。“no wait”版本仅在具有高拓扑(队列、绑定) churn 的场景中才需要。

删除实体和清除消息

可以显式删除队列或交换器

channel.queueDelete("queue-name")

只有当队列为空时才能删除队列

channel.queueDelete("queue-name", false, true)

或者如果它未使用(没有任何消费者)

channel.queueDelete("queue-name", true, false)

可以清除队列(删除其所有消息)

channel.queuePurge("queue-name")

发布消息

要将消息发布到交换器,请按如下方式使用 Channel.basicPublish

byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

为了进行精细控制,请使用重载变体来指定 mandatory 标志,或发送带有预设消息属性的消息(有关详细信息,请参阅发布者指南

channel.basicPublish(exchangeName, routingKey, mandatory,
MessageProperties.PERSISTENT_TEXT_PLAIN,
messageBodyBytes);

这将发送一条消息,其传递模式为 2(持久)、优先级为 1 且内容类型为“text/plain”。使用 Builder 类构建具有所需数量属性的消息属性对象,例如

channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2)
.priority(1)
.userId("bob")
.build(),
messageBodyBytes);

此示例发布带有自定义标头的消息

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("latitude", 51.5252949);
headers.put("longitude", -0.0905493);

channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.headers(headers)
.build(),
messageBodyBytes);

此示例发布带有过期的消息

channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.expiration("60000")
.build(),
messageBodyBytes);

这只是一组简短的示例,并未演示每个支持的属性。

请注意,BasicProperties 是外部类 AMQP 的内部类。

如果资源驱动警报生效,则 Channel#basicPublish 的调用最终将阻塞。

通道和并发注意事项(线程安全)

应避免在线程之间共享 Channel 实例。应用程序应为每个线程使用一个 Channel,而不是在多个线程之间共享同一个 Channel

虽然通道上的某些操作可以安全地并发调用,但有些操作则不是,并且会导致线路上的错误帧交错、重复确认等。

在共享通道上并发发布可能导致线路上的错误帧交错,从而触发连接级协议异常和代理立即关闭连接。因此,它需要在应用程序代码中进行显式同步(必须在临界区中调用 Channel#basicPublish)。线程之间共享通道也会干扰发布者确认。最好完全避免在共享通道上并发发布,例如,为每个线程使用一个通道。

可以使用通道池来避免在共享通道上并发发布:一旦线程完成通道的工作,它就会将其返回到池中,使通道可用于另一个线程。通道池可以被认为是特定的同步解决方案。建议使用现有的池库而不是自制解决方案。例如,Spring AMQP 附带即用型通道池功能。

通道消耗资源,并且在大多数情况下,应用程序很少需要在同一 JVM 进程中打开数百个以上的通道。如果我们假设应用程序为每个通道都有一个线程(因为通道不应并发使用),那么单个 JVM 的数千个线程已经是一个相当大的开销,很可能可以避免。此外,一些快速发布者可以轻松地使网络接口和代理节点饱和:发布涉及的工作量少于路由、存储和传递消息。

要避免的经典反模式是为每条发布的消息打开一个通道。通道应该具有相当长的生命周期,并且打开新通道是网络往返,这使得此模式非常低效。

在共享通道上,在一个线程中消费,在另一个线程中发布可能是安全的。

服务器推送的交付(请参阅下面的部分)与保证每个通道的排序被保留的并发方式分派。分派机制使用 java.util.concurrent.ExecutorService,每个连接一个。可以使用 ConnectionFactory#setSharedExecutor setter 提供一个自定义执行器,该执行器将由单个 ConnectionFactory 生成的所有连接共享。

当使用手动确认时,重要的是要考虑哪个线程进行确认。如果它与接收交付的线程不同(例如,Consumer#handleDelivery 将交付处理委托给不同的线程),则将 multiple 参数设置为 true 进行确认是不安全的,并且会导致双重确认,从而导致通道级协议异常关闭通道。一次确认一条消息可能是安全的。

通过订阅接收消息(“推送 API”)

import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

接收消息最有效的方法是使用 Consumer 接口设置订阅。然后,消息将在到达时自动传递,而无需显式请求。

当调用与 Consumer 相关的 API 方法时,各个订阅始终由其消费者标签引用。消费者标签是消费者标识符,可以是客户端生成或服务器生成的。要让 RabbitMQ 生成节点范围内的唯一标签,请使用不带消费者标签参数的 Channel#basicConsume 重载,或者为消费者标签传递空字符串,并使用 Channel#basicConsume 返回的值。消费者标签用于取消消费者。

不同的 Consumer 实例必须具有不同的消费者标签。强烈建议不要在连接上重复使用消费者标签,并且当监视消费者时,可能会导致自动连接恢复问题和令人困惑的监控数据。

实现 Consumer 的最简单方法是子类化便利类 DefaultConsumer。此子类的对象可以在 basicConsume 调用中传递,以设置订阅

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
<i>// (process the message components here ...)</i>
channel.basicAck(deliveryTag, false);
}
});

在这里,由于我们指定了 autoAck = false,因此有必要确认传递给 Consumer 的消息,最方便的做法是在 handleDelivery 方法中完成,如所示。

更复杂的 Consumer 需要覆盖更多方法。特别是,当通道和连接关闭时,会调用 handleShutdownSignal,并且在调用该 Consumer 的任何其他回调之前,会将消费者标签传递给 handleConsumeOk

Consumer 还可以实现 handleCancelOkhandleCancel 方法,以便分别收到显式和隐式取消的通知。

您可以使用 Channel.basicCancel 显式取消特定的 Consumer

channel.basicCancel(consumerTag);

传递消费者标签。

与发布者一样,重要的是要考虑消费者的并发危害安全性。

Consumer 的回调在与实例化其 Channel 的线程分离的线程池中分派。这意味着 Consumer 可以安全地在 ConnectionChannel 上调用阻塞方法,例如 Channel#queueDeclareChannel#basicCancel

每个 Channel 将按 RabbitMQ 发送的顺序将其所有交付分派到其 Consumer 处理程序方法。通道之间的交付顺序不保证:这些交付可以并行分派。

对于每个 Channel 一个 Consumer 的最常见用例,这意味着 Consumer 不会阻止其他 Consumer。对于每个 Channel 多个 Consumer,请注意,长时间运行的 Consumer 可能会阻止向该 Channel 上的其他 Consumer 分派回调。

有关与并发和并发危害安全性相关的其他主题,请参阅并发注意事项(线程安全)部分。

检索单个消息(“拉取 API”)

也可以按需检索单个消息(“拉取 API”,也称为轮询)。这种消费方法非常低效,因为它实际上是轮询,即使绝大多数请求没有结果,应用程序也必须重复请求结果。因此,强烈建议不要使用此方法。

要“拉取”消息,请使用 Channel.basicGet 方法。返回的值是 GetResponse 的实例,可以从中提取标头信息(属性)和消息体

boolean autoAck = false;
GetResponse response = channel.basicGet(queueName, autoAck);
if (response == null) {
// No message retrieved.
} else {
AMQP.BasicProperties props = response.getProps();
byte[] body = response.getBody();
long deliveryTag = response.getEnvelope().getDeliveryTag();
// ...

并且由于此示例使用手动确认(上面的 autoAck = false),您还必须调用 Channel.basicAck 以确认您已成功接收到消息

// ...
channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message
}

处理无法路由的消息

如果发布消息时设置了“mandatory”标志,但无法路由,则代理会将消息返回给发送客户端(通过 AMQP.Basic.Return 命令)。

要收到此类返回的通知,客户端可以实现 ReturnListener 接口并调用 Channel.addReturnListener。如果客户端没有为特定通道配置返回侦听器,则关联的返回消息将被静默丢弃。

channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode,
String replyText,
String exchange,
String routingKey,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
...
}
});

例如,如果客户端将“mandatory”标志设置为“direct”类型的交换器发布消息,但该交换器未绑定到队列,则将调用返回侦听器。

关闭协议

客户端关闭过程概述

AMQP 0-9-1 连接和通道共享相同的一般方法来管理网络故障、内部故障和显式本地关闭。

AMQP 0-9-1 连接和通道具有以下生命周期状态

  • open:对象已准备好使用

  • closing:对象已被显式通知在本地关闭,已向任何支持的下层对象发出关闭请求,并且正在等待它们的关闭过程完成

  • closed:对象已收到来自任何下层对象的所有关闭完成通知,因此已自行关闭

这些对象始终以 closed 状态结束,无论导致关闭的原因是什么,例如应用程序请求、内部客户端库故障、远程网络请求或网络故障。

连接和通道对象具有以下与关闭相关的方法

  • addShutdownListener(ShutdownListener listener)

  • removeShutdownListener(ShutdownListener listener),用于管理任何侦听器,这些侦听器将在对象转换为 closed 状态时触发。请注意,向已关闭的对象添加 ShutdownListener 将立即触发侦听器

  • getCloseReason(),允许调查对象关闭的原因

  • isOpen(),用于测试对象是否处于打开状态

  • close(int closeCode, String closeMessage),用于显式通知对象关闭

侦听器的简单用法如下所示

import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.ShutdownListener;

connection.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException cause)
{
...
}
});

关于关闭情况的信息

可以检索 ShutdownSignalException,其中包含有关关闭原因的所有可用信息,可以通过显式调用 getCloseReason() 方法,或通过使用 ShutdownListener 类的 service(ShutdownSignalException cause) 方法中的 cause 参数来检索。

ShutdownSignalException 类提供了分析关闭原因的方法。通过调用 isHardError() 方法,我们可以获得有关它是连接错误还是通道错误的信息,getReason() 返回有关原因的信息,形式为 AMQP 方法 - AMQP.Channel.CloseAMQP.Connection.Close(如果原因是库中的某些异常,例如网络通信故障,则为 null,在这种情况下,可以使用 getCause() 检索该异常)。

public void shutdownCompleted(ShutdownSignalException cause)
{
if (cause.isHardError())
{
Connection conn = (Connection)cause.getReference();
if (!cause.isInitiatedByApplication())
{
Method reason = cause.getReason();
...
}
...
} else {
Channel ch = (Channel)cause.getReference();
...
}
}

isOpen() 方法的原子性和使用

不建议在生产代码中使用通道和连接对象的 isOpen() 方法,因为该方法返回的值取决于关闭原因的存在。以下代码说明了竞争条件的可能性

public void brokenMethod(Channel channel)
{
if (channel.isOpen())
{
// The following code depends on the channel being in open state.
// However there is a possibility of the change in the channel state
// between isOpen() and basicQos(1) call
...
channel.basicQos(1);
}
}

相反,我们通常应该忽略此类检查,而只需尝试所需的操作。如果在代码执行期间通道连接关闭,则将抛出 ShutdownSignalException,指示对象处于无效状态。我们还应该捕获由 SocketException 引起的 IOException(当代理意外关闭连接时)或 ShutdownSignalException(当代理启动干净关闭时)。

public void validMethod(Channel channel)
{
try {
...
channel.basicQos(1);
} catch (ShutdownSignalException sse) {
// possibly check if channel was closed
// by the time we started action and reasons for
// closing it
...
} catch (IOException ioe) {
// check why connection was closed
...
}
}

高级连接选项

消费者操作线程池

默认情况下,Consumer 线程(请参阅下面的接收)在新的 ExecutorService 线程池中自动分配。如果需要更大的控制,请在 newConnection() 方法上提供 ExecutorService,以便使用此线程池代替。这是一个示例,其中提供的线程池比通常分配的线程池更大

ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);

ExecutorsExecutorService 类都在 java.util.concurrent 包中。

当连接关闭时,默认的 ExecutorServiceshutdown(),但用户提供的 ExecutorService(如上面的 es)将不会 shutdown()。提供自定义 ExecutorService 的客户端必须确保最终将其关闭(通过调用其 shutdown() 方法),否则池的线程可能会阻止 JVM 终止。

同一个执行器服务可以在多个连接之间共享,或者在重新连接时串行重用,但它在 shutdown() 后不能使用。

只有当有证据表明 Consumer 回调的处理存在严重瓶颈时,才应考虑使用此功能。如果没有执行 Consumer 回调,或者很少执行,则默认分配已足够。即使偶尔可能发生消费者活动突发,开销最初也是最小的,并且分配的总线程资源也是有界的。

使用 AddressResolver 接口进行服务发现

可以使用 AddressResolver 的实现来更改连接时使用的端点解析算法

Connection conn = factory.newConnection(addressResolver);

AddressResolver 接口如下所示

public interface AddressResolver {

List<Address> getAddresses() throws IOException;

}

就像端点列表一样,将首先尝试返回的第一个 Address,如果客户端无法连接到第一个 Address,则尝试第二个 Address,依此类推。

如果也提供了 ExecutorService(使用 factory.newConnection(es, addressResolver) 形式),则线程池与(第一个)成功的连接关联。

AddressResolver 是实现自定义服务发现逻辑的理想场所,这在动态基础设施中尤其有用。与自动恢复结合使用,客户端可以自动连接到最初启动时甚至未启动的节点。亲和性和负载均衡是自定义 AddressResolver 可能有用的其他场景。

Java 客户端附带以下实现(详细信息请参阅 javadoc)

  • DnsRecordIpAddressResolver: 给定主机名,返回其 IP 地址(针对平台 DNS 服务器进行解析)。这对于基于 DNS 的简单负载均衡或故障转移非常有用。

  • DnsSrvRecordAddressResolver: 给定服务名称,返回主机名/端口对。搜索实现为 DNS SRV 请求。当使用像 HashiCorp Consul 这样的服务注册中心时,这非常有用。

心跳超时

有关心跳以及如何在 Java 客户端中配置心跳的更多信息,请参阅心跳指南

自定义线程工厂

诸如 Google App Engine (GAE) 之类的环境可能会限制直接线程实例化。要在这种环境中使用 RabbitMQ Java 客户端,必须配置一个自定义 ThreadFactory,该工厂使用适当的方法来实例化线程,例如 GAE 的 ThreadManager

以下是 Google App Engine 的示例。

import com.google.appengine.api.ThreadManager;

ConnectionFactory cf = new ConnectionFactory();
cf.setThreadFactory(ThreadManager.backgroundThreadFactory());

支持 Java 非阻塞 IO

Java 客户端 4.0 版本引入了对 Java 非阻塞 IO(又名 Java NIO)的支持。NIO 并非旨在比阻塞 IO 更快,它只是可以更轻松地控制资源(在本例中为线程)。

在默认的阻塞 IO 模式下,每个连接都使用一个线程从网络套接字读取数据。在 NIO 模式下,您可以控制从/向网络套接字读取和写入数据的线程数。

如果您的 Java 进程使用大量连接(几十个或几百个),请使用 NIO 模式。您应该使用比默认阻塞模式更少的线程。在设置适当的线程数后,您不应遇到任何性能下降,尤其是在连接不太繁忙的情况下。

必须显式启用 NIO

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.useNio();

可以通过 NioParams 类配置 NIO 模式

  connectionFactory.setNioParams(new NioParams().setNbIoThreads(4));

NIO 模式使用合理的默认值,但您可能需要根据自己的工作负载更改它们。一些设置包括:使用的 IO 线程总数、缓冲区大小、用于 IO 循环的服务执行器、内存写入队列的参数(写入请求在发送到网络之前排队)。请阅读 Javadoc 以了解详细信息和默认值。

从网络故障自动恢复

连接恢复

客户端和 RabbitMQ 节点之间的网络连接可能会失败。RabbitMQ Java 客户端支持连接和拓扑(队列、交换器、绑定和消费者)的自动恢复。

许多应用程序的自动恢复过程遵循以下步骤

  • 重新连接
  • 恢复连接监听器
  • 重新打开通道
  • 恢复通道监听器
  • 恢复通道 basic.qos 设置、发布者确认和事务设置

拓扑恢复包括以下操作,对每个通道执行

  • 重新声明交换器(预定义的交换器除外)
  • 重新声明队列
  • 恢复所有绑定
  • 恢复所有消费者

从 Java 客户端 4.0.0 版本开始,默认启用自动恢复(因此也默认启用拓扑恢复)。

拓扑恢复依赖于每个连接的实体(队列、交换器、绑定、消费者)缓存。例如,当在连接上声明队列时,它将被添加到缓存中。当它被删除或计划删除时(例如,因为它被自动删除),它将被移除。此模型有一些限制,将在下面介绍。

要禁用或启用自动连接恢复,请使用 factory.setAutomaticRecoveryEnabled(boolean) 方法。以下代码片段展示了如何显式启用自动恢复(例如,对于 4.0.0 之前的 Java 客户端)

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
factory.setAutomaticRecoveryEnabled(true);
// connection that will recover automatically
Connection conn = factory.newConnection();

如果由于异常导致恢复失败(例如,RabbitMQ 节点仍然不可访问),则会在固定的时间间隔后重试(默认为 5 秒)。可以配置此间隔

ConnectionFactory factory = new ConnectionFactory();
// attempt recovery every 10 seconds
factory.setNetworkRecoveryInterval(10000);

当提供地址列表时,该列表会被打乱顺序,并尝试所有地址,一个接一个地尝试

ConnectionFactory factory = new ConnectionFactory();

Address[] addresses = {new Address("192.168.1.4"), new Address("192.168.1.5")};
factory.newConnection(addresses);

何时触发连接恢复?

如果启用了自动连接恢复,则以下事件将触发自动连接恢复

  • 在连接的 I/O 循环中抛出 I/O 异常
  • 套接字读取操作超时
  • 检测到服务器心跳丢失
  • 在连接的 I/O 循环中抛出任何其他意外异常

以先发生者为准。

如果客户端到 RabbitMQ 节点的初始连接失败,则不会启动自动连接恢复。应用程序开发人员负责重试此类连接、记录失败的尝试、实现重试次数限制等等。这是一个非常基本的示例

ConnectionFactory factory = new ConnectionFactory();
// configure various connection settings

try {
Connection conn = factory.newConnection();
} catch (java.net.ConnectException e) {
Thread.sleep(5000);
// apply retry logic
}

当应用程序通过 Connection.Close 方法关闭连接时,将不会启动连接恢复。

通道级异常不会触发任何类型的恢复,因为它们通常表示应用程序中的语义问题(例如,尝试从不存在的队列中消费)。

恢复监听器

可以在可恢复的连接和通道上注册一个或多个恢复监听器。当启用连接恢复后,ConnectionFactory#newConnectionConnection#createChannel 返回的连接会实现 com.rabbitmq.client.Recoverable,提供两个名称具有相当描述性的方法

  • addRecoveryListener
  • removeRecoveryListener

请注意,您当前需要将连接和通道强制转换为 Recoverable 才能使用这些方法。

对发布的影响

当连接断开时,使用 Channel.basicPublish 发布的邮件将丢失。客户端不会将它们排队以便在连接恢复后传递。为了确保发布的邮件到达 RabbitMQ,应用程序需要使用发布者确认并考虑连接故障。

拓扑恢复

拓扑恢复包括交换器、队列、绑定和消费者的恢复。当启用自动恢复时,默认启用拓扑恢复。在客户端的现代版本中,默认启用拓扑恢复。

如果需要,可以显式禁用拓扑恢复

ConnectionFactory factory = new ConnectionFactory();

Connection conn = factory.newConnection();
// enable automatic recovery (e.g. Java client prior 4.0.0)
factory.setAutomaticRecoveryEnabled(true);
// disable topology recovery
factory.setTopologyRecoveryEnabled(false);

故障检测和恢复限制

自动连接恢复有许多限制和有意的设计决策,应用程序开发人员需要了解这些限制和决策。

拓扑恢复依赖于每个连接的实体(队列、交换器、绑定、消费者)缓存。例如,当在连接上声明队列时,它将被添加到缓存中。当它被删除或计划删除时(例如,因为它被自动删除),它将被移除。这使得可以在不同的通道上声明和删除实体,而不会产生意外的结果。这也意味着消费者标签(通道特定的标识符)在所有使用自动连接恢复的连接上的所有通道中必须是唯一的。

当连接断开或丢失时,检测到需要时间。因此,存在一个时间窗口,在此窗口中,库和应用程序都不知道有效的连接故障。在此时间范围内发布的任何消息都会像往常一样序列化并写入 TCP 套接字。它们到代理的传递只能通过发布者确认来保证:AMQP 0-9-1 中的发布在设计上是完全异步的。

当启用自动恢复的连接检测到套接字或 I/O 操作错误时,恢复会在可配置的延迟后开始,默认为 5 秒。此设计假设即使许多网络故障是瞬态的且通常是短暂的,它们也不会立即消失。延迟还可以避免服务器端资源清理(例如独占或自动删除队列删除)与在同一资源上的新打开的连接上执行的操作之间固有的竞争条件。

默认情况下,连接恢复尝试将以相同的时间间隔继续,直到成功打开新连接。可以通过向 ConnectionFactory#setRecoveryDelayHandler 提供 RecoveryDelayHandler 实现实例来使恢复延迟动态化。使用动态计算的延迟间隔的实现应避免过低的值(例如低于 2 秒的值)。

当连接处于恢复状态时,在其通道上尝试的任何发布都将被拒绝并抛出异常。客户端当前不执行此类传出消息的任何内部缓冲。应用程序开发人员有责任跟踪此类消息并在恢复成功后重新发布它们。发布者确认是一种协议扩展,应由无法承受消息丢失的发布者使用。

当通道因通道级异常而关闭时,连接恢复不会启动。此类异常通常表示应用程序级问题。库无法就何时是这种情况做出明智的决定。

即使在连接恢复启动后,已关闭的通道也不会被恢复。这包括显式关闭的通道和上述通道级异常情况。

手动确认和自动恢复

当使用手动确认时,在消息传递和确认之间,与 RabbitMQ 节点的网络连接可能会失败。连接恢复后,RabbitMQ 将重置所有通道上的传递标签。

这意味着具有旧传递标签的 basic.ackbasic.nackbasic.reject 将导致通道异常。为了避免这种情况,RabbitMQ Java 客户端会跟踪并更新传递标签,以使其在恢复之间单调增长。

Channel.basicAckChannel.basicNackChannel.basicReject 然后将调整后的传递标签转换为 RabbitMQ 使用的标签。

不会发送具有过时传递标签的确认。使用手动确认和自动恢复的应用程序必须能够处理重新传递。

通道生命周期和拓扑恢复

自动连接恢复旨在对应用程序开发人员尽可能透明,这就是为什么即使在后台发生多次连接故障和恢复,Channel 实例也保持不变的原因。从技术上讲,当自动恢复开启时,Channel 实例充当代理或装饰器:它们将 AMQP 业务委托给实际的 AMQP 通道实现,并在其周围实现一些恢复机制。这就是为什么在通道创建了一些资源(队列、交换器、绑定)后,您不应关闭通道,否则稍后这些资源的拓扑恢复将失败,因为通道已关闭。相反,请在应用程序的整个生命周期中保持创建的通道处于打开状态。

未处理的异常

与连接、通道、恢复和消费者生命周期相关的未处理异常被委托给异常处理程序。异常处理程序是任何实现 ExceptionHandler 接口的对象。默认情况下,使用 DefaultExceptionHandler 的实例。它将异常详细信息打印到标准输出。

可以使用 ConnectionFactory#setExceptionHandler 覆盖处理程序。它将用于工厂创建的所有连接

ConnectionFactory factory = new ConnectionFactory();
cf.setExceptionHandler(customHandler);

异常处理程序应用于异常日志记录。

指标和监控

客户端收集活动连接的运行时指标(例如,已发布消息的数量)。指标收集是可选功能,应在 ConnectionFactory 级别设置,使用 setMetricsCollector(metricsCollector) 方法。此方法需要一个 MetricsCollector 实例,该实例在客户端代码的多个位置被调用。

客户端开箱即用地支持 MicrometerDropwizard MetricsOpenTelemetry

以下是收集的指标

  • 打开的连接数
  • 打开的通道数
  • 已发布消息的数量
  • 已确认消息的数量
  • 否定确认 (nack-ed) 的出站消息数量
  • 被返回的不可路由的出站消息数量
  • 出站消息失败的数量
  • 已消费消息的数量
  • 已确认消息的数量
  • 已拒绝消息的数量

Micrometer 和 Dropwizard Metrics 都提供计数,以及消息相关指标的平均速率、最近五分钟速率等。它们还支持用于监控和报告的常用工具(JMX、Graphite、Ganglia、Datadog 等)。有关更多详细信息,请参阅下面的专用部分。

开发人员在启用指标收集时应牢记一些事项。

  • 当使用 Micrometer 或 Dropwizard Metrics 时,不要忘记将适当的依赖项(在 Maven、Gradle 中,甚至作为 JAR 文件)添加到 JVM 类路径中。这些是可选依赖项,不会与 Java 客户端一起自动拉取。您可能还需要根据使用的报告后端添加其他依赖项。
  • 指标收集是可扩展的。鼓励为特定需求实现自定义 MetricsCollector
  • MetricsCollectorConnectionFactory 级别设置,但可以在不同的实例之间共享。
  • 指标收集不支持事务。例如,如果在事务中发送确认,然后回滚事务,则客户端指标中会计算该确认(但显然代理不会计算)。请注意,确认实际上已发送到代理,然后被事务回滚取消,因此客户端指标在发送的确认方面是正确的。总而言之,不要将客户端指标用于关键业务逻辑,它们不能保证完全准确。它们旨在用于简化对运行系统的推理并提高操作效率。

Micrometer 支持

必须首先启用指标收集

Micrometer,方法如下

ConnectionFactory connectionFactory = new ConnectionFactory();
MicrometerMetricsCollector metrics = new MicrometerMetricsCollector();
connectionFactory.setMetricsCollector(metrics);
...
metrics.getPublishedMessages(); // get Micrometer's Counter object

Micrometer 支持多个报告后端:Netflix Atlas、Prometheus、Datadog、Influx、JMX 等。

您通常会将 MeterRegistry 的实例传递给 MicrometerMetricsCollector。这是一个 JMX 的示例

JmxMeterRegistry registry = new JmxMeterRegistry();
MicrometerMetricsCollector metrics = new MicrometerMetricsCollector(registry);
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setMetricsCollector(metrics);

Dropwizard Metrics 支持

像这样启用 Dropwizard 的指标收集

ConnectionFactory connectionFactory = new ConnectionFactory();
StandardMetricsCollector metrics = new StandardMetricsCollector();
connectionFactory.setMetricsCollector(metrics);
...
metrics.getPublishedMessages(); // get Metrics' Meter object

Dropwizard Metrics 支持多个报告后端:控制台、JMX、HTTP、Graphite、Ganglia 等。

您通常会将 MetricsRegistry 的实例传递给 StandardMetricsCollector。这是一个 JMX 的示例

MetricRegistry registry = new MetricRegistry();
StandardMetricsCollector metrics = new StandardMetricsCollector(registry);

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setMetricsCollector(metrics);

JmxReporter reporter = JmxReporter
.forRegistry(registry)
.inDomain("com.rabbitmq.client.jmx")
.build();
reporter.start();

Google App Engine 上的 RabbitMQ Java 客户端

在 Google App Engine (GAE) 上使用 RabbitMQ Java 客户端需要使用自定义线程工厂,该工厂使用 GAE 的 ThreadManager 实例化线程(请参阅上文)。此外,有必要设置较低的心跳间隔(4-5 秒),以避免在 GAE 上遇到较低的 InputStream 读取超时。

ConnectionFactory factory = new ConnectionFactory();
cf.setRequestedHeartbeat(5);

注意事项和限制

为了使拓扑恢复成为可能,RabbitMQ Java 客户端维护已声明的队列、交换器和绑定的缓存。缓存是每个连接的。某些 RabbitMQ 功能使客户端无法观察到某些拓扑更改,例如,当队列由于 TTL 而被删除时。RabbitMQ Java 客户端尝试在最常见的情况下使缓存条目无效

  • 当队列被删除时。
  • 当交换器被删除时。
  • 当绑定被删除时。
  • 当消费者在自动删除队列上被取消时。
  • 当队列或交换器从自动删除的交换器取消绑定时。

但是,客户端无法跟踪超出单个连接的这些拓扑更改。依赖于自动删除队列或交换器以及队列 TTL(注意:不是消息 TTL!)并使用自动连接恢复的应用程序,应显式删除已知未使用或已删除的实体,以清除客户端拓扑缓存。RabbitMQ 3.3.x 中的 Channel#queueDeleteChannel#exchangeDeleteChannel#queueUnbindChannel#exchangeUnbind 的幂等性(删除不存在的东西不会导致异常)为此提供了便利。

RPC(请求/回复)模式:一个示例

作为编程便利,Java 客户端 API 提供了一个 RpcClient 类,该类使用临时回复队列,通过 AMQP 0-9-1 提供简单的RPC 风格的通信工具。

该类不对 RPC 参数和返回值施加任何特定格式。它仅提供一种机制,用于将消息发送到给定的交换器,并使用特定的路由键,并在回复队列上等待响应。

import com.rabbitmq.client.RpcClient;

RpcClient rpc = new RpcClient(channel, exchangeName, routingKey);

(此类如何使用 AMQP 0-9-1 的实现细节如下:请求消息发送时,basic.correlation_id 字段设置为此 RpcClient 实例的唯一值,basic.reply_to 设置为回复队列的名称。)

一旦您创建了此类的实例,您可以使用以下任何方法来发送 RPC 请求

byte[] primitiveCall(byte[] message);
String stringCall(String message)
Map mapCall(Map message)
Map mapCall(Object[] keyValuePairs)

primitiveCall 方法传输原始字节数组作为请求和响应正文。方法 stringCallprimitiveCall 的一个简单的便捷包装器,它将消息正文视为默认字符编码中的 String 实例。

mapCall 变体稍微复杂一些:它们将包含普通 Java 值的 java.util.Map 编码为 AMQP 0-9-1 二进制表表示形式,并以相同的方式解码响应。(请注意,对这里可以使用的值类型有一些限制 - 请参阅 javadoc 了解详细信息。)

所有编组/解组便捷方法都使用 primitiveCall 作为传输机制,并且仅在其之上提供包装层。

TLS 支持

可以使用 TLS 加密客户端和代理之间的通信。还支持客户端和服务器身份验证(又名对等验证)。以下是在 Java 客户端中使用加密的最简单、最幼稚的方法

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5671);

// Only suitable for development.
// This code will not perform peer certificate chain verification and prone
// to man-in-the-middle attacks.
// See the main TLS guide to learn about peer verification and how to enable it.
factory.useSslProtocol();

请注意,在上面的示例中,客户端不强制执行任何服务器身份验证(对等证书链验证),因为使用了默认的“信任所有证书”TrustManager。这对于本地开发很方便,但容易受到中间人攻击,因此不建议用于生产环境

要了解有关 RabbitMQ 中 TLS 支持的更多信息,请参阅TLS 指南。如果您只想配置 Java 客户端(尤其是对等验证和信任管理器部分),请阅读 TLS 指南的相应部分

OAuth 2 支持

客户端可以针对 OAuth 2 服务器(如 UAA)进行身份验证。OAuth 2 插件必须在服务器端启用并配置为使用与客户端相同的 OAuth 2 服务器。

获取 OAuth 2 令牌

Java 客户端提供了 OAuth2ClientCredentialsGrantCredentialsProvider 类,用于使用 OAuth 2 客户端凭据流获取 JWT 令牌。客户端将在打开连接时在密码字段中发送 JWT 令牌。然后,代理将验证 JWT 令牌签名、有效性和权限,然后授权连接并授予对请求的虚拟主机的访问权限。

首选使用 OAuth2ClientCredentialsGrantCredentialsProviderBuilder 创建 OAuth2ClientCredentialsGrantCredentialsProvider 实例,然后在 ConnectionFactory 上进行设置。以下代码片段展示了如何为 OAuth 2 插件的示例设置配置和创建 OAuth 2 凭据提供程序实例

import com.rabbitmq.client.impl.OAuth2ClientCredentialsGrantCredentialsProvider.
OAuth2ClientCredentialsGrantCredentialsProviderBuilder;
...
CredentialsProvider credentialsProvider =
new OAuth2ClientCredentialsGrantCredentialsProviderBuilder()
.tokenEndpointUri("http://localhost:8080/uaa/oauth/token/")
.clientId("rabbit_client").clientSecret("rabbit_secret")
.grantType("password")
.parameter("username", "rabbit_super")
.parameter("password", "rabbit_super")
.build();

connectionFactory.setCredentialsProvider(credentialsProvider);

在生产环境中,请确保为令牌端点 URI 使用 HTTPS,并在必要时为 HTTPS 请求配置 SSLContext(以验证和信任 OAuth 2 服务器的身份)。以下代码片段通过使用 OAuth2ClientCredentialsGrantCredentialsProviderBuilder 中的 tls().sslContext() 方法来实现这一点

SSLContext sslContext = ... // create and initialise SSLContext

CredentialsProvider credentialsProvider =
new OAuth2ClientCredentialsGrantCredentialsProviderBuilder()
.tokenEndpointUri("http://localhost:8080/uaa/oauth/token/")
.clientId("rabbit_client").clientSecret("rabbit_secret")
.grantType("password")
.parameter("username", "rabbit_super")
.parameter("password", "rabbit_super")
.tls() // configure TLS
.sslContext(sslContext) // set SSLContext
.builder() // back to main configuration
.build();

请查阅 Javadoc 以查看所有可用选项。

刷新令牌

令牌会过期,代理将拒绝使用过期令牌的连接上的操作。为了避免这种情况,可以在过期之前调用 CredentialsProvider#refresh() 并将新令牌发送到服务器。从应用程序的角度来看,这很麻烦,因此 Java 客户端提供了 DefaultCredentialsRefreshService 来提供帮助。此实用程序跟踪已使用的令牌,在令牌过期之前刷新它们,并将新令牌发送给它负责的连接。

以下代码片段展示了如何创建 DefaultCredentialsRefreshService 实例并在 ConnectionFactory 上进行设置

import com.rabbitmq.client.impl.DefaultCredentialsRefreshService.
DefaultCredentialsRefreshServiceBuilder;
...
CredentialsRefreshService refreshService =
new DefaultCredentialsRefreshServiceBuilder().build();
cf.setCredentialsRefreshService(refreshService);

DefaultCredentialsRefreshService 在令牌有效期时间的 80% 后安排刷新,例如,如果令牌在 60 分钟后过期,则将在 48 分钟后刷新。这是默认行为,请查阅 Javadoc 以获取更多信息。

© . All rights reserved.