From 999e7bd101a68eba8a40b6dd734b1f4eecd75896 Mon Sep 17 00:00:00 2001 From: Matthias Pfeil Date: Wed, 18 Sep 2024 16:33:08 +0200 Subject: [PATCH 1/5] disable reconnect --- docker-compose.yml | 2 - package-lock.json | 161 ++++++++++++++++++++++++++++------------ package.json | 2 +- src/client.js | 2 +- src/mqttConfigParser.js | 4 + test/docker-compose.yml | 2 - 6 files changed, 118 insertions(+), 55 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index d8d5fd3..baa669e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,3 @@ -version: "3.9" - services: integration: diff --git a/package-lock.json b/package-lock.json index d7f8cfa..7086c31 100644 --- a/package-lock.json +++ b/package-lock.json @@ -15,7 +15,7 @@ "@sensebox/osem-protos": "^1.1.0", "express": "^4.18.2", "jsonwebtoken": "^9.0.2", - "mqtt": "^5.0.5", + "mqtt": "^5.10.1", "pino": "^8.15.1", "restify-errors": "^8.0.2", "socket.io": "^4.7.2" @@ -35,6 +35,17 @@ "node": ">=0.10.0" } }, + "node_modules/@babel/runtime": { + "version": "7.25.6", + "resolved": "https://registry.npmjs.org/@babel/runtime/-/runtime-7.25.6.tgz", + "integrity": "sha512-VBj9MYyDb9tuLq7yzqjgzt6Q+IBQLrGZfdjOekyEirZPHxXWoTSGUTMrpsfi58Up73d13NfYLv8HT9vmznjzhQ==", + "dependencies": { + "regenerator-runtime": "^0.14.0" + }, + "engines": { + "node": ">=6.9.0" + } + }, "node_modules/@eslint-community/eslint-utils": { "version": "4.4.0", "dev": true, @@ -516,8 +527,9 @@ "license": "MIT" }, "node_modules/@types/readable-stream": { - "version": "4.0.2", - "license": "MIT", + "version": "4.0.15", + "resolved": "https://registry.npmjs.org/@types/readable-stream/-/readable-stream-4.0.15.tgz", + "integrity": "sha512-oAZ3kw+kJFkEqyh7xORZOku1YAKvsFTogRY8kVl4vHpEKiDkfnSA/My8haRE7fvmix5Zyy+1pwzOi7yycGLBJw==", "dependencies": { "@types/node": "*", "safe-buffer": "~5.1.1" @@ -531,8 +543,9 @@ } }, "node_modules/@types/ws": { - "version": "8.5.5", - "license": "MIT", + "version": "8.5.12", + "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.5.12.tgz", + "integrity": "sha512-3tPRkv1EtkDpzlgyKyI8pGsGZAGPEaXeu0DOj5DI25Ja91bdAYddYHbADRYVrZMRbfW+1l5YwXVDKohDJNQxkQ==", "dependencies": { "@types/node": "*" } @@ -753,12 +766,29 @@ } }, "node_modules/bl": { - "version": "5.1.0", - "license": "MIT", + "version": "6.0.15", + "resolved": "https://registry.npmjs.org/bl/-/bl-6.0.15.tgz", + "integrity": "sha512-RGhjD1XCPS7ZdAH6cEJVaR3gLV4KJP2hvkQ49AH5kwScjiyd0jBM8RsP4oHKzcx+kNCON9752zPeRnuv0HHwzw==", "dependencies": { + "@types/readable-stream": "^4.0.0", "buffer": "^6.0.3", "inherits": "^2.0.4", - "readable-stream": "^3.4.0" + "readable-stream": "^4.2.0" + } + }, + "node_modules/bl/node_modules/readable-stream": { + "version": "4.5.2", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-4.5.2.tgz", + "integrity": "sha512-yjavECdqeZ3GLXNgRXgeQEdz9fvDDkNKyHnbHRFtOr7/LcfgBcmct7t/ET+HaCTqfh06OzoAxrkN/IfjJBVe+g==", + "dependencies": { + "abort-controller": "^3.0.0", + "buffer": "^6.0.3", + "events": "^3.3.0", + "process": "^0.11.10", + "string_decoder": "^1.3.0" + }, + "engines": { + "node": "^12.22.0 || ^14.17.0 || >=16.0.0" } }, "node_modules/bluebird": { @@ -1358,16 +1388,6 @@ "node": ">=6.0.0" } }, - "node_modules/duplexify": { - "version": "4.1.2", - "license": "MIT", - "dependencies": { - "end-of-stream": "^1.4.1", - "inherits": "^2.0.3", - "readable-stream": "^3.1.1", - "stream-shift": "^1.0.0" - } - }, "node_modules/ecdsa-sig-formatter": { "version": "1.0.11", "resolved": "https://registry.npmjs.org/ecdsa-sig-formatter/-/ecdsa-sig-formatter-1.0.11.tgz", @@ -1816,6 +1836,18 @@ "node": ">=6" } }, + "node_modules/fast-unique-numbers": { + "version": "8.0.13", + "resolved": "https://registry.npmjs.org/fast-unique-numbers/-/fast-unique-numbers-8.0.13.tgz", + "integrity": "sha512-7OnTFAVPefgw2eBJ1xj2PGGR9FwYzSUso9decayHgCDX4sJkHLdcsYTytTg+tYv+wKF3U8gJuSBz2jJpQV4u/g==", + "dependencies": { + "@babel/runtime": "^7.23.8", + "tslib": "^2.6.2" + }, + "engines": { + "node": ">=16.1.0" + } + }, "node_modules/fastq": { "version": "1.15.0", "dev": true, @@ -2165,12 +2197,9 @@ } }, "node_modules/help-me": { - "version": "4.2.0", - "license": "MIT", - "dependencies": { - "glob": "^8.0.0", - "readable-stream": "^3.6.0" - } + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/help-me/-/help-me-5.0.0.tgz", + "integrity": "sha512-7xgomUX6ADmcYzFik0HzAxh/73YlKR9bmFzf51CZwR+b6YtzU2m0u49hQCqV6SvlqIqsaxovfwdvbnsw3b/zpg==" }, "node_modules/hooks-fixed": { "version": "2.0.2", @@ -3010,25 +3039,26 @@ "license": "MIT" }, "node_modules/mqtt": { - "version": "5.0.5", - "license": "MIT", + "version": "5.10.1", + "resolved": "https://registry.npmjs.org/mqtt/-/mqtt-5.10.1.tgz", + "integrity": "sha512-hXCOki8sANoQ7w+2OzJzg6qMBxTtrH9RlnVNV8panLZgnl+Gh0J/t4k6r8Az8+C7y3KAcyXtn0mmLixyUom8Sw==", "dependencies": { - "@types/readable-stream": "^4.0.1", - "@types/ws": "^8.5.5", + "@types/readable-stream": "^4.0.5", + "@types/ws": "^8.5.9", "commist": "^3.2.0", "concat-stream": "^2.0.0", "debug": "^4.3.4", - "duplexify": "^4.1.2", - "help-me": "^4.2.0", - "lru-cache": "^7.18.3", + "help-me": "^5.0.0", + "lru-cache": "^10.0.1", "minimist": "^1.2.8", - "mqtt-packet": "^8.2.0", + "mqtt-packet": "^9.0.0", "number-allocator": "^1.0.14", "readable-stream": "^4.4.2", "reinterval": "^1.1.0", "rfdc": "^1.3.0", "split2": "^4.2.0", - "ws": "^8.13.0" + "worker-timers": "^7.1.4", + "ws": "^8.17.1" }, "bin": { "mqtt": "build/bin/mqtt.js", @@ -3040,20 +3070,19 @@ } }, "node_modules/mqtt-packet": { - "version": "8.2.0", - "license": "MIT", + "version": "9.0.0", + "resolved": "https://registry.npmjs.org/mqtt-packet/-/mqtt-packet-9.0.0.tgz", + "integrity": "sha512-8v+HkX+fwbodsWAZIZTI074XIoxVBOmPeggQuDFCGg1SqNcC+uoRMWu7J6QlJPqIUIJXmjNYYHxBBLr1Y/Df4w==", "dependencies": { - "bl": "^5.0.0", - "debug": "^4.1.1", + "bl": "^6.0.8", + "debug": "^4.3.4", "process-nextick-args": "^2.0.1" } }, "node_modules/mqtt/node_modules/lru-cache": { - "version": "7.18.3", - "license": "ISC", - "engines": { - "node": ">=12" - } + "version": "10.4.3", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-10.4.3.tgz", + "integrity": "sha512-JNAzZcXrCt42VGLuYz0zfAzDfAvJWW6AfYlDBQyDV5DClI2m5sAmK+OIO7s59XfsRsWHp02jAJrRadPRGTt6SQ==" }, "node_modules/mqtt/node_modules/readable-stream": { "version": "4.4.2", @@ -3464,7 +3493,8 @@ }, "node_modules/process-nextick-args": { "version": "2.0.1", - "license": "MIT" + "resolved": "https://registry.npmjs.org/process-nextick-args/-/process-nextick-args-2.0.1.tgz", + "integrity": "sha512-3ouUOpQhtgrbOa17J7+uxOTpITYWaGP7/AhoR3+A+/1e9skrzelGi/dXzEYyvbxubEF6Wn2ypscTKiKJFFn1ag==" }, "node_modules/process-warning": { "version": "2.2.0", @@ -3666,6 +3696,11 @@ "node": ">=4" } }, + "node_modules/regenerator-runtime": { + "version": "0.14.1", + "resolved": "https://registry.npmjs.org/regenerator-runtime/-/regenerator-runtime-0.14.1.tgz", + "integrity": "sha512-dYnhHh0nJoMfnkZs6GmmhFknAGRrLznOu5nc9ML+EJxGvrx6H7teuevqVqCuPcPK//3eDrrjQhehXVx9cnkGdw==" + }, "node_modules/regexp-clone": { "version": "0.0.1", "license": "MIT" @@ -4046,10 +4081,6 @@ "node": ">= 0.8" } }, - "node_modules/stream-shift": { - "version": "1.0.1", - "license": "MIT" - }, "node_modules/string_decoder": { "version": "1.3.0", "license": "MIT", @@ -4338,6 +4369,37 @@ "node": ">=0.10.0" } }, + "node_modules/worker-timers": { + "version": "7.1.8", + "resolved": "https://registry.npmjs.org/worker-timers/-/worker-timers-7.1.8.tgz", + "integrity": "sha512-R54psRKYVLuzff7c1OTFcq/4Hue5Vlz4bFtNEIarpSiCYhpifHU3aIQI29S84o1j87ePCYqbmEJPqwBTf+3sfw==", + "dependencies": { + "@babel/runtime": "^7.24.5", + "tslib": "^2.6.2", + "worker-timers-broker": "^6.1.8", + "worker-timers-worker": "^7.0.71" + } + }, + "node_modules/worker-timers-broker": { + "version": "6.1.8", + "resolved": "https://registry.npmjs.org/worker-timers-broker/-/worker-timers-broker-6.1.8.tgz", + "integrity": "sha512-FUCJu9jlK3A8WqLTKXM9E6kAmI/dR1vAJ8dHYLMisLNB/n3GuaFIjJ7pn16ZcD1zCOf7P6H62lWIEBi+yz/zQQ==", + "dependencies": { + "@babel/runtime": "^7.24.5", + "fast-unique-numbers": "^8.0.13", + "tslib": "^2.6.2", + "worker-timers-worker": "^7.0.71" + } + }, + "node_modules/worker-timers-worker": { + "version": "7.0.71", + "resolved": "https://registry.npmjs.org/worker-timers-worker/-/worker-timers-worker-7.0.71.tgz", + "integrity": "sha512-ks/5YKwZsto1c2vmljroppOKCivB/ma97g9y77MAAz2TBBjPPgpoOiS1qYQKIgvGTr2QYPT3XhJWIB6Rj2MVPQ==", + "dependencies": { + "@babel/runtime": "^7.24.5", + "tslib": "^2.6.2" + } + }, "node_modules/workerpool": { "version": "6.2.1", "dev": true, @@ -4363,8 +4425,9 @@ "license": "ISC" }, "node_modules/ws": { - "version": "8.14.1", - "license": "MIT", + "version": "8.18.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.18.0.tgz", + "integrity": "sha512-8VbfWfHLbbwu3+N6OKsOMpBdT4kXPDDB9cJk2bJ6mh9ucxdlnNvH1e+roYkKmN9Nxw2yjz7VzeO9oOz2zJ04Pw==", "engines": { "node": ">=10.0.0" }, diff --git a/package.json b/package.json index 40e8e2a..94ea2be 100644 --- a/package.json +++ b/package.json @@ -28,7 +28,7 @@ "@sensebox/osem-protos": "^1.1.0", "express": "^4.18.2", "jsonwebtoken": "^9.0.2", - "mqtt": "^5.0.5", + "mqtt": "^5.10.1", "pino": "^8.15.1", "restify-errors": "^8.0.2", "socket.io": "^4.7.2" diff --git a/src/client.js b/src/client.js index fd08429..1f1330a 100644 --- a/src/client.js +++ b/src/client.js @@ -26,7 +26,7 @@ const getClient = function getClient ( let errRetries = maxRetries, closeRetries = maxRetries; const client = mqtt.connect(url, connectionOptions); - client.reconnecting = true; + // client.reconnecting = true; return new Promise(function (resolve, reject) { client.on('error', function (err) { diff --git a/src/mqttConfigParser.js b/src/mqttConfigParser.js index 5bdff65..3e06c37 100644 --- a/src/mqttConfigParser.js +++ b/src/mqttConfigParser.js @@ -46,6 +46,10 @@ const parseUserConnectionOptions = function parseUserConnectionOptions ( opts.connectTimeout = 5 * 1000; } + if (!opts.reconnectPeriod || isNaN(Number(opts.reconnectPeriod))) { + opts.reconnectPeriod = 0; + } + return opts; }; diff --git a/test/docker-compose.yml b/test/docker-compose.yml index 2e38b3e..12b12b2 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -1,5 +1,3 @@ -version: "3.9" - services: mosquitto: image: eclipse-mosquitto:2.0.15 From d9a368f7060aac228d745c639e8309c73b4eb8d5 Mon Sep 17 00:00:00 2001 From: Matthias Pfeil Date: Wed, 18 Sep 2024 16:43:25 +0200 Subject: [PATCH 2/5] update topic --- test/helpers/mqtts.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/helpers/mqtts.js b/test/helpers/mqtts.js index e860c80..dd71db4 100644 --- a/test/helpers/mqtts.js +++ b/test/helpers/mqtts.js @@ -8,7 +8,7 @@ module.exports = function mqttsBox ( // defaults enabled: true, url: config.get('url'), - topic: 'generalTestTopic', + topic: 'integration', messageFormat: 'csv', } ) { From 8cf267be30e2bec0e09e317a0a859d31a3b46d73 Mon Sep 17 00:00:00 2001 From: Matthias Pfeil Date: Wed, 18 Sep 2024 16:57:11 +0200 Subject: [PATCH 3/5] disable wss test --- test/test-ws.js | 62 ++++++++++++++++++++++++------------------------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/test/test-ws.js b/test/test-ws.js index 5bf38ab..c6d7ec5 100644 --- a/test/test-ws.js +++ b/test/test-ws.js @@ -51,38 +51,38 @@ describe('ws client', function () { }); }); -describe('wss client', function () { - let testBox; - before(async function () { - await connect(dbConnectionString({ db: 'mqttTest' })); +// describe('wss client', function () { +// let testBox; +// before(async function () { +// await connect(dbConnectionString({ db: 'mqttTest' })); - testBox = await Box.initNew(wssBox()); - mqttClient.connect(testBox); - const mqclient = mqtt.connect(testBox.integrations.mqtt.url); +// testBox = await Box.initNew(wssBox()); +// mqttClient.connect(testBox); +// const mqclient = mqtt.connect(testBox.integrations.mqtt.url); - return new Promise((resolve) => { - mqclient.on('connect', () => { - setTimeout(() => { - mqclient.subscribe(testBox.integrations.mqtt.topic); - mqclient.publish( - testBox.integrations.mqtt.topic, - testBox.sensors.map((s) => `${s._id},12`).join('\n') - ); - setTimeout(() => { - resolve(); - }, 1000); - }, 100); - }); - }); - }); +// return new Promise((resolve) => { +// mqclient.on('connect', () => { +// setTimeout(() => { +// mqclient.subscribe(testBox.integrations.mqtt.topic); +// mqclient.publish( +// testBox.integrations.mqtt.topic, +// testBox.sensors.map((s) => `${s._id},12`).join('\n') +// ); +// setTimeout(() => { +// resolve(); +// }, 1000); +// }, 100); +// }); +// }); +// }); - it('should accept measurements via wss message', async function () { - const box = await Box.findBoxById(testBox._id, { - onlyLastMeasurements: true, - }); +// it('should accept measurements via wss message', async function () { +// const box = await Box.findBoxById(testBox._id, { +// onlyLastMeasurements: true, +// }); - for (const sensor of box.sensors) { - expect(sensor.lastMeasurement.value).equal('12'); - } - }); -}); +// for (const sensor of box.sensors) { +// expect(sensor.lastMeasurement.value).equal('12'); +// } +// }); +// }); From e2eb01daff86135d3324fc5e40d5025af90b6e27 Mon Sep 17 00:00:00 2001 From: Matthias Pfeil Date: Wed, 18 Sep 2024 20:01:33 +0200 Subject: [PATCH 4/5] enable wss tests --- test/test-ws.js | 62 ++++++++++++++++++++++++------------------------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/test/test-ws.js b/test/test-ws.js index c6d7ec5..5bf38ab 100644 --- a/test/test-ws.js +++ b/test/test-ws.js @@ -51,38 +51,38 @@ describe('ws client', function () { }); }); -// describe('wss client', function () { -// let testBox; -// before(async function () { -// await connect(dbConnectionString({ db: 'mqttTest' })); +describe('wss client', function () { + let testBox; + before(async function () { + await connect(dbConnectionString({ db: 'mqttTest' })); -// testBox = await Box.initNew(wssBox()); -// mqttClient.connect(testBox); -// const mqclient = mqtt.connect(testBox.integrations.mqtt.url); + testBox = await Box.initNew(wssBox()); + mqttClient.connect(testBox); + const mqclient = mqtt.connect(testBox.integrations.mqtt.url); -// return new Promise((resolve) => { -// mqclient.on('connect', () => { -// setTimeout(() => { -// mqclient.subscribe(testBox.integrations.mqtt.topic); -// mqclient.publish( -// testBox.integrations.mqtt.topic, -// testBox.sensors.map((s) => `${s._id},12`).join('\n') -// ); -// setTimeout(() => { -// resolve(); -// }, 1000); -// }, 100); -// }); -// }); -// }); + return new Promise((resolve) => { + mqclient.on('connect', () => { + setTimeout(() => { + mqclient.subscribe(testBox.integrations.mqtt.topic); + mqclient.publish( + testBox.integrations.mqtt.topic, + testBox.sensors.map((s) => `${s._id},12`).join('\n') + ); + setTimeout(() => { + resolve(); + }, 1000); + }, 100); + }); + }); + }); -// it('should accept measurements via wss message', async function () { -// const box = await Box.findBoxById(testBox._id, { -// onlyLastMeasurements: true, -// }); + it('should accept measurements via wss message', async function () { + const box = await Box.findBoxById(testBox._id, { + onlyLastMeasurements: true, + }); -// for (const sensor of box.sensors) { -// expect(sensor.lastMeasurement.value).equal('12'); -// } -// }); -// }); + for (const sensor of box.sensors) { + expect(sensor.lastMeasurement.value).equal('12'); + } + }); +}); From 1b24f1db10a3c77f89b2063d80db3601a8fffec0 Mon Sep 17 00:00:00 2001 From: Matthias Pfeil Date: Thu, 19 Sep 2024 17:11:32 +0200 Subject: [PATCH 5/5] add bullmq and redis to handle mqtt connection retries --- .gitignore | 1 + config/default.json | 10 ++- docker-compose.yml | 9 ++ local-redis-stack.conf | 33 ++++++++ package-lock.json | 180 ++++++++++++++++++++++++++++++++-------- package.json | 1 + src/client.js | 63 +++++++------- src/index.js | 2 + src/mqttConfigParser.js | 2 + src/queue.js | 67 +++++++++++++++ src/worker.js | 35 ++++++++ src/worker/retry.js | 12 +++ 12 files changed, 352 insertions(+), 63 deletions(-) create mode 100644 local-redis-stack.conf create mode 100644 src/queue.js create mode 100644 src/worker.js create mode 100644 src/worker/retry.js diff --git a/.gitignore b/.gitignore index 3317dbe..c45c89c 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ node_modules config/development.json out yarn-error.log +local-data/ diff --git a/config/default.json b/config/default.json index 69311b2..1854190 100644 --- a/config/default.json +++ b/config/default.json @@ -6,7 +6,7 @@ }, "mqtt_client": { "num_retries": 5, - "retry_after_minutes": 10 + "retry_after_minutes": 1 }, "grpc": { "port": 3925, @@ -19,6 +19,14 @@ "websocket": { "port": 5173 }, + "redis": { + "host": "localhost", + "port": 6379, + "username": "worker", + "password": "somepassword", + "db": 0, + "queue": "mqtt" + }, "jwt": { "secret": "OH GOD THIS IS SO INSECURE PLS CHANGE ME", "algorithm": "HS256", diff --git a/docker-compose.yml b/docker-compose.yml index baa669e..1962b12 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -23,6 +23,15 @@ services: } } + redis-stack: + image: redis/redis-stack:latest + volumes: + - ./local-redis-stack.conf:/redis-stack.conf + - ./local-data:/data + ports: + - 6379:6379 + - 8001:8001 + db: image: mongo:5 container_name: osem-dev-mongo diff --git a/local-redis-stack.conf b/local-redis-stack.conf new file mode 100644 index 0000000..bed32e7 --- /dev/null +++ b/local-redis-stack.conf @@ -0,0 +1,33 @@ +# Redis configuration file example. +# +# Note that in order to read the configuration file, Redis must be +# started with the file path as first argument: +# +# ./redis-server /path/to/redis.conf + +# Note on units: when memory size is needed, it is possible to specify +# it in the usual form of 1k 5GB 4M and so forth: +# +# 1k => 1000 bytes +# 1kb => 1024 bytes +# 1m => 1000000 bytes +# 1mb => 1024*1024 bytes +# 1g => 1000000000 bytes +# 1gb => 1024*1024*1024 bytes +# +# units are case insensitive so 1GB 1Gb 1gB are all the same. + +################################## NETWORK ##################################### + +port 6379 + +############################## APPEND ONLY MODE ############################### + +appendonly yes + +################################## SECURITY ################################### + +user queue on +@all -@dangerous +INFO ~bull:mqtt:* >somepassword +user worker on +@all -@dangerous +INFO +CLIENT ~bull:mqtt:* >somepassword + +requirepass super-secret \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index 7086c31..77ba94f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,6 +13,7 @@ "@grpc/proto-loader": "^0.7.9", "@sensebox/opensensemap-api-models": "3.1.0", "@sensebox/osem-protos": "^1.1.0", + "bullmq": "^5.13.1", "express": "^4.18.2", "jsonwebtoken": "^9.0.2", "mqtt": "^5.10.1", @@ -221,7 +222,8 @@ }, "node_modules/@ioredis/commands": { "version": "1.2.0", - "license": "MIT" + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.2.0.tgz", + "integrity": "sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==" }, "node_modules/@mapbox/node-pre-gyp": { "version": "1.0.10", @@ -255,16 +257,77 @@ } }, "node_modules/@msgpackr-extract/msgpackr-extract-darwin-arm64": { - "version": "3.0.2", + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-darwin-arm64/-/msgpackr-extract-darwin-arm64-3.0.3.tgz", + "integrity": "sha512-QZHtlVgbAdy2zAqNA9Gu1UpIuI8Xvsd1v8ic6B2pZmeFnFcMWiPLfWXh7TVw4eGEZ/C9TH281KwhVoeQUKbyjw==", "cpu": [ "arm64" ], - "license": "MIT", "optional": true, "os": [ "darwin" ] }, + "node_modules/@msgpackr-extract/msgpackr-extract-darwin-x64": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-darwin-x64/-/msgpackr-extract-darwin-x64-3.0.3.tgz", + "integrity": "sha512-mdzd3AVzYKuUmiWOQ8GNhl64/IoFGol569zNRdkLReh6LRLHOXxU4U8eq0JwaD8iFHdVGqSy4IjFL4reoWCDFw==", + "cpu": [ + "x64" + ], + "optional": true, + "os": [ + "darwin" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-linux-arm": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-arm/-/msgpackr-extract-linux-arm-3.0.3.tgz", + "integrity": "sha512-fg0uy/dG/nZEXfYilKoRe7yALaNmHoYeIoJuJ7KJ+YyU2bvY8vPv27f7UKhGRpY6euFYqEVhxCFZgAUNQBM3nw==", + "cpu": [ + "arm" + ], + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-linux-arm64": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-arm64/-/msgpackr-extract-linux-arm64-3.0.3.tgz", + "integrity": "sha512-YxQL+ax0XqBJDZiKimS2XQaf+2wDGVa1enVRGzEvLLVFeqa5kx2bWbtcSXgsxjQB7nRqqIGFIcLteF/sHeVtQg==", + "cpu": [ + "arm64" + ], + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-linux-x64": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-x64/-/msgpackr-extract-linux-x64-3.0.3.tgz", + "integrity": "sha512-cvwNfbP07pKUfq1uH+S6KJ7dT9K8WOE4ZiAcsrSes+UY55E/0jLYc+vq+DO7jlmqRb5zAggExKm0H7O/CBaesg==", + "cpu": [ + "x64" + ], + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-win32-x64": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-win32-x64/-/msgpackr-extract-win32-x64-3.0.3.tgz", + "integrity": "sha512-x0fWaQtYp4E6sktbsdAqnehxDgEc/VwM7uLsRCYWaiGu0ykYdZPiS8zCWdnjHwyiumousxfBm4SO31eXqwEZhQ==", + "cpu": [ + "x64" + ], + "optional": true, + "os": [ + "win32" + ] + }, "node_modules/@netflix/nerror": { "version": "1.1.3", "resolved": "https://registry.npmjs.org/@netflix/nerror/-/nerror-1.1.3.tgz", @@ -398,6 +461,33 @@ "node": ">=6" } }, + "node_modules/@sensebox/opensensemap-api-models/node_modules/bullmq": { + "version": "3.15.8", + "resolved": "https://registry.npmjs.org/bullmq/-/bullmq-3.15.8.tgz", + "integrity": "sha512-k3uimHGhl5svqD7SEak+iI6c5DxeLOaOXzCufI9Ic0ST3nJr69v71TGR4cXCTXdgCff3tLec5HgoBnfyWjgn5A==", + "dependencies": { + "cron-parser": "^4.6.0", + "glob": "^8.0.3", + "ioredis": "^5.3.2", + "lodash": "^4.17.21", + "msgpackr": "^1.6.2", + "semver": "^7.3.7", + "tslib": "^2.0.0", + "uuid": "^9.0.0" + } + }, + "node_modules/@sensebox/opensensemap-api-models/node_modules/bullmq/node_modules/uuid": { + "version": "9.0.1", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.1.tgz", + "integrity": "sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==", + "funding": [ + "https://github.com/sponsors/broofa", + "https://github.com/sponsors/ctavan" + ], + "bin": { + "uuid": "dist/bin/uuid" + } + }, "node_modules/@sensebox/opensensemap-api-models/node_modules/protobufjs": { "version": "6.11.2", "hasInstallScript": true, @@ -892,15 +982,15 @@ "license": "MIT" }, "node_modules/bullmq": { - "version": "3.15.8", - "license": "MIT", + "version": "5.13.1", + "resolved": "https://registry.npmjs.org/bullmq/-/bullmq-5.13.1.tgz", + "integrity": "sha512-9Ss2GzV+VVA2Q/+qUxQ7zxAwfLWBwc7DIUNboq0EdXGRf2Ia+Qi7BwT51rnxglu4b/0SRsSElevRT8IZc7HvtQ==", "dependencies": { "cron-parser": "^4.6.0", - "glob": "^8.0.3", - "ioredis": "^5.3.2", - "lodash": "^4.17.21", - "msgpackr": "^1.6.2", - "semver": "^7.3.7", + "ioredis": "^5.4.1", + "msgpackr": "^1.10.1", + "node-abort-controller": "^3.1.1", + "semver": "^7.5.4", "tslib": "^2.0.0", "uuid": "^9.0.0" } @@ -1106,7 +1196,8 @@ }, "node_modules/cluster-key-slot": { "version": "1.1.2", - "license": "Apache-2.0", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==", "engines": { "node": ">=0.10.0" } @@ -1340,7 +1431,8 @@ }, "node_modules/denque": { "version": "2.1.0", - "license": "Apache-2.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==", "engines": { "node": ">=0.10" } @@ -2065,7 +2157,9 @@ }, "node_modules/glob": { "version": "8.1.0", - "license": "ISC", + "resolved": "https://registry.npmjs.org/glob/-/glob-8.1.0.tgz", + "integrity": "sha512-r8hpEjiQEYlF2QU0df3dS+nxxSIreXQS1qRhMJM0Q5NDdR386C7jb7Hwwod8Fgiuex+k0GFjgft18yvxm5XoCQ==", + "deprecated": "Glob versions prior to v9 are no longer supported", "dependencies": { "fs.realpath": "^1.0.0", "inflight": "^1.0.4", @@ -2093,7 +2187,8 @@ }, "node_modules/glob/node_modules/minimatch": { "version": "5.1.6", - "license": "ISC", + "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-5.1.6.tgz", + "integrity": "sha512-lKwV/1brpG6mBUFHtb7NUmtABCb2WZZmm2wNiOA5hAb8VdCS4B3dtMWyvcoViccwAW/COERjXLt0zP1zXUN26g==", "dependencies": { "brace-expansion": "^2.0.1" }, @@ -2321,8 +2416,9 @@ "license": "ISC" }, "node_modules/ioredis": { - "version": "5.3.2", - "license": "MIT", + "version": "5.4.1", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.4.1.tgz", + "integrity": "sha512-2YZsvl7jopIa1gaePkeMtd9rAcSjOOjPtpcLlOeusyO+XH2SK5ZcT+UCrElPP+WVIInh2TzeI4XW9ENaSLVVHA==", "dependencies": { "@ioredis/commands": "^1.1.1", "cluster-key-slot": "^1.1.0", @@ -2611,7 +2707,8 @@ }, "node_modules/lodash.defaults": { "version": "4.2.0", - "license": "MIT" + "resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz", + "integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==" }, "node_modules/lodash.get": { "version": "4.4.2", @@ -2624,7 +2721,8 @@ }, "node_modules/lodash.isarguments": { "version": "3.1.0", - "license": "MIT" + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==" }, "node_modules/lodash.isboolean": { "version": "3.0.3", @@ -3124,30 +3222,32 @@ "license": "MIT" }, "node_modules/msgpackr": { - "version": "1.9.9", - "license": "MIT", + "version": "1.11.0", + "resolved": "https://registry.npmjs.org/msgpackr/-/msgpackr-1.11.0.tgz", + "integrity": "sha512-I8qXuuALqJe5laEBYoFykChhSXLikZmUhccjGsPuSJ/7uPip2TJ7lwdIQwWSAi0jGZDXv4WOP8Qg65QZRuXxXw==", "optionalDependencies": { "msgpackr-extract": "^3.0.2" } }, "node_modules/msgpackr-extract": { - "version": "3.0.2", + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/msgpackr-extract/-/msgpackr-extract-3.0.3.tgz", + "integrity": "sha512-P0efT1C9jIdVRefqjzOQ9Xml57zpOXnIuS+csaB4MdZbTdmGDLo8XhzBG1N7aO11gKDDkJvBLULeFTo46wwreA==", "hasInstallScript": true, - "license": "MIT", "optional": true, "dependencies": { - "node-gyp-build-optional-packages": "5.0.7" + "node-gyp-build-optional-packages": "5.2.2" }, "bin": { "download-msgpackr-prebuilds": "bin/download-prebuilds.js" }, "optionalDependencies": { - "@msgpackr-extract/msgpackr-extract-darwin-arm64": "3.0.2", - "@msgpackr-extract/msgpackr-extract-darwin-x64": "3.0.2", - "@msgpackr-extract/msgpackr-extract-linux-arm": "3.0.2", - "@msgpackr-extract/msgpackr-extract-linux-arm64": "3.0.2", - "@msgpackr-extract/msgpackr-extract-linux-x64": "3.0.2", - "@msgpackr-extract/msgpackr-extract-win32-x64": "3.0.2" + "@msgpackr-extract/msgpackr-extract-darwin-arm64": "3.0.3", + "@msgpackr-extract/msgpackr-extract-darwin-x64": "3.0.3", + "@msgpackr-extract/msgpackr-extract-linux-arm": "3.0.3", + "@msgpackr-extract/msgpackr-extract-linux-arm64": "3.0.3", + "@msgpackr-extract/msgpackr-extract-linux-x64": "3.0.3", + "@msgpackr-extract/msgpackr-extract-win32-x64": "3.0.3" } }, "node_modules/muri": { @@ -3178,6 +3278,11 @@ "node": ">= 0.6" } }, + "node_modules/node-abort-controller": { + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/node-abort-controller/-/node-abort-controller-3.1.1.tgz", + "integrity": "sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==" + }, "node_modules/node-addon-api": { "version": "5.0.0", "license": "MIT" @@ -3201,9 +3306,13 @@ } }, "node_modules/node-gyp-build-optional-packages": { - "version": "5.0.7", - "license": "MIT", + "version": "5.2.2", + "resolved": "https://registry.npmjs.org/node-gyp-build-optional-packages/-/node-gyp-build-optional-packages-5.2.2.tgz", + "integrity": "sha512-s+w+rBWnpTMwSFbaE0UXsRlg7hU4FjekKU4eyAih5T8nJuNZT1nNsskXpxmeqSK9UzkBl6UgRlnKc8hz8IEqOw==", "optional": true, + "dependencies": { + "detect-libc": "^2.0.1" + }, "bin": { "node-gyp-build-optional-packages": "bin.js", "node-gyp-build-optional-packages-optional": "optional.js", @@ -3681,14 +3790,16 @@ }, "node_modules/redis-errors": { "version": "1.2.0", - "license": "MIT", + "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", + "integrity": "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==", "engines": { "node": ">=4" } }, "node_modules/redis-parser": { "version": "3.0.0", - "license": "MIT", + "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz", + "integrity": "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==", "dependencies": { "redis-errors": "^1.0.0" }, @@ -4064,7 +4175,8 @@ }, "node_modules/standard-as-callback": { "version": "2.1.0", - "license": "MIT" + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz", + "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==" }, "node_modules/static-eval": { "version": "2.0.2", diff --git a/package.json b/package.json index 94ea2be..47d9f04 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,7 @@ "@grpc/proto-loader": "^0.7.9", "@sensebox/opensensemap-api-models": "3.1.0", "@sensebox/osem-protos": "^1.1.0", + "bullmq": "^5.13.1", "express": "^4.18.2", "jsonwebtoken": "^9.0.2", "mqtt": "^5.10.1", diff --git a/src/client.js b/src/client.js index 1f1330a..9283784 100644 --- a/src/client.js +++ b/src/client.js @@ -5,48 +5,31 @@ const { sendWebsocketMessage } = require('./websocket-server'); const mqtt = require('mqtt'), parseMQTTConfig = require('./mqttConfigParser'), log = require('./logger'), + { addJob } = require('./queue'), config = require('config').get('mqtt_client'); -const RETRY_AFTER_MINUTES = Number(config.get('retry_after_minutes')), - NUMBER_RETRIES = Number(config.get('num_retries')); +const RETRY_AFTER_MINUTES = Number(config.get('retry_after_minutes')); // use this object as simple key/value store for connecting/disconnecting const mqttConnections = {}; -const _retryAfter = function _retryAfter (box, afterMinutes) { - setTimeout(() => { - connect(box); - }, afterMinutes * 60000); -}; - const getClient = function getClient ( { url, connectionOptions, topic }, - maxRetries ) { - let errRetries = maxRetries, closeRetries = maxRetries; const client = mqtt.connect(url, connectionOptions); - // client.reconnecting = true; return new Promise(function (resolve, reject) { client.on('error', function (err) { - errRetries = errRetries - 1; - if (errRetries === 0) { - client.reconnecting = false; - client.end(true); + client.end(true); - return reject(err); - } + return reject(err); }); client.on('close', function () { - closeRetries = closeRetries - 1; - if (closeRetries === 0) { - client.reconnecting = false; - client.end(true); + client.end(true); - return reject(new Error('connection closed after 5 retries.')); - } + return reject(new Error('Connection closed.')); }); client.on('connect', function () { @@ -81,8 +64,7 @@ const connect = async function connect (box) { try { const client = await getClient( - mqttCfg.internalConnectionOptions, - NUMBER_RETRIES + mqttCfg.internalConnectionOptions ); const msg = { 'mqtt-client': `ℹ️ connected mqtt for box ${box._id}` }; sendWebsocketMessage(box._id, msg); @@ -90,10 +72,24 @@ const connect = async function connect (box) { mqttConnections[box._id] = client; + client.on('error', function (err) { + const msg = { + 'mqtt-client': `🚨 mqtt error for box ${box._id}. Retry after ${RETRY_AFTER_MINUTES} minutes.`, + }; + log.error(err, msg); + sendWebsocketMessage(box._id, msg); + + // Add box to retry queue + addJob('retry_connect', { box }); + }); + client.on('close', function () { const msg = { 'mqtt-client': `ℹ️ mqtt closed for box: ${box._id}` }; sendWebsocketMessage(box._id, msg); log.info(msg); + + // Add box to retry queue + addJob('retry_connect', { box }); }); client.on('message', mqttCfg.decodeAndSaveMessage); @@ -103,8 +99,18 @@ const connect = async function connect (box) { }; log.error(err, msg); sendWebsocketMessage(box._id, msg); - // retry after.. - _retryAfter(box, RETRY_AFTER_MINUTES); + + // Add box to retry queue + const status = await addJob('retry_connect', { box }); + switch (status) { + case 'MAX_ATTEMPTS': + disconnect(box); + break; + + default: + // Nothing to do here + break; + } } }; @@ -112,7 +118,8 @@ const disconnect = function disconnect ({ _id }) { if (mqttConnections[_id]) { mqttConnections[_id].end(true); mqttConnections[_id] = undefined; - const msg = { 'mqtt-client': `ℹ️ mqtt disconnected mqtt for box ${_id}` }; + + const msg = { 'mqtt-client': `ℹ️ mqtt disconnected for box ${_id}` }; log.info(msg); sendWebsocketMessage(_id, msg); } diff --git a/src/index.js b/src/index.js index 261d68b..a85521c 100644 --- a/src/index.js +++ b/src/index.js @@ -7,6 +7,7 @@ const { log = require('./logger.js'), MQTTClient = require('./client'), grpcServer = require('./grpc-server'), + worker = require('./worker.js'), websocketServer = require('./websocket-server.js'); const findMQTTBoxes = function findMQTTBoxes () { @@ -35,6 +36,7 @@ const onDbConnected = async function onDbConnected () { try { grpcServer.init(); websocketServer.init(); + worker.init(); } catch (err) { log.fatal(err); mongoose.disconnect().then(function () { diff --git a/src/mqttConfigParser.js b/src/mqttConfigParser.js index 3e06c37..592d034 100644 --- a/src/mqttConfigParser.js +++ b/src/mqttConfigParser.js @@ -46,6 +46,8 @@ const parseUserConnectionOptions = function parseUserConnectionOptions ( opts.connectTimeout = 5 * 1000; } + // Disable reconnecting in MQTT.js + // https://github.com/mqttjs/MQTT.js?tab=readme-ov-file#enabling-reconnection-with-reconnectperiod-option if (!opts.reconnectPeriod || isNaN(Number(opts.reconnectPeriod))) { opts.reconnectPeriod = 0; } diff --git a/src/queue.js b/src/queue.js new file mode 100644 index 0000000..d3b932a --- /dev/null +++ b/src/queue.js @@ -0,0 +1,67 @@ +'use strict'; + +const redis = require('config').get('redis'); +const config = require('config').get('mqtt_client'); +const { Queue } = require('bullmq'); +const logger = require('./logger'); + +const RETRY_AFTER_MINUTES = Number(config.get('retry_after_minutes')); +const NUMBER_RETRIES = Number(config.get('num_retries')); + +let queue; + +const retryAttempts = {}; + +const requestQueue = () => { + if (queue) { + return queue; + } + queue = new Queue(redis.get('queue'), { + connection: { + host: redis.get('host'), + port: redis.get('port'), + username: redis.get('username'), + password: redis.get('password'), + db: redis.get('db'), + } + }); + + return queue; +}; + +const addJob = async (hash, data) => { + if (!retryAttempts[data.box._id]) { + retryAttempts[data.box._id] = 0; + } + + logger.info(`Retry Attempts for ${data.box._id}: ${retryAttempts[data.box._id]}`); + if (retryAttempts[data.box._id] === NUMBER_RETRIES) { + delete retryAttempts[data.box._id]; + + return 'MAX_ATTEMPTS'; + } + + try { + await requestQueue() + .add(hash, data, { + delay: RETRY_AFTER_MINUTES * 60000, + removeOnComplete: { + age: 72 * 3600, // keep up to 72 hours + count: 1000, // keep up to 1000 jobs + }, + removeOnFail: { + age: 72 * 3600, // keep up to 72 hours + }, + }); + retryAttempts[data.box._id] = retryAttempts[data.box._id] + 1; + + return 'JOB_ADDED'; + } catch (error) { + logger.error(error); + } +}; + +module.exports = { + requestQueue, + addJob +}; diff --git a/src/worker.js b/src/worker.js new file mode 100644 index 0000000..2e04bb8 --- /dev/null +++ b/src/worker.js @@ -0,0 +1,35 @@ +'use strict'; + +const config = require('config').get('redis'); +const { Worker } = require('bullmq'); +const { retry } = require('./worker/retry'); +const logger = require('./logger'); + +const init = function init () { + try { + const worker = new Worker(config.get('queue'), retry, { + connection: { + host: config.get('host'), + port: config.get('port'), + username: config.get('username'), + password: config.get('password'), + db: config.get('db'), + }, + autorun: false, + }); + + worker.on('error', (err) => { + logger.error(err); + }); + + // Start worker + worker.run(); + } catch (error) { + logger.error(error); + } + +}; + +module.exports = { + init +}; diff --git a/src/worker/retry.js b/src/worker/retry.js new file mode 100644 index 0000000..de51f9a --- /dev/null +++ b/src/worker/retry.js @@ -0,0 +1,12 @@ +'use strict'; + +const { connect } = require('../client'); + +const retry = async function retry (job) { + const { box } = job.data; + connect(box); +}; + +module.exports = { + retry +};