Skip to content
This repository has been archived by the owner on Oct 30, 2018. It is now read-only.

Commit

Permalink
Merge pull request #118 from MrFancyMonocle/bugfix/openprocess
Browse files Browse the repository at this point in the history
[WIP] workaround added for multiple restarting crashed proccesses crashing …
  • Loading branch information
bookchin authored May 2, 2017
2 parents 729ce32 + 8e53c50 commit bf978db
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 20 deletions.
9 changes: 6 additions & 3 deletions bin/storjshare-daemon.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@

const daemonize = require('daemon');
const dnode = require('dnode');
const config = require('../lib/config/daemon');
const RPC = require('../lib/api');
const utils = require('../lib/utils');
const api = new RPC({ logVerbosity: config.daemonLogVerbosity });
const {createWriteStream} = require('fs');
const config = require('../lib/config/daemon');
const { createWriteStream } = require('fs');
const logFile = createWriteStream(config.daemonLogFilePath, { flags: 'a' });
const storjshare_daemon = require('commander');

Expand All @@ -17,6 +16,10 @@ storjshare_daemon
.option('-F, --foreground', 'keeps the process in the foreground')
.parse(process.argv);

const api = new RPC({
logVerbosity: config.daemonLogVerbosity
});

function startDaemonRpcServer() {
dnode(api.methods)
.on('error', (err) => api.logger.warn(err.message))
Expand Down
5 changes: 2 additions & 3 deletions bin/storjshare-start.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ storjshare_start
.description('starts a new network share')
.option('-c, --config <path>', 'specify the configuration path')
.option('-d, --detached', 'run share without management from daemon')
.option('-u, --unsafe', 'ignore system resource guards')
.parse(process.argv);

if (!storjshare_start.config) {
Expand Down Expand Up @@ -42,7 +43,7 @@ function runManagedShare() {
}
console.info(`\n * starting share with config at ${configPath}`);
sock.end();
});
}, storjshare_start.unsafe);
});
}

Expand All @@ -51,5 +52,3 @@ if (storjshare_start.detached) {
} else {
runManagedShare();
}


4 changes: 2 additions & 2 deletions example/daemon.config.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{
// Bind Dnode RPC server to this port
"daemonRpcPort": 45015,
// Interface to bind Dnode RPC server, if your host is public, be sure to
// Interface to bind Dnode RPC server, if your host is public, be sure to
// leave this as "127.0.0.1" to prevent others from controlling your nodes
// You can set this to a public address if you'd like to control your shares
// remotely, however you must secure access on your own - you have been
// remotely, however you must secure access on your own - you have been
// warned
"daemonRpcAddress": "127.0.0.1",
// Path to write daemon log file to disk, leave blank to default to:
Expand Down
28 changes: 18 additions & 10 deletions lib/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const JsonLogger = require('kad-logger-json');
const {fork} = require('child_process');
const utils = require('./utils');
const path = require('path');
const { cpus } = require('os');

/** Class representing a local RPC API's handlers */
class RPC {
Expand Down Expand Up @@ -75,19 +76,23 @@ class RPC {
/**
* Starts a share process with the given configuration
* @param {String} configPath
* @param {Boolean} unsafeFlag
* @param {RPC~startCallback}
* @see https://storj.github.io/core/FarmerInterface.html
*/
start(configPath, callback) {
start(configPath, callback, unsafeFlag=false) {
let config = null;

if (this.shares.size + 1 >= cpus().length && !unsafeFlag) {
return callback(new Error('insufficient system resources available'));
}

try {
config = this._readConfig(configPath);
} catch (err) {
return callback(err);
}

const self = this;
const nodeId = storj.KeyPair(config.networkPrivateKey).getNodeID();
const share = this.shares.get(nodeId) || {
config: config,
Expand All @@ -103,7 +108,7 @@ class RPC {

this._log(`attempting to start share with config at path ${configPath}`);

if (self.shares.has(nodeId) && self.shares.get(nodeId).readyState === 1) {
if (this.shares.has(nodeId) && this.shares.get(nodeId).readyState === 1) {
return callback(new Error(`share ${nodeId} is already running`));
}

Expand Down Expand Up @@ -137,7 +142,7 @@ class RPC {
// NB: Listen for state changes to update the share's record
share.process.on('error', (err) => {
share.readyState = RPC.SHARE_ERRORED;
self._log(err.message, 'error');
this._log(err.message, 'error');
clearInterval(uptimeCounter);
});

Expand All @@ -146,17 +151,20 @@ class RPC {
let maxRestartsReached = share.meta.numRestarts >= RPC.MAX_RESTARTS;
share.readyState = RPC.SHARE_STOPPED;

self._log(`share ${nodeId} exited with code ${code}`);
this._log(`share ${nodeId} exited with code ${code}`);
clearInterval(uptimeCounter);

if (signal !== 'SIGINT' && !maxRestartsReached) {
if (signal !== 'SIGINT' &&
!maxRestartsReached &&
share.meta.uptimeMs >= 5000
) {
share.meta.numRestarts++;
self.restart(nodeId, () => null);
this.restart(nodeId, () => null);
}
});

share.process.on('message', (msg) => self._processShareIpc(share, msg));
self.shares.set(nodeId, share);
share.process.on('message', (msg) => this._processShareIpc(share, msg));
this.shares.set(nodeId, share);
callback(null);
});
}
Expand Down Expand Up @@ -324,7 +332,7 @@ class RPC {
return callback(new Error('failed to parse snapshot'));
}

async.eachLimit(snapshot, 3, (share, next) => {
async.eachLimit(snapshot, 1, (share, next) => {
this.start(share.path, (err) => {
/* istanbul ignore if */
if (err) {
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "storjshare-daemon",
"version": "2.5.1",
"version": "2.5.2",
"description": "daemon + process manager for sharing space on the storj network",
"main": "index.js",
"bin": {
Expand Down Expand Up @@ -71,7 +71,7 @@
"pretty-ms": "^2.1.0",
"rc": "^1.1.6",
"readable-stream": "^2.2.2",
"storj-lib": "^6.3.2",
"storj-lib": "^6.4.2",
"storj-telemetry-reporter": "^5.0.0",
"strip-json-comments": "^2.0.1",
"tail": "^1.2.1"
Expand Down
32 changes: 32 additions & 0 deletions test/api.unit.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,36 @@ describe('class:RPC', function() {

describe('#start', function() {

it('should callback error if too many shares', function(done) {
let _RPC = proxyquire('../lib/api', {
os: {
cpus: sinon.stub().returns([])
}
});
let rpc = new _RPC({ loggerVerbosity: 0 });
rpc.start('path/to/config', function(err) {
expect(err.message).to.equal(
'insufficient system resources available'
);
done();
});
});

it('should fall through is unsafe flag', function(done) {
let _RPC = proxyquire('../lib/api', {
os: {
cpus: sinon.stub().returns([])
}
});
let rpc = new _RPC({ loggerVerbosity: 0 });
rpc.start('path/to/config', function(err) {
expect(err.message).to.equal(
'failed to read config at path/to/config'
);
done();
}, true);
});

it('should callback error if no config given', function(done) {
let _RPC = proxyquire('../lib/api', {
fs: {
Expand Down Expand Up @@ -148,6 +178,8 @@ describe('class:RPC', function() {
let _ipc = sinon.stub(rpc, '_processShareIpc');
rpc.start('path/to/config', function() {
let id = rpc.shares.keys().next().value;
let share = rpc.shares.get(id);
share.meta.uptimeMs = 6000;
_proc.emit('message', {});
setImmediate(() => {
expect(_ipc.called).to.equal(true);
Expand Down

0 comments on commit bf978db

Please sign in to comment.