// mux.js 3.91 KB
//
//
//

'use strict';

// A Mux is an object into which other readable streams may be piped;
// it then writes 'packets' from the upstreams to the given
// downstream.

var inherits = require('util').inherits;
var assert = require('assert');

// Deferral primitive used to queue a read pass: `setImmediate` when the
// runtime provides it, otherwise `process.nextTick`.
var schedule;
if (typeof setImmediate === 'function') {
  schedule = setImmediate;
}
else {
  schedule = process.nextTick;
}

// Construct a Mux writing to `downstream`, which must provide
// `write(chunk)` (returning false once it wants no more) and emit
// 'drain' when it can accept chunks again.
//
// Instance state:
// - newStreams: upstreams queued by a 'readable' event, not yet read
// - oldStreams: upstreams left over from passes that ran out of
//   outbound capacity; read only after the new streams are exhausted
// - blocked: true while we are waiting for the downstream to drain
// - scheduledRead: true while a read pass is already queued
function Mux(downstream) {
  this.newStreams = [];
  this.oldStreams = [];
  this.blocked = false;
  this.scheduledRead = false;

  this.out = downstream;
  var self = this;
  // Capacity is back: clear the flag and immediately try to write
  // whatever the queued upstreams are holding.
  downstream.on('drain', function() {
    self.blocked = false;
    self._readIncoming();
  });
}

// There are 2 states we can be in:

// - waiting for outbound capacity, which will be signalled by a
//   'drain' event on the downstream; or,

// - no packets to send, waiting for an inbound buffer to have
//   packets, which will be signalled by a 'readable' event

// If we write all packets available whenever there is outbound
// capacity, we will either run out of outbound capacity (`#write`
// returns false), or run out of packets (all calls to an
// `inbound.read()` have returned null).

// Run one read pass: move chunks from the queued upstreams to the
// downstream until either every queued upstream has returned null or
// the downstream's `write` has returned false. Records the outcome in
// `this.blocked`. Does nothing while blocked.
Mux.prototype._readIncoming = function() {

  // We may be sent here speculatively, if an incoming stream has
  // become readable
  if (this.blocked) return;

  var accepting = true;
  var out = this.out;

  // Try to read a chunk from each stream in turn, until all streams
  // are empty, or we exhaust our ability to accept chunks. A stream
  // that yields a chunk goes to the back of the queue; one that
  // yields null is dropped from it.
  function roundrobin(streams) {
    var s;
    var chunk;

    // if there's just one incoming stream we don't have to
    // go through all the dequeue/enqueueing
    if (streams.length === 1) {
      s = streams.shift();
      while (accepting && (chunk = s.read()) !== null) {
        accepting = out.write(chunk);
      }
      // Only keep the stream queued if we stopped for lack of
      // capacity; otherwise it has been read dry.
      if (!accepting) streams.push(s);
      return;
    }

    while (accepting && (s = streams.shift())) {
      chunk = s.read();
      if (chunk !== null) {
        accepting = out.write(chunk);
        streams.push(s);
      }
    }
  }

  roundrobin(this.newStreams);

  // Either we exhausted the new queues, or we ran out of capacity. If
  // we ran out of capacity, all the remaining new streams (i.e.,
  // those with packets left) become old streams. This effectively
  // prioritises streams that keep their buffers close to empty over
  // those that are constantly near full.

  if (accepting) { // all new queues are exhausted, write as many as
                   // we can from the old streams
    assert.strictEqual(0, this.newStreams.length);
    roundrobin(this.oldStreams);
  }
  else { // ran out of room
    assert(this.newStreams.length > 0, "Expect some new streams to remain");
    this.oldStreams = this.oldStreams.concat(this.newStreams);
    this.newStreams = [];
  }
  // We may have exhausted all the old queues, or run out of room;
  // either way, all we need to do is record whether we have capacity
  // or not, so any speculative reads will know
  this.blocked = !accepting;
};

// Queue a read pass via `schedule`, unless one is already queued; the
// flag is cleared just before the pass runs, so several requests made
// before it runs share a single pass.
Mux.prototype._scheduleRead = function() {
  var self = this;

  if (!self.scheduledRead) {
    schedule(function() {
      self.scheduledRead = false;
      self._readIncoming();
    });
    self.scheduledRead = true;
  }
};

// Start multiplexing `readable` into the downstream. Each 'readable'
// event queues the stream as a new stream and requests a read pass.
// The listeners added here are removed again when the stream emits
// 'end' or 'error', or when `unpipeFrom` is called on this Mux for it.
Mux.prototype.pipeFrom = function(readable) {
  var self = this;

  function enqueue() {
    self.newStreams.push(readable);
    self._scheduleRead();
  }

  // Detach every listener this call attached.
  function cleanup() {
    readable.removeListener('readable', enqueue);
    readable.removeListener('error', cleanup);
    readable.removeListener('end', cleanup);
    readable.removeListener('unpipeFrom', cleanupIfMe);
  }
  // 'unpipeFrom' carries the Mux being detached; a stream may be piped
  // into several, so only react when it is this one.
  function cleanupIfMe(dest) {
    if (dest === self) cleanup();
  }

  readable.on('unpipeFrom', cleanupIfMe);
  readable.on('end', cleanup);
  readable.on('error', cleanup);
  readable.on('readable', enqueue);
};

// Stop listening to `readable`. This is signalled by emitting
// 'unpipeFrom' on the stream itself, with this Mux as the argument.
Mux.prototype.unpipeFrom = function(readable) {
  readable.emit('unpipeFrom', this);
};

// The Mux constructor is this module's only export.
module.exports.Mux = Mux;