From 879d30c80d0f12224fd7ab3e114f7345171fa6b1 Mon Sep 17 00:00:00 2001 From: anjor Date: Mon, 1 Jul 2024 13:19:36 +0100 Subject: [PATCH 01/23] add new splitting command --- cmd-car-split.go | 80 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 cmd-car-split.go diff --git a/cmd-car-split.go b/cmd-car-split.go new file mode 100644 index 00000000..571519ac --- /dev/null +++ b/cmd-car-split.go @@ -0,0 +1,80 @@ +package main + +import ( + "context" + "io/fs" + "os" + + "github.com/rpcpool/yellowstone-faithful/accum" + "github.com/rpcpool/yellowstone-faithful/carreader" + "github.com/rpcpool/yellowstone-faithful/iplddecoders" + "github.com/urfave/cli/v2" + "k8s.io/klog/v2" +) + +func newCmd_SplitCar() *cli.Command { + return &cli.Command{ + Name: "split-car", + Description: "Splits an epoch car file into smaller chunks. Each chunk corresponds to a subset.", + ArgsUsage: "", + Flags: []cli.Flag{ + &cli.IntFlag{ + Name: "size", + Aliases: []string{"s"}, + Value: 31 * 1024 * 1024 * 1024, // 31 GiB + Usage: "Target size in bytes to chunk CARs to.", + Required: false, + }, + }, + Action: func(c *cli.Context) error { + carPath := c.Args().First() + var file fs.File + var err error + if carPath == "-" { + file = os.Stdin + } else { + file, err = os.Open(carPath) + if err != nil { + klog.Exit(err.Error()) + } + defer file.Close() + } + + rd, err := carreader.New(file) + if err != nil { + klog.Exitf("Failed to open CAR: %s", err) + } + { + // print roots: + roots := rd.Header.Roots + klog.Infof("Roots: %d", len(roots)) + for i, root := range roots { + if i == 0 && len(roots) == 1 { + klog.Infof("- %s (Epoch CID)", root.String()) + } else { + klog.Infof("- %s", root.String()) + } + } + } + + maxSize := c.Int("size") + accum := accum.NewObjectAccumulator( + rd, + iplddecoders.KindBlock, + func(owm1 *accum.ObjectWithMetadata, owm2 []accum.ObjectWithMetadata) error { + size := 0 + + return nil + }, + iplddecoders.KindEpoch, + iplddecoders.KindSubset, + ) + + if err := accum.Run((context.Background())); err != nil { + klog.Exitf("error while accumulating objects: %w", err) + } + + return nil + }, + } +} From c99516faddc446b6241fc483334db34a24a22381 Mon Sep 17 00:00:00 2001 From: anjor Date: Mon, 1 Jul 2024 16:06:28 +0100 Subject: [PATCH 02/23] pseudo --- cmd-car-split.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/cmd-car-split.go b/cmd-car-split.go index 571519ac..c632757b 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -12,6 +12,26 @@ import ( "k8s.io/klog/v2" ) +const ( + nulRootCarHeader = "\x19" + // 25 bytes of CBOR (encoded as varint :cryingbear: ) + // map with 2 keys + "\xA2" + + // text-key with length 5 + "\x65" + "roots" + + // 1 element array + "\x81" + + // tag 42 + "\xD8\x2A" + + // bytes with length 5 + "\x45" + + // nul-identity-cid prefixed with \x00 as required in DAG-CBOR: https://github.com/ipld/specs/blob/master/block-layer/codecs/dag-cbor.md#links + "\x00\x01\x55\x00\x00" + + // text-key with length 7 + "\x67" + "version" + + // 1, we call this v0 due to the nul-identity CID being an open question: https://github.com/ipld/go-car/issues/26#issuecomment-604299576 + "\x01" +) + func newCmd_SplitCar() *cli.Command { return &cli.Command{ Name: "split-car", @@ -63,7 +83,21 @@ func newCmd_SplitCar() *cli.Command { iplddecoders.KindBlock, func(owm1 *accum.ObjectWithMetadata, owm2 []accum.ObjectWithMetadata) error { size := 0 + suffix := 0 + firstSlot := 0 + for _, owm := range owm2 { + // create new car file with suffix + // write header to car file + for size < maxSize { + // if object is Block, update firstSlot + // write data + } + // write subsetBlock + // close file + + } + //append epochBlock to last file return nil }, iplddecoders.KindEpoch, From 70b19cb41d19e20c422115622680037e77ca311f Mon Sep 17 00:00:00 2001 From: anjor Date: Mon, 1 Jul 2024 22:07:24 +0100 Subject: [PATCH 03/23] more --- cmd-car-split.go | 103 +++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 91 insertions(+), 12 deletions(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index c632757b..9e9d1e55 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -2,8 +2,11 @@ package main import ( "context" + "fmt" + "io" "io/fs" "os" + "sync" "github.com/rpcpool/yellowstone-faithful/accum" "github.com/rpcpool/yellowstone-faithful/carreader" @@ -32,6 +35,12 @@ const ( "\x01" ) +type FileInfo struct { + FileName string + FirstSlot int + LastSlot int +} + func newCmd_SplitCar() *cli.Command { return &cli.Command{ Name: "split-car", @@ -78,26 +87,94 @@ func newCmd_SplitCar() *cli.Command { } maxSize := c.Int("size") + + var ( + currentFileSize int64 + currentFileNum int + currentFile *os.File + fileMutex sync.Mutex + currentFileInfo FileInfo + ) + + createNewFile := func() error { + fileMutex.Lock() + defer fileMutex.Unlock() + + if currentFile != nil { + currentFile.Close() + } + currentFileNum++ + filename := fmt.Sprintf("%d.car", currentFileNum) + currentFile, err = os.Create(filename) + if err != nil { + return err + } + // Write the header + _, err = io.WriteString(currentFile, nulRootCarHeader) + if err != nil { + return err + } + + // Set the currentFileSize to the size of the header + currentFileSize = int64(len(nulRootCarHeader)) + currentFileInfo = FileInfo{FileName: filename, FirstSlot: -1, LastSlot: -1} + return nil + } + + writeObject := func(data []byte) error { + fileMutex.Lock() + defer fileMutex.Unlock() + + if currentFile == nil || currentFileSize+int64(len(data)) > maxFileSize { + if err := createNewFile(); err != nil { + return err + } + } + + _, err := currentFile.Write(data) + if err != nil { + return fmt.Errorf("failed to write object: %w", err) + } + currentFileSize += int64(len(data)) + return nil + } + + processObject := func(data []byte) error { + kind, err := iplddecoders.GetKind(data) + if err != nil { + return err + } + + if kind == iplddecoders.KindBlock { + block, err := iplddecoders.DecodeBlock(data) + if err != nil { + return err + } + + if currentFileInfo.FirstSlot == -1 || block.Slot < currentFileInfo.FirstSlot { + currentFileInfo.FirstSlot = block.Slot + } + if block.Slot > currentFileInfo.LastSlot { + currentFileInfo.LastSlot = block.Slot + } + } + + return writeObject(data) + } + accum := accum.NewObjectAccumulator( rd, iplddecoders.KindBlock, func(owm1 *accum.ObjectWithMetadata, owm2 []accum.ObjectWithMetadata) error { - size := 0 - suffix := 0 - firstSlot := 0 for _, owm := range owm2 { - // create new car file with suffix - // write header to car file - for size < maxSize { - // if object is Block, update firstSlot - // write data + if err := processObject(owm.ObjectData); err != nil { + return err } - // write subsetBlock - // close file - } - //append epochBlock to last file + if err := processObject(owm1.ObjectData); err != nil { + return err + } return nil }, iplddecoders.KindEpoch, @@ -108,6 +185,8 @@ func newCmd_SplitCar() *cli.Command { klog.Exitf("error while accumulating objects: %w", err) } + // To do: Construct and write the SubSet node + return nil }, } From 27366332bc3aee4115885b3ec98e72a6bb93a42f Mon Sep 17 00:00:00 2001 From: anjor Date: Wed, 3 Jul 2024 10:23:47 +0100 Subject: [PATCH 04/23] name --- cmd-car-split.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index 9e9d1e55..3560d3e4 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -86,7 +86,7 @@ func newCmd_SplitCar() *cli.Command { } } - maxSize := c.Int("size") + maxFileSize := c.Int64("size") var ( currentFileSize int64 From 78afc844905d31a11504bff6276a002e25b64ec1 Mon Sep 17 00:00:00 2001 From: anjor Date: Wed, 3 Jul 2024 12:46:37 +0100 Subject: [PATCH 05/23] latest --- cmd-car-split.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index 3560d3e4..3cabf187 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -126,6 +126,9 @@ func newCmd_SplitCar() *cli.Command { defer fileMutex.Unlock() if currentFile == nil || currentFileSize+int64(len(data)) > maxFileSize { + + // To do: Construct and write the SubSet node + if err := createNewFile(); err != nil { return err } @@ -185,7 +188,7 @@ func newCmd_SplitCar() *cli.Command { klog.Exitf("error while accumulating objects: %w", err) } - // To do: Construct and write the SubSet node + // To do: construct and write the epoch node to the last file return nil }, From 02d01ddb6eed5b88ca0322dad657584deb5b54fe Mon Sep 17 00:00:00 2001 From: anjor Date: Fri, 5 Jul 2024 13:26:15 +0100 Subject: [PATCH 06/23] write subset node --- cmd-car-split.go | 106 ++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 87 insertions(+), 19 deletions(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index 3cabf187..1889df88 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "context" "fmt" "io" @@ -8,8 +9,15 @@ import ( "os" "sync" + "github.com/ipfs/go-cid" + "github.com/ipld/go-ipld-prime/codec/dagcbor" + "github.com/ipld/go-ipld-prime/datamodel" + "github.com/ipld/go-ipld-prime/fluent/qp" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/multiformats/go-multicodec" "github.com/rpcpool/yellowstone-faithful/accum" "github.com/rpcpool/yellowstone-faithful/carreader" + "github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode" "github.com/rpcpool/yellowstone-faithful/iplddecoders" "github.com/urfave/cli/v2" "k8s.io/klog/v2" @@ -35,10 +43,11 @@ const ( "\x01" ) -type FileInfo struct { - FileName string - FirstSlot int - LastSlot int +type subsetInfo struct { + fileName string + firstSlot int + lastSlot int + blockLinks []datamodel.Link } func newCmd_SplitCar() *cli.Command { @@ -89,11 +98,11 @@ func newCmd_SplitCar() *cli.Command { maxFileSize := c.Int64("size") var ( - currentFileSize int64 - currentFileNum int - currentFile *os.File - fileMutex sync.Mutex - currentFileInfo FileInfo + currentFileSize int64 + currentFileNum int + currentFile *os.File + fileMutex sync.Mutex + currentSubsetInfo subsetInfo ) createNewFile := func() error { @@ -101,6 +110,26 @@ func newCmd_SplitCar() *cli.Command { defer fileMutex.Unlock() if currentFile != nil { + subsetNode, err := qp.BuildMap(ipldbindcode.Prototypes.Subset, -1, func(ma datamodel.MapAssembler) { + qp.MapEntry(ma, "kind", qp.Int(int64(iplddecoders.KindSubset))) + qp.MapEntry(ma, "first", qp.Int(int64(currentSubsetInfo.firstSlot))) + qp.MapEntry(ma, "last", qp.Int(int64(currentSubsetInfo.lastSlot))) + qp.MapEntry(ma, "blocks", + qp.List(-1, func(la datamodel.ListAssembler) { + for _, bl := range currentSubsetInfo.blockLinks { + qp.ListEntry(la, qp.Link(bl)) + } + })) + }) + if err != nil { + return err + } + + err = writeNode(subsetNode, currentFile) + if err != nil { + return err + } + currentFile.Close() } currentFileNum++ @@ -117,7 +146,7 @@ func newCmd_SplitCar() *cli.Command { // Set the currentFileSize to the size of the header currentFileSize = int64(len(nulRootCarHeader)) - currentFileInfo = FileInfo{FileName: filename, FirstSlot: -1, LastSlot: -1} + currentSubsetInfo = subsetInfo{fileName: filename, firstSlot: -1, lastSlot: -1} return nil } @@ -127,8 +156,6 @@ func newCmd_SplitCar() *cli.Command { if currentFile == nil || currentFileSize+int64(len(data)) > maxFileSize { - // To do: Construct and write the SubSet node - if err := createNewFile(); err != nil { return err } @@ -142,7 +169,8 @@ func newCmd_SplitCar() *cli.Command { return nil } - processObject := func(data []byte) error { + processObject := func(owm *accum.ObjectWithMetadata) error { + data := owm.ObjectData kind, err := iplddecoders.GetKind(data) if err != nil { return err @@ -154,12 +182,14 @@ func newCmd_SplitCar() *cli.Command { return err } - if currentFileInfo.FirstSlot == -1 || block.Slot < currentFileInfo.FirstSlot { - currentFileInfo.FirstSlot = block.Slot + if currentSubsetInfo.firstSlot == -1 || block.Slot < currentSubsetInfo.firstSlot { + currentSubsetInfo.firstSlot = block.Slot } - if block.Slot > currentFileInfo.LastSlot { - currentFileInfo.LastSlot = block.Slot + if block.Slot > currentSubsetInfo.lastSlot { + currentSubsetInfo.lastSlot = block.Slot } + + currentSubsetInfo.blockLinks = append(currentSubsetInfo.blockLinks, cidlink.Link{Cid: owm.Cid}) } return writeObject(data) @@ -170,12 +200,12 @@ func newCmd_SplitCar() *cli.Command { iplddecoders.KindBlock, func(owm1 *accum.ObjectWithMetadata, owm2 []accum.ObjectWithMetadata) error { for _, owm := range owm2 { - if err := processObject(owm.ObjectData); err != nil { + if err := processObject(&owm); err != nil { return err } } - if err := processObject(owm1.ObjectData); err != nil { + if err := processObject(owm1); err != nil { return err } return nil @@ -194,3 +224,41 @@ func newCmd_SplitCar() *cli.Command { }, } } + +func writeNode(node datamodel.Node, f *os.File) error { + var buf bytes.Buffer + err := dagcbor.Encode(node, &buf) + if err != nil { + return err + } + + bd := cid.V1Builder{MhLength: -1, MhType: uint64(multicodec.Sha2_256), Codec: uint64(multicodec.DagCbor)} + cid, err := bd.Sum(buf.Bytes()) + if err != nil { + return err + } + + c := []byte(cid.KeyString()) + d := buf.Bytes() + + sizeVi := appendVarint(nil, uint64(len(c))+uint64(len(d))) + + if _, err := f.Write(sizeVi); err == nil { + if _, err := f.Write(c); err == nil { + if _, err := f.Write(d); err != nil { + return err + } + + } + } + return nil + +} + +func appendVarint(tgt []byte, v uint64) []byte { + for v > 127 { + tgt = append(tgt, byte(v|128)) + v >>= 7 + } + return append(tgt, byte(v)) +} From 533f9ce9b3580146f968e475129378c1690bf294 Mon Sep 17 00:00:00 2001 From: anjor Date: Fri, 5 Jul 2024 13:43:32 +0100 Subject: [PATCH 07/23] add epoch node --- cmd-car-split.go | 36 ++++++++++++++++++++++++++++-------- main.go | 1 + 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index 1889df88..085b04ab 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -103,6 +103,7 @@ func newCmd_SplitCar() *cli.Command { currentFile *os.File fileMutex sync.Mutex currentSubsetInfo subsetInfo + subsetLinks []datamodel.Link ) createNewFile := func() error { @@ -125,11 +126,13 @@ func newCmd_SplitCar() *cli.Command { return err } - err = writeNode(subsetNode, currentFile) + cid, err := writeNode(subsetNode, currentFile) if err != nil { return err } + subsetLinks = append(subsetLinks, cidlink.Link{Cid: cid}) + currentFile.Close() } currentFileNum++ @@ -218,24 +221,42 @@ func newCmd_SplitCar() *cli.Command { klog.Exitf("error while accumulating objects: %w", err) } - // To do: construct and write the epoch node to the last file + epochNode, err := qp.BuildMap(ipldbindcode.Prototypes.Epoch, -1, func(ma datamodel.MapAssembler) { + qp.MapEntry(ma, "kind", qp.Int(int64(iplddecoders.KindEpoch))) + qp.MapEntry(ma, "epoch", qp.Int(int64(epoch))) + qp.MapEntry(ma, "subsets", + qp.List(-1, func(la datamodel.ListAssembler) { + for _, sl := range subsetLinks { + qp.ListEntry(la, qp.Link(sl)) + } + }), + ) + }) + if err != nil { + return err + } + + _, err = writeNode(epochNode, currentFile) + if err != nil { + return err + } return nil }, } } -func writeNode(node datamodel.Node, f *os.File) error { +func writeNode(node datamodel.Node, f *os.File) (cid.Cid, error) { var buf bytes.Buffer err := dagcbor.Encode(node, &buf) if err != nil { - return err + return nil, err } bd := cid.V1Builder{MhLength: -1, MhType: uint64(multicodec.Sha2_256), Codec: uint64(multicodec.DagCbor)} cid, err := bd.Sum(buf.Bytes()) if err != nil { - return err + return nil, err } c := []byte(cid.KeyString()) @@ -246,13 +267,12 @@ func writeNode(node datamodel.Node, f *os.File) error { if _, err := f.Write(sizeVi); err == nil { if _, err := f.Write(c); err == nil { if _, err := f.Write(d); err != nil { - return err + return nil, err } } } - return nil - + return cid, nil } func appendVarint(tgt []byte, v uint64) []byte { diff --git a/main.go b/main.go index e75a044b..8da32443 100644 --- a/main.go +++ b/main.go @@ -57,6 +57,7 @@ func main() { newCmd_Version(), newCmd_rpc(), newCmd_check_deals(), + newCmd_SplitCar(), }, } From edbddfb941221d49ca98a2b9335f2893dbdfe303 Mon Sep 17 00:00:00 2001 From: anjor Date: Fri, 5 Jul 2024 13:51:15 +0100 Subject: [PATCH 08/23] epoch node --- cmd-car-split.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index 085b04ab..08b26175 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -56,13 +56,19 @@ func newCmd_SplitCar() *cli.Command { Description: "Splits an epoch car file into smaller chunks. Each chunk corresponds to a subset.", ArgsUsage: "", Flags: []cli.Flag{ - &cli.IntFlag{ + &cli.Int64Flag{ Name: "size", Aliases: []string{"s"}, Value: 31 * 1024 * 1024 * 1024, // 31 GiB Usage: "Target size in bytes to chunk CARs to.", Required: false, }, + &cli.IntFlag{ + Name: "epoch", + Aliases: []string{"e"}, + Usage: "Epoch number", + Required: true, + }, }, Action: func(c *cli.Context) error { carPath := c.Args().First() @@ -95,6 +101,7 @@ func newCmd_SplitCar() *cli.Command { } } + epoch := c.Int("epoch") maxFileSize := c.Int64("size") var ( @@ -136,7 +143,7 @@ func newCmd_SplitCar() *cli.Command { currentFile.Close() } currentFileNum++ - filename := fmt.Sprintf("%d.car", currentFileNum) + filename := fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum) currentFile, err = os.Create(filename) if err != nil { return err @@ -250,16 +257,16 @@ func writeNode(node datamodel.Node, f *os.File) (cid.Cid, error) { var buf bytes.Buffer err := dagcbor.Encode(node, &buf) if err != nil { - return nil, err + return cid.Cid{}, err } bd := cid.V1Builder{MhLength: -1, MhType: uint64(multicodec.Sha2_256), Codec: uint64(multicodec.DagCbor)} - cid, err := bd.Sum(buf.Bytes()) + cd, err := bd.Sum(buf.Bytes()) if err != nil { - return nil, err + return cid.Cid{}, err } - c := []byte(cid.KeyString()) + c := []byte(cd.KeyString()) d := buf.Bytes() sizeVi := appendVarint(nil, uint64(len(c))+uint64(len(d))) @@ -267,12 +274,12 @@ func writeNode(node datamodel.Node, f *os.File) (cid.Cid, error) { if _, err := f.Write(sizeVi); err == nil { if _, err := f.Write(c); err == nil { if _, err := f.Write(d); err != nil { - return nil, err + return cid.Cid{}, err } } } - return cid, nil + return cd, nil } func appendVarint(tgt []byte, v uint64) []byte { From 3a855309a8306e829360425d2e8b1f7bc95595a6 Mon Sep 17 00:00:00 2001 From: anjor Date: Mon, 8 Jul 2024 09:33:25 +0100 Subject: [PATCH 09/23] error messages --- cmd-car-split.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index 08b26175..1b5794a1 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -79,14 +79,14 @@ func newCmd_SplitCar() *cli.Command { } else { file, err = os.Open(carPath) if err != nil { - klog.Exit(err.Error()) + return fmt.Errorf("failed to open CAR: %s", err) } defer file.Close() } rd, err := carreader.New(file) if err != nil { - klog.Exitf("Failed to open CAR: %s", err) + return fmt.Errorf("failed to open CAR: %s", err) } { // print roots: @@ -173,7 +173,7 @@ func newCmd_SplitCar() *cli.Command { _, err := currentFile.Write(data) if err != nil { - return fmt.Errorf("failed to write object: %w", err) + return fmt.Errorf("failed to write object to car file: %s, error: %w", currentFile.Name(), err) } currentFileSize += int64(len(data)) return nil @@ -225,7 +225,7 @@ func newCmd_SplitCar() *cli.Command { ) if err := accum.Run((context.Background())); err != nil { - klog.Exitf("error while accumulating objects: %w", err) + return fmt.Errorf("failed to run accumulator while accumulating objects: %w", err) } epochNode, err := qp.BuildMap(ipldbindcode.Prototypes.Epoch, -1, func(ma datamodel.MapAssembler) { From c106839b7099abd4c39457bedebafc484b5ad17f Mon Sep 17 00:00:00 2001 From: anjor Date: Mon, 8 Jul 2024 10:16:59 +0100 Subject: [PATCH 10/23] rawsection --- cmd-car-split.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index 1b5794a1..099912c7 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -202,7 +202,12 @@ func newCmd_SplitCar() *cli.Command { currentSubsetInfo.blockLinks = append(currentSubsetInfo.blockLinks, cidlink.Link{Cid: owm.Cid}) } - return writeObject(data) + rs, err := owm.RawSection() + if err != nil { + return err + } + + return writeObject(rs) } accum := accum.NewObjectAccumulator( From fdf3faae81398ef56ba85255f094f2ea901361c1 Mon Sep 17 00:00:00 2001 From: anjor Date: Mon, 8 Jul 2024 16:11:11 +0100 Subject: [PATCH 11/23] mutex --- cmd-car-split.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index 099912c7..b8e07f48 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -161,16 +161,23 @@ func newCmd_SplitCar() *cli.Command { } writeObject := func(data []byte) error { - fileMutex.Lock() - defer fileMutex.Unlock() + var needNewFile bool + fileMutex.Lock() if currentFile == nil || currentFileSize+int64(len(data)) > maxFileSize { + needNewFile = true + } + fileMutex.Unlock() + if needNewFile { if err := createNewFile(); err != nil { return err } } + fileMutex.Lock() + defer fileMutex.Unlock() + _, err := currentFile.Write(data) if err != nil { return fmt.Errorf("failed to write object to car file: %s, error: %w", currentFile.Name(), err) From 320cec41acac0df05fff074007cfaaf460f920b4 Mon Sep 17 00:00:00 2001 From: anjor Date: Mon, 8 Jul 2024 22:40:29 +0100 Subject: [PATCH 12/23] representation --- cmd-car-split.go | 34 ++++++++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index b8e07f48..2fed86a5 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -14,6 +14,7 @@ import ( "github.com/ipld/go-ipld-prime/datamodel" "github.com/ipld/go-ipld-prime/fluent/qp" cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/schema" "github.com/multiformats/go-multicodec" "github.com/rpcpool/yellowstone-faithful/accum" "github.com/rpcpool/yellowstone-faithful/carreader" @@ -240,6 +241,28 @@ func newCmd_SplitCar() *cli.Command { return fmt.Errorf("failed to run accumulator while accumulating objects: %w", err) } + subsetNode, err := qp.BuildMap(ipldbindcode.Prototypes.Subset, -1, func(ma datamodel.MapAssembler) { + qp.MapEntry(ma, "kind", qp.Int(int64(iplddecoders.KindSubset))) + qp.MapEntry(ma, "first", qp.Int(int64(currentSubsetInfo.firstSlot))) + qp.MapEntry(ma, "last", qp.Int(int64(currentSubsetInfo.lastSlot))) + qp.MapEntry(ma, "blocks", + qp.List(-1, func(la datamodel.ListAssembler) { + for _, bl := range currentSubsetInfo.blockLinks { + qp.ListEntry(la, qp.Link(bl)) + } + })) + }) + if err != nil { + return err + } + + cid, err := writeNode(subsetNode, currentFile) + if err != nil { + return err + } + + subsetLinks = append(subsetLinks, cidlink.Link{Cid: cid}) + epochNode, err := qp.BuildMap(ipldbindcode.Prototypes.Epoch, -1, func(ma datamodel.MapAssembler) { qp.MapEntry(ma, "kind", qp.Int(int64(iplddecoders.KindEpoch))) qp.MapEntry(ma, "epoch", qp.Int(int64(epoch))) @@ -259,6 +282,7 @@ func newCmd_SplitCar() *cli.Command { if err != nil { return err } + currentFile.Close() return nil }, @@ -266,26 +290,28 @@ func newCmd_SplitCar() *cli.Command { } func writeNode(node datamodel.Node, f *os.File) (cid.Cid, error) { + node = node.(schema.TypedNode).Representation() var buf bytes.Buffer err := dagcbor.Encode(node, &buf) if err != nil { return cid.Cid{}, err } + data := buf.Bytes() + bd := cid.V1Builder{MhLength: -1, MhType: uint64(multicodec.Sha2_256), Codec: uint64(multicodec.DagCbor)} - cd, err := bd.Sum(buf.Bytes()) + cd, err := bd.Sum(data) if err != nil { return cid.Cid{}, err } c := []byte(cd.KeyString()) - d := buf.Bytes() - sizeVi := appendVarint(nil, uint64(len(c))+uint64(len(d))) + sizeVi := appendVarint(nil, uint64(len(c))+uint64(len(data))) if _, err := f.Write(sizeVi); err == nil { if _, err := f.Write(c); err == nil { - if _, err := f.Write(d); err != nil { + if _, err := f.Write(data); err != nil { return cid.Cid{}, err } From 042adcab704fdace8af9e87802e94423920eee14 Mon Sep 17 00:00:00 2001 From: anjor Date: Mon, 8 Jul 2024 22:47:31 +0100 Subject: [PATCH 13/23] fix --- cmd-car-split.go | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index 2fed86a5..9399f9a2 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -131,28 +131,30 @@ func newCmd_SplitCar() *cli.Command { })) }) if err != nil { - return err + return fmt.Errorf("failed to construct a subsetNode: %s", err) } cid, err := writeNode(subsetNode, currentFile) if err != nil { - return err + return fmt.Errorf("failed to write a subsetNode: %s", err) } subsetLinks = append(subsetLinks, cidlink.Link{Cid: cid}) - currentFile.Close() + if err = currentFile.Close(); err != nil { + return fmt.Errorf("failed to close file: %s", err) + } } currentFileNum++ filename := fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum) currentFile, err = os.Create(filename) if err != nil { - return err + return fmt.Errorf("failed to create file %s: %s", filename, err) } // Write the header _, err = io.WriteString(currentFile, nulRootCarHeader) if err != nil { - return err + return fmt.Errorf("failed to write header: %s", err) } // Set the currentFileSize to the size of the header @@ -172,7 +174,7 @@ func newCmd_SplitCar() *cli.Command { if needNewFile { if err := createNewFile(); err != nil { - return err + return fmt.Errorf("failed to create a new file: %s", err) } } @@ -191,13 +193,13 @@ func newCmd_SplitCar() *cli.Command { data := owm.ObjectData kind, err := iplddecoders.GetKind(data) if err != nil { - return err + return fmt.Errorf("failed to get kind: %s", err) } if kind == iplddecoders.KindBlock { block, err := iplddecoders.DecodeBlock(data) if err != nil { - return err + return fmt.Errorf("failed to decode block: %s", err) } if currentSubsetInfo.firstSlot == -1 || block.Slot < currentSubsetInfo.firstSlot { @@ -212,7 +214,7 @@ func newCmd_SplitCar() *cli.Command { rs, err := owm.RawSection() if err != nil { - return err + return fmt.Errorf("failed to get raw section: %s", err) } return writeObject(rs) @@ -224,12 +226,12 @@ func newCmd_SplitCar() *cli.Command { func(owm1 *accum.ObjectWithMetadata, owm2 []accum.ObjectWithMetadata) error { for _, owm := range owm2 { if err := processObject(&owm); err != nil { - return err + return fmt.Errorf("failed to process object: %s", err) } } if err := processObject(owm1); err != nil { - return err + return fmt.Errorf("failed to process object: %s", err) } return nil }, @@ -253,12 +255,12 @@ func newCmd_SplitCar() *cli.Command { })) }) if err != nil { - return err + return fmt.Errorf("failed to construct subsetNode: %s", err) } cid, err := writeNode(subsetNode, currentFile) if err != nil { - return err + return fmt.Errorf("failed to write subsetNode: %s", err) } subsetLinks = append(subsetLinks, cidlink.Link{Cid: cid}) @@ -275,14 +277,17 @@ func newCmd_SplitCar() *cli.Command { ) }) if err != nil { - return err + return fmt.Errorf("failed to construct epochNode: %s", err) } _, err = writeNode(epochNode, currentFile) if err != nil { - return err + return fmt.Errorf("failed to write epochNode: %s", err) + } + err = currentFile.Close() + if err != nil { + return fmt.Errorf("failed to close file: %s", err) } - currentFile.Close() return nil }, From 592bddc1a72f38ecbb65a93c02a951c99dcb1aca Mon Sep 17 00:00:00 2001 From: anjor Date: Wed, 10 Jul 2024 09:09:44 +0100 Subject: [PATCH 14/23] s -> w --- cmd-car-split.go | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index 9399f9a2..bdb3f80a 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -80,14 +80,14 @@ func newCmd_SplitCar() *cli.Command { } else { file, err = os.Open(carPath) if err != nil { - return fmt.Errorf("failed to open CAR: %s", err) + return fmt.Errorf("failed to open CAR: %w", err) } defer file.Close() } rd, err := carreader.New(file) if err != nil { - return fmt.Errorf("failed to open CAR: %s", err) + return fmt.Errorf("failed to open CAR: %w", err) } { // print roots: @@ -131,30 +131,30 @@ func newCmd_SplitCar() *cli.Command { })) }) if err != nil { - return fmt.Errorf("failed to construct a subsetNode: %s", err) + return fmt.Errorf("failed to construct a subsetNode: %w", err) } cid, err := writeNode(subsetNode, currentFile) if err != nil { - return fmt.Errorf("failed to write a subsetNode: %s", err) + return fmt.Errorf("failed to write a subsetNode: %w", err) } subsetLinks = append(subsetLinks, cidlink.Link{Cid: cid}) if err = currentFile.Close(); err != nil { - return fmt.Errorf("failed to close file: %s", err) + return fmt.Errorf("failed to close file: %w", err) } } currentFileNum++ filename := fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum) currentFile, err = os.Create(filename) if err != nil { - return fmt.Errorf("failed to create file %s: %s", filename, err) + return fmt.Errorf("failed to create file %s: %w", filename, err) } // Write the header _, err = io.WriteString(currentFile, nulRootCarHeader) if err != nil { - return fmt.Errorf("failed to write header: %s", err) + return fmt.Errorf("failed to write header: %w", err) } // Set the currentFileSize to the size of the header @@ -174,7 +174,7 @@ func newCmd_SplitCar() *cli.Command { if needNewFile { if err := createNewFile(); err != nil { - return fmt.Errorf("failed to create a new file: %s", err) + return fmt.Errorf("failed to create a new file: %w", err) } } @@ -193,13 +193,13 @@ func newCmd_SplitCar() *cli.Command { data := owm.ObjectData kind, err := iplddecoders.GetKind(data) if err != nil { - return fmt.Errorf("failed to get kind: %s", err) + return fmt.Errorf("failed to get kind: %w", err) } if kind == iplddecoders.KindBlock { block, err := iplddecoders.DecodeBlock(data) if err != nil { - return fmt.Errorf("failed to decode block: %s", err) + return fmt.Errorf("failed to decode block: %w", err) } if currentSubsetInfo.firstSlot == -1 || block.Slot < currentSubsetInfo.firstSlot { @@ -214,7 +214,7 @@ func newCmd_SplitCar() *cli.Command { rs, err := owm.RawSection() if err != nil { - return fmt.Errorf("failed to get raw section: %s", err) + return fmt.Errorf("failed to get raw section: %w", err) } return writeObject(rs) @@ -226,12 +226,12 @@ func newCmd_SplitCar() *cli.Command { func(owm1 *accum.ObjectWithMetadata, owm2 []accum.ObjectWithMetadata) error { for _, owm := range owm2 { if err := processObject(&owm); err != nil { - return fmt.Errorf("failed to process object: %s", err) + return fmt.Errorf("failed to process object: %w", err) } } if err := processObject(owm1); err != nil { - return fmt.Errorf("failed to process object: %s", err) + return fmt.Errorf("failed to process object: %w", err) } return nil }, @@ -255,12 +255,12 @@ func newCmd_SplitCar() *cli.Command { })) }) if err != nil { - return fmt.Errorf("failed to construct subsetNode: %s", err) + return fmt.Errorf("failed to construct subsetNode: %w", err) } cid, err := writeNode(subsetNode, currentFile) if err != nil { - return fmt.Errorf("failed to write subsetNode: %s", err) + return fmt.Errorf("failed to write subsetNode: %w", err) } subsetLinks = append(subsetLinks, cidlink.Link{Cid: cid}) @@ -277,16 +277,16 @@ func newCmd_SplitCar() *cli.Command { ) }) if err != nil { - return fmt.Errorf("failed to construct epochNode: %s", err) + return fmt.Errorf("failed to construct epochNode: %w", err) } _, err = writeNode(epochNode, currentFile) if err != nil { - return fmt.Errorf("failed to write epochNode: %s", err) + return fmt.Errorf("failed to write epochNode: %w", err) } err = currentFile.Close() if err != nil { - return fmt.Errorf("failed to close file: %s", err) + return fmt.Errorf("failed to close file: %w", err) } return nil From 9683147024e0a7bde9be1dcef7240faef06b6a87 Mon Sep 17 00:00:00 2001 From: anjor Date: Wed, 10 Jul 2024 14:08:44 +0100 Subject: [PATCH 15/23] blockDag in one car --- accum/block.go | 11 ++++++ cmd-car-split.go | 100 +++++++++++++++++++++++++++-------------------- 2 files changed, 68 insertions(+), 43 deletions(-) diff --git a/accum/block.go b/accum/block.go index e4861028..9c56c827 100644 --- a/accum/block.go +++ b/accum/block.go @@ -190,6 +190,17 @@ func (obj ObjectWithMetadata) RawSection() ([]byte, error) { return buf, nil } +func (obj ObjectWithMetadata) RawSectionSize() int { + sectionLen := len(obj.Cid.Bytes()) + len(obj.ObjectData) + lenBytes := leb128.FromUInt64(uint64(sectionLen)) + + // Size is: + // length of LEB128-encoded section length + + // length of CID bytes + + // length of object data + return len(lenBytes) + sectionLen +} + // { // raw, err := objm.RawSection() // if err != nil { diff --git a/cmd-car-split.go b/cmd-car-split.go index bdb3f80a..3d89e7bd 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -164,20 +164,6 @@ func newCmd_SplitCar() *cli.Command { } writeObject := func(data []byte) error { - var needNewFile bool - - fileMutex.Lock() - if currentFile == nil || currentFileSize+int64(len(data)) > maxFileSize { - needNewFile = true - } - fileMutex.Unlock() - - if needNewFile { - if err := createNewFile(); err != nil { - return fmt.Errorf("failed to create a new file: %w", err) - } - } - fileMutex.Lock() defer fileMutex.Unlock() @@ -189,50 +175,78 @@ func newCmd_SplitCar() *cli.Command { return nil } - processObject := func(owm *accum.ObjectWithMetadata) error { - data := owm.ObjectData - kind, err := iplddecoders.GetKind(data) - if err != nil { - return fmt.Errorf("failed to get kind: %w", err) - } - - if kind == iplddecoders.KindBlock { - block, err := iplddecoders.DecodeBlock(data) + writeBlockDag := func(blockDag []accum.ObjectWithMetadata) error { + for _, owm := range blockDag { + rs, err := owm.RawSection() if err != nil { - return fmt.Errorf("failed to decode block: %w", err) + return fmt.Errorf("failed to get raw section: %w", err) } - if currentSubsetInfo.firstSlot == -1 || block.Slot < currentSubsetInfo.firstSlot { - currentSubsetInfo.firstSlot = block.Slot - } - if block.Slot > currentSubsetInfo.lastSlot { - currentSubsetInfo.lastSlot = block.Slot + err = writeObject(rs) + if err != nil { + return fmt.Errorf("failed to write object: %w", err) } - - currentSubsetInfo.blockLinks = append(currentSubsetInfo.blockLinks, cidlink.Link{Cid: owm.Cid}) } - rs, err := owm.RawSection() - if err != nil { - return fmt.Errorf("failed to get raw section: %w", err) - } - - return writeObject(rs) + return nil } accum := accum.NewObjectAccumulator( rd, iplddecoders.KindBlock, func(owm1 *accum.ObjectWithMetadata, owm2 []accum.ObjectWithMetadata) error { - for _, owm := range owm2 { - if err := processObject(&owm); err != nil { - return fmt.Errorf("failed to process object: %w", err) + owms := append(owm2, *owm1) + var blockDag []accum.ObjectWithMetadata + dagSize := 0 + for _, owm := range owms { + blockDag = append(blockDag, owm) + dagSize += owm.RawSectionSize() + + // check if the current size + dag size is greater than the max size + // if it is create new file + var needNewFile bool + + fileMutex.Lock() + if currentFile == nil || currentFileSize+int64(dagSize) > maxFileSize { + needNewFile = true + } + fileMutex.Unlock() + + if needNewFile { + if err := createNewFile(); err != nil { + return fmt.Errorf("failed to create a new file: %w", err) + } + } + + kind, err := iplddecoders.GetKind(owm.ObjectData) + if err != nil { + return fmt.Errorf("failed to get kind: %w", err) + } + + if kind == iplddecoders.KindBlock { + block, err := iplddecoders.DecodeBlock(owm.ObjectData) + if err != nil { + return fmt.Errorf("failed to decode block: %w", err) + } + + if currentSubsetInfo.firstSlot == -1 || block.Slot < currentSubsetInfo.firstSlot { + currentSubsetInfo.firstSlot = block.Slot + } + if block.Slot > currentSubsetInfo.lastSlot { + currentSubsetInfo.lastSlot = block.Slot + } + + currentSubsetInfo.blockLinks = append(currentSubsetInfo.blockLinks, cidlink.Link{Cid: owm.Cid}) + + err = writeBlockDag(blockDag) + if err != nil { + return fmt.Errorf("failed to process dag block: %w", err) + } + blockDag = blockDag[:0] } - } - if err := processObject(owm1); err != nil { - return fmt.Errorf("failed to process object: %w", err) } + return nil }, iplddecoders.KindEpoch, From c69cbdeef8713a7b0b0f4961b26a9e6dc20a023f Mon Sep 17 00:00:00 2001 From: anjor Date: Wed, 10 Jul 2024 14:11:59 +0100 Subject: [PATCH 16/23] comments --- cmd-car-split.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index 3d89e7bd..55ae0d20 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -199,13 +199,12 @@ func newCmd_SplitCar() *cli.Command { var blockDag []accum.ObjectWithMetadata dagSize := 0 for _, owm := range owms { + // build up a block dag blockDag = append(blockDag, owm) dagSize += owm.RawSectionSize() - // check if the current size + dag size is greater than the max size - // if it is create new file + // if the current size + dag size is greater than the max size, then start a new file var needNewFile bool - fileMutex.Lock() if currentFile == nil || currentFileSize+int64(dagSize) > maxFileSize { needNewFile = true @@ -218,6 +217,7 @@ func newCmd_SplitCar() *cli.Command { } } + // if the current object is a block, write it out kind, err := iplddecoders.GetKind(owm.ObjectData) if err != nil { return fmt.Errorf("failed to get kind: %w", err) @@ -242,7 +242,10 @@ func newCmd_SplitCar() *cli.Command { if err != nil { return fmt.Errorf("failed to process dag block: %w", err) } + + // reset blockDag and dagSize blockDag = blockDag[:0] + dagSize = 0 } } From 07e1d2d4aebfa919d1a9d44b7ef8a38ccfda3d7d Mon Sep 17 00:00:00 2001 From: anjor Date: Wed, 10 Jul 2024 14:28:27 +0100 Subject: [PATCH 17/23] remove mutexes --- cmd-car-split.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index 55ae0d20..d6c1d1ac 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -115,9 +115,6 @@ func newCmd_SplitCar() *cli.Command { ) createNewFile := func() error { - fileMutex.Lock() - defer fileMutex.Unlock() - if currentFile != nil { subsetNode, err := qp.BuildMap(ipldbindcode.Prototypes.Subset, -1, func(ma datamodel.MapAssembler) { qp.MapEntry(ma, "kind", qp.Int(int64(iplddecoders.KindSubset))) @@ -164,9 +161,6 @@ func newCmd_SplitCar() *cli.Command { } writeObject := func(data []byte) error { - fileMutex.Lock() - defer fileMutex.Unlock() - _, err := currentFile.Write(data) if err != nil { return fmt.Errorf("failed to write object to car file: %s, error: %w", currentFile.Name(), err) From 6f1d3a8a9c9267d76230c3e7cbe140aa821ef3c5 Mon Sep 17 00:00:00 2001 From: anjor Date: Wed, 10 Jul 2024 14:32:57 +0100 Subject: [PATCH 18/23] handle nil owm1 --- cmd-car-split.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index d6c1d1ac..ea443c50 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -189,9 +189,13 @@ func newCmd_SplitCar() *cli.Command { rd, iplddecoders.KindBlock, func(owm1 *accum.ObjectWithMetadata, owm2 []accum.ObjectWithMetadata) error { + if owm1 == nil { + return nil + } + owms := append(owm2, *owm1) - var blockDag []accum.ObjectWithMetadata dagSize := 0 + var blockDag []accum.ObjectWithMetadata for _, owm := range owms { // build up a block dag blockDag = append(blockDag, owm) From 0acfb7363241b26049800458aa9d68c3087623c7 Mon Sep 17 00:00:00 2001 From: anjor Date: Wed, 10 Jul 2024 14:53:13 +0100 Subject: [PATCH 19/23] simpler logic --- cmd-car-split.go | 67 ++++++++++++++++-------------------------------- 1 file changed, 22 insertions(+), 45 deletions(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index ea443c50..044e4f36 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -7,7 +7,6 @@ import ( "io" "io/fs" "os" - "sync" "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime/codec/dagcbor" @@ -109,7 +108,6 @@ func newCmd_SplitCar() *cli.Command { currentFileSize int64 currentFileNum int currentFile *os.File - fileMutex sync.Mutex currentSubsetInfo subsetInfo subsetLinks []datamodel.Link ) @@ -195,57 +193,36 @@ func newCmd_SplitCar() *cli.Command { owms := append(owm2, *owm1) dagSize := 0 - var blockDag []accum.ObjectWithMetadata + for _, owm := range owms { - // build up a block dag - blockDag = append(blockDag, owm) dagSize += owm.RawSectionSize() + } - // if the current size + dag size is greater than the max size, then start a new file - var needNewFile bool - fileMutex.Lock() - if currentFile == nil || currentFileSize+int64(dagSize) > maxFileSize { - needNewFile = true + if currentFile == nil || currentFileSize+int64(dagSize) > maxFileSize { + err := createNewFile() + if err != nil { + return fmt.Errorf("failed to create a new file: %w", err) } - fileMutex.Unlock() + } - if needNewFile { - if err := createNewFile(); err != nil { - return fmt.Errorf("failed to create a new file: %w", err) - } - } + // owm1 is necessarily a Block + block, err := iplddecoders.DecodeBlock(owm1.ObjectData) + if err != nil { + return fmt.Errorf("failed to decode block: %w", err) + } - // if the current object is a block, write it out - kind, err := iplddecoders.GetKind(owm.ObjectData) - if err != nil { - return fmt.Errorf("failed to get kind: %w", err) - } + if currentSubsetInfo.firstSlot == -1 || block.Slot < currentSubsetInfo.firstSlot { + currentSubsetInfo.firstSlot = block.Slot + } + if block.Slot > currentSubsetInfo.lastSlot { + currentSubsetInfo.lastSlot = block.Slot + } - if kind == iplddecoders.KindBlock { - block, err := iplddecoders.DecodeBlock(owm.ObjectData) - if err != nil { - return fmt.Errorf("failed to decode block: %w", err) - } - - if currentSubsetInfo.firstSlot == -1 || block.Slot < currentSubsetInfo.firstSlot { - currentSubsetInfo.firstSlot = block.Slot - } - if block.Slot > currentSubsetInfo.lastSlot { - currentSubsetInfo.lastSlot = block.Slot - } - - currentSubsetInfo.blockLinks = append(currentSubsetInfo.blockLinks, cidlink.Link{Cid: owm.Cid}) - - err = writeBlockDag(blockDag) - if err != nil { - return fmt.Errorf("failed to process dag block: %w", err) - } - - // reset blockDag and dagSize - blockDag = blockDag[:0] - dagSize = 0 - } + currentSubsetInfo.blockLinks = append(currentSubsetInfo.blockLinks, cidlink.Link{Cid: owm1.Cid}) + err = writeBlockDag(owms) + if err != nil { + return fmt.Errorf("failed to write block dag to file: %w", err) } return nil From a0b419b5bfc5440b830d2957ca527405bdc46c91 Mon Sep 17 00:00:00 2001 From: anjor Date: Wed, 10 Jul 2024 19:05:26 +0100 Subject: [PATCH 20/23] buffered writeer --- cmd-car-split.go | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index 044e4f36..5b46f49f 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -1,6 +1,7 @@ package main import ( + "bufio" "bytes" "context" "fmt" @@ -108,6 +109,7 @@ func newCmd_SplitCar() *cli.Command { currentFileSize int64 currentFileNum int currentFile *os.File + bufferedWriter *bufio.Writer currentSubsetInfo subsetInfo subsetLinks []datamodel.Link ) @@ -136,6 +138,10 @@ func newCmd_SplitCar() *cli.Command { subsetLinks = append(subsetLinks, cidlink.Link{Cid: cid}) + if err = bufferedWriter.Flush(); err != nil { + return fmt.Errorf("failed to flush bufferedWriter: %w", err) + } + if err = currentFile.Close(); err != nil { return fmt.Errorf("failed to close file: %w", err) } @@ -146,8 +152,11 @@ func newCmd_SplitCar() *cli.Command { if err != nil { return fmt.Errorf("failed to create file %s: %w", filename, err) } + + bufferedWriter = bufio.NewWriter(currentFile) + // Write the header - _, err = io.WriteString(currentFile, nulRootCarHeader) + _, err = io.WriteString(bufferedWriter, nulRootCarHeader) if err != nil { return fmt.Errorf("failed to write header: %w", err) } @@ -159,7 +168,7 @@ func newCmd_SplitCar() *cli.Command { } writeObject := func(data []byte) error { - _, err := currentFile.Write(data) + _, err := bufferedWriter.Write(data) if err != nil { return fmt.Errorf("failed to write object to car file: %s, error: %w", currentFile.Name(), err) } @@ -250,7 +259,7 @@ func newCmd_SplitCar() *cli.Command { return fmt.Errorf("failed to construct subsetNode: %w", err) } - cid, err := writeNode(subsetNode, currentFile) + cid, err := writeNode(subsetNode, bufferedWriter) if err != nil { return fmt.Errorf("failed to write subsetNode: %w", err) } @@ -272,10 +281,16 @@ func newCmd_SplitCar() *cli.Command { return fmt.Errorf("failed to construct epochNode: %w", err) } - _, err = writeNode(epochNode, currentFile) + _, err = writeNode(epochNode, bufferedWriter) if err != nil { return fmt.Errorf("failed to write epochNode: %w", err) } + + err = bufferedWriter.Flush() + if err != nil { + return fmt.Errorf("failed to flush buffer: %w", err) + } + err = currentFile.Close() if err != nil { return fmt.Errorf("failed to close file: %w", err) @@ -286,7 +301,7 @@ func newCmd_SplitCar() *cli.Command { } } -func writeNode(node datamodel.Node, f *os.File) (cid.Cid, error) { +func writeNode(node datamodel.Node, w io.Writer) (cid.Cid, error) { node = node.(schema.TypedNode).Representation() var buf bytes.Buffer err := dagcbor.Encode(node, &buf) @@ -306,9 +321,9 @@ func writeNode(node datamodel.Node, f *os.File) (cid.Cid, error) { sizeVi := appendVarint(nil, uint64(len(c))+uint64(len(data))) - if _, err := f.Write(sizeVi); err == nil { - if _, err := f.Write(c); err == nil { - if _, err := f.Write(data); err != nil { + if _, err := w.Write(sizeVi); err == nil { + if _, err := w.Write(c); err == nil { + if _, err := w.Write(data); err != nil { return cid.Cid{}, err } From 6d507fd33d801d7fd0064744f80f57ff0fdc34aa Mon Sep 17 00:00:00 2001 From: anjor Date: Wed, 10 Jul 2024 19:31:16 +0100 Subject: [PATCH 21/23] refactor --- cmd-car-split.go | 96 ++++++++++++++++++++++-------------------------- 1 file changed, 44 insertions(+), 52 deletions(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index 5b46f49f..8e36fbd7 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -116,33 +116,14 @@ func newCmd_SplitCar() *cli.Command { createNewFile := func() error { if currentFile != nil { - subsetNode, err := qp.BuildMap(ipldbindcode.Prototypes.Subset, -1, func(ma datamodel.MapAssembler) { - qp.MapEntry(ma, "kind", qp.Int(int64(iplddecoders.KindSubset))) - qp.MapEntry(ma, "first", qp.Int(int64(currentSubsetInfo.firstSlot))) - qp.MapEntry(ma, "last", qp.Int(int64(currentSubsetInfo.lastSlot))) - qp.MapEntry(ma, "blocks", - qp.List(-1, func(la datamodel.ListAssembler) { - for _, bl := range currentSubsetInfo.blockLinks { - qp.ListEntry(la, qp.Link(bl)) - } - })) - }) + sl, err := writeSubsetNode(currentSubsetInfo, bufferedWriter) if err != nil { - return fmt.Errorf("failed to construct a subsetNode: %w", err) + return fmt.Errorf("failed to write subset node: %w", err) } + subsetLinks = append(subsetLinks, sl) - cid, err := writeNode(subsetNode, currentFile) + err = closeFile(bufferedWriter, currentFile) if err != nil { - return fmt.Errorf("failed to write a subsetNode: %w", err) - } - - subsetLinks = append(subsetLinks, cidlink.Link{Cid: cid}) - - if err = bufferedWriter.Flush(); err != nil { - return fmt.Errorf("failed to flush bufferedWriter: %w", err) - } - - if err = currentFile.Close(); err != nil { return fmt.Errorf("failed to close file: %w", err) } } @@ -244,27 +225,11 @@ func newCmd_SplitCar() *cli.Command { return fmt.Errorf("failed to run accumulator while accumulating objects: %w", err) } - subsetNode, err := qp.BuildMap(ipldbindcode.Prototypes.Subset, -1, func(ma datamodel.MapAssembler) { - qp.MapEntry(ma, "kind", qp.Int(int64(iplddecoders.KindSubset))) - qp.MapEntry(ma, "first", qp.Int(int64(currentSubsetInfo.firstSlot))) - qp.MapEntry(ma, "last", qp.Int(int64(currentSubsetInfo.lastSlot))) - qp.MapEntry(ma, "blocks", - qp.List(-1, func(la datamodel.ListAssembler) { - for _, bl := range currentSubsetInfo.blockLinks { - qp.ListEntry(la, qp.Link(bl)) - } - })) - }) - if err != nil { - return fmt.Errorf("failed to construct subsetNode: %w", err) - } - - cid, err := writeNode(subsetNode, bufferedWriter) + sl, err := writeSubsetNode(currentSubsetInfo, bufferedWriter) if err != nil { - return fmt.Errorf("failed to write subsetNode: %w", err) + return fmt.Errorf("failed to write subset node: %w", err) } - - subsetLinks = append(subsetLinks, cidlink.Link{Cid: cid}) + subsetLinks = append(subsetLinks, sl) epochNode, err := qp.BuildMap(ipldbindcode.Prototypes.Epoch, -1, func(ma datamodel.MapAssembler) { qp.MapEntry(ma, "kind", qp.Int(int64(iplddecoders.KindEpoch))) @@ -286,19 +251,46 @@ func newCmd_SplitCar() *cli.Command { return fmt.Errorf("failed to write epochNode: %w", err) } - err = bufferedWriter.Flush() - if err != nil { - return fmt.Errorf("failed to flush buffer: %w", err) - } + return closeFile(bufferedWriter, currentFile) + }, + } +} - err = currentFile.Close() - if err != nil { - return fmt.Errorf("failed to close file: %w", err) - } +func writeSubsetNode(currentSubsetInfo subsetInfo, bufferedWriter *bufio.Writer) (datamodel.Link, error) { + subsetNode, err := qp.BuildMap(ipldbindcode.Prototypes.Subset, -1, func(ma datamodel.MapAssembler) { + qp.MapEntry(ma, "kind", qp.Int(int64(iplddecoders.KindSubset))) + qp.MapEntry(ma, "first", qp.Int(int64(currentSubsetInfo.firstSlot))) + qp.MapEntry(ma, "last", qp.Int(int64(currentSubsetInfo.lastSlot))) + qp.MapEntry(ma, "blocks", + qp.List(-1, func(la datamodel.ListAssembler) { + for _, bl := range currentSubsetInfo.blockLinks { + qp.ListEntry(la, qp.Link(bl)) + } + })) + }) + if err != nil { + return nil, fmt.Errorf("failed to write a subsetNode: %w", err) + } - return nil - }, + cid, err := writeNode(subsetNode, bufferedWriter) + if err != nil { + return nil, fmt.Errorf("failed to write a subsetNode: %w", err) + } + + return cidlink.Link{Cid: cid}, nil +} + +func closeFile(bufferedWriter *bufio.Writer, currentFile *os.File) error { + err := bufferedWriter.Flush() + if err != nil { + return fmt.Errorf("failed to flush buffer: %w", err) + } + + err = currentFile.Close() + if err != nil { + return fmt.Errorf("failed to close file: %w", err) } + return nil } func writeNode(node datamodel.Node, w io.Writer) (cid.Cid, error) { From d7e416b663655fe444200c7cb166b1a34aa8a462 Mon Sep 17 00:00:00 2001 From: Anjor Kanekar Date: Thu, 11 Jul 2024 16:58:33 +0100 Subject: [PATCH 22/23] Update cmd-car-split.go Co-authored-by: gagliardetto --- cmd-car-split.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index 8e36fbd7..c3019c67 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -309,7 +309,7 @@ func writeNode(node datamodel.Node, w io.Writer) (cid.Cid, error) { return cid.Cid{}, err } - c := []byte(cd.KeyString()) + c := cd.Bytes() sizeVi := appendVarint(nil, uint64(len(c))+uint64(len(data))) From 10880de01485dde412219def901dd9ad85b58d5c Mon Sep 17 00:00:00 2001 From: anjor Date: Thu, 11 Jul 2024 23:13:15 +0100 Subject: [PATCH 23/23] use leb128 --- cmd-car-split.go | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index c3019c67..3af2ce5a 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -9,6 +9,7 @@ import ( "io/fs" "os" + "github.com/filecoin-project/go-leb128" "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime/codec/dagcbor" "github.com/ipld/go-ipld-prime/datamodel" @@ -311,7 +312,7 @@ func writeNode(node datamodel.Node, w io.Writer) (cid.Cid, error) { c := cd.Bytes() - sizeVi := appendVarint(nil, uint64(len(c))+uint64(len(data))) + sizeVi := leb128.FromUInt64(uint64(len(c)) + uint64(len(data))) if _, err := w.Write(sizeVi); err == nil { if _, err := w.Write(c); err == nil { @@ -323,11 +324,3 @@ func writeNode(node datamodel.Node, w io.Writer) (cid.Cid, error) { } return cd, nil } - -func appendVarint(tgt []byte, v uint64) []byte { - for v > 127 { - tgt = append(tgt, byte(v|128)) - v >>= 7 - } - return append(tgt, byte(v)) -}