amqpManager.js
1.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/**
* Author: Wesnydy Lima Ribeiro
* Email: wesnydy@lavid.ufpb.br
*/
'use strict';
/**
* Required libs.
*/
var amqplib = require('amqplib/callback_api');
/**
* Function to send text to the queue.
*/
exports.sendToQueue = function(body, id, queue, durability, res) {
amqplib.connect('amqp://localhost', function(err, conn) {
if(err) {
res.json({message : err});
throw err;
}
conn.createChannel(function(err, ch) {
if(err) {
res.json({message : err});
throw err;
}
ch.assertQueue(queue, {durable : durability});
ch.sendToQueue(queue, new Buffer(body), {correlationId : id});
try {
ch.close();
}
catch (alreadyClosed) {
console.log(alreadyClosed.stackAtStateChange);
}
});
setTimeout(function() { conn.close(); }, 500000);
});
};
/**
* Function to receive gloss from the queue.
*/
exports.receiveFromQueue = function(id, queue, durability, res) {
amqplib.connect('amqp://localhost', function(err, conn) {
if(err) {
res.json({message : err});
throw err;
}
conn.createChannel(function(err, ch) {
if(err){
res.json({message : err});
throw err;
}
ch.assertQueue(queue, {durable : durability});
ch.consume(queue, function(msg) {
if (msg.properties.correlationId === id) {
ch.ack(msg);
res.send(msg.content.toString())
try {
ch.close();
}
catch (alreadyClosed) {
console.log(alreadyClosed.stackAtStateChange);
}
}
else {
ch.reject(msg);
}
}, {noAck : false});
});
setTimeout(function() { conn.close(); }, 500000);
});
};