From 635d7b31546bef8e7c99a4ff8899bd3d6740e96a Mon Sep 17 00:00:00 2001 From: gagliardetto Date: Wed, 17 May 2023 17:09:46 +0200 Subject: [PATCH] Add slot indexing --- car-dag-traverser.go | 87 ++++++++++-- cmd-x-index-cid2offset.go | 4 +- cmd-x-index-slot2cid.go | 64 +++++++++ cmd-x-index.go | 3 +- cmd-x-traverse.go | 5 +- cmd-x-verify-index-cid2offset.go | 4 +- cmd-x-verify-index-slot2cid.go | 38 ++++++ cmd-x-verify-index.go | 3 +- compactindex36/compactindex.go | 2 +- index-cid-to-offset.go | 22 +++ index-slot-to-cid.go | 226 +++++++++++++++++++++++++++++++ readers.go | 13 -- 12 files changed, 437 insertions(+), 34 deletions(-) create mode 100644 cmd-x-index-slot2cid.go create mode 100644 cmd-x-verify-index-slot2cid.go create mode 100644 index-slot-to-cid.go diff --git a/car-dag-traverser.go b/car-dag-traverser.go index 655a141a..b1dc71cb 100644 --- a/car-dag-traverser.go +++ b/car-dag-traverser.go @@ -251,12 +251,20 @@ func (t *SimpleIterator) GetTransaction(ctx context.Context, c cid.Cid) (*ipldbi // It stops iterating if the callback returns an error. // It works by iterating over all objects in the CAR file and // calling the callback for each object that is a Subset. -func (t *SimpleIterator) FindSubsets(ctx context.Context, callback func(*ipldbindcode.Subset) error) error { +func (t *SimpleIterator) FindSubsets(ctx context.Context, callback func(cid.Cid, *ipldbindcode.Subset) error) error { dr, err := t.cr.DataReader() if err != nil { return fmt.Errorf("failed to get data reader: %w", err) } - rd, err := car.NewCarReader(dr) + return FindSubsets(ctx, dr, callback) +} + +func FindSubsets( + ctx context.Context, + sectionReader carv2.SectionReader, + callback func(cid.Cid, *ipldbindcode.Subset) error, +) error { + rd, err := car.NewCarReader(sectionReader) if err != nil { return fmt.Errorf("failed to create car reader: %w", err) } @@ -274,7 +282,7 @@ func (t *SimpleIterator) FindSubsets(ctx context.Context, callback func(*ipldbin // TODO: log error, or return error? continue } - err = callback(decoded) + err = callback(block.Cid(), decoded) if err != nil { if err == ErrStopIteration { return nil @@ -292,12 +300,20 @@ var ErrStopIteration = errors.New("stop iteration") // It stops iterating if the callback returns an error. // It works by iterating over all objects in the CAR file and // calling the callback for each object that is a Block. -func (t *SimpleIterator) FindBlocks(ctx context.Context, callback func(*ipldbindcode.Block) error) error { +func (t *SimpleIterator) FindBlocks(ctx context.Context, callback func(cid.Cid, *ipldbindcode.Block) error) error { dr, err := t.cr.DataReader() if err != nil { return fmt.Errorf("failed to get data reader: %w", err) } - rd, err := car.NewCarReader(dr) + return FindBlocks(ctx, dr, callback) +} + +func FindBlocks( + ctx context.Context, + sectionReader carv2.SectionReader, + callback func(cid.Cid, *ipldbindcode.Block) error, +) error { + rd, err := car.NewCarReader(sectionReader) if err != nil { return fmt.Errorf("failed to create car reader: %w", err) } @@ -314,7 +330,7 @@ func (t *SimpleIterator) FindBlocks(ctx context.Context, callback func(*ipldbind if err != nil { continue } - err = callback(decoded) + err = callback(block.Cid(), decoded) if err != nil { if err == ErrStopIteration { return nil @@ -330,12 +346,20 @@ func (t *SimpleIterator) FindBlocks(ctx context.Context, callback func(*ipldbind // It stops iterating if the callback returns an error. // It works by iterating over all objects in the CAR file and // calling the callback for each object that is an Entry. -func (t *SimpleIterator) FindEntries(ctx context.Context, callback func(*ipldbindcode.Entry) error) error { +func (t *SimpleIterator) FindEntries(ctx context.Context, callback func(cid.Cid, *ipldbindcode.Entry) error) error { dr, err := t.cr.DataReader() if err != nil { return fmt.Errorf("failed to get data reader: %w", err) } - rd, err := car.NewCarReader(dr) + return FindEntries(ctx, dr, callback) +} + +func FindEntries( + ctx context.Context, + sectionReader carv2.SectionReader, + callback func(cid.Cid, *ipldbindcode.Entry) error, +) error { + rd, err := car.NewCarReader(sectionReader) if err != nil { return fmt.Errorf("failed to create car reader: %w", err) } @@ -352,7 +376,7 @@ func (t *SimpleIterator) FindEntries(ctx context.Context, callback func(*ipldbin if err != nil { continue } - err = callback(decoded) + err = callback(block.Cid(), decoded) if err != nil { return err } @@ -365,12 +389,20 @@ func (t *SimpleIterator) FindEntries(ctx context.Context, callback func(*ipldbin // It stops iterating if the callback returns an error. // It works by iterating over all objects in the CAR file and // calling the callback for each object that is a Transaction. -func (t *SimpleIterator) FindTransactions(ctx context.Context, callback func(*ipldbindcode.Transaction) error) error { +func (t *SimpleIterator) FindTransactions(ctx context.Context, callback func(cid.Cid, *ipldbindcode.Transaction) error) error { dr, err := t.cr.DataReader() if err != nil { return fmt.Errorf("failed to get data reader: %w", err) } - rd, err := car.NewCarReader(dr) + return FindTransactions(ctx, dr, callback) +} + +func FindTransactions( + ctx context.Context, + sectionReader carv2.SectionReader, + callback func(cid.Cid, *ipldbindcode.Transaction) error, +) error { + rd, err := car.NewCarReader(sectionReader) if err != nil { return fmt.Errorf("failed to create car reader: %w", err) } @@ -387,7 +419,7 @@ func (t *SimpleIterator) FindTransactions(ctx context.Context, callback func(*ip if err != nil { continue } - err = callback(decoded) + err = callback(block.Cid(), decoded) if err != nil { return err } @@ -396,6 +428,37 @@ func (t *SimpleIterator) FindTransactions(ctx context.Context, callback func(*ip return nil } +func FindAny( + ctx context.Context, + sectionReader carv2.SectionReader, + callback func(cid.Cid, any) error, +) error { + rd, err := car.NewCarReader(sectionReader) + if err != nil { + return fmt.Errorf("failed to create car reader: %w", err) + } + for { + block, err := rd.Next() + if errors.Is(err, io.EOF) { + break + } + { + decoded, err := iplddecoders.DecodeAny(block.RawData()) + if err != nil { + continue + } + err = callback(block.Cid(), decoded) + if err != nil { + if err == ErrStopIteration { + return nil + } + return err + } + } + } + return nil +} + type offsetFinderFunc func(ctx context.Context, c cid.Cid) (uint64, error) func getRawNodeFromCarByCid(offsetFinder offsetFinderFunc, cr *carv2.Reader, c cid.Cid) (*blocks.BasicBlock, error) { diff --git a/cmd-x-index-cid2offset.go b/cmd-x-index-cid2offset.go index 06b834f3..0b8be884 100644 --- a/cmd-x-index-cid2offset.go +++ b/cmd-x-index-cid2offset.go @@ -8,7 +8,7 @@ import ( "k8s.io/klog/v2" ) -func newCmd_Index_Cid2Offset() *cli.Command { +func newCmd_Index_cid2offset() *cli.Command { var verify bool return &cli.Command{ Name: "cid-to-offset", @@ -34,7 +34,7 @@ func newCmd_Index_Cid2Offset() *cli.Command { defer func() { klog.Infof("Finished in %s", time.Since(startedAt)) }() - klog.Infof("Creating index for %s", carPath) + klog.Infof("Creating CID-to-offset index for %s", carPath) indexFilepath, err := CreateIndex_cid2offset( context.TODO(), carPath, diff --git a/cmd-x-index-slot2cid.go b/cmd-x-index-slot2cid.go new file mode 100644 index 00000000..92a52bb6 --- /dev/null +++ b/cmd-x-index-slot2cid.go @@ -0,0 +1,64 @@ +package main + +import ( + "context" + "time" + + "github.com/urfave/cli/v2" + "k8s.io/klog/v2" +) + +func newCmd_Index_slot2cid() *cli.Command { + var verify bool + return &cli.Command{ + Name: "slot-to-cid", + Description: "Given a CAR file containing a Solana epoch, create an index of the file that maps slot numbers to CIDs.", + ArgsUsage: " ", + Before: func(c *cli.Context) error { + return nil + }, + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "verify", + Usage: "verify the index after creating it", + Destination: &verify, + }, + }, + Subcommands: []*cli.Command{}, + Action: func(c *cli.Context) error { + carPath := c.Args().Get(0) + indexDir := c.Args().Get(1) + + { + startedAt := time.Now() + defer func() { + klog.Infof("Finished in %s", time.Since(startedAt)) + }() + klog.Infof("Creating Slot-to-CID index for %s", carPath) + indexFilepath, err := CreateIndex_slot2cid( + context.TODO(), + carPath, + indexDir, + ) + if err != nil { + panic(err) + } + klog.Info("Index created") + if verify { + klog.Infof("Verifying index for %s located at %s", carPath, indexFilepath) + startedAt := time.Now() + defer func() { + klog.Infof("Finished in %s", time.Since(startedAt)) + }() + err := VerifyIndex_slot2cid(context.TODO(), carPath, indexFilepath) + if err != nil { + return cli.Exit(err, 1) + } + klog.Info("Index verified") + return nil + } + } + return nil + }, + } +} diff --git a/cmd-x-index.go b/cmd-x-index.go index 92e818c9..bc20a43a 100644 --- a/cmd-x-index.go +++ b/cmd-x-index.go @@ -13,7 +13,8 @@ func newCmd_Index() *cli.Command { }, Flags: []cli.Flag{}, Subcommands: []*cli.Command{ - newCmd_Index_Cid2Offset(), + newCmd_Index_cid2offset(), + newCmd_Index_slot2cid(), }, } } diff --git a/cmd-x-traverse.go b/cmd-x-traverse.go index e19bd081..00be72ca 100644 --- a/cmd-x-traverse.go +++ b/cmd-x-traverse.go @@ -8,6 +8,7 @@ import ( "github.com/davecgh/go-spew/spew" bin "github.com/gagliardetto/binary" "github.com/gagliardetto/solana-go" + "github.com/ipfs/go-cid" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/urfave/cli/v2" "go.firedancer.io/radiance/cmd/radiance/car/createcar/ipld/ipldbindcode" @@ -123,7 +124,7 @@ func newCmd_XTraverse() *cli.Command { if false { klog.Info("Iterating Solana blocks...") - err = simpleIter.FindBlocks(context.Background(), func(block *ipldbindcode.Block) error { + err = simpleIter.FindBlocks(context.Background(), func(_ cid.Cid, block *ipldbindcode.Block) error { numSolanaBlocks++ if numSolanaBlocks%10_000 == 0 { fmt.Print(".") @@ -137,7 +138,7 @@ func newCmd_XTraverse() *cli.Command { klog.Infof("Finished iterating blocks in %s; found %d solana blocks", took, numSolanaBlocks) klog.Info("Iterating Solana Transactions...") - err = simpleIter.FindTransactions(context.Background(), func(tx *ipldbindcode.Transaction) error { + err = simpleIter.FindTransactions(context.Background(), func(_ cid.Cid, tx *ipldbindcode.Transaction) error { numTransactions++ if numTransactions%100_000 == 0 { fmt.Print(".") diff --git a/cmd-x-verify-index-cid2offset.go b/cmd-x-verify-index-cid2offset.go index 3aff57cb..fad36a2e 100644 --- a/cmd-x-verify-index-cid2offset.go +++ b/cmd-x-verify-index-cid2offset.go @@ -8,7 +8,7 @@ import ( "k8s.io/klog/v2" ) -func newCmd_VerifyIndex_Cid2Offset() *cli.Command { +func newCmd_VerifyIndex_cid2offset() *cli.Command { return &cli.Command{ Name: "cid-to-offset", Description: "Verify the index of the CAR file that maps CIDs to offsets in the CAR file.", @@ -25,7 +25,7 @@ func newCmd_VerifyIndex_Cid2Offset() *cli.Command { defer func() { klog.Infof("Finished in %s", time.Since(startedAt)) }() - klog.Infof("Verifying index for %s", carPath) + klog.Infof("Verifying CID-to-offset index for %s", carPath) err := VerifyIndex_cid2offset(context.TODO(), carPath, indexFilePath) if err != nil { return err diff --git a/cmd-x-verify-index-slot2cid.go b/cmd-x-verify-index-slot2cid.go new file mode 100644 index 00000000..696f3c5a --- /dev/null +++ b/cmd-x-verify-index-slot2cid.go @@ -0,0 +1,38 @@ +package main + +import ( + "context" + "time" + + "github.com/urfave/cli/v2" + "k8s.io/klog/v2" +) + +func newCmd_VerifyIndex_slot2cid() *cli.Command { + return &cli.Command{ + Name: "slot-to-cid", + Description: "Verify the index of the CAR file that maps slot numbers to CIDs.", + ArgsUsage: " ", + Before: func(c *cli.Context) error { + return nil + }, + Flags: []cli.Flag{}, + Action: func(c *cli.Context) error { + carPath := c.Args().Get(0) + indexFilePath := c.Args().Get(1) + { + startedAt := time.Now() + defer func() { + klog.Infof("Finished in %s", time.Since(startedAt)) + }() + klog.Infof("Verifying Slot-to-CID index for %s", carPath) + err := VerifyIndex_slot2cid(context.TODO(), carPath, indexFilePath) + if err != nil { + return err + } + klog.Info("Index verified successfully") + } + return nil + }, + } +} diff --git a/cmd-x-verify-index.go b/cmd-x-verify-index.go index b466ff61..e5375a55 100644 --- a/cmd-x-verify-index.go +++ b/cmd-x-verify-index.go @@ -13,7 +13,8 @@ func newCmd_VerifyIndex() *cli.Command { }, Flags: []cli.Flag{}, Subcommands: []*cli.Command{ - newCmd_VerifyIndex_Cid2Offset(), + newCmd_VerifyIndex_cid2offset(), + newCmd_VerifyIndex_slot2cid(), }, } } diff --git a/compactindex36/compactindex.go b/compactindex36/compactindex.go index 9b299974..6ac605ed 100644 --- a/compactindex36/compactindex.go +++ b/compactindex36/compactindex.go @@ -239,7 +239,7 @@ type Entry struct { // intWidth returns the number of bytes minimally required to represent the given integer. func intWidth(n uint64) uint8 { - return 36 + return 36 // 36 is the length of the CIDs we use. msb := 64 - bits.LeadingZeros64(n) return uint8((msb + 7) / 8) } diff --git a/index-cid-to-offset.go b/index-cid-to-offset.go index c57291a2..21b229f8 100644 --- a/index-cid-to-offset.go +++ b/index-cid-to-offset.go @@ -12,6 +12,7 @@ import ( "time" "github.com/davecgh/go-spew/spew" + "github.com/ipfs/go-cid" carv1 "github.com/ipld/go-car" "github.com/ipld/go-car/util" carv2 "github.com/ipld/go-car/v2" @@ -22,6 +23,15 @@ import ( // CreateIndex_cid2offset creates an index file that maps CIDs to offsets in the CAR file. func CreateIndex_cid2offset(ctx context.Context, carPath string, indexDir string) (string, error) { + // Check if the CAR file exists: + exists, err := fileExists(carPath) + if err != nil { + return "", fmt.Errorf("failed to check if CAR file exists: %w", err) + } + if !exists { + return "", fmt.Errorf("CAR file %q does not exist", carPath) + } + carFile, err := os.Open(carPath) if err != nil { return "", fmt.Errorf("failed to open car file: %w", err) @@ -234,3 +244,15 @@ func VerifyIndex_cid2offset(ctx context.Context, carPath string, indexFilePath s } return nil } + +func findOffsetFromIndexForCID(db *compactindex.DB, c cid.Cid) (uint64, error) { + bucket, err := db.LookupBucket(c.Bytes()) + if err != nil { + return 0, fmt.Errorf("failed to lookup bucket for %s: %w", c, err) + } + offset, err := bucket.Lookup(c.Bytes()) + if err != nil { + return 0, fmt.Errorf("failed to lookup offset for %s: %w", c, err) + } + return offset, nil +} diff --git a/index-slot-to-cid.go b/index-slot-to-cid.go new file mode 100644 index 00000000..b2fe989e --- /dev/null +++ b/index-slot-to-cid.go @@ -0,0 +1,226 @@ +package main + +import ( + "context" + "encoding/binary" + "fmt" + "os" + "path/filepath" + + "github.com/ipfs/go-cid" + carv2 "github.com/ipld/go-car/v2" + "github.com/rpcpool/yellowstone-faithful/compactindex36" + "go.firedancer.io/radiance/cmd/radiance/car/createcar/ipld/ipldbindcode" + "k8s.io/klog/v2" +) + +// CreateIndex_slot2cid creates an index file that maps slot numbers to CIDs. +func CreateIndex_slot2cid(ctx context.Context, carPath string, indexDir string) (string, error) { + // Check if the CAR file exists: + exists, err := fileExists(carPath) + if err != nil { + return "", fmt.Errorf("failed to check if CAR file exists: %w", err) + } + if !exists { + return "", fmt.Errorf("CAR file %q does not exist", carPath) + } + + cr, err := carv2.OpenReader(carPath) + if err != nil { + return "", fmt.Errorf("failed to open CAR file: %w", err) + } + + // check it has 1 root + roots, err := cr.Roots() + if err != nil { + return "", fmt.Errorf("failed to get roots: %w", err) + } + // There should be only one root CID in the CAR file. + if len(roots) != 1 { + return "", fmt.Errorf("CAR file has %d roots, expected 1", len(roots)) + } + + // TODO: use another way to precisely count the number of solana Blocks in the CAR file. + klog.Infof("Counting items in car file...") + numItems, err := carCountItems(carPath) + if err != nil { + return "", fmt.Errorf("failed to count items in car file: %w", err) + } + klog.Infof("Found %d items in car file", numItems) + + klog.Infof("Creating builder with %d items", numItems) + c2o, err := compactindex36.NewBuilder( + "", + uint(numItems), // TODO: what if the number of real items is less than this? + (0), + ) + if err != nil { + return "", fmt.Errorf("failed to open index store: %w", err) + } + defer c2o.Close() + + numItemsIndexed := uint64(0) + klog.Infof("Indexing...") + + dr, err := cr.DataReader() + if err != nil { + return "", fmt.Errorf("failed to get data reader: %w", err) + } + + // Iterate over all blocks in the CAR file and put them into the index, + // using the slot number as the key and the CID as the value. + err = FindBlocks( + ctx, + dr, + func(c cid.Cid, block *ipldbindcode.Block) error { + slotNum := block.Slot + + slotBytes := uint64ToLeBytes(uint64(slotNum)) + + var buf [36]byte + copy(buf[:], c.Bytes()[:36]) + + err = c2o.Insert(slotBytes, buf) + if err != nil { + return fmt.Errorf("failed to put cid to offset: %w", err) + } + + numItemsIndexed++ + if numItemsIndexed%1_000 == 0 { + printToStderr(".") + } + return nil + }) + if err != nil { + return "", fmt.Errorf("failed to index; error while iterating over blocks: %w", err) + } + + rootCID := roots[0] + + // Use the car file name and root CID to name the index file: + indexFilePath := filepath.Join(indexDir, fmt.Sprintf("%s.%s.slot-to-cid.index", filepath.Base(carPath), rootCID.String())) + + klog.Infof("Creating index file at %s", indexFilePath) + targetFile, err := os.Create(indexFilePath) + if err != nil { + return "", fmt.Errorf("failed to create index file: %w", err) + } + defer targetFile.Close() + + klog.Infof("Sealing index...") + if err = c2o.Seal(ctx, targetFile); err != nil { + return "", fmt.Errorf("failed to seal index: %w", err) + } + klog.Infof("Index created") + return indexFilePath, nil +} + +// VerifyIndex_slot2cid verifies that the index file is correct for the given car file. +// It does this by reading the car file and comparing the offsets in the index +// file to the offsets in the car file. +func VerifyIndex_slot2cid(ctx context.Context, carPath string, indexFilePath string) error { + // Check if the CAR file exists: + exists, err := fileExists(carPath) + if err != nil { + return fmt.Errorf("failed to check if CAR file exists: %w", err) + } + if !exists { + return fmt.Errorf("CAR file %s does not exist", carPath) + } + + // Check if the index file exists: + exists, err = fileExists(indexFilePath) + if err != nil { + return fmt.Errorf("failed to check if index file exists: %w", err) + } + if !exists { + return fmt.Errorf("index file %s does not exist", indexFilePath) + } + + cr, err := carv2.OpenReader(carPath) + if err != nil { + return fmt.Errorf("failed to open CAR file: %w", err) + } + + // check it has 1 root + roots, err := cr.Roots() + if err != nil { + return fmt.Errorf("failed to get roots: %w", err) + } + // There should be only one root CID in the CAR file. + if len(roots) != 1 { + return fmt.Errorf("CAR file has %d roots, expected 1", len(roots)) + } + + indexFile, err := os.Open(indexFilePath) + if err != nil { + return fmt.Errorf("failed to open index file: %w", err) + } + defer indexFile.Close() + + c2o, err := compactindex36.Open(indexFile) + if err != nil { + return fmt.Errorf("failed to open index: %w", err) + } + + dr, err := cr.DataReader() + if err != nil { + return fmt.Errorf("failed to get data reader: %w", err) + } + + numItems := uint64(0) + // Iterate over all blocks in the CAR file and put them into the index, + // using the slot number as the key and the CID as the value. + err = FindBlocks( + ctx, + dr, + func(c cid.Cid, block *ipldbindcode.Block) error { + slotNum := uint64(block.Slot) + + got, err := findCidFromSlot(c2o, slotNum) + if err != nil { + return fmt.Errorf("failed to put cid to offset: %w", err) + } + + if !got.Equals(c) { + return fmt.Errorf("slot %d: expected %s, got %s", slotNum, c, got) + } + + numItems++ + if numItems%1_000 == 0 { + printToStderr(".") + } + + return nil + }) + if err != nil { + return fmt.Errorf("failed to verify index; error while iterating over blocks: %w", err) + } + return nil +} + +func uint64ToLeBytes(n uint64) []byte { + b := make([]byte, 8) + binary.LittleEndian.PutUint64(b, n) + return b +} + +func findCidFromSlot(db *compactindex36.DB, slotNum uint64) (cid.Cid, error) { + slotBytes := uint64ToLeBytes(uint64(slotNum)) + bucket, err := db.LookupBucket(slotBytes) + if err != nil { + return cid.Cid{}, fmt.Errorf("failed to lookup bucket for %d: %w", slotNum, err) + } + got, err := bucket.Lookup(slotBytes) + if err != nil { + return cid.Cid{}, fmt.Errorf("failed to lookup value for %d: %w", slotNum, err) + } + l, c, err := cid.CidFromBytes(got[:]) + if err != nil { + return cid.Cid{}, fmt.Errorf("failed to parse cid from bytes: %w", err) + } + if l != 36 { + return cid.Cid{}, fmt.Errorf("unexpected cid length %d", l) + } + return c, nil +} diff --git a/readers.go b/readers.go index f755ea1f..6a8df221 100644 --- a/readers.go +++ b/readers.go @@ -12,7 +12,6 @@ import ( cbor "github.com/ipfs/go-ipld-cbor" carv1 "github.com/ipld/go-car" "github.com/ipld/go-car/util" - "github.com/rpcpool/yellowstone-faithful/compactindex" ) func readHeader(br io.Reader) (*carv1.CarHeader, error) { @@ -172,15 +171,3 @@ func carCountItems(carPath string) (uint64, error) { func printToStderr(msg string) { fmt.Fprint(os.Stderr, msg) } - -func findOffsetFromIndexForCID(db *compactindex.DB, c cid.Cid) (uint64, error) { - bucket, err := db.LookupBucket(c.Bytes()) - if err != nil { - return 0, fmt.Errorf("failed to lookup bucket for %s: %w", c, err) - } - offset, err := bucket.Lookup(c.Bytes()) - if err != nil { - return 0, fmt.Errorf("failed to lookup offset for %s: %w", c, err) - } - return offset, nil -}