From 50fa70c07a526703656914112704e8d7255fa03c Mon Sep 17 00:00:00 2001 From: Michal Jarmoc Date: Wed, 13 May 2020 13:46:10 +0200 Subject: [PATCH] Detach/Attach Response Emitters --- lib/rmq.service.ts | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/lib/rmq.service.ts b/lib/rmq.service.ts index 55da35e..3411c52 100644 --- a/lib/rmq.service.ts +++ b/lib/rmq.service.ts @@ -44,12 +44,12 @@ export class RMQService { } public async init(): Promise { - return new Promise((resolve => { + return new Promise((resolve) => { const connectionURLs: string[] = this.options.connections.map((connection: IRMQConnection) => { return `amqp://${connection.login}:${connection.password}@${connection.host}`; }); const connectionOptions = { - reconnectTimeInSeconds: this.options.reconnectTimeInSeconds ?? DEFAULT_RECONNECT_TIME + reconnectTimeInSeconds: this.options.reconnectTimeInSeconds ?? DEFAULT_RECONNECT_TIME, }; this.server = amqp.connect(connectionURLs, connectionOptions); this.channel = this.server.createChannel({ @@ -67,9 +67,10 @@ export class RMQService { (msg: Message) => { this.sendResponseEmitter.emit(msg.properties.correlationId, msg); }, - { noAck: true } + { + noAck: true, + } ); - this.waitForReply(); if (this.options.queueName) { this.listen(channel); } @@ -77,13 +78,19 @@ export class RMQService { resolve(); }, }); - - this.server.on(DISCONNECT_EVENT, err => { + this.server.on('connect', (connection) => { + this.attachEmmitters(); + }); + this.server.on(DISCONNECT_EVENT, (err) => { + this.detachEmitters(); this.logger.error(DISCONNECT_MESSAGE); this.logger.error(err.err); }); - })); + }); + } + private ack(msg: Message): void { + this.channel.ack(msg); } public async send(topic: string, message: IMessage): Promise { @@ -120,8 +127,7 @@ export class RMQService { } public async disconnect() { - responseEmitter.removeAllListeners(); - requestEmitter.removeAllListeners(); + this.detachEmitters(); this.sendResponseEmitter.removeAllListeners(); await this.channel.close(); await this.server.close(); @@ -135,7 +141,7 @@ export class RMQService { this.queueMeta = Reflect.getMetadata(RMQ_ROUTES_META, RMQService); this.queueMeta = this.queueMeta ?? []; if (this.queueMeta.length > 0) { - this.queueMeta.map(async meta => { + this.queueMeta.map(async (meta) => { await channel.bindQueue(this.options.queueName, this.options.exchangeName, meta.topic); }); } @@ -154,7 +160,11 @@ export class RMQService { ); } - private waitForReply(): void { + private detachEmitters(): void { + responseEmitter.removeAllListeners(); + } + + private attachEmmitters(): void { responseEmitter.on(ResponseEmmiterResult.success, async (msg, result) => { this.reply(result, msg); }); @@ -162,12 +172,11 @@ export class RMQService { this.reply('', msg, err); }); responseEmitter.on(ResponseEmmiterResult.ack, async (msg) => { - this.channel.ack(msg); + this.ack(msg); }); } private async reply(res: any, msg: Message, error: Error | RMQError = null) { - this.channel.ack(msg); res = await this.intercept(res, msg, error); await this.channel.sendToQueue(msg.properties.replyTo, Buffer.from(JSON.stringify(res)), { correlationId: msg.properties.correlationId, @@ -175,6 +184,7 @@ export class RMQService { ...this.buildError(error), }, }); + this.ack(msg); this.logger.debug(`Sent ▲ [${msg.fields.routingKey}] ${JSON.stringify(res)}`); } @@ -188,7 +198,7 @@ export class RMQService { } private isTopicExists(topic: string): boolean { - return !!this.queueMeta.find(x => x.topic === topic); + return !!this.queueMeta.find((x) => x.topic === topic); } private async useMiddleware(msg: Message) {