From 9952980dcf6ebfafd889c00417721bd78a25550b Mon Sep 17 00:00:00 2001 From: gagliardetto Date: Wed, 23 Aug 2023 11:56:29 +0200 Subject: [PATCH 1/5] Improve debug --- http-range.go | 11 +++++++---- storage.go | 15 +++++++++++---- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/http-range.go b/http-range.go index 8df50cae..8b885959 100644 --- a/http-range.go +++ b/http-range.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "net/http" + "os" "path/filepath" "time" ) @@ -184,10 +185,12 @@ type readCloserWrapper struct { // when reading print a dot func (r *readCloserWrapper) ReadAt(p []byte, off int64) (n int, err error) { - if DebugMode { - fmt.Print("ยท") - fmt.Printf("%s:%d-%d\n", filepath.Base(r.name), off, len(p)) - } + startedAt := time.Now() + defer func() { + if DebugMode { + fmt.Fprintf(os.Stderr, "read %s:%d-%d (%s)\n", filepath.Base(r.name), off, len(p), time.Since(startedAt)) + } + }() return r.rac.ReadAt(p, off) } diff --git a/storage.go b/storage.go index 3a61c0dd..e6a18b97 100644 --- a/storage.go +++ b/storage.go @@ -27,10 +27,10 @@ import ( func openIndexStorage(ctx context.Context, where string) (ReaderAtCloser, error) { where = strings.TrimSpace(where) if strings.HasPrefix(where, "http://") || strings.HasPrefix(where, "https://") { - klog.Infof("opening file from %q as HTTP remote file", where) + klog.Infof("opening index file from %q as HTTP remote file", where) rac, err := remoteHTTPFileAsIoReaderAt(ctx, where) if err != nil { - return nil, fmt.Errorf("failed to open index file: %w", err) + return nil, fmt.Errorf("failed to open remote index file: %w", err) } return &readCloserWrapper{ rac: rac, @@ -41,7 +41,7 @@ func openIndexStorage(ctx context.Context, where string) (ReaderAtCloser, error) // TODO: add support for Filecoin gateways. rac, err := mmap.Open(where) if err != nil { - return nil, fmt.Errorf("failed to open index file: %w", err) + return nil, fmt.Errorf("failed to open local index file: %w", err) } return &readCloserWrapper{ rac: rac, @@ -52,8 +52,15 @@ func openIndexStorage(ctx context.Context, where string) (ReaderAtCloser, error) func openCarStorage(ctx context.Context, where string) (*carv2.Reader, ReaderAtCloser, error) { where = strings.TrimSpace(where) if strings.HasPrefix(where, "http://") || strings.HasPrefix(where, "https://") { + klog.Infof("opening CAR file from %q as HTTP remote file", where) rem, err := remoteHTTPFileAsIoReaderAt(ctx, where) - return nil, rem, err + if err != nil { + return nil, nil, fmt.Errorf("failed to open remote CAR file: %w", err) + } + return nil, &readCloserWrapper{ + rac: rem, + name: where, + }, nil } // TODO: add support for IPFS gateways. // TODO: add support for Filecoin gateways. From 973d3d4f8f28cdfd6451884ace304df4df53e2bb Mon Sep 17 00:00:00 2001 From: gagliardetto Date: Wed, 23 Aug 2023 12:22:23 +0200 Subject: [PATCH 2/5] Improve debug --- http-range.go | 36 +++++++++++++++++++++--------------- range-cache.go | 4 ++-- 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/http-range.go b/http-range.go index 8b885959..b056f7b8 100644 --- a/http-range.go +++ b/http-range.go @@ -7,6 +7,7 @@ import ( "net/http" "os" "path/filepath" + "strings" "time" ) @@ -77,20 +78,6 @@ func remoteHTTPFileAsIoReaderAt(ctx context.Context, url string) (ReaderAtCloser rc.StartCacheGC(ctx, 1*time.Minute) rr.ca = rc - // try prefetching the first n MiB: - { - MiB := int64(1024 * 1024) - prefetchSize := MiB - if prefetchSize > contentLength { - prefetchSize = contentLength - } - prefetchBuf := make([]byte, prefetchSize) - _, err := rr.ReadAt(prefetchBuf, 0) - if err != nil { - return nil, err - } - } - return rr, nil } @@ -188,12 +175,31 @@ func (r *readCloserWrapper) ReadAt(p []byte, off int64) (n int, err error) { startedAt := time.Now() defer func() { if DebugMode { - fmt.Fprintf(os.Stderr, "read %s:%d-%d (%s)\n", filepath.Base(r.name), off, len(p), time.Since(startedAt)) + prefix := "[READ-UNKNOWN]" + // if has suffix .index, then it's an index file + if strings.HasSuffix(r.name, ".index") { + prefix = azureBG("[READ-INDEX]") + } + // if has suffix .car, then it's a car file + if strings.HasSuffix(r.name, ".car") { + prefix = purpleBG("[READ-CAR]") + } + fmt.Fprintf(os.Stderr, prefix+" %s:%d-%d (%s)\n", filepath.Base(r.name), off, len(p), time.Since(startedAt)) } }() return r.rac.ReadAt(p, off) } +func purpleBG(s string) string { + // blue bg, black fg + return "\033[48;5;4m\033[38;5;0m" + s + "\033[0m" +} + +func azureBG(s string) string { + // azure bg, black fg + return "\033[48;5;6m\033[38;5;0m" + s + "\033[0m" +} + // when closing print a newline func (r *readCloserWrapper) Close() error { return r.rac.Close() diff --git a/range-cache.go b/range-cache.go index e7353231..c72d3a8a 100644 --- a/range-cache.go +++ b/range-cache.go @@ -141,7 +141,7 @@ func (rc *RangeCache) GetRange(ctx context.Context, start, ln int64) ([]byte, er end := start + ln got, err := rc.getRange(ctx, start, end, func() ([]byte, error) { v := make([]byte, end-start) - debugLn(orange("cache MISS; reading from reader"), start, end, end-start) + debugLn(orange("[cache-MISS] going to read from original reader"), start, end, end-start) _, err := rc.remoteFetcher(v, start) if err == nil { cloned := clone(v) @@ -212,7 +212,7 @@ func (rc *RangeCache) getRangeFromCache(ctx context.Context, start, end int64) ( return nil, false, ctx.Err() } if r.contains(Range{start, end}) { - debugLn(lime("cache HIT for a superset of this range"), start, end, end-start) + debugLn(lime("[cache-HIT] for a superset of this range"), start, end, end-start) return clone(rc.cache[r].Value[start-r[0] : end-r[0]]), true, nil } } From c05e3e2f4161ce044385702858f449d143fd764c Mon Sep 17 00:00:00 2001 From: gagliardetto Date: Wed, 23 Aug 2023 12:24:00 +0200 Subject: [PATCH 3/5] Improve debug --- http-range.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/http-range.go b/http-range.go index b056f7b8..1da06d99 100644 --- a/http-range.go +++ b/http-range.go @@ -184,7 +184,7 @@ func (r *readCloserWrapper) ReadAt(p []byte, off int64) (n int, err error) { if strings.HasSuffix(r.name, ".car") { prefix = purpleBG("[READ-CAR]") } - fmt.Fprintf(os.Stderr, prefix+" %s:%d-%d (%s)\n", filepath.Base(r.name), off, len(p), time.Since(startedAt)) + fmt.Fprintf(os.Stderr, prefix+" %s:%d+%d (%s)\n", filepath.Base(r.name), off, len(p), time.Since(startedAt)) } }() return r.rac.ReadAt(p, off) From 0e1101a6a81a2a9de3c6b11d03c389ca5991a3d6 Mon Sep 17 00:00:00 2001 From: gagliardetto Date: Wed, 23 Aug 2023 12:40:56 +0200 Subject: [PATCH 4/5] Improve debug --- range-cache.go | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/range-cache.go b/range-cache.go index c72d3a8a..b3bdc87b 100644 --- a/range-cache.go +++ b/range-cache.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "os" "sync" "time" ) @@ -141,7 +142,12 @@ func (rc *RangeCache) GetRange(ctx context.Context, start, ln int64) ([]byte, er end := start + ln got, err := rc.getRange(ctx, start, end, func() ([]byte, error) { v := make([]byte, end-start) - debugLn(orange("[cache-MISS] going to read from original reader"), start, end, end-start) + debugf( + orange("[cache-MISS] going to read from original reader: start=%d end=%d len=%d\n"), + start, + end, + end-start, + ) _, err := rc.remoteFetcher(v, start) if err == nil { cloned := clone(v) @@ -160,7 +166,13 @@ func (rc *RangeCache) GetRange(ctx context.Context, start, ln int64) ([]byte, er func debugLn(a ...interface{}) { if DebugMode { - fmt.Println(a...) + fmt.Fprintln(os.Stderr, a...) + } +} + +func debugf(format string, a ...interface{}) { + if DebugMode { + fmt.Fprintf(os.Stderr, format, a...) } } @@ -202,7 +214,12 @@ func (rc *RangeCache) getRangeFromCache(ctx context.Context, start, end int64) ( return nil, false, nil } if v, ok := rc.cache[Range{start, end}]; ok { - debugLn(lime("exact cache HIT"), start, end, end-start) + debugf( + lime("exact cache HIT: start=%d end=%d len=%d\n"), + start, + end, + end-start, + ) return clone(v.Value), true, nil } { @@ -212,7 +229,12 @@ func (rc *RangeCache) getRangeFromCache(ctx context.Context, start, end int64) ( return nil, false, ctx.Err() } if r.contains(Range{start, end}) { - debugLn(lime("[cache-HIT] for a superset of this range"), start, end, end-start) + debugf( + lime("[cache-HIT] for a superset of this range: start=%d end=%d len=%d\n"), + start, + end, + end-start, + ) return clone(rc.cache[r].Value[start-r[0] : end-r[0]]), true, nil } } From a24137c066f18817322ced65c997a8c1194e692a Mon Sep 17 00:00:00 2001 From: gagliardetto Date: Wed, 23 Aug 2023 14:11:05 +0200 Subject: [PATCH 5/5] Remove one roundtrip when using remote CAR files --- epoch.go | 1 + http-range.go | 9 ++++++--- multiepoch-getBlock.go | 24 ++++++++++++++++-------- range-cache.go | 17 +++++++++++++---- 4 files changed, 36 insertions(+), 15 deletions(-) diff --git a/epoch.go b/epoch.go index 15945f1f..435548ca 100644 --- a/epoch.go +++ b/epoch.go @@ -365,6 +365,7 @@ func (ser *Epoch) FindOffsetFromCid(ctx context.Context, cid cid.Cid) (uint64, e if err != nil { return 0, err } + klog.Infof("found offset for CID %s: %d", cid, found) ser.putCidToOffsetInCache(cid, found) return found, nil } diff --git a/http-range.go b/http-range.go index 1da06d99..c66e6a0b 100644 --- a/http-range.go +++ b/http-range.go @@ -72,9 +72,12 @@ func remoteHTTPFileAsIoReaderAt(ctx context.Context, url string) (ReaderAtCloser client: newHTTPClient(), } - rc := NewRangeCache(contentLength, func(p []byte, off int64) (n int, err error) { - return remoteReadAt(rr.client, rr.url, p, off) - }) + rc := NewRangeCache( + contentLength, + filepath.Base(url), + func(p []byte, off int64) (n int, err error) { + return remoteReadAt(rr.client, rr.url, p, off) + }) rc.StartCacheGC(ctx, 1*time.Minute) rr.ca = rc diff --git a/multiepoch-getBlock.go b/multiepoch-getBlock.go index 70d1a832..33a08ef8 100644 --- a/multiepoch-getBlock.go +++ b/multiepoch-getBlock.go @@ -88,6 +88,7 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex if err != nil { return err } + klog.Infof("%s -> %s", parentCid, blockCid) { var blockOffset, parentOffset uint64 wg := new(errgroup.Group) @@ -117,18 +118,25 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex } length := blockOffset - parentOffset - // cap the length to 1GB - GiB := uint64(1024 * 1024 * 1024) - if length > GiB { - length = GiB + MiB := uint64(1024 * 1024) + maxSize := MiB * 100 + if length > maxSize { + length = maxSize } - klog.Infof("prefetching %d bytes from %d", length, parentOffset) - carSection, err := epochHandler.ReadAtFromCar(ctx, parentOffset, length) + + idealEntrySize := uint64(36190) + maybeOffsetOfLastEntry := parentOffset - idealEntrySize + length += idealEntrySize + + klog.Infof("prefetching CAR: start=%d length=%d (parent_offset=%d)", maybeOffsetOfLastEntry, length, parentOffset) + carSection, err := epochHandler.ReadAtFromCar(ctx, maybeOffsetOfLastEntry, length) if err != nil { return err } dr := bytes.NewReader(carSection) - + { + dr.Seek(int64(idealEntrySize), io.SeekStart) + } br := bufio.NewReader(dr) gotCid, data, err := util.ReadNode(br) @@ -393,7 +401,7 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex blockResp.PreviousBlockhash = &parentEntryHash } } else { - klog.Infof("parent slot is in a different epoch, not implemented yet") + klog.Infof("parent slot is in a different epoch, not implemented yet (can't get previousBlockhash)") } } tim.time("get parent block") diff --git a/range-cache.go b/range-cache.go index b3bdc87b..5e3bee2d 100644 --- a/range-cache.go +++ b/range-cache.go @@ -12,6 +12,7 @@ type RangeCache struct { mu sync.RWMutex // the size of the file. size int64 + name string occupiedSpace uint64 remoteFetcher func(p []byte, off int64) (n int, err error) @@ -40,12 +41,17 @@ func (r Range) isValidFor(size int64) bool { } // NewRangeCache creates a new RangeCache. -func NewRangeCache(size int64, fetcher func(p []byte, off int64) (n int, err error)) *RangeCache { +func NewRangeCache( + size int64, + name string, + fetcher func(p []byte, off int64) (n int, err error), +) *RangeCache { if fetcher == nil { panic("fetcher must not be nil") } return &RangeCache{ size: size, + name: name, cache: make(map[Range]RangeCacheEntry), remoteFetcher: fetcher, } @@ -143,7 +149,8 @@ func (rc *RangeCache) GetRange(ctx context.Context, start, ln int64) ([]byte, er got, err := rc.getRange(ctx, start, end, func() ([]byte, error) { v := make([]byte, end-start) debugf( - orange("[cache-MISS] going to read from original reader: start=%d end=%d len=%d\n"), + orange("[cache-MISS] reading from source %s: start=%d end=%d len=%d\n"), + rc.name, start, end, end-start, @@ -215,7 +222,8 @@ func (rc *RangeCache) getRangeFromCache(ctx context.Context, start, end int64) ( } if v, ok := rc.cache[Range{start, end}]; ok { debugf( - lime("exact cache HIT: start=%d end=%d len=%d\n"), + lime("[exact-cache-HIT] for %s: start=%d end=%d len=%d\n"), + rc.name, start, end, end-start, @@ -230,7 +238,8 @@ func (rc *RangeCache) getRangeFromCache(ctx context.Context, start, end int64) ( } if r.contains(Range{start, end}) { debugf( - lime("[cache-HIT] for a superset of this range: start=%d end=%d len=%d\n"), + lime("[cache-HIT] range superset in %s: start=%d end=%d len=%d\n"), + rc.name, start, end, end-start,