Skip to content

Commit

Permalink
Merge pull request #9 from gagliardetto/cli-v0
Browse files Browse the repository at this point in the history
Add support for remote indexes, cars, and fetching from filecoin
  • Loading branch information
gagliardetto authored Jun 26, 2023
2 parents ce2599f + 987c5c3 commit ce34d0c
Show file tree
Hide file tree
Showing 15 changed files with 940 additions and 283 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ compile-mac:
GOOS=darwin GOARCH=arm64 go build -ldflags=$(LD_FLAGS) -o ./bin/darwin/arm64/faithful-cli_darwin_arm64 .
compile-windows:
@echo "\nCompiling faithful-cli binary for windows amd64 ..."
GOOS=windows GOARCH=amd64 go build -ldflags=$(LD_FLAGS) -o ./bin/windows-amd64/faithful-cli_windows_amd64
GOOS=windows GOARCH=amd64 go build -ldflags=$(LD_FLAGS) -o ./bin/windows/amd64/faithful-cli_windows_amd64.exe .
test:
go test -v ./...
bindcode: install-deps
Expand Down
148 changes: 75 additions & 73 deletions cmd-fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,84 +46,86 @@ import (

var fetchProviderAddrInfos []peer.AddrInfo

var lassieFetchFlags = []cli.Flag{
&cli.StringFlag{
Name: "output",
Aliases: []string{"o"},
Usage: "the CAR file to write to, may be an existing or a new CAR, or use '-' to write to stdout",
TakesFile: true,
},
&cli.DurationFlag{
Name: "provider-timeout",
Aliases: []string{"pt"},
Usage: "consider it an error after not receiving a response from a storage provider after this amount of time",
Value: 20 * time.Second,
},
&cli.DurationFlag{
Name: "global-timeout",
Aliases: []string{"gt"},
Usage: "consider it an error after not completing the retrieval after this amount of time",
Value: 0,
},
&cli.BoolFlag{
Name: "progress",
Aliases: []string{"p"},
Usage: "print progress output",
},
&cli.StringFlag{
Name: "dag-scope",
Usage: "describes the fetch behavior at the end of the traversal path. Valid values include [all, entity, block].",
DefaultText: "defaults to all, the entire DAG at the end of the path will be fetched",
Value: "all",
Action: func(cctx *cli.Context, v string) error {
switch v {
case string(types.DagScopeAll):
case string(types.DagScopeEntity):
case string(types.DagScopeBlock):
default:
return fmt.Errorf("invalid dag-scope parameter, must be of value [all, entity, block]")
}

return nil
},
},
&cli.StringFlag{
Name: "providers",
Aliases: []string{"provider"},
DefaultText: "Providers will be discovered automatically",
Usage: "Addresses of providers, including peer IDs, to use instead of automatic discovery, seperated by a comma. All protocols will be attempted when connecting to these providers. Example: /ip4/1.2.3.4/tcp/1234/p2p/12D3KooWBSTEYMLSu5FnQjshEVah9LFGEZoQt26eacCEVYfedWA4",
Action: func(cctx *cli.Context, v string) error {
// Do nothing if given an empty string
if v == "" {
return nil
}

var err error
fetchProviderAddrInfos, err = types.ParseProviderStrings(v)
return err
},
},
&cli.StringFlag{
Name: "ipni-endpoint",
Aliases: []string{"ipni"},
DefaultText: "Defaults to https://cid.contact",
Usage: "HTTP endpoint of the IPNI instance used to discover providers.",
},
FlagEventRecorderAuth,
FlagEventRecorderInstanceId,
FlagEventRecorderUrl,
FlagVerbose,
FlagVeryVerbose,
FlagProtocols,
FlagExcludeProviders,
FlagTempDir,
FlagBitswapConcurrency,
}

var fetchCmd = &cli.Command{
Name: "fetch",
Usage: "Fetches content from the IPFS and Filecoin network",
Before: before,
Action: Fetch,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "output",
Aliases: []string{"o"},
Usage: "the CAR file to write to, may be an existing or a new CAR, or use '-' to write to stdout",
TakesFile: true,
},
&cli.DurationFlag{
Name: "provider-timeout",
Aliases: []string{"pt"},
Usage: "consider it an error after not receiving a response from a storage provider after this amount of time",
Value: 20 * time.Second,
},
&cli.DurationFlag{
Name: "global-timeout",
Aliases: []string{"gt"},
Usage: "consider it an error after not completing the retrieval after this amount of time",
Value: 0,
},
&cli.BoolFlag{
Name: "progress",
Aliases: []string{"p"},
Usage: "print progress output",
},
&cli.StringFlag{
Name: "dag-scope",
Usage: "describes the fetch behavior at the end of the traversal path. Valid values include [all, entity, block].",
DefaultText: "defaults to all, the entire DAG at the end of the path will be fetched",
Value: "all",
Action: func(cctx *cli.Context, v string) error {
switch v {
case string(types.DagScopeAll):
case string(types.DagScopeEntity):
case string(types.DagScopeBlock):
default:
return fmt.Errorf("invalid dag-scope parameter, must be of value [all, entity, block]")
}

return nil
},
},
&cli.StringFlag{
Name: "providers",
Aliases: []string{"provider"},
DefaultText: "Providers will be discovered automatically",
Usage: "Addresses of providers, including peer IDs, to use instead of automatic discovery, seperated by a comma. All protocols will be attempted when connecting to these providers. Example: /ip4/1.2.3.4/tcp/1234/p2p/12D3KooWBSTEYMLSu5FnQjshEVah9LFGEZoQt26eacCEVYfedWA4",
Action: func(cctx *cli.Context, v string) error {
// Do nothing if given an empty string
if v == "" {
return nil
}

var err error
fetchProviderAddrInfos, err = types.ParseProviderStrings(v)
return err
},
},
&cli.StringFlag{
Name: "ipni-endpoint",
Aliases: []string{"ipni"},
DefaultText: "Defaults to https://cid.contact",
Usage: "HTTP endpoint of the IPNI instance used to discover providers.",
},
FlagEventRecorderAuth,
FlagEventRecorderInstanceId,
FlagEventRecorderUrl,
FlagVerbose,
FlagVeryVerbose,
FlagProtocols,
FlagExcludeProviders,
FlagTempDir,
FlagBitswapConcurrency,
},
Flags: lassieFetchFlags,
}

func Fetch(cctx *cli.Context) error {
Expand Down
104 changes: 70 additions & 34 deletions cmd-rpc-server-car-getBlock.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package main

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"runtime"
"sync"
"time"

"github.com/gagliardetto/solana-go"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
Expand Down Expand Up @@ -42,7 +42,26 @@ func (e *InternalError) As(target interface{}) bool {
return errors.As(e.Err, target)
}

type timer struct {
start time.Time
prev time.Time
}

func newTimer() *timer {
now := time.Now()
return &timer{
start: now,
prev: now,
}
}

func (t *timer) time(name string) {
klog.V(2).Infof("TIMED: %s: %s (overall %s)", name, time.Since(t.prev), time.Since(t.start))
t.prev = time.Now()
}

func (ser *rpcServer) getBlock(ctx context.Context, conn *requestContext, req *jsonrpc2.Request) {
tim := newTimer()
params, err := parseGetBlockRequest(req.Params)
if err != nil {
klog.Errorf("failed to parse params: %v", err)
Expand All @@ -55,6 +74,7 @@ func (ser *rpcServer) getBlock(ctx context.Context, conn *requestContext, req *j
})
return
}
tim.time("parseGetBlockRequest")
slot := params.Slot

block, err := ser.GetBlock(ctx, slot)
Expand All @@ -69,14 +89,15 @@ func (ser *rpcServer) getBlock(ctx context.Context, conn *requestContext, req *j
})
return
}
tim.time("GetBlock")
blocktime := uint64(block.Meta.Blocktime)

allTransactionNodes := make([]*ipldbindcode.Transaction, 0)
mu := &sync.Mutex{}
var lastEntryHash solana.Hash
{
wg := new(errgroup.Group)
wg.SetLimit(runtime.NumCPU())
wg.SetLimit(runtime.NumCPU() * 2)
// get entries from the block
for entryIndex, entry := range block.Entries {
entryIndex := entryIndex
Expand All @@ -93,19 +114,27 @@ func (ser *rpcServer) getBlock(ctx context.Context, conn *requestContext, req *j
lastEntryHash = solana.HashFromBytes(entryNode.Hash)
}

twg := new(errgroup.Group)
twg.SetLimit(runtime.NumCPU())
// get the transactions from the entry
for _, tx := range entryNode.Transactions {
// get the transaction by CID
txNode, err := ser.GetTransactionByCid(ctx, tx.(cidlink.Link).Cid)
if err != nil {
klog.Errorf("failed to decode Transaction: %v", err)
continue
}
mu.Lock()
allTransactionNodes = append(allTransactionNodes, txNode)
mu.Unlock()
for txI := range entryNode.Transactions {
txI := txI
tx := entryNode.Transactions[txI]
twg.Go(func() error {
// get the transaction by CID
tcid := tx.(cidlink.Link).Cid
txNode, err := ser.GetTransactionByCid(ctx, tcid)
if err != nil {
klog.Errorf("failed to decode Transaction %s: %v", tcid, err)
return nil
}
mu.Lock()
allTransactionNodes = append(allTransactionNodes, txNode)
mu.Unlock()
return nil
})
}
return nil
return twg.Wait()
})
}
err = wg.Wait()
Expand All @@ -121,10 +150,12 @@ func (ser *rpcServer) getBlock(ctx context.Context, conn *requestContext, req *j
return
}
}
tim.time("get entries")

var allTransactions []GetTransactionResponse
var rewards any // TODO: implement rewards as in solana
if !block.Rewards.(cidlink.Link).Cid.Equals(DummyCID) {
hasRewards := !block.Rewards.(cidlink.Link).Cid.Equals(DummyCID)
if hasRewards {
rewardsNode, err := ser.GetRewardsByCid(ctx, block.Rewards.(cidlink.Link).Cid)
if err != nil {
klog.Errorf("failed to decode Rewards: %v", err)
Expand All @@ -137,29 +168,30 @@ func (ser *rpcServer) getBlock(ctx context.Context, conn *requestContext, req *j
})
return
}
buf := new(bytes.Buffer)
buf.Write(rewardsNode.Data.Data)
if rewardsNode.Data.Total > 1 {
for _, _cid := range rewardsNode.Data.Next {
nextNode, err := ser.GetDataFrameByCid(ctx, _cid.(cidlink.Link).Cid)
if err != nil {
klog.Errorf("failed to decode Rewards: %v", err)
conn.ReplyWithError(
ctx,
req.ID,
&jsonrpc2.Error{
Code: jsonrpc2.CodeInternalError,
Message: "Internal error",
})
return
}
buf.Write(nextNode.Data)
}
rewardsBuf, err := loadDataFromDataFrames(&rewardsNode.Data, ser.GetDataFrameByCid)
if err != nil {
klog.Errorf("failed to load Rewards dataFrames: %v", err)
conn.ReplyWithError(
ctx,
req.ID,
&jsonrpc2.Error{
Code: jsonrpc2.CodeInternalError,
Message: "Internal error",
})
return
}

uncompressedRewards, err := decompressZstd(buf.Bytes())
uncompressedRewards, err := decompressZstd(rewardsBuf)
if err != nil {
panic(err)
klog.Errorf("failed to decompress Rewards: %v", err)
conn.ReplyWithError(
ctx,
req.ID,
&jsonrpc2.Error{
Code: jsonrpc2.CodeInternalError,
Message: "Internal error",
})
return
}
// try decoding as protobuf
actualRewards, err := solanablockrewards.ParseRewards(uncompressedRewards)
Expand Down Expand Up @@ -224,6 +256,7 @@ func (ser *rpcServer) getBlock(ctx context.Context, conn *requestContext, req *j
}
}
}
tim.time("get rewards")
{
for _, transactionNode := range allTransactionNodes {
var txResp GetTransactionResponse
Expand Down Expand Up @@ -272,6 +305,7 @@ func (ser *rpcServer) getBlock(ctx context.Context, conn *requestContext, req *j
allTransactions = append(allTransactions, txResp)
}
}
tim.time("get transactions")
var blockResp GetBlockResponse
blockResp.Transactions = allTransactions
blockResp.BlockTime = &blocktime
Expand Down Expand Up @@ -313,6 +347,7 @@ func (ser *rpcServer) getBlock(ctx context.Context, conn *requestContext, req *j
blockResp.PreviousBlockhash = parentEntryHash.String()
}
}
tim.time("get parent block")

// TODO: get all the transactions from the block
// reply with the data
Expand All @@ -324,6 +359,7 @@ func (ser *rpcServer) getBlock(ctx context.Context, conn *requestContext, req *j
return m
},
)
tim.time("reply")
if err != nil {
klog.Errorf("failed to reply: %v", err)
}
Expand Down
Loading

0 comments on commit ce34d0c

Please sign in to comment.