diff --git a/api.go b/api.go
new file mode 100644
index 00000000..d9f3876d
--- /dev/null
+++ b/api.go
@@ -0,0 +1,107 @@
+package main
+
+import (
+	"context"
+	"errors"
+	"strconv"
+	"strings"
+
+	"github.com/gagliardetto/solana-go"
+	"github.com/rpcpool/yellowstone-faithful/compactindexsized"
+	"github.com/valyala/fasthttp"
+)
+
+// apiHandler serves the read-only CID lookup API:
+//
+//	GET /api/v1/slot-to-cid/{slot}  responds with the CID of the block at that slot
+//	GET /api/v1/sig-to-cid/{sig}    responds with the CID of the transaction with that signature
+//
+// Status codes: 200 with the CID string as the body on success, 400 if the slot
+// or signature cannot be parsed, 404 if the slot, signature, epoch or route is
+// not found, 405 for non-GET requests, and 500 for any other lookup error.
+func (multi *MultiEpoch) apiHandler(reqCtx *fasthttp.RequestCtx) {
+	if !reqCtx.IsGet() {
+		// A 405 response has to tell the client which methods are allowed.
+		reqCtx.Response.Header.Set("Allow", "GET")
+		reqCtx.SetStatusCode(fasthttp.StatusMethodNotAllowed)
+		return
+	}
+
+	const (
+		slotPrefix = "/api/v1/slot-to-cid/"
+		sigPrefix  = "/api/v1/sig-to-cid/"
+	)
+	// Convert the path to a string once instead of once per check.
+	path := string(reqCtx.Path())
+	switch {
+	case strings.HasPrefix(path, slotPrefix):
+		multi.apiSlotToCid(reqCtx, strings.TrimRight(path[len(slotPrefix):], "/"))
+	case strings.HasPrefix(path, sigPrefix):
+		multi.apiSigToCid(reqCtx, strings.TrimRight(path[len(sigPrefix):], "/"))
+	default:
+		reqCtx.SetStatusCode(fasthttp.StatusNotFound)
+	}
+}
+
+// apiSlotToCid responds with the CID of the block at the slot given as a base-10 uint64.
+func (multi *MultiEpoch) apiSlotToCid(reqCtx *fasthttp.RequestCtx, slotStr string) {
+	slot, err := strconv.ParseUint(slotStr, 10, 64)
+	if err != nil {
+		reqCtx.SetStatusCode(fasthttp.StatusBadRequest)
+		return
+	}
+	// Find the epoch that contains the requested slot.
+	epochHandler, err := multi.GetEpoch(CalcEpochForSlot(slot))
+	if err != nil {
+		// TODO: this means epoch is not available, and probably should be another dedicated status code
+		reqCtx.SetStatusCode(fasthttp.StatusNotFound)
+		return
+	}
+	blockCid, err := epochHandler.FindCidFromSlot(context.TODO(), slot)
+	if err != nil {
+		setLookupErrorStatus(reqCtx, err, compactindexsized.ErrNotFound)
+		return
+	}
+	reqCtx.SetStatusCode(fasthttp.StatusOK)
+	reqCtx.SetBodyString(blockCid.String())
+}
+
+// apiSigToCid responds with the CID of the transaction with the given base58 signature.
+func (multi *MultiEpoch) apiSigToCid(reqCtx *fasthttp.RequestCtx, sigStr string) {
+	sig, err := solana.SignatureFromBase58(sigStr)
+	if err != nil {
+		reqCtx.SetStatusCode(fasthttp.StatusBadRequest)
+		return
+	}
+	// NOTE(review): context.TODO() is kept instead of passing reqCtx. The epoch
+	// search may still have goroutines running after it returns, and fasthttp
+	// forbids holding on to a RequestCtx once the handler has returned --
+	// confirm before wiring the request context in.
+	epochNumber, err := multi.findEpochNumberFromSignature(context.TODO(), sig)
+	if err != nil {
+		setLookupErrorStatus(reqCtx, err, ErrNotFound)
+		return
+	}
+	epochHandler, err := multi.GetEpoch(uint64(epochNumber))
+	if err != nil {
+		// TODO: this means epoch is not available, and probably should be another dedicated status code
+		reqCtx.SetStatusCode(fasthttp.StatusNotFound)
+		return
+	}
+	transactionCid, err := epochHandler.FindCidFromSignature(context.TODO(), sig)
+	if err != nil {
+		setLookupErrorStatus(reqCtx, err, compactindexsized.ErrNotFound)
+		return
+	}
+	reqCtx.SetStatusCode(fasthttp.StatusOK)
+	reqCtx.SetBodyString(transactionCid.String())
+}
+
+// setLookupErrorStatus sets 404 if err matches notFound, and 500 otherwise.
+func setLookupErrorStatus(reqCtx *fasthttp.RequestCtx, err, notFound error) {
+	if errors.Is(err, notFound) {
+		reqCtx.SetStatusCode(fasthttp.StatusNotFound)
+		return
+	}
+	reqCtx.SetStatusCode(fasthttp.StatusInternalServerError)
+}
diff --git a/cmd-check-deals.go b/cmd-check-deals.go
index 276a0983..357dd1be 100644
--- a/cmd-check-deals.go
+++ b/cmd-check-deals.go
@@ -50,10 +50,13 @@ func (f *commaSeparatedStringSliceFlag) Len() int {
 	return len(f.slice)
 }
 
+const defaultLotusAPIAddress = "https://api.node.glif.io"
+
 func newCmd_check_deals() *cli.Command {
 	var includePatterns cli.StringSlice
 	var excludePatterns cli.StringSlice
 	var providerAllowlist commaSeparatedStringSliceFlag
+	var lotusAPIAddress string
 	return &cli.Command{
 		Name:        "check-deals",
 		Description: "Validate remote split car retrieval for the given config files",
@@ -79,6 +82,12 @@ func newCmd_check_deals() *cli.Command {
 				Usage: "List of providers to allow checking (comma-separated, can be specified multiple times); will ignore all pieces that correspond to a provider not in the allowlist.",
 				Value: &providerAllowlist,
 			},
+			&cli.StringFlag{
+				Name:        "filecoin-api-address",
+				Usage:       "Address of the filecoin API to find provider info",
+				Value:       defaultLotusAPIAddress,
+				Destination: &lotusAPIAddress,
+			},
 		},
 		Action: func(c *cli.Context) error {
 			src := c.Args().Slice()
@@ -116,7 +125,6 @@ func newCmd_check_deals() *cli.Command {
 				klog.Infof("Provider allowlist: ")
 			}
 
-			lotusAPIAddress := "https://api.node.glif.io"
 			cl := jsonrpc.NewClient(lotusAPIAddress)
 			dm := splitcarfetcher.NewMinerInfo(
 				cl,
diff --git a/cmd-rpc.go b/cmd-rpc.go
index 80e40c1f..685eca4c 100644
--- a/cmd-rpc.go
+++ b/cmd-rpc.go
@@ -33,6 +33,7 @@ func newCmd_rpc() *cli.Command {
 	var epochLoadConcurrency int
 	var maxCacheSizeMB int
 	var grpcListenOn string
+	var lotusAPIAddress string
 	return &cli.Command{
 		Name:        "rpc",
 		Usage:       "Start a Solana JSON RPC server.",
@@ -102,6 +103,12 @@ func newCmd_rpc() *cli.Command {
 				Value:       0,
 				Destination: &maxCacheSizeMB,
 			},
+			&cli.StringFlag{
+				Name:        "filecoin-api-address",
+				Usage:       "Address of the filecoin API to find provider info",
+				Value:       defaultLotusAPIAddress,
+				Destination: &lotusAPIAddress,
+			},
 		),
 		Action: func(c *cli.Context) error {
 			if listenOn == "" && grpcListenOn == "" {
@@ -145,7 +152,6 @@ func newCmd_rpc() *cli.Command {
 			klog.Infof("Loaded %d epoch configs", len(configs))
 			klog.Info("Initializing epochs async...")
 
-			lotusAPIAddress := "https://api.node.glif.io"
 			cl := jsonrpc.NewClient(lotusAPIAddress)
 			minerInfo := splitcarfetcher.NewMinerInfo(
 				cl,
diff --git a/first-success.go b/first-success.go
new file mode 100644
index 00000000..80c93398
--- /dev/null
+++ b/first-success.go
@@ -0,0 +1,149 @@
+package main
+
+import (
+	"context"
+	"errors"
+	"strconv"
+	"strings"
+
+	"golang.org/x/sync/errgroup"
+)
+
+// FirstSuccess runs fns concurrently and returns the value of the first one
+// that finishes without an error.
+//
+// If concurrency is greater than zero, at most that many functions run at the
+// same time; otherwise they are all started at once. If no function succeeds
+// (which includes fns being empty), the zero value of T is returned together
+// with an ErrorSlice holding the errors in the order they were received.
+//
+// NOTE: even after the first success, the other functions are still started
+// and run until they finish; their results are discarded.
+func FirstSuccess[T any](
+	ctx context.Context,
+	concurrency int,
+	fns ...JobFunc[T],
+) (T, error) {
+	type result struct {
+		val T
+		err error
+	}
+	// One slot per function, so that a send never blocks, even when nobody is
+	// receiving anymore because the first success has already been returned.
+	results := make(chan result, len(fns))
+	var wg errgroup.Group
+	if concurrency > 0 {
+		wg.SetLimit(concurrency)
+	}
+	// Schedule from a separate goroutine: with a limit set, wg.Go blocks until
+	// a slot is free, so scheduling inline would keep the loop below from
+	// seeing the first success until almost all the functions had finished.
+	go func() {
+		for _, fn := range fns {
+			fn := fn
+			wg.Go(func() error {
+				if err := ctx.Err(); err != nil {
+					// The context is already done; skip running fn.
+					var empty T
+					results <- result{empty, err}
+					return nil
+				}
+				val, err := fn(ctx)
+				// Send unconditionally: selecting on ctx.Done() here could
+				// randomly drop a result (even a success) once ctx is done.
+				results <- result{val, err}
+				return nil
+			})
+		}
+		wg.Wait()
+		close(results)
+	}()
+
+	var errs ErrorSlice
+	for res := range results {
+		if res.err == nil {
+			// NOTE: it will count as a success even if the value is the zero value (e.g. 0, "", nil, etc.)
+			return res.val, nil
+		}
+		errs = append(errs, res.err)
+	}
+	var zero T
+	return zero, errs
+}
+
+// IsErrorSlice reports whether err is, or wraps, an ErrorSlice.
+func IsErrorSlice(err error) bool {
+	var errs ErrorSlice
+	return errors.As(err, &errs)
+}
+
+// ErrorSlice is a list of errors that is itself an error.
+type ErrorSlice []error
+
+// Error formats the errors like: ErrorSlice{"error1", "error2", "error3"}
+func (e ErrorSlice) Error() string {
+	if len(e) == 0 {
+		return "ErrorSlice{}"
+	}
+	var builder strings.Builder
+	builder.WriteString("ErrorSlice{")
+	for i, err := range e {
+		if i > 0 {
+			builder.WriteString(", ")
+		}
+		if err == nil {
+			builder.WriteString("nil")
+			continue
+		}
+		builder.WriteString(strconv.Quote(err.Error()))
+	}
+	builder.WriteString("}")
+	return builder.String()
+}
+
+// Filter returns a new slice of errors that satisfy the predicate.
+func (e ErrorSlice) Filter(predicate func(error) bool) ErrorSlice {
+	var errs ErrorSlice
+	for _, err := range e {
+		if predicate(err) {
+			errs = append(errs, err)
+		}
+	}
+	return errs
+}
+
+// All reports whether every error satisfies the predicate; it is true for an empty slice.
+func (e ErrorSlice) All(predicate func(error) bool) bool {
+	for _, err := range e {
+		if !predicate(err) {
+			return false
+		}
+	}
+	return true
+}
+
+// JobFunc is a function that can be raced against others by FirstSuccess.
+type JobFunc[T any] func(context.Context) (T, error)
+
+// NewJobGroup returns an empty JobGroup.
+func NewJobGroup[T any]() *JobGroup[T] {
+	return &JobGroup[T]{}
+}
+
+// JobGroup collects the functions for one FirstSuccess call.
+type JobGroup[T any] []JobFunc[T]
+
+// Add appends fn to the group.
+func (r *JobGroup[T]) Add(fn JobFunc[T]) {
+	*r = append(*r, fn)
+}
+
+// Run starts all the functions at once and returns the first successful result.
+func (r *JobGroup[T]) Run(ctx context.Context) (T, error) {
+	return FirstSuccess(ctx, -1, *r...)
+}
+
+// RunWithConcurrency is like Run, but runs at most concurrency functions at a time (no limit if concurrency <= 0).
+func (r *JobGroup[T]) RunWithConcurrency(ctx context.Context, concurrency int) (T, error) {
+	return FirstSuccess(ctx, concurrency, *r...)
+}
diff --git a/first-success_test.go b/first-success_test.go
new file mode 100644
index 00000000..d6d041a1
--- /dev/null
+++ b/first-success_test.go
@@ -0,0 +1,227 @@
+package main
+
+import (
+	"context"
+	"errors"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+func TestFirstSuccess(t *testing.T) {
+	{
+		jobGroup := NewJobGroup[int]()
+		jobGroup.Add(func(context.Context) (int, error) {
+			return 1, nil
+		})
+
+		val, err := jobGroup.Run(context.Background())
+		if err != nil {
+			t.Errorf("expected no error, got %v", err)
+		}
+		if val != 1 {
+			t.Errorf("expected 1, got %v", val)
+		}
+	}
+	{
+		jobGroup := NewJobGroup[int]()
+		jobGroup.Add(func(context.Context) (int, error) {
+			time.Sleep(1 * time.Second)
+			return 1, nil
+		})
+		jobGroup.Add(func(context.Context) (int, error) {
+			return 2, nil
+		})
+
+		val, err := jobGroup.Run(context.Background())
+		if err != nil {
+			t.Errorf("expected no error, got %v", err)
+		}
+		if val != 2 {
+			t.Errorf("expected 2, got %v", val)
+		}
+	}
+	{
+		jobGroup := NewJobGroup[int]()
+		jobGroup.Add(func(context.Context) (int, error) {
+			time.Sleep(1 * time.Second)
+			return 1, nil
+		})
+		jobGroup.Add(func(context.Context) (int, error) {
+			time.Sleep(2 * time.Second)
+			return 2, nil
+		})
+		jobGroup.Add(func(context.Context) (int, error) {
+			return 3, nil
+		})
+
+		startedAt := time.Now()
+		val, err := jobGroup.Run(context.Background())
+		if err != nil {
+			t.Errorf("expected no error, got %v", err)
+		}
+		if val != 3 {
+			t.Errorf("expected 3, got %v", val)
+		}
+		if time.Since(startedAt) > time.Second {
+			t.Errorf("expected less than 1 second, got %v", time.Since(startedAt))
+		}
+	}
+	{
+		// all functions fail
+		jobGroup := NewJobGroup[int]()
+		jobGroup.Add(func(context.Context) (int, error) {
+			return -1, errors.New("error 1")
+		})
+		jobGroup.Add(func(context.Context) (int, error) {
+			return -1, errors.New("error 2")
+		})
+		jobGroup.Add(func(context.Context) (int, error) {
+			return -1, errors.New("error 3")
+		})
+
+		val, err := jobGroup.Run(context.Background())
+		if err == nil {
+			t.Errorf("expected error, got nil")
+		}
+		if val != 0 {
+			t.Errorf("expected 0, got %v", val)
+		}
+		if !IsErrorSlice(err) {
+			t.Fatalf("expected error slice, got %v", err)
+		}
+		slice := err.(ErrorSlice)
+		if len(slice) != 3 {
+			t.Errorf("expected 3 errors, got %v", len(slice))
+		}
+		{
+			// Cannot know the order of errors
+			if slice.Filter(func(err error) bool {
+				return err.Error() == "error 1"
+			}).Error() != "ErrorSlice{\"error 1\"}" {
+				t.Errorf("unexpected filtered error message")
+			}
+			if slice.Filter(func(err error) bool {
+				return err.Error() == "error 2"
+			}).Error() != "ErrorSlice{\"error 2\"}" {
+				t.Errorf("unexpected filtered error message")
+			}
+			if slice.Filter(func(err error) bool {
+				return err.Error() == "error 3"
+			}).Error() != "ErrorSlice{\"error 3\"}" {
+				t.Errorf("unexpected filtered error message")
+			}
+		}
+	}
+	{
+		// some functions fail
+		jobGroup := NewJobGroup[int]()
+		jobGroup.Add(func(context.Context) (int, error) {
+			return -1, errors.New("error 1")
+		})
+		jobGroup.Add(func(context.Context) (int, error) {
+			return 2, nil
+		})
+		jobGroup.Add(func(context.Context) (int, error) {
+			return -1, errors.New("error 3")
+		})
+
+		val, err := jobGroup.Run(context.Background())
+		if err != nil {
+			t.Errorf("expected no error, got %v", err)
+		}
+		if val != 2 {
+			t.Errorf("expected 2, got %v", val)
+		}
+	}
+	{
+		// two fail, two succeed at different times
+		jobGroup := NewJobGroup[int]()
+		jobGroup.Add(func(context.Context) (int, error) {
+			time.Sleep(time.Millisecond * 100)
+			return -1, errors.New("error 1")
+		})
+		jobGroup.Add(func(context.Context) (int, error) {
+			time.Sleep(time.Millisecond * 200)
+			return -1, errors.New("error 2")
+		})
+		jobGroup.Add(func(context.Context) (int, error) {
+			time.Sleep(time.Millisecond * 300)
+			return 3, nil
+		})
+		jobGroup.Add(func(context.Context) (int, error) {
+			time.Sleep(time.Millisecond * 400)
+			return 4, nil
+		})
+
+		startedAt := time.Now()
+		val, err := jobGroup.Run(context.Background())
+		if err != nil {
+			t.Errorf("expected no error, got %v", err)
+		}
+		if val != 3 {
+			t.Errorf("expected 3, got %v", val)
+		}
+		if time.Since(startedAt) > time.Millisecond*350 {
+			t.Errorf("expected less than 350ms, got %v", time.Since(startedAt))
+		}
+	}
+	{
+		// make sure that even if there is a success, the other functions still get executed
+		jobGroup := NewJobGroup[int]()
+		numCalled := new(atomic.Uint64)
+		jobGroup.Add(func(context.Context) (int, error) {
+			numCalled.Add(1)
+			return 1, nil
+		})
+		jobGroup.Add(func(context.Context) (int, error) {
+			numCalled.Add(1)
+			return -1, errors.New("error 2")
+		})
+		jobGroup.Add(func(context.Context) (int, error) {
+			numCalled.Add(1)
+			time.Sleep(time.Millisecond * 100)
+			return 123, nil
+		})
+		jobGroup.Add(func(context.Context) (int, error) {
+			numCalled.Add(1)
+			time.Sleep(time.Second)
+			return 123, nil
+		})
+
+		startedAt := time.Now()
+		val, err := jobGroup.Run(context.Background())
+		if err != nil {
+			t.Errorf("expected no error, got %v", err)
+		}
+		if val != 1 {
+			t.Errorf("expected 1, got %v", val)
+		}
+		if time.Since(startedAt) > time.Millisecond*100 {
+			t.Errorf("expected less than 100ms, got %v", time.Since(startedAt))
+		}
+		time.Sleep(time.Second * 2)
+		if numCalled.Load() != 4 {
+			t.Errorf("expected 4, got %v", numCalled.Load())
+		}
+	}
+	{
+		// try with the zero value of int: it must still count as a success
+		jobGroup := NewJobGroup[int]()
+		jobGroup.Add(func(context.Context) (int, error) {
+			return 0, nil
+		})
+		jobGroup.Add(func(context.Context) (int, error) {
+			time.Sleep(1 * time.Second)
+			return 33, nil
+		})
+
+		val, err := jobGroup.Run(context.Background())
+		if err != nil {
+			t.Errorf("expected no error, got %v", err)
+		}
+		if val != 0 {
+			t.Errorf("expected 0, got %v", val)
+		}
+	}
+}
diff --git a/first.go b/first.go
deleted file mode 100644
index 2ac83797..00000000
--- a/first.go
+++ /dev/null
@@ -1,82 +0,0 @@
-package main
-
-import (
-	"context"
-	"errors"
-	"sync"
-	"sync/atomic"
-
-	"golang.org/x/sync/errgroup"
-)
-
-// FirstResponse is a helper to get the first non-null result or error from a set of goroutines.
-type FirstResponse struct {
-	result     chan any
-	wg         *errgroup.Group
-	waitWg     chan struct{}
-	resultOnce sync.Once
-	ctx        context.Context
-	gotResult  *atomic.Bool
-}
-
-func NewFirstResponse(ctx context.Context, concurrency int) *FirstResponse {
-	fr := &FirstResponse{
-		result:    make(chan any, 1),
-		waitWg:    make(chan struct{}),
-		gotResult: new(atomic.Bool),
-	}
-	fr.wg, ctx = errgroup.WithContext(ctx)
-	if concurrency > 0 {
-		fr.wg.SetLimit(concurrency)
-	}
-	fr.ctx = ctx
-	return fr
-}
-
-// Spawn spawns a goroutine that executes the given function.
-func (w *FirstResponse) Spawn(f func() (any, error)) (ok bool) {
-	if w.gotResult.Load() {
-		return false
-	}
-	w.wg.Go(func() error {
-		result, err := f()
-		if err != nil {
-			return w.send(err) // stop the errgroup
-		} else {
-			if result != nil {
-				return w.send(result) // stop the errgroup
-			}
-		}
-		return nil
-	})
-	return true
-}
-
-var errGotFirstResult = errors.New("got first result")
-
-// send sends the result to the channel, but only once.
-// If the result is already sent, it does nothing.
-// The result can be something, or an error.
-func (w *FirstResponse) send(result any) error {
-	w.gotResult.Store(true)
-	w.resultOnce.Do(func() {
-		w.result <- result
-		close(w.result)
-	})
-	return errGotFirstResult
-}
-
-// Wait waits for all goroutines to finish, and returns the first non-null result or error.
-func (w *FirstResponse) Wait() any {
-	go func() {
-		w.wg.Wait()
-		w.waitWg <- struct{}{}
-	}()
-
-	select {
-	case result := <-w.result:
-		return result
-	case <-w.waitWg:
-		return nil
-	}
-}
diff --git a/multiepoch-getTransaction.go b/multiepoch-getTransaction.go
index 1dae2b71..aa10e2a2 100644
--- a/multiepoch-getTransaction.go
+++ b/multiepoch-getTransaction.go
@@ -34,6 +34,10 @@ func (multi *MultiEpoch) findEpochNumberFromSignature(ctx context.Context, sig s
 	// - if one epoch, just return that epoch
 	// - if multiple epochs, use sigToEpoch to find the epoch number
 	// - if sigToEpoch is not available, linear search through all epochs
+	startedAt := time.Now()
+	defer func() {
+		klog.V(4).Infof("findEpochNumberFromSignature took %s", time.Since(startedAt))
+	}()
 
 	if epochs := multi.GetEpochNumbers(); len(epochs) == 1 {
 		return epochs[0], nil
@@ -48,45 +52,53 @@ func (multi *MultiEpoch) findEpochNumberFromSignature(ctx context.Context, sig s
 	buckets := multi.getAllBucketteers()
 
 	// Search all epochs in parallel:
-	wg := NewFirstResponse(ctx, multi.options.EpochSearchConcurrency)
+	jobGroup := NewJobGroup[uint64]()
 	for i := range numbers {
 		epochNumber := numbers[i]
-		wg.Spawn(func() (any, error) {
+		jobGroup.Add(func(ctx context.Context) (uint64, error) {
+			if err := ctx.Err(); err != nil {
+				return 0, err
+			}
 			bucket, ok := buckets[epochNumber]
 			if !ok {
-				return nil, nil
+				return 0, ErrNotFound
 			}
 			has, err := bucket.Has(sig)
 			if err != nil {
 				return 0, fmt.Errorf("failed to check if signature exists in bucket: %w", err)
 			}
 			if !has {
-				return nil, nil
+				return 0, ErrNotFound
 			}
 			epoch, err := multi.GetEpoch(epochNumber)
 			if err != nil {
-				return nil, fmt.Errorf("failed to get epoch %d: %w", epochNumber, err)
+				return 0, fmt.Errorf("failed to get epoch %d: %w", epochNumber, err)
 			}
 			if _, err := epoch.FindCidFromSignature(ctx, sig); err == nil {
 				return epochNumber, nil
 			}
 			// Not found in this epoch.
-			return nil, nil
+			return 0, ErrNotFound
 		})
 	}
-	switch result := wg.Wait().(type) {
-	case nil:
+	val, err := jobGroup.RunWithConcurrency(ctx, multi.options.EpochSearchConcurrency)
+	if err != nil {
+		var errs ErrorSlice
+		if !errors.As(err, &errs) {
+			// Not a collection of per-epoch errors: return it as-is.
+			return 0, err
+		}
 		// All epochs were searched, but the signature was not found.
-		return 0, ErrNotFound
-	case error:
-		// An error occurred while searching one of the epochs.
-		return 0, result
-	case uint64:
-		// The signature was found in one of the epochs.
-		return result, nil
-	default:
-		return 0, fmt.Errorf("unexpected result: (%T) %v", result, result)
+		if errs.All(func(err error) bool {
+			return errors.Is(err, ErrNotFound)
+		}) {
+			return 0, ErrNotFound
+		}
+		// At least one epoch failed with something other than not-found.
+		return 0, err
 	}
+	// The signature was found in one of the epochs.
+	return val, nil
 }
 
 func (multi *MultiEpoch) handleGetTransaction(ctx context.Context, conn *requestContext, req *jsonrpc2.Request) (*jsonrpc2.Error, error) {
diff --git a/multiepoch.go b/multiepoch.go
index 3f52040b..1a3a27a7 100644
--- a/multiepoch.go
+++ b/multiepoch.go
@@ -301,6 +301,13 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx
 					return
 				}
 			}
+			{
+				// handle the /api/v1/* endpoint
+				if strings.HasPrefix(string(reqCtx.Path()), "/api/v1/") {
+					handler.apiHandler(reqCtx)
+					return
+				}
+			}
 		}
 		{
 			// make sure the method is POST