From fe01dc2f64dcd6c8adb9a221940a899695340dc2 Mon Sep 17 00:00:00 2001 From: cdamian <17934949+cdamian@users.noreply.github.com> Date: Wed, 5 Jul 2023 23:00:40 +0300 Subject: [PATCH 1/2] listener: Update event parsing --- chains/substrate/chain.go | 11 +- chains/substrate/connection.go | 2 +- chains/substrate/events.go | 275 ++++++++++++++++++++++++++++----- chains/substrate/listener.go | 128 ++++++++------- go.mod | 1 + go.sum | 1 + 6 files changed, 319 insertions(+), 99 deletions(-) diff --git a/chains/substrate/chain.go b/chains/substrate/chain.go index 1b570e60e..f72c561d6 100644 --- a/chains/substrate/chain.go +++ b/chains/substrate/chain.go @@ -24,6 +24,7 @@ As the writer receives messages from the router, it constructs proposals. If a p package substrate import ( + "fmt" "github.com/ChainSafe/log15" "github.com/centrifuge/chainbridge-utils/blockstore" "github.com/centrifuge/chainbridge-utils/core" @@ -31,6 +32,8 @@ import ( "github.com/centrifuge/chainbridge-utils/keystore" metrics "github.com/centrifuge/chainbridge-utils/metrics/types" "github.com/centrifuge/chainbridge-utils/msg" + "github.com/centrifuge/go-substrate-rpc-client/v4/registry/retriever" + "github.com/centrifuge/go-substrate-rpc-client/v4/registry/state" ) var _ core.Chain = &Chain{} @@ -102,8 +105,14 @@ func InitializeChain(cfg *core.ChainConfig, logger log15.Logger, sysErr chan<- e ue := parseUseExtended(cfg) + eventRetriever, err := retriever.NewDefaultEventRetriever(state.NewEventProvider(conn.api.RPC.State), conn.api.RPC.State) + + if err != nil { + return nil, fmt.Errorf("event retriever creation: %w", err) + } + // Setup listener & writer - l := NewListener(conn, cfg.Name, cfg.Id, startBlock, logger, bs, stop, sysErr, m) + l := NewListener(conn, cfg.Name, cfg.Id, startBlock, logger, bs, stop, sysErr, m, eventRetriever) w := NewWriter(conn, logger, sysErr, m, ue) return &Chain{ cfg: cfg, diff --git a/chains/substrate/connection.go b/chains/substrate/connection.go index d37b18b0b..e70d48172 100644 --- a/chains/substrate/connection.go +++ b/chains/substrate/connection.go @@ -43,7 +43,7 @@ func (c *Connection) getMetadata() (meta types.Metadata) { return meta } -func (c *Connection) updateMetatdata() error { +func (c *Connection) updateMetadata() error { c.metaLock.Lock() meta, err := c.api.RPC.State.GetMetadataLatest() if err != nil { diff --git a/chains/substrate/events.go b/chains/substrate/events.go index 62216114d..18839d9d0 100644 --- a/chains/substrate/events.go +++ b/chains/substrate/events.go @@ -4,20 +4,22 @@ package substrate import ( + "errors" "fmt" "math/big" "github.com/ChainSafe/log15" - events "github.com/centrifuge/chainbridge-substrate-events" "github.com/centrifuge/chainbridge-utils/msg" + "github.com/centrifuge/go-substrate-rpc-client/v4/types" + "github.com/centrifuge/go-substrate-rpc-client/v4/types/codec" ) type eventName string -type eventHandler func(interface{}, log15.Logger) (msg.Message, error) +type eventHandler func(map[string]any, log15.Logger) (msg.Message, error) -const FungibleTransfer eventName = "FungibleTransfer" -const NonFungibleTransfer eventName = "NonFungibleTransfer" -const GenericTransfer eventName = "GenericTransfer" +const FungibleTransfer eventName = "ChainBridge.FungibleTransfer" +const NonFungibleTransfer eventName = "ChainBridge.NonFungibleTransfer" +const GenericTransfer eventName = "ChainBridge.GenericTransfer" var Subscriptions = []struct { name eventName @@ -28,57 +30,250 @@ var Subscriptions = []struct { {GenericTransfer, genericTransferHandler}, } -func fungibleTransferHandler(evtI interface{}, log log15.Logger) (msg.Message, error) { - evt, ok := evtI.(events.EventFungibleTransfer) - if !ok { - return msg.Message{}, fmt.Errorf("failed to cast EventFungibleTransfer type") +func fungibleTransferHandler(eventFields map[string]any, log log15.Logger) (msg.Message, error) { + chainID, err := getFieldValueAsType[types.U8]("ChainId", eventFields) + if err != nil { + return msg.Message{}, err } - resourceId := msg.ResourceId(evt.ResourceId) - log.Info("Got fungible transfer event!", "destination", evt.Destination, "resourceId", resourceId.Hex(), "amount", evt.Amount) + depositNonce, err := getFieldValueAsType[types.U64]("DepositNonce", eventFields) + if err != nil { + return msg.Message{}, err + } + + resID, err := getFieldValueAsSliceOfType[types.U8]("ResourceId", eventFields) + if err != nil { + return msg.Message{}, err + } + + resourceID, err := to32Bytes(resID) + if err != nil { + return msg.Message{}, err + } + + amount, err := getAmount(eventFields) + if err != nil { + return msg.Message{}, err + } + + recipient, err := getFieldValueAsByteSlice("Vec", eventFields) + if err != nil { + return msg.Message{}, err + } + + log.Info("Got fungible transfer event!", "destination", recipient, "resourceId", fmt.Sprintf("%x", resourceID), "amount", amount) return msg.NewFungibleTransfer( 0, // Unset - msg.ChainId(evt.Destination), - msg.Nonce(evt.DepositNonce), - evt.Amount.Int, - resourceId, - evt.Recipient, + msg.ChainId(chainID), + msg.Nonce(depositNonce), + amount.Int, + resourceID, + recipient, ), nil } -func nonFungibleTransferHandler(evtI interface{}, log log15.Logger) (msg.Message, error) { - evt, ok := evtI.(events.EventNonFungibleTransfer) - if !ok { - return msg.Message{}, fmt.Errorf("failed to cast EventNonFungibleTransfer type") +func nonFungibleTransferHandler(_ map[string]any, log log15.Logger) (msg.Message, error) { + log.Warn("Got non-fungible transfer event!") + + return msg.Message{}, errors.New("non-fungible transfer not supported") +} + +func genericTransferHandler(eventFields map[string]any, log log15.Logger) (msg.Message, error) { + chainID, err := getFieldValueAsType[types.U8]("ChainId", eventFields) + if err != nil { + return msg.Message{}, err } - log.Info("Got non-fungible transfer event!", "destination", evt.Destination, "resourceId", evt.ResourceId) + depositNonce, err := getFieldValueAsType[types.U64]("DepositNonce", eventFields) + if err != nil { + return msg.Message{}, err + } - return msg.NewNonFungibleTransfer( - 0, // Unset - msg.ChainId(evt.Destination), - msg.Nonce(evt.DepositNonce), - msg.ResourceId(evt.ResourceId), - big.NewInt(0).SetBytes(evt.TokenId[:]), - evt.Recipient, - evt.Metadata, - ), nil -} + resID, err := getFieldValueAsSliceOfType[types.U8]("ResourceId", eventFields) + if err != nil { + return msg.Message{}, err + } + + resourceID, err := to32Bytes(resID) + if err != nil { + return msg.Message{}, err + } -func genericTransferHandler(evtI interface{}, log log15.Logger) (msg.Message, error) { - evt, ok := evtI.(events.EventGenericTransfer) - if !ok { - return msg.Message{}, fmt.Errorf("failed to cast EventGenericTransfer type") + metadata, err := getFieldValueAsByteSlice("Vec", eventFields) + if err != nil { + return msg.Message{}, err } - log.Info("Got generic transfer event!", "destination", evt.Destination, "resourceId", evt.ResourceId) + log.Info("Got generic transfer event!", "destination", chainID, "resourceId", fmt.Sprintf("%x", resourceID)) return msg.NewGenericTransfer( 0, // Unset - msg.ChainId(evt.Destination), - msg.Nonce(evt.DepositNonce), - msg.ResourceId(evt.ResourceId), - evt.Metadata, + msg.ChainId(chainID), + msg.Nonce(depositNonce), + resourceID, + metadata, ), nil } + +func to32Bytes(array []types.U8) ([32]byte, error) { + var res [32]byte + + if len(array) != 32 { + return res, errors.New("array length mismatch") + } + + for i, item := range array { + res[i] = byte(item) + } + + return res, nil +} + +func getFieldValueAsType[T any](fieldName string, eventFields map[string]any) (T, error) { + var t T + + for name, value := range eventFields { + if name == fieldName { + if v, ok := value.(T); ok { + return v, nil + } + + return t, fmt.Errorf("field type mismatch, expected %T, got %T", t, value) + } + } + + return t, fmt.Errorf("field with name '%s' not found", fieldName) +} + +func getFieldValueAsSliceOfType[T any](fieldName string, eventFields map[string]any) ([]T, error) { + for name, value := range eventFields { + if name == fieldName { + value, ok := value.([]any) + + if !ok { + return nil, errors.New("field value not an array") + } + + res, err := convertSliceToType[T](value) + + if err != nil { + return nil, err + } + + return res, nil + } + } + + return nil, fmt.Errorf("field with name '%s' not found", fieldName) +} + +func getFieldValueAsByteSlice(fieldName string, eventFields map[string]any) ([]byte, error) { + for name, value := range eventFields { + if name == fieldName { + value, ok := value.([]any) + + if !ok { + return nil, errors.New("field value not an array") + } + + slice, err := convertSliceToType[types.U8](value) + + if err != nil { + return nil, err + } + + res, err := convertToByteSlice(slice) + + if err != nil { + return nil, err + } + + return res, nil + } + } + + return nil, fmt.Errorf("field with name '%s' not found", fieldName) +} + +func convertSliceToType[T any](array []any) ([]T, error) { + res := make([]T, 0) + + for _, item := range array { + if v, ok := item.(T); ok { + res = append(res, v) + continue + } + + var t T + + return nil, fmt.Errorf("couldn't cast '%T' to '%T'", item, t) + } + + return res, nil +} + +func convertToByteSlice(array []types.U8) ([]byte, error) { + res := make([]byte, 0) + + for _, item := range array { + res = append(res, byte(item)) + } + + return res, nil +} + +func getAmount(eventFields map[string]any) (types.U256, error) { + for fieldName, fieldValue := range eventFields { + if fieldName != "primitive_types.U256.U256" { + continue + } + + innerField, ok := fieldValue.(map[string]any) + if !ok { + return types.NewU256(*big.NewInt(0)), errors.New("unexpected amount field structure") + } + + innerFieldVal, ok := innerField["[u64; 4]"] + if !ok { + return types.NewU256(*big.NewInt(0)), errors.New("amount field key not found") + } + + slice, ok := innerFieldVal.([]any) + if !ok { + return types.NewU256(*big.NewInt(0)), errors.New("inner field value not a slice") + } + + val, err := convertSliceToType[types.U64](slice) + + if err != nil { + return types.NewU256(*big.NewInt(0)), err + } + + if len(val) != 4 { + return types.NewU256(*big.NewInt(0)), errors.New("slice length mismatch") + } + + var r [4]types.U64 + + for i, item := range val { + r[i] = item + } + + encVal, err := codec.Encode(r) + + if err != nil { + return types.NewU256(*big.NewInt(0)), errors.New("couldn't encode amount val") + } + + var res types.U256 + + if err := codec.Decode(encVal, &res); err != nil { + return types.NewU256(*big.NewInt(0)), errors.New("couldn't decode amount") + } + + return res, nil + } + + return types.NewU256(*big.NewInt(0)), errors.New("amount field not found") +} diff --git a/chains/substrate/listener.go b/chains/substrate/listener.go index 58b4ba367..4c60a395f 100644 --- a/chains/substrate/listener.go +++ b/chains/substrate/listener.go @@ -6,50 +6,64 @@ package substrate import ( "errors" "fmt" + "github.com/centrifuge/go-substrate-rpc-client/v4/registry/parser" "math/big" "time" "github.com/ChainSafe/ChainBridge/chains" - utils "github.com/ChainSafe/ChainBridge/shared/substrate" "github.com/ChainSafe/log15" "github.com/centrifuge/chainbridge-utils/blockstore" metrics "github.com/centrifuge/chainbridge-utils/metrics/types" "github.com/centrifuge/chainbridge-utils/msg" + "github.com/centrifuge/go-substrate-rpc-client/v4/registry/retriever" "github.com/centrifuge/go-substrate-rpc-client/v4/types" ) type listener struct { - name string - chainId msg.ChainId - startBlock uint64 - blockstore blockstore.Blockstorer - conn *Connection - subscriptions map[eventName]eventHandler // Handlers for specific events - router chains.Router - log log15.Logger - stop <-chan int - sysErr chan<- error - latestBlock metrics.LatestBlock - metrics *metrics.ChainMetrics + name string + chainId msg.ChainId + startBlock uint64 + blockstore blockstore.Blockstorer + conn *Connection + subscriptions map[eventName]eventHandler // Handlers for specific events + router chains.Router + log log15.Logger + stop <-chan int + sysErr chan<- error + latestBlock metrics.LatestBlock + metrics *metrics.ChainMetrics + eventRetriever retriever.EventRetriever } // Frequency of polling for a new block var BlockRetryInterval = time.Second * 5 var BlockRetryLimit = 5 -func NewListener(conn *Connection, name string, id msg.ChainId, startBlock uint64, log log15.Logger, bs blockstore.Blockstorer, stop <-chan int, sysErr chan<- error, m *metrics.ChainMetrics) *listener { +func NewListener( + conn *Connection, + name string, + id msg.ChainId, + startBlock uint64, + log log15.Logger, + bs blockstore.Blockstorer, + stop <-chan int, + sysErr chan<- error, + m *metrics.ChainMetrics, + eventRetriever retriever.EventRetriever, +) *listener { return &listener{ - name: name, - chainId: id, - startBlock: startBlock, - blockstore: bs, - conn: conn, - subscriptions: make(map[eventName]eventHandler), - log: log, - stop: stop, - sysErr: sysErr, - latestBlock: metrics.LatestBlock{LastUpdated: time.Now()}, - metrics: m, + name: name, + chainId: id, + startBlock: startBlock, + blockstore: bs, + conn: conn, + subscriptions: make(map[eventName]eventHandler), + log: log, + stop: stop, + sysErr: sysErr, + latestBlock: metrics.LatestBlock{LastUpdated: time.Now()}, + metrics: m, + eventRetriever: eventRetriever, } } @@ -186,58 +200,58 @@ func (l *listener) pollBlocks() error { // processEvents fetches a block and parses out the events, calling Listener.handleEvents() func (l *listener) processEvents(hash types.Hash) error { l.log.Trace("Fetching events for block", "hash", hash.Hex()) - meta := l.conn.getMetadata() - key, err := types.CreateStorageKey(&meta, "System", "Events", nil, nil) - if err != nil { - return err - } - var records types.EventRecordsRaw - _, err = l.conn.api.RPC.State.GetStorage(key, &records, hash) - if err != nil { - return err - } + events, err := l.eventRetriever.GetEvents(hash) - e := utils.Events{} - err = records.DecodeEventRecords(&meta, &e) if err != nil { - return err + return fmt.Errorf("event retrieving error: %w", err) } - l.handleEvents(e) + l.handleEvents(events) l.log.Trace("Finished processing events", "block", hash.Hex()) return nil } // handleEvents calls the associated handler for all registered event types -func (l *listener) handleEvents(evts utils.Events) { - if l.subscriptions[FungibleTransfer] != nil { - for _, evt := range evts.ChainBridge_FungibleTransfer { +func (l *listener) handleEvents(events []*parser.Event) { + for _, event := range events { + if l.subscriptions[FungibleTransfer] != nil { + if event.Name != string(FungibleTransfer) { + continue + } + l.log.Trace("Handling FungibleTransfer event") - l.submitMessage(l.subscriptions[FungibleTransfer](evt, l.log)) + l.submitMessage(l.subscriptions[FungibleTransfer](event.Fields, l.log)) } - } - if l.subscriptions[NonFungibleTransfer] != nil { - for _, evt := range evts.ChainBridge_NonFungibleTransfer { + + if l.subscriptions[NonFungibleTransfer] != nil { + if event.Name != string(NonFungibleTransfer) { + continue + } + l.log.Trace("Handling NonFungibleTransfer event") - l.submitMessage(l.subscriptions[NonFungibleTransfer](evt, l.log)) + l.submitMessage(l.subscriptions[NonFungibleTransfer](event.Fields, l.log)) } - } - if l.subscriptions[GenericTransfer] != nil { - for _, evt := range evts.ChainBridge_GenericTransfer { + + if l.subscriptions[GenericTransfer] != nil { + if event.Name != string(GenericTransfer) { + continue + } + l.log.Trace("Handling GenericTransfer event") - l.submitMessage(l.subscriptions[GenericTransfer](evt, l.log)) + l.submitMessage(l.subscriptions[GenericTransfer](event.Fields, l.log)) } - } - if len(evts.System_CodeUpdated) > 0 { - l.log.Trace("Received CodeUpdated event") - err := l.conn.updateMetatdata() - if err != nil { - l.log.Error("Unable to update Metadata", "error", err) + if event.Name == "ParachainSystem.ValidationFunctionApplied" { + l.log.Trace("Received ValidationFunctionApplied event") + + if err := l.conn.updateMetadata(); err != nil { + l.log.Error("Unable to update Metadata", "error", err) + } } } + } // submitMessage inserts the chainId into the msg and sends it to the router diff --git a/go.mod b/go.mod index 34fffdc0e..49e01e5b5 100644 --- a/go.mod +++ b/go.mod @@ -52,6 +52,7 @@ require ( github.com/rs/cors v1.8.2 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect + github.com/stretchr/objx v0.1.1 // indirect github.com/tklauser/go-sysconf v0.3.5 // indirect github.com/tklauser/numcpus v0.2.2 // indirect github.com/vedhavyas/go-subkey v1.0.4 // indirect diff --git a/go.sum b/go.sum index e21462651..29e409900 100644 --- a/go.sum +++ b/go.sum @@ -154,6 +154,7 @@ github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4 h1:Gb2Tyox57NRNuZ2d3rmvB3pcmbu7O1RS3m8WRx7ilrg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= From 85ed235d62487e75f67e5c7b9a86268ee5ad976a Mon Sep 17 00:00:00 2001 From: cdamian <17934949+cdamian@users.noreply.github.com> Date: Thu, 13 Jul 2023 12:34:59 +0300 Subject: [PATCH 2/2] events: Rename amount retrieving func --- chains/substrate/events.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chains/substrate/events.go b/chains/substrate/events.go index 18839d9d0..49875b45d 100644 --- a/chains/substrate/events.go +++ b/chains/substrate/events.go @@ -51,7 +51,7 @@ func fungibleTransferHandler(eventFields map[string]any, log log15.Logger) (msg. return msg.Message{}, err } - amount, err := getAmount(eventFields) + amount, err := getU256(eventFields) if err != nil { return msg.Message{}, err } @@ -223,7 +223,7 @@ func convertToByteSlice(array []types.U8) ([]byte, error) { return res, nil } -func getAmount(eventFields map[string]any) (types.U256, error) { +func getU256(eventFields map[string]any) (types.U256, error) { for fieldName, fieldValue := range eventFields { if fieldName != "primitive_types.U256.U256" { continue