frame.js 5.72 KB
'use strict';

var assert = require('assert');
var succeed = require('./util').succeed;
var fail = require('./util').fail;

var connection = require('../lib/connection');
var Frames = connection.Connection;
var HEARTBEAT = require('../lib/frame').HEARTBEAT;
var Stream = require('stream');
var PassThrough = Stream.PassThrough ||
    require('readable-stream/passthrough');

var defs = require('../lib/defs');

// We'll need to supply a stream which we manipulate ourselves

function inputs() {
  // don't coalesce buffers, since that could mess up properties
  // (e.g., encoded frame size)
  return new PassThrough({objectMode: true});
}

var HB = new Buffer([defs.constants.FRAME_HEARTBEAT,
                     0, 0, // channel 0
                     0, 0, 0, 0, // zero size
                     defs.constants.FRAME_END]);

suite("Explicit parsing", function() {
  
  test('Parse heartbeat', function() {
    var input = inputs();
    var frames = new Frames(input);
    input.write(HB);
    assert(frames.recvFrame() === HEARTBEAT);
    assert(!frames.recvFrame());
  });

  test('Parse partitioned', function() {
    var input = inputs();
    var frames = new Frames(input);
    input.write(HB.slice(0, 3));
    assert(!frames.recvFrame());
    input.write(HB.slice(3));
    assert(frames.recvFrame() === HEARTBEAT);
    assert(!frames.recvFrame());
  });

  function testBogusFrame(name, bytes) {
    test(name, function(done) {
      var input = inputs();
      var frames = new Frames(input);
      frames.frameMax = 5; //for the max frame test
      input.write(new Buffer(bytes));
      frames.step(function(err, frame) {
        if (err != null) done();
        else done(new Error('Was a bogus frame!'));
      });
    });
  }

  testBogusFrame('Wrong sized frame',
                 [defs.constants.FRAME_BODY,
                  0,0, 0,0,0,0, // zero length
                  65, // but a byte!
                  defs.constants.FRAME_END]);

  testBogusFrame('Unknown method frame',
                 [defs.constants.FRAME_METHOD,
                  0,0, 0,0,0,4,
                  0,0,0,0, // garbage ID
                  defs.constants.FRAME_END]);

  testBogusFrame('> max frame',
                 [defs.constants.FRAME_BODY,
                  0,0, 0,0,0,6, // too big!
                  65,66,67,68,69,70,
                  defs.constants.FRAME_END]);

});

// Now for a bit more fun.

var amqp = require('./data');
var claire = require('claire');
var choice = claire.choice;
var forAll = claire.forAll;
var repeat = claire.repeat;
var label = claire.label;
var sequence = claire.sequence;
var transform = claire.transform;
var sized = claire.sized;

var assertEqualModuloDefaults =
  require('./codec').assertEqualModuloDefaults;

var Trace = label('frame trace',
                  repeat(choice.apply(choice, amqp.methods)));

suite("Parsing", function() {

  function testPartitioning(partition) {
    return forAll(Trace).satisfy(function(t) {
      var bufs = [];
      var input = inputs();
      var frames = new Frames(input);
      var i = 0, ex;
      frames.accept = function(f) {
        // A minor hack to make sure we get the assertion exception;
        // otherwise, it's just a test that we reached the line
        // incrementing `i` for each frame.
        try {
          assertEqualModuloDefaults(t[i], f.fields);
        }
        catch (e) {
          ex = e;
        }
        i++;
      };
      
      t.forEach(function(f) {
        f.channel = 0;
        bufs.push(defs.encodeMethod(f.id, 0, f.fields));
      });

      partition(bufs).forEach(input.write.bind(input));
      frames.acceptLoop();
      if (ex) throw ex;
      return i === t.length;
    }).asTest({times: 20})
  };

  test("Parse trace of methods",
       testPartitioning(function(bufs) { return bufs; }));

  test("Parse concat'd methods",
       testPartitioning(function(bufs) {
         return [Buffer.concat(bufs)];
       }));

  test("Parse partitioned methods",
       testPartitioning(function(bufs) {
         var full = Buffer.concat(bufs);
         var onethird = Math.floor(full.length / 3);
         var twothirds = 2 * onethird;
         return [
           full.slice(0, onethird),
           full.slice(onethird, twothirds),
           full.slice(twothirds)
         ];
       }));
});

var FRAME_MAX_MAX = 4096 * 4;
var FRAME_MAX_MIN = 4096;

var FrameMax = amqp.rangeInt('frame max',
                             FRAME_MAX_MIN,
                             FRAME_MAX_MAX);

var Body = sized(function(_n) {
  return Math.floor(Math.random() * FRAME_MAX_MAX);
}, repeat(amqp.Octet));

var Content = transform(function(args) {
  return {
    method: args[0].fields,
    header: args[1].fields,
    body: new Buffer(args[2])
  }
}, sequence(amqp.methods['BasicDeliver'],
            amqp.properties['BasicProperties'], Body));

suite("Content framing", function() {
  test("Adhere to frame max",
       forAll(Content, FrameMax).satisfy(function(content, max) {
         var input = inputs();
         var frames = new Frames(input);
         frames.frameMax = max;
         frames.sendMessage(
           0,
           defs.BasicDeliver, content.method,
           defs.BasicProperties, content.header,
           content.body);
         var f, i = 0, largest = 0;
         while (f = input.read()) {
           i++;
           if (f.length > largest) largest = f.length;
           if (f.length > max) {
             return false;
           }
         }
         // The ratio of frames to 'contents' should always be >= 2
         // (one properties frame and at least one content frame); > 2
         // indicates fragmentation. The largest is always, of course <= frame max
         //console.log('Frames: %d; frames per message: %d; largest frame %d', i, i / t.length, largest);
         return true;
       }).asTest());
});