From 0cd46f40138e1a369f164e2c4d0b102bd8b3ee74 Mon Sep 17 00:00:00 2001 From: Anjor Kanekar Date: Tue, 1 Oct 2024 13:42:14 +0100 Subject: [PATCH] metadata (#132) * metadata * function signature * write at end * fix * header size * use existing metadata struct * metadata fix * m * header * fixes --- cmd-car-split.go | 165 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 138 insertions(+), 27 deletions(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index 665886ab..9cc21c6c 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -4,6 +4,8 @@ import ( "bufio" "bytes" "context" + "encoding/base64" + "encoding/binary" "encoding/csv" "fmt" "io" @@ -12,6 +14,7 @@ import ( "path/filepath" "strconv" + "github.com/anjor/carlet" commcid "github.com/filecoin-project/go-fil-commcid" commp "github.com/filecoin-project/go-fil-commp-hashhash" "github.com/filecoin-project/go-leb128" @@ -28,11 +31,20 @@ import ( "github.com/rpcpool/yellowstone-faithful/carreader" "github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode" "github.com/rpcpool/yellowstone-faithful/iplddecoders" + splitcarfetcher "github.com/rpcpool/yellowstone-faithful/split-car-fetcher" "github.com/urfave/cli/v2" + "gopkg.in/yaml.v2" "k8s.io/klog/v2" ) -var CBOR_SHA256_DUMMY_CID = cid.MustParse("bafyreics5uul5lbtxslcigtoa5fkba7qgwu7cyb7ih7z6fzsh4lgfgraau") +var ( + CBOR_SHA256_DUMMY_CID = cid.MustParse("bafyreics5uul5lbtxslcigtoa5fkba7qgwu7cyb7ih7z6fzsh4lgfgraau") + hdr = &car.CarHeader{ + Roots: []cid.Cid{CBOR_SHA256_DUMMY_CID}, // placeholder + Version: 1, + } + hdrSize, _ = car.HeaderSize(hdr) +) const maxLinks = 432000 / 18 // 18 subsets @@ -48,7 +60,7 @@ type carFile struct { commP cid.Cid payloadCid cid.Cid paddedSize uint64 - fileSize int64 + fileSize uint64 } func newCmd_SplitCar() *cli.Command { @@ -99,7 +111,35 @@ func newCmd_SplitCar() *cli.Command { defer file.Close() } - rd, err := carreader.New(file) + var ( + currentFileSize uint64 + currentFileNum int + currentFile *os.File + bufferedWriter *bufio.Writer + currentSubsetInfo subsetInfo + subsetLinks []datamodel.Link + writer io.Writer + carFiles []carFile + metadata *splitcarfetcher.Metadata + ) + + metadata = &splitcarfetcher.Metadata{} + headerBuf := new(bytes.Buffer) + teeReader := io.TeeReader(file, headerBuf) + + streamBuf := bufio.NewReaderSize(teeReader, 1<<20) + + actualHeader, headerSize, err := readHeader(streamBuf) + if err != nil { + return fmt.Errorf("failed to read header: %w", err) + } + + encodedHeader := base64.StdEncoding.EncodeToString(actualHeader) + + metadata.CarPieces = &carlet.CarPiecesAndMetadata{OriginalCarHeader: encodedHeader, OriginalCarHeaderSize: uint64(headerSize)} + + combinedReader := io.MultiReader(headerBuf, file) + rd, err := carreader.New(io.NopCloser(combinedReader)) if err != nil { return fmt.Errorf("failed to open CAR: %w", err) } @@ -117,7 +157,7 @@ func newCmd_SplitCar() *cli.Command { } epoch := c.Int("epoch") - maxFileSize := c.Int64("size") + maxFileSize := uint64(c.Int64("size")) outputDir := c.String("output-dir") meta := c.String("metadata") @@ -127,17 +167,6 @@ func newCmd_SplitCar() *cli.Command { cp := new(commp.Calc) - var ( - currentFileSize int64 - currentFileNum int - currentFile *os.File - bufferedWriter *bufio.Writer - currentSubsetInfo subsetInfo - subsetLinks []datamodel.Link - writer io.Writer - carFiles []carFile - ) - createNewFile := func() error { if currentFile != nil { @@ -157,9 +186,25 @@ func newCmd_SplitCar() *cli.Command { return fmt.Errorf("failed to calculate commitment to cid: %w", err) } - cf := carFile{name: fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum), commP: commCid, payloadCid: sl.(cidlink.Link).Cid, paddedSize: ps, fileSize: currentFileSize} + cf := carFile{ + name: fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum), + commP: commCid, + payloadCid: sl.(cidlink.Link).Cid, + paddedSize: ps, + fileSize: currentFileSize, + } carFiles = append(carFiles, cf) + metadata.CarPieces.CarPieces = append( + metadata.CarPieces.CarPieces, + carlet.CarFile{ + Name: currentSubsetInfo.fileName, + ContentSize: currentFileSize - hdrSize, + HeaderSize: hdrSize, + CommP: commCid, + PaddedSize: ps, + }) + err = closeFile(bufferedWriter, currentFile) if err != nil { return fmt.Errorf("failed to close file: %w", err) @@ -183,17 +228,12 @@ func newCmd_SplitCar() *cli.Command { bufferedWriter = bufio.NewWriter(currentFile) writer = io.MultiWriter(bufferedWriter, cp) - // Write the header - hdr := car.CarHeader{ - Roots: []cid.Cid{CBOR_SHA256_DUMMY_CID}, // placeholder - Version: 1, - } - if err := car.WriteHeader(&hdr, writer); err != nil { + if err := car.WriteHeader(hdr, writer); err != nil { return fmt.Errorf("failed to write header: %w", err) } // Set the currentFileSize to the size of the header - currentFileSize = int64(len(nulRootCarHeader)) + currentFileSize = uint64(len(nulRootCarHeader)) currentSubsetInfo = subsetInfo{fileName: filename, firstSlot: -1, lastSlot: -1} return nil } @@ -203,7 +243,7 @@ func newCmd_SplitCar() *cli.Command { if err != nil { return fmt.Errorf("failed to write object to car file: %s, error: %w", currentFile.Name(), err) } - currentFileSize += int64(len(data)) + currentFileSize += uint64(len(data)) return nil } @@ -238,7 +278,8 @@ func newCmd_SplitCar() *cli.Command { dagSize += owm.RawSectionSize() } - if currentFile == nil || currentFileSize+int64(dagSize) > maxFileSize || len(currentSubsetInfo.blockLinks) > maxLinks { + if currentFile == nil || currentFileSize+uint64(dagSize) > maxFileSize || len(currentSubsetInfo.blockLinks) > maxLinks { + klog.Infof("Creating new file, currentFileSize: %d, dagSize: %d, maxFileSize: %d, maxLinks: %d, currentSubsetInfo.blockLinks: %d", currentFileSize, dagSize, maxFileSize, maxLinks, len(currentSubsetInfo.blockLinks)) err := createNewFile() if err != nil { return fmt.Errorf("failed to create a new file: %w", err) @@ -311,8 +352,23 @@ func newCmd_SplitCar() *cli.Command { return fmt.Errorf("failed to calculate commitment to cid: %w", err) } - cf := carFile{name: fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum), commP: commCid, payloadCid: sl.(cidlink.Link).Cid, paddedSize: ps, fileSize: currentFileSize} + cf := carFile{ + name: fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum), + commP: commCid, + payloadCid: sl.(cidlink.Link).Cid, + paddedSize: ps, + fileSize: currentFileSize, + } + carFiles = append(carFiles, cf) + metadata.CarPieces.CarPieces = append( + metadata.CarPieces.CarPieces, + carlet.CarFile{ + Name: currentSubsetInfo.fileName, + ContentSize: currentFileSize - hdrSize, + HeaderSize: hdrSize, + CommP: commCid, + }) err = closeFile(bufferedWriter, currentFile) if err != nil { @@ -342,8 +398,16 @@ func newCmd_SplitCar() *cli.Command { c.commP.String(), c.payloadCid.String(), strconv.FormatUint(c.paddedSize, 10), - strconv.FormatInt(c.fileSize, 10), + strconv.FormatUint(c.fileSize, 10), }) + if err != nil { + return fmt.Errorf("failed to write metatadata csv: %w", err) + } + } + + err = writeMetadata(metadata, epoch) + if err != nil { + return fmt.Errorf("failed to write metatadata yaml: %w", err) } return nil @@ -419,3 +483,50 @@ func writeNode(node datamodel.Node, w io.Writer) (cid.Cid, error) { } return cd, nil } + +func writeMetadata(metadata *splitcarfetcher.Metadata, epoch int) error { + metadataFileName := fmt.Sprintf("epoch-%d-metadata.yaml", epoch) + + // Open file in append mode + metadataFile, err := os.OpenFile(metadataFileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return fmt.Errorf("failed to open metadata file: %w", err) + } + defer metadataFile.Close() + + encoder := yaml.NewEncoder(metadataFile) + err = encoder.Encode(metadata) + if err != nil { + return fmt.Errorf("failed to encode metadata: %w", err) + } + + return nil +} + +func readHeader(streamBuf *bufio.Reader) ([]byte, int64, error) { + maybeHeaderLen, err := streamBuf.Peek(varintSize) + if err != nil { + return nil, 0, fmt.Errorf("failed to read header: %s", err) + } + + hdrLen, viLen := binary.Uvarint(maybeHeaderLen) + if hdrLen <= 0 || viLen < 0 { + return nil, 0, fmt.Errorf("unexpected header len = %d, varint len = %d", hdrLen, viLen) + } + + actualViLen, err := io.CopyN(io.Discard, streamBuf, int64(viLen)) + if err != nil { + return nil, 0, fmt.Errorf("failed to discard header varint: %s", err) + } + streamLen := actualViLen + + headerBuf := new(bytes.Buffer) + + actualHdrLen, err := io.CopyN(headerBuf, streamBuf, int64(hdrLen)) + if err != nil { + return nil, 0, fmt.Errorf("failed to read header: %s", err) + } + streamLen += actualHdrLen + + return headerBuf.Bytes(), streamLen, nil +}