RabbitMQ 教程 - 工作队列
工作队列
(使用 amqp Elixir 库)
先决条件
本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
哪里寻求帮助
如果您在学习本教程时遇到困难,可以通过 GitHub Discussions 或 RabbitMQ 社区 Discord 联系我们。
在第一个教程中,我们编写了用于向命名队列发送和接收消息的程序。在本教程中,我们将创建一个工作队列(Work Queue),用于在多个工作进程(worker)之间分配耗时的任务。
工作队列(又称:任务队列)背后的核心思想是避免立即执行资源密集型任务并等待其完成。相反,我们安排任务在稍后执行。我们将任务封装为消息并发送到队列。在后台运行的工作进程会取出任务并最终执行该作业。当你运行多个工作进程时,任务将在它们之间共享。
这个概念对于 Web 应用程序尤其有用,在这些应用程序中,不可能在短暂的 HTTP 请求窗口内处理一个复杂的任务。
准备工作
在本教程的前一部分,我们发送了一条包含 "Hello World!" 的消息。现在,我们将发送代表复杂任务的字符串。我们没有类似于调整图像大小或渲染 PDF 文件那样的真实任务,所以让我们通过使用 :timer.sleep/1 函数来假装我们在忙碌。我们将字符串中点的数量视为任务的复杂度;每一个点代表一秒的“工作量”。例如,由 Hello... 描述的虚假任务将耗时三秒。
我们将稍微修改前一个示例中的 send.exs 代码,以允许从命令行发送任意消息。该程序将把任务调度到我们的工作队列,所以让我们将其命名为 new_task.exs。
message =
case System.argv do
[] -> "Hello World!"
words -> Enum.join(words, " ")
end
AMQP.Basic.publish(channel, "", "task_queue", message, persistent: true)
IO.puts " [x] Send '#{message}'"
我们旧的 receive.exs 脚本也需要做一些改动:它需要针对消息体中的每一个点模拟一秒的工作时间。它将从队列中取出消息并执行任务,所以让我们将其命名为 worker.exs。
defmodule Worker do
def wait_for_messages(channel) do
receive do
{:basic_deliver, payload, meta} ->
IO.puts " [x] Received #{payload}"
payload
|> to_char_list
|> Enum.count(fn x -> x == ?. end)
|> Kernel.*(1000)
|> :timer.sleep
IO.puts " [x] Done."
wait_for_messages(channel)
end
end
end
轮询分发
使用任务队列的一个优点是能够轻松地并行化工作。如果我们正在积累工作积压,我们可以添加更多的工作进程,从而轻松扩展。
首先,让我们尝试同时运行两个 worker.exs 脚本。它们都会从队列中获取消息,但具体是如何获取的呢?让我们来看看。
你需要打开三个控制台。两个将运行 worker.exs 脚本。这些控制台将充当我们的两个消费者——C1 和 C2。
mix run worker.exs
# => [*] Waiting for messages. To exit press CTRL+C, CTRL+C
mix run worker.exs
# => [*] Waiting for messages. To exit press CTRL+C, CTRL+C
在第三个中,我们将发布新任务。一旦您启动了消费者,您就可以发布几条消息
# shell 3
mix run new_task.exs First message.
mix run new_task.exs Second message..
mix run new_task.exs Third message...
mix run new_task.exs Fourth message....
mix run new_task.exs Fifth message.....
让我们看看分发给我们的工作进程的内容
# shell 1
mix run worker.exs
# => [*] Waiting for messages. To exit press CTRL+C, CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
mix run worker.exs
# => [*] Waiting for messages. To exit press CTRL+C, CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
默认情况下,RabbitMQ 会按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为轮询。尝试使用三个或更多工作进程进行此操作。
消息确认
执行任务可能需要几秒钟,所以你可能想知道如果一个消费者开始了一个长时间的任务,而在它未完全完成时终止了会发生什么。使用我们当前的代码,一旦 RabbitMQ 将消息传递给消费者,它会立即标记为删除。在这种情况下,如果你终止了一个工作进程,它正在处理的消息就会丢失。所有发送给这个特定工作进程但尚未处理的消息也会丢失。
但是我们不想丢失任何任务。如果一个工作进程死亡,我们希望将任务分发给另一个工作进程。
为了确保消息永远不会丢失,RabbitMQ 支持消息确认(acknowledgments)。消费者会向 RabbitMQ 发回确认(ack),告知某条消息已被接收、处理,且 RabbitMQ 可以自由地将其删除。
如果消费者死亡(其通道关闭、连接关闭或 TCP 连接丢失)而未发送 ack,RabbitMQ 将理解消息未完全处理,并会将其重新排队。如果当时有其他消费者在线,它将迅速将其重新分发给另一个消费者。这样,您可以确信没有消息会丢失,即使工作进程偶尔会死亡。
强制执行消费者交付确认的超时(默认为 30 分钟)。这有助于检测从未确认交付的错误(卡住)的消费者。您可以按照交付确认超时中的说明增加此超时。
手动消息确认默认是开启的。在之前的示例中,我们通过 no_ack: true 标志显式地关闭了它们。现在是时候移除这个标志,并在我们完成任务后从工作进程发送正确的确认了。
defmodule Worker do
def wait_for_messages(channel) do
receive do
{:basic_deliver, payload, meta} ->
IO.puts " [x] Received #{payload}"
payload
|> to_char_list
|> Enum.count(fn x -> x == ?. end)
|> Kernel.*(1000)
|> :timer.sleep
IO.puts " [x] Done."
AMQP.Basic.ack(channel, meta.delivery_tag)
wait_for_messages(channel)
end
end
end
AMQP.Basic.consume(channel, "hello")
使用此代码,即使你在工作节点处理消息时使用 CTRL+C 将其终止,你也可以确保没有任何内容丢失。在工作进程终止后不久,所有未确认的消息都会被重新发送。
确认必须在接收到交付的同一通道上发送。尝试使用不同通道进行确认将导致通道级协议异常。请参阅确认文档指南以了解更多信息。
遗忘的确认
漏掉
AMQP.Basic.ack是一个常见的错误。这很容易发生,但后果很严重。当你的客户端退出时,消息会被重新发送(这看起来像是随机的重新发送),但 RabbitMQ 将消耗越来越多的内存,因为它无法释放任何未确认的消息。为了调试此类错误,您可以使用
rabbitmqctl打印messages_unacknowledged字段sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged在 Windows 上,删除 sudo
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久性
我们已经学习了如何确保即使工作进程死亡,任务也不会丢失。但是,如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。
当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非您告诉它不要这样做。需要两件事来确保消息不会丢失:我们需要将队列和消息都标记为持久。
首先,我们需要确保队列能够承受 RabbitMQ 节点重启。为此,我们需要将其声明为持久
AMQP.Queue.declare(channel, "hello", durable: true, arguments: [{"x-queue-type", :longstr, "quorum"}])
虽然这个命令本身是正确的,但在我们的设置中它无法工作。这是因为我们已经定义了一个名为 hello 的非持久化队列。RabbitMQ 不允许你使用不同的参数重新定义现有的队列,如果任何程序尝试这样做,它将返回错误。但有一个快速的解决方法——让我们声明一个不同名称的队列,例如 task_queue。
AMQP.Queue.declare(channel, "task_queue", durable: true, arguments: [{"x-queue-type", :longstr, "quorum"}])
此 AMQP.Queue.declare 的更改需要同时应用于生产者和消费者的代码中。
此时,我们可以确信即使 RabbitMQ 重启,task_queue 队列也不会丢失。现在我们需要将我们的消息标记为持久化,
- 通过提供
persistent: true属性来实现。
AMQP.Basic.publish(channel, "", "task_queue", message, persistent: true)
关于消息持久化的注意事项
将消息标记为持久并不能完全保证消息不会丢失。尽管它告诉 RabbitMQ 将消息保存到磁盘,但在 RabbitMQ 接受消息但尚未保存的短时间窗口内仍然存在风险。此外,RabbitMQ 不会为每条消息执行
fsync(2)——它可能只保存在缓存中而未真正写入磁盘。持久性保证不强,但对于我们简单的任务队列来说已绰绰有余。如果您需要更强的保证,可以使用发布者确认。
公平分派
您可能已经注意到调度仍然不能完全按我们想要的方式工作。例如,在有两个工作进程的情况下,当所有奇数消息都很重而偶数消息都很轻时,一个工作进程将持续忙碌,而另一个工作进程几乎不做任何工作。好吧,RabbitMQ 对此一无所知,它仍然会均匀地分发消息。
这是因为 RabbitMQ 在消息进入队列时就会分发消息。它不会查看消费者未确认消息的数量。它只是盲目地将第 n 条消息分发给第 n 个消费者。
为了解决这个问题,我们可以使用带有 prefetch_count: 1 设置的 AMQP.Basic.qos 函数。这告诉 RabbitMQ 不要一次给工作进程超过一条消息。换句话说,在工作进程处理并确认上一条消息之前,不要给它发送新消息。相反,它会将消息发送给下一个尚未忙碌的工作进程。
AMQP.Basic.qos(channel, prefetch_count: 1)
关于队列大小的说明
如果所有的工作者都很忙,您的队列可能会填满。您需要留意这种情况,并考虑增加更多工作者,或者使用消息 TTL。
总而言之
我们 new_task.exs 脚本的最终代码
{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)
AMQP.Queue.declare(channel, "task_queue", durable: true, arguments: [{"x-queue-type", :longstr, "quorum"}])
message =
case System.argv do
[] -> "Hello World!"
words -> Enum.join(words, " ")
end
AMQP.Basic.publish(channel, "", "task_queue", message, persistent: true)
IO.puts " [x] Sent '#{message}'"
AMQP.Connection.close(connection)
以及我们的工作进程
defmodule Worker do
def wait_for_messages(channel) do
receive do
{:basic_deliver, payload, meta} ->
IO.puts " [x] Received #{payload}"
payload
|> to_char_list
|> Enum.count(fn x -> x == ?. end)
|> Kernel.*(1000)
|> :timer.sleep
IO.puts " [x] Done."
AMQP.Basic.ack(channel, meta.delivery_tag)
wait_for_messages(channel)
end
end
end
{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)
AMQP.Queue.declare(channel, "task_queue", durable: true, arguments: [{"x-queue-type", :longstr, "quorum"}])
AMQP.Basic.qos(channel, prefetch_count: 1)
AMQP.Basic.consume(channel, "task_queue")
IO.puts " [*] Waiting for messages. To exit press CTRL+C, CTRL+C"
Worker.wait_for_messages(channel)
通过使用消息确认和 prefetch_count,你可以建立一个工作队列。持久化选项可以让任务即使在 RabbitMQ 重启后依然存在。
现在我们可以继续学习教程 3,了解如何将同一条消息发送给多个消费者。