//
//
//

'use strict';

var defs = require('./defs');
var when = require('when');
var defer = when.defer;
var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var BaseChannel = require('./channel').BaseChannel;
var acceptMessage = require('./channel').acceptMessage;
var Args = require('./api_args');

// Callback-flavoured model wrapped around a connection. May be called
// with or without `new`. The connection's 'error', 'close', 'blocked'
// and 'unblocked' events are re-emitted from the model itself.
function CallbackModel(connection) {
  if (!(this instanceof CallbackModel)) {
    return new CallbackModel(connection);
  }
  EventEmitter.call(this);
  this.connection = connection;

  var model = this;
  var forwarded = ['error', 'close', 'blocked', 'unblocked'];
  for (var i = 0; i < forwarded.length; i++) {
    connection.on(forwarded[i], model.emit.bind(model, forwarded[i]));
  }
}
inherits(CallbackModel, EventEmitter);

module.exports.CallbackModel = CallbackModel;

// Close the underlying connection, handing `cb` straight through to it.
CallbackModel.prototype.close = function(cb) {
  this.connection.close(cb);
};

// A channel on `connection`; 'delivery' and 'cancel' events are routed
// to the channel's own handleDelivery / handleCancel methods.
function Channel(connection) {
  BaseChannel.call(this, connection);

  var onDelivery = this.handleDelivery.bind(this);
  this.on('delivery', onDelivery);

  var onCancel = this.handleCancel.bind(this);
  this.on('cancel', onCancel);
}
inherits(Channel, BaseChannel);

module.exports.Channel = Channel;

// Create and open a channel. The (optional) callback gets
// `(null, channel)` once the channel is open, or `(err)` if opening
// failed. The channel object is also returned immediately.
CallbackModel.prototype.createChannel = function(cb) {
  var ch = new Channel(this.connection);
  ch.open(function(err, _ok) {
    if (!cb) return;
    if (err === null) {
      cb(null, ch);
    } else {
      cb(err);
    }
  });
  return ch;
};

// Wrap an RPC callback to make sure the callback is invoked with
// either `(null, value)` or `(error)`, i.e., never two non-null
// values. Also substitutes a stub if the callback is `undefined` or
// otherwise falsey, for convenience in methods for which the callback
// is optional (that is, most of them).
function callbackWrapper(ch, cb) {
  if (!cb) {
    return function() {};
  }
  return function(err, ok) {
    if (err === null) {
      cb(null, ok);
    } else {
      cb(err);
    }
  };
}

// This encodes straight-forward RPC: no side-effects and return the
// fields from the server response. It wraps the callback given it, so
// the calling method argument can be passed as-is.
// For anything that needs to have side-effects, or needs to change
// the server response, use `#_rpc(...)` and remember to dereference
// `.fields` of the server response.
Channel.prototype.rpc = function(method, fields, expect, cb0) {
  var done = callbackWrapper(this, cb0);
  this._rpc(method, fields, expect, function(err, reply) {
    // In case of an error, `reply` will be undefined.
    var replyFields = reply && reply.fields;
    done(err, replyFields);
  });
  return this;
};

// === Public API ===

// Allocate the channel and send ChannelOpen. If allocation throws, the
// error is handed to `cb` and no frame is sent.
Channel.prototype.open = function(cb) {
  try {
    this.allocate();
  } catch (allocationError) {
    return cb(allocationError);
  }
  var openFields = {outOfBand: ""};
  return this.rpc(defs.ChannelOpen, openFields, defs.ChannelOpenOk, cb);
};

// Close the channel with a success reply code; the optional callback
// is invoked with `(null)` once closeBecause reports completion.
Channel.prototype.close = function(cb) {
  function onClosed() {
    if (cb) cb(null);
  }
  return this.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS,
                           onClosed);
};

// The queue and exchange methods below all follow the same shape:
// build the method fields with the matching `Args` helper, then issue
// a plain RPC expecting the corresponding *Ok reply.

Channel.prototype.assertQueue = function(queue, options, cb) {
  var declareFields = Args.assertQueue(queue, options);
  return this.rpc(defs.QueueDeclare, declareFields,
                  defs.QueueDeclareOk, cb);
};

Channel.prototype.checkQueue = function(queue, cb) {
  var declareFields = Args.checkQueue(queue);
  return this.rpc(defs.QueueDeclare, declareFields,
                  defs.QueueDeclareOk, cb);
};

Channel.prototype.deleteQueue = function(queue, options, cb) {
  var deleteFields = Args.deleteQueue(queue, options);
  return this.rpc(defs.QueueDelete, deleteFields,
                  defs.QueueDeleteOk, cb);
};

Channel.prototype.purgeQueue = function(queue, cb) {
  var purgeFields = Args.purgeQueue(queue);
  return this.rpc(defs.QueuePurge, purgeFields,
                  defs.QueuePurgeOk, cb);
};

Channel.prototype.bindQueue = function(queue, source, pattern, argt, cb) {
  var bindFields = Args.bindQueue(queue, source, pattern, argt);
  return this.rpc(defs.QueueBind, bindFields,
                  defs.QueueBindOk, cb);
};

Channel.prototype.unbindQueue = function(queue, source, pattern, argt, cb) {
  var unbindFields = Args.unbindQueue(queue, source, pattern, argt);
  return this.rpc(defs.QueueUnbind, unbindFields,
                  defs.QueueUnbindOk, cb);
};

// Unlike the plain RPC methods, the success value handed to the
// callback is `{exchange: ex}` rather than the server's reply fields.
Channel.prototype.assertExchange = function(ex, type, options, cb0) {
  var done = callbackWrapper(this, cb0);
  var declareFields = Args.assertExchange(ex, type, options);
  this._rpc(defs.ExchangeDeclare, declareFields, defs.ExchangeDeclareOk,
            function(err, _reply) {
              done(err, {exchange: ex});
            });
  return this;
};
// Issue ExchangeDeclare with the fields built by Args.checkExchange
// and pass the reply fields to `cb`.
Channel.prototype.checkExchange = function(exchange, cb) {
  return this.rpc(defs.ExchangeDeclare, Args.checkExchange(exchange),
                  defs.ExchangeDeclareOk, cb);
};

Channel.prototype.deleteExchange = function(exchange, options, cb) {
  return this.rpc(defs.ExchangeDelete,
                  Args.deleteExchange(exchange, options),
                  defs.ExchangeDeleteOk, cb);
};

Channel.prototype.bindExchange = function(dest, source, pattern, argt, cb) {
  return this.rpc(defs.ExchangeBind,
                  Args.bindExchange(dest, source, pattern, argt),
                  defs.ExchangeBindOk, cb);
};

Channel.prototype.unbindExchange = function(dest, source, pattern, argt, cb) {
  return this.rpc(defs.ExchangeUnbind,
                  Args.unbindExchange(dest, source, pattern, argt),
                  defs.ExchangeUnbindOk, cb);
};

// Send a message. The object built by Args.publish is deliberately
// passed as both the method fields and the message properties.
// Returns whatever sendMessage returns.
Channel.prototype.publish = function(exchange, routingKey, content, options) {
  var fieldsAndProps = Args.publish(exchange, routingKey, options);
  return this.sendMessage(fieldsAndProps, fieldsAndProps, content);
};

// Publish with an empty exchange name, using the queue name as the
// routing key.
Channel.prototype.sendToQueue = function(queue, content, options) {
  return this.publish('', queue, content, options);
};

// Start consuming from `queue`. On success `callback` is registered
// under the consumer tag from the reply, and `cb0` (optional) receives
// `(null, replyFields)`; on failure `cb0` receives `(err)`.
Channel.prototype.consume = function(queue, callback, options, cb0) {
  var cb = callbackWrapper(this, cb0);
  var fields = Args.consume(queue, options);
  var self = this;
  this._rpc(
    defs.BasicConsume, fields, defs.BasicConsumeOk,
    function(err, ok) {
      if (err === null) {
        self.registerConsumer(ok.fields.consumerTag, callback);
        cb(null, ok.fields);
      }
      else cb(err);
    });
  return this;
};

// Cancel the consumer identified by `consumerTag`; the consumer is
// unregistered locally only once the server has replied successfully.
Channel.prototype.cancel = function(consumerTag, cb0) {
  var cb = callbackWrapper(this, cb0);
  var self = this;
  this._rpc(
    defs.BasicCancel, Args.cancel(consumerTag), defs.BasicCancelOk,
    function(err, ok) {
      if (err === null) {
        self.unregisterConsumer(consumerTag);
        cb(null, ok.fields);
      }
      else cb(err);
    });
  return this;
};

// Fetch a single message from `queue`. The (optional) callback gets
//   - `(null, false)` if the reply is BasicGetEmpty;
//   - `(null, message)` if the reply is BasicGetOk, once the message
//     has been accepted; `message.fields` is set to the reply fields;
//   - `(err)` if the request failed or the reply was anything else.
Channel.prototype.get = function(queue, options, cb0) {
  var self = this;
  var fields = Args.get(queue, options);
  var cb = callbackWrapper(this, cb0);
  this.sendOrEnqueue(defs.BasicGet, fields, function(err, f) {
    if (err !== null) {
      // Report the failure rather than dropping it on the floor.
      cb(err);
      return;
    }
    if (f.id === defs.BasicGetEmpty) {
      cb(null, false);
    }
    // Strict comparison: an assignment here would always be truthy and
    // would make the "unexpected response" branch unreachable.
    else if (f.id === defs.BasicGetOk) {
      self.handleMessage = acceptMessage(function(m) {
        m.fields = f.fields;
        cb(null, m);
      });
    }
    else {
      // `inspect` is not otherwise in scope in this file; pull it from
      // the standard library only when it is actually needed.
      var inspect = require('util').inspect;
      cb(new Error("Unexpected response to BasicGet: " + inspect(f)));
    }
  });
  return this;
};

// Send BasicAck for the delivery tag of `message`.
Channel.prototype.ack = function(message, allUpTo) {
  this.sendImmediately(
    defs.BasicAck, Args.ack(message.fields.deliveryTag, allUpTo));
  return this;
};

// Send BasicAck with delivery tag 0 and `allUpTo` set to true.
Channel.prototype.ackAll = function() {
  this.sendImmediately(defs.BasicAck, Args.ack(0, true));
  return this;
};

// Send BasicNack for the delivery tag of `message`.
Channel.prototype.nack = function(message, allUpTo, requeue) {
  this.sendImmediately(
    defs.BasicNack,
    Args.nack(message.fields.deliveryTag, allUpTo, requeue));
  return this;
};

// Send BasicNack with delivery tag 0 and `allUpTo` set to true.
Channel.prototype.nackAll = function(requeue) {
  this.sendImmediately(
    defs.BasicNack, Args.nack(0, true, requeue));
  return this;
};

// Send BasicReject for the delivery tag of `message`.
Channel.prototype.reject = function(message, requeue) {
  this.sendImmediately(
    defs.BasicReject,
    Args.reject(message.fields.deliveryTag, requeue));
  return this;
};

Channel.prototype.prefetch = function(count, global, cb) {
  return this.rpc(defs.BasicQos,
                  Args.prefetch(count, global),
                  defs.BasicQosOk, cb);
};

Channel.prototype.recover = function(cb) {
  return this.rpc(defs.BasicRecover,
                  Args.recover(),
                  defs.BasicRecoverOk, cb);
};

// A Channel whose publish and sendToQueue methods take a per-message
// confirmation callback.
function ConfirmChannel(connection) {
  Channel.call(this, connection);
}
inherits(ConfirmChannel, Channel);

module.exports.ConfirmChannel = ConfirmChannel;

// Create and open a channel, then send ConfirmSelect on it. The
// (optional) callback gets `(null, channel)` once both steps succeed,
// or `(err)` from whichever step failed. The channel object is also
// returned immediately.
CallbackModel.prototype.createConfirmChannel = function(cb) {
  var ch = new ConfirmChannel(this.connection);
  ch.open(function(err) {
    if (err !== null) return cb && cb(err);
    else {
      ch.rpc(defs.ConfirmSelect, {nowait: false},
             defs.ConfirmSelectOk, function(err, _ok) {
               if (err !== null) return cb && cb(err);
               else cb && cb(null, ch);
             });
    }
  });
  return ch;
};

// As Channel#publish, but first queues `cb` as the confirmation
// callback for this message.
ConfirmChannel.prototype.publish = function(exchange, routingKey,
                                            content, options, cb) {
  this.pushConfirmCallback(cb);
  return Channel.prototype.publish.call(
    this, exchange, routingKey, content, options);
};
// Publish with an empty exchange name, using the queue name as the
// routing key; `cb` is passed on to publish.
ConfirmChannel.prototype.sendToQueue = function(queue, content,
                                                options, cb) {
  var exchange = '';
  return this.publish(exchange, queue, content, options, cb);
};

// Call `k` once every entry currently in `this.unconfirmed` has been
// settled: with no arguments if none of them reported an error, or
// with the error otherwise. Each outstanding entry is replaced by a
// wrapper that still invokes the original callback (if there was one)
// before settling.
ConfirmChannel.prototype.waitForConfirms = function(k) {
  var pending = this.unconfirmed;
  var outstanding = [];

  pending.forEach(function(existing, slot) {
    if (existing === null) return; // already confirmed

    var settled = defer();
    pending[slot] = function(err) {
      if (existing) existing(err);
      if (err === null) {
        settled.resolve();
      } else {
        settled.reject(err);
      }
    };
    outstanding.push(settled.promise);
  });

  function onAllConfirmed() { k(); }
  function onFailure(err) { k(err); }
  return when.all(outstanding).then(onAllConfirmed, onFailure);
};