From ad8ed263e713d5d7b74f95bedb79e89339b428c9 Mon Sep 17 00:00:00 2001 From: Axe Date: Wed, 25 Oct 2023 11:16:12 +0100 Subject: [PATCH] fix #255 - Queues shutdown (#256) * add queues shutdown * initial queue tests file * remove unused deps * don't drain queues on shutdown --- src/app.js | 2 + src/protocol/queues.js | 29 ++++++++++++++ src/web3client/client.js | 3 -- test/unit-tests/protocol/queues.test.js | 53 +++++++++++++++++++++++++ 4 files changed, 84 insertions(+), 3 deletions(-) create mode 100644 test/unit-tests/protocol/queues.test.js diff --git a/src/app.js b/src/app.js index 8855a9d3..5b796194 100644 --- a/src/app.js +++ b/src/app.js @@ -135,6 +135,8 @@ class App { try { this.logger.info(`app.shutdown() - closing event tracker`); this.eventTracker._disconnect(); + this.logger.info(`app.shutdown() - closing queues`); + await this.queues.shutdown(); this.logger.info(`app.shutdown() - closing client`); this.client.disconnect(); this.time.resetTime(); diff --git a/src/protocol/queues.js b/src/protocol/queues.js index 352e089c..a227b15d 100644 --- a/src/protocol/queues.js +++ b/src/protocol/queues.js @@ -11,6 +11,7 @@ async function trigger (fn, ms) { class Queues { constructor (app) { this.app = app; + this._isShutdown = false; } init () { @@ -19,6 +20,7 @@ class Queues { } async run (fn, time) { + // don't run if shutting down if (this.app._isShutdown) { this.app.logger.info(`app.shutdown() - closing queues`); return; @@ -139,6 +141,9 @@ class Queues { if (this.estimationQueue === undefined) { throw Error("Queues.addQueuedEstimation(): Need EstimationQueue to be set first"); } + if(this._isShutdown) { + throw Error("Queues.addQueuedEstimation(): shutdown"); + } this.estimationQueue.push({ self: this, account: account, @@ -155,6 +160,29 @@ class Queues { return this.estimationQueue.length(); } + async shutdown() { + try { + this._isShutdown = true; + this.app.circularBuffer.push("shutdown", null, "queues shutting down"); + + if(this.estimationQueue.length() > 0) { + this.app.circularBuffer.push("shutdown", null, `queues shutting down - estimationQueue length: ${this.estimationQueue.length()}`); + } + + this.estimationQueue.pause(); + this.app.logger.info("estimationQueue successfully shut down"); + + if(this.agreementUpdateQueue.length() > 0) { + this.app.circularBuffer.push("shutdown", null, `queues shutting down - agreementUpdateQueue length: ${this.agreementUpdateQueue.length()}`); + } + this.agreementUpdateQueue.pause(); + this.app.logger.info("agreementUpdateQueue successfully shut down"); + + } catch (error) { + this.app.logger.error("Error during queue shutdown:", error); + } + } + async _handleAgreementEvents(task, senderFilter, source, eventName, getAgreementEventsFunc) { const app = task.self.app; let allFlowUpdatedEvents = await getAgreementEventsFunc( @@ -227,6 +255,7 @@ class Queues { source: event.source } } + } module.exports = Queues; \ No newline at end of file diff --git a/src/web3client/client.js b/src/web3client/client.js index a85a023e..fda9f9a3 100644 --- a/src/web3client/client.js +++ b/src/web3client/client.js @@ -1,7 +1,4 @@ -const { Web3 } = require('web3'); const { wad4human } = require("@decentral.ee/web3-helpers"); -const BN = require("bn.js"); -const ISuperToken = require("@superfluid-finance/ethereum-contracts/build/truffle/ISuperToken.json"); const AccountManager = require("./accountManager"); const RPCClient = require("./rpcClient"); diff --git a/test/unit-tests/protocol/queues.test.js b/test/unit-tests/protocol/queues.test.js new file mode 100644 index 00000000..2a8eaf68 --- /dev/null +++ b/test/unit-tests/protocol/queues.test.js @@ -0,0 +1,53 @@ +const sinon = require("sinon"); +const { expect } = require("chai"); +const Queues = require("../../../src/protocol/queues"); + +describe("Queues", () => { + let queue; + let appMock; + let sandbox; + + beforeEach(() => { + sandbox = sinon.createSandbox(); + + appMock = { + logger: { + debug: sinon.stub(), + info: sinon.stub(), + error: sinon.stub() + }, + + _isShutdown: false, + config: { + NUM_RETRIES: 3, + CONCURRENCY: 2 + }, + }; + + queue = new Queues(appMock); + }); + + afterEach(() => { + sandbox.restore(); + }); + + + it("#1.1 - should not run if app is shutting down", async () => { + appMock._isShutdown = true; + await queue.run(sinon.stub(), 5000); + expect(appMock.logger.info.calledOnce).to.be.true; + expect(appMock.logger.info.calledWith("app.shutdown() - closing queues")).to.be.true; + }); + + + it("#1.2 - should not add tasks to the queue if app is shutting down", async () => { + queue.init(); + await queue.shutdown(); + try { + await queue.addQueuedEstimation("token", "account", "source"); + expect.fail("Expected addQueuedEstimation to throw"); + } catch (error) { + expect(error.message).to.include("Queues.addQueuedEstimation(): shutdown"); + } + }); +});