diff --git a/api/worker.go b/api/worker.go
index 1a5e77558..5b3ea14d1 100644
--- a/api/worker.go
+++ b/api/worker.go
@@ -51,6 +51,15 @@ type (
 		Error string `json:"error,omitempty"`
 	}
 
+	MemoryResponse struct {
+		Upload MemoryStatus `json:"upload"`
+	}
+
+	MemoryStatus struct {
+		Available uint64 `json:"available"`
+		Total     uint64 `json:"total"`
+	}
+
 	// MigrateSlabResponse is the response type for the /slab/migrate endpoint.
 	MigrateSlabResponse struct {
 		NumShardsMigrated int `json:"numShardsMigrated"`
diff --git a/cmd/renterd/main.go b/cmd/renterd/main.go
index cee7b79cc..a37e43cc5 100644
--- a/cmd/renterd/main.go
+++ b/cmd/renterd/main.go
@@ -94,6 +94,7 @@ var (
 			DownloadMaxOverdrive:     5,
 			DownloadOverdriveTimeout: 3 * time.Second,
 
+			UploadMaxMemory:        1 << 30, // 1 GiB
 			UploadMaxOverdrive:     5,
 			UploadOverdriveTimeout: 3 * time.Second,
 		},
@@ -250,6 +251,7 @@ func main() {
 	flag.Uint64Var(&cfg.Worker.DownloadMaxOverdrive, "worker.downloadMaxOverdrive", cfg.Worker.DownloadMaxOverdrive, "maximum number of active overdrive workers when downloading a slab")
 	flag.StringVar(&cfg.Worker.ID, "worker.id", cfg.Worker.ID, "unique identifier of worker used internally - can be overwritten using the RENTERD_WORKER_ID environment variable")
 	flag.DurationVar(&cfg.Worker.DownloadOverdriveTimeout, "worker.downloadOverdriveTimeout", cfg.Worker.DownloadOverdriveTimeout, "timeout applied to slab downloads that decides when we start overdriving")
+	flag.Uint64Var(&cfg.Worker.UploadMaxMemory, "worker.uploadMaxMemory", cfg.Worker.UploadMaxMemory, "maximum amount of RAM the worker allocates for slabs when uploading - can be overwritten using the RENTERD_WORKER_UPLOAD_MAX_MEMORY environment variable")
 	flag.Uint64Var(&cfg.Worker.UploadMaxOverdrive, "worker.uploadMaxOverdrive", cfg.Worker.UploadMaxOverdrive, "maximum number of active overdrive workers when uploading a slab")
 	flag.DurationVar(&cfg.Worker.UploadOverdriveTimeout, "worker.uploadOverdriveTimeout", cfg.Worker.UploadOverdriveTimeout, "timeout applied to slab uploads that decides when we start overdriving")
 	flag.BoolVar(&cfg.Worker.Enabled, "worker.enabled", cfg.Worker.Enabled, "enable/disable creating a worker - can be overwritten using the RENTERD_WORKER_ENABLED environment variable")
@@ -315,6 +317,7 @@ func main() {
 	parseEnvVar("RENTERD_WORKER_ENABLED", &cfg.Worker.Enabled)
 	parseEnvVar("RENTERD_WORKER_ID", &cfg.Worker.ID)
 	parseEnvVar("RENTERD_WORKER_UNAUTHENTICATED_DOWNLOADS", &cfg.Worker.AllowUnauthenticatedDownloads)
+	parseEnvVar("RENTERD_WORKER_UPLOAD_MAX_MEMORY", &cfg.Worker.UploadMaxMemory)
 
 	parseEnvVar("RENTERD_AUTOPILOT_ENABLED", &cfg.Autopilot.Enabled)
 	parseEnvVar("RENTERD_AUTOPILOT_REVISION_BROADCAST_INTERVAL", &cfg.Autopilot.RevisionBroadcastInterval)
diff --git a/config/config.go b/config/config.go
index 10b080020..95a56d191 100644
--- a/config/config.go
+++ b/config/config.go
@@ -98,6 +98,7 @@ type (
 		DownloadOverdriveTimeout      time.Duration `yaml:"downloadOverdriveTimeout"`
 		UploadOverdriveTimeout        time.Duration `yaml:"uploadOverdriveTimeout"`
 		DownloadMaxOverdrive          uint64        `yaml:"downloadMaxOverdrive"`
+		UploadMaxMemory               uint64        `yaml:"uploadMaxMemory"`
 		UploadMaxOverdrive            uint64        `yaml:"uploadMaxOverdrive"`
 		AllowUnauthenticatedDownloads bool          `yaml:"allowUnauthenticatedDownloads"`
 	}
diff --git a/internal/node/node.go b/internal/node/node.go
index 1864effc8..b35868dca 100644
--- a/internal/node/node.go
+++ b/internal/node/node.go
@@ -176,7 +176,7 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht
 func NewWorker(cfg config.Worker, b worker.Bus, seed types.PrivateKey, l *zap.Logger) (http.Handler, ShutdownFn, error) {
 	workerKey := blake2b.Sum256(append([]byte("worker"), seed...))
-	w, err := worker.New(workerKey, cfg.ID, b, cfg.ContractLockTimeout, cfg.BusFlushInterval, cfg.DownloadOverdriveTimeout, cfg.UploadOverdriveTimeout, cfg.DownloadMaxOverdrive, cfg.UploadMaxOverdrive, cfg.AllowPrivateIPs, l)
+	w, err := worker.New(workerKey, cfg.ID, b, cfg.ContractLockTimeout, cfg.BusFlushInterval, cfg.DownloadOverdriveTimeout, cfg.UploadOverdriveTimeout, cfg.DownloadMaxOverdrive, cfg.UploadMaxMemory, cfg.UploadMaxOverdrive, cfg.AllowPrivateIPs, l)
 	if err != nil {
 		return nil, nil, err
 	}
diff --git a/internal/testing/cluster.go b/internal/testing/cluster.go
index 4edccd18f..a79e35223 100644
--- a/internal/testing/cluster.go
+++ b/internal/testing/cluster.go
@@ -952,6 +952,7 @@ func testWorkerCfg() config.Worker {
 		BusFlushInterval:         testBusFlushInterval,
 		DownloadOverdriveTimeout: 500 * time.Millisecond,
 		UploadOverdriveTimeout:   500 * time.Millisecond,
+		UploadMaxMemory:          1 << 28, // 256 MiB
 		UploadMaxOverdrive:       5,
 	}
 }
diff --git a/worker/client/client.go b/worker/client/client.go
index 8319d5c5c..a5979d124 100644
--- a/worker/client/client.go
+++ b/worker/client/client.go
@@ -136,6 +136,12 @@ func (c *Client) ID(ctx context.Context) (id string, err error) {
 	return
 }
 
+// Memory requests the /memory endpoint.
+func (c *Client) Memory(ctx context.Context) (resp api.MemoryResponse, err error) {
+	err = c.c.WithContext(ctx).GET("/memory", &resp)
+	return
+}
+
 // MigrateSlab migrates the specified slab.
 func (c *Client) MigrateSlab(ctx context.Context, slab object.Slab, set string) (res api.MigrateSlabResponse, err error) {
 	values := make(url.Values)
diff --git a/worker/memory.go b/worker/memory.go
new file mode 100644
index 000000000..cee82661d
--- /dev/null
+++ b/worker/memory.go
@@ -0,0 +1,113 @@
+package worker
+
+import (
+	"context"
+	"fmt"
+	"sync"
+
+	"go.sia.tech/renterd/api"
+	"go.uber.org/zap"
+)
+
+type (
+	// memoryManager helps regulate processes that use a lot of memory, such
+	// as uploads and downloads.
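+	//
+	// Callers reserve memory up front via AcquireMemory, which blocks until
+	// the request can be satisfied, and hand it back with Release or
+	// ReleaseSome. A rough usage sketch (illustrative only):
+	//
+	//	mem := mm.AcquireMemory(ctx, rhpv2.SectorSize)
+	//	if mem == nil {
+	//		return // interrupted, or request exceeds the total
+	//	}
+	//	defer mem.Release()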
+	memoryManager struct {
+		totalAvailable uint64
+		logger         *zap.SugaredLogger
+
+		mu        sync.Mutex
+		sigNewMem sync.Cond
+		available uint64
+	}
+
+	acquiredMemory struct {
+		mm *memoryManager
+
+		mu        sync.Mutex
+		remaining uint64
+	}
+)
+
+func newMemoryManager(logger *zap.SugaredLogger, maxMemory uint64) (*memoryManager, error) {
+	if maxMemory == 0 {
+		return nil, fmt.Errorf("maxMemory cannot be 0")
+	}
+	mm := &memoryManager{
+		logger:         logger,
+		totalAvailable: maxMemory,
+	}
+	mm.available = mm.totalAvailable
+	mm.sigNewMem = *sync.NewCond(&mm.mu)
+	return mm, nil
+}
+
+func (mm *memoryManager) Status() api.MemoryStatus {
+	mm.mu.Lock()
+	defer mm.mu.Unlock()
+	return api.MemoryStatus{
+		Available: mm.available,
+		Total:     mm.totalAvailable,
+	}
+}
+
+func (mm *memoryManager) AcquireMemory(ctx context.Context, amt uint64) *acquiredMemory {
+	if amt == 0 {
+		mm.logger.Fatal("cannot acquire 0 memory")
+	} else if mm.totalAvailable < amt {
+		mm.logger.Errorf("cannot acquire %v memory with only %v available", amt, mm.totalAvailable)
+		return nil
+	}
+	// block until enough memory is available
+	mm.sigNewMem.L.Lock()
+	for mm.available < amt {
+		mm.sigNewMem.Wait()
+
+		// check if the context was canceled in the meantime
+		select {
+		case <-ctx.Done():
+			mm.sigNewMem.L.Unlock()
+			return nil
+		default:
+		}
+	}
+	mm.available -= amt
+	mm.sigNewMem.L.Unlock()
+
+	mm.sigNewMem.Signal() // wake next goroutine
+	return &acquiredMemory{
+		mm:        mm,
+		remaining: amt,
+	}
+}
+
+// Release returns all the remaining memory to the memory manager. It should
+// always be called on every acquiredMemory when done using it.
+func (am *acquiredMemory) Release() {
+	am.mm.sigNewMem.L.Lock()
+	am.mm.available += am.remaining
+	am.mm.sigNewMem.L.Unlock()
+
+	am.mu.Lock()
+	am.remaining = 0
+	am.mu.Unlock()
+
+	am.mm.sigNewMem.Signal() // wake next goroutine
+}
+
+// ReleaseSome releases some of the remaining memory to the memory manager.
+// It panics if more memory is released than was acquired.
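+// Callers such as slabUpload.receive use it to hand memory back sector by
+// sector as shards complete, instead of holding the full slab allocation
+// until the whole slab is uploaded.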
+func (am *acquiredMemory) ReleaseSome(amt uint64) {
+	am.mm.sigNewMem.L.Lock()
+	if amt > am.remaining {
+		panic("releasing more memory than remaining")
+	}
+	am.mm.available += amt
+	am.mm.sigNewMem.L.Unlock()
+
+	am.mu.Lock()
+	am.remaining -= amt
+	am.mu.Unlock()
+
+	am.mm.sigNewMem.Signal() // wake next goroutine
+}
diff --git a/worker/migrations.go b/worker/migrations.go
index e8ce000de..e07d24ee7 100644
--- a/worker/migrations.go
+++ b/worker/migrations.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 
+	rhpv2 "go.sia.tech/core/rhp/v2"
 	"go.sia.tech/core/types"
 	"go.sia.tech/renterd/api"
 	"go.sia.tech/renterd/object"
@@ -82,6 +83,13 @@ SHARDS:
 		return 0, false, fmt.Errorf("not enough hosts to download unhealthy shard, %d<%d", len(s.Shards)-missingShards, int(s.MinShards))
 	}
 
+	// acquire memory for the migration
+	mem := u.mm.AcquireMemory(ctx, uint64(len(shardIndices))*rhpv2.SectorSize)
+	if mem == nil {
+		return 0, false, fmt.Errorf("failed to acquire memory for migration")
+	}
+	defer mem.Release()
+
 	// download the slab
 	shards, surchargeApplied, err := d.DownloadSlab(ctx, *s, dlContracts)
 	if err != nil {
@@ -105,7 +113,7 @@ SHARDS:
 	}
 
 	// migrate the shards
-	uploaded, err := u.UploadShards(ctx, shards, allowed, bh, lockingPriorityUpload)
+	uploaded, err := u.UploadShards(ctx, shards, allowed, bh, lockingPriorityUpload, mem)
 	if err != nil {
 		return 0, surchargeApplied, fmt.Errorf("failed to upload slab for migration: %w", err)
 	}
diff --git a/worker/upload.go b/worker/upload.go
index b2e5c5b87..258c4d2ab 100644
--- a/worker/upload.go
+++ b/worker/upload.go
@@ -33,7 +33,6 @@ const (
 	defaultPackedSlabsLockDuration  = 10 * time.Minute
 	defaultPackedSlabsUploadTimeout = 10 * time.Minute
-	defaultPackedSlabsLimit         = 1
 )
 
 var (
@@ -112,6 +111,7 @@ type (
 		hp     hostProvider
 		rl     revisionLocker
 		logger *zap.SugaredLogger
+		mm     *memoryManager
 
 		maxOverdrive     uint64
 		overdriveTimeout time.Duration
@@ -161,6 +161,7 @@ type (
 	slabUpload struct {
 		mgr    *uploadManager
+		mem    *acquiredMemory
 		upload *upload
 		sID    slabID
@@ -221,12 +222,12 @@ type (
 	}
 )
 
-func (w *worker) initUploadManager(maxOverdrive uint64, overdriveTimeout time.Duration, logger *zap.SugaredLogger) {
+func (w *worker) initUploadManager(mm *memoryManager, maxOverdrive uint64, overdriveTimeout time.Duration, logger *zap.SugaredLogger) {
 	if w.uploadManager != nil {
 		panic("upload manager already initialized") // developer error
 	}
 
-	w.uploadManager = newUploadManager(w.bus, w, w, maxOverdrive, overdriveTimeout, logger)
+	w.uploadManager = newUploadManager(w.bus, w, w, mm, maxOverdrive, overdriveTimeout, logger)
 }
 
 func (w *worker) upload(ctx context.Context, r io.Reader, bucket, path string, opts ...UploadOption) (string, error) {
@@ -339,7 +340,7 @@ func (w *worker) threadedUploadPackedSlabs(rs api.RedundancySettings, contractSe
 	// keep uploading packed slabs until we're done
 	for {
-		uploaded, err := w.uploadPackedSlabs(context.Background(), defaultPackedSlabsLockDuration, rs, contractSet, defaultPackedSlabsLimit, lockPriority)
+		uploaded, err := w.uploadPackedSlabs(context.Background(), defaultPackedSlabsLockDuration, rs, contractSet, lockPriority)
 		if err != nil {
 			w.logger.Errorf("couldn't upload packed slabs, err: %v", err)
 			return
@@ -353,7 +354,7 @@ func (w *worker) tryUploadPackedSlabs(ctx context.Context, rs api.RedundancySett
 	// if we want to block, try and upload one packed slab synchronously, we use
 	// a slightly higher upload priority to avoid reaching the context deadline
 	if block {
-		_, err = w.uploadPackedSlabs(ctx, defaultPackedSlabsLockDuration, rs, contractSet, defaultPackedSlabsLimit, lockingPriorityBlockedUpload)
+		_, err = w.uploadPackedSlabs(ctx, defaultPackedSlabsLockDuration, rs, contractSet, lockingPriorityBlockedUpload)
 	}
 
 	// make sure there's a goroutine uploading the remainder of the packed slabs
@@ -361,25 +362,64 @@ func (w *worker) tryUploadPackedSlabs(ctx context.Context, rs api.RedundancySett
 	return
 }
 
-func (w *worker) uploadPackedSlabs(ctx context.Context, lockingDuration time.Duration, rs api.RedundancySettings, contractSet string, limit, lockPriority int) (uploaded int, err error) {
-	// fetch packed slabs
-	packedSlabs, err := w.bus.PackedSlabsForUpload(ctx, lockingDuration, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet, limit)
-	if err != nil {
-		return 0, fmt.Errorf("couldn't fetch packed slabs from bus: %v", err)
-	}
-
+func (w *worker) uploadPackedSlabs(ctx context.Context, lockingDuration time.Duration, rs api.RedundancySettings, contractSet string, lockPriority int) (uploaded int, err error) {
 	// upload packed slabs
-	for _, ps := range packedSlabs {
-		err = w.uploadPackedSlab(ctx, ps, rs, contractSet, lockPriority)
+	var mu sync.Mutex
+	var errs error
+
+	var wg sync.WaitGroup
+	totalSize := uint64(rs.TotalShards) * rhpv2.SectorSize
+
+	// derive a context that we can use as an interrupt in case of an error.
+	interruptCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	for {
+		// block until we have memory for a slab or until we are interrupted
+		mem := w.uploadManager.mm.AcquireMemory(interruptCtx, totalSize)
+		if mem == nil {
+			break // interrupted
+		}
+
+		// fetch packed slabs to upload
+		var packedSlabs []api.PackedSlab
+		packedSlabs, err = w.bus.PackedSlabsForUpload(ctx, lockingDuration, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet, 1)
 		if err != nil {
-			return
+			err = fmt.Errorf("couldn't fetch packed slabs from bus: %v", err)
+			mem.Release()
+			break
+		} else if len(packedSlabs) == 0 {
+			mem.Release()
+			break // no more slabs
 		}
-		uploaded++
+		ps := packedSlabs[0]
+
+		// launch upload for slab
+		wg.Add(1)
+		go func(ps api.PackedSlab) {
+			defer mem.Release()
+			defer wg.Done()
+			err := w.uploadPackedSlab(ctx, ps, rs, contractSet, lockPriority, mem)
+			mu.Lock()
+			if err != nil {
+				errs = errors.Join(errs, err)
+				cancel() // prevent new uploads from being launched
+			} else {
+				uploaded++
+			}
+			mu.Unlock()
+		}(ps)
 	}
+
+	// wait for all goroutines to finish
+	wg.Wait()
+
+	// return collected errors
+	err = errors.Join(err, errs)
 	return
 }
 
-func (w *worker) uploadPackedSlab(ctx context.Context, ps api.PackedSlab, rs api.RedundancySettings, contractSet string, lockPriority int) error {
+func (w *worker) uploadPackedSlab(ctx context.Context, ps api.PackedSlab, rs api.RedundancySettings, contractSet string, lockPriority int, mem *acquiredMemory) error {
 	// create a context with sane timeout
 	ctx, cancel := context.WithTimeout(ctx, defaultPackedSlabsUploadTimeout)
 	defer cancel()
@@ -401,7 +441,7 @@ func (w *worker) uploadPackedSlab(ctx context.Context, ps api.PackedSlab, rs api
 
 	// upload packed slab
 	shards := encryptPartialSlab(ps.Data, ps.Key, uint8(rs.MinShards), uint8(rs.TotalShards))
-	sectors, err := w.uploadManager.UploadShards(ctx, shards, contracts, up.CurrentHeight, lockPriority)
+	sectors, err := w.uploadManager.UploadShards(ctx, shards, contracts, up.CurrentHeight, lockPriority, mem)
 	if err != nil {
 		return fmt.Errorf("couldn't upload packed slab, err: %v", err)
 	}
@@ -416,12 +456,13 @@ func (w *worker) uploadPackedSlab(ctx context.Context, ps api.PackedSlab, rs api
 	return nil
 }
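+// newUploadManager creates an uploadManager that draws the memory for all
+// slab uploads from the given memoryManager, so regular uploads, packed-slab
+// uploads and migrations share a single memory budget.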
-func newUploadManager(b Bus, hp hostProvider, rl revisionLocker, maxOverdrive uint64, overdriveTimeout time.Duration, logger *zap.SugaredLogger) *uploadManager {
+func newUploadManager(b Bus, hp hostProvider, rl revisionLocker, mm *memoryManager, maxOverdrive uint64, overdriveTimeout time.Duration, logger *zap.SugaredLogger) *uploadManager {
 	return &uploadManager{
 		b:      b,
 		hp:     hp,
 		rl:     rl,
 		logger: logger,
+		mm:     mm,
 
 		maxOverdrive:     maxOverdrive,
 		overdriveTimeout: overdriveTimeout,
@@ -527,68 +568,85 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, up uploadPara
 	}
 	defer finishFn()
 
-	// create the next slab channel
-	nextSlabChan := make(chan struct{}, 1)
-	defer close(nextSlabChan)
-
 	// create the response channel
 	respChan := make(chan slabUploadResponse)
 
-	// collect the responses
-	var responses []slabUploadResponse
-	var slabIndex int
-	numSlabs := -1
+	// channel to notify main thread of the number of slabs to wait for
+	numSlabsChan := make(chan int, 1)
 
 	// prepare slab size
 	size := int64(up.rs.MinShards) * rhpv2.SectorSize
-loop:
-	for {
-		select {
-		case <-mgr.stopChan:
-			return object.Object{}, nil, "", errors.New("manager was stopped")
-		case <-ctx.Done():
-			return object.Object{}, nil, "", errors.New("upload timed out")
-		case nextSlabChan <- struct{}{}:
+	redundantSize := uint64(up.rs.TotalShards) * rhpv2.SectorSize
+
+	// launch uploads in a separate goroutine
+	stopCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	go func() {
+		var slabIndex int
+		for {
+			select {
+			case <-mgr.stopChan:
+				return // interrupted
+			default:
+			}
+			// acquire memory
+			mem := mgr.mm.AcquireMemory(stopCtx, redundantSize)
+			if mem == nil {
+				return // interrupted
+			}
 			// read next slab's data
 			data := make([]byte, size)
 			length, err := io.ReadFull(io.LimitReader(cr, size), data)
 			if err == io.EOF {
-				if slabIndex == 0 {
-					break loop
-				}
-				numSlabs = slabIndex
-				if partialSlab != nil {
+				mem.Release()
+
+				// no more data to upload, notify main thread of the number of
+				// slabs to wait for
+				numSlabs := slabIndex
+				if partialSlab != nil && slabIndex > 0 {
 					numSlabs-- // don't wait on partial slab
 				}
-				if len(responses) == numSlabs {
-					break loop
-				}
-				continue
+				numSlabsChan <- numSlabs
+				return
 			} else if err != nil && err != io.ErrUnexpectedEOF {
-				return object.Object{}, nil, "", err
-			}
-			if up.packing && errors.Is(err, io.ErrUnexpectedEOF) {
-				// If uploadPacking is true, we return the partial slab without
+				mem.Release()
+
+				// unexpected error, notify main thread
+				select {
+				case respChan <- slabUploadResponse{err: err}:
+				case <-stopCtx.Done():
+				}
+				return
+			} else if up.packing && errors.Is(err, io.ErrUnexpectedEOF) {
+				mem.Release()
+
+				// uploadPacking is true, we return the partial slab without
 				// uploading.
 				partialSlab = data[:length]
-				<-nextSlabChan // trigger next iteration
 			} else {
-				// Otherwise we upload it.
+				// regular upload
 				go func(rs api.RedundancySettings, data []byte, length, slabIndex int) {
-					u.uploadSlab(ctx, rs, data, length, slabIndex, respChan, nextSlabChan)
+					u.uploadSlab(ctx, rs, data, length, slabIndex, respChan, mem)
+					mem.Release()
 				}(up.rs, data, length, slabIndex)
 			}
 
 			slabIndex++
+		}
+	}()
+
+	// collect responses
+	var responses []slabUploadResponse
+	numSlabs := math.MaxInt32
+	for len(responses) < numSlabs {
+		select {
+		case <-mgr.stopChan:
+			return object.Object{}, nil, "", errors.New("manager was stopped")
+		case numSlabs = <-numSlabsChan:
 		case res := <-respChan:
 			if res.err != nil {
 				return object.Object{}, nil, "", res.err
 			}
-
-			// collect the response and potentially break out of the loop
 			responses = append(responses, res)
-			if len(responses) == numSlabs {
-				break loop
-			}
 		}
 	}
 
@@ -604,7 +662,7 @@ loop:
 	return o, partialSlab, hr.Hash(), nil
 }
 
-func (mgr *uploadManager) UploadShards(ctx context.Context, shards [][]byte, contracts []api.ContractMetadata, bh uint64, lockPriority int) ([]object.Sector, error) {
+func (mgr *uploadManager) UploadShards(ctx context.Context, shards [][]byte, contracts []api.ContractMetadata, bh uint64, lockPriority int, mem *acquiredMemory) ([]object.Sector, error) {
 	// initiate the upload
 	upload, finishFn, err := mgr.newUpload(ctx, len(shards), contracts, bh, lockPriority)
 	if err != nil {
@@ -613,7 +671,7 @@ func (mgr *uploadManager) UploadShards(ctx context.Context, shards [][]byte, con
 	defer finishFn()
 
 	// upload the shards
-	sectors, err := upload.uploadShards(ctx, shards, nil)
+	sectors, err := upload.uploadShards(ctx, shards, mem)
 	if err != nil {
 		return nil, err
 	}
@@ -875,7 +933,7 @@ func (u *upload) finishSlabUpload(upload *slabUpload) {
 	upload.mu.Unlock()
 }
 
-func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte) (*slabUpload, []*sectorUploadReq, chan sectorUploadResp) {
+func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, mem *acquiredMemory) (*slabUpload, []*sectorUploadReq, chan sectorUploadResp) {
 	// create slab id
 	var sID slabID
 	frand.Read(sID[:])
@@ -888,6 +946,7 @@ func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte) (*slabUploa
 	// create slab upload
 	slab := &slabUpload{
 		mgr:    u.mgr,
+		mem:    mem,
 		upload: u,
 		sID:    sID,
@@ -951,7 +1010,7 @@ func (u *upload) canUseUploader(sID slabID, ul *uploader) bool {
 	return !used
 }
 
-func (u *upload) uploadSlab(ctx context.Context, rs api.RedundancySettings, data []byte, length, index int, respChan chan slabUploadResponse, nextSlabChan chan struct{}) {
+func (u *upload) uploadSlab(ctx context.Context, rs api.RedundancySettings, data []byte, length, index int, respChan chan slabUploadResponse, mem *acquiredMemory) {
 	// cancel any sector uploads once the slab is done.
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
@@ -976,7 +1035,7 @@ func (u *upload) uploadSlab(ctx context.Context, rs api.RedundancySettings, data
 	resp.slab.Slab.Encrypt(shards)
 
 	// upload the shards
-	resp.slab.Slab.Shards, resp.err = u.uploadShards(ctx, shards, nextSlabChan)
+	resp.slab.Slab.Shards, resp.err = u.uploadShards(ctx, shards, mem)
 
 	// send the response
 	select {
@@ -996,13 +1055,13 @@ func (u *upload) markUsed(sID slabID, fcid types.FileContractID) {
 	u.used[sID][fcid] = struct{}{}
 }
 
-func (u *upload) uploadShards(ctx context.Context, shards [][]byte, nextSlabChan chan struct{}) ([]object.Sector, error) {
+func (u *upload) uploadShards(ctx context.Context, shards [][]byte, mem *acquiredMemory) ([]object.Sector, error) {
 	// add tracing
 	ctx, span := tracing.Tracer.Start(ctx, "uploadShards")
 	defer span.End()
 
 	// prepare the upload
-	slab, requests, respChan := u.newSlabUpload(ctx, shards)
+	slab, requests, respChan := u.newSlabUpload(ctx, shards, mem)
 	span.SetAttributes(attribute.Stringer("id", slab.sID))
 	defer u.finishSlabUpload(slab)
 
@@ -1018,8 +1077,6 @@ func (u *upload) uploadShards(ctx context.Context, shards [][]byte, nextSlabChan
 	// collect responses
 	var done bool
-	var next bool
-	var triggered bool
 	for slab.inflight() > 0 && !done {
 		var resp sectorUploadResp
 		select {
@@ -1033,16 +1090,7 @@ func (u *upload) uploadShards(ctx context.Context, shards [][]byte, nextSlabChan
 		resetOverdrive()
 
 		// receive the response
-		done, next = slab.receive(resp)
-
-		// try and trigger next slab
-		if next && !triggered {
-			select {
-			case <-nextSlabChan:
-				triggered = true
-			default:
-			}
-		}
+		done = slab.receive(resp)
 
 		// relaunch non-overdrive uploads
 		if !done && resp.err != nil && !resp.req.overdrive {
@@ -1064,15 +1112,6 @@ func (u *upload) uploadShards(ctx context.Context, shards [][]byte, nextSlabChan
 		}
 	}
 
-	// make sure next slab is triggered
-	if done && !triggered {
-		select {
-		case <-nextSlabChan:
-			triggered = true
-		default:
-		}
-	}
-
 	// register the amount of overdrive sectors
 	span.SetAttributes(attribute.Int("overdrive", slab.overdriveCnt()))
 
@@ -1488,7 +1527,7 @@ func (s *slabUpload) overdrivePct() float64 {
 	return float64(numOverdrive) / float64(len(s.sectors))
}
 
-func (s *slabUpload) receive(resp sectorUploadResp) (finished bool, next bool) {
+func (s *slabUpload) receive(resp sectorUploadResp) bool {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
@@ -1501,12 +1540,12 @@ func (s *slabUpload) receive(resp sectorUploadResp) (finished bool, next bool) {
 	s.numInflight--
 	if resp.err != nil {
 		s.errs[resp.req.hk] = resp.err
-		return false, false
+		return false
 	}
 
 	// redundant sectors can't complete the upload
 	if s.sectors[resp.req.sectorIndex].Root != (types.Hash256{}) {
-		return false, false
+		return false
 	}
 
 	// store the sector and call cancel on the sector ctx
@@ -1523,9 +1562,16 @@ func (s *slabUpload) receive(resp sectorUploadResp) (finished bool, next bool) {
 	// update remaining sectors
 	delete(s.remaining, resp.req.sectorIndex)
-	finished = len(s.remaining) == 0
-	next = len(s.remaining) <= int(s.mgr.maxOverdrive)
-	return
+
+	// release memory - we don't release memory for overdrive sectors because
+	// their memory is not part of the initial allocation
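+	// drop the references to the sector data so the garbage collector can
+	// reclaim the shard buffers as sectors finish uploading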
+	resp.req.sector = nil
+	s.shards[resp.req.sectorIndex] = nil
+	if !resp.req.overdrive {
+		s.mem.ReleaseSome(rhpv2.SectorSize)
+	}
+
+	return len(s.remaining) == 0
 }
 
 func (sID slabID) String() string {
diff --git a/worker/worker.go b/worker/worker.go
index b537d448e..97d366c03 100644
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -1340,6 +1340,12 @@ func (w *worker) idHandlerGET(jc jape.Context) {
 	jc.Encode(w.id)
 }
 
+func (w *worker) memoryGET(jc jape.Context) {
+	jc.Encode(api.MemoryResponse{
+		Upload: w.uploadManager.mm.Status(),
+	})
+}
+
 func (w *worker) accountHandlerGET(jc jape.Context) {
 	var hostKey types.PublicKey
 	if jc.DecodeParam("hostkey", &hostKey) != nil {
@@ -1364,7 +1370,7 @@ func (w *worker) stateHandlerGET(jc jape.Context) {
 }
 
 // New returns an HTTP handler that serves the worker API.
-func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlushInterval, downloadOverdriveTimeout, uploadOverdriveTimeout time.Duration, downloadMaxOverdrive, uploadMaxOverdrive uint64, allowPrivateIPs bool, l *zap.Logger) (*worker, error) {
+func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlushInterval, downloadOverdriveTimeout, uploadOverdriveTimeout time.Duration, downloadMaxOverdrive, uploadMaxMemory, uploadMaxOverdrive uint64, allowPrivateIPs bool, l *zap.Logger) (*worker, error) {
 	if contractLockingDuration == 0 {
 		return nil, errors.New("contract lock duration must be positive")
 	}
@@ -1395,7 +1401,11 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush
 	w.initContractSpendingRecorder()
 	w.initPriceTables()
 	w.initDownloadManager(downloadMaxOverdrive, downloadOverdriveTimeout, l.Sugar().Named("downloadmanager"))
-	w.initUploadManager(uploadMaxOverdrive, uploadOverdriveTimeout, l.Sugar().Named("uploadmanager"))
+	mm, err := newMemoryManager(w.logger, uploadMaxMemory)
+	if err != nil {
+		return nil, err
+	}
+	w.initUploadManager(mm, uploadMaxOverdrive, uploadOverdriveTimeout, l.Sugar().Named("uploadmanager"))
 	return w, nil
 }
 
@@ -1405,6 +1415,8 @@ func (w *worker) Handler() http.Handler {
 		"GET    /account/:hostkey": w.accountHandlerGET,
 		"GET    /id":               w.idHandlerGET,
 
+		"GET    /memory": w.memoryGET,
+
 		"GET    /rhp/contracts":              w.rhpContractsHandlerGET,
 		"POST   /rhp/contract/:id/broadcast": w.rhpBroadcastHandler,
 		"POST   /rhp/contract/:id/prune":     w.rhpPruneContractHandlerPOST,