diff --git a/cmd-car-split.go b/cmd-car-split.go index c632757b..9e9d1e55 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -2,8 +2,11 @@ package main import ( "context" + "fmt" + "io" "io/fs" "os" + "sync" "github.com/rpcpool/yellowstone-faithful/accum" "github.com/rpcpool/yellowstone-faithful/carreader" @@ -32,6 +35,12 @@ const ( "\x01" ) +type FileInfo struct { + FileName string + FirstSlot int + LastSlot int +} + func newCmd_SplitCar() *cli.Command { return &cli.Command{ Name: "split-car", @@ -78,26 +87,94 @@ func newCmd_SplitCar() *cli.Command { } maxSize := c.Int("size") + + var ( + currentFileSize int64 + currentFileNum int + currentFile *os.File + fileMutex sync.Mutex + currentFileInfo FileInfo + ) + + createNewFile := func() error { + fileMutex.Lock() + defer fileMutex.Unlock() + + if currentFile != nil { + currentFile.Close() + } + currentFileNum++ + filename := fmt.Sprintf("%d.car", currentFileNum) + currentFile, err = os.Create(filename) + if err != nil { + return err + } + // Write the header + _, err = io.WriteString(currentFile, nulRootCarHeader) + if err != nil { + return err + } + + // Set the currentFileSize to the size of the header + currentFileSize = int64(len(nulRootCarHeader)) + currentFileInfo = FileInfo{FileName: filename, FirstSlot: -1, LastSlot: -1} + return nil + } + + writeObject := func(data []byte) error { + fileMutex.Lock() + defer fileMutex.Unlock() + + if currentFile == nil || currentFileSize+int64(len(data)) > maxFileSize { + if err := createNewFile(); err != nil { + return err + } + } + + _, err := currentFile.Write(data) + if err != nil { + return fmt.Errorf("failed to write object: %w", err) + } + currentFileSize += int64(len(data)) + return nil + } + + processObject := func(data []byte) error { + kind, err := iplddecoders.GetKind(data) + if err != nil { + return err + } + + if kind == iplddecoders.KindBlock { + block, err := iplddecoders.DecodeBlock(data) + if err != nil { + return err + } + + if currentFileInfo.FirstSlot == -1 || block.Slot < currentFileInfo.FirstSlot { + currentFileInfo.FirstSlot = block.Slot + } + if block.Slot > currentFileInfo.LastSlot { + currentFileInfo.LastSlot = block.Slot + } + } + + return writeObject(data) + } + accum := accum.NewObjectAccumulator( rd, iplddecoders.KindBlock, func(owm1 *accum.ObjectWithMetadata, owm2 []accum.ObjectWithMetadata) error { - size := 0 - suffix := 0 - firstSlot := 0 for _, owm := range owm2 { - // create new car file with suffix - // write header to car file - for size < maxSize { - // if object is Block, update firstSlot - // write data + if err := processObject(owm.ObjectData); err != nil { + return err } - // write subsetBlock - // close file - } - //append epochBlock to last file + if err := processObject(owm1.ObjectData); err != nil { + return err + } return nil }, iplddecoders.KindEpoch, @@ -108,6 +185,8 @@ func newCmd_SplitCar() *cli.Command { klog.Exitf("error while accumulating objects: %w", err) } + // To do: Construct and write the SubSet node + return nil }, }