RabbitMQ 教程 - 工作队列
工作队列
(使用 AMQP 1.0 Go 客户端)
先决条件
本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
哪里寻求帮助
如果您在学习本教程时遇到困难,可以通过 GitHub Discussions 或 RabbitMQ 社区 Discord 联系我们。
在第一个教程中,我们编写了从命名队列发送和接收消息的程序。在这一篇中,我们将创建一个工作队列 (Work Queue),用于在多个工作者(worker)之间分配耗时的任务。
工作队列(又称:任务队列)背后的主要思想是避免立即执行一个资源密集型任务并等待其完成。相反,我们安排任务稍后执行。我们将一个任务封装成一条消息并将其发送到队列。一个在后台运行的工作进程会接收任务并最终执行工作。当您运行多个工作进程时,任务将在它们之间共享。
这个概念对于 Web 应用程序尤其有用,在这些应用程序中,不可能在短暂的 HTTP 请求窗口内处理一个复杂的任务。
本教程使用 RabbitMQ AMQP 1.0 Go 客户端 (rabbitmq-amqp-go-client)。它要求 RabbitMQ 版本为 4.0 或更高。
准备工作
在上一篇教程中,我们发送了一条包含 "Hello World!" 的消息。现在,我们将发送代表复杂任务的字符串。因为我们没有像调整图片大小或渲染 PDF 文件这样实际的任务,所以我们通过 time.Sleep 函数来模拟“忙碌”状态。我们将字符串中点的数量作为任务的复杂度;每一个点代表一秒的“工作时间”。例如,由 Hello... 描述的虚假任务将耗时三秒。
我们将稍微修改上一个示例中的 send.go 代码,以允许从命令行发送任意消息。该程序将把任务调度到我们的工作队列中,所以我们将其命名为 new_task.go
body := bodyFrom(os.Args)
res, err := publisher.Publish(ctx, rmq.NewMessage([]byte(body)))
if err != nil {
log.Panicf("Failed to publish a message: %v", err)
}
switch res.Outcome.(type) {
case *rmq.StateAccepted:
default:
log.Panicf("Unexpected publish outcome: %v", res.Outcome)
}
log.Printf(" [x] Sent %s", body)
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
我们旧的消费者程序也需要做一些修改:它需要为消息体中的每一个点模拟一秒的工作时间。它将处理已投递的消息并执行任务,所以我们将其命名为 worker.go
consumer, err := conn.NewConsumer(ctx, "task_queue", &rmq.ConsumerOptions{InitialCredits: 1})
if err != nil {
log.Panicf("Failed to create consumer: %v", err)
}
defer func() { _ = consumer.Close(context.Background()) }()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
for {
delivery, err := consumer.Receive(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
log.Panicf("Failed to receive a message: %v", err)
}
msg := delivery.Message()
var payload []byte
if len(msg.Data) > 0 {
payload = msg.Data[0]
}
log.Printf("Received a message: %s", payload)
dotCount := bytes.Count(payload, []byte("."))
t := time.Duration(dotCount)
time.Sleep(t * time.Second)
log.Printf("Done")
err = delivery.Accept(ctx)
if err != nil {
log.Panicf("Failed to accept message: %v", err)
}
}
请注意 ConsumerOptions{InitialCredits: 1} 的使用,它用于限制传输中的消息数量(这是 AMQP 1.0 中对应 prefetch 的概念)。
使用 go run 运行每个程序
go run new_task.go hello world
go run worker.go
轮询分发
使用任务队列的一个优点是能够轻松地并行化工作。如果我们正在积累工作积压,我们可以添加更多的工作进程,从而轻松扩展。
首先,让我们尝试同时运行两个 worker 实例。它们都会从队列中获取消息,但具体是如何分配的呢?让我们来看看。
你需要打开三个终端。其中两个将运行 worker 程序。这两个终端将作为我们的两个消费者——C1 和 C2。
# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
在第三个中,我们将发布新任务。一旦您启动了消费者,您就可以发布几条消息
# shell 3
go run new_task.go First message.
# => [x] Sent 'First message.'
go run new_task.go Second message..
# => [x] Sent 'Second message..'
go run new_task.go Third message...
# => [x] Sent 'Third message...'
让我们看看分发给我们的工作进程的内容
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => Received a message: First message.
# => Done
# => Received a message: Third message...
# => Done
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => Received a message: Second message..
# => Done
默认情况下,RabbitMQ 会按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为轮询。尝试使用三个或更多工作进程进行此操作。
消息确认
执行任务可能需要几秒钟,你可能想知道如果一个消费者开始了一个长时间的任务,但在完成之前就终止了会发生什么。在 AMQP 1.0 中,消费者必须结算 (settle) 每一条消息(执行 Accept、Discard 或 Requeue)。在结算之前,如果 worker 停止,代理 (broker) 可以重新投递该消息。
为了确保消息在 worker 接收后但在处理完成前死掉时不会丢失,请在任务完成后再进行结算。在此,我们在 time.Sleep 完成后调用 delivery.Accept(ctx)。
如果消费者在没有结算的情况下死掉,RabbitMQ 会将消息重新投递给另一个消费者。这样即使 worker 偶尔死掉,你也可以确信没有消息丢失。
完整的 worker.go 文件在工作完成后使用 delivery.Accept。
忘记结算
遗漏
delivery.Accept()(或在工作完成前调用它)是一个常见的错误。当你的客户端退出时,消息会被重新投递(这看起来像是随机的重新投递),且未确认的消息会在代理上堆积。你可以使用
rabbitmqctl来检查队列。sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged在 Windows 上,删除 sudo
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
公平分派
您可能已经注意到调度仍然不能完全按我们想要的方式工作。例如,在有两个工作进程的情况下,当所有奇数消息都很重而偶数消息都很轻时,一个工作进程将持续忙碌,而另一个工作进程几乎不做任何工作。好吧,RabbitMQ 对此一无所知,它仍然会均匀地分发消息。
这种情况发生是因为代理可能会在之前的消息结算之前投递多条消息。
使用 AMQP 1.0 Go 客户端时,通过在消费者构建器上将 InitialCredits 设置为 1,可以限制每个消费者的传输中 (in flight) 消息数量。这等同于 AMQP 0-9-1 中的 prefetch 1。
consumer, err := conn.NewConsumer(ctx, "task_queue", &rmq.ConsumerOptions{InitialCredits: 1})
这确保了 RabbitMQ 不会预取消息;相反,它只会在当前消息结算后才会投递下一条消息。
关于队列大小的说明
如果所有工作进程都忙碌,您的队列可能会填满。您需要密切关注这一点,并可能添加更多工作进程,或采取其他策略。
总而言之
new_task.go 的最终大纲
package main
import (
"context"
"log"
"os"
"strings"
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
)
const brokerURI = "amqp://guest:guest@localhost:5672/"
func main() {
ctx := context.Background()
env := rmq.NewEnvironment(brokerURI, nil)
conn, err := env.NewConnection(ctx)
if err != nil {
log.Panicf("Failed to connect to RabbitMQ: %v", err)
}
defer func() {
_ = env.CloseConnections(context.Background())
}()
_, err = conn.Management().DeclareQueue(ctx, &rmq.QuorumQueueSpecification{Name: "task_queue"})
if err != nil {
log.Panicf("Failed to declare a queue: %v", err)
}
publisher, err := conn.NewPublisher(ctx, &rmq.QueueAddress{Queue: "task_queue"}, nil)
if err != nil {
log.Panicf("Failed to create publisher: %v", err)
}
defer func() { _ = publisher.Close(context.Background()) }()
body := bodyFrom(os.Args)
res, err := publisher.Publish(ctx, rmq.NewMessage([]byte(body)))
if err != nil {
log.Panicf("Failed to publish a message: %v", err)
}
switch res.Outcome.(type) {
case *rmq.StateAccepted:
default:
log.Panicf("Unexpected publish outcome: %v", res.Outcome)
}
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
以及 worker.go
package main
import (
"bytes"
"context"
"errors"
"log"
"time"
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
)
const brokerURI = "amqp://guest:guest@localhost:5672/"
func main() {
ctx := context.Background()
env := rmq.NewEnvironment(brokerURI, nil)
conn, err := env.NewConnection(ctx)
if err != nil {
log.Panicf("Failed to connect to RabbitMQ: %v", err)
}
defer func() {
_ = env.CloseConnections(context.Background())
}()
_, err = conn.Management().DeclareQueue(ctx, &rmq.QuorumQueueSpecification{Name: "task_queue"})
if err != nil {
log.Panicf("Failed to declare a queue: %v", err)
}
consumer, err := conn.NewConsumer(ctx, "task_queue", &rmq.ConsumerOptions{InitialCredits: 1})
if err != nil {
log.Panicf("Failed to create consumer: %v", err)
}
defer func() { _ = consumer.Close(context.Background()) }()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
for {
delivery, err := consumer.Receive(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
log.Panicf("Failed to receive a message: %v", err)
}
msg := delivery.Message()
var payload []byte
if len(msg.Data) > 0 {
payload = msg.Data[0]
}
log.Printf("Received a message: %s", payload)
dotCount := bytes.Count(payload, []byte("."))
t := time.Duration(dotCount)
time.Sleep(t * time.Second)
log.Printf("Done")
err = delivery.Accept(ctx)
if err != nil {
log.Panicf("Failed to accept message: %v", err)
}
}
}
通过使用显式结算和 InitialCredits: 1,你可以建立一个工作队列。仲裁队列 (Quorum queues) 可确保任务在代理重启后依然存在。
现在我们可以继续学习教程 3,了解如何将同一条消息分发给多个消费者。