Skip to content

Commit

Permalink
update sailor to 2.6.7 (#2)
Browse files Browse the repository at this point in the history
* update sailor to 2.6.7
  • Loading branch information
kirill-levitskiy authored May 21, 2020
1 parent b808995 commit cec1047
Show file tree
Hide file tree
Showing 13 changed files with 3,429 additions and 261 deletions.
7 changes: 7 additions & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module.exports = {
'extends': 'airbnb-base',
'env': {
'mocha': true,
'node': true,
}
};
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
node_modules
coverage
.idea
.env
8 changes: 4 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
language: node_js
node_js:
- v5
- v6
- v7
script: npm run integration-test
- v14
- v13
- v12
script: npm run integration-test
20 changes: 20 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
## 1.3.1 (May 22, 2020)

* Update sailor version to 2.6.7

## 1.3.0 (April 21, 2017)

* Update sailor version to 2.1.0

## 1.2.0 (February 21, 2017)

* Update sailor version to 2.0.0

## 1.1.0 (December 16, 2016)

* Add Consume Trigger

## 1.0.0 (December 15, 2016)

* Initial release
* Add Publish Action
1 change: 1 addition & 0 deletions component.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"title": "AMQP",
"buildType": "docker",
"description": "Pub/Sub Component for async communication with queues and topics",
"credentials": {
"fields": {
Expand Down
66 changes: 33 additions & 33 deletions lib/actions/publish.js
Original file line number Diff line number Diff line change
@@ -1,31 +1,27 @@
'use strict';
const co = require('co');
const amqp = require('amqplib');
const logger = require('@elastic.io/component-logger')();
const encryptor = require('../encryptor.js');
const debug = require('debug')('publish');

let channel;

module.exports.process = processAction;
module.exports.init = init;

/**
* This methdo will be called from elastic.io platform on initialization
* This method will be called from elastic.io platform on initialization
*
* @param cfg
*/
function init(cfg) {
console.log('Starting initialization');
const amqpURI = cfg.amqpURI;
const amqpExchange = cfg.topic;
return co(function* gen() {
debug('Connecting to amqpURI=%s', amqpURI);
const conn = yield amqp.connect(amqpURI);
debug('Creating a confirm channel');
channel = yield conn.createConfirmChannel();
debug('Asserting topic exchange exchange=%s', amqpExchange);
yield channel.assertExchange(amqpExchange, 'topic');
});
logger.info('Starting initialization');
const { amqpURI } = cfg;
const amqpExchange = cfg.topic;
return co(function* gen() {
logger.debug('Connecting to amqpURI=%s', amqpURI);
const conn = yield amqp.connect(amqpURI);
logger.debug('Creating a confirm channel');
channel = yield conn.createConfirmChannel();
logger.debug('Asserting topic exchange exchange=%s', amqpExchange);
yield channel.assertExchange(amqpExchange, 'topic');
});
}

/**
Expand All @@ -35,20 +31,24 @@ function init(cfg) {
* @param cfg configuration that is account information and configuration field values
*/
function processAction(msg, cfg) {
const amqpExchange = cfg.topic;
return co(function* sendMessage() {
console.log('Publishing message id=%s', msg.id);
let encryptedData = encryptor.encryptMessageContent({
body: msg.body.payload || msg.body,
attachments: msg.attachments
});
channel.publish(amqpExchange, msg.body.routingKey || '', encryptedData, {
contentType: "application/octet-stream",
messageId: msg.id
});
console.log('Message published id=%s', msg.id);
yield channel.waitForConfirms();
console.log('Message publishing confirmed id=%s', msg.id);
return msg;
}.bind(this));
const self = this;
const amqpExchange = cfg.topic;
return co(function* sendMessage() {
self.logger.info('Publishing message id=%s', msg.id);
const encryptedData = encryptor.encryptMessageContent(self, {
body: msg.body.payload || msg.body,
attachments: msg.attachments,
});
channel.publish(amqpExchange, msg.body.routingKey || '', encryptedData, {
contentType: 'application/octet-stream',
messageId: msg.id,
});
self.logger.info('Message published id=%s', msg.id);
yield channel.waitForConfirms();
self.logger.info('Message publishing confirmed id=%s', msg.id);
return msg;
});
}

module.exports.process = processAction;
module.exports.init = init;
75 changes: 38 additions & 37 deletions lib/cipher.js
Original file line number Diff line number Diff line change
@@ -1,52 +1,53 @@
var _ = require('lodash');
var crypto = require('crypto');
var debug = require('debug')('sailor:cipher');
/* eslint-disable new-cap */

var ALGORYTHM = 'aes-256-cbc';
var PASSWORD = process.env.ELASTICIO_MESSAGE_CRYPTO_PASSWORD;
var VECTOR = process.env.ELASTICIO_MESSAGE_CRYPTO_IV;
const _ = require('lodash');
const crypto = require('crypto');

exports.id = 1;
exports.encrypt = encryptIV;
exports.decrypt = decryptIV;
const ALGORYTHM = 'aes-256-cbc';
const PASSWORD = process.env.ELASTICIO_MESSAGE_CRYPTO_PASSWORD;
const VECTOR = process.env.ELASTICIO_MESSAGE_CRYPTO_IV;

function encryptIV(rawData) {
debug('About to encrypt:', rawData);
function encryptIV(self, rawData) {
self.logger.debug('About to encrypt:', rawData);

if (!_.isString(rawData)) {
throw new Error('RabbitMQ message cipher.encryptIV() accepts only string as parameter.');
}
if (!_.isString(rawData)) {
throw new Error('RabbitMQ message cipher.encryptIV() accepts only string as parameter.');
}

if (!PASSWORD) {
console.log('Encryption will be skipped as ELASTICIO_MESSAGE_CRYPTO_PASSWORD env is empty');
return new Buffer.from(rawData);
}
if (!PASSWORD) {
self.logger.info('Encryption will be skipped as ELASTICIO_MESSAGE_CRYPTO_PASSWORD env is empty');
return new Buffer.from(rawData);
}

if (!VECTOR) {
throw new Error('process.env.ELASTICIO_MESSAGE_CRYPTO_IV is not set');
}
if (!VECTOR) {
throw new Error('process.env.ELASTICIO_MESSAGE_CRYPTO_IV is not set');
}

var encodeKey = crypto.createHash('sha256').update(PASSWORD, 'utf-8').digest();
var cipher = crypto.createCipheriv(ALGORYTHM, encodeKey, VECTOR);
return Buffer.concat([cipher.update(new Buffer.from(rawData)),cipher.final()]);
const encodeKey = crypto.createHash('sha256').update(PASSWORD, 'utf-8').digest();
const cipher = crypto.createCipheriv(ALGORYTHM, encodeKey, VECTOR);
return Buffer.concat([cipher.update(new Buffer.from(rawData)), cipher.final()]);
}

function decryptIV(encData) {
debug('About to decrypt:', encData);
function decryptIV(self, encData) {
self.logger.debug('About to decrypt:', encData);

if (!PASSWORD) {
console.log('Decryption will be skipped as ELASTICIO_MESSAGE_CRYPTO_PASSWORD env is empty');
return encData;
}
if (!PASSWORD) {
self.logger.info('Decryption will be skipped as ELASTICIO_MESSAGE_CRYPTO_PASSWORD env is empty');
return encData;
}

if (!VECTOR) {
throw new Error('process.env.ELASTICIO_MESSAGE_CRYPTO_IV is not set');
}
if (!VECTOR) {
throw new Error('process.env.ELASTICIO_MESSAGE_CRYPTO_IV is not set');
}

var decodeKey = crypto.createHash('sha256').update(PASSWORD, 'utf-8').digest();
var cipher = crypto.createDecipheriv(ALGORYTHM, decodeKey, VECTOR);
const decodeKey = crypto.createHash('sha256').update(PASSWORD, 'utf-8').digest();
const cipher = crypto.createDecipheriv(ALGORYTHM, decodeKey, VECTOR);

var result = cipher.update(encData, 'base64', 'utf-8') + cipher.final('utf-8');
const result = cipher.update(encData, 'base64', 'utf-8') + cipher.final('utf-8');

return result;
return result;
}

exports.id = 1;
exports.encrypt = encryptIV;
exports.decrypt = decryptIV;
32 changes: 16 additions & 16 deletions lib/encryptor.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
var cipher = require('./cipher.js');
const cipher = require('./cipher.js');

exports.encryptMessageContent = encryptMessageContent;
exports.decryptMessageContent = decryptMessageContent;

function encryptMessageContent(messagePayload) {
return cipher.encrypt(JSON.stringify(messagePayload));
function encryptMessageContent(self, messagePayload) {
return cipher.encrypt(self, JSON.stringify(messagePayload));
}

function decryptMessageContent(messagePayload) {
if (!messagePayload || ! Buffer.isBuffer(messagePayload)) {
throw new Error("Message payload supplied for decryption is either empty or not a Buffer");
}
try {
return JSON.parse(cipher.decrypt(messagePayload));
} catch (err) {
console.error(err.stack);
throw Error('Failed to decrypt message: ' + err.message);
}
function decryptMessageContent(self, messagePayload) {
if (!messagePayload || !Buffer.isBuffer(messagePayload)) {
throw new Error('Message payload supplied for decryption is either empty or not a Buffer');
}
try {
return JSON.parse(cipher.decrypt(self, messagePayload));
} catch (err) {
self.logger.error(err.stack);
throw Error(`Failed to decrypt message: ${err.message}`);
}
}

exports.encryptMessageContent = encryptMessageContent;
exports.decryptMessageContent = decryptMessageContent;
101 changes: 52 additions & 49 deletions lib/triggers/consume.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
'use strict';
const eioUtils = require('elasticio-node').messages;
const { messages } = require('elasticio-node');
const co = require('co');
const amqp = require('amqplib');
const logger = require('@elastic.io/component-logger')();
const encryptor = require('../encryptor.js');
const debug = require('debug')('consumer');

let channel;
const queueName = `eio_consumer_${process.env.ELASTICIO_FLOW_ID}_${process.env.ELASTICIO_USER_ID}`;
Expand All @@ -15,33 +14,34 @@ let listening;
* @param cfg
*/
function init(cfg) {
console.log('Starting initialization, queueName=%s', queueName);
const amqpURI = cfg.amqpURI;
const amqpExchange = cfg.topic;
const keys = (cfg.bindingKeys || '#').split(',').map((s) => s.trim());
return co(function* initialize() {
debug('Connecting to amqpURI=%s', amqpURI);
const conn = yield amqp.connect(amqpURI);
logger.info('Starting initialization, queueName=%s', queueName);
const { amqpURI } = cfg;
const amqpExchange = cfg.topic;
const keys = (cfg.bindingKeys || '#').split(',').map((s) => s.trim());
return co(function* initialize() {
logger.debug('Connecting to amqpURI=%s', amqpURI);
const conn = yield amqp.connect(amqpURI);

debug('Creating a receiver channel');
channel = yield conn.createChannel();
logger.debug('Creating a receiver channel');
channel = yield conn.createChannel();

debug('Asserting topic exchange exchange=%s', amqpExchange);
yield channel.assertExchange(amqpExchange, 'topic');
logger.debug('Asserting topic exchange exchange=%s', amqpExchange);
yield channel.assertExchange(amqpExchange, 'topic');

debug('Asserting queue');
yield channel.assertQueue(queueName, {
exclusive: false,
durable: false,
autoDelete: true
});

for (let key of keys) {
debug(`Binding queue to exchange queue=${queueName} exchange=${amqpExchange} bindingKey=${key}`);
yield channel.bindQueue(queueName, amqpExchange, key);
}
console.log('Initialization completed');
logger.debug('Asserting queue');
yield channel.assertQueue(queueName, {
exclusive: false,
durable: false,
autoDelete: true,
});

// eslint-disable-next-line no-restricted-syntax
for (const key of keys) {
logger.debug(`Binding queue to exchange queue=${queueName} exchange=${amqpExchange} bindingKey=${key}`);
yield channel.bindQueue(queueName, amqpExchange, key);
}
logger.info('Initialization completed');
});
}

/**
Expand All @@ -50,31 +50,34 @@ function init(cfg) {
* @param msg incoming message object that contains ``body`` with payload
* @param cfg configuration that is account information and configuration field values
*/
// eslint-disable-next-line no-unused-vars
function processAction(msg, cfg) {
console.log('Trigger started');
if (listening) {
console.log('Trigger was called again, we will ignore this run');
return Promise.resolve();
}
const consumer = (msg) => {
debug('Have got message fields=%j properties=%j', msg.fields, msg.properties);
const decrypted = encryptor.decryptMessageContent(msg.content);
debug('Decrypted message=%j', decrypted);
const newMsg = eioUtils.newMessageWithBody(decrypted.body || {});
newMsg.id = msg.properties.messageId;
newMsg.attachments = decrypted.attachments || {};
this.emit('data', newMsg);
};
return co(function* consume() {
console.log('Starting consuming from %s', queueName);
yield channel.consume(queueName, consumer, {
noAck: true, // We can't really assert if message was consumed if we emit it yet
consumerTag: `consumer_${process.env.ELASTICIO_EXEC_ID}_${process.env.ELASTICIO_FLOW_ID}`
});
console.log('Consumption started');
listening = true;
const self = this;
self.logger.info('Trigger started');
if (listening) {
self.logger.info('Trigger was called again, we will ignore this run');
return Promise.resolve();
}
// eslint-disable-next-line no-shadow
const consumer = (msg) => {
self.logger.debug('Have got message fields=%j properties=%j', msg.fields, msg.properties);
const decrypted = encryptor.decryptMessageContent(self, msg.content);
self.logger.debug('Decrypted message=%j', decrypted);
const newMsg = messages.newMessageWithBody(decrypted.body || {});
newMsg.id = msg.properties.messageId;
newMsg.attachments = decrypted.attachments || {};
self.emit('data', newMsg);
};
return co(function* consume() {
self.logger.info('Starting consuming from %s', queueName);
yield channel.consume(queueName, consumer, {
noAck: true, // We can't really assert if message was consumed if we emit it yet
consumerTag: `consumer_${process.env.ELASTICIO_EXEC_ID}_${process.env.ELASTICIO_FLOW_ID}`,
});
self.logger.info('Consumption started');
listening = true;
});
}

module.exports.process = processAction;
module.exports.init = init;
module.exports.init = init;
Loading

0 comments on commit cec1047

Please sign in to comment.