From 2143e0902440a4de439fdd77408b94e6fc2af742 Mon Sep 17 00:00:00 2001 From: regevbr Date: Thu, 21 Nov 2019 23:50:20 +0200 Subject: [PATCH] Fix #70 Add DLQ rebumitter tool --- src/Types.ts | 10 ++ src/resubmitter/resubmitter.ts | 294 ++++++++++++++------------------- 2 files changed, 137 insertions(+), 167 deletions(-) diff --git a/src/Types.ts b/src/Types.ts index 6bd1176..9c90b9d 100644 --- a/src/Types.ts +++ b/src/Types.ts @@ -144,3 +144,13 @@ export interface ISquissEvents { } export type SquissEmitter = StrictEventEmitter; + +export type ResubmitterMutator = (body: any) => any; + +export interface ResubmitterConfig { + readonly resubmitFromQueueConfig: ISquissOptions; + readonly resubmitToQueueConfig: ISquissOptions; + readonly limit: number; + readonly customMutator?: ResubmitterMutator; + readonly releaseTimeoutSeconds: number; +} diff --git a/src/resubmitter/resubmitter.ts b/src/resubmitter/resubmitter.ts index be6381a..5339243 100644 --- a/src/resubmitter/resubmitter.ts +++ b/src/resubmitter/resubmitter.ts @@ -1,167 +1,127 @@ -// 'use strict'; -// -// import {SQS} from 'aws-sdk'; -// import {SQS_MAX_RECEIVE_BATCH, Squiss} from '../Squiss'; -// import {ISquissOptions} from '../Types'; -// -// const DEFAULT_SQUISS_OPTS = { -// receiveAttributes: ['All'], -// receiveSqsAttributes: ['All'], -// minReceiveBatchSize: 0, -// }; -// -// export type Mutator = (body: any) => any; -// -// export interface ResubmitConfig { -// readonly resubmitFromQueueConfig: ISquissOptions; -// readonly resubmitToQueueConfig: ISquissOptions; -// readonly limit: number; -// readonly customMutator?: Mutator; -// readonly releaseTimeoutSeconds: number; -// } -// -// export const resubmit = (config: ResubmitConfig) => { -// const runContext = buildRunContext(config); -// const handledMessages = new Set(); -// return iteration({ -// handledMessages, -// numHandledMessages: 0, -// runContext, -// limit: config.limit, -// customMutator: config.customMutator, -// }); -// }; -// -// const iteration = (context: IterationContext): Promise => { -// if (context.numHandledMessages >= context.limit || context.limit <= 0) { -// return Promise.resolve(); -// } -// const remaining = Math.max(context.limit - context.numHandledMessages, 0); -// const numberOfMessageToRead = Math.min(SQS_MAX_RECEIVE_BATCH, remaining); -// if (numberOfMessageToRead <= 0) { -// return Promise.resolve(); -// } -// return readMessages(numberOfMessageToRead, context.runContext) -// .then((messages) => { -// if (!messages.length) { -// // Make sure the iteration stops -// context.numHandledMessages = context.limit; -// return Promise.resolve(); -// } -// const promises = messages.map((message) => { -// const msgContext: MessageContext = { -// ...context.runContext, -// message, -// }; -// return Promise.resolve().then(() => { -// console.log(`${++context.numHandledMessages} messages handled`); -// if (context.numHandledMessages > context.limit) { -// return releaseMessage(msgContext); -// } -// const location = message.MessageId ?? ''; -// if (context.handledMessages.has(location)) { -// return releaseMessage(msgContext); -// } -// context.handledMessages.add(location); -// return handleMessage(context.customMutator, msgContext) -// .catch((err) => { -// releaseMessage(msgContext); -// return Promise.reject(err); -// }); -// }); -// }); -// return Promise.all(promises).then(() => { -// return Promise.resolve(); -// }); -// }) -// .then(() => { -// return iteration(context); -// }); -// }; -// -// interface IterationContext { -// readonly limit: number; -// readonly runContext: RunContext; -// numHandledMessages: number; -// readonly customMutator?: Mutator; -// readonly handledMessages: Set; -// } -// -// interface RunContext { -// readonly squissFrom: Squiss; -// readonly squissTo: Squiss; -// readonly releaseTimeoutSeconds: number; -// } -// -// interface MessageContext extends RunContext { -// readonly message: SQS.Message; -// } -// -// const buildRunContext = (config: ResubmitConfig): RunContext => { -// const squissFrom = new Squiss({ -// ...config.resubmitFromQueueConfig, -// ...DEFAULT_SQUISS_OPTS, -// }); -// const squissTo = new Squiss({ -// ...config.resubmitToQueueConfig, -// ...DEFAULT_SQUISS_OPTS, -// }); -// return { -// squissFrom, -// squissTo, -// releaseTimeoutSeconds: config.releaseTimeoutSeconds, -// }; -// }; -// -// const readMessages = (numberOfMessageToRead: number, context: RunContext) => { -// return context.squissFrom.getManualBatch(numberOfMessageToRead); -// }; -// -// const handleMessage = (customMutator: Mutator | undefined, context: MessageContext): Promise => { -// let getBodyPromise: Promise; -// if (!customMutator) { -// getBodyPromise = Promise.resolve(context.message.Body ?? ''); -// } else { -// const mutateResult = mutateMessageToSend(customMutator, context); -// getBodyPromise = mutateResult.mutatePromise; -// } -// return getBodyPromise -// .then((body) => { -// return sendMessage(body, context); -// }) -// .then(() => { -// return deleteMessage(context); -// }); -// }; -// -// const sendMessage = (body: string, context: MessageContext) => { -// return context.sqs -// .sendMessage({ -// QueueUrl: context.fullQueueName, -// MessageAttributes: context.message.MessageAttributes, -// MessageBody: body, -// MessageDeduplicationId: context.message.MessageAttributes?.MessageId?.StringValue, -// MessageGroupId: context.message.MessageAttributes?.MessageId?.StringValue, -// }) -// .promise(); -// }; -// -// const mutateMessageToSend = (customMutator: Mutator, context: MessageContext) => { -// const originalBody = context.message.Body ?? ''; -// const getMessageBodyFromS3Result = getMessageBodyFromS3(originalBody, context); -// const s3UploadData = getMessageBodyFromS3Result.s3UploadData; -// const mutatePromise = getMessageBodyFromS3Result.promise -// .then((bodyToDigest) => { -// return unzipMessage(bodyToDigest, context); -// }) -// .then((bodyStr: string): Promise => { -// const toMutate = context.isJson ? JSON.parse(bodyStr) : bodyStr; -// const mutatedObject = customMutator(toMutate); -// const mutated = context.isJson ? JSON.stringify(mutatedObject) : mutatedObject; -// return zipMessage(mutated, context); -// }) -// .then((toSendBody) => { -// return uploadMessageBodyToS3(toSendBody, s3UploadData, context); -// }); -// return {mutatePromise, s3UploadData}; -// }; +'use strict'; + +import {SQS_MAX_RECEIVE_BATCH, Squiss} from '../Squiss'; +import {IMessageToSend, ResubmitterConfig, ResubmitterMutator} from '../Types'; +import {Message} from '../Message'; + +interface IterationContext { + readonly limit: number; + readonly runContext: RunContext; + numHandledMessages: number; + readonly customMutator?: ResubmitterMutator; + readonly handledMessages: Set; + readonly releaseTimeoutSeconds: number; +} + +interface RunContext { + readonly squissFrom: Squiss; + readonly squissTo: Squiss; +} + +interface MessageContext extends RunContext { + readonly message: Message; +} + +const DEFAULT_SQUISS_OPTS = { + receiveAttributes: ['All'], + receiveSqsAttributes: ['All'], + minReceiveBatchSize: 0, + unwrapSns: false, +}; + +const iteration = (context: IterationContext): Promise => { + if (context.numHandledMessages >= context.limit || context.limit <= 0) { + return Promise.resolve(); + } + const remaining = Math.max(context.limit - context.numHandledMessages, 0); + const numberOfMessageToRead = Math.min(SQS_MAX_RECEIVE_BATCH, remaining); + if (numberOfMessageToRead <= 0) { + return Promise.resolve(); + } + return readMessages(numberOfMessageToRead, context.runContext) + .then((messages) => { + if (!messages.length) { + // Make sure the iteration stops + context.numHandledMessages = context.limit; + return Promise.resolve(); + } + const promises = messages.map((message) => { + const msgContext: MessageContext = { + ...context.runContext, + message, + }; + return Promise.resolve().then(() => { + console.log(`${++context.numHandledMessages} messages handled`); + if (context.numHandledMessages > context.limit) { + return message.changeVisibility(context.releaseTimeoutSeconds); + } + const location = message.raw.MessageId ?? ''; + if (context.handledMessages.has(location)) { + return message.changeVisibility(context.releaseTimeoutSeconds); + } + context.handledMessages.add(location); + return handleMessage(context.customMutator, msgContext) + .then(() => { + return message.del(); + }) + .catch((err) => { + message.changeVisibility(context.releaseTimeoutSeconds); + return Promise.reject(err); + }); + }); + }); + return Promise.all(promises).then(() => { + return Promise.resolve(); + }); + }) + .then(() => { + return iteration(context); + }); +}; + +const buildRunContext = (config: ResubmitterConfig): RunContext => { + const squissFrom = new Squiss({ + ...config.resubmitFromQueueConfig, + ...DEFAULT_SQUISS_OPTS, + }); + const squissTo = new Squiss({ + ...config.resubmitToQueueConfig, + ...DEFAULT_SQUISS_OPTS, + }); + return { + squissFrom, + squissTo, + }; +}; + +const readMessages = (numberOfMessageToRead: number, context: RunContext) => { + return context.squissFrom.getManualBatch(numberOfMessageToRead); +}; + +const sendMessage = (messageToSend: IMessageToSend, context: MessageContext) => { + return context.squissTo.sendMessage(messageToSend, undefined, context.message.attributes); +}; + +const handleMessage = (customMutator: ResubmitterMutator | undefined, context: MessageContext): Promise => { + return Promise.resolve() + .then(() => { + let body = context.message.body; + if (customMutator) { + body = customMutator(body); + } + return sendMessage(body, context); + }); +}; + +export const resubmit = (config: ResubmitterConfig) => { + const runContext = buildRunContext(config); + const handledMessages = new Set(); + return iteration({ + releaseTimeoutSeconds: config.releaseTimeoutSeconds, + handledMessages, + numHandledMessages: 0, + runContext, + limit: config.limit, + customMutator: config.customMutator, + }); +};