Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add read stats for split cars & speed up loading #91

Merged
merged 3 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 35 additions & 8 deletions epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,23 @@ func NewEpochFromConfig(
minerIP := fmt.Sprintf("%s:%s", ip, port)
klog.V(3).Infof("piece CID %s is stored on miner %s (%s)", piece.CommP, minerID, minerIP)
formattedURL := fmt.Sprintf("http://%s/piece/%s", minerIP, piece.CommP.String())
return splitcarfetcher.NewRemoteFileSplitCarReader(
piece.CommP.String(),
formattedURL,
)

{
rfspc, _, err := splitcarfetcher.NewRemoteHTTPFileAsIoReaderAt(
c.Context,
formattedURL,
)
if err != nil {
return nil, fmt.Errorf("failed to create remote file split car reader from %q: %w", formattedURL, err)
}

return &readCloserWrapper{
rac: rfspc,
name: formattedURL,
size: rfspc.Size(),
isSplitCar: true,
}, nil
}
})
if err != nil {
return nil, fmt.Errorf("failed to open CAR file from pieces: %w", err)
Expand All @@ -350,10 +363,24 @@ func NewEpochFromConfig(
if !ok {
return nil, fmt.Errorf("failed to find URL for piece CID %s", piece.CommP)
}
return splitcarfetcher.NewRemoteFileSplitCarReader(
piece.CommP.String(),
pieceURL.URI.String(),
)

{
formattedURL := pieceURL.URI.String()
rfspc, _, err := splitcarfetcher.NewRemoteHTTPFileAsIoReaderAt(
c.Context,
formattedURL,
)
if err != nil {
return nil, fmt.Errorf("failed to create remote file split car reader from %q: %w", formattedURL, err)
}

return &readCloserWrapper{
rac: rfspc,
name: formattedURL,
size: rfspc.Size(),
isSplitCar: true,
}, nil
}
})
if err != nil {
return nil, fmt.Errorf("failed to open CAR file from pieces: %w", err)
Expand Down
43 changes: 0 additions & 43 deletions http-client.go
Original file line number Diff line number Diff line change
@@ -1,44 +1 @@
package main

import (
"net"
"net/http"
"time"

"github.com/klauspost/compress/gzhttp"
)

var (
defaultMaxIdleConnsPerHost = 100
defaultTimeout = 1000 * time.Second
defaultKeepAlive = 180 * time.Second
)

func newHTTPTransport() *http.Transport {
return &http.Transport{
IdleConnTimeout: time.Minute,
MaxConnsPerHost: defaultMaxIdleConnsPerHost,
MaxIdleConnsPerHost: defaultMaxIdleConnsPerHost,
MaxIdleConns: defaultMaxIdleConnsPerHost,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: defaultTimeout,
KeepAlive: defaultKeepAlive,
}).DialContext,
ForceAttemptHTTP2: true,
// MaxIdleConns: 100,
TLSHandshakeTimeout: 10 * time.Second,
// ExpectContinueTimeout: 1 * time.Second,
}
}

// newHTTPClient returns a new Client from the provided config.
// Client is safe for concurrent use by multiple goroutines.
func newHTTPClient() *http.Client {
tr := newHTTPTransport()

return &http.Client{
Timeout: defaultTimeout,
Transport: gzhttp.Transport(tr),
}
}
179 changes: 14 additions & 165 deletions http-range.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
package main

import (
"context"
"fmt"
"io"
"net/http"
"path/filepath"
"strings"
"time"

"github.com/goware/urlx"
"k8s.io/klog/v2"
)

Expand All @@ -18,167 +14,16 @@ type ReaderAtCloser interface {
io.Closer
}

func getContentSizeWithHeadOrZeroRange(url string) (int64, error) {
// try sending a HEAD request to the server to get the file size:
resp, err := http.Head(url)
if err != nil {
return 0, err
}
if resp.StatusCode != http.StatusOK {
// try sending a GET request with a zero range to the server to get the file size:
req := &http.Request{
Method: "GET",
URL: resp.Request.URL,
Header: make(http.Header),
}
req.Header.Set("Range", "bytes=0-0")
resp, err = http.DefaultClient.Do(req)
if err != nil {
return 0, err
}
if resp.StatusCode != http.StatusPartialContent {
return 0, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
// now find the content length:
contentRange := resp.Header.Get("Content-Range")
if contentRange == "" {
return 0, fmt.Errorf("missing Content-Range header")
}
var contentLength int64
_, err := fmt.Sscanf(contentRange, "bytes 0-0/%d", &contentLength)
if err != nil {
return 0, err
}
return contentLength, nil
}
return resp.ContentLength, nil
}

// remoteHTTPFileAsIoReaderAt returns a ReaderAtCloser for a remote file.
// The returned ReaderAtCloser is backed by a http.Client.
func remoteHTTPFileAsIoReaderAt(ctx context.Context, url string) (ReaderAtCloser, error) {
	// Determine the total size up front so ReadAt can bound-check offsets.
	contentLength, err := getContentSizeWithHeadOrZeroRange(url)
	if err != nil {
		return nil, err
	}
	if contentLength == 0 {
		return nil, fmt.Errorf("missing Content-Length/Content-Range header, or file is empty")
	}

	reader := &HTTPSingleFileRemoteReaderAt{
		url:           url,
		contentLength: contentLength,
		client:        newHTTPClient(),
	}

	parsed, err := urlx.Parse(url)
	if err != nil {
		return nil, err
	}

	// Cache byte ranges keyed by offset; misses are fetched over HTTP.
	cache := NewRangeCache(
		contentLength,
		filepath.Base(parsed.Path),
		func(p []byte, off int64) (int, error) {
			return remoteReadAt(reader.client, reader.url, p, off)
		})
	cache.StartCacheGC(ctx, 1*time.Minute)
	reader.ca = cache

	return reader, nil
}

// HTTPSingleFileRemoteReaderAt reads a single remote file over HTTP range
// requests; fetched byte ranges are cached in a RangeCache.
type HTTPSingleFileRemoteReaderAt struct {
	url           string       // remote file URL; all range requests go here
	contentLength int64        // total size of the remote file in bytes
	client        *http.Client // client used for every range request
	ca            *RangeCache  // byte-range cache; misses are fetched via remoteReadAt
}

// Close implements io.Closer.
// It releases idle connections held by the HTTP client and shuts down the
// range cache.
func (r *HTTPSingleFileRemoteReaderAt) Close() error {
	r.client.CloseIdleConnections()
	return r.ca.Close()
}

func retryExpotentialBackoff(
ctx context.Context,
startDuration time.Duration,
maxRetries int,
fn func() error,
) error {
var err error
for i := 0; i < maxRetries; i++ {
err = fn()
if err == nil {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(startDuration):
startDuration *= 2
}
}
return fmt.Errorf("failed after %d retries; last error: %w", maxRetries, err)
}

// ReadAt implements io.ReaderAt by serving len(p) bytes at offset off out of
// the range cache (which fetches misses over HTTP).
func (r *HTTPSingleFileRemoteReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
	// Offsets at or past the end of the file are EOF by io.ReaderAt contract.
	if off >= r.contentLength {
		return 0, io.EOF
	}
	data, err := r.ca.GetRange(context.Background(), off, int64(len(p)))
	if err != nil {
		return 0, err
	}
	if n = copy(p, data); n < len(p) {
		// The cache returned fewer bytes than requested.
		return n, io.ErrUnexpectedEOF
	}
	return n, nil
}

func remoteReadAt(client *http.Client, url string, p []byte, off int64) (n int, err error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return 0, err
}
{
req.Header.Set("Connection", "keep-alive")
req.Header.Set("Keep-Alive", "timeout=600")
}

req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))))

var resp *http.Response
err = retryExpotentialBackoff(
context.Background(),
100*time.Millisecond,
3,
func() error {
resp, err = client.Do(req)
return err
})
if err != nil {
return 0, err
}
defer resp.Body.Close()
{
n, err := io.ReadFull(resp.Body, p)
if err != nil {
return 0, err
}
return n, nil
}
// readCloserWrapper decorates a ReaderAtCloser with metadata (name, size,
// source kind) used for read-stats logging.
type readCloserWrapper struct {
	rac        ReaderAtCloser // underlying reader
	isRemote   bool           // presumably true for network-backed sources — confirm against callers
	isSplitCar bool           // true when the source is a piece of a split CAR file
	name       string         // display name (file path or URL) used in log lines
	size       int64          // total size in bytes, as reported by the source
}

type readCloserWrapper struct {
rac ReaderAtCloser
isRemote bool
name string
// Size returns the total size in bytes of the wrapped source.
func (r *readCloserWrapper) Size() int64 {
	return r.size
}

// when reading print a dot
Expand All @@ -201,8 +46,12 @@ func (r *readCloserWrapper) ReadAt(p []byte, off int64) (n int, err error) {
prefix = icon + azureBG("[READ-INDEX]")
}
// if has suffix .car, then it's a car file
if strings.HasSuffix(r.name, ".car") {
prefix = icon + purpleBG("[READ-CAR]")
if strings.HasSuffix(r.name, ".car") || r.isSplitCar {
if r.isSplitCar {
prefix = icon + azureBG("[READ-SPLIT-CAR]")
} else {
prefix = icon + purpleBG("[READ-CAR]")
}
}
klog.V(5).Infof(prefix+" %s:%d+%d (%s)\n", filepath.Base(r.name), off, len(p), took)
}
Expand Down
2 changes: 1 addition & 1 deletion range-cache.go → range-cache/range-cache.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package rangecache

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion range-cache_test.go → range-cache/range-cache_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package rangecache

import (
"bytes"
Expand Down
56 changes: 1 addition & 55 deletions split-car-fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,60 +100,6 @@ func GetContentSizeWithHeadOrZeroRange(url string) (int64, error) {
return resp.ContentLength, nil
}

// RemoteFileSplitCarReader reads one piece of a split CAR file remotely over
// HTTP range requests.
type RemoteFileSplitCarReader struct {
	commP      string // CommP (piece commitment) CID string identifying the piece
	url        string // HTTP URL the piece is served from
	size       int64  // total remote file size in bytes, fetched at construction
	httpClient *http.Client
}

// NewRemoteFileSplitCarReader creates a RemoteFileSplitCarReader for the
// piece identified by commP, served at url. The remote file size is
// determined up front via a HEAD (or zero-range GET) request.
func NewRemoteFileSplitCarReader(commP string, url string) (*RemoteFileSplitCarReader, error) {
	size, err := GetContentSizeWithHeadOrZeroRange(url)
	if err != nil {
		// Wrap with %w (not %s) so callers can unwrap with errors.Is/As.
		return nil, fmt.Errorf("failed to get content size from %q: %w", url, err)
	}
	return &RemoteFileSplitCarReader{
		commP:      commP,
		url:        url,
		size:       size,
		httpClient: http.DefaultClient,
	}, nil
}

// ReadAt implements io.ReaderAt by issuing a single HTTP range request for
// exactly len(p) bytes starting at off. Any response other than
// 206 Partial Content is an error.
func (fscr *RemoteFileSplitCarReader) ReadAt(p []byte, off int64) (n int, err error) {
	req, err := http.NewRequest("GET", fscr.url, nil)
	if err != nil {
		return 0, err
	}
	// HTTP ranges are inclusive on both ends.
	end := off + int64(len(p)) - 1
	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", off, end))
	req.Header.Set("Connection", "keep-alive")
	req.Header.Set("Keep-Alive", "timeout=600")

	resp, err := fscr.httpClient.Do(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusPartialContent {
		return 0, fmt.Errorf("GET %q: unexpected status code: %d", fscr.url, resp.StatusCode)
	}
	if n, err = io.ReadFull(resp.Body, p); err != nil {
		return 0, err
	}
	return n, nil
}

// Close implements io.Closer by releasing idle connections held by the
// underlying HTTP client. It never returns a non-nil error.
func (fscr *RemoteFileSplitCarReader) Close() error {
	fscr.httpClient.CloseIdleConnections()
	return nil
}

// Size returns the total size in bytes of the remote file, as determined at
// construction time.
func (fscr *RemoteFileSplitCarReader) Size() int64 {
	return fscr.size
}

func NewSplitCarReader(
files *carlet.CarPiecesAndMetadata,
readerCreator SplitCarFileReaderCreator,
Expand Down Expand Up @@ -216,7 +162,7 @@ func NewSplitCarReader(
}

// if remote, then the file must be at least as header size + content size:
if _, ok := fi.(*RemoteFileSplitCarReader); ok {
if _, ok := fi.(*HTTPSingleFileRemoteReaderAt); ok {
expectedMinSize := int(cf.HeaderSize) + int(cf.ContentSize)
if size < expectedMinSize {
return nil, fmt.Errorf(
Expand Down
Loading
Loading