diff --git a/worker/upload.go b/worker/upload.go index 41c736864..c3729d9c2 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -71,25 +71,23 @@ type ( } slabUpload struct { - uploadID api.UploadID - created time.Time - lockPriority int - maxOverdrive uint64 - mem *acquiredMemory - overdriveTimeout time.Duration + uploadID api.UploadID + lockPriority int + + maxOverdrive uint64 + lastOverdrive time.Time sectors []*sectorUpload candidates []*candidate // sorted by upload estimate - shards [][]byte - mu sync.Mutex - numInflight uint64 numLaunched uint64 + numInflight uint64 numOverdriving uint64 numUploaded uint64 + numSectors uint64 - lastOverdrive time.Time - errs HostErrorSet + mem *acquiredMemory + errs HostErrorSet } candidate struct { @@ -755,7 +753,7 @@ func (mgr *uploadManager) refreshUploaders(contracts []api.ContractMetadata, bh mgr.uploaders = uploaders } -func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders []*uploader, mem *acquiredMemory, maxOverdrive uint64, overdriveTimeout time.Duration) (*slabUpload, []*sectorUploadReq, chan sectorUploadResp) { +func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders []*uploader, mem *acquiredMemory, maxOverdrive uint64) (*slabUpload, []*sectorUploadReq, chan sectorUploadResp) { // prepare response channel responseChan := make(chan sectorUploadResp) @@ -800,16 +798,14 @@ func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders [ // create slab upload return &slabUpload{ - lockPriority: u.lockPriority, - uploadID: u.id, - created: time.Now(), - maxOverdrive: maxOverdrive, - mem: mem, - overdriveTimeout: overdriveTimeout, + lockPriority: u.lockPriority, + uploadID: u.id, + maxOverdrive: maxOverdrive, + mem: mem, sectors: sectors, candidates: candidates, - shards: shards, + numSectors: uint64(len(shards)), errs: make(HostErrorSet), }, requests, responseChan @@ -848,6 +844,8 @@ func (u *upload) uploadSlab(ctx context.Context, rs api.RedundancySettings, data } func (u *upload) uploadShards(ctx context.Context, shards [][]byte, candidates []*uploader, mem *acquiredMemory, maxOverdrive uint64, overdriveTimeout time.Duration) ([]object.Sector, float64, int64, error) { + start := time.Now() + // add tracing ctx, span := tracing.Tracer.Start(ctx, "uploadShards") defer span.End() @@ -857,7 +855,7 @@ func (u *upload) uploadShards(ctx context.Context, shards [][]byte, candidates [ defer cancel() // prepare the upload - slab, requests, respChan := u.newSlabUpload(ctx, shards, candidates, mem, maxOverdrive, overdriveTimeout) + slab, requests, respChan := u.newSlabUpload(ctx, shards, candidates, mem, maxOverdrive) // launch all shard uploads for _, upload := range requests { @@ -875,7 +873,7 @@ func (u *upload) uploadShards(ctx context.Context, shards [][]byte, candidates [ // collect responses var done bool loop: - for slab.inflight() > 0 && !done { + for slab.numInflight > 0 && !done { select { case <-u.shutdownCtx.Done(): return nil, 0, 0, errors.New("upload stopped") @@ -895,12 +893,12 @@ loop: } // try overdriving a sector - if slab.canOverdrive() { + if slab.canOverdrive(overdriveTimeout) { _, _ = slab.launch(slab.nextRequest(respChan)) // ignore result } case <-timer.C: // try overdriving a sector - if slab.canOverdrive() { + if slab.canOverdrive(overdriveTimeout) { _, _ = slab.launch(slab.nextRequest(respChan)) // ignore result } } @@ -917,25 +915,34 @@ loop: } } + // calculate the upload speed + bytes := slab.numUploaded * rhpv2.SectorSize + ms := time.Since(start).Milliseconds() + speed := int64(bytes) / ms + + // calculate overdrive pct + var numOverdrive uint64 + if slab.numLaunched > slab.numSectors { + numOverdrive = slab.numLaunched - slab.numSectors + } + overdrivePct := float64(numOverdrive) / float64(slab.numSectors) + // register the amount of overdrive sectors - span.SetAttributes(attribute.Int("overdrive", slab.overdriveCnt())) + span.SetAttributes(attribute.Int("overdrive", int(numOverdrive))) sectors, err := slab.finish() - return sectors, slab.overdrivePct(), slab.uploadSpeed(), err + return sectors, overdrivePct, speed, err } -func (s *slabUpload) canOverdrive() bool { - s.mu.Lock() - defer s.mu.Unlock() - +func (s *slabUpload) canOverdrive(overdriveTimeout time.Duration) bool { // overdrive is not kicking in yet - remaining := uint64(len(s.shards)) - s.numUploaded + remaining := s.numSectors - s.numUploaded if remaining >= s.maxOverdrive { return false } // overdrive is not due yet - if time.Since(s.lastOverdrive) < s.overdriveTimeout { + if time.Since(s.lastOverdrive) < overdriveTimeout { return false } @@ -948,26 +955,17 @@ func (s *slabUpload) canOverdrive() bool { } func (s *slabUpload) finish() (sectors []object.Sector, _ error) { - s.mu.Lock() - defer s.mu.Unlock() - - if s.numUploaded < uint64(len(s.shards)) { - remaining := uint64(len(s.shards)) - s.numUploaded + if s.numUploaded < s.numSectors { + remaining := s.numSectors - s.numUploaded return nil, fmt.Errorf("failed to upload slab: launched=%d uploaded=%d remaining=%d inflight=%d uploaders=%d errors=%d %w", s.numLaunched, s.numUploaded, remaining, s.numInflight, len(s.candidates), len(s.errs), s.errs) } - for i := 0; i < len(s.shards); i++ { - sectors = append(sectors, s.sectors[i].uploaded) + for _, sector := range s.sectors { + sectors = append(sectors, sector.uploaded) } return } -func (s *slabUpload) inflight() uint64 { - s.mu.Lock() - defer s.mu.Unlock() - return s.numInflight -} - func (s *slabUpload) ongoingOverdrive(sI int) bool { for _, candidate := range s.candidates { if candidate.used && candidate.overdriving == sI { @@ -978,9 +976,6 @@ func (s *slabUpload) ongoingOverdrive(sI int) bool { } func (s *slabUpload) launch(req *sectorUploadReq) (interrupt bool, err error) { - s.mu.Lock() - defer s.mu.Unlock() - // nothing to do if req == nil { return false, nil @@ -1024,9 +1019,6 @@ func (s *slabUpload) launch(req *sectorUploadReq) (interrupt bool, err error) { } func (s *slabUpload) nextRequest(responseChan chan sectorUploadResp) *sectorUploadReq { - s.mu.Lock() - defer s.mu.Unlock() - // count overdrives overdriveCnts := make(map[int]int) for _, c := range s.candidates { @@ -1057,28 +1049,7 @@ func (s *slabUpload) nextRequest(responseChan chan sectorUploadResp) *sectorUplo } } -func (s *slabUpload) overdriveCnt() int { - s.mu.Lock() - defer s.mu.Unlock() - return int(s.numLaunched) - len(s.sectors) -} - -func (s *slabUpload) overdrivePct() float64 { - s.mu.Lock() - defer s.mu.Unlock() - - numOverdrive := int(s.numLaunched) - len(s.sectors) - if numOverdrive <= 0 { - return 0 - } - - return float64(numOverdrive) / float64(len(s.sectors)) -} - func (s *slabUpload) receive(resp sectorUploadResp) bool { - s.mu.Lock() - defer s.mu.Unlock() - // convenience variable req := resp.req sector := req.sector @@ -1110,7 +1081,7 @@ func (s *slabUpload) receive(resp sectorUploadResp) bool { // update uploaded sectors s.numUploaded++ - // cancel the sector context + // cancel the sector's context sector.cancel() // release hosts that are overdriving this sector @@ -1123,18 +1094,9 @@ func (s *slabUpload) receive(resp sectorUploadResp) bool { // release memory sector.data = nil - s.shards[sector.index] = nil s.mem.ReleaseSome(rhpv2.SectorSize) - return s.numUploaded == uint64(len(s.shards)) -} - -func (s *slabUpload) uploadSpeed() int64 { - s.mu.Lock() - defer s.mu.Unlock() - bytes := s.numUploaded * rhpv2.SectorSize - ms := time.Since(s.created).Milliseconds() - return int64(bytes) / ms + return s.numUploaded == s.numSectors } func (s *sectorUpload) isUploaded() bool {