Skip to content

Commit

Permalink
Fix #70 Add DLQ rebumitter tool
Browse files Browse the repository at this point in the history
  • Loading branch information
regevbr committed Nov 21, 2019
1 parent 49fa7c4 commit a9b6f1b
Showing 1 changed file with 32 additions and 44 deletions.
76 changes: 32 additions & 44 deletions src/resubmitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ import {SQS_MAX_RECEIVE_BATCH, Squiss} from './Squiss';
import {IMessageToSend, ResubmitterConfig} from './Types';
import {Message} from './Message';

interface MessageContext {
readonly message: Message;
}

const DEFAULT_SQUISS_OPTS = {
receiveAttributes: ['All'],
receiveSqsAttributes: ['All'],
Expand Down Expand Up @@ -43,63 +39,55 @@ export class Resubmitter {
return this.squissFrom.getManualBatch(numberOfMessageToRead);
}

private _sendMessage(messageToSend: IMessageToSend, context: MessageContext) {
return this.squissTo.sendMessage(messageToSend, undefined, context.message.attributes);
private _sendMessage(messageToSend: IMessageToSend, message: Message) {
return this.squissTo.sendMessage(messageToSend, undefined, message.attributes);
}

private _handleMessage(context: MessageContext): Promise<any> {
return Promise.resolve()
.then(() => {
let body = context.message.body;
if (this.config.customMutator) {
body = this.config.customMutator(body);
}
return this._sendMessage(body, context);
});
private _changeMessageVisibility(message: Message) {
return message.changeVisibility(this.config.releaseTimeoutSeconds);
}

private _handleMessage(message: Message): Promise<void> {
return Promise.resolve().then(() => {
console.log(`${++this.numHandledMessages} messages handled`);
const location = message.raw.MessageId ?? '';
if (this.numHandledMessages > this.config.limit || this.handledMessages.has(location)) {
return this._changeMessageVisibility(message);
}
this.handledMessages.add(location);
let body = message.body;
if (this.config.customMutator) {
body = this.config.customMutator(body);
}
return this._sendMessage(body, message)
.then(() => {
return message.del();
})
.catch((err) => {
return this._changeMessageVisibility(message)
.then(() => {
return Promise.reject(err);
});
});
});
}

private _iteration(): Promise<void> {
if (this.numHandledMessages >= this.config.limit || this.config.limit <= 0) {
return Promise.resolve();
}
const remaining = Math.max(this.config.limit - this.numHandledMessages, 0);
const numberOfMessageToRead = Math.min(SQS_MAX_RECEIVE_BATCH, remaining);
const numberOfMessageToRead =
Math.min(SQS_MAX_RECEIVE_BATCH, Math.max(this.config.limit - this.numHandledMessages, 0));
if (numberOfMessageToRead <= 0) {
return Promise.resolve();
}
return this._readMessages(numberOfMessageToRead)
.then((messages) => {
if (!messages.length) {
// Make sure the iteration stops
this.numHandledMessages = this.config.limit;
return Promise.resolve();
}
const promises = messages.map((message) => {
const msgContext: MessageContext = {
message,
};
return Promise.resolve().then(() => {
console.log(`${++this.numHandledMessages} messages handled`);
if (this.numHandledMessages > this.config.limit) {
return message.changeVisibility(this.config.releaseTimeoutSeconds);
}
const location = message.raw.MessageId ?? '';
if (this.handledMessages.has(location)) {
return message.changeVisibility(this.config.releaseTimeoutSeconds);
}
this.handledMessages.add(location);
return this._handleMessage(msgContext)
.then(() => {
return message.del();
})
.catch((err) => {
return message.changeVisibility(this.config.releaseTimeoutSeconds)
.then(() => {
return Promise.reject(err);
});
});
});
});
const promises = messages.map(this._handleMessage.bind(this));
return Promise.all(promises).then(() => {
return Promise.resolve();
});
Expand Down

0 comments on commit a9b6f1b

Please sign in to comment.