worker: cleanup upload types more
peterjan committed Dec 8, 2023
1 parent 316160f commit 7bb7e72
Showing 1 changed file with 44 additions and 82 deletions.
126 changes: 44 additions & 82 deletions worker/upload.go
@@ -71,25 +71,23 @@ type (
}

slabUpload struct {
uploadID api.UploadID
created time.Time
lockPriority int
maxOverdrive uint64
mem *acquiredMemory
overdriveTimeout time.Duration
uploadID api.UploadID
lockPriority int

maxOverdrive uint64
lastOverdrive time.Time

sectors []*sectorUpload
candidates []*candidate // sorted by upload estimate
shards [][]byte

mu sync.Mutex
numInflight uint64
numLaunched uint64
numInflight uint64
numOverdriving uint64
numUploaded uint64
numSectors uint64

lastOverdrive time.Time
errs HostErrorSet
mem *acquiredMemory
errs HostErrorSet
}

candidate struct {
@@ -755,7 +753,7 @@ func (mgr *uploadManager) refreshUploaders(contracts []api.ContractMetadata, bh
mgr.uploaders = uploaders
}

func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders []*uploader, mem *acquiredMemory, maxOverdrive uint64, overdriveTimeout time.Duration) (*slabUpload, []*sectorUploadReq, chan sectorUploadResp) {
func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders []*uploader, mem *acquiredMemory, maxOverdrive uint64) (*slabUpload, []*sectorUploadReq, chan sectorUploadResp) {
// prepare response channel
responseChan := make(chan sectorUploadResp)

@@ -800,16 +798,14 @@ func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders [

// create slab upload
return &slabUpload{
lockPriority: u.lockPriority,
uploadID: u.id,
created: time.Now(),
maxOverdrive: maxOverdrive,
mem: mem,
overdriveTimeout: overdriveTimeout,
lockPriority: u.lockPriority,
uploadID: u.id,
maxOverdrive: maxOverdrive,
mem: mem,

sectors: sectors,
candidates: candidates,
shards: shards,
numSectors: uint64(len(shards)),

errs: make(HostErrorSet),
}, requests, responseChan
@@ -848,6 +844,8 @@ func (u *upload) uploadSlab(ctx context.Context, rs api.RedundancySettings, data
}

func (u *upload) uploadShards(ctx context.Context, shards [][]byte, candidates []*uploader, mem *acquiredMemory, maxOverdrive uint64, overdriveTimeout time.Duration) ([]object.Sector, float64, int64, error) {
start := time.Now()

// add tracing
ctx, span := tracing.Tracer.Start(ctx, "uploadShards")
defer span.End()
@@ -857,7 +855,7 @@
defer cancel()

// prepare the upload
slab, requests, respChan := u.newSlabUpload(ctx, shards, candidates, mem, maxOverdrive, overdriveTimeout)
slab, requests, respChan := u.newSlabUpload(ctx, shards, candidates, mem, maxOverdrive)

// launch all shard uploads
for _, upload := range requests {
@@ -875,7 +873,7 @@ func (u *upload) uploadShards(ctx context.Context, shards [][]byte, candidates [
// collect responses
var done bool
loop:
for slab.inflight() > 0 && !done {
for slab.numInflight > 0 && !done {
select {
case <-u.shutdownCtx.Done():
return nil, 0, 0, errors.New("upload stopped")
@@ -895,12 +893,12 @@ loop:
}

// try overdriving a sector
if slab.canOverdrive() {
if slab.canOverdrive(overdriveTimeout) {
_, _ = slab.launch(slab.nextRequest(respChan)) // ignore result
}
case <-timer.C:
// try overdriving a sector
if slab.canOverdrive() {
if slab.canOverdrive(overdriveTimeout) {
_, _ = slab.launch(slab.nextRequest(respChan)) // ignore result
}
}
@@ -917,25 +915,34 @@ loop:
}
}

// calculate the upload speed
bytes := slab.numUploaded * rhpv2.SectorSize
ms := time.Since(start).Milliseconds()
speed := int64(bytes) / ms

// calculate overdrive pct
var numOverdrive uint64
if slab.numLaunched > slab.numSectors {
numOverdrive = slab.numLaunched - slab.numSectors
}
overdrivePct := float64(numOverdrive) / float64(slab.numSectors)

// register the amount of overdrive sectors
span.SetAttributes(attribute.Int("overdrive", slab.overdriveCnt()))
span.SetAttributes(attribute.Int("overdrive", int(numOverdrive)))

sectors, err := slab.finish()
return sectors, slab.overdrivePct(), slab.uploadSpeed(), err
return sectors, overdrivePct, speed, err
}

func (s *slabUpload) canOverdrive() bool {
s.mu.Lock()
defer s.mu.Unlock()

func (s *slabUpload) canOverdrive(overdriveTimeout time.Duration) bool {
// overdrive is not kicking in yet
remaining := uint64(len(s.shards)) - s.numUploaded
remaining := s.numSectors - s.numUploaded
if remaining >= s.maxOverdrive {
return false
}

// overdrive is not due yet
if time.Since(s.lastOverdrive) < s.overdriveTimeout {
if time.Since(s.lastOverdrive) < overdriveTimeout {
return false
}

@@ -948,26 +955,17 @@
}

func (s *slabUpload) finish() (sectors []object.Sector, _ error) {
s.mu.Lock()
defer s.mu.Unlock()

if s.numUploaded < uint64(len(s.shards)) {
remaining := uint64(len(s.shards)) - s.numUploaded
if s.numUploaded < s.numSectors {
remaining := s.numSectors - s.numUploaded
return nil, fmt.Errorf("failed to upload slab: launched=%d uploaded=%d remaining=%d inflight=%d uploaders=%d errors=%d %w", s.numLaunched, s.numUploaded, remaining, s.numInflight, len(s.candidates), len(s.errs), s.errs)
}

for i := 0; i < len(s.shards); i++ {
sectors = append(sectors, s.sectors[i].uploaded)
for _, sector := range s.sectors {
sectors = append(sectors, sector.uploaded)
}
return
}

func (s *slabUpload) inflight() uint64 {
s.mu.Lock()
defer s.mu.Unlock()
return s.numInflight
}

func (s *slabUpload) ongoingOverdrive(sI int) bool {
for _, candidate := range s.candidates {
if candidate.used && candidate.overdriving == sI {
@@ -978,9 +976,6 @@ func (s *slabUpload) ongoingOverdrive(sI int) bool {
}

func (s *slabUpload) launch(req *sectorUploadReq) (interrupt bool, err error) {
s.mu.Lock()
defer s.mu.Unlock()

// nothing to do
if req == nil {
return false, nil
@@ -1024,9 +1019,6 @@ func (s *slabUpload) launch(req *sectorUploadReq) (interrupt bool, err error) {
}

func (s *slabUpload) nextRequest(responseChan chan sectorUploadResp) *sectorUploadReq {
s.mu.Lock()
defer s.mu.Unlock()

// count overdrives
overdriveCnts := make(map[int]int)
for _, c := range s.candidates {
@@ -1057,28 +1049,7 @@ func (s *slabUpload) nextRequest(responseChan chan sectorUploadResp) *sectorUplo
}
}

func (s *slabUpload) overdriveCnt() int {
s.mu.Lock()
defer s.mu.Unlock()
return int(s.numLaunched) - len(s.sectors)
}

func (s *slabUpload) overdrivePct() float64 {
s.mu.Lock()
defer s.mu.Unlock()

numOverdrive := int(s.numLaunched) - len(s.sectors)
if numOverdrive <= 0 {
return 0
}

return float64(numOverdrive) / float64(len(s.sectors))
}

func (s *slabUpload) receive(resp sectorUploadResp) bool {
s.mu.Lock()
defer s.mu.Unlock()

// convenience variable
req := resp.req
sector := req.sector
@@ -1110,7 +1081,7 @@ func (s *slabUpload) receive(resp sectorUploadResp) bool {
// update uploaded sectors
s.numUploaded++

// cancel the sector context
// cancel the sector's context
sector.cancel()

// release hosts that are overdriving this sector
@@ -1123,18 +1094,9 @@

// release memory
sector.data = nil
s.shards[sector.index] = nil
s.mem.ReleaseSome(rhpv2.SectorSize)

return s.numUploaded == uint64(len(s.shards))
}

func (s *slabUpload) uploadSpeed() int64 {
s.mu.Lock()
defer s.mu.Unlock()
bytes := s.numUploaded * rhpv2.SectorSize
ms := time.Since(s.created).Milliseconds()
return int64(bytes) / ms
return s.numUploaded == s.numSectors
}

func (s *sectorUpload) isUploaded() bool {
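Illustrative note: in the hunk at +915 above, the per-slab speed and overdrive accounting is computed inline in uploadShards instead of via helper methods on slabUpload. The following is a minimal standalone sketch of that accounting, not part of the commit, using assumed counter values and assuming rhpv2.SectorSize is 4 MiB.

package main

import (
	"fmt"
	"time"
)

// sectorSize stands in for rhpv2.SectorSize (assumed to be 4 MiB here).
const sectorSize = 1 << 22

func main() {
	// assumed counters, as they would be read off a finished slabUpload
	var (
		numUploaded uint64 = 10 // sectors that finished uploading
		numLaunched uint64 = 12 // sector uploads started, including overdrives
		numSectors  uint64 = 10 // shards in the slab
	)
	start := time.Now().Add(-2 * time.Second) // pretend the upload took ~2s

	// upload speed in bytes per millisecond
	bytes := numUploaded * sectorSize
	ms := time.Since(start).Milliseconds()
	speed := int64(bytes) / ms

	// overdrive percentage: launches beyond the slab's sector count
	var numOverdrive uint64
	if numLaunched > numSectors {
		numOverdrive = numLaunched - numSectors
	}
	overdrivePct := float64(numOverdrive) / float64(numSectors)

	fmt.Printf("speed=%d B/ms overdrive=%.2f%%\n", speed, overdrivePct*100)
}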
