From 413c5865530afe8cc4557f940adde1a905e26ad6 Mon Sep 17 00:00:00 2001 From: Massimo <37666636+Supergiovane@users.noreply.github.com> Date: Fri, 8 Apr 2022 14:12:43 +0200 Subject: [PATCH 1/5] Fixed timeout not property firing in some cases and possible memory leak. --- .vscode/launch.json | 18 +++++ package-lock.json | 89 +++++++++++++++++++++++ sse_client.js | 168 +++++++++++++++++++++++--------------------- 3 files changed, 194 insertions(+), 81 deletions(-) create mode 100644 .vscode/launch.json create mode 100644 package-lock.json diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..026af9b --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,18 @@ +{ + // Usare IntelliSense per informazioni sui possibili attributi. + // Al passaggio del mouse vengono visualizzate le descrizioni degli attributi esistenti. + // Per altre informazioni, visitare: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + + { + "type": "pwa-node", + "request": "launch", + "name": "Launch Program", + "skipFiles": [ + "/**" + ], + "program": "${workspaceFolder}/sse_client.js" + } + ] +} \ No newline at end of file diff --git a/package-lock.json b/package-lock.json new file mode 100644 index 0000000..606d984 --- /dev/null +++ b/package-lock.json @@ -0,0 +1,89 @@ +{ + "name": "node-red-contrib-sse-client", + "version": "0.2.3", + "lockfileVersion": 2, + "requires": true, + "packages": { + "": { + "name": "node-red-contrib-sse-client", + "version": "0.2.3", + "license": "Apache-2.0", + "dependencies": { + "mustache": "3.0.1", + "original": "^1.0.0" + } + }, + "node_modules/mustache": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/mustache/-/mustache-3.0.1.tgz", + "integrity": "sha512-jFI/4UVRsRYdUbuDTKT7KzfOp7FiD5WzYmmwNwXyUVypC0xjoTL78Fqc0jHUPIvvGD+6DQSPHIt1NE7D1ArsqA==", + "bin": { + "mustache": "bin/mustache" + }, + "engines": { + "npm": ">=1.4.0" + } + }, + "node_modules/original": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/original/-/original-1.0.2.tgz", + "integrity": "sha512-hyBVl6iqqUOJ8FqRe+l/gS8H+kKYjrEndd5Pm1MfBtsEKA038HkkdbAl/72EAXGyonD/PFsvmVG+EvcIpliMBg==", + "dependencies": { + "url-parse": "^1.4.3" + } + }, + "node_modules/querystringify": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.2.0.tgz", + "integrity": "sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==" + }, + "node_modules/requires-port": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz", + "integrity": "sha1-kl0mAdOaxIXgkc8NpcbmlNw9yv8=" + }, + "node_modules/url-parse": { + "version": "1.5.10", + "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.10.tgz", + "integrity": "sha512-WypcfiRhfeUP9vvF0j6rw0J3hrWrw6iZv3+22h6iRMJ/8z1Tj6XfLP4DsUix5MhMPnXpiHDoKyoZ/bdCkwBCiQ==", + "dependencies": { + "querystringify": "^2.1.1", + "requires-port": "^1.0.0" + } + } + }, + "dependencies": { + "mustache": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/mustache/-/mustache-3.0.1.tgz", + "integrity": "sha512-jFI/4UVRsRYdUbuDTKT7KzfOp7FiD5WzYmmwNwXyUVypC0xjoTL78Fqc0jHUPIvvGD+6DQSPHIt1NE7D1ArsqA==" + }, + "original": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/original/-/original-1.0.2.tgz", + "integrity": "sha512-hyBVl6iqqUOJ8FqRe+l/gS8H+kKYjrEndd5Pm1MfBtsEKA038HkkdbAl/72EAXGyonD/PFsvmVG+EvcIpliMBg==", + "requires": { + "url-parse": "^1.4.3" + } + }, + "querystringify": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.2.0.tgz", + "integrity": "sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==" + }, + "requires-port": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz", + "integrity": "sha1-kl0mAdOaxIXgkc8NpcbmlNw9yv8=" + }, + "url-parse": { + "version": "1.5.10", + "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.10.tgz", + "integrity": "sha512-WypcfiRhfeUP9vvF0j6rw0J3hrWrw6iZv3+22h6iRMJ/8z1Tj6XfLP4DsUix5MhMPnXpiHDoKyoZ/bdCkwBCiQ==", + "requires": { + "querystringify": "^2.1.1", + "requires-port": "^1.0.0" + } + } + } +} diff --git a/sse_client.js b/sse_client.js index bb9e7f7..810da84 100644 --- a/sse_client.js +++ b/sse_client.js @@ -18,26 +18,28 @@ function isEmpty(obj) { return Object.keys(obj).length === 0 && obj.constructor === Object }; -module.exports = function(RED) { +module.exports = function (RED) { var EventSource = require('./lib/eventsource'); var mustache = require("mustache"); function SseClientNode(config) { - RED.nodes.createNode(this, config); - this.headers = config.headers || {}; - this.url = config.url; - this.events = config.events || []; - this.proxy = config.proxy; - this.restart = config.restart; - this.timeout = config.timeout; - this.rejectUnauthorized = config.rejectUnauthorized; - this.withCredentials = config.withCredentials; - this.paused = false; - this.prevMsg = null; - this.timerId = null; - + RED.nodes.createNode(this, config); var node = this; - + + node.headers = config.headers || {}; + node.url = config.url; + node.events = config.events || []; + node.proxy = config.proxy; + node.restart = config.restart; + node.timeout = config.timeout; + node.rejectUnauthorized = config.rejectUnauthorized; + node.withCredentials = config.withCredentials; + node.paused = false; + node.prevMsg = null; + node.timerId = null; + node.client = null; + node.proxyUrl = null; + // Migration of old nodes without the http settings if (node.rejectUnauthorized === undefined) { node.rejectUnauthorized = true; @@ -45,103 +47,103 @@ module.exports = function(RED) { if (node.withCredentials === undefined) { node.withCredentials = true; } - - var isTemplatedUrl = (node.url||"").indexOf("{{") != -1; - node.status({fill: 'red', shape: 'ring', text: 'disconnected'}); - + var isTemplatedUrl = (node.url || "").indexOf("{{") != -1; + + node.status({ fill: 'red', shape: 'ring', text: 'Disconnected. Wait for input msg to start.' }); + function handleEvent(e) { // Skip all events when this node is paused if (node.paused) { return; } - + // Skip the 'open' event if (e.type === 'open') { return; } - + // When events have been specified, only allow those events if (node.events.length > 0 && !node.events.includes(e.type)) { return; } - - // When a previous timer is available, stop it - if (node.timerId) { - clearTimeout(node.timerId); - node.timerId = null; - } - + // When a timeout is specified, start a new timer (that restarts the SSE client) - if (node.restart) { - node.timerId = setTimeout(function restartClient() { - // Restart the SSE client by resending the last message again - handleMsg(node.prevMsg); - }, node.timeout * 1000); - } - + if (node.restart) startTimeoutTimer(); + // Send the received SSE event in the output message node.send({ event: e.type, payload: e.data }); } - + + // 08/04/2022 Supergiovane: Function to that starts the timeout timer. + function startTimeoutTimer() { + // When a previous timer is available, stop it + if (node.timerId !== null) clearTimeout(node.timerId); + node.timerId = setTimeout(function restartClient() { + // Restart the SSE client by resending the last message again + node.status({ fill: "yellow", shape: "dot", text: "Timeout. Restarting." }); + if (node.prevMsg === null) node.prevMsg = { payload: true }; + handleMsg(node.prevMsg); + //node.send({ payload: "Reconnecting..." }); + }, node.timeout * 1000); + } + function handleMsg(msg) { + // When a stream is paused or stopped, stop the active timeout timer (since no events will be received anyway) if (msg.pause === true || msg.stop === true) { - if (node.timerId) { + if (node.timerId) { clearTimeout(node.timerId); node.timerId = null; } } - + // Check whether the stream should be paused if (msg.pause === true) { - node.status({fill: 'yellow', shape: 'ring', text: 'paused'}); + node.status({ fill: 'yellow', shape: 'ring', text: 'paused' }); node.paused = true; - + return; } - + // To stop the streaming, close the client if (msg.stop === true) { - node.status({fill: 'red', shape: 'ring', text: 'disconnected'}); + node.status({ fill: 'red', shape: 'ring', text: 'disconnected' }); if (node.client) { node.client.close(); } node.client = null; return; } - + // When the previous client is NOT paused, we will stop it and create a new one (to send a new http GET). if (node.client && !node.paused) { - node.status({fill: 'red', shape: 'ring', text: 'disconnected'}); + setTimeout(() => { + node.status({ fill: 'red', shape: 'ring', text: 'disconnected' }); + }, 1000); // 08/04/2022 Supergiovane: Allow the "reconnecting" status to be shown, in case of reconnection by timeout. node.client.close(); node.client = null; } - + // When we arrive here, a new stream should be started or a paused stream should be restarted - + // When the previous client is paused, then resume it again if (node.client && node.paused) { - node.status({fill: "green", shape: "dot", text: "connected"}); - node.paused = false; - - // When a timeout is specified, start a new timer (that restarts the SSE client) - if (node.restart) { - node.timerId = setTimeout(function restartClient() { - // Restart the SSE client by resending the last message again - handleMsg(node.prevMsg); - }, node.timeout * 1000); - } + node.status({ fill: "green", shape: "dot", text: "connected" }); + node.paused = false; } - + + // When a timeout is specified, start a new timer (that restarts the SSE client) + if (node.restart) startTimeoutTimer(); + // When no client is available, just create one (and connect to the server) ... if (!node.client) { // All EventSource parameter should be passed to the constructor as a dictionary - var options = {}; - var url = ''; + let options = {}; + let url = ''; // Allow override of headers if (msg.headers && isEmpty(node.headers)) { @@ -152,11 +154,11 @@ module.exports = function(RED) { options.headers = node.headers; node.warn('Warning: msg properties can not override set node properties. Using set node properties.'); } - + if (node.proxyUrl) { options.proxy = node.proxyUrl; } - + options.https = { rejectUnauthorized: node.rejectUnauthorized, /*TODO @@ -166,9 +168,9 @@ module.exports = function(RED) { passphrase: 'test1234$' */ } - + options.withCredentials = node.withCredentials; - + // Allow override of url if (msg.url && !node.url) { url = msg.url; @@ -178,49 +180,53 @@ module.exports = function(RED) { url = node.url; node.warn('Warning: msg properties can not override set node properties. Using set node properties.'); } else { - node.status({fill: "red", shape: "dot", text: "no url"}); + node.status({ fill: "red", shape: "dot", text: "no url" }); return; } - + if (isTemplatedUrl) { url = mustache.render(url, msg); } - + // Start a new stream (i.e. send a new http get) node.client = new EventSource(url, options); - node.client.onopen = function() { - node.status({fill: "green", shape: "dot", text: "connected"}); + node.client.onopen = function () { + setTimeout(() => { + node.status({ fill: "green", shape: "dot", text: "connected" }); + }, 1000); // 08/04/2022 Supergiovane: Allow the "reconnecting" status to be shown, in case of reconnection by timeout. + } - node.client.onerror = function(err) { - node.status({fill: "red", shape: "dot", text: `Error: ${err.message}`}); + node.client.onerror = function (err) { + node.status({ fill: "red", shape: "dot", text: `Error: ${err.message}` }); } } - + // Handle ALL events. - node.client.onAnyMessage = function(eventType, event) { + node.client.onAnyMessage = function (eventType, event) { handleEvent(event); } } - - node.on("input", function(msg) { - node.prevMsg = msg; - handleMsg(msg); + + node.on("input", function (msg) { + let _msg = RED.util.cloneMessage(msg); + node.prevMsg = _msg; + handleMsg(_msg); }); - node.on("close", function() { - node.status({fill: "red", shape: "ring", text: "disconnected"}); - if (node.client) { + node.on("close", function () { + node.status({ fill: "red", shape: "ring", text: "disconnected" }); + if (node.client) { node.client.close(); } node.paused = false; - + if (node.timerId) { clearTimeout(node.timerId); node.timerId = null; } - }); + }); } RED.nodes.registerType("sse-client", SseClientNode); From 562f40fcea5603d61fcb3f27545ba0b6eccec6c7 Mon Sep 17 00:00:00 2001 From: Massimo <37666636+Supergiovane@users.noreply.github.com> Date: Fri, 8 Apr 2022 14:13:58 +0200 Subject: [PATCH 2/5] . --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 9d0a159..b1dc3ef 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "node-red-contrib-sse-client", - "version": "0.2.3", + "version": "0.2.4", "description": "A Node Red node to receive Server Sent events", "dependencies": { "original": "^1.0.0", From c8ef0149a676d363874929e1788c31a42ab80506 Mon Sep 17 00:00:00 2001 From: Supergiovane Date: Sat, 9 Apr 2022 08:56:37 +0200 Subject: [PATCH 3/5] Delete launch.json --- .vscode/launch.json | 18 ------------------ 1 file changed, 18 deletions(-) delete mode 100644 .vscode/launch.json diff --git a/.vscode/launch.json b/.vscode/launch.json deleted file mode 100644 index 026af9b..0000000 --- a/.vscode/launch.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - // Usare IntelliSense per informazioni sui possibili attributi. - // Al passaggio del mouse vengono visualizzate le descrizioni degli attributi esistenti. - // Per altre informazioni, visitare: https://go.microsoft.com/fwlink/?linkid=830387 - "version": "0.2.0", - "configurations": [ - - { - "type": "pwa-node", - "request": "launch", - "name": "Launch Program", - "skipFiles": [ - "/**" - ], - "program": "${workspaceFolder}/sse_client.js" - } - ] -} \ No newline at end of file From 746703261357be63c4cc7fe08bce09bee2da8690 Mon Sep 17 00:00:00 2001 From: Massimo <37666636+Supergiovane@users.noreply.github.com> Date: Sat, 9 Apr 2022 09:25:50 +0200 Subject: [PATCH 4/5] . --- .vscode/launch.json | 18 ------------------ sse_client.js | 2 +- 2 files changed, 1 insertion(+), 19 deletions(-) delete mode 100644 .vscode/launch.json diff --git a/.vscode/launch.json b/.vscode/launch.json deleted file mode 100644 index 026af9b..0000000 --- a/.vscode/launch.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - // Usare IntelliSense per informazioni sui possibili attributi. - // Al passaggio del mouse vengono visualizzate le descrizioni degli attributi esistenti. - // Per altre informazioni, visitare: https://go.microsoft.com/fwlink/?linkid=830387 - "version": "0.2.0", - "configurations": [ - - { - "type": "pwa-node", - "request": "launch", - "name": "Launch Program", - "skipFiles": [ - "/**" - ], - "program": "${workspaceFolder}/sse_client.js" - } - ] -} \ No newline at end of file diff --git a/sse_client.js b/sse_client.js index 810da84..8043c26 100644 --- a/sse_client.js +++ b/sse_client.js @@ -84,7 +84,7 @@ module.exports = function (RED) { if (node.timerId !== null) clearTimeout(node.timerId); node.timerId = setTimeout(function restartClient() { // Restart the SSE client by resending the last message again - node.status({ fill: "yellow", shape: "dot", text: "Timeout. Restarting." }); + node.status({ fill: "yellow", shape: "dot", text: "timeout => restarting" }); if (node.prevMsg === null) node.prevMsg = { payload: true }; handleMsg(node.prevMsg); //node.send({ payload: "Reconnecting..." }); From a1a12584bbf927db4eb4384f1b47d77987ffbfda Mon Sep 17 00:00:00 2001 From: Massimo <37666636+Supergiovane@users.noreply.github.com> Date: Sat, 9 Apr 2022 09:27:22 +0200 Subject: [PATCH 5/5] . --- sse_client.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sse_client.js b/sse_client.js index 8043c26..1fd7aa9 100644 --- a/sse_client.js +++ b/sse_client.js @@ -78,7 +78,7 @@ module.exports = function (RED) { }); } - // 08/04/2022 Supergiovane: Function to that starts the timeout timer. + // 08/04/2022 Supergiovane: Function to start the timeout timer. function startTimeoutTimer() { // When a previous timer is available, stop it if (node.timerId !== null) clearTimeout(node.timerId);