Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cid API #118

Merged
merged 5 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package main

import (
"context"
"errors"
"strconv"
"strings"

"github.com/gagliardetto/solana-go"
"github.com/rpcpool/yellowstone-faithful/compactindexsized"
"github.com/valyala/fasthttp"
)

func (multi *MultiEpoch) apiHandler(reqCtx *fasthttp.RequestCtx) {
if !reqCtx.IsGet() {
reqCtx.SetStatusCode(fasthttp.StatusMethodNotAllowed)
return
}
// Add a CLI command that takes a config file (or a command line argument) pointing at slot-to-cid and sig-to-cid and looks up the CID for a given block or transaction.
// slot-to-cid API endpoint: /api/v1/slot-to-cid/{slot}
// sig-to-cid API endpoint: /api/v1/sig-to-cid/{sig}
// The API should return the CID as a string.
// The API should return a 404 if the slot or sig is not found.
// The API should return a 500 if there is an internal error.
// The API should return a 400 if the slot or sig is invalid.
// The API should return a 200 if the CID is found.

if strings.HasPrefix(string(reqCtx.Path()), "/api/v1/slot-to-cid/") {
slotStr := string(reqCtx.Path())[len("/api/v1/slot-to-cid/"):]
slotStr = strings.TrimRight(slotStr, "/")
// try to parse the slot as uint64
slot, err := strconv.ParseUint(slotStr, 10, 64)
if err != nil {
reqCtx.SetStatusCode(fasthttp.StatusBadRequest)
return
}
// find the epoch that contains the requested slot
epochNumber := CalcEpochForSlot(slot)
epochHandler, err := multi.GetEpoch(epochNumber)
if err != nil {
reqCtx.SetStatusCode(fasthttp.StatusNotFound) // TODO: this means epoch is not available, and probably should be another dedicated status code
return
}

blockCid, err := epochHandler.FindCidFromSlot(context.TODO(), slot)
if err != nil {
if errors.Is(err, compactindexsized.ErrNotFound) {
reqCtx.SetStatusCode(fasthttp.StatusNotFound)
} else {
reqCtx.SetStatusCode(fasthttp.StatusInternalServerError)
}
return
}
reqCtx.SetStatusCode(fasthttp.StatusOK)
reqCtx.SetBodyString(blockCid.String())
return
}
if strings.HasPrefix(string(reqCtx.Path()), "/api/v1/sig-to-cid/") {
sigStr := string(reqCtx.Path())[len("/api/v1/sig-to-cid/"):]
sigStr = strings.TrimRight(sigStr, "/")
// parse the signature
sig, err := solana.SignatureFromBase58(sigStr)
if err != nil {
reqCtx.SetStatusCode(fasthttp.StatusBadRequest)
return
}
epochNumber, err := multi.findEpochNumberFromSignature(context.TODO(), sig)
if err != nil {
if errors.Is(err, ErrNotFound) {
reqCtx.SetStatusCode(fasthttp.StatusNotFound)
} else {
reqCtx.SetStatusCode(fasthttp.StatusInternalServerError)
}
return
}

epochHandler, err := multi.GetEpoch(uint64(epochNumber))
if err != nil {
reqCtx.SetStatusCode(fasthttp.StatusNotFound) // TODO: this means epoch is not available, and probably should be another dedicated status code
return
}

transactionCid, err := epochHandler.FindCidFromSignature(context.TODO(), sig)
if err != nil {
if errors.Is(err, compactindexsized.ErrNotFound) {
reqCtx.SetStatusCode(fasthttp.StatusNotFound)
} else {
reqCtx.SetStatusCode(fasthttp.StatusInternalServerError)
}
return
}
reqCtx.SetStatusCode(fasthttp.StatusOK)
reqCtx.SetBodyString(transactionCid.String())
return
}
reqCtx.SetStatusCode(fasthttp.StatusNotFound)
}
10 changes: 9 additions & 1 deletion cmd-check-deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,13 @@ func (f *commaSeparatedStringSliceFlag) Len() int {
return len(f.slice)
}

const defaultLotusAPIAddress = "https://api.node.glif.io"

func newCmd_check_deals() *cli.Command {
var includePatterns cli.StringSlice
var excludePatterns cli.StringSlice
var providerAllowlist commaSeparatedStringSliceFlag
var lotusAPIAddress string
return &cli.Command{
Name: "check-deals",
Description: "Validate remote split car retrieval for the given config files",
Expand All @@ -79,6 +82,12 @@ func newCmd_check_deals() *cli.Command {
Usage: "List of providers to allow checking (comma-separated, can be specified multiple times); will ignore all pieces that correspond to a provider not in the allowlist.",
Value: &providerAllowlist,
},
&cli.StringFlag{
Name: "filecoin-api-address",
Usage: "Address of the filecoin API to find provider info",
Value: defaultLotusAPIAddress,
Destination: &lotusAPIAddress,
},
},
Action: func(c *cli.Context) error {
src := c.Args().Slice()
Expand Down Expand Up @@ -116,7 +125,6 @@ func newCmd_check_deals() *cli.Command {
klog.Infof("Provider allowlist: <empty>")
}

lotusAPIAddress := "https://api.node.glif.io"
cl := jsonrpc.NewClient(lotusAPIAddress)
dm := splitcarfetcher.NewMinerInfo(
cl,
Expand Down
8 changes: 7 additions & 1 deletion cmd-rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func newCmd_rpc() *cli.Command {
var epochLoadConcurrency int
var maxCacheSizeMB int
var grpcListenOn string
var lotusAPIAddress string
return &cli.Command{
Name: "rpc",
Usage: "Start a Solana JSON RPC server.",
Expand Down Expand Up @@ -102,6 +103,12 @@ func newCmd_rpc() *cli.Command {
Value: 0,
Destination: &maxCacheSizeMB,
},
&cli.StringFlag{
Name: "filecoin-api-address",
Usage: "Address of the filecoin API to find provider info",
Value: defaultLotusAPIAddress,
Destination: &lotusAPIAddress,
},
),
Action: func(c *cli.Context) error {
if listenOn == "" && grpcListenOn == "" {
Expand Down Expand Up @@ -145,7 +152,6 @@ func newCmd_rpc() *cli.Command {
klog.Infof("Loaded %d epoch configs", len(configs))
klog.Info("Initializing epochs async...")

lotusAPIAddress := "https://api.node.glif.io"
cl := jsonrpc.NewClient(lotusAPIAddress)
minerInfo := splitcarfetcher.NewMinerInfo(
cl,
Expand Down
131 changes: 131 additions & 0 deletions first-success.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package main

import (
"context"
"strconv"
"strings"

"golang.org/x/sync/errgroup"
)

// FirstSuccess is a helper for running multiple functions concurrently and returning the first successful result.
// If all functions return an error, all the errors are returned as a ErrorSlice.
func FirstSuccess[T comparable](
ctx context.Context,
concurrency int,
fns ...JobFunc[T],
) (T, error) {
type result struct {
val T
err error
}
results := make(chan result, len(fns))
// NOTE: even after the first success, the other goroutines will still run until they finish.
// ctx, cancel := context.WithCancel(ctx)
// defer cancel()
var wg errgroup.Group
if concurrency > 0 {
wg.SetLimit(concurrency)
}
for _, fn := range fns {
fn := fn
wg.Go(func() error {
if ctx.Err() != nil {
var empty T
results <- result{empty, ctx.Err()} // TODO: is this OK?
return nil
}
val, err := fn(ctx)
select {
case results <- result{val, err}:
case <-ctx.Done():
}
return nil
})
}
go func() {
wg.Wait()
close(results)
}()
var errs ErrorSlice
for res := range results {
if res.err == nil {
// NOTE: it will count as a success even if the value is the zero value (e.g. 0, "", nil, etc.)
return res.val, nil
}
errs = append(errs, res.err)
if len(errs) == len(fns) {
break
}
}
return *new(T), errs
}

func IsErrorSlice(err error) bool {
_, ok := err.(ErrorSlice)
return ok
}

type ErrorSlice []error

func (e ErrorSlice) Error() string {
// format like; ErrorSlice{"error1", "error2", "error3"}
if len(e) == 0 {
return "ErrorSlice{}"
}
builder := strings.Builder{}
builder.WriteString("ErrorSlice{")
for i, err := range e {
if i > 0 {
builder.WriteString(", ")
}
if err == nil {
builder.WriteString("nil")
continue
}
// write quoted string
builder.WriteString(strconv.Quote(err.Error()))
}
builder.WriteString("}")
return builder.String()
}

// Filter returns a new slice of errors that satisfy the predicate.
func (e ErrorSlice) Filter(predicate func(error) bool) ErrorSlice {
var errs ErrorSlice
for _, err := range e {
if predicate(err) {
errs = append(errs, err)
}
}
return errs
}

func (e ErrorSlice) All(predicate func(error) bool) bool {
for _, err := range e {
if !predicate(err) {
return false
}
}
return true
}

type JobFunc[T comparable] func(context.Context) (T, error)

func NewJobGroup[T comparable]() *JobGroup[T] {
return &JobGroup[T]{}
}

type JobGroup[T comparable] []JobFunc[T]

func (r *JobGroup[T]) Add(fn JobFunc[T]) {
*r = append(*r, fn)
}

func (r *JobGroup[T]) Run(ctx context.Context) (T, error) {
return FirstSuccess(ctx, -1, *r...)
}

func (r *JobGroup[T]) RunWithConcurrency(ctx context.Context, concurrency int) (T, error) {
return FirstSuccess(ctx, concurrency, *r...)
}
Loading
Loading