Skip to content

Commit

Permalink
Applied reverse proxy detection fix as per PR indutny#45
Browse files Browse the repository at this point in the history
  • Loading branch information
giesir committed Jun 18, 2016
1 parent e0de9b7 commit fd15ef5
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 95 deletions.
80 changes: 43 additions & 37 deletions lib/sticky/api.js
Original file line number Diff line number Diff line change
@@ -1,44 +1,50 @@
'use strict';

var cluster = require('cluster');
var os = require('os');
var debug = require('debug')('sticky:worker');
var cluster = require('cluster'),
os = require('os'),
debug = require('debug')('sticky:worker'),
sticky = require('../sticky-session'),
Master = sticky.Master;

var sticky = require('../sticky-session');
var Master = sticky.Master;
exports.listen = listen;

function listen(server, port, options) {
if (!options)
options = {};

if (cluster.isMaster) {
var workerCount = options.workers || os.cpus().length;

var master = new Master(workerCount, options.env);
master.listen(port);
master.once('listening', function() {
server.emit('listening');
var master, closeCallback;

options = options || {};

if (cluster.isMaster) {
master = new Master(options.workers || os.cpus().length, options.env);
master.listen(port);
master.once('listening', function () {
server.emit('listening');
});
return false;
}

// Override close callback to gracefully close server
closeCallback = server.close;
server.close = function close() {
debug('closing gracefully');
process.send({ type: 'close' });
return closeCallback.apply(this, arguments);
};

process.on('message', function (message, socket) {
if (!message.length || message[0] !== 'sticky:balance' || !socket) {
return;
}

debug('incoming socket');

if (message[1]) {
socket.unshift(new Buffer(message[1], 'base64'));
}

server._connections++;
socket.server = server;
server.emit('connection', socket);
});
return false;
}

// Override close callback to gracefully close server
var oldClose = server.close;
server.close = function close() {
debug('graceful close');
process.send({ type: 'close' });
return oldClose.apply(this, arguments);
};

process.on('message', function(msg, socket) {
if (msg !== 'sticky:balance' || !socket)
return;

debug('incoming socket');
server._connections++;
socket.server = server;
server.emit('connection', socket);
});
return true;

return true;
}
exports.listen = listen;
165 changes: 107 additions & 58 deletions lib/sticky/master.js
Original file line number Diff line number Diff line change
@@ -1,85 +1,134 @@
'use strict';

var cluster = require('cluster');
var util = require('util');
var net = require('net');
var ip = require('ip');
var cluster = require('cluster'),
util = require('util'),
net = require('net'),
ip = require('ip'),
common = require('_http_common'),
parsers = common.parsers,
HTTPParser = process.binding('http_parser').HTTPParser,
debug = require('debug')('sticky:master');

var debug = require('debug')('sticky:master');
module.exports = Master;

function Master(options) {
var self = this;

if (!(self instanceof Master)) {
return new Master(options);
}

function Master(workerCount, env) {
net.Server.call(this, {
pauseOnConnect: true
}, this.balance);
debug('master options = %j', options);

this.env = env || {};
self.options = options || {};
self.seed = (Math.random() * 0xffffffff) | 0;
self.workers = [];

this.seed = (Math.random() * 0xffffffff) | 0;
this.workers = [];
debug('master seed=%d', self.seed);

debug('master seed=%d', this.seed);
net.Server.call(self, {
pauseOnConnect: true
}, options.proxyHeader ? self.balanceProxyAddress : self.balanceRemoteAddress);

this.once('listening', function() {
debug('master listening on %j', this.address());
self.once('listening', function () {
debug('master listening on %j', self.address());

for (var i = 0; i < workerCount; i++)
this.spawnWorker();
});
for (var i = 0; i < options.workerCount; i++) {
self.spawnWorker();
}
});
}

util.inherits(Master, net.Server);
module.exports = Master;

Master.prototype.hash = function hash(ip) {
var hash = this.seed;
for (var i = 0; i < ip.length; i++) {
var num = ip[i];
var self = this,
hash = self.seed;

hash += num;
for (var i = 0; i < ip.length; i++) {
var num = ip[i];

hash += num;
hash %= 2147483648;
hash += (hash << 10);
hash %= 2147483648;
hash ^= hash >> 6;
}

hash += hash << 3;
hash %= 2147483648;
hash += (hash << 10);
hash ^= hash >> 11;
hash += hash << 15;
hash %= 2147483648;
hash ^= hash >> 6;
}

hash += hash << 3;
hash %= 2147483648;
hash ^= hash >> 11;
hash += hash << 15;
hash %= 2147483648;

return hash >>> 0;
return hash >>> 0;
};

Master.prototype.spawnWorker = function spawnWorker() {
var worker = cluster.fork(this.env);

var self = this;
worker.on('exit', function(code) {
debug('worker=%d died with code=%d', worker.process.pid, code);
self.respawn(worker);
});

worker.on('message', function(msg) {
// Graceful exit
if (msg.type === 'close')
self.respawn(worker);
});

debug('worker=%d spawn', worker.process.pid);
this.workers.push(worker);
var self = this,
worker = cluster.fork(self.options.env || {});

worker.on('exit', function (code) {
debug('worker=%d died with code=%d', worker.process.pid, code);
self.respawn(worker);
});

worker.on('message', function (message) {
// Graceful exit
if (message.type === 'close') {
self.respawn(worker);
}
});

debug('worker=%d spawn', worker.process.pid);
self.workers.push(worker);
};

Master.prototype.respawn = function respawn(worker) {
var index = this.workers.indexOf(worker);
if (index !== -1)
this.workers.splice(index, 1);
this.spawnWorker();
var self = this;

if (self.workers.indexOf(worker) !== -1) {
self.workers.splice(index, 1);
}

self.spawnWorker();
};

Master.prototype.balance = function balance(socket) {
var addr = ip.toBuffer(socket.remoteAddress || '127.0.0.1');
var hash = this.hash(addr);
Master.prototype.balanceRemoteAddress = function balanceRemoteAddress(socket) {
var self = this,
addr = ip.toBuffer(socket.remoteAddress || '127.0.0.1'),
hash = self.hash(addr);

debug('balancing connection %s', addr.toString('utf8'));

self.workers[hash % self.workers.length].send(['sticky:balance'], socket);
};

Master.prototype.balanceProxyAddress = function balanceProxyAddress(socket) {
var self = this;

debug('incoming proxy');

socket.resume();
socket.once('data', function (buffer) {
var parser = parsers.alloc();

parser.reinitialize(HTTPParser.REQUEST);
parser.onIncoming = function (req) {
var addr = socket.remoteAddress || '127.0.0.1',
hash;

if (self.options.proxyHeader && req.headers[self.options.proxyHeader]) {
addr = req.headers[self.options.proxyHeader];
}

debug('balancing connection %s', addr.toString('utf8'));

debug('balacing connection %j', addr);
this.workers[hash % this.workers.length].send('sticky:balance', socket);
hash = self.hash(ip.toBuffer(addr));
self.workers[hash % self.workers.length]
.send(['sticky:balance', buffer.toString('base64')], socket);
};
parser.execute(buffer, 0, buffer.length);
parser.finish();
});
};

0 comments on commit fd15ef5

Please sign in to comment.