在 Cloud Foundry 上使用 Node.JS 的 RabbitMQ 服务
最近,我们为 Cloud Foundry 推出了一个RabbitMQ 服务,使在 Cloud Foundry 上使用应用程序时轻松启动消息代理变得简单。网上有一些教程介绍了如何使用 Ruby on Rails 和使用 Spring 的 Java 应用程序。在这里,我们将探讨如何在 Node.JS 应用程序中使用 RabbitMQ 服务。
我不假设您对 Node.JS、npm 或 RabbitMQ 有太多先验知识;但是,如果您熟悉它们,您将从中获得更多收获。 npm 的手册页 及其附带的 开发人员信息 是理解 npm 的最佳指南,除了实际使用之外。对于 RabbitMQ,我推荐 Michael Klishin 的AMQP 消息模型入门指南。
GitHub 上的这个仓库 包含我们 Node.JS 示例的源代码。让我们首先浏览一下 package.json
。
{
"name":"node-srs-demo",
"author": "Michael Bridgen",
"version":"0.0.2",
"dependencies":{
"amqp":">= 0.1.0",
"sanitizer": "*"
}
}
其中值得关注的是对“amqp”的依赖;版本 0.1.0 支持 Cloud Foundry 将提供的 URL 语法,并且没有理由不使用最新版本。
这是应用程序代码的第一部分。请注意,我们将文件命名为 app.js
,以便 Cloud Foundry 能够识别它为主模块并运行它。
require.paths.unshift('./node_modules');
var http = require('http');
var amqp = require('amqp');
var URL = require('url');
var htmlEscape = require('sanitizer/sanitizer').escape;
function rabbitUrl() {
if (process.env.VCAP_SERVICES) {
conf = JSON.parse(process.env.VCAP_SERVICES);
return conf['rabbitmq-2.4'][0].credentials.url;
}
else {
return "amqp://127.0.0.1";
}
}
var port = process.env.VCAP_APP_PORT || 3000;
首先,我们确保 Node.JS 知道我们的库在哪里。在工作目录中使用 npm install
将在名为 node_modules
的子目录中安装它们,当我们推送到 Cloud Foundry 时,它们将与 app.js
一起复制。
我们的服务连接详细信息作为 JSON 对象存在于环境中;过程 rabbitUrl
解析它并提取 RabbitMQ 服务的 URL。原则上,我们可以将多个 RabbitMQ 服务实例绑定到应用程序——我们在这里假设我们只需要第一个(并且可能是唯一的)这样的实例。那就是 [0]
部分。
var messages = [];
function setup() {
var exchange = conn.exchange('cf-demo', {'type': 'fanout', durable: false}, function() {
var queue = conn.queue('', {durable: false, exclusive: true},
function() {
queue.subscribe(function(msg) {
messages.push(htmlEscape(msg.body));
if (messages.length > 10) {
messages.shift();
}
});
queue.bind(exchange.name, '');
});
queue.on('queueBindOk', function() { httpServer(exchange); });
});
}
这是一个稍后我们将调用的过程,用于创建 RabbitMQ 实例中所需的所有内容。
由于客户端中的所有内容都是异步操作,因此有很多回调函数。嵌套的顺序由我们何时需要结果决定;具体来说,我们需要队列才能订阅,并且我们需要队列和交换机才能绑定队列。请注意为队列提供的空名称——这表示队列将是匿名的,换句话说,由 RabbitMQ 随机生成名称。
我们可以直接跳出最后一个回调函数的范围(启动 HTTP 服务器的那个),因为我们知道到那时所有事情都已完成。
代码的下一部分是我们响应 HTTP 请求的地方。
function httpServer(exchange) {
var serv = http.createServer(function(req, res) {
var url = URL.parse(req.url);
if (req.method == 'GET' && url.pathname == '/env') {
printEnv(res);
}
else if (req.method == 'GET' && url.pathname == '/') {
res.statusCode = 200;
openHtml(res);
writeForm(res);
writeMessages(res);
closeHtml(res);
}
else if (req.method == 'POST' && url.pathname == '/') {
chunks = '';
req.on('data', function(chunk) { chunks += chunk; });
req.on('end', function() {
msg = unescapeFormData(chunks.split('=')[1]);
exchange.publish('', {body: msg});
res.statusCode = 303;
res.setHeader('Location', '/');
res.end();
});
}
else {
res.statusCode = 404;
res.end("This is not the page you were looking for.");
}
});
serv.listen(port);
}
RabbitMQ 部分位于中间,我们从前面的交换机发布消息。Node.JS AMQP 库将很乐意发布一个对象,将其序列化为 JSON 值;我们之前使用的订阅方法假设 JSON 有效负载并将其解析为对象。
其余的 app.js 只是辅助函数,除了启动整个过程的那一行(到目前为止,我们只编写了回调函数!)。
var conn = amqp.createConnection({url: rabbitUrl()});
conn.on('ready', setup);
您可以看到整体控制流程非常简单。
- 打开与 RabbitMQ 的连接;完成后,
- 构建交换机和队列,将队列绑定到交换机并订阅队列;然后
- 启动 HTTP 监听器。
在这段代码中,我倾向于将事情说得更详细。当然,有很大的空间可以进行抽象,例如在交换机-队列-绑定-订阅模式中,我预计在应用程序中经常会重复出现这种模式。
如需获取有关 Cloud Foundry 上 RabbitMQ 的帮助,请加入 support.cloudfoundry.com 上的论坛。