diff --git a/cmd-x-index-cid2subsetoffset.go b/cmd-x-index-cid2subsetoffset.go
new file mode 100644
index 00000000..10f6aa22
--- /dev/null
+++ b/cmd-x-index-cid2subsetoffset.go
@@ -0,0 +1,117 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/rpcpool/yellowstone-faithful/indexes"
+	"github.com/urfave/cli/v2"
+	"k8s.io/klog/v2"
+)
+
+// newCmd_Index_cid2subsetOffset returns the "cid-to-subset-offset" command. It
+// takes the split CAR files of an epoch as positional arguments, builds the
+// index in --index-dir and, with --verify, checks it against the CAR files.
+func newCmd_Index_cid2subsetOffset() *cli.Command {
+	var verify bool
+	var epoch uint64
+	var network indexes.Network
+	var indexDir string
+	return &cli.Command{
+		Name:        "cid-to-subset-offset",
+		Description: "Given all split CAR files corresponding to a Solana epoch, create an index of the file that maps CIDs to offsets in the CAR file.",
+		ArgsUsage:   "car-path...",
+		Before: func(c *cli.Context) error {
+			if network == "" {
+				network = indexes.NetworkMainnet
+			}
+			return nil
+		},
+		Flags: []cli.Flag{
+			&cli.BoolFlag{
+				Name:        "verify",
+				Usage:       "verify the index after creating it",
+				Destination: &verify,
+			},
+			&cli.StringFlag{
+				Name:  "tmp-dir",
+				Usage: "temporary directory to use for storing intermediate files",
+				Value: os.TempDir(),
+			},
+			&cli.Uint64Flag{
+				Name:        "epoch",
+				Usage:       "the epoch of the CAR files",
+				Destination: &epoch,
+				Required:    true,
+			},
+			&cli.StringFlag{
+				Name:  "network",
+				Usage: "the cluster of the epoch; one of: mainnet, testnet, devnet",
+				Action: func(c *cli.Context, s string) error {
+					network = indexes.Network(s)
+					if !indexes.IsValidNetwork(network) {
+						return fmt.Errorf("invalid network: %q", network)
+					}
+					return nil
+				},
+			},
+			&cli.StringFlag{
+				Name:        "index-dir",
+				Usage:       "directory to store the index",
+				Destination: &indexDir,
+				Required:    true,
+			},
+		},
+		Subcommands: []*cli.Command{},
+		Action: func(c *cli.Context) error {
+			carPaths := c.Args().Slice()
+			tmpDir := c.String("tmp-dir")
+
+			if len(carPaths) == 0 {
+				return fmt.Errorf("no CAR files provided")
+			}
+			if ok, err := isDirectory(indexDir); err != nil {
+				return err
+			} else if !ok {
+				return fmt.Errorf("index-dir is not a directory")
+			}
+
+			{
+				startedAt := time.Now()
+				defer func() {
+					klog.Infof("Finished in %s", time.Since(startedAt))
+				}()
+				klog.Infof("Creating CID-to-offset index")
+				indexFilepath, err := CreateIndex_cid2subsetOffset(
+					context.TODO(),
+					epoch,
+					network,
+					tmpDir,
+					carPaths,
+					indexDir,
+				)
+				if err != nil {
+					// Report the failure through the CLI instead of crashing.
+					return cli.Exit(err, 1)
+				}
+				klog.Info("Index created")
+				if verify {
+					klog.Infof("Verifying index located at %s", indexFilepath)
+					startedAt := time.Now()
+					defer func() {
+						klog.Infof("Finished in %s", time.Since(startedAt))
+					}()
+					err := VerifyIndex_cid2subsetOffset(context.TODO(), carPaths, indexFilepath)
+					if err != nil {
+						return cli.Exit(err, 1)
+					}
+					klog.Info("Index verified")
+					return nil
+				}
+			}
+			return nil
+		},
+	}
+}
diff --git a/cmd-x-index.go b/cmd-x-index.go
index 58e58fc1..e6c30e71 100644
--- a/cmd-x-index.go
+++ b/cmd-x-index.go
@@ -15,6 +15,7 @@ func newCmd_Index() *cli.Command {
 		Flags:       []cli.Flag{},
 		Subcommands: []*cli.Command{
 			newCmd_Index_cid2offset(),
+			newCmd_Index_cid2subsetOffset(),
 			newCmd_Index_slot2cid(),
 			newCmd_Index_sig2cid(),
 			newCmd_Index_all(), // NOTE: not actually all.
diff --git a/index-cid-to-subset-offset.go b/index-cid-to-subset-offset.go
new file mode 100644
index 00000000..33731581
--- /dev/null
+++ b/index-cid-to-subset-offset.go
@@ -0,0 +1,302 @@
+package main
+
+import (
+	"bufio"
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+	"sort"
+	"time"
+
+	"github.com/davecgh/go-spew/spew"
+	"github.com/dustin/go-humanize"
+	"github.com/ipld/go-car/util"
+	carv2 "github.com/ipld/go-car/v2"
+	"github.com/rpcpool/yellowstone-faithful/carreader"
+	"github.com/rpcpool/yellowstone-faithful/indexes"
+	"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
+	"github.com/rpcpool/yellowstone-faithful/iplddecoders"
+	"k8s.io/klog/v2"
+)
+
+// subsetAndCar pairs a split CAR file with the Subset node found inside it.
+type subsetAndCar struct {
+	subset  *ipldbindcode.Subset
+	carPath string
+}
+
+// CreateIndex_cid2subsetOffset builds the cid-to-subset-offset-and-size index
+// for the given split CAR files and seals it into indexDir.
+//
+// Each CAR file is first scanned to count its items and find its Subset node.
+// The files are then ordered by Subset.First; every section of the i-th file
+// is recorded with subset number i, its byte offset in that file, and its
+// length. The index writer is given a new sub-directory of tmpDir.
+//
+// It returns the path of the sealed index file.
+func CreateIndex_cid2subsetOffset(
+	ctx context.Context,
+	epoch uint64,
+	network indexes.Network,
+	tmpDir string,
+	carPaths []string,
+	indexDir string,
+) (string, error) {
+	var numItems uint64
+	var subsetAndCars []subsetAndCar
+	for _, carPath := range carPaths {
+		// Check if the CAR file exists:
+		exists, err := fileExists(carPath)
+		if err != nil {
+			return "", fmt.Errorf("failed to check if CAR file exists: %w", err)
+		}
+		if !exists {
+			return "", fmt.Errorf("CAR file %q does not exist", carPath)
+		}
+
+		klog.Infof("Counting items in car file: %s", carPath)
+		ni, s, err := carCountItemsWithSubset(carPath)
+		if err != nil {
+			return "", fmt.Errorf("failed to count items in CAR: %w", err)
+		}
+		if s == nil {
+			// The sort below dereferences the Subset node of every file.
+			return "", fmt.Errorf("CAR file %q does not contain a Subset node", carPath)
+		}
+		subsetAndCars = append(subsetAndCars, subsetAndCar{subset: s, carPath: carPath})
+		klog.Infof("Found %s items in car file", humanize.Comma(int64(ni)))
+		numItems += ni
+	}
+
+	klog.Infof("Found a total of %d items in car files", numItems)
+	sort.Slice(subsetAndCars, func(i, j int) bool {
+		return subsetAndCars[i].subset.First < subsetAndCars[j].subset.First
+	})
+
+	tmpDir = filepath.Join(tmpDir, "index-cid-to-subset-offset-"+time.Now().Format("20060102-150405.000000000"))
+	if err := os.MkdirAll(tmpDir, 0o755); err != nil {
+		return "", fmt.Errorf("failed to create tmp dir: %w", err)
+	}
+
+	klog.Infof("Creating builder with %d items", numItems)
+	c2so, err := indexes.NewWriter_CidToSubsetOffsetAndSize(
+		epoch,
+		network,
+		tmpDir,
+		numItems,
+	)
+	if err != nil {
+		return "", fmt.Errorf("failed to open index store: %w", err)
+	}
+	defer c2so.Close()
+
+	// To do: how to get the subset index?
+	// For now it is the position of the CAR file in the list sorted above.
+	numItemsIndexed := uint64(0)
+	for i, info := range subsetAndCars {
+		if err := indexCarFile_cid2subsetOffset(c2so, info.carPath, uint64(i), &numItemsIndexed); err != nil {
+			return "", err
+		}
+	}
+
+	klog.Infof("Sealing index...")
+	if err = c2so.Seal(ctx, indexDir); err != nil {
+		return "", fmt.Errorf("failed to seal index: %w", err)
+	}
+
+	indexFilePath := c2so.GetFilePath()
+	klog.Infof("Index created at %s, %d items indexed", indexFilePath, numItemsIndexed)
+	return indexFilePath, nil
+}
+
+// indexCarFile_cid2subsetOffset adds every section of the CAR file at carPath
+// to c2so under the given subset number. numItemsIndexed is the running total
+// across all CAR files and is incremented once per section.
+//
+// Opening the file here, rather than in the caller's loop, means its handle is
+// released as soon as the file has been indexed.
+func indexCarFile_cid2subsetOffset(
+	c2so *indexes.CidToSubsetOffsetAndSize_Writer,
+	carPath string,
+	subset uint64,
+	numItemsIndexed *uint64,
+) error {
+	carFile, err := os.Open(carPath)
+	if err != nil {
+		return fmt.Errorf("failed to open car file: %w", err)
+	}
+	defer carFile.Close()
+
+	rd, err := carreader.New(carFile)
+	if err != nil {
+		return fmt.Errorf("failed to create car reader: %w", err)
+	}
+
+	// The first section starts right after the CAR header.
+	totalOffset, err := rd.HeaderSize()
+	if err != nil {
+		return fmt.Errorf("failed to get car header size: %w", err)
+	}
+	for {
+		c, sectionLength, err := rd.NextInfo()
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				return nil
+			}
+			return fmt.Errorf("encountered an error while indexing: %w", err)
+		}
+
+		if err := c2so.Put(c, subset, totalOffset, sectionLength); err != nil {
+			return fmt.Errorf("failed to put cid to subset, offset: %w", err)
+		}
+
+		totalOffset += sectionLength
+
+		*numItemsIndexed++
+		if *numItemsIndexed%100_000 == 0 {
+			printToStderr(".")
+		}
+	}
+}
+
+// VerifyIndex_cid2subsetOffset checks the index at indexFilePath against the
+// given CAR files. For each file it looks up the root CID and decodes the node
+// found at the indexed offset as a Subset, then walks all sections and compares
+// each one's offset and length with the index entry for its CID.
+func VerifyIndex_cid2subsetOffset(ctx context.Context, carPaths []string, indexFilePath string) error {
+	// Check if the index file exists:
+	exists, err := fileExists(indexFilePath)
+	if err != nil {
+		return fmt.Errorf("failed to check if index file exists: %w", err)
+	}
+	if !exists {
+		return fmt.Errorf("index file %s does not exist", indexFilePath)
+	}
+
+	c2so, err := indexes.Open_CidToSubsetOffsetAndSize(indexFilePath)
+	if err != nil {
+		return fmt.Errorf("failed to open index: %w", err)
+	}
+	defer c2so.Close()
+
+	startedAt := time.Now()
+	numItems := 0
+	defer func() {
+		klog.Infof("Finished in %s", time.Since(startedAt))
+		klog.Infof("Read %d nodes", numItems)
+	}()
+
+	for _, carPath := range carPaths {
+		if err := verifyCarFile_cid2subsetOffset(c2so, carPath, &numItems); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// verifyCarFile_cid2subsetOffset verifies a single CAR file against c2so.
+// numItems is the running count of sections read across all CAR files. Both
+// readers opened here are closed on every return path.
+func verifyCarFile_cid2subsetOffset(
+	c2so *indexes.CidToSubsetOffsetAndSize_Reader,
+	carPath string,
+	numItems *int,
+) error {
+	// Check if the CAR file exists:
+	exists, err := fileExists(carPath)
+	if err != nil {
+		return fmt.Errorf("failed to check if CAR file exists: %w", err)
+	}
+	if !exists {
+		return fmt.Errorf("CAR file %s does not exist", carPath)
+	}
+
+	carFile, err := os.Open(carPath)
+	if err != nil {
+		return fmt.Errorf("failed to open car file: %w", err)
+	}
+	defer carFile.Close()
+
+	rd, err := carreader.New(carFile)
+	if err != nil {
+		return fmt.Errorf("failed to create car reader: %w", err)
+	}
+	// check it has 1 root
+	if len(rd.Header.Roots) != 1 {
+		return fmt.Errorf("car file must have exactly 1 root, but has %d", len(rd.Header.Roots))
+	}
+
+	{
+		// find root cid
+		rootCID := rd.Header.Roots[0]
+		subsetAndOffset, err := c2so.Get(rootCID)
+		if err != nil {
+			return fmt.Errorf("failed to get subset and offset from index: %w", err)
+		}
+		cr, err := carv2.OpenReader(carPath)
+		if err != nil {
+			return fmt.Errorf("failed to open CAR file: %w", err)
+		}
+		defer cr.Close()
+
+		dr, err := cr.DataReader()
+		if err != nil {
+			return fmt.Errorf("failed to open CAR data reader: %w", err)
+		}
+		if _, err := dr.Seek(int64(subsetAndOffset.Offset), io.SeekStart); err != nil {
+			return fmt.Errorf("failed to seek to root node: %w", err)
+		}
+		br := bufio.NewReader(dr)
+
+		gotCid, data, err := util.ReadNode(br)
+		if err != nil {
+			return err
+		}
+		// verify that the CID we read matches the one we expected.
+		if !gotCid.Equals(rootCID) {
+			return fmt.Errorf("CID mismatch: expected %s, got %s", rootCID, gotCid)
+		}
+		// try parsing the data as a Subset node.
+		decoded, err := iplddecoders.DecodeSubset(data)
+		if err != nil {
+			return fmt.Errorf("failed to decode root node: %w", err)
+		}
+		spew.Dump(decoded)
+	}
+
+	totalOffset, err := rd.HeaderSize()
+	if err != nil {
+		return err
+	}
+	for {
+		c, sectionLen, err := rd.NextInfo()
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				klog.Infof("EOF")
+				return nil
+			}
+			// Any other read error must stop verification instead of being
+			// carried into the index lookup below.
+			return fmt.Errorf("failed to read next section of %s: %w", carPath, err)
+		}
+		*numItems++
+		if *numItems%100000 == 0 {
+			printToStderr(".")
+		}
+		offset, err := c2so.Get(c)
+		if err != nil {
+			return fmt.Errorf("failed to lookup offset for %s: %w", c, err)
+		}
+		if offset.Offset != totalOffset {
+			return fmt.Errorf("offset mismatch for %s: %d != %d", c, offset.Offset, totalOffset)
+		}
+		if offset.Size != sectionLen {
+			return fmt.Errorf("length mismatch for %s: %d != %d", c, offset.Size, sectionLen)
+		}
+
+		totalOffset += sectionLen
+	}
+}
diff --git a/indexes/index-cid-to-subset-offset-and-size.go b/indexes/index-cid-to-subset-offset-and-size.go
new file mode 100644
index 00000000..fb0d64f1
--- /dev/null
+++ b/indexes/index-cid-to-subset-offset-and-size.go
@@ -0,0 +1,223 @@
+package indexes
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+
+	"github.com/ipfs/go-cid"
+	"github.com/rpcpool/yellowstone-faithful/compactindexsized"
+)
+
+// CidToSubsetOffsetAndSize_Writer builds an index that maps a CID to the
+// subset number, byte offset and size of its section.
+type CidToSubsetOffsetAndSize_Writer struct {
+	sealed    bool
+	tmpDir    string
+	finalPath string
+	meta      *Metadata
+	index     *compactindexsized.Builder
+}
+
+const (
+	// 3 bytes for subset (uint24, max 16,777,215),
+	// 6 bytes for offset (uint48, max 281.5 TB (terabytes)),
+	// 3 bytes for size (uint24, max 16.7 MB (megabytes), which is plenty considering the max object size is ~1 MB)
+	IndexValueSize_CidToSubsetOffsetAndSize = 3 + 6 + 3
+)
+
+// todo: not sure if this file name is correct
+func formatFilename_CidToSubsetOffsetAndSize(epoch uint64, network Network) string {
+	return fmt.Sprintf(
+		"epoch-%d-%s-%s",
+		epoch,
+		network,
+		"cid-to-subset-offset-and-size.index",
+	)
+}
+
+// Kind_CidToSubsetOffsetAndSize is the index kind written to, and checked against, the index metadata.
+var Kind_CidToSubsetOffsetAndSize = []byte("cid-to-subset-offset-and-size")
+
+// NewWriter_CidToSubsetOffsetAndSize creates a writer for the given epoch and
+// network. tmpDir and numItems are passed on to the underlying builder. It
+// returns ErrInvalidNetwork if network is not valid.
+func NewWriter_CidToSubsetOffsetAndSize(
+	epoch uint64,
+	network Network,
+	tmpDir string,
+	numItems uint64,
+) (*CidToSubsetOffsetAndSize_Writer, error) {
+	if !IsValidNetwork(network) {
+		return nil, ErrInvalidNetwork
+	}
+	index, err := compactindexsized.NewBuilderSized(
+		tmpDir,
+		uint(numItems),
+		IndexValueSize_CidToSubsetOffsetAndSize,
+	)
+	if err != nil {
+		return nil, err
+	}
+	meta := &Metadata{
+		Epoch:     epoch,
+		Network:   network,
+		IndexKind: Kind_CidToSubsetOffsetAndSize,
+	}
+	if err := setDefaultMetadata(index, meta); err != nil {
+		return nil, err
+	}
+	return &CidToSubsetOffsetAndSize_Writer{
+		tmpDir: tmpDir,
+		meta:   meta,
+		index:  index,
+	}, nil
+}
+
+// Put records the subset number, offset and size for cid_. It returns an
+// error if cid_ is undefined or a value does not fit its fixed-width field.
+func (w *CidToSubsetOffsetAndSize_Writer) Put(cid_ cid.Cid, subset, offset, size uint64) error {
+	if cid_ == cid.Undef {
+		return fmt.Errorf("cid is undefined")
+	}
+	if subset > MaxUint24 {
+		return fmt.Errorf("subset number is too large; max is %d, but got %d", MaxUint24, subset)
+	}
+	if offset > MaxUint48 {
+		return fmt.Errorf("offset is too large; max is %d, but got %d", MaxUint48, offset)
+	}
+	if size > MaxUint24 {
+		return fmt.Errorf("size is too large; max is %d, but got %d", MaxUint24, size)
+	}
+
+	key := cid_.Bytes()
+	value := append(
+		Uint24tob(uint32(subset)),
+		append(
+			Uint48tob(offset),
+			Uint24tob(uint32(size))...,
+		)...,
+	)
+
+	return w.index.Insert(key, value)
+}
+
+// Seal writes the finished index to a file in dstDir. A writer can be sealed
+// only once.
+func (w *CidToSubsetOffsetAndSize_Writer) Seal(ctx context.Context, dstDir string) error {
+	if w.sealed {
+		return fmt.Errorf("already sealed")
+	}
+
+	dstPath := filepath.Join(dstDir, formatFilename_CidToSubsetOffsetAndSize(w.meta.Epoch, w.meta.Network))
+	w.finalPath = dstPath
+
+	file, err := os.Create(dstPath)
+	if err != nil {
+		return fmt.Errorf("failed to create file: %w", err)
+	}
+	defer file.Close()
+
+	if err := w.index.Seal(ctx, file); err != nil {
+		return fmt.Errorf("failed to seal index: %w", err)
+	}
+
+	w.sealed = true
+
+	return nil
+}
+
+// Close closes the underlying builder. It returns an error, without closing
+// anything, if the index has not been sealed.
+func (w *CidToSubsetOffsetAndSize_Writer) Close() error {
+	if !w.sealed {
+		return fmt.Errorf("attempted to close a cid-to-subset-offset-and-size index that was not sealed")
+	}
+	return w.index.Close()
+}
+
+// GetFilePath returns the path chosen by Seal; it is empty before Seal is called.
+func (w *CidToSubsetOffsetAndSize_Writer) GetFilePath() string {
+	return w.finalPath
+}
+
+// CidToSubsetOffsetAndSize_Reader looks up entries in a sealed index.
+type CidToSubsetOffsetAndSize_Reader struct {
+	file  io.Closer
+	meta  *Metadata
+	index *compactindexsized.DB
+}
+
+// Open_CidToSubsetOffsetAndSize opens the index file at the given path.
+func Open_CidToSubsetOffsetAndSize(file string) (*CidToSubsetOffsetAndSize_Reader, error) {
+	reader, err := os.Open(file)
+	if err != nil {
+		return nil, fmt.Errorf("failed to open index file: %w", err)
+	}
+
+	r, err := OpenWithReader_CidToSubsetOffsetAndSize(reader)
+	if err != nil {
+		// The file is only handed over to the reader on success.
+		reader.Close()
+		return nil, err
+	}
+	return r, nil
+}
+
+// OpenWithReader_CidToSubsetOffsetAndSize opens an index from reader and
+// checks that its metadata has a valid network and the expected index kind.
+// On success the returned reader closes the given reader in Close.
+func OpenWithReader_CidToSubsetOffsetAndSize(reader ReaderAtCloser) (*CidToSubsetOffsetAndSize_Reader, error) {
+	index, err := compactindexsized.Open(reader)
+	if err != nil {
+		return nil, fmt.Errorf("failed to open index: %w", err)
+	}
+	meta, err := getDefaultMetadata(index)
+	if err != nil {
+		return nil, err
+	}
+	if !IsValidNetwork(meta.Network) {
+		return nil, fmt.Errorf("invalid network")
+	}
+	if err := meta.AssertIndexKind(Kind_CidToSubsetOffsetAndSize); err != nil {
+		return nil, err
+	}
+	return &CidToSubsetOffsetAndSize_Reader{
+		file:  reader,
+		meta:  meta,
+		index: index,
+	}, nil
+}
+
+// Get returns the subset number, offset and size stored for cid_.
+func (r *CidToSubsetOffsetAndSize_Reader) Get(cid_ cid.Cid) (*SubsetOffsetAndSize, error) {
+	if cid_ == cid.Undef {
+		return nil, fmt.Errorf("cid is undefined")
+	}
+	key := cid_.Bytes()
+	value, err := r.index.Lookup(key)
+	if err != nil {
+		return nil, err
+	}
+	soas := &SubsetOffsetAndSize{}
+	if err := soas.FromBytes(value); err != nil {
+		return nil, err
+	}
+	return soas, nil
+}
+
+// Close closes the reader the index was opened from.
+func (r *CidToSubsetOffsetAndSize_Reader) Close() error {
+	return r.file.Close()
+}
+
+// Meta returns the metadata for the index.
+func (r *CidToSubsetOffsetAndSize_Reader) Meta() *Metadata {
+	return r.meta
+}
+
+func (r *CidToSubsetOffsetAndSize_Reader) Prefetch(b bool) {
+	r.index.Prefetch(b)
+}
diff --git a/indexes/subset-offset-and-size.go b/indexes/subset-offset-and-size.go
new file mode 100644
index 00000000..34ec83f6
--- /dev/null
+++ b/indexes/subset-offset-and-size.go
@@ -0,0 +1,24 @@
+package indexes
+
+import "errors"
+
+// SubsetOffsetAndSize is the decoded value of a cid-to-subset-offset-and-size
+// index entry.
+type SubsetOffsetAndSize struct {
+	Subset uint64 // uint24, 3 bytes, max 16,777,215
+	Offset uint64 // uint48, 6 bytes, max 281.5 TB (terabytes)
+	Size   uint64 // uint24, 3 bytes, max 16.7 MB (megabytes)
+}
+
+// FromBytes parses the subset, offset and size from a byte slice, which must be
+// exactly IndexValueSize_CidToSubsetOffsetAndSize bytes long.
+func (soas *SubsetOffsetAndSize) FromBytes(buf []byte) error {
+	if len(buf) != IndexValueSize_CidToSubsetOffsetAndSize {
+		return errors.New("invalid byte slice length")
+	}
+	_ = buf[IndexValueSize_CidToSubsetOffsetAndSize-1] // bounds check hint to compiler
+	soas.Subset = uint64(BtoUint24(buf[:3]))
+	soas.Offset = BtoUint48(buf[3:9])
+	soas.Size = uint64(BtoUint24(buf[9:]))
+	return nil
+}
diff --git a/readers.go b/readers.go
index 2b796a72..17bde02c 100644
--- a/readers.go
+++ b/readers.go
@@ -112,6 +112,60 @@ func carCountItemsByFirstByte(carPath string) (map[byte]uint64, *ipldbindcode.Ep
 	return counts, epochObject, err
 }
 
+// carCountItemsWithSubset counts the nodes in the CAR file at carPath and
+// returns the count together with the last Subset node found in it. The
+// returned Subset is nil if the file contains no Subset node.
+func carCountItemsWithSubset(carPath string) (uint64, *ipldbindcode.Subset, error) {
+	file, err := os.Open(carPath)
+	if err != nil {
+		return 0, nil, err
+	}
+	defer file.Close()
+
+	rd, err := carreader.New(file)
+	if err != nil {
+		return 0, nil, fmt.Errorf("failed to open car file: %w", err)
+	}
+
+	numTotalItems := uint64(0)
+	startedCountAt := time.Now()
+	var subsetObject *ipldbindcode.Subset
+	for {
+		_, _, block, err := rd.NextNodeBytes()
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				break
+			}
+			return 0, nil, err
+		}
+		numTotalItems++
+
+		if numTotalItems%1_000_000 == 0 {
+			printToStderr(
+				fmt.Sprintf("\rCounted %s items", humanize.Comma(int64(numTotalItems))),
+			)
+		}
+
+		// the first data byte is the block type (after the CBOR tag);
+		// blocks too short to carry one cannot be a Subset.
+		if len(block) < 2 {
+			continue
+		}
+		if iplddecoders.Kind(block[1]) == iplddecoders.KindSubset {
+			subsetObject, err = iplddecoders.DecodeSubset(block)
+			if err != nil {
+				return 0, nil, fmt.Errorf("failed to decode Subset node: %w", err)
+			}
+		}
+	}
+
+	printToStderr(
+		fmt.Sprintf("\rCounted %s items in %s\n", humanize.Comma(int64(numTotalItems)), time.Since(startedCountAt).Truncate(time.Second)),
+	)
+
+	return numTotalItems, subsetObject, nil
+}
+
 func printToStderr(msg string) {
 	fmt.Fprint(os.Stderr, msg)
 }