Skip to content

Commit

Permalink
feature - reconnect fix and async init
Browse files Browse the repository at this point in the history
  • Loading branch information
AlariCode committed Apr 24, 2020
1 parent 33aa980 commit 88ed201
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 50 deletions.
8 changes: 6 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
# Change log

## 1.5.2
- Fixed double logging
## 1.7.0
- Fixed reconnection bug
- Async init all modules loaded (thx to mjarmoc)

## 1.6.0
- Custom logger injection (thx to @minenkom)

## 1.5.2
- Fixed double logging

## 1.5.1
- Fixed ack race condition
- Added tests
Expand Down
10 changes: 5 additions & 5 deletions e2e/tests/rmq.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ describe('RMQe2e', () => {
middleware: [DoublePipe],
intercepters: [ZeroIntercepter],
errorHandler: ErrorHostHandler,
serviceName: 'test-service'
serviceName: 'test-service',
}),
],
controllers: [ApiController, MicroserviceController],
Expand Down Expand Up @@ -68,7 +68,7 @@ describe('RMQe2e', () => {
});
it('get common Error from method', async () => {
try {
const { result } = await apiController.sumSuccess([0,0,0]);
const { result } = await apiController.sumSuccess([0, 0, 0]);
expect(result).not.toBe(0);
} catch (error) {
expect(error.message).toBe('My error from method');
Expand All @@ -81,7 +81,7 @@ describe('RMQe2e', () => {
});
it('get RMQError from method', async () => {
try {
const { result } = await apiController.sumSuccess([-1,0,0]);
const { result } = await apiController.sumSuccess([-1, 0, 0]);
expect(result).not.toBe(-1);
} catch (error) {
expect(error.message).toBe('My RMQError from method');
Expand All @@ -94,7 +94,7 @@ describe('RMQe2e', () => {
});
it('get undefined return Error', async () => {
try {
const { result } = await apiController.sumSuccess([-11,0,0]);
const { result } = await apiController.sumSuccess([-11, 0, 0]);
expect(result).not.toBe(-11);
} catch (error) {
expect(error.message).toBe(ERROR_UNDEFINED_FROM_RPC);
Expand Down Expand Up @@ -138,7 +138,7 @@ describe('RMQe2e', () => {
describe('errorHandler', () => {
it('error host change', async () => {
try {
const { result } = await apiController.sumSuccess([0,0,0]);
const { result } = await apiController.sumSuccess([0, 0, 0]);
expect(result).not.toBe(0);
} catch (error) {
expect(error.host).toBe('handler');
Expand Down
4 changes: 3 additions & 1 deletion lib/rmq.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ export class RMQModule {
const rmqServiceProvider = {
provide: RMQService,
useFactory: async (): Promise<RMQService> => {
return new RMQService(options);
const RMQInstance = new RMQService(options);
await RMQInstance.init();
return RMQInstance;
},
};
return {
Expand Down
76 changes: 40 additions & 36 deletions lib/rmq.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,45 +41,49 @@ export class RMQService {
constructor(options: IRMQServiceOptions) {
this.options = options;
this.logger = options.logger ? options.logger : new RQMColorLogger(this.options.logMessages);
this.init();
}

public async init(): Promise<void> {
const connectionURLs: string[] = this.options.connections.map((connection: IRMQConnection) => {
return `amqp://${connection.login}:${connection.password}@${connection.host}`;
});
const connectionOptions = {
reconnectTimeInSeconds: this.options.reconnectTimeInSeconds ?? DEFAULT_RECONNECT_TIME
};
this.server = amqp.connect(connectionURLs, connectionOptions);
this.channel = this.server.createChannel({
json: false,
setup: async (channel: Channel) => {
await channel.assertExchange(this.options.exchangeName, EXCHANGE_TYPE, {
durable: this.options.isExchangeDurable ?? true,
});
await channel.prefetch(
this.options.prefetchCount ?? DEFAULT_PREFETCH_COUNT,
this.options.isGlobalPrefetchCount ?? false
);
await channel.consume(
this.replyQueue,
(msg: Message) => {
this.sendResponseEmitter.emit(msg.properties.correlationId, msg);
},
{ noAck: true }
);
this.waitForReply();
if (this.options.queueName) {
this.listen(channel);
}
this.logger.log(CONNECTED_MESSAGE);
},
});
return new Promise((resolve => {
const connectionURLs: string[] = this.options.connections.map((connection: IRMQConnection) => {
return `amqp://${connection.login}:${connection.password}@${connection.host}`;
});
const connectionOptions = {
reconnectTimeInSeconds: this.options.reconnectTimeInSeconds ?? DEFAULT_RECONNECT_TIME
};
this.server = amqp.connect(connectionURLs, connectionOptions);
this.channel = this.server.createChannel({
json: false,
setup: async (channel: Channel) => {
await channel.assertExchange(this.options.exchangeName, EXCHANGE_TYPE, {
durable: this.options.isExchangeDurable ?? true,
});
await channel.prefetch(
this.options.prefetchCount ?? DEFAULT_PREFETCH_COUNT,
this.options.isGlobalPrefetchCount ?? false
);
await channel.consume(
this.replyQueue,
(msg: Message) => {
this.sendResponseEmitter.emit(msg.properties.correlationId, msg);
},
{ noAck: true }
);
this.waitForReply();
if (this.options.queueName) {
this.listen(channel);
}
this.logger.log(CONNECTED_MESSAGE);
resolve();
},
});

this.server.on(DISCONNECT_EVENT, err => {
this.logger.error(DISCONNECT_MESSAGE);
this.logger.error(err.err);
});
}));

this.server.on(DISCONNECT_EVENT, err => {
this.logger.error(DISCONNECT_MESSAGE, err);
});
}

public async send<IMessage, IReply>(topic: string, message: IMessage): Promise<IReply> {
Expand Down Expand Up @@ -150,7 +154,7 @@ export class RMQService {
);
}

private async waitForReply(): Promise<void> {
private waitForReply(): void {
responseEmitter.on(ResponseEmmiterResult.success, async (msg, result) => {
this.reply(result, msg);
});
Expand Down
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "nestjs-rmq",
"version": "1.6.0",
"version": "1.7.0",
"description": "NestJS RabbitMQ Module",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
Expand Down Expand Up @@ -37,7 +37,7 @@
"@nestjs/core": "^7.0.0",
"@nestjs/platform-express": "^7.0.0",
"@nestjs/testing": "^7.0.0",
"@types/amqp-connection-manager": "^2.0.8",
"@types/amqp-connection-manager": "^2.0.9",
"@types/amqplib": "^0.5.13",
"@types/chalk": "^2.2.0",
"@types/jest": "^25.1.4",
Expand Down

0 comments on commit 88ed201

Please sign in to comment.