api_args.js
8.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
//
//
//
'use strict';
/*
The channel (promise) and callback APIs have similar signatures, and
in particular, both need AMQP fields prepared from the same arguments
and options. The arguments marshalling is done here. Each of the
procedures below takes arguments and options (the latter in an object)
particular to the operation it represents, and returns an object with
fields for handing to the encoder.
*/
// A number of AMQP methods have a table-typed field called
// `arguments`, that is intended to carry extension-specific
// values. RabbitMQ uses this in a number of places; e.g., to specify
// an 'alternate exchange'.
//
// Many of the methods in this API have an `options` argument, from
// which I take both values that have a default in AMQP (e.g.,
// autoDelete in QueueDeclare) *and* values that are specific to
// RabbitMQ (e.g., 'alternate-exchange'), which would normally be
// supplied in `arguments`. So that extensions I don't support yet can
// be used, I include `arguments` itself among the options.
//
// The upshot of this is that I often need to prepare an `arguments`
// value that has any values passed in `options.arguments` as well as
// any I've promoted to being options themselves. Since I don't want
// to mutate anything passed in, the general pattern is to create a
// fresh object with the `arguments` value given as its prototype; all
// fields in the supplied value will be serialised, as well as any I
// set on the fresh object. What I don't want to do, however, is set a
// field to undefined by copying possibly missing field values,
// because that will mask a value in the prototype.
//
// NB the `arguments` field already has a default value of `{}`, so
// there's no need to explicitly default it unless I'm setting values.
function setIfDefined(obj, prop, value) {
if (value != undefined) obj[prop] = value;
}
var EMPTY_OPTIONS = Object.freeze({});
var Args = {};
Args.assertQueue = function(queue, options) {
queue = queue || '';
options = options || EMPTY_OPTIONS;
var argt = Object.create(options.arguments || null);
setIfDefined(argt, 'x-expires', options.expires);
setIfDefined(argt, 'x-message-ttl', options.messageTtl);
setIfDefined(argt, 'x-dead-letter-exchange',
options.deadLetterExchange);
setIfDefined(argt, 'x-dead-letter-routing-key',
options.deadLetterRoutingKey);
setIfDefined(argt, 'x-max-length', options.maxLength);
setIfDefined(argt, 'x-max-priority', options.maxPriority);
return {
queue: queue,
exclusive: !!options.exclusive,
durable: (options.durable === undefined) ? true : options.durable,
autoDelete: !!options.autoDelete,
arguments: argt,
passive: false,
// deprecated but we have to include it
ticket: 0,
nowait: false
};
};
Args.checkQueue = function(queue) {
return {
queue: queue,
passive: true, // switch to "completely different" mode
nowait: false,
durable: true, autoDelete: false, exclusive: false, // ignored
ticket: 0,
};
};
Args.deleteQueue = function(queue, options) {
options = options || EMPTY_OPTIONS;
return {
queue: queue,
ifUnused: !!options.ifUnused,
ifEmpty: !!options.ifEmpty,
ticket: 0, nowait: false
};
};
Args.purgeQueue = function(queue) {
return {
queue: queue,
ticket: 0, nowait: false
};
};
Args.bindQueue = function(queue, source, pattern, argt) {
return {
queue: queue,
exchange: source,
routingKey: pattern,
arguments: argt,
ticket: 0, nowait: false
};
};
Args.unbindQueue = function(queue, source, pattern, argt) {
return {
queue: queue,
exchange: source,
routingKey: pattern,
arguments: argt,
ticket: 0, nowait: false
};
};
Args.assertExchange = function(exchange, type, options) {
options = options || EMPTY_OPTIONS;
var argt = Object.create(options.arguments || null);
setIfDefined(argt, 'alternate-exchange', options.alternateExchange);
return {
exchange: exchange,
ticket: 0,
type: type,
passive: false,
durable: (options.durable === undefined) ? true : options.durable,
autoDelete: !!options.autoDelete,
internal: !!options.internal,
nowait: false,
arguments: argt
};
};
Args.checkExchange = function(exchange) {
return {
exchange: exchange,
passive: true, // switch to 'may as well be another method' mode
nowait: false,
// ff are ignored
durable: true, internal: false, type: '', autoDelete: false,
ticket: 0
};
};
Args.deleteExchange = function(exchange, options) {
options = options || EMPTY_OPTIONS;
return {
exchange: exchange,
ifUnused: !!options.ifUnused,
ticket: 0, nowait: false
};
};
Args.bindExchange = function(dest, source, pattern, argt) {
return {
source: source,
destination: dest,
routingKey: pattern,
arguments: argt,
ticket: 0, nowait: false
};
};
Args.unbindExchange = function(dest, source, pattern, argt) {
return {
source: source,
destination: dest,
routingKey: pattern,
arguments: argt,
ticket: 0, nowait: false
};
};
// It's convenient to construct the properties and the method fields
// at the same time, since in the APIs, values for both can appear in
// `options`. Since the property or mthod field names don't overlap, I
// just return one big object that can be used for both purposes, and
// the encoder will pick out what it wants.
Args.publish = function(exchange, routingKey, options) {
options = options || EMPTY_OPTIONS;
// The CC and BCC fields expect an array of "longstr", which would
// normally be buffer values in JavaScript; however, since a field
// array (or table) cannot have shortstr values, the codec will
// encode all strings as longstrs anyway.
function convertCC(cc) {
if (cc === undefined) {
return undefined;
}
else if (Array.isArray(cc)) {
return cc.map(String);
}
else return [String(cc)];
}
var headers = Object.create(options.headers || null);
setIfDefined(headers, 'CC', convertCC(options.CC));
setIfDefined(headers, 'BCC', convertCC(options.BCC));
var deliveryMode; // undefined will default to 1 (non-persistent)
// Previously I overloaded deliveryMode be a boolean meaning
// 'persistent or not'; better is to name this option for what it
// is, but I need to have backwards compatibility for applications
// that either supply a numeric or boolean value.
if (options.persistent !== undefined)
deliveryMode = (options.persistent) ? 2 : 1;
else if (typeof options.deliveryMode === 'number')
deliveryMode = options.deliveryMode;
else if (options.deliveryMode) // is supplied and truthy
deliveryMode = 2;
var expiration = options.expiration;
if (expiration !== undefined) expiration = expiration.toString();
return {
// method fields
exchange: exchange,
routingKey: routingKey,
mandatory: !!options.mandatory,
immediate: false, // RabbitMQ doesn't implement this any more
ticket: undefined,
// properties
contentType: options.contentType,
contentEncoding: options.contentEncoding,
headers: headers,
deliveryMode: deliveryMode,
priority: options.priority,
correlationId: options.correlationId,
replyTo: options.replyTo,
expiration: expiration,
messageId: options.messageId,
timestamp: options.timestamp,
type: options.type,
userId: options.userId,
appId: options.appId,
clusterId: undefined
};
};
Args.consume = function(queue, options) {
options = options || EMPTY_OPTIONS;
var argt = Object.create(options.arguments || null);
setIfDefined(argt, 'x-priority', options.priority);
return {
ticket: 0,
queue: queue,
consumerTag: options.consumerTag || '',
noLocal: !!options.noLocal,
noAck: !!options.noAck,
exclusive: !!options.exclusive,
nowait: false,
arguments: argt
};
};
Args.cancel = function(consumerTag) {
return {
consumerTag: consumerTag,
nowait: false
};
};
Args.get = function(queue, options) {
options = options || EMPTY_OPTIONS;
return {
ticket: 0,
queue: queue,
noAck: !!options.noAck
};
};
Args.ack = function(tag, allUpTo) {
return {
deliveryTag: tag,
multiple: !!allUpTo
};
};
Args.nack = function(tag, allUpTo, requeue) {
return {
deliveryTag: tag,
multiple: !!allUpTo,
requeue: (requeue === undefined) ? true : requeue
};
};
Args.reject = function(tag, requeue) {
return {
deliveryTag: tag,
requeue: (requeue === undefined) ? true : requeue
};
};
Args.prefetch = function(count, global) {
return {
prefetchCount: count || 0,
prefetchSize: 0,
global: !!global
};
};
Args.recover = function() {
return {requeue: true};
};
module.exports = Object.freeze(Args);