From 18cf464b638b9baa6d2dae666b4e93318e964e90 Mon Sep 17 00:00:00 2001 From: Eduard Voiculescu Date: Tue, 9 Jul 2024 17:39:47 -0400 Subject: [PATCH] update with correct spec versions --- cmd/firevara/fetcher.go | 4 +- rpc/fetcher.go | 119 ++++++++++++++++++++++++++++------------ 2 files changed, 87 insertions(+), 36 deletions(-) diff --git a/cmd/firevara/fetcher.go b/cmd/firevara/fetcher.go index 3eb4608..63e11cb 100644 --- a/cmd/firevara/fetcher.go +++ b/cmd/firevara/fetcher.go @@ -26,7 +26,6 @@ func NewFetchCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Command { cmd.Flags().String("state-dir", "/data", "location to store the cursor.json") cmd.Flags().Duration("interval-between-fetch", 0, "interval between fetch") cmd.Flags().Duration("latest-block-retry-interval", time.Second, "interval between fetch when latest block is not available yet") - cmd.Flags().Int("block-fetch-batch-size", 10, "Number of blocks to fetch in a single batch") return cmd } @@ -70,7 +69,8 @@ func fetchRunE(logger *zap.Logger, tracer logging.Tracer) firecore.CommandExecut blockpoller.WithLogger(logger), ) - err = poller.Run(ctx, startBlock, sflags.MustGetInt(cmd, "block-fetch-batch-size")) + // never use batch downloading for blocks + err = poller.Run(ctx, startBlock, 1) if err != nil { return fmt.Errorf("running poller: %w", err) } diff --git a/rpc/fetcher.go b/rpc/fetcher.go index 49b95b8..8f4e497 100644 --- a/rpc/fetcher.go +++ b/rpc/fetcher.go @@ -20,26 +20,30 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) -var specVersion uint32 -var metadata string - type FetchedBlockData struct { latestFinalizedHead uint64 blockHash types.Hash block *types.SignedBlock - previousSpecVerion uint32 - specVersion uint32 events []*parser.Event metadata []byte } +type LastBlockInfo struct { + blockNum uint64 + blockHash types.Hash + specVersion uint32 +} + type Fetcher struct { gearClients *firecoreRPC.Clients[*Client] fetchInterval time.Duration latestBlockRetryInterval time.Duration logger *zap.Logger - latestBlockNum uint64 + + latestBlockNum uint64 + + lastBlockInfo *LastBlockInfo } func NewFetcher( @@ -52,6 +56,7 @@ func NewFetcher( fetchInterval: fetchInterval, latestBlockRetryInterval: latestBlockRetryInterval, logger: logger, + lastBlockInfo: &LastBlockInfo{}, } } @@ -60,7 +65,7 @@ func (f *Fetcher) IsBlockAvailable(blockNum uint64) bool { } func (f *Fetcher) Fetch(ctx context.Context, requestBlockNum uint64) (b *pbbstream.Block, skipped bool, err error) { - f.logger.Info("fetching block", zap.Uint64("block_num", requestBlockNum)) + f.logger.Info("gear fetching block", zap.Uint64("block_num", requestBlockNum)) sleepDuration := time.Duration(0) //TODO: move this logic in the firecore binary, as we do this in all the fetchers @@ -82,12 +87,14 @@ func (f *Fetcher) Fetch(ctx context.Context, requestBlockNum uint64) (b *pbbstre blockData, err := f.fetchBlockData(ctx, requestBlockNum) if err != nil { + f.logger.Warn("fetching block data", zap.Uint64("block_num", requestBlockNum), zap.Error(err)) return nil, false, fmt.Errorf("fetching block data: %w", err) } - f.logger.Info("converting block", zap.Uint64("block_num", requestBlockNum)) - bstreamBlock, err := convertBlock(blockData) + f.logger.Info("converting block", zap.Uint64("block_num", requestBlockNum), zap.Uint32("spec_version", f.lastBlockInfo.specVersion)) + bstreamBlock, err := convertBlock(blockData, f.lastBlockInfo.specVersion) if err != nil { + f.logger.Warn("converting block", zap.Uint64("block_num", requestBlockNum), zap.Error(err)) return nil, false, fmt.Errorf("converting block %d from rpc response: %w", requestBlockNum, err) } @@ -96,6 +103,7 @@ func (f *Fetcher) Fetch(ctx context.Context, requestBlockNum uint64) (b *pbbstre func (f *Fetcher) fetchBlockData(_ context.Context, requestedBlockNum uint64) (*FetchedBlockData, error) { return firecoreRPC.WithClients(f.gearClients, func(client *Client) (*FetchedBlockData, error) { + f.logger.Info("fetching block data", zap.Uint64("block_num", requestedBlockNum)) latestFinalizedHead, err := client.client.RPC.Chain.GetFinalizedHead() if err != nil { return nil, fmt.Errorf("unable to fetch latest finalized head: %w", err) @@ -128,38 +136,81 @@ func (f *Fetcher) fetchBlockData(_ context.Context, requestedBlockNum uint64) (* return nil, fmt.Errorf("failed to get events: %w", err) } fetchedBlockData.events = events + } - previousBlockhash := block.Block.Header.ParentHash - previousRuntimeVersion, err := client.state.GetRuntimeVersion(previousBlockhash) - if err != nil { - return nil, fmt.Errorf("failed to get runtime version at previous block hash %s: %w", previousBlockhash.Hex(), err) - } + runtimeVersion, err := client.state.GetRuntimeVersion(blockHash) + if err != nil { + return nil, fmt.Errorf("failed to get runtime version at block hash %s: %w", blockHash.Hex(), err) + } + + requestedBlockSpecVersion := uint32(runtimeVersion.SpecVersion) + parentSpecVersion := uint32(0) + + shouldFetchParentVersion := true + if block.Block.Header.ParentHash == f.lastBlockInfo.blockHash { + shouldFetchParentVersion = false + parentSpecVersion = f.lastBlockInfo.specVersion + } + + if requestedBlockNum == 0 { + shouldFetchParentVersion = false + } - previousSpecVersion := uint32(previousRuntimeVersion.SpecVersion) + f.logger.Info( + "requested block spec version", + zap.Uint32("spec_version", requestedBlockSpecVersion), + zap.Uint64("block_num", requestedBlockNum), + ) - // if the previous spec version is different from the current specVersion, we fetch the metadata and then we store the metadata - if specVersion != previousSpecVersion { - runtimeVersion, err := client.state.GetRuntimeVersion(blockHash) - if err != nil { - return nil, fmt.Errorf("failed to get runtime version at block hash %s: %w", blockHash.Hex(), err) - } + if shouldFetchParentVersion { + pSpecVersion, err := f.fetchParentSpecVersion(client, block.Block.Header.ParentHash) + if err != nil { + return nil, fmt.Errorf("failed to fetch parent spec version: %w", err) + } + f.logger.Info("fetched parent spec version", zap.Uint32("spec_version", pSpecVersion), zap.Uint64("block_num", requestedBlockNum-1)) + parentSpecVersion = pSpecVersion + } - specVersion = uint32(runtimeVersion.SpecVersion) - err = cclient.CallWithBlockHash(client.client.Client, &metadata, "state_getMetadata", &blockHash) - if err != nil { - return nil, err - } + if shouldUpdateMetadata(requestedBlockSpecVersion, parentSpecVersion, isForward(f.lastBlockInfo.blockNum, requestedBlockNum)) { + err := cclient.CallWithBlockHash(client.client.Client, &fetchedBlockData.metadata, "state_getMetadata", &blockHash) + if err != nil { + return nil, fmt.Errorf("failed to get metadata: %w", err) } - fetchedBlockData.previousSpecVerion = previousSpecVersion - fetchedBlockData.specVersion = specVersion - fetchedBlockData.metadata = []byte(metadata) + f.logger.Info("metadata fetched", zap.Uint32("spec_version", requestedBlockSpecVersion), zap.Uint64("block_num", requestedBlockNum)) + } + + f.lastBlockInfo = &LastBlockInfo{ + blockNum: requestedBlockNum, + blockHash: blockHash, + specVersion: requestedBlockSpecVersion, } return fetchedBlockData, nil }) } +func (f *Fetcher) fetchParentSpecVersion(client *Client, parentBlockHash types.Hash) (uint32, error) { + parentRuntimeVersion, err := client.state.GetRuntimeVersion(parentBlockHash) + if err != nil { + return 0, fmt.Errorf("failed to get runtime version at block hash %s: %w", parentBlockHash.Hex(), err) + } + + return uint32(parentRuntimeVersion.SpecVersion), nil +} + +func isForward(lastProcessedBlockNum, requestedBlockNum uint64) bool { + return lastProcessedBlockNum <= requestedBlockNum +} + +func shouldUpdateMetadata(specVersion uint32, parentSpecVersion uint32, isForward bool) bool { + if !isForward { + return false + } + + return specVersion != parentSpecVersion +} + func (f *Fetcher) fetchLatestBlockNum(_ context.Context) (uint64, error) { return firecoreRPC.WithClients(f.gearClients, func(client *Client) (uint64, error) { header, err := client.client.RPC.Chain.GetHeaderLatest() @@ -170,7 +221,7 @@ func (f *Fetcher) fetchLatestBlockNum(_ context.Context) (uint64, error) { }) } -func convertBlock(data *FetchedBlockData) (*pbbstream.Block, error) { +func convertBlock(data *FetchedBlockData, specVersion uint32) (*pbbstream.Block, error) { convertedEvents, err := convertEvents(data.events) if err != nil { return nil, fmt.Errorf("failed to convert events: %w", err) @@ -183,7 +234,7 @@ func convertBlock(data *FetchedBlockData) (*pbbstream.Block, error) { block := &pbgear.Block{ Number: uint64(b.Block.Header.Number), Hash: blockHash.Hex(), - Header: convertHeader(data), + Header: convertHeader(data, specVersion, data.metadata), Extrinsics: convertExtrinsics(b.Block.Extrinsics), Events: convertedEvents, DigestItems: convertLogs(b.Block.Header.Digest), @@ -228,7 +279,7 @@ func convertBlock(data *FetchedBlockData) (*pbbstream.Block, error) { return bstreamBlock, nil } -func convertHeader(fetchedBlockData *FetchedBlockData) *pbgear.Header { +func convertHeader(fetchedBlockData *FetchedBlockData, specVersion uint32, metadata []byte) *pbgear.Header { h := &pbgear.Header{ ParentHash: fetchedBlockData.block.Block.Header.ParentHash.Hex(), StateRoot: fetchedBlockData.block.Block.Header.StateRoot.Hex(), @@ -238,8 +289,8 @@ func convertHeader(fetchedBlockData *FetchedBlockData) *pbgear.Header { // set the metadata if the previous spec version is different from the current spec version // this means that the metadata has been updated - if fetchedBlockData.previousSpecVerion != fetchedBlockData.specVersion { - h.UpdatedMetadata = []byte(fetchedBlockData.metadata) + if len(metadata) > 0 { + h.UpdatedMetadata = metadata } return h