Skip to content

Commit

Permalink
add channel close in shutdown hook (#21)
Browse files Browse the repository at this point in the history
* add channel close in shutdown hook

* up

---------

Co-authored-by: if0s <[email protected]>
  • Loading branch information
ShkarupaNick and if0s authored Mar 27, 2023
1 parent 9c862f0 commit 0069f4d
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 5 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 1.4.2 (March 27, 2023)
Fixed issue with shutdown hook in `Consume` trigger

## 1.4.1 (March 27, 2023)
Fixed connection cloning in `Publish` action

Expand Down
2 changes: 1 addition & 1 deletion component.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"title": "AMQP component",
"description": "AMQP Component for async communication with queues and topics",
"version": "1.4.1",
"version": "1.4.2",
"credentials": {
"fields": {
"amqpURI": {
Expand Down
2 changes: 1 addition & 1 deletion lib/actions/publish.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async function processAction(msg, cfg) {
attachments: msg.attachments,
});
}
this.logger.info('Publishing message...');
this.logger.info({ routingKey: msg.body.routingKey }, `Publishing message with routingKey ${msg.body.routingKey}...`);

await amqpClient.publish(msg.body.routingKey || '', data, {
contentType: cfg.contentType || 'application/octet-stream',
Expand Down
17 changes: 14 additions & 3 deletions lib/amqp.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ class AMQPClient {
this.logger = logger;
}

async shutdown() {
if (this.channel) {
await this.channel.close();
this.logger.info('The channel closed successfully');
}
if (this.connection) {
await this.connection.close();
this.logger.info('The connection closed successfully');
}
}

init(confirmChannel = true, assertExchangeOptions, deleteQueue) {
return new Promise((resolve, reject) => {
this.connection = amqp.connect([this.cfg.amqpURI], {
Expand Down Expand Up @@ -61,14 +72,14 @@ class AMQPClient {
confirm: confirmChannel,
setup: async (channel) => {
try {
this.logger.debug('Asserting topic exchange...');
this.logger.debug({ exchange: this.cfg.topic }, 'Asserting exchange...');
await channel.assertExchange(this.cfg.topic, 'topic', assertExchangeOptions);
if (!confirmChannel) {
this.logger.debug('Asserting queue');
this.logger.debug({ queueName: this.queueName }, 'Asserting queue');
await channel.assertQueue(this.queueName, { exclusive: false, durable: false });
const keys = (this.cfg.bindingKeys || '#').split(',').map((s) => s.trim());
for (const key of keys) {
this.logger.debug('Binding queue to exchange...');
this.logger.debug({ key, queueName: this.queueName, exchange: this.cfg.topic }, 'Binding queue to exchange...');
await channel.bindQueue(this.queueName, this.cfg.topic, key);
}
}
Expand Down
1 change: 1 addition & 0 deletions lib/triggers/consume.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ async function processAction(msg, cfg) {
async function shutdown(cfg) {
amqpClient = new AMQPClient(cfg, this);
await amqpClient.init(true, {}, true);
await amqpClient.shutdown();
}

module.exports.process = processAction;
Expand Down

0 comments on commit 0069f4d

Please sign in to comment.