// // // 'use strict'; // A Mux is an object into which other readable streams may be piped; // it then writes 'packets' from the upstreams to the given // downstream. var inherits = require('util').inherits; var assert = require('assert'); var schedule = (typeof setImmediate === 'function') ? setImmediate : process.nextTick; function Mux(downstream) { this.newStreams = []; this.oldStreams = []; this.blocked = false; this.scheduledRead = false; this.out = downstream; var self = this; downstream.on('drain', function() { self.blocked = false; self._readIncoming(); }); } // There are 2 states we can be in: // - waiting for outbound capacity, which will be signalled by a // - 'drain' event on the downstream; or, // - no packets to send, waiting for an inbound buffer to have // packets, which will be signalled by a 'readable' event // If we write all packets available whenever there is outbound // capacity, we will either run out of outbound capacity (`#write` // returns false), or run out of packets (all calls to an // `inbound.read()` have returned null). Mux.prototype._readIncoming = function() { // We may be sent here speculatively, if an incoming stream has // become readable if (this.blocked) return; var self = this; var accepting = true; var out = this.out; // Try to read a chunk from each stream in turn, until all streams // are empty, or we exhaust our ability to accept chunks. function roundrobin(streams) { var s; // if there's just one incoming stream we don't have to // go through all the dequeue/enqueueing if (streams.length === 1) { s = streams.shift(); while (accepting) { var chunk = s.read(); if (chunk !== null) { accepting = out.write(chunk); } else break; } if (!accepting) streams.push(s); } else { while (accepting && (s = streams.shift())) { var chunk = s.read(); if (chunk !== null) { accepting = out.write(chunk); streams.push(s); } } } } roundrobin(this.newStreams); // Either we exhausted the new queues, or we ran out of capacity. If // we ran out of capacity, all the remaining new streams (i.e., // those with packets left) become old streams. This effectively // prioritises streams that keep their buffers close to empty over // those that are constantly near full. if (accepting) { // all new queues are exhausted, write as many as // we can from the old streams assert.equal(0, this.newStreams.length); roundrobin(this.oldStreams); } else { // ran out of room assert(this.newStreams.length > 0, "Expect some new streams to remain"); this.oldStreams = this.oldStreams.concat(this.newStreams); this.newStreams = []; } // We may have exhausted all the old queues, or run out of room; // either way, all we need to do is record whether we have capacity // or not, so any speculative reads will know this.blocked = !accepting; }; Mux.prototype._scheduleRead = function() { var self = this; if (!self.scheduledRead) { schedule(function() { self.scheduledRead = false; self._readIncoming(); }); self.scheduledRead = true; } }; Mux.prototype.pipeFrom = function(readable) { var self = this; function enqueue() { self.newStreams.push(readable); self._scheduleRead(); } function cleanup() { readable.removeListener('readable', enqueue); readable.removeListener('error', cleanup); readable.removeListener('end', cleanup); readable.removeListener('unpipeFrom', cleanupIfMe); } function cleanupIfMe(dest) { if (dest === self) cleanup(); } readable.on('unpipeFrom', cleanupIfMe); readable.on('end', cleanup); readable.on('error', cleanup); readable.on('readable', enqueue); }; Mux.prototype.unpipeFrom = function(readable) { readable.emit('unpipeFrom', this); }; module.exports.Mux = Mux;