From 18729c40e0f8f6472d110f8e2d0982f6dc6dfba8 Mon Sep 17 00:00:00 2001 From: souvik Date: Fri, 17 Nov 2023 18:36:49 +0530 Subject: [PATCH] Update index.ts --- src/adapters/mqtt/index.ts | 82 +++++++++++++++++++++++++++----------- 1 file changed, 58 insertions(+), 24 deletions(-) diff --git a/src/adapters/mqtt/index.ts b/src/adapters/mqtt/index.ts index 0f08b523d..268da2f84 100644 --- a/src/adapters/mqtt/index.ts +++ b/src/adapters/mqtt/index.ts @@ -45,16 +45,17 @@ class MqttAdapter extends Adapter { } private getSecurityReqs() { - let userAndPasswordSecurityReq let X509SecurityReq - const securityRequirements = this.AsyncAPIServer.security().map(e => e.all().map(e => e.scheme())) + const securityRequirements = this.AsyncAPIServer.security().map((e) => + e.all().map((e) => e.scheme()) + ) - securityRequirements.forEach(security => { + securityRequirements.forEach((security) => { for (const sec of security) { const securityType = sec.type().toLocaleLowerCase() - switch(securityType){ + switch (securityType) { case SecurityTypes.USER_PASSWORD: userAndPasswordSecurityReq = sec break @@ -62,14 +63,23 @@ class MqttAdapter extends Adapter { X509SecurityReq = sec break default: - this.emit("error", new Error(`Invalid security type '${securityType}' specified for server '${this.serverName}'. Please double-check your configuration to ensure you're using a supported security type. Here is a list of supported types: ${Object.values(SecurityTypes)}`)) + this.emit( + 'error', + new Error( + `Invalid security type '${securityType}' specified for server '${ + this.serverName + }'. Please double-check your configuration to ensure you're using a supported security type. Here is a list of supported types: ${Object.values( + SecurityTypes + )}` + ) + ) } } }) return { userAndPasswordSecurityReq, - X509SecurityReq + X509SecurityReq, } } @@ -110,7 +120,9 @@ class MqttAdapter extends Adapter { this.client.on('close', () => { this.emit('close', { connection: this.client, - channels: this.channelAddresses, + channels: this.channelNames.map((channelName) => + this.parsedAsyncAPI.channels().get(channelName).address() + ), }) }) @@ -139,23 +151,40 @@ class MqttAdapter extends Adapter { private subscribe(channels: string[]) { channels.forEach((channel) => { - const binding = this.parsedAsyncAPI.channels().get(channel).bindings().get('mqtt')?.value() - this.client.subscribe(channel, { - qos: binding?.qos ? binding.qos : 0, - }, (err, granted) => { - if (err) { - logLineWithIcon('x', `Error while trying to subscribe to \`${channel}\` topic.`, { - highlightedWords: [channel], - iconColor: '#f00', - disableEmojis: true, - }) - console.log(err.message) - return + const binding = this.parsedAsyncAPI + .channels() + .get(channel) + .bindings() + .get('mqtt') + ?.value() + this.client.subscribe( + channel, + { + qos: binding?.qos ? binding.qos : 0, + }, + (err, granted) => { + if (err) { + logLineWithIcon( + 'x', + `Error while trying to subscribe to \`${channel}\` topic.`, + { + highlightedWords: [channel], + iconColor: '#f00', + disableEmojis: true, + } + ) + console.log(err.message) + return + } + logLineWithIcon( + ':zap:', + `Subscribed to \`${channel}\` topic with QoS ${granted?.[0].qos}`, + { + highlightedWords: [channel], + } + ) } - logLineWithIcon(':zap:', `Subscribed to \`${channel}\` topic with QoS ${granted?.[0].qos}`, { - highlightedWords: [channel], - }) - }) + ) }) } @@ -216,7 +245,12 @@ class MqttAdapter extends Adapter { _send(message: GleeMessage): Promise { return new Promise((resolve, reject) => { - const binding = this.parsedAsyncAPI.channels().get(message.channel).bindings().get('mqtt')?.value() + const binding = this.parsedAsyncAPI + .channels() + .get(message.channel) + .bindings() + .get('mqtt') + ?.value() this.client.publish( message.channel, message.payload,