RabbitMQ 教程 - "Hello world!"
简介
先决条件
本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
哪里寻求帮助
如果您在学习本教程时遇到困难,可以通过 GitHub Discussions 或 RabbitMQ 社区 Discord 联系我们。
RabbitMQ 是一个消息代理:它接收并转发消息。你可以把它想象成一个邮局:当你把想要寄出的邮件放进邮箱时,你可以确信邮递员最终会将邮件送到你的收件人手中。在这个比喻中,RabbitMQ 就是邮箱、邮局和邮递员。
RabbitMQ 和邮局的主要区别在于,它不处理纸张,而是接收、存储和转发数据的二进制数据块——消息。
RabbitMQ 和消息传递通常会使用一些行话。
-
发送 仅仅意味着发送。发送消息的程序就是生产者。
-
队列 是 RabbitMQ 中邮箱的名称。虽然消息在 RabbitMQ 和你的应用程序之间流动,但它们只能存储在队列 中。队列 的大小仅受主机内存和磁盘空间的限制,它本质上是一个大的消息缓冲区。
许多生产者 可以发送消息到同一个队列,许多消费者 可以尝试从同一个队列 接收数据。
这就是我们表示队列的方式
-
消费 的含义与接收类似。消费者 是一个主要等待接收消息的程序。
请注意,生产者、消费者和代理不一定需要驻留在同一台主机上;事实上,在大多数应用程序中它们都不会。一个应用程序也可以同时是生产者和消费者。
Hello World!
(使用 Pika Python 客户端)
在本教程的这一部分,我们将用 Python 编写两个小程序:一个发送单条消息的生产者(发送者),以及一个接收消息并打印出来的消费者(接收者)。这就是消息传递的“Hello World”。
在下面的图表中,“P”是我们的生产者,“C”是我们的消费者。中间的框是一个队列——RabbitMQ 代表消费者存储的消息缓冲区。
我们的整体设计将是这样的:
生产者将消息发送到“hello”队列。消费者从该队列接收消息。
RabbitMQ 库
RabbitMQ 支持多种协议。本教程使用 AMQP 0-9-1,这是一种开放的、通用的消息协议。有许多用各种不同语言编写的 RabbitMQ 客户端。在本教程系列中,我们将使用 RabbitMQ 团队推荐的 Python 客户端Pika 1.0.0。要安装它,你可以使用
pip包管理工具。python -m pip install pika --upgrade
现在我们已经安装了 Pika,就可以开始编写代码了。
发送
我们的第一个程序 send.py 将发送一条消息到队列。我们需要做的第一件事是与 RabbitMQ 服务器建立连接。
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
我们现在已经连接到了本地机器上的一个代理——因此是localhost。如果我们要连接到不同机器上的代理,只需在此指定其名称或 IP 地址即可。
接下来,在发送之前,我们需要确保接收队列存在。如果我们发送消息到一个不存在的位置,RabbitMQ 会直接丢弃该消息。让我们创建一个hello队列,消息将被发送到这个队列。
channel.queue_declare(queue='hello')
此时,我们就可以发送消息了。我们的第一条消息将只包含字符串Hello World!,并且我们想将其发送到我们的hello队列。
在 RabbitMQ 中,消息永远不能直接发送到队列,它总是需要通过交换机。但不要被细节困扰——你可以在本教程的第三部分阅读更多关于交换机的内容。现在我们只需要知道如何使用一个由空字符串标识的默认交换机。这个交换机很特别——它允许我们精确指定消息应该发送到哪个队列。队列名称需要在 routing_key 参数中指定。
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
在程序退出之前,我们需要确保网络缓冲区已刷新,并且我们的消息已成功发送到 RabbitMQ。我们可以通过平稳地关闭连接来做到这一点。
connection.close()
发送不起作用!
如果这是您第一次使用 RabbitMQ,但没有看到“Sent”消息,您可能会感到困惑,不知道哪里出了问题。也许代理启动时磁盘空间不足(默认需要至少 50 MB 可用空间),因此拒绝接收消息。检查代理的日志文件,看看是否有资源警报已记录,并在必要时降低可用磁盘空间阈值。配置指南将展示如何设置
disk_free_limit。
接收
我们的第二个程序 receive.py 将从队列接收消息并将其打印到屏幕上。
同样,首先我们需要连接到 RabbitMQ 服务器。负责连接 Rabbit 的代码与之前相同。
下一步,与之前一样,是确保队列存在。使用 queue_declare 创建队列是幂等的——我们可以运行该命令任意多次,只会创建一个队列。
channel.queue_declare(queue='hello')
你可能会问,为什么我们要再次声明队列——我们在之前的代码中已经声明过了。如果我们确定队列已经存在,就可以避免这一点。例如,如果 send.py 程序之前已经运行过。但我们还不确定哪个程序会先运行。在这种情况下,在两个程序中重复声明队列是一个好习惯。
列出队列
您可能希望查看 RabbitMQ 有哪些队列以及其中有多少消息。您可以使用
rabbitmqctl工具(作为特权用户)来完成此操作。sudo rabbitmqctl list_queues在 Windows 上,省略 sudo。
rabbitmqctl.bat list_queues
从队列接收消息更复杂。它通过将一个 callback 函数订阅到队列来实现。每当我们收到一条消息时,Pika 库就会调用这个 callback 函数。在我们的例子中,这个函数将在屏幕上打印消息的内容。
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
接下来,我们需要告诉 RabbitMQ,这个特定的回调函数应该接收我们hello队列的消息。
channel.basic_consume(queue='hello',
auto_ack=True,
on_message_callback=callback)
为了使该命令成功,我们必须确保我们想要订阅的队列存在。幸运的是,我们对此很有信心——我们上面已经使用 queue_declare 创建了一个队列。
auto_ack 参数将在稍后进行描述。
最后,我们进入一个永不结束的循环,等待数据并在必要时运行回调函数,并在程序关闭时捕获 KeyboardInterrupt。
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
总而言之
send.py (源代码)
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
receive.py (源代码)
#!/usr/bin/env python
import pika, sys, os
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
现在我们可以在终端中尝试运行我们的程序。首先,让我们启动一个消费者,它将持续运行并等待消息的传递。
python receive.py
# => [*] Waiting for messages. To exit press CTRL+C
现在在一个新的终端中启动生产者。生产者程序每次运行后都会停止。
python send.py
# => [x] Sent 'Hello World!'
消费者将打印消息。
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Hello World!'
太棒了!我们已经成功地通过 RabbitMQ 发送了第一条消息。正如你可能注意到的,receive.py 程序不会退出。它将一直准备接收更多消息,并且可以通过 Ctrl-C 中断。
在新的终端中再次尝试运行 send.py。
我们已经学会了如何从命名队列发送和接收消息。现在是时候继续学习第 2 部分,构建一个简单的工作队列了。