跳至主要内容

RabbitMQ 教程 - 工作队列

工作队列

(使用 Bunny)

信息

先决条件

本教程假设 RabbitMQ 已经安装并运行在 localhost 上的标准端口(5672)。如果您使用其他主机、端口或凭据,则连接设置需要调整。

哪里可以获得帮助

如果您在学习本教程时遇到问题,可以通过GitHub 讨论RabbitMQ 社区 Discord联系我们。

第一个教程中,我们编写了程序来发送和接收来自命名队列的消息。在本教程中,我们将创建一个工作队列,用于将耗时的任务分配给多个工作进程。

工作队列(也称为任务队列)背后的主要思想是避免立即执行资源密集型任务并等待它完成。相反,我们计划稍后执行任务。我们将一个任务封装成一条消息并将其发送到一个队列。一个在后台运行的工作进程将弹出任务并最终执行该任务。当您运行多个工作进程时,任务将在它们之间共享。

这个概念在 web 应用程序中特别有用,因为在短暂的 HTTP 请求窗口内无法处理复杂的任务。

准备

在本教程的上一部分中,我们发送了一条包含 "Hello World!" 的消息。现在,我们将发送代表复杂任务的字符串。我们没有现实世界的任务,比如需要调整大小的图像或需要渲染的 pdf 文件,所以让我们假装很忙 - 通过使用 Kernel#sleep 方法。我们将字符串中的点号数量作为其复杂性;每个点号将代表一秒钟的 "工作"。例如,由 Hello... 描述的假任务将花费三秒钟。

我们将稍微修改我们之前示例中的send.rb 代码,以允许从命令行发送任意消息。该程序将安排任务到我们的工作队列,所以让我们将其命名为 new_task.rb

message = ARGV.empty? ? 'Hello World!' : ARGV.join(' ')

queue.publish(message, persistent: true)
puts " [x] Sent #{message}"

我们之前的receive.rb 脚本也需要一些更改:它需要为消息主体中的每个点号模拟一秒钟的工作。它将从队列中弹出消息并执行任务,所以让我们将其命名为 worker.rb

queue.subscribe(block: true) do |delivery_info, _properties, body|
puts " [x] Received #{body}"
# imitate some work
sleep body.count('.').to_i
puts ' [x] Done'
end

请注意,我们的假任务模拟执行时间。

像教程一中那样运行它们

# shell 1
ruby worker.rb
# shell 2
ruby new_task.rb

循环调度

使用任务队列的优势之一是可以轻松地并行化工作。如果我们正在积压工作,我们可以简单地添加更多工作进程,并以这种方式轻松扩展。

首先,让我们尝试同时运行两个 worker.rb 脚本。它们都将从队列中获取消息,但究竟是如何获取的?让我们来看看。

您需要打开三个控制台。两个将运行 worker.rb 脚本。这些控制台将成为我们的两个消费者 - C1 和 C2。

# shell 1
ruby worker.rb
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
ruby worker.rb
# => [*] Waiting for messages. To exit press CTRL+C

在第三个控制台中,我们将发布新任务。启动消费者后,您可以发布一些消息

# shell 3
ruby new_task.rb First message.
ruby new_task.rb Second message..
ruby new_task.rb Third message...
ruby new_task.rb Fourth message....
ruby new_task.rb Fifth message.....

让我们看看我们的工作进程接收到了什么

# shell 1
ruby worker.rb
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
ruby worker.rb
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

默认情况下,RabbitMQ 将依次将每条消息发送到下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分配消息的方式称为循环调度。使用三个或更多工作进程尝试一下。

消息确认

执行一个任务可能需要几秒钟,您可能想知道如果一个消费者开始一个长时间的任务并且它在完成之前终止了会发生什么。使用我们当前的代码,一旦 RabbitMQ 将一条消息传递给消费者,它会立即将其标记为删除。在这种情况下,如果您终止了一个工作进程,它正在处理的消息就会丢失。分配给该特定工作进程但尚未处理的消息也会丢失。

但我们不想丢失任何任务。如果一个工作进程死亡,我们希望任务被传递给另一个工作进程。

为了确保消息永远不会丢失,RabbitMQ 支持消息确认。消费者会发送一个 ack(确认)来告诉 RabbitMQ 已经接收并处理了特定消息,并且 RabbitMQ 可以将其删除。

如果消费者在没有发送 ack 的情况下死亡(它的通道关闭、连接关闭或 TCP 连接丢失),RabbitMQ 将理解消息未完全处理并将其重新放入队列。如果有其他消费者在线,它将立即将其重新传递给另一个消费者。这样,即使工作进程偶尔死亡,您也可以确保没有消息丢失。

消费者交付确认会强制执行超时(默认值为 30 分钟)。这有助于检测从未确认交付的错误(卡住)消费者。您可以根据交付确认超时中的说明增加此超时。

消息确认默认情况下是关闭的。现在,使用 :manual_ack 选项将其打开,并在完成任务后从工作进程发送正确的确认。

queue.subscribe(manual_ack: true, block: true) do |delivery_info, _properties, body|
puts " [x] Received '#{body}'"
# imitate some work
sleep body.count('.').to_i
puts ' [x] Done'
channel.ack(delivery_info.delivery_tag)
end

使用此代码,您可以确保即使在使用 CTRL+C 终止工作进程时,它正在处理消息,也不会丢失任何内容。在工作进程终止后不久,所有未确认的消息将被重新传递。

确认必须在接收交付的相同通道上发送。尝试使用其他通道确认将导致通道级别的协议异常。有关详细信息,请参阅有关确认的文档指南

忘记确认

遗漏 ack 是一个常见的错误。这是一个容易犯的错误,但后果很严重。当您的客户端退出时,消息将被重新传递(看起来像是随机重新传递),但 RabbitMQ 会占用越来越多的内存,因为它无法释放任何未确认的消息。

为了调试这种错误,您可以使用 rabbitmqctl 打印 messages_unacknowledged 字段

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在 Windows 上,去掉 sudo

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久性

我们已经了解了如何确保即使消费者死亡,任务也不会丢失。但如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。

当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非您告诉它不要忘记。为了确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久。

首先,我们需要确保队列能够在 RabbitMQ 节点重启后幸存下来。为此,我们需要将其声明为持久

channel.queue('hello', durable: true)

虽然此命令本身是正确的,但它在当前设置中将不起作用。这是因为我们已经定义了一个名为 hello 的队列,它不是持久的。RabbitMQ 不允许您使用不同的参数重新定义现有的队列,并且会向尝试执行此操作的任何程序返回错误。但有一个简单的解决方法 - 让我们声明一个具有不同名称的队列,例如 task_queue

channel.queue('task_queue', durable: true)

:durable 选项更改需要应用于生产者和消费者代码。

此时,我们确信即使 RabbitMQ 重启,task_queue 队列也不会丢失。现在,我们需要将我们的消息标记为持久

  • 通过使用 :persistent 选项,Bunny::Exchange#publish 会接受。
exchange.publish(message, persistent: true)

关于消息持久性的说明

将消息标记为持久并不完全保证消息不会丢失。虽然它告诉 RabbitMQ 将消息保存到磁盘,但仍然有一个短暂的时间窗口,RabbitMQ 在接受消息后尚未保存它。此外,RabbitMQ 不会对每条消息执行 fsync(2) - 它可能只是保存到缓存而不是真正写入磁盘。持久性保证并不强大,但对于我们的简单任务队列来说已经足够了。如果您需要更强大的保证,则可以使用发布者确认

公平调度

您可能已经注意到,调度仍然没有完全按照我们想要的方式工作。例如,在有两个工作进程的情况下,当所有奇数消息都很重而偶数消息很轻时,一个工作进程会一直很忙,而另一个工作进程几乎不做任何工作。好吧,RabbitMQ 对此一无所知,并且仍然会均匀地分配消息。

发生这种情况是因为 RabbitMQ 只会在消息进入队列时分配消息。它不会查看消费者未确认的消息数量。它只是盲目地将每条第 n 条消息分配给第 n 个消费者。

为了解决这个问题,我们可以使用 prefetch 方法,其值为 1。这告诉 RabbitMQ 一次不要向一个工作进程提供超过一条消息。换句话说,在工作进程处理并确认之前,不要将其分配给工作进程的新消息。相反,它将将其分配给下一个尚未忙碌的工作进程。

n = 1;
channel.prefetch(n);

关于队列大小的说明

如果所有 worker 都很忙,你的队列可能会被填满。你可能需要关注这一点,也许增加更多 worker,或者采用其他策略。

综合起来

我们 new_task.rb 类的最终代码

#!/usr/bin/env ruby
require 'bunny'

connection = Bunny.new(automatically_recover: false)
connection.start

channel = connection.create_channel
queue = channel.queue('task_queue', durable: true)

message = ARGV.empty? ? 'Hello World!' : ARGV.join(' ')

queue.publish(message, persistent: true)
puts " [x] Sent #{message}"

connection.close

(new_task.rb 源代码)

还有我们的 worker.rb

#!/usr/bin/env ruby
require 'bunny'

connection = Bunny.new(automatically_recover: false)
connection.start

channel = connection.create_channel
queue = channel.queue('task_queue', durable: true)

channel.prefetch(1)
puts ' [*] Waiting for messages. To exit press CTRL+C'

begin
queue.subscribe(manual_ack: true, block: true) do |delivery_info, _properties, body|
puts " [x] Received '#{body}'"
# imitate some work
sleep body.count('.').to_i
puts ' [x] Done'
channel.ack(delivery_info.delivery_tag)
end
rescue Interrupt => _
connection.close
end

(worker.rb 源代码)

使用消息确认和 prefetch,你可以设置一个工作队列。持久性选项允许任务即使在 RabbitMQ 重新启动后也能幸存。

有关 Bunny::Channel 方法和消息属性的更多信息,你可以浏览 Bunny API 参考

现在我们可以继续学习 教程 3,并了解如何将相同的消息传递给多个消费者。

© 2024 RabbitMQ. All rights reserved.