From a9b6f1be38f1c96655cff7298e45eae8ca1bdf54 Mon Sep 17 00:00:00 2001 From: regevbr Date: Fri, 22 Nov 2019 00:51:43 +0200 Subject: [PATCH] Fix #70 Add DLQ rebumitter tool --- src/resubmitter.ts | 76 +++++++++++++++++++--------------------------- 1 file changed, 32 insertions(+), 44 deletions(-) diff --git a/src/resubmitter.ts b/src/resubmitter.ts index 0d74de1..6238869 100644 --- a/src/resubmitter.ts +++ b/src/resubmitter.ts @@ -4,10 +4,6 @@ import {SQS_MAX_RECEIVE_BATCH, Squiss} from './Squiss'; import {IMessageToSend, ResubmitterConfig} from './Types'; import {Message} from './Message'; -interface MessageContext { - readonly message: Message; -} - const DEFAULT_SQUISS_OPTS = { receiveAttributes: ['All'], receiveSqsAttributes: ['All'], @@ -43,63 +39,55 @@ export class Resubmitter { return this.squissFrom.getManualBatch(numberOfMessageToRead); } - private _sendMessage(messageToSend: IMessageToSend, context: MessageContext) { - return this.squissTo.sendMessage(messageToSend, undefined, context.message.attributes); + private _sendMessage(messageToSend: IMessageToSend, message: Message) { + return this.squissTo.sendMessage(messageToSend, undefined, message.attributes); } - private _handleMessage(context: MessageContext): Promise { - return Promise.resolve() - .then(() => { - let body = context.message.body; - if (this.config.customMutator) { - body = this.config.customMutator(body); - } - return this._sendMessage(body, context); - }); + private _changeMessageVisibility(message: Message) { + return message.changeVisibility(this.config.releaseTimeoutSeconds); + } + + private _handleMessage(message: Message): Promise { + return Promise.resolve().then(() => { + console.log(`${++this.numHandledMessages} messages handled`); + const location = message.raw.MessageId ?? ''; + if (this.numHandledMessages > this.config.limit || this.handledMessages.has(location)) { + return this._changeMessageVisibility(message); + } + this.handledMessages.add(location); + let body = message.body; + if (this.config.customMutator) { + body = this.config.customMutator(body); + } + return this._sendMessage(body, message) + .then(() => { + return message.del(); + }) + .catch((err) => { + return this._changeMessageVisibility(message) + .then(() => { + return Promise.reject(err); + }); + }); + }); } private _iteration(): Promise { if (this.numHandledMessages >= this.config.limit || this.config.limit <= 0) { return Promise.resolve(); } - const remaining = Math.max(this.config.limit - this.numHandledMessages, 0); - const numberOfMessageToRead = Math.min(SQS_MAX_RECEIVE_BATCH, remaining); + const numberOfMessageToRead = + Math.min(SQS_MAX_RECEIVE_BATCH, Math.max(this.config.limit - this.numHandledMessages, 0)); if (numberOfMessageToRead <= 0) { return Promise.resolve(); } return this._readMessages(numberOfMessageToRead) .then((messages) => { if (!messages.length) { - // Make sure the iteration stops this.numHandledMessages = this.config.limit; return Promise.resolve(); } - const promises = messages.map((message) => { - const msgContext: MessageContext = { - message, - }; - return Promise.resolve().then(() => { - console.log(`${++this.numHandledMessages} messages handled`); - if (this.numHandledMessages > this.config.limit) { - return message.changeVisibility(this.config.releaseTimeoutSeconds); - } - const location = message.raw.MessageId ?? ''; - if (this.handledMessages.has(location)) { - return message.changeVisibility(this.config.releaseTimeoutSeconds); - } - this.handledMessages.add(location); - return this._handleMessage(msgContext) - .then(() => { - return message.del(); - }) - .catch((err) => { - return message.changeVisibility(this.config.releaseTimeoutSeconds) - .then(() => { - return Promise.reject(err); - }); - }); - }); - }); + const promises = messages.map(this._handleMessage.bind(this)); return Promise.all(promises).then(() => { return Promise.resolve(); });