From 49a9cb367547dedacbc767ea5de305e177aced91 Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 13 Feb 2024 11:02:37 +0100 Subject: [PATCH] worker: rewrite packed slabs logic --- api/setting.go | 9 +- internal/testing/cluster_test.go | 2 +- worker/downloader.go | 1 - worker/upload.go | 160 ++++++++++++++----------------- worker/upload_test.go | 2 +- 5 files changed, 83 insertions(+), 91 deletions(-) diff --git a/api/setting.go b/api/setting.go index d110890105..b09f78e737 100644 --- a/api/setting.go +++ b/api/setting.go @@ -121,11 +121,16 @@ func (rs RedundancySettings) Redundancy() float64 { return float64(rs.TotalShards) / float64(rs.MinShards) } -// SlabSizeNoRedundancy returns the size of a slab without added redundancy. -func (rs RedundancySettings) SlabSizeNoRedundancy() uint64 { +// SlabSize returns the size of a slab. +func (rs RedundancySettings) SlabSize() uint64 { return uint64(rs.MinShards) * rhpv2.SectorSize } +// SlabSizeWithRedundancy returns the size of a slab with redundancy. +func (rs RedundancySettings) SlabSizeWithRedundancy() uint64 { + return uint64(rs.TotalShards) * rhpv2.SectorSize +} + // Validate returns an error if the redundancy settings are not considered // valid. func (rs RedundancySettings) Validate() error { diff --git a/internal/testing/cluster_test.go b/internal/testing/cluster_test.go index a8fa5ec717..d564fbb275 100644 --- a/internal/testing/cluster_test.go +++ b/internal/testing/cluster_test.go @@ -2315,7 +2315,7 @@ func TestMultipartUploadWrappedByPartialSlabs(t *testing.T) { defer cluster.Shutdown() b := cluster.Bus w := cluster.Worker - slabSize := testRedundancySettings.SlabSizeNoRedundancy() + slabSize := testRedundancySettings.SlabSizeWithRedundancy() tt := cluster.tt // start a new multipart upload. We upload the parts in reverse order diff --git a/worker/downloader.go b/worker/downloader.go index a184d57023..aa9bd44888 100644 --- a/worker/downloader.go +++ b/worker/downloader.go @@ -18,7 +18,6 @@ const ( type ( downloader struct { - hk types.PublicKey host Host statsDownloadSpeedBytesPerMS *stats.DataPoints // keep track of this separately for stats (no decay is applied) diff --git a/worker/upload.go b/worker/upload.go index 202d356209..fdff956969 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -153,7 +153,16 @@ func (w *worker) initUploadManager(maxMemory, maxOverdrive uint64, overdriveTime w.uploadManager = newUploadManager(w.shutdownCtx, w, mm, w.bus, w.bus, maxOverdrive, overdriveTimeout, w.contractLockingDuration, logger) } -func (w *worker) upload(ctx context.Context, r io.Reader, contracts []api.ContractMetadata, up uploadParameters, opts ...UploadOption) (_ string, err error) { +func (w *worker) isStopped() bool { + select { + case <-w.shutdownCtx.Done(): + return true + default: + } + return false +} + +func (w *worker) upload(ctx context.Context, r io.Reader, contracts []api.ContractMetadata, up uploadParameters, opts ...UploadOption) (string, error) { // apply the options for _, opt := range opts { opt(&up) @@ -165,9 +174,10 @@ func (w *worker) upload(ctx context.Context, r io.Reader, contracts []api.Contra // if mime type is still not known, wrap the reader with a mime reader if up.mimeType == "" { + var err error up.mimeType, r, err = newMimeReader(r) if err != nil { - return + return "", err } } } @@ -178,22 +188,23 @@ func (w *worker) upload(ctx context.Context, r io.Reader, contracts []api.Contra return "", err } - // if packing was enabled try uploading packed slabs + // handle packed slabs if necessary if up.packing && 
!w.isStopped() { - if err := w.tryUploadPackedSlabs(ctx, up.rs, up.contractSet, bufferSizeLimitReached); err != nil { - w.logger.Errorf("couldn't upload packed slabs, err: %v", err) + // try and upload one slab synchronously + if bufferSizeLimitReached { + if mem := w.uploadManager.mm.AcquireMemory(ctx, up.rs.SlabSizeWithRedundancy()); mem != nil { + if _, err := w.tryUploadPackedSlab(ctx, mem, defaultPackedSlabsLockDuration, up.rs, up.contractSet, lockingPriorityBlockedUpload); err != nil { + w.logger.Errorf("couldn't upload packed slabs, err: %v", err) + } + mem.Release() + } } - } - return eTag, nil -} -func (w *worker) isStopped() bool { - select { - case <-w.shutdownCtx.Done(): - return true - default: + // make sure there's a goroutine uploading the remainder of the packed slabs + go w.threadedUploadPackedSlabs(up.rs, up.contractSet, lockingPriorityBackgroundUpload) } - return false + + return eTag, nil } func (w *worker) threadedUploadPackedSlabs(rs api.RedundancySettings, contractSet string, lockPriority int) { @@ -214,103 +225,80 @@ func (w *worker) threadedUploadPackedSlabs(rs api.RedundancySettings, contractSe w.uploadsMu.Unlock() }() - // keep uploading packed slabs until we're done - for !w.isStopped() { - uploaded, err := w.uploadPackedSlabs(w.shutdownCtx, defaultPackedSlabsLockDuration, rs, contractSet, lockPriority) - - if err != nil { - w.logger.Errorf("couldn't upload packed slabs, err: %v", err) - return - } else if uploaded == 0 { - return - } - } -} - -func (w *worker) tryUploadPackedSlabs(ctx context.Context, rs api.RedundancySettings, contractSet string, block bool) (err error) { - // if we want to block, try and upload one packed slab synchronously, we use - // a slightly higher upload priority to avoid reaching the context deadline - if block { - _, err = w.uploadPackedSlabs(ctx, defaultPackedSlabsLockDuration, rs, contractSet, lockingPriorityBlockedUpload) - } - - // make sure there's a goroutine uploading the remainder of the packed slabs - go w.threadedUploadPackedSlabs(rs, contractSet, lockingPriorityBackgroundUpload) - return -} - -func (w *worker) uploadPackedSlabs(ctx context.Context, lockingDuration time.Duration, rs api.RedundancySettings, contractSet string, lockPriority int) (uploaded int, err error) { // upload packed slabs var mu sync.Mutex var errs error - var wg sync.WaitGroup - totalSize := uint64(rs.TotalShards) * rhpv2.SectorSize - - // derive a context that we can use as an interrupt in case of an error. - interruptCtx, cancel := context.WithCancel(ctx) - defer cancel() + // derive a context that we can use as an interrupt in case of an error or shutdown. 
+ interruptCtx, interruptCancel := context.WithCancel(w.shutdownCtx) + defer interruptCancel() + var wg sync.WaitGroup for { - // block until we have memory for a slab or until we are interrupted - mem := w.uploadManager.mm.AcquireMemory(interruptCtx, totalSize) + // block until we have memory + mem := w.uploadManager.mm.AcquireMemory(interruptCtx, rs.SlabSizeWithRedundancy()) if mem == nil { break // interrupted } - // fetch packed slabs to upload - var packedSlabs []api.PackedSlab - packedSlabs, err = w.bus.PackedSlabsForUpload(ctx, lockingDuration, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet, 1) - if err != nil { - err = fmt.Errorf("couldn't fetch packed slabs from bus: %v", err) - mem.Release() - break - } else if len(packedSlabs) == 0 { - mem.Release() - break // no more slabs - } - ps := packedSlabs[0] - - // launch upload for slab wg.Add(1) - go func(ps api.PackedSlab) { - defer mem.Release() + go func() { defer wg.Done() - err := w.uploadPackedSlab(ctx, rs, ps, mem, contractSet, lockPriority) - mu.Lock() + defer mem.Release() + + // we use the background context here, but apply a sane timeout, + // this ensures ongoing uploads are handled gracefully during + // shutdown + ctx, cancel := context.WithTimeout(context.Background(), defaultPackedSlabsUploadTimeout) + defer cancel() + + // attach interaction recorder to the context + ctx = context.WithValue(ctx, keyInteractionRecorder, w) + + // try to upload a packed slab, if there were no packed slabs left to upload ok is false + ok, err := w.tryUploadPackedSlab(ctx, mem, defaultPackedSlabsLockDuration, rs, contractSet, lockPriority) if err != nil { + mu.Lock() errs = errors.Join(errs, err) - cancel() // prevent new uploads from being launched - } else { - uploaded++ + mu.Unlock() + interruptCancel() // prevent new uploads from being launched + } else if !ok { + interruptCancel() // no more packed slabs to upload } - mu.Unlock() - }(ps) + }() } // wait for all threads to finish wg.Wait() // return collected errors - err = errors.Join(err, errs) + if err := errors.Join(errs); err != nil { + w.logger.Errorf("couldn't upload packed slabs, err: %v", err) + } return } -func (w *worker) uploadPackedSlab(ctx context.Context, rs api.RedundancySettings, ps api.PackedSlab, mem Memory, contractSet string, lockPriority int) error { - // create a context with sane timeout - ctx, cancel := context.WithTimeout(ctx, defaultPackedSlabsUploadTimeout) - defer cancel() +func (w *worker) tryUploadPackedSlab(ctx context.Context, mem Memory, lockingDuration time.Duration, rs api.RedundancySettings, contractSet string, lockPriority int) (bool, error) { + // fetch packed slab to upload + packedSlabs, err := w.bus.PackedSlabsForUpload(ctx, lockingDuration, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet, 1) + if err != nil { + err = fmt.Errorf("couldn't fetch packed slabs from bus: %v", err) + return false, err + } else if len(packedSlabs) == 0 { + return false, nil // no more slabs + } + ps := packedSlabs[0] // fetch contracts contracts, err := w.bus.Contracts(ctx, api.ContractsOpts{ContractSet: contractSet}) if err != nil { - return fmt.Errorf("couldn't fetch packed slabs from bus: %v", err) + return false, fmt.Errorf("couldn't fetch packed slabs from bus: %v", err) } // fetch upload params up, err := w.bus.UploadParams(ctx) if err != nil { - return fmt.Errorf("couldn't fetch upload params from bus: %v", err) + return false, fmt.Errorf("couldn't fetch upload params from bus: %v", err) } // attach gouging checker to the context @@ -319,10 
+307,10 @@ func (w *worker) uploadPackedSlab(ctx context.Context, rs api.RedundancySettings // upload packed slab err = w.uploadManager.UploadPackedSlab(ctx, rs, ps, mem, contracts, up.CurrentHeight, lockPriority) if err != nil { - return fmt.Errorf("couldn't upload packed slab, err: %v", err) + return false, fmt.Errorf("couldn't upload packed slab, err: %v", err) } - return nil + return true, nil } func newUploadManager(ctx context.Context, hm HostManager, mm MemoryManager, os ObjectStore, cs ContractStore, maxOverdrive uint64, overdriveTimeout time.Duration, contractLockDuration time.Duration, logger *zap.SugaredLogger) *uploadManager { @@ -457,9 +445,9 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a // channel to notify main thread of the number of slabs to wait for numSlabsChan := make(chan int, 1) - // prepare slab size - size := int64(up.rs.MinShards) * rhpv2.SectorSize - redundantSize := uint64(up.rs.TotalShards) * rhpv2.SectorSize + // prepare slab sizes + slabSize := up.rs.SlabSize() + slabSizeWithRedundancy := up.rs.SlabSizeWithRedundancy() var partialSlab []byte // launch uploads in a separate goroutine @@ -473,7 +461,7 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a } // acquire memory - mem := mgr.mm.AcquireMemory(ctx, redundantSize) + mem := mgr.mm.AcquireMemory(ctx, slabSizeWithRedundancy) if mem == nil { return // interrupted } @@ -494,8 +482,8 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a } // read next slab's data - data := make([]byte, size) - length, err := io.ReadFull(io.LimitReader(cr, size), data) + data := make([]byte, slabSize) + length, err := io.ReadFull(io.LimitReader(cr, int64(slabSize)), data) if err == io.EOF { mem.Release() diff --git a/worker/upload_test.go b/worker/upload_test.go index 8d32455bd2..9a285efa5c 100644 --- a/worker/upload_test.go +++ b/worker/upload_test.go @@ -185,7 +185,7 @@ func TestUploadPackedSlab(t *testing.T) { t.Fatal("expected 1 packed slab") } ps := pss[0] - mem := mm.AcquireMemory(context.Background(), uint64(params.rs.TotalShards*rhpv2.SectorSize)) + mem := mm.AcquireMemory(context.Background(), params.rs.SlabSizeWithRedundancy()) // upload the packed slab err = ul.UploadPackedSlab(context.Background(), params.rs, ps, mem, w.contracts(), 0, lockingPriorityUpload)
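
For reviewers, the standalone sketch below illustrates the drain pattern that threadedUploadPackedSlabs follows after this patch: acquire memory for one redundant slab, launch an upload, and stop launching new ones as soon as an upload fails, the worker shuts down, or the bus has no packed slabs left. Memory, MemoryManager, capMM and tryUploadOne are simplified stand-ins for the worker's memory manager and (*worker).tryUploadPackedSlab, not the real interfaces; the real code additionally applies defaultPackedSlabsUploadTimeout and attaches the interaction recorder to the upload context, which the sketch omits.

// Illustrative sketch (not part of the patch): the drain pattern used by
// threadedUploadPackedSlabs, reduced to a standalone program. All types and
// helpers here are simplified stand-ins, not the worker's real API.
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// Memory is a reserved chunk of upload memory (stand-in).
type Memory interface{ Release() }

// MemoryManager hands out memory, or nil once the passed context is cancelled.
type MemoryManager interface {
	AcquireMemory(ctx context.Context, amt uint64) Memory
}

// tryUploadOne uploads a single packed slab; ok is false when none are left
// (stand-in for (*worker).tryUploadPackedSlab).
type tryUploadOne func(ctx context.Context, mem Memory) (ok bool, err error)

// drainPackedSlabs launches one upload per acquired slab of memory and stops
// as soon as an upload fails or there are no packed slabs left to upload.
func drainPackedSlabs(shutdownCtx context.Context, mm MemoryManager, slabSize uint64, try tryUploadOne) error {
	// derive an interrupt context so a failure or shutdown stops new uploads
	interruptCtx, interruptCancel := context.WithCancel(shutdownCtx)
	defer interruptCancel()

	var mu sync.Mutex
	var errs error
	var wg sync.WaitGroup
	for {
		// block until memory for one redundant slab is available or we're interrupted
		mem := mm.AcquireMemory(interruptCtx, slabSize)
		if mem == nil {
			break // interrupted
		}

		wg.Add(1)
		go func() {
			defer wg.Done()
			defer mem.Release()

			// uploads run on a background context so they finish gracefully
			// even when the interrupt context is cancelled mid-upload
			ok, err := try(context.Background(), mem)
			if err != nil {
				mu.Lock()
				errs = errors.Join(errs, err)
				mu.Unlock()
				interruptCancel() // prevent new uploads from being launched
			} else if !ok {
				interruptCancel() // no more packed slabs to upload
			}
		}()
	}
	wg.Wait()
	return errs
}

// capMM is a toy memory manager allowing a fixed number of concurrent reservations.
type capMM struct{ sem chan struct{} }

type capMem struct{ sem chan struct{} }

func (m capMem) Release() { <-m.sem }

func (m capMM) AcquireMemory(ctx context.Context, _ uint64) Memory {
	select {
	case m.sem <- struct{}{}:
		return capMem{sem: m.sem}
	case <-ctx.Done():
		return nil
	}
}

func main() {
	var remaining int32 = 3 // pretend the bus has three packed slabs buffered
	try := func(_ context.Context, _ Memory) (bool, error) {
		return atomic.AddInt32(&remaining, -1) >= 0, nil
	}
	mm := capMM{sem: make(chan struct{}, 1)}
	if err := drainPackedSlabs(context.Background(), mm, 1<<22, try); err != nil {
		fmt.Println("couldn't upload packed slabs:", err)
	}
}

Cancelling the derived interrupt context stops new memory acquisitions and therefore new uploads, while in-flight uploads run on a background context so a worker shutdown does not abort them mid-upload; this mirrors the behaviour the patch introduces in threadedUploadPackedSlabs.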