跳到主要内容

RabbitMQ 教程 - 工作队列

工作队列

(使用 amqp Elixir 库)

信息

先决条件

本教程假设 RabbitMQ 已安装并在localhost上的标准端口(5672)运行。如果您使用其他主机、端口或凭据,则连接设置需要进行调整。

在何处获得帮助

如果您在学习本教程时遇到问题,可以通过GitHub 讨论RabbitMQ 社区 Discord联系我们。

第一个教程中,我们编写了程序来从命名队列发送和接收消息。在本教程中,我们将创建一个工作队列,用于在多个工作进程之间分配耗时的任务。

工作队列(又称:任务队列)背后的主要思想是避免立即执行资源密集型任务,并等待其完成。相反,我们将任务安排在稍后执行。我们将任务封装为消息,并将其发送到队列。在后台运行的工作进程将弹出任务,并最终执行该工作。当您运行多个工作进程时,这些任务将在它们之间共享。

这个概念在 Web 应用程序中特别有用,在 Web 应用程序中,不可能在短暂的 HTTP 请求窗口期间处理复杂的任务。

准备

在本教程的上一部分,我们发送了一条包含 "Hello World!" 的消息。现在,我们将发送代表复杂任务的字符串。我们没有真实世界的任务,比如要调整大小的图像或要渲染的 pdf 文件,所以让我们通过假装很忙来假装一下,方法是使用:timer.sleep/1函数。我们将以字符串中的点号数量作为其复杂度;每个点号将代表一秒钟的 "工作"。例如,由Hello...描述的假任务将需要三秒钟。

我们将稍微修改我们之前示例中的send.exs代码,以允许从命令行发送任意消息。该程序将安排任务到我们的工作队列,因此让我们将其命名为new_task.exs

message =
case System.argv do
[] -> "Hello World!"
words -> Enum.join(words, " ")
end

AMQP.Basic.publish(channel, "", "task_queue", message, persistent: true)

IO.puts " [x] Send '#{message}'"

我们旧的receive.exs脚本也需要一些更改:它需要为消息正文中的每个点号模拟一秒钟的工作。它将从队列中弹出消息并执行任务,因此让我们将其命名为worker.exs

defmodule Worker do
def wait_for_messages(channel) do
receive do
{:basic_deliver, payload, meta} ->
IO.puts " [x] Received #{payload}"
payload
|> to_char_list
|> Enum.count(fn x -> x == ?. end)
|> Kernel.*(1000)
|> :timer.sleep
IO.puts " [x] Done."

wait_for_messages(channel)
end
end
end

循环调度

使用任务队列的优势之一是能够轻松地并行化工作。如果我们积累了大量的工作,我们只需添加更多工作进程,这样就可以轻松地扩展。

首先,让我们尝试同时运行两个worker.exs脚本。它们都将从队列中获取消息,但到底是如何的呢?让我们看看。

您需要打开三个控制台。两个将运行worker.exs脚本。这些控制台将成为我们的两个消费者 - C1 和 C2。

mix run worker.exs
# => [*] Waiting for messages. To exit press CTRL+C, CTRL+C
mix run worker.exs
# => [*] Waiting for messages. To exit press CTRL+C, CTRL+C

在第三个控制台中,我们将发布新的任务。启动消费者后,您可以发布一些消息

# shell 3
mix run new_task.exs First message.
mix run new_task.exs Second message..
mix run new_task.exs Third message...
mix run new_task.exs Fourth message....
mix run new_task.exs Fifth message.....

让我们看看传递给我们的工作进程的是什么

# shell 1
mix run worker.exs
# => [*] Waiting for messages. To exit press CTRL+C, CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
mix run worker.exs
# => [*] Waiting for messages. To exit press CTRL+C, CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将接收相同数量的消息。这种分发消息的方式称为循环调度。使用三个或更多工作进程试一试。

消息确认

执行任务可能需要几秒钟,因此您可能想知道如果消费者开始一个长时间的任务,并在未完全完成时终止,会发生什么。在当前代码中,一旦 RabbitMQ 将消息传递给消费者,它就会立即将其标记为删除。在这种情况下,如果您终止工作进程,它正在处理的消息就会丢失。所有已调度到该特定工作进程但尚未处理的消息也会丢失。

但我们不想丢失任何任务。如果工作进程死亡,我们希望任务被传递给另一个工作进程。

为了确保消息永远不会丢失,RabbitMQ 支持消息确认。确认由消费者发回,以告诉 RabbitMQ 特定的消息已接收、已处理,并且 RabbitMQ 可以将其删除。

如果消费者死亡(其通道关闭、连接关闭或 TCP 连接丢失)而没有发送确认,RabbitMQ 将理解消息未完全处理,并将其重新排队。如果此时有其他消费者在线,它将很快将其重新传递给另一个消费者。这样,即使工作进程偶尔死亡,您也可以确保不会丢失任何消息。

对消费者传递确认强制执行超时(默认情况下为 30 分钟)。这有助于检测从未确认传递的错误(卡住)消费者。您可以增加此超时,如传递确认超时中所述。

手动消息确认默认情况下处于启用状态。在之前的示例中,我们通过no_ack: true标志显式地将其关闭。现在是时候删除此标志并从工作进程发送适当的确认,一旦我们完成了任务。

defmodule Worker do
def wait_for_messages(channel) do
receive do
{:basic_deliver, payload, meta} ->
IO.puts " [x] Received #{payload}"
payload
|> to_char_list
|> Enum.count(fn x -> x == ?. end)
|> Kernel.*(1000)
|> :timer.sleep
IO.puts " [x] Done."
AMQP.Basic.ack(channel, meta.delivery_tag)

wait_for_messages(channel)
end
end
end

AMQP.Basic.consume(channel, "hello")

使用此代码,您可以确保即使您使用 CTRL+C 终止一个工作进程节点,而它正在处理一条消息,也不会丢失任何内容。工作进程终止后不久,所有未确认的消息将被重新传递。

必须在接收传递的同一通道上发送确认。尝试使用其他通道确认将导致通道级协议异常。请参阅关于确认的文档指南以了解更多信息。

忘记确认

遗漏AMQP.Basic.ack是一个常见错误。这是一个容易犯的错误,但后果很严重。当您的客户端退出时,消息将被重新传递(这可能看起来像随机重新传递),但 RabbitMQ 将消耗越来越多的内存,因为它将无法释放任何未确认的消息。

为了调试这种类型的错误,您可以使用rabbitmqctl打印messages_unacknowledged字段

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在 Windows 上,删除 sudo

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久性

我们已经了解了如何确保即使消费者死亡,任务也不会丢失。但如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。

当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非您告诉它不要。为了确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久。

首先,我们需要确保队列能够在 RabbitMQ 节点重启后存活下来。为此,我们需要将其声明为持久

AMQP.Queue.declare(channel, "hello", durable: true)

虽然此命令本身是正确的,但在我们的设置中它将不起作用。这是因为我们已经定义了一个名为hello的队列,它不是持久的。RabbitMQ 不允许您使用不同的参数重新定义现有队列,并将向尝试执行此操作的任何程序返回错误。但有一个简单的解决方法 - 让我们声明一个具有不同名称的队列,例如task_queue

AMQP.Queue.declare(channel, "task_queue", durable: true)

AMQP.Queue.declare更改需要应用于生产者和消费者代码。

此时,我们确信即使 RabbitMQ 重启,task_queue队列也不会丢失。现在,我们需要将我们的消息标记为持久

  • 方法是提供persistent: true属性。
AMQP.Basic.publish(channel, "", "task_queue", message, persistent: true)

关于消息持久性的说明

将消息标记为持久并不完全保证消息不会丢失。虽然它告诉 RabbitMQ 将消息保存到磁盘,但 RabbitMQ 接受消息但尚未保存消息时,仍然存在短暂的时间窗口。此外,RabbitMQ 不会为每条消息执行fsync(2) -- 它可能只是保存到缓存,而没有真正写入磁盘。持久性保证并不强,但对于我们简单的任务队列来说已经足够了。如果您需要更强的保证,则可以使用发布者确认

公平调度

您可能已经注意到,调度仍然没有按照我们想要的方式工作。例如,在有两个工作进程的情况下,当所有奇数消息都很重而所有偶数消息都很轻时,一个工作进程将始终处于忙碌状态,而另一个工作进程几乎不会做任何工作。好吧,RabbitMQ 对此一无所知,仍然会平均分发消息。

发生这种情况是因为 RabbitMQ 只会在消息进入队列时分发消息。它不会查看消费者未确认的消息数量。它只是盲目地将每第 n 条消息分发给第 n 个消费者。

为了解决这个问题,我们可以使用 `AMQP.Basic.qos` 函数,并设置 `prefetch_count: 1`。这会告诉 RabbitMQ 每次只给一个 worker 发送一条消息。换句话说,在 worker 处理并确认前一条消息之前,不要向它发送新的消息。相反,RabbitMQ 会将消息发送给下一个空闲的 worker。

AMQP.Basic.qos(channel, prefetch_count: 1)

关于队列大小的说明

如果所有 worker 都处于繁忙状态,你的队列可能会被填满。你需要关注这一点,并考虑增加更多 worker 或者使用 消息 TTL

将所有内容整合在一起

我们 `new_task.exs` 脚本的最终代码

{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)

AMQP.Queue.declare(channel, "task_queue", durable: true)

message =
case System.argv do
[] -> "Hello World!"
words -> Enum.join(words, " ")
end

AMQP.Basic.publish(channel, "", "task_queue", message, persistent: true)
IO.puts " [x] Sent '#{message}'"

AMQP.Connection.close(connection)

(new_task.exs 代码)

以及我们的 worker

defmodule Worker do
def wait_for_messages(channel) do
receive do
{:basic_deliver, payload, meta} ->
IO.puts " [x] Received #{payload}"
payload
|> to_char_list
|> Enum.count(fn x -> x == ?. end)
|> Kernel.*(1000)
|> :timer.sleep
IO.puts " [x] Done."
AMQP.Basic.ack(channel, meta.delivery_tag)

wait_for_messages(channel)
end
end
end

{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)

AMQP.Queue.declare(channel, "task_queue", durable: true)
AMQP.Basic.qos(channel, prefetch_count: 1)

AMQP.Basic.consume(channel, "task_queue")
IO.puts " [*] Waiting for messages. To exit press CTRL+C, CTRL+C"

Worker.wait_for_messages(channel)

(worker.exs 代码)

使用消息确认和 `prefetch_count`,你可以设置一个工作队列。持久化选项可以让任务即使在 RabbitMQ 重启后依然存在。

现在,我们可以继续学习 教程 3,了解如何将同一消息发送给多个消费者。

© 2024 RabbitMQ. All rights reserved.