Skip to content

Commit

Permalink
Merge branch 'dev' of https://github.com/SiaFoundation/renterd into p…
Browse files Browse the repository at this point in the history
…j/shutdown-err
  • Loading branch information
peterjan committed Jan 12, 2024
2 parents 37a5c0b + a770b85 commit a99112d
Showing 1 changed file with 61 additions and 61 deletions.
122 changes: 61 additions & 61 deletions worker/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,52 @@ func (d *downloader) Stop(err error) {
}
}

func (d *downloader) stats() downloaderStats {
func (d *downloader) enqueue(download *sectorDownloadReq) {
// enqueue the job
d.mu.Lock()
d.queue = append(d.queue, download)
d.mu.Unlock()

// signal there's work
select {
case d.signalWorkChan <- struct{}{}:
default:
}
}

func (d *downloader) estimate() float64 {
d.mu.Lock()
defer d.mu.Unlock()
return downloaderStats{
avgSpeedMBPS: d.statsDownloadSpeedBytesPerMS.Average() * 0.008,
healthy: d.consecutiveFailures == 0,
numDownloads: d.numDownloads,

// fetch estimated duration per sector
estimateP90 := d.statsSectorDownloadEstimateInMS.P90()
if estimateP90 == 0 {
if avg := d.statsSectorDownloadEstimateInMS.Average(); avg > 0 {
estimateP90 = avg
} else {
estimateP90 = 1
}
}

numSectors := float64(len(d.queue) + 1)
return numSectors * estimateP90
}

func (d *downloader) execute(req *sectorDownloadReq) (err error) {
// download the sector
buf := bytes.NewBuffer(make([]byte, 0, req.length))
err = d.host.DownloadSector(req.ctx, buf, req.root, req.offset, req.length, req.overpay)
if err != nil {
req.fail(err)
return err
}

d.mu.Lock()
d.numDownloads++
d.mu.Unlock()

req.succeed(buf.Bytes())
return nil
}

func (d *downloader) fillBatch() (batch []*sectorDownloadReq) {
Expand All @@ -79,6 +117,19 @@ func (d *downloader) fillBatch() (batch []*sectorDownloadReq) {
return
}

func (d *downloader) pop() *sectorDownloadReq {
d.mu.Lock()
defer d.mu.Unlock()

if len(d.queue) > 0 {
j := d.queue[0]
d.queue[0] = nil
d.queue = d.queue[1:]
return j
}
return nil
}

func (d *downloader) processBatch(batch []*sectorDownloadReq) chan struct{} {
doneChan := make(chan struct{})

Expand Down Expand Up @@ -183,48 +234,14 @@ outer:
}
}

func (d *downloader) estimate() float64 {
d.mu.Lock()
defer d.mu.Unlock()

// fetch estimated duration per sector
estimateP90 := d.statsSectorDownloadEstimateInMS.P90()
if estimateP90 == 0 {
if avg := d.statsSectorDownloadEstimateInMS.Average(); avg > 0 {
estimateP90 = avg
} else {
estimateP90 = 1
}
}

numSectors := float64(len(d.queue) + 1)
return numSectors * estimateP90
}

func (d *downloader) enqueue(download *sectorDownloadReq) {
// enqueue the job
d.mu.Lock()
d.queue = append(d.queue, download)
d.mu.Unlock()

// signal there's work
select {
case d.signalWorkChan <- struct{}{}:
default:
}
}

func (d *downloader) pop() *sectorDownloadReq {
func (d *downloader) stats() downloaderStats {
d.mu.Lock()
defer d.mu.Unlock()

if len(d.queue) > 0 {
j := d.queue[0]
d.queue[0] = nil
d.queue = d.queue[1:]
return j
return downloaderStats{
avgSpeedMBPS: d.statsDownloadSpeedBytesPerMS.Average() * 0.008,
healthy: d.consecutiveFailures == 0,
numDownloads: d.numDownloads,
}
return nil
}

func (d *downloader) trackFailure(err error) {
Expand All @@ -246,20 +263,3 @@ func (d *downloader) trackFailure(err error) {
d.consecutiveFailures++
d.statsSectorDownloadEstimateInMS.Track(float64(time.Hour.Milliseconds()))
}

func (d *downloader) execute(req *sectorDownloadReq) (err error) {
// download the sector
buf := bytes.NewBuffer(make([]byte, 0, req.length))
err = d.host.DownloadSector(req.ctx, buf, req.root, req.offset, req.length, req.overpay)
if err != nil {
req.fail(err)
return err
}

d.mu.Lock()
d.numDownloads++
d.mu.Unlock()

req.succeed(buf.Bytes())
return nil
}

0 comments on commit a99112d

Please sign in to comment.