Skip to content

Commit

Permalink
Merge pull request #11 from gagliardetto/improv-dataframes
Browse files Browse the repository at this point in the history
Improve dataframes (depends on #9)
  • Loading branch information
gagliardetto authored Jun 26, 2023
2 parents ce34d0c + 4d6658b commit f91feda
Show file tree
Hide file tree
Showing 11 changed files with 447 additions and 138 deletions.
40 changes: 31 additions & 9 deletions cmd-dump-car.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
bin "github.com/gagliardetto/binary"
"github.com/gagliardetto/solana-go"
"github.com/ipld/go-car"
"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
"github.com/rpcpool/yellowstone-faithful/iplddecoders"
solanablockrewards "github.com/rpcpool/yellowstone-faithful/solana-block-rewards"
solanatxmetaparsers "github.com/rpcpool/yellowstone-faithful/solana-tx-meta-parsers"
Expand Down Expand Up @@ -158,8 +159,17 @@ func newCmd_DumpCar() *cli.Command {
panic(err)
}
{
if decoded.Data.Total == 1 {
completeData := decoded.Data.Data
if total, ok := decoded.Data.GetTotal(); !ok || total == 1 {
completeData := decoded.Data.Bytes()
{
// verify hash (if present)
if ha, ok := decoded.Data.GetHash(); ok {
err := ipldbindcode.VerifyHash(completeData, ha)
if err != nil {
panic(err)
}
}
}
var tx solana.Transaction
if err := bin.UnmarshalBin(&tx, completeData); err != nil {
panic(err)
Expand All @@ -176,11 +186,17 @@ func newCmd_DumpCar() *cli.Command {
}
} else {
if doPrint {
fmt.Println("transaction data is split into multiple blocks; skipping printing")
fmt.Println("transaction data is split into multiple objects; skipping printing")
}
}
if decoded.Metadata.Total == 1 {
completeBuffer := decoded.Metadata.Data
if total, ok := decoded.Metadata.GetTotal(); !ok || total == 1 {
completeBuffer := decoded.Metadata.Bytes()
if ha, ok := decoded.Metadata.GetHash(); ok {
err := ipldbindcode.VerifyHash(completeBuffer, ha)
if err != nil {
panic(err)
}
}
if len(completeBuffer) > 0 {
uncompressedMeta, err := decompressZstd(completeBuffer)
if err != nil {
Expand All @@ -196,7 +212,7 @@ func newCmd_DumpCar() *cli.Command {
}
} else {
if doPrint {
fmt.Println("transaction metadata is split into multiple blocks; skipping printing")
fmt.Println("transaction metadata is split into multiple objects; skipping printing")
}
}
}
Expand Down Expand Up @@ -245,8 +261,14 @@ func newCmd_DumpCar() *cli.Command {
spew.Dump(decoded)
numNodesPrinted++

if decoded.Data.Total == 1 {
completeBuffer := decoded.Data.Data
if total, ok := decoded.Data.GetTotal(); !ok || total == 1 {
completeBuffer := decoded.Data.Bytes()
if ha, ok := decoded.Data.GetHash(); ok {
err := ipldbindcode.VerifyHash(completeBuffer, ha)
if err != nil {
panic(err)
}
}
if len(completeBuffer) > 0 {
uncompressedRewards, err := decompressZstd(completeBuffer)
if err != nil {
Expand All @@ -262,7 +284,7 @@ func newCmd_DumpCar() *cli.Command {
}
}
} else {
fmt.Println("rewards data is split into multiple blocks; skipping printing")
fmt.Println("rewards data is split into multiple objects; skipping printing")
}
}
case iplddecoders.KindDataFrame:
Expand Down
38 changes: 14 additions & 24 deletions cmd-rpc-server-car.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"encoding/json"
"errors"
"fmt"
"hash/crc64"
"hash/fnv"
"io"
"net/http"
"os"
Expand Down Expand Up @@ -726,14 +724,16 @@ func loadDataFromDataFrames(
return nil, err
}
for _, frame := range allFrames {
dataBuffer.Write(frame.Data)
dataBuffer.Write(frame.Bytes())
}
// verify the data hash
if checksumCrc64(dataBuffer.Bytes()) != uint64(firstDataFrame.Hash) {
// Maybe it's the legacy checksum function?
if checksumFnv(dataBuffer.Bytes()) != uint64(firstDataFrame.Hash) {
return nil, fmt.Errorf("data hash mismatch")
}
// verify the data hash (if present)
bufHash, ok := firstDataFrame.GetHash()
if !ok {
return dataBuffer.Bytes(), nil
}
err = ipldbindcode.VerifyHash(dataBuffer.Bytes(), bufHash)
if err != nil {
return nil, err
}
return dataBuffer.Bytes(), nil
}
Expand All @@ -744,7 +744,11 @@ func getAllFramesFromDataFrame(
) ([]*ipldbindcode.DataFrame, error) {
frames := []*ipldbindcode.DataFrame{firstDataFrame}
// get the next data frames
for _, cid := range firstDataFrame.Next {
next, ok := firstDataFrame.GetNext()
if !ok || len(next) == 0 {
return frames, nil
}
for _, cid := range next {
nextDataFrame, err := dataFrameGetter(context.Background(), cid.(cidlink.Link).Cid)
if err != nil {
return nil, err
Expand All @@ -758,20 +762,6 @@ func getAllFramesFromDataFrame(
return frames, nil
}

// checksumFnv is the legacy checksum function, used in the first version of the radiance
// car creator. Some old cars still use this function.
func checksumFnv(data []byte) uint64 {
h := fnv.New64a()
h.Write(data)
return h.Sum64()
}

// checksumCrc64 returns the hash of the provided buffer.
// It is used in the latest version of the radiance car creator.
func checksumCrc64(buf []byte) uint64 {
return crc64.Checksum(buf, crc64.MakeTable(crc64.ISO))
}

func parseTransactionAndMetaFromNode(
transactionNode *ipldbindcode.Transaction,
dataFrameGetter func(ctx context.Context, wantedCid cid.Cid) (*ipldbindcode.DataFrame, error),
Expand Down
10 changes: 6 additions & 4 deletions cmd-x-index-all.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,10 @@ func createAllIndexes(

var tx solana.Transaction
txBuffer := new(bytes.Buffer)
txBuffer.Write(txNode.Data.Data)
if txNode.Data.Total > 1 {
txBuffer.Write(txNode.Data.Bytes())
if total, ok := txNode.Data.GetTotal(); ok && total > 1 {
// TODO: handle this case
klog.Infof("skipping transaction with %d partials", total)
continue
}
if err := bin.UnmarshalBin(&tx, txBuffer.Bytes()); err != nil {
Expand Down Expand Up @@ -617,9 +618,10 @@ func verifyAllIndexes(

var tx solana.Transaction
txBuffer := new(bytes.Buffer)
txBuffer.Write(txNode.Data.Data)
if txNode.Data.Total > 1 {
txBuffer.Write(txNode.Data.Bytes())
if total, ok := txNode.Data.GetTotal(); ok && total > 1 {
// TODO: handle this case
klog.Infof("skipping transaction with %d partials", total)
continue
}
if err := bin.UnmarshalBin(&tx, txBuffer.Bytes()); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaR
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml/v2 v2.0.6 h1:nrzqCb7j9cDFj2coyLNLaZuJTLjWjlaz6nvTvIwycIU=
github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 h1:1/WtZae0yGtPq+TI6+Tv1WTxkukpXeMlviSxvL7SRgk=
Expand Down Expand Up @@ -833,6 +834,7 @@ github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhso
github.com/tj/go-spin v1.1.0/go.mod h1:Mg1mzmePZm4dva8Qz60H2lHwmJ2loum4VIrLgVnKwh4=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/ugorji/go/codec v1.2.9 h1:rmenucSohSTiyL09Y+l2OCk+FrMxGMzho2+tjr5ticU=
Expand Down
10 changes: 6 additions & 4 deletions index-sig-to-cid.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,10 @@ func CreateIndex_sig2cid(ctx context.Context, tmpDir string, carPath string, ind
func(c cid.Cid, txNode *ipldbindcode.Transaction) error {
var tx solana.Transaction
txBuffer := new(bytes.Buffer)
txBuffer.Write(txNode.Data.Data)
if txNode.Data.Total > 1 {
txBuffer.Write(txNode.Data.Bytes())
if total, ok := txNode.Data.GetTotal(); ok && total > 1 {
// TODO: handle this case
klog.Infof("skipping transaction with %d partials", total)
return nil
}
if err := bin.UnmarshalBin(&tx, txBuffer.Bytes()); err != nil {
Expand Down Expand Up @@ -194,9 +195,10 @@ func VerifyIndex_sig2cid(ctx context.Context, carPath string, indexFilePath stri
func(c cid.Cid, txNode *ipldbindcode.Transaction) error {
var tx solana.Transaction
txBuffer := new(bytes.Buffer)
txBuffer.Write(txNode.Data.Data)
if txNode.Data.Total > 1 {
txBuffer.Write(txNode.Data.Bytes())
if total, ok := txNode.Data.GetTotal(); ok && total > 1 {
// TODO: handle this case
klog.Infof("skipping transaction with %d partials", total)
return nil
}
if err := bin.UnmarshalBin(&tx, txBuffer.Bytes()); err != nil {
Expand Down
61 changes: 56 additions & 5 deletions ipld/ipldbindcode/ledger.ipldsch
Original file line number Diff line number Diff line change
@@ -1,33 +1,51 @@
# Epoch is the top-level data structure in the DAG. It contains a list of
# subsets, which in turn contain a list of blocks. Each block contains a list
# of entries, which in turn contain a list of transactions.
type Epoch struct {
# The kind of this object. This is used to determine which fields are
# present, and how to interpret them. This is useful for knowing which
# type of object to deserialize.
kind Int
# The epoch number.
epoch Int
# The list of subsets in this epoch.
subsets [ Link ] # [ &Subset ]
} representation tuple

type Subset struct {
kind Int
# First slot in this subset.
first Int
# Last slot in this subset.
last Int
# The list of blocks in this subset.
blocks [ Link ] # [ &Block ]
} representation tuple

type Block struct {
kind Int
# The slot number where this block was created.
slot Int
shredding [ Shredding ]
entries [ Link ] # [ &Entry ]
# The metadata for this block.
meta SlotMeta
# Link to the rewards for this block.
rewards Link # &Rewards
} representation tuple

type Rewards struct {
kind Int
# The slot number for which these rewards are for.
slot Int
# The raw rewards data.
data DataFrame
} representation tuple

type SlotMeta struct {
# The parent slot of this slot.
parent_slot Int
# Block time of this slot.
blocktime Int
} representation tuple

Expand All @@ -40,24 +58,57 @@ type Entry struct {
kind Int
numHashes Int
hash Hash
# The list of transactions in this entry.
transactions [ Link ] # [ &Transaction ]
} representation tuple

type Transaction struct {
kind Int
# Raw transaction data.
data DataFrame
# Raw tx metadata data.
metadata DataFrame
# The slot number where this transaction was created.
slot Int
} representation tuple

type Hash bytes
type Buffer bytes

# DataFrame is a chunk of data that is part of a larger whole. It contains
# a hash of the whole data, and the index of this chunk in the larger whole.
# This is used to verify that the data is not corrupted, and to reassemble
# the data in the correct order.
#
# The data is stored in a Buffer, which is a raw byte array.
# The hash is stored as a CRC64 ISO 3309.
#
# The `next` field is used to link multiple frames together. This is used
# when the data is too large to fit in a single frame.
#
# Example: a payload is too large to fit in a single frame, so it is
# split into multiple frames. Let's say it is split into 10 frames.
# These are what the frames would look like (excluding some fields):
# - DataFrame { index: 0, total: 10, data: [...], next: [cid1, cid2, cid3, cid4, cid5] }
# - DataFrame { index: 1, total: 10, data: [...], next: [] }
# - DataFrame { index: 2, total: 10, data: [...], next: [] }
# - DataFrame { index: 3, total: 10, data: [...], next: [] }
# - DataFrame { index: 4, total: 10, data: [...], next: [] }
# - DataFrame { index: 5, total: 10, data: [...], next: [cid6, cid7, cid8, cid9] }
# - DataFrame { index: 6, total: 10, data: [...], next: [] }
# - DataFrame { index: 7, total: 10, data: [...], next: [] }
# - DataFrame { index: 8, total: 10, data: [...], next: [] }
# - DataFrame { index: 9, total: 10, data: [...], next: [] }
type DataFrame struct {
kind Int
hash Int
index Int
total Int
data Buffer
next [ Link ] # [ &DataFrame ]
# Hash of the whole data across all frames, using CRC64 ISO 3309.
hash nullable optional Int
# Index of this frame among all frames (0-indexed).
index nullable optional Int
# Total number of frames.
total nullable optional Int
# Raw data, stored as a byte array.
data Buffer
# The next frames in the list (if any).
next nullable optional [ Link ] # [ &DataFrame ]
} representation tuple
Loading

0 comments on commit f91feda

Please sign in to comment.