Skip to content

Commit

Permalink
v6.6.0 - Added ability for processes to easily respond to messages ov…
Browse files Browse the repository at this point in the history
…er IPC via callbacks
  • Loading branch information
jondubois committed Aug 5, 2017
1 parent f47dc3d commit 7a6effa
Show file tree
Hide file tree
Showing 16 changed files with 348 additions and 85 deletions.
57 changes: 44 additions & 13 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ var SocketCluster = function (options) {
self.EVENT_WORKER_CLUSTER_EXIT = 'workerClusterExit';

self._pendingResponseHandlers = {};
self.workerClusterMessageBuffer = [];

self._errorAnnotations = {
'EADDRINUSE': 'Failed to bind to a port because it was already used by another process.'
Expand Down Expand Up @@ -125,6 +126,7 @@ SocketCluster.prototype._init = function (options) {

self._active = false;
self.workerCluster = null;
self.isWorkerClusterReady = false;

self._colorCodes = {
red: 31,
Expand Down Expand Up @@ -388,9 +390,11 @@ SocketCluster.prototype._logObject = function (obj, objType, time) {

SocketCluster.prototype._convertValueToUnknownError = function (err, origin) {
if (err && typeof err == 'object') {
// If err has neither a stack or message property
// then the error message will be the JSON stringified object.
if (!err.message && !err.stack) {
if (err.message || err.stack) {
err = scErrors.hydrateError(err, true);
} else {
// If err has neither a stack nor a message property
// then the error message will be the JSON stringified object.
var errorMessage;
try {
errorMessage = JSON.stringify(err);
Expand Down Expand Up @@ -489,6 +493,8 @@ SocketCluster.prototype._workerWarningHandler = function (workerPid, warning) {
SocketCluster.prototype._workerClusterReadyHandler = function () {
var self = this;

this.isWorkerClusterReady = true;

if (!this._active) {
if (this.options.rebootOnSignal) {
process.on('SIGUSR2', function () {
Expand All @@ -512,6 +518,8 @@ SocketCluster.prototype._workerClusterReadyHandler = function () {
this._logDeploymentDetails();
}

this._flushWorkerClusterMessageBuffer();

var workerClusterInfo = {
pid: this.workerCluster.pid,
childProcess: this.workerCluster
Expand All @@ -538,6 +546,8 @@ SocketCluster.prototype._workerStartHandler = function (workerInfo, signal) {
};

SocketCluster.prototype._handleWorkerClusterExit = function (errorCode, signal) {
this.isWorkerClusterReady = false;

var workerClusterInfo = {
pid: this.workerCluster.pid,
code: errorCode,
Expand Down Expand Up @@ -591,6 +601,7 @@ SocketCluster.prototype._launchWorkerCluster = function () {
}

this.workerCluster = fork(__dirname + '/lib/workercluster.js', process.argv.slice(2), execOptions);
this.isWorkerClusterReady = false;

var workerOpts = this._cloneObject(this.options);
workerOpts.paths = this._paths;
Expand Down Expand Up @@ -631,7 +642,7 @@ SocketCluster.prototype._launchWorkerCluster = function () {
self._workerClusterErrorHandler(m.data.pid, m.data.error);
}
} else if (m.type == 'warning') {
var warning = scErrors.hydrateError(m.data.error);
var warning = scErrors.hydrateError(m.data.error, true);
self._workerWarningHandler(m.data.workerPid, warning);
} else if (m.type == 'ready') {
self._workerClusterReadyHandler();
Expand All @@ -640,17 +651,18 @@ SocketCluster.prototype._launchWorkerCluster = function () {
} else if (m.type == 'workerExit') {
self._workerExitHandler(m.data);
} else if (m.type == 'workerMessage') {
self.emit('workerMessage', m.workerId, m.data, function (data) {
self.emit('workerMessage', m.workerId, m.data, function (err, data) {
if (m.cid) {
self.respondToWorker(data, m.workerId, m.cid);
self.respondToWorker(err, data, m.workerId, m.cid);
}
});
} else if (m.type == 'workerResponse') {
} else if (m.type == 'workerResponse' || m.type == 'workerClusterResponse') {
var responseHandler = self._pendingResponseHandlers[m.rid];
if (responseHandler) {
clearTimeout(responseHandler.timeout);
delete self._pendingResponseHandlers[m.rid];
responseHandler.callback(null, m.data, m.workerId);
var properError = scErrors.hydrateError(m.error, true);
responseHandler.callback(properError, m.data, m.workerId);
}
}
});
Expand All @@ -662,11 +674,15 @@ SocketCluster.prototype._launchWorkerCluster = function () {
this.emit(this.EVENT_WORKER_CLUSTER_START, workerClusterInfo);

this.workerCluster.on('exit', this._handleWorkerClusterExit.bind(this));
this.workerCluster.on('disconnect', function () {
self.isWorkerClusterReady = false;
});
};

SocketCluster.prototype.respondToWorker = function (data, workerId, rid) {
SocketCluster.prototype.respondToWorker = function (err, data, workerId, rid) {
this.workerCluster.send({
type: 'masterResponse',
error: scErrors.dehydrateError(err, true),
data: data,
rid: rid
});
Expand Down Expand Up @@ -727,6 +743,7 @@ SocketCluster.prototype._start = function () {
expiryAccuracy: self._dataExpiryAccuracy,
downgradeToUser: self.options.downgradeToUser,
processTermTimeout: self.options.processTermTimeout,
ipcAckTimeout: self.options.ipcAckTimeout,
brokerOptions: self.options,
appBrokerControllerPath: self._paths.appBrokerControllerPath,
appInitControllerPath: self._paths.appInitControllerPath
Expand All @@ -750,15 +767,16 @@ SocketCluster.prototype._start = function () {
self.emit(self.EVENT_BROKER_EXIT, brokerInfo);
});

self._brokerEngineServer.on('brokerMessage', function (brokerId, data) {
self.emit('brokerMessage', brokerId, data);
self._brokerEngineServer.on('brokerMessage', function (brokerId, data, callback) {
self.emit('brokerMessage', brokerId, data, callback);
});
};

launchBrokerEngine();
};

SocketCluster.prototype._createIPCResponseHandler = function (callback) {
var self = this;
var cid = uuid.v4();

var responseTimeout = setTimeout(function () {
Expand All @@ -776,6 +794,15 @@ SocketCluster.prototype._createIPCResponseHandler = function (callback) {
return cid;
};

SocketCluster.prototype._flushWorkerClusterMessageBuffer = function () {
var self = this;

this.workerClusterMessageBuffer.forEach(function (messagePacket) {
self.workerCluster.send(messagePacket);
});
this.workerClusterMessageBuffer = [];
};

SocketCluster.prototype.sendToWorker = function (workerId, data, callback) {
var self = this;

Expand All @@ -788,11 +815,15 @@ SocketCluster.prototype.sendToWorker = function (workerId, data, callback) {
if (callback) {
messagePacket.cid = this._createIPCResponseHandler(callback);
}
this.workerCluster.send(messagePacket);
this.workerClusterMessageBuffer.push(messagePacket);

if (this.isWorkerClusterReady) {
this._flushWorkerClusterMessageBuffer();
}
};

SocketCluster.prototype.sendToBroker = function (brokerId, data, callback) {
this._brokerEngineServer.sendToBroker(brokerId, data);
this._brokerEngineServer.sendToBroker(brokerId, data, callback);
};

// The options object is optional and can have two boolean fields:
Expand Down
2 changes: 1 addition & 1 deletion kubernetes/socketcluster-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ spec:
spec:
containers:
- name: socketcluster
image: socketcluster/socketcluster:v6.5.0
image: socketcluster/socketcluster:v6.6.0
ports:
- containerPort: 8000
env:
Expand Down
11 changes: 7 additions & 4 deletions lib/scworker.js
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ SCWorker.prototype.getStatus = function () {
};

SCWorker.prototype._createIPCResponseHandler = function (callback) {
var self = this;
var cid = uuid.v4();

var responseTimeout = setTimeout(function () {
Expand All @@ -371,7 +372,8 @@ SCWorker.prototype.handleMasterResponse = function (message) {
if (responseHandler) {
clearTimeout(responseHandler.timeout);
delete this._pendingResponseHandlers[message.rid];
responseHandler.callback(null, message.data);
var properError = scErrors.hydrateError(message.error, true);
responseHandler.callback(properError, message.data);
}
};

Expand All @@ -388,9 +390,10 @@ SCWorker.prototype.sendToMaster = function (data, callback) {
process.send(messagePacket);
};

SCWorker.prototype.respondToMaster = function (data, rid) {
SCWorker.prototype.respondToMaster = function (err, data, rid) {
process.send({
type: 'workerResponse',
error: scErrors.dehydrateError(err, true),
data: data,
workerId: this.id,
rid: rid
Expand All @@ -404,9 +407,9 @@ SCWorker.prototype.handleMasterEvent = function () {
SCWorker.prototype.handleMasterMessage = function (message) {
var self = this

self.emit('masterMessage', message.data, function (data) {
self.emit('masterMessage', message.data, function (err, data) {
if (message.cid) {
self.respondToMaster(data, message.cid);
self.respondToMaster(err, data, message.cid);
}
});
};
Expand Down
10 changes: 10 additions & 0 deletions lib/workercluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,16 @@ if (cluster.isMaster) {
' because the worker does not exist';
var notFoundError = new InvalidActionError(errorMessage);
sendErrorToMaster(notFoundError);

if (m.cid) {
process.send({
type: 'workerClusterResponse',
error: scErrors.dehydrateError(notFoundError, true),
data: null,
workerId: m.workerId,
rid: m.cid
});
}
}
} else {
if (m.type == 'terminate' && m.data.killClusterMaster) {
Expand Down
26 changes: 17 additions & 9 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 2 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "socketcluster",
"description": "SocketCluster - A Highly parallelized WebSocket server cluster to make the most of multi-core machines/instances.",
"version": "6.5.0",
"version": "6.6.0",
"homepage": "http://socketcluster.io",
"contributors": [
{
Expand All @@ -13,18 +13,14 @@
"type": "git",
"url": "git://github.com/SocketCluster/socketcluster.git"
},
"scripts":{
"test":"npm run test::masterToWorkerCallback",
"test::masterToWorkerCallback":"node test/internal/serverToWorker.js"
},
"dependencies": {
"async": "2.0.0",
"base64id": "0.1.0",
"fs-extra": "2.0.0",
"inquirer": "1.1.3",
"minimist": "1.1.0",
"sc-auth": "~4.1.1",
"sc-broker-cluster": "~4.0.3",
"sc-broker-cluster": "~4.1.0",
"sc-domain": "~1.0.1",
"sc-emitter": "~1.1.0",
"sc-errors": "~1.3.3",
Expand Down
2 changes: 1 addition & 1 deletion sample/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"sc-hot-reboot": "~1.0.0",
"scc-broker-client": "~1.4.0",
"serve-static": "1.11.2",
"socketcluster": "~6.5.0",
"socketcluster": "~6.6.0",
"socketcluster-client": "~6.3.0"
},
"keywords": [
Expand Down
2 changes: 1 addition & 1 deletion test/internal/broker.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
var fs = require('fs');
var http = require('http');
var getTestSocketPath = require('./testsocketpath').getTestSocketPath;
var getTestSocketPath = require('./test-socket-path').getTestSocketPath;
var util = require('util');

module.exports.run = function (broker) {
Expand Down
51 changes: 51 additions & 0 deletions test/internal/broker2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@

module.exports.run = function (broker) {
console.log(' >> Broker PID:', process.pid);

broker.on('masterMessage', function (data, res) {
console.log(`BROKER ${broker.id}::Received data from master:`, data);
if (data.fail) {
var err = new Error('This is an error from broker');
err.name = 'MyCustomBrokerError';
res(err);
} else if (!data.doNothing) {
res(null, {
id: 1,
name: 'TestName'
});
}
});

var packet = {
prop: 1234
};

console.log(`BROKER ${broker.id}::Sending packet to master`);
broker.sendToMaster(packet, function (err, data) {
console.log(`BROKER ${broker.id}::Response error from master:`, err);
console.log(`BROKER ${broker.id}::Response data packet from master:`, data);
});

var timeoutPacket = {
doNothing: true
};

console.log(`BROKER ${broker.id}::Sending timeout-causing packet to master`);
broker.sendToMaster(timeoutPacket, function (err, data) {
console.log(`BROKER ${broker.id}::Timeout response error from master:`, err);
console.log(`BROKER ${broker.id}::Timeout response data packet from master:`, data);
});

var errorPacket = {
fail: true
};

console.log(`BROKER ${broker.id}::Sending error-causing packet to master`);
broker.sendToMaster(errorPacket, function (err, data) {
console.log(`BROKER ${broker.id}::Error response error from master:`, err);
console.log(`BROKER ${broker.id}::Error response data packet from master:`, data);
});

console.log(`BROKER ${broker.id}::Sending error-causing packet to master without callback`);
broker.sendToMaster(errorPacket);
};
2 changes: 1 addition & 1 deletion test/internal/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ var assert = require('assert');
var http = require('http');
var util = require('util');
var fs = require('fs');
var getTestSocketPath = require('./testsocketpath').getTestSocketPath;
var getTestSocketPath = require('./test-socket-path').getTestSocketPath;

var scServer = childProcess.fork(__dirname + '/server.js');
var resultSocketPath = getTestSocketPath();
Expand Down
Loading

0 comments on commit 7a6effa

Please sign in to comment.