跳至主内容

在 Cloud Foundry 上使用 Node.JS 的 RabbitMQ 服务

·5 分钟阅读
Michael Bridgen

最近,我们推出了Cloud Foundry 的 RabbitMQ 服务,让您可以在 Cloud Foundry 上轻松地启动消息代理以供您的应用程序使用。网上有关于将其与 Ruby on Rails 和使用 Spring 的 Java 应用程序的教程。在这里,我们将探讨如何将 RabbitMQ 服务与 Node.JS 应用程序结合使用。

我不会假设你对 Node.JS、npm 或 RabbitMQ 有太多先验知识;但是,如果你都熟悉它们,你将从中受益更多。对于理解 npm,除了实际使用之外,npm 的 man 手册及其配套的开发者信息是最好的指南。对于 RabbitMQ,我推荐 Michael Klishin 的AMQP 消息模型入门指南

GitHub 上的这个仓库包含我们 Node.JS 示例的源代码。让我们先一步步看 package.json

{
"name":"node-srs-demo",
"author": "Michael Bridgen",
"version":"0.0.2",
"dependencies":{
"amqp":">= 0.1.0",
"sanitizer": "*"
}
}

其中最值得关注的是对“amqp”的依赖;版本 0.1.0 支持 Cloud Foundry 将提供的 URL 语法,而且没有理由不使用最新最好的版本。

这是应用程序代码的第一部分。请注意,我们将文件命名为 app.js,这样 Cloud Foundry 就会将其识别为主模块并运行它。

require.paths.unshift('./node_modules');

var http = require('http');
var amqp = require('amqp');
var URL = require('url');
var htmlEscape = require('sanitizer/sanitizer').escape;

function rabbitUrl() {
if (process.env.VCAP_SERVICES) {
conf = JSON.parse(process.env.VCAP_SERVICES);
return conf['rabbitmq-2.4'][0].credentials.url;
}
else {
return "amqp://";
}
}

var port = process.env.VCAP_APP_PORT || 3000;

首先,我们确保 Node.JS 知道我们的库在哪里。在工作目录中使用 npm install 会将它们安装在名为 node_modules 的子目录中,当我们推送到 Cloud Foundry 时,它们会与 app.js 一起被复制过去。

我们的服务连接详细信息以 JSON 对象的形式出现在环境变量中;rabbitUrl 过程会解析该对象并提取 RabbitMQ 服务的 URL。原则上,我们可以将多个 RabbitMQ 服务实例绑定到应用程序——我们在这里假设我们只想要第一个(可能也是唯一的)这样的实例。这就是 [0] 部分。

var messages = [];

function setup() {

var exchange = conn.exchange('cf-demo', {'type': 'fanout', durable: false}, function() {

var queue = conn.queue('', {durable: false, exclusive: true},
function() {
queue.subscribe(function(msg) {
messages.push(htmlEscape(msg.body));
if (messages.length > 10) {
messages.shift();
}
});
queue.bind(exchange.name, '');
});
queue.on('queueBindOk', function() { httpServer(exchange); });
});
}

这是一个我们稍后将调用的过程,用于在我们的 RabbitMQ 实例中创建所有我们需要的东西。

由于客户端中的一切都是异步操作,因此有很多回调。嵌套的顺序取决于我们需要结果的时间;具体来说,我们需要队列才能订阅,并且我们需要队列交换才能绑定队列。注意给队列提供的空名称——这表示队列是匿名的,换句话说,由 RabbitMQ 随机生成名称。

由于我们知道那时所有工作都已完成,因此我们可以跳出最后一个回调(启动 HTTP 服务器的回调)的作用域。

代码的下一部分是我们响应 HTTP 请求的地方。

function httpServer(exchange) {
var serv = http.createServer(function(req, res) {
var url = URL.parse(req.url);
if (req.method == 'GET' && url.pathname == '/env') {
printEnv(res);
}
else if (req.method == 'GET' && url.pathname == '/') {
res.statusCode = 200;
openHtml(res);
writeForm(res);
writeMessages(res);
closeHtml(res);
}
else if (req.method == 'POST' && url.pathname == '/') {
chunks = '';
req.on('data', function(chunk) { chunks += chunk; });
req.on('end', function() {
msg = unescapeFormData(chunks.split('=')[1]);
exchange.publish('', {body: msg});
res.statusCode = 303;
res.setHeader('Location', '/');
res.end();
});
}
else {
res.statusCode = 404;
res.end("This is not the page you were looking for.");
}
});
serv.listen(port);
}

RabbitMQ 部分在中间,我们将消息发布到我们之前的交换。Node.JS AMQP 库会愉快地发布一个对象,将其序列化为 JSON 值;我们之前使用的 subscribe 方法假定 JSON 载荷并将其解析为对象。

app.js 的其余部分只是辅助函数,除了启动整个过程的行(到目前为止我们只编写了回调!)

var conn = amqp.createConnection({url: rabbitUrl()});
conn.on('ready', setup);

你可以看到整体控制流程很简单:

  1. 打开到 RabbitMQ 的连接;完成后,
  2. 构造一个交换和一个队列,将队列绑定到交换并订阅该队列;然后
  3. 启动 HTTP 监听器。

对于这段代码,我倾向于把事情讲清楚。当然,有很多抽象的空间,例如在交换-队列-绑定-订阅模式中,我期望它在应用程序中经常出现。

有关 Cloud Foundry 上的 RabbitMQ 的帮助,请加入 support.cloudfoundry.com 上的论坛。

© . This site is unofficial and not affiliated with VMware.