Skip to content

Commit

Permalink
add queues shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
ngmachado committed Oct 23, 2023
1 parent 3fdcdbd commit bd7d79f
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ class App {
try {
this.logger.info(`app.shutdown() - closing event tracker`);
this.eventTracker._disconnect();
this.logger.info(`app.shutdown() - closing queues`);
await this.queues.shutdown();
this.logger.info(`app.shutdown() - closing client`);
this.client.disconnect();
this.time.resetTime();
Expand Down
32 changes: 32 additions & 0 deletions src/protocol/queues.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ async function trigger (fn, ms) {
class Queues {
constructor (app) {
this.app = app;
this._isShutdown = false;
}

init () {
Expand All @@ -19,6 +20,7 @@ class Queues {
}

async run (fn, time) {
// don't run if shutting down
if (this.app._isShutdown) {
this.app.logger.info(`app.shutdown() - closing queues`);
return;
Expand Down Expand Up @@ -139,6 +141,9 @@ class Queues {
if (this.estimationQueue === undefined) {
throw Error("Queues.addQueuedEstimation(): Need EstimationQueue to be set first");
}
if(this._isShutdown) {
throw Error("Queues.addQueuedEstimation(): shutdown");
}
this.estimationQueue.push({
self: this,
account: account,
Expand All @@ -155,6 +160,32 @@ class Queues {
return this.estimationQueue.length();
}

async shutdown() {
try {
this._isShutdown = true;
this.app.circularBuffer.push("shutdown", null, "queues shutting down");
// Check if estimationQueue is active before draining
if (!this.estimationQueue.paused) {
if(this.estimationQueue.length() > 0) {
await this.estimationQueue.drain();
}
this.estimationQueue.pause();
this.app.logger.info("estimationQueue successfully shut down");
}

// Check if agreementUpdateQueue is active before draining
if (!this.agreementUpdateQueue.paused) {
if(this.agreementUpdateQueue.length() > 0) {
await this.agreementUpdateQueue.drain();
}
this.agreementUpdateQueue.pause();
this.app.logger.info("agreementUpdateQueue successfully shut down");
}
} catch (error) {
this.app.logger.error("Error during queue shutdown:", error);
}
}

async _handleAgreementEvents(task, senderFilter, source, eventName, getAgreementEventsFunc) {
const app = task.self.app;
let allFlowUpdatedEvents = await getAgreementEventsFunc(
Expand Down Expand Up @@ -227,6 +258,7 @@ class Queues {
source: event.source
}
}

}

module.exports = Queues;

0 comments on commit bd7d79f

Please sign in to comment.