callback_api.js
9.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
'use strict';
var assert = require('assert');
var crypto = require('crypto');
var api = require('../callback_api');
var util = require('./util');
var schedule = util.schedule;
var randomString = util.randomString;
var kCallback = util.kCallback;
var domain = require('domain');
var URL = process.env.URL || 'amqp://localhost';
// Open a connection to the broker at `URL`, passing an empty options
// object; `cb` is handed to `api.connect` unchanged.
function connect(cb) {
  var options = {};
  api.connect(URL, options, cb);
}
// Wrap a `done` function as a node-style (err, value) callback: a
// non-null error is forwarded to `done`, otherwise `done` is called with
// no arguments and the value is discarded.
function doneCallback(done) {
  return function(err, _) {
    if (err != null) {
      done(err);
      return;
    }
    done();
  };
}
// No-op callback for places where a function is required but nothing
// should happen when it is called.
function ignore() {
  return;
}
// Split `done` into two callbacks, `first` and `second`. When one of them
// is called without an error, the other one becomes `done`; when one is
// called with an error, `done` receives that error immediately and the
// other one becomes a no-op (`ignore`).
function twice(done) {
  var slots = {};

  // Install the initial handler for `self`; it decides what `other`
  // turns into once `self` has reported.
  function arm(self, other) {
    slots[self] = function(err) {
      if (err == undefined) {
        slots[other] = done;
        return;
      }
      slots[other] = ignore;
      done(err);
    };
  }

  // Look the handler up at call time, since it may have been swapped, and
  // invoke it as a plain function.
  function dispatcher(name) {
    return function(err) {
      var handler = slots[name];
      handler(err);
    };
  }

  arm('first', 'second');
  arm('second', 'first');

  return {first: dispatcher('first'), second: dispatcher('second')};
}
// Adapt `done` to a node-style callback that is expected to fail: an
// error means the expectation held, so `done` is called with no
// arguments; a successful result is reported to `done` as an Error that
// names the unexpected value.
function failCallback(done) {
  // The second parameter must be named `val`: the message below reads it,
  // and an undeclared name would throw a ReferenceError on this path.
  return function(err, val) {
    if (err == null) done(new Error('Expected failure, got ' + val));
    else done();
  };
}
// Poll queue `q` on channel `ch` with checkQueue until the reply reports
// a messageCount above zero, then call `k(null, reply)`. A checkQueue
// error is passed to `k` immediately. Each retry is handed to `schedule`.
function waitForMessages(ch, q, k) {
  function retry() {
    waitForMessages(ch, q, k);
  }

  ch.checkQueue(q, function(err, reply) {
    if (err != null) return k(err);
    if (reply.messageCount > 0) return k(null, reply);
    schedule(retry);
  });
}
// Smoke test: `connect` calls back without an error.
suite('connect', function() {
  test('at all', function(done) {
    var onConnected = doneCallback(done);
    connect(onConnected);
  });
});
// Build a test-registration helper for channels opened through the
// connection method named `method`. The returned function registers a
// test called `name` that connects, calls `c[method]` to open a channel,
// and then runs `chfun(ch, done)`. Both steps are wrapped in `kCallback`
// with `done` as its second argument (presumably the error
// continuation — see ./util).
function channel_test_fn(method) {
  function register(name, chfun) {
    test(name, function(done) {
      function withChannel(ch) {
        chfun(ch, done);
      }
      function withConnection(conn) {
        conn[method](kCallback(withChannel, done));
      }
      connect(kCallback(withConnection, done));
    });
  }
  return register;
}

// Registration helpers for plain channels and for confirm channels.
var channel_test = channel_test_fn('createChannel');
var confirm_channel_test = channel_test_fn('createConfirmChannel');
// A channel can be opened, and an open channel can be closed.
suite('channel open', function() {

  // Reaching the test body at all means the channel opened.
  channel_test('at all', function(ch, done) {
    done();
  });

  channel_test('open and close', function(ch, done) {
    var onClosed = doneCallback(done);
    ch.close(onClosed);
  });

});
// Assert/check/delete round-trips for a queue and an exchange, plus the
// failure path when checking names that were never declared.
suite('assert, check, delete', function() {

  channel_test('assert, check, delete queue', function(ch, done) {
    var queueName = 'test.cb.queue';

    function removeQueue(_ok) {
      ch.deleteQueue(queueName, {}, doneCallback(done));
    }
    function verifyQueue(_q) {
      ch.checkQueue(queueName, kCallback(removeQueue, done));
    }

    ch.assertQueue(queueName, {}, kCallback(verifyQueue, done));
  });

  channel_test('assert, check, delete exchange', function(ch, done) {
    var exchangeName = 'test.cb.exchange';

    function removeExchange(_ok) {
      ch.deleteExchange(exchangeName, {}, doneCallback(done));
    }
    function verifyExchange(_ex) {
      ch.checkExchange(exchangeName, kCallback(removeExchange, done));
    }

    ch.assertExchange(exchangeName, 'topic', {}, kCallback(verifyExchange, done));
  });

  // Both the channel's 'error' event and the check callback are expected
  // to report a failure; `twice` joins the two into a single `done`.
  channel_test('fail on check non-queue', function(ch, done) {
    var outcome = twice(done);
    var onChannelError = failCallback(outcome.first);
    var onCheckReply = failCallback(outcome.second);
    ch.on('error', onChannelError);
    ch.checkQueue('test.cb.nothere', onCheckReply);
  });

  // Same expectation as above, for an exchange.
  channel_test('fail on check non-exchange', function(ch, done) {
    var outcome = twice(done);
    var onChannelError = failCallback(outcome.first);
    var onCheckReply = failCallback(outcome.second);
    ch.on('error', onChannelError);
    ch.checkExchange('test.cb.nothere', onCheckReply);
  });

});
// Binding a queue to an exchange, and one exchange to another.
suite('bindings', function() {

  channel_test('bind queue', function(ch, done) {
    function declareExchange(q) {
      function bind(ex) {
        ch.bindQueue(q.queue, ex.exchange, '', {}, doneCallback(done));
      }
      ch.assertExchange('test.cb.bindex', 'fanout', {}, kCallback(bind, done));
    }

    ch.assertQueue('test.cb.bindq', {}, kCallback(declareExchange, done));
  });

  channel_test('bind exchange', function(ch, done) {
    function declareSecond(ex1) {
      function bind(ex2) {
        ch.bindExchange(ex1.exchange, ex2.exchange, '', {}, doneCallback(done));
      }
      ch.assertExchange('test.cb.bindex2', 'fanout', {}, kCallback(bind, done));
    }

    ch.assertExchange('test.cb.bindex1', 'fanout', {}, kCallback(declareSecond, done));
  });

});
// Publishing to a queue and reading the message back, via `consume`
// (with and without acknowledgement) and via `get`. Each test declares an
// exclusive queue with an empty name and uses the `queue` field of the
// reply as the queue name.
suite('sending messages', function() {

  channel_test('send to queue and consume noAck', function(ch, done) {
    var msg = randomString();
    ch.assertQueue('', {exclusive: true}, function(e, q) {
      // `!= null` rather than `!== null`: an `undefined` error must count
      // as "no error", otherwise the test would end here, call
      // `done(undefined)` and pass without consuming anything.
      if (e != null) return done(e);
      ch.consume(q.queue, function(m) {
        if (m.content.toString() == msg) done();
        else done(new Error("message content doesn't match:" +
                            msg + " =/= " + m.content.toString()));
      }, {noAck: true, exclusive: true});
      ch.sendToQueue(q.queue, new Buffer(msg));
    });
  });

  channel_test('send to queue and consume ack', function(ch, done) {
    var msg = randomString();
    ch.assertQueue('', {exclusive: true}, function(e, q) {
      // Same loose null check as in the noAck test above.
      if (e != null) return done(e);
      ch.consume(q.queue, function(m) {
        if (m.content.toString() == msg) {
          // The consumer was started with noAck: false, so acknowledge
          // the delivery before finishing.
          ch.ack(m);
          done();
        }
        else done(new Error("message content doesn't match:" +
                            msg + " =/= " + m.content.toString()));
      }, {noAck: false, exclusive: true});
      ch.sendToQueue(q.queue, new Buffer(msg));
    });
  });

  channel_test('send to and get from queue', function(ch, done) {
    ch.assertQueue('', {exclusive: true}, function(e, q) {
      if (e != null) return done(e);
      var msg = randomString();
      ch.sendToQueue(q.queue, new Buffer(msg));
      // Wait until checkQueue reports a message before calling `get`.
      waitForMessages(ch, q.queue, function(e, _) {
        if (e != null) return done(e);
        ch.get(q.queue, {noAck: true}, function(e, m) {
          if (e != null)
            return done(e);
          else if (!m)
            return done(new Error('Empty (false) not expected'));
          else if (m.content.toString() == msg)
            return done();
          else
            return done(
              new Error('Messages do not match: ' +
                        msg + ' =/= ' + m.content.toString()));
        });
      });
    });
  });

});
// Publisher confirms on a channel opened with createConfirmChannel.
suite('ConfirmChannel', function() {

  confirm_channel_test('Receive confirmation', function(ch, done) {
    // The message is unroutable: a queue may not have an empty name, and
    // nothing can be bound to the default exchange. `done` is passed as
    // the publish callback.
    var body = new Buffer('foo');
    ch.publish('', '', body, {}, done);
  });

  confirm_channel_test('Wait for confirms', function(ch, done) {
    // Publish 1000 messages without per-message callbacks, then wait for
    // them collectively.
    var remaining = 1000;
    while (remaining-- > 0) {
      ch.publish('', '', new Buffer('foo'), {});
    }
    ch.waitForConfirms(done);
  });

});
// Exceptions thrown inside user callbacks, and failures of `get` and
// `consume`, are expected to surface in the active domain. Most tests
// register failCallback(done) as the domain's 'error' handler, so they
// pass when the domain reports an error.
suite("Error handling", function() {
/*
I don't like having to do this, but there appears to be something
broken about domains in Node.JS v0.8 and mocha. Apparently it has to
do with how mocha and domains hook into error propagation:
https://github.com/visionmedia/mocha/issues/513 (summary: domains in
Node.JS v0.8 don't prevent uncaughtException from firing, and that's
what mocha uses to detect .. an uncaught exception).
Using domains with amqplib *does* work in practice in Node.JS v0.8:
that is, it's possible to throw an exception in a callback and deal
with it in the active domain, and thereby avoid it crashing the
program.
*/
// Only registered when util.versionGreaterThan reports a Node.JS version
// above 0.8, for the reason given in the comment above.
if (util.versionGreaterThan(process.versions.node, '0.8')) {
test('Throw error in connection open callback', function(done) {
var dom = domain.createDomain();
dom.on('error', failCallback(done));
// The connect callback is bound to the domain explicitly, so the throw
// below is delivered to the domain's 'error' handler.
connect(dom.bind(function(err, conn) {
throw new Error('Spurious connection open callback error');
}));
});
}
// TODO: refactor {error_test, channel_test}
// Register a test named `name` that, inside a fresh domain, connects and
// opens a channel, then calls `fun(ch, done, dom)`. Connection and
// channel-open callbacks are wrapped in kCallback with `done`.
function error_test(name, fun) {
test(name, function(done) {
var dom = domain.createDomain();
dom.run(function() {
connect(kCallback(function(c) {
// Seems like there were some unironed wrinkles in 0.8's
// implementation of domains; explicitly adding the connection
// to the domain makes sure any exception thrown in the course
// of processing frames is handled by the domain. For other
// versions of Node.JS, this ends up being belt-and-braces.
dom.add(c);
c.createChannel(kCallback(function(ch) {
fun(ch, done, dom);
}, done));
}, done));
});
});
}
// `fun` itself throws, i.e. the throw happens within the channel-open
// callback; the 'error' handler is registered first.
error_test('Channel open callback throws an error', function(ch, done, dom) {
dom.on('error', failCallback(done));
throw new Error('Error in open callback');
});
// The throw happens in the callback passed to ch.prefetch.
error_test('RPC callback throws error', function(ch, done, dom) {
dom.on('error', failCallback(done));
ch.prefetch(0, false, function(err, ok) {
throw new Error('Spurious callback error');
});
});
// The throw happens in the callback passed to ch.get. The assertQueue
// error argument is not inspected.
error_test('Get callback throws error', function(ch, done, dom) {
dom.on('error', failCallback(done));
ch.assertQueue('test.cb.get-with-error', {}, function(err, ok) {
ch.get('test.cb.get-with-error', {noAck: true}, function() {
throw new Error('Spurious callback error');
});
});
});
// The throw happens in the fourth argument to ch.consume; `ignore` is
// passed as the second argument. The assertQueue error argument is not
// inspected.
error_test('Consume callback throws error', function(ch, done, dom) {
dom.on('error', failCallback(done));
ch.assertQueue('test.cb.consume-with-error', {}, function(err, ok) {
ch.consume('test.cb.consume-with-error', ignore, {noAck: true}, function() {
throw new Error('Spurious callback error');
});
});
});
// `twice` wraps failCallback(done): the domain's 'error' handler and the
// get callback are the two halves, and each is expected to receive an
// error.
error_test('Get from non-queue invokes error k', function(ch, done, dom) {
var both = twice(failCallback(done));
dom.on('error', both.first);
ch.get('', {}, both.second);
});
// Same arrangement as the get test above, using consume.
// NOTE(review): both.second is passed as the second argument, the
// position where other consume calls in this file pass the message
// handler, not the fourth (completion callback) position — confirm this
// is intended.
error_test('Consume from non-queue invokes error k', function(ch, done, dom) {
var both = twice(failCallback(done));
dom.on('error', both.first);
ch.consume('', both.second);
});
});