Skip to content

Commit

Permalink
Merge remote-tracking branch 'gagliardetto/multiepoch' into multiepoch
Browse files Browse the repository at this point in the history
  • Loading branch information
linuskendall committed Aug 23, 2023
2 parents 27dacd0 + a24137c commit 186a49b
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 38 deletions.
1 change: 1 addition & 0 deletions epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ func (ser *Epoch) FindOffsetFromCid(ctx context.Context, cid cid.Cid) (uint64, e
if err != nil {
return 0, err
}
klog.Infof("found offset for CID %s: %d", cid, found)
ser.putCidToOffsetInCache(cid, found)
return found, nil
}
Expand Down
54 changes: 33 additions & 21 deletions http-range.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"time"
)

Expand Down Expand Up @@ -70,26 +72,15 @@ func remoteHTTPFileAsIoReaderAt(ctx context.Context, url string) (ReaderAtCloser
client: newHTTPClient(),
}

rc := NewRangeCache(contentLength, func(p []byte, off int64) (n int, err error) {
return remoteReadAt(rr.client, rr.url, p, off)
})
rc := NewRangeCache(
contentLength,
filepath.Base(url),
func(p []byte, off int64) (n int, err error) {
return remoteReadAt(rr.client, rr.url, p, off)
})
rc.StartCacheGC(ctx, 1*time.Minute)
rr.ca = rc

// try prefetching the first n MiB:
{
MiB := int64(1024 * 1024)
prefetchSize := MiB
if prefetchSize > contentLength {
prefetchSize = contentLength
}
prefetchBuf := make([]byte, prefetchSize)
_, err := rr.ReadAt(prefetchBuf, 0)
if err != nil {
return nil, err
}
}

return rr, nil
}

Expand Down Expand Up @@ -184,13 +175,34 @@ type readCloserWrapper struct {

// when reading print a dot
func (r *readCloserWrapper) ReadAt(p []byte, off int64) (n int, err error) {
if DebugMode {
fmt.Print("·")
fmt.Printf("%s:%d-%d\n", filepath.Base(r.name), off, len(p))
}
startedAt := time.Now()
defer func() {
if DebugMode {
prefix := "[READ-UNKNOWN]"
// if has suffix .index, then it's an index file
if strings.HasSuffix(r.name, ".index") {
prefix = azureBG("[READ-INDEX]")
}
// if has suffix .car, then it's a car file
if strings.HasSuffix(r.name, ".car") {
prefix = purpleBG("[READ-CAR]")
}
fmt.Fprintf(os.Stderr, prefix+" %s:%d+%d (%s)\n", filepath.Base(r.name), off, len(p), time.Since(startedAt))
}
}()
return r.rac.ReadAt(p, off)
}

func purpleBG(s string) string {
// blue bg, black fg
return "\033[48;5;4m\033[38;5;0m" + s + "\033[0m"
}

func azureBG(s string) string {
// azure bg, black fg
return "\033[48;5;6m\033[38;5;0m" + s + "\033[0m"
}

// when closing print a newline
func (r *readCloserWrapper) Close() error {
return r.rac.Close()
Expand Down
24 changes: 16 additions & 8 deletions multiepoch-getBlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
if err != nil {
return err
}
klog.Infof("%s -> %s", parentCid, blockCid)
{
var blockOffset, parentOffset uint64
wg := new(errgroup.Group)
Expand Down Expand Up @@ -117,18 +118,25 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
}

length := blockOffset - parentOffset
// cap the length to 1GB
GiB := uint64(1024 * 1024 * 1024)
if length > GiB {
length = GiB
MiB := uint64(1024 * 1024)
maxSize := MiB * 100
if length > maxSize {
length = maxSize
}
klog.Infof("prefetching %d bytes from %d", length, parentOffset)
carSection, err := epochHandler.ReadAtFromCar(ctx, parentOffset, length)

idealEntrySize := uint64(36190)
maybeOffsetOfLastEntry := parentOffset - idealEntrySize
length += idealEntrySize

klog.Infof("prefetching CAR: start=%d length=%d (parent_offset=%d)", maybeOffsetOfLastEntry, length, parentOffset)
carSection, err := epochHandler.ReadAtFromCar(ctx, maybeOffsetOfLastEntry, length)
if err != nil {
return err
}
dr := bytes.NewReader(carSection)

{
dr.Seek(int64(idealEntrySize), io.SeekStart)
}
br := bufio.NewReader(dr)

gotCid, data, err := util.ReadNode(br)
Expand Down Expand Up @@ -393,7 +401,7 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
blockResp.PreviousBlockhash = &parentEntryHash
}
} else {
klog.Infof("parent slot is in a different epoch, not implemented yet")
klog.Infof("parent slot is in a different epoch, not implemented yet (can't get previousBlockhash)")
}
}
tim.time("get parent block")
Expand Down
41 changes: 36 additions & 5 deletions range-cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"os"
"sync"
"time"
)
Expand All @@ -11,6 +12,7 @@ type RangeCache struct {
mu sync.RWMutex
// the size of the file.
size int64
name string

occupiedSpace uint64
remoteFetcher func(p []byte, off int64) (n int, err error)
Expand Down Expand Up @@ -39,12 +41,17 @@ func (r Range) isValidFor(size int64) bool {
}

// NewRangeCache creates a new RangeCache.
func NewRangeCache(size int64, fetcher func(p []byte, off int64) (n int, err error)) *RangeCache {
func NewRangeCache(
size int64,
name string,
fetcher func(p []byte, off int64) (n int, err error),
) *RangeCache {
if fetcher == nil {
panic("fetcher must not be nil")
}
return &RangeCache{
size: size,
name: name,
cache: make(map[Range]RangeCacheEntry),
remoteFetcher: fetcher,
}
Expand Down Expand Up @@ -141,7 +148,13 @@ func (rc *RangeCache) GetRange(ctx context.Context, start, ln int64) ([]byte, er
end := start + ln
got, err := rc.getRange(ctx, start, end, func() ([]byte, error) {
v := make([]byte, end-start)
debugLn(orange("cache MISS; reading from reader"), start, end, end-start)
debugf(
orange("[cache-MISS] reading from source %s: start=%d end=%d len=%d\n"),
rc.name,
start,
end,
end-start,
)
_, err := rc.remoteFetcher(v, start)
if err == nil {
cloned := clone(v)
Expand All @@ -160,7 +173,13 @@ func (rc *RangeCache) GetRange(ctx context.Context, start, ln int64) ([]byte, er

func debugLn(a ...interface{}) {
if DebugMode {
fmt.Println(a...)
fmt.Fprintln(os.Stderr, a...)
}
}

func debugf(format string, a ...interface{}) {
if DebugMode {
fmt.Fprintf(os.Stderr, format, a...)
}
}

Expand Down Expand Up @@ -202,7 +221,13 @@ func (rc *RangeCache) getRangeFromCache(ctx context.Context, start, end int64) (
return nil, false, nil
}
if v, ok := rc.cache[Range{start, end}]; ok {
debugLn(lime("exact cache HIT"), start, end, end-start)
debugf(
lime("[exact-cache-HIT] for %s: start=%d end=%d len=%d\n"),
rc.name,
start,
end,
end-start,
)
return clone(v.Value), true, nil
}
{
Expand All @@ -212,7 +237,13 @@ func (rc *RangeCache) getRangeFromCache(ctx context.Context, start, end int64) (
return nil, false, ctx.Err()
}
if r.contains(Range{start, end}) {
debugLn(lime("cache HIT for a superset of this range"), start, end, end-start)
debugf(
lime("[cache-HIT] range superset in %s: start=%d end=%d len=%d\n"),
rc.name,
start,
end,
end-start,
)
return clone(rc.cache[r].Value[start-r[0] : end-r[0]]), true, nil
}
}
Expand Down
15 changes: 11 additions & 4 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ import (
func openIndexStorage(ctx context.Context, where string) (ReaderAtCloser, error) {
where = strings.TrimSpace(where)
if strings.HasPrefix(where, "http://") || strings.HasPrefix(where, "https://") {
klog.Infof("opening file from %q as HTTP remote file", where)
klog.Infof("opening index file from %q as HTTP remote file", where)
rac, err := remoteHTTPFileAsIoReaderAt(ctx, where)
if err != nil {
return nil, fmt.Errorf("failed to open index file: %w", err)
return nil, fmt.Errorf("failed to open remote index file: %w", err)
}
return &readCloserWrapper{
rac: rac,
Expand All @@ -41,7 +41,7 @@ func openIndexStorage(ctx context.Context, where string) (ReaderAtCloser, error)
// TODO: add support for Filecoin gateways.
rac, err := mmap.Open(where)
if err != nil {
return nil, fmt.Errorf("failed to open index file: %w", err)
return nil, fmt.Errorf("failed to open local index file: %w", err)
}
return &readCloserWrapper{
rac: rac,
Expand All @@ -52,8 +52,15 @@ func openIndexStorage(ctx context.Context, where string) (ReaderAtCloser, error)
func openCarStorage(ctx context.Context, where string) (*carv2.Reader, ReaderAtCloser, error) {
where = strings.TrimSpace(where)
if strings.HasPrefix(where, "http://") || strings.HasPrefix(where, "https://") {
klog.Infof("opening CAR file from %q as HTTP remote file", where)
rem, err := remoteHTTPFileAsIoReaderAt(ctx, where)
return nil, rem, err
if err != nil {
return nil, nil, fmt.Errorf("failed to open remote CAR file: %w", err)
}
return nil, &readCloserWrapper{
rac: rem,
name: where,
}, nil
}
// TODO: add support for IPFS gateways.
// TODO: add support for Filecoin gateways.
Expand Down

0 comments on commit 186a49b

Please sign in to comment.