Skip to content

Commit

Permalink
worker: remove nextSlabChan
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl committed Dec 1, 2023
1 parent e6fe78b commit f857057
Showing 1 changed file with 16 additions and 58 deletions.
74 changes: 16 additions & 58 deletions worker/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,6 @@ type (
offset uint32
length uint32

nextSlabChan chan struct{}
nextSlabTriggered bool

created time.Time
overpay bool

Expand Down Expand Up @@ -252,10 +249,6 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o
// create the cipher writer
cw := o.Key.Decrypt(w, offset)

// create next slab chan
nextSlabChan := make(chan struct{})
defer close(nextSlabChan)

// create response chan and ensure it's closed properly
var wg sync.WaitGroup
responseChan := make(chan *slabDownloadResponse)
Expand Down Expand Up @@ -297,22 +290,23 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o

// acquire memory
mem := mgr.mm.AcquireMemory(ctx, uint64(next.Length))
if mem == nil {
return // interrupted
}

// launch the download
wg.Add(1)
go func(index int) {
mgr.downloadSlab(ctx, id, next.SlabSlice, index, false, mem, responseChan, nextSlabChan)
resp := mgr.downloadSlab(ctx, id, next.SlabSlice, index, false, mem)
select {
case <-ctx.Done():
mem.Release()
case responseChan <- resp:
}
wg.Done()
}(slabIndex)
slabIndex++
}

// block until we are ready for the next slab
select {
case <-ctx.Done():
return
case nextSlabChan <- struct{}{}:
}
}
}()

Expand Down Expand Up @@ -369,11 +363,6 @@ outer:
delete(responses, respIndex)
respIndex++

select {
case <-nextSlabChan:
default:
}

continue
} else {
break
Expand Down Expand Up @@ -425,29 +414,14 @@ func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab,
id := newID()

// download the slab
responseChan := make(chan *slabDownloadResponse)
nextSlabChan := make(chan struct{})
slice := object.SlabSlice{
Slab: slab,
Offset: 0,
Length: uint32(slab.MinShards) * rhpv2.SectorSize,
}
go func() {
mgr.downloadSlab(ctx, id, slice, 0, true, mem, responseChan, nextSlabChan)
// NOTE: when downloading 1 slab we can simply close both channels
close(responseChan)
close(nextSlabChan)
}()

// await the response
var resp *slabDownloadResponse
select {
case <-ctx.Done():
return nil, false, ctx.Err()
case resp = <-responseChan:
if resp.err != nil {
return nil, false, resp.err
}
resp := mgr.downloadSlab(ctx, id, slice, 0, true, mem)
if resp.err != nil {
return nil, false, resp.err
}

// decrypt and recover
Expand Down Expand Up @@ -541,7 +515,7 @@ func (mgr *downloadManager) refreshDownloaders(contracts []api.ContractMetadata)
}
}

func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice object.SlabSlice, slabIndex int, migration bool, nextSlabChan chan struct{}) *slabDownload {
func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice object.SlabSlice, slabIndex int, migration bool) *slabDownload {
// calculate the offset and length
offset, length := slice.SectorRegion()

Expand All @@ -556,8 +530,6 @@ func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice o
mgr: mgr,
slm: mgr.slm,

nextSlabChan: nextSlabChan,

index: slabIndex,
minShards: int(slice.MinShards),
offset: offset,
Expand All @@ -574,26 +546,21 @@ func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice o
}
}

func (mgr *downloadManager) downloadSlab(ctx context.Context, dID id, slice object.SlabSlice, index int, migration bool, mem *acquiredMemory, responseChan chan *slabDownloadResponse, nextSlabChan chan struct{}) {
func (mgr *downloadManager) downloadSlab(ctx context.Context, dID id, slice object.SlabSlice, index int, migration bool, mem *acquiredMemory) *slabDownloadResponse {
// add tracing
ctx, span := tracing.Tracer.Start(ctx, "downloadSlab")
defer span.End()

// prepare new download
slab := mgr.newSlabDownload(ctx, dID, slice, index, migration, nextSlabChan)
slab := mgr.newSlabDownload(ctx, dID, slice, index, migration)

// execute download
resp := &slabDownloadResponse{
index: index,
mem: mem,
}
resp.shards, resp.surchargeApplied, resp.err = slab.download(ctx)

// send the response
select {
case <-ctx.Done():
case responseChan <- resp:
}
return resp
}

func (d *downloader) stats() downloaderStats {
Expand Down Expand Up @@ -1196,15 +1163,6 @@ func (s *slabDownload) receive(resp sectorDownloadResp) (finished bool) {
return false
}

// try trigger next slab read
if !s.nextSlabTriggered && s.numCompleted+int(s.mgr.maxOverdrive) >= s.minShards {
select {
case <-s.nextSlabChan:
s.nextSlabTriggered = true
default:
}
}

// update num overpaid
if resp.req.overpay {
s.numOverpaid++
Expand Down

0 comments on commit f857057

Please sign in to comment.