RabbitMQ 教程 - 工作队列
工作队列
(使用 Objective-C 客户端)
在第一个教程中,我们编写了用于向命名队列发送和接收消息的方法。在本教程中,我们将创建一个工作队列,用于将耗时的任务分布到多个工作进程之间。
工作队列(又称:任务队列)背后的主要思想是避免立即执行资源密集型任务,并等待其完成。相反,我们将任务安排在稍后完成。我们将任务封装为消息并将其发送到队列。在后台运行的 worker 进程将弹出任务并最终执行作业。当你运行多个 worker 时,任务将在它们之间共享。
这个概念在 Web 应用程序中特别有用,因为在短暂的 HTTP 请求窗口中无法处理复杂的任务。
准备
在本教程的上一部分中,我们发送了一条包含“Hello World!”的消息。现在,我们将发送代表复杂任务的字符串。我们没有现实世界的任务,比如要调整大小的图像或要渲染的 pdf 文件,因此让我们通过假装忙碌来模拟它——通过使用 sleep
。我们将把字符串中点的数量视为其复杂度;每个点将代表一秒的“工作”。例如,由 Hello...
描述的假任务将需要三秒钟。
我们将稍微修改来自我们之前示例的 send
方法,以允许将任意字符串作为方法参数发送。此方法将把任务安排到我们的工作队列中,因此让我们将其重命名为 newTask
。除了新参数外,实现保持不变
- (void)newTask:(NSString *)msg {
NSLog(@"Attempting to connect to local RabbitMQ broker");
RMQConnection *conn = [[RMQConnection alloc] initWithDelegate:[RMQConnectionDelegateLogger new]];
[conn start];
id<RMQChannel> ch = [conn createChannel];
RMQQueue *q = [ch queue:@"hello"];
NSData *msgData = [msg dataUsingEncoding:NSUTF8StringEncoding];
[ch.defaultExchange publish:msgData routingKey:q.name];
NSLog(@"Sent %@", msg);
[conn close];
}
我们旧的 receive 方法需要一些更大的更改:它需要为消息主体中的每个点模拟一秒钟的工作。如果每个 worker 都有一个名称,这将有助于我们理解发生了什么,每个 worker 都需要从队列中弹出消息并执行任务,因此让我们将其称为 workerNamed:
[q subscribe:^(RMQMessage * _Nonnull message) {
NSString *messageText = [[NSString alloc] initWithData:message.body encoding:NSUTF8StringEncoding];
NSLog(@"%@: Received %@", name, messageText);
// imitate some work
unsigned int sleepTime = (unsigned int)[messageText componentsSeparatedByString:@"."].count - 1;
NSLog(@"%@: Sleeping for %u seconds", name, sleepTime);
sleep(sleepTime);
}];
请注意,我们的假任务模拟了执行时间。
从 viewDidLoad
中运行它们,就像在教程一中一样
- (void)viewDidLoad {
[super viewDidLoad];
[self newTask:@"Hello World..."];
[self workerNamed:@"Flopsy"];
}
日志输出应表明 Flopsy 正在休眠三秒钟。
循环调度
使用任务队列的优点之一是能够轻松地并行化工作。如果我们正在积压工作,我们可以简单地添加更多 worker,从而轻松地进行扩展。
让我们尝试同时运行两个 workerNamed:
方法。它们都将从队列中获取消息,但确切地说呢?让我们看看。
更改 viewDidLoad 以发送更多消息并启动两个 worker
- (void)viewDidLoad {
[super viewDidLoad];
[self workerNamed:@"Jack"];
[self workerNamed:@"Jill"];
[self newTask:@"Hello World..."];
[self newTask:@"Just one this time."];
[self newTask:@"Five....."];
[self newTask:@"None"];
[self newTask:@"Two..dots"];
}
让我们看看传递到我们 worker 的内容
# => Jack: Waiting for messages
# => Jill: Waiting for messages
# => Sent Hello World...
# => Jack: Received Hello World...
# => Jack: Sleeping for 3 seconds
# => Sent Just one this time.
# => Jill: Received Just one this time.
# => Jill: Sleeping for 1 seconds
# => Sent Five.....
# => Sent None
# => Sent Two..dots
# => Jill: Received Five.....
# => Jill: Sleeping for 5 seconds
# => Jack: Received None
# => Jack: Sleeping for 0 seconds
# => Jack: Received Two..dots
# => Jack: Sleeping for 2 seconds
默认情况下,RabbitMQ 会将每条消息依次发送到下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种消息分配方式称为循环调度。尝试使用三个或更多 worker 来测试一下。
消息确认
执行任务可能需要几秒钟,你可能想知道如果消费者开始执行一项长时间任务并在完成之前终止,会发生什么。在我们当前的代码中,一旦 RabbitMQ 将消息传递给消费者,它就会立即将其标记为删除。在这种情况下,如果你终止了一个 worker,它正在处理的消息就会丢失。分配给该特定 worker 但尚未处理的消息也会丢失。
但我们不想丢失任何任务。如果 worker 死亡,我们希望任务被传递给另一个 worker。
为了确保消息永远不会丢失,RabbitMQ 支持消息确认。消费者会发送一个 ack(确认)来告诉 RabbitMQ,一条特定消息已收到并已处理,RabbitMQ 可以将其删除。
如果消费者死亡(其通道关闭、连接关闭或 TCP 连接丢失)而没有发送 ack,RabbitMQ 会理解消息尚未完全处理,并将重新排队。如果同时在线的其他消费者,它会立即将其重新传递给另一个消费者。这样,你可以确保即使 worker 偶尔死亡,也不会丢失任何消息。
在消费者传递确认上强制执行超时(默认情况下为 30 分钟)。这有助于检测从未确认传递的错误(卡住)消费者。你可以按照传递确认超时中的说明来增加此超时。
消息确认在客户端默认情况下是关闭的,但在 AMQ 协议中并非如此(AMQBasicConsumeNoAck
选项由 subscribe:
自动发送)。现在该通过显式设置 AMQBasicConsumeNoOptions
并完成任务后从 worker 发送适当的确认来打开确认了。
RMQBasicConsumeOptions manualAck = RMQBasicConsumeNoOptions;
[q subscribe:manualAck handler:^(RMQMessage * _Nonnull message) {
NSString *messageText = [[NSString alloc] initWithData:message.body encoding:NSUTF8StringEncoding];
NSLog(@"%@: Received %@", name, messageText);
// imitate some work
unsigned int sleepTime = (unsigned int)[messageText componentsSeparatedByString:@"."].count - 1;
NSLog(@"%@: Sleeping for %u seconds", name, sleepTime);
sleep(sleepTime);
[ch ack:message.deliveryTag];
}];
使用此代码,我们可以确保即使 worker 在处理消息时死亡,也不会丢失任何东西。在 worker 死亡后不久,所有未确认的消息都将被重新传递。
确认必须在接收传递的同一通道上发送。尝试使用不同通道进行确认将导致通道级协议异常。有关更多信息,请参阅关于确认的文档指南。
遗漏确认
遗漏
ack
是一个常见的错误。这是一个容易发生的错误,但后果很严重。当你的客户端退出时,消息将被重新传递(这可能看起来像随机重新传递),但 RabbitMQ 会消耗越来越多的内存,因为它将无法释放任何未确认的消息。为了调试这种错误,你可以使用
rabbitmqctl
来打印messages_unacknowledged
字段sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在 Windows 上,删除 sudo
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久性
我们已经了解了如何确保即使消费者死亡,任务也不会丢失。但是,如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。
当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非你告诉它不要这样做。为了确保消息不会丢失,需要两件事:我们需要将队列和消息都标记为持久性。
首先,我们需要确保队列能够在 RabbitMQ 节点重启后继续存在。为此,我们需要将其声明为持久性
RMQQueue *q = [ch queue:@"hello" options:AMQQueueDeclareDurable];
尽管此命令本身是正确的,但它在我们当前的设置中不会起作用。这是因为我们已经定义了一个名为 hello
的队列,它不是持久性的。RabbitMQ 不允许你使用不同的参数重新定义现有的队列,并将向尝试执行此操作的任何程序返回错误。但是,有一个快速的解决方法——让我们声明一个使用不同名称的队列,例如 task_queue
RMQQueue *q = [ch queue:@"task_queue" options:AMQQueueDeclareDurable];
此 options:AMQQueueDeclareDurable
更改需要应用于生产者和消费者代码。
此时,我们可以确定即使 RabbitMQ 重启,task_queue
队列也不会丢失。现在,我们需要将我们的消息标记为持久性
- 通过使用
persistent
选项。
[ch.defaultExchange publish:msgData routingKey:q.name persistent:YES];
关于消息持久性的说明
将消息标记为持久性并不能完全保证消息不会丢失。虽然它告诉 RabbitMQ 将消息保存到磁盘,但仍然存在一个短暂的时间窗口,在此窗口中,RabbitMQ 已接受消息但尚未保存。此外,RabbitMQ 不会对每条消息执行
fsync(2)
——它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证并不强,但对于我们简单的任务队列来说已经足够了。如果你需要更强的保证,则可以使用发布确认。
公平分配
你可能已经注意到,调度仍然无法完全按照我们想要的方式工作。例如,在有两个 worker 的情况下,当所有奇数消息都很重而所有偶数消息都很轻时,一个 worker 会不断忙碌,而另一个 worker 几乎不会做任何工作。好吧,RabbitMQ 对此一无所知,并将继续平均分配消息。
发生这种情况是因为 RabbitMQ 只是在消息进入队列时才分配消息。它不会查看消费者未确认消息的数量。它只是盲目地将每第 n 个消息分配给第 n 个消费者。
为了解决这个问题,我们可以使用 basicQos:global:
方法,其预取值为 @1
。这告诉 RabbitMQ 一次不向一个 worker 提供超过一条消息。或者,换句话说,在 worker 处理并确认前一条消息之前,不要向其分配新消息。相反,它将把消息分配给下一个尚未忙碌的 worker。
[ch basicQos:@1 global:NO];
关于队列大小的说明
如果所有 worker 都处于繁忙状态,您的队列可能会填满。您需要关注这一点,并可能添加更多 worker 或采用其他策略。
将所有内容整合在一起
我们 newTask:
方法的最终代码
- (void)newTask:(NSString *)msg {
RMQConnection *conn = [[RMQConnection alloc] initWithDelegate:[RMQConnectionDelegateLogger new]];
[conn start];
id<RMQChannel> ch = [conn createChannel];
RMQQueue *q = [ch queue:@"task_queue" options:RMQQueueDeclareDurable];
NSData *msgData = [msg dataUsingEncoding:NSUTF8StringEncoding];
[ch.defaultExchange publish:msgData routingKey:q.name persistent:YES];
NSLog(@"Sent %@", msg);
[conn close];
}
以及我们的 workerNamed:
- (void)workerNamed:(NSString *)name {
RMQConnection *conn = [[RMQConnection alloc] initWithDelegate:[RMQConnectionDelegateLogger new]];
[conn start];
id<RMQChannel> ch = [conn createChannel];
RMQQueue *q = [ch queue:@"task_queue" options:RMQQueueDeclareDurable];
[ch basicQos:@1 global:NO];
NSLog(@"%@: Waiting for messages", name);
RMQBasicConsumeOptions manualAck = RMQBasicConsumeNoOptions;
[q subscribe:manualAck handler:^(RMQMessage * _Nonnull message) {
NSString *messageText = [[NSString alloc] initWithData:message.body encoding:NSUTF8StringEncoding];
NSLog(@"%@: Received %@", name, messageText);
// imitate some work
unsigned int sleepTime = (unsigned int)[messageText componentsSeparatedByString:@"."].count - 1;
NSLog(@"%@: Sleeping for %u seconds", name, sleepTime);
sleep(sleepTime);
[ch ack:message.deliveryTag];
}];
}
使用消息确认和预取,您可以设置一个工作队列。持久性选项使任务即使在 RabbitMQ 重启后也能存活。
现在,我们可以继续学习 教程 3,了解如何将同一消息传递给多个消费者。