跳至主内容

RabbitMQ 教程 - 工作队列

工作队列

(使用 AMQP 1.0 .NET 客户端)

信息

先决条件

本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。

哪里寻求帮助

如果您在学习本教程时遇到困难,可以通过 GitHub DiscussionsRabbitMQ 社区 Discord 联系我们。

第一个教程中,我们编写了程序来向指定的队列发送和接收消息。在这一节中,我们将创建一个工作队列(Work Queue),用于在多个工作者之间分发耗时的任务。

工作队列(又称:任务队列)背后的主要思想是避免立即执行一个资源密集型任务并等待其完成。相反,我们安排任务稍后执行。我们将一个任务封装成一条消息并将其发送到队列。一个在后台运行的工作进程会接收任务并最终执行工作。当您运行多个工作进程时,任务将在它们之间共享。

这个概念对于 Web 应用程序尤其有用,在这些应用程序中,不可能在短暂的 HTTP 请求窗口内处理一个复杂的任务。

本教程使用 RabbitMQ AMQP 1.0 .NET 客户端 (RabbitMQ.AMQP.Client)。它需要 RabbitMQ 4.0 或更高版本

准备工作

在上一部分教程中,我们发送了一条包含“Hello World!”的消息。现在我们将发送代表复杂任务的字符串。我们通过睡眠来模拟工作:字符串中的每个 . 增加一秒钟的耗时。

生产者是 NewTask/Program.cs;消费者是 Worker/Program.cs

NewTask 发布消息到仲裁队列(quorum queue)task_queue,并检查 OutcomeState.Accepted

IQueueSpecification queueSpec = management.Queue(taskQueueName).Type(QueueType.QUORUM);
await queueSpec.DeclareAsync();

IPublisher publisher = await connection.PublisherBuilder().Queue(taskQueueName).BuildAsync();
// ...
PublishResult pr = await publisher.PublishAsync(amqpMessage);
if (pr.Outcome.State != OutcomeState.Accepted)
{
Console.Error.WriteLine($"Unexpected publish outcome: {pr.Outcome.State}");
Environment.Exit(1);
}

Worker 使用 InitialCredits(1) 来实现公平调度,并在 DoWork 完成后的 finally 块中调用 ctx.Accept()

IConsumer consumer = await connection.ConsumerBuilder()
.Queue(taskQueueName)
.InitialCredits(1)
.MessageHandler((ctx, message) =>
{
string body = Encoding.UTF8.GetString(message.Body()!);
Console.WriteLine($" [x] Received '{body}'");
try
{
DoWork(body);
}
finally
{
Console.WriteLine(" [x] Done");
ctx.Accept();
}

return Task.CompletedTask;
})
.BuildAndStartAsync();

轮询分发

运行两个工作者,并从第三个终端发布任务(位于 dotnet-amqp 目录中)。

dotnet run --project Worker/Worker.csproj
dotnet run --project Worker/Worker.csproj
dotnet run --project NewTask/NewTask.csproj "First message."
dotnet run --project NewTask/NewTask.csproj "Second message.."

默认情况下,RabbitMQ 会按顺序将每条消息发送给下一个消费者(循环调度)。

消息确认

使用 AMQP 1.0 时,消费者必须结算(settle)每条消息(如 Accept 等)。在工作完成后再进行结算,这样如果任务中途崩溃,消息可以被重新投递。

公平分派

在消费者构建器上使用 InitialCredits(1),这样每个消费者同一时间只能处理一条未结算的消息(类似于 AMQP 0-9-1 中的 prefetch 1 / basicQos)。

总而言之

请参阅 NewTask/Program.csWorker/Program.cs 获取完整源代码(合并至上游后)。

现在我们可以继续学习教程 3,了解如何将同一条消息发送给多个消费者。

© . This site is unofficial and not affiliated with VMware.