From 6c8e7a6eec9116cec636c1c9eb374f44fc4f236e Mon Sep 17 00:00:00 2001 From: anjor Date: Wed, 28 Aug 2024 10:35:39 +0100 Subject: [PATCH] sort based on subset --- index-cid-to-subset-offset.go | 45 ++++++++++------------------- readers.go | 53 +++++++++++++++++++++-------------- 2 files changed, 47 insertions(+), 51 deletions(-) diff --git a/index-cid-to-subset-offset.go b/index-cid-to-subset-offset.go index 48f93613..33731581 100644 --- a/index-cid-to-subset-offset.go +++ b/index-cid-to-subset-offset.go @@ -8,20 +8,25 @@ import ( "io" "os" "path/filepath" + "sort" "time" "github.com/davecgh/go-spew/spew" "github.com/dustin/go-humanize" - "github.com/ipfs/go-cid" "github.com/ipld/go-car/util" carv2 "github.com/ipld/go-car/v2" - cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/rpcpool/yellowstone-faithful/carreader" "github.com/rpcpool/yellowstone-faithful/indexes" + "github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode" "github.com/rpcpool/yellowstone-faithful/iplddecoders" "k8s.io/klog/v2" ) +type subsetAndCar struct { + subset *ipldbindcode.Subset + carPath string +} + func CreateIndex_cid2subsetOffset( ctx context.Context, epoch uint64, @@ -31,8 +36,7 @@ func CreateIndex_cid2subsetOffset( indexDir string, ) (string, error) { var numItems uint64 - var orderedCids []cid.Cid - carPathMap := make(map[cid.Cid]string) + var subsetAndCars []subsetAndCar for _, carPath := range carPaths { // Check if the CAR file exists: exists, err := fileExists(carPath) @@ -43,39 +47,20 @@ func CreateIndex_cid2subsetOffset( return "", fmt.Errorf("CAR file %q does not exist", carPath) } - carFile, err := os.Open(carPath) - if err != nil { - return "", fmt.Errorf("failed to open car file: %w", err) - } - defer carFile.Close() - - rd, err := carreader.New(carFile) - if err != nil { - return "", fmt.Errorf("failed to create car reader: %w", err) - } - - // check it has 1 root - if len(rd.Header.Roots) != 1 { - return "", fmt.Errorf("car file must have exactly 1 root, but has %d", len(rd.Header.Roots)) - } - - carPathMap[rd.Header.Roots[0]] = carPath - klog.Infof("Counting items in car file: %s", carPath) - ni, epochObject, err := carCountItemsAndFindEpoch(carPath) + ni, s, err := carCountItemsWithSubset(carPath) if err != nil { return "", fmt.Errorf("failed to count items in CAR: %w", err) } - if epochObject != nil { - for _, subset := range epochObject.Subsets { - orderedCids = append(orderedCids, subset.(cidlink.Link).Cid) - } - } + subsetAndCars = append(subsetAndCars, subsetAndCar{subset: s, carPath: carPath}) klog.Infof("Found %s items in car file", humanize.Comma(int64(ni))) numItems += ni } klog.Infof("Found a total of %d items in car files", numItems) + sort.Slice(subsetAndCars, func(i, j int) bool { + return subsetAndCars[i].subset.First < subsetAndCars[j].subset.First + }) tmpDir = filepath.Join(tmpDir, "index-cid-to-subset-offset-"+time.Now().Format("20060102-150405.000000000")) if err := os.MkdirAll(tmpDir, 0o755); err != nil { @@ -97,8 +82,8 @@ func CreateIndex_cid2subsetOffset( // To do: how to get the subset index? subset := uint64(0) numItemsIndexed := uint64(0) - for _, cid := range orderedCids { - carPath := carPathMap[cid] + for _, info := range subsetAndCars { + carPath := info.carPath carFile, err := os.Open(carPath) if err != nil { return "", fmt.Errorf("failed to open car file: %w", err) diff --git a/readers.go b/readers.go index 591fbe2f..17bde02c 100644 --- a/readers.go +++ b/readers.go @@ -62,19 +62,21 @@ func carCountItems(carPath string) (uint64, error) { return count, nil } -func carCountItemsAndFindEpoch(carPath string) (uint64, *ipldbindcode.Epoch, error) { +func carCountItemsByFirstByte(carPath string) (map[byte]uint64, *ipldbindcode.Epoch, error) { file, err := os.Open(carPath) if err != nil { - return 0, nil, err + return nil, nil, err } defer file.Close() rd, err := carreader.New(file) if err != nil { - return 0, nil, fmt.Errorf("failed to open car file: %w", err) + return nil, nil, fmt.Errorf("failed to open car file: %w", err) } - var count uint64 + numTotalItems := uint64(0) + counts := make(map[byte]uint64) + startedCountAt := time.Now() var epochObject *ipldbindcode.Epoch for { _, _, block, err := rd.NextNodeBytes() @@ -82,50 +84,59 @@ func carCountItemsAndFindEpoch(carPath string) (uint64, *ipldbindcode.Epoch, err if errors.Is(err, io.EOF) { break } - return 0, nil, err + return nil, nil, err + } + // the first data byte is the block type (after the CBOR tag) + firstDataByte := block[1] + counts[firstDataByte]++ + numTotalItems++ + + if numTotalItems%1_000_000 == 0 { + printToStderr( + fmt.Sprintf("\rCounted %s items", humanize.Comma(int64(numTotalItems))), + ) } - count++ - fdb := block[1] - if iplddecoders.Kind(fdb) == iplddecoders.KindEpoch { + if iplddecoders.Kind(firstDataByte) == iplddecoders.KindEpoch { epochObject, err = iplddecoders.DecodeEpoch(block) if err != nil { - return 0, nil, fmt.Errorf("failed to decode Epoch node: %w", err) + return nil, nil, fmt.Errorf("failed to decode Epoch node: %w", err) } } } - return count, epochObject, nil + printToStderr( + fmt.Sprintf("\rCounted %s items in %s\n", humanize.Comma(int64(numTotalItems)), time.Since(startedCountAt).Truncate(time.Second)), + ) + return counts, epochObject, err } -func carCountItemsByFirstByte(carPath string) (map[byte]uint64, *ipldbindcode.Epoch, error) { +func carCountItemsWithSubset(carPath string) (uint64, *ipldbindcode.Subset, error) { file, err := os.Open(carPath) if err != nil { - return nil, nil, err + return 0, nil, err } defer file.Close() rd, err := carreader.New(file) if err != nil { - return nil, nil, fmt.Errorf("failed to open car file: %w", err) + return 0, nil, fmt.Errorf("failed to open car file: %w", err) } numTotalItems := uint64(0) - counts := make(map[byte]uint64) startedCountAt := time.Now() - var epochObject *ipldbindcode.Epoch + var subsetObject *ipldbindcode.Subset for { _, _, block, err := rd.NextNodeBytes() if err != nil { if errors.Is(err, io.EOF) { break } - return nil, nil, err + return 0, nil, err } // the first data byte is the block type (after the CBOR tag) firstDataByte := block[1] - counts[firstDataByte]++ numTotalItems++ if numTotalItems%1_000_000 == 0 { @@ -134,10 +145,10 @@ func carCountItemsByFirstByte(carPath string) (map[byte]uint64, *ipldbindcode.Ep ) } - if iplddecoders.Kind(firstDataByte) == iplddecoders.KindEpoch { - epochObject, err = iplddecoders.DecodeEpoch(block) + if iplddecoders.Kind(firstDataByte) == iplddecoders.KindSubset { + subsetObject, err = iplddecoders.DecodeSubset(block) if err != nil { - return nil, nil, fmt.Errorf("failed to decode Epoch node: %w", err) + return 0, nil, fmt.Errorf("failed to decode Epoch node: %w", err) } } } @@ -146,7 +157,7 @@ func carCountItemsByFirstByte(carPath string) (map[byte]uint64, *ipldbindcode.Ep fmt.Sprintf("\rCounted %s items in %s\n", humanize.Comma(int64(numTotalItems)), time.Since(startedCountAt).Truncate(time.Second)), ) - return counts, epochObject, err + return numTotalItems, subsetObject, err } func printToStderr(msg string) {