Fix uploads sometimes getting stuck #785

Merged · 3 commits merged on Dec 4, 2023
17 changes: 5 additions & 12 deletions worker/memory.go
@@ -24,7 +24,6 @@ type (
acquiredMemory struct {
mm *memoryManager

mu sync.Mutex
remaining uint64
}
)
@@ -66,15 +65,16 @@ func (mm *memoryManager) AcquireMemory(ctx context.Context, amt uint64) *acquire
// check if the context was canceled in the meantime
select {
case <-ctx.Done():
mm.sigNewMem.Broadcast() // flush out other cancelled goroutines
mm.sigNewMem.L.Unlock()
return nil
default:
}
}
mm.available -= amt
mm.sigNewMem.Signal() // wake next goroutine
mm.sigNewMem.L.Unlock()

mm.sigNewMem.Signal() // wake next goroutine
return &acquiredMemory{
mm: mm,
remaining: amt,
@@ -86,28 +86,21 @@ func (mm *memoryManager) AcquireMemory(ctx context.Context, amt uint64) *acquire
func (am *acquiredMemory) Release() {
am.mm.sigNewMem.L.Lock()
am.mm.available += am.remaining
am.mm.sigNewMem.L.Unlock()

am.mu.Lock()
am.remaining = 0
am.mu.Unlock()

am.mm.sigNewMem.Signal() // wake next goroutine
am.mm.sigNewMem.L.Unlock()
}

// ReleaseSome releases some of the remaining memory to the memory manager.
// Panics if more memory is released than was acquired.
func (am *acquiredMemory) ReleaseSome(amt uint64) {
am.mm.sigNewMem.L.Lock()
if amt > am.remaining {
am.mm.sigNewMem.L.Unlock()
panic("releasing more memory than remaining")
}
am.mm.available += amt
am.mm.sigNewMem.L.Unlock()

am.mu.Lock()
am.remaining -= amt
am.mu.Unlock()

am.mm.sigNewMem.Signal() // wake next goroutine
am.mm.sigNewMem.L.Unlock()
}
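
For context, here is a minimal, self-contained sketch of the pattern the memory.go changes converge on: all bookkeeping on available/remaining happens under the condition variable's lock, the Signal is issued before that lock is released, and a cancelled waiter broadcasts so other cancelled goroutines are flushed out of Wait as well. The newMemoryManager constructor and the main function are illustrative additions, not part of the PR.

package main

import (
	"context"
	"fmt"
	"sync"
)

// memoryManager hands out chunks of a fixed memory budget; waiters block on
// the condition variable until enough memory is free or their context ends.
type memoryManager struct {
	sigNewMem *sync.Cond
	available uint64
}

// acquiredMemory no longer needs its own mutex: 'remaining' is only touched
// while mm.sigNewMem.L is held.
type acquiredMemory struct {
	mm        *memoryManager
	remaining uint64
}

func newMemoryManager(total uint64) *memoryManager {
	return &memoryManager{sigNewMem: sync.NewCond(&sync.Mutex{}), available: total}
}

// AcquireMemory blocks until amt bytes are available, or returns nil if ctx
// is cancelled while waiting.
func (mm *memoryManager) AcquireMemory(ctx context.Context, amt uint64) *acquiredMemory {
	mm.sigNewMem.L.Lock()
	for mm.available < amt {
		mm.sigNewMem.Wait()

		// check if the context was canceled in the meantime
		select {
		case <-ctx.Done():
			mm.sigNewMem.Broadcast() // flush out other cancelled goroutines
			mm.sigNewMem.L.Unlock()
			return nil
		default:
		}
	}
	mm.available -= amt
	mm.sigNewMem.Signal() // wake next goroutine before releasing the lock
	mm.sigNewMem.L.Unlock()
	return &acquiredMemory{mm: mm, remaining: amt}
}

// Release returns all remaining memory; the accounting and the Signal both
// happen while the lock is held, so no wake-up can be lost in between.
func (am *acquiredMemory) Release() {
	am.mm.sigNewMem.L.Lock()
	am.mm.available += am.remaining
	am.remaining = 0
	am.mm.sigNewMem.Signal() // wake next goroutine
	am.mm.sigNewMem.L.Unlock()
}

func main() {
	mm := newMemoryManager(1 << 20)
	if mem := mm.AcquireMemory(context.Background(), 1<<19); mem != nil {
		fmt.Println("acquired", mem.remaining, "bytes")
		mem.Release()
	}
}
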
129 changes: 24 additions & 105 deletions worker/upload.go
@@ -151,13 +151,11 @@ type (
id api.UploadID
mgr *uploadManager

allowed map[types.FileContractID]struct{}
doneShardTrigger chan struct{}
lockPriority int
allowed map[types.FileContractID]struct{}
lockPriority int

mu sync.Mutex
ongoing []slabID
used map[slabID]map[types.FileContractID]struct{}
mu sync.Mutex
used map[slabID]map[types.FileContractID]struct{}
}

slabUpload struct {
@@ -578,18 +576,18 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a
redundantSize := uint64(up.rs.TotalShards) * rhpv2.SectorSize

// launch uploads in a separate goroutine
stopCtx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
var slabIndex int
for {
select {
case <-mgr.stopChan:
return // interrupted
case <-ctx.Done():
return // interrupted
default:
}
// acquire memory
mem := mgr.mm.AcquireMemory(stopCtx, redundantSize)
mem := mgr.mm.AcquireMemory(ctx, redundantSize)
if mem == nil {
return // interrupted
}
@@ -613,7 +611,7 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a
// unexpected error, notify main thread
select {
case respChan <- slabUploadResponse{err: err}:
case <-stopCtx.Done():
case <-ctx.Done():
}
return
} else if up.packing && errors.Is(err, io.ErrUnexpectedEOF) {
@@ -734,12 +732,10 @@ func (mgr *uploadManager) newUpload(ctx context.Context, totalShards int, contra
id: id,
mgr: mgr,

allowed: allowed,
doneShardTrigger: make(chan struct{}, 1),
lockPriority: lockPriority,
allowed: allowed,
lockPriority: lockPriority,

ongoing: make([]slabID, 0),
used: make(map[slabID]map[types.FileContractID]struct{}),
used: make(map[slabID]map[types.FileContractID]struct{}),
}, finishFn, nil
}

@@ -750,56 +746,20 @@ func (mgr *uploadManager) numUploaders() int {
}

func (mgr *uploadManager) candidate(req *sectorUploadReq) *uploader {
// fetch candidates
candidates := func() []*uploader {
mgr.mu.Lock()
defer mgr.mu.Unlock()

// sort the uploaders by their estimate
sort.Slice(mgr.uploaders, func(i, j int) bool {
return mgr.uploaders[i].estimate() < mgr.uploaders[j].estimate()
})

// select top ten candidates
var candidates []*uploader
for _, uploader := range mgr.uploaders {
if req.upload.canUseUploader(req.sID, uploader) {
candidates = append(candidates, uploader)
if len(candidates) == 10 {
break
}
}
}
return candidates
}()

// return early if we have no queues left
if len(candidates) == 0 {
return nil
}

loop:
for {
// if this slab does not have more than 1 parent, we return the best
// candidate
if len(req.upload.parents(req.sID)) <= 1 {
return candidates[0]
}
// fetch candidate
mgr.mu.Lock()
defer mgr.mu.Unlock()

// otherwise we wait, allowing the parents to complete, after which we
// re-sort the candidates
select {
case <-req.upload.doneShardTrigger:
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].estimate() < candidates[j].estimate()
})
continue loop
case <-req.ctx.Done():
break loop
// select candidate with the best estimate
var candidate *uploader
for _, uploader := range mgr.uploaders {
if !req.upload.canUseUploader(req.sID, uploader) {
continue // ignore
} else if candidate == nil || uploader.estimate() < candidate.estimate() {
candidate = uploader
}
}

return nil
return candidate
}
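
A rough, self-contained sketch of the simplified selection above, using stand-in types (host, estimate and usable are illustrative fields, not the real uploader struct): a single pass under the manager lock returns the usable uploader with the lowest estimate, so a sector request no longer waits on the removed doneShardTrigger channel.

package main

import (
	"fmt"
	"sync"
)

// uploader is a stand-in for the renterd uploader; it only exposes an
// estimate of how long the next sector upload would take (lower is better).
type uploader struct {
	host     string
	estimate float64
	usable   bool
}

type uploadManager struct {
	mu        sync.Mutex
	uploaders []*uploader
}

// candidate returns the usable uploader with the lowest estimate, or nil if
// none qualifies. Nothing here blocks, which is the point of the change.
func (mgr *uploadManager) candidate() *uploader {
	mgr.mu.Lock()
	defer mgr.mu.Unlock()

	var best *uploader
	for _, u := range mgr.uploaders {
		if !u.usable {
			continue // ignore uploaders this upload may not use
		}
		if best == nil || u.estimate < best.estimate {
			best = u
		}
	}
	return best
}

func main() {
	mgr := &uploadManager{uploaders: []*uploader{
		{host: "a", estimate: 3.2, usable: true},
		{host: "b", estimate: 1.1, usable: false},
		{host: "c", estimate: 2.4, usable: true},
	}}
	if c := mgr.candidate(); c != nil {
		fmt.Println("best candidate:", c.host) // prints "best candidate: c"
	}
}
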

func (mgr *uploadManager) renewUploader(u *uploader) {
@@ -900,31 +860,7 @@ func (mgr *uploadManager) tryRecomputeStats() {
mgr.lastRecompute = time.Now()
}

func (u *upload) parents(sID slabID) []slabID {
u.mu.Lock()
defer u.mu.Unlock()

var parents []slabID
for _, ongoing := range u.ongoing {
if ongoing == sID {
break
}
parents = append(parents, ongoing)
}
return parents
}

func (u *upload) finishSlabUpload(upload *slabUpload) {
// update ongoing slab history
u.mu.Lock()
for i, prev := range u.ongoing {
if prev == upload.sID {
u.ongoing = append(u.ongoing[:i], u.ongoing[i+1:]...)
break
}
}
u.mu.Unlock()

// cleanup contexts
upload.mu.Lock()
for _, shard := range upload.remaining {
@@ -938,11 +874,6 @@ func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, mem *acquir
var sID slabID
frand.Read(sID[:])

// add to ongoing uploads
u.mu.Lock()
u.ongoing = append(u.ongoing, sID)
u.mu.Unlock()

// create slab upload
slab := &slabUpload{
mgr: u.mgr,
@@ -1101,15 +1032,6 @@ func (u *upload) uploadShards(ctx context.Context, shards [][]byte, mem *acquire
}
}
}

// handle the response
if resp.err == nil {
// signal the upload a shard was received
select {
case u.doneShardTrigger <- struct{}{}:
default:
}
}
}

// register the amount of overdrive sectors
@@ -1563,13 +1485,10 @@ func (s *slabUpload) receive(resp sectorUploadResp) bool {
// update remaining sectors
delete(s.remaining, resp.req.sectorIndex)

// release memory - we don't release memory for overdrive sectors because
// it's not included in the initial allocation.
// release memory
resp.req.sector = nil
s.shards[resp.req.sectorIndex] = nil
if !resp.req.overdrive {
s.mem.ReleaseSome(rhpv2.SectorSize)
}
s.mem.ReleaseSome(rhpv2.SectorSize)

return len(s.remaining) == 0
}
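
Assuming the dropped lines above are the old !resp.req.overdrive guard, receive now hands a sector's share of the slab reservation back as soon as that sector completes rather than holding it until the slab finishes. A toy sketch of that per-sector release follows; slabUpload, reserved and sectorSize are simplified stand-ins for the real types in worker/upload.go.

package main

import "fmt"

const sectorSize = 1 << 22 // stand-in for rhpv2.SectorSize

// slabUpload tracks which sectors of a slab are still outstanding and how
// much of the slab's memory reservation has not been handed back yet.
type slabUpload struct {
	remaining map[int]struct{} // outstanding sector indices
	reserved  uint64           // memory still held for this slab
}

// receive marks one sector as done and releases its share of the reservation
// immediately (s.mem.ReleaseSome(rhpv2.SectorSize) in the real code).
func (s *slabUpload) receive(sectorIndex int) (done bool) {
	if _, ok := s.remaining[sectorIndex]; !ok {
		return len(s.remaining) == 0 // duplicate or late response
	}
	delete(s.remaining, sectorIndex)
	if s.reserved >= sectorSize {
		s.reserved -= sectorSize
	}
	return len(s.remaining) == 0
}

func main() {
	s := &slabUpload{
		remaining: map[int]struct{}{0: {}, 1: {}, 2: {}},
		reserved:  3 * sectorSize,
	}
	for i := 0; i < 3; i++ {
		fmt.Println("sector", i, "done; slab complete:", s.receive(i), "reserved:", s.reserved)
	}
}
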