Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds try/catch block to increase resilience #666

Merged
merged 11 commits into from
Mar 18, 2024
12 changes: 11 additions & 1 deletion src/adapters/cluster/redis/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,21 @@ class RedisClusterAdapter extends ClusterAdapter {
}

async connect(): Promise<this> {
return this._connect()
try{
return this._connect()
} catch (error) {
console.error('Error connecting to Redis:', error)
throw error
}
}

async send(message: GleeMessage): Promise<void> {
try{
return this._send(message)
} catch (error) {
console.error('Error sending message to Redis:', error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
console.error('Error sending message to Redis:', error)
const errorMessage = `Failed to send message on channel '${message.channel}' to server '${message.serverName}'`;
this.emit('error', new Error(errorMessage));
this.emit('error', error)

throw error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw error

}
}

async _connect(): Promise<this> {
Expand Down
20 changes: 13 additions & 7 deletions src/adapters/http/client.ts
KhudaDad414 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,19 @@ class HttpClientAdapter extends Adapter {
return 'HTTP client'
}
async connect(): Promise<this> {
this.emit('connect', {
name: this.name(),
adapter: this,
connection: http,
channel: this.channelNames,
})
return this
try {
this.emit('connect', {
name: this.name(),
adapter: this,
connection: http,
channel: this.channelNames,
})
return this
} catch (err) {
logWarningMessage(`Failed to Connect: An error occurred while connecting to '${this.name()}' on the '${this.channelNames}' channel. Please review the error details below for further information and corrective action.`)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logWarningMessage(`Failed to Connect: An error occurred while connecting to '${this.name()}' on the '${this.channelNames}' channel. Please review the error details below for further information and corrective action.`)
logWarningMessage(`Failed to Connect: An error occurred while connecting to '${this.name()}'. Please review the error details below for further information and corrective action.`)

this.emit('error', err)
throw err
}
}

async send(message: GleeMessage): Promise<void> {
Expand Down
14 changes: 12 additions & 2 deletions src/adapters/http/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,21 @@ class HttpAdapter extends Adapter {
}

async connect(): Promise<this> {
return this._connect()
try {
return this._connect()
} catch (error) {
console.error('Error connecting to HTTP:', error)
throw error
}
}

async send(message: GleeMessage): Promise<void> {
return this._send(message)
try{
return this._send(message)
} catch (error) {
console.error('Error sending message to HTTP:', error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also add the channel and the server name to the error so it becomes easier to find the error.

throw error
}
}

async _readRequestBody(req: http.IncomingMessage): Promise<string> {
Expand Down
49 changes: 29 additions & 20 deletions src/adapters/kafka/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,35 @@ class KafkaAdapter extends Adapter {
return 'Kafka adapter'
}

async connect() {
const kafkaOptions: KafkaAdapterConfig = await this.resolveProtocolConfig(
'kafka'
)
const auth: KafkaAuthConfig = await this.getAuthConfig(kafkaOptions?.auth)
const securityRequirements = this.AsyncAPIServer.security().map(
(sec) => {
const secName = Object.keys(sec.values())[0]
return this.parsedAsyncAPI.components().securitySchemes().get(secName)
}
)
const userAndPasswordSecurityReq = securityRequirements.find(
(sec) => sec.type() === 'userPassword'
)
const scramSha256SecurityReq = securityRequirements.find(
(sec) => sec.type() === 'scramSha256'
)
const scramSha512SecurityReq = securityRequirements.find(
(sec) => sec.type() === 'scramSha512'
)
async connect(): Promise<this> {
try {
return this._connect()
} catch (error) {
console.error('Error connecting to Kafka:', error)
throw error
}
}

async _connect() {
const kafkaOptions: KafkaAdapterConfig = await this.resolveProtocolConfig(
'kafka'
)
const auth: KafkaAuthConfig = await this.getAuthConfig(kafkaOptions?.auth)
const securityRequirements = this.AsyncAPIServer.security().map(
(sec) => {
const secName = Object.keys(sec.values())[0]
return this.parsedAsyncAPI.components().securitySchemes().get(secName)
}
)
const userAndPasswordSecurityReq = securityRequirements.find(
(sec) => sec.type() === 'userPassword'
)
const scramSha256SecurityReq = securityRequirements.find(
(sec) => sec.type() === 'scramSha256'
)
const scramSha512SecurityReq = securityRequirements.find(
(sec) => sec.type() === 'scramSha512'
)

const brokerUrl = new URL(this.AsyncAPIServer.url())
this.kafka = new Kafka({
Expand Down
15 changes: 13 additions & 2 deletions src/adapters/mqtt/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,21 @@ class MqttAdapter extends Adapter {
}

async connect(): Promise<this> {
return this._connect()
try{
return this._connect()
} catch(error){
console.error('Error connecting to MQTT:', error)
throw error
}
}

async send(message: GleeMessage) {
return this._send(message)
try{
return this._send(message)
} catch(error){
console.error('Error sending message to MQTT:', error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just like my previouse suggestion, please add the channel and the server where the error occured.

throw error
}
}

private getSecurityReqs() {
Expand Down Expand Up @@ -238,6 +248,7 @@ class MqttAdapter extends Adapter {
}

return connectClient()

}

_send(message: GleeMessage): Promise<void> {
Expand Down
14 changes: 12 additions & 2 deletions src/adapters/socket.io/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,21 @@ class SocketIOAdapter extends Adapter {
}

async connect(): Promise<this> {
return this._connect()
try {
return this._connect()
} catch(error){
console.error('Error connecting to Socket.IO:', error)
throw error
}
}

async send(message: GleeMessage): Promise<void> {
return this._send(message)
try{
return this._send(message)
} catch(error){
console.error('Error sending message to Socket.IO:', error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here.

throw error
}
}

async _connect(): Promise<this> {
Expand Down
14 changes: 12 additions & 2 deletions src/adapters/ws/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,21 @@ class WsClientAdapter extends Adapter {
}

async connect(): Promise<this> {
return this._connect()
try{
return this._connect()
} catch(error){
console.error('Error connecting to WS:', error)
throw error
}
}

async send(message: GleeMessage) {
return this._send(message)
try{
return this._send(message)
} catch(error){
console.error('Error sending message to WS:', error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also here.

throw error
}
}

private async _connect(): Promise<this> {
Expand Down