From ae78c208f99a84c73aee72bdc4d129a63c11df81 Mon Sep 17 00:00:00 2001 From: BasBastian <2613768+BasBastian@users.noreply.github.com> Date: Tue, 7 Nov 2017 10:27:16 +0100 Subject: [PATCH] Modular structure of celery client --- celery.js | 227 ++------------------------------------ lib/Result.js | 61 ++++++++++ lib/Task.js | 56 ++++++++++ lib/redis/RedisBackend.js | 60 ++++++++++ lib/redis/RedisBroker.js | 55 +++++++++ 5 files changed, 241 insertions(+), 218 deletions(-) create mode 100644 lib/Result.js create mode 100644 lib/Task.js create mode 100644 lib/redis/RedisBackend.js create mode 100644 lib/redis/RedisBroker.js diff --git a/celery.js b/celery.js index 95fd635..c4e66a7 100644 --- a/celery.js +++ b/celery.js @@ -1,9 +1,12 @@ -var url = require('url'), - util = require('util'), - amqp = require('amqp'), - redis = require('redis'), - events = require('events'), - uuid = require('uuid'); +var url = require('url'), + util = require('util'), + amqp = require('amqp'), + events = require('events'); + +var Result = require('./lib/Result'); +var Task = require('./lib/Task'); +var RedisBroker = require('./lib/redis/RedisBroker'); +var RedisBackend = require('./lib/redis/RedisBackend'); var createMessage = require('./protocol').createMessage; @@ -66,112 +69,6 @@ function Configuration(options) { self.ROUTES = self.ROUTES || {}; } -function RedisBroker(conf) { - var self = this; - self.redis = redis.createClient(conf.BROKER_OPTIONS); - - self.end = function() { - self.redis.end(true); - }; - - self.disconnect = function() { - self.redis.quit(); - }; - - self.redis.on('connect', function() { - self.emit('ready'); - }); - - self.redis.on('error', function(err) { - self.emit('error', err); - }); - - self.redis.on('end', function() { - self.emit('end'); - }); - - self.publish = function(queue, message, options, callback, id) { - var payload = { - body: new Buffer(message).toString('base64'), - headers: {}, - 'content-type': options.contentType, - 'content-encoding': options.contentEncoding, - properties: { - body_encoding: 'base64', - correlation_id: id, - delivery_info: { - exchange: queue, - priority: 0, - routing_key: queue - }, - delivery_mode: 2, // No idea what this means - delivery_tag: uuid.v4(), - reply_to: uuid.v4() - } - }; - self.redis.lpush(queue, JSON.stringify(payload)); - }; - - return self; -} -util.inherits(RedisBroker, events.EventEmitter); - -function RedisBackend(conf) { - var self = this; - self.redis = redis.createClient(conf.RESULT_BACKEND_OPTIONS); - - var backend_ex = self.redis.duplicate(); - - self.redis.on('error', function(err) { - self.emit('error', err); - }); - - self.redis.on('end', function() { - self.emit('end'); - }); - - self.disconnect = function() { - backend_ex.quit(); - self.redis.quit(); - }; - - // store results to emit event when ready - self.results = {}; - - // results prefix - var key_prefix = 'celery-task-meta-'; - - self.redis.on('connect', function() { - debug('Backend connected...'); - // on redis result.. - self.redis.on('pmessage', function(pattern, channel, data) { - backend_ex.expire(channel, conf.TASK_RESULT_EXPIRES / 1000); - var message = JSON.parse(data); - var taskid = channel.slice(key_prefix.length); - if (self.results.hasOwnProperty(taskid)) { - var res = self.results[taskid]; - res.result = message; - res.emit('ready', res.result); - delete self.results[taskid]; - } else { - // in case of incoming messages where we don't have the result object - self.emit('message', message); - } - }); - // subscribe to redis results - self.redis.psubscribe(key_prefix + '*', () => { - self.emit('ready'); - }); - }); - - self.get = function(taskid, cb) { - backend_ex.get(key_prefix + taskid, cb); - } - - return self; -} -util.inherits(RedisBackend, events.EventEmitter); - function Client(conf) { var self = this; self.ready = false; @@ -260,112 +157,6 @@ Client.prototype.call = function(name /*[args], [kwargs], [options], [callback]* return result; }; -function Task(client, name, options, exchange) { - var self = this; - - self.client = client; - self.name = name; - self.options = options || {}; - - var route = self.client.conf.ROUTES[name], - queue = route && route.queue; - - self.publish = function (args, kwargs, options, callback) { - var id = options.id || uuid.v4(); - - var result = new Result(id, self.client); - - if (client.conf.backend_type === 'redis') { - client.backend.results[result.taskid] = result; - } - - queue = options.queue || self.options.queue || queue || self.client.conf.DEFAULT_QUEUE; - var msg = createMessage(self.name, args, kwargs, options, id); - var pubOptions = { - 'contentType': 'application/json', - 'contentEncoding': 'utf-8' - }; - - if (exchange) { - exchange.publish(queue, msg, pubOptions, callback); - } else { - self.client.broker.publish(queue, msg, pubOptions, callback); - } - - return result; - }; -} - -Task.prototype.call = function(args, kwargs, options, callback) { - var self = this; - - args = args || []; - kwargs = kwargs || {}; - options = options || self.options || {}; - - if (!self.client.ready) { - self.client.emit('error', 'Client is not ready'); - } - else { - return self.publish(args, kwargs, options, callback); - } -}; - -function Result(taskid, client) { - var self = this; - - events.EventEmitter.call(self); - self.taskid = taskid; - self.client = client; - self.result = null; - - if (self.client.conf.backend_type === 'amqp' && !self.client.conf.IGNORE_RESULT) { - debug('Subscribing to result queue...'); - self.client.backend.queue( - self.taskid.replace(/-/g, ''), { - "arguments": { - 'x-expires': self.client.conf.TASK_RESULT_EXPIRES - }, - 'durable': self.client.conf.TASK_RESULT_DURABLE, - 'closeChannelOnUnsubscribe': true - }, - - function (q) { - q.bind(self.client.conf.RESULT_EXCHANGE, '#'); - var ctag; - q.subscribe(function (message) { - if (message.contentType === 'application/x-python-serialize') { - console.error('Celery should be configured with json serializer'); - process.exit(1); - } - self.result = message; - q.unsubscribe(ctag); - debug('Emiting ready event...'); - self.emit('ready', message); - debug('Emiting task status event...'); - self.emit(message.status.toLowerCase(), message); - }).addCallback(function(ok) { ctag = ok.consumerTag; }); - }); - } -} - -util.inherits(Result, events.EventEmitter); - -Result.prototype.get = function(callback) { - var self = this; - if (callback && self.result === null) { - self.client.backend.get(self.taskid, function(err, reply) { - self.result = JSON.parse(reply); - callback(self.result); - }); - } else { - if (callback) { - callback(self.result); - } - return self.result; - } -}; - exports.createClient = function(config, callback) { return new Client(config, callback); }; diff --git a/lib/Result.js b/lib/Result.js new file mode 100644 index 0000000..dd523d2 --- /dev/null +++ b/lib/Result.js @@ -0,0 +1,61 @@ +var events = require('events'); +var util = require('util'); +var debug = process.env.NODE_CELERY_DEBUG === '1' ? console.info : function() {}; + + +function Result(taskid, client) { + var self = this; + + events.EventEmitter.call(self); + self.taskid = taskid; + self.client = client; + self.result = null; + + if (self.client.conf.backend_type === 'amqp' && !self.client.conf.IGNORE_RESULT) { + debug('Subscribing to result queue...'); + self.client.backend.queue( + self.taskid.replace(/-/g, ''), { + "arguments": { + 'x-expires': self.client.conf.TASK_RESULT_EXPIRES + }, + 'durable': self.client.conf.TASK_RESULT_DURABLE, + 'closeChannelOnUnsubscribe': true + }, + + function (q) { + q.bind(self.client.conf.RESULT_EXCHANGE, '#'); + var ctag; + q.subscribe(function (message) { + if (message.contentType === 'application/x-python-serialize') { + console.error('Celery should be configured with json serializer'); + process.exit(1); + } + self.result = message; + q.unsubscribe(ctag); + debug('Emiting ready event...'); + self.emit('ready', message); + debug('Emiting task status event...'); + self.emit(message.status.toLowerCase(), message); + }).addCallback(function(ok) { ctag = ok.consumerTag; }); + }); + } +} + +util.inherits(Result, events.EventEmitter); + +Result.prototype.get = function(callback) { + var self = this; + if (callback && self.result === null) { + self.client.backend.get(self.taskid, function(err, reply) { + self.result = JSON.parse(reply); + callback(self.result); + }); + } else { + if (callback) { + callback(self.result); + } + return self.result; + } +}; + +module.exports = Result; diff --git a/lib/Task.js b/lib/Task.js new file mode 100644 index 0000000..633b37f --- /dev/null +++ b/lib/Task.js @@ -0,0 +1,56 @@ +var Result = require('./Result'); +var uuid = require('uuid'); +var createMessage = require('../protocol').createMessage; + +function Task(client, name, options, exchange) { + var self = this; + + self.client = client; + self.name = name; + self.options = options || {}; + + var route = self.client.conf.ROUTES[name], + queue = route && route.queue; + + self.publish = function (args, kwargs, options, callback) { + var id = options.id || uuid.v4(); + + var result = new Result(id, self.client); + + if (client.conf.backend_type === 'redis') { + client.backend.results[result.taskid] = result; + } + + queue = options.queue || self.options.queue || queue || self.client.conf.DEFAULT_QUEUE; + var msg = createMessage(self.name, args, kwargs, options, id); + var pubOptions = { + 'contentType': 'application/json', + 'contentEncoding': 'utf-8' + }; + + if (exchange) { + exchange.publish(queue, msg, pubOptions, callback); + } else { + self.client.broker.publish(queue, msg, pubOptions, callback); + } + + return result; + }; +} + +Task.prototype.call = function(args, kwargs, options, callback) { + var self = this; + + args = args || []; + kwargs = kwargs || {}; + options = options || self.options || {}; + + if (!self.client.ready) { + self.client.emit('error', 'Client is not ready'); + } + else { + return self.publish(args, kwargs, options, callback); + } +}; + +module.exports = Task; diff --git a/lib/redis/RedisBackend.js b/lib/redis/RedisBackend.js new file mode 100644 index 0000000..d597b72 --- /dev/null +++ b/lib/redis/RedisBackend.js @@ -0,0 +1,60 @@ +var events = require('events'); +var redis = require('redis'); +var util = require('util'); +var debug = process.env.NODE_CELERY_DEBUG === '1' ? console.info : function() {}; + +function RedisBackend(conf) { + var self = this; + self.redis = redis.createClient(conf.RESULT_BACKEND_OPTIONS); + + var backend_ex = self.redis.duplicate(); + + self.redis.on('error', function(err) { + self.emit('error', err); + }); + + self.redis.on('end', function() { + self.emit('end'); + }); + + self.disconnect = function() { + backend_ex.quit(); + self.redis.quit(); + }; + + // store results to emit event when ready + self.results = {}; + + // results prefix + var key_prefix = 'celery-task-meta-'; + + self.redis.on('connect', function() { + debug('Backend connected...'); + // on redis result.. + self.redis.on('pmessage', function(pattern, channel, data) { + backend_ex.expire(channel, conf.TASK_RESULT_EXPIRES / 1000); + var message = JSON.parse(data); + var taskid = channel.slice(key_prefix.length); + if (self.results.hasOwnProperty(taskid)) { + var res = self.results[taskid]; + res.result = message; + res.emit('ready', res.result); + delete self.results[taskid]; + } else { + // in case of incoming messages where we don't have the result object + self.emit('message', message); + } + }); + // subscribe to redis results + self.redis.psubscribe(key_prefix + '*', () => { + self.emit('ready'); + }); + }); + + self.get = function(taskid, cb) { + backend_ex.get(key_prefix + taskid, cb); + } + + return self; +} +util.inherits(RedisBackend, events.EventEmitter); diff --git a/lib/redis/RedisBroker.js b/lib/redis/RedisBroker.js new file mode 100644 index 0000000..cd93857 --- /dev/null +++ b/lib/redis/RedisBroker.js @@ -0,0 +1,55 @@ +var events = require('events'); +var redis = require('redis'); +var uuid = require('uuid'); +var util = require('util'); +var debug = process.env.NODE_CELERY_DEBUG === '1' ? console.info : function() {}; + +function RedisBroker(conf) { + var self = this; + self.redis = redis.createClient(conf.BROKER_OPTIONS); + + self.end = function() { + self.redis.end(true); + }; + + self.disconnect = function() { + self.redis.quit(); + }; + + self.redis.on('connect', function() { + self.emit('ready'); + }); + + self.redis.on('error', function(err) { + self.emit('error', err); + }); + + self.redis.on('end', function() { + self.emit('end'); + }); + + self.publish = function(queue, message, options, callback, id) { + var payload = { + body: new Buffer(message).toString('base64'), + headers: {}, + 'content-type': options.contentType, + 'content-encoding': options.contentEncoding, + properties: { + body_encoding: 'base64', + correlation_id: id, + delivery_info: { + exchange: queue, + priority: 0, + routing_key: queue + }, + delivery_mode: 2, // No idea what this means + delivery_tag: uuid.v4(), + reply_to: uuid.v4() + } + }; + self.redis.lpush(queue, JSON.stringify(payload)); + }; + + return self; +} +util.inherits(RedisBroker, events.EventEmitter);