跳到主要内容

RabbitMQ 教程 - 工作队列

工作队列

(使用 Objective-C 客户端

信息

前提条件

本教程假设您已安装 RabbitMQ 并运行在 localhost 的标准端口 (5672) 上。如果您使用不同的主机、端口或凭据,则需要调整连接设置。

在哪里获得帮助

如果您在学习本教程时遇到问题,可以通过 GitHub DiscussionsRabbitMQ 社区 Discord 联系我们。

第一个教程中,我们编写了从命名队列发送和接收消息的方法。在本教程中,我们将创建一个工作队列,用于在多个工作进程之间分配耗时的任务。

工作队列(也称为:任务队列)背后的主要思想是避免立即执行资源密集型任务,并避免等待其完成。相反,我们计划稍后完成任务。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当您运行多个工作进程时,任务将在它们之间共享。

这个概念在 Web 应用程序中特别有用,在 Web 应用程序中,在短暂的 HTTP 请求窗口期间处理复杂任务是不可能的。

准备

在本教程的前一部分中,我们发送了一条包含 "Hello World!" 的消息。现在我们将发送代表复杂任务的字符串。我们没有真实的world任务,例如要调整大小的图像或要渲染的 pdf 文件,所以让我们通过假装我们很忙来伪造它 - 通过使用 sleep。我们将字符串中点的数量作为其复杂性;每个点将代表一秒钟的“工作”。例如,用 Hello... 描述的虚假任务将花费三秒钟。

我们将稍微修改我们之前示例中的 send 方法,以允许将任意字符串作为方法参数发送。此方法会将任务调度到我们的工作队列,因此让我们将其重命名为 newTask。除了新参数外,实现保持不变

- (void)newTask:(NSString *)msg {
NSLog(@"Attempting to connect to local RabbitMQ broker");
RMQConnection *conn = [[RMQConnection alloc] initWithDelegate:[RMQConnectionDelegateLogger new]];
[conn start];

id<RMQChannel> ch = [conn createChannel];

RMQQueue *q = [ch queue:@"hello"];

NSData *msgData = [msg dataUsingEncoding:NSUTF8StringEncoding];
[ch.defaultExchange publish:msgData routingKey:q.name];
NSLog(@"Sent %@", msg);

[conn close];
}

我们的旧 receive 方法需要进行一些更大的更改:它需要为消息正文中的每个点伪造一秒钟的工作。如果每个 worker 都有一个名称,并且每个 worker 都需要从队列中弹出消息并执行任务,这将有助于我们理解正在发生的事情,所以让我们将其称为 workerNamed:

[q subscribe:^(RMQMessage * _Nonnull message) {
NSString *messageText = [[NSString alloc] initWithData:message.body encoding:NSUTF8StringEncoding];
NSLog(@"%@: Received %@", name, messageText);
// imitate some work
unsigned int sleepTime = (unsigned int)[messageText componentsSeparatedByString:@"."].count - 1;
NSLog(@"%@: Sleeping for %u seconds", name, sleepTime);
sleep(sleepTime);
}];

请注意,我们的虚假任务模拟了执行时间。

像在第一个教程中一样从 viewDidLoad 运行它们

- (void)viewDidLoad {
[super viewDidLoad];
[self newTask:@"Hello World..."];
[self workerNamed:@"Flopsy"];
}

日志输出应指示 Flopsy 正在休眠三秒钟。

轮询分发

使用任务队列的优势之一是能够轻松地并行化工作。如果我们正在积累工作积压,我们可以简单地添加更多的工作进程,从而轻松扩展。

让我们尝试同时运行两个 workerNamed: 方法。它们都将从队列中获取消息,但具体如何?让我们看看。

更改 viewDidLoad 以发送更多消息并启动两个工作进程

- (void)viewDidLoad {
[super viewDidLoad];
[self workerNamed:@"Jack"];
[self workerNamed:@"Jill"];
[self newTask:@"Hello World..."];
[self newTask:@"Just one this time."];
[self newTask:@"Five....."];
[self newTask:@"None"];
[self newTask:@"Two..dots"];
}

让我们看看传递给我们的工作进程的内容

# => Jack: Waiting for messages
# => Jill: Waiting for messages
# => Sent Hello World...
# => Jack: Received Hello World...
# => Jack: Sleeping for 3 seconds
# => Sent Just one this time.
# => Jill: Received Just one this time.
# => Jill: Sleeping for 1 seconds
# => Sent Five.....
# => Sent None
# => Sent Two..dots
# => Jill: Received Five.....
# => Jill: Sleeping for 5 seconds
# => Jack: Received None
# => Jack: Sleeping for 0 seconds
# => Jack: Received Two..dots
# => Jack: Sleeping for 2 seconds

默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为轮询。用三个或更多工作进程尝试一下。

消息确认

执行任务可能需要几秒钟,您可能想知道如果消费者启动一个长时间的任务并在完成之前终止会发生什么。使用我们当前的代码,一旦 RabbitMQ 将消息传递给消费者,它会立即将其标记为删除。在这种情况下,如果您终止一个工作进程,它正在处理的消息将丢失。已分派给此特定工作进程但尚未处理的消息也会丢失。

但是我们不想丢失任何任务。如果一个工作进程死掉,我们希望将任务传递给另一个工作进程。

为了确保消息永远不会丢失,RabbitMQ 支持消息确认。ack(确认)由消费者发回,以告知 RabbitMQ 已接收和处理了特定消息,并且 RabbitMQ 可以自由删除它。

如果消费者在没有发送 ack 的情况下死掉(其通道关闭、连接关闭或 TCP 连接丢失),RabbitMQ 将理解消息未被完全处理并将重新排队。如果同时有其他消费者在线,它将很快将其重新传递给另一个消费者。这样,即使工作进程偶尔死掉,您也可以确保不会丢失任何消息。

消费者交付确认会强制执行超时(默认为 30 分钟)。这有助于检测从不确认交付的有缺陷(卡住)的消费者。您可以按照交付确认超时中的描述增加此超时。

消息确认在客户端中默认关闭,但在 AMQ 协议中未关闭(AMQBasicConsumeNoAck 选项由 subscribe: 自动发送)。现在是时候通过显式设置 AMQBasicConsumeNoOptions 并在我们完成任务后从工作进程发送正确的确认来打开确认了。

RMQBasicConsumeOptions manualAck = RMQBasicConsumeNoOptions;
[q subscribe:manualAck handler:^(RMQMessage * _Nonnull message) {
NSString *messageText = [[NSString alloc] initWithData:message.body encoding:NSUTF8StringEncoding];
NSLog(@"%@: Received %@", name, messageText);
// imitate some work
unsigned int sleepTime = (unsigned int)[messageText componentsSeparatedByString:@"."].count - 1;
NSLog(@"%@: Sleeping for %u seconds", name, sleepTime);
sleep(sleepTime);

[ch ack:message.deliveryTag];
}];

使用此代码,我们可以确保即使工作进程在处理消息时死掉,也不会丢失任何内容。工作进程死掉后不久,所有未确认的消息都将被重新传递。

确认必须在接收交付的同一通道上发送。尝试使用不同的通道进行确认将导致通道级协议异常。请参阅关于确认的文档指南以了解更多信息。

忘记确认

错过 ack 是一个常见的错误。这是一个容易犯的错误,但后果很严重。当您的客户端退出时,消息将被重新传递(这可能看起来像随机重新传递),但 RabbitMQ 将消耗越来越多的内存,因为它无法释放任何未确认的消息。

为了调试这种错误,您可以使用 rabbitmqctl 打印 messages_unacknowledged 字段

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在 Windows 上,删除 sudo

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久性

我们已经学习了如何确保即使消费者死掉,任务也不会丢失。但是,如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。

当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非您告诉它不要这样做。需要做两件事来确保消息不会丢失:我们需要将队列和消息都标记为持久。

首先,我们需要确保队列在 RabbitMQ 节点重启后仍然存在。为了做到这一点,我们需要将其声明为持久队列

RMQQueue *q = [ch queue:@"hello" options:AMQQueueDeclareDurable];

尽管此命令本身是正确的,但它在我们当前的设置中不起作用。那是因为我们已经定义了一个名为 hello 的队列,它不是持久的。RabbitMQ 不允许您使用不同的参数重新定义现有队列,并且会向任何尝试这样做的程序返回错误。但是有一个快速的解决方法 - 让我们声明一个名称不同的队列,例如 task_queue

RMQQueue *q = [ch queue:@"task_queue" options:AMQQueueDeclareDurable];

options:AMQQueueDeclareDurable 更改需要应用于生产者和消费者代码。

此时,我们确信即使 RabbitMQ 重新启动,task_queue 队列也不会丢失。现在我们需要将我们的消息标记为持久性

  • 通过使用 persistent 选项。
[ch.defaultExchange publish:msgData routingKey:q.name persistent:YES];

关于消息持久性的说明

将消息标记为持久性并不能完全保证消息不会丢失。尽管它告诉 RabbitMQ 将消息保存到磁盘,但在 RabbitMQ 接受消息但尚未保存消息时,仍然存在一个短暂的时间窗口。此外,RabbitMQ 不会对每条消息都执行 fsync(2) -- 它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证不是很强,但对于我们简单的任务队列来说已经足够了。如果您需要更强的保证,则可以使用发布者确认

公平分发

您可能已经注意到,分发仍然无法完全按照我们的意愿工作。例如,在有两个工作进程的情况下,当所有奇数消息都很重而偶数消息都很轻时,一个工作进程将一直很忙,而另一个工作进程几乎不做任何工作。好吧,RabbitMQ 对此一无所知,仍然会均匀地分发消息。

发生这种情况是因为 RabbitMQ 只是在消息进入队列时分发消息。它不查看消费者的未确认消息数量。它只是盲目地将每第 n 条消息分发给第 n 个消费者。

为了克服这个问题,我们可以使用 prefetch 值为 @1basicQos:global: 方法。这告诉 RabbitMQ 一次不要给一个工作进程超过一条消息。或者,换句话说,在工作进程处理并确认上一条消息之前,不要向其分发新消息。相反,它会将其分发给下一个不忙的工作进程。

[ch basicQos:@1 global:NO];

关于队列大小的说明

如果所有工作进程都很忙,您的队列可能会被填满。您需要密切关注这一点,并可能添加更多工作进程,或采取其他策略。

总结

我们的 newTask: 方法的最终代码

- (void)newTask:(NSString *)msg {
RMQConnection *conn = [[RMQConnection alloc] initWithDelegate:[RMQConnectionDelegateLogger new]];
[conn start];

id<RMQChannel> ch = [conn createChannel];

RMQQueue *q = [ch queue:@"task_queue" options:RMQQueueDeclareDurable];

NSData *msgData = [msg dataUsingEncoding:NSUTF8StringEncoding];
[ch.defaultExchange publish:msgData routingKey:q.name persistent:YES];
NSLog(@"Sent %@", msg);

[conn close];
}

以及我们的 workerNamed:

- (void)workerNamed:(NSString *)name {
RMQConnection *conn = [[RMQConnection alloc] initWithDelegate:[RMQConnectionDelegateLogger new]];
[conn start];

id<RMQChannel> ch = [conn createChannel];

RMQQueue *q = [ch queue:@"task_queue" options:RMQQueueDeclareDurable];

[ch basicQos:@1 global:NO];
NSLog(@"%@: Waiting for messages", name);

RMQBasicConsumeOptions manualAck = RMQBasicConsumeNoOptions;
[q subscribe:manualAck handler:^(RMQMessage * _Nonnull message) {
NSString *messageText = [[NSString alloc] initWithData:message.body encoding:NSUTF8StringEncoding];
NSLog(@"%@: Received %@", name, messageText);
// imitate some work
unsigned int sleepTime = (unsigned int)[messageText componentsSeparatedByString:@"."].count - 1;
NSLog(@"%@: Sleeping for %u seconds", name, sleepTime);
sleep(sleepTime);

[ch ack:message.deliveryTag];
}];
}

(来源)

使用消息确认和预取,您可以设置工作队列。持久性选项使任务即使在 RabbitMQ 重新启动后也能继续存在。

现在我们可以继续学习教程 3,学习如何将同一消息传递给多个消费者。

© . All rights reserved.