Skip to content

Commit

Permalink
metadata (#132)
Browse files Browse the repository at this point in the history
* metadata

* function signature

* write at end

* fix

* header size

* use existing metadata struct

* metadata fix

* m

* header

* fixes
  • Loading branch information
anjor authored Oct 1, 2024
1 parent 4910c9a commit 0cd46f4
Showing 1 changed file with 138 additions and 27 deletions.
165 changes: 138 additions & 27 deletions cmd-car-split.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"bufio"
"bytes"
"context"
"encoding/base64"
"encoding/binary"
"encoding/csv"
"fmt"
"io"
Expand All @@ -12,6 +14,7 @@ import (
"path/filepath"
"strconv"

"github.com/anjor/carlet"
commcid "github.com/filecoin-project/go-fil-commcid"
commp "github.com/filecoin-project/go-fil-commp-hashhash"
"github.com/filecoin-project/go-leb128"
Expand All @@ -28,11 +31,20 @@ import (
"github.com/rpcpool/yellowstone-faithful/carreader"
"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
"github.com/rpcpool/yellowstone-faithful/iplddecoders"
splitcarfetcher "github.com/rpcpool/yellowstone-faithful/split-car-fetcher"
"github.com/urfave/cli/v2"
"gopkg.in/yaml.v2"
"k8s.io/klog/v2"
)

var CBOR_SHA256_DUMMY_CID = cid.MustParse("bafyreics5uul5lbtxslcigtoa5fkba7qgwu7cyb7ih7z6fzsh4lgfgraau")
var (
CBOR_SHA256_DUMMY_CID = cid.MustParse("bafyreics5uul5lbtxslcigtoa5fkba7qgwu7cyb7ih7z6fzsh4lgfgraau")
hdr = &car.CarHeader{
Roots: []cid.Cid{CBOR_SHA256_DUMMY_CID}, // placeholder
Version: 1,
}
hdrSize, _ = car.HeaderSize(hdr)
)

const maxLinks = 432000 / 18 // 18 subsets

Expand All @@ -48,7 +60,7 @@ type carFile struct {
commP cid.Cid
payloadCid cid.Cid
paddedSize uint64
fileSize int64
fileSize uint64
}

func newCmd_SplitCar() *cli.Command {
Expand Down Expand Up @@ -99,7 +111,35 @@ func newCmd_SplitCar() *cli.Command {
defer file.Close()
}

rd, err := carreader.New(file)
var (
currentFileSize uint64
currentFileNum int
currentFile *os.File
bufferedWriter *bufio.Writer
currentSubsetInfo subsetInfo
subsetLinks []datamodel.Link
writer io.Writer
carFiles []carFile
metadata *splitcarfetcher.Metadata
)

metadata = &splitcarfetcher.Metadata{}
headerBuf := new(bytes.Buffer)
teeReader := io.TeeReader(file, headerBuf)

streamBuf := bufio.NewReaderSize(teeReader, 1<<20)

actualHeader, headerSize, err := readHeader(streamBuf)
if err != nil {
return fmt.Errorf("failed to read header: %w", err)
}

encodedHeader := base64.StdEncoding.EncodeToString(actualHeader)

metadata.CarPieces = &carlet.CarPiecesAndMetadata{OriginalCarHeader: encodedHeader, OriginalCarHeaderSize: uint64(headerSize)}

combinedReader := io.MultiReader(headerBuf, file)
rd, err := carreader.New(io.NopCloser(combinedReader))
if err != nil {
return fmt.Errorf("failed to open CAR: %w", err)
}
Expand All @@ -117,7 +157,7 @@ func newCmd_SplitCar() *cli.Command {
}

epoch := c.Int("epoch")
maxFileSize := c.Int64("size")
maxFileSize := uint64(c.Int64("size"))
outputDir := c.String("output-dir")
meta := c.String("metadata")

Expand All @@ -127,17 +167,6 @@ func newCmd_SplitCar() *cli.Command {

cp := new(commp.Calc)

var (
currentFileSize int64
currentFileNum int
currentFile *os.File
bufferedWriter *bufio.Writer
currentSubsetInfo subsetInfo
subsetLinks []datamodel.Link
writer io.Writer
carFiles []carFile
)

createNewFile := func() error {

if currentFile != nil {
Expand All @@ -157,9 +186,25 @@ func newCmd_SplitCar() *cli.Command {
return fmt.Errorf("failed to calculate commitment to cid: %w", err)
}

cf := carFile{name: fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum), commP: commCid, payloadCid: sl.(cidlink.Link).Cid, paddedSize: ps, fileSize: currentFileSize}
cf := carFile{
name: fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum),
commP: commCid,
payloadCid: sl.(cidlink.Link).Cid,
paddedSize: ps,
fileSize: currentFileSize,
}
carFiles = append(carFiles, cf)

metadata.CarPieces.CarPieces = append(
metadata.CarPieces.CarPieces,
carlet.CarFile{
Name: currentSubsetInfo.fileName,
ContentSize: currentFileSize - hdrSize,
HeaderSize: hdrSize,
CommP: commCid,
PaddedSize: ps,
})

err = closeFile(bufferedWriter, currentFile)
if err != nil {
return fmt.Errorf("failed to close file: %w", err)
Expand All @@ -183,17 +228,12 @@ func newCmd_SplitCar() *cli.Command {
bufferedWriter = bufio.NewWriter(currentFile)
writer = io.MultiWriter(bufferedWriter, cp)

// Write the header
hdr := car.CarHeader{
Roots: []cid.Cid{CBOR_SHA256_DUMMY_CID}, // placeholder
Version: 1,
}
if err := car.WriteHeader(&hdr, writer); err != nil {
if err := car.WriteHeader(hdr, writer); err != nil {
return fmt.Errorf("failed to write header: %w", err)
}

// Set the currentFileSize to the size of the header
currentFileSize = int64(len(nulRootCarHeader))
currentFileSize = uint64(len(nulRootCarHeader))
currentSubsetInfo = subsetInfo{fileName: filename, firstSlot: -1, lastSlot: -1}
return nil
}
Expand All @@ -203,7 +243,7 @@ func newCmd_SplitCar() *cli.Command {
if err != nil {
return fmt.Errorf("failed to write object to car file: %s, error: %w", currentFile.Name(), err)
}
currentFileSize += int64(len(data))
currentFileSize += uint64(len(data))
return nil
}

Expand Down Expand Up @@ -238,7 +278,8 @@ func newCmd_SplitCar() *cli.Command {
dagSize += owm.RawSectionSize()
}

if currentFile == nil || currentFileSize+int64(dagSize) > maxFileSize || len(currentSubsetInfo.blockLinks) > maxLinks {
if currentFile == nil || currentFileSize+uint64(dagSize) > maxFileSize || len(currentSubsetInfo.blockLinks) > maxLinks {
klog.Infof("Creating new file, currentFileSize: %d, dagSize: %d, maxFileSize: %d, maxLinks: %d, currentSubsetInfo.blockLinks: %d", currentFileSize, dagSize, maxFileSize, maxLinks, len(currentSubsetInfo.blockLinks))
err := createNewFile()
if err != nil {
return fmt.Errorf("failed to create a new file: %w", err)
Expand Down Expand Up @@ -311,8 +352,23 @@ func newCmd_SplitCar() *cli.Command {
return fmt.Errorf("failed to calculate commitment to cid: %w", err)
}

cf := carFile{name: fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum), commP: commCid, payloadCid: sl.(cidlink.Link).Cid, paddedSize: ps, fileSize: currentFileSize}
cf := carFile{
name: fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum),
commP: commCid,
payloadCid: sl.(cidlink.Link).Cid,
paddedSize: ps,
fileSize: currentFileSize,
}

carFiles = append(carFiles, cf)
metadata.CarPieces.CarPieces = append(
metadata.CarPieces.CarPieces,
carlet.CarFile{
Name: currentSubsetInfo.fileName,
ContentSize: currentFileSize - hdrSize,
HeaderSize: hdrSize,
CommP: commCid,
})

err = closeFile(bufferedWriter, currentFile)
if err != nil {
Expand Down Expand Up @@ -342,8 +398,16 @@ func newCmd_SplitCar() *cli.Command {
c.commP.String(),
c.payloadCid.String(),
strconv.FormatUint(c.paddedSize, 10),
strconv.FormatInt(c.fileSize, 10),
strconv.FormatUint(c.fileSize, 10),
})
if err != nil {
return fmt.Errorf("failed to write metatadata csv: %w", err)
}
}

err = writeMetadata(metadata, epoch)
if err != nil {
return fmt.Errorf("failed to write metatadata yaml: %w", err)
}

return nil
Expand Down Expand Up @@ -419,3 +483,50 @@ func writeNode(node datamodel.Node, w io.Writer) (cid.Cid, error) {
}
return cd, nil
}

func writeMetadata(metadata *splitcarfetcher.Metadata, epoch int) error {
metadataFileName := fmt.Sprintf("epoch-%d-metadata.yaml", epoch)

// Open file in append mode
metadataFile, err := os.OpenFile(metadataFileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("failed to open metadata file: %w", err)
}
defer metadataFile.Close()

encoder := yaml.NewEncoder(metadataFile)
err = encoder.Encode(metadata)
if err != nil {
return fmt.Errorf("failed to encode metadata: %w", err)
}

return nil
}

func readHeader(streamBuf *bufio.Reader) ([]byte, int64, error) {
maybeHeaderLen, err := streamBuf.Peek(varintSize)
if err != nil {
return nil, 0, fmt.Errorf("failed to read header: %s", err)
}

hdrLen, viLen := binary.Uvarint(maybeHeaderLen)
if hdrLen <= 0 || viLen < 0 {
return nil, 0, fmt.Errorf("unexpected header len = %d, varint len = %d", hdrLen, viLen)
}

actualViLen, err := io.CopyN(io.Discard, streamBuf, int64(viLen))
if err != nil {
return nil, 0, fmt.Errorf("failed to discard header varint: %s", err)
}
streamLen := actualViLen

headerBuf := new(bytes.Buffer)

actualHdrLen, err := io.CopyN(headerBuf, streamBuf, int64(hdrLen))
if err != nil {
return nil, 0, fmt.Errorf("failed to read header: %s", err)
}
streamLen += actualHdrLen

return headerBuf.Bytes(), streamLen, nil
}

0 comments on commit 0cd46f4

Please sign in to comment.