// channel_model.js (9.46 KB)
//
//
//

'use strict';

var defs = require('./defs');
var when = require('when'), defer = when.defer;
var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var BaseChannel = require('./channel').BaseChannel;
var acceptMessage = require('./channel').acceptMessage;
var Args = require('./api_args');

function ChannelModel(connection) {
  if (!(this instanceof ChannelModel))
    return new ChannelModel(connection);
  EventEmitter.call( this );
  this.connection = connection;
  var self = this;
  ['error', 'close', 'blocked', 'unblocked'].forEach(function(ev) {
    connection.on(ev, self.emit.bind(self, ev));
  });
}
inherits(ChannelModel, EventEmitter);

module.exports.ChannelModel = ChannelModel;

var CM = ChannelModel.prototype;

// Close the underlying connection. Returns a promise that is resolved
// (with no value) when the connection's close callback is handed
// `null`, and rejected with the callback's argument otherwise.
CM.close = function() {
  var deferred = defer();
  function onClosed(err) {
    if (err !== null) deferred.reject(err);
    else deferred.resolve();
  }
  this.connection.close(onClosed);
  return deferred.promise;
};

// Channels

// A channel built on BaseChannel. On construction it subscribes its
// own `handleDelivery` and `handleCancel` methods (bound to the
// channel) to its 'delivery' and 'cancel' events respectively.
function Channel(connection) {
  BaseChannel.call(this, connection);
  var channel = this;
  channel.on('delivery', channel.handleDelivery.bind(channel));
  channel.on('cancel', channel.handleCancel.bind(channel));
}
inherits(Channel, BaseChannel);

module.exports.Channel = Channel;

// Create a Channel on this model's connection and open it. Returns a
// promise for the channel itself; the value produced by `open()` is
// discarded.
CM.createChannel = function() {
  var channel = new Channel(this.connection);
  function yieldChannel() {
    return channel;
  }
  return channel.open().then(yieldChannel);
};

var C = Channel.prototype;

// Promise-returning RPC, meant as the building block for the API
// procedures. Delegates to `_rpc`; the returned promise resolves to
// the `fields` of the reply when `_rpc` calls back with a `null`
// error, and is rejected with the error otherwise.
C.rpc = function(method, fields, expect) {
  var deferred = defer();
  function settle(err, frame) {
    if (err === null) deferred.resolve(frame.fields);
    else deferred.reject(err);
  }
  this._rpc(method, fields, expect, settle);
  return deferred.promise;
};

// The channel-open handshake: run `allocate` (bound to this channel)
// under `when.try`, then issue ChannelOpen on whatever `allocate`
// yields and expect ChannelOpenOk. Returns the promise for the
// ChannelOpenOk fields.
C.open = function() {
  function handshake(ch) {
    return ch.rpc(defs.ChannelOpen, {outOfBand: ""}, defs.ChannelOpenOk);
  }
  return when.try(this.allocate.bind(this)).then(handshake);
};

// Close this channel by calling `closeBecause` with the reason
// "Goodbye" and the REPLY_SUCCESS code. The deferred's `resolve` is
// handed over as the callback, so the returned promise is resolved
// when `closeBecause` invokes it.
C.close = function() {
  var deferred = defer();
  var reason = "Goodbye";
  var code = defs.constants.REPLY_SUCCESS;
  this.closeBecause(reason, code, deferred.resolve);
  return deferred.promise;
};

// === Public API, declaring queues and stuff ===

// Issue a QueueDeclare RPC using the fields that
// `Args.assertQueue(queue, options)` builds. Returns the promise from
// `rpc`, i.e. the fields of the QueueDeclareOk reply.
C.assertQueue = function(queue, options) {
  var fields = Args.assertQueue(queue, options);
  return this.rpc(defs.QueueDeclare, fields, defs.QueueDeclareOk);
};

// Issue a QueueDeclare RPC using the fields that
// `Args.checkQueue(queue)` builds. Returns the promise from `rpc`,
// i.e. the fields of the QueueDeclareOk reply.
C.checkQueue = function(queue) {
  var fields = Args.checkQueue(queue);
  return this.rpc(defs.QueueDeclare, fields, defs.QueueDeclareOk);
};

// Issue a QueueDelete RPC using the fields that
// `Args.deleteQueue(queue, options)` builds. Returns the promise from
// `rpc`, i.e. the fields of the QueueDeleteOk reply.
C.deleteQueue = function(queue, options) {
  var fields = Args.deleteQueue(queue, options);
  return this.rpc(defs.QueueDelete, fields, defs.QueueDeleteOk);
};

// Issue a QueuePurge RPC using the fields that
// `Args.purgeQueue(queue)` builds. Returns the promise from `rpc`,
// i.e. the fields of the QueuePurgeOk reply.
C.purgeQueue = function(queue) {
  var fields = Args.purgeQueue(queue);
  return this.rpc(defs.QueuePurge, fields, defs.QueuePurgeOk);
};

// Issue a QueueBind RPC using the fields that
// `Args.bindQueue(queue, source, pattern, argt)` builds. Returns the
// promise from `rpc`, i.e. the fields of the QueueBindOk reply.
C.bindQueue = function(queue, source, pattern, argt) {
  var fields = Args.bindQueue(queue, source, pattern, argt);
  return this.rpc(defs.QueueBind, fields, defs.QueueBindOk);
};

// Issue a QueueUnbind RPC using the fields that
// `Args.unbindQueue(queue, source, pattern, argt)` builds. Returns
// the promise from `rpc`, i.e. the fields of the QueueUnbindOk reply.
C.unbindQueue = function(queue, source, pattern, argt) {
  var fields = Args.unbindQueue(queue, source, pattern, argt);
  return this.rpc(defs.QueueUnbind, fields, defs.QueueUnbindOk);
};

// Issue an ExchangeDeclare RPC using the fields that
// `Args.assertExchange(exchange, type, options)` builds, expecting
// ExchangeDeclareOk. The reply carries no useful fields, so the
// promise is made to resolve to `{ exchange: exchange }` instead,
// handing the exchange name to the continuation.
C.assertExchange = function(exchange, type, options) {
  var fields = Args.assertExchange(exchange, type, options);
  function nameOnly(_ok) {
    return { exchange: exchange };
  }
  return this.rpc(defs.ExchangeDeclare, fields, defs.ExchangeDeclareOk)
    .then(nameOnly);
};

// Issue an ExchangeDeclare RPC using the fields that
// `Args.checkExchange(exchange)` builds. Returns the promise from
// `rpc`, i.e. the fields of the ExchangeDeclareOk reply.
C.checkExchange = function(exchange) {
  var fields = Args.checkExchange(exchange);
  return this.rpc(defs.ExchangeDeclare, fields, defs.ExchangeDeclareOk);
};

// Issue an ExchangeDelete RPC using the fields that
// `Args.deleteExchange(name, options)` builds. Returns the promise
// from `rpc`, i.e. the fields of the ExchangeDeleteOk reply.
C.deleteExchange = function(name, options) {
  var fields = Args.deleteExchange(name, options);
  return this.rpc(defs.ExchangeDelete, fields, defs.ExchangeDeleteOk);
};

// Issue an ExchangeBind RPC using the fields that
// `Args.bindExchange(dest, source, pattern, argt)` builds. Returns
// the promise from `rpc`, i.e. the fields of the ExchangeBindOk reply.
C.bindExchange = function(dest, source, pattern, argt) {
  var fields = Args.bindExchange(dest, source, pattern, argt);
  return this.rpc(defs.ExchangeBind, fields, defs.ExchangeBindOk);
};

// Issue an ExchangeUnbind RPC using the fields that
// `Args.unbindExchange(dest, source, pattern, argt)` builds. Returns
// the promise from `rpc`, i.e. the fields of the ExchangeUnbindOk
// reply.
C.unbindExchange = function(dest, source, pattern, argt) {
  var fields = Args.unbindExchange(dest, source, pattern, argt);
  return this.rpc(defs.ExchangeUnbind, fields, defs.ExchangeUnbindOk);
};

// Working with messages

// Publish `content`. `Args.publish(exchange, routingKey, options)`
// produces a single object which is passed to `sendMessage` twice:
// once as the method fields and once as the message properties.
// Returns whatever `sendMessage` returns.
C.publish = function(exchange, routingKey, content, options) {
  var combined = Args.publish(exchange, routingKey, options);
  var methodFields = combined;
  var properties = combined;
  return this.sendMessage(methodFields, properties, content);
};

// Shorthand for `publish` with the empty-string exchange and the
// queue name as the routing key. Returns whatever `publish` returns.
C.sendToQueue = function(queue, content, options) {
  var exchange = '';
  var routingKey = queue;
  return this.publish(exchange, routingKey, content, options);
};

// Start consuming from `queue` with a BasicConsume RPC. On success
// the callback is registered under the consumer tag from the reply
// and the returned promise resolves to the BasicConsumeOk fields; on
// failure the promise is rejected with the error.
C.consume = function(queue, callback, options) {
  var channel = this;
  var fields = Args.consume(queue, options);
  var deferred = defer();

  // NB the registration happens inside the `_rpc` callback (not in a
  // promise continuation) so that it runs synchronously, before any
  // messages for the new consumerTag can arrive.
  function onReply(err, ok) {
    if (err !== null) {
      deferred.reject(err);
      return;
    }
    channel.registerConsumer(ok.fields.consumerTag, callback);
    deferred.resolve(ok.fields);
  }

  this._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, onReply);
  return deferred.promise;
};

// Cancel the consumer identified by `consumerTag` with a BasicCancel
// RPC. On success the consumer is unregistered and the returned
// promise resolves to the BasicCancelOk fields; on failure the
// promise is rejected with the error.
C.cancel = function(consumerTag) {
  var channel = this;
  var deferred = defer();

  function onReply(err, ok) {
    if (err !== null) {
      deferred.reject(err);
      return;
    }
    channel.unregisterConsumer(consumerTag);
    deferred.resolve(ok.fields);
  }

  this._rpc(defs.BasicCancel, Args.cancel(consumerTag),
            defs.BasicCancelOk, onReply);
  return deferred.promise;
};

// Ask the server for a single message from `queue` with BasicGet
// (fields built by `Args.get(queue, options)`).
//
// Returns a promise which:
//  - resolves to `false` if the reply is BasicGetEmpty;
//  - resolves to the message, with its `fields` set to the
//    BasicGetOk fields, if the reply is BasicGetOk. The message is
//    collected by installing an `acceptMessage` handler as this
//    channel's `handleMessage`;
//  - is rejected with an Error describing the frame for any other
//    reply;
//  - is rejected with the error if `sendOrEnqueue` calls back with
//    one.
C.get = function(queue, options) {
  var reply = defer();
  var self = this;
  var fields = Args.get(queue, options);
  this.sendOrEnqueue(defs.BasicGet, fields, function(err, f) {
    if (err !== null) {
      reply.reject(err);
      return;
    }
    if (f.id === defs.BasicGetEmpty) {
      reply.resolve(false);
    }
    else if (f.id === defs.BasicGetOk) {
      // Named distinctly from the outer `fields` (the request's
      // fields) to avoid shadowing it.
      var getOkFields = f.fields;
      self.handleMessage = acceptMessage(function(m) {
        m.fields = getOkFields;
        reply.resolve(m);
      });
    }
    else {
      // `inspect` was previously used here without being defined
      // anywhere in this module, so this branch threw a
      // ReferenceError and the promise was never rejected.
      var inspect = require('util').inspect;
      reply.reject(new Error("Unexpected response to BasicGet: " +
                             inspect(f)));
    }
  });
  return reply.promise;
};

// Send a BasicAck for `message`, identified by its
// `fields.deliveryTag`; `allUpTo` is passed through to `Args.ack`.
// Sent with `sendImmediately`; nothing is returned.
C.ack = function(message, allUpTo) {
  var ackFields = Args.ack(message.fields.deliveryTag, allUpTo);
  this.sendImmediately(defs.BasicAck, ackFields);
};

// Send a BasicAck built from `Args.ack(0, true)`, i.e. delivery tag
// 0 with `allUpTo` set. Presumably this acknowledges every
// outstanding message on the channel -- confirm against the AMQP
// semantics of tag 0. Nothing is returned.
C.ackAll = function() {
  var everything = Args.ack(0, true);
  this.sendImmediately(defs.BasicAck, everything);
};

// Send a BasicNack for `message`, identified by its
// `fields.deliveryTag`; `allUpTo` and `requeue` are passed through
// to `Args.nack`. Sent with `sendImmediately`; nothing is returned.
C.nack = function(message, allUpTo, requeue) {
  var nackFields = Args.nack(message.fields.deliveryTag, allUpTo, requeue);
  this.sendImmediately(defs.BasicNack, nackFields);
};

// Send a BasicNack built from `Args.nack(0, true, requeue)`, i.e.
// delivery tag 0 with `allUpTo` set. Presumably this rejects every
// outstanding message on the channel -- confirm against the AMQP
// semantics of tag 0. Nothing is returned.
C.nackAll = function(requeue) {
  var everything = Args.nack(0, true, requeue);
  this.sendImmediately(defs.BasicNack, everything);
};

// Older RabbitMQ versions (and the AMQP specification itself) lack
// `Basic.Nack`, leaving only the one-message-at-a-time
// `Basic.Reject`. Apart from that this is synonymous with
// `#nack(message, false, requeue)`. Sent with `sendImmediately`;
// nothing is returned.
C.reject = function(message, requeue) {
  var rejectFields = Args.reject(message.fields.deliveryTag, requeue);
  this.sendImmediately(defs.BasicReject, rejectFields);
};

// Set the prefetch with a BasicQos RPC; available under both the
// names `prefetch` and `qos`. Returns the promise from `rpc`, i.e.
// the fields of the BasicQosOk reply.
//
// AMQP defines more options than are exposed here: RabbitMQ only
// implements prefetch based on message count, and only for
// individual channels or consumers. From RabbitMQ v3.3.0 onwards,
// prefetch without `global` set is treated as per-consumer (for
// consumers following), and prefetch with `global` set as
// per-channel.
C.prefetch = C.qos = function(count, global) {
  var fields = Args.prefetch(count, global);
  return this.rpc(defs.BasicQos, fields, defs.BasicQosOk);
};

// Issue a BasicRecover RPC using the fields that `Args.recover()`
// builds. Returns the promise from `rpc`, i.e. the fields of the
// BasicRecoverOk reply.
C.recover = function() {
  var fields = Args.recover();
  return this.rpc(defs.BasicRecover, fields, defs.BasicRecoverOk);
};

// Confirm channel: a channel with confirms 'switched on', so that
// each message sent provokes an 'ack' or 'nack' in response from the
// server. As a consequence `publish` and `sendToQueue` each accept a
// callback, which is invoked with `null` to signify 'ack', or with
// an exception to signify 'nack'.
//
// The constructor itself only runs the Channel constructor; the
// differing behaviour lives in the prototype methods.

function ConfirmChannel(connection) {
  Channel.call(this, connection);
}
inherits(ConfirmChannel, Channel);

module.exports.ConfirmChannel = ConfirmChannel;

// Create a ConfirmChannel on this model's connection, open it, then
// issue ConfirmSelect (with `nowait: false`) and wait for
// ConfirmSelectOk. Returns a promise for the channel itself.
CM.createConfirmChannel = function() {
  var channel = new ConfirmChannel(this.connection);

  function selectConfirms(openOk) {
    return channel.rpc(defs.ConfirmSelect, {nowait: false},
                       defs.ConfirmSelectOk);
  }
  function yieldChannel() {
    return channel;
  }

  return channel.open().then(selectConfirms).then(yieldChannel);
};

var CC = ConfirmChannel.prototype;

// As Channel's `publish`, but first hands `cb` to
// `pushConfirmCallback`. Returns whatever Channel's `publish`
// returns.
CC.publish = function(exchange, routingKey, content, options, cb) {
  this.pushConfirmCallback(cb);
  var basePublish = C.publish;
  return basePublish.call(this, exchange, routingKey, content, options);
};

// Shorthand for this channel's `publish` with the empty-string
// exchange and the queue name as the routing key; `cb` is passed
// along as the confirmation callback.
CC.sendToQueue = function(queue, content, options, cb) {
  var exchange = '';
  var routingKey = queue;
  return this.publish(exchange, routingKey, content, options, cb);
};

// Returns a promise (via `when.all`) covering every entry currently
// in `this.unconfirmed` that is not `null`. Each such entry is
// replaced, in place, by a wrapper which first calls the original
// entry (if it is truthy) with the error, then resolves the entry's
// promise when the error is `null` and rejects it otherwise.
CC.waitForConfirms = function() {
  var unconfirmed = this.unconfirmed;
  var pending = [];

  unconfirmed.forEach(function(callback, slot) {
    if (callback === null) return; // already confirmed

    var confirmed = defer();
    unconfirmed[slot] = function(err) {
      if (callback) callback(err);
      if (err !== null) confirmed.reject(err);
      else confirmed.resolve();
    };
    pending.push(confirmed.promise);
  });

  return when.all(pending);
};