Skip to content

Commit

Permalink
Update index.ts
Browse files Browse the repository at this point in the history
  • Loading branch information
Souvikns committed Nov 17, 2023
1 parent a6a542f commit 18729c4
Showing 1 changed file with 58 additions and 24 deletions.
82 changes: 58 additions & 24 deletions src/adapters/mqtt/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,31 +45,41 @@ class MqttAdapter extends Adapter {
}

private getSecurityReqs() {

let userAndPasswordSecurityReq
let X509SecurityReq

const securityRequirements = this.AsyncAPIServer.security().map(e => e.all().map(e => e.scheme()))
const securityRequirements = this.AsyncAPIServer.security().map((e) =>
e.all().map((e) => e.scheme())
)

securityRequirements.forEach(security => {
securityRequirements.forEach((security) => {
for (const sec of security) {
const securityType = sec.type().toLocaleLowerCase()
switch(securityType){
switch (securityType) {
case SecurityTypes.USER_PASSWORD:
userAndPasswordSecurityReq = sec
break
case SecurityTypes.X509:
X509SecurityReq = sec
break
default:
this.emit("error", new Error(`Invalid security type '${securityType}' specified for server '${this.serverName}'. Please double-check your configuration to ensure you're using a supported security type. Here is a list of supported types: ${Object.values(SecurityTypes)}`))
this.emit(
'error',
new Error(
`Invalid security type '${securityType}' specified for server '${
this.serverName
}'. Please double-check your configuration to ensure you're using a supported security type. Here is a list of supported types: ${Object.values(
SecurityTypes
)}`
)
)
}
}
})

return {
userAndPasswordSecurityReq,
X509SecurityReq
X509SecurityReq,
}
}

Expand Down Expand Up @@ -110,7 +120,9 @@ class MqttAdapter extends Adapter {
this.client.on('close', () => {
this.emit('close', {
connection: this.client,
channels: this.channelAddresses,
channels: this.channelNames.map((channelName) =>
this.parsedAsyncAPI.channels().get(channelName).address()
),
})
})

Expand Down Expand Up @@ -139,23 +151,40 @@ class MqttAdapter extends Adapter {

private subscribe(channels: string[]) {
channels.forEach((channel) => {
const binding = this.parsedAsyncAPI.channels().get(channel).bindings().get('mqtt')?.value()
this.client.subscribe(channel, {
qos: binding?.qos ? binding.qos : 0,
}, (err, granted) => {
if (err) {
logLineWithIcon('x', `Error while trying to subscribe to \`${channel}\` topic.`, {
highlightedWords: [channel],
iconColor: '#f00',
disableEmojis: true,
})
console.log(err.message)
return
const binding = this.parsedAsyncAPI
.channels()
.get(channel)
.bindings()
.get('mqtt')
?.value()
this.client.subscribe(
channel,
{
qos: binding?.qos ? binding.qos : 0,
},
(err, granted) => {
if (err) {
logLineWithIcon(
'x',
`Error while trying to subscribe to \`${channel}\` topic.`,
{
highlightedWords: [channel],
iconColor: '#f00',
disableEmojis: true,
}
)
console.log(err.message)
return
}
logLineWithIcon(
':zap:',
`Subscribed to \`${channel}\` topic with QoS ${granted?.[0].qos}`,
{
highlightedWords: [channel],
}
)
}
logLineWithIcon(':zap:', `Subscribed to \`${channel}\` topic with QoS ${granted?.[0].qos}`, {
highlightedWords: [channel],
})
})
)
})
}

Expand Down Expand Up @@ -216,7 +245,12 @@ class MqttAdapter extends Adapter {

_send(message: GleeMessage): Promise<void> {
return new Promise((resolve, reject) => {
const binding = this.parsedAsyncAPI.channels().get(message.channel).bindings().get('mqtt')?.value()
const binding = this.parsedAsyncAPI
.channels()
.get(message.channel)
.bindings()
.get('mqtt')
?.value()
this.client.publish(
message.channel,
message.payload,
Expand Down

0 comments on commit 18729c4

Please sign in to comment.