Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add subset, offset and size index #146

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions cmd-x-index-cid2subsetoffset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package main

import (
"context"
"fmt"
"os"
"time"

"github.com/rpcpool/yellowstone-faithful/indexes"
"github.com/urfave/cli/v2"
"k8s.io/klog/v2"
)

// newCmd_Index_cid2subsetOffset builds the "cid-to-subset-offset" CLI command.
//
// The command takes the paths of the split CAR files of one epoch as
// positional arguments and writes an index into the directory given by the
// required --index-dir flag. With --verify, the freshly created index is
// checked against the same CAR files afterwards.
//
// Failures while creating or verifying the index are reported through
// cli.Exit with exit code 1.
func newCmd_Index_cid2subsetOffset() *cli.Command {
	var verify bool
	var epoch uint64
	var network indexes.Network
	var indexDir string
	return &cli.Command{
		Name:        "cid-to-subset-offset",
		Description: "Given all split CAR files corresponding to a Solana epoch, create an index of the file that maps CIDs to offsets in the CAR file.",
		// Only CAR paths are positional; the index directory comes from the
		// required --index-dir flag.
		ArgsUsage: "<car-path>...",
		Before: func(c *cli.Context) error {
			// Fall back to mainnet when no network has been set.
			if network == "" {
				network = indexes.NetworkMainnet
			}
			return nil
		},
		Flags: []cli.Flag{
			&cli.BoolFlag{
				Name:        "verify",
				Usage:       "verify the index after creating it",
				Destination: &verify,
			},
			&cli.StringFlag{
				Name:  "tmp-dir",
				Usage: "temporary directory to use for storing intermediate files",
				Value: os.TempDir(),
			},
			&cli.Uint64Flag{
				Name:        "epoch",
				Usage:       "the epoch of the CAR files",
				Destination: &epoch,
				Required:    true,
			},
			&cli.StringFlag{
				Name:  "network",
				Usage: "the cluster of the epoch; one of: mainnet, testnet, devnet",
				Action: func(c *cli.Context, s string) error {
					network = indexes.Network(s)
					if !indexes.IsValidNetwork(network) {
						return fmt.Errorf("invalid network: %q", network)
					}
					return nil
				},
			},
			&cli.StringFlag{
				Name:        "index-dir",
				Usage:       "directory to store the index",
				Destination: &indexDir,
				Required:    true,
			},
		},
		Subcommands: []*cli.Command{},
		Action: func(c *cli.Context) error {
			carPaths := c.Args().Slice()
			tmpDir := c.String("tmp-dir")

			// Every positional argument is a CAR path; without any there is
			// nothing to index.
			if len(carPaths) == 0 {
				return fmt.Errorf("no CAR file paths provided")
			}

			if ok, err := isDirectory(indexDir); err != nil {
				return err
			} else if !ok {
				return fmt.Errorf("index-dir is not a directory")
			}

			startedAt := time.Now()
			defer func() {
				klog.Infof("Finished in %s", time.Since(startedAt))
			}()
			klog.Infof("Creating CID-to-offset index")
			indexFilepath, err := CreateIndex_cid2subsetOffset(
				context.TODO(),
				epoch,
				network,
				tmpDir,
				carPaths,
				indexDir,
			)
			if err != nil {
				// Report the failure as a regular CLI error instead of
				// panicking, matching the verification path below.
				return cli.Exit(err, 1)
			}
			klog.Info("Index created")

			if verify {
				klog.Infof("Verifying index located at %s", indexFilepath)
				verifyStartedAt := time.Now()
				defer func() {
					klog.Infof("Finished in %s", time.Since(verifyStartedAt))
				}()
				if err := VerifyIndex_cid2subsetOffset(context.TODO(), carPaths, indexFilepath); err != nil {
					return cli.Exit(err, 1)
				}
				klog.Info("Index verified")
			}
			return nil
		},
	}
}
1 change: 1 addition & 0 deletions cmd-x-index.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func newCmd_Index() *cli.Command {
Flags: []cli.Flag{},
Subcommands: []*cli.Command{
newCmd_Index_cid2offset(),
newCmd_Index_cid2subsetOffset(),
newCmd_Index_slot2cid(),
newCmd_Index_sig2cid(),
newCmd_Index_all(), // NOTE: not actually all.
Expand Down
258 changes: 258 additions & 0 deletions index-cid-to-subset-offset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
package main

import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/dustin/go-humanize"
"github.com/ipld/go-car/util"
carv2 "github.com/ipld/go-car/v2"
"github.com/rpcpool/yellowstone-faithful/carreader"
"github.com/rpcpool/yellowstone-faithful/indexes"
"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
"github.com/rpcpool/yellowstone-faithful/iplddecoders"
"k8s.io/klog/v2"
)

// subsetAndCar pairs a Subset value obtained while counting the items of a
// CAR file with the path of that CAR file.
type subsetAndCar struct {
// subset is the Subset returned by carCountItemsWithSubset for carPath;
// its First field is used to order the CAR files before indexing.
subset *ipldbindcode.Subset
// carPath is the filesystem path of the CAR file.
carPath string
}

// CreateIndex_cid2subsetOffset builds an index that maps every CID found in
// the given CAR files to a (subset index, offset, size) triple.
//
// Parameters:
//   - ctx: passed to the index writer when sealing the index.
//   - epoch, network: passed to the index writer.
//   - tmpDir: parent directory in which a timestamped working directory is
//     created for the writer.
//   - carPaths: paths of the CAR files to index; each must exist.
//   - indexDir: directory in which the sealed index is stored.
//
// The CAR files are ordered by the First field of the Subset returned by
// carCountItemsWithSubset, and the position of a file in that order is used
// as its subset index. Offsets start at the CAR header size and advance by
// each section's length.
//
// It returns the path of the sealed index file, or an error if any CAR file
// is missing or unreadable, or if writing or sealing the index fails.
func CreateIndex_cid2subsetOffset(
	ctx context.Context,
	epoch uint64,
	network indexes.Network,
	tmpDir string,
	carPaths []string,
	indexDir string,
) (string, error) {
	var numItems uint64
	subsetAndCars := make([]subsetAndCar, 0, len(carPaths))
	for _, carPath := range carPaths {
		// Check if the CAR file exists:
		exists, err := fileExists(carPath)
		if err != nil {
			return "", fmt.Errorf("failed to check if CAR file exists: %w", err)
		}
		if !exists {
			return "", fmt.Errorf("CAR file %q does not exist", carPath)
		}

		klog.Infof("Counting items in car file: %s", carPath)
		ni, s, err := carCountItemsWithSubset(carPath)
		if err != nil {
			return "", fmt.Errorf("failed to count items in CAR: %w", err)
		}
		subsetAndCars = append(subsetAndCars, subsetAndCar{subset: s, carPath: carPath})
		klog.Infof("Found %s items in car file", humanize.Comma(int64(ni)))
		numItems += ni
	}

	klog.Infof("Found a total of %d items in car files", numItems)
	sort.Slice(subsetAndCars, func(i, j int) bool {
		return subsetAndCars[i].subset.First < subsetAndCars[j].subset.First
	})

	tmpDir = filepath.Join(tmpDir, "index-cid-to-subset-offset-"+time.Now().Format("20060102-150405.000000000"))
	if err := os.MkdirAll(tmpDir, 0o755); err != nil {
		return "", fmt.Errorf("failed to create tmp dir: %w", err)
	}

	klog.Infof("Creating builder with %d items", numItems)
	c2so, err := indexes.NewWriter_CidToSubsetOffsetAndSize(
		epoch,
		network,
		tmpDir,
		numItems,
	)
	if err != nil {
		return "", fmt.Errorf("failed to open index store: %w", err)
	}
	defer c2so.Close()

	numItemsIndexed := uint64(0)

	// indexCar indexes every section of one CAR file under the given subset
	// index. It is a separate function so that the deferred Close runs as
	// soon as the file has been processed, rather than keeping all CAR files
	// open until CreateIndex_cid2subsetOffset returns.
	indexCar := func(subset uint64, carPath string) error {
		carFile, err := os.Open(carPath)
		if err != nil {
			return fmt.Errorf("failed to open car file: %w", err)
		}
		defer carFile.Close()

		rd, err := carreader.New(carFile)
		if err != nil {
			return fmt.Errorf("failed to create car reader: %w", err)
		}

		// The first section starts right after the CAR header.
		totalOffset, err := rd.HeaderSize()
		if err != nil {
			return fmt.Errorf("failed to get car header size: %w", err)
		}

		for {
			c, sectionLength, err := rd.NextInfo()
			if err != nil {
				if errors.Is(err, io.EOF) {
					return nil
				}
				return fmt.Errorf("encountered an error while indexing: %w", err)
			}

			if err := c2so.Put(c, subset, totalOffset, sectionLength); err != nil {
				return fmt.Errorf("failed to put cid to subset, offset: %w", err)
			}

			totalOffset += sectionLength

			numItemsIndexed++
			if numItemsIndexed%100_000 == 0 {
				printToStderr(".")
			}
		}
	}

	// NOTE(review): the subset index is the position of the CAR file in the
	// First-sorted order; confirm this matches how readers resolve subsets.
	for i, info := range subsetAndCars {
		if err := indexCar(uint64(i), info.carPath); err != nil {
			return "", err
		}
	}

	klog.Infof("Sealing index...")
	if err = c2so.Seal(ctx, indexDir); err != nil {
		return "", fmt.Errorf("failed to seal index: %w", err)
	}

	indexFilePath := c2so.GetFilePath()
	klog.Infof("Index created at %s, %d items indexed", indexFilePath, numItemsIndexed)
	return indexFilePath, nil
}

// VerifyIndex_cid2subsetOffset checks the index at indexFilePath against the
// given CAR files.
//
// For every CAR file it:
//  1. requires the CAR header to have exactly one root;
//  2. looks the root CID up in the index, reads the node at the returned
//     offset from the CAR data, checks that its CID matches the root and that
//     it decodes as a Subset node (the decoded node is dumped via spew);
//  3. walks all sections of the CAR file and checks that the offset and size
//     stored in the index equal the section's actual offset and length.
//
// It returns the first error encountered: a missing or unreadable file, a
// failed index lookup, a read error, or a mismatch. ctx is currently unused.
func VerifyIndex_cid2subsetOffset(ctx context.Context, carPaths []string, indexFilePath string) error {
	// Check if the index file exists:
	exists, err := fileExists(indexFilePath)
	if err != nil {
		return fmt.Errorf("failed to check if index file exists: %w", err)
	}
	if !exists {
		return fmt.Errorf("index file %s does not exist", indexFilePath)
	}

	// NOTE(review): the opened index is never closed here; if the reader
	// exposes a Close method it should be deferred — confirm against the
	// indexes package.
	c2so, err := indexes.Open_CidToSubsetOffsetAndSize(indexFilePath)
	if err != nil {
		return fmt.Errorf("failed to open index: %w", err)
	}

	startedAt := time.Now()
	numItems := 0
	defer func() {
		klog.Infof("Finished in %s", time.Since(startedAt))
		klog.Infof("Read %d nodes", numItems)
	}()

	// verifyCar verifies a single CAR file against the index. It is a
	// separate function so the deferred Close calls run on every return
	// path, including errors, and as soon as the file has been processed.
	verifyCar := func(carPath string) error {
		// Check if the CAR file exists:
		exists, err := fileExists(carPath)
		if err != nil {
			return fmt.Errorf("failed to check if CAR file exists: %w", err)
		}
		if !exists {
			return fmt.Errorf("CAR file %s does not exist", carPath)
		}

		carFile, err := os.Open(carPath)
		if err != nil {
			return fmt.Errorf("failed to open car file: %w", err)
		}
		defer carFile.Close()

		rd, err := carreader.New(carFile)
		if err != nil {
			return fmt.Errorf("failed to create car reader: %w", err)
		}
		// check it has 1 root
		if len(rd.Header.Roots) != 1 {
			return fmt.Errorf("car file must have exactly 1 root, but has %d", len(rd.Header.Roots))
		}
		rootCID := rd.Header.Roots[0]

		// Verify the root node through the index. Wrapped in a function
		// literal so the carv2 reader is closed before the section scan
		// starts, even when a check fails.
		if err := func() error {
			subsetAndOffset, err := c2so.Get(rootCID)
			if err != nil {
				return fmt.Errorf("failed to get subset and offset from index: %w", err)
			}
			cr, err := carv2.OpenReader(carPath)
			if err != nil {
				return fmt.Errorf("failed to open CAR file: %w", err)
			}
			defer cr.Close()

			dr, err := cr.DataReader()
			if err != nil {
				return fmt.Errorf("failed to open CAR data reader: %w", err)
			}
			if _, err := dr.Seek(int64(subsetAndOffset.Offset), io.SeekStart); err != nil {
				return fmt.Errorf("failed to seek to root node: %w", err)
			}
			br := bufio.NewReader(dr)

			gotCid, data, err := util.ReadNode(br)
			if err != nil {
				return err
			}
			// verify that the CID we read matches the one we expected.
			if !gotCid.Equals(rootCID) {
				return fmt.Errorf("CID mismatch: expected %s, got %s", rootCID, gotCid)
			}
			// try parsing the data as a Subset node.
			decoded, err := iplddecoders.DecodeSubset(data)
			if err != nil {
				return fmt.Errorf("failed to decode root node: %w", err)
			}
			spew.Dump(decoded)
			return nil
		}(); err != nil {
			return err
		}

		// The first section starts right after the CAR header.
		totalOffset, err := rd.HeaderSize()
		if err != nil {
			return err
		}

		for {
			c, sectionLen, err := rd.NextInfo()
			if err != nil {
				if errors.Is(err, io.EOF) {
					klog.Infof("EOF")
					return nil
				}
				// Any other read error means the remaining sections cannot
				// be trusted; stop instead of verifying garbage.
				return fmt.Errorf("failed to read next CAR section: %w", err)
			}
			numItems++
			if numItems%100000 == 0 {
				printToStderr(".")
			}
			entry, err := c2so.Get(c)
			if err != nil {
				return fmt.Errorf("failed to lookup offset for %s: %w", c, err)
			}
			if entry.Offset != totalOffset {
				return fmt.Errorf("offset mismatch for %s: %d != %d", c, entry.Offset, totalOffset)
			}
			if entry.Size != sectionLen {
				return fmt.Errorf("length mismatch for %s: %d != %d", c, entry.Size, sectionLen)
			}

			totalOffset += sectionLen
		}
	}

	for _, carPath := range carPaths {
		if err := verifyCar(carPath); err != nil {
			return err
		}
	}
	return nil
}
Loading
Loading