Skip to content

Commit

Permalink
Detach/Attach Response Emitters
Browse files Browse the repository at this point in the history
  • Loading branch information
mjarmoc committed May 13, 2020
1 parent 9e65629 commit 50fa70c
Showing 1 changed file with 24 additions and 14 deletions.
38 changes: 24 additions & 14 deletions lib/rmq.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ export class RMQService {
}

public async init(): Promise<void> {
return new Promise((resolve => {
return new Promise((resolve) => {
const connectionURLs: string[] = this.options.connections.map((connection: IRMQConnection) => {
return `amqp://${connection.login}:${connection.password}@${connection.host}`;
});
const connectionOptions = {
reconnectTimeInSeconds: this.options.reconnectTimeInSeconds ?? DEFAULT_RECONNECT_TIME
reconnectTimeInSeconds: this.options.reconnectTimeInSeconds ?? DEFAULT_RECONNECT_TIME,
};
this.server = amqp.connect(connectionURLs, connectionOptions);
this.channel = this.server.createChannel({
Expand All @@ -67,23 +67,30 @@ export class RMQService {
(msg: Message) => {
this.sendResponseEmitter.emit(msg.properties.correlationId, msg);
},
{ noAck: true }
{
noAck: true,
}
);
this.waitForReply();
if (this.options.queueName) {
this.listen(channel);
}
this.logger.log(CONNECTED_MESSAGE);
resolve();
},
});

this.server.on(DISCONNECT_EVENT, err => {
this.server.on('connect', (connection) => {
this.attachEmmitters();
});
this.server.on(DISCONNECT_EVENT, (err) => {
this.detachEmitters();
this.logger.error(DISCONNECT_MESSAGE);
this.logger.error(err.err);
});
}));
});
}

private ack(msg: Message): void {
this.channel.ack(msg);
}

public async send<IMessage, IReply>(topic: string, message: IMessage): Promise<IReply> {
Expand Down Expand Up @@ -120,8 +127,7 @@ export class RMQService {
}

public async disconnect() {
responseEmitter.removeAllListeners();
requestEmitter.removeAllListeners();
this.detachEmitters();
this.sendResponseEmitter.removeAllListeners();
await this.channel.close();
await this.server.close();
Expand All @@ -135,7 +141,7 @@ export class RMQService {
this.queueMeta = Reflect.getMetadata(RMQ_ROUTES_META, RMQService);
this.queueMeta = this.queueMeta ?? [];
if (this.queueMeta.length > 0) {
this.queueMeta.map(async meta => {
this.queueMeta.map(async (meta) => {
await channel.bindQueue(this.options.queueName, this.options.exchangeName, meta.topic);
});
}
Expand All @@ -154,27 +160,31 @@ export class RMQService {
);
}

private waitForReply(): void {
private detachEmitters(): void {
responseEmitter.removeAllListeners();
}

private attachEmmitters(): void {
responseEmitter.on(ResponseEmmiterResult.success, async (msg, result) => {
this.reply(result, msg);
});
responseEmitter.on(ResponseEmmiterResult.error, async (msg, err) => {
this.reply('', msg, err);
});
responseEmitter.on(ResponseEmmiterResult.ack, async (msg) => {
this.channel.ack(msg);
this.ack(msg);
});
}

private async reply(res: any, msg: Message, error: Error | RMQError = null) {
this.channel.ack(msg);
res = await this.intercept(res, msg, error);
await this.channel.sendToQueue(msg.properties.replyTo, Buffer.from(JSON.stringify(res)), {
correlationId: msg.properties.correlationId,
headers: {
...this.buildError(error),
},
});
this.ack(msg);
this.logger.debug(`Sent ▲ [${msg.fields.routingKey}] ${JSON.stringify(res)}`);
}

Expand All @@ -188,7 +198,7 @@ export class RMQService {
}

private isTopicExists(topic: string): boolean {
return !!this.queueMeta.find(x => x.topic === topic);
return !!this.queueMeta.find((x) => x.topic === topic);
}

private async useMiddleware(msg: Message) {
Expand Down

0 comments on commit 50fa70c

Please sign in to comment.