diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e85e580..233b3d4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -77,7 +77,7 @@ jobs: - name: Install Go uses: actions/setup-go@v3 with: - go-version: "1.21.x" + go-version: "1.21.4" - name: Checkout uses: actions/checkout@v3 - name: golangci-lint @@ -103,7 +103,7 @@ jobs: - name: Install Go uses: actions/setup-go@v3 with: - go-version: "1.21.x" + go-version: "1.21.4" - name: Run test run: go test -race -v ./... diff --git a/go.mod b/go.mod index e42ed62..00d1bfa 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module github.com/KyberNetwork/evmlistener go 1.21.0 -toolchain go1.21.1 +toolchain go1.21.4 require ( github.com/KyberNetwork/kyber-trace-go v0.1.1 diff --git a/internal/app/app.go b/internal/app/app.go index 4a41250..b006d9a 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -98,9 +98,11 @@ func NewListener(c *cli.Context) (*listener.Listener, error) { redisStream := redis.NewStream(redisClient, maxLen) topic := c.String(publisherTopicFlag.Name) - handler := listener.NewHandler(l, topic, httpEVMClient, blockKeeper, redisStream) + handler := listener.NewHandler(l, topic, httpEVMClient, blockKeeper, redisStream, + listener.WithEventLogs(nil, nil)) - return listener.New(l, wsEVMClient, httpEVMClient, handler, sanityEVMClient, sanityCheckInterval), nil + return listener.New(l, wsEVMClient, httpEVMClient, handler, sanityEVMClient, sanityCheckInterval, + listener.WithEventLogs(nil, nil)), nil } const ( diff --git a/pkg/listener/evm_client_mock_test.go b/pkg/listener/evm_client_mock_test.go index 15462ad..aa2030d 100644 --- a/pkg/listener/evm_client_mock_test.go +++ b/pkg/listener/evm_client_mock_test.go @@ -71,11 +71,11 @@ func (c *EVMClientMock) Next() { defer c.mu.Unlock() c.head++ - + currentHead := c.head // Publish new head for subscriptions. for _, ch := range c.subHeadChs { go func(ch chan<- *types.Header) { - header := c.headerMap[c.sequence[c.head]] + header := c.headerMap[c.sequence[currentHead]] ch <- &types.Header{ Hash: common.ToHex(header.Hash()), ParentHash: common.ToHex(header.ParentHash), @@ -108,10 +108,10 @@ func (c *EVMClientMock) SubscribeNewHead(ctx context.Context, ch chan<- *types.H defer c.mu.Unlock() c.subHeadChs = append(c.subHeadChs, ch) - + currentHead := c.sequence[c.head] // Publish current head to the channel. go func() { - header := c.headerMap[c.sequence[c.head]] + header := c.headerMap[currentHead] ch <- &types.Header{ Hash: common.ToHex(header.Hash()), ParentHash: common.ToHex(header.ParentHash), diff --git a/pkg/listener/filter_option.go b/pkg/listener/filter_option.go new file mode 100644 index 0000000..9de0beb --- /dev/null +++ b/pkg/listener/filter_option.go @@ -0,0 +1,17 @@ +package listener + +type Option func(opt *FilterOption) + +type FilterOption struct { + filterContracts []string + filterTopics [][]string + withLogs bool +} + +func WithEventLogs(contracts []string, topics [][]string) Option { + return func(opt *FilterOption) { + opt.withLogs = true + opt.filterContracts = contracts + opt.filterTopics = topics + } +} diff --git a/pkg/listener/handler.go b/pkg/listener/handler.go index 8b86cdb..217bd46 100644 --- a/pkg/listener/handler.go +++ b/pkg/listener/handler.go @@ -19,19 +19,26 @@ type Handler struct { blockKeeper block.Keeper publisher pubsub.Publisher l *zap.SugaredLogger + option *FilterOption } // NewHandler ... func NewHandler( l *zap.SugaredLogger, topic string, evmClient evmclient.IClient, - blockKeeper block.Keeper, publisher pubsub.Publisher, + blockKeeper block.Keeper, publisher pubsub.Publisher, options ...Option, ) *Handler { + var opts FilterOption + for _, v := range options { + v(&opts) + } + return &Handler{ topic: topic, evmClient: evmClient, blockKeeper: blockKeeper, publisher: publisher, l: l, + option: &opts, } } @@ -79,7 +86,8 @@ func (h *Handler) Init(ctx context.Context) error { fromBlock := toBlock - uint64(h.blockKeeper.Cap()) + 1 h.l.Infow("Get blocks from node", "from", fromBlock, "to", toBlock) - blocks, err := getBlocks(ctx, h.evmClient, fromBlock, toBlock) + blocks, err := GetBlocks(ctx, h.evmClient, fromBlock, toBlock, h.option.withLogs, + h.option.filterContracts, h.option.filterTopics) if err != nil { h.l.Errorw("Fail to get blocks", "from", fromBlock, "to", toBlock, "error", err) @@ -112,7 +120,8 @@ func (h *Handler) getBlock(ctx context.Context, hash string) (types.Block, error return types.Block{}, err } - b, err = getBlockByHash(ctx, h.evmClient, hash) + b, err = getBlockByHash(ctx, h.evmClient, hash, h.option.withLogs, h.option.filterContracts, + h.option.filterTopics) if err != nil { h.l.Errorw("Fail to get block from ndoe", "hash", hash, "error", err) diff --git a/pkg/listener/handler_test.go b/pkg/listener/handler_test.go index 93bad43..363100f 100644 --- a/pkg/listener/handler_test.go +++ b/pkg/listener/handler_test.go @@ -76,7 +76,7 @@ func (ts *HandlerTestSuite) TestHandle() { // Handle for normal block (chain was not re-organized). ts.evmClient.Next() hash := "0xc0c29448be86bca9d0db94b79cd1a6bd1361aed1e394d3a2a218fb98b159ab74" - b, err = getBlockByHash(context.Background(), ts.evmClient, hash) + b, err = getBlockByHash(context.Background(), ts.evmClient, hash, true, nil, nil) ts.Require().NoError(err) err = ts.handler.Handle(context.Background(), b) @@ -93,7 +93,7 @@ func (ts *HandlerTestSuite) TestHandle() { // Handle for far away block (lost connection). ts.evmClient.SetHead(52) hash = "0x132c1eb1799a5219b055674177ba95e946feb5f011c7c1409630d42c0581ee52" - b, err = getBlockByHash(context.Background(), ts.evmClient, hash) + b, err = getBlockByHash(context.Background(), ts.evmClient, hash, true, nil, nil) ts.Require().NoError(err) err = ts.handler.Handle(context.Background(), b) @@ -113,7 +113,7 @@ func (ts *HandlerTestSuite) TestHandle() { ts.Require().NoError(err) hash = "0xfe5db0e13993eb721f8174edc783e92dcee70e5a2eb3cd87e8b6c7ba5ab24986" - b, err = getBlockByHash(context.Background(), ts.evmClient, hash) + b, err = getBlockByHash(context.Background(), ts.evmClient, hash, true, nil, nil) ts.Require().NoError(err) err = ts.handler.Handle(context.Background(), b) @@ -140,7 +140,7 @@ func (ts *HandlerTestSuite) TestHandle() { ts.evmClient.Next() hash = "0x2394b0b03959156ec90096deadd34f68195a8d8f5f1e5438ea237be7675178c2" - b, err = getBlockByHash(context.Background(), ts.evmClient, hash) + b, err = getBlockByHash(context.Background(), ts.evmClient, hash, true, nil, nil) ts.Require().NoError(err) err = ts.handler.Handle(context.Background(), b) diff --git a/pkg/listener/listener.go b/pkg/listener/listener.go index b8e5725..5e87805 100644 --- a/pkg/listener/listener.go +++ b/pkg/listener/listener.go @@ -47,14 +47,21 @@ type Listener struct { queue *Queue maxQueueLen int + + option FilterOption } // New ... func New( l *zap.SugaredLogger, wsEVMClient evmclient.IClient, httpEVMClient evmclient.IClient, handler *Handler, - sanityEVMClient evmclient.IClient, sanityCheckInterval time.Duration, + sanityEVMClient evmclient.IClient, sanityCheckInterval time.Duration, opts ...Option, ) *Listener { + var o FilterOption + for _, v := range opts { + v(&o) + } + return &Listener{ l: l, @@ -67,6 +74,7 @@ func New( queue: NewQueue(maxQueueLen), maxQueueLen: maxQueueLen, + option: o, } } @@ -108,14 +116,15 @@ func (l *Listener) handleNewHeader(ctx context.Context, header *types.Header) (t var logs []types.Log l.l.Debugw("Handle for new head", "hash", header.Hash) + opts := l.option + if opts.withLogs { + logs, err = getLogsByBlockHash(ctx, l.httpEVMClient, header.Hash, opts.filterContracts, opts.filterTopics) + if err != nil { + l.l.Errorw("Fail to get logs by block hash", "hash", header.Hash, "error", err) - logs, err = getLogsByBlockHash(ctx, l.httpEVMClient, header.Hash) - if err != nil { - l.l.Errorw("Fail to get logs by block hash", "hash", header.Hash, "error", err) - - return types.Block{}, err + return types.Block{}, err + } } - l.l.Debugw("Handle new head success", "hash", header.Hash) return headerToBlock(header, logs), nil @@ -129,7 +138,8 @@ func (l *Listener) getBlocks(ctx context.Context, fromBlock, toBlock uint64) ([] i := i blkNum := uint64(i) + fromBlock g.Go(func() error { - block, err := getBlockByNumber(ctx, l.httpEVMClient, new(big.Int).SetUint64(blkNum)) + block, err := getBlockByNumber(ctx, l.httpEVMClient, new(big.Int).SetUint64(blkNum), + l.option.withLogs, l.option.filterContracts, l.option.filterTopics) if err != nil { l.l.Errorw("Fail to get block by number", "number", blkNum, "error", err) @@ -286,11 +296,11 @@ func (l *Listener) Run(ctx context.Context) error { defer l.l.Info("Stop listener service") l.l.Info("Init handler") - err := l.handler.Init(ctx) - if err != nil { - l.l.Errorw("Fail to init handler", "error", err) + returnErr := l.handler.Init(ctx) + if returnErr != nil { + l.l.Errorw("Fail to init handler", "error", returnErr) - return err + return returnErr } l.setResuming(true) @@ -306,9 +316,9 @@ func (l *Listener) Run(ctx context.Context) error { // Synchronize blocks from node. blockCh := make(chan types.Block, bufLen) go func() { - err := l.syncBlocks(ctx, blockCh) - if err != nil { - l.l.Fatalw("Fail to sync blocks", "error", err) + returnErr = l.syncBlocks(ctx, blockCh) + if returnErr != nil { + l.l.Errorw("Fail to sync blocks", "error", returnErr) } close(blockCh) @@ -328,7 +338,7 @@ func (l *Listener) Run(ctx context.Context) error { for b := range blockCh { l.l.Debugw("Receive new block", "hash", b.Hash, "parent", b.ParentHash, "numLogs", len(b.Logs)) - err = l.handler.Handle(ctx, b) + err := l.handler.Handle(ctx, b) if err != nil { l.l.Errorw("Fail to handle new block", "hash", b.Hash, "error", err) @@ -340,7 +350,7 @@ func (l *Listener) Run(ctx context.Context) error { l.mu.Unlock() } - return nil + return returnErr } func (l *Listener) startMetricsCollector(_ context.Context) error { diff --git a/pkg/listener/utils.go b/pkg/listener/utils.go index 108bfce..53d6e59 100644 --- a/pkg/listener/utils.go +++ b/pkg/listener/utils.go @@ -18,11 +18,15 @@ const ( ) // getLogsByBlockHash returns logs by block hash, retry up to 3 times. -func getLogsByBlockHash( - ctx context.Context, evmClient evmclient.IClient, hash string, +func getLogsByBlockHash(ctx context.Context, evmClient evmclient.IClient, hash string, + contracts []string, topics [][]string, ) (logs []types.Log, err error) { for i := 0; i < 3; i++ { - logs, err = evmClient.FilterLogs(ctx, evmclient.FilterQuery{BlockHash: &hash}) + logs, err = evmClient.FilterLogs(ctx, evmclient.FilterQuery{ + BlockHash: &hash, + Addresses: contracts, + Topics: topics, + }) if err == nil { if len(logs) == 0 { continue @@ -41,11 +45,11 @@ func getLogsByBlockHash( return logs, err } -func getBlocks( - ctx context.Context, evmClient evmclient.IClient, fromBlock uint64, toBlock uint64, +func GetBlocks(ctx context.Context, evmClient evmclient.IClient, fromBlock uint64, toBlock uint64, + withLogs bool, contracts []string, topics [][]string, ) ([]types.Block, error) { // Get latest block by number. - b, err := getBlockByNumber(ctx, evmClient, new(big.Int).SetUint64(toBlock)) + b, err := getBlockByNumber(ctx, evmClient, new(big.Int).SetUint64(toBlock), withLogs, contracts, topics) if err != nil { return nil, err } @@ -57,7 +61,7 @@ func getBlocks( hash := b.ParentHash for i := n - 2; i >= 0; i-- { //nolint:gomnd - b, err = getBlockByHash(ctx, evmClient, hash) + b, err = getBlockByHash(ctx, evmClient, hash, withLogs, contracts, topics) if err != nil { return nil, err } @@ -87,17 +91,19 @@ func getHeaderByHash( return nil, err } -func getBlockByHash( - ctx context.Context, evmClient evmclient.IClient, hash string, +func getBlockByHash(ctx context.Context, evmClient evmclient.IClient, hash string, withLogs bool, + contracts []string, topics [][]string, ) (types.Block, error) { header, err := getHeaderByHash(ctx, evmClient, hash) if err != nil { return types.Block{}, err } - - logs, err := getLogsByBlockHash(ctx, evmClient, hash) - if err != nil { - return types.Block{}, err + var logs []types.Log + if withLogs { + logs, err = getLogsByBlockHash(ctx, evmClient, hash, contracts, topics) + if err != nil { + return types.Block{}, err + } } return headerToBlock(header, logs), nil @@ -122,17 +128,19 @@ func getHeaderByNumber( return nil, err } -func getBlockByNumber( - ctx context.Context, evmClient evmclient.IClient, num *big.Int, +func getBlockByNumber(ctx context.Context, evmClient evmclient.IClient, num *big.Int, + withLogs bool, contracts []string, topics [][]string, ) (types.Block, error) { header, err := getHeaderByNumber(ctx, evmClient, num) if err != nil { return types.Block{}, err } - - logs, err := getLogsByBlockHash(ctx, evmClient, header.Hash) - if err != nil { - return types.Block{}, err + var logs []types.Log + if withLogs { + logs, err = getLogsByBlockHash(ctx, evmClient, header.Hash, contracts, topics) + if err != nil { + return types.Block{}, err + } } return headerToBlock(header, logs), nil