diff --git a/celery.js b/celery.js index fe215c9..96d669b 100644 --- a/celery.js +++ b/celery.js @@ -67,13 +67,15 @@ function RedisBroker(broker_url) { database = purl.pathname.slice(1); } - self.redis = redis.createClient(purl.port || 6379, - purl.hostname || 'localhost'); - + debug('Connecting to broker...'); if (purl.auth) { + self.redis = redis.createClient(purl.port || 6379, purl.hostname || 'localhost', {'auth_pass': purl.auth.split(':')[1]}); debug('Authenticating broker...'); self.redis.auth(purl.auth.split(':')[1]); debug('Broker authenticated...'); + } else { + self.redis = redis.createClient(purl.port || 6379, + purl.hostname || 'localhost'); } if (database) { @@ -89,6 +91,7 @@ function RedisBroker(broker_url) { }; self.redis.on('connect', function() { + debug('Broker connected...'); self.emit('ready'); }); @@ -132,15 +135,19 @@ function RedisBackend(conf) { var database; if (purl.pathname) { - database = purl.pathname.slice(1); + database = purl.pathname.slice(1); } debug('Connecting to backend...'); if (purl.auth) { - self.redis = redis.createClient(purl.port, purl.hostname, {'auth_pass': purl.auth.split(':')[1]}); + self.redis = redis.createClient(purl.port || 6379, purl.hostname || 'localhost', {'auth_pass': purl.auth.split(':')[1]}); + debug('Authenticating broker...'); + self.redis.auth(purl.auth.split(':')[1]); + debug('Broker authenticated...'); } else { - self.redis = redis.createClient(purl.port, purl.hostname); + self.redis = redis.createClient(purl.port || 6379, purl.hostname || 'localhost'); } + // needed because we'll use `psubscribe` var backend_ex = self.redis.duplicate(); @@ -171,12 +178,19 @@ function RedisBackend(conf) { debug('Backend connected...'); // on redis result.. self.redis.on('pmessage', function(pattern, channel, data) { - backend_ex.expire(channel, conf.TASK_RESULT_EXPIRES / 1000); + + if(conf.CELERY_TASK_RESULT_DURABLE){ + backend_ex.expire(channel, conf.TASK_RESULT_EXPIRES / 1000); + } else { + backend_ex.expire(channel, 0); + } + var message = JSON.parse(data); var taskid = channel.slice(key_prefix.length); if (self.results.hasOwnProperty(taskid)) { var res = self.results[taskid]; res.result = message; + res.emit('ready', res.result); delete self.results[taskid]; } else { @@ -185,7 +199,7 @@ function RedisBackend(conf) { } }); // subscribe to redis results - self.redis.psubscribe(key_prefix + '*', () => { + self.redis.psubscribe(key_prefix + '[^php_]*', () => { self.emit('ready'); }); });