Skip to content

Commit

Permalink
update with correct spec versions
Browse files Browse the repository at this point in the history
  • Loading branch information
Eduard-Voiculescu committed Jul 9, 2024
1 parent 097b273 commit 18cf464
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 36 deletions.
4 changes: 2 additions & 2 deletions cmd/firevara/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ func NewFetchCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Command {
cmd.Flags().String("state-dir", "/data", "location to store the cursor.json")
cmd.Flags().Duration("interval-between-fetch", 0, "interval between fetch")
cmd.Flags().Duration("latest-block-retry-interval", time.Second, "interval between fetch when latest block is not available yet")
cmd.Flags().Int("block-fetch-batch-size", 10, "Number of blocks to fetch in a single batch")

return cmd
}
Expand Down Expand Up @@ -70,7 +69,8 @@ func fetchRunE(logger *zap.Logger, tracer logging.Tracer) firecore.CommandExecut
blockpoller.WithLogger(logger),
)

err = poller.Run(ctx, startBlock, sflags.MustGetInt(cmd, "block-fetch-batch-size"))
// never use batch downloading for blocks
err = poller.Run(ctx, startBlock, 1)
if err != nil {
return fmt.Errorf("running poller: %w", err)
}
Expand Down
119 changes: 85 additions & 34 deletions rpc/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,30 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

var specVersion uint32
var metadata string

type FetchedBlockData struct {
latestFinalizedHead uint64
blockHash types.Hash
block *types.SignedBlock
previousSpecVerion uint32
specVersion uint32
events []*parser.Event
metadata []byte
}

type LastBlockInfo struct {
blockNum uint64
blockHash types.Hash
specVersion uint32
}

type Fetcher struct {
gearClients *firecoreRPC.Clients[*Client]

fetchInterval time.Duration
latestBlockRetryInterval time.Duration
logger *zap.Logger
latestBlockNum uint64

latestBlockNum uint64

lastBlockInfo *LastBlockInfo
}

func NewFetcher(
Expand All @@ -52,6 +56,7 @@ func NewFetcher(
fetchInterval: fetchInterval,
latestBlockRetryInterval: latestBlockRetryInterval,
logger: logger,
lastBlockInfo: &LastBlockInfo{},
}
}

Expand All @@ -60,7 +65,7 @@ func (f *Fetcher) IsBlockAvailable(blockNum uint64) bool {
}

func (f *Fetcher) Fetch(ctx context.Context, requestBlockNum uint64) (b *pbbstream.Block, skipped bool, err error) {
f.logger.Info("fetching block", zap.Uint64("block_num", requestBlockNum))
f.logger.Info("gear fetching block", zap.Uint64("block_num", requestBlockNum))

sleepDuration := time.Duration(0)
//TODO: move this logic in the firecore binary, as we do this in all the fetchers
Expand All @@ -82,12 +87,14 @@ func (f *Fetcher) Fetch(ctx context.Context, requestBlockNum uint64) (b *pbbstre

blockData, err := f.fetchBlockData(ctx, requestBlockNum)
if err != nil {
f.logger.Warn("fetching block data", zap.Uint64("block_num", requestBlockNum), zap.Error(err))
return nil, false, fmt.Errorf("fetching block data: %w", err)
}

f.logger.Info("converting block", zap.Uint64("block_num", requestBlockNum))
bstreamBlock, err := convertBlock(blockData)
f.logger.Info("converting block", zap.Uint64("block_num", requestBlockNum), zap.Uint32("spec_version", f.lastBlockInfo.specVersion))
bstreamBlock, err := convertBlock(blockData, f.lastBlockInfo.specVersion)
if err != nil {
f.logger.Warn("converting block", zap.Uint64("block_num", requestBlockNum), zap.Error(err))
return nil, false, fmt.Errorf("converting block %d from rpc response: %w", requestBlockNum, err)
}

Expand All @@ -96,6 +103,7 @@ func (f *Fetcher) Fetch(ctx context.Context, requestBlockNum uint64) (b *pbbstre

func (f *Fetcher) fetchBlockData(_ context.Context, requestedBlockNum uint64) (*FetchedBlockData, error) {
return firecoreRPC.WithClients(f.gearClients, func(client *Client) (*FetchedBlockData, error) {
f.logger.Info("fetching block data", zap.Uint64("block_num", requestedBlockNum))
latestFinalizedHead, err := client.client.RPC.Chain.GetFinalizedHead()
if err != nil {
return nil, fmt.Errorf("unable to fetch latest finalized head: %w", err)
Expand Down Expand Up @@ -128,38 +136,81 @@ func (f *Fetcher) fetchBlockData(_ context.Context, requestedBlockNum uint64) (*
return nil, fmt.Errorf("failed to get events: %w", err)
}
fetchedBlockData.events = events
}

previousBlockhash := block.Block.Header.ParentHash
previousRuntimeVersion, err := client.state.GetRuntimeVersion(previousBlockhash)
if err != nil {
return nil, fmt.Errorf("failed to get runtime version at previous block hash %s: %w", previousBlockhash.Hex(), err)
}
runtimeVersion, err := client.state.GetRuntimeVersion(blockHash)
if err != nil {
return nil, fmt.Errorf("failed to get runtime version at block hash %s: %w", blockHash.Hex(), err)
}

requestedBlockSpecVersion := uint32(runtimeVersion.SpecVersion)
parentSpecVersion := uint32(0)

shouldFetchParentVersion := true
if block.Block.Header.ParentHash == f.lastBlockInfo.blockHash {
shouldFetchParentVersion = false
parentSpecVersion = f.lastBlockInfo.specVersion
}

if requestedBlockNum == 0 {
shouldFetchParentVersion = false
}

previousSpecVersion := uint32(previousRuntimeVersion.SpecVersion)
f.logger.Info(
"requested block spec version",
zap.Uint32("spec_version", requestedBlockSpecVersion),
zap.Uint64("block_num", requestedBlockNum),
)

// if the previous spec version is different from the current specVersion, we fetch the metadata and then we store the metadata
if specVersion != previousSpecVersion {
runtimeVersion, err := client.state.GetRuntimeVersion(blockHash)
if err != nil {
return nil, fmt.Errorf("failed to get runtime version at block hash %s: %w", blockHash.Hex(), err)
}
if shouldFetchParentVersion {
pSpecVersion, err := f.fetchParentSpecVersion(client, block.Block.Header.ParentHash)
if err != nil {
return nil, fmt.Errorf("failed to fetch parent spec version: %w", err)
}
f.logger.Info("fetched parent spec version", zap.Uint32("spec_version", pSpecVersion), zap.Uint64("block_num", requestedBlockNum-1))
parentSpecVersion = pSpecVersion
}

specVersion = uint32(runtimeVersion.SpecVersion)
err = cclient.CallWithBlockHash(client.client.Client, &metadata, "state_getMetadata", &blockHash)
if err != nil {
return nil, err
}
if shouldUpdateMetadata(requestedBlockSpecVersion, parentSpecVersion, isForward(f.lastBlockInfo.blockNum, requestedBlockNum)) {
err := cclient.CallWithBlockHash(client.client.Client, &fetchedBlockData.metadata, "state_getMetadata", &blockHash)
if err != nil {
return nil, fmt.Errorf("failed to get metadata: %w", err)
}

fetchedBlockData.previousSpecVerion = previousSpecVersion
fetchedBlockData.specVersion = specVersion
fetchedBlockData.metadata = []byte(metadata)
f.logger.Info("metadata fetched", zap.Uint32("spec_version", requestedBlockSpecVersion), zap.Uint64("block_num", requestedBlockNum))
}

f.lastBlockInfo = &LastBlockInfo{
blockNum: requestedBlockNum,
blockHash: blockHash,
specVersion: requestedBlockSpecVersion,
}

return fetchedBlockData, nil
})
}

func (f *Fetcher) fetchParentSpecVersion(client *Client, parentBlockHash types.Hash) (uint32, error) {
parentRuntimeVersion, err := client.state.GetRuntimeVersion(parentBlockHash)
if err != nil {
return 0, fmt.Errorf("failed to get runtime version at block hash %s: %w", parentBlockHash.Hex(), err)
}

return uint32(parentRuntimeVersion.SpecVersion), nil
}

func isForward(lastProcessedBlockNum, requestedBlockNum uint64) bool {
return lastProcessedBlockNum <= requestedBlockNum
}

func shouldUpdateMetadata(specVersion uint32, parentSpecVersion uint32, isForward bool) bool {
if !isForward {
return false
}

return specVersion != parentSpecVersion
}

func (f *Fetcher) fetchLatestBlockNum(_ context.Context) (uint64, error) {
return firecoreRPC.WithClients(f.gearClients, func(client *Client) (uint64, error) {
header, err := client.client.RPC.Chain.GetHeaderLatest()
Expand All @@ -170,7 +221,7 @@ func (f *Fetcher) fetchLatestBlockNum(_ context.Context) (uint64, error) {
})
}

func convertBlock(data *FetchedBlockData) (*pbbstream.Block, error) {
func convertBlock(data *FetchedBlockData, specVersion uint32) (*pbbstream.Block, error) {
convertedEvents, err := convertEvents(data.events)
if err != nil {
return nil, fmt.Errorf("failed to convert events: %w", err)
Expand All @@ -183,7 +234,7 @@ func convertBlock(data *FetchedBlockData) (*pbbstream.Block, error) {
block := &pbgear.Block{
Number: uint64(b.Block.Header.Number),
Hash: blockHash.Hex(),
Header: convertHeader(data),
Header: convertHeader(data, specVersion, data.metadata),
Extrinsics: convertExtrinsics(b.Block.Extrinsics),
Events: convertedEvents,
DigestItems: convertLogs(b.Block.Header.Digest),
Expand Down Expand Up @@ -228,7 +279,7 @@ func convertBlock(data *FetchedBlockData) (*pbbstream.Block, error) {
return bstreamBlock, nil
}

func convertHeader(fetchedBlockData *FetchedBlockData) *pbgear.Header {
func convertHeader(fetchedBlockData *FetchedBlockData, specVersion uint32, metadata []byte) *pbgear.Header {
h := &pbgear.Header{
ParentHash: fetchedBlockData.block.Block.Header.ParentHash.Hex(),
StateRoot: fetchedBlockData.block.Block.Header.StateRoot.Hex(),
Expand All @@ -238,8 +289,8 @@ func convertHeader(fetchedBlockData *FetchedBlockData) *pbgear.Header {

// set the metadata if the previous spec version is different from the current spec version
// this means that the metadata has been updated
if fetchedBlockData.previousSpecVerion != fetchedBlockData.specVersion {
h.UpdatedMetadata = []byte(fetchedBlockData.metadata)
if len(metadata) > 0 {
h.UpdatedMetadata = metadata
}

return h
Expand Down

0 comments on commit 18cf464

Please sign in to comment.