Skip to content

Commit

Permalink
Check for websocket can receive block normally
Browse files Browse the repository at this point in the history
  • Loading branch information
hiepnv90 committed Jan 8, 2024
1 parent c6fa296 commit 18f1d7b
Showing 1 changed file with 15 additions and 1 deletion.
16 changes: 15 additions & 1 deletion pkg/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const (
metricNameLastHandledBlockNumber = "evmlistener_last_handled_block_number"
)

var errConnectionCorrupted = errors.New("connection is corrupted")

// Listener represents a listener service for on-chain events.
type Listener struct {
l *zap.SugaredLogger
Expand Down Expand Up @@ -212,6 +214,7 @@ func (l *Listener) handleOldHeaders(ctx context.Context, blockCh chan<- types.Bl
return nil
}

//nolint:cyclop
func (l *Listener) subscribeNewBlockHead(ctx context.Context, blockCh chan<- types.Block) error {
l.l.Info("Start subscribing for new head of the chain")
headerCh := make(chan *types.Header, 1)
Expand All @@ -231,6 +234,10 @@ func (l *Listener) subscribeNewBlockHead(ctx context.Context, blockCh chan<- typ
return err
}

ticker := time.NewTicker(l.sanityCheckInterval)
defer ticker.Stop()

lastReceivedTime := time.Now()
seq := uint64(1)
for {
select {
Expand All @@ -242,9 +249,15 @@ func (l *Listener) subscribeNewBlockHead(ctx context.Context, blockCh chan<- typ
l.l.Errorw("Error while subscribing new head", "error", err)

return err
case <-ticker.C:
if time.Since(lastReceivedTime) > l.sanityCheckInterval {
l.l.Errorw("Websocket connection is corrupted", "lastReceivedTime", lastReceivedTime)
return errConnectionCorrupted

Check failure on line 255 in pkg/listener/listener.go

View workflow job for this annotation

GitHub Actions / Run golangci-lint

return with no blank line before (nlreturn)
}
case header := <-headerCh:
l.l.Debugw("Receive new head of the chain", "header", header)

lastReceivedTime = time.Now()
l.mu.Lock()
if l.lastReceivedBlock == nil || l.lastReceivedBlock.Timestamp < header.Time {
l.lastReceivedBlock = &types.Block{
Expand Down Expand Up @@ -282,7 +295,8 @@ func (l *Listener) syncBlocks(ctx context.Context, blockCh chan types.Block) err
websocket.CloseNormalClosure, websocket.CloseServiceRestart) &&
!errors.Is(err, syscall.ECONNRESET) &&
!errors.Is(err, ethereum.NotFound) &&
err.Error() != errStringUnknownBlock {
err.Error() != errStringUnknownBlock &&
!errors.Is(err, errConnectionCorrupted) {
return err
}

Expand Down

0 comments on commit 18f1d7b

Please sign in to comment.