From 6211b6dc0ff0131c3ccf20b38760f06b1fd324c3 Mon Sep 17 00:00:00 2001 From: Vu Tien Date: Fri, 29 Dec 2023 17:09:26 +0700 Subject: [PATCH] [alt] using channel for keep block order --- pkg/listener/listener.go | 51 +++++++++------------------------------- 1 file changed, 11 insertions(+), 40 deletions(-) diff --git a/pkg/listener/listener.go b/pkg/listener/listener.go index b8e5725..650e0d8 100644 --- a/pkg/listener/listener.go +++ b/pkg/listener/listener.go @@ -70,39 +70,6 @@ func New( } } -func (l *Listener) publishBlock(ch chan<- types.Block, seq uint64, block *types.Block) { - if l.queue == nil { - ch <- *block - - return - } - - expectedSeq := l.queue.SequenceNumber() - if seq < expectedSeq { - return - } - - if int(seq-expectedSeq) >= l.maxQueueLen { - for i := 0; i <= int(seq-expectedSeq)-l.maxQueueLen; i++ { - b, _ := l.queue.Dequeue() - if b != nil { - ch <- *b - } - } - } - - l.queue.Insert(seq, block) - for !l.queue.Empty() { - b, _ := l.queue.Peek() - if b == nil { - return - } - - ch <- *b - l.queue.Dequeue() - } -} - func (l *Listener) handleNewHeader(ctx context.Context, header *types.Header) (types.Block, error) { var err error var logs []types.Log @@ -221,7 +188,12 @@ func (l *Listener) subscribeNewBlockHead(ctx context.Context, blockCh chan<- typ return err } - seq := uint64(1) + sequenceCh := make(chan chan types.Block, l.maxQueueLen) + go func() { + for orderedBlockCh := range sequenceCh { + blockCh <- (<-orderedBlockCh) + } + }() for { select { case <-ctx.Done(): @@ -246,16 +218,15 @@ func (l *Listener) subscribeNewBlockHead(ctx context.Context, blockCh chan<- typ } l.mu.Unlock() - go func(seq uint64, head *types.Header) { + ch := make(chan types.Block) + sequenceCh <- ch + go func(head *types.Header, ch chan types.Block) { b, err := l.handleNewHeader(ctx, head) if err != nil { l.l.Fatalw("Fail to handle new head", "header", header, "error", err) } - - l.publishBlock(blockCh, seq, &b) - }(seq, header) - - seq++ + ch <- b + }(header, ch) } } }