Skip to content

Commit

Permalink
feature / connection management and auto reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
AlariCode committed Sep 7, 2018
1 parent 8825f48 commit 0cc9263
Show file tree
Hide file tree
Showing 11 changed files with 142 additions and 98 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Change log
## v0.1.1
- Added reconnection in client and server, if your RabbitMQ instanse is down.
- Support for multiple urls for cluster usage.

## v0.1.0
- First stable version of the package
17 changes: 13 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@

This module is a custom strategy for NestJS microservice library. It allows you to use RabbitMQ as a transport for microservice messages. Learn about NestJS [here](https://nestjs.com).

``` bash
npm i new nestjs-rmq
```

To generate your microservice, just use @nestjs/cli:

``` bash
Expand All @@ -18,7 +22,7 @@ import { ServerRMQ } from 'nestjs-rmq';
async function bootstrap() {
const app = await NestFactory.createMicroservice(AppModule, {
strategy: new ServerRMQ({
url: `amqp://${config.default.localhost.amqp.login}:${config.default.localhost.amqp.password}@${config.default.localhost.amqp.host}`,
urls: [`amqp://login:password@host`],
queue: 'test',
queueOptions: { durable: false }
})
Expand All @@ -27,7 +31,7 @@ async function bootstrap() {
}
```
Options are:
**url** (string) - Connection url to your RabbitMQ instance. It contains user, password and host.
**urls** (string[]) - Connection urls to one or more RabbitMQ instance. Each url contains user, password and host.
**queue** (string) - Name of queue, your server will lusten to.
prefetchCount | number | Number of prefetched messages. You can read more [here](https://github.com/postwait/node-amqp).
**isGlobalPrefetchCount** (boolean) - You can read more [here](https://github.com/postwait/node-amqp).
Expand All @@ -42,6 +46,8 @@ test(data: string): string {
return 'test' + data;
}
```


As for the clients, they have the same options:

``` javascript
Expand All @@ -50,7 +56,7 @@ import { ClientRMQ } from 'nestjs-rmq';
//...

client = new ClientRMQ({
url: `amqp://login:password@host`,
urls: [`amqp://login:password@host`],
queue: 'test',
queueOptions: { durable: false }
});
Expand All @@ -63,4 +69,7 @@ a(): Observable<string> {
console.log('Client sent: ' + msg);
return this.client.send<string, string>({ cmd: 'test' }, msg);
}
```
```

## Breaking changes from v0.1.0 to v0.1.1
You need to use `urls` option instead of `url` and pass `string[]` instead of `string` for cluster support.
4 changes: 2 additions & 2 deletions lib/client/client.interface.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Options } from 'amqplib';

export interface ClientOptions {
url?: string;
export interface IClientOptions {
urls?: string[];
queue?: string;
prefetchCount?: number;
isGlobalPrefetchCount?: boolean;
Expand Down
104 changes: 60 additions & 44 deletions lib/client/client.ts
Original file line number Diff line number Diff line change
@@ -1,50 +1,94 @@
import { Channel, Connection, Options } from 'amqplib';
import { Options } from 'amqplib';
import { ClientProxy } from '@nestjs/microservices';
import { ClientOptions } from './client.interface';
import { IClientOptions } from './client.interface';
import { EventEmitter } from 'events';
import {
import {
DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
DEFAULT_PREFETCH_COUNT,
DEFAULT_QUEUE,
DEFAULT_QUEUE_OPTIONS,
DEFAULT_URL
DEFAULT_URL,
} from '../constants';
import * as rqmPackage from 'amqplib';
import * as amqp from 'amqp-connection-manager';

export class ClientRMQ extends ClientProxy {
private client: Connection = null;
private channel: Channel = null;
private url: string;
private client: any = null;
private channel: any = null;
private urls: string[];
private queue: string;
private prefetchCount: number;
private isGlobalPrefetchCount: boolean;
private queueOptions: Options.AssertQueue
private queueOptions: Options.AssertQueue;
private replyQueue: string;
private responseEmitter: EventEmitter;

constructor(
private readonly options: ClientOptions) {
private readonly options: IClientOptions) {
super();
this.url = this.options.url || DEFAULT_URL;
this.urls = this.options.urls || [DEFAULT_URL];
this.queue = this.options.queue || DEFAULT_QUEUE;
this.prefetchCount = this.options.prefetchCount || DEFAULT_PREFETCH_COUNT;
this.isGlobalPrefetchCount = this.options.isGlobalPrefetchCount || DEFAULT_IS_GLOBAL_PREFETCH_COUNT;
this.queueOptions = this.options.queueOptions || DEFAULT_QUEUE_OPTIONS;
this.connect();
}

protected publish(messageObj, callback: (err, result, disposed?: boolean) => void) {
public close(): void {
this.channel && this.channel.close();
this.client && this.client.close();
}

public listen() {
this.channel.addSetup((channel) => {
return Promise.all([
channel.consume(this.replyQueue, (msg) => {
this.responseEmitter.emit(msg.properties.correlationId, msg);
}, { noAck: true }),
]);
});
}

public connect(): Promise<any> {
if (this.client && this.channel) {
return Promise.resolve();
}
return new Promise(async (resolve, reject) => {
this.client = amqp.connect(this.urls);
this.client.on('connect', x => {
this.channel = this.client.createChannel({
json: false,
setup: async (channel) => {
await channel.assertQueue(this.queue, this.queueOptions);
await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
this.replyQueue = (await channel.assertQueue('', { exclusive: true })).queue;
this.responseEmitter = new EventEmitter();
this.responseEmitter.setMaxListeners(0);
this.listen();
resolve();
},
});
});
this.client.on('disconnect', err => {
reject(err);
this.client.close();
this.client = null;
});
});
}

protected async publish(messageObj, callback: (err, result, disposed?: boolean) => void) {
try {
let correlationId = Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15);
if (!this.client) {
await this.connect();
}
const correlationId = Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15);
this.responseEmitter.on(correlationId, msg => {
this.handleMessage(msg, callback);
});
this.channel.sendToQueue(this.queue, Buffer.from(JSON.stringify(messageObj)), {
replyTo: this.replyQueue,
correlationId: correlationId
correlationId,
});
} catch (err) {
console.log(err);
callback(err, null);
}
}
Expand All @@ -66,32 +110,4 @@ export class ClientRMQ extends ClientProxy {
});
}
}

public close(): void {
this.channel && this.channel.close();
this.client && this.client.close();
}

public listen() {
this.channel.consume(this.replyQueue, (msg) => {
this.responseEmitter.emit(msg.properties.correlationId, msg);
}, { noAck: true });
}

public connect(): Promise<any> {
if (this.client && this.channel) {
return Promise.resolve();
}
return new Promise(async (resolve, reject) => {
this.client = await rqmPackage.connect(this.url);
this.channel = await this.client.createChannel();
await this.channel.assertQueue(this.queue, this.queueOptions);
await this.channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
this.replyQueue = (await this.channel.assertQueue('', { exclusive: true })).queue;
this.responseEmitter = new EventEmitter();
this.responseEmitter.setMaxListeners(0);
this.listen();
resolve();
});
}
}
2 changes: 1 addition & 1 deletion lib/client/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export { ClientRMQ } from './client';
export { ClientOptions } from './client.interface';
export { IClientOptions } from './client.interface';
4 changes: 2 additions & 2 deletions lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export { ServerOptions, ServerRMQ } from './server';
export { ClientOptions, ClientRMQ } from './client';
export { IServerOptions, ServerRMQ } from './server';
export { IClientOptions, ClientRMQ } from './client';
2 changes: 1 addition & 1 deletion lib/server/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export { ServerRMQ } from './server';
export { ServerOptions } from './server.interface';
export { IServerOptions } from './server.interface';
4 changes: 2 additions & 2 deletions lib/server/server.interface.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Options } from 'amqplib';

export interface ServerOptions {
url?: string;
export interface IServerOptions {
urls?: string[];
queue?: string;
prefetchCount?: number;
isGlobalPrefetchCount?: boolean;
Expand Down
58 changes: 31 additions & 27 deletions lib/server/server.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
import { Server, CustomTransportStrategy } from '@nestjs/microservices';
import { Channel, Connection, Options } from 'amqplib';
import { ServerOptions } from './server.interface';
import { Options } from 'amqplib';
import { IServerOptions } from './server.interface';
import { Observable } from 'rxjs';
import {
import {
DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
DEFAULT_PREFETCH_COUNT,
DEFAULT_QUEUE,
DEFAULT_QUEUE_OPTIONS,
DEFAULT_URL
DEFAULT_URL,
} from '../constants';
import * as rqmPackage from 'amqplib';
import * as amqp from 'amqp-connection-manager';

export class ServerRMQ extends Server implements CustomTransportStrategy {
private server: Connection = null;
private channel: Channel = null;
private url: string;
private server: any = null;
private channel: any = null;
private urls: string[];
private queue: string;
private prefetchCount: number;
private queueOptions: Options.AssertQueue
private queueOptions: Options.AssertQueue;
private isGlobalPrefetchCount: boolean;

constructor(private readonly options: ServerOptions) {
constructor(private readonly options: IServerOptions) {
super();
this.url = this.options.url || DEFAULT_URL;
this.urls = this.options.urls || [DEFAULT_URL];
this.queue = this.options.queue || DEFAULT_QUEUE;
this.prefetchCount = this.options.prefetchCount || DEFAULT_PREFETCH_COUNT;
this.isGlobalPrefetchCount = this.options.isGlobalPrefetchCount || DEFAULT_IS_GLOBAL_PREFETCH_COUNT;
Expand All @@ -31,28 +31,32 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {

public async listen(callback: () => void): Promise<void> {
await this.start(callback);
this.channel.consume(this.queue, (msg) => this.handleMessage(msg), {
noAck: true,
});
}

private async start(callback?: () => void) {
try {
this.server = await rqmPackage.connect(this.url);
this.channel = await this.server.createChannel();
this.channel.assertQueue(this.queue, this.queueOptions);
await this.channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
callback();
} catch (err) {
this.logger.error(err);
}
}

public close(): void {
this.channel && this.channel.close();
this.server && this.server.close();
}

private async start(callback?: () => void) {
this.server = amqp.connect(this.urls);
this.server.on('connect', x => {
this.channel = this.server.createChannel({
json: false,
setup: async (channel) => {
await channel.assertQueue(this.queue, this.queueOptions);
await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
channel.consume(this.queue, (msg) => this.handleMessage(msg), { noAck: true });
callback();
},
});
});

this.server.on('disconnect', err => {
this.logger.error('Disconnected from RMQ. Trying to reconnect');
});
}

private async handleMessage(message): Promise<void> {
const { content, properties } = message;
const messageObj = JSON.parse(content.toString());
Expand All @@ -68,6 +72,6 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {

private sendMessage(message, replyTo, correlationId): void {
const buffer = Buffer.from(JSON.stringify(message));
this.channel.sendToQueue(replyTo, buffer, { correlationId: correlationId });
this.channel.sendToQueue(replyTo, buffer, { correlationId });
}
}
Loading

0 comments on commit 0cc9263

Please sign in to comment.