// app.js 3.49 KB
/**
 * Author: Jonathan Lincoln Brilhante
 * Email: jonathan.lincoln.brilhante@gmail.com
 *
 * Author: Wesnydy Lima Ribeiro
 * Email: wesnydy@lavid.ufpb.br
 */

'use strict';

/**
 * Module dependencies.
 */
var express = require('express');
var bodyParser = require('body-parser');
var shortid = require('shortid');
var amqp = require('amqplib/callback_api');

/**
 * Create the Express application and the router on which the routes of this
 * file are registered.
 */
var app = express();
var router = express.Router();

/**
 * Register the request body parsers: JSON first, then URL-encoded bodies
 * (with the `extended` option enabled).
 */
var jsonBodyParser = bodyParser.json();
var formBodyParser = bodyParser.urlencoded({ extended: true });
app.use(jsonBodyParser);
app.use(formBodyParser);

/**
 * Publishes a text message to a queue on the local AMQP broker.
 *
 * Opens a dedicated connection and channel, asserts the queue, publishes the
 * message tagged with the given correlation id, then closes the channel and,
 * once the channel is closed, the connection.
 *
 * Failures are logged and reported through `res` (HTTP 500, unless a response
 * has already been sent). They are never thrown: an exception raised inside an
 * amqplib callback cannot be caught by Express and would stop the process.
 *
 * @param {string} id - Correlation id attached to the published message.
 * @param {*} text - Message payload; coerced to a string before encoding.
 * @param {string} queue - Name of the destination queue.
 * @param {boolean} durability - Value used for the queue's `durable` option.
 * @param {Object} res - Express response, used only to report failures.
 */
function sendToQueue(id, text, queue, durability, res) {
    // Closes the connection, tolerating one that is already closed.
    function closeConnection(conn) {
        try {
            conn.close();
        }
        catch (alreadyClosed) {
            console.log(alreadyClosed.stackAtStateChange);
        }
    }

    // Logs the failure and answers the request unless it was already answered.
    function reportFailure(err) {
        console.error(err);
        if (!res.headersSent) {
            res.status(500).json({message : (err && err.message) || String(err)});
        }
    }

    amqp.connect('amqp://localhost', function(err, conn) {
        if (err) {
            return reportFailure(err);
        }
        // An 'error' event without a listener is thrown by the emitter.
        conn.on('error', reportFailure);

        conn.createChannel(function(err, ch) {
            if (err) {
                reportFailure(err);
                return closeConnection(conn);
            }
            ch.on('error', reportFailure);
            ch.assertQueue(queue, {durable : durability});
            // String() first: a numeric payload must be encoded as text, never
            // interpreted as a buffer size.
            ch.sendToQueue(queue, Buffer.from(String(text)), {correlationId : id});
            try {
                // Release the connection as soon as the channel is closed
                // instead of keeping it open for minutes.
                ch.close(function() { closeConnection(conn); });
            }
            catch (alreadyClosed) {
                console.log(alreadyClosed.stackAtStateChange);
                closeConnection(conn);
            }
        });
    });
}

/**
 * Waits for the message carrying a given correlation id on a queue of the
 * local AMQP broker and sends its content as the HTTP response.
 *
 * Opens a dedicated connection and channel, asserts the queue and consumes
 * from it. A message with another correlation id is rejected (amqplib requeues
 * rejected messages by default). The matching message is acknowledged and its
 * content is sent through `res`; the channel and connection are then closed.
 *
 * Failures are logged and reported through `res` (HTTP 500, or 504 when no
 * matching message arrives within the waiting period), unless a response has
 * already been sent. They are never thrown: an exception raised inside an
 * amqplib callback cannot be caught by Express and would stop the process.
 *
 * @param {string} id - Correlation id of the expected message.
 * @param {string} queue - Name of the queue to consume from.
 * @param {boolean} durability - Value used for the queue's `durable` option.
 * @param {Object} res - Express response that receives the message content.
 */
function receiveFromQueue(id, queue, durability, res) {
    // Longest time the connection is kept open waiting for the reply.
    var REPLY_TIMEOUT_MS = 500000;
    var finished = false;
    var timer = null;

    // Closes the connection, tolerating one that is already closed.
    function closeConnection(conn) {
        try {
            conn.close();
        }
        catch (alreadyClosed) {
            console.log(alreadyClosed.stackAtStateChange);
        }
    }

    // Logs the failure and, the first time only, stops waiting, releases the
    // connection and answers the request if it is still unanswered.
    function fail(err, conn, status) {
        console.error(err);
        if (finished) {
            return;
        }
        finished = true;
        clearTimeout(timer);
        if (conn) {
            closeConnection(conn);
        }
        if (!res.headersSent) {
            res.status(status || 500).json({message : (err && err.message) || String(err)});
        }
    }

    amqp.connect('amqp://localhost', function(err, conn) {
        if (err) {
            return fail(err, null);
        }
        // An 'error' event without a listener is thrown by the emitter.
        conn.on('error', function(connErr) { fail(connErr, conn); });

        timer = setTimeout(function() {
            fail(new Error('Timed out waiting for a reply on queue ' + queue), conn, 504);
        }, REPLY_TIMEOUT_MS);

        conn.createChannel(function(err, ch) {
            if (err) {
                return fail(err, conn);
            }
            ch.on('error', function(chErr) { fail(chErr, conn); });
            ch.assertQueue(queue, {durable : durability});
            ch.consume(queue, function(msg) {
                if (finished) {
                    // Deliveries arriving after the exchange ended are ignored.
                    return;
                }
                if (msg === null) {
                    // amqplib passes null when the broker cancels the consumer.
                    return fail(new Error('Consumer cancelled by the broker'), conn);
                }
                if (msg.properties.correlationId !== id) {
                    // Belongs to another request: hand it back to the queue.
                    return ch.reject(msg);
                }

                ch.ack(msg);
                finished = true;
                clearTimeout(timer);
                if (!res.headersSent) {
                    res.send(msg.content.toString());
                }
                try {
                    // Release the connection as soon as the channel is closed.
                    ch.close(function() { closeConnection(conn); });
                }
                catch (alreadyClosed) {
                    console.log(alreadyClosed.stackAtStateChange);
                    closeConnection(conn);
                }
            }, {noAck : false});
        });
    });
}

/**
 * POST route: processes a text and returns its gloss.
 *
 * Reads `text` from the parsed request body. A request without it is answered
 * with HTTP 400 so the client is not left waiting for a response. Otherwise
 * the text is published to the 'texts' and 'log' queues under a freshly
 * generated correlation id, and the reply carrying that id is awaited on the
 * 'glosses' queue.
 */
router.post('/', function(req, res) {
    var text = req.body && req.body.text;
    if (!text) {
        console.log('Text key is missing');
        return res.status(400).json({message : 'Text key is missing'});
    }

    var id = shortid.generate();
    sendToQueue(id, text, 'texts', false, res);
    sendToQueue(id, text, 'log', true, res);
    receiveFromQueue(id, 'glosses', false, res);
});

/**
 * GET route (old version): processes a text and returns its gloss.
 *
 * Looks `text` up in the parsed body first and then in the query string, the
 * same order the deprecated `req.param()` used for a route without path
 * parameters. A request without it is answered with HTTP 400. The value is
 * converted to a string and published to the 'texts' and 'log' queues under a
 * freshly generated correlation id; the reply carrying that id is awaited on
 * the 'glosses' queue.
 */
router.get('/', function(req, res) {
    var body = req.body || {};
    var query = req.query || {};
    var text = body.text != null ? body.text : query.text;
    if (text == null) {
        console.log('Text key is missing');
        return res.status(400).json({message : 'Text key is missing'});
    }

    // String() instead of toString('utf8'): on a number the argument would be
    // taken as a radix and throw.
    var texto = String(text);
    var id = shortid.generate();
    sendToQueue(id, texto, 'texts', false, res);
    sendToQueue(id, texto, 'log', true, res);
    receiveFromQueue(id, 'glosses', false, res);
});

/**
 * Register routes with prefix '/translate'.
 */
app.use('/translate', router);

/**
 * Error handler.
 *
 * Answers with the error's `status` (500 when absent) and a JSON body holding
 * the error message and the error itself. The four-argument signature is what
 * makes Express treat this middleware as an error handler.
 */
app.use(function(err, req, res, next) {
    // Once the response has started it cannot be rewritten; hand the error to
    // the default Express handler instead.
    if (res.headersSent) {
        return next(err);
    }
    res.status(err.status || 500);
    res.json({ message: err.message, error: err });
});

module.exports = app;