diff --git a/internal/test/tt.go b/internal/test/tt.go index 22bcff223..d44152eda 100644 --- a/internal/test/tt.go +++ b/internal/test/tt.go @@ -90,12 +90,17 @@ func (t impl) OKAll(vs ...interface{}) { func (t impl) Retry(tries int, durationBetweenAttempts time.Duration, fn func() error) { t.Helper() - for i := 1; i < tries; i++ { - err := fn() + t.OK(Retry(tries, durationBetweenAttempts, fn)) +} + +func Retry(tries int, durationBetweenAttempts time.Duration, fn func() error) error { + var err error + for i := 0; i < tries; i++ { + err = fn() if err == nil { - return + break } time.Sleep(durationBetweenAttempts) } - t.OK(fn()) + return err } diff --git a/worker/upload.go b/worker/upload.go index d95c9db9e..c5e86a166 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -198,7 +198,7 @@ func (w *worker) upload(ctx context.Context, r io.Reader, contracts []api.Contra // upload packed slab if len(packedSlabs) > 0 { if err := w.tryUploadPackedSlab(ctx, mem, packedSlabs[0], up.rs, up.contractSet, lockingPriorityBlockedUpload); err != nil { - w.logger.Errorf("couldn't upload packed slabs, err: %v", err) + w.logger.Error(err) } } } @@ -227,10 +227,6 @@ func (w *worker) threadedUploadPackedSlabs(rs api.RedundancySettings, contractSe w.uploadsMu.Unlock() }() - // upload packed slabs - var mu sync.Mutex - var errs error - // derive a context that we can use as an interrupt in case of an error or shutdown. interruptCtx, interruptCancel := context.WithCancel(w.shutdownCtx) defer interruptCancel() @@ -246,9 +242,9 @@ func (w *worker) threadedUploadPackedSlabs(rs api.RedundancySettings, contractSe // fetch packed slab to upload packedSlabs, err := w.bus.PackedSlabsForUpload(interruptCtx, defaultPackedSlabsLockDuration, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet, 1) if err != nil { - mu.Lock() - errs = errors.Join(errs, fmt.Errorf("couldn't fetch packed slabs from bus: %v", err)) - mu.Unlock() + w.logger.Errorf("couldn't fetch packed slabs from bus: %v", err) + mem.Release() + break } // no more packed slabs to upload @@ -270,9 +266,7 @@ func (w *worker) threadedUploadPackedSlabs(rs api.RedundancySettings, contractSe // try to upload a packed slab, if there were no packed slabs left to upload ok is false if err := w.tryUploadPackedSlab(ctx, mem, ps, rs, contractSet, lockPriority); err != nil { - mu.Lock() - errs = errors.Join(errs, err) - mu.Unlock() + w.logger.Error(err) interruptCancel() // prevent new uploads from being launched } }(packedSlabs[0]) @@ -280,11 +274,6 @@ func (w *worker) threadedUploadPackedSlabs(rs api.RedundancySettings, contractSe // wait for all threads to finish wg.Wait() - - // log errors - if err := errors.Join(errs); err != nil { - w.logger.Errorf("couldn't upload packed slabs, err: %v", err) - } return } @@ -890,7 +879,7 @@ loop: for slab.numInflight > 0 && !done { select { case <-u.shutdownCtx.Done(): - return nil, 0, 0, errors.New("upload stopped") + return nil, 0, 0, ErrShuttingDown case <-ctx.Done(): return nil, 0, 0, ctx.Err() case resp := <-respChan: diff --git a/worker/upload_test.go b/worker/upload_test.go index 044827799..1d441693f 100644 --- a/worker/upload_test.go +++ b/worker/upload_test.go @@ -4,13 +4,14 @@ import ( "bytes" "context" "errors" - "math" + "fmt" "testing" "time" rhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/core/types" "go.sia.tech/renterd/api" + "go.sia.tech/renterd/internal/test" "go.sia.tech/renterd/object" "lukechampine.com/frand" ) @@ -128,7 +129,7 @@ func TestUploadPackedSlab(t *testing.T) { w := newTestWorker(t) // add hosts to worker - w.AddHosts(testRedundancySettings.TotalShards * 2) + w.AddHosts(testRedundancySettings.TotalShards) // convenience variables os := w.os @@ -140,9 +141,6 @@ func TestUploadPackedSlab(t *testing.T) { params := testParameters(t.Name()) params.packing = true - // block aysnc packed slab uploads - w.BlockAsyncPackedSlabUploads(params) - // create test data data := frand.Bytes(128) @@ -208,67 +206,60 @@ func TestUploadPackedSlab(t *testing.T) { t.Fatal("data mismatch") } - // configure max buffer size - os.setSlabBufferMaxSizeSoft(128) - - // upload 2x64 bytes using the worker - params.path = t.Name() + "2" - _, err = w.upload(context.Background(), bytes.NewReader(frand.Bytes(64)), w.Contracts(), params) - if err != nil { - t.Fatal(err) - } - params.path = t.Name() + "3" - _, err = w.upload(context.Background(), bytes.NewReader(frand.Bytes(64)), w.Contracts(), params) - if err != nil { - t.Fatal(err) + // define a helper that counts packed slabs + packedSlabsCount := func() int { + t.Helper() + os.mu.Lock() + cnt := len(os.partials) + os.mu.Unlock() + return cnt } - // assert we still have two packed slabs (buffer limit not reached) - pss, err = os.PackedSlabsForUpload(context.Background(), 0, uint8(params.rs.MinShards), uint8(params.rs.TotalShards), testContractSet, math.MaxInt) - if err != nil { - t.Fatal(err) - } else if len(pss) != 2 { - t.Fatal("expected 2 packed slab") + // define a helper that uploads data using the worker + var c int + uploadBytes := func(n int) { + t.Helper() + params.path = fmt.Sprintf("%s_%d", t.Name(), c) + _, err := w.upload(context.Background(), bytes.NewReader(frand.Bytes(n)), w.Contracts(), params) + if err != nil { + t.Fatal(err) + } + c++ } - // upload one more byte (buffer limit reached) - params.path = t.Name() + "4" - _, err = w.upload(context.Background(), bytes.NewReader(frand.Bytes(1)), w.Contracts(), params) - if err != nil { - t.Fatal(err) - } + // block aysnc packed slab uploads + w.BlockAsyncPackedSlabUploads(params) - // assert we still have two packed slabs (one got uploaded synchronously) - pss, err = os.PackedSlabsForUpload(context.Background(), 0, uint8(params.rs.MinShards), uint8(params.rs.TotalShards), testContractSet, math.MaxInt) - if err != nil { - t.Fatal(err) - } else if len(pss) != 2 { - t.Fatal("expected 2 packed slab") + // configure max buffer size + os.setSlabBufferMaxSizeSoft(128) + + // upload 2x64 bytes using the worker and assert we still have two packed + // slabs (buffer limit not reached) + uploadBytes(64) + uploadBytes(64) + if packedSlabsCount() != 2 { + t.Fatal("expected 2 packed slabs") } - // allow some time for the background thread to realise we blocked async - // packed slab uploads - time.Sleep(time.Second) + // upload one more byte and assert we still have two packed slabs (one got + // uploaded synchronously because buffer limit was reached) + uploadBytes(1) + if packedSlabsCount() != 2 { + t.Fatal("expected 2 packed slabs") + } // unblock asynchronous uploads w.UnblockAsyncPackedSlabUploads(params) + uploadBytes(129) // ensure background thread is running - // upload 1 byte using the worker - params.path = t.Name() + "5" - _, err = w.upload(context.Background(), bytes.NewReader(frand.Bytes(129)), w.Contracts(), params) - if err != nil { - t.Fatal(err) - } - - // allow some time for the thread to pick up the packed slabs - time.Sleep(time.Second) - - // assert we uploaded all packed slabs - pss, err = os.PackedSlabsForUpload(context.Background(), 0, uint8(params.rs.MinShards), uint8(params.rs.TotalShards), testContractSet, 1) - if err != nil { + // assert packed slabs get uploaded asynchronously + if err := test.Retry(100, 100*time.Millisecond, func() error { + if packedSlabsCount() != 0 { + return errors.New("expected 0 packed slabs") + } + return nil + }); err != nil { t.Fatal(err) - } else if len(pss) != 0 { - t.Fatal("expected 0 packed slab") } }