-
-
Notifications
You must be signed in to change notification settings - Fork 62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: adds try/catch block to increase resilience #666
Changes from 4 commits
ad99d53
178bf6f
1ec5a69
159f3b6
a8affed
d73a9c8
0d6c955
ae99256
7c57144
f0f44f4
3123107
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -14,14 +14,25 @@ class RedisClusterAdapter extends ClusterAdapter { | |||||||||
} | ||||||||||
|
||||||||||
async connect(): Promise<this> { | ||||||||||
return this._connect() | ||||||||||
try{ | ||||||||||
return this._connect() | ||||||||||
} catch (error) { | ||||||||||
console.error('Error connecting to Redis:', error) | ||||||||||
throw error | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
async send(message: GleeMessage): Promise<void> { | ||||||||||
try{ | ||||||||||
return this._send(message) | ||||||||||
} catch (error) { | ||||||||||
console.error('Error sending message to Redis:', error) | ||||||||||
throw error | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
async _connect(): Promise<this> { | ||||||||||
try { | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we don't want to catch the error since we left the error handling the
Suggested change
|
||||||||||
this._channelName = `${this.serverName}-channel` | ||||||||||
|
||||||||||
this._publisher = createClient({ | ||||||||||
|
@@ -62,6 +73,10 @@ class RedisClusterAdapter extends ClusterAdapter { | |||||||||
|
||||||||||
this.emit('connect', { name: this.name(), adapter: this }) | ||||||||||
return this | ||||||||||
} catch (error) { | ||||||||||
console.error('Error connecting to Redis:', error) | ||||||||||
throw error | ||||||||||
} | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
} | ||||||||||
|
||||||||||
async _send(message: GleeMessage): Promise<void> { | ||||||||||
|
KhudaDad414 marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -15,13 +15,19 @@ class HttpClientAdapter extends Adapter { | |||||
return 'HTTP client' | ||||||
} | ||||||
async connect(): Promise<this> { | ||||||
this.emit('connect', { | ||||||
name: this.name(), | ||||||
adapter: this, | ||||||
connection: http, | ||||||
channel: this.channelNames, | ||||||
}) | ||||||
return this | ||||||
try { | ||||||
this.emit('connect', { | ||||||
name: this.name(), | ||||||
adapter: this, | ||||||
connection: http, | ||||||
channel: this.channelNames, | ||||||
}) | ||||||
return this | ||||||
} catch (err) { | ||||||
logWarningMessage(`Failed to Connect: An error occurred while connecting to '${this.name()}' on the '${this.channelNames}' channel. Please review the error details below for further information and corrective action.`) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
this.emit('error', err) | ||||||
throw err | ||||||
} | ||||||
} | ||||||
|
||||||
async send(message: GleeMessage): Promise<void> { | ||||||
|
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -17,11 +17,21 @@ class HttpAdapter extends Adapter { | |||||||||
} | ||||||||||
|
||||||||||
async connect(): Promise<this> { | ||||||||||
return this._connect() | ||||||||||
try { | ||||||||||
return this._connect() | ||||||||||
} catch (error) { | ||||||||||
console.error('Error connecting to HTTP:', error) | ||||||||||
throw error | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
async send(message: GleeMessage): Promise<void> { | ||||||||||
return this._send(message) | ||||||||||
try{ | ||||||||||
return this._send(message) | ||||||||||
} catch (error) { | ||||||||||
console.error('Error sending message to HTTP:', error) | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also add the channel and the server name to the error so it becomes easier to find the error. |
||||||||||
throw error | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
async _readRequestBody(req: http.IncomingMessage): Promise<string> { | ||||||||||
|
@@ -164,6 +174,7 @@ class HttpAdapter extends Adapter { | |||||||||
} | ||||||||||
|
||||||||||
async _connect(): Promise<this> { | ||||||||||
try { | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
const config = await this.resolveProtocolConfig('http') | ||||||||||
const httpOptions = config?.server | ||||||||||
const httpServer = httpOptions?.httpServer || http.createServer() | ||||||||||
|
@@ -173,6 +184,10 @@ class HttpAdapter extends Adapter { | |||||||||
httpServer.listen(port) | ||||||||||
this.emit('server:ready', { name: this.name(), adapter: this }) | ||||||||||
return this | ||||||||||
} catch (error) { | ||||||||||
console.error(error) | ||||||||||
throw error | ||||||||||
} | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
} | ||||||||||
_getOperationBindings(channel: ChannelInterface) { | ||||||||||
return channel.operations().filterByReceive().map(operation => operation.bindings().get("http")?.json()) | ||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,26 +10,27 @@ class KafkaAdapter extends Adapter { | |
return 'Kafka adapter' | ||
} | ||
|
||
async connect() { | ||
const kafkaOptions: KafkaAdapterConfig = await this.resolveProtocolConfig( | ||
'kafka' | ||
) | ||
const auth: KafkaAuthConfig = await this.getAuthConfig(kafkaOptions?.auth) | ||
const securityRequirements = this.AsyncAPIServer.security().map( | ||
(sec) => { | ||
const secName = Object.keys(sec.values())[0] | ||
return this.parsedAsyncAPI.components().securitySchemes().get(secName) | ||
} | ||
) | ||
const userAndPasswordSecurityReq = securityRequirements.find( | ||
(sec) => sec.type() === 'userPassword' | ||
) | ||
const scramSha256SecurityReq = securityRequirements.find( | ||
(sec) => sec.type() === 'scramSha256' | ||
) | ||
const scramSha512SecurityReq = securityRequirements.find( | ||
(sec) => sec.type() === 'scramSha512' | ||
) | ||
async _connect() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you have removed the public method. the functionality won't work anymore. |
||
try { | ||
const kafkaOptions: KafkaAdapterConfig = await this.resolveProtocolConfig( | ||
'kafka' | ||
) | ||
const auth: KafkaAuthConfig = await this.getAuthConfig(kafkaOptions?.auth) | ||
const securityRequirements = this.AsyncAPIServer.security().map( | ||
(sec) => { | ||
const secName = Object.keys(sec.values())[0] | ||
return this.parsedAsyncAPI.components().securitySchemes().get(secName) | ||
} | ||
) | ||
const userAndPasswordSecurityReq = securityRequirements.find( | ||
(sec) => sec.type() === 'userPassword' | ||
) | ||
const scramSha256SecurityReq = securityRequirements.find( | ||
(sec) => sec.type() === 'scramSha256' | ||
) | ||
const scramSha512SecurityReq = securityRequirements.find( | ||
(sec) => sec.type() === 'scramSha512' | ||
) | ||
|
||
const brokerUrl = new URL(this.AsyncAPIServer.url()) | ||
this.kafka = new Kafka({ | ||
|
@@ -74,6 +75,9 @@ class KafkaAdapter extends Adapter { | |
this.emit('message', msg, consumer) | ||
}, | ||
}) | ||
} catch (error) { | ||
console.error('Error connecting to Kafka:', error) | ||
} | ||
} | ||
|
||
async send(message: GleeMessage) { | ||
|
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -37,11 +37,21 @@ class MqttAdapter extends Adapter { | |||||||||
} | ||||||||||
|
||||||||||
async connect(): Promise<this> { | ||||||||||
return this._connect() | ||||||||||
try{ | ||||||||||
return this._connect() | ||||||||||
} catch(error){ | ||||||||||
console.error('Error connecting to MQTT:', error) | ||||||||||
throw error | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
async send(message: GleeMessage) { | ||||||||||
return this._send(message) | ||||||||||
try{ | ||||||||||
return this._send(message) | ||||||||||
} catch(error){ | ||||||||||
console.error('Error sending message to MQTT:', error) | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just like my previouse suggestion, please add the channel and the server where the error occured. |
||||||||||
throw error | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
private getSecurityReqs() { | ||||||||||
|
@@ -186,6 +196,7 @@ class MqttAdapter extends Adapter { | |||||||||
} | ||||||||||
|
||||||||||
async _connect(): Promise<this> { | ||||||||||
try { | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
const mqttOptions: MqttAdapterConfig = await this.resolveProtocolConfig( | ||||||||||
'mqtt' | ||||||||||
) | ||||||||||
|
@@ -238,6 +249,10 @@ class MqttAdapter extends Adapter { | |||||||||
} | ||||||||||
|
||||||||||
return connectClient() | ||||||||||
} catch (error) { | ||||||||||
console.error(error) | ||||||||||
throw error | ||||||||||
} | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
} | ||||||||||
|
||||||||||
_send(message: GleeMessage): Promise<void> { | ||||||||||
|
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -10,14 +10,25 @@ class SocketIOAdapter extends Adapter { | |||||||||
} | ||||||||||
|
||||||||||
async connect(): Promise<this> { | ||||||||||
return this._connect() | ||||||||||
try { | ||||||||||
return this._connect() | ||||||||||
} catch(error){ | ||||||||||
console.error('Error connecting to Socket.IO:', error) | ||||||||||
throw error | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
async send(message: GleeMessage): Promise<void> { | ||||||||||
return this._send(message) | ||||||||||
try{ | ||||||||||
return this._send(message) | ||||||||||
} catch(error){ | ||||||||||
console.error('Error sending message to Socket.IO:', error) | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same here. |
||||||||||
throw error | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
async _connect(): Promise<this> { | ||||||||||
try { | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
const config = await this.resolveProtocolConfig('ws') | ||||||||||
const websocketOptions = config?.server | ||||||||||
const serverUrl: URL = new URL(this.serverUrlExpanded) | ||||||||||
|
@@ -73,6 +84,10 @@ class SocketIOAdapter extends Adapter { | |||||||||
this.server.listen(port) | ||||||||||
} | ||||||||||
return this | ||||||||||
} catch (error) { | ||||||||||
console.error('An error occurred while connecting:', error) | ||||||||||
throw error | ||||||||||
} | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
} | ||||||||||
|
||||||||||
async _send(message: GleeMessage): Promise<void> { | ||||||||||
|
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -21,14 +21,25 @@ class WsClientAdapter extends Adapter { | |||||||||
} | ||||||||||
|
||||||||||
async connect(): Promise<this> { | ||||||||||
return this._connect() | ||||||||||
try{ | ||||||||||
return this._connect() | ||||||||||
} catch(error){ | ||||||||||
console.error('Error connecting to WS:', error) | ||||||||||
throw error | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
async send(message: GleeMessage) { | ||||||||||
return this._send(message) | ||||||||||
try{ | ||||||||||
return this._send(message) | ||||||||||
} catch(error){ | ||||||||||
console.error('Error sending message to WS:', error) | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also here. |
||||||||||
throw error | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
private async _connect(): Promise<this> { | ||||||||||
try{ | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
const channelsOnThisServer = this.getWsChannels() | ||||||||||
|
||||||||||
debug("connecting to ", this.serverName) | ||||||||||
|
@@ -81,6 +92,10 @@ class WsClientAdapter extends Adapter { | |||||||||
}) | ||||||||||
} | ||||||||||
return this | ||||||||||
}catch (error) { | ||||||||||
console.error('An error occurred while connecting:', error) | ||||||||||
throw error | ||||||||||
} | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
} | ||||||||||
|
||||||||||
private getWsChannels() { | ||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.