diff --git a/index-cid-to-subset-offset.go b/index-cid-to-subset-offset.go index 84512f08..48f93613 100644 --- a/index-cid-to-subset-offset.go +++ b/index-cid-to-subset-offset.go @@ -12,8 +12,10 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/dustin/go-humanize" + "github.com/ipfs/go-cid" "github.com/ipld/go-car/util" carv2 "github.com/ipld/go-car/v2" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/rpcpool/yellowstone-faithful/carreader" "github.com/rpcpool/yellowstone-faithful/indexes" "github.com/rpcpool/yellowstone-faithful/iplddecoders" @@ -29,6 +31,8 @@ func CreateIndex_cid2subsetOffset( indexDir string, ) (string, error) { var numItems uint64 + var orderedCids []cid.Cid + carPathMap := make(map[cid.Cid]string) for _, carPath := range carPaths { // Check if the CAR file exists: exists, err := fileExists(carPath) @@ -39,11 +43,34 @@ func CreateIndex_cid2subsetOffset( return "", fmt.Errorf("CAR file %q does not exist", carPath) } + carFile, err := os.Open(carPath) + if err != nil { + return "", fmt.Errorf("failed to open car file: %w", err) + } + defer carFile.Close() + + rd, err := carreader.New(carFile) + if err != nil { + return "", fmt.Errorf("failed to create car reader: %w", err) + } + + // check it has 1 root + if len(rd.Header.Roots) != 1 { + return "", fmt.Errorf("car file must have exactly 1 root, but has %d", len(rd.Header.Roots)) + } + + carPathMap[rd.Header.Roots[0]] = carPath + klog.Infof("Counting items in car file: %s", carPath) - ni, err := carCountItems(carPath) + ni, epochObject, err := carCountItemsAndFindEpoch(carPath) if err != nil { return "", fmt.Errorf("failed to count items in CAR: %w", err) } + if epochObject != nil { + for _, subset := range epochObject.Subsets { + orderedCids = append(orderedCids, subset.(cidlink.Link).Cid) + } + } klog.Infof("Found %s items in car file", humanize.Comma(int64(ni))) numItems += ni } @@ -70,7 +97,8 @@ func CreateIndex_cid2subsetOffset( // To do: how to get the subset index? subset := uint64(0) numItemsIndexed := uint64(0) - for _, carPath := range carPaths { + for _, cid := range orderedCids { + carPath := carPathMap[cid] carFile, err := os.Open(carPath) if err != nil { return "", fmt.Errorf("failed to open car file: %w", err) @@ -94,6 +122,7 @@ func CreateIndex_cid2subsetOffset( c, sectionLength, err := rd.NextInfo() if err != nil { if errors.Is(err, io.EOF) { + subset++ break } return "", fmt.Errorf("encountered an error while indexing: %w", err) diff --git a/readers.go b/readers.go index 2b796a72..591fbe2f 100644 --- a/readers.go +++ b/readers.go @@ -62,6 +62,43 @@ func carCountItems(carPath string) (uint64, error) { return count, nil } +func carCountItemsAndFindEpoch(carPath string) (uint64, *ipldbindcode.Epoch, error) { + file, err := os.Open(carPath) + if err != nil { + return 0, nil, err + } + defer file.Close() + + rd, err := carreader.New(file) + if err != nil { + return 0, nil, fmt.Errorf("failed to open car file: %w", err) + } + + var count uint64 + var epochObject *ipldbindcode.Epoch + for { + _, _, block, err := rd.NextNodeBytes() + if err != nil { + if errors.Is(err, io.EOF) { + break + } + return 0, nil, err + } + count++ + + fdb := block[1] + if iplddecoders.Kind(fdb) == iplddecoders.KindEpoch { + epochObject, err = iplddecoders.DecodeEpoch(block) + if err != nil { + return 0, nil, fmt.Errorf("failed to decode Epoch node: %w", err) + } + } + } + + return count, epochObject, nil + +} + func carCountItemsByFirstByte(carPath string) (map[byte]uint64, *ipldbindcode.Epoch, error) { file, err := os.Open(carPath) if err != nil {