diff --git a/pkg/listener/listener.go b/pkg/listener/listener.go index 5e87805..a32f0be 100644 --- a/pkg/listener/listener.go +++ b/pkg/listener/listener.go @@ -28,6 +28,8 @@ const ( metricNameLastHandledBlockNumber = "evmlistener_last_handled_block_number" ) +var errConnectionCorrupted = errors.New("connection is corrupted") + // Listener represents a listener service for on-chain events. type Listener struct { l *zap.SugaredLogger @@ -212,6 +214,7 @@ func (l *Listener) handleOldHeaders(ctx context.Context, blockCh chan<- types.Bl return nil } +//nolint:cyclop func (l *Listener) subscribeNewBlockHead(ctx context.Context, blockCh chan<- types.Block) error { l.l.Info("Start subscribing for new head of the chain") headerCh := make(chan *types.Header, 1) @@ -231,6 +234,10 @@ func (l *Listener) subscribeNewBlockHead(ctx context.Context, blockCh chan<- typ return err } + ticker := time.NewTicker(l.sanityCheckInterval) + defer ticker.Stop() + + lastReceivedTime := time.Now() seq := uint64(1) for { select { @@ -242,9 +249,15 @@ func (l *Listener) subscribeNewBlockHead(ctx context.Context, blockCh chan<- typ l.l.Errorw("Error while subscribing new head", "error", err) return err + case <-ticker.C: + if time.Since(lastReceivedTime) > l.sanityCheckInterval { + l.l.Errorw("Websocket connection is corrupted", "lastReceivedTime", lastReceivedTime) + return errConnectionCorrupted + } case header := <-headerCh: l.l.Debugw("Receive new head of the chain", "header", header) + lastReceivedTime = time.Now() l.mu.Lock() if l.lastReceivedBlock == nil || l.lastReceivedBlock.Timestamp < header.Time { l.lastReceivedBlock = &types.Block{ @@ -282,7 +295,8 @@ func (l *Listener) syncBlocks(ctx context.Context, blockCh chan types.Block) err websocket.CloseNormalClosure, websocket.CloseServiceRestart) && !errors.Is(err, syscall.ECONNRESET) && !errors.Is(err, ethereum.NotFound) && - err.Error() != errStringUnknownBlock { + err.Error() != errStringUnknownBlock && + !errors.Is(err, errConnectionCorrupted) { return err }