Skip to content
This repository has been archived by the owner on Apr 6, 2021. It is now read-only.

Process kill fix #7

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
47 changes: 35 additions & 12 deletions consumer.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
( // Module boilerplate to support browser globals, node.js and AMD.
(typeof module !== "undefined" && function (m) { module.exports = m(require('stream'), require('events'), require('smith')); }) ||
(typeof define === "function" && function (m) { define("vfs-socket/consumer", ["./stream-amd", "./events-amd", "smith"], m); }) ||
(typeof define === "function" && function (m) { define(["./stream-amd", "./events-amd", "smith"], m); }) ||
(function (m) { window.consumer = m(window.stream, window.events, window.smith); })
)(function (stream, events, smith) {
"use strict";
Expand Down Expand Up @@ -62,6 +62,7 @@ function Consumer() {
ping: ping, // Send a simple ping request to the worker
resolve: route("resolve"),
stat: route("stat"),
metadata: route("metadata"),
readfile: route("readfile"),
readdir: route("readdir"),
mkfile: route("mkfile"),
Expand All @@ -74,6 +75,7 @@ function Consumer() {
watch: route("watch"),
connect: route("connect"),
spawn: route("spawn"),
pty: route("pty"),
execFile: route("execFile"),
extend: route("extend"),
unextend: route("unextend"),
Expand Down Expand Up @@ -103,16 +105,16 @@ function Consumer() {
err = new Error("EDISCONNECT: vfs socket disconnected");
err.code = "EDISCONNECT";
}
Object.keys(streams).forEach(function (id) {
var stream = streams[id];
stream.emit("close");
});
Object.keys(proxyStreams).forEach(onClose);
Object.keys(proxyProcesses).forEach(function (pid) {
var proxyProcess = proxyProcesses[pid];
delete proxyProcesses[pid];
proxyProcess.emit("exit", 1);
});
Object.keys(streams).forEach(function (id) {
var stream = streams[id];
stream.emit("close");
});
Object.keys(proxyStreams).forEach(onClose);
Object.keys(proxyWatchers).forEach(function (id) {
var proxyWatcher = proxyWatchers[id];
delete proxyWatchers[id];
Expand All @@ -124,6 +126,11 @@ function Consumer() {
proxyApi.emit("error", err);
});
});

this.on("error", function(err){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was annoying me sometimes with collab, good that you handled it here

// just adding an empty listener so that events-amd doesn't throw
console.error(err);
});

var nextStreamID = 1;
function storeStream(stream) {
Expand All @@ -138,9 +145,9 @@ function Consumer() {
stream.pause && stream.pause();
}
});
stream.on("end", function () {
stream.on("end", function (chunk) {
delete streams[id];
remote.onEnd(id);
remote.onEnd(id, chunk);
});
}
stream.on("close", function () {
Expand Down Expand Up @@ -193,6 +200,19 @@ function Consumer() {
};
return process;
}
function makePtyProxy(token){
var pty = makeStreamProxy(token);
var pid = token.pid;
pty.pid = pid;
proxyProcesses[pid] = pty;
pty.kill = function (signal) {
remote.kill(pid, signal);
};
pty.resize = function (cols, rows) {
remote.resize(pid, cols, rows);
};
return pty;
}

function makeWatcherProxy(token) {
var watcher = new EventEmitter();
Expand Down Expand Up @@ -248,13 +268,13 @@ function Consumer() {
if (!stream) return;
stream.emit("data", chunk);
}
function onEnd(id) {
function onEnd(id, chunk) {
var stream = proxyStreams[id];
if (!stream) return;
// TODO: not delete proxy if close is going to be called later.
// but somehow do delete proxy if close won't be called later.
delete proxyStreams[id];
stream.emit("end");
stream.emit("end", chunk);
}
function onClose(id) {
var stream = proxyStreams[id];
Expand Down Expand Up @@ -289,7 +309,7 @@ function Consumer() {
if (!stream) return;
stream.destroy();
delete streams[id];
nextStreamID = id;
// nextStreamID = id;
}
function pause(id) {
var stream = streams[id];
Expand All @@ -307,7 +327,7 @@ function Consumer() {
delete streams[id];
if (chunk) stream.end(chunk);
else stream.end();
nextStreamID = id;
// nextStreamID = id;
}

function on(name, handler, callback) {
Expand Down Expand Up @@ -360,6 +380,9 @@ function Consumer() {
if (meta.process) {
meta.process = makeProcessProxy(meta.process);
}
if (meta.pty) {
meta.pty = makePtyProxy(meta.pty);
}
if (meta.watcher) {
meta.watcher = makeWatcherProxy(meta.watcher);
}
Expand Down
1 change: 1 addition & 0 deletions events-amd.js
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ EventEmitter.prototype.removeListener = function(type, listener) {

return this;
};
EventEmitter.prototype.off = EventEmitter.prototype.removeListener;

/**
* Removes all listeners, or those of the specified event.
Expand Down
60 changes: 45 additions & 15 deletions worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ function Worker(vfs) {
// Endpoints for processes at meta.process
kill: kill,

// Endpoints for processes at meta.pty
resize: resize,

// Endpoint for watchers at meta.watcher
close: closeWatcher,

Expand All @@ -47,6 +50,7 @@ function Worker(vfs) {
// Route other calls to the local vfs instance
resolve: route("resolve"),
stat: route("stat"),
metadata: route("metadata"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a PR for vfs-local that includes this function as well ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added it to c9/vfs-local#11

readfile: route("readfile"),
readdir: route("readdir"),
mkfile: route("mkfile"),
Expand All @@ -59,6 +63,7 @@ function Worker(vfs) {
watch: route("watch"),
connect: route("connect"),
spawn: route("spawn"),
pty: route("pty"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this require c9/vfs-local#11 to be merged as well ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't matter unless the method is called.

execFile: route("execFile"),
extend: route("extend"),
unextend: route("unextend"),
Expand Down Expand Up @@ -107,16 +112,18 @@ function Worker(vfs) {
err = new Error("EDISCONNECT: vfs socket disconnected");
err.code = "EDISCONNECT";
}
Object.keys(streams).forEach(function (id) {
var stream = streams[id];
stream.emit("close", err);
});
Object.keys(proxyStreams).forEach(onClose);
Object.keys(processes).forEach(function (pid) {
var process = processes[pid];
console.log("PROCESS", process)
process.kill();
delete processes[pid];
});
Object.keys(streams).forEach(function (id) {
var stream = streams[id];
console.log("STREAM", stream)
stream.emit("close", err);
});
Object.keys(proxyStreams).forEach(onClose);
Object.keys(watchers).forEach(function (id) {
var watcher = watchers[id];
delete watchers[id];
Expand Down Expand Up @@ -163,9 +170,9 @@ function Worker(vfs) {
stream.pause && stream.pause();
}
});
stream.on("end", function () {
stream.on("end", function (chunk) {
delete streams[id];
remote.onEnd(id);
remote.onEnd(id, chunk);
});
}
stream.on("close", function () {
Expand All @@ -178,7 +185,7 @@ function Worker(vfs) {
return token;
}

function storeProcess(process) {
function storeProcess(process, onlyPid) {
var pid = process.pid;
processes[pid] = process;
process.on("exit", function (code, signal) {
Expand All @@ -187,22 +194,35 @@ function Worker(vfs) {
});
process.on("close", function () {
delete processes[pid];
delete streams[process.stdout.id];
delete streams[process.stderr.id];
delete streams[process.stdin.id];
if (!onlyPid) {
delete streams[process.stdout.id];
delete streams[process.stderr.id];
delete streams[process.stdin.id];
}
remote.onProcessClose(pid);
});

process.kill = function(code) {
killtree(pid, code);
};

if (onlyPid)
return pid;

var token = {pid: pid};
token.stdin = storeStream(process.stdin);
token.stdout = storeStream(process.stdout);
token.stderr = storeStream(process.stderr);
return token;
}

function storePty(pty) {
var pid = storeProcess(pty, true);
var token = storeStream(pty);
token.pid = pid;

return token;
}

function killtree(pid, code) {
childrenOfPid(pid, function(err, pidlist){
Expand Down Expand Up @@ -275,15 +295,15 @@ function Worker(vfs) {
if (!stream) return;
delete streams[id];
stream.destroy();
nextStreamID = id;
// nextStreamID = id;
}
function end(id, chunk) {
var stream = streams[id];
if (!stream) return;
delete streams[id];
if (chunk) stream.end(chunk);
else stream.end();
nextStreamID = id;
// nextStreamID = id;
}

function kill(pid, code) {
Expand All @@ -292,6 +312,15 @@ function Worker(vfs) {
process.kill(code);
}

function resize(pid, cols, rows) {
var process = processes[pid];
if (!process) return;

// Resize can throw
try { process.resize(cols, rows); }
catch(e) {};
}

function closeWatcher(id) {
var watcher = watchers[id];
if (!watcher) return;
Expand Down Expand Up @@ -323,13 +352,13 @@ function Worker(vfs) {
if (!stream) return;
stream.emit("data", chunk);
}
function onEnd(id) {
function onEnd(id, chunk) {
var stream = proxyStreams[id];
if (!stream) return;
// TODO: not delete proxy if close is going to be called later.
// but somehow do delete proxy if close won't be called later.
delete proxyStreams[id];
stream.emit("end");
stream.emit("end", chunk);
}
function onClose(id) {
var stream = proxyStreams[id];
Expand Down Expand Up @@ -360,6 +389,7 @@ function Worker(vfs) {
switch (key) {
case "stream": token.stream = storeStream(meta.stream); break;
case "process": token.process = storeProcess(meta.process); break;
case "pty": token.pty = storePty(meta.pty); break;
case "watcher": token.watcher = storeWatcher(meta.watcher); break;
case "api": token.api = storeApi(meta.api); break;
default: token[key] = meta[key]; break;
Expand Down