Skip to content

Commit

Permalink
more
Browse files Browse the repository at this point in the history
  • Loading branch information
anjor committed Jul 3, 2024
1 parent c99516f commit 70b19cb
Showing 1 changed file with 91 additions and 12 deletions.
103 changes: 91 additions & 12 deletions cmd-car-split.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package main

import (
"context"
"fmt"
"io"
"io/fs"
"os"
"sync"

"github.com/rpcpool/yellowstone-faithful/accum"
"github.com/rpcpool/yellowstone-faithful/carreader"
Expand Down Expand Up @@ -32,6 +35,12 @@ const (
"\x01"
)

type FileInfo struct {
FileName string
FirstSlot int
LastSlot int
}

func newCmd_SplitCar() *cli.Command {
return &cli.Command{
Name: "split-car",
Expand Down Expand Up @@ -78,26 +87,94 @@ func newCmd_SplitCar() *cli.Command {
}

maxSize := c.Int("size")

var (
currentFileSize int64
currentFileNum int
currentFile *os.File
fileMutex sync.Mutex
currentFileInfo FileInfo
)

createNewFile := func() error {
fileMutex.Lock()
defer fileMutex.Unlock()

if currentFile != nil {
currentFile.Close()
}
currentFileNum++
filename := fmt.Sprintf("%d.car", currentFileNum)
currentFile, err = os.Create(filename)
if err != nil {
return err
}
// Write the header
_, err = io.WriteString(currentFile, nulRootCarHeader)
if err != nil {
return err
}

// Set the currentFileSize to the size of the header
currentFileSize = int64(len(nulRootCarHeader))
currentFileInfo = FileInfo{FileName: filename, FirstSlot: -1, LastSlot: -1}
return nil
}

writeObject := func(data []byte) error {
fileMutex.Lock()
defer fileMutex.Unlock()

if currentFile == nil || currentFileSize+int64(len(data)) > maxFileSize {
if err := createNewFile(); err != nil {
return err
}
}

_, err := currentFile.Write(data)
if err != nil {
return fmt.Errorf("failed to write object: %w", err)
}
currentFileSize += int64(len(data))
return nil
}

processObject := func(data []byte) error {
kind, err := iplddecoders.GetKind(data)
if err != nil {
return err
}

if kind == iplddecoders.KindBlock {
block, err := iplddecoders.DecodeBlock(data)
if err != nil {
return err
}

if currentFileInfo.FirstSlot == -1 || block.Slot < currentFileInfo.FirstSlot {
currentFileInfo.FirstSlot = block.Slot
}
if block.Slot > currentFileInfo.LastSlot {
currentFileInfo.LastSlot = block.Slot
}
}

return writeObject(data)
}

accum := accum.NewObjectAccumulator(
rd,
iplddecoders.KindBlock,
func(owm1 *accum.ObjectWithMetadata, owm2 []accum.ObjectWithMetadata) error {
size := 0
suffix := 0
firstSlot := 0
for _, owm := range owm2 {
// create new car file with suffix
// write header to car file
for size < maxSize {
// if object is Block, update firstSlot
// write data
if err := processObject(owm.ObjectData); err != nil {
return err
}
// write subsetBlock
// close file

}

//append epochBlock to last file
if err := processObject(owm1.ObjectData); err != nil {
return err
}
return nil
},
iplddecoders.KindEpoch,
Expand All @@ -108,6 +185,8 @@ func newCmd_SplitCar() *cli.Command {
klog.Exitf("error while accumulating objects: %w", err)
}

// To do: Construct and write the SubSet node

return nil
},
}
Expand Down

0 comments on commit 70b19cb

Please sign in to comment.