Skip to content

Commit

Permalink
worker: add TODO
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Jan 12, 2024
1 parent a99112d commit 51bbfe2
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 26 deletions.
2 changes: 2 additions & 0 deletions cmd/renterd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,8 @@ func main() {
logger.Fatal("Fatal autopilot error: " + err.Error())
}

// TODO: maybe we revisit this logic
//
// Give each service a fraction of the total shutdown timeout. One service
// timing out shouldn't prevent the others from attempting a shutdown.
timeout := cfg.ShutdownTimeout / time.Duration(len(shutdownFns))
Expand Down
33 changes: 19 additions & 14 deletions worker/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
)

var (
errDownloadInterrupted = errors.New("download was interrupted")
errDownloadNotEnoughHosts = errors.New("not enough hosts available to download the slab")
errHostNoLongerUsed = errors.New("host no longer used")
)
Expand Down Expand Up @@ -320,7 +321,7 @@ outer:
var resp *slabDownloadResponse
select {
case <-ctx.Done():
return fmt.Errorf("download failed; %w", ctx.Err())
return errDownloadInterrupted
case resp = <-responseChan:
}

Expand Down Expand Up @@ -454,26 +455,15 @@ func (mgr *downloadManager) Stats() downloadManagerStats {
}

func (mgr *downloadManager) Stop(ctx context.Context) {
// wait on all ongoing downloads to finish
doneChan := make(chan struct{})
go func() {
mgr.wg.Wait()
close(doneChan)
}()

// allow the context to interrupt the wait
select {
case <-ctx.Done():
case <-doneChan:
}
// wait on ongoing downloads, ctx is respected
mgr.waitForOngoingDownloads(ctx)

// stop all downloaders
mgr.mu.Lock()
defer mgr.mu.Unlock()
for _, d := range mgr.downloaders {
d.Stop(context.Cause(ctx))
}
mgr.downloaders = nil
}

func (mgr *downloadManager) tryRecomputeStats() {
Expand Down Expand Up @@ -612,6 +602,21 @@ func (mgr *downloadManager) launch(req *sectorDownloadReq) error {
return nil
}

func (mgr *downloadManager) waitForOngoingDownloads(ctx context.Context) {
// wait on all ongoing downloads to finish
doneChan := make(chan struct{})
go func() {
mgr.wg.Wait()
close(doneChan)
}()

// allow the context to interrupt the wait
select {
case <-ctx.Done():
case <-doneChan:
}
}

func (req *sectorDownloadReq) succeed(sector []byte) {
req.resps.Add(&sectorDownloadResp{
req: req,
Expand Down
29 changes: 17 additions & 12 deletions worker/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,18 +386,8 @@ func (mgr *uploadManager) Stats() uploadManagerStats {
}

func (mgr *uploadManager) Stop(ctx context.Context) {
// wait on all ongoing uploads to finish
doneChan := make(chan struct{})
go func() {
mgr.wg.Wait()
close(doneChan)
}()

// allow the context to interrupt the wait
select {
case <-ctx.Done():
case <-doneChan:
}
// wait on ongoing uploads, ctx is respected
mgr.waitForOngoingUploads(ctx)

// stop uploaders
mgr.mu.Lock()
Expand Down Expand Up @@ -792,6 +782,21 @@ func (mgr *uploadManager) refreshUploaders(contracts []api.ContractMetadata, bh
mgr.uploaders = refreshed
}

func (mgr *uploadManager) waitForOngoingUploads(ctx context.Context) {
// wait on all ongoing uploads to finish
doneChan := make(chan struct{})
go func() {
mgr.wg.Wait()
close(doneChan)
}()

// allow the context to interrupt the wait
select {
case <-ctx.Done():
case <-doneChan:
}
}

func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders []*uploader, mem Memory, maxOverdrive uint64) (*slabUpload, chan sectorUploadResp) {
// prepare response channel
responseChan := make(chan sectorUploadResp)
Expand Down

0 comments on commit 51bbfe2

Please sign in to comment.