Skip to content

Commit

Permalink
logs and messages in the last flow execution (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
if0s authored Jun 17, 2024
1 parent bd0ad31 commit 8246f7c
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 711 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 1.4.3 (June 17, 2024)
* Now logs and messages for `Consume` trigger will be located in the last flow execution
* Upgrade to sailor 2.7.2
* Upgrade amqplib to 0.10.4

## 1.4.2 (March 27, 2023)
Fixed issue with shutdown hook in `Consume` trigger

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ If the exchange doesn't exist it will be created on start.

#### Please note: The flow must be set as real-time! Otherwise, errors may appear.

We recommend you set the lowest flow schedule (cron expression) frequency possible. E.g. once a day (0 0 * * *). And start the flow with the button ‘Run Now’ manually. Even though it does not affect the logic directly, each scheduled flow execution will create a record in the Executions list with no messages and no logs inside. All the logs and emitted messages will be appearing in the first execution.
We recommend you set the lowest flow schedule (cron expression) frequency possible. E.g. once a day (0 0 * * *). And start the flow with the button ‘Run Now’ manually. Even though it does not affect the logic directly, each scheduled flow execution will create a record in the Executions list and can make debugging difficult. All the logs and emitted messages will be appearing in the last execution.

#### Configuration Fields
* **Exchange** - (string, required): Exchange name where you want to get messages
Expand Down
4 changes: 2 additions & 2 deletions component.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"title": "AMQP component",
"description": "AMQP Component for async communication with queues and topics",
"version": "1.4.2",
"version": "1.4.3",
"credentials": {
"fields": {
"amqpURI": {
Expand Down Expand Up @@ -60,7 +60,7 @@
},
"triggers": {
"consume": {
"title": "Consume",
"title": "Consume (Real-time flows only)",
"main": "./lib/triggers/consume.js",
"type": "polling",
"fields": {
Expand Down
2 changes: 1 addition & 1 deletion lib/actions/publish.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async function processAction(msg, cfg) {
attachments: msg.attachments,
});
}
this.logger.info({ routingKey: msg.body.routingKey }, `Publishing message with routingKey ${msg.body.routingKey}...`);
this.logger.info(`Publishing message with routingKey ${msg.body.routingKey}...`);

await amqpClient.publish(msg.body.routingKey || '', data, {
contentType: cfg.contentType || 'application/octet-stream',
Expand Down
6 changes: 3 additions & 3 deletions lib/amqp.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ class AMQPClient {
confirm: confirmChannel,
setup: async (channel) => {
try {
this.logger.debug({ exchange: this.cfg.topic }, 'Asserting exchange...');
this.logger.debug(`Asserting exchange "${this.cfg.topic}"...`);
await channel.assertExchange(this.cfg.topic, 'topic', assertExchangeOptions);
if (!confirmChannel) {
this.logger.debug({ queueName: this.queueName }, 'Asserting queue');
this.logger.debug(`Asserting queue "${this.queueName}"`);
await channel.assertQueue(this.queueName, { exclusive: false, durable: false });
const keys = (this.cfg.bindingKeys || '#').split(',').map((s) => s.trim());
for (const key of keys) {
this.logger.debug({ key, queueName: this.queueName, exchange: this.cfg.topic }, 'Binding queue to exchange...');
this.logger.debug(`Key: "${key}". Binding queue "${this.queueName}" to exchange "${this.cfg.topic}"...`);
await channel.bindQueue(this.queueName, this.cfg.topic, key);
}
}
Expand Down
37 changes: 28 additions & 9 deletions lib/triggers/consume.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,57 @@ const encryptor = require('../encryptor.js');
const { AMQPClient } = require('../amqp.js');

let amqpClient;
let context;

// eslint-disable-next-line consistent-return
async function processAction(msg, cfg) {
this.logger.info('Trigger started');
context = this;
if (!amqpClient || !amqpClient.connection) {
amqpClient = new AMQPClient(cfg, this);
context.logger.info('Trigger started');
amqpClient = new AMQPClient(cfg, context);
await amqpClient.init(false);
} else {
this.logger.info('Trigger was called again, we will ignore this run');
context.logger.info('Trigger is running, waiting for new messages');
amqpClient.setLogger(context.logger);
return;
}

// eslint-disable-next-line no-shadow
const consumer = (msg) => {
this.logger.debug('New message got');
context.logger.debug('Got a new message');
let data;

if (cfg.doNotDecrypt) {
data = JSON.parse(msg.content);
try {
data = JSON.parse(msg.content);
} catch (err) {
const errMsg = 'Failed to parse message, if it is encrypted you need to uncheck "Don\'t decrypt payload"';
context.logger.error(errMsg);
context.emit('error', errMsg);
return;
}
} else {
data = encryptor.decryptMessageContent(this, msg.content);
this.logger.debug('Message decrypted');
try {
data = encryptor.decryptMessageContent(context, msg.content);
context.logger.debug('Message decrypted');
} catch (err) {
const errMsg = `${err.message}`;
context.logger.error(errMsg);
context.emit('error', errMsg);
return;
}
}

const newMsg = messages.newMessageWithBody(data || {});
newMsg.id = msg.properties.messageId;
newMsg.attachments = data.attachments || {};
this.emit('data', newMsg);
context.emit('data', newMsg);
};
await amqpClient.consume(consumer, {
noAck: true,
consumerTag: `consumer_${process.env.ELASTICIO_EXEC_ID}_${process.env.ELASTICIO_FLOW_ID}`,
});
this.logger.info('Consumption started');
context.logger.info('Consumption started');
}

// eslint-disable-next-line no-unused-vars
Expand Down
Loading

0 comments on commit 8246f7c

Please sign in to comment.