Skip to content

Commit

Permalink
Fix Upload Overdrive (#702)
Browse files: browse the repository at this point in the history
* worker: fix upload overdrive

* worker: handle nil req in launch

* worker: print num errors
  • Loading branch information
peterjan authored Nov 2, 2023
1 parent a2cd963 commit 63f5e2a
Showing 1 changed file with 23 additions and 13 deletions.
36 changes: 23 additions & 13 deletions worker/upload.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -1071,7 +1071,7 @@ func (u *upload) uploadShards(ctx context.Context, shards [][]byte, nextSlabChan

// launch all shard uploads
for _, upload := range requests {
if err := slab.launch(upload); err != nil {
if _, err := slab.launch(upload); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -1109,9 +1109,11 @@ func (u *upload) uploadShards(ctx context.Context, shards [][]byte, nextSlabChan

// relaunch non-overdrive uploads
if !done && resp.err != nil && !resp.req.overdrive {
if err := slab.launch(resp.req); err != nil {
if overdriving, err := slab.launch(resp.req); err != nil {
u.mgr.logger.Errorf("failed to relaunch a sector upload, err %v", err)
break // fail the upload
if !overdriving {
break // fail the upload
}
}
}

Expand Down Expand Up @@ -1401,7 +1403,7 @@ func (s *slabUpload) finish() ([]object.Sector, error) {

remaining := len(s.remaining)
if remaining > 0 {
return nil, fmt.Errorf("failed to upload slab: remaining=%d, inflight=%d, launched=%d uploaders=%d errors=%w", remaining, s.numInflight, s.numLaunched, s.mgr.numUploaders(), s.errs)
return nil, fmt.Errorf("failed to upload slab: remaining=%d, inflight=%d, launched=%d uploaders=%d errors=%d %w", remaining, s.numInflight, s.numLaunched, s.mgr.numUploaders(), len(s.errs), s.errs)
}
return s.sectors, nil
}
Expand All @@ -1412,17 +1414,23 @@ func (s *slabUpload) inflight() uint64 {
return s.numInflight
}

func (s *slabUpload) launch(req *sectorUploadReq) error {
func (s *slabUpload) launch(req *sectorUploadReq) (overdriving bool, err error) {
s.mu.Lock()
defer s.mu.Unlock()

// nothing to do
if req == nil {
return false, nil
}

// launch the req
err := s.mgr.launch(req)
err = s.mgr.launch(req)
if err != nil {
overdriving = req.overdrive && s.overdriving[req.sectorIndex] > 0
span := trace.SpanFromContext(req.ctx)
span.RecordError(err)
span.End()
return err
return
}

// update the state
Expand All @@ -1431,9 +1439,9 @@ func (s *slabUpload) launch(req *sectorUploadReq) error {
if req.overdrive {
s.lastOverdrive = time.Now()
s.overdriving[req.sectorIndex]++
overdriving = true
}

return nil
return
}

func (s *slabUpload) overdrive(ctx context.Context, respChan chan sectorUploadResp) (resetTimer func()) {
Expand Down Expand Up @@ -1484,10 +1492,7 @@ func (s *slabUpload) overdrive(ctx context.Context, respChan chan sectorUploadRe
return
case <-timer.C:
if canOverdrive() {
req := s.nextRequest(respChan)
if req != nil {
_ = s.launch(req) // ignore error
}
_, _ = s.launch(s.nextRequest(respChan)) // ignore result
}
resetTimer()
}
Expand Down Expand Up @@ -1548,6 +1553,11 @@ func (s *slabUpload) receive(resp sectorUploadResp) (finished bool, next bool) {
s.mu.Lock()
defer s.mu.Unlock()

// update the state
if resp.req.overdrive {
s.overdriving[resp.req.sectorIndex]--
}

// failed reqs can't complete the upload
s.numInflight--
if resp.err != nil {
Expand Down

0 comments on commit 63f5e2a

Please sign in to comment.