Skip to content

Commit

Permalink
Merge pull request #2 from streamingfast/feature/bm-powered-sink
Browse files Browse the repository at this point in the history
Feature/bm powered sink
  • Loading branch information
ArnaudBger authored May 9, 2024
2 parents 23b0ba4 + d18b708 commit 8a795bf
Show file tree
Hide file tree
Showing 7 changed files with 309 additions and 135 deletions.
1 change: 1 addition & 0 deletions .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ on:
- "*"
branches:
- develop
- feature/*

env:
REGISTRY: ghcr.io
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ on:
branches:
- master
- develop
- feature/*
pull_request:
branches:
- "**"
Expand Down
4 changes: 3 additions & 1 deletion cmd/substreams-sink-noop/head_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ func (s *HeadTracker) Start() {

go func() {
activeCursor := ""
backOff := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
bo := backoff.NewExponentialBackOff()
bo.MaxElapsedTime = 0
backOff := backoff.WithContext(bo, ctx)
receivedMessage := false

for {
Expand Down
196 changes: 169 additions & 27 deletions cmd/substreams-sink-noop/main.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
package main

import (
"connectrpc.com/connect"
"context"
"crypto/sha256"
"fmt"
pbbmsrv "github.com/streamingfast/blockmeta-service/server/pb/sf/blockmeta/v2"
"github.com/streamingfast/blockmeta-service/server/pb/sf/blockmeta/v2/pbbmsrvconnect"
"gopkg.in/yaml.v3"
"hash"
"net/http"
"os"
"strconv"
"strings"
"time"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -54,6 +62,9 @@ func main() {
flags.String("state-store", "./state.yaml", "Output path where to store latest received cursor, if empty, cursor will not be persisted")
flags.String("api-listen-addr", ":8080", "Rest API to manage deployment")
flags.Uint64("print-output-data-hash-interval", 0, "If non-zero, will hash the output for quickly comparing for differences")
flags.Uint64("follow-head-substreams-segment", 1000, "")
flags.String("follow-head-blockmeta-url", "", "Block meta URL to follow head block, when provided, the sink enable the follow head mode (if block range not provided)")
flags.Uint64("follow-head-reversible-segment", 100, "Segment size for reversible block")
}),
PersistentFlags(func(flags *pflag.FlagSet) {
flags.String("metrics-listen-addr", ":9102", "If non-empty, the process will listen on this address to server Prometheus metrics")
Expand All @@ -65,13 +76,10 @@ func main() {
)
}

func run(cmd *cobra.Command, args []string) error {
app := shutter.New()
const ApiKeyHeader = "x-api-key"

ctx, cancelApp := context.WithCancel(cmd.Context())
app.OnTerminating(func(_ error) {
cancelApp()
})
func run(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()

endpoint := args[0]
manifestPath := args[1]
Expand All @@ -81,6 +89,121 @@ func run(cmd *cobra.Command, args []string) error {
blockRangeArg = args[3]
}

var err error
blockmetaUrl := sflags.MustGetString(cmd, "follow-head-blockmeta-url")
substreamsSegmentSize := sflags.MustGetUint64(cmd, "follow-head-substreams-segment")
reversibleSegmentSize := sflags.MustGetUint64(cmd, "follow-head-reversible-segment")
var blockmetaClient pbbmsrvconnect.BlockClient
var apiKey string
if blockmetaUrl != "" {
blockmetaClient = pbbmsrvconnect.NewBlockClient(http.DefaultClient, blockmetaUrl)
apiKey = os.Getenv("SUBSTREAMS_API_KEY")
if apiKey == "" {
return fmt.Errorf("missing SUBSTREAMS_API_KEY environment variable to use blockmeta service")
}
}

signalHandler, isSignaled, _ := cli.SetupSignalHandler(0*time.Second, zlog)
sessionCounter := uint64(0)
stateStorePath := sflags.MustGetString(cmd, "state-store")
var sleepingDuration time.Duration
retryCounter := uint64(0)
for {
if blockmetaClient != nil {
for {
select {
case <-ctx.Done():
return nil
case <-signalHandler:
return nil
case <-time.After(sleepingDuration):
}

sleepingDuration = 5 * time.Second
headBlockNum, err := fetchHeadBlockNum(ctx, blockmetaClient, apiKey)
if err != nil {
return fmt.Errorf("fetching head block: %w", err)
}

blockRangeArg, err = computeBlockRangeFromHead(reversibleSegmentSize, substreamsSegmentSize, blockRangeArg, headBlockNum)
if err != nil {
return fmt.Errorf("computing block range from head: %w", err)
}

startBlockString := strings.Split(blockRangeArg, ":")[0]
startBlock, err := strconv.Atoi(startBlockString)
if err != nil {
return fmt.Errorf("converting start block to integer: %w", err)
}

computedEndBlock := strings.Split(blockRangeArg, ":")[1]
endBlock, err := strconv.Atoi(computedEndBlock)
if err != nil {
return fmt.Errorf("converting start block to integer: %w", err)
}

cursorExisting, extractedBlockNumber, err := readBlockNumFromCursor(stateStorePath)
if err != nil {
return fmt.Errorf("reading start block from state path: %w", err)
}

if cursorExisting {
startBlock = int(extractedBlockNumber)
}

if startBlock < endBlock-1 {
break
}
if retryCounter%6 == 0 {
zlog.Info("waiting for head to reach next threshold", zap.Uint64("target", uint64(startBlock)+substreamsSegmentSize+reversibleSegmentSize), zap.Uint64("current_head", headBlockNum))
}

retryCounter += 1
}
}

zlog.Info("starting sink session", zap.Uint64("session_counter", sessionCounter))
err = runSink(cmd, blockRangeArg, endpoint, manifestPath, moduleName, zlog, tracer, signalHandler, stateStorePath)
if err != nil {
return err
}

if blockmetaClient == nil {
return nil
}

if isSignaled.Load() {
return nil
}

sessionCounter += 1
zlog.Info("sleeping until next session", zap.Uint64("session_counter", sessionCounter))
}
}

// readBlockNumFromCursor loads the YAML state file found at stateStorePath and
// reports the block number recorded in it.
//
// It returns (false, 0, nil) when the file does not exist, and (true, number, nil)
// when the file was read and decoded. Any other read failure, or a decoding
// failure, is returned as a wrapped error with cursorExisting set to false.
func readBlockNumFromCursor(stateStorePath string) (cursorExisting bool, startBlock uint64, err error) {
	raw, readErr := os.ReadFile(stateStorePath)
	switch {
	case readErr == nil:
		// File is present, fall through to decoding below.
	case os.IsNotExist(readErr):
		// A missing state file is not an error: there is simply no cursor yet.
		return false, 0, nil
	default:
		return false, 0, fmt.Errorf("reading cursor state file: %w", readErr)
	}

	var persisted syncState
	if decodeErr := yaml.Unmarshal(raw, &persisted); decodeErr != nil {
		return false, 0, fmt.Errorf("unmarshal state file %q: %w", stateStorePath, decodeErr)
	}

	return true, persisted.Block.Number, nil
}
func runSink(cmd *cobra.Command, blockRangeArg string, endpoint string, manifestPath string, moduleName string, zlog *zap.Logger, tracer logging.Tracer, signalHandler <-chan os.Signal, stateStorePath string) error {
app := shutter.New()
ctx, cancelApp := context.WithCancel(cmd.Context())
app.OnTerminating(func(_ error) {
cancelApp()
})

baseSinker, err := sink.NewFromViper(cmd, sink.IgnoreOutputModuleType, endpoint, manifestPath, moduleName, blockRangeArg, zlog, tracer,
sink.WithBlockDataBuffer(0),
)
Expand All @@ -94,10 +217,11 @@ func run(cmd *cobra.Command, args []string) error {

apiListenAddr := sflags.MustGetString(cmd, "api-listen-addr")
cleanState := sflags.MustGetBool(cmd, "clean")
stateStorePath := sflags.MustGetString(cmd, "state-store")
blockRange := sinker.BlockRange()

zlog.Info("consuming substreams",
managementApi := NewManager(apiListenAddr)

zlog.Info("start new substreams consumption session",
zap.String("substreams_endpoint", endpoint),
zap.String("manifest_path", manifestPath),
zap.String("module_name", moduleName),
Expand All @@ -112,7 +236,9 @@ func run(cmd *cobra.Command, args []string) error {

headFetcher := NewHeadTracker(headTrackerClient, headTrackerCallOpts, headTrackerHeaders)
app.OnTerminating(func(_ error) { headFetcher.Close() })
headFetcher.OnTerminated(func(err error) { app.Shutdown(err) })
headFetcher.OnTerminated(func(err error) {
app.Shutdown(err)
})

sinker.headFetcher = headFetcher

Expand All @@ -123,16 +249,21 @@ func run(cmd *cobra.Command, args []string) error {

stats := NewStats(stopBlock, headFetcher)
app.OnTerminating(func(_ error) { stats.Close() })
stats.OnTerminated(func(err error) { app.Shutdown(err) })
stats.OnTerminated(func(err error) {
app.Shutdown(err)
})

stateStore := NewStateStore(stateStorePath, func() (*sink.Cursor, bool, bool) {
return sinker.activeCursor, sinker.backprocessingCompleted, sinker.headBlockReached
return sinker.activeCursor, sinker.backprocessingCompleted, sinker.headBlockReachedMetric
})
app.OnTerminating(func(_ error) { stateStore.Close() })
stateStore.OnTerminated(func(err error) { app.Shutdown(err) })
stateStore.OnTerminated(func(err error) {
app.Shutdown(err)
})

managementApi := NewManager(apiListenAddr)
managementApi.OnTerminated(func(err error) { app.Shutdown(err) })
managementApi.OnTerminated(func(err error) {
app.Shutdown(err)
})
app.OnTerminating(func(_ error) {
if managementApi.shouldResetState {
if err := stateStore.Delete(); err != nil {
Expand All @@ -145,7 +276,6 @@ func run(cmd *cobra.Command, args []string) error {
if !cleanState {
cursor, _, err := stateStore.Read()
cli.NoError(err, "Unable to read state store")

sinker.activeCursor = sink.MustNewCursor(cursor)
}

Expand All @@ -166,31 +296,43 @@ func run(cmd *cobra.Command, args []string) error {

go sinker.Run(ctx)

zlog.Info("ready, waiting for signal to quit")

signalHandler, isSignaled, _ := cli.SetupSignalHandler(0*time.Second, zlog)
select {
case <-signalHandler:
go app.Shutdown(nil)
break
case <-app.Terminating():
zlog.Info("run terminating", zap.Bool("from_signal", isSignaled.Load()), zap.Bool("with_error", app.Err() != nil))
break
zlog.Info("run terminating", zap.Bool("with_error", app.Err() != nil))
}

zlog.Info("waiting for run termination")
select {
case <-app.Terminated():
return app.Err()
case <-time.After(30 * time.Second):
zlog.Warn("application did not terminate within 30s")
return app.Err()
}
}

if err := app.Err(); err != nil {
return err
// fetchHeadBlockNum asks the blockmeta service for its head block and returns
// that block's number.
//
// The request carries apiKey in the ApiKeyHeader header. When the Head call
// fails, the error is wrapped and returned together with a zero block number.
func fetchHeadBlockNum(ctx context.Context, blockmetaClient pbbmsrvconnect.BlockClient, apiKey string) (uint64, error) {
	request := connect.NewRequest(&pbbmsrv.Empty{})
	request.Header().Set(ApiKeyHeader, apiKey)

	headBlock, err := blockmetaClient.Head(ctx, request)
	if err != nil {
		return 0, fmt.Errorf("requesting head block to blockmeta service: %w", err)
	}

	return headBlock.Msg.Num, nil
}
// computeBlockRangeFromHead rewrites the end of blockRangeArg from the current head block.
//
// blockRangeArg must have the form "<start>:<end>". The start part is kept
// verbatim and the end part is replaced by headBlock minus reversibleSegmentSize,
// rounded down to a multiple of substreamsSegmentSize.
//
// When headBlock is smaller than reversibleSegmentSize the computed end block is
// 0 instead of wrapping around the uint64 range. An error is returned when
// blockRangeArg does not contain exactly one ":" separator or when
// substreamsSegmentSize is 0 (which would otherwise panic on division).
func computeBlockRangeFromHead(reversibleSegmentSize uint64, substreamsSegmentSize uint64, blockRangeArg string, headBlock uint64) (string, error) {
	blockRangeArray := strings.Split(blockRangeArg, ":")
	if len(blockRangeArray) != 2 {
		return "", fmt.Errorf("invalid block range format")
	}

	if substreamsSegmentSize == 0 {
		return "", fmt.Errorf("substreams segment size must be greater than 0")
	}

	// Unsigned subtraction wraps silently in Go, so clamp to 0 while the head
	// has not yet moved past the reversible segment.
	var safeHead uint64
	if headBlock > reversibleSegmentSize {
		safeHead = headBlock - reversibleSegmentSize
	}

	// Integer division followed by multiplication rounds down to the segment boundary.
	computedEndBlock := (safeHead / substreamsSegmentSize) * substreamsSegmentSize

	// The computed block range replaces the end block with the computed one.
	return blockRangeArray[0] + ":" + strconv.FormatUint(computedEndBlock, 10), nil
}

type Sinker struct {
Expand All @@ -199,7 +341,7 @@ type Sinker struct {
headFetcher *HeadTracker

activeCursor *sink.Cursor
headBlockReached bool
headBlockReachedMetric bool
outputDataHash *dataHasher
backprocessingCompleted bool
}
Expand All @@ -220,7 +362,7 @@ func (s *Sinker) HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrp

chainHeadBlock, found := s.headFetcher.Current()
if found && block.Num() >= chainHeadBlock.Num() {
s.headBlockReached = true
s.headBlockReachedMetric = true
HeadBlockReached.SetUint64(1)
}

Expand Down
1 change: 0 additions & 1 deletion cmd/substreams-sink-noop/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ func (s *StateStore) Read() (cursor string, block bstream.BlockRef, err error) {
s.state.LastSyncedAt = s.state.LastSyncedAt.Local()
s.state.BackprocessingCompletedAt = s.state.BackprocessingCompletedAt.Local()
s.state.HeadBlockReachedAt = s.state.HeadBlockReachedAt.Local()

return s.state.Cursor, bstream.NewBlockRef(s.state.Block.ID, s.state.Block.Number), nil
}

Expand Down
Loading

0 comments on commit 8a795bf

Please sign in to comment.