Skip to content

Commit

Permalink
sort car urls (#172)
Browse files Browse the repository at this point in the history
* sort car urls

* fix

* refactor

* rename

* add size to return struct

* sort cars test

* urls

* fix reading

* add car file fixtures

* more tests

* use existing func

* cr
  • Loading branch information
anjor authored Oct 26, 2024
1 parent 43c545f commit 8847406
Show file tree
Hide file tree
Showing 5 changed files with 407 additions and 12 deletions.
162 changes: 150 additions & 12 deletions cmd-car-split.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"io"
"io/fs"
"net/http"
"os"
"path/filepath"
"sort"
Expand Down Expand Up @@ -44,7 +45,8 @@ var (
Roots: []cid.Cid{CBOR_SHA256_DUMMY_CID}, // placeholder
Version: 1,
}
hdrSize, _ = car.HeaderSize(hdr)
hdrSize, _ = car.HeaderSize(hdr)
maxSectionSize = 2 << 20 // 2 MiB
)

const maxLinks = 432000 / 18 // 18 subsets
Expand Down Expand Up @@ -532,11 +534,13 @@ func readHeader(streamBuf *bufio.Reader) ([]byte, int64, error) {
return headerBuf.Bytes(), streamLen, nil
}

func SortCarFiles(carFiles []string) ([]string, error) {
type carFileInfo struct {
path string
firstSlot int64
}
type carFileInfo struct {
name string
firstSlot int64
size int64
}

func SortCarFiles(carFiles []string) ([]carFileInfo, error) {

var fileInfos []carFileInfo

Expand Down Expand Up @@ -584,9 +588,15 @@ func SortCarFiles(carFiles []string) ([]string, error) {
return nil, fmt.Errorf("failed to find root node in file %s", path)
}

fi, err := file.Stat()
if err != nil {
return nil, fmt.Errorf("failed to get file info for %s: %w", path, err)
}

fileInfos = append(fileInfos, carFileInfo{
path: path,
name: path,
firstSlot: int64(subset.First),
size: fi.Size(),
})
}

Expand All @@ -595,11 +605,139 @@ func SortCarFiles(carFiles []string) ([]string, error) {
return fileInfos[i].firstSlot < fileInfos[j].firstSlot
})

// Extract the sorted file paths
sortedFiles := make([]string, len(fileInfos))
for i, info := range fileInfos {
sortedFiles[i] = info.path
return fileInfos, nil
}

func SortCarURLs(carURLs []string) ([]carFileInfo, error) {
var urlInfos []carFileInfo

for _, url := range carURLs {
firstSlot, size, err := getSlotAndSizeFromURL(url)
if err != nil {
return nil, fmt.Errorf("failed to get first slot from URL %s: %w", url, err)
}

urlInfos = append(urlInfos, carFileInfo{
name: url,
firstSlot: firstSlot,
size: size,
})
}

// Sort the URL infos based on the firstSlot
sort.Slice(urlInfos, func(i, j int) bool {
return urlInfos[i].firstSlot < urlInfos[j].firstSlot
})

return urlInfos, nil
}

func getSlotAndSizeFromURL(url string) (int64, int64, error) {
fileSize, err := splitcarfetcher.GetContentSizeWithHeadOrZeroRange(url)
if err != nil {
return 0, 0, fmt.Errorf("failed to get file size: %w", err)
}

rootCID, err := getRootCid(url)
if err != nil {
return 0, 0, fmt.Errorf("failed to get root CID: %w", err)
}

endOffset := getEndOffset(fileSize)

partialContent, err := fetchFromOffset(url, endOffset)
if err != nil {
return 0, 0, fmt.Errorf("failed to fetch partial content: %w", err)
}

cidBytes := rootCID.Bytes()
index := bytes.LastIndex(partialContent, cidBytes)
if index == -1 {
return 0, 0, fmt.Errorf("CID block not found in the last 2MiB of the file")
}
blockData := partialContent[index-2:]
r := bufio.NewReader(bytes.NewBuffer(blockData))
cid, _, data, err := carreader.ReadNodeInfoWithData(r)
if err != nil {
return 0, 0, fmt.Errorf("failed to read node info: %w", err)
}
if cid != rootCID {
return 0, 0, fmt.Errorf("expected CID %s, got %s", rootCID, cid)
}

// Decode the Subset
subset, err := iplddecoders.DecodeSubset(data)
if err != nil {
return 0, 0, fmt.Errorf("failed to decode Subset from block: %w", err)
}

return int64(subset.First), fileSize, nil
}

func getRootCid(url string) (cid.Cid, error) {
// Make a GET request for the beginning of the file
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return cid.Undef, fmt.Errorf("failed to create request: %w", err)
}

// Request only the first hdrSize bytes
req.Header.Set("Range", fmt.Sprintf("bytes=0-%d", hdrSize))

resp, err := http.DefaultClient.Do(req)
if err != nil {
return cid.Undef, fmt.Errorf("failed to fetch CAR file header: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusPartialContent {
return cid.Undef, fmt.Errorf("server does not support range requests")
}

// Read the header content
headerContent, err := io.ReadAll(resp.Body)
if err != nil {
return cid.Undef, fmt.Errorf("failed to read header content: %w", err)
}

// Parse the CAR header
rc := io.NopCloser(bytes.NewReader(headerContent))
cr, err := carreader.New(rc)
if err != nil {
return cid.Undef, fmt.Errorf("failed to create CarReader: %w", err)
}

roots := cr.Header.Roots
if len(roots) != 1 {
return cid.Undef, fmt.Errorf("expected 1 root CID, got %d", len(roots))
}
rootCID := roots[0]

return rootCID, nil
}

func getEndOffset(fileSize int64) int64 {
eo := fileSize - int64(maxSectionSize)
return max(eo, 0)
}

func fetchFromOffset(url string, offset int64) ([]byte, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}

req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset))

resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to fetch CAR file: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusPartialContent {
return nil, fmt.Errorf("server does not support range requests")
}

return sortedFiles, nil
return io.ReadAll(resp.Body)
}
Loading

0 comments on commit 8847406

Please sign in to comment.