//
//
//

'use strict';

var defs = require('./defs');
var constants = defs.constants;
var frame = require('./frame');
var HEARTBEAT = frame.HEARTBEAT;
var Mux = require('./mux').Mux;

var Duplex =
  require('stream').Duplex ||
  require('readable-stream/duplex');
var EventEmitter = require('events').EventEmitter;
var Heart = require('./heartbeat').Heart;

var methodName = require('./format').methodName;
var closeMsg = require('./format').closeMessage;
var inspect = require('./format').inspect;

var BitSet = require('./bitset').BitSet;
var inherits = require('util').inherits;
var fmt = require('util').format;
var PassThrough = require('stream').PassThrough ||
  require('readable-stream/passthrough');
var IllegalOperationError = require('./error').IllegalOperationError;
var stackCapture = require('./error').stackCapture;

// High-water mark for channel write buffers, in 'objects' (which are
// encoded frames as buffers).
var DEFAULT_WRITE_HWM = 1024;
// If all the frames of a message (method, properties, content) total
// to less than this, copy them into a single buffer and write it all
// at once. Note that this is less than the minimum frame size: if it
// was greater, we might have to fragment the content.
var SINGLE_CHUNK_THRESHOLD = 2048;

// A connection over the duplex stream `underlying`.
//
// The constructor only sets up state: the stream is wrapped (see
// `wrapStream`), a Mux is created over it, and slot 0 of `channels` is
// filled with the connection control channel, whose frames are written
// directly to `underlying`. No bytes are sent until `open` is called.
function Connection(underlying) {
  EventEmitter.call( this );
  var stream = this.stream = wrapStream(underlying);
  this.muxer = new Mux(stream);

  // frames
  // Bytes read from the stream that have not yet been parsed into a
  // frame. `new Buffer(size)` is deprecated, so prefer `Buffer.alloc`
  // whenever the runtime provides it.
  this.rest = (typeof Buffer.alloc === 'function') ?
    Buffer.alloc(0) : new Buffer(0);
  this.frameMax = constants.FRAME_MIN_SIZE;
  this.sentSinceLastCheck = false;
  this.recvSinceLastCheck = false;

  this.expectSocketClose = false;
  this.freeChannels = new BitSet();
  this.channels = [{channel: {accept: channel0(this)},
                    buffer: underlying}];
}
inherits(Connection, EventEmitter);

var C = Connection.prototype;

// Usual frame accept mode: hand the frame to the channel it is
// addressed to. This is installed as the connection's `accept` once
// the opening handshake has completed, so `this` is the connection.
function mainAccept(frame) {
  var rec = this.channels[frame.channel];
  if (rec) { return rec.channel.accept(frame); }
  // NB CHANNEL_ERROR may not be right, but I don't know what is ..
  else
    this.closeWithError(
      fmt('Frame on unknown channel %d', frame.channel),
      constants.CHANNEL_ERROR,
      new Error(fmt("Frame on unknown channel: %s",
                    inspect(frame, false))));
}

// Handle anything that comes through on channel 0, that's the
// connection control channel. This is only used once mainAccept is
// installed as the frame handler, after the opening handshake.
function channel0(connection) {
  return function(f) {
    // Once we get a 'close', we know 1. we'll get no more frames, and
    // 2. anything we send except close, or close-ok, will be
    // ignored. If we already sent 'close', this won't be invoked since
    // we're already in closing mode; if we didn't well we're not going
    // to send it now are we.
    if (f === HEARTBEAT) {
      // ignore; it's already counted as activity on the socket, which
      // is its purpose
    }
    else if (f.id === defs.ConnectionClose) {
      // Oh. OK. I guess we're done here then.
      connection.sendMethod(0, defs.ConnectionCloseOk, {});
      var emsg = fmt('Connection closed: %s', closeMsg(f));
      var s = stackCapture(emsg);
      var e = new Error(emsg);
      e.code = f.fields.replyCode;
      if (isFatalError(e)) {
        connection.emit('error', e);
      }
      connection.toClosed(s, e);
    }
    else if (f.id === defs.ConnectionBlocked) {
      connection.emit('blocked', f.fields.reason);
    }
    else if (f.id === defs.ConnectionUnblocked) {
      connection.emit('unblocked');
    }
    else {
      connection.closeWithError(
        fmt("Unexpected frame on channel 0"),
        constants.UNEXPECTED_FRAME,
        new Error(fmt("Unexpected frame on channel 0: %s",
                      inspect(f, false))));
    }
  };
}

// This changed between versions, as did the codec, methods, etc. AMQP
// 0-9-1 is fairly similar to 0.8, but better, and nothing implements
// 0.8 that doesn't implement 0-9-1. In other words, it doesn't make
// much sense to generalise here.
C.sendProtocolHeader = function() {
  this.sendBytes(frame.PROTOCOL_HEADER);
};

/*

The frighteningly complicated opening protocol (spec section 2.2.4):

     Client -> Server

       protocol header ->
         <- start
       start-ok ->
     .. next two zero or more times ..
         <- secure
       secure-ok ->
         <- tune
       tune-ok ->
       open ->
         <- open-ok

If I'm only supporting SASL's PLAIN mechanism (which I am for the time
being), it gets a bit easier since the server won't in general send
back a `secure`, it'll just send `tune` after the `start-ok`.
(SASL PLAIN: http://tools.ietf.org/html/rfc4616)

*/

// Run the opening handshake.
//
// `allFields` supplies the fields for the methods sent during the
// handshake; `mechanism`, `frameMax`, `channelMax` and `heartbeat` are
// read from it directly, and it is the prototype of the object holding
// the negotiated values. `openCallback0` is optional; it is called
// with `(err)` if the handshake fails, or `(null, openOk)` with the
// open-ok frame once the connection is ready to use.
//
// NOTE(review): `bail` does not guard against repeated invocation, so
// the callback can be called more than once (e.g., a failed `send`
// followed by a later handshake failure, or stream 'error' followed
// by 'end'). Left as is here; confirm whether callers rely on it.
C.open = function(allFields, openCallback0) {
  var self = this;
  var openCallback = openCallback0 || function() {};

  // This is where we'll put our negotiated values
  var tunedOptions = Object.create(allFields);

  // Wait for the next frame, and pass it to `k` provided it arrived
  // on channel 0.
  function wait(k) {
    self.step(function(err, frame) {
      if (err !== null) bail(err);
      else if (frame.channel !== 0) {
        bail(new Error(
          fmt("Frame on channel != 0 during handshake: %s",
              inspect(frame, false))));
      }
      else k(frame);
    });
  }

  // As `wait`, but additionally insist on a particular method.
  function expect(Method, k) {
    wait(function(frame) {
      if (frame.id === Method) k(frame);
      else {
        bail(new Error(
          fmt("Expected %s; got %s",
              methodName(Method), inspect(frame, false))));
      }
    });
  }

  function bail(err) {
    openCallback(err);
  }

  function send(Method) {
    // This can throw an exception if there's some problem with the
    // options; e.g., something is a string instead of a number.
    try { self.sendMethod(0, Method, tunedOptions); }
    catch (err) { bail(err); }
  }

  function negotiate(server, desired) {
    // We get sent values for channelMax, frameMax and heartbeat,
    // which we may accept or lower (subject to a minimum for
    // frameMax, but we'll leave that to the server to enforce). In
    // all cases, `0` really means "no limit", or rather the highest
    // value in the encoding, e.g., unsigned short for channelMax.
    if (server === 0 || desired === 0) {
      // i.e., whichever places a limit, if either
      return Math.max(server, desired);
    }
    else {
      return Math.min(server, desired);
    }
  }

  function onStart(start) {
    var mechanisms = start.fields.mechanisms.toString().split(' ');
    if (mechanisms.indexOf(allFields.mechanism) < 0) {
      bail(new Error(fmt('SASL mechanism %s is not provided by the server',
                         allFields.mechanism)));
      return;
    }
    send(defs.ConnectionStartOk);
    wait(afterStartOk);
  }

  function afterStartOk(reply) {
    switch (reply.id) {
    case defs.ConnectionSecure:
      bail(new Error(
        "Wasn't expecting to have to go through secure"));
      break;
    case defs.ConnectionClose:
      bail(new Error(fmt("Handshake terminated by server: %s",
                         closeMsg(reply))));
      break;
    case defs.ConnectionTune:
      var fields = reply.fields;
      tunedOptions.frameMax =
        negotiate(fields.frameMax, allFields.frameMax);
      tunedOptions.channelMax =
        negotiate(fields.channelMax, allFields.channelMax);
      tunedOptions.heartbeat =
        negotiate(fields.heartbeat, allFields.heartbeat);
      send(defs.ConnectionTuneOk);
      send(defs.ConnectionOpen);
      expect(defs.ConnectionOpenOk, onOpenOk);
      break;
    default:
      bail(new Error(
        fmt("Expected connection.secure, connection.close, " +
            "or connection.tune during handshake; got %s",
            inspect(reply, false))));
      break;
    }
  }

  function onOpenOk(openOk) {
    // Impose the maximum of the encoded value, if the negotiated
    // value is zero, meaning "no, no limits"
    self.channelMax = tunedOptions.channelMax || 0xffff;
    self.frameMax = tunedOptions.frameMax || 0xffffffff;
    // 0 means "no heartbeat", rather than "maximum period of
    // heartbeating"
    self.heartbeat = tunedOptions.heartbeat;
    self.heartbeater = self.startHeartbeater();
    self.accept = mainAccept;
    succeed(openOk);
  }

  // If the server closes the connection, it's probably because of
  // something we did
  function endWhileOpening(err) {
    bail(err || new Error('Socket closed abruptly ' +
                          'during opening handshake'));
  }

  this.stream.on('end', endWhileOpening);
  this.stream.on('error', endWhileOpening);

  // Swap the handshake-time stream listeners for the regular ones,
  // start the frame loop, and report success.
  function succeed(ok) {
    self.stream.removeListener('end', endWhileOpening);
    self.stream.removeListener('error', endWhileOpening);
    self.stream.on('error', self.onSocketError.bind(self));
    self.stream.on('end', self.onSocketError.bind(
      self, new Error('Unexpected close')));
    self.on('frameError', self.onSocketError.bind(self));
    self.acceptLoop();
    openCallback(null, ok);
  }

  // Now kick off the handshake by prompting the server
  this.sendProtocolHeader();
  expect(defs.ConnectionStart, onStart);
};

// Closing things: AMQP has a closing handshake that applies to
// closing both connections and channels. As the initiating party, I
// send Close, then ignore all frames until I see either CloseOK --
// which signifies that the other party has seen the Close and shut
// the connection or channel down, so it's fine to free resources; or
// Close, which means the other party also wanted to close the
// whatever, and I should send CloseOk so it can free resources,
// then go back to waiting for the CloseOk. If I receive a Close
// out of the blue, I should throw away any unsent frames (they will
// be ignored anyway) and send CloseOk, then clean up resources. In
// general, Close out of the blue signals an error (or a forced
// closure, which may as well be an error).
//
//  RUNNING [1] --- send Close ---> Closing [2] ---> recv Close --+
//     |                               |                         [3]
//     |                               +------ send CloseOk ------+
//  recv Close                   recv CloseOk
//     |                               |
//     V                               V
//  Ended [4] ---- send CloseOk ---> Closed [5]
//
// [1] All frames accepted; getting a Close frame from the server
// moves to Ended; client may initiate a close by sending Close
// itself.
// [2] Client has initiated a close; only CloseOk or (simultaneously
// sent) Close is accepted.
// [3] Simultaneous close
// [4] Server won't send any more frames; accept no more frames, send
// CloseOk.
// [5] Fully closed, client will send no more, server will send no
// more. Signal 'close' or 'error'.
//
// There are two signalling mechanisms used in the API. The first is
// that calling `close` will return a promise, that will either
// resolve once the connection or channel is cleanly shut down, or
// will reject if the shutdown times out.
//
// The second is the 'close' and 'error' events. These are
// emitted as above. The events will fire *before* promises are
// resolved.

// Close the connection without even giving a reason. Typical.
// `closeCallback`, if given, is called with `null` once the CloseOk
// has been received.
C.close = function(closeCallback) {
  var k = closeCallback && function() { closeCallback(null); };
  this.closeBecause("Cheers, thanks", constants.REPLY_SUCCESS, k);
};

// Close with a reason and a 'code'. I'm pretty sure RabbitMQ totally
// ignores these; maybe it logs them. The continuation will be invoked
// when the CloseOk has been received, and before the 'close' event.
C.closeBecause = function(reason, code, k) {
  this.sendMethod(0, defs.ConnectionClose, {
    replyText: reason,
    replyCode: code,
    methodId: 0, classId: 0
  });
  var s = stackCapture('closeBecause called: ' + reason);
  this.toClosing(s, k);
};

// Emit `error` as an 'error' event, then start closing the connection
// with the given reason and code.
C.closeWithError = function(reason, code, error) {
  this.emit('error', error);
  this.closeBecause(reason, code);
};

// Handler for stream 'error' and 'end', and for 'frameError': emit
// 'error' and go straight to closed, unless the socket closing is
// already expected.
C.onSocketError = function(err) {
  if (!this.expectSocketClose) {
    // forestall any more calls to onSocketError, since we're signed
    // up for `'error'` *and* `'end'`
    this.expectSocketClose = true;
    this.emit('error', err);
    var s = stackCapture('Socket error');
    this.toClosed(s, err);
  }
};

// Make a function that always throws an IllegalOperationError built
// from `msg` and `stack`.
function invalidOp(msg, stack) {
  return function() {
    throw new IllegalOperationError(msg, stack);
  };
}

// Replace the sending methods of `conn` with ones that throw.
function invalidateSend(conn, msg, stack) {
  conn.sendMethod = conn.sendContent = conn.sendMessage =
    invalidOp(msg, stack);
}

// A close has been initiated. Repeat: a close has been initiated.
// This means we should not send more frames, anyway they will be
// ignored. We also have to shut down all the channels.
C.toClosing = function(capturedStack, k) {
  // Capture the working sendMethod before it is invalidated below, so
  // that a simultaneous Close from the server can still be answered.
  var send = this.sendMethod.bind(this);

  this.accept = function(f) {
    if (f.id === defs.ConnectionCloseOk) {
      if (k) k();
      var s = stackCapture('ConnectionCloseOk received');
      this.toClosed(s, undefined);
    }
    else if (f.id === defs.ConnectionClose) {
      send(0, defs.ConnectionCloseOk, {});
    }
    // else ignore frame
  };
  invalidateSend(this, 'Connection closing', capturedStack);
};

// Move every allocated channel (slot 0, the control channel, is
// skipped) to its closed state.
C._closeChannels = function(capturedStack) {
  for (var i = 1; i < this.channels.length; i++) {
    var ch = this.channels[i];
    if (ch !== null) {
      ch.channel.toClosed(capturedStack); // %%% or with an error? not clear
    }
  }
};

// A close has been confirmed. Cease all communication.
// `maybeErr` is passed on to 'close' listeners, and is undefined when
// the close was initiated by the client.
C.toClosed = function(capturedStack, maybeErr) {
  this._closeChannels(capturedStack);
  var info = fmt('Connection closed (%s)',
                 (maybeErr) ? maybeErr.toString() : 'by client');
  // Tidy up, invalidate everything, dynamite the bridges.
  invalidateSend(this, info, capturedStack);
  this.accept = invalidOp(info, capturedStack);
  this.close = function(cb) {
    cb && cb(new IllegalOperationError(info, capturedStack));
  };
  if (this.heartbeater) this.heartbeater.clear();
  // This is certainly true now, if it wasn't before
  this.expectSocketClose = true;
  this.stream.end();
  this.emit('close', maybeErr);
};

// ===

// Start heartbeating with the period in `this.heartbeat`. Returns the
// Heart, or null if the period is 0 (no heartbeat).
C.startHeartbeater = function() {
  if (this.heartbeat === 0) return null;
  else {
    var self = this;
    var hb = new Heart(this.heartbeat,
                       this.checkSend.bind(this),
                       this.checkRecv.bind(this));
    hb.on('timeout', function() {
      var hberr = new Error("Heartbeat timeout");
      self.emit('error', hberr);
      var s = stackCapture('Heartbeat timeout');
      self.toClosed(s, hberr);
    });
    hb.on('beat', function() {
      self.sendHeartbeat();
    });
    return hb;
  }
};

// I use an array to keep track of the channels, rather than an
// object. The channel identifiers are numbers, and allocated by the
// connection. If I try to allocate low numbers when they are
// available (which I do, by looking from the start of the bitset),
// this ought to keep the array small, and out of 'sparse array
// storage'. I also set entries to null, rather than deleting them, in
// the expectation that the next channel allocation will fill the slot
// again rather than growing the array. See
// http://www.html5rocks.com/en/tutorials/speed/v8/

// Allocate a channel number for `channel` and give it a write buffer
// that feeds the muxer. `options.highWaterMark`, if given, overrides
// the default high-water mark of the write buffer. Returns the
// channel number; throws if there are none left.
C.freshChannel = function(channel, options) {
  var next = this.freeChannels.nextClearBit(1);
  if (next < 0 || next > this.channelMax)
    throw new Error("No channels left to allocate");
  this.freeChannels.set(next);

  var hwm = (options && options.highWaterMark) || DEFAULT_WRITE_HWM;
  var writeBuffer = new PassThrough({
    objectMode: true, highWaterMark: hwm
  });
  this.channels[next] = {channel: channel, buffer: writeBuffer};
  writeBuffer.on('drain', function() {
    channel.onBufferDrain();
  });
  this.muxer.pipeFrom(writeBuffer);
  return next;
};

// Free the channel number `channel` and detach its write buffer from
// the muxer.
C.releaseChannel = function(channel) {
  this.freeChannels.clear(channel);
  var buffer = this.channels[channel].buffer;
  this.muxer.unpipeFrom(buffer);
  this.channels[channel] = null;
};

// Feed every frame that can be read to `this.accept`, now and each
// time the stream becomes readable. An exception from reading or
// accepting a frame is emitted as 'frameError'.
C.acceptLoop = function() {
  var self = this;

  function go() {
    try {
      var f; while (f = self.recvFrame()) self.accept(f);
    }
    catch (e) {
      self.emit('frameError', e);
    }
  }
  self.stream.on('readable', go);
  go();
};

// Read exactly one frame, waiting for the stream to become readable
// if need be, and call `cb(null, frame)`; or `cb(err, null)` if
// reading the frame throws.
C.step = function(cb) {
  var self = this;
  function recv() {
    var f;
    try {
      f = self.recvFrame();
    }
    catch (e) {
      cb(e, null);
      return;
    }
    if (f) cb(null, f);
    else self.stream.once('readable', recv);
  }
  recv();
};

// Report whether anything has been sent since the last call, and
// reset the flag.
C.checkSend = function() {
  var check = this.sentSinceLastCheck;
  this.sentSinceLastCheck = false;
  return check;
};

// Report whether anything has been received since the last call, and
// reset the flag.
C.checkRecv = function() {
  var check = this.recvSinceLastCheck;
  this.recvSinceLastCheck = false;
  return check;
};

// Write raw bytes straight to the stream, bypassing the channel
// buffers.
C.sendBytes = function(bytes) {
  this.sentSinceLastCheck = true;
  this.stream.write(bytes);
};

C.sendHeartbeat = function() {
  return this.sendBytes(frame.HEARTBEAT_BUF);
};

var encodeMethod = defs.encodeMethod;
var encodeProperties = defs.encodeProperties;
var FRAME_OVERHEAD = defs.FRAME_OVERHEAD;
var makeBodyFrame = frame.makeBodyFrame;

// `new Buffer(size)` is deprecated; use `Buffer.alloc` whenever the
// runtime provides it, and fall back to the constructor only where it
// doesn't.
function allocBuffer(size) {
  return (typeof Buffer.alloc === 'function') ?
    Buffer.alloc(size) : new Buffer(size);
}

// Encode a method frame and write it to the channel's buffer. Returns
// the result of the write.
C.sendMethod = function(channel, Method, fields) {
  var frame = encodeMethod(Method, channel, fields);
  this.sentSinceLastCheck = true;
  var buffer = this.channels[channel].buffer;
  return buffer.write(frame);
};

// Send a method with content: the method frame, the properties frame,
// and the content as body frames. `content` must be a buffer, else a
// TypeError is thrown. Returns the result of the last write to the
// channel's buffer.
C.sendMessage = function(channel,
                         Method, fields,
                         Properties, props,
                         content) {
  if (!Buffer.isBuffer(content))
    throw new TypeError('content is not a buffer');

  var mframe = encodeMethod(Method, channel, fields);
  var pframe = encodeProperties(Properties, channel,
                                content.length, props);
  var buffer = this.channels[channel].buffer;
  this.sentSinceLastCheck = true;

  var methodHeaderLen = mframe.length + pframe.length;
  var bodyLen = (content.length > 0) ?
    content.length + FRAME_OVERHEAD : 0;
  var allLen = methodHeaderLen + bodyLen;

  if (allLen < SINGLE_CHUNK_THRESHOLD) {
    // Small enough to go out as a single write
    var all = allocBuffer(allLen);
    var offset = mframe.copy(all, 0);
    offset += pframe.copy(all, offset);

    if (bodyLen > 0)
      makeBodyFrame(channel, content).copy(all, offset);
    return buffer.write(all);
  }
  else {
    if (methodHeaderLen < SINGLE_CHUNK_THRESHOLD) {
      var both = allocBuffer(methodHeaderLen);
      var methodEnd = mframe.copy(both, 0);
      pframe.copy(both, methodEnd);
      buffer.write(both);
    }
    else {
      buffer.write(mframe);
      buffer.write(pframe);
    }
    return this.sendContent(channel, content);
  }
};

// Write `body` to the channel's buffer as body frames, each carrying
// at most `frameMax - FRAME_OVERHEAD` bytes of it. Throws a TypeError
// if `body` is not a buffer. Returns the result of the last write, or
// true if `body` is empty (in which case nothing is written).
C.sendContent = function(channel, body) {
  if (!Buffer.isBuffer(body)) {
    throw new TypeError(fmt("Expected buffer; got %s", body));
  }
  var writeResult = true;
  var buffer = this.channels[channel].buffer;

  var maxBody = this.frameMax - FRAME_OVERHEAD;

  for (var offset = 0; offset < body.length; offset += maxBody) {
    var end = offset + maxBody;
    var slice = (end > body.length) ? body.slice(offset) : body.slice(offset, end);
    var bodyFrame = makeBodyFrame(channel, slice);
    writeResult = buffer.write(bodyFrame);
  }
  this.sentSinceLastCheck = true;
  return writeResult;
};

var parseFrame = frame.parseFrame;
var decodeFrame = frame.decodeFrame;

// Return the next decoded frame, or false if the bytes available so
// far don't make up a whole frame. Unconsumed bytes are kept in
// `this.rest` for the next call.
C.recvFrame = function() {
  // %%% identifying invariants might help here?
  var frame = parseFrame(this.rest, this.frameMax);

  // Keep pulling chunks off the stream until a whole frame can be
  // parsed or the stream has nothing more to give for now. (A loop
  // rather than recursion, so stack depth doesn't depend on how many
  // chunks it takes.)
  while (!frame) {
    var incoming = this.stream.read();
    if (incoming === null) {
      return false;
    }
    this.recvSinceLastCheck = true;
    this.rest = Buffer.concat([this.rest, incoming]);
    frame = parseFrame(this.rest, this.frameMax);
  }

  this.rest = frame.rest;
  return decodeFrame(frame);
};

// Return `s` itself if it is already a Duplex; otherwise a Duplex
// that reads from and writes to `s`.
function wrapStream(s) {
  if (s instanceof Duplex) return s;
  else {
    var ws = new Duplex();
    ws.wrap(s); //wraps the readable side of things
    ws._write = function(chunk, encoding, callback) {
      return s.write(chunk, encoding, callback);
    };
    return ws;
  }
}

// Whether `error` should be treated as fatal: everything is, other
// than errors with the code CONNECTION_FORCED or REPLY_SUCCESS.
function isFatalError(error) {
  switch (error && error.code) {
  case defs.constants.CONNECTION_FORCED:
  case defs.constants.REPLY_SUCCESS:
    return false;
  default:
    return true;
  }
}

module.exports.Connection = Connection;
module.exports.isFatalError = isFatalError;