Skip to content

Commit

Permalink
Recreate listener if error is occured
Browse files Browse the repository at this point in the history
Signed-off-by: clyang82 <[email protected]>
  • Loading branch information
clyang82 committed Dec 18, 2024
1 parent eddc46d commit dba9b3d
Showing 1 changed file with 7 additions and 6 deletions.
13 changes: 7 additions & 6 deletions pkg/db/db_session/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (f *Default) DirectDB() *sql.DB {
return f.db
}

func waitForNotification(ctx context.Context, l *pq.Listener, callback func(id string)) {
func waitForNotification(ctx context.Context, l *pq.Listener, dbConfig *config.DatabaseConfig, channel string, callback func(id string)) {
logger := ocmlogger.NewOCMLogger(ctx)
for {
select {
Expand All @@ -153,10 +153,11 @@ func waitForNotification(ctx context.Context, l *pq.Listener, callback func(id s
case <-time.After(10 * time.Second):
logger.V(10).Infof("Received no events on channel during interval. Pinging source")
go func() {
// TODO: Need to handle the error, especially in cases of network failure.
err := l.Ping()
if err != nil {
logger.Error(err.Error())
if err := l.Ping(); err != nil {
logger.Infof("recreate the listener due to %s", err.Error())
l.Close()
// recreate the listener
newListener(ctx, dbConfig, channel, callback)
}
}()
}
Expand Down Expand Up @@ -190,7 +191,7 @@ func newListener(ctx context.Context, dbConfig *config.DatabaseConfig, channel s
}

logger.Infof("Starting channeling monitor for %s", channel)
waitForNotification(ctx, listener, callback)
waitForNotification(ctx, listener, dbConfig, channel, callback)
}

func (f *Default) NewListener(ctx context.Context, channel string, callback func(id string)) {
Expand Down

0 comments on commit dba9b3d

Please sign in to comment.