'use strict';

var assert = require('assert');
var defs = require('../lib/defs');
var Connection = require('../lib/connection').Connection;
var HEARTBEAT = require('../lib/frame').HEARTBEAT;
var HB_BUF = require('../lib/frame').HEARTBEAT_BUF;
var util = require('./util');
var succeed = util.succeed, fail = util.fail, latch = util.latch;
var completes = util.completes;
var kCallback = util.kCallback;

// When set in the environment, connection 'error' events are echoed via
// console.warn (see connectionTest).
var LOG_ERRORS = process.env.LOG_ERRORS;

// Options handed to Connection#open by most tests below; also exported
// so other test files can reuse them.
var OPEN_OPTS = {
  // start-ok
  'clientProperties': {},
  'mechanism': 'PLAIN',
  'response': Buffer.from(['', 'guest', 'guest'].join(String.fromCharCode(0))),
  'locale': 'en_US',

  // tune-ok
  'channelMax': 0,
  'frameMax': 0,
  'heartbeat': 0,

  // open
  'virtualHost': '/',
  'capabilities': '',
  'insist': 0
};
module.exports.OPEN_OPTS = OPEN_OPTS;

/**
 * Send the connection.start frame that every fake server in this file
 * opens the handshake with.
 *
 * @param {Function} send - frame sender supplied by util.runServer.
 */
function sendConnectionStart(send) {
  send(defs.ConnectionStart,
       {versionMajor: 0,
        versionMinor: 9,
        serverProperties: {},
        mechanisms: Buffer.from('PLAIN'),
        locales: Buffer.from('en_US')});
}

/**
 * Play the server side of a successful opening handshake: send start,
 * wait for start-ok, send tune, wait for tune-ok and open, then send
 * open-ok.
 *
 * @param {Function} send - frame sender supplied by util.runServer.
 * @param {Function} wait - given a method, yields a function that returns
 *   a promise for the next frame (as used here: `wait(method)()`).
 * @returns {Promise} settles once open-ok has been sent.
 */
function happy_open(send, wait) {
  // kick it off
  sendConnectionStart(send);
  return wait(defs.ConnectionStartOk)()
    .then(function(f) {
      send(defs.ConnectionTune,
           {channelMax: 0,
            heartbeat: 0,
            frameMax: 0});
    })
    .then(wait(defs.ConnectionTuneOk))
    .then(wait(defs.ConnectionOpen))
    .then(function(f) {
      send(defs.ConnectionOpenOk,
           {knownHosts: ''});
    });
}
module.exports.connection_handshake = happy_open;

/**
 * Build a test body that wires a Connection to a scripted fake server
 * over an in-process socket pair.
 *
 * @param {Function} client - called as client(connection, done).
 * @param {Function} server - called as server(send, wait, done, socket).
 * @returns {Function} a function taking the test framework's `done`.
 *   Both sides receive the same latch(2, done) callback, so each side is
 *   expected to signal completion once.
 */
function connectionTest(client, server) {
  return function(done) {
    var bothDone = latch(2, done);
    var pair = util.socketPair();
    var c = new Connection(pair.client);
    if (LOG_ERRORS) c.on('error', console.warn);
    client(c, bothDone);

    // NB only not a race here because the writes are synchronous
    var protocolHeader = pair.server.read(8);
    var expectedHeader = Buffer.from("AMQP" + String.fromCharCode(0, 0, 9, 1));
    assert.deepStrictEqual(protocolHeader, expectedHeader);

    util.runServer(pair.server, function(send, wait) {
      server(send, wait, bothDone, pair.server);
    });
  };
}

suite("Connection errors", function() {

  test("socket close during open", function(done) {
    // RabbitMQ itself will take at least 3 seconds to close the socket
    // in the event of a handshake problem. Instead of using a live
    // connection, I'm just going to pretend.
    var pair = util.socketPair();
    var conn = new Connection(pair.client);
    pair.server.on('readable', function() {
      pair.server.end();
    });
    // The open is expected to fail, hence fail/succeed are swapped.
    conn.open({}, kCallback(fail(done), succeed(done)));
  });

  test("bad frame during open", function(done) {
    var ss = util.socketPair();
    var conn = new Connection(ss.client);
    ss.server.on('readable', function() {
      ss.server.write(Buffer.from([0, 0, 0, 0, 0, 0, 0, 0, 0, 0]));
    });
    // The open is expected to fail, hence fail/succeed are swapped.
    conn.open({}, kCallback(fail(done), succeed(done)));
  });

});

suite("Connection open", function() {

  test("happy", connectionTest(
    function(c, done) {
      c.open(OPEN_OPTS, kCallback(succeed(done), fail(done)));
    },
    function(send, wait, done) {
      happy_open(send, wait).then(succeed(done), fail(done));
    }));

  test("wrong first frame", connectionTest(
    function(c, done) {
      c.open(OPEN_OPTS, kCallback(fail(done), succeed(done)));
    },
    function(send, wait, done) {
      // bad server! bad! whatever were you thinking?
      completes(function() {
        send(defs.ConnectionTune,
             {channelMax: 0,
              heartbeat: 0,
              frameMax: 0});
      }, done);
    }));

  test("unexpected socket close", connectionTest(
    function(c, done) {
      c.open(OPEN_OPTS, kCallback(fail(done), succeed(done)));
    },
    function(send, wait, done, socket) {
      sendConnectionStart(send);
      return wait(defs.ConnectionStartOk)()
        .then(function() {
          socket.end();
        })
        .then(succeed(done), fail(done));
    }));

});

suite("Connection running", function() {

  test("wrong frame on channel 0", connectionTest(
    function(c, done) {
      c.on('error', succeed(done));
      c.open(OPEN_OPTS);
    },
    function(send, wait, done) {
      happy_open(send, wait)
        .then(function() {
          // there's actually nothing that would plausibly be sent to a
          // just opened connection, so this is violating more than one
          // rule. Nonetheless.
          send(defs.ChannelOpenOk, {channelId: Buffer.from('')}, 0);
        })
        .then(wait(defs.ConnectionClose))
        .then(function(close) {
          send(defs.ConnectionCloseOk, {}, 0);
        }).then(succeed(done), fail(done));
    }));

  test("unopened channel", connectionTest(
    function(c, done) {
      c.on('error', succeed(done));
      c.open(OPEN_OPTS);
    },
    function(send, wait, done) {
      happy_open(send, wait)
        .then(function() {
          // same bogus frame as above, but addressed to channel 3, which
          // this test never opens.
          send(defs.ChannelOpenOk, {channelId: Buffer.from('')}, 3);
        })
        .then(wait(defs.ConnectionClose))
        .then(function(close) {
          send(defs.ConnectionCloseOk, {}, 0);
        }).then(succeed(done), fail(done));
    }));

  test("unexpected socket close", connectionTest(
    function(c, done) {
      // Both an 'error' and a 'close' event are expected from the client.
      var errorAndClosed = latch(2, done);
      c.on('error', succeed(errorAndClosed));
      c.on('close', succeed(errorAndClosed));
      c.open(OPEN_OPTS, kCallback(function() {
        c.sendHeartbeat();
      }, fail(errorAndClosed)));
    },
    function(send, wait, done, socket) {
      happy_open(send, wait)
        .then(wait())
        .then(function() {
          socket.end();
        }).then(succeed(done), fail(done));
    }));

  test("connection.blocked", connectionTest(
    function(c, done) {
      c.on('blocked', succeed(done));
      c.open(OPEN_OPTS);
    },
    function(send, wait, done, socket) {
      happy_open(send, wait)
        .then(function() {
          send(defs.ConnectionBlocked, {reason: 'felt like it'}, 0);
        })
        .then(succeed(done), fail(done));
    }));

  test("connection.unblocked", connectionTest(
    function(c, done) {
      c.on('unblocked', succeed(done));
      c.open(OPEN_OPTS);
    },
    function(send, wait, done, socket) {
      happy_open(send, wait)
        .then(function() {
          send(defs.ConnectionUnblocked, {}, 0);
        })
        .then(succeed(done), fail(done));
    }));

});

suite("Connection close", function() {

  test("happy", connectionTest(
    function(c, done0) {
      // Two signals expected: the 'close' event and the close callback.
      var done = latch(2, done0);
      c.on('close', done);
      c.open(OPEN_OPTS, kCallback(function(_ok) {
        c.close(kCallback(succeed(done), fail(done)));
      }, fail(done)));
    },
    function(send, wait, done) {
      happy_open(send, wait)
        .then(wait(defs.ConnectionClose))
        .then(function(close) {
          send(defs.ConnectionCloseOk, {});
        })
        .then(succeed(done), fail(done));
    }));

  test("interleaved close frames", connectionTest(
    function(c, done0) {
      var done = latch(2, done0);
      c.on('close', done);
      c.open(OPEN_OPTS, kCallback(function(_ok) {
        c.close(kCallback(succeed(done), fail(done)));
      }, done));
    },
    function(send, wait, done) {
      happy_open(send, wait)
        .then(wait(defs.ConnectionClose))
        .then(function(f) {
          // Answer the client's close with a close of our own before
          // acknowledging it.
          send(defs.ConnectionClose, {
            replyText: "Ha!",
            replyCode: defs.constants.REPLY_SUCCESS,
            methodId: 0, classId: 0
          });
        })
        .then(wait(defs.ConnectionCloseOk))
        .then(function(f) {
          send(defs.ConnectionCloseOk, {});
        })
        .then(succeed(done), fail(done));
    }));

  test("server error close", connectionTest(
    function(c, done0) {
      var done = latch(2, done0);
      c.on('close', succeed(done));
      c.on('error', succeed(done));
      c.open(OPEN_OPTS);
    },
    function(send, wait, done) {
      happy_open(send, wait)
        .then(function(f) {
          send(defs.ConnectionClose, {
            replyText: "Begone",
            replyCode: defs.constants.INTERNAL_ERROR,
            methodId: 0, classId: 0
          });
        })
        .then(wait(defs.ConnectionCloseOk))
        .then(succeed(done), fail(done));
    }));

  test("operator-intiated close", connectionTest(
    function(c, done) {
      // A CONNECTION_FORCED close must surface as 'close', not 'error'.
      c.on('close', succeed(done));
      c.on('error', fail(done));
      c.open(OPEN_OPTS);
    },
    function(send, wait, done) {
      happy_open(send, wait)
        .then(function(f) {
          send(defs.ConnectionClose, {
            replyText: "Begone",
            replyCode: defs.constants.CONNECTION_FORCED,
            methodId: 0, classId: 0
          });
        })
        .then(wait(defs.ConnectionCloseOk))
        .then(succeed(done), fail(done));
    }));

  test("double close", connectionTest(
    function(c, done) {
      c.open(OPEN_OPTS, kCallback(function() {
        c.close();
        // NB no synchronisation, we do this straight away
        assert.throws(function() {
          c.close();
        });
        done();
      }, done));
    },
    function(send, wait, done) {
      happy_open(send, wait)
        .then(wait(defs.ConnectionClose))
        .then(function() {
          send(defs.ConnectionCloseOk, {});
        })
        .then(succeed(done), fail(done));
    }));

});

suite("heartbeats", function() {

  var heartbeat = require('../lib/heartbeat');

  // Shrink the heartbeat time unit for the duration of each test and
  // restore it to 1000 afterwards.
  setup(function() {
    heartbeat.UNITS_TO_MS = 20;
  });

  teardown(function() {
    heartbeat.UNITS_TO_MS = 1000;
  });

  test("send heartbeat after open", connectionTest(
    function(c, done) {
      completes(function() {
        var opts = Object.create(OPEN_OPTS);
        opts.heartbeat = 1;
        // Don't leave the error waiting to happen for the next test, this
        // confuses mocha awfully
        c.on('error', function() {});
        c.open(opts);
      }, done);
    },
    function(send, wait, done, socket) {
      var timer;
      happy_open(send, wait)
        .then(function() {
          // Keep writing heartbeats from the server side while waiting
          // for the client's next frame.
          timer = setInterval(function() {
            socket.write(HB_BUF);
          }, heartbeat.UNITS_TO_MS);
        })
        .then(wait())
        .then(function(hb) {
          clearInterval(timer);
          if (hb === HEARTBEAT) done();
          else done(new Error("Next frame after silence not a heartbeat"));
        }, function(err) {
          // Stop the interval on failure too, so it cannot outlive the test.
          clearInterval(timer);
          fail(done)(err);
        });
    }));

  test("detect lack of heartbeats", connectionTest(
    function(c, done) {
      var opts = Object.create(OPEN_OPTS);
      opts.heartbeat = 1;
      c.on('error', succeed(done));
      c.open(opts);
    },
    function(send, wait, done, socket) {
      happy_open(send, wait)
        .then(succeed(done), fail(done));
      // conspicuously not sending anything ...
    }));

});