Skip to content

Commit

Permalink
fix a monitor issue
Browse files Browse the repository at this point in the history
  • Loading branch information
forcodedancing committed Oct 11, 2023
1 parent 60af847 commit 132b453
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 3 deletions.
7 changes: 7 additions & 0 deletions monitor/bsc_block_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/bnb-chain/greenfield-data-marketplace-backend/database"
"github.com/bnb-chain/greenfield-data-marketplace-backend/metric"
"github.com/bnb-chain/greenfield-data-marketplace-backend/monitor/contracts"
"github.com/bnb-chain/greenfield-data-marketplace-backend/util"
"github.com/ethereum/go-ethereum/accounts/abi"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -65,6 +66,7 @@ func (p *BscBlockProcessor) Process(blockHeight uint64) error {
topics := []ethcommon.Hash{ethcommon.HexToHash(eventBuyTopic), ethcommon.HexToHash(eventDelistTopic)}
logs, err := p.client.QueryChainLogs(blockHeight, blockHeight, topics, p.marketplaceContract)
if err != nil {
util.Logger.Errorf("processor: %s, fail to query chain logs err: %s", p.Name(), err)
return err
}
sort.SliceStable(logs, func(i, j int) bool {
Expand All @@ -75,12 +77,14 @@ func (p *BscBlockProcessor) Process(blockHeight uint64) error {
for _, l := range logs {
sqls, err := p.handleEventBuy(blockHeight, l)
if err != nil {
util.Logger.Errorf("processor: %s, fail to handle EventBuy err: %s", p.Name(), err)
return err
}
rawSqls = append(rawSqls, sqls...)

sql, err := p.handleEventDelist(blockHeight, l)
if err != nil {
util.Logger.Errorf("processor: %s, fail to handle EventDelist err: %s", p.Name(), err)
return err
}
if sql != "" {
Expand All @@ -99,6 +103,7 @@ func (p *BscBlockProcessor) Process(blockHeight uint64) error {
return nil
})
if err != nil {
util.Logger.Errorf("processor: %s, fail to update database err: %s", p.Name(), err)
return err
}

Expand All @@ -109,6 +114,7 @@ func (p *BscBlockProcessor) Process(blockHeight uint64) error {
func (p *BscBlockProcessor) handleEventBuy(blockHeight uint64, l types.Log) ([]string, error) {
event, err := parseBuyEvent(p.marketplaceAbi, l)
if err != nil {
util.Logger.Errorf("processor: %s, fail to parse BuyEvent err: %s", p.Name(), err)
return nil, err
}
if event == nil {
Expand Down Expand Up @@ -153,6 +159,7 @@ func (p *BscBlockProcessor) handleEventBuy(blockHeight uint64, l types.Log) ([]s
func (p *BscBlockProcessor) handleEventDelist(blockHeight uint64, l types.Log) (string, error) {
event, err := parseDelistEvent(p.marketplaceAbi, l)
if err != nil {
util.Logger.Errorf("processor: %s, fail to parse BuyEvent err: %s", p.Name(), err)
return "", err
}
if event == nil {
Expand Down
20 changes: 20 additions & 0 deletions monitor/gnfd_block_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/bnb-chain/greenfield-data-marketplace-backend/dao"
"github.com/bnb-chain/greenfield-data-marketplace-backend/database"
"github.com/bnb-chain/greenfield-data-marketplace-backend/metric"
"github.com/bnb-chain/greenfield-data-marketplace-backend/util"
"github.com/bnb-chain/greenfield/types/resource"
"github.com/bnb-chain/greenfield/x/permission/types"
abciTypes "github.com/cometbft/cometbft/abci/types"
Expand Down Expand Up @@ -55,6 +56,7 @@ func (p *GnfdBlockProcessor) GetBlockchainBlockHeight() (uint64, error) {
func (p *GnfdBlockProcessor) Process(blockHeight uint64) error {
results, err := p.client.GetBlockResults(int64(blockHeight))
if err != nil {
util.Logger.Errorf("processor: %s, fail to block results err: %s", p.Name(), err)
return err
}

Expand All @@ -69,6 +71,7 @@ func (p *GnfdBlockProcessor) Process(blockHeight uint64) error {
case "greenfield.storage.EventCreateGroup":
rawSql, err = p.handleEventCreateGroup(blockHeight, event)
if err != nil {
util.Logger.Errorf("processor: %s, fail to handle EventCreateGroup err: %s", p.Name(), err)
return err
}
if rawSql != "" {
Expand All @@ -77,6 +80,7 @@ func (p *GnfdBlockProcessor) Process(blockHeight uint64) error {
case "greenfield.storage.EventDeleteGroup":
rawSql, err = p.handleEventDeleteGroup(blockHeight, event)
if err != nil {
util.Logger.Errorf("processor: %s, fail to handle EventDeleteGroup err: %s", p.Name(), err)
return err
}
if rawSql != "" {
Expand All @@ -85,6 +89,7 @@ func (p *GnfdBlockProcessor) Process(blockHeight uint64) error {
case "greenfield.storage.EventUpdateGroupExtra":
rawSql, err = p.handleEventUpdateGroupExtra(blockHeight, event)
if err != nil {
util.Logger.Errorf("processor: %s, fail to handle EventUpdateGroupExtra err: %s", p.Name(), err)
return err
}
if rawSql != "" {
Expand All @@ -93,6 +98,7 @@ func (p *GnfdBlockProcessor) Process(blockHeight uint64) error {
case "greenfield.permission.EventPutPolicy":
rawSql, err = p.handleEventPutPolicy(blockHeight, event)
if err != nil {
util.Logger.Errorf("processor: %s, fail to handle EventPutPolicy err: %s", p.Name(), err)
return err
}
if rawSql != "" {
Expand Down Expand Up @@ -123,6 +129,7 @@ func (p *GnfdBlockProcessor) Process(blockHeight uint64) error {
return nil
})
if err != nil {
util.Logger.Errorf("processor: %s, fail to update database err: %s", p.Name(), err)
return err
}

Expand All @@ -134,6 +141,7 @@ func (p *GnfdBlockProcessor) handleEventCreateGroup(blockHeight uint64, event ab
rawSql := ""
createGroup, err := parseEventCreateGroup(event)
if err != nil {
util.Logger.Errorf("processor: %s, fail to parse EventCreateGroup err: %s", p.Name(), err)
return rawSql, err
}

Expand Down Expand Up @@ -188,6 +196,7 @@ func (p *GnfdBlockProcessor) handleEventDeleteGroup(blockHeight uint64, event ab
rawSql := ""
deleteGroup, err := parseEventDeleteGroup(event)
if err != nil {
util.Logger.Errorf("processor: %s, fail to parse EventDeleteGroup err: %s", p.Name(), err)
return rawSql, err
}

Expand All @@ -199,9 +208,19 @@ func (p *GnfdBlockProcessor) handleEventUpdateGroupExtra(blockHeight uint64, eve
rawSql := ""
updateGroupExtra, err := parseEventUpdateGroupExtra(event)
if err != nil {
util.Logger.Errorf("processor: %s, fail to parse EventUpdateGroupExtra err: %s", p.Name(), err)
return rawSql, err
}

_, err = p.itemDao.GetByGroupId(context.Background(), int64(updateGroupExtra.GroupId.Uint64()))
if err != nil && err != gorm.ErrRecordNotFound {
return rawSql, err
}

if err != nil && err == gorm.ErrRecordNotFound { // the group we do not care about
return rawSql, nil
}

extra, err := parseExtra(updateGroupExtra.Extra)
if err != nil {
return rawSql, err
Expand All @@ -215,6 +234,7 @@ func (p *GnfdBlockProcessor) handleEventPutPolicy(blockHeight uint64, event abci
rawSql := ""
putPolicy, err := parseEventPutPolicy(event)
if err != nil {
util.Logger.Errorf("processor: %s, fail to parse EventPutPolicy err: %s", p.Name(), err)
return rawSql, err
}

Expand Down
6 changes: 3 additions & 3 deletions monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (m *Monitor) run() error {
}

dbHeight, err := m.processor.GetDatabaseBlockHeight()
util.Logger.Infof("processor: %s, current database height: %d", m.processor.Name(), blockchainHeight)
util.Logger.Infof("processor: %s, current database height: %d", m.processor.Name(), dbHeight)
if err != nil && err != gorm.ErrRecordNotFound {
return err
}
Expand All @@ -42,13 +42,13 @@ func (m *Monitor) run() error {
}

for dbHeight < blockchainHeight {
util.Logger.Infof("processor: %s, processing height: %d", m.processor.Name(), blockchainHeight)
util.Logger.Infof("processor: %s, processing height: %d", m.processor.Name(), dbHeight+1)
err = m.processor.Process(dbHeight + 1)
if err != nil {
return err
}
dbHeight++
time.Sleep(20 * time.Millisecond)
time.Sleep(50 * time.Millisecond)
}

return nil
Expand Down

0 comments on commit 132b453

Please sign in to comment.