跳至主要内容

RabbitMQ 教程 - 工作队列

工作队列

(使用 Objective-C 客户端)

信息

先决条件

本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。

获取帮助

如果您在学习本教程时遇到问题,可以通过 GitHub 讨论RabbitMQ 社区 Discord 联系我们。

第一个教程 中,我们编写了从命名队列发送和接收消息的方法。在本教程中,我们将创建一个工作队列,用于在多个工作进程之间分配耗时的任务。

工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务并等待其完成。相反,我们安排稍后执行任务。我们将任务封装为消息并将其发送到队列。后台运行的工作进程将获取任务并最终执行作业。当您运行多个工作进程时,任务将在它们之间共享。

此概念在 Web 应用程序中尤其有用,在 Web 应用程序中,不可能在短暂的 HTTP 请求窗口期间处理复杂的任务。

准备

在本教程的 上一部分 中,我们发送了一条包含“Hello World!”的消息。现在,我们将发送代表复杂任务的字符串。我们没有像需要调整大小的图像或需要渲染的 pdf 文件这样的现实世界任务,因此让我们通过假装很忙来模拟它 - 通过使用 sleep。我们将字符串中点的数量作为其复杂度;每个点将占“工作”一秒。例如,由 Hello... 描述的伪任务将花费三秒钟。

我们将稍微修改 我们之前示例 中的 send 方法,以允许将任意字符串作为方法参数发送。此方法将任务调度到我们的工作队列,因此让我们将其重命名为 newTask。除了新的参数外,实现保持不变。

func newTask(_ msg: String) {
let conn = RMQConnection(delegate: RMQConnectionDelegateLogger())
conn.start()
let ch = conn.createChannel()
let q = ch.queue("task_queue", options: .durable)
let msgData = msg.data(using: .utf8)
ch.defaultExchange().publish(msgData, routingKey: q.name, persistent: true)
print("Sent \(msg)")
conn.close()
}

我们旧的 receive 方法需要一些较大的更改:它需要为消息体中的每个点模拟一秒的工作。如果每个工作进程都有一个名称,并且每个工作进程都需要从队列中获取消息并执行任务,这将有助于我们了解正在发生的事情,因此让我们将其称为 workerNamed()

q.subscribe({(_ message: RMQMessage) -> Void in
let messageText = String(data: message.body, encoding: .utf8)
print("\(name): Received \(messageText)")
// imitate some work
let sleepTime = UInt(messageText.components(separatedBy: ".").count) - 1
print("\(name): Sleeping for \(sleepTime) seconds")
sleep(sleepTime)
})

请注意,我们的伪任务模拟执行时间。

viewDidLoad 中运行它们,就像教程一一样。

override func viewDidLoad() {
super.viewDidLoad()
self.newTask("Hello World...")
self.workerNamed("Flopsy")
}

日志输出应表明 Flopsy 正在休眠三秒钟。

循环分发

使用任务队列的优势之一是能够轻松地并行化工作。如果我们正在积压工作,我们可以添加更多工作进程,从而轻松扩展。

让我们尝试同时运行两个 workerNamed() 方法。它们都将从队列中获取消息,但究竟是如何获取的呢?让我们看看。

更改 viewDidLoad 以发送更多消息并启动两个工作进程。

override func viewDidLoad() {
super.viewDidLoad()
self.workerNamed("Jack")
self.workerNamed("Jill")
self.newTask("Hello World...")
self.newTask("Just one this time.")
self.newTask("Five.....")
self.newTask("None")
self.newTask("Two..dots")
}

让我们看看我们的工作进程收到了什么。

# => Jack: Waiting for messages
# => Jill: Waiting for messages
# => Sent Hello World...
# => Jack: Received Hello World...
# => Jack: Sleeping for 3 seconds
# => Sent Just one this time.
# => Jill: Received Just one this time.
# => Jill: Sleeping for 1 seconds
# => Sent Five.....
# => Sent None
# => Sent Two..dots
# => Jill: Received Five.....
# => Jill: Sleeping for 5 seconds
# => Jack: Received None
# => Jack: Sleeping for 0 seconds
# => Jack: Received Two..dots
# => Jack: Sleeping for 2 seconds

默认情况下,RabbitMQ 将依次将每条消息发送到下一个消费者。平均而言,每个消费者将收到相同数量的消息。这种分发消息的方式称为循环分发。尝试使用三个或更多工作进程来测试此功能。

消息确认

执行任务可能需要几秒钟,您可能想知道如果消费者启动一个长时间的任务并在完成之前终止会发生什么。使用我们当前的代码,一旦 RabbitMQ 将消息传递给消费者,它就会立即将其标记为删除。在这种情况下,如果您终止一个工作进程,它正在处理的消息就会丢失。分派到此特定工作进程但尚未处理的消息也会丢失。

但我们不想丢失任何任务。如果一个工作进程死亡,我们希望该任务被传递给另一个工作进程。

为了确保消息永不丢失,RabbitMQ 支持 消息确认。消费者发送回一个 ack(确认),告诉 RabbitMQ 特定的消息已收到并处理,并且 RabbitMQ 可以将其删除。

如果一个消费者死亡(其通道关闭、连接关闭或 TCP 连接丢失)而没有发送 ack,RabbitMQ 将理解消息未完全处理,并将将其重新排队。如果同时还有其他消费者在线,它将很快将其重新传递给另一个消费者。这样,您可以确保即使工作进程偶尔死亡,也不会丢失任何消息。

对消费者传递确认强制执行超时(默认情况下为 30 分钟)。这有助于检测从不确认传递的错误(卡住)的消费者。您可以按照 传递确认超时 中的说明增加此超时。

消息确认在客户端默认情况下是关闭的,但在 AMQ 协议中不是(RMQBasicConsumeOptions.noAck 选项由 subscribe() 自动发送)。现在是时候通过显式设置一个空的 RMQBasicConsumeOptions 并从工作进程完成任务后发送适当的确认来打开确认了。

let manualAck = RMQBasicConsumeOptions()
q.subscribe(manualAck, handler: {(_ message: RMQMessage) -> Void in
let messageText = String(data: message.body, encoding: .utf8)
print("\(name): Received \(messageText)")
// imitate some work
let sleepTime = UInt(messageText.components(separatedBy: ".").count) - 1
print("\(name): Sleeping for \(sleepTime) seconds")
sleep(sleepTime)
ch.ack(message.deliveryTag)
})

使用此代码,我们可以确保即使工作进程在处理消息时死亡,也不会丢失任何内容。工作进程死亡后不久,所有未确认的消息都将被重新传递。

确认必须在接收传递的同一通道上发送。尝试使用不同的通道进行确认会导致通道级别的协议异常。请参阅 关于确认的文档指南 以了解更多信息。

遗漏的确认

遗漏 ack 是一个常见的错误。这是一个容易发生的错误,但后果很严重。当您的客户端退出时,消息将被重新传递(这可能看起来像随机重新传递),但 RabbitMQ 将消耗越来越多的内存,因为它将无法释放任何未确认的消息。

为了调试此类错误,您可以使用 rabbitmqctl 打印 messages_unacknowledged 字段。

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在 Windows 上,删除 sudo。

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久性

我们已经了解了如何确保即使消费者死亡,任务也不会丢失。但是,如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。

当 RabbitMQ 退出或崩溃时,它将忘记队列和消息,除非您告诉它不要这样做。要确保消息不会丢失,需要两件事:我们需要将队列和消息都标记为持久。

首先,我们需要确保队列能够在 RabbitMQ 节点重启后继续存在。为此,我们需要将其声明为持久

let q = ch.queue("hello", options: .durable)

虽然此命令本身是正确的,但它在我们的当前设置中不起作用。这是因为我们已经定义了一个名为 hello 的队列,该队列不是持久的。RabbitMQ 不允许您使用不同的参数重新定义现有队列,并且会向尝试执行此操作的任何程序返回错误。但是,有一个快速的解决方法 - 让我们声明一个具有不同名称的队列,例如 task_queue

var q = ch.queue("task_queue", options: .durable)

options: .durable 更改需要应用于生产者和消费者代码。

此时,我们可以确定即使 RabbitMQ 重启,task_queue 队列也不会丢失。现在我们需要将我们的消息标记为持久。

  • 通过使用 persistent 选项。
ch.defaultExchange().publish(msgData, routingKey: q.name, persistent: true)

关于消息持久性的说明

将消息标记为持久并不完全保证消息不会丢失。虽然它告诉 RabbitMQ 将消息保存到磁盘,但仍然存在一个短暂的时间窗口,RabbitMQ 在接受消息但尚未保存它的时候。此外,RabbitMQ 不会为每条消息执行 fsync(2) -- 它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证并不强大,但对于我们的简单任务队列来说已经足够了。如果您需要更强大的保证,则可以使用 发布者确认

公平分发

您可能已经注意到,分发仍然没有完全按照我们想要的方式工作。例如,在有两个工作进程的情况下,当所有奇数消息都很重而偶数消息很轻时,一个工作进程将持续忙碌,而另一个工作进程几乎不做任何工作。好吧,RabbitMQ 对此一无所知,并且仍然会均匀地分发消息。

发生这种情况是因为 RabbitMQ 只在消息进入队列时才分发消息。它不会查看消费者的未确认消息数量。它只是盲目地将每第 n 条消息分派给第 n 个消费者。

为了解决这个问题,我们可以使用 basicQos(global:) 方法,其预取值为 @1。这告诉 RabbitMQ 一次不要向工作进程提供超过一条消息。或者,换句话说,在工作进程处理并确认上一条消息之前,不要向其分派新消息。相反,它会将其分派给下一个尚未忙碌的工作进程。

ch.basicQos(1, global: false)

关于队列大小的说明

如果所有 worker 都很忙,您的队列可能会填满。您需要留意这一点,并可能添加更多 worker,或采用其他策略。

综合示例

我们 newTask() 方法的最终代码

func newTask(_ msg: String) {
let conn = RMQConnection(delegate: RMQConnectionDelegateLogger())
conn.start()
let ch = conn.createChannel()
let q = ch.queue("task_queue", options: .durable)
let msgData = msg.data(using: .utf8)
ch.defaultExchange().publish(msgData, routingKey: q.name, persistent: true)
print("Sent \(msg)")
conn.close()
}

以及我们的 workerNamed()

func workerNamed(_ name: String) {
let conn = RMQConnection(delegate: RMQConnectionDelegateLogger())
conn.start()
let ch = conn.createChannel()
let q = ch.queue("task_queue", options: .durable)
ch.basicQos(1, global: false)
print("\(name): Waiting for messages")
let manualAck = RMQBasicConsumeOptions()
q.subscribe(manualAck, handler: {(_ message: RMQMessage) -> Void in
let messageText = String(data: message.body, encoding: .utf8)
print("\(name): Received \(messageText)")
// imitate some work
let sleepTime = UInt(messageText.components(separatedBy: ".").count) - 1
print("\(name): Sleeping for \(sleepTime) seconds")
sleep(sleepTime)
ch.ack(message.deliveryTag)
})
}

(源代码)

使用消息确认和预取,您可以设置一个工作队列。持久性选项允许任务即使在 RabbitMQ 重新启动后也能继续存在。

现在我们可以继续学习教程 3,了解如何将相同的消息传递给多个消费者。

© 2024 RabbitMQ. All rights reserved.