diff --git a/CHANGELOG.md b/CHANGELOG.md index 3836c8f..167a1b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 1.4.2 (March 27, 2023) +Fixed issue with shutdown hook in `Consume` trigger + ## 1.4.1 (March 27, 2023) Fixed connection cloning in `Publish` action diff --git a/component.json b/component.json index 44467ce..6167b18 100644 --- a/component.json +++ b/component.json @@ -1,7 +1,7 @@ { "title": "AMQP component", "description": "AMQP Component for async communication with queues and topics", - "version": "1.4.1", + "version": "1.4.2", "credentials": { "fields": { "amqpURI": { diff --git a/lib/actions/publish.js b/lib/actions/publish.js index d2a8d91..e723183 100644 --- a/lib/actions/publish.js +++ b/lib/actions/publish.js @@ -21,7 +21,7 @@ async function processAction(msg, cfg) { attachments: msg.attachments, }); } - this.logger.info('Publishing message...'); + this.logger.info({ routingKey: msg.body.routingKey }, `Publishing message with routingKey ${msg.body.routingKey}...`); await amqpClient.publish(msg.body.routingKey || '', data, { contentType: cfg.contentType || 'application/octet-stream', diff --git a/lib/amqp.js b/lib/amqp.js index c18430e..7381894 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -29,6 +29,17 @@ class AMQPClient { this.logger = logger; } + async shutdown() { + if (this.channel) { + await this.channel.close(); + this.logger.info('The channel closed successfully'); + } + if (this.connection) { + await this.connection.close(); + this.logger.info('The connection closed successfully'); + } + } + init(confirmChannel = true, assertExchangeOptions, deleteQueue) { return new Promise((resolve, reject) => { this.connection = amqp.connect([this.cfg.amqpURI], { @@ -61,14 +72,14 @@ class AMQPClient { confirm: confirmChannel, setup: async (channel) => { try { - this.logger.debug('Asserting topic exchange...'); + this.logger.debug({ exchange: this.cfg.topic }, 'Asserting exchange...'); await channel.assertExchange(this.cfg.topic, 'topic', assertExchangeOptions); if (!confirmChannel) { - this.logger.debug('Asserting queue'); + this.logger.debug({ queueName: this.queueName }, 'Asserting queue'); await channel.assertQueue(this.queueName, { exclusive: false, durable: false }); const keys = (this.cfg.bindingKeys || '#').split(',').map((s) => s.trim()); for (const key of keys) { - this.logger.debug('Binding queue to exchange...'); + this.logger.debug({ key, queueName: this.queueName, exchange: this.cfg.topic }, 'Binding queue to exchange...'); await channel.bindQueue(this.queueName, this.cfg.topic, key); } } diff --git a/lib/triggers/consume.js b/lib/triggers/consume.js index 4878187..793afd1 100644 --- a/lib/triggers/consume.js +++ b/lib/triggers/consume.js @@ -42,6 +42,7 @@ async function processAction(msg, cfg) { async function shutdown(cfg) { amqpClient = new AMQPClient(cfg, this); await amqpClient.init(true, {}, true); + await amqpClient.shutdown(); } module.exports.process = processAction;