跳至主要内容

RabbitMQ 教程 - 工作队列

工作队列

(使用 Go RabbitMQ 客户端)

信息

先决条件

本教程假设 RabbitMQ 已安装并在 localhost 上的标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则连接设置需要调整。

获取帮助

如果您在学习本教程时遇到问题,可以通过 GitHub 讨论RabbitMQ 社区 Discord 联系我们。

第一个教程中,我们编写了程序来发送和接收来自命名队列的消息。在本教程中,我们将创建一个工作队列,用于在多个工作进程之间分配耗时的任务。

工作队列(又称:任务队列)背后的主要思想是避免立即执行资源密集型任务并等待其完成。相反,我们将任务安排在稍后执行。我们将任务封装为消息并将其发送到队列。后台运行的工作进程将弹出任务并最终执行作业。当您运行多个工作进程时,任务将在它们之间共享。

此概念在 Web 应用程序中特别有用,在 Web 应用程序中,在短 HTTP 请求窗口期间无法处理复杂任务。

准备

在本教程的前一部分中,我们发送了一条包含“Hello World!”的消息。现在我们将发送代表复杂任务的字符串。我们没有现实世界中的任务,例如要调整大小的图像或要渲染的 pdf 文件,因此让我们通过假装很忙来模拟它 - 通过使用 time.Sleep 函数。我们将字符串中点的数量作为其复杂性;每个点将占“工作”的一秒钟。例如,由 Hello... 描述的虚假任务将花费三秒钟。

我们将稍微修改我们之前示例中的send.go代码,以允许从命令行发送任意消息。此程序将任务安排到我们的工作队列中,因此让我们将其命名为 new_task.go

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

body := bodyFrom(os.Args)
err = ch.PublishWithContext(ctx,
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing {
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)

这是 bodyFrom 函数

func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}

我们旧的receive.go脚本也需要一些更改:它需要为消息正文中的每个点模拟一秒钟的工作。它将从队列中弹出消息并执行任务,因此让我们将其称为 worker.go

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")

var forever chan struct{}

go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dotCount := bytes.Count(d.Body, []byte("."))
t := time.Duration(dotCount)
time.Sleep(t * time.Second)
log.Printf("Done")
}
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

请注意,我们的虚假任务模拟执行时间。

像教程一中那样运行它们

# shell 1
go run worker.go
# shell 2
go run new_task.go

循环分发

使用任务队列的优势之一是可以轻松地并行化工作。如果我们正在积压工作,我们可以添加更多工作进程,从而轻松扩展。

首先,让我们尝试同时运行两个 worker.go 脚本。它们都将从队列中获取消息,但具体是如何获取的呢?让我们看看。

您需要打开三个控制台。两个将运行 worker.go 脚本。这些控制台将是我们的两个消费者 - C1 和 C2。

# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C

在第三个控制台中,我们将发布新任务。启动消费者后,您可以发布一些消息

# shell 3
go run new_task.go First message.
go run new_task.go Second message..
go run new_task.go Third message...
go run new_task.go Fourth message....
go run new_task.go Fifth message.....

让我们看看我们的工作进程收到了什么

# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

默认情况下,RabbitMQ 将依次将每条消息发送到下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为循环分发。尝试使用三个或更多工作进程来测试这一点。

消息确认

执行任务可能需要几秒钟,您可能想知道如果消费者启动一个长时间的任务并在完成之前终止会发生什么。使用我们当前的代码,一旦 RabbitMQ 将消息传递给消费者,它就会立即将其标记为删除。在这种情况下,如果您终止一个工作进程,则它正在处理的消息将丢失。分配给此特定工作进程但尚未处理的消息也会丢失。

但我们不想丢失任何任务。如果一个工作进程死亡,我们希望该任务被传递给另一个工作进程。

为了确保消息永远不会丢失,RabbitMQ 支持消息确认。消费者会发送一个确认消息来告诉 RabbitMQ 特定消息已接收并处理,并且 RabbitMQ 可以将其删除。

如果消费者在未发送确认消息的情况下死亡(其通道关闭、连接关闭或 TCP 连接丢失),RabbitMQ 将理解消息未完全处理,并将将其重新排队。如果有其他消费者同时在线,它将很快将其重新传递给另一个消费者。这样,即使工作进程偶尔死亡,您也可以确保不会丢失任何消息。

消费者传递确认消息时会强制执行超时(默认 30 分钟)。这有助于检测从不确认传递的错误(卡住)消费者。您可以按照传递确认消息超时中的说明增加此超时时间。

在本教程中,我们将使用手动消息确认,方法是为“自动确认”参数传递 false,然后在我们完成任务后从工作进程发送适当的确认消息,使用 d.Ack(false)(这确认单个传递)。

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")

var forever chan struct{}

go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dotCount := bytes.Count(d.Body, []byte("."))
t := time.Duration(dotCount)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false)
}
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

使用此代码,您可以确保即使您在工作进程正在处理消息时使用 CTRL+C 终止它,也不会丢失任何内容。工作进程终止后不久,所有未确认的消息都会重新传递。

确认必须发送到接收传递的同一通道。尝试使用其他通道进行确认会导致通道级别的协议异常。请参阅有关确认的文档指南以了解更多信息。

忘记确认

忘记 ack 是一个常见的错误。这是一个很容易犯的错误,但后果很严重。当您的客户端退出时,消息将被重新传递(这可能看起来像是随机重新传递),但 RabbitMQ 将消耗越来越多的内存,因为它将无法释放任何未确认的消息。

为了调试此类错误,您可以使用 rabbitmqctl 打印 messages_unacknowledged 字段

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在 Windows 上,删除 sudo

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久性

我们已经了解了如何确保即使消费者死亡,任务也不会丢失。但是,如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。

当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非您告诉它不要忘记。要确保消息不会丢失,需要两件事:我们需要将队列和消息都标记为持久。

首先,我们需要确保队列能够在 RabbitMQ 节点重启后幸存下来。为此,我们需要将其声明为持久

q, err := ch.QueueDeclare(
"hello", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

虽然此命令本身是正确的,但它在我们的当前设置中不起作用。这是因为我们已经定义了一个名为 hello 的队列,该队列不是持久的。RabbitMQ 不允许您使用不同的参数重新定义现有队列,并且会向尝试执行此操作的任何程序返回错误。但是有一个快速的解决方法 - 让我们声明一个名称不同的队列,例如 task_queue

q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

durable 选项更改需要应用于生产者和消费者代码。

此时,我们确信即使 RabbitMQ 重启,task_queue 队列也不会丢失。现在我们需要将我们的消息标记为持久

  • 通过使用 amqp.Publishing 接受的 amqp.Persistent 选项。
err = ch.PublishWithContext(ctx,
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing {
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})

关于消息持久性的说明

将消息标记为持久并不完全保证消息不会丢失。虽然它告诉 RabbitMQ 将消息保存到磁盘,但仍然存在一个短暂的时间窗口,RabbitMQ 在接受消息但尚未保存它。此外,RabbitMQ 不会为每条消息执行 fsync(2) - 它可能只是保存到缓存而不是真正写入磁盘。持久性保证并不强大,但对于我们的简单任务队列来说已经足够了。如果您需要更强的保证,则可以使用发布者确认

公平分发

您可能已经注意到,分发仍然没有完全按照我们想要的方式工作。例如,在有两个工作进程的情况下,当所有奇数消息都很重而偶数消息很轻时,一个工作进程将持续忙碌,而另一个工作进程几乎不做任何工作。好吧,RabbitMQ 对此一无所知,并且仍然会均匀地分发消息。

发生这种情况是因为 RabbitMQ 仅在消息进入队列时才分发消息。它不会查看消费者未确认的消息数量。它只是盲目地将每第 n 个消息分发给第 n 个消费者。

为了解决这个问题,我们可以将预取计数设置为 1。这告诉 RabbitMQ 一次不要向工作进程提供超过一条消息。或者,换句话说,在工作进程处理并确认前一条消息之前,不要向其分发新消息。相反,它会将其分发给下一个未忙的工作进程。

err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")

关于队列大小的说明

如果所有工作进程都处于繁忙状态,您的队列可能会填满。您需要关注这一点,并可能添加更多工作进程,或采取其他策略。

综合起来

我们 new_task.go 类的最终代码

package main

import (
"context"
"log"
"os"
"strings"
"time"

amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}

func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

body := bodyFrom(os.Args)
err = ch.PublishWithContext(ctx,
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}

(new_task.go 源码)

以及我们的 worker.go

package main

import (
"bytes"
"log"
"time"

amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}

func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")

var forever chan struct{}

go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dotCount := bytes.Count(d.Body, []byte("."))
t := time.Duration(dotCount)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false)
}
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}

(worker.go 源码)

使用消息确认和预取计数,您可以设置工作队列。持久性选项允许任务即使在 RabbitMQ 重新启动后也能生存。

有关 amqp.Channel 方法和消息属性的更多信息,您可以浏览 amqp API 参考

现在我们可以继续学习 教程 3,并了解如何将相同的消息传递给多个消费者。

© 2024 RabbitMQ. All rights reserved.