diff --git a/cmd/substreams-sink-noop/main.go b/cmd/substreams-sink-noop/main.go index 89765b2..b6eb9a9 100644 --- a/cmd/substreams-sink-noop/main.go +++ b/cmd/substreams-sink-noop/main.go @@ -94,14 +94,20 @@ func run(cmd *cobra.Command, args []string) error { substreamsSegmentSize := sflags.MustGetUint64(cmd, "follow-head-substreams-segment") reversibleSegmentSize := sflags.MustGetUint64(cmd, "follow-head-reversible-segment") var blockmetaClient pbbmsrvconnect.BlockClient + var apiKey string if blockmetaUrl != "" { blockmetaClient = pbbmsrvconnect.NewBlockClient(http.DefaultClient, blockmetaUrl) + apiKey = os.Getenv("SUBSTREAMS_API_KEY") + if apiKey == "" { + return fmt.Errorf("missing SUBSTREAMS_API_KEY environment variable to use blockmeta service") + } } signalHandler, isSignaled, _ := cli.SetupSignalHandler(0*time.Second, zlog) sessionCounter := uint64(0) stateStorePath := sflags.MustGetString(cmd, "state-store") var sleepingDuration time.Duration + retryCounter := uint64(0) for { if blockmetaClient != nil { for { @@ -111,11 +117,15 @@ func run(cmd *cobra.Command, args []string) error { case <-signalHandler: return nil case <-time.After(sleepingDuration): - // continue } + sleepingDuration = 5 * time.Second + headBlockNum, err := fetchHeadBlockNum(ctx, blockmetaClient, apiKey) + if err != nil { + return fmt.Errorf("fetching head block: %w", err) + } - blockRangeArg, err = computeBlockRangeFromHead(ctx, blockmetaClient, reversibleSegmentSize, substreamsSegmentSize, blockRangeArg) + blockRangeArg, err = computeBlockRangeFromHead(reversibleSegmentSize, substreamsSegmentSize, blockRangeArg, headBlockNum) if err != nil { return fmt.Errorf("computing block range from head: %w", err) } @@ -144,8 +154,11 @@ func run(cmd *cobra.Command, args []string) error { if startBlock < endBlock-1 { break } + if retryCounter%6 == 0 { + zlog.Info("waiting for head to reach next threshold", zap.Uint64("target", uint64(startBlock)+substreamsSegmentSize+reversibleSegmentSize), zap.Uint64("current_head", headBlockNum)) + } - zlog.Info("retrying block range computation", zap.Uint64("session_counter", sessionCounter), zap.Int("start_block_computed", startBlock), zap.Int("end_block_computed", endBlock)) + retryCounter += 1 } } @@ -300,28 +313,26 @@ func runSink(cmd *cobra.Command, blockRangeArg string, endpoint string, manifest } } -func computeBlockRangeFromHead(ctx context.Context, blockmetaClient pbbmsrvconnect.BlockClient, reversibleSegmentSize uint64, substreamsSegmentSize uint64, blockRangeArg string) (string, error) { +func fetchHeadBlockNum(ctx context.Context, blockmetaClient pbbmsrvconnect.BlockClient, apiKey string) (uint64, error) { request := connect.NewRequest(&pbbmsrv.Empty{}) - - apiKey := os.Getenv("SUBSTREAMS_API_KEY") - if apiKey == "" { - return "", fmt.Errorf("missing SUBSTREAMS_API_KEY environment variable") - } request.Header().Set(ApiKeyHeader, apiKey) headBlock, err := blockmetaClient.Head(ctx, request) if err != nil { - return "", fmt.Errorf("requesting head block to blockmeta service: %w", err) + return 0, fmt.Errorf("requesting head block to blockmeta service: %w", err) } - computedEndBlock := ((headBlock.Msg.Num - reversibleSegmentSize) / substreamsSegmentSize) * substreamsSegmentSize + return headBlock.Msg.Num, nil +} +func computeBlockRangeFromHead(reversibleSegmentSize uint64, substreamsSegmentSize uint64, blockRangeArg string, headBlock uint64) (string, error) { + computedEndBlock := ((headBlock - reversibleSegmentSize) / substreamsSegmentSize) * substreamsSegmentSize blockRangeArray := strings.Split(blockRangeArg, ":") if len(blockRangeArray) != 2 { return "", fmt.Errorf("invalid block range format") } //The computed block range replace the end block by a computed one - return (blockRangeArray[0] + ":" + strconv.FormatUint(computedEndBlock, 10)), nil + return blockRangeArray[0] + ":" + strconv.FormatUint(computedEndBlock, 10), nil } type Sinker struct {