From 17f69ff39fb22b08d997684fcb7b17037c36f3a9 Mon Sep 17 00:00:00 2001 From: kozehgar Date: Wed, 11 Jan 2017 09:45:34 +0330 Subject: [PATCH 1/2] Redis broker and backend connection authentication bug fixed Redis backend result expire time bug fixed --- celery.js | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/celery.js b/celery.js index fe215c9..8105601 100644 --- a/celery.js +++ b/celery.js @@ -67,13 +67,15 @@ function RedisBroker(broker_url) { database = purl.pathname.slice(1); } - self.redis = redis.createClient(purl.port || 6379, - purl.hostname || 'localhost'); - + debug('Connecting to broker...'); if (purl.auth) { + self.redis = redis.createClient(purl.port || 6379, purl.hostname || 'localhost', {'auth_pass': purl.auth.split(':')[1]}); debug('Authenticating broker...'); self.redis.auth(purl.auth.split(':')[1]); debug('Broker authenticated...'); + } else { + self.redis = redis.createClient(purl.port || 6379, + purl.hostname || 'localhost'); } if (database) { @@ -89,6 +91,7 @@ function RedisBroker(broker_url) { }; self.redis.on('connect', function() { + debug('Broker connected...'); self.emit('ready'); }); @@ -132,15 +135,19 @@ function RedisBackend(conf) { var database; if (purl.pathname) { - database = purl.pathname.slice(1); + database = purl.pathname.slice(1); } debug('Connecting to backend...'); if (purl.auth) { - self.redis = redis.createClient(purl.port, purl.hostname, {'auth_pass': purl.auth.split(':')[1]}); + self.redis = redis.createClient(purl.port || 6379, purl.hostname || 'localhost', {'auth_pass': purl.auth.split(':')[1]}); + debug('Authenticating broker...'); + self.redis.auth(purl.auth.split(':')[1]); + debug('Broker authenticated...'); } else { - self.redis = redis.createClient(purl.port, purl.hostname); + self.redis = redis.createClient(purl.port || 6379, purl.hostname || 'localhost'); } + // needed because we'll use `psubscribe` var backend_ex = self.redis.duplicate(); @@ -171,12 +178,19 @@ function RedisBackend(conf) { debug('Backend connected...'); // on redis result.. self.redis.on('pmessage', function(pattern, channel, data) { - backend_ex.expire(channel, conf.TASK_RESULT_EXPIRES / 1000); + + if(conf.CELERY_TASK_RESULT_DURABLE){ + backend_ex.expire(channel, conf.TASK_RESULT_EXPIRES / 1000); + } else { + backend_ex.expire(channel, 0); + } + var message = JSON.parse(data); var taskid = channel.slice(key_prefix.length); if (self.results.hasOwnProperty(taskid)) { var res = self.results[taskid]; res.result = message; + res.emit('ready', res.result); delete self.results[taskid]; } else { From 580c6b56213b69b6b00fb088588a773d0f186634 Mon Sep 17 00:00:00 2001 From: kozehgar Date: Tue, 14 Feb 2017 16:52:26 +0330 Subject: [PATCH 2/2] Conflict with php-celery resolved --- celery.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/celery.js b/celery.js index 8105601..96d669b 100644 --- a/celery.js +++ b/celery.js @@ -199,7 +199,7 @@ function RedisBackend(conf) { } }); // subscribe to redis results - self.redis.psubscribe(key_prefix + '*', () => { + self.redis.psubscribe(key_prefix + '[^php_]*', () => { self.emit('ready'); }); });