Skip to content

Commit

Permalink
allow option filter logs
Browse files Browse the repository at this point in the history
  • Loading branch information
secmask committed Jan 8, 2024
1 parent e6dd809 commit 7c3c676
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 47 deletions.
4 changes: 2 additions & 2 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ func NewListener(c *cli.Context) (*listener.Listener, error) {
redisStream := redis.NewStream(redisClient, maxLen)

topic := c.String(publisherTopicFlag.Name)
handler := listener.NewHandler(l, topic, httpEVMClient, blockKeeper, redisStream)
handler := listener.NewHandler(l, topic, httpEVMClient, blockKeeper, redisStream, listener.WithEventLogs(nil, nil))

return listener.New(l, wsEVMClient, httpEVMClient, handler, sanityEVMClient, sanityCheckInterval), nil
return listener.New(l, wsEVMClient, httpEVMClient, handler, sanityEVMClient, sanityCheckInterval, listener.WithEventLogs(nil, nil)), nil
}

const (
Expand Down
17 changes: 17 additions & 0 deletions pkg/listener/filter_option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package listener

type Option func(opt *FilterOption)

type FilterOption struct {
filterContracts []string
filterTopics [][]string
withLogs bool
}

func WithEventLogs(contracts []string, topics [][]string) Option {
return func(opt *FilterOption) {
opt.withLogs = true
opt.filterContracts = contracts
opt.filterTopics = topics
}
}
14 changes: 11 additions & 3 deletions pkg/listener/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,25 @@ type Handler struct {
blockKeeper block.Keeper
publisher pubsub.Publisher
l *zap.SugaredLogger
option *FilterOption
}

// NewHandler ...
func NewHandler(
l *zap.SugaredLogger, topic string, evmClient evmclient.IClient,
blockKeeper block.Keeper, publisher pubsub.Publisher,
blockKeeper block.Keeper, publisher pubsub.Publisher, options ...Option,
) *Handler {
var opts FilterOption
for _, v := range options {
v(&opts)
}
return &Handler{
topic: topic,
evmClient: evmClient,
blockKeeper: blockKeeper,
publisher: publisher,
l: l,
option: &opts,
}
}

Expand Down Expand Up @@ -79,7 +85,8 @@ func (h *Handler) Init(ctx context.Context) error {
fromBlock := toBlock - uint64(h.blockKeeper.Cap()) + 1

h.l.Infow("Get blocks from node", "from", fromBlock, "to", toBlock)
blocks, err := getBlocks(ctx, h.evmClient, fromBlock, toBlock)
blocks, err := GetBlocks(ctx, h.evmClient, fromBlock, toBlock, h.option.withLogs,
h.option.filterContracts, h.option.filterTopics)
if err != nil {
h.l.Errorw("Fail to get blocks", "from", fromBlock, "to", toBlock, "error", err)

Expand Down Expand Up @@ -112,7 +119,8 @@ func (h *Handler) getBlock(ctx context.Context, hash string) (types.Block, error
return types.Block{}, err
}

b, err = getBlockByHash(ctx, h.evmClient, hash)
b, err = getBlockByHash(ctx, h.evmClient, hash, h.option.withLogs, h.option.filterContracts,
h.option.filterTopics)
if err != nil {
h.l.Errorw("Fail to get block from ndoe", "hash", hash, "error", err)

Expand Down
8 changes: 4 additions & 4 deletions pkg/listener/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (ts *HandlerTestSuite) TestHandle() {
// Handle for normal block (chain was not re-organized).
ts.evmClient.Next()
hash := "0xc0c29448be86bca9d0db94b79cd1a6bd1361aed1e394d3a2a218fb98b159ab74"
b, err = getBlockByHash(context.Background(), ts.evmClient, hash)
b, err = getBlockByHash(context.Background(), ts.evmClient, hash, true, nil, nil)
ts.Require().NoError(err)

err = ts.handler.Handle(context.Background(), b)
Expand All @@ -93,7 +93,7 @@ func (ts *HandlerTestSuite) TestHandle() {
// Handle for far away block (lost connection).
ts.evmClient.SetHead(52)
hash = "0x132c1eb1799a5219b055674177ba95e946feb5f011c7c1409630d42c0581ee52"
b, err = getBlockByHash(context.Background(), ts.evmClient, hash)
b, err = getBlockByHash(context.Background(), ts.evmClient, hash, true, nil, nil)
ts.Require().NoError(err)

err = ts.handler.Handle(context.Background(), b)
Expand All @@ -113,7 +113,7 @@ func (ts *HandlerTestSuite) TestHandle() {
ts.Require().NoError(err)

hash = "0xfe5db0e13993eb721f8174edc783e92dcee70e5a2eb3cd87e8b6c7ba5ab24986"
b, err = getBlockByHash(context.Background(), ts.evmClient, hash)
b, err = getBlockByHash(context.Background(), ts.evmClient, hash, true, nil, nil)
ts.Require().NoError(err)

err = ts.handler.Handle(context.Background(), b)
Expand All @@ -140,7 +140,7 @@ func (ts *HandlerTestSuite) TestHandle() {

ts.evmClient.Next()
hash = "0x2394b0b03959156ec90096deadd34f68195a8d8f5f1e5438ea237be7675178c2"
b, err = getBlockByHash(context.Background(), ts.evmClient, hash)
b, err = getBlockByHash(context.Background(), ts.evmClient, hash, true, nil, nil)
ts.Require().NoError(err)

err = ts.handler.Handle(context.Background(), b)
Expand Down
44 changes: 27 additions & 17 deletions pkg/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,20 @@ type Listener struct {

queue *Queue
maxQueueLen int

option FilterOption
}

// New ...
func New(
l *zap.SugaredLogger, wsEVMClient evmclient.IClient,
httpEVMClient evmclient.IClient, handler *Handler,
sanityEVMClient evmclient.IClient, sanityCheckInterval time.Duration,
sanityEVMClient evmclient.IClient, sanityCheckInterval time.Duration, opts ...Option,
) *Listener {
var o FilterOption
for _, v := range opts {
v(&o)
}
return &Listener{
l: l,

Expand All @@ -67,7 +73,9 @@ func New(

queue: NewQueue(maxQueueLen),
maxQueueLen: maxQueueLen,
option: o,
}

}

func (l *Listener) publishBlock(ch chan<- types.Block, seq uint64, block *types.Block) {
Expand Down Expand Up @@ -108,14 +116,15 @@ func (l *Listener) handleNewHeader(ctx context.Context, header *types.Header) (t
var logs []types.Log

l.l.Debugw("Handle for new head", "hash", header.Hash)
opts := l.option
if opts.withLogs {
logs, err = getLogsByBlockHash(ctx, l.httpEVMClient, header.Hash, opts.filterContracts, opts.filterTopics)
if err != nil {
l.l.Errorw("Fail to get logs by block hash", "hash", header.Hash, "error", err)

logs, err = getLogsByBlockHash(ctx, l.httpEVMClient, header.Hash)
if err != nil {
l.l.Errorw("Fail to get logs by block hash", "hash", header.Hash, "error", err)

return types.Block{}, err
return types.Block{}, err
}
}

l.l.Debugw("Handle new head success", "hash", header.Hash)

return headerToBlock(header, logs), nil
Expand All @@ -129,7 +138,8 @@ func (l *Listener) getBlocks(ctx context.Context, fromBlock, toBlock uint64) ([]
i := i
blkNum := uint64(i) + fromBlock
g.Go(func() error {
block, err := getBlockByNumber(ctx, l.httpEVMClient, new(big.Int).SetUint64(blkNum))
block, err := getBlockByNumber(ctx, l.httpEVMClient, new(big.Int).SetUint64(blkNum),
l.option.withLogs, l.option.filterContracts, l.option.filterTopics)
if err != nil {
l.l.Errorw("Fail to get block by number", "number", blkNum, "error", err)

Expand Down Expand Up @@ -286,11 +296,11 @@ func (l *Listener) Run(ctx context.Context) error {
defer l.l.Info("Stop listener service")

l.l.Info("Init handler")
err := l.handler.Init(ctx)
if err != nil {
l.l.Errorw("Fail to init handler", "error", err)
returnErr := l.handler.Init(ctx)
if returnErr != nil {
l.l.Errorw("Fail to init handler", "error", returnErr)

return err
return returnErr
}

l.setResuming(true)
Expand All @@ -306,9 +316,9 @@ func (l *Listener) Run(ctx context.Context) error {
// Synchronize blocks from node.
blockCh := make(chan types.Block, bufLen)
go func() {
err := l.syncBlocks(ctx, blockCh)
if err != nil {
l.l.Fatalw("Fail to sync blocks", "error", err)
returnErr = l.syncBlocks(ctx, blockCh)
if returnErr != nil {
l.l.Errorw("Fail to sync blocks", "error", returnErr)
}

close(blockCh)
Expand All @@ -328,7 +338,7 @@ func (l *Listener) Run(ctx context.Context) error {
for b := range blockCh {
l.l.Debugw("Receive new block",
"hash", b.Hash, "parent", b.ParentHash, "numLogs", len(b.Logs))
err = l.handler.Handle(ctx, b)
err := l.handler.Handle(ctx, b)
if err != nil {
l.l.Errorw("Fail to handle new block", "hash", b.Hash, "error", err)

Expand All @@ -340,7 +350,7 @@ func (l *Listener) Run(ctx context.Context) error {
l.mu.Unlock()
}

return nil
return returnErr
}

func (l *Listener) startMetricsCollector(_ context.Context) error {
Expand Down
48 changes: 27 additions & 21 deletions pkg/listener/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@ const (
)

// getLogsByBlockHash returns logs by block hash, retry up to 3 times.
func getLogsByBlockHash(
ctx context.Context, evmClient evmclient.IClient, hash string,
func getLogsByBlockHash(ctx context.Context, evmClient evmclient.IClient, hash string,
contracts []string, topics [][]string,
) (logs []types.Log, err error) {
for i := 0; i < 3; i++ {
logs, err = evmClient.FilterLogs(ctx, evmclient.FilterQuery{BlockHash: &hash})
logs, err = evmClient.FilterLogs(ctx, evmclient.FilterQuery{
BlockHash: &hash,
Addresses: contracts,
Topics: topics,
})
if err == nil {
if len(logs) == 0 {
continue
Expand All @@ -41,11 +45,11 @@ func getLogsByBlockHash(
return logs, err
}

func getBlocks(
ctx context.Context, evmClient evmclient.IClient, fromBlock uint64, toBlock uint64,
func GetBlocks(ctx context.Context, evmClient evmclient.IClient, fromBlock uint64, toBlock uint64,
withLogs bool, contracts []string, topics [][]string,
) ([]types.Block, error) {
// Get latest block by number.
b, err := getBlockByNumber(ctx, evmClient, new(big.Int).SetUint64(toBlock))
b, err := getBlockByNumber(ctx, evmClient, new(big.Int).SetUint64(toBlock), withLogs, contracts, topics)
if err != nil {
return nil, err
}
Expand All @@ -57,7 +61,7 @@ func getBlocks(

hash := b.ParentHash
for i := n - 2; i >= 0; i-- { //nolint:gomnd
b, err = getBlockByHash(ctx, evmClient, hash)
b, err = getBlockByHash(ctx, evmClient, hash, withLogs, contracts, topics)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -87,19 +91,20 @@ func getHeaderByHash(
return nil, err
}

func getBlockByHash(
ctx context.Context, evmClient evmclient.IClient, hash string,
func getBlockByHash(ctx context.Context, evmClient evmclient.IClient, hash string, withLogs bool,
contracts []string, topics [][]string,
) (types.Block, error) {
header, err := getHeaderByHash(ctx, evmClient, hash)
if err != nil {
return types.Block{}, err
}

logs, err := getLogsByBlockHash(ctx, evmClient, hash)
if err != nil {
return types.Block{}, err
var logs []types.Log
if withLogs {
logs, err = getLogsByBlockHash(ctx, evmClient, hash, contracts, topics)
if err != nil {
return types.Block{}, err
}
}

return headerToBlock(header, logs), nil
}

Expand All @@ -122,19 +127,20 @@ func getHeaderByNumber(
return nil, err
}

func getBlockByNumber(
ctx context.Context, evmClient evmclient.IClient, num *big.Int,
func getBlockByNumber(ctx context.Context, evmClient evmclient.IClient, num *big.Int,
withLogs bool, contracts []string, topics [][]string,
) (types.Block, error) {
header, err := getHeaderByNumber(ctx, evmClient, num)
if err != nil {
return types.Block{}, err
}

logs, err := getLogsByBlockHash(ctx, evmClient, header.Hash)
if err != nil {
return types.Block{}, err
var logs []types.Log
if withLogs {
logs, err = getLogsByBlockHash(ctx, evmClient, header.Hash, contracts, topics)
if err != nil {
return types.Block{}, err
}
}

return headerToBlock(header, logs), nil
}

Expand Down

0 comments on commit 7c3c676

Please sign in to comment.