Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds try/catch block to increase resilience #666

Merged
merged 11 commits into from
Mar 18, 2024
17 changes: 16 additions & 1 deletion src/adapters/cluster/redis/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,25 @@ class RedisClusterAdapter extends ClusterAdapter {
}

async connect(): Promise<this> {
return this._connect()
try{
return this._connect()
} catch (error) {
console.error('Error connecting to Redis:', error)
throw error
}
}

async send(message: GleeMessage): Promise<void> {
try{
return this._send(message)
} catch (error) {
console.error('Error sending message to Redis:', error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
console.error('Error sending message to Redis:', error)
const errorMessage = `Failed to send message on channel '${message.channel}' to server '${message.serverName}'`;
this.emit('error', new Error(errorMessage));
this.emit('error', error)

throw error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw error

}
}

async _connect(): Promise<this> {
try {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't want to catch the error since we left the error handling the _connect method. errors thrown by this function will be handled by the connect method.

Suggested change
try {

this._channelName = `${this.serverName}-channel`

this._publisher = createClient({
Expand Down Expand Up @@ -62,6 +73,10 @@ class RedisClusterAdapter extends ClusterAdapter {

this.emit('connect', { name: this.name(), adapter: this })
return this
} catch (error) {
console.error('Error connecting to Redis:', error)
throw error
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
} catch (error) {
console.error('Error connecting to Redis:', error)
throw error
}

}

async _send(message: GleeMessage): Promise<void> {
Expand Down
20 changes: 13 additions & 7 deletions src/adapters/http/client.ts
KhudaDad414 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,19 @@ class HttpClientAdapter extends Adapter {
return 'HTTP client'
}
async connect(): Promise<this> {
this.emit('connect', {
name: this.name(),
adapter: this,
connection: http,
channel: this.channelNames,
})
return this
try {
this.emit('connect', {
name: this.name(),
adapter: this,
connection: http,
channel: this.channelNames,
})
return this
} catch (err) {
logWarningMessage(`Failed to Connect: An error occurred while connecting to '${this.name()}' on the '${this.channelNames}' channel. Please review the error details below for further information and corrective action.`)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logWarningMessage(`Failed to Connect: An error occurred while connecting to '${this.name()}' on the '${this.channelNames}' channel. Please review the error details below for further information and corrective action.`)
logWarningMessage(`Failed to Connect: An error occurred while connecting to '${this.name()}'. Please review the error details below for further information and corrective action.`)

this.emit('error', err)
throw err
}
}

async send(message: GleeMessage): Promise<void> {
Expand Down
19 changes: 17 additions & 2 deletions src/adapters/http/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,21 @@ class HttpAdapter extends Adapter {
}

async connect(): Promise<this> {
return this._connect()
try {
return this._connect()
} catch (error) {
console.error('Error connecting to HTTP:', error)
throw error
}
}

async send(message: GleeMessage): Promise<void> {
return this._send(message)
try{
return this._send(message)
} catch (error) {
console.error('Error sending message to HTTP:', error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also add the channel and the server name to the error so it becomes easier to find the error.

throw error
}
}

async _readRequestBody(req: http.IncomingMessage): Promise<string> {
Expand Down Expand Up @@ -164,6 +174,7 @@ class HttpAdapter extends Adapter {
}

async _connect(): Promise<this> {
try {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
try {

const config = await this.resolveProtocolConfig('http')
const httpOptions = config?.server
const httpServer = httpOptions?.httpServer || http.createServer()
Expand All @@ -173,6 +184,10 @@ class HttpAdapter extends Adapter {
httpServer.listen(port)
this.emit('server:ready', { name: this.name(), adapter: this })
return this
} catch (error) {
console.error(error)
throw error
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
} catch (error) {
console.error(error)
throw error
}

}
_getOperationBindings(channel: ChannelInterface) {
return channel.operations().filterByReceive().map(operation => operation.bindings().get("http")?.json())
Expand Down
44 changes: 24 additions & 20 deletions src/adapters/kafka/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,27 @@ class KafkaAdapter extends Adapter {
return 'Kafka adapter'
}

async connect() {
const kafkaOptions: KafkaAdapterConfig = await this.resolveProtocolConfig(
'kafka'
)
const auth: KafkaAuthConfig = await this.getAuthConfig(kafkaOptions?.auth)
const securityRequirements = this.AsyncAPIServer.security().map(
(sec) => {
const secName = Object.keys(sec.values())[0]
return this.parsedAsyncAPI.components().securitySchemes().get(secName)
}
)
const userAndPasswordSecurityReq = securityRequirements.find(
(sec) => sec.type() === 'userPassword'
)
const scramSha256SecurityReq = securityRequirements.find(
(sec) => sec.type() === 'scramSha256'
)
const scramSha512SecurityReq = securityRequirements.find(
(sec) => sec.type() === 'scramSha512'
)
async _connect() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you have removed the public method. the functionality won't work anymore. connect method is used for error handling and _connect for the actual logic. we always call the _connect method from the connect method.

try {
const kafkaOptions: KafkaAdapterConfig = await this.resolveProtocolConfig(
'kafka'
)
const auth: KafkaAuthConfig = await this.getAuthConfig(kafkaOptions?.auth)
const securityRequirements = this.AsyncAPIServer.security().map(
(sec) => {
const secName = Object.keys(sec.values())[0]
return this.parsedAsyncAPI.components().securitySchemes().get(secName)
}
)
const userAndPasswordSecurityReq = securityRequirements.find(
(sec) => sec.type() === 'userPassword'
)
const scramSha256SecurityReq = securityRequirements.find(
(sec) => sec.type() === 'scramSha256'
)
const scramSha512SecurityReq = securityRequirements.find(
(sec) => sec.type() === 'scramSha512'
)

const brokerUrl = new URL(this.AsyncAPIServer.url())
this.kafka = new Kafka({
Expand Down Expand Up @@ -74,6 +75,9 @@ class KafkaAdapter extends Adapter {
this.emit('message', msg, consumer)
},
})
} catch (error) {
console.error('Error connecting to Kafka:', error)
}
}

async send(message: GleeMessage) {
Expand Down
19 changes: 17 additions & 2 deletions src/adapters/mqtt/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,21 @@ class MqttAdapter extends Adapter {
}

async connect(): Promise<this> {
return this._connect()
try{
return this._connect()
} catch(error){
console.error('Error connecting to MQTT:', error)
throw error
}
}

async send(message: GleeMessage) {
return this._send(message)
try{
return this._send(message)
} catch(error){
console.error('Error sending message to MQTT:', error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just like my previouse suggestion, please add the channel and the server where the error occured.

throw error
}
}

private getSecurityReqs() {
Expand Down Expand Up @@ -186,6 +196,7 @@ class MqttAdapter extends Adapter {
}

async _connect(): Promise<this> {
try {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
try {

const mqttOptions: MqttAdapterConfig = await this.resolveProtocolConfig(
'mqtt'
)
Expand Down Expand Up @@ -238,6 +249,10 @@ class MqttAdapter extends Adapter {
}

return connectClient()
} catch (error) {
console.error(error)
throw error
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
} catch (error) {
console.error(error)
throw error
}

}

_send(message: GleeMessage): Promise<void> {
Expand Down
19 changes: 17 additions & 2 deletions src/adapters/socket.io/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,25 @@ class SocketIOAdapter extends Adapter {
}

async connect(): Promise<this> {
return this._connect()
try {
return this._connect()
} catch(error){
console.error('Error connecting to Socket.IO:', error)
throw error
}
}

async send(message: GleeMessage): Promise<void> {
return this._send(message)
try{
return this._send(message)
} catch(error){
console.error('Error sending message to Socket.IO:', error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here.

throw error
}
}

async _connect(): Promise<this> {
try {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
try {

const config = await this.resolveProtocolConfig('ws')
const websocketOptions = config?.server
const serverUrl: URL = new URL(this.serverUrlExpanded)
Expand Down Expand Up @@ -73,6 +84,10 @@ class SocketIOAdapter extends Adapter {
this.server.listen(port)
}
return this
} catch (error) {
console.error('An error occurred while connecting:', error)
throw error
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
} catch (error) {
console.error('An error occurred while connecting:', error)
throw error
}

}

async _send(message: GleeMessage): Promise<void> {
Expand Down
19 changes: 17 additions & 2 deletions src/adapters/ws/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,25 @@ class WsClientAdapter extends Adapter {
}

async connect(): Promise<this> {
return this._connect()
try{
return this._connect()
} catch(error){
console.error('Error connecting to WS:', error)
throw error
}
}

async send(message: GleeMessage) {
return this._send(message)
try{
return this._send(message)
} catch(error){
console.error('Error sending message to WS:', error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also here.

throw error
}
}

private async _connect(): Promise<this> {
try{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
try{

const channelsOnThisServer = this.getWsChannels()

debug("connecting to ", this.serverName)
Expand Down Expand Up @@ -81,6 +92,10 @@ class WsClientAdapter extends Adapter {
})
}
return this
}catch (error) {
console.error('An error occurred while connecting:', error)
throw error
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
}catch (error) {
console.error('An error occurred while connecting:', error)
throw error
}

}

private getWsChannels() {
Expand Down
Loading