Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow option filter logs #94

Merged
merged 2 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v3
with:
go-version: "1.21.x"
go-version: "1.21.4"
- name: Checkout
uses: actions/checkout@v3
- name: golangci-lint
Expand All @@ -103,7 +103,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v3
with:
go-version: "1.21.x"
go-version: "1.21.4"
- name: Run test
run: go test -race -v ./...

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/KyberNetwork/evmlistener

go 1.21.0

toolchain go1.21.1
toolchain go1.21.4

require (
github.com/KyberNetwork/kyber-trace-go v0.1.1
Expand Down
6 changes: 4 additions & 2 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,11 @@ func NewListener(c *cli.Context) (*listener.Listener, error) {
redisStream := redis.NewStream(redisClient, maxLen)

topic := c.String(publisherTopicFlag.Name)
handler := listener.NewHandler(l, topic, httpEVMClient, blockKeeper, redisStream)
handler := listener.NewHandler(l, topic, httpEVMClient, blockKeeper, redisStream,
listener.WithEventLogs(nil, nil))

return listener.New(l, wsEVMClient, httpEVMClient, handler, sanityEVMClient, sanityCheckInterval), nil
return listener.New(l, wsEVMClient, httpEVMClient, handler, sanityEVMClient, sanityCheckInterval,
listener.WithEventLogs(nil, nil)), nil
}

const (
Expand Down
8 changes: 4 additions & 4 deletions pkg/listener/evm_client_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ func (c *EVMClientMock) Next() {
defer c.mu.Unlock()

c.head++

currentHead := c.head
// Publish new head for subscriptions.
for _, ch := range c.subHeadChs {
go func(ch chan<- *types.Header) {
header := c.headerMap[c.sequence[c.head]]
header := c.headerMap[c.sequence[currentHead]]
ch <- &types.Header{
Hash: common.ToHex(header.Hash()),
ParentHash: common.ToHex(header.ParentHash),
Expand Down Expand Up @@ -108,10 +108,10 @@ func (c *EVMClientMock) SubscribeNewHead(ctx context.Context, ch chan<- *types.H
defer c.mu.Unlock()

c.subHeadChs = append(c.subHeadChs, ch)

currentHead := c.sequence[c.head]
// Publish current head to the channel.
go func() {
header := c.headerMap[c.sequence[c.head]]
header := c.headerMap[currentHead]
ch <- &types.Header{
Hash: common.ToHex(header.Hash()),
ParentHash: common.ToHex(header.ParentHash),
Expand Down
17 changes: 17 additions & 0 deletions pkg/listener/filter_option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package listener

type Option func(opt *FilterOption)

type FilterOption struct {
filterContracts []string
filterTopics [][]string
withLogs bool
}

func WithEventLogs(contracts []string, topics [][]string) Option {
return func(opt *FilterOption) {
opt.withLogs = true
opt.filterContracts = contracts
opt.filterTopics = topics
}
}
15 changes: 12 additions & 3 deletions pkg/listener/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,26 @@ type Handler struct {
blockKeeper block.Keeper
publisher pubsub.Publisher
l *zap.SugaredLogger
option *FilterOption
}

// NewHandler ...
func NewHandler(
l *zap.SugaredLogger, topic string, evmClient evmclient.IClient,
blockKeeper block.Keeper, publisher pubsub.Publisher,
blockKeeper block.Keeper, publisher pubsub.Publisher, options ...Option,
) *Handler {
var opts FilterOption
for _, v := range options {
v(&opts)
}

return &Handler{
topic: topic,
evmClient: evmClient,
blockKeeper: blockKeeper,
publisher: publisher,
l: l,
option: &opts,
}
}

Expand Down Expand Up @@ -79,7 +86,8 @@ func (h *Handler) Init(ctx context.Context) error {
fromBlock := toBlock - uint64(h.blockKeeper.Cap()) + 1

h.l.Infow("Get blocks from node", "from", fromBlock, "to", toBlock)
blocks, err := getBlocks(ctx, h.evmClient, fromBlock, toBlock)
blocks, err := GetBlocks(ctx, h.evmClient, fromBlock, toBlock, h.option.withLogs,
h.option.filterContracts, h.option.filterTopics)
if err != nil {
h.l.Errorw("Fail to get blocks", "from", fromBlock, "to", toBlock, "error", err)

Expand Down Expand Up @@ -112,7 +120,8 @@ func (h *Handler) getBlock(ctx context.Context, hash string) (types.Block, error
return types.Block{}, err
}

b, err = getBlockByHash(ctx, h.evmClient, hash)
b, err = getBlockByHash(ctx, h.evmClient, hash, h.option.withLogs, h.option.filterContracts,
h.option.filterTopics)
if err != nil {
h.l.Errorw("Fail to get block from ndoe", "hash", hash, "error", err)

Expand Down
8 changes: 4 additions & 4 deletions pkg/listener/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (ts *HandlerTestSuite) TestHandle() {
// Handle for normal block (chain was not re-organized).
ts.evmClient.Next()
hash := "0xc0c29448be86bca9d0db94b79cd1a6bd1361aed1e394d3a2a218fb98b159ab74"
b, err = getBlockByHash(context.Background(), ts.evmClient, hash)
b, err = getBlockByHash(context.Background(), ts.evmClient, hash, true, nil, nil)
ts.Require().NoError(err)

err = ts.handler.Handle(context.Background(), b)
Expand All @@ -93,7 +93,7 @@ func (ts *HandlerTestSuite) TestHandle() {
// Handle for far away block (lost connection).
ts.evmClient.SetHead(52)
hash = "0x132c1eb1799a5219b055674177ba95e946feb5f011c7c1409630d42c0581ee52"
b, err = getBlockByHash(context.Background(), ts.evmClient, hash)
b, err = getBlockByHash(context.Background(), ts.evmClient, hash, true, nil, nil)
ts.Require().NoError(err)

err = ts.handler.Handle(context.Background(), b)
Expand All @@ -113,7 +113,7 @@ func (ts *HandlerTestSuite) TestHandle() {
ts.Require().NoError(err)

hash = "0xfe5db0e13993eb721f8174edc783e92dcee70e5a2eb3cd87e8b6c7ba5ab24986"
b, err = getBlockByHash(context.Background(), ts.evmClient, hash)
b, err = getBlockByHash(context.Background(), ts.evmClient, hash, true, nil, nil)
ts.Require().NoError(err)

err = ts.handler.Handle(context.Background(), b)
Expand All @@ -140,7 +140,7 @@ func (ts *HandlerTestSuite) TestHandle() {

ts.evmClient.Next()
hash = "0x2394b0b03959156ec90096deadd34f68195a8d8f5f1e5438ea237be7675178c2"
b, err = getBlockByHash(context.Background(), ts.evmClient, hash)
b, err = getBlockByHash(context.Background(), ts.evmClient, hash, true, nil, nil)
ts.Require().NoError(err)

err = ts.handler.Handle(context.Background(), b)
Expand Down
44 changes: 27 additions & 17 deletions pkg/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,21 @@ type Listener struct {

queue *Queue
maxQueueLen int

option FilterOption
}

// New ...
func New(
l *zap.SugaredLogger, wsEVMClient evmclient.IClient,
httpEVMClient evmclient.IClient, handler *Handler,
sanityEVMClient evmclient.IClient, sanityCheckInterval time.Duration,
sanityEVMClient evmclient.IClient, sanityCheckInterval time.Duration, opts ...Option,
) *Listener {
var o FilterOption
for _, v := range opts {
v(&o)
}

return &Listener{
l: l,

Expand All @@ -67,6 +74,7 @@ func New(

queue: NewQueue(maxQueueLen),
maxQueueLen: maxQueueLen,
option: o,
}
}

Expand Down Expand Up @@ -108,14 +116,15 @@ func (l *Listener) handleNewHeader(ctx context.Context, header *types.Header) (t
var logs []types.Log

l.l.Debugw("Handle for new head", "hash", header.Hash)
opts := l.option
if opts.withLogs {
logs, err = getLogsByBlockHash(ctx, l.httpEVMClient, header.Hash, opts.filterContracts, opts.filterTopics)
if err != nil {
l.l.Errorw("Fail to get logs by block hash", "hash", header.Hash, "error", err)

logs, err = getLogsByBlockHash(ctx, l.httpEVMClient, header.Hash)
if err != nil {
l.l.Errorw("Fail to get logs by block hash", "hash", header.Hash, "error", err)

return types.Block{}, err
return types.Block{}, err
}
}

l.l.Debugw("Handle new head success", "hash", header.Hash)

return headerToBlock(header, logs), nil
Expand All @@ -129,7 +138,8 @@ func (l *Listener) getBlocks(ctx context.Context, fromBlock, toBlock uint64) ([]
i := i
blkNum := uint64(i) + fromBlock
g.Go(func() error {
block, err := getBlockByNumber(ctx, l.httpEVMClient, new(big.Int).SetUint64(blkNum))
block, err := getBlockByNumber(ctx, l.httpEVMClient, new(big.Int).SetUint64(blkNum),
l.option.withLogs, l.option.filterContracts, l.option.filterTopics)
if err != nil {
l.l.Errorw("Fail to get block by number", "number", blkNum, "error", err)

Expand Down Expand Up @@ -286,11 +296,11 @@ func (l *Listener) Run(ctx context.Context) error {
defer l.l.Info("Stop listener service")

l.l.Info("Init handler")
err := l.handler.Init(ctx)
if err != nil {
l.l.Errorw("Fail to init handler", "error", err)
returnErr := l.handler.Init(ctx)
if returnErr != nil {
l.l.Errorw("Fail to init handler", "error", returnErr)

return err
return returnErr
}

l.setResuming(true)
Expand All @@ -306,9 +316,9 @@ func (l *Listener) Run(ctx context.Context) error {
// Synchronize blocks from node.
blockCh := make(chan types.Block, bufLen)
go func() {
err := l.syncBlocks(ctx, blockCh)
if err != nil {
l.l.Fatalw("Fail to sync blocks", "error", err)
returnErr = l.syncBlocks(ctx, blockCh)
if returnErr != nil {
l.l.Errorw("Fail to sync blocks", "error", returnErr)
}

close(blockCh)
Expand All @@ -328,7 +338,7 @@ func (l *Listener) Run(ctx context.Context) error {
for b := range blockCh {
l.l.Debugw("Receive new block",
"hash", b.Hash, "parent", b.ParentHash, "numLogs", len(b.Logs))
err = l.handler.Handle(ctx, b)
err := l.handler.Handle(ctx, b)
if err != nil {
l.l.Errorw("Fail to handle new block", "hash", b.Hash, "error", err)

Expand All @@ -340,7 +350,7 @@ func (l *Listener) Run(ctx context.Context) error {
l.mu.Unlock()
}

return nil
return returnErr
}

func (l *Listener) startMetricsCollector(_ context.Context) error {
Expand Down
46 changes: 27 additions & 19 deletions pkg/listener/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@ const (
)

// getLogsByBlockHash returns logs by block hash, retry up to 3 times.
func getLogsByBlockHash(
ctx context.Context, evmClient evmclient.IClient, hash string,
func getLogsByBlockHash(ctx context.Context, evmClient evmclient.IClient, hash string,
contracts []string, topics [][]string,
) (logs []types.Log, err error) {
for i := 0; i < 3; i++ {
logs, err = evmClient.FilterLogs(ctx, evmclient.FilterQuery{BlockHash: &hash})
logs, err = evmClient.FilterLogs(ctx, evmclient.FilterQuery{
BlockHash: &hash,
Addresses: contracts,
Topics: topics,
})
if err == nil {
if len(logs) == 0 {
continue
Expand All @@ -41,11 +45,11 @@ func getLogsByBlockHash(
return logs, err
}

func getBlocks(
ctx context.Context, evmClient evmclient.IClient, fromBlock uint64, toBlock uint64,
func GetBlocks(ctx context.Context, evmClient evmclient.IClient, fromBlock uint64, toBlock uint64,
withLogs bool, contracts []string, topics [][]string,
) ([]types.Block, error) {
// Get latest block by number.
b, err := getBlockByNumber(ctx, evmClient, new(big.Int).SetUint64(toBlock))
b, err := getBlockByNumber(ctx, evmClient, new(big.Int).SetUint64(toBlock), withLogs, contracts, topics)
if err != nil {
return nil, err
}
Expand All @@ -57,7 +61,7 @@ func getBlocks(

hash := b.ParentHash
for i := n - 2; i >= 0; i-- { //nolint:gomnd
b, err = getBlockByHash(ctx, evmClient, hash)
b, err = getBlockByHash(ctx, evmClient, hash, withLogs, contracts, topics)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -87,17 +91,19 @@ func getHeaderByHash(
return nil, err
}

func getBlockByHash(
ctx context.Context, evmClient evmclient.IClient, hash string,
func getBlockByHash(ctx context.Context, evmClient evmclient.IClient, hash string, withLogs bool,
contracts []string, topics [][]string,
) (types.Block, error) {
header, err := getHeaderByHash(ctx, evmClient, hash)
if err != nil {
return types.Block{}, err
}

logs, err := getLogsByBlockHash(ctx, evmClient, hash)
if err != nil {
return types.Block{}, err
var logs []types.Log
if withLogs {
logs, err = getLogsByBlockHash(ctx, evmClient, hash, contracts, topics)
if err != nil {
return types.Block{}, err
}
}

return headerToBlock(header, logs), nil
Expand All @@ -122,17 +128,19 @@ func getHeaderByNumber(
return nil, err
}

func getBlockByNumber(
ctx context.Context, evmClient evmclient.IClient, num *big.Int,
func getBlockByNumber(ctx context.Context, evmClient evmclient.IClient, num *big.Int,
withLogs bool, contracts []string, topics [][]string,
) (types.Block, error) {
header, err := getHeaderByNumber(ctx, evmClient, num)
if err != nil {
return types.Block{}, err
}

logs, err := getLogsByBlockHash(ctx, evmClient, header.Hash)
if err != nil {
return types.Block{}, err
var logs []types.Log
if withLogs {
logs, err = getLogsByBlockHash(ctx, evmClient, header.Hash, contracts, topics)
if err != nil {
return types.Block{}, err
}
}

return headerToBlock(header, logs), nil
Expand Down
Loading