// amqpManager.js 1.71 KB
/**
 * Author: Wesnydy Lima Ribeiro
 * Email: wesnydy@lavid.ufpb.br
 */

'use strict';

/**
 * Required libs.
 */
var amqplib = require('amqplib/callback_api');

/**
 * Function to send text to the queue.
 */
exports.sendToQueue = function(body, id, queue, durability, res) {

  amqplib.connect('amqp://localhost', function(err, conn) {
    if(err) {
      res.json({message : err});
      throw err;
    }
    conn.createChannel(function(err, ch) {
      if(err) {
          res.json({message : err});
          throw err;
      }
      ch.assertQueue(queue, {durable : durability});
      ch.sendToQueue(queue, new Buffer(body), {correlationId : id});
      try {
        ch.close();
      }
      catch (alreadyClosed) {
        console.log(alreadyClosed.stackAtStateChange);
      }
    });
    setTimeout(function() { conn.close(); }, 500000);
  });
};

/**
 * Function to receive gloss from the queue.
 */
exports.receiveFromQueue = function(id, queue, durability, res) {

  amqplib.connect('amqp://localhost', function(err, conn) {
    if(err) {
      res.json({message : err});
      throw err;
    }
    conn.createChannel(function(err, ch) {
      if(err){
        res.json({message : err});
        throw err;
      }
      ch.assertQueue(queue, {durable : durability});
      ch.consume(queue, function(msg) {
        if (msg.properties.correlationId === id) {
          ch.ack(msg);
          res.send(msg.content.toString())
          try {
            ch.close();
          }
          catch (alreadyClosed) {
            console.log(alreadyClosed.stackAtStateChange);
          }
        }
        else {
          ch.reject(msg);
        }
      }, {noAck : false});
    });
    setTimeout(function() { conn.close(); }, 500000);
  });
};