Skip to content

Commit

Permalink
internal: move downloader
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan authored and ChrisSchinnerl committed Dec 4, 2024
1 parent 3e43869 commit f617914
Show file tree
Hide file tree
Showing 7 changed files with 286 additions and 266 deletions.
1 change: 0 additions & 1 deletion api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ type (
DownloaderStats struct {
AvgSectorDownloadSpeedMBPS float64 `json:"avgSectorDownloadSpeedMbps"`
HostKey types.PublicKey `json:"hostKey"`
NumDownloads uint64 `json:"numDownloads"`
}

// UploadStatsResponse is the response type for the /stats/uploads endpoint.
Expand Down
228 changes: 169 additions & 59 deletions worker/downloader.go → internal/download/downloader/downloader.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package worker
package downloader

import (
"bytes"
Expand All @@ -18,14 +18,43 @@ import (
const (
downloadOverheadB = 284
maxConcurrentSectorsPerHost = 3
statsRecomputeMinInterval = 3 * time.Second
)

var (
errDownloaderStopped = errors.New("downloader was stopped")
ErrStopped = errors.New("downloader was stopped")
)

type (
downloader struct {
SectorDownloadReq struct {
Ctx context.Context

Length uint32
Offset uint32
Root types.Hash256
Host *Downloader

Overdrive bool
SectorIndex int
Resps *SectorResponses
}

SectorDownloadResp struct {
Req *SectorDownloadReq
Sector []byte
Err error
}

SectorResponses struct {
c chan struct{} // signal that a new response is available

mu sync.Mutex
closed bool
responses []*SectorDownloadResp
}
)
type (
Downloader struct {
host host.Downloader

statsDownloadSpeedBytesPerMS *utils.DataPoints // keep track of this separately for stats (no decay is applied)
Expand All @@ -36,14 +65,16 @@ type (

mu sync.Mutex
consecutiveFailures uint64
numDownloads uint64
queue []*sectorDownloadReq
stopped bool
lastRecompute time.Time

numDownloads uint64
queue []*SectorDownloadReq
stopped bool
}
)

func newDownloader(ctx context.Context, h host.Downloader) *downloader {
return &downloader{
func New(ctx context.Context, h host.Downloader) *Downloader {
return &Downloader{
host: h,

statsSectorDownloadEstimateInMS: utils.NewDataPoints(10 * time.Minute),
Expand All @@ -52,49 +83,26 @@ func newDownloader(ctx context.Context, h host.Downloader) *downloader {
signalWorkChan: make(chan struct{}, 1),
shutdownCtx: ctx,

queue: make([]*sectorDownloadReq, 0),
queue: make([]*SectorDownloadReq, 0),
}
}

func (d *downloader) PublicKey() types.PublicKey {
return d.host.PublicKey()
}

func (d *downloader) Stop() {
d.mu.Lock()
d.stopped = true
d.mu.Unlock()

for {
download := d.pop()
if download == nil {
break
}
if !download.done() {
download.fail(errDownloaderStopped)
}
func NewSectorResponses() *SectorResponses {
return &SectorResponses{
c: make(chan struct{}, 1),
}
}

func (d *downloader) fillBatch() (batch []*sectorDownloadReq) {
for len(batch) < maxConcurrentSectorsPerHost {
if req := d.pop(); req == nil {
break
} else if req.done() {
continue
} else {
batch = append(batch, req)
}
}
return
func (d *Downloader) AvgDownloadSpeedBytesPerMS() float64 {
return d.statsDownloadSpeedBytesPerMS.Average()
}

func (d *downloader) enqueue(download *sectorDownloadReq) {
func (d *Downloader) Enqueue(download *SectorDownloadReq) {
d.mu.Lock()
// check for stopped
if d.stopped {
d.mu.Unlock()
go download.fail(errDownloaderStopped) // don't block the caller
go download.fail(ErrStopped) // don't block the caller
return
}

Expand All @@ -109,7 +117,7 @@ func (d *downloader) enqueue(download *sectorDownloadReq) {
}
}

func (d *downloader) estimate() float64 {
func (d *Downloader) Estimate() float64 {
d.mu.Lock()
defer d.mu.Unlock()

Expand All @@ -127,10 +135,48 @@ func (d *downloader) estimate() float64 {
return numSectors * estimateP90
}

func (d *downloader) execute(req *sectorDownloadReq) (err error) {
func (d *Downloader) Healthy() bool {
d.mu.Lock()
defer d.mu.Unlock()
return d.consecutiveFailures == 0
}

func (d *Downloader) PublicKey() types.PublicKey {
return d.host.PublicKey()
}

func (d *Downloader) Stop(err error) {
d.mu.Lock()
d.stopped = true
d.mu.Unlock()

for {
download := d.pop()
if download == nil {
break
}
if !download.done() {
download.fail(err)
}
}
}

func (d *Downloader) TryRecomputeStats() {
d.mu.Lock()
defer d.mu.Unlock()
if time.Since(d.lastRecompute) < statsRecomputeMinInterval {
return
}

d.lastRecompute = time.Now()
d.statsDownloadSpeedBytesPerMS.Recompute()
d.statsSectorDownloadEstimateInMS.Recompute()
}

func (d *Downloader) execute(req *SectorDownloadReq) (err error) {
// download the sector
buf := bytes.NewBuffer(make([]byte, 0, req.length))
err = d.host.DownloadSector(req.ctx, buf, req.root, req.offset, req.length)
buf := bytes.NewBuffer(make([]byte, 0, req.Length))
err = d.host.DownloadSector(req.Ctx, buf, req.Root, req.Offset, req.Length)
if err != nil {
req.fail(err)
return err
Expand All @@ -144,7 +190,20 @@ func (d *downloader) execute(req *sectorDownloadReq) (err error) {
return nil
}

func (d *downloader) pop() *sectorDownloadReq {
func (d *Downloader) fillBatch() (batch []*SectorDownloadReq) {
for len(batch) < maxConcurrentSectorsPerHost {
if req := d.pop(); req == nil {
break
} else if req.done() {
continue
} else {
batch = append(batch, req)
}
}
return
}

func (d *Downloader) pop() *SectorDownloadReq {
d.mu.Lock()
defer d.mu.Unlock()

Expand All @@ -157,7 +216,7 @@ func (d *downloader) pop() *sectorDownloadReq {
return nil
}

func (d *downloader) processBatch(batch []*sectorDownloadReq) chan struct{} {
func (d *Downloader) processBatch(batch []*SectorDownloadReq) chan struct{} {
doneChan := make(chan struct{})

// define some state to keep track of stats
Expand All @@ -178,7 +237,7 @@ func (d *downloader) processBatch(batch []*sectorDownloadReq) chan struct{} {

// define a worker to process download requests
inflight := uint64(len(batch))
reqsChan := make(chan *sectorDownloadReq)
reqsChan := make(chan *SectorDownloadReq)
workerFn := func() {
for req := range reqsChan {
// check if we need to abort
Expand All @@ -203,7 +262,7 @@ func (d *downloader) processBatch(batch []*sectorDownloadReq) chan struct{} {
// update state + potentially track stats
mu.Lock()
if err == nil {
downloadedB += int64(req.length) + downloadOverheadB
downloadedB += int64(req.Length) + downloadOverheadB
if downloadedB >= maxConcurrentSectorsPerHost*rhpv2.SectorSize || concurrent == maxConcurrentSectorsPerHost {
trackStatsFn()
}
Expand Down Expand Up @@ -247,7 +306,7 @@ func (d *downloader) processBatch(batch []*sectorDownloadReq) chan struct{} {
return doneChan
}

func (d *downloader) processQueue() {
func (d *Downloader) Start() {
outer:
for {
// wait for work
Expand Down Expand Up @@ -278,17 +337,7 @@ outer:
}
}

func (d *downloader) stats() downloaderStats {
d.mu.Lock()
defer d.mu.Unlock()
return downloaderStats{
avgSpeedMBPS: d.statsDownloadSpeedBytesPerMS.Average() * 0.008,
healthy: d.consecutiveFailures == 0,
numDownloads: d.numDownloads,
}
}

func (d *downloader) trackFailure(err error) {
func (d *Downloader) trackFailure(err error) {
d.mu.Lock()
defer d.mu.Unlock()

Expand All @@ -307,3 +356,64 @@ func (d *downloader) trackFailure(err error) {
d.consecutiveFailures++
d.statsSectorDownloadEstimateInMS.Track(float64(time.Hour.Milliseconds()))
}

func (sr *SectorResponses) Add(resp *SectorDownloadResp) {
sr.mu.Lock()
defer sr.mu.Unlock()
if sr.closed {
return
}
sr.responses = append(sr.responses, resp)
select {
case sr.c <- struct{}{}:
default:
}
}

func (sr *SectorResponses) Close() error {
sr.mu.Lock()
defer sr.mu.Unlock()

sr.closed = true
sr.responses = nil // clear responses
close(sr.c)
return nil
}

func (sr *SectorResponses) Next() *SectorDownloadResp {
sr.mu.Lock()
defer sr.mu.Unlock()
if len(sr.responses) == 0 {
return nil
}
resp := sr.responses[0]
sr.responses = sr.responses[1:]
return resp
}

func (sr *SectorResponses) Received() chan struct{} {
return sr.c
}

func (req *SectorDownloadReq) done() bool {
select {
case <-req.Ctx.Done():
return true
default:
return false
}
}

func (req *SectorDownloadReq) fail(err error) {
req.Resps.Add(&SectorDownloadResp{
Req: req,
Err: err,
})
}

func (req *SectorDownloadReq) succeed(sector []byte) {
req.Resps.Add(&SectorDownloadResp{
Req: req,
Sector: sector,
})
}
53 changes: 53 additions & 0 deletions internal/download/downloader/downloader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package downloader

import (
"context"
"errors"
"testing"
"time"

"go.sia.tech/core/types"
"go.sia.tech/renterd/internal/test/mocks"
)

func TestDownloaderStopped(t *testing.T) {
assertErr := func(t *testing.T, req SectorDownloadReq, expected error) {
select {
case <-req.Resps.Received():
if err := req.Resps.responses[0].Err; !errors.Is(err, expected) {
t.Fatal("unexpected error response", err)
}
case <-time.After(time.Second):
t.Fatal("no response")
}
}

t.Run("stop before enqueue", func(t *testing.T) {
hm := mocks.NewHostManager()
dl := New(context.Background(), hm.Downloader(types.PublicKey{1}, "localhost:1234"))
req := SectorDownloadReq{
Ctx: context.Background(),
Resps: NewSectorResponses(),
}

dl.Stop(errors.New(t.Name()))
dl.Enqueue(&req)

assertErr(t, req, ErrStopped)
})

t.Run("stop after enqueue", func(t *testing.T) {
hm := mocks.NewHostManager()
dl := New(context.Background(), hm.Downloader(types.PublicKey{1}, "localhost:1234"))
req := SectorDownloadReq{
Ctx: context.Background(),
Resps: NewSectorResponses(),
}

err := errors.New(t.Name())
dl.Enqueue(&req)
dl.Stop(err)

assertErr(t, req, err)
})
}
Loading

0 comments on commit f617914

Please sign in to comment.