Skip to content

Commit

Permalink
Amqp reconnection (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
if0s authored Mar 21, 2023
1 parent deb0857 commit 3dec4e8
Show file tree
Hide file tree
Showing 14 changed files with 2,875 additions and 1,592 deletions.
6 changes: 3 additions & 3 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ jobs:
- checkout
- restore_cache:
key: dependency-cache-{{ checksum "package.json" }}
- run:
name: Audit Dependencies
command: npm audit --audit-level=high
- run:
name: Installing Dependencies
command: npm install
- run:
name: Audit Dependencies
command: npm run audit
- save_cache:
key: dependency-cache-{{ checksum "package.json" }}
paths:
Expand Down
15 changes: 10 additions & 5 deletions .eslintrc.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
module.exports = {
'extends': 'airbnb-base',
'env': {
'mocha': true,
'node': true,
}
extends: 'airbnb-base',
env: {
mocha: true,
node: true,
},
rules: {
'no-await-in-loop': 0,
'max-len': ['error', { code: 180 }],
'no-restricted-syntax': 0,
},
};
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
## 1.4.0 (March 25, 2023)
* Implemented retry mechanism on connection errors
* Added configuration fields to set retry options
* Added `Don't encrypt payload` and `Content-Type` configuration fields to `Publish` action
* Added `Don't decrypt payload` configuration field to `Consume` trigger
* Upgrade to sailor 2.7.1
* Upgrade amqplib to 0.10.3

## 1.3.3 (March 25, 2021)

* Upgrade to sailor 2.6.24
Expand Down
24 changes: 17 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

A component designed to talk to Advanced Message Queuing Protocol,
(**AMQP**) APIs. AMQP is an open standard for passing business messages
between applications or organisations (see [amqp.org](https://www.amqp.org) for more).
between applications or organizations (see [amqp.org](https://www.amqp.org) for more).

AMQP component establishes an asynchronous communications with queues and topics
to publish or consume records.
Expand All @@ -23,8 +23,7 @@ keys that are specified in one string separated by commas.

### Environment variables

This component will automatically encrypt data that is sent to the queue when following
environment variables are set:
This component will automatically encrypt data that is sent to the queue when following environment variables are set and `Don't encrypt payload` unchecked

* `ELASTICIO_MESSAGE_CRYPTO_IV` - vector for symmetric encryption
* `ELASTICIO_MESSAGE_CRYPTO_PASSWORD` - password for symmetric encryption
Expand All @@ -47,23 +46,34 @@ also use URL syntax to provide further parameters and any other options
Will consume the incoming message object that contains `body` with the payload.
If the exchange doesn't exist it will be created on start.

Optionally you can use `#` or `*` to wildcard. For more information check the
tutorial provided at the [RabbitMQ site](http://www.rabbitmq.com/tutorials/tutorial-five-javascript.html).
#### Configuration Fields
* **Exchange** - (string, required): Exchange name where you want to get messages
* **Binding Keys** - (string, optional): Optionally you can use `#` or `*` to wildcard. For more information check the tutorial provided at the [RabbitMQ site](http://www.rabbitmq.com/tutorials/tutorial-five-javascript.html).
* **Don't decrypt payload** - (checkbox, optional): If checked payload will be not decrypted
* **Reconnect Timeout** - (string, optional, 5 by default, maximum 1000): In case of errors how long to wait until retry is seconds
* **Reconnect Attempts** - (string, optional, 12 by default, maximum 1000): How many times try to reconnect before throw error


## Actions

### Publish

Will publish the messages into an exchange. This exchange will be created on
start if it doesn't exists.

#### Configuration Fields
* **Exchange** - (string, required): Exchange name where you want to send message to
* **Don't encrypt payload** - (checkbox, optional): If checked payload will be not encrypted
* **Content-Type** - (string, optional): Content-Type of pushed payload, default is `application/octet-stream`
* **Reconnect Timeout** - (string, optional, 5 by default, maximum 1000): In case of errors how long to wait until retry is seconds
* **Reconnect Attempts** - (string, optional, 12 by default, maximum 1000): How many times try to reconnect before throw error. 12 by default


## Known limitations

Following limitations of the component are known:
* You can not publish to the default exchange.
* All published exchanges are `topic` exchanges by default. However, with the `topic` exchanges one can emulate `direct` and `fanout` exchanges.


## License

Apache-2.0 © [elastic.io GmbH](https://elastic.io)
Expand Down
52 changes: 52 additions & 0 deletions component.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"title": "AMQP",
"buildType": "docker",
"description": "Pub/Sub Component for async communication with queues and topics",
"version": "1.4.0",
"credentials": {
"fields": {
"amqpURI": {
Expand All @@ -23,6 +24,34 @@
"required": true,
"placeholder": "up_to_200_symbols",
"note": "This exchange will be created on start if not exists."
},
"doNotEncrypt": {
"label": "Don't encrypt payload",
"viewClass": "CheckBoxView",
"help": {
"description": "If checked, payload will be not encrypted"
}
},
"contentType": {
"label": "Content-Type",
"viewClass": "TextFieldWithNoteView",
"note": "Content-Type of pushed payload, default is 'application/octet-stream'"
},
"reconnectTimeOut": {
"label": "Reconnect Time Out",
"required": false,
"viewClass": "TextFieldView",
"help": {
"description": "In case of errors how long to wait until retry in seconds. 5 by default"
}
},
"reconnectAttempts": {
"label": "Reconnect Attempts",
"required": false,
"viewClass": "TextFieldView",
"help": {
"description": "How many times try to reconnect before throw error. 12 by default"
}
}
},
"metadata": {
Expand All @@ -49,6 +78,29 @@
"required": false,
"placeholder": "this.key,that.key",
"note": "Optional. You can use <b>#</b> or <b>*</b> to wildcard, more info <a href=\"http://www.rabbitmq.com/tutorials/tutorial-five-javascript.html\" target=\"_top\">here</a>"
},
"doNotDecrypt": {
"label": "Don't decrypt payload",
"viewClass": "CheckBoxView",
"help": {
"description": "If checked, payload will be not decrypted"
}
},
"reconnectTimeOut": {
"label": "Reconnect Timeout",
"required": false,
"viewClass": "TextFieldView",
"help": {
"description": "In case of errors how long to wait until retry is seconds. 5 by default"
}
},
"reconnectAttempts": {
"label": "Reconnect Attempts",
"required": false,
"viewClass": "TextFieldView",
"help": {
"description": "How many times try to reconnect before throw error. 12 by default"
}
}
},
"metadata": { }
Expand Down
58 changes: 18 additions & 40 deletions lib/actions/publish.js
Original file line number Diff line number Diff line change
@@ -1,51 +1,29 @@
const amqp = require('amqplib');
const logger = require('@elastic.io/component-logger')();

const encryptor = require('../encryptor.js');
const { AMQPClient } = require('../amqp.js');

let channel;

/**
* This method will be called from elastic.io platform on initialization
*
* @param cfg
*/
async function init(cfg) {
logger.info('Starting initialization');
const { amqpURI } = cfg;
const amqpExchange = cfg.topic;
logger.debug('Connecting to amqp...');
const conn = await amqp.connect(amqpURI);
logger.debug('Creating a confirm channel');
channel = await conn.createConfirmChannel();
logger.debug('Asserting topic exchange exchange...');
await channel.assertExchange(amqpExchange, 'topic');
}
let amqpClient;

/**
* This method will be called from elastic.io platform providing following data
*
* @param msg incoming message object that contains ``body`` with payload
* @param cfg configuration that is account information and configuration field values
*/
async function processAction(msg, cfg) {
const self = this;
const amqpExchange = cfg.topic;
if (!amqpClient || !amqpClient.connection) { amqpClient = new AMQPClient(cfg, this); }
amqpClient.setLogger(this.logger);

self.logger.info('Publishing message...');
const encryptedData = encryptor.encryptMessageContent(self, {
body: msg.body.payload || msg.body,
attachments: msg.attachments,
});
channel.publish(amqpExchange, msg.body.routingKey || '', encryptedData, {
contentType: 'application/octet-stream',
let data;
if (cfg.doNotEncrypt) {
data = msg.body.payload || msg.body;
data = Buffer.from(JSON.stringify(data));
} else {
data = encryptor.encryptMessageContent(this, {
body: msg.body.payload || msg.body,
attachments: msg.attachments,
});
}
this.logger.info('Publishing message...');

await amqpClient.publish(msg.body.routingKey || '', data, {
contentType: cfg.contentType || 'application/octet-stream',
messageId: msg.id,
});
self.logger.info('Message published');
await channel.waitForConfirms();
self.logger.info('Message publishing confirmed');
return msg;
}

module.exports.process = processAction;
module.exports.init = init;
107 changes: 107 additions & 0 deletions lib/amqp.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
const amqp = require('amqp-connection-manager');
const platformLogger = require('@elastic.io/component-logger')();

const RECONNECT_TIMEOUT = 5;
const RECONNECT_ATTEMPTS = 12;

const validateAttemptsAndTimeout = (input, def) => {
if (!input) return def;
if (input && Number(input).toString() !== 'NaN' && Number(input) <= 1000 && Number(input) >= 0) {
return Number(input);
}
throw new Error('"Reconnect Timeout" and "Reconnect Attempts" should be valid number between 0 and 1000');
};

class AMQPClient {
constructor(cfg, context) {
this.connection = null;
this.channel = null;
this.logger = context.logger || platformLogger;
this.cfg = cfg;
this.queueName = `eio_consumer_${process.env.ELASTICIO_FLOW_ID}_${process.env.ELASTICIO_USER_ID}`;
this.retry = 0;
this.context = context;
this.reconnectTimeOut = validateAttemptsAndTimeout(cfg.reconnectTimeOut, RECONNECT_TIMEOUT);
this.reconnectAttempts = validateAttemptsAndTimeout(cfg.reconnectAttempts, RECONNECT_ATTEMPTS);
}

setLogger(logger) {
this.logger = logger;
}

init(confirmChannel = true, assertExchangeOptions, deleteQueue) {
return new Promise((resolve, reject) => {
this.connection = amqp.connect([this.cfg.amqpURI], {
reconnectTimeInSeconds: this.reconnectTimeOut,
});
this.connection.on('connect', () => {
this.retry = 0;
this.logger.info('Successfully connected to RabbitMQ');
resolve();
});
this.connection.on('connectFailed', ({ err }) => {
this.retry += 1;
if (this.retry >= this.reconnectAttempts) {
const errMsg = new Error(`Connection failed after ${this.reconnectAttempts} attempts`);
this.connection.emit('error', { err: errMsg });
this.context.emit('error', errMsg);
delete this.connection;
} else {
this.logger.error(`Connection failed due to: ${err}, ${this.retry} of ${this.reconnectAttempts} retry after ${this.reconnectTimeOut}sec`);
}
});
this.connection.on('disconnect', ({ err }) => { this.logger.error(`Connection disconnected due to: ${err}`); });
this.connection.on('error', ({ err }) => {
this.logger.error(`Connection encountered an error: ${err}`);
reject(err);
});
this.connection.on('blocked', ({ reason }) => { this.logger.error(`Connection blocked due to: ${reason}`); });
this.connection.on('unblocked', () => { this.logger.info('Connection unblocked'); });
this.channel = this.connection.createChannel({
confirm: confirmChannel,
setup: async (channel) => {
try {
this.logger.debug('Asserting topic exchange...');
await channel.assertExchange(this.cfg.topic, 'topic', assertExchangeOptions);
if (!confirmChannel) {
this.logger.debug('Asserting queue');
await channel.assertQueue(this.queueName, { exclusive: false, durable: false });
const keys = (this.cfg.bindingKeys || '#').split(',').map((s) => s.trim());
for (const key of keys) {
this.logger.debug('Binding queue to exchange...');
await channel.bindQueue(this.queueName, this.cfg.topic, key);
}
}
if (deleteQueue) {
this.logger.info(`Deleting queue ${this.queueName}`);
await channel.deleteQueue(this.queueName);
}
this.logger.info('Successfully finished channel setup');
return channel;
} catch (err) {
this.logger.error(`Error on Channel setup: ${err}`);
return channel;
}
},
});
});
}

async publish(routingKey, content, options) {
await this.init();
this.logger.info('Going to publish message');
await this.channel.publish(this.cfg.topic, routingKey, content, options);
this.logger.info('Message published');
}

async consume(onMessage, options) {
this.logger.info(`Starting consuming from ${this.queueName}`);
return this.channel.consume(this.queueName, onMessage, options);
}

async waitForConnect() {
return this.channel.waitForConnect();
}
}

module.exports.AMQPClient = AMQPClient;
Loading

0 comments on commit 3dec4e8

Please sign in to comment.