From 4bda7cdc32e92513fac399fdaf134b8276aa271c Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 3 Dec 2024 18:03:08 +0100 Subject: [PATCH] internal: move downloader --- api/worker.go | 1 - .../download/downloader}/downloader.go | 228 +++++++++++++----- .../download/downloader/downloader_test.go | 53 ++++ worker/download.go | 219 +++++------------ worker/downloader_test.go | 36 --- worker/upload.go | 2 - worker/worker.go | 13 +- 7 files changed, 286 insertions(+), 266 deletions(-) rename {worker => internal/download/downloader}/downloader.go (59%) create mode 100644 internal/download/downloader/downloader_test.go delete mode 100644 worker/downloader_test.go diff --git a/api/worker.go b/api/worker.go index 1fe10dcd3..6b9be82e4 100644 --- a/api/worker.go +++ b/api/worker.go @@ -102,7 +102,6 @@ type ( DownloaderStats struct { AvgSectorDownloadSpeedMBPS float64 `json:"avgSectorDownloadSpeedMbps"` HostKey types.PublicKey `json:"hostKey"` - NumDownloads uint64 `json:"numDownloads"` } // UploadStatsResponse is the response type for the /stats/uploads endpoint. diff --git a/worker/downloader.go b/internal/download/downloader/downloader.go similarity index 59% rename from worker/downloader.go rename to internal/download/downloader/downloader.go index 933d15ba0..af6985b71 100644 --- a/worker/downloader.go +++ b/internal/download/downloader/downloader.go @@ -1,4 +1,4 @@ -package worker +package downloader import ( "bytes" @@ -18,14 +18,43 @@ import ( const ( downloadOverheadB = 284 maxConcurrentSectorsPerHost = 3 + statsRecomputeMinInterval = 3 * time.Second ) var ( - errDownloaderStopped = errors.New("downloader was stopped") + ErrStopped = errors.New("downloader was stopped") ) type ( - downloader struct { + SectorDownloadReq struct { + Ctx context.Context + + Length uint32 + Offset uint32 + Root types.Hash256 + Host *Downloader + + Overdrive bool + SectorIndex int + Resps *SectorResponses + } + + SectorDownloadResp struct { + Req *SectorDownloadReq + Sector []byte + Err error + } + + SectorResponses struct { + c chan struct{} // signal that a new response is available + + mu sync.Mutex + closed bool + responses []*SectorDownloadResp + } +) +type ( + Downloader struct { host host.Downloader statsDownloadSpeedBytesPerMS *utils.DataPoints // keep track of this separately for stats (no decay is applied) @@ -36,14 +65,16 @@ type ( mu sync.Mutex consecutiveFailures uint64 - numDownloads uint64 - queue []*sectorDownloadReq - stopped bool + lastRecompute time.Time + + numDownloads uint64 + queue []*SectorDownloadReq + stopped bool } ) -func newDownloader(ctx context.Context, h host.Downloader) *downloader { - return &downloader{ +func New(ctx context.Context, h host.Downloader) *Downloader { + return &Downloader{ host: h, statsSectorDownloadEstimateInMS: utils.NewDataPoints(10 * time.Minute), @@ -52,49 +83,26 @@ func newDownloader(ctx context.Context, h host.Downloader) *downloader { signalWorkChan: make(chan struct{}, 1), shutdownCtx: ctx, - queue: make([]*sectorDownloadReq, 0), + queue: make([]*SectorDownloadReq, 0), } } -func (d *downloader) PublicKey() types.PublicKey { - return d.host.PublicKey() -} - -func (d *downloader) Stop() { - d.mu.Lock() - d.stopped = true - d.mu.Unlock() - - for { - download := d.pop() - if download == nil { - break - } - if !download.done() { - download.fail(errDownloaderStopped) - } +func NewSectorResponses() *SectorResponses { + return &SectorResponses{ + c: make(chan struct{}, 1), } } -func (d *downloader) fillBatch() (batch []*sectorDownloadReq) { - for len(batch) < 
maxConcurrentSectorsPerHost { - if req := d.pop(); req == nil { - break - } else if req.done() { - continue - } else { - batch = append(batch, req) - } - } - return +func (d *Downloader) AvgDownloadSpeedBytesPerMS() float64 { + return d.statsDownloadSpeedBytesPerMS.Average() } -func (d *downloader) enqueue(download *sectorDownloadReq) { +func (d *Downloader) Enqueue(download *SectorDownloadReq) { d.mu.Lock() // check for stopped if d.stopped { d.mu.Unlock() - go download.fail(errDownloaderStopped) // don't block the caller + go download.fail(ErrStopped) // don't block the caller return } @@ -109,7 +117,7 @@ func (d *downloader) enqueue(download *sectorDownloadReq) { } } -func (d *downloader) estimate() float64 { +func (d *Downloader) Estimate() float64 { d.mu.Lock() defer d.mu.Unlock() @@ -127,10 +135,48 @@ func (d *downloader) estimate() float64 { return numSectors * estimateP90 } -func (d *downloader) execute(req *sectorDownloadReq) (err error) { +func (d *Downloader) Healthy() bool { + d.mu.Lock() + defer d.mu.Unlock() + return d.consecutiveFailures == 0 +} + +func (d *Downloader) PublicKey() types.PublicKey { + return d.host.PublicKey() +} + +func (d *Downloader) Stop(err error) { + d.mu.Lock() + d.stopped = true + d.mu.Unlock() + + for { + download := d.pop() + if download == nil { + break + } + if !download.done() { + download.fail(err) + } + } +} + +func (d *Downloader) TryRecomputeStats() { + d.mu.Lock() + defer d.mu.Unlock() + if time.Since(d.lastRecompute) < statsRecomputeMinInterval { + return + } + + d.lastRecompute = time.Now() + d.statsDownloadSpeedBytesPerMS.Recompute() + d.statsSectorDownloadEstimateInMS.Recompute() +} + +func (d *Downloader) execute(req *SectorDownloadReq) (err error) { // download the sector - buf := bytes.NewBuffer(make([]byte, 0, req.length)) - err = d.host.DownloadSector(req.ctx, buf, req.root, req.offset, req.length) + buf := bytes.NewBuffer(make([]byte, 0, req.Length)) + err = d.host.DownloadSector(req.Ctx, buf, req.Root, req.Offset, req.Length) if err != nil { req.fail(err) return err @@ -144,7 +190,20 @@ func (d *downloader) execute(req *sectorDownloadReq) (err error) { return nil } -func (d *downloader) pop() *sectorDownloadReq { +func (d *Downloader) fillBatch() (batch []*SectorDownloadReq) { + for len(batch) < maxConcurrentSectorsPerHost { + if req := d.pop(); req == nil { + break + } else if req.done() { + continue + } else { + batch = append(batch, req) + } + } + return +} + +func (d *Downloader) pop() *SectorDownloadReq { d.mu.Lock() defer d.mu.Unlock() @@ -157,7 +216,7 @@ func (d *downloader) pop() *sectorDownloadReq { return nil } -func (d *downloader) processBatch(batch []*sectorDownloadReq) chan struct{} { +func (d *Downloader) processBatch(batch []*SectorDownloadReq) chan struct{} { doneChan := make(chan struct{}) // define some state to keep track of stats @@ -178,7 +237,7 @@ func (d *downloader) processBatch(batch []*sectorDownloadReq) chan struct{} { // define a worker to process download requests inflight := uint64(len(batch)) - reqsChan := make(chan *sectorDownloadReq) + reqsChan := make(chan *SectorDownloadReq) workerFn := func() { for req := range reqsChan { // check if we need to abort @@ -203,7 +262,7 @@ func (d *downloader) processBatch(batch []*sectorDownloadReq) chan struct{} { // update state + potentially track stats mu.Lock() if err == nil { - downloadedB += int64(req.length) + downloadOverheadB + downloadedB += int64(req.Length) + downloadOverheadB if downloadedB >= maxConcurrentSectorsPerHost*rhpv2.SectorSize || 
concurrent == maxConcurrentSectorsPerHost { trackStatsFn() } @@ -247,7 +306,7 @@ func (d *downloader) processBatch(batch []*sectorDownloadReq) chan struct{} { return doneChan } -func (d *downloader) processQueue() { +func (d *Downloader) Start() { outer: for { // wait for work @@ -278,17 +337,7 @@ outer: } } -func (d *downloader) stats() downloaderStats { - d.mu.Lock() - defer d.mu.Unlock() - return downloaderStats{ - avgSpeedMBPS: d.statsDownloadSpeedBytesPerMS.Average() * 0.008, - healthy: d.consecutiveFailures == 0, - numDownloads: d.numDownloads, - } -} - -func (d *downloader) trackFailure(err error) { +func (d *Downloader) trackFailure(err error) { d.mu.Lock() defer d.mu.Unlock() @@ -307,3 +356,64 @@ func (d *downloader) trackFailure(err error) { d.consecutiveFailures++ d.statsSectorDownloadEstimateInMS.Track(float64(time.Hour.Milliseconds())) } + +func (sr *SectorResponses) Add(resp *SectorDownloadResp) { + sr.mu.Lock() + defer sr.mu.Unlock() + if sr.closed { + return + } + sr.responses = append(sr.responses, resp) + select { + case sr.c <- struct{}{}: + default: + } +} + +func (sr *SectorResponses) Close() error { + sr.mu.Lock() + defer sr.mu.Unlock() + + sr.closed = true + sr.responses = nil // clear responses + close(sr.c) + return nil +} + +func (sr *SectorResponses) Next() *SectorDownloadResp { + sr.mu.Lock() + defer sr.mu.Unlock() + if len(sr.responses) == 0 { + return nil + } + resp := sr.responses[0] + sr.responses = sr.responses[1:] + return resp +} + +func (sr *SectorResponses) Received() chan struct{} { + return sr.c +} + +func (req *SectorDownloadReq) done() bool { + select { + case <-req.Ctx.Done(): + return true + default: + return false + } +} + +func (req *SectorDownloadReq) fail(err error) { + req.Resps.Add(&SectorDownloadResp{ + Req: req, + Err: err, + }) +} + +func (req *SectorDownloadReq) succeed(sector []byte) { + req.Resps.Add(&SectorDownloadResp{ + Req: req, + Sector: sector, + }) +} diff --git a/internal/download/downloader/downloader_test.go b/internal/download/downloader/downloader_test.go new file mode 100644 index 000000000..231cbed79 --- /dev/null +++ b/internal/download/downloader/downloader_test.go @@ -0,0 +1,53 @@ +package downloader + +import ( + "context" + "errors" + "testing" + "time" + + "go.sia.tech/core/types" + "go.sia.tech/renterd/internal/test/mocks" +) + +func TestDownloaderStopped(t *testing.T) { + assertErr := func(t *testing.T, req SectorDownloadReq, expected error) { + select { + case <-req.Resps.Received(): + if err := req.Resps.responses[0].Err; !errors.Is(err, expected) { + t.Fatal("unexpected error response", err) + } + case <-time.After(time.Second): + t.Fatal("no response") + } + } + + t.Run("stop before enqueue", func(t *testing.T) { + hm := mocks.NewHostManager() + dl := New(context.Background(), hm.Downloader(types.PublicKey{1}, "localhost:1234")) + req := SectorDownloadReq{ + Ctx: context.Background(), + Resps: NewSectorResponses(), + } + + dl.Stop(errors.New(t.Name())) + dl.Enqueue(&req) + + assertErr(t, req, ErrStopped) + }) + + t.Run("stop after enqueue", func(t *testing.T) { + hm := mocks.NewHostManager() + dl := New(context.Background(), hm.Downloader(types.PublicKey{1}, "localhost:1234")) + req := SectorDownloadReq{ + Ctx: context.Background(), + Resps: NewSectorResponses(), + } + + err := errors.New(t.Name()) + dl.Enqueue(&req) + dl.Stop(err) + + assertErr(t, req, err) + }) +} diff --git a/worker/download.go b/worker/download.go index afc22b189..f3956f84b 100644 --- a/worker/download.go +++ b/worker/download.go @@ -14,6 
+14,7 @@ import ( rhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/core/types" "go.sia.tech/renterd/api" + "go.sia.tech/renterd/internal/download/downloader" "go.sia.tech/renterd/internal/host" "go.sia.tech/renterd/internal/memory" rhp3 "go.sia.tech/renterd/internal/rhp/v3" @@ -27,6 +28,7 @@ const ( ) var ( + errHostNoLongerUsable = errors.New("host no longer usable") errDownloadNotEnoughHosts = errors.New("not enough hosts available to download the slab") errDownloadCancelled = errors.New("download was cancelled") ) @@ -47,15 +49,8 @@ type ( shutdownCtx context.Context - mu sync.Mutex - downloaders map[types.PublicKey]*downloader - lastRecompute time.Time - } - - downloaderStats struct { - avgSpeedMBPS float64 - healthy bool - numDownloads uint64 + mu sync.Mutex + downloaders map[types.PublicKey]*downloader.Downloader } slabDownload struct { @@ -85,32 +80,6 @@ type ( err error } - sectorDownloadReq struct { - ctx context.Context - - length uint32 - offset uint32 - root types.Hash256 - host *downloader - - overdrive bool - sectorIndex int - resps *sectorResponses - } - - sectorDownloadResp struct { - req *sectorDownloadReq - sector []byte - err error - } - - sectorResponses struct { - mu sync.Mutex - closed bool - responses []*sectorDownloadResp - c chan struct{} // signal that a new response is available - } - sectorInfo struct { root types.Hash256 data []byte @@ -122,7 +91,9 @@ type ( downloadManagerStats struct { avgDownloadSpeedMBPS float64 avgOverdrivePct float64 - downloaders map[types.PublicKey]downloaderStats + healthyDownloaders uint64 + numDownloaders uint64 + downloadSpeedsMBPS map[types.PublicKey]float64 } ) @@ -160,7 +131,7 @@ func newDownloadManager(ctx context.Context, uploadKey *utils.UploadKey, hm host shutdownCtx: ctx, - downloaders: make(map[types.PublicKey]*downloader), + downloaders: make(map[types.PublicKey]*downloader.Downloader), } } @@ -417,22 +388,25 @@ func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, } func (mgr *downloadManager) Stats() downloadManagerStats { - // recompute stats - mgr.tryRecomputeStats() - mgr.mu.Lock() defer mgr.mu.Unlock() // collect stats - stats := make(map[types.PublicKey]downloaderStats) - for hk, d := range mgr.downloaders { - stats[hk] = d.stats() + var numHealthy uint64 + speeds := make(map[types.PublicKey]float64) + for _, d := range mgr.downloaders { + speeds[d.PublicKey()] = d.AvgDownloadSpeedBytesPerMS() + if d.Healthy() { + numHealthy++ + } } return downloadManagerStats{ avgDownloadSpeedMBPS: mgr.statsSlabDownloadSpeedBytesPerMS.Average() * 0.008, // convert bytes per ms to mbps, avgOverdrivePct: mgr.statsOverdrivePct.Average(), - downloaders: stats, + healthyDownloaders: numHealthy, + numDownloaders: uint64(len(mgr.downloaders)), + downloadSpeedsMBPS: speeds, } } @@ -440,22 +414,8 @@ func (mgr *downloadManager) Stop() { mgr.mu.Lock() defer mgr.mu.Unlock() for _, d := range mgr.downloaders { - d.Stop() - } -} - -func (mgr *downloadManager) tryRecomputeStats() { - mgr.mu.Lock() - defer mgr.mu.Unlock() - if time.Since(mgr.lastRecompute) < statsRecomputeMinInterval { - return - } - - for _, d := range mgr.downloaders { - d.statsSectorDownloadEstimateInMS.Recompute() - d.statsDownloadSpeedBytesPerMS.Recompute() + d.Stop(ErrShuttingDown) } - mgr.lastRecompute = time.Now() } func (mgr *downloadManager) numDownloaders() int { @@ -487,27 +447,31 @@ func (mgr *downloadManager) refreshDownloaders(hosts []api.HostInfo) { defer mgr.mu.Unlock() // build map - want := make(map[types.PublicKey]string) + want := 
make(map[types.PublicKey]api.HostInfo) for _, h := range hosts { - want[h.PublicKey] = h.SiamuxAddr + want[h.PublicKey] = h } // prune downloaders for hk := range mgr.downloaders { _, wanted := want[hk] if !wanted { - mgr.downloaders[hk].Stop() + mgr.downloaders[hk].Stop(errHostNoLongerUsable) delete(mgr.downloaders, hk) continue } - delete(want, hk) // remove from want so remainging ones are the missing ones + // recompute the stats + mgr.downloaders[hk].TryRecomputeStats() + + // remove from want so remainging ones are the missing ones + delete(want, hk) } // update downloaders - for hk, siamuxAddr := range want { - mgr.downloaders[hk] = newDownloader(mgr.shutdownCtx, mgr.hm.Downloader(hk, siamuxAddr)) - go mgr.downloaders[hk].processQueue() + for hk, hi := range want { + mgr.downloaders[hk] = downloader.New(mgr.shutdownCtx, mgr.hm.Downloader(hk, hi.SiamuxAddr)) + go mgr.downloaders[hk].Start() } } @@ -552,30 +516,7 @@ func (mgr *downloadManager) downloadSlab(ctx context.Context, slice object.SlabS return slab.download(ctx) } -func (req *sectorDownloadReq) succeed(sector []byte) { - req.resps.Add(§orDownloadResp{ - req: req, - sector: sector, - }) -} - -func (req *sectorDownloadReq) fail(err error) { - req.resps.Add(§orDownloadResp{ - req: req, - err: err, - }) -} - -func (req *sectorDownloadReq) done() bool { - select { - case <-req.ctx.Done(): - return true - default: - return false - } -} - -func (s *slabDownload) overdrive(ctx context.Context, resps *sectorResponses) (resetTimer func()) { +func (s *slabDownload) overdrive(ctx context.Context, resps *downloader.SectorResponses) (resetTimer func()) { // overdrive is disabled if s.mgr.overdriveTimeout == 0 { return func() {} @@ -640,7 +581,7 @@ func (s *slabDownload) overdrive(ctx context.Context, resps *sectorResponses) (r return } -func (s *slabDownload) nextRequest(ctx context.Context, resps *sectorResponses, overdrive bool) *sectorDownloadReq { +func (s *slabDownload) nextRequest(ctx context.Context, resps *downloader.SectorResponses, overdrive bool) *downloader.SectorDownloadReq { s.mu.Lock() defer s.mu.Unlock() @@ -671,7 +612,7 @@ func (s *slabDownload) nextRequest(ctx context.Context, resps *sectorResponses, return pending[i].selected < pending[j].selected } // both have been selected the same number of times, pick the faster one - return iFastest.estimate() < jFastest.estimate() + return iFastest.Estimate() < jFastest.Estimate() }) for _, next := range pending { @@ -682,17 +623,17 @@ func (s *slabDownload) nextRequest(ctx context.Context, resps *sectorResponses, continue } next.selectHost(fastest.PublicKey()) - return §orDownloadReq{ - ctx: ctx, + return &downloader.SectorDownloadReq{ + Ctx: ctx, - offset: s.offset, - length: s.length, - root: next.root, - host: fastest, + Offset: s.offset, + Length: s.length, + Root: next.root, + Host: fastest, - overdrive: overdrive, - sectorIndex: next.index, - resps: resps, + Overdrive: overdrive, + SectorIndex: next.index, + Resps: resps, } } @@ -708,9 +649,7 @@ func (s *slabDownload) download(ctx context.Context) ([][]byte, error) { defer cancel() // create the responses queue - resps := §orResponses{ - c: make(chan struct{}, 1), - } + resps := downloader.NewSectorResponses() defer resps.Close() // launch overdrive @@ -734,7 +673,7 @@ func (s *slabDownload) download(ctx context.Context) ([][]byte, error) { return nil, errors.New("download stopped") case <-ctx.Done(): return nil, context.Cause(ctx) - case <-resps.c: + case <-resps.Received(): resetOverdrive() } @@ -751,16 +690,16 @@ func (s 
*slabDownload) download(ctx context.Context) ([][]byte, error) { } // handle errors - if resp.err != nil { + if resp.Err != nil { // launch replacement request - if req := s.nextRequest(ctx, resps, resp.req.overdrive); req != nil { + if req := s.nextRequest(ctx, resps, resp.Req.Overdrive); req != nil { s.launch(req) } // handle lost sectors - if rhp3.IsSectorNotFound(resp.err) { - if err := s.mgr.os.DeleteHostSector(ctx, resp.req.host.PublicKey(), resp.req.root); err != nil { - s.mgr.logger.Errorw("failed to mark sector as lost", "hk", resp.req.host.PublicKey(), "root", resp.req.root, zap.Error(err)) + if rhp3.IsSectorNotFound(resp.Err) { + if err := s.mgr.os.DeleteHostSector(ctx, resp.Req.Host.PublicKey(), resp.Req.Root); err != nil { + s.mgr.logger.Errorw("failed to mark sector as lost", "hk", resp.Req.Host.PublicKey(), "root", resp.Req.Root, zap.Error(err)) } } } @@ -816,58 +755,54 @@ func (s *slabDownload) inflight() uint64 { return s.numInflight } -func (s *slabDownload) launch(req *sectorDownloadReq) { +func (s *slabDownload) launch(req *downloader.SectorDownloadReq) { s.mu.Lock() defer s.mu.Unlock() // queue the request - req.host.enqueue(req) + req.Host.Enqueue(req) // update the state s.numInflight++ - if req.overdrive { + if req.Overdrive { s.numOverdriving++ } s.numLaunched++ } -func (s *slabDownload) receive(resp sectorDownloadResp) (finished bool) { +func (s *slabDownload) receive(resp downloader.SectorDownloadResp) (finished bool) { s.mu.Lock() defer s.mu.Unlock() // update num overdriving - if resp.req.overdrive { + if resp.Req.Overdrive { s.numOverdriving-- } // failed reqs can't complete the upload s.numInflight-- - if resp.err != nil { - s.errs[resp.req.host.PublicKey()] = resp.err + if resp.Err != nil { + s.errs[resp.Req.Host.PublicKey()] = resp.Err return false } // store the sector - if len(s.sectors[resp.req.sectorIndex].data) == 0 { - s.sectors[resp.req.sectorIndex].data = resp.sector + if len(s.sectors[resp.Req.SectorIndex].data) == 0 { + s.sectors[resp.Req.SectorIndex].data = resp.Sector s.numCompleted++ } return s.numCompleted >= s.minShards } -func (mgr *downloadManager) fastest(hosts []types.PublicKey) (fastest *downloader) { - // recompute stats - mgr.tryRecomputeStats() - - // return the fastest host +func (mgr *downloadManager) fastest(hosts []types.PublicKey) (fastest *downloader.Downloader) { mgr.mu.Lock() defer mgr.mu.Unlock() lowest := math.MaxFloat64 for _, h := range hosts { if d, ok := mgr.downloaders[h]; !ok { continue - } else if estimate := d.estimate(); estimate < lowest { + } else if estimate := d.Estimate(); estimate < lowest { lowest = estimate fastest = d } @@ -917,40 +852,6 @@ func slabsForDownload(slabs []slabSlice, offset, length uint64) []slabSlice { return slabs } -func (sr *sectorResponses) Add(resp *sectorDownloadResp) { - sr.mu.Lock() - defer sr.mu.Unlock() - if sr.closed { - return - } - sr.responses = append(sr.responses, resp) - select { - case sr.c <- struct{}{}: - default: - } -} - -func (sr *sectorResponses) Close() error { - sr.mu.Lock() - defer sr.mu.Unlock() - - sr.closed = true - sr.responses = nil // clear responses - close(sr.c) - return nil -} - -func (sr *sectorResponses) Next() *sectorDownloadResp { - sr.mu.Lock() - defer sr.mu.Unlock() - if len(sr.responses) == 0 { - return nil - } - resp := sr.responses[0] - sr.responses = sr.responses[1:] - return resp -} - func isSectorAvailable(s object.Sector, hosts map[types.PublicKey]struct{}) bool { // if any of the other hosts that store the sector are // available, the sector 
is also considered available
diff --git a/worker/downloader_test.go b/worker/downloader_test.go
deleted file mode 100644
index ca004582b..000000000
--- a/worker/downloader_test.go
+++ /dev/null
@@ -1,36 +0,0 @@
-package worker
-
-import (
-	"errors"
-	"testing"
-	"time"
-)
-
-func TestDownloaderStopped(t *testing.T) {
-	w := newTestWorker(t)
-	hosts := w.AddHosts(1)
-
-	// convenience variables
-	dm := w.downloadManager
-	h := hosts[0]
-
-	dm.refreshDownloaders(w.UsableHosts())
-	dl := w.downloadManager.downloaders[h.PublicKey()]
-	dl.Stop()
-
-	req := sectorDownloadReq{
-		resps: &sectorResponses{
-			c: make(chan struct{}, 1),
-		},
-	}
-	dl.enqueue(&req)
-
-	select {
-	case <-req.resps.c:
-		if err := req.resps.responses[0].err; !errors.Is(err, errDownloaderStopped) {
-			t.Fatal("unexpected error response", err)
-		}
-	case <-time.After(10 * time.Second):
-		t.Fatal("no response")
-	}
-}
diff --git a/worker/upload.go b/worker/upload.go
index 845b60fbb..468368b07 100644
--- a/worker/upload.go
+++ b/worker/upload.go
@@ -26,8 +26,6 @@ import (
 )
 
 const (
-	statsRecomputeMinInterval = 3 * time.Second
-
 	defaultPackedSlabsLockDuration  = 10 * time.Minute
 	defaultPackedSlabsUploadTimeout = 10 * time.Minute
 )
diff --git a/worker/worker.go b/worker/worker.go
index 67bf36b45..f6ebd4955 100644
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -275,16 +275,11 @@ func (w *Worker) downloadsStatsHandlerGET(jc jape.Context) {
 	stats := w.downloadManager.Stats()
 
 	// prepare downloaders stats
-	var healthy uint64
 	var dss []api.DownloaderStats
-	for hk, stat := range stats.downloaders {
-		if stat.healthy {
-			healthy++
-		}
+	for hk, mbps := range stats.downloadSpeedsMBPS {
 		dss = append(dss, api.DownloaderStats{
 			HostKey:                    hk,
-			AvgSectorDownloadSpeedMBPS: stat.avgSpeedMBPS,
-			NumDownloads:               stat.numDownloads,
+			AvgSectorDownloadSpeedMBPS: mbps,
 		})
 	}
 	sort.SliceStable(dss, func(i, j int) bool {
@@ -295,8 +290,8 @@ func (w *Worker) downloadsStatsHandlerGET(jc jape.Context) {
 	api.WriteResponse(jc, api.DownloadStatsResponse{
 		AvgDownloadSpeedMBPS: math.Ceil(stats.avgDownloadSpeedMBPS*100) / 100,
 		AvgOverdrivePct:      math.Floor(stats.avgOverdrivePct*100*100) / 100,
-		HealthyDownloaders:   healthy,
-		NumDownloaders:       uint64(len(stats.downloaders)),
+		HealthyDownloaders:   stats.healthyDownloaders,
+		NumDownloaders:       stats.numDownloaders,
 		DownloadersStats:     dss,
 	})
 }
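
The relocated package keeps the queue-per-host model but exposes it: New wraps a host.Downloader, Start runs the queue loop, and callers hand it SectorDownloadReqs that report back through a shared SectorResponses queue. A minimal usage sketch; the fetchSector helper and its wiring are illustrative rather than part of the patch, and it only builds inside the renterd module since the package is internal:

package example // illustrative only

import (
	"context"

	rhpv2 "go.sia.tech/core/rhp/v2"
	"go.sia.tech/core/types"
	"go.sia.tech/renterd/internal/download/downloader"
	"go.sia.tech/renterd/internal/host"
)

// fetchSector drives a single downloader end to end: start the queue
// loop, enqueue one request, then wait for its response.
func fetchSector(ctx context.Context, h host.Downloader, root types.Hash256) ([]byte, error) {
	dl := downloader.New(ctx, h)
	go dl.Start()
	defer dl.Stop(downloader.ErrStopped)

	resps := downloader.NewSectorResponses()
	defer resps.Close()

	dl.Enqueue(&downloader.SectorDownloadReq{
		Ctx:    ctx,
		Root:   root,
		Offset: 0,
		Length: rhpv2.SectorSize,
		Host:   dl,
		Resps:  resps,
	})

	for {
		select {
		case <-ctx.Done():
			return nil, context.Cause(ctx)
		case <-resps.Received():
			if resp := resps.Next(); resp != nil {
				return resp.Sector, resp.Err
			}
		}
	}
}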
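
SectorResponses decouples completion from the per-host queues: Add appends under a mutex and does a non-blocking send on a one-slot signal channel, so a single wake-up may stand for several queued responses. Consumers therefore drain with Next() until it returns nil, which is exactly what the rewritten download loop in worker/download.go does. A condensed sketch of that consumer side (fragment only, types as above):

// waitForResponses drains a SectorResponses queue until handle reports
// that enough sectors arrived, or the context is cancelled.
func waitForResponses(ctx context.Context, resps *downloader.SectorResponses, handle func(*downloader.SectorDownloadResp) (done bool)) error {
	for {
		select {
		case <-ctx.Done():
			return context.Cause(ctx)
		case <-resps.Received():
		}
		// one signal can cover multiple responses: drain them all
		for {
			resp := resps.Next()
			if resp == nil {
				break
			}
			if handle(resp) {
				return nil
			}
		}
	}
}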
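
Host selection is conceptually unchanged but now goes through the exported Estimate: nextRequest prefers the sector that has been selected least often and, on ties, the host whose Estimate is lowest. Assuming Estimate still scales with the host's current queue depth times its p90 per-sector time (its unchanged body is outside this diff), the ranking reduces to the sketch below:

// pickHost returns the candidate with the lowest Estimate; a host with
// a deep queue loses to an idle one even if its raw speed is similar.
// Illustrative only — the real selection lives in fastest/nextRequest
// in worker/download.go.
func pickHost(cands []*downloader.Downloader) (best *downloader.Downloader) {
	lowest := math.MaxFloat64
	for _, d := range cands {
		if est := d.Estimate(); est < lowest {
			lowest, best = est, d
		}
	}
	return
}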
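
Stats recomputation moved as well: the manager no longer keeps its own lastRecompute, each Downloader throttles itself via statsRecomputeMinInterval (3s), and refreshDownloaders calls TryRecomputeStats opportunistically on every pass. A sketch of a package-level test for the throttle, reusing the mocks helpers from downloader_test.go above (this test is not in the patch):

func TestTryRecomputeStatsThrottled(t *testing.T) {
	hm := mocks.NewHostManager()
	dl := New(context.Background(), hm.Downloader(types.PublicKey{1}, "localhost:1234"))

	dl.TryRecomputeStats() // first call recomputes and stamps lastRecompute
	first := dl.lastRecompute

	dl.TryRecomputeStats() // within statsRecomputeMinInterval: no-op
	if !dl.lastRecompute.Equal(first) {
		t.Fatal("expected recompute to be throttled")
	}
}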
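
Both the aggregated manager speed and the new per-host AvgDownloadSpeedBytesPerMS keep the internal bytes-per-millisecond representation and convert at the edge with the same factor the old stats used. The constant is easy to sanity-check; the helper name below is illustrative, the patch simply inlines the multiplication:

// toMBPS converts an average expressed in bytes per millisecond to
// megabits per second:
//
//	bytes/ms * 1000 = bytes/s
//	bytes/s  * 8    = bits/s
//	bits/s   / 1e6  = Mbps
//
// which collapses to a single factor of 8*1000/1e6 = 0.008.
func toMBPS(bytesPerMS float64) float64 {
	return bytesPerMS * 0.008
}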
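
On the API surface the per-host entry shrinks: NumDownloads is dropped from api.DownloaderStats, and the healthy/total counts now come straight from the manager-level stats instead of being recomputed in the handler. Put together, the handler builds a response along these lines (field values are made-up placeholders):

resp := api.DownloadStatsResponse{
	AvgDownloadSpeedMBPS: 87.23, // math.Ceil(raw*100) / 100
	AvgOverdrivePct:      1.25,  // math.Floor(raw*100*100) / 100
	HealthyDownloaders:   12,    // downloaders with zero consecutive failures
	NumDownloaders:       14,
	DownloadersStats: []api.DownloaderStats{
		{HostKey: types.PublicKey{1}, AvgSectorDownloadSpeedMBPS: 93.1},
	},
}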