From f857057c8435b10f1027f62a3b48aebbf373fd28 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Fri, 1 Dec 2023 15:26:51 +0100 Subject: [PATCH] worker: remove nextSlabChan --- worker/download.go | 74 ++++++++++------------------------------------ 1 file changed, 16 insertions(+), 58 deletions(-) diff --git a/worker/download.go b/worker/download.go index 3a1c3e2cf..f4c0de137 100644 --- a/worker/download.go +++ b/worker/download.go @@ -87,9 +87,6 @@ type ( offset uint32 length uint32 - nextSlabChan chan struct{} - nextSlabTriggered bool - created time.Time overpay bool @@ -252,10 +249,6 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o // create the cipher writer cw := o.Key.Decrypt(w, offset) - // create next slab chan - nextSlabChan := make(chan struct{}) - defer close(nextSlabChan) - // create response chan and ensure it's closed properly var wg sync.WaitGroup responseChan := make(chan *slabDownloadResponse) @@ -297,22 +290,23 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o // acquire memory mem := mgr.mm.AcquireMemory(ctx, uint64(next.Length)) + if mem == nil { + return // interrupted + } // launch the download wg.Add(1) go func(index int) { - mgr.downloadSlab(ctx, id, next.SlabSlice, index, false, mem, responseChan, nextSlabChan) + resp := mgr.downloadSlab(ctx, id, next.SlabSlice, index, false, mem) + select { + case <-ctx.Done(): + mem.Release() + case responseChan <- resp: + } wg.Done() }(slabIndex) slabIndex++ } - - // block until we are ready for the next slab - select { - case <-ctx.Done(): - return - case nextSlabChan <- struct{}{}: - } } }() @@ -369,11 +363,6 @@ outer: delete(responses, respIndex) respIndex++ - select { - case <-nextSlabChan: - default: - } - continue } else { break @@ -425,29 +414,14 @@ func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, id := newID() // download the slab - responseChan := make(chan *slabDownloadResponse) - nextSlabChan := make(chan struct{}) slice := object.SlabSlice{ Slab: slab, Offset: 0, Length: uint32(slab.MinShards) * rhpv2.SectorSize, } - go func() { - mgr.downloadSlab(ctx, id, slice, 0, true, mem, responseChan, nextSlabChan) - // NOTE: when downloading 1 slab we can simply close both channels - close(responseChan) - close(nextSlabChan) - }() - - // await the response - var resp *slabDownloadResponse - select { - case <-ctx.Done(): - return nil, false, ctx.Err() - case resp = <-responseChan: - if resp.err != nil { - return nil, false, resp.err - } + resp := mgr.downloadSlab(ctx, id, slice, 0, true, mem) + if resp.err != nil { + return nil, false, resp.err } // decrypt and recover @@ -541,7 +515,7 @@ func (mgr *downloadManager) refreshDownloaders(contracts []api.ContractMetadata) } } -func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice object.SlabSlice, slabIndex int, migration bool, nextSlabChan chan struct{}) *slabDownload { +func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice object.SlabSlice, slabIndex int, migration bool) *slabDownload { // calculate the offset and length offset, length := slice.SectorRegion() @@ -556,8 +530,6 @@ func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice o mgr: mgr, slm: mgr.slm, - nextSlabChan: nextSlabChan, - index: slabIndex, minShards: int(slice.MinShards), offset: offset, @@ -574,13 +546,13 @@ func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice o } } -func (mgr *downloadManager) downloadSlab(ctx context.Context, dID id, slice object.SlabSlice, index int, migration bool, mem *acquiredMemory, responseChan chan *slabDownloadResponse, nextSlabChan chan struct{}) { +func (mgr *downloadManager) downloadSlab(ctx context.Context, dID id, slice object.SlabSlice, index int, migration bool, mem *acquiredMemory) *slabDownloadResponse { // add tracing ctx, span := tracing.Tracer.Start(ctx, "downloadSlab") defer span.End() // prepare new download - slab := mgr.newSlabDownload(ctx, dID, slice, index, migration, nextSlabChan) + slab := mgr.newSlabDownload(ctx, dID, slice, index, migration) // execute download resp := &slabDownloadResponse{ @@ -588,12 +560,7 @@ func (mgr *downloadManager) downloadSlab(ctx context.Context, dID id, slice obje mem: mem, } resp.shards, resp.surchargeApplied, resp.err = slab.download(ctx) - - // send the response - select { - case <-ctx.Done(): - case responseChan <- resp: - } + return resp } func (d *downloader) stats() downloaderStats { @@ -1196,15 +1163,6 @@ func (s *slabDownload) receive(resp sectorDownloadResp) (finished bool) { return false } - // try trigger next slab read - if !s.nextSlabTriggered && s.numCompleted+int(s.mgr.maxOverdrive) >= s.minShards { - select { - case <-s.nextSlabChan: - s.nextSlabTriggered = true - default: - } - } - // update num overpaid if resp.req.overpay { s.numOverpaid++