Skip to content

Commit

Permalink
Merge remote-tracking branch 'gagliardetto/multiepoch' into multiepoch
Browse files Browse the repository at this point in the history
  • Loading branch information
linuskendall committed Aug 23, 2023
2 parents 186a49b + 1a12294 commit 388aaa0
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 12 deletions.
4 changes: 4 additions & 0 deletions cmd-rpc-server-car-getTransaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ func adaptTransactionMetaToExpectedOutput(m map[string]any) map[string]any {
if !ok {
continue
}
// If doesn't have `index`, then set it to 0
if _, ok := innerInstruction["index"]; !ok {
innerInstruction["index"] = 0
}
instructionsAny, ok := innerInstruction["instructions"]
if !ok {
continue
Expand Down
9 changes: 8 additions & 1 deletion http-range.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"path/filepath"
"strings"
"time"

"github.com/goware/urlx"
)

type ReaderAtCloser interface {
Expand Down Expand Up @@ -71,10 +73,15 @@ func remoteHTTPFileAsIoReaderAt(ctx context.Context, url string) (ReaderAtCloser
contentLength: contentLength,
client: newHTTPClient(),
}
parsedURL, err := urlx.Parse(url)
if err != nil {
return nil, err
}
name := filepath.Base(parsedURL.Path)

rc := NewRangeCache(
contentLength,
filepath.Base(url),
name,
func(p []byte, off int64) (n int, err error) {
return remoteReadAt(rr.client, rr.url, p, off)
})
Expand Down
42 changes: 35 additions & 7 deletions multiepoch-getBlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,25 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
}

idealEntrySize := uint64(36190)
maybeOffsetOfLastEntry := parentOffset - idealEntrySize
length += idealEntrySize
var start uint64
if parentIsInPreviousEpoch {
start = parentOffset
} else {
if parentOffset > idealEntrySize {
start = parentOffset - idealEntrySize
} else {
start = parentOffset
}
length += idealEntrySize
}

klog.Infof("prefetching CAR: start=%d length=%d (parent_offset=%d)", maybeOffsetOfLastEntry, length, parentOffset)
carSection, err := epochHandler.ReadAtFromCar(ctx, maybeOffsetOfLastEntry, length)
klog.Infof("prefetching CAR: start=%d length=%d (parent_offset=%d)", start, length, parentOffset)
carSection, err := epochHandler.ReadAtFromCar(ctx, start, length)
if err != nil {
return err
}
dr := bytes.NewReader(carSection)
{
if !parentIsInPreviousEpoch {
dr.Seek(int64(idealEntrySize), io.SeekStart)
}
br := bufio.NewReader(dr)
Expand All @@ -154,7 +163,7 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
if errors.Is(err, io.EOF) {
break
}
return err
return fmt.Errorf("failed to read node: %w", err)
}
if gotCid.Equals(blockCid) {
break
Expand Down Expand Up @@ -285,7 +294,17 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
rewardsAsArray := m["rewards"].([]any)
for _, reward := range rewardsAsArray {
rewardAsMap := reward.(map[string]any)
rewardAsMap["commission"] = nil
if _, ok := rewardAsMap["commission"]; !ok {
rewardAsMap["commission"] = nil
}
// if the commission field is a string, convert it to a float
if asString, ok := rewardAsMap["commission"].(string); ok {
rewardAsMap["commission"] = asFloat(asString)
}
// if no lamports field, add it and set it to 0
if _, ok := rewardAsMap["lamports"]; !ok {
rewardAsMap["lamports"] = uint64(0)
}

// if it has a post_balance field, convert it to postBalance
if _, ok := rewardAsMap["post_balance"]; ok {
Expand Down Expand Up @@ -433,6 +452,15 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
return nil, nil
}

func asFloat(s string) float64 {
var f float64
_, err := fmt.Sscanf(s, "%f", &f)
if err != nil {
panic(err)
}
return f
}

func mergeTxNodeSlices(slices [][]*ipldbindcode.Transaction) []*ipldbindcode.Transaction {
var out []*ipldbindcode.Transaction
for _, slice := range slices {
Expand Down
9 changes: 6 additions & 3 deletions range-cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ func TestCache(t *testing.T) {
v := []byte("hello")
full := append(v, []byte(" world")...)
rd := bytes.NewReader(full)
rc := NewRangeCache(int64(len(full)), func(p []byte, off int64) (n int, err error) {
return rd.ReadAt(p, off)
})
rc := NewRangeCache(
int64(len(full)),
"test",
func(p []byte, off int64) (n int, err error) {
return rd.ReadAt(p, off)
})

{
{
Expand Down
2 changes: 1 addition & 1 deletion solana-block-rewards/rewards.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package solanablockrewards

import (
"github.com/golang/protobuf/proto"
"github.com/rpcpool/yellowstone-faithful/third_party/solana_proto/confirmed_block"
"google.golang.org/protobuf/proto"
)

func ParseRewards(buf []byte) (*confirmed_block.Rewards, error) {
Expand Down

0 comments on commit 388aaa0

Please sign in to comment.