diff --git a/api/worker.go b/api/worker.go index 5b3ea14d1..a86edb446 100644 --- a/api/worker.go +++ b/api/worker.go @@ -52,7 +52,8 @@ type ( } MemoryResponse struct { - Upload MemoryStatus `json:"upload"` + Download MemoryStatus `json:"download"` + Upload MemoryStatus `json:"upload"` } MemoryStatus struct { diff --git a/cmd/renterd/main.go b/cmd/renterd/main.go index 7b468f38d..996faa763 100644 --- a/cmd/renterd/main.go +++ b/cmd/renterd/main.go @@ -94,6 +94,7 @@ var ( DownloadMaxOverdrive: 5, DownloadOverdriveTimeout: 3 * time.Second, + DownloadMaxMemory: 1 << 30, // 1 GiB UploadMaxMemory: 1 << 30, // 1 GiB UploadMaxOverdrive: 5, UploadOverdriveTimeout: 3 * time.Second, diff --git a/config/config.go b/config/config.go index 95a56d191..89e12d8e8 100644 --- a/config/config.go +++ b/config/config.go @@ -98,6 +98,7 @@ type ( DownloadOverdriveTimeout time.Duration `yaml:"downloadOverdriveTimeout"` UploadOverdriveTimeout time.Duration `yaml:"uploadOverdriveTimeout"` DownloadMaxOverdrive uint64 `yaml:"downloadMaxOverdrive"` + DownloadMaxMemory uint64 `yaml:"downloadMaxMemory"` UploadMaxMemory uint64 `yaml:"uploadMaxMemory"` UploadMaxOverdrive uint64 `yaml:"uploadMaxOverdrive"` AllowUnauthenticatedDownloads bool `yaml:"allowUnauthenticatedDownloads"` diff --git a/internal/node/node.go b/internal/node/node.go index a20cf8575..6089e0560 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -179,7 +179,7 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht func NewWorker(cfg config.Worker, b worker.Bus, seed types.PrivateKey, l *zap.Logger) (http.Handler, ShutdownFn, error) { workerKey := blake2b.Sum256(append([]byte("worker"), seed...)) - w, err := worker.New(workerKey, cfg.ID, b, cfg.ContractLockTimeout, cfg.BusFlushInterval, cfg.DownloadOverdriveTimeout, cfg.UploadOverdriveTimeout, cfg.DownloadMaxOverdrive, cfg.UploadMaxMemory, cfg.UploadMaxOverdrive, cfg.AllowPrivateIPs, l) + w, err := worker.New(workerKey, cfg.ID, b, cfg.ContractLockTimeout, cfg.BusFlushInterval, cfg.DownloadOverdriveTimeout, cfg.UploadOverdriveTimeout, cfg.DownloadMaxOverdrive, cfg.DownloadMaxMemory, cfg.UploadMaxMemory, cfg.UploadMaxOverdrive, cfg.AllowPrivateIPs, l) if err != nil { return nil, nil, err } diff --git a/internal/testing/cluster.go b/internal/testing/cluster.go index a79e35223..f1748d015 100644 --- a/internal/testing/cluster.go +++ b/internal/testing/cluster.go @@ -952,6 +952,7 @@ func testWorkerCfg() config.Worker { BusFlushInterval: testBusFlushInterval, DownloadOverdriveTimeout: 500 * time.Millisecond, UploadOverdriveTimeout: 500 * time.Millisecond, + DownloadMaxMemory: 1 << 28, // 256 MiB UploadMaxMemory: 1 << 28, // 256 MiB UploadMaxOverdrive: 5, } diff --git a/worker/download.go b/worker/download.go index 68dfac252..774a0ba6f 100644 --- a/worker/download.go +++ b/worker/download.go @@ -1,6 +1,7 @@ package worker import ( + "bufio" "bytes" "context" "errors" @@ -39,6 +40,7 @@ type ( id [8]byte downloadManager struct { + mm *memoryManager hp hostProvider pss partialSlabStore slm sectorLostMarker @@ -86,14 +88,10 @@ type ( mgr *downloadManager slm sectorLostMarker - index int minShards int offset uint32 length uint32 - nextSlabChan chan struct{} - nextSlabTriggered bool - created time.Time overpay bool @@ -115,6 +113,7 @@ type ( } slabDownloadResponse struct { + mem *acquiredMemory surchargeApplied bool shards [][]byte index int @@ -160,17 +159,18 @@ type ( } ) -func (w *worker) initDownloadManager(maxOverdrive uint64, overdriveTimeout time.Duration, logger *zap.SugaredLogger) { +func (w *worker) initDownloadManager(mm *memoryManager, maxOverdrive uint64, overdriveTimeout time.Duration, logger *zap.SugaredLogger) { if w.downloadManager != nil { panic("download manager already initialized") // developer error } - w.downloadManager = newDownloadManager(w, w, w.bus, maxOverdrive, overdriveTimeout, logger) + w.downloadManager = newDownloadManager(w, w, mm, w.bus, maxOverdrive, overdriveTimeout, logger) } -func newDownloadManager(hp hostProvider, pss partialSlabStore, slm sectorLostMarker, maxOverdrive uint64, overdriveTimeout time.Duration, logger *zap.SugaredLogger) *downloadManager { +func newDownloadManager(hp hostProvider, pss partialSlabStore, mm *memoryManager, slm sectorLostMarker, maxOverdrive uint64, overdriveTimeout time.Duration, logger *zap.SugaredLogger) *downloadManager { return &downloadManager{ hp: hp, + mm: mm, pss: pss, slm: slm, logger: logger, @@ -251,71 +251,94 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o hosts[c.HostKey] = struct{}{} } - // create the cipher writer - cw := o.Key.Decrypt(w, offset) + // buffer the writer + bw := bufio.NewWriter(w) + defer bw.Flush() - // create next slab chan - nextSlabChan := make(chan struct{}) - defer close(nextSlabChan) + // create the cipher writer + cw := o.Key.Decrypt(bw, offset) // create response chan and ensure it's closed properly var wg sync.WaitGroup responseChan := make(chan *slabDownloadResponse) + concurrentSlabsChan := make(chan struct{}, maxConcurrentSlabsPerDownload) ctx, cancel := context.WithCancel(ctx) defer func() { cancel() wg.Wait() close(responseChan) + + DRAIN_LOOP: + for { + select { + case <-concurrentSlabsChan: + default: + break DRAIN_LOOP + } + } + close(concurrentSlabsChan) }() // launch a goroutine to launch consecutive slab downloads - var concurrentSlabs uint64 wg.Add(1) go func() { defer wg.Done() var slabIndex int - for { - if slabIndex < len(slabs) && atomic.LoadUint64(&concurrentSlabs) < maxConcurrentSlabsPerDownload { - next := slabs[slabIndex] - - // check if the next slab is a partial slab. - if next.PartialSlab { - responseChan <- &slabDownloadResponse{index: slabIndex} - slabIndex++ - atomic.AddUint64(&concurrentSlabs, 1) - continue // handle partial slab separately - } + for slabIndex = 0; slabIndex < len(slabs); slabIndex++ { + next := slabs[slabIndex] - // check if we have enough downloaders - var available uint8 - for _, s := range next.Shards { - if _, exists := hosts[s.LatestHost]; exists { - available++ - } - } - if available < next.MinShards { - responseChan <- &slabDownloadResponse{err: fmt.Errorf("not enough hosts available to download the slab: %v/%v", available, next.MinShards)} - return - } + // check if we need to abort + select { + case <-ctx.Done(): + return + case <-mgr.stopChan: + return + case concurrentSlabsChan <- struct{}{}: + } - // launch the download - wg.Add(1) - go func(index int) { - mgr.downloadSlab(ctx, id, next.SlabSlice, index, false, responseChan, nextSlabChan) - wg.Done() - }(slabIndex) - atomic.AddUint64(&concurrentSlabs, 1) - slabIndex++ + // check if the next slab is a partial slab. + if next.PartialSlab { + responseChan <- &slabDownloadResponse{index: slabIndex} + continue // handle partial slab separately } - // block until we are ready for the next slab - select { - case <-ctx.Done(): + // check if we have enough downloaders + var available uint8 + for _, s := range next.Shards { + if _, exists := hosts[s.LatestHost]; exists { + available++ + } + } + if available < next.MinShards { + responseChan <- &slabDownloadResponse{err: fmt.Errorf("not enough hosts available to download the slab: %v/%v", available, next.MinShards)} return - case nextSlabChan <- struct{}{}: } + + // acquire memory + mem := mgr.mm.AcquireMemory(ctx, uint64(next.Length)) + if mem == nil { + return // interrupted + } + + // launch the download + wg.Add(1) + go func(index int) { + defer wg.Done() + shards, surchargeApplied, err := mgr.downloadSlab(ctx, id, next.SlabSlice, false) + select { + case responseChan <- &slabDownloadResponse{ + mem: mem, + surchargeApplied: surchargeApplied, + shards: shards, + index: index, + err: err, + }: + case <-ctx.Done(): + mem.Release() // relase memory if we're interrupted + } + }(slabIndex) } }() @@ -325,13 +348,26 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o var respIndex int outer: for { + var resp *slabDownloadResponse select { case <-mgr.stopChan: return errDownloadManagerStopped case <-ctx.Done(): return errors.New("download timed out") - case resp := <-responseChan: - atomic.AddUint64(&concurrentSlabs, ^uint64(0)) + case resp = <-responseChan: + } + + // handle response + err := func() error { + if resp.mem != nil { + defer resp.mem.Release() + } + defer func() { + select { + case <-concurrentSlabsChan: + default: + } + }() if resp.err != nil { mgr.logger.Errorf("download slab %v failed, overpaid %v: %v", resp.index, resp.surchargeApplied, resp.err) @@ -365,21 +401,20 @@ outer: delete(responses, respIndex) respIndex++ - select { - case <-nextSlabChan: - default: - } - continue } else { break } } + return nil + }() + if err != nil { + return err + } - // exit condition - if respIndex == len(slabs) { - break outer - } + // exit condition + if respIndex == len(slabs) { + break outer } } @@ -409,43 +444,31 @@ func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, return nil, false, fmt.Errorf("not enough hosts available to download the slab: %v/%v", availableShards, slab.MinShards) } + // NOTE: we don't acquire memory here since DownloadSlab is only used for + // migrations which already have memory acquired + // create identifier id := newID() // download the slab - responseChan := make(chan *slabDownloadResponse) - nextSlabChan := make(chan struct{}) slice := object.SlabSlice{ Slab: slab, Offset: 0, Length: uint32(slab.MinShards) * rhpv2.SectorSize, } - go func() { - mgr.downloadSlab(ctx, id, slice, 0, true, responseChan, nextSlabChan) - // NOTE: when downloading 1 slab we can simply close both channels - close(responseChan) - close(nextSlabChan) - }() - - // await the response - var resp *slabDownloadResponse - select { - case <-ctx.Done(): - return nil, false, ctx.Err() - case resp = <-responseChan: - if resp.err != nil { - return nil, false, resp.err - } + shards, surchargeApplied, err := mgr.downloadSlab(ctx, id, slice, true) + if err != nil { + return nil, false, err } // decrypt and recover - slice.Decrypt(resp.shards) - err := slice.Reconstruct(resp.shards) + slice.Decrypt(shards) + err = slice.Reconstruct(shards) if err != nil { return nil, false, err } - return resp.shards, resp.surchargeApplied, err + return shards, surchargeApplied, err } func (mgr *downloadManager) Stats() downloadManagerStats { @@ -529,7 +552,7 @@ func (mgr *downloadManager) refreshDownloaders(contracts []api.ContractMetadata) } } -func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice object.SlabSlice, slabIndex int, migration bool, nextSlabChan chan struct{}) *slabDownload { +func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice object.SlabSlice, migration bool) *slabDownload { // calculate the offset and length offset, length := slice.SectorRegion() @@ -544,9 +567,6 @@ func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice o mgr: mgr, slm: mgr.slm, - nextSlabChan: nextSlabChan, - - index: slabIndex, minShards: int(slice.MinShards), offset: offset, length: length, @@ -562,23 +582,16 @@ func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice o } } -func (mgr *downloadManager) downloadSlab(ctx context.Context, dID id, slice object.SlabSlice, index int, migration bool, responseChan chan *slabDownloadResponse, nextSlabChan chan struct{}) { +func (mgr *downloadManager) downloadSlab(ctx context.Context, dID id, slice object.SlabSlice, migration bool) ([][]byte, bool, error) { // add tracing ctx, span := tracing.Tracer.Start(ctx, "downloadSlab") defer span.End() // prepare new download - slab := mgr.newSlabDownload(ctx, dID, slice, index, migration, nextSlabChan) + slab := mgr.newSlabDownload(ctx, dID, slice, migration) // execute download - resp := &slabDownloadResponse{index: index} - resp.shards, resp.surchargeApplied, resp.err = slab.download(ctx) - - // send the response - select { - case <-ctx.Done(): - case responseChan <- resp: - } + return slab.download(ctx) } func (d *downloader) stats() downloaderStats { @@ -1181,15 +1194,6 @@ func (s *slabDownload) receive(resp sectorDownloadResp) (finished bool) { return false } - // try trigger next slab read - if !s.nextSlabTriggered && s.numCompleted+int(s.mgr.maxOverdrive) >= s.minShards { - select { - case <-s.nextSlabChan: - s.nextSlabTriggered = true - default: - } - } - // update num overpaid if resp.req.overpay { s.numOverpaid++ diff --git a/worker/worker.go b/worker/worker.go index 4f0397497..0a9f4784c 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1378,7 +1378,8 @@ func (w *worker) idHandlerGET(jc jape.Context) { func (w *worker) memoryGET(jc jape.Context) { jc.Encode(api.MemoryResponse{ - Upload: w.uploadManager.mm.Status(), + Download: w.downloadManager.mm.Status(), + Upload: w.uploadManager.mm.Status(), }) } @@ -1406,7 +1407,7 @@ func (w *worker) stateHandlerGET(jc jape.Context) { } // New returns an HTTP handler that serves the worker API. -func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlushInterval, downloadOverdriveTimeout, uploadOverdriveTimeout time.Duration, downloadMaxOverdrive, uploadMaxMemory, uploadMaxOverdrive uint64, allowPrivateIPs bool, l *zap.Logger) (*worker, error) { +func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlushInterval, downloadOverdriveTimeout, uploadOverdriveTimeout time.Duration, downloadMaxOverdrive, downloadMaxMemory, uploadMaxMemory, uploadMaxOverdrive uint64, allowPrivateIPs bool, l *zap.Logger) (*worker, error) { if contractLockingDuration == 0 { return nil, errors.New("contract lock duration must be positive") } @@ -1436,12 +1437,16 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush w.initAccounts(b) w.initContractSpendingRecorder() w.initPriceTables() - w.initDownloadManager(downloadMaxOverdrive, downloadOverdriveTimeout, l.Sugar().Named("downloadmanager")) - mm, err := newMemoryManager(w.logger, uploadMaxMemory) + dmm, err := newMemoryManager(w.logger, downloadMaxMemory) if err != nil { return nil, err } - w.initUploadManager(mm, uploadMaxOverdrive, uploadOverdriveTimeout, l.Sugar().Named("uploadmanager")) + w.initDownloadManager(dmm, downloadMaxOverdrive, downloadOverdriveTimeout, l.Sugar().Named("downloadmanager")) + umm, err := newMemoryManager(w.logger, uploadMaxMemory) + if err != nil { + return nil, err + } + w.initUploadManager(umm, uploadMaxOverdrive, uploadOverdriveTimeout, l.Sugar().Named("uploadmanager")) return w, nil }