Skip to content
This repository has been archived by the owner on May 31, 2020. It is now read-only.

Commit

Permalink
Support additional options for broker and backend (#84)
Browse files Browse the repository at this point in the history
* Use same BROKER_OPTIONS for backend connection

* Remove block that should never happen

And wouldn’t work if it did

* Add error handling to backend connection

* Make broker and backend options more flexible
  • Loading branch information
holm authored and mher committed Mar 19, 2017
1 parent 3e6c60b commit 2445558
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 84 deletions.
111 changes: 39 additions & 72 deletions celery.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,23 @@ var createMessage = require('./protocol').createMessage;
var debug = process.env.NODE_CELERY_DEBUG === '1' ? console.info : function() {};

var supportedProtocols = ['amqp', 'amqps', 'redis'];
function checkProtocol(kind, protocol) {
function getProtocol(kind, options) {
const protocol = url.parse(options.url).protocol.slice(0, -1);
if (protocol === 'amqps') {
protocol = 'amqp';
}
if (supportedProtocols.indexOf(protocol) === -1) {
throw new Error(util.format('Unsupported %s type: %s', kind, protocol));
}
debug(kind + ' type: ' + protocol);

return protocol;
}

function addProtocolDefaults(protocol, options) {
if (protocol === 'amqp') {
options.heartbeat = options.heartbeat || 580;
}
}

function Configuration(options) {
Expand All @@ -29,8 +42,20 @@ function Configuration(options) {
self.TASK_RESULT_EXPIRES = self.TASK_RESULT_EXPIRES * 1000 || 86400000; // Default 1 day

// broker
self.BROKER_URL = self.BROKER_URL || 'amqp://';
self.BROKER_OPTIONS = self.BROKER_OPTIONS || { url: self.BROKER_URL, heartbeat: 580 };
self.BROKER_OPTIONS = self.BROKER_OPTIONS || {};
self.BROKER_OPTIONS.url = self.BROKER_URL || 'amqp://';
self.broker_type = getProtocol('broker', self.BROKER_OPTIONS);
addProtocolDefaults(self.broker_type, self.BROKER_OPTIONS);

// backend
self.RESULT_BACKEND_OPTIONS = self.RESULT_BACKEND_OPTIONS || {};
if (self.RESULT_BACKEND === self.broker_type) {
self.RESULT_BACKEND = self.BROKER_URL;
}
self.RESULT_BACKEND_OPTIONS.url = self.RESULT_BACKEND || self.BROKER_URL;
self.backend_type = getProtocol('backend', self.RESULT_BACKEND_OPTIONS);
addProtocolDefaults(self.backend_type, self.RESULT_BACKEND_OPTIONS);

self.DEFAULT_QUEUE = self.DEFAULT_QUEUE || 'celery';
self.DEFAULT_EXCHANGE = self.DEFAULT_EXCHANGE || '';
self.DEFAULT_EXCHANGE_TYPE = self.DEFAULT_EXCHANGE_TYPE || 'direct';
Expand All @@ -39,46 +64,11 @@ function Configuration(options) {
self.IGNORE_RESULT = self.IGNORE_RESULT || false;
self.TASK_RESULT_DURABLE = undefined !== self.TASK_RESULT_DURABLE ? self.TASK_RESULT_DURABLE : true; // Set Durable true by default (Celery 3.1.7)
self.ROUTES = self.ROUTES || {};

self.broker_type = url.parse(self.BROKER_URL).protocol.slice(0, -1);
if (self.broker_type === 'amqps')
self.broker_type = 'amqp';
debug('Broker type: ' + self.broker_type);
checkProtocol('broker', self.broker_type);

// backend
if (!self.RESULT_BACKEND || (self.RESULT_BACKEND === self.broker_type)) {
self.RESULT_BACKEND = self.BROKER_URL;
}

self.backend_type = url.parse(self.RESULT_BACKEND).protocol.slice(0, -1);
if (self.backend_type === 'amqps')
self.backend_type = 'amqp';
debug('Backend type: ' + self.backend_type);
checkProtocol('backend', self.backend_type);
}

function RedisBroker(broker_url) {
function RedisBroker(conf) {
var self = this;
var purl = url.parse(broker_url);
var database;

if (purl.pathname) {
database = purl.pathname.slice(1);
}

self.redis = redis.createClient(purl.port || 6379,
purl.hostname || 'localhost');

if (purl.auth) {
debug('Authenticating broker...');
self.redis.auth(purl.auth.split(':')[1]);
debug('Broker authenticated...');
}

if (database) {
self.redis.select(database);
}
self.redis = redis.createClient(conf.BROKER_OPTIONS);

self.end = function() {
self.redis.end(true);
Expand Down Expand Up @@ -128,26 +118,10 @@ util.inherits(RedisBroker, events.EventEmitter);

function RedisBackend(conf) {
var self = this;
var purl = url.parse(conf.RESULT_BACKEND);
var database;
self.redis = redis.createClient(conf.RESULT_BACKEND_OPTIONS);

if (purl.pathname) {
database = purl.pathname.slice(1);
}

debug('Connecting to backend...');
if (purl.auth) {
self.redis = redis.createClient(purl.port, purl.hostname, {'auth_pass': purl.auth.split(':')[1]});
} else {
self.redis = redis.createClient(purl.port, purl.hostname);
}
// needed because we'll use `psubscribe`
var backend_ex = self.redis.duplicate();

if (database) {
self.redis.select(database);
}

self.redis.on('error', function(err) {
self.emit('error', err);
});
Expand All @@ -156,7 +130,7 @@ function RedisBackend(conf) {
self.emit('end');
});

self.quit = function() {
self.disconnect = function() {
backend_ex.quit();
self.redis.quit();
};
Expand Down Expand Up @@ -211,24 +185,21 @@ function Client(conf) {
self.emit('message', msg);
});
} else if (self.conf.backend_type === 'amqp') {
self.backend = amqp.createConnection({
url: self.conf.BROKER_URL,
heartbeat: 580
}, {
self.backend = amqp.createConnection(self.conf.RESULT_BACKEND_OPTIONS, {
defaultExchangeName: self.conf.DEFAULT_EXCHANGE
});
} else if (self.conf.backend_type === self.conf.broker_type) {
if (self.conf.backend_type === 'amqp') {
self.backend = self.broker;
}
}

self.backend.on('error', function(err) {
self.emit('error', err);
});

// backend ready...
self.backend.on('ready', function() {
debug('Connecting to broker...');

if (self.conf.broker_type === 'redis') {
self.broker = new RedisBroker(self.conf.BROKER_URL);
self.broker = new RedisBroker(self.conf);
} else if (self.conf.broker_type === 'amqp') {
self.broker = amqp.createConnection(self.conf.BROKER_OPTIONS, {
defaultExchangeName: self.conf.DEFAULT_EXCHANGE
Expand Down Expand Up @@ -260,11 +231,7 @@ Client.prototype.createTask = function(name, options, exchange) {

Client.prototype.end = function() {
this.broker.disconnect();
if (this.conf.backend_type === 'redis') {
this.backend.quit();
} else if (this.conf.broker_type !== this.conf.backend_type) {
this.backend.quit();
}
this.backend.disconnect();
};

Client.prototype.call = function(name /*[args], [kwargs], [options], [callback]*/ ) {
Expand Down
53 changes: 41 additions & 12 deletions tests/test_celery.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,63 @@ var conf_redis = {

describe('celery functional tests', function() {
describe('initialization', function() {
it('should create a client without error', function(done) {
var client1 = celery.createClient(conf_amqp),
client2 = celery.createClient(conf_invalid);
it('should create a valid amqp client without error', function(done) {
var client = celery.createClient(conf_amqp);

client1.on('connect', function() {
client1.end();
assert.equal(client.conf.BROKER_OPTIONS.url, 'amqp://');
assert.equal(client.conf.BROKER_OPTIONS.heartbeat, 580);
assert.equal(client.conf.broker_type, 'amqp');

assert.equal(client.conf.RESULT_BACKEND_OPTIONS.url, 'amqp://');
assert.equal(client.conf.RESULT_BACKEND_OPTIONS.heartbeat, 580);
assert.equal(client.conf.backend_type, 'amqp');

client.on('connect', function() {
client.end();
});

client1.on('error', function(exception) {
console.log(exception);
client.on('error', function(exception) {
assert.ok(false);
});

client1.once('end', function() {
client.once('end', function() {
done();
});
});

it('should create a valid redis client without error', function(done) {
var client = celery.createClient(conf_redis);

assert.equal(client.conf.BROKER_OPTIONS.url, 'redis://');
assert.equal(client.conf.broker_type, 'redis');

assert.equal(client.conf.RESULT_BACKEND_OPTIONS.url, 'redis://');
assert.equal(client.conf.backend_type, 'redis');

client.on('connect', function() {
client.end();
});

client2.on('ready', function() {
client.on('error', function(exception) {
assert.ok(false);
});

client2.on('error', function(exception) {
assert.ok(exception);
client.once('end', function() {
done();
});
});

client2.once('end', function() {
it('should throw error on invalid amqp client', function(done) {
var client = celery.createClient(conf_invalid);

client.on('ready', function() {
assert.ok(false);
});

client.on('error', function(exception) {
assert.ok(exception);
done();
});
});
});

Expand Down

0 comments on commit 2445558

Please sign in to comment.