Skip to content

Commit

Permalink
fix #255 - Queues shutdown (#256)
Browse files Browse the repository at this point in the history
* add queues shutdown

* initial queue tests file

* remove unused deps

* don't drain queues on shutdown
  • Loading branch information
ngmachado authored Oct 25, 2023
1 parent 3fdcdbd commit ad8ed26
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 3 deletions.
2 changes: 2 additions & 0 deletions src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ class App {
try {
this.logger.info(`app.shutdown() - closing event tracker`);
this.eventTracker._disconnect();
this.logger.info(`app.shutdown() - closing queues`);
await this.queues.shutdown();
this.logger.info(`app.shutdown() - closing client`);
this.client.disconnect();
this.time.resetTime();
Expand Down
29 changes: 29 additions & 0 deletions src/protocol/queues.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ async function trigger (fn, ms) {
class Queues {
constructor (app) {
this.app = app;
this._isShutdown = false;
}

init () {
Expand All @@ -19,6 +20,7 @@ class Queues {
}

async run (fn, time) {
// don't run if shutting down
if (this.app._isShutdown) {
this.app.logger.info(`app.shutdown() - closing queues`);
return;
Expand Down Expand Up @@ -139,6 +141,9 @@ class Queues {
if (this.estimationQueue === undefined) {
throw Error("Queues.addQueuedEstimation(): Need EstimationQueue to be set first");
}
if(this._isShutdown) {
throw Error("Queues.addQueuedEstimation(): shutdown");
}
this.estimationQueue.push({
self: this,
account: account,
Expand All @@ -155,6 +160,29 @@ class Queues {
return this.estimationQueue.length();
}

async shutdown() {
try {
this._isShutdown = true;
this.app.circularBuffer.push("shutdown", null, "queues shutting down");

if(this.estimationQueue.length() > 0) {
this.app.circularBuffer.push("shutdown", null, `queues shutting down - estimationQueue length: ${this.estimationQueue.length()}`);
}

this.estimationQueue.pause();
this.app.logger.info("estimationQueue successfully shut down");

if(this.agreementUpdateQueue.length() > 0) {
this.app.circularBuffer.push("shutdown", null, `queues shutting down - agreementUpdateQueue length: ${this.agreementUpdateQueue.length()}`);
}
this.agreementUpdateQueue.pause();
this.app.logger.info("agreementUpdateQueue successfully shut down");

} catch (error) {
this.app.logger.error("Error during queue shutdown:", error);
}
}

async _handleAgreementEvents(task, senderFilter, source, eventName, getAgreementEventsFunc) {
const app = task.self.app;
let allFlowUpdatedEvents = await getAgreementEventsFunc(
Expand Down Expand Up @@ -227,6 +255,7 @@ class Queues {
source: event.source
}
}

}

module.exports = Queues;
3 changes: 0 additions & 3 deletions src/web3client/client.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
const { Web3 } = require('web3');
const { wad4human } = require("@decentral.ee/web3-helpers");
const BN = require("bn.js");
const ISuperToken = require("@superfluid-finance/ethereum-contracts/build/truffle/ISuperToken.json");

const AccountManager = require("./accountManager");
const RPCClient = require("./rpcClient");
Expand Down
53 changes: 53 additions & 0 deletions test/unit-tests/protocol/queues.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
const sinon = require("sinon");
const { expect } = require("chai");
const Queues = require("../../../src/protocol/queues");

describe("Queues", () => {
let queue;
let appMock;
let sandbox;

beforeEach(() => {
sandbox = sinon.createSandbox();

appMock = {
logger: {
debug: sinon.stub(),
info: sinon.stub(),
error: sinon.stub()
},

_isShutdown: false,
config: {
NUM_RETRIES: 3,
CONCURRENCY: 2
},
};

queue = new Queues(appMock);
});

afterEach(() => {
sandbox.restore();
});


it("#1.1 - should not run if app is shutting down", async () => {
appMock._isShutdown = true;
await queue.run(sinon.stub(), 5000);
expect(appMock.logger.info.calledOnce).to.be.true;
expect(appMock.logger.info.calledWith("app.shutdown() - closing queues")).to.be.true;
});


it("#1.2 - should not add tasks to the queue if app is shutting down", async () => {
queue.init();
await queue.shutdown();
try {
await queue.addQueuedEstimation("token", "account", "source");
expect.fail("Expected addQueuedEstimation to throw");
} catch (error) {
expect(error.message).to.include("Queues.addQueuedEstimation(): shutdown");
}
});
});

0 comments on commit ad8ed26

Please sign in to comment.