Skip to content

Commit

Permalink
Made the suggested changes
Browse files Browse the repository at this point in the history
  • Loading branch information
ankur0904 committed Jan 4, 2024
1 parent ad99d53 commit 178bf6f
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 20 deletions.
5 changes: 5 additions & 0 deletions src/adapters/cluster/redis/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class RedisClusterAdapter extends ClusterAdapter {
}

async _connect(): Promise<this> {
try {
this._channelName = `${this.serverName}-channel`

this._publisher = createClient({
Expand Down Expand Up @@ -62,6 +63,10 @@ class RedisClusterAdapter extends ClusterAdapter {

this.emit('connect', { name: this.name(), adapter: this })
return this
} catch (error) {
console.error('Error connecting to Redis:', error)
throw error
}
}

async _send(message: GleeMessage): Promise<void> {
Expand Down
5 changes: 5 additions & 0 deletions src/adapters/http/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ class HttpAdapter extends Adapter {
}

async _connect(): Promise<this> {
try {
const config = await this.resolveProtocolConfig('http')
const httpOptions = config?.server
const httpServer = httpOptions?.httpServer || http.createServer()
Expand All @@ -173,6 +174,10 @@ class HttpAdapter extends Adapter {
httpServer.listen(port)
this.emit('server:ready', { name: this.name(), adapter: this })
return this
} catch (error) {
console.error(error)
throw error
}
}
_getOperationBindings(channel: ChannelInterface) {
return channel.operations().filterByReceive().map(operation => operation.bindings().get("http")?.json())
Expand Down
44 changes: 24 additions & 20 deletions src/adapters/kafka/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,27 @@ class KafkaAdapter extends Adapter {
return 'Kafka adapter'
}

async connect() {
const kafkaOptions: KafkaAdapterConfig = await this.resolveProtocolConfig(
'kafka'
)
const auth: KafkaAuthConfig = await this.getAuthConfig(kafkaOptions?.auth)
const securityRequirements = this.AsyncAPIServer.security().map(
(sec) => {
const secName = Object.keys(sec.values())[0]
return this.parsedAsyncAPI.components().securitySchemes().get(secName)
}
)
const userAndPasswordSecurityReq = securityRequirements.find(
(sec) => sec.type() === 'userPassword'
)
const scramSha256SecurityReq = securityRequirements.find(
(sec) => sec.type() === 'scramSha256'
)
const scramSha512SecurityReq = securityRequirements.find(
(sec) => sec.type() === 'scramSha512'
)
async _connect() {
try {
const kafkaOptions: KafkaAdapterConfig = await this.resolveProtocolConfig(
'kafka'
)
const auth: KafkaAuthConfig = await this.getAuthConfig(kafkaOptions?.auth)
const securityRequirements = this.AsyncAPIServer.security().map(
(sec) => {
const secName = Object.keys(sec.values())[0]
return this.parsedAsyncAPI.components().securitySchemes().get(secName)
}
)
const userAndPasswordSecurityReq = securityRequirements.find(
(sec) => sec.type() === 'userPassword'
)
const scramSha256SecurityReq = securityRequirements.find(
(sec) => sec.type() === 'scramSha256'
)
const scramSha512SecurityReq = securityRequirements.find(
(sec) => sec.type() === 'scramSha512'
)

const brokerUrl = new URL(this.AsyncAPIServer.url())
this.kafka = new Kafka({
Expand Down Expand Up @@ -74,6 +75,9 @@ class KafkaAdapter extends Adapter {
this.emit('message', msg, consumer)
},
})
} catch (error) {
console.error('Error connecting to Kafka:', error)
}
}

async send(message: GleeMessage) {
Expand Down
5 changes: 5 additions & 0 deletions src/adapters/mqtt/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ class MqttAdapter extends Adapter {
}

async _connect(): Promise<this> {
try {
const mqttOptions: MqttAdapterConfig = await this.resolveProtocolConfig(
'mqtt'
)
Expand Down Expand Up @@ -238,6 +239,10 @@ class MqttAdapter extends Adapter {
}

return connectClient()
} catch (error) {
console.error(error)
throw error
}
}

_send(message: GleeMessage): Promise<void> {
Expand Down
5 changes: 5 additions & 0 deletions src/adapters/socket.io/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class SocketIOAdapter extends Adapter {
}

async _connect(): Promise<this> {
try {
const config = await this.resolveProtocolConfig('ws')
const websocketOptions = config?.server
const serverUrl: URL = new URL(this.serverUrlExpanded)
Expand Down Expand Up @@ -73,6 +74,10 @@ class SocketIOAdapter extends Adapter {
this.server.listen(port)
}
return this
} catch (error) {
console.error('An error occurred while connecting:', error)
throw error
}
}

async _send(message: GleeMessage): Promise<void> {
Expand Down
5 changes: 5 additions & 0 deletions src/adapters/ws/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class WsClientAdapter extends Adapter {
}

private async _connect(): Promise<this> {
try{
const channelsOnThisServer = this.getWsChannels()

debug("connecting to ", this.serverName)
Expand Down Expand Up @@ -80,6 +81,10 @@ class WsClientAdapter extends Adapter {
})
}
return this
}catch (error) {
console.error('An error occurred while connecting:', error)
throw error
}
}

private getWsChannels() {
Expand Down

0 comments on commit 178bf6f

Please sign in to comment.