Skip to content

Commit

Permalink
fetching the storage hash of spec version instead of fetching the run…
Browse files Browse the repository at this point in the history
…time version on each block, commenting out some part of block/decoder.go
  • Loading branch information
Eduard-Voiculescu committed Aug 3, 2024
1 parent df065b0 commit c455a4e
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 94 deletions.
104 changes: 54 additions & 50 deletions block/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"fmt"
"reflect"
"strings"

// "github.com/streamingfast/substreams-gear/generated/convert100"
// "github.com/streamingfast/substreams-gear/generated/convert1000"
Expand All @@ -23,7 +22,7 @@ import (
// "github.com/streamingfast/substreams-gear/generated/convert140"
// "github.com/streamingfast/substreams-gear/generated/convert1400"
// "github.com/streamingfast/substreams-gear/generated/convert1410"
"github.com/streamingfast/substreams-gear/generated/convert1420"
// "github.com/streamingfast/substreams-gear/generated/convert1420"
// "github.com/streamingfast/substreams-gear/generated/convert210"
// "github.com/streamingfast/substreams-gear/generated/convert310"
// "github.com/streamingfast/substreams-gear/generated/convert320"
Expand All @@ -34,9 +33,9 @@ import (
"github.com/centrifuge/go-substrate-rpc-client/v4/registry"
"github.com/centrifuge/go-substrate-rpc-client/v4/scale"
"github.com/centrifuge/go-substrate-rpc-client/v4/types"
"github.com/gobeam/stringy"
pbgear "github.com/streamingfast/firehose-gear/pb/sf/gear/type/v1"
v1 "github.com/streamingfast/substreams-gear/pb/sf/substreams/gear/type/v1"

// v1 "github.com/streamingfast/substreams-gear/pb/sf/substreams/gear/type/v1"
"go.uber.org/zap"
)

Expand All @@ -51,28 +50,33 @@ func NewDecoder(logger *zap.Logger) *Decoder {
}
}

func (d *Decoder) Decoded(block *pbgear.Block) (*v1.Block, error) {
decodedExtrinsics, err := decodeExtrinsics("", d.callRegistry, block.Extrinsics, d.logger)
if err != nil {
return nil, fmt.Errorf("failed to decode extrinsics: %w", err)
}

return &v1.Block{
Number: block.Number,
Hash: block.Hash,
Header: block.Header,
DigestItems: block.DigestItems,
Justification: block.Justification,
Extrinsics: decodedExtrinsics,
Events: nil,
}, nil
func (d *Decoder) Decoded(block *pbgear.Block) (*pbgear.Block, error) {
// func (d *Decoder) Decoded(block *pbgear.Block) (*v1.Block, error) {
// decodedExtrinsics, err := decodeExtrinsics("", d.callRegistry, block.Extrinsics, d.logger)
// if err != nil {
// return nil, fmt.Errorf("failed to decode extrinsics: %w", err)
// }

// _ = decodedExtrinsics

return nil, nil

// return &v1.Block{
// Number: block.Number,
// Hash: block.Hash,
// Header: block.Header,
// DigestItems: block.DigestItems,
// Justification: block.Justification,
// Extrinsics: decodedExtrinsics,
// Events: nil,
// }, nil
}

var versionFuncMap map[string]map[string]reflect.Value

func init() {
versionFuncMap = map[string]map[string]reflect.Value{
"1420": convert1420.FuncMap,
// "1420": convert1420.FuncMap,
// "1410": convert1410.FuncMap,
// "1400": convert1400.FuncMap,
// "1310": convert1310.FuncMap,
Expand All @@ -99,36 +103,36 @@ func init() {
}
}

func decodeExtrinsics(version string, callRegistry registry.CallRegistry, extrinsics []*pbgear.Extrinsic, logger *zap.Logger) ([]*v1.Extrinsic, error) {
var decodedExtrinsics []*v1.Extrinsic
for i, extrinsic := range extrinsics {
logger.Info("decoding extrinsic", zap.Int("index", i), zap.Uint32("section", extrinsic.Method.CallIndex.SectionIndex), zap.Uint32("method", extrinsic.Method.CallIndex.MethodIndex))
callName, decodedFields, err := decodeCallExtrinsics(callRegistry, extrinsic)
if err != nil {
return nil, fmt.Errorf("failed to decode extrinsic: %w", err)
}
_ = decodedFields

parts := strings.Split(callName, ".")
pallet := parts[0]
call := parts[1]
call = stringy.New(call).PascalCase().Get()
structName := pallet + "_" + call + "Call"
funcName := "To_" + structName
_ = funcName
// funcMap := versionFuncMap[]
// if fn, found := funcMap[funcName]; found {
// o := fn.Call([]reflect.Value{reflect.ValueOf(decodedFields)})

// e := o[0].Interface().(*v1.Extrinsic)
// decodedExtrinsics = append(decodedExtrinsics, e)

// } else {
// panic(fmt.Sprintf("unknown extrinsic call: %s", callName))
// }
}
return decodedExtrinsics, nil
}
// func decodeExtrinsics(version string, callRegistry registry.CallRegistry, extrinsics []*pbgear.Extrinsic, logger *zap.Logger) ([]*v1.Extrinsic, error) {
// var decodedExtrinsics []*v1.Extrinsic
// for i, extrinsic := range extrinsics {
// logger.Info("decoding extrinsic", zap.Int("index", i), zap.Uint32("section", extrinsic.Method.CallIndex.SectionIndex), zap.Uint32("method", extrinsic.Method.CallIndex.MethodIndex))
// callName, decodedFields, err := decodeCallExtrinsics(callRegistry, extrinsic)
// if err != nil {
// return nil, fmt.Errorf("failed to decode extrinsic: %w", err)
// }
// _ = decodedFields

// parts := strings.Split(callName, ".")
// pallet := parts[0]
// call := parts[1]
// call = stringy.New(call).PascalCase().Get()
// structName := pallet + "_" + call + "Call"
// funcName := "To_" + structName
// _ = funcName
// // funcMap := versionFuncMap[]
// // if fn, found := funcMap[funcName]; found {
// // o := fn.Call([]reflect.Value{reflect.ValueOf(decodedFields)})

// // e := o[0].Interface().(*v1.Extrinsic)
// // decodedExtrinsics = append(decodedExtrinsics, e)

// // } else {
// // panic(fmt.Sprintf("unknown extrinsic call: %s", callName))
// // }
// }
// return decodedExtrinsics, nil
// }

func decodeCallExtrinsics(callRegistry registry.CallRegistry, extrinsic *pbgear.Extrinsic) (string, registry.DecodedFields, error) {
callIndex := extrinsic.Method.CallIndex
Expand Down
106 changes: 62 additions & 44 deletions rpc/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

var storageKeyRuntimeBlob = "3a636f6465"

type FetchedBlockData struct {
latestFinalizedHead uint64
blockHash types.Hash
Expand All @@ -34,6 +36,7 @@ type FetchedBlockData struct {
type LastBlockInfo struct {
blockNum uint64
blockHash types.Hash
specVersion uint32
specVersionHash string
}

Expand Down Expand Up @@ -94,8 +97,8 @@ func (f *Fetcher) Fetch(ctx context.Context, requestBlockNum uint64) (b *pbbstre
return nil, false, fmt.Errorf("fetching block data: %w", err)
}

f.logger.Info("converting block", zap.Uint64("block_num", requestBlockNum), zap.Uint32("spec_version", f.lastBlockInfo.specVersionHash))
bstreamBlock, err := convertBlock(blockData, f.lastBlockInfo.specVersionHash)
f.logger.Info("converting block", zap.Uint64("block_num", requestBlockNum), zap.Uint32("spec_version", f.lastBlockInfo.specVersion))
bstreamBlock, err := convertBlock(blockData, f.lastBlockInfo.specVersion)
if err != nil {
f.logger.Warn("converting block", zap.Uint64("block_num", requestBlockNum), zap.Error(err))
return nil, false, fmt.Errorf("converting block %d from rpc response: %w", requestBlockNum, err)
Expand Down Expand Up @@ -133,75 +136,76 @@ func (f *Fetcher) fetchBlockData(_ context.Context, requestedBlockNum uint64) (*
block: block,
}

runtimeVersion, err := client.state.GetRuntimeVersion(blockHash)
if err != nil {
return nil, fmt.Errorf("failed to get runtime version at block hash %s: %w", blockHash.Hex(), err)
}

requestedBlockSpecVersion := uint32(runtimeVersion.SpecVersion)

if f.metadata == nil { // bootstraping
_, err := f.setMetadata(blockHash, client, requestedBlockSpecVersion, requestedBlockNum)
_, err := f.setMetadata(blockHash, client)
if err != nil {
return nil, fmt.Errorf("failed to update metadata: %w", err)
}
}

parentSpecVersion := uint32(0)
if requestedBlockNum > 0 {
storageEvents, err := client.eventsProvider.GetStorageEvents(f.metadata, blockHash)
if err != nil {
return nil, fmt.Errorf("failed to get storage events: %w", err)
}

shouldFetchParentVersion := true
fetchedBlockData.rawEvents = []byte(*storageEvents)
}

// fetch the spec version hash at each block
currentSpecVersionHash, err := f.fetchStorageHash(client, blockHash)
if err != nil {
return nil, fmt.Errorf("failed to get runtime version at block hash %s: %w", blockHash.Hex(), err)
}

previousSpecVersionHash := ""
shouldFetchStorageHash := true
if block.Block.Header.ParentHash == f.lastBlockInfo.blockHash {
shouldFetchParentVersion = false
parentSpecVersion = f.lastBlockInfo.specVersionHash
shouldFetchStorageHash = false
previousSpecVersionHash = f.lastBlockInfo.specVersionHash
}

// edge case, the first block of the chain contains no data except for the block number and hash
if requestedBlockNum == 0 {
shouldFetchParentVersion = false
shouldFetchStorageHash = false
}

f.logger.Info(
"requested block spec version",
zap.Uint32("spec_version", requestedBlockSpecVersion),
zap.Uint64("block_num", requestedBlockNum),
)

if shouldFetchParentVersion {
pSpecVersion, err := f.fetchParentSpecVersion(client, block.Block.Header.ParentHash)
if shouldFetchStorageHash {
blobHash, err := f.fetchStorageHash(client, block.Block.Header.ParentHash)
if err != nil {
return nil, fmt.Errorf("failed to fetch parent spec version: %w", err)
return nil, fmt.Errorf("failed to fetch storage hash: %w", err)
}
f.logger.Info("fetched parent spec version", zap.Uint32("spec_version", pSpecVersion), zap.Uint64("block_num", requestedBlockNum-1))
parentSpecVersion = pSpecVersion
previousSpecVersionHash = blobHash
}

if shouldUpdateMetadata(requestedBlockSpecVersion, parentSpecVersion, isForward(f.lastBlockInfo.blockNum, requestedBlockNum)) {
b, err := f.setMetadata(blockHash, client, requestedBlockSpecVersion, requestedBlockNum)
runtimeSpecVersion := f.lastBlockInfo.specVersion
if shouldUpdateMetadata(currentSpecVersionHash, previousSpecVersionHash, isForward(f.lastBlockInfo.blockNum, requestedBlockNum)) {
runtimeVersion, err := client.state.GetRuntimeVersion(blockHash)
if err != nil {
return nil, fmt.Errorf("failed to update metadata: %w", err)
return nil, fmt.Errorf("failed to get runtime version at block hash %s: %w", blockHash.Hex(), err)
}
fetchedBlockData.rawMetadata = b
}
runtimeSpecVersion = uint32(runtimeVersion.SpecVersion)

if requestedBlockNum > 0 {
storageEvents, err := client.eventsProvider.GetStorageEvents(f.metadata, blockHash)
b, err := f.setMetadata(blockHash, client)
if err != nil {
return nil, fmt.Errorf("failed to get storage events: %w", err)
return nil, fmt.Errorf("failed to update metadata: %w", err)
}

fetchedBlockData.rawEvents = []byte(*storageEvents)
fetchedBlockData.rawMetadata = b
}

f.logger.Info("block spec version", zap.Uint64("version", uint64(runtimeSpecVersion)))
f.lastBlockInfo = &LastBlockInfo{
blockNum: requestedBlockNum,
blockHash: blockHash,
specVersionHash: requestedBlockSpecVersion,
specVersionHash: currentSpecVersionHash,
specVersion: runtimeSpecVersion,
}

return fetchedBlockData, nil
})
}

func (f *Fetcher) setMetadata(blockHash types.Hash, client *Client, requestedBlockSpecVersion uint32, requestedBlockNum uint64) ([]byte, error) {
func (f *Fetcher) setMetadata(blockHash types.Hash, client *Client) ([]byte, error) {
var metadataHex string
err := cclient.CallWithBlockHash(client.client.Client, &metadataHex, "state_getMetadata", &blockHash)
if err != nil {
Expand All @@ -214,7 +218,6 @@ func (f *Fetcher) setMetadata(blockHash types.Hash, client *Client, requestedBlo
return nil, fmt.Errorf("failed to decode metadata: %w", err)
}

f.logger.Info("metadata fetched", zap.Uint32("spec_version", requestedBlockSpecVersion), zap.Uint64("block_num", requestedBlockNum))
f.metadata = metadata

b, err := hex.DecodeString(strings.TrimPrefix(metadataHex, "0x"))
Expand All @@ -224,10 +227,25 @@ func (f *Fetcher) setMetadata(blockHash types.Hash, client *Client, requestedBlo
return b, nil
}

func (f *Fetcher) fetchParentSpecVersion(client *Client, parentBlockHash types.Hash) (uint32, error) {
parentRuntimeVersion, err := client.state.GetRuntimeVersion(parentBlockHash)
func (f *Fetcher) fetchStorageHash(client *Client, blockhash types.Hash) (string, error) {
b, err := hex.DecodeString(storageKeyRuntimeBlob)
if err != nil {
return "", fmt.Errorf("failed to decode storage key: %w", err)
}

key := types.NewStorageKey(b)
hash, err := client.state.GetStorageHash(key, blockhash)
if err != nil {
return "", fmt.Errorf("failed to get storage hash: %w", err)
}

return hash.Hex(), nil
}

func (f *Fetcher) fetchParentSpecVersion(client *Client, parentBlockhash types.Hash) (uint32, error) {
parentRuntimeVersion, err := client.state.GetRuntimeVersion(parentBlockhash)
if err != nil {
return 0, fmt.Errorf("failed to get runtime version at block hash %s: %w", parentBlockHash.Hex(), err)
return 0, fmt.Errorf("failed to get runtime version at block hash %s: %w", parentBlockhash.Hex(), err)
}

return uint32(parentRuntimeVersion.SpecVersion), nil
Expand All @@ -237,12 +255,12 @@ func isForward(lastProcessedBlockNum, requestedBlockNum uint64) bool {
return lastProcessedBlockNum <= requestedBlockNum
}

func shouldUpdateMetadata(specVersion uint32, parentSpecVersion uint32, isForward bool) bool {
func shouldUpdateMetadata(specVersionHash string, parentSpecVersionHash string, isForward bool) bool {
if !isForward {
return false
}

return specVersion != parentSpecVersion
return specVersionHash != parentSpecVersionHash
}

func (f *Fetcher) fetchLatestBlockNum(_ context.Context) (uint64, error) {
Expand Down

0 comments on commit c455a4e

Please sign in to comment.