Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move downloader to internal package #1715

Merged
merged 1 commit into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ type (
DownloaderStats struct {
AvgSectorDownloadSpeedMBPS float64 `json:"avgSectorDownloadSpeedMbps"`
HostKey types.PublicKey `json:"hostKey"`
NumDownloads uint64 `json:"numDownloads"`
}

// UploadStatsResponse is the response type for the /stats/uploads endpoint.
Expand Down
228 changes: 169 additions & 59 deletions worker/downloader.go → internal/download/downloader/downloader.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package worker
package downloader

import (
"bytes"
Expand All @@ -18,14 +18,43 @@ import (
const (
downloadOverheadB = 284
maxConcurrentSectorsPerHost = 3
statsRecomputeMinInterval = 3 * time.Second
)

var (
errDownloaderStopped = errors.New("downloader was stopped")
ErrStopped = errors.New("downloader was stopped")
)

type (
downloader struct {
SectorDownloadReq struct {
Ctx context.Context

Length uint32
Offset uint32
Root types.Hash256
Host *Downloader

Overdrive bool
SectorIndex int
Resps *SectorResponses
}

SectorDownloadResp struct {
Req *SectorDownloadReq
Sector []byte
Err error
}

SectorResponses struct {
c chan struct{} // signal that a new response is available

mu sync.Mutex
closed bool
responses []*SectorDownloadResp
}
)
type (
Downloader struct {
host host.Downloader

statsDownloadSpeedBytesPerMS *utils.DataPoints // keep track of this separately for stats (no decay is applied)
Expand All @@ -36,14 +65,16 @@ type (

mu sync.Mutex
consecutiveFailures uint64
numDownloads uint64
queue []*sectorDownloadReq
stopped bool
lastRecompute time.Time

numDownloads uint64
queue []*SectorDownloadReq
stopped bool
}
)

func newDownloader(ctx context.Context, h host.Downloader) *downloader {
return &downloader{
func New(ctx context.Context, h host.Downloader) *Downloader {
return &Downloader{
host: h,

statsSectorDownloadEstimateInMS: utils.NewDataPoints(10 * time.Minute),
Expand All @@ -52,49 +83,26 @@ func newDownloader(ctx context.Context, h host.Downloader) *downloader {
signalWorkChan: make(chan struct{}, 1),
shutdownCtx: ctx,

queue: make([]*sectorDownloadReq, 0),
queue: make([]*SectorDownloadReq, 0),
}
}

func (d *downloader) PublicKey() types.PublicKey {
return d.host.PublicKey()
}

func (d *downloader) Stop() {
d.mu.Lock()
d.stopped = true
d.mu.Unlock()

for {
download := d.pop()
if download == nil {
break
}
if !download.done() {
download.fail(errDownloaderStopped)
}
func NewSectorResponses() *SectorResponses {
return &SectorResponses{
c: make(chan struct{}, 1),
}
}

func (d *downloader) fillBatch() (batch []*sectorDownloadReq) {
for len(batch) < maxConcurrentSectorsPerHost {
if req := d.pop(); req == nil {
break
} else if req.done() {
continue
} else {
batch = append(batch, req)
}
}
return
func (d *Downloader) AvgDownloadSpeedBytesPerMS() float64 {
return d.statsDownloadSpeedBytesPerMS.Average()
}

func (d *downloader) enqueue(download *sectorDownloadReq) {
func (d *Downloader) Enqueue(download *SectorDownloadReq) {
d.mu.Lock()
// check for stopped
if d.stopped {
d.mu.Unlock()
go download.fail(errDownloaderStopped) // don't block the caller
go download.fail(ErrStopped) // don't block the caller
return
}

Expand All @@ -109,7 +117,7 @@ func (d *downloader) enqueue(download *sectorDownloadReq) {
}
}

func (d *downloader) estimate() float64 {
func (d *Downloader) Estimate() float64 {
d.mu.Lock()
defer d.mu.Unlock()

Expand All @@ -127,10 +135,48 @@ func (d *downloader) estimate() float64 {
return numSectors * estimateP90
}

func (d *downloader) execute(req *sectorDownloadReq) (err error) {
func (d *Downloader) Healthy() bool {
d.mu.Lock()
defer d.mu.Unlock()
return d.consecutiveFailures == 0
}

func (d *Downloader) PublicKey() types.PublicKey {
return d.host.PublicKey()
}

func (d *Downloader) Stop(err error) {
d.mu.Lock()
d.stopped = true
d.mu.Unlock()

for {
download := d.pop()
if download == nil {
break
}
if !download.done() {
download.fail(err)
}
}
}

func (d *Downloader) TryRecomputeStats() {
d.mu.Lock()
defer d.mu.Unlock()
if time.Since(d.lastRecompute) < statsRecomputeMinInterval {
return
}

d.lastRecompute = time.Now()
d.statsDownloadSpeedBytesPerMS.Recompute()
d.statsSectorDownloadEstimateInMS.Recompute()
}

func (d *Downloader) execute(req *SectorDownloadReq) (err error) {
// download the sector
buf := bytes.NewBuffer(make([]byte, 0, req.length))
err = d.host.DownloadSector(req.ctx, buf, req.root, req.offset, req.length)
buf := bytes.NewBuffer(make([]byte, 0, req.Length))
err = d.host.DownloadSector(req.Ctx, buf, req.Root, req.Offset, req.Length)
if err != nil {
req.fail(err)
return err
Expand All @@ -144,7 +190,20 @@ func (d *downloader) execute(req *sectorDownloadReq) (err error) {
return nil
}

func (d *downloader) pop() *sectorDownloadReq {
func (d *Downloader) fillBatch() (batch []*SectorDownloadReq) {
for len(batch) < maxConcurrentSectorsPerHost {
if req := d.pop(); req == nil {
break
} else if req.done() {
continue
} else {
batch = append(batch, req)
}
}
return
}

func (d *Downloader) pop() *SectorDownloadReq {
d.mu.Lock()
defer d.mu.Unlock()

Expand All @@ -157,7 +216,7 @@ func (d *downloader) pop() *sectorDownloadReq {
return nil
}

func (d *downloader) processBatch(batch []*sectorDownloadReq) chan struct{} {
func (d *Downloader) processBatch(batch []*SectorDownloadReq) chan struct{} {
doneChan := make(chan struct{})

// define some state to keep track of stats
Expand All @@ -178,7 +237,7 @@ func (d *downloader) processBatch(batch []*sectorDownloadReq) chan struct{} {

// define a worker to process download requests
inflight := uint64(len(batch))
reqsChan := make(chan *sectorDownloadReq)
reqsChan := make(chan *SectorDownloadReq)
workerFn := func() {
for req := range reqsChan {
// check if we need to abort
Expand All @@ -203,7 +262,7 @@ func (d *downloader) processBatch(batch []*sectorDownloadReq) chan struct{} {
// update state + potentially track stats
mu.Lock()
if err == nil {
downloadedB += int64(req.length) + downloadOverheadB
downloadedB += int64(req.Length) + downloadOverheadB
if downloadedB >= maxConcurrentSectorsPerHost*rhpv2.SectorSize || concurrent == maxConcurrentSectorsPerHost {
trackStatsFn()
}
Expand Down Expand Up @@ -247,7 +306,7 @@ func (d *downloader) processBatch(batch []*sectorDownloadReq) chan struct{} {
return doneChan
}

func (d *downloader) processQueue() {
func (d *Downloader) Start() {
outer:
for {
// wait for work
Expand Down Expand Up @@ -278,17 +337,7 @@ outer:
}
}

func (d *downloader) stats() downloaderStats {
d.mu.Lock()
defer d.mu.Unlock()
return downloaderStats{
avgSpeedMBPS: d.statsDownloadSpeedBytesPerMS.Average() * 0.008,
healthy: d.consecutiveFailures == 0,
numDownloads: d.numDownloads,
}
}

func (d *downloader) trackFailure(err error) {
func (d *Downloader) trackFailure(err error) {
d.mu.Lock()
defer d.mu.Unlock()

Expand All @@ -307,3 +356,64 @@ func (d *downloader) trackFailure(err error) {
d.consecutiveFailures++
d.statsSectorDownloadEstimateInMS.Track(float64(time.Hour.Milliseconds()))
}

func (sr *SectorResponses) Add(resp *SectorDownloadResp) {
sr.mu.Lock()
defer sr.mu.Unlock()
if sr.closed {
return
}
sr.responses = append(sr.responses, resp)
select {
case sr.c <- struct{}{}:
default:
}
}

func (sr *SectorResponses) Close() error {
sr.mu.Lock()
defer sr.mu.Unlock()

sr.closed = true
sr.responses = nil // clear responses
close(sr.c)
return nil
}

func (sr *SectorResponses) Next() *SectorDownloadResp {
sr.mu.Lock()
defer sr.mu.Unlock()
if len(sr.responses) == 0 {
return nil
}
resp := sr.responses[0]
sr.responses = sr.responses[1:]
return resp
}

func (sr *SectorResponses) Received() chan struct{} {
return sr.c
}

func (req *SectorDownloadReq) done() bool {
select {
case <-req.Ctx.Done():
return true
default:
return false
}
}

func (req *SectorDownloadReq) fail(err error) {
req.Resps.Add(&SectorDownloadResp{
Req: req,
Err: err,
})
}

func (req *SectorDownloadReq) succeed(sector []byte) {
req.Resps.Add(&SectorDownloadResp{
Req: req,
Sector: sector,
})
}
53 changes: 53 additions & 0 deletions internal/download/downloader/downloader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package downloader

import (
"context"
"errors"
"testing"
"time"

"go.sia.tech/core/types"
"go.sia.tech/renterd/internal/test/mocks"
)

func TestDownloaderStopped(t *testing.T) {
ChrisSchinnerl marked this conversation as resolved.
Show resolved Hide resolved
assertErr := func(t *testing.T, req SectorDownloadReq, expected error) {
select {
case <-req.Resps.Received():
if err := req.Resps.responses[0].Err; !errors.Is(err, expected) {
t.Fatal("unexpected error response", err)
}
case <-time.After(time.Second):
t.Fatal("no response")
}
}

t.Run("stop before enqueue", func(t *testing.T) {
hm := mocks.NewHostManager()
dl := New(context.Background(), hm.Downloader(types.PublicKey{1}, "localhost:1234"))
req := SectorDownloadReq{
Ctx: context.Background(),
Resps: NewSectorResponses(),
}

dl.Stop(errors.New(t.Name()))
dl.Enqueue(&req)

assertErr(t, req, ErrStopped)
})

t.Run("stop after enqueue", func(t *testing.T) {
hm := mocks.NewHostManager()
dl := New(context.Background(), hm.Downloader(types.PublicKey{1}, "localhost:1234"))
req := SectorDownloadReq{
Ctx: context.Background(),
Resps: NewSectorResponses(),
}

err := errors.New(t.Name())
dl.Enqueue(&req)
dl.Stop(err)

assertErr(t, req, err)
})
}
Loading
Loading