Skip to content

Commit

Permalink
more
Browse files Browse the repository at this point in the history
  • Loading branch information
anjor committed Aug 13, 2024
1 parent a4fedaa commit 66c9ff2
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 60 deletions.
32 changes: 16 additions & 16 deletions cmd-x-index-cid2subsetoffset.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,32 +77,32 @@ func newCmd_Index_cid2subsetOffset() *cli.Command {
defer func() {
klog.Infof("Finished in %s", time.Since(startedAt))
}()
klog.Infof("Creating CID-to-offset index for %s", carPath)
indexFilepath, err := CreateIndex_cid2offset(
klog.Infof("Creating CID-to-offset index")
indexFilepath, err := CreateIndex_cid2subsetOffset(

Check failure on line 81 in cmd-x-index-cid2subsetoffset.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

indexFilepath declared and not used
context.TODO(),
epoch,
network,
tmpDir,
carPath,
carPaths,
indexDir,
)
if err != nil {
panic(err)
}
klog.Info("Index created")
if verify {
klog.Infof("Verifying index for %s located at %s", carPath, indexFilepath)
startedAt := time.Now()
defer func() {
klog.Infof("Finished in %s", time.Since(startedAt))
}()
err := VerifyIndex_cid2offset(context.TODO(), carPath, indexFilepath)
if err != nil {
return cli.Exit(err, 1)
}
klog.Info("Index verified")
return nil
}
// if verify {
// klog.Infof("Verifying index located at %s", indexFilepath)
// startedAt := time.Now()
// defer func() {
// klog.Infof("Finished in %s", time.Since(startedAt))
// }()
// err := VerifyIndex_cid2subsetOffset(context.TODO(), indexFilepath)
// if err != nil {
// return cli.Exit(err, 1)
// }
// klog.Info("Index verified")
// return nil
// }
}
return nil
},
Expand Down
213 changes: 182 additions & 31 deletions index-cid-to-subset-offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package main

import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"time"
Expand All @@ -21,7 +23,7 @@ func CreateIndex_cid2subsetOffset(
carPaths []string,
indexDir string,
) (string, error) {

var numItems uint64
for _, carPath := range carPaths {
// Check if the CAR file exists:
exists, err := fileExists(carPath)
Expand All @@ -32,6 +34,38 @@ func CreateIndex_cid2subsetOffset(
return "", fmt.Errorf("CAR file %q does not exist", carPath)
}

klog.Infof("Counting items in car file: %s", carPath)
ni, err := carCountItems(carPath)
if err != nil {
return "", fmt.Errorf("failed to count items in CAR: %w", err)
}
klog.Infof("Found %s items in car file", humanize.Comma(int64(ni)))
numItems += ni
}

klog.Infof("Found a total of %d items in car files", numItems)

tmpDir = filepath.Join(tmpDir, "index-cid-to-subset-offset-"+time.Now().Format("20060102-150405.000000000"))
if err := os.MkdirAll(tmpDir, 0o755); err != nil {
return "", fmt.Errorf("failed to create tmp dir: %w", err)
}

klog.Infof("Creating builder with %d items", numItems)
c2so, err := indexes.NewWriter_CidToSubsetOffsetAndSize(
epoch,
network,
tmpDir,
numItems,
)
if err != nil {
return "", fmt.Errorf("failed to open index store: %w", err)
}
defer c2so.Close()

// TODO: work out how to obtain the subset index for each CAR file; the 0 below is a placeholder.
subset := uint64(0)
numItemsIndexed := uint64(0)
for _, carPath := range carPaths {
carFile, err := os.Open(carPath)
if err != nil {
return "", fmt.Errorf("failed to open car file: %w", err)
Expand All @@ -42,43 +76,160 @@ func CreateIndex_cid2subsetOffset(
if err != nil {
return "", fmt.Errorf("failed to create car reader: %w", err)
}
// check it has 1 root
if len(rd.Header.Roots) != 1 {
return "", fmt.Errorf("car file must have exactly 1 root, but has %d", len(rd.Header.Roots))
}

klog.Infof("Getting car file size")
targetFileSize, err := getFileSize(carPath)
if err != nil {
return "", fmt.Errorf("failed to get car file size: %w", err)
totalOffset := uint64(0)
{
if size, err := rd.HeaderSize(); err != nil {
return "", fmt.Errorf("failed to get car header size: %w", err)
} else {
totalOffset += size
}
}
for {
c, sectionLength, err := rd.NextInfo()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return "", fmt.Errorf("encountered an error while indexing: %w", err)
}

klog.Infof("Counting items in car file...")
numItems, err := carCountItems(carPath)
if err != nil {
return "", fmt.Errorf("failed to count items in car file: %w", err)
}
klog.Infof("Found %s items in car file", humanize.Comma(int64(numItems)))
err = c2so.Put(c, subset, totalOffset, sectionLength)
if err != nil {
return "", fmt.Errorf("failed to put cid to subset, offset: %w", err)
}

tmpDir = filepath.Join(tmpDir, "index-cid-to-subset-offset-"+time.Now().Format("20060102-150405.000000000"))
if err = os.MkdirAll(tmpDir, 0o755); err != nil {
return "", fmt.Errorf("failed to create tmp dir: %w", err)
totalOffset += sectionLength

numItemsIndexed++
if numItemsIndexed%100_000 == 0 {
printToStderr(".")
}
}

rootCid := rd.Header.Roots[0]
}

klog.Infof("Creating builder with %d items and target file size %d", numItems, targetFileSize)
c2so, err := indexes.NewWriter_CidToSubsetOffsetAndSize(
epoch,
rootCid,
network,
tmpDir,
numItems,
)
if err != nil {
return "", fmt.Errorf("failed to open index store: %w", err)
}
defer c2so.Close()
klog.Infof("Sealing index...")
if err = c2so.Seal(ctx, indexDir); err != nil {
return "", fmt.Errorf("failed to seal index: %w", err)
}

indexFilePath := c2so.GetFilePath()
klog.Infof("Index created at %s, %d items indexed", indexFilePath, numItemsIndexed)
return indexFilePath, nil
}

// func VerifyIndex_cid2offset(ctx context.Context, carPath string, indexFilePath string) error {
// // Check if the CAR file exists:
// exists, err := fileExists(carPath)
// if err != nil {
// return fmt.Errorf("failed to check if CAR file exists: %w", err)
// }
// if !exists {
// return fmt.Errorf("CAR file %s does not exist", carPath)
// }

// // Check if the index file exists:
// exists, err = fileExists(indexFilePath)
// if err != nil {
// return fmt.Errorf("failed to check if index file exists: %w", err)
// }
// if !exists {
// return fmt.Errorf("index file %s does not exist", indexFilePath)
// }

// carFile, err := os.Open(carPath)
// if err != nil {
// return fmt.Errorf("failed to open car file: %w", err)
// }
// defer carFile.Close()

// rd, err := carreader.New(carFile)
// if err != nil {
// return fmt.Errorf("failed to create car reader: %w", err)
// }
// // check it has 1 root
// if len(rd.Header.Roots) != 1 {
// return fmt.Errorf("car file must have exactly 1 root, but has %d", len(rd.Header.Roots))
// }

// c2o, err := indexes.Open_CidToOffsetAndSize(indexFilePath)
// if err != nil {
// return fmt.Errorf("failed to open index: %w", err)
// }
// {
// // find root cid
// rootCID := rd.Header.Roots[0]
// offset, err := c2o.Get(rootCID)
// if err != nil {
// return fmt.Errorf("failed to get offset from index: %w", err)
// }
// cr, err := carv2.OpenReader(carPath)
// if err != nil {
// return fmt.Errorf("failed to open CAR file: %w", err)
// }
// defer cr.Close()

// dr, err := cr.DataReader()
// if err != nil {
// return fmt.Errorf("failed to open CAR data reader: %w", err)
// }
// dr.Seek(int64(offset.Offset), io.SeekStart)
// br := bufio.NewReader(dr)

// gotCid, data, err := util.ReadNode(br)
// if err != nil {
// return err
// }
// // verify that the CID we read matches the one we expected.
// if !gotCid.Equals(rootCID) {
// return fmt.Errorf("CID mismatch: expected %s, got %s", rootCID, gotCid)
// }
// // try parsing the data as an Epoch node.
// decoded, err := iplddecoders.DecodeEpoch(data)
// if err != nil {
// return fmt.Errorf("failed to decode root node: %w", err)
// }
// spew.Dump(decoded)
// }

// startedAt := time.Now()
// numItems := 0
// defer func() {
// klog.Infof("Finished in %s", time.Since(startedAt))
// klog.Infof("Read %d nodes", numItems)
// }()

// totalOffset := uint64(0)
// {
// if size, err := rd.HeaderSize(); err != nil {
// return err
// } else {
// totalOffset += size
// }
// }
// for {
// c, sectionLen, err := rd.NextInfo()
// if errors.Is(err, io.EOF) {
// klog.Infof("EOF")
// break
// }
// numItems++
// if numItems%100000 == 0 {
// printToStderr(".")
// }
// offset, err := c2o.Get(c)
// if err != nil {
// return fmt.Errorf("failed to lookup offset for %s: %w", c, err)
// }
// if offset.Offset != totalOffset {
// return fmt.Errorf("offset mismatch for %s: %d != %d", c, offset, totalOffset)
// }
// if offset.Size != sectionLen {
// return fmt.Errorf("length mismatch for %s: %d != %d", c, offset, sectionLen)
// }

// totalOffset += sectionLen
// }
// return nil
// }
17 changes: 4 additions & 13 deletions indexes/index-cid-to-subset-offset-and-size.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,26 @@ const (
)

// TODO: confirm that this file name format is correct.
func formatFilename_CidToSubsetOffsetAndSize(epoch uint64, rootCid cid.Cid, network Network) string {
func formatFilename_CidToSubsetOffsetAndSize(epoch uint64, network Network) string {
return fmt.Sprintf(
"epoch-%d-%s-%s-%s",
"epoch-%d-%s-%s",
epoch,
rootCid.String(),
network,
"cid-to-offset-and-size.index",
"cid-to-subset-offset-and-size.index",
)
}

var Kind_CidToSubsetOffsetAndSize = []byte("cid-to-subset-offset-and-size")

func NewWriter_CidToSubsetOffsetAndSize(
epoch uint64,
rootCid cid.Cid,
network Network,
tmpDir string,
numItems uint64,
) (*CidToSubsetOffsetAndSize_Writer, error) {
if !IsValidNetwork(network) {
return nil, ErrInvalidNetwork
}
if rootCid == cid.Undef {
return nil, ErrInvalidRootCid
}
index, err := compactindexsized.NewBuilderSized(
tmpDir,
uint(numItems),
Expand All @@ -62,7 +57,6 @@ func NewWriter_CidToSubsetOffsetAndSize(
}
meta := &Metadata{
Epoch: epoch,
RootCid: rootCid,
Network: network,
IndexKind: Kind_CidToSubsetOffsetAndSize,
}
Expand Down Expand Up @@ -107,7 +101,7 @@ func (w *CidToSubsetOffsetAndSize_Writer) Seal(ctx context.Context, dstDir strin
return fmt.Errorf("already sealed")
}

filepath := filepath.Join(dstDir, formatFilename_CidToSubsetOffsetAndSize(w.meta.Epoch, w.meta.RootCid, w.meta.Network))
filepath := filepath.Join(dstDir, formatFilename_CidToSubsetOffsetAndSize(w.meta.Epoch, w.meta.Network))
w.finalPath = filepath

file, err := os.Create(filepath)
Expand Down Expand Up @@ -163,9 +157,6 @@ func OpenWithReader_CidToSubsetOffsetAndSize(reader ReaderAtCloser) (*CidToSubse
if !IsValidNetwork(meta.Network) {
return nil, fmt.Errorf("invalid network")
}
if meta.RootCid == cid.Undef {
return nil, fmt.Errorf("root cid is undefined")
}
if err := meta.AssertIndexKind(Kind_CidToSubsetOffsetAndSize); err != nil {
return nil, err
}
Expand Down

0 comments on commit 66c9ff2

Please sign in to comment.