// mux.js
//
//
//
'use strict';
// A Mux is an object into which other readable streams may be piped;
// it then writes 'packets' from the upstreams to the given
// downstream.
var inherits = require('util').inherits;
var assert = require('assert');
// Deferral primitive used to batch reads: use `setImmediate` when the
// runtime provides it, otherwise fall back to `process.nextTick`.
var schedule;
if (typeof setImmediate === 'function') {
  schedule = setImmediate;
}
else {
  schedule = process.nextTick;
}
/**
 * Creates a multiplexer that forwards chunks read from piped-in
 * readable streams to a single downstream.
 *
 * @param {Object} downstream - destination for chunks. It must provide
 *   `on(event, listener)`; a 'drain' listener is registered on it here.
 */
function Mux(downstream) {
  // Streams that have signalled 'readable' since the last read pass.
  this.newStreams = [];
  // Streams carried over from a pass that ran out of downstream capacity.
  this.oldStreams = [];
  // True while we are waiting for the downstream to emit 'drain'.
  this.blocked = false;
  // True while a deferred `_readIncoming` call is pending.
  this.scheduledRead = false;
  this.out = downstream;

  var mux = this;

  // Capacity is available again: clear the flag and resume reading.
  function onDrain() {
    mux.blocked = false;
    mux._readIncoming();
  }

  downstream.on('drain', onDrain);
}
// There are 2 states we can be in:
// - waiting for outbound capacity, which will be signalled by a
//   'drain' event on the downstream; or,
// - no packets to send, waiting for an inbound buffer to have
//   packets, which will be signalled by a 'readable' event.
// If we write all packets available whenever there is outbound
// capacity, we will either run out of outbound capacity (`#write`
// returns false), or run out of packets (all calls to an
// `inbound.read()` have returned null).
/**
 * Moves chunks from the queued incoming streams to the downstream until
 * either every queued stream has returned null from `read()` or a
 * downstream `write()` has returned a falsy value.
 *
 * New streams are served first; old streams are only served if the new
 * queue was exhausted with capacity to spare. On exit `this.blocked`
 * records whether the downstream refused further writes.
 */
Mux.prototype._readIncoming = function() {
  // Callers may invoke this speculatively (e.g. after an incoming
  // stream became readable); do nothing while still awaiting 'drain'.
  if (this.blocked) {
    return;
  }

  var out = this.out;
  var hasCapacity = true;

  // Takes one chunk from each queued stream in turn, re-queueing a
  // stream after each successful read, until every stream is empty or
  // the downstream stops accepting chunks.
  function drainQueue(queue) {
    var stream;
    var chunk;

    if (queue.length === 1) {
      // A lone stream needs no dequeue/enqueue churn: read it dry.
      stream = queue.shift();
      while (hasCapacity && (chunk = stream.read()) !== null) {
        hasCapacity = out.write(chunk);
      }
      // Only keep the stream queued if we stopped for lack of capacity.
      if (!hasCapacity) {
        queue.push(stream);
      }
      return;
    }

    while (hasCapacity && (stream = queue.shift())) {
      chunk = stream.read();
      if (chunk === null) {
        // Nothing to read: the stream stays out of the queue.
        continue;
      }
      hasCapacity = out.write(chunk);
      queue.push(stream);
    }
  }

  drainQueue(this.newStreams);

  if (!hasCapacity) {
    // Ran out of room: the new streams still queued (those that may
    // have packets left) are demoted to old streams. This favours
    // streams that keep their buffers close to empty over those that
    // are constantly near full.
    assert(this.newStreams.length > 0, "Expect some new streams to remain");
    this.oldStreams = this.oldStreams.concat(this.newStreams);
    this.newStreams = [];
  }
  else {
    // Every new stream is exhausted; spend remaining capacity on the
    // old streams.
    assert.equal(0, this.newStreams.length);
    drainQueue(this.oldStreams);
  }

  // Whether we drained everything or ran out of room, remember the
  // capacity state so that speculative calls know to back off.
  this.blocked = !hasCapacity;
};
/**
 * Requests a deferred `_readIncoming` pass via `schedule`. Calls made
 * while a pass is already pending are ignored, so several requests
 * collapse into a single read.
 */
Mux.prototype._scheduleRead = function() {
  if (this.scheduledRead) {
    return;
  }

  var mux = this;
  schedule(function runScheduledRead() {
    // Clear the flag first so that the read pass may schedule again.
    mux.scheduledRead = false;
    mux._readIncoming();
  });
  this.scheduledRead = true;
};
/**
 * Starts feeding this mux from `readable`.
 *
 * Each 'readable' event queues the stream as a new stream and schedules
 * a read. The listeners installed here are removed again when the
 * stream emits 'end' or 'error', or emits 'unpipeFrom' with this mux as
 * the argument.
 *
 * @param {Object} readable - event emitter to listen on; it is pushed
 *   onto `newStreams` whenever it emits 'readable'.
 */
Mux.prototype.pipeFrom = function(readable) {
  var mux = this;

  function onReadable() {
    mux.newStreams.push(readable);
    mux._scheduleRead();
  }

  // Removes every listener this call installed on `readable`.
  function detach() {
    readable.removeListener('readable', onReadable);
    readable.removeListener('error', detach);
    readable.removeListener('end', detach);
    readable.removeListener('unpipeFrom', onUnpipeFrom);
  }

  // 'unpipeFrom' carries the mux being detached; ignore other muxes.
  function onUnpipeFrom(dest) {
    if (dest !== mux) {
      return;
    }
    detach();
  }

  readable.on('unpipeFrom', onUnpipeFrom);
  readable.on('end', detach);
  readable.on('error', detach);
  readable.on('readable', onReadable);
};
/**
 * Asks `readable` to stop feeding this mux by emitting an 'unpipeFrom'
 * event on it, with this mux as the event argument.
 *
 * @param {Object} readable - emitter on which 'unpipeFrom' is emitted.
 */
Mux.prototype.unpipeFrom = function(readable) {
  var mux = this;
  readable.emit('unpipeFrom', mux);
};
// Public API: expose the Mux constructor as a named export.
module.exports.Mux = Mux;