connection.js 10.4 KB
'use strict';

var assert = require('assert');
var defs = require('../lib/defs');
var Connection = require('../lib/connection').Connection;
var HEARTBEAT = require('../lib/frame').HEARTBEAT;
var HB_BUF = require('../lib/frame').HEARTBEAT_BUF;
var util = require('./util');
var succeed = util.succeed, fail = util.fail, latch = util.latch;
var completes = util.completes;
var kCallback = util.kCallback;

var LOG_ERRORS = process.env.LOG_ERRORS;

var OPEN_OPTS = {
  // start-ok
  'clientProperties': {},
  'mechanism': 'PLAIN',
  'response': new Buffer(['', 'guest', 'guest'].join(String.fromCharCode(0))),
  'locale': 'en_US',
  
  // tune-ok
  'channelMax': 0,
  'frameMax': 0,
  'heartbeat': 0,
  
  // open
  'virtualHost': '/',
  'capabilities': '',
  'insist': 0
};
module.exports.OPEN_OPTS = OPEN_OPTS;

function happy_open(send, await) {
  // kick it off
  send(defs.ConnectionStart,
       {versionMajor: 0,
        versionMinor: 9,
        serverProperties: {},
        mechanisms: new Buffer('PLAIN'),
        locales: new Buffer('en_US')});
  return await(defs.ConnectionStartOk)()
    .then(function(f) {
      send(defs.ConnectionTune,
           {channelMax: 0,
            heartbeat: 0,
            frameMax: 0});
    })
    .then(await(defs.ConnectionTuneOk))
    .then(await(defs.ConnectionOpen))
    .then(function(f) {
      send(defs.ConnectionOpenOk,
           {knownHosts: ''});
    });
}
module.exports.connection_handshake = happy_open;

function connectionTest(client, server) {
  return function(done) {
    var bothDone = latch(2, done);
    var pair = util.socketPair();
    var c = new Connection(pair.client);
    if (LOG_ERRORS) c.on('error', console.warn);
    client(c, bothDone);

    // NB only not a race here because the writes are synchronous
    var protocolHeader = pair.server.read(8);
    assert.deepEqual(new Buffer("AMQP" + String.fromCharCode(0,0,9,1)),
                     protocolHeader);

    var s = util.runServer(pair.server, function(send, await) {
      server(send, await, bothDone, pair.server);
    });
  };
}

suite("Connection errors", function() {

  test("socket close during open", function(done) {
    // RabbitMQ itself will take at least 3 seconds to close the socket
    // in the event of a handshake problem. Instead of using a live
    // connection, I'm just going to pretend.
    var pair = util.socketPair();
    var conn = new Connection(pair.client);
    pair.server.on('readable', function() {
      pair.server.end();
    });
    conn.open({}, kCallback(fail(done), succeed(done)));
  });

  test("bad frame during open", function(done) {
    var ss = util.socketPair();
    var conn = new (require('../lib/connection').Connection)(ss.client);
    ss.server.on('readable', function() {
      ss.server.write(new Buffer([0, 0, 0, 0, 0, 0, 0, 0, 0, 0]));
    });
    conn.open({}, kCallback(fail(done), succeed(done)));
  });

});

suite("Connection open", function() {

test("happy", connectionTest(
  function(c, done) {
    c.open(OPEN_OPTS, kCallback(succeed(done), fail(done)));
  },
  function(send, await, done) {
    happy_open(send, await).then(succeed(done), fail(done));
  }));

test("wrong first frame", connectionTest(
  function(c, done) {
    c.open(OPEN_OPTS, kCallback(fail(done), succeed(done)));
  },
  function(send, await, done) {
    // bad server! bad! whatever were you thinking?
    completes(function() {
      send(defs.ConnectionTune,
           {channelMax: 0,
            heartbeat: 0,
            frameMax: 0});
    }, done);
  }));

test("unexpected socket close", connectionTest(
  function(c, done) {
    c.open(OPEN_OPTS, kCallback(fail(done), succeed(done)));
  },
  function(send, await, done, socket) {
    send(defs.ConnectionStart,
         {versionMajor: 0,
          versionMinor: 9,
          serverProperties: {},
          mechanisms: new Buffer('PLAIN'),
          locales: new Buffer('en_US')});
    return await(defs.ConnectionStartOk)()
      .then(function() {
        socket.end();
      })
      .then(succeed(done), fail(done));
  }));

});

suite("Connection running", function() {

test("wrong frame on channel 0", connectionTest(
  function(c, done) {
    c.on('error', succeed(done));
    c.open(OPEN_OPTS);
  },
  function(send, await, done) {
    happy_open(send, await)
      .then(function() {
        // there's actually nothing that would plausibly be sent to a
        // just opened connection, so this is violating more than one
        // rule. Nonetheless.
        send(defs.ChannelOpenOk, {channelId: new Buffer('')}, 0);
      })
      .then(await(defs.ConnectionClose))
      .then(function(close) {
        send(defs.ConnectionCloseOk, {}, 0);
      }).then(succeed(done), fail(done));
  }));

test("unopened channel",  connectionTest(
  function(c, done) {
    c.on('error', succeed(done));
    c.open(OPEN_OPTS);
  },
  function(send, await, done) {
    happy_open(send, await)
      .then(function() {
        // there's actually nothing that would plausibly be sent to a
        // just opened connection, so this is violating more than one
        // rule. Nonetheless.
        send(defs.ChannelOpenOk, {channelId: new Buffer('')}, 3);
      })
      .then(await(defs.ConnectionClose))
      .then(function(close) {
        send(defs.ConnectionCloseOk, {}, 0);
      }).then(succeed(done), fail(done));
  }));

test("unexpected socket close", connectionTest(
  function(c, done) {
    var errorAndClosed = latch(2, done);
    c.on('error', succeed(errorAndClosed));
    c.on('close', succeed(errorAndClosed));
    c.open(OPEN_OPTS, kCallback(function() {
      c.sendHeartbeat();
    }, fail(errorAndClosed)));
  },
  function(send, await, done, socket) {
    happy_open(send, await)
      .then(await())
      .then(function() {
        socket.end();
      }).then(succeed(done));
  }));

test("connection.blocked", connectionTest(
  function(c, done) {
    c.on('blocked', succeed(done));
    c.open(OPEN_OPTS);
  },
  function(send, await, done, socket) {
    happy_open(send, await)
      .then(function() {
        send(defs.ConnectionBlocked, {reason: 'felt like it'}, 0);
      })
      .then(succeed(done));
  }));

test("connection.unblocked", connectionTest(
  function(c, done) {
    c.on('unblocked', succeed(done));
    c.open(OPEN_OPTS);
  },
  function(send, await, done, socket) {
    happy_open(send, await)
      .then(function() {
        send(defs.ConnectionUnblocked, {}, 0);
      })
      .then(succeed(done));
  }));


});

suite("Connection close", function() {

test("happy", connectionTest(
  function(c, done0) {
    var done = latch(2, done0);
    c.on('close', done);
    c.open(OPEN_OPTS, kCallback(function(_ok) {
      c.close(kCallback(succeed(done), fail(done)));
    }, function() {}));
  },
  function(send, await, done) {
    happy_open(send, await)
      .then(await(defs.ConnectionClose))
      .then(function(close) {
        send(defs.ConnectionCloseOk, {});
      })
      .then(succeed(done), fail(done));
  }));

test("interleaved close frames", connectionTest(
  function(c, done0) {
    var done = latch(2, done0);
    c.on('close', done);
    c.open(OPEN_OPTS, kCallback(function(_ok) {
      c.close(kCallback(succeed(done), fail(done)));
    }, done));
  },
  function(send, await, done) {
    happy_open(send, await)
      .then(await(defs.ConnectionClose))
      .then(function(f) {
        send(defs.ConnectionClose, {
          replyText: "Ha!",
          replyCode: defs.constants.REPLY_SUCCESS,
          methodId: 0, classId: 0
        });
      })
      .then(await(defs.ConnectionCloseOk))
      .then(function(f) {
        send(defs.ConnectionCloseOk, {});
      })
      .then(succeed(done), fail(done));
  }));

test("server error close", connectionTest(
  function(c, done0) {
    var done = latch(2, done0);
    c.on('close', succeed(done));
    c.on('error', succeed(done));
    c.open(OPEN_OPTS);
  },
  function(send, await, done) {
    happy_open(send, await)
      .then(function(f) {
        send(defs.ConnectionClose, {
          replyText: "Begone",
          replyCode: defs.constants.INTERNAL_ERROR,
          methodId: 0, classId: 0
        });
      })
      .then(await(defs.ConnectionCloseOk))
      .then(succeed(done), fail(done));
  }));

test("operator-intiated close", connectionTest(
  function(c, done) {
    c.on('close', succeed(done));
    c.on('error', fail(done));
    c.open(OPEN_OPTS);
  },
  function(send, await, done) {
    happy_open(send, await)
      .then(function(f) {
        send(defs.ConnectionClose, {
          replyText: "Begone",
          replyCode: defs.constants.CONNECTION_FORCED,
          methodId: 0, classId: 0
        });
      })
      .then(await(defs.ConnectionCloseOk))
      .then(succeed(done), fail(done));
  }));


test("double close", connectionTest(
  function(c, done) {
    c.open(OPEN_OPTS, kCallback(function() {
      c.close();
      // NB no synchronisation, we do this straight away
      assert.throws(function() {
        c.close();
      });
      done();
    }, done));
  },
  function(send, await, done) {
    happy_open(send, await)
      .then(await(defs.ConnectionClose))
      .then(function() {
        send(defs.ConnectionCloseOk, {});
      })
      .then(succeed(done), fail(done));
  }));

});

suite("heartbeats", function() {

var heartbeat = require('../lib/heartbeat');

setup(function() {
  heartbeat.UNITS_TO_MS = 20;
});

teardown(function() {
  heartbeat.UNITS_TO_MS = 1000;
});

test("send heartbeat after open", connectionTest(
  function(c, done) {
    completes(function() {
      var opts = Object.create(OPEN_OPTS);
      opts.heartbeat = 1;
      // Don't leave the error waiting to happen for the next test, this
      // confuses mocha awfully
      c.on('error', function() {});
      c.open(opts);
    }, done);
  },
  function(send, await, done, socket) {
    var timer;
    happy_open(send, await)
      .then(function() {
        timer = setInterval(function() {
          socket.write(HB_BUF);
        }, heartbeat.UNITS_TO_MS);
      })
      .then(await())
      .then(function(hb) {
        if (hb === HEARTBEAT) done();
        else done("Next frame after silence not a heartbeat");
        clearInterval(timer);
      });
  }));

test("detect lack of heartbeats", connectionTest(
  function(c, done) {
    var opts = Object.create(OPEN_OPTS);
    opts.heartbeat = 1;
    c.on('error', succeed(done));
    c.open(opts);
  },
  function(send, await, done, socket) {
    happy_open(send, await)
      .then(succeed(done), fail(done));
    // conspicuously not sending anything ...
  }));

});