Skip to content

Commit

Permalink
Fixed logic impl
Browse files Browse the repository at this point in the history
  • Loading branch information
ArnaudBger committed Apr 4, 2024
1 parent 0ac5353 commit 889d178
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 82 deletions.
187 changes: 144 additions & 43 deletions cmd/substreams-sink-noop/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"fmt"
pbbmsrv "github.com/streamingfast/blockmeta-service/server/pb/sf/blockmeta/v2"
"github.com/streamingfast/blockmeta-service/server/pb/sf/blockmeta/v2/pbbmsrvconnect"
"gopkg.in/yaml.v3"
"hash"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -60,7 +62,9 @@ func main() {
flags.String("state-store", "./state.yaml", "Output path where to store latest received cursor, if empty, cursor will not be persisted")
flags.String("api-listen-addr", ":8080", "Rest API to manage deployment")
flags.Uint64("print-output-data-hash-interval", 0, "If non-zero, will hash the output for quickly comparing for differences")
flags.String("blockmeta-api-key", "...", "Blockmeta service api key")
flags.Uint64("follow-head-substreams-segment", 1000, "")
flags.String("follow-head-blockmeta-url", "", "Block meta URL to follow head block, when provided, the sink enable the follow head mode (if block range not provided)")
flags.Uint64("follow-head-reversible-segment", 100, "Segment size for reversible block")
}),
PersistentFlags(func(flags *pflag.FlagSet) {
flags.String("metrics-listen-addr", ":9102", "If non-empty, the process will listen on this address to server Prometheus metrics")
Expand All @@ -75,12 +79,7 @@ func main() {
const ApiKeyHeader = "x-api-key"

func run(cmd *cobra.Command, args []string) error {
app := shutter.New()

ctx, cancelApp := context.WithCancel(cmd.Context())
app.OnTerminating(func(_ error) {
cancelApp()
})
ctx := cmd.Context()

endpoint := args[0]
manifestPath := args[1]
Expand All @@ -90,27 +89,108 @@ func run(cmd *cobra.Command, args []string) error {
blockRangeArg = args[3]
}

blockmetaApiKey := sflags.MustGetString(cmd, "blockmeta-api-key")
if blockRangeArg == "" && blockmetaApiKey != "" {
blockmetaUrl := &url.URL{
Scheme: "https",
Host: endpoint,
}
var err error
blockmetaUrl := sflags.MustGetString(cmd, "follow-head-blockmeta-url")
substreamsSegmentSize := sflags.MustGetUint64(cmd, "follow-head-substreams-segment")
reversibleSegmentSize := sflags.MustGetUint64(cmd, "follow-head-reversible-segment")
var blockmetaClient pbbmsrvconnect.BlockClient
if blockmetaUrl != "" {
blockmetaClient = pbbmsrvconnect.NewBlockClient(http.DefaultClient, blockmetaUrl)
}

blockmetaClient := pbbmsrvconnect.NewBlockClient(http.DefaultClient, blockmetaUrl.String())
fmt.Println(blockmetaUrl.String())
request := connect.NewRequest(&pbbmsrv.Empty{})
request.Header().Set(ApiKeyHeader, blockmetaApiKey)
signalHandler, isSignaled, _ := cli.SetupSignalHandler(0*time.Second, zlog)
sessionCounter := uint64(0)
stateStorePath := sflags.MustGetString(cmd, "state-store")
var sleepingDuration time.Duration
for {
if blockmetaClient != nil {
for {
select {
case <-ctx.Done():
return nil
case <-signalHandler:
return nil
case <-time.After(sleepingDuration):
// continue
}
sleepingDuration = 5 * time.Second

blockRangeArg, err = computeBlockRangeFromHead(ctx, blockmetaClient, reversibleSegmentSize, substreamsSegmentSize, blockRangeArg)
if err != nil {
return fmt.Errorf("computing block range from head: %w", err)
}

startBlockString := strings.Split(blockRangeArg, ":")[0]
startBlock, err := strconv.Atoi(startBlockString)
if err != nil {
return fmt.Errorf("converting start block to integer: %w", err)
}

computedEndBlock := strings.Split(blockRangeArg, ":")[1]
endBlock, err := strconv.Atoi(computedEndBlock)
if err != nil {
return fmt.Errorf("converting start block to integer: %w", err)
}

cursorExisting, extractedBlockNumber, err := readBlockNumFromCursor(stateStorePath)
if err != nil {
return fmt.Errorf("reading start block from state path: %w", err)
}

if cursorExisting {
startBlock = int(extractedBlockNumber)
}

if startBlock < endBlock-1 {
break
}

zlog.Info("retrying block range computation", zap.Uint64("session_counter", sessionCounter), zap.Int("start_block_computed", startBlock), zap.Int("end_block_computed", endBlock))
}
}

headBlock, err := blockmetaClient.Head(ctx, request)
zlog.Info("starting sink session", zap.Uint64("session_counter", sessionCounter))
err = runSink(cmd, blockRangeArg, endpoint, manifestPath, moduleName, zlog, tracer, signalHandler, stateStorePath)
if err != nil {
return fmt.Errorf("requesting head block to blockmeta service: %w", err)
return err
}

if blockmetaClient == nil {
return nil
}

if isSignaled.Load() {
return nil
}

blockRangeArg = ":" + strconv.FormatUint(headBlock.Msg.Num, 10)
sessionCounter += 1
zlog.Info("sleeping until next session", zap.Uint64("session_counter", sessionCounter))
}
}

func readBlockNumFromCursor(stateStorePath string) (cursorExisting bool, startBlock uint64, err error) {
content, err := os.ReadFile(stateStorePath)
if err != nil {
if os.IsNotExist(err) {
return false, 0, nil
}
return false, 0, fmt.Errorf("reading cursor state file: %w", err)
}

//TODO: IF BLOCKMETA ADDRESS IS MENTIONED, GET THE HEAD BLOCK FROM BLOCK META...
state := syncState{}
if err = yaml.Unmarshal(content, &state); err != nil {
return false, 0, fmt.Errorf("unmarshal state file %q: %w", stateStorePath, err)
}

return true, state.Block.Number, nil
}
func runSink(cmd *cobra.Command, blockRangeArg string, endpoint string, manifestPath string, moduleName string, zlog *zap.Logger, tracer logging.Tracer, signalHandler <-chan os.Signal, stateStorePath string) error {
app := shutter.New()
ctx, cancelApp := context.WithCancel(cmd.Context())
app.OnTerminating(func(_ error) {
cancelApp()
})

baseSinker, err := sink.NewFromViper(cmd, sink.IgnoreOutputModuleType, endpoint, manifestPath, moduleName, blockRangeArg, zlog, tracer,
sink.WithBlockDataBuffer(0),
)
Expand All @@ -124,10 +204,11 @@ func run(cmd *cobra.Command, args []string) error {

apiListenAddr := sflags.MustGetString(cmd, "api-listen-addr")
cleanState := sflags.MustGetBool(cmd, "clean")
stateStorePath := sflags.MustGetString(cmd, "state-store")
blockRange := sinker.BlockRange()

zlog.Info("consuming substreams",
managementApi := NewManager(apiListenAddr)

zlog.Info("start new substreams consumption session",
zap.String("substreams_endpoint", endpoint),
zap.String("manifest_path", manifestPath),
zap.String("module_name", moduleName),
Expand All @@ -142,7 +223,9 @@ func run(cmd *cobra.Command, args []string) error {

headFetcher := NewHeadTracker(headTrackerClient, headTrackerCallOpts, headTrackerHeaders)
app.OnTerminating(func(_ error) { headFetcher.Close() })
headFetcher.OnTerminated(func(err error) { app.Shutdown(err) })
headFetcher.OnTerminated(func(err error) {
app.Shutdown(err)
})

sinker.headFetcher = headFetcher

Expand All @@ -153,16 +236,21 @@ func run(cmd *cobra.Command, args []string) error {

stats := NewStats(stopBlock, headFetcher)
app.OnTerminating(func(_ error) { stats.Close() })
stats.OnTerminated(func(err error) { app.Shutdown(err) })
stats.OnTerminated(func(err error) {
app.Shutdown(err)
})

stateStore := NewStateStore(stateStorePath, func() (*sink.Cursor, bool, bool) {
return sinker.activeCursor, sinker.backprocessingCompleted, sinker.headBlockReached
return sinker.activeCursor, sinker.backprocessingCompleted, sinker.headBlockReachedMetric
})
app.OnTerminating(func(_ error) { stateStore.Close() })
stateStore.OnTerminated(func(err error) { app.Shutdown(err) })
stateStore.OnTerminated(func(err error) {
app.Shutdown(err)
})

managementApi := NewManager(apiListenAddr)
managementApi.OnTerminated(func(err error) { app.Shutdown(err) })
managementApi.OnTerminated(func(err error) {
app.Shutdown(err)
})
app.OnTerminating(func(_ error) {
if managementApi.shouldResetState {
if err := stateStore.Delete(); err != nil {
Expand All @@ -175,7 +263,6 @@ func run(cmd *cobra.Command, args []string) error {
if !cleanState {
cursor, _, err := stateStore.Read()
cli.NoError(err, "Unable to read state store")

sinker.activeCursor = sink.MustNewCursor(cursor)
}

Expand All @@ -196,31 +283,45 @@ func run(cmd *cobra.Command, args []string) error {

go sinker.Run(ctx)

zlog.Info("ready, waiting for signal to quit")

signalHandler, isSignaled, _ := cli.SetupSignalHandler(0*time.Second, zlog)
select {
case <-signalHandler:
go app.Shutdown(nil)
break
case <-app.Terminating():
zlog.Info("run terminating", zap.Bool("from_signal", isSignaled.Load()), zap.Bool("with_error", app.Err() != nil))
break
zlog.Info("run terminating", zap.Bool("with_error", app.Err() != nil))
}

zlog.Info("waiting for run termination")
select {
case <-app.Terminated():
return app.Err()
case <-time.After(30 * time.Second):
zlog.Warn("application did not terminate within 30s")
return app.Err()
}
}

func computeBlockRangeFromHead(ctx context.Context, blockmetaClient pbbmsrvconnect.BlockClient, reversibleSegmentSize uint64, substreamsSegmentSize uint64, blockRangeArg string) (string, error) {
request := connect.NewRequest(&pbbmsrv.Empty{})

apiKey := os.Getenv("SUBSTREAMS_API_KEY")
if apiKey == "" {
return "", fmt.Errorf("missing SUBSTREAMS_API_KEY environment variable")
}
request.Header().Set(ApiKeyHeader, apiKey)

if err := app.Err(); err != nil {
return err
headBlock, err := blockmetaClient.Head(ctx, request)
if err != nil {
return "", fmt.Errorf("requesting head block to blockmeta service: %w", err)
}

zlog.Info("run terminated gracefully")
return nil
computedEndBlock := ((headBlock.Msg.Num - reversibleSegmentSize) / substreamsSegmentSize) * substreamsSegmentSize
blockRangeArray := strings.Split(blockRangeArg, ":")
if len(blockRangeArray) != 2 {
return "", fmt.Errorf("invalid block range format")
}

//The computed block range replace the end block by a computed one
return (blockRangeArray[0] + ":" + strconv.FormatUint(computedEndBlock, 10)), nil
}

type Sinker struct {
Expand All @@ -229,7 +330,7 @@ type Sinker struct {
headFetcher *HeadTracker

activeCursor *sink.Cursor
headBlockReached bool
headBlockReachedMetric bool
outputDataHash *dataHasher
backprocessingCompleted bool
}
Expand All @@ -250,7 +351,7 @@ func (s *Sinker) HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrp

chainHeadBlock, found := s.headFetcher.Current()
if found && block.Num() >= chainHeadBlock.Num() {
s.headBlockReached = true
s.headBlockReachedMetric = true
HeadBlockReached.SetUint64(1)
}

Expand Down
1 change: 0 additions & 1 deletion cmd/substreams-sink-noop/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ func (s *StateStore) Read() (cursor string, block bstream.BlockRef, err error) {
s.state.LastSyncedAt = s.state.LastSyncedAt.Local()
s.state.BackprocessingCompletedAt = s.state.BackprocessingCompletedAt.Local()
s.state.HeadBlockReachedAt = s.state.HeadBlockReachedAt.Local()

return s.state.Cursor, bstream.NewBlockRef(s.state.Block.ID, s.state.Block.Number), nil
}

Expand Down
26 changes: 14 additions & 12 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,20 @@ require (
github.com/streamingfast/shutter v1.5.0
github.com/streamingfast/substreams v1.3.7
github.com/streamingfast/substreams-sink v0.3.4
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.44.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0
go.uber.org/zap v1.26.0
golang.org/x/oauth2 v0.15.0
golang.org/x/oauth2 v0.16.0
google.golang.org/grpc v1.61.0
google.golang.org/protobuf v1.32.0
gopkg.in/yaml.v3 v3.0.1
)

require (
cloud.google.com/go v0.111.0 // indirect
cloud.google.com/go v0.112.0 // indirect
cloud.google.com/go/compute v1.23.3 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.5 // indirect
cloud.google.com/go/storage v1.30.1 // indirect
cloud.google.com/go/iam v1.1.6 // indirect
cloud.google.com/go/storage v1.38.0 // indirect
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/azure-storage-blob-go v0.14.0 // indirect
github.com/aws/aws-sdk-go v1.44.325 // indirect
Expand All @@ -53,13 +53,14 @@ require (
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/envoyproxy/go-control-plane v0.11.1 // indirect
github.com/envoyproxy/protoc-gen-validate v1.0.2 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
Expand Down Expand Up @@ -107,14 +108,15 @@ require (
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c // indirect
github.com/streamingfast/dstore v0.1.1-0.20240215171730-493ad5a0f537 // indirect
github.com/streamingfast/dstore v0.1.1-0.20240311181234-470a7a84936f // indirect
github.com/streamingfast/opaque v0.0.0-20210811180740-0c01d37ea308 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf // indirect
github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c // indirect
github.com/yourbasic/graph v0.0.0-20210606180040-8ecfec1c2869 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect
go.opentelemetry.io/otel v1.23.1 // indirect
go.opentelemetry.io/otel/metric v1.23.1 // indirect
go.opentelemetry.io/otel/trace v1.23.1 // indirect
Expand All @@ -124,16 +126,16 @@ require (
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/term v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/api v0.152.0 // indirect
google.golang.org/api v0.162.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20231212172506-995d672761c0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240102182953-50ed04b92917 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 // indirect
google.golang.org/genproto v0.0.0-20240125205218-1f4bbc51befe // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240205150955-31a09d347014 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
Expand Down
Loading

0 comments on commit 889d178

Please sign in to comment.