Skip to content

Commit

Permalink
sort based on subset
Browse files Browse the repository at this point in the history
  • Loading branch information
anjor committed Aug 28, 2024
1 parent a3b11ad commit 6c8e7a6
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 51 deletions.
45 changes: 15 additions & 30 deletions index-cid-to-subset-offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,25 @@ import (
"io"
"os"
"path/filepath"
"sort"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/dustin/go-humanize"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car/util"
carv2 "github.com/ipld/go-car/v2"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/rpcpool/yellowstone-faithful/carreader"
"github.com/rpcpool/yellowstone-faithful/indexes"
"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
"github.com/rpcpool/yellowstone-faithful/iplddecoders"
"k8s.io/klog/v2"
)

// subsetAndCar pairs the Subset node decoded from a CAR file with the path of
// that CAR file. CreateIndex_cid2subsetOffset collects one value per input CAR
// and sorts them by subset.First before indexing.
type subsetAndCar struct {
// subset is the Subset node returned by carCountItemsWithSubset for carPath.
// It may be nil when no Subset node was encountered while scanning the file.
subset *ipldbindcode.Subset
// carPath is the path of the CAR file that subset was read from.
carPath string
}

func CreateIndex_cid2subsetOffset(
ctx context.Context,
epoch uint64,
Expand All @@ -31,8 +36,7 @@ func CreateIndex_cid2subsetOffset(
indexDir string,
) (string, error) {
var numItems uint64
var orderedCids []cid.Cid
carPathMap := make(map[cid.Cid]string)
var subsetAndCars []subsetAndCar
for _, carPath := range carPaths {
// Check if the CAR file exists:
exists, err := fileExists(carPath)
Expand All @@ -43,39 +47,20 @@ func CreateIndex_cid2subsetOffset(
return "", fmt.Errorf("CAR file %q does not exist", carPath)
}

carFile, err := os.Open(carPath)
if err != nil {
return "", fmt.Errorf("failed to open car file: %w", err)
}
defer carFile.Close()

rd, err := carreader.New(carFile)
if err != nil {
return "", fmt.Errorf("failed to create car reader: %w", err)
}

// check it has 1 root
if len(rd.Header.Roots) != 1 {
return "", fmt.Errorf("car file must have exactly 1 root, but has %d", len(rd.Header.Roots))
}

carPathMap[rd.Header.Roots[0]] = carPath

klog.Infof("Counting items in car file: %s", carPath)
ni, epochObject, err := carCountItemsAndFindEpoch(carPath)
ni, s, err := carCountItemsWithSubset(carPath)
if err != nil {
return "", fmt.Errorf("failed to count items in CAR: %w", err)
}
if epochObject != nil {
for _, subset := range epochObject.Subsets {
orderedCids = append(orderedCids, subset.(cidlink.Link).Cid)
}
}
subsetAndCars = append(subsetAndCars, subsetAndCar{subset: s, carPath: carPath})
klog.Infof("Found %s items in car file", humanize.Comma(int64(ni)))
numItems += ni
}

klog.Infof("Found a total of %d items in car files", numItems)
sort.Slice(subsetAndCars, func(i, j int) bool {
return subsetAndCars[i].subset.First < subsetAndCars[j].subset.First
})

tmpDir = filepath.Join(tmpDir, "index-cid-to-subset-offset-"+time.Now().Format("20060102-150405.000000000"))
if err := os.MkdirAll(tmpDir, 0o755); err != nil {
Expand All @@ -97,8 +82,8 @@ func CreateIndex_cid2subsetOffset(
// TODO: how do we get the subset index?
subset := uint64(0)
numItemsIndexed := uint64(0)
for _, cid := range orderedCids {
carPath := carPathMap[cid]
for _, info := range subsetAndCars {
carPath := info.carPath
carFile, err := os.Open(carPath)
if err != nil {
return "", fmt.Errorf("failed to open car file: %w", err)
Expand Down
53 changes: 32 additions & 21 deletions readers.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,70 +62,81 @@ func carCountItems(carPath string) (uint64, error) {
return count, nil
}

func carCountItemsAndFindEpoch(carPath string) (uint64, *ipldbindcode.Epoch, error) {
func carCountItemsByFirstByte(carPath string) (map[byte]uint64, *ipldbindcode.Epoch, error) {
file, err := os.Open(carPath)
if err != nil {
return 0, nil, err
return nil, nil, err
}
defer file.Close()

rd, err := carreader.New(file)
if err != nil {
return 0, nil, fmt.Errorf("failed to open car file: %w", err)
return nil, nil, fmt.Errorf("failed to open car file: %w", err)
}

var count uint64
numTotalItems := uint64(0)
counts := make(map[byte]uint64)
startedCountAt := time.Now()
var epochObject *ipldbindcode.Epoch
for {
_, _, block, err := rd.NextNodeBytes()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return 0, nil, err
return nil, nil, err
}
// the first data byte is the block type (after the CBOR tag)
firstDataByte := block[1]
counts[firstDataByte]++
numTotalItems++

if numTotalItems%1_000_000 == 0 {
printToStderr(
fmt.Sprintf("\rCounted %s items", humanize.Comma(int64(numTotalItems))),
)
}
count++

fdb := block[1]
if iplddecoders.Kind(fdb) == iplddecoders.KindEpoch {
if iplddecoders.Kind(firstDataByte) == iplddecoders.KindEpoch {
epochObject, err = iplddecoders.DecodeEpoch(block)
if err != nil {
return 0, nil, fmt.Errorf("failed to decode Epoch node: %w", err)
return nil, nil, fmt.Errorf("failed to decode Epoch node: %w", err)
}
}
}

return count, epochObject, nil
printToStderr(
fmt.Sprintf("\rCounted %s items in %s\n", humanize.Comma(int64(numTotalItems)), time.Since(startedCountAt).Truncate(time.Second)),
)

return counts, epochObject, err
}

func carCountItemsByFirstByte(carPath string) (map[byte]uint64, *ipldbindcode.Epoch, error) {
func carCountItemsWithSubset(carPath string) (uint64, *ipldbindcode.Subset, error) {
file, err := os.Open(carPath)
if err != nil {
return nil, nil, err
return 0, nil, err
}
defer file.Close()

rd, err := carreader.New(file)
if err != nil {
return nil, nil, fmt.Errorf("failed to open car file: %w", err)
return 0, nil, fmt.Errorf("failed to open car file: %w", err)
}

numTotalItems := uint64(0)
counts := make(map[byte]uint64)
startedCountAt := time.Now()
var epochObject *ipldbindcode.Epoch
var subsetObject *ipldbindcode.Subset
for {
_, _, block, err := rd.NextNodeBytes()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return nil, nil, err
return 0, nil, err
}
// the first data byte is the block type (after the CBOR tag)
firstDataByte := block[1]
counts[firstDataByte]++
numTotalItems++

if numTotalItems%1_000_000 == 0 {
Expand All @@ -134,10 +145,10 @@ func carCountItemsByFirstByte(carPath string) (map[byte]uint64, *ipldbindcode.Ep
)
}

if iplddecoders.Kind(firstDataByte) == iplddecoders.KindEpoch {
epochObject, err = iplddecoders.DecodeEpoch(block)
if iplddecoders.Kind(firstDataByte) == iplddecoders.KindSubset {
subsetObject, err = iplddecoders.DecodeSubset(block)
if err != nil {
return nil, nil, fmt.Errorf("failed to decode Epoch node: %w", err)
return 0, nil, fmt.Errorf("failed to decode Epoch node: %w", err)
}
}
}
Expand All @@ -146,7 +157,7 @@ func carCountItemsByFirstByte(carPath string) (map[byte]uint64, *ipldbindcode.Ep
fmt.Sprintf("\rCounted %s items in %s\n", humanize.Comma(int64(numTotalItems)), time.Since(startedCountAt).Truncate(time.Second)),
)

return counts, epochObject, err
return numTotalItems, subsetObject, err
}

func printToStderr(msg string) {
Expand Down

0 comments on commit 6c8e7a6

Please sign in to comment.