From 4113d65075cd4cce8313edbc124b542ffbab6904 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 23 Nov 2023 11:28:56 +0100 Subject: [PATCH 01/13] worker: start uploading next packed slab before previous one is finished --- worker/migrations.go | 2 +- worker/upload.go | 77 ++++++++++++++++++++++++++++++++++++-------- 2 files changed, 64 insertions(+), 15 deletions(-) diff --git a/worker/migrations.go b/worker/migrations.go index e8ce000de..79e555256 100644 --- a/worker/migrations.go +++ b/worker/migrations.go @@ -105,7 +105,7 @@ SHARDS: } // migrate the shards - uploaded, err := u.UploadShards(ctx, shards, allowed, bh, lockingPriorityUpload) + uploaded, err := u.UploadShards(ctx, shards, allowed, bh, lockingPriorityUpload, nil) if err != nil { return 0, surchargeApplied, fmt.Errorf("failed to upload slab for migration: %w", err) } diff --git a/worker/upload.go b/worker/upload.go index 85d839d96..1f9619e74 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -376,24 +376,73 @@ func (w *worker) tryUploadPackedSlabs(ctx context.Context, rs api.RedundancySett } func (w *worker) uploadPackedSlabs(ctx context.Context, lockingDuration time.Duration, rs api.RedundancySettings, contractSet string, limit, lockPriority int) (uploaded int, err error) { - // fetch packed slabs - packedSlabs, err := w.bus.PackedSlabsForUpload(ctx, lockingDuration, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet, limit) - if err != nil { - return 0, fmt.Errorf("couldn't fetch packed slabs from bus: %v", err) - } + // create the next slab channel + nextSlabChan := make(chan struct{}, 1) + defer close(nextSlabChan) // upload packed slabs - for _, ps := range packedSlabs { - err = w.uploadPackedSlab(ctx, ps, rs, contractSet, lockPriority) - if err != nil { - return + var mu sync.Mutex + errChan := make(chan error, 1) + + var ps api.PackedSlab + var packedSlabs []api.PackedSlab + var wg sync.WaitGroup + +LOOP: + for { + // fetch packed slabs to upload + if len(packedSlabs) == 0 { + packedSlabs, err = w.bus.PackedSlabsForUpload(ctx, lockingDuration, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet, limit) + if err != nil { + err = fmt.Errorf("couldn't fetch packed slabs from bus: %v", err) + break LOOP + } else if len(packedSlabs) == 0 { + break LOOP + } + } + ps, packedSlabs = packedSlabs[0], packedSlabs[1:] + + // launch upload for slab + wg.Add(1) + go func(ps api.PackedSlab) { + defer wg.Done() + err = w.uploadPackedSlab(ctx, ps, rs, contractSet, lockPriority, nextSlabChan) + mu.Lock() + defer mu.Unlock() + if err != nil { + select { + case errChan <- err: + default: + } + } else { + uploaded++ + } + }(ps) + + // block until either an error occurs or the next slab is ready to be + // uploaded + select { + case <-ctx.Done(): + break LOOP + case err = <-errChan: + break LOOP + case <-nextSlabChan: } - uploaded++ + } + + // wait for all threads to finish + wg.Wait() + + // check for any last errors + select { + case uploadErr := <-errChan: + err = errors.Join(err, uploadErr) + default: } return } -func (w *worker) uploadPackedSlab(ctx context.Context, ps api.PackedSlab, rs api.RedundancySettings, contractSet string, lockPriority int) error { +func (w *worker) uploadPackedSlab(ctx context.Context, ps api.PackedSlab, rs api.RedundancySettings, contractSet string, lockPriority int, nextSlabChan chan struct{}) error { // create a context with sane timeout ctx, cancel := context.WithTimeout(ctx, defaultPackedSlabsUploadTimeout) defer cancel() @@ -415,7 +464,7 @@ func (w *worker) uploadPackedSlab(ctx 
context.Context, ps api.PackedSlab, rs api // upload packed slab shards := encryptPartialSlab(ps.Data, ps.Key, uint8(rs.MinShards), uint8(rs.TotalShards)) - sectors, err := w.uploadManager.UploadShards(ctx, shards, contracts, up.CurrentHeight, lockPriority) + sectors, err := w.uploadManager.UploadShards(ctx, shards, contracts, up.CurrentHeight, lockPriority, nextSlabChan) if err != nil { return fmt.Errorf("couldn't upload packed slab, err: %v", err) } @@ -627,7 +676,7 @@ loop: return o, partialSlab, hr.Hash(), nil } -func (mgr *uploadManager) UploadShards(ctx context.Context, shards [][]byte, contracts []api.ContractMetadata, bh uint64, lockPriority int) ([]object.Sector, error) { +func (mgr *uploadManager) UploadShards(ctx context.Context, shards [][]byte, contracts []api.ContractMetadata, bh uint64, lockPriority int, nextSlabChan chan struct{}) ([]object.Sector, error) { // initiate the upload upload, finishFn, err := mgr.newUpload(ctx, len(shards), contracts, bh, lockPriority) if err != nil { @@ -636,7 +685,7 @@ func (mgr *uploadManager) UploadShards(ctx context.Context, shards [][]byte, con defer finishFn() // upload the shards - sectors, err := upload.uploadShards(ctx, shards, nil) + sectors, err := upload.uploadShards(ctx, shards, nextSlabChan) if err != nil { return nil, err } From ece85c69304605bb1e48c45ed34d1e3681423cee Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 23 Nov 2023 12:29:54 +0100 Subject: [PATCH 02/13] worker: fix TestUploadPacking --- worker/upload.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/upload.go b/worker/upload.go index 1f9619e74..67db7ab1a 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -426,7 +426,7 @@ LOOP: break LOOP case err = <-errChan: break LOOP - case <-nextSlabChan: + case nextSlabChan <- struct{}{}: } } From 1222d7efc2352183e735b929329c04b994f13acb Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 23 Nov 2023 12:47:48 +0100 Subject: [PATCH 03/13] worker: fix race --- worker/upload.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker/upload.go b/worker/upload.go index 67db7ab1a..b59250abb 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -384,7 +384,6 @@ func (w *worker) uploadPackedSlabs(ctx context.Context, lockingDuration time.Dur var mu sync.Mutex errChan := make(chan error, 1) - var ps api.PackedSlab var packedSlabs []api.PackedSlab var wg sync.WaitGroup @@ -400,13 +399,14 @@ LOOP: break LOOP } } + var ps api.PackedSlab ps, packedSlabs = packedSlabs[0], packedSlabs[1:] // launch upload for slab wg.Add(1) go func(ps api.PackedSlab) { defer wg.Done() - err = w.uploadPackedSlab(ctx, ps, rs, contractSet, lockPriority, nextSlabChan) + err := w.uploadPackedSlab(ctx, ps, rs, contractSet, lockPriority, nextSlabChan) mu.Lock() defer mu.Unlock() if err != nil { From 9e2d58427c222372fe50d5d02fc42a3047282c9f Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 23 Nov 2023 15:38:19 +0100 Subject: [PATCH 04/13] worker: free shards as uploads complete --- worker/upload.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/worker/upload.go b/worker/upload.go index b59250abb..ba77b4437 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -1597,6 +1597,9 @@ func (s *slabUpload) receive(resp sectorUploadResp) (finished bool, next bool) { delete(s.remaining, resp.req.sectorIndex) finished = len(s.remaining) == 0 next = len(s.remaining) <= int(s.mgr.maxOverdrive) + + resp.req.sector = nil + s.shards[resp.req.sectorIndex] = nil return } From 
4d98f0219756d0caee9cadb41186dc04d78f9cea Mon Sep 17 00:00:00 2001
From: Chris Schinnerl
Date: Thu, 23 Nov 2023 16:02:59 +0100
Subject: [PATCH 05/13] worker: address review comments

---
 worker/upload.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/worker/upload.go b/worker/upload.go
index ba77b4437..e7c9eab6c 100644
--- a/worker/upload.go
+++ b/worker/upload.go
@@ -407,15 +407,15 @@ LOOP:
 		go func(ps api.PackedSlab) {
 			defer wg.Done()
 			err := w.uploadPackedSlab(ctx, ps, rs, contractSet, lockPriority, nextSlabChan)
-			mu.Lock()
-			defer mu.Unlock()
 			if err != nil {
 				select {
 				case errChan <- err:
 				default:
 				}
 			} else {
+				mu.Lock()
 				uploaded++
+				mu.Unlock()
 			}
 		}(ps)

From 86fde245c2e6705070b2dc0297508b04dce6fd6d Mon Sep 17 00:00:00 2001
From: Chris Schinnerl
Date: Mon, 27 Nov 2023 13:31:29 +0100
Subject: [PATCH 06/13] worker: add upload memory manager

---
 worker/memory.go     |  94 +++++++++++++++++++++++++
 worker/migrations.go |   7 +-
 worker/upload.go     | 163 ++++++++++++++++++++++---------------------
 3 files changed, 182 insertions(+), 82 deletions(-)
 create mode 100644 worker/memory.go

diff --git a/worker/memory.go b/worker/memory.go
new file mode 100644
index 000000000..6096e7ea8
--- /dev/null
+++ b/worker/memory.go
@@ -0,0 +1,94 @@
+package worker
+
+import (
+	"context"
+	"sync"
+)
+
+type (
+	// memoryManager helps regulate processes that use a lot of memory, such as
+	// uploads and downloads.
+	memoryManager struct {
+		totalAvailable uint64
+
+		mu        sync.Mutex
+		sigNewMem sync.Cond
+		available uint64
+	}
+
+	acquiredMemory struct {
+		mm *memoryManager
+
+		mu        sync.Mutex
+		remaining uint64
+	}
+)
+
+func newMemoryManager() *memoryManager {
+	mm := &memoryManager{
+		totalAvailable: 1 << 30, // 1 GiB
+	}
+	mm.available = mm.totalAvailable
+	mm.sigNewMem = *sync.NewCond(&mm.mu)
+	return mm
+}
+
+func (mm *memoryManager) AcquireMemory(ctx context.Context, amt uint64) <-chan *acquiredMemory {
+	memChan := make(chan *acquiredMemory, 1)
+	// block until enough memory is available
+	mm.sigNewMem.L.Lock()
+	for mm.available < amt {
+		mm.sigNewMem.Wait()
+
+		// check if the context was canceled in the meantime
+		select {
+		case <-ctx.Done():
+			mm.sigNewMem.L.Unlock()
+			close(memChan)
+			return memChan
+		default:
+		}
+	}
+	mm.available -= amt
+	mm.sigNewMem.L.Unlock()
+
+	memChan <- &acquiredMemory{
+		mm:        mm,
+		remaining: amt,
+	}
+	close(memChan)
+
+	mm.sigNewMem.Signal() // wake next goroutine
+	return memChan
+}
+
+// Release returns all remaining memory to the memory manager. It should always
+// be called on every acquiredMemory when done using it.
+func (am *acquiredMemory) Release() {
+	am.mm.sigNewMem.L.Lock()
+	am.mm.available += am.remaining
+	am.mm.sigNewMem.L.Unlock()
+
+	am.mu.Lock()
+	am.remaining = 0
+	am.mu.Unlock()
+
+	am.mm.sigNewMem.Signal() // wake next goroutine
+}
+
+// ReleaseSome releases some of the remaining memory to the memory manager.
+// Panics if more memory is released than was acquired.
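+//
+// An illustrative sketch (not part of this change): an upload that acquired
+// one slab's worth of memory up front can hand back a sector's share as each
+// shard finishes,
+//
+//	mem.ReleaseSome(rhpv2.SectorSize)
+//
+// while deferring mem.Release() to return whatever remains at the end.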
+func (am *acquiredMemory) ReleaseSome(amt uint64) { + am.mm.sigNewMem.L.Lock() + if amt > am.remaining { + panic("releasing more memory than remaining") + } + am.mm.available += amt + am.mm.sigNewMem.L.Unlock() + + am.mu.Lock() + am.remaining -= amt + am.mu.Unlock() + + am.mm.sigNewMem.Signal() // wake next goroutine +} diff --git a/worker/migrations.go b/worker/migrations.go index 79e555256..fda36b021 100644 --- a/worker/migrations.go +++ b/worker/migrations.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + rhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/core/types" "go.sia.tech/renterd/api" "go.sia.tech/renterd/object" @@ -82,6 +83,10 @@ SHARDS: return 0, false, fmt.Errorf("not enough hosts to download unhealthy shard, %d<%d", len(s.Shards)-missingShards, int(s.MinShards)) } + // acquire memory for the migration + mem := <-u.mm.AcquireMemory(ctx, uint64(missingShards)*rhpv2.SectorSize) + defer mem.Release() + // download the slab shards, surchargeApplied, err := d.DownloadSlab(ctx, *s, dlContracts) if err != nil { @@ -105,7 +110,7 @@ SHARDS: } // migrate the shards - uploaded, err := u.UploadShards(ctx, shards, allowed, bh, lockingPriorityUpload, nil) + uploaded, err := u.UploadShards(ctx, shards, allowed, bh, lockingPriorityUpload, mem) if err != nil { return 0, surchargeApplied, fmt.Errorf("failed to upload slab for migration: %w", err) } diff --git a/worker/upload.go b/worker/upload.go index e7c9eab6c..57efca8ea 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -35,7 +35,6 @@ const ( defaultPackedSlabsLockDuration = 10 * time.Minute defaultPackedSlabsUploadTimeout = 10 * time.Minute - defaultPackedSlabsLimit = 1 ) var ( @@ -114,6 +113,7 @@ type ( hp hostProvider rl revisionLocker logger *zap.SugaredLogger + mm *memoryManager maxOverdrive uint64 overdriveTimeout time.Duration @@ -163,6 +163,7 @@ type ( slabUpload struct { mgr *uploadManager + mem *acquiredMemory upload *upload sID slabID @@ -353,7 +354,7 @@ func (w *worker) threadedUploadPackedSlabs(rs api.RedundancySettings, contractSe // keep uploading packed slabs until we're done for { - uploaded, err := w.uploadPackedSlabs(context.Background(), defaultPackedSlabsLockDuration, rs, contractSet, defaultPackedSlabsLimit, lockPriority) + uploaded, err := w.uploadPackedSlabs(context.Background(), defaultPackedSlabsLockDuration, rs, contractSet, lockPriority) if err != nil { w.logger.Errorf("couldn't upload packed slabs, err: %v", err) return @@ -367,7 +368,7 @@ func (w *worker) tryUploadPackedSlabs(ctx context.Context, rs api.RedundancySett // if we want to block, try and upload one packed slab synchronously, we use // a slightly higher upload priority to avoid reaching the context deadline if block { - _, err = w.uploadPackedSlabs(ctx, defaultPackedSlabsLockDuration, rs, contractSet, defaultPackedSlabsLimit, lockingPriorityBlockedUpload) + _, err = w.uploadPackedSlabs(ctx, defaultPackedSlabsLockDuration, rs, contractSet, lockingPriorityBlockedUpload) } // make sure there's a goroutine uploading the remainder of the packed slabs @@ -375,38 +376,49 @@ func (w *worker) tryUploadPackedSlabs(ctx context.Context, rs api.RedundancySett return } -func (w *worker) uploadPackedSlabs(ctx context.Context, lockingDuration time.Duration, rs api.RedundancySettings, contractSet string, limit, lockPriority int) (uploaded int, err error) { - // create the next slab channel - nextSlabChan := make(chan struct{}, 1) - defer close(nextSlabChan) - +func (w *worker) uploadPackedSlabs(ctx context.Context, lockingDuration time.Duration, rs 
api.RedundancySettings, contractSet string, lockPriority int) (uploaded int, err error) { // upload packed slabs var mu sync.Mutex errChan := make(chan error, 1) - var packedSlabs []api.PackedSlab var wg sync.WaitGroup + redundantSize := uint64(rs.TotalShards) * rhpv2.SectorSize LOOP: for { + // block until either an error occurs or the next slab is ready to be + // uploaded + var mem *acquiredMemory + select { + case <-ctx.Done(): + break LOOP + case err = <-errChan: + break LOOP + case mem = <-w.uploadManager.mm.AcquireMemory(ctx, redundantSize): + } + if mem != nil { + break // interrupted + } + // fetch packed slabs to upload - if len(packedSlabs) == 0 { - packedSlabs, err = w.bus.PackedSlabsForUpload(ctx, lockingDuration, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet, limit) - if err != nil { - err = fmt.Errorf("couldn't fetch packed slabs from bus: %v", err) - break LOOP - } else if len(packedSlabs) == 0 { - break LOOP - } + var packedSlabs []api.PackedSlab + packedSlabs, err = w.bus.PackedSlabsForUpload(ctx, lockingDuration, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet, 1) + if err != nil { + err = fmt.Errorf("couldn't fetch packed slabs from bus: %v", err) + mem.Release() + break LOOP + } else if len(packedSlabs) == 0 { + mem.Release() + break LOOP // no more slabs } - var ps api.PackedSlab - ps, packedSlabs = packedSlabs[0], packedSlabs[1:] + ps := packedSlabs[0] // launch upload for slab wg.Add(1) go func(ps api.PackedSlab) { + defer mem.Release() defer wg.Done() - err := w.uploadPackedSlab(ctx, ps, rs, contractSet, lockPriority, nextSlabChan) + err := w.uploadPackedSlab(ctx, ps, rs, contractSet, lockPriority, mem) if err != nil { select { case errChan <- err: @@ -418,16 +430,6 @@ LOOP: mu.Unlock() } }(ps) - - // block until either an error occurs or the next slab is ready to be - // uploaded - select { - case <-ctx.Done(): - break LOOP - case err = <-errChan: - break LOOP - case nextSlabChan <- struct{}{}: - } } // wait for all threads to finish @@ -442,7 +444,7 @@ LOOP: return } -func (w *worker) uploadPackedSlab(ctx context.Context, ps api.PackedSlab, rs api.RedundancySettings, contractSet string, lockPriority int, nextSlabChan chan struct{}) error { +func (w *worker) uploadPackedSlab(ctx context.Context, ps api.PackedSlab, rs api.RedundancySettings, contractSet string, lockPriority int, mem *acquiredMemory) error { // create a context with sane timeout ctx, cancel := context.WithTimeout(ctx, defaultPackedSlabsUploadTimeout) defer cancel() @@ -464,7 +466,7 @@ func (w *worker) uploadPackedSlab(ctx context.Context, ps api.PackedSlab, rs api // upload packed slab shards := encryptPartialSlab(ps.Data, ps.Key, uint8(rs.MinShards), uint8(rs.TotalShards)) - sectors, err := w.uploadManager.UploadShards(ctx, shards, contracts, up.CurrentHeight, lockPriority, nextSlabChan) + sectors, err := w.uploadManager.UploadShards(ctx, shards, contracts, up.CurrentHeight, lockPriority, mem) if err != nil { return fmt.Errorf("couldn't upload packed slab, err: %v", err) } @@ -494,6 +496,7 @@ func newUploadManager(b Bus, hp hostProvider, rl revisionLocker, maxOverdrive ui hp: hp, rl: rl, logger: logger, + mm: newMemoryManager(), maxOverdrive: maxOverdrive, overdriveTimeout: overdriveTimeout, @@ -599,10 +602,6 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, up uploadPara } defer finishFn() - // create the next slab channel - nextSlabChan := make(chan struct{}, 1) - defer close(nextSlabChan) - // create the response channel respChan := make(chan 
slabUploadResponse) @@ -613,41 +612,47 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, up uploadPara // prepare slab size size := int64(up.rs.MinShards) * rhpv2.SectorSize + redundantSize := uint64(up.rs.TotalShards) * rhpv2.SectorSize + + // launch uploads and collect responses loop: for { select { case <-mgr.stopChan: return object.Object{}, nil, "", errors.New("manager was stopped") - case <-ctx.Done(): - return object.Object{}, nil, "", errors.New("upload timed out") - case nextSlabChan <- struct{}{}: + case mem := <-mgr.mm.AcquireMemory(ctx, redundantSize): + if mem == nil { + return object.Object{}, nil, "", errors.New("upload timed out") + } // read next slab's data data := make([]byte, size) length, err := io.ReadFull(io.LimitReader(cr, size), data) if err == io.EOF { if slabIndex == 0 { + mem.Release() break loop } numSlabs = slabIndex if partialSlab != nil { numSlabs-- // don't wait on partial slab } - if len(responses) == numSlabs { - break loop - } - continue + mem.Release() + // no more data to upload + break loop } else if err != nil && err != io.ErrUnexpectedEOF { + mem.Release() return object.Object{}, nil, "", err } if up.packing && errors.Is(err, io.ErrUnexpectedEOF) { // If uploadPacking is true, we return the partial slab without // uploading. partialSlab = data[:length] - <-nextSlabChan // trigger next iteration + mem.Release() // trigger next iteration } else { // Otherwise we upload it. go func(rs api.RedundancySettings, data []byte, length, slabIndex int) { - u.uploadSlab(ctx, rs, data, length, slabIndex, respChan, nextSlabChan) + u.uploadSlab(ctx, rs, data, length, slabIndex, respChan, mem) + mem.Release() }(up.rs, data, length, slabIndex) } slabIndex++ @@ -655,8 +660,6 @@ loop: if res.err != nil { return object.Object{}, nil, "", res.err } - - // collect the response and potentially break out of the loop responses = append(responses, res) if len(responses) == numSlabs { break loop @@ -664,6 +667,19 @@ loop: } } + // keep collecting responses + for len(responses) < numSlabs { + select { + case <-mgr.stopChan: + return object.Object{}, nil, "", errors.New("manager was stopped") + case res := <-respChan: + if res.err != nil { + return object.Object{}, nil, "", res.err + } + responses = append(responses, res) + } + } + // sort the slabs by index sort.Slice(responses, func(i, j int) bool { return responses[i].index < responses[j].index @@ -676,7 +692,7 @@ loop: return o, partialSlab, hr.Hash(), nil } -func (mgr *uploadManager) UploadShards(ctx context.Context, shards [][]byte, contracts []api.ContractMetadata, bh uint64, lockPriority int, nextSlabChan chan struct{}) ([]object.Sector, error) { +func (mgr *uploadManager) UploadShards(ctx context.Context, shards [][]byte, contracts []api.ContractMetadata, bh uint64, lockPriority int, mem *acquiredMemory) ([]object.Sector, error) { // initiate the upload upload, finishFn, err := mgr.newUpload(ctx, len(shards), contracts, bh, lockPriority) if err != nil { @@ -685,7 +701,7 @@ func (mgr *uploadManager) UploadShards(ctx context.Context, shards [][]byte, con defer finishFn() // upload the shards - sectors, err := upload.uploadShards(ctx, shards, nextSlabChan) + sectors, err := upload.uploadShards(ctx, shards, mem) if err != nil { return nil, err } @@ -947,7 +963,7 @@ func (u *upload) finishSlabUpload(upload *slabUpload) { upload.mu.Unlock() } -func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte) (*slabUpload, []*sectorUploadReq, chan sectorUploadResp) { +func (u *upload) newSlabUpload(ctx 
context.Context, shards [][]byte, mem *acquiredMemory) (*slabUpload, []*sectorUploadReq, chan sectorUploadResp) { // create slab id var sID slabID frand.Read(sID[:]) @@ -960,6 +976,7 @@ func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte) (*slabUploa // create slab upload slab := &slabUpload{ mgr: u.mgr, + mem: mem, upload: u, sID: sID, @@ -1023,7 +1040,7 @@ func (u *upload) canUseUploader(sID slabID, ul *uploader) bool { return !used } -func (u *upload) uploadSlab(ctx context.Context, rs api.RedundancySettings, data []byte, length, index int, respChan chan slabUploadResponse, nextSlabChan chan struct{}) { +func (u *upload) uploadSlab(ctx context.Context, rs api.RedundancySettings, data []byte, length, index int, respChan chan slabUploadResponse, mem *acquiredMemory) { // cancel any sector uploads once the slab is done. ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -1048,7 +1065,7 @@ func (u *upload) uploadSlab(ctx context.Context, rs api.RedundancySettings, data resp.slab.Slab.Encrypt(shards) // upload the shards - resp.slab.Slab.Shards, resp.err = u.uploadShards(ctx, shards, nextSlabChan) + resp.slab.Slab.Shards, resp.err = u.uploadShards(ctx, shards, mem) // send the response select { @@ -1068,13 +1085,13 @@ func (u *upload) markUsed(sID slabID, fcid types.FileContractID) { u.used[sID][fcid] = struct{}{} } -func (u *upload) uploadShards(ctx context.Context, shards [][]byte, nextSlabChan chan struct{}) ([]object.Sector, error) { +func (u *upload) uploadShards(ctx context.Context, shards [][]byte, mem *acquiredMemory) ([]object.Sector, error) { // add tracing ctx, span := tracing.Tracer.Start(ctx, "uploadShards") defer span.End() // prepare the upload - slab, requests, respChan := u.newSlabUpload(ctx, shards) + slab, requests, respChan := u.newSlabUpload(ctx, shards, mem) span.SetAttributes(attribute.Stringer("id", slab.sID)) defer u.finishSlabUpload(slab) @@ -1090,8 +1107,6 @@ func (u *upload) uploadShards(ctx context.Context, shards [][]byte, nextSlabChan // collect responses var done bool - var next bool - var triggered bool for slab.inflight() > 0 && !done { var resp sectorUploadResp select { @@ -1105,16 +1120,7 @@ func (u *upload) uploadShards(ctx context.Context, shards [][]byte, nextSlabChan resetOverdrive() // receive the response - done, next = slab.receive(resp) - - // try and trigger next slab - if next && !triggered { - select { - case <-nextSlabChan: - triggered = true - default: - } - } + done = slab.receive(resp) // relaunch non-overdrive uploads if !done && resp.err != nil && !resp.req.overdrive { @@ -1136,15 +1142,6 @@ func (u *upload) uploadShards(ctx context.Context, shards [][]byte, nextSlabChan } } - // make sure next slab is triggered - if done && !triggered { - select { - case <-nextSlabChan: - triggered = true - default: - } - } - // register the amount of overdrive sectors span.SetAttributes(attribute.Int("overdrive", slab.overdriveCnt())) @@ -1560,7 +1557,7 @@ func (s *slabUpload) overdrivePct() float64 { return float64(numOverdrive) / float64(len(s.sectors)) } -func (s *slabUpload) receive(resp sectorUploadResp) (finished bool, next bool) { +func (s *slabUpload) receive(resp sectorUploadResp) bool { s.mu.Lock() defer s.mu.Unlock() @@ -1573,12 +1570,12 @@ func (s *slabUpload) receive(resp sectorUploadResp) (finished bool, next bool) { s.numInflight-- if resp.err != nil { s.errs[resp.req.hk] = resp.err - return false, false + return false } // redundant sectors can't complete the upload if s.sectors[resp.req.sectorIndex].Root != 
(types.Hash256{}) { - return false, false + return false } // store the sector and call cancel on the sector ctx @@ -1595,12 +1592,16 @@ func (s *slabUpload) receive(resp sectorUploadResp) (finished bool, next bool) { // update remaining sectors delete(s.remaining, resp.req.sectorIndex) - finished = len(s.remaining) == 0 - next = len(s.remaining) <= int(s.mgr.maxOverdrive) + // release memory - we don't release memory for overdrive sectors because + // it's not included in the initial allocation. resp.req.sector = nil s.shards[resp.req.sectorIndex] = nil - return + if !resp.req.overdrive { + s.mem.ReleaseSome(rhpv2.SectorSize) + } + + return len(s.remaining) == 0 } func (a *dataPoints) Average() float64 { From 0f674c6fca93c1cadf7903cbf295c6e8fce548b6 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Mon, 27 Nov 2023 13:58:33 +0100 Subject: [PATCH 07/13] testing: fix TestUploadPacking --- worker/upload.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/upload.go b/worker/upload.go index 57efca8ea..6291fd52d 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -396,7 +396,7 @@ LOOP: break LOOP case mem = <-w.uploadManager.mm.AcquireMemory(ctx, redundantSize): } - if mem != nil { + if mem == nil { break // interrupted } From 4f877db0c9dccde431fb912894b1870dc9ae7f4d Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Mon, 27 Nov 2023 14:23:11 +0100 Subject: [PATCH 08/13] testing: fix TestMigrations --- worker/memory.go | 3 +++ worker/migrations.go | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/worker/memory.go b/worker/memory.go index 6096e7ea8..ec8afcad9 100644 --- a/worker/memory.go +++ b/worker/memory.go @@ -34,6 +34,9 @@ func newMemoryManager() *memoryManager { } func (mm *memoryManager) AcquireMemory(ctx context.Context, amt uint64) <-chan *acquiredMemory { + if amt == 0 { + panic("cannot acquire 0 memory") + } memChan := make(chan *acquiredMemory, 1) // block until enough memory is available mm.sigNewMem.L.Lock() diff --git a/worker/migrations.go b/worker/migrations.go index fda36b021..a822a4e55 100644 --- a/worker/migrations.go +++ b/worker/migrations.go @@ -84,7 +84,7 @@ SHARDS: } // acquire memory for the migration - mem := <-u.mm.AcquireMemory(ctx, uint64(missingShards)*rhpv2.SectorSize) + mem := <-u.mm.AcquireMemory(ctx, uint64(len(shardIndices))*rhpv2.SectorSize) defer mem.Release() // download the slab From 34f1651ba08aa752e17b88d3f78b5a8cadc9bf41 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Mon, 27 Nov 2023 15:35:36 +0100 Subject: [PATCH 09/13] worker: make memory configurable --- cmd/renterd/main.go | 3 +++ config/config.go | 1 + internal/node/node.go | 2 +- internal/testing/cluster.go | 1 + worker/memory.go | 10 +++++++--- worker/upload.go | 8 ++++---- worker/worker.go | 8 ++++++-- 7 files changed, 23 insertions(+), 10 deletions(-) diff --git a/cmd/renterd/main.go b/cmd/renterd/main.go index 90ce3eaa2..697799559 100644 --- a/cmd/renterd/main.go +++ b/cmd/renterd/main.go @@ -94,6 +94,7 @@ var ( DownloadMaxOverdrive: 5, DownloadOverdriveTimeout: 3 * time.Second, + UploadMaxMemory: 1 << 30, // 1 GiB UploadMaxOverdrive: 5, UploadOverdriveTimeout: 3 * time.Second, }, @@ -295,6 +296,7 @@ func main() { flag.Uint64Var(&cfg.Worker.DownloadMaxOverdrive, "worker.downloadMaxOverdrive", cfg.Worker.DownloadMaxOverdrive, "maximum number of active overdrive workers when downloading a slab") flag.StringVar(&cfg.Worker.ID, "worker.id", cfg.Worker.ID, "unique identifier of worker used internally - can be overwritten using the 
RENTERD_WORKER_ID environment variable") flag.DurationVar(&cfg.Worker.DownloadOverdriveTimeout, "worker.downloadOverdriveTimeout", cfg.Worker.DownloadOverdriveTimeout, "timeout applied to slab downloads that decides when we start overdriving") + flag.Uint64Var(&cfg.Worker.UploadMaxMemory, "worker.uploadMaxMemory", cfg.Worker.UploadMaxMemory, "maximum amount of ram the worker allocates for slabs when uploading - can be overwritten using the RENTERD_WORKER_UPLOAD_MAX_MEMORY environment variable") flag.Uint64Var(&cfg.Worker.UploadMaxOverdrive, "worker.uploadMaxOverdrive", cfg.Worker.UploadMaxOverdrive, "maximum number of active overdrive workers when uploading a slab") flag.DurationVar(&cfg.Worker.UploadOverdriveTimeout, "worker.uploadOverdriveTimeout", cfg.Worker.UploadOverdriveTimeout, "timeout applied to slab uploads that decides when we start overdriving") flag.BoolVar(&cfg.Worker.Enabled, "worker.enabled", cfg.Worker.Enabled, "enable/disable creating a worker - can be overwritten using the RENTERD_WORKER_ENABLED environment variable") @@ -358,6 +360,7 @@ func main() { parseEnvVar("RENTERD_WORKER_ENABLED", &cfg.Worker.Enabled) parseEnvVar("RENTERD_WORKER_ID", &cfg.Worker.ID) parseEnvVar("RENTERD_WORKER_UNAUTHENTICATED_DOWNLOADS", &cfg.Worker.AllowUnauthenticatedDownloads) + parseEnvVar("RENTERD_WORKER_UPLOAD_MAX_MEMORY", &cfg.Worker.UploadMaxMemory) parseEnvVar("RENTERD_AUTOPILOT_ENABLED", &cfg.Autopilot.Enabled) parseEnvVar("RENTERD_AUTOPILOT_REVISION_BROADCAST_INTERVAL", &cfg.Autopilot.RevisionBroadcastInterval) diff --git a/config/config.go b/config/config.go index 10b080020..95a56d191 100644 --- a/config/config.go +++ b/config/config.go @@ -98,6 +98,7 @@ type ( DownloadOverdriveTimeout time.Duration `yaml:"downloadOverdriveTimeout"` UploadOverdriveTimeout time.Duration `yaml:"uploadOverdriveTimeout"` DownloadMaxOverdrive uint64 `yaml:"downloadMaxOverdrive"` + UploadMaxMemory uint64 `yaml:"uploadMaxMemory"` UploadMaxOverdrive uint64 `yaml:"uploadMaxOverdrive"` AllowUnauthenticatedDownloads bool `yaml:"allowUnauthenticatedDownloads"` } diff --git a/internal/node/node.go b/internal/node/node.go index 1864effc8..b35868dca 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -176,7 +176,7 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht func NewWorker(cfg config.Worker, b worker.Bus, seed types.PrivateKey, l *zap.Logger) (http.Handler, ShutdownFn, error) { workerKey := blake2b.Sum256(append([]byte("worker"), seed...)) - w, err := worker.New(workerKey, cfg.ID, b, cfg.ContractLockTimeout, cfg.BusFlushInterval, cfg.DownloadOverdriveTimeout, cfg.UploadOverdriveTimeout, cfg.DownloadMaxOverdrive, cfg.UploadMaxOverdrive, cfg.AllowPrivateIPs, l) + w, err := worker.New(workerKey, cfg.ID, b, cfg.ContractLockTimeout, cfg.BusFlushInterval, cfg.DownloadOverdriveTimeout, cfg.UploadOverdriveTimeout, cfg.DownloadMaxOverdrive, cfg.UploadMaxMemory, cfg.UploadMaxOverdrive, cfg.AllowPrivateIPs, l) if err != nil { return nil, nil, err } diff --git a/internal/testing/cluster.go b/internal/testing/cluster.go index 00aea17dc..c4c64b927 100644 --- a/internal/testing/cluster.go +++ b/internal/testing/cluster.go @@ -951,6 +951,7 @@ func testWorkerCfg() config.Worker { BusFlushInterval: testBusFlushInterval, DownloadOverdriveTimeout: 500 * time.Millisecond, UploadOverdriveTimeout: 500 * time.Millisecond, + UploadMaxMemory: 1 << 28, // 256 MiB UploadMaxOverdrive: 5, } } diff --git a/worker/memory.go b/worker/memory.go index ec8afcad9..1f408f5d6 100644 --- 
a/worker/memory.go +++ b/worker/memory.go @@ -2,6 +2,7 @@ package worker import ( "context" + "fmt" "sync" ) @@ -24,13 +25,16 @@ type ( } ) -func newMemoryManager() *memoryManager { +func newMemoryManager(maxMemory uint64) (*memoryManager, error) { + if maxMemory == 0 { + return nil, fmt.Errorf("maxMemory cannot be 0") + } mm := &memoryManager{ - totalAvailable: 1 << 30, // 1 GB + totalAvailable: maxMemory, } mm.available = mm.totalAvailable mm.sigNewMem = *sync.NewCond(&mm.mu) - return mm + return mm, nil } func (mm *memoryManager) AcquireMemory(ctx context.Context, amt uint64) <-chan *acquiredMemory { diff --git a/worker/upload.go b/worker/upload.go index 6291fd52d..276a907d1 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -236,12 +236,12 @@ type ( } ) -func (w *worker) initUploadManager(maxOverdrive uint64, overdriveTimeout time.Duration, logger *zap.SugaredLogger) { +func (w *worker) initUploadManager(mm *memoryManager, maxOverdrive uint64, overdriveTimeout time.Duration, logger *zap.SugaredLogger) { if w.uploadManager != nil { panic("upload manager already initialized") // developer error } - w.uploadManager = newUploadManager(w.bus, w, w, maxOverdrive, overdriveTimeout, logger) + w.uploadManager = newUploadManager(w.bus, w, w, mm, maxOverdrive, overdriveTimeout, logger) } func (w *worker) upload(ctx context.Context, r io.Reader, bucket, path string, opts ...UploadOption) (string, error) { @@ -490,13 +490,13 @@ func newDataPoints(halfLife time.Duration) *dataPoints { } } -func newUploadManager(b Bus, hp hostProvider, rl revisionLocker, maxOverdrive uint64, overdriveTimeout time.Duration, logger *zap.SugaredLogger) *uploadManager { +func newUploadManager(b Bus, hp hostProvider, rl revisionLocker, mm *memoryManager, maxOverdrive uint64, overdriveTimeout time.Duration, logger *zap.SugaredLogger) *uploadManager { return &uploadManager{ b: b, hp: hp, rl: rl, logger: logger, - mm: newMemoryManager(), + mm: mm, maxOverdrive: maxOverdrive, overdriveTimeout: overdriveTimeout, diff --git a/worker/worker.go b/worker/worker.go index b537d448e..1719ac9de 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1364,7 +1364,7 @@ func (w *worker) stateHandlerGET(jc jape.Context) { } // New returns an HTTP handler that serves the worker API. 
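Worth pausing on what this knob means in practice: every in-flight slab upload pins TotalShards * rhpv2.SectorSize bytes until its sectors complete, so UploadMaxMemory directly caps upload parallelism. A self-contained back-of-the-envelope sketch (the helper and the hardcoded 4 MiB sector size are illustrative assumptions, not part of the patch; 10-of-30 is renterd's default redundancy):

```go
package main

import "fmt"

// sectorSize mirrors rhpv2.SectorSize (4 MiB), hardcoded to keep the sketch
// self-contained.
const sectorSize = 1 << 22

// maxParallelSlabs is a hypothetical helper: it computes how many slab
// uploads fit into a memory budget when each slab pins
// totalShards * sectorSize bytes until its sectors complete.
func maxParallelSlabs(maxMemory, totalShards uint64) uint64 {
	return maxMemory / (totalShards * sectorSize)
}

func main() {
	fmt.Println(maxParallelSlabs(1<<30, 30)) // default 1 GiB budget: 8 slabs in flight
	fmt.Println(maxParallelSlabs(1<<28, 30)) // 256 MiB test config: 2 slabs in flight
}
```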
-func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlushInterval, downloadOverdriveTimeout, uploadOverdriveTimeout time.Duration, downloadMaxOverdrive, uploadMaxOverdrive uint64, allowPrivateIPs bool, l *zap.Logger) (*worker, error) { +func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlushInterval, downloadOverdriveTimeout, uploadOverdriveTimeout time.Duration, downloadMaxOverdrive, uploadMaxMemory, uploadMaxOverdrive uint64, allowPrivateIPs bool, l *zap.Logger) (*worker, error) { if contractLockingDuration == 0 { return nil, errors.New("contract lock duration must be positive") } @@ -1395,7 +1395,11 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush w.initContractSpendingRecorder() w.initPriceTables() w.initDownloadManager(downloadMaxOverdrive, downloadOverdriveTimeout, l.Sugar().Named("downloadmanager")) - w.initUploadManager(uploadMaxOverdrive, uploadOverdriveTimeout, l.Sugar().Named("uploadmanager")) + mm, err := newMemoryManager(uploadMaxMemory) + if err != nil { + return nil, err + } + w.initUploadManager(mm, uploadMaxOverdrive, uploadOverdriveTimeout, l.Sugar().Named("uploadmanager")) return w, nil } From 50f453e3d3bf9821890413ac2422238a0a7e93e6 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Tue, 28 Nov 2023 09:43:23 +0100 Subject: [PATCH 10/13] worker: add memory endpoint --- api/worker.go | 9 +++++++++ worker/client/client.go | 6 ++++++ worker/memory.go | 11 +++++++++++ worker/worker.go | 8 ++++++++ 4 files changed, 34 insertions(+) diff --git a/api/worker.go b/api/worker.go index 1a5e77558..5b3ea14d1 100644 --- a/api/worker.go +++ b/api/worker.go @@ -51,6 +51,15 @@ type ( Error string `json:"error,omitempty"` } + MemoryResponse struct { + Upload MemoryStatus `json:"upload"` + } + + MemoryStatus struct { + Available uint64 `json:"available"` + Total uint64 `json:"total"` + } + // MigrateSlabResponse is the response type for the /slab/migrate endpoint. MigrateSlabResponse struct { NumShardsMigrated int `json:"numShardsMigrated"` diff --git a/worker/client/client.go b/worker/client/client.go index 8319d5c5c..a5979d124 100644 --- a/worker/client/client.go +++ b/worker/client/client.go @@ -136,6 +136,12 @@ func (c *Client) ID(ctx context.Context) (id string, err error) { return } +// Memory requests the /memory endpoint. +func (c *Client) Memory(ctx context.Context) (resp api.MemoryResponse, err error) { + err = c.c.WithContext(ctx).GET("/memory", &resp) + return +} + // MigrateSlab migrates the specified slab. 
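The new client method makes the endpoint straightforward to poll from monitoring code. A minimal sketch (the helper name is hypothetical, and it assumes an already constructed worker client along with context and fmt imports):

```go
// printUploadMemory is a hypothetical helper that reports a worker's upload
// memory usage via the new /memory endpoint.
func printUploadMemory(ctx context.Context, wc *Client) error {
	resp, err := wc.Memory(ctx)
	if err != nil {
		return err
	}
	inUse := resp.Upload.Total - resp.Upload.Available
	fmt.Printf("upload memory: %d of %d bytes in use\n", inUse, resp.Upload.Total)
	return nil
}
```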
func (c *Client) MigrateSlab(ctx context.Context, slab object.Slab, set string) (res api.MigrateSlabResponse, err error) { values := make(url.Values) diff --git a/worker/memory.go b/worker/memory.go index 1f408f5d6..7918e69c0 100644 --- a/worker/memory.go +++ b/worker/memory.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "sync" + + "go.sia.tech/renterd/api" ) type ( @@ -37,6 +39,15 @@ func newMemoryManager(maxMemory uint64) (*memoryManager, error) { return mm, nil } +func (mm *memoryManager) Status() api.MemoryStatus { + mm.mu.Lock() + defer mm.mu.Unlock() + return api.MemoryStatus{ + Available: mm.available, + Total: mm.totalAvailable, + } +} + func (mm *memoryManager) AcquireMemory(ctx context.Context, amt uint64) <-chan *acquiredMemory { if amt == 0 { panic("cannot acquire 0 memory") diff --git a/worker/worker.go b/worker/worker.go index 1719ac9de..f28bffaa1 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1340,6 +1340,12 @@ func (w *worker) idHandlerGET(jc jape.Context) { jc.Encode(w.id) } +func (w *worker) memoryGET(jc jape.Context) { + jc.Encode(api.MemoryResponse{ + Upload: w.uploadManager.mm.Status(), + }) +} + func (w *worker) accountHandlerGET(jc jape.Context) { var hostKey types.PublicKey if jc.DecodeParam("hostkey", &hostKey) != nil { @@ -1409,6 +1415,8 @@ func (w *worker) Handler() http.Handler { "GET /account/:hostkey": w.accountHandlerGET, "GET /id": w.idHandlerGET, + "GET /memory": w.memoryGET, + "GET /rhp/contracts": w.rhpContractsHandlerGET, "POST /rhp/contract/:id/broadcast": w.rhpBroadcastHandler, "POST /rhp/contract/:id/prune": w.rhpPruneContractHandlerPOST, From e6d1cc2c3e6153823650459cebc7b1623c206bfa Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Wed, 29 Nov 2023 10:23:44 +0100 Subject: [PATCH 11/13] worker: address comments --- worker/memory.go | 13 ++++++++++--- worker/upload.go | 4 ++-- worker/worker.go | 2 +- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/worker/memory.go b/worker/memory.go index 7918e69c0..e6950b192 100644 --- a/worker/memory.go +++ b/worker/memory.go @@ -6,6 +6,7 @@ import ( "sync" "go.sia.tech/renterd/api" + "go.uber.org/zap" ) type ( @@ -13,6 +14,7 @@ type ( // uploads and downloads. 
memoryManager struct { totalAvailable uint64 + logger *zap.SugaredLogger mu sync.Mutex sigNewMem sync.Cond @@ -27,11 +29,12 @@ type ( } ) -func newMemoryManager(maxMemory uint64) (*memoryManager, error) { +func newMemoryManager(logger *zap.SugaredLogger, maxMemory uint64) (*memoryManager, error) { if maxMemory == 0 { return nil, fmt.Errorf("maxMemory cannot be 0") } mm := &memoryManager{ + logger: logger, totalAvailable: maxMemory, } mm.available = mm.totalAvailable @@ -49,10 +52,14 @@ func (mm *memoryManager) Status() api.MemoryStatus { } func (mm *memoryManager) AcquireMemory(ctx context.Context, amt uint64) <-chan *acquiredMemory { + memChan := make(chan *acquiredMemory, 1) if amt == 0 { - panic("cannot acquire 0 memory") + mm.logger.Fatal("cannot acquire 0 memory") + } else if mm.totalAvailable < amt { + mm.logger.Errorf("cannot acquire %v memory with only %v available", amt, mm.totalAvailable) + close(memChan) + return memChan } - memChan := make(chan *acquiredMemory, 1) // block until enough memory is available mm.sigNewMem.L.Lock() for mm.available < amt { diff --git a/worker/upload.go b/worker/upload.go index 4a0302e49..7fc060990 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -368,7 +368,7 @@ func (w *worker) uploadPackedSlabs(ctx context.Context, lockingDuration time.Dur errChan := make(chan error, 1) var wg sync.WaitGroup - redundantSize := uint64(rs.TotalShards) * rhpv2.SectorSize + totalSize := uint64(rs.TotalShards) * rhpv2.SectorSize LOOP: for { @@ -380,7 +380,7 @@ LOOP: break LOOP case err = <-errChan: break LOOP - case mem = <-w.uploadManager.mm.AcquireMemory(ctx, redundantSize): + case mem = <-w.uploadManager.mm.AcquireMemory(ctx, totalSize): } if mem == nil { break // interrupted diff --git a/worker/worker.go b/worker/worker.go index f28bffaa1..97d366c03 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1401,7 +1401,7 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush w.initContractSpendingRecorder() w.initPriceTables() w.initDownloadManager(downloadMaxOverdrive, downloadOverdriveTimeout, l.Sugar().Named("downloadmanager")) - mm, err := newMemoryManager(uploadMaxMemory) + mm, err := newMemoryManager(w.logger, uploadMaxMemory) if err != nil { return nil, err } From aecc9d1505c3a79040903ce47204523a60378df5 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Wed, 29 Nov 2023 11:46:46 +0100 Subject: [PATCH 12/13] worker: change Acquire to not return a channel --- worker/memory.go | 16 +++---- worker/migrations.go | 5 ++- worker/upload.go | 101 ++++++++++++++++++++----------------------- 3 files changed, 57 insertions(+), 65 deletions(-) diff --git a/worker/memory.go b/worker/memory.go index e6950b192..cee82661d 100644 --- a/worker/memory.go +++ b/worker/memory.go @@ -51,14 +51,12 @@ func (mm *memoryManager) Status() api.MemoryStatus { } } -func (mm *memoryManager) AcquireMemory(ctx context.Context, amt uint64) <-chan *acquiredMemory { - memChan := make(chan *acquiredMemory, 1) +func (mm *memoryManager) AcquireMemory(ctx context.Context, amt uint64) *acquiredMemory { if amt == 0 { mm.logger.Fatal("cannot acquire 0 memory") } else if mm.totalAvailable < amt { mm.logger.Errorf("cannot acquire %v memory with only %v available", amt, mm.totalAvailable) - close(memChan) - return memChan + return nil } // block until enough memory is available mm.sigNewMem.L.Lock() @@ -69,22 +67,18 @@ func (mm *memoryManager) AcquireMemory(ctx context.Context, amt uint64) <-chan * select { case <-ctx.Done(): mm.sigNewMem.L.Unlock() - close(memChan) - 
return memChan + return nil default: } } mm.available -= amt mm.sigNewMem.L.Unlock() - memChan <- &acquiredMemory{ + mm.sigNewMem.Signal() // wake next goroutine + return &acquiredMemory{ mm: mm, remaining: amt, } - close(memChan) - - mm.sigNewMem.Signal() // wake next goroutine - return memChan } // release returns all the remaining memory to the memory manager. Should always diff --git a/worker/migrations.go b/worker/migrations.go index a822a4e55..e07d24ee7 100644 --- a/worker/migrations.go +++ b/worker/migrations.go @@ -84,7 +84,10 @@ SHARDS: } // acquire memory for the migration - mem := <-u.mm.AcquireMemory(ctx, uint64(len(shardIndices))*rhpv2.SectorSize) + mem := u.mm.AcquireMemory(ctx, uint64(len(shardIndices))*rhpv2.SectorSize) + if mem == nil { + return 0, false, fmt.Errorf("failed to acquire memory for migration") + } defer mem.Release() // download the slab diff --git a/worker/upload.go b/worker/upload.go index 7fc060990..c69421e37 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -365,23 +365,18 @@ func (w *worker) tryUploadPackedSlabs(ctx context.Context, rs api.RedundancySett func (w *worker) uploadPackedSlabs(ctx context.Context, lockingDuration time.Duration, rs api.RedundancySettings, contractSet string, lockPriority int) (uploaded int, err error) { // upload packed slabs var mu sync.Mutex - errChan := make(chan error, 1) + var errs error var wg sync.WaitGroup totalSize := uint64(rs.TotalShards) * rhpv2.SectorSize -LOOP: + // derive a context that we can use as an interrupt in case of an error. + interruptCtx, cancel := context.WithCancel(ctx) + defer cancel() + for { - // block until either an error occurs or the next slab is ready to be - // uploaded - var mem *acquiredMemory - select { - case <-ctx.Done(): - break LOOP - case err = <-errChan: - break LOOP - case mem = <-w.uploadManager.mm.AcquireMemory(ctx, totalSize): - } + // block until we have memory for a slab or until we are interrupted + mem := w.uploadManager.mm.AcquireMemory(interruptCtx, totalSize) if mem == nil { break // interrupted } @@ -392,10 +387,10 @@ LOOP: if err != nil { err = fmt.Errorf("couldn't fetch packed slabs from bus: %v", err) mem.Release() - break LOOP + break } else if len(packedSlabs) == 0 { mem.Release() - break LOOP // no more slabs + break // no more slabs } ps := packedSlabs[0] @@ -405,28 +400,22 @@ LOOP: defer mem.Release() defer wg.Done() err := w.uploadPackedSlab(ctx, ps, rs, contractSet, lockPriority, mem) + mu.Lock() if err != nil { - select { - case errChan <- err: - default: - } + errs = errors.Join(errs, err) + cancel() // prevent new uploads from being launched } else { - mu.Lock() uploaded++ - mu.Unlock() } + mu.Unlock() }(ps) } // wait for all threads to finish wg.Wait() - // check for any last errors - select { - case uploadErr := <-errChan: - err = errors.Join(err, uploadErr) - default: - } + // return collected errors + err = errors.Join(err, errs) return } @@ -582,24 +571,28 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, up uploadPara // create the response channel respChan := make(chan slabUploadResponse) - // collect the responses - var responses []slabUploadResponse - var slabIndex int - numSlabs := -1 + // channel to notify main thread of the number of slabs to wait for + numSlabsChan := make(chan int) // prepare slab size size := int64(up.rs.MinShards) * rhpv2.SectorSize redundantSize := uint64(up.rs.TotalShards) * rhpv2.SectorSize - // launch uploads and collect responses -loop: - for { - select { - case <-mgr.stopChan: - return 
object.Object{}, nil, "", errors.New("manager was stopped") - case mem := <-mgr.mm.AcquireMemory(ctx, redundantSize): + // launch uploads in a separate goroutine + stopCtx, cancel := context.WithCancel(ctx) + defer cancel() + go func() { + var slabIndex int + for { + select { + case <-mgr.stopChan: + return // interrupted + default: + } + // acquire memory + mem := mgr.mm.AcquireMemory(stopCtx, redundantSize) if mem == nil { - return object.Object{}, nil, "", errors.New("upload timed out") + return // interrupted } // read next slab's data data := make([]byte, size) @@ -607,18 +600,25 @@ loop: if err == io.EOF { if slabIndex == 0 { mem.Release() - break loop + return } - numSlabs = slabIndex + numSlabs := slabIndex if partialSlab != nil { numSlabs-- // don't wait on partial slab } mem.Release() - // no more data to upload - break loop + // no more data to upload, notify main thread of the number of + // slabs to wait for + numSlabsChan <- numSlabs + return } else if err != nil && err != io.ErrUnexpectedEOF { mem.Release() - return object.Object{}, nil, "", err + select { + case respChan <- slabUploadResponse{err: err}: + // notify main thread + case <-stopCtx.Done(): + } + return } if up.packing && errors.Is(err, io.ErrUnexpectedEOF) { // If uploadPacking is true, we return the partial slab without @@ -633,22 +633,17 @@ loop: }(up.rs, data, length, slabIndex) } slabIndex++ - case res := <-respChan: - if res.err != nil { - return object.Object{}, nil, "", res.err - } - responses = append(responses, res) - if len(responses) == numSlabs { - break loop - } } - } + }() - // keep collecting responses + // collect responses + var responses []slabUploadResponse + numSlabs := math.MaxInt32 for len(responses) < numSlabs { select { case <-mgr.stopChan: return object.Object{}, nil, "", errors.New("manager was stopped") + case numSlabs = <-numSlabsChan: case res := <-respChan: if res.err != nil { return object.Object{}, nil, "", res.err From ff147e8b12e4816b3e36619bf3d8af8d2d565861 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Wed, 29 Nov 2023 11:52:34 +0100 Subject: [PATCH 13/13] worker: refactor upload loop --- worker/upload.go | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/worker/upload.go b/worker/upload.go index c69421e37..258c4d2ab 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -572,7 +572,7 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, up uploadPara respChan := make(chan slabUploadResponse) // channel to notify main thread of the number of slabs to wait for - numSlabsChan := make(chan int) + numSlabsChan := make(chan int, 1) // prepare slab size size := int64(up.rs.MinShards) * rhpv2.SectorSize @@ -598,35 +598,33 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, up uploadPara data := make([]byte, size) length, err := io.ReadFull(io.LimitReader(cr, size), data) if err == io.EOF { - if slabIndex == 0 { - mem.Release() - return - } - numSlabs := slabIndex - if partialSlab != nil { - numSlabs-- // don't wait on partial slab - } mem.Release() + // no more data to upload, notify main thread of the number of // slabs to wait for + numSlabs := slabIndex + if partialSlab != nil && slabIndex > 0 { + numSlabs-- // don't wait on partial slab + } numSlabsChan <- numSlabs return } else if err != nil && err != io.ErrUnexpectedEOF { mem.Release() + + // unexpected error, notify main thread select { case respChan <- slabUploadResponse{err: err}: - // notify main thread case <-stopCtx.Done(): } return - } 
- if up.packing && errors.Is(err, io.ErrUnexpectedEOF) { - // If uploadPacking is true, we return the partial slab without + } else if up.packing && errors.Is(err, io.ErrUnexpectedEOF) { + mem.Release() + + // uploadPacking is true, we return the partial slab without // uploading. partialSlab = data[:length] - mem.Release() // trigger next iteration } else { - // Otherwise we upload it. + // regular upload go func(rs api.RedundancySettings, data []byte, length, slabIndex int) { u.uploadSlab(ctx, rs, data, length, slabIndex, respChan, mem) mem.Release()
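The series closes with the upload loop split into a producer goroutine (acquire memory, read one slab's worth of data, launch its upload) and a main thread that only collects responses, with the total slab count handed over via numSlabsChan once the reader is exhausted. The branch structure above hinges on io.ReadFull semantics: nil means a full slab, io.ErrUnexpectedEOF a trailing partial slab (kept back unuploaded when packing is enabled), and io.EOF no data left at all. A runnable sketch of just that read-classification logic, with the slab size shrunk to 4 bytes and a bytes.Reader standing in for the worker's hash reader:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

func main() {
	const slabSize = 4                         // stand-in for rs.MinShards * rhpv2.SectorSize
	r := bytes.NewReader([]byte("abcdefghij")) // two full slabs plus two trailing bytes

	for slabIndex := 0; ; slabIndex++ {
		data := make([]byte, slabSize)
		n, err := io.ReadFull(io.LimitReader(r, slabSize), data)
		if err == io.EOF {
			// no data at all: the previous slab was the last full one
			fmt.Printf("done after %d slabs\n", slabIndex)
			return
		} else if err == io.ErrUnexpectedEOF {
			// short read: a trailing partial slab (kept back when packing)
			fmt.Printf("slab %d: partial, %d bytes\n", slabIndex, n)
			return
		} else if err != nil {
			fmt.Println("read error:", err)
			return
		}
		// full slab: this is the case that gets erasure-coded and uploaded
		fmt.Printf("slab %d: full, %d bytes\n", slabIndex, n)
	}
}
```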