Skip to content

Commit

Permalink
Allow workers to respond to IPC messages from master
Browse files Browse the repository at this point in the history
  • Loading branch information
Nick Kotenberg authored and jondubois committed Aug 5, 2017
1 parent f0b3d21 commit f47dc3d
Show file tree
Hide file tree
Showing 7 changed files with 819 additions and 9 deletions.
61 changes: 55 additions & 6 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ var path = require('path');
var crypto = require('crypto');
var EventEmitter = require('events').EventEmitter;
var domain = require('sc-domain');
var uuid = require('uuid');
var fork = require('child_process').fork;
var os = require('os');
var fs = require('fs-extra');
var uidNumber = require('uid-number');
var uuid = require('uuid');
var pkg = require('./package.json');
var argv = require('minimist')(process.argv.slice(2));
var cluster = require('cluster');
Expand All @@ -17,6 +17,7 @@ var InvalidActionError = scErrors.InvalidActionError;
var BrokerError = scErrors.BrokerError;
var ProcessExitError = scErrors.ProcessExitError;
var UnknownError = scErrors.UnknownError;
var TimeoutError = scErrors.TimeoutError;
var decycle = scErrors.decycle;

var socketClusterSingleton = null;
Expand All @@ -42,6 +43,8 @@ var SocketCluster = function (options) {
self.EVENT_WORKER_CLUSTER_READY = 'workerClusterReady';
self.EVENT_WORKER_CLUSTER_EXIT = 'workerClusterExit';

self._pendingResponseHandlers = {};

self._errorAnnotations = {
'EADDRINUSE': 'Failed to bind to a port because it was already used by another process.'
};
Expand Down Expand Up @@ -89,6 +92,7 @@ SocketCluster.prototype._init = function (options) {
logLevel: 2,
handshakeTimeout: 10000,
ackTimeout: 10000,
ipcAckTimeout: 10000,
pingInterval: 8000,
pingTimeout: 20000,
origins: '*:*',
Expand Down Expand Up @@ -147,6 +151,7 @@ SocketCluster.prototype._init = function (options) {
};

verifyDuration('ackTimeout');
verifyDuration('ipcAckTimeout');
verifyDuration('pingInterval');
verifyDuration('pingTimeout');
verifyDuration('workerStatusInterval');
Expand Down Expand Up @@ -635,7 +640,18 @@ SocketCluster.prototype._launchWorkerCluster = function () {
} else if (m.type == 'workerExit') {
self._workerExitHandler(m.data);
} else if (m.type == 'workerMessage') {
self.emit('workerMessage', m.workerId, m.data);
self.emit('workerMessage', m.workerId, m.data, function (data) {
if (m.cid) {
self.respondToWorker(data, m.workerId, m.cid);
}
});
} else if (m.type == 'workerResponse') {
var responseHandler = self._pendingResponseHandlers[m.rid];
if (responseHandler) {
clearTimeout(responseHandler.timeout);
delete self._pendingResponseHandlers[m.rid];
responseHandler.callback(null, m.data, m.workerId);
}
}
});

Expand All @@ -648,6 +664,14 @@ SocketCluster.prototype._launchWorkerCluster = function () {
this.workerCluster.on('exit', this._handleWorkerClusterExit.bind(this));
};

SocketCluster.prototype.respondToWorker = function (data, workerId, rid) {
this.workerCluster.send({
type: 'masterResponse',
data: data,
rid: rid
});
};

SocketCluster.prototype._logDeploymentDetails = function () {
if (this.options.logLevel > 0) {
console.log(' ' + this.colorText('[Active]', 'green') + ' SocketCluster started');
Expand Down Expand Up @@ -734,15 +758,40 @@ SocketCluster.prototype._start = function () {
launchBrokerEngine();
};

SocketCluster.prototype.sendToWorker = function (workerId, data) {
this.workerCluster.send({
SocketCluster.prototype._createIPCResponseHandler = function (callback) {
var cid = uuid.v4();

var responseTimeout = setTimeout(function () {
var responseHandler = self._pendingResponseHandlers[cid];
delete self._pendingResponseHandlers[cid];
var timeoutError = new TimeoutError('IPC response timed out');
responseHandler.callback(timeoutError);
}, this.options.ipcAckTimeout);

this._pendingResponseHandlers[cid] = {
callback: callback,
timeout: responseTimeout
};

return cid;
};

SocketCluster.prototype.sendToWorker = function (workerId, data, callback) {
var self = this;

var messagePacket = {
type: 'masterMessage',
workerId: workerId,
data: data
});
};

if (callback) {
messagePacket.cid = this._createIPCResponseHandler(callback);
}
this.workerCluster.send(messagePacket);
};

SocketCluster.prototype.sendToBroker = function (brokerId, data) {
SocketCluster.prototype.sendToBroker = function (brokerId, data, callback) {
this._brokerEngineServer.sendToBroker(brokerId, data);
};

Expand Down
56 changes: 53 additions & 3 deletions lib/scworker.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ var socketClusterServer = require('socketcluster-server');
var EventEmitter = require('events').EventEmitter;
var crypto = require('crypto');
var domain = require('sc-domain');
var uuid = require('uuid');
var http = require('http');
var https = require('https');
var fs = require('fs');
Expand All @@ -14,6 +15,7 @@ var InvalidActionError = scErrors.InvalidActionError;
var ResourceLimitError = scErrors.ResourceLimitError;
var BrokerError = scErrors.BrokerError;
var HTTPServerError = scErrors.HTTPServerError;
var TimeoutError = scErrors.TimeoutError;

var SCWorker = function (options) {
var self = this;
Expand All @@ -27,6 +29,7 @@ var SCWorker = function (options) {
this.MIDDLEWARE_START = 'start';

this.type = 'worker';
self._pendingResponseHandlers = {};

this._errorDomain = domain.create();
this._errorDomain.on('error', function () {
Expand Down Expand Up @@ -345,11 +348,52 @@ SCWorker.prototype.getStatus = function () {
};
};

SCWorker.prototype.sendToMaster = function (data) {
process.send({
SCWorker.prototype._createIPCResponseHandler = function (callback) {
var cid = uuid.v4();

var responseTimeout = setTimeout(function () {
var responseHandler = self._pendingResponseHandlers[cid];
delete self._pendingResponseHandlers[cid];
var timeoutError = new TimeoutError('IPC response timed out');
responseHandler.callback(timeoutError);
}, this.options.ipcAckTimeout);

this._pendingResponseHandlers[cid] = {
callback: callback,
timeout: responseTimeout
};

return cid;
};

SCWorker.prototype.handleMasterResponse = function (message) {
var responseHandler = this._pendingResponseHandlers[message.rid];
if (responseHandler) {
clearTimeout(responseHandler.timeout);
delete this._pendingResponseHandlers[message.rid];
responseHandler.callback(null, message.data);
}
};

SCWorker.prototype.sendToMaster = function (data, callback) {
var messagePacket = {
type: 'workerMessage',
data: data,
workerId: this.id
};

if (callback) {
messagePacket.cid = this._createIPCResponseHandler(callback);
}
process.send(messagePacket);
};

SCWorker.prototype.respondToMaster = function (data, rid) {
process.send({
type: 'workerResponse',
data: data,
workerId: this.id,
rid: rid
});
};

Expand All @@ -358,7 +402,13 @@ SCWorker.prototype.handleMasterEvent = function () {
};

SCWorker.prototype.handleMasterMessage = function (message) {
this.emit('masterMessage', message.data);
var self = this

self.emit('masterMessage', message.data, function (data) {
if (message.cid) {
self.respondToMaster(data, message.cid);
}
});
};

SCWorker.prototype.errorHandler = function (err) {
Expand Down
3 changes: 3 additions & 0 deletions lib/workercluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ if (cluster.isMaster) {
var worker;

process.on('message', function (m) {

if (m.type == 'init') {
if (m.data.processTermTimeout) {
processTermTimeout = m.data.processTermTimeout;
Expand Down Expand Up @@ -210,6 +211,8 @@ if (cluster.isMaster) {
}
} else if (m.type == 'masterMessage') {
worker.handleMasterMessage(m);
} else if (m.type == 'masterResponse') {
worker.handleMasterResponse(m);
} else if (m.type == 'terminate') {
if (worker && !m.data.immediate) {
worker.close(function () {
Expand Down
Loading

0 comments on commit f47dc3d

Please sign in to comment.