Skip to content

Commit

Permalink
Merge pull request AlariCode#8 from AlariCode/master
Browse files Browse the repository at this point in the history
Sync with Master
  • Loading branch information
mjarmoc authored May 13, 2020
2 parents 80d6e48 + 88ed201 commit 9e65629
Show file tree
Hide file tree
Showing 15 changed files with 487 additions and 255 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Change log

## 1.7.0
- Fixed reconnection bug
- Async init all modules loaded (thx to mjarmoc)

## 1.6.0
- Custom logger injection (thx to @minenkom)

## 1.5.2
- Fixed double logging

Expand Down
19 changes: 10 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,24 @@ Additionally, you can use optional parameters:
- **isQueueDurable** (boolean) - Makes created queue durable. Default is true.
- **isExchangeDurable** (boolean) - Makes created exchange durable. Default is true.
- **logMessages** (boolean) - Enable printing all sent and recieved messages in console with its route and content. Default is false.
- **middleware** (array) - Array of middleware functions that implements `RMQPipeClass` with one method `transform`. They will be triggered right after recieving message, before pipes and controller method. Trigger order is equal to array order.
- **errorHandler** (class) - custom error handler for dealing with errors from replies, use `errorHandler` in module options and pass class that implements `RMQErrorHandler`.
- **logger** (LoggerService) - Your custom logger service that implements `LoggerService` interface. Compatible with Winston and other loggers.
- **middleware** (array) - Array of middleware functions that extends `RMQPipeClass` with one method `transform`. They will be triggered right after recieving message, before pipes and controller method. Trigger order is equal to array order.
- **errorHandler** (class) - custom error handler for dealing with errors from replies, use `errorHandler` in module options and pass class that extends `RMQErrorHandler`.
- **serviceName** (string) - service name for debugging.

```javascript
class LogMiddleware implements RMQPipeClass {
class LogMiddleware extends RMQPipeClass {
async transfrom(msg: Message): Promise<Message> {
console.log(msg);
return msg;
}
}
```

- **intercepters** (array) - Array of intercepter functions that implements `RMQIntercepterClass` with one method `intercept`. They will be triggered before replying on any message. Trigger order is equal to array order.
- **intercepters** (array) - Array of intercepter functions that extends `RMQIntercepterClass` with one method `intercept`. They will be triggered before replying on any message. Trigger order is equal to array order.

```javascript
export class MyIntercepter implements RMQIntercepterClass {
export class MyIntercepter extends RMQIntercepterClass {
async intercept(res: any, msg: Message, error: Error): Promise<any> {
// res - response body
// msg - initial message we are replying to
Expand Down Expand Up @@ -253,10 +254,10 @@ myMethod(numbers: number[]): number {
}
```

where `MyPipeClass` implements `RMQPipeClass` with one method `transform`:
where `MyPipeClass` extends `RMQPipeClass` with one method `transform`:

```javascript
class MyPipeClass implements RMQPipeClass {
class MyPipeClass extends RMQPipeClass {
async transfrom(msg: Message): Promise<Message> {
// do something
return msg;
Expand All @@ -265,10 +266,10 @@ class MyPipeClass implements RMQPipeClass {
```

## Using RMQErrorHandler
If you want to use custom error handler for dealing with errors from replies, use `errorHandler` in module options and pass class that implements `RMQErrorHandler`:
If you want to use custom error handler for dealing with errors from replies, use `errorHandler` in module options and pass class that extends `RMQErrorHandler`:

```javascript
class MyErrorHandler implements RMQErrorHandler {
class MyErrorHandler extends RMQErrorHandler {
public static handle(headers: IRmqErrorHeaders): Error | RMQError {
// do something
return new RMQError(
Expand Down
2 changes: 1 addition & 1 deletion e2e/mocks/double.pipe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { RMQPipeClass } from '../../lib';
import { Message } from 'amqplib';
import { MultiplyContracts } from '../contracts/mock.contracts';

export class DoublePipe implements RMQPipeClass {
export class DoublePipe extends RMQPipeClass {
async transform(msg: Message): Promise<Message> {
if (msg.fields.routingKey === MultiplyContracts.topic) {
let { arrayToMultiply }: MultiplyContracts.Request = JSON.parse(msg.content.toString());
Expand Down
2 changes: 1 addition & 1 deletion e2e/mocks/zero.intercepter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { RMQIntercepterClass } from '../../lib';
import { Message } from 'amqplib';
import { DivideContracts } from '../contracts/mock.contracts';

export class ZeroIntercepter implements RMQIntercepterClass {
export class ZeroIntercepter extends RMQIntercepterClass {
async intercept(res: any, msg: Message, error: Error): Promise<Message> {
if (msg.fields.routingKey === DivideContracts.topic) {
res.result = 0;
Expand Down
12 changes: 6 additions & 6 deletions e2e/tests/rmq.e2e-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Test } from '@nestjs/testing';
import { RMQModule, RMQService } from '../../lib';
import { INestApplication } from '@nestjs/common';
import { INestApplication, Logger } from '@nestjs/common';
import { ApiController } from '../mocks/api.controller';
import { MicroserviceController } from '../mocks/microservice.controller';
import { ERROR_UNDEFINED_FROM_RPC } from '../../lib/constants';
Expand Down Expand Up @@ -32,7 +32,7 @@ describe('RMQe2e', () => {
middleware: [DoublePipe],
intercepters: [ZeroIntercepter],
errorHandler: ErrorHostHandler,
serviceName: 'test-service'
serviceName: 'test-service',
}),
],
controllers: [ApiController, MicroserviceController],
Expand Down Expand Up @@ -68,7 +68,7 @@ describe('RMQe2e', () => {
});
it('get common Error from method', async () => {
try {
const { result } = await apiController.sumSuccess([0,0,0]);
const { result } = await apiController.sumSuccess([0, 0, 0]);
expect(result).not.toBe(0);
} catch (error) {
expect(error.message).toBe('My error from method');
Expand All @@ -81,7 +81,7 @@ describe('RMQe2e', () => {
});
it('get RMQError from method', async () => {
try {
const { result } = await apiController.sumSuccess([-1,0,0]);
const { result } = await apiController.sumSuccess([-1, 0, 0]);
expect(result).not.toBe(-1);
} catch (error) {
expect(error.message).toBe('My RMQError from method');
Expand All @@ -94,7 +94,7 @@ describe('RMQe2e', () => {
});
it('get undefined return Error', async () => {
try {
const { result } = await apiController.sumSuccess([-11,0,0]);
const { result } = await apiController.sumSuccess([-11, 0, 0]);
expect(result).not.toBe(-11);
} catch (error) {
expect(error.message).toBe(ERROR_UNDEFINED_FROM_RPC);
Expand Down Expand Up @@ -138,7 +138,7 @@ describe('RMQe2e', () => {
describe('errorHandler', () => {
it('error host change', async () => {
try {
const { result } = await apiController.sumSuccess([0,0,0]);
const { result } = await apiController.sumSuccess([0, 0, 0]);
expect(result).not.toBe(0);
} catch (error) {
expect(error.host).toBe('handler');
Expand Down
7 changes: 7 additions & 0 deletions lib/classes/rmq-intercepter.class.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import { Message } from 'amqplib';
import { LoggerService } from '@nestjs/common';

export class RMQIntercepterClass {
protected logger: LoggerService;

constructor(logger: LoggerService = console) {
this.logger = logger;
}

async intercept(res: any, msg: Message, error?: Error): Promise<any> {
return res;
}
Expand Down
7 changes: 7 additions & 0 deletions lib/classes/rmq-pipe.class.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import { Message } from 'amqplib';
import { LoggerService } from '@nestjs/common';

// tslint:disable-next-line: interface-name
export class RMQPipeClass {
protected logger: LoggerService;

constructor(logger: LoggerService = console) {
this.logger = logger;
}

async transform(msg: Message): Promise<Message> {
return msg;
}
Expand Down
16 changes: 0 additions & 16 deletions lib/constants.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
export const RMQ_ROUTES_META: string = 'RMQ_ROUTES_META';
export const DISCONNECT_EVENT: string = 'disconnect';
export const DISCONNECT_MESSAGE: string = 'Disconnected from RMQ. Trying to reconnect';
export const CONNECTING_MESSAGE: string = 'Connecting to RMQ';
export const CONNECTED_MESSAGE: string = 'Successfully connected to RMQ';
export const REPLY_QUEUE: string = 'amq.rabbitmq.reply-to';
export const EXCHANGE_TYPE: string = 'topic';
Expand All @@ -20,18 +19,3 @@ export enum ERROR_TYPE {
TRANSPORT = 'TRANSPORT',
RMQ = 'RMQ'
}

export const CUSTOM_LOGS = {
recieved: {
badge: '▼',
color: 'blue',
label: 'recieved',
logLevel: 'info',
},
sent: {
badge: '▲',
color: 'blue',
label: 'sent',
logLevel: 'info',
},
};
11 changes: 2 additions & 9 deletions lib/decorators/rmq-controller.decorator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,16 @@ import {
import { IQueueMeta } from '../interfaces/queue-meta.interface';
import { requestEmitter, responseEmitter, ResponseEmmiterResult } from '../emmiters/router.emmiter';
import { RMQService } from '../rmq.service';
import { Signale } from 'signale';
import { Message } from 'amqplib';
import { RMQError } from '..';
import { Logger } from '@nestjs/common';

export function RMQController(): ClassDecorator {
return function(target: any) {
const logger = new Signale({
config: {
displayTimestamp: true,
displayDate: true,
},
logLevel: 'error',
});
let topics: IQueueMeta[] = Reflect.getMetadata(RMQ_ROUTES_META, RMQService);
topics = topics ? topics.filter(topic => topic.target === target.prototype) : [];
if(topics.length === 0) {
logger.error(`${ERROR_NO_ROUTE_FOR_CONTROLLER} ${target.prototype.constructor.name}`);
Logger.error(`${ERROR_NO_ROUTE_FOR_CONTROLLER} ${target.prototype.constructor.name}`);
}
target = class extends (target as { new(...args): any }) {
constructor(...args: any) {
Expand Down
30 changes: 30 additions & 0 deletions lib/helpers/logger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { Logger, LoggerService } from '@nestjs/common';
import { blueBright, white, yellow } from 'chalk';

export class RQMColorLogger implements LoggerService {
logMessages: boolean;

constructor(logMessages: boolean) {
this.logMessages = logMessages ?? false;
}
log(message: any, context?: string): any {
Logger.log(message, context);
}
error(message: any, trace?: string, context?: string): any {
Logger.error(message, trace, context);
}
debug(message: any, context?: string): any {
if(!this.logMessages) {
return;
}
const split = new RegExp(/(.*?)(\[.*?])(.*)/g).exec(message);
if(split[3]) {
Logger.log(`${blueBright(split[1])} ${yellow(split[2])} ${white(split[3])}`);
} else {
Logger.log(message, context);
}
}
warn(message: any, context?: string): any {
Logger.warn(message, context);
}
}
2 changes: 2 additions & 0 deletions lib/interfaces/rmq-options.interface.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { RMQPipeClass } from '../classes/rmq-pipe.class';
import { RMQIntercepterClass } from '../classes/rmq-intercepter.class';
import { RMQErrorHandler } from '../classes/rmq-error-handler.class';
import { LoggerService } from '@nestjs/common';

export interface IRMQServiceOptions {
exchangeName: string;
Expand All @@ -16,6 +17,7 @@ export interface IRMQServiceOptions {
reconnectTimeInSeconds?: number;
messagesTimeout?: number;
logMessages?: boolean;
logger?: LoggerService;
middleware?: typeof RMQPipeClass[];
intercepters?: typeof RMQIntercepterClass[];
errorHandler?: typeof RMQErrorHandler;
Expand Down
4 changes: 3 additions & 1 deletion lib/rmq.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ export class RMQModule {
const rmqServiceProvider = {
provide: RMQService,
useFactory: async (): Promise<RMQService> => {
return new RMQService(options);
const RMQInstance = new RMQService(options);
await RMQInstance.init();
return RMQInstance;
},
};
return {
Expand Down
Loading

0 comments on commit 9e65629

Please sign in to comment.