Skip to content

Commit

Permalink
Updating code logic
Browse files Browse the repository at this point in the history
  • Loading branch information
ArnaudBger committed Apr 8, 2024
1 parent 889d178 commit f4953d3
Showing 1 changed file with 23 additions and 12 deletions.
35 changes: 23 additions & 12 deletions cmd/substreams-sink-noop/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,20 @@ func run(cmd *cobra.Command, args []string) error {
substreamsSegmentSize := sflags.MustGetUint64(cmd, "follow-head-substreams-segment")
reversibleSegmentSize := sflags.MustGetUint64(cmd, "follow-head-reversible-segment")
var blockmetaClient pbbmsrvconnect.BlockClient
var apiKey string
if blockmetaUrl != "" {
blockmetaClient = pbbmsrvconnect.NewBlockClient(http.DefaultClient, blockmetaUrl)
apiKey = os.Getenv("SUBSTREAMS_API_KEY")
if apiKey == "" {
return fmt.Errorf("missing SUBSTREAMS_API_KEY environment variable to use blockmeta service")
}
}

signalHandler, isSignaled, _ := cli.SetupSignalHandler(0*time.Second, zlog)
sessionCounter := uint64(0)
stateStorePath := sflags.MustGetString(cmd, "state-store")
var sleepingDuration time.Duration
retryCounter := uint64(0)
for {
if blockmetaClient != nil {
for {
Expand All @@ -111,11 +117,15 @@ func run(cmd *cobra.Command, args []string) error {
case <-signalHandler:
return nil
case <-time.After(sleepingDuration):
// continue
}

sleepingDuration = 5 * time.Second
headBlockNum, err := fetchHeadBlockNum(ctx, blockmetaClient, apiKey)
if err != nil {
return fmt.Errorf("fetching head block: %w", err)
}

blockRangeArg, err = computeBlockRangeFromHead(ctx, blockmetaClient, reversibleSegmentSize, substreamsSegmentSize, blockRangeArg)
blockRangeArg, err = computeBlockRangeFromHead(reversibleSegmentSize, substreamsSegmentSize, blockRangeArg, headBlockNum)
if err != nil {
return fmt.Errorf("computing block range from head: %w", err)
}
Expand Down Expand Up @@ -144,8 +154,11 @@ func run(cmd *cobra.Command, args []string) error {
if startBlock < endBlock-1 {
break
}
if retryCounter%6 == 0 {
zlog.Info("waiting for head to reach next threshold", zap.Uint64("target", uint64(startBlock)+substreamsSegmentSize+reversibleSegmentSize), zap.Uint64("current_head", headBlockNum))
}

zlog.Info("retrying block range computation", zap.Uint64("session_counter", sessionCounter), zap.Int("start_block_computed", startBlock), zap.Int("end_block_computed", endBlock))
retryCounter += 1
}
}

Expand Down Expand Up @@ -300,28 +313,26 @@ func runSink(cmd *cobra.Command, blockRangeArg string, endpoint string, manifest
}
}

func computeBlockRangeFromHead(ctx context.Context, blockmetaClient pbbmsrvconnect.BlockClient, reversibleSegmentSize uint64, substreamsSegmentSize uint64, blockRangeArg string) (string, error) {
func fetchHeadBlockNum(ctx context.Context, blockmetaClient pbbmsrvconnect.BlockClient, apiKey string) (uint64, error) {
request := connect.NewRequest(&pbbmsrv.Empty{})

apiKey := os.Getenv("SUBSTREAMS_API_KEY")
if apiKey == "" {
return "", fmt.Errorf("missing SUBSTREAMS_API_KEY environment variable")
}
request.Header().Set(ApiKeyHeader, apiKey)

headBlock, err := blockmetaClient.Head(ctx, request)
if err != nil {
return "", fmt.Errorf("requesting head block to blockmeta service: %w", err)
return 0, fmt.Errorf("requesting head block to blockmeta service: %w", err)
}

computedEndBlock := ((headBlock.Msg.Num - reversibleSegmentSize) / substreamsSegmentSize) * substreamsSegmentSize
return headBlock.Msg.Num, nil
}
func computeBlockRangeFromHead(reversibleSegmentSize uint64, substreamsSegmentSize uint64, blockRangeArg string, headBlock uint64) (string, error) {
computedEndBlock := ((headBlock - reversibleSegmentSize) / substreamsSegmentSize) * substreamsSegmentSize
blockRangeArray := strings.Split(blockRangeArg, ":")
if len(blockRangeArray) != 2 {
return "", fmt.Errorf("invalid block range format")
}

//The computed block range replace the end block by a computed one
return (blockRangeArray[0] + ":" + strconv.FormatUint(computedEndBlock, 10)), nil
return blockRangeArray[0] + ":" + strconv.FormatUint(computedEndBlock, 10), nil
}

type Sinker struct {
Expand Down

0 comments on commit f4953d3

Please sign in to comment.