Skip to content

Commit

Permalink
feat: add optional manualAck
Browse files Browse the repository at this point in the history
  • Loading branch information
Piotr Frankowski committed Oct 17, 2020
1 parent 1b59ca5 commit a8b070c
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 7 deletions.
4 changes: 4 additions & 0 deletions e2e/contracts/mock.contracts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,7 @@ export namespace TimeOutContracts {
export namespace AckOnReadContracts {
export const topic: string = 'ackOnRead.rpc';
}

export namespace ManualAckContracts {
export const topic: string = 'manualAck.rpc';
}
5 changes: 5 additions & 0 deletions e2e/mocks/api.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { RMQService } from '../../lib';
import {
AckOnReadContracts,
DivideContracts,
ManualAckContracts,
MultiplyContracts,
NotificationContracts,
SumContracts,
Expand Down Expand Up @@ -49,4 +50,8 @@ export class ApiController {
async ackOnRead(num: number): Promise<number> {
return this.rmq.send<number, number>(AckOnReadContracts.topic, num);
}

async manualAck(num: number): Promise<number> {
return this.rmq.send<number, number>(ManualAckContracts.topic, num);
}
}
13 changes: 13 additions & 0 deletions e2e/mocks/microservice.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { RMQController, RMQError, RMQRoute, Validate } from '../../lib';
import {
AckOnReadContracts,
DivideContracts,
ManualAckContracts,
MultiplyContracts,
NotificationContracts,
SumContracts,
Expand Down Expand Up @@ -65,4 +66,16 @@ export class MicroserviceController {
}, 300);
});
}

@RMQRoute(ManualAckContracts.topic)
async manualAck(num: number, manualAck: () => void): Promise<number> {
return new Promise((resolve, reject) => {
setTimeout(function () {
manualAck();
}, 100);
setTimeout(function () {
resolve(num);
}, 300);
});
}
}
7 changes: 7 additions & 0 deletions e2e/tests/rmq.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,13 @@ describe('RMQe2e', () => {
expect(first).toBe(ResponseEmmiterResult.ack);
expect(second).toBe(ResponseEmmiterResult.success);
});

it('acks manually', async () => {
await apiController.manualAck(3);
const [[first], [second]] = responseEmitterSpy.mock.calls;
expect(first).toBe(ResponseEmmiterResult.ack);
expect(second).toBe(ResponseEmmiterResult.success);
});
});

afterAll(async () => {
Expand Down
27 changes: 20 additions & 7 deletions lib/decorators/rmq-controller.decorator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,43 @@ export function RMQController(options?: IRMQControllerOptions): ClassDecorator {
topics.forEach(async (topic) => {
const shouldAckOnRead = topic.ackOnRead || (options?.ackOnRead && topic.ackOnRead !== false);
requestEmitter.on(topic.topic, async (msg: Message) => {
let alreadyAcked = false;
const manualAck = () => {
if (alreadyAcked) {
return;
}

responseEmitter.emit(ResponseEmmiterResult.ack, msg);
alreadyAcked = true;
};

try {
if (shouldAckOnRead) {
responseEmitter.emit(ResponseEmmiterResult.ack, msg);
alreadyAcked = true;
}
const result = await this[topic.methodName].apply(
this,
options?.msgFactory ? options.msgFactory(msg, topic) : RMQMessageFactory(msg, topic)
);
const result = await this[topic.methodName].apply(this, [
...(options?.msgFactory
? options.msgFactory(msg, topic)
: RMQMessageFactory(msg, topic)),
manualAck,
]);
if (msg.properties.replyTo && result) {
responseEmitter.emit(ResponseEmmiterResult.success, msg, result, shouldAckOnRead);
responseEmitter.emit(ResponseEmmiterResult.success, msg, result, alreadyAcked);
} else if (msg.properties.replyTo && result === undefined) {
responseEmitter.emit(
ResponseEmmiterResult.error,
msg,
new RMQError(ERROR_UNDEFINED_FROM_RPC, ERROR_TYPE.RMQ),
shouldAckOnRead
alreadyAcked
);
} else {
responseEmitter.emit(ResponseEmmiterResult.ack, msg);
}
} catch (err) {
if (msg.properties.replyTo) {
responseEmitter.emit(ResponseEmmiterResult.error, msg, err);
} else if (!shouldAckOnRead) {
} else if (!alreadyAcked) {
responseEmitter.emit(ResponseEmmiterResult.ack, msg);
}
}
Expand Down

0 comments on commit a8b070c

Please sign in to comment.