Skip to content

Commit

Permalink
Merge pull request #5 from Supergiovane/master
Browse files Browse the repository at this point in the history
Fix reconnection timeout and possible memory leak
  • Loading branch information
bartbutenaers authored Apr 9, 2022
2 parents a2d105b + a1a1258 commit 4f392af
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 82 deletions.
89 changes: 89 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "node-red-contrib-sse-client",
"version": "0.2.3",
"version": "0.2.4",
"description": "A Node Red node to receive Server Sent events",
"dependencies": {
"original": "^1.0.0",
Expand Down
168 changes: 87 additions & 81 deletions sse_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,130 +18,132 @@ function isEmpty(obj) {
return Object.keys(obj).length === 0 && obj.constructor === Object
};

module.exports = function(RED) {
module.exports = function (RED) {
var EventSource = require('./lib/eventsource');
var mustache = require("mustache");

function SseClientNode(config) {
RED.nodes.createNode(this, config);
this.headers = config.headers || {};
this.url = config.url;
this.events = config.events || [];
this.proxy = config.proxy;
this.restart = config.restart;
this.timeout = config.timeout;
this.rejectUnauthorized = config.rejectUnauthorized;
this.withCredentials = config.withCredentials;
this.paused = false;
this.prevMsg = null;
this.timerId = null;

RED.nodes.createNode(this, config);
var node = this;


node.headers = config.headers || {};
node.url = config.url;
node.events = config.events || [];
node.proxy = config.proxy;
node.restart = config.restart;
node.timeout = config.timeout;
node.rejectUnauthorized = config.rejectUnauthorized;
node.withCredentials = config.withCredentials;
node.paused = false;
node.prevMsg = null;
node.timerId = null;
node.client = null;
node.proxyUrl = null;

// Migration of old nodes without the http settings
if (node.rejectUnauthorized === undefined) {
node.rejectUnauthorized = true;
}
if (node.withCredentials === undefined) {
node.withCredentials = true;
}

var isTemplatedUrl = (node.url||"").indexOf("{{") != -1;

node.status({fill: 'red', shape: 'ring', text: 'disconnected'});

var isTemplatedUrl = (node.url || "").indexOf("{{") != -1;

node.status({ fill: 'red', shape: 'ring', text: 'Disconnected. Wait for input msg to start.' });

function handleEvent(e) {
// Skip all events when this node is paused
if (node.paused) {
return;
}

// Skip the 'open' event
if (e.type === 'open') {
return;
}

// When events have been specified, only allow those events
if (node.events.length > 0 && !node.events.includes(e.type)) {
return;
}

// When a previous timer is available, stop it
if (node.timerId) {
clearTimeout(node.timerId);
node.timerId = null;
}


// When a timeout is specified, start a new timer (that restarts the SSE client)
if (node.restart) {
node.timerId = setTimeout(function restartClient() {
// Restart the SSE client by resending the last message again
handleMsg(node.prevMsg);
}, node.timeout * 1000);
}

if (node.restart) startTimeoutTimer();

// Send the received SSE event in the output message
node.send({
event: e.type,
payload: e.data
});
}


// 08/04/2022 Supergiovane: Function to start the timeout timer.
function startTimeoutTimer() {
// When a previous timer is available, stop it
if (node.timerId !== null) clearTimeout(node.timerId);
node.timerId = setTimeout(function restartClient() {
// Restart the SSE client by resending the last message again
node.status({ fill: "yellow", shape: "dot", text: "timeout => restarting" });
if (node.prevMsg === null) node.prevMsg = { payload: true };
handleMsg(node.prevMsg);
//node.send({ payload: "Reconnecting..." });
}, node.timeout * 1000);
}

function handleMsg(msg) {

// When a stream is paused or stopped, stop the active timeout timer (since no events will be received anyway)
if (msg.pause === true || msg.stop === true) {
if (node.timerId) {
if (node.timerId) {
clearTimeout(node.timerId);
node.timerId = null;
}
}

// Check whether the stream should be paused
if (msg.pause === true) {
node.status({fill: 'yellow', shape: 'ring', text: 'paused'});
node.status({ fill: 'yellow', shape: 'ring', text: 'paused' });
node.paused = true;

return;
}

// To stop the streaming, close the client
if (msg.stop === true) {
node.status({fill: 'red', shape: 'ring', text: 'disconnected'});
node.status({ fill: 'red', shape: 'ring', text: 'disconnected' });
if (node.client) {
node.client.close();
}
node.client = null;
return;
}

// When the previous client is NOT paused, we will stop it and create a new one (to send a new http GET).
if (node.client && !node.paused) {
node.status({fill: 'red', shape: 'ring', text: 'disconnected'});
setTimeout(() => {
node.status({ fill: 'red', shape: 'ring', text: 'disconnected' });
}, 1000); // 08/04/2022 Supergiovane: Allow the "reconnecting" status to be shown, in case of reconnection by timeout.
node.client.close();
node.client = null;
}

// When we arrive here, a new stream should be started or a paused stream should be restarted

// When the previous client is paused, then resume it again
if (node.client && node.paused) {
node.status({fill: "green", shape: "dot", text: "connected"});
node.paused = false;

// When a timeout is specified, start a new timer (that restarts the SSE client)
if (node.restart) {
node.timerId = setTimeout(function restartClient() {
// Restart the SSE client by resending the last message again
handleMsg(node.prevMsg);
}, node.timeout * 1000);
}
node.status({ fill: "green", shape: "dot", text: "connected" });
node.paused = false;
}


// When a timeout is specified, start a new timer (that restarts the SSE client)
if (node.restart) startTimeoutTimer();

// When no client is available, just create one (and connect to the server) ...
if (!node.client) {
// All EventSource parameter should be passed to the constructor as a dictionary
var options = {};
var url = '';
let options = {};
let url = '';

// Allow override of headers
if (msg.headers && isEmpty(node.headers)) {
Expand All @@ -152,11 +154,11 @@ module.exports = function(RED) {
options.headers = node.headers;
node.warn('Warning: msg properties can not override set node properties. Using set node properties.');
}

if (node.proxyUrl) {
options.proxy = node.proxyUrl;
}

options.https = {
rejectUnauthorized: node.rejectUnauthorized,
/*TODO
Expand All @@ -166,9 +168,9 @@ module.exports = function(RED) {
passphrase: 'test1234$'
*/
}

options.withCredentials = node.withCredentials;

// Allow override of url
if (msg.url && !node.url) {
url = msg.url;
Expand All @@ -178,49 +180,53 @@ module.exports = function(RED) {
url = node.url;
node.warn('Warning: msg properties can not override set node properties. Using set node properties.');
} else {
node.status({fill: "red", shape: "dot", text: "no url"});
node.status({ fill: "red", shape: "dot", text: "no url" });
return;
}

if (isTemplatedUrl) {
url = mustache.render(url, msg);
}

// Start a new stream (i.e. send a new http get)
node.client = new EventSource(url, options);

node.client.onopen = function() {
node.status({fill: "green", shape: "dot", text: "connected"});
node.client.onopen = function () {
setTimeout(() => {
node.status({ fill: "green", shape: "dot", text: "connected" });
}, 1000); // 08/04/2022 Supergiovane: Allow the "reconnecting" status to be shown, in case of reconnection by timeout.

}

node.client.onerror = function(err) {
node.status({fill: "red", shape: "dot", text: `Error: ${err.message}`});
node.client.onerror = function (err) {
node.status({ fill: "red", shape: "dot", text: `Error: ${err.message}` });
}
}

// Handle ALL events.
node.client.onAnyMessage = function(eventType, event) {
node.client.onAnyMessage = function (eventType, event) {
handleEvent(event);
}
}

node.on("input", function(msg) {
node.prevMsg = msg;
handleMsg(msg);

node.on("input", function (msg) {
let _msg = RED.util.cloneMessage(msg);
node.prevMsg = _msg;
handleMsg(_msg);
});

node.on("close", function() {
node.status({fill: "red", shape: "ring", text: "disconnected"});
if (node.client) {
node.on("close", function () {
node.status({ fill: "red", shape: "ring", text: "disconnected" });
if (node.client) {
node.client.close();
}
node.paused = false;

if (node.timerId) {
clearTimeout(node.timerId);
node.timerId = null;
}
});
});
}

RED.nodes.registerType("sse-client", SseClientNode);
Expand Down

0 comments on commit 4f392af

Please sign in to comment.