From a24137c066f18817322ced65c997a8c1194e692a Mon Sep 17 00:00:00 2001 From: gagliardetto Date: Wed, 23 Aug 2023 14:11:05 +0200 Subject: [PATCH] Remove one roundtrip when using remote CAR files --- epoch.go | 1 + http-range.go | 9 ++++++--- multiepoch-getBlock.go | 24 ++++++++++++++++-------- range-cache.go | 17 +++++++++++++---- 4 files changed, 36 insertions(+), 15 deletions(-) diff --git a/epoch.go b/epoch.go index 15945f1f..435548ca 100644 --- a/epoch.go +++ b/epoch.go @@ -365,6 +365,7 @@ func (ser *Epoch) FindOffsetFromCid(ctx context.Context, cid cid.Cid) (uint64, e if err != nil { return 0, err } + klog.Infof("found offset for CID %s: %d", cid, found) ser.putCidToOffsetInCache(cid, found) return found, nil } diff --git a/http-range.go b/http-range.go index 1da06d99..c66e6a0b 100644 --- a/http-range.go +++ b/http-range.go @@ -72,9 +72,12 @@ func remoteHTTPFileAsIoReaderAt(ctx context.Context, url string) (ReaderAtCloser client: newHTTPClient(), } - rc := NewRangeCache(contentLength, func(p []byte, off int64) (n int, err error) { - return remoteReadAt(rr.client, rr.url, p, off) - }) + rc := NewRangeCache( + contentLength, + filepath.Base(url), + func(p []byte, off int64) (n int, err error) { + return remoteReadAt(rr.client, rr.url, p, off) + }) rc.StartCacheGC(ctx, 1*time.Minute) rr.ca = rc diff --git a/multiepoch-getBlock.go b/multiepoch-getBlock.go index 70d1a832..33a08ef8 100644 --- a/multiepoch-getBlock.go +++ b/multiepoch-getBlock.go @@ -88,6 +88,7 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex if err != nil { return err } + klog.Infof("%s -> %s", parentCid, blockCid) { var blockOffset, parentOffset uint64 wg := new(errgroup.Group) @@ -117,18 +118,25 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex } length := blockOffset - parentOffset - // cap the length to 1GB - GiB := uint64(1024 * 1024 * 1024) - if length > GiB { - length = GiB + MiB := uint64(1024 * 1024) + maxSize := MiB * 100 + if length > maxSize { + length = maxSize } - klog.Infof("prefetching %d bytes from %d", length, parentOffset) - carSection, err := epochHandler.ReadAtFromCar(ctx, parentOffset, length) + + idealEntrySize := uint64(36190) + maybeOffsetOfLastEntry := parentOffset - idealEntrySize + length += idealEntrySize + + klog.Infof("prefetching CAR: start=%d length=%d (parent_offset=%d)", maybeOffsetOfLastEntry, length, parentOffset) + carSection, err := epochHandler.ReadAtFromCar(ctx, maybeOffsetOfLastEntry, length) if err != nil { return err } dr := bytes.NewReader(carSection) - + { + dr.Seek(int64(idealEntrySize), io.SeekStart) + } br := bufio.NewReader(dr) gotCid, data, err := util.ReadNode(br) @@ -393,7 +401,7 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex blockResp.PreviousBlockhash = &parentEntryHash } } else { - klog.Infof("parent slot is in a different epoch, not implemented yet") + klog.Infof("parent slot is in a different epoch, not implemented yet (can't get previousBlockhash)") } } tim.time("get parent block") diff --git a/range-cache.go b/range-cache.go index b3bdc87b..5e3bee2d 100644 --- a/range-cache.go +++ b/range-cache.go @@ -12,6 +12,7 @@ type RangeCache struct { mu sync.RWMutex // the size of the file. size int64 + name string occupiedSpace uint64 remoteFetcher func(p []byte, off int64) (n int, err error) @@ -40,12 +41,17 @@ func (r Range) isValidFor(size int64) bool { } // NewRangeCache creates a new RangeCache. -func NewRangeCache(size int64, fetcher func(p []byte, off int64) (n int, err error)) *RangeCache { +func NewRangeCache( + size int64, + name string, + fetcher func(p []byte, off int64) (n int, err error), +) *RangeCache { if fetcher == nil { panic("fetcher must not be nil") } return &RangeCache{ size: size, + name: name, cache: make(map[Range]RangeCacheEntry), remoteFetcher: fetcher, } @@ -143,7 +149,8 @@ func (rc *RangeCache) GetRange(ctx context.Context, start, ln int64) ([]byte, er got, err := rc.getRange(ctx, start, end, func() ([]byte, error) { v := make([]byte, end-start) debugf( - orange("[cache-MISS] going to read from original reader: start=%d end=%d len=%d\n"), + orange("[cache-MISS] reading from source %s: start=%d end=%d len=%d\n"), + rc.name, start, end, end-start, @@ -215,7 +222,8 @@ func (rc *RangeCache) getRangeFromCache(ctx context.Context, start, end int64) ( } if v, ok := rc.cache[Range{start, end}]; ok { debugf( - lime("exact cache HIT: start=%d end=%d len=%d\n"), + lime("[exact-cache-HIT] for %s: start=%d end=%d len=%d\n"), + rc.name, start, end, end-start, @@ -230,7 +238,8 @@ func (rc *RangeCache) getRangeFromCache(ctx context.Context, start, end int64) ( } if r.contains(Range{start, end}) { debugf( - lime("[cache-HIT] for a superset of this range: start=%d end=%d len=%d\n"), + lime("[cache-HIT] range superset in %s: start=%d end=%d len=%d\n"), + rc.name, start, end, end-start,