diff --git a/lib/sticky/api.js b/lib/sticky/api.js index b167e19..72a05f1 100644 --- a/lib/sticky/api.js +++ b/lib/sticky/api.js @@ -1,44 +1,50 @@ 'use strict'; -var cluster = require('cluster'); -var os = require('os'); -var debug = require('debug')('sticky:worker'); +var cluster = require('cluster'), + os = require('os'), + debug = require('debug')('sticky:worker'), + sticky = require('../sticky-session'), + Master = sticky.Master; -var sticky = require('../sticky-session'); -var Master = sticky.Master; +exports.listen = listen; function listen(server, port, options) { - if (!options) - options = {}; - - if (cluster.isMaster) { - var workerCount = options.workers || os.cpus().length; - - var master = new Master(workerCount, options.env); - master.listen(port); - master.once('listening', function() { - server.emit('listening'); + var master, closeCallback; + + options = options || {}; + + if (cluster.isMaster) { + master = new Master(options.workers || os.cpus().length, options.env); + master.listen(port); + master.once('listening', function () { + server.emit('listening'); + }); + return false; + } + + // Override close callback to gracefully close server + closeCallback = server.close; + server.close = function close() { + debug('closing gracefully'); + process.send({ type: 'close' }); + return closeCallback.apply(this, arguments); + }; + + process.on('message', function (message, socket) { + if (!message.length || message[0] !== 'sticky:balance' || !socket) { + return; + } + + debug('incoming socket'); + + if (message[1]) { + socket.unshift(new Buffer(message[1], 'base64')); + } + + server._connections++; + socket.server = server; + server.emit('connection', socket); }); - return false; - } - - // Override close callback to gracefully close server - var oldClose = server.close; - server.close = function close() { - debug('graceful close'); - process.send({ type: 'close' }); - return oldClose.apply(this, arguments); - }; - - process.on('message', function(msg, socket) { - if (msg !== 'sticky:balance' || !socket) - return; - - debug('incoming socket'); - server._connections++; - socket.server = server; - server.emit('connection', socket); - }); - return true; + + return true; } -exports.listen = listen; diff --git a/lib/sticky/master.js b/lib/sticky/master.js index d2424e3..bd847e0 100644 --- a/lib/sticky/master.js +++ b/lib/sticky/master.js @@ -1,85 +1,134 @@ 'use strict'; -var cluster = require('cluster'); -var util = require('util'); -var net = require('net'); -var ip = require('ip'); +var cluster = require('cluster'), + util = require('util'), + net = require('net'), + ip = require('ip'), + common = require('_http_common'), + parsers = common.parsers, + HTTPParser = process.binding('http_parser').HTTPParser, + debug = require('debug')('sticky:master'); -var debug = require('debug')('sticky:master'); +module.exports = Master; + +function Master(options) { + var self = this; + + if (!(self instanceof Master)) { + return new Master(options); + } -function Master(workerCount, env) { - net.Server.call(this, { - pauseOnConnect: true - }, this.balance); + debug('master options = %j', options); - this.env = env || {}; + self.options = options || {}; + self.seed = (Math.random() * 0xffffffff) | 0; + self.workers = []; - this.seed = (Math.random() * 0xffffffff) | 0; - this.workers = []; + debug('master seed=%d', self.seed); - debug('master seed=%d', this.seed); + net.Server.call(self, { + pauseOnConnect: true + }, options.proxyHeader ? self.balanceProxyAddress : self.balanceRemoteAddress); - this.once('listening', function() { - debug('master listening on %j', this.address()); + self.once('listening', function () { + debug('master listening on %j', self.address()); - for (var i = 0; i < workerCount; i++) - this.spawnWorker(); - }); + for (var i = 0; i < options.workerCount; i++) { + self.spawnWorker(); + } + }); } + util.inherits(Master, net.Server); -module.exports = Master; Master.prototype.hash = function hash(ip) { - var hash = this.seed; - for (var i = 0; i < ip.length; i++) { - var num = ip[i]; + var self = this, + hash = self.seed; - hash += num; + for (var i = 0; i < ip.length; i++) { + var num = ip[i]; + + hash += num; + hash %= 2147483648; + hash += (hash << 10); + hash %= 2147483648; + hash ^= hash >> 6; + } + + hash += hash << 3; hash %= 2147483648; - hash += (hash << 10); + hash ^= hash >> 11; + hash += hash << 15; hash %= 2147483648; - hash ^= hash >> 6; - } - - hash += hash << 3; - hash %= 2147483648; - hash ^= hash >> 11; - hash += hash << 15; - hash %= 2147483648; - return hash >>> 0; + return hash >>> 0; }; Master.prototype.spawnWorker = function spawnWorker() { - var worker = cluster.fork(this.env); - - var self = this; - worker.on('exit', function(code) { - debug('worker=%d died with code=%d', worker.process.pid, code); - self.respawn(worker); - }); - - worker.on('message', function(msg) { - // Graceful exit - if (msg.type === 'close') - self.respawn(worker); - }); - - debug('worker=%d spawn', worker.process.pid); - this.workers.push(worker); + var self = this, + worker = cluster.fork(self.options.env || {}); + + worker.on('exit', function (code) { + debug('worker=%d died with code=%d', worker.process.pid, code); + self.respawn(worker); + }); + + worker.on('message', function (message) { + // Graceful exit + if (message.type === 'close') { + self.respawn(worker); + } + }); + + debug('worker=%d spawn', worker.process.pid); + self.workers.push(worker); }; Master.prototype.respawn = function respawn(worker) { - var index = this.workers.indexOf(worker); - if (index !== -1) - this.workers.splice(index, 1); - this.spawnWorker(); + var self = this; + + if (self.workers.indexOf(worker) !== -1) { + self.workers.splice(index, 1); + } + + self.spawnWorker(); }; -Master.prototype.balance = function balance(socket) { - var addr = ip.toBuffer(socket.remoteAddress || '127.0.0.1'); - var hash = this.hash(addr); +Master.prototype.balanceRemoteAddress = function balanceRemoteAddress(socket) { + var self = this, + addr = ip.toBuffer(socket.remoteAddress || '127.0.0.1'), + hash = self.hash(addr); + + debug('balancing connection %s', addr.toString('utf8')); + + self.workers[hash % self.workers.length].send(['sticky:balance'], socket); +}; + +Master.prototype.balanceProxyAddress = function balanceProxyAddress(socket) { + var self = this; + + debug('incoming proxy'); + + socket.resume(); + socket.once('data', function (buffer) { + var parser = parsers.alloc(); + + parser.reinitialize(HTTPParser.REQUEST); + parser.onIncoming = function (req) { + var addr = socket.remoteAddress || '127.0.0.1', + hash; + + if (self.options.proxyHeader && req.headers[self.options.proxyHeader]) { + addr = req.headers[self.options.proxyHeader]; + } + + debug('balancing connection %s', addr.toString('utf8')); - debug('balacing connection %j', addr); - this.workers[hash % this.workers.length].send('sticky:balance', socket); + hash = self.hash(ip.toBuffer(addr)); + self.workers[hash % self.workers.length] + .send(['sticky:balance', buffer.toString('base64')], socket); + }; + parser.execute(buffer, 0, buffer.length); + parser.finish(); + }); };