跳至主内容

RabbitMQ 教程 - 发布确认

发布者确认

(使用 Kotlin 客户端)

信息

先决条件

本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。

哪里寻求帮助

如果您在学习本教程时遇到困难,可以通过 GitHub DiscussionsRabbitMQ 社区 Discord 联系我们。

发布者确认 是 RabbitMQ 的一项扩展,用于实现可靠发布。当通道上启用了发布者确认时,客户端发布的这些消息将由代理异步确认,这意味着这些消息在服务器端已被处理。

概述

在本教程中,我们将使用发布确认(publisher confirms)来确保发布的消息已安全到达代理(broker)。我们将介绍使用发布确认的几种策略,并解释它们的优缺点。

在通道上启用发布者确认

发布确认是 RabbitMQ 对 AMQP 0.9.1 协议的一种扩展。发布确认在通道(channel)级别启用。要启用它们,请使用 confirmSelect 方法。

channel.confirmSelect()

此方法必须在每一个你打算使用发布确认的通道上调用。确认机制只需启用一次,而不是为每条发布的消息都启用。

策略 #1:单独发布消息

让我们从最简单的发布确认方式开始,即发布一条消息并同步等待其确认。

suspend fun publishMessagesIndividually(channel: AMQPChannel, messages: List<String>) {
channel.confirmSelect()

for (message in messages) {
channel.basicPublish(
message.toByteArray(),
exchange = "",
routingKey = "my_queue",
properties = Properties()
)

// Wait for confirm
val confirm = channel.publishConfirmResponses.first()

when (confirm) {
is AMQPResponse.Channel.Basic.PublishConfirm.Ack -> {
println("✓ Message confirmed: $message")
}
is AMQPResponse.Channel.Basic.PublishConfirm.Nack -> {
println("✗ Message rejected: $message")
// Handle rejection (retry, log, etc.)
}
}
}
}

在前面的示例中,我们像往常一样发布一条消息,并使用 first() 调用等待其确认。该方法会在消息被确认后立即返回。如果消息在超时时间内未被确认,或者被 nack(即代理因某种原因无法处理该消息),该方法将抛出异常。对异常的处理通常包括记录错误消息和/或重试发送消息。

不同的客户端库处理同步发布者确认的方式各不相同,因此请务必仔细阅读你所使用的客户端文档。

这种技术非常直观,但也有一个主要缺点:它**显著降低了发布速度**,因为确认一条消息会阻塞后续所有消息的发布。这种方法每秒无法实现超过几百条消息的吞吐量。尽管如此,这对某些应用来说已经足够了。

发布者确认是异步的吗?

我们在开头提到过,代理是异步确认已发布消息的,但在第一个示例中,代码是同步等待直到消息被确认的。客户端实际上是异步接收确认的,并据此取消对 first() 调用的阻塞。可以将 first() 看作是一个依赖底层异步通知的同步辅助工具。

策略 #2:批量发布消息

为了改进我们之前的示例,我们可以批量发布消息并等待整个批次被确认。下面的示例使用 100 条消息作为一个批次。

suspend fun publishMessagesInBatch(channel: AMQPChannel, messages: List<String>, batchSize: Int) {
channel.confirmSelect()

messages.chunked(batchSize).forEach { batch ->
// Publish entire batch
batch.forEach { message ->
channel.basicPublish(
message.toByteArray(),
exchange = "",
routingKey = "my_queue",
properties = Properties()
)
}

// Wait for all confirms for this batch
val confirms = channel.publishConfirmResponses.take(batch.size).toList()

val ackCount = confirms.count { it is AMQPResponse.Channel.Basic.PublishConfirm.Ack }
val nackCount = confirms.count { it is AMQPResponse.Channel.Basic.PublishConfirm.Nack }

println("Batch complete: $ackCount acks, $nackCount nacks")

if (nackCount > 0) {
// Handle failures (can't identify specific messages easily)
println("Warning: Some messages in batch were rejected")
}
}
}

与等待每条消息的确认相比,等待一批消息的确认可以显著提高吞吐量(在远程 RabbitMQ 节点上可提高 20-30 倍)。一个缺点是,如果发生故障,我们无法确切知道哪里出了问题,因此可能必须将整个批次保存在内存中,以便记录有意义的日志或重新发布消息。此外,该方案仍然是同步的,因此它会阻塞消息的发布。

策略 3:异步处理发布者确认

代理会异步确认已发布的消息,只需在客户端注册一个回调函数即可获得这些确认的通知。

suspend fun publishMessagesAsync(channel: AMQPChannel, messages: List<String>) {
channel.confirmSelect()

val outstandingConfirms = mutableMapOf<ULong, String>()
var nextDeliveryTag = 1UL

// Launch coroutine to handle confirms
val confirmJob = launch {
channel.publishConfirmResponses.collect { confirm ->
when (confirm) {
is AMQPResponse.Channel.Basic.PublishConfirm.Ack -> {
if (confirm.multiple) {
// Remove all up to and including this tag
outstandingConfirms.keys.filter { it <= confirm.deliveryTag }
.forEach { outstandingConfirms.remove(it) }
} else {
outstandingConfirms.remove(confirm.deliveryTag)
}
}
is AMQPResponse.Channel.Basic.PublishConfirm.Nack -> {
val message = outstandingConfirms[confirm.deliveryTag]
println("✗ Message nacked: $message")
// Handle specific message rejection
outstandingConfirms.remove(confirm.deliveryTag)
}
}
}
}

// Publish all messages
messages.forEach { message ->
outstandingConfirms[nextDeliveryTag] = message

channel.basicPublish(
message.toByteArray(),
exchange = "",
routingKey = "my_queue",
properties = Properties()
)

nextDeliveryTag++
}

// Wait until all confirms are received
while (outstandingConfirms.isNotEmpty()) {
delay(10)
}

confirmJob.cancel()
}

在此示例中,我们使用 Kotlin 的 Flow API 来异步处理确认。我们从 publishConfirmResponses flow 中收集确认。每条已确认的消息都会调用回调函数。我们使用一个映射(map)来追踪未决的确认。当确认到达时,我们从映射中删除该条目。如果确认信息显示多条消息已确认(multiple 字段为 true),我们会删除所有直到并包括该传递标签(delivery tag)的消息。

异步处理确认的方法需要追踪已发布的消息。我们使用一个并发映射来关联发布传递标签与消息内容。这对于记录有意义的信息或重新发布被 nack 的消息是必要的。处理确认也可以分解为“即发即弃”(fire-and-forget)的方法:后台任务或 flow 可以处理确认并相应地更新映射。

总结

确保已发布的消息到达代理对于某些应用程序至关重要。发布者确认是 RabbitMQ 的一项功能,有助于满足这一要求。发布者确认本质上是异步的,但也可以同步处理。没有一种绝对的实现发布者确认的方法,这通常取决于应用程序和整个系统的约束。典型的技术包括:

  • 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
  • 批量发布消息,同步等待一批消息的确认:简单,吞吐量合理,但当出现问题时难以进行逻辑推理。
  • 异步处理:最佳性能和资源利用率,错误情况下控制良好,但正确实现可能比较复杂。

总而言之

完整示例代码

import dev.kourier.amqp.AMQPResponse
import dev.kourier.amqp.Properties
import dev.kourier.amqp.connection.amqpConfig
import dev.kourier.amqp.connection.createAMQPConnection
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList

suspend fun publishMessagesIndividually(channel: AMQPChannel, messages: List<String>) {
channel.confirmSelect()

for (message in messages) {
channel.basicPublish(
message.toByteArray(),
exchange = "",
routingKey = "my_queue",
properties = Properties()
)

val confirm = channel.publishConfirmResponses.first()

when (confirm) {
is AMQPResponse.Channel.Basic.PublishConfirm.Ack -> {
println("✓ Message confirmed: $message")
}
is AMQPResponse.Channel.Basic.PublishConfirm.Nack -> {
println("✗ Message rejected: $message")
}
}
}
}

suspend fun publishMessagesInBatch(channel: AMQPChannel, messages: List<String>, batchSize: Int) {
channel.confirmSelect()

messages.chunked(batchSize).forEach { batch ->
batch.forEach { message ->
channel.basicPublish(
message.toByteArray(),
exchange = "",
routingKey = "my_queue",
properties = Properties()
)
}

val confirms = channel.publishConfirmResponses.take(batch.size).toList()

val ackCount = confirms.count { it is AMQPResponse.Channel.Basic.PublishConfirm.Ack }
val nackCount = confirms.count { it is AMQPResponse.Channel.Basic.PublishConfirm.Nack }

println("Batch complete: $ackCount acks, $nackCount nacks")

if (nackCount > 0) {
println("Warning: Some messages in batch were rejected")
}
}
}

suspend fun publishMessagesAsync(channel: AMQPChannel, messages: List<String>) {
channel.confirmSelect()

val outstandingConfirms = mutableMapOf<ULong, String>()
var nextDeliveryTag = 1UL

val confirmJob = launch {
channel.publishConfirmResponses.collect { confirm ->
when (confirm) {
is AMQPResponse.Channel.Basic.PublishConfirm.Ack -> {
if (confirm.multiple) {
outstandingConfirms.keys.filter { it <= confirm.deliveryTag }
.forEach { outstandingConfirms.remove(it) }
} else {
outstandingConfirms.remove(confirm.deliveryTag)
}
}
is AMQPResponse.Channel.Basic.PublishConfirm.Nack -> {
val message = outstandingConfirms[confirm.deliveryTag]
println("✗ Message nacked: $message")
outstandingConfirms.remove(confirm.deliveryTag)
}
}
}
}

messages.forEach { message ->
outstandingConfirms[nextDeliveryTag] = message

channel.basicPublish(
message.toByteArray(),
exchange = "",
routingKey = "my_queue",
properties = Properties()
)

nextDeliveryTag++
}

while (outstandingConfirms.isNotEmpty()) {
delay(10)
}

confirmJob.cancel()
}

fun main() = runBlocking {
val config = amqpConfig {
server {
host = "localhost"
}
}
val connection = createAMQPConnection(this, config)
val channel = connection.openChannel()

channel.queueDeclare("my_queue", false, false, true, emptyMap())

val messages = List(1000) { "Message $it" }

val startTime = System.currentTimeMillis()
publishMessagesAsync(channel, messages)
val duration = System.currentTimeMillis() - startTime

println("Published ${messages.size} messages in ${duration}ms")

channel.close()
connection.close()
}

本教程到此结束。请注意,发布确认是一个高级功能,可能并非所有应用程序都需要。有关发布确认及其他可靠性功能的更多信息,请参阅可靠性相关文档

© . This site is unofficial and not affiliated with VMware.