Fix TestUploadPackedSlab NDF #1012

Merged 2 commits on Mar 6, 2024.

This PR fixes a non-deterministic failure (NDF) in TestUploadPackedSlab: instead of sleeping for a fixed duration and hoping the background packed slab uploads have finished, the test now polls for the expected state with a bounded retry loop. Along the way, the worker's packed slab error handling is simplified and a shared ErrShuttingDown sentinel replaces an ad-hoc error.
internal/test/tt.go (13 changes: 9 additions & 4 deletions)
@@ -90,12 +90,17 @@ func (t impl) OKAll(vs ...interface{}) {
 
 func (t impl) Retry(tries int, durationBetweenAttempts time.Duration, fn func() error) {
 	t.Helper()
-	for i := 1; i < tries; i++ {
-		err := fn()
+	t.OK(Retry(tries, durationBetweenAttempts, fn))
+}
+
+func Retry(tries int, durationBetweenAttempts time.Duration, fn func() error) error {
+	var err error
+	for i := 0; i < tries; i++ {
+		err = fn()
 		if err == nil {
-			return
+			break
 		}
 		time.Sleep(durationBetweenAttempts)
 	}
-	t.OK(fn())
+	return err
 }
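
The promoted Retry helper is what the test changes below build on: unlike the method on impl, it returns the last error instead of failing the test outright, so callers can poll for an asynchronous condition and decide what to do on timeout. A minimal sketch of that polling pattern (hypothetical test; note that internal/test is only importable from within the renterd module):

// A hypothetical test polling for a background condition with test.Retry
// instead of sleeping a fixed duration and hoping the work has finished.
package example_test

import (
	"errors"
	"sync/atomic"
	"testing"
	"time"

	"go.sia.tech/renterd/internal/test"
)

func TestRetryPolling(t *testing.T) {
	var done atomic.Bool
	go func() {
		time.Sleep(50 * time.Millisecond) // simulated background work
		done.Store(true)
	}()

	// poll up to 100 times, 10ms apart; Retry returns the last error on failure
	if err := test.Retry(100, 10*time.Millisecond, func() error {
		if !done.Load() {
			return errors.New("background work not finished yet")
		}
		return nil
	}); err != nil {
		t.Fatal(err)
	}
}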
worker/upload.go (23 changes: 6 additions & 17 deletions)
@@ -198,7 +198,7 @@ func (w *worker) upload(ctx context.Context, r io.Reader, contracts []api.Contra
 	// upload packed slab
 	if len(packedSlabs) > 0 {
 		if err := w.tryUploadPackedSlab(ctx, mem, packedSlabs[0], up.rs, up.contractSet, lockingPriorityBlockedUpload); err != nil {
-			w.logger.Errorf("couldn't upload packed slabs, err: %v", err)
+			w.logger.Error(err)
 		}
 	}
 }
@@ -227,10 +227,6 @@ func (w *worker) threadedUploadPackedSlabs(rs api.RedundancySettings, contractSe
 		w.uploadsMu.Unlock()
 	}()
 
-	// upload packed slabs
-	var mu sync.Mutex
-	var errs error
-
 	// derive a context that we can use as an interrupt in case of an error or shutdown.
 	interruptCtx, interruptCancel := context.WithCancel(w.shutdownCtx)
 	defer interruptCancel()
@@ -246,9 +242,9 @@
 		// fetch packed slab to upload
 		packedSlabs, err := w.bus.PackedSlabsForUpload(interruptCtx, defaultPackedSlabsLockDuration, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet, 1)
 		if err != nil {
-			mu.Lock()
-			errs = errors.Join(errs, fmt.Errorf("couldn't fetch packed slabs from bus: %v", err))
-			mu.Unlock()
+			w.logger.Errorf("couldn't fetch packed slabs from bus: %v", err)
+			mem.Release()
+			break
 		}
 
 		// no more packed slabs to upload
@@ -270,21 +266,14 @@
 
 			// try to upload a packed slab, if there were no packed slabs left to upload ok is false
 			if err := w.tryUploadPackedSlab(ctx, mem, ps, rs, contractSet, lockPriority); err != nil {
-				mu.Lock()
-				errs = errors.Join(errs, err)
-				mu.Unlock()
+				w.logger.Error(err)
 				interruptCancel() // prevent new uploads from being launched
 			}
 		}(packedSlabs[0])
 	}
 
 	// wait for all threads to finish
 	wg.Wait()
-
-	// log errors
-	if err := errors.Join(errs); err != nil {
-		w.logger.Errorf("couldn't upload packed slabs, err: %v", err)
-	}
 	return
 }

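With the mu/errs accumulator gone, each upload goroutine logs its own failure and cancels the shared interrupt context, which stops the dispatch loop from launching further uploads while wg.Wait() still drains the in-flight ones. A self-contained sketch of that pattern with hypothetical stand-ins (doWork and the job loop are illustrative, not renterd's worker code):

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// doWork stands in for the real upload; job 3 fails for illustration.
func doWork(job int) error {
	if job == 3 {
		return errors.New("boom")
	}
	return nil
}

func main() {
	// derive an interrupt context; cancelling it prevents new launches
	interruptCtx, interruptCancel := context.WithCancel(context.Background())
	defer interruptCancel()

	var wg sync.WaitGroup
	for job := 0; job < 10; job++ {
		// stop launching new work once any worker has failed
		if interruptCtx.Err() != nil {
			break
		}
		wg.Add(1)
		go func(job int) {
			defer wg.Done()
			if err := doWork(job); err != nil {
				fmt.Println("work failed:", err) // log locally instead of collecting errors
				interruptCancel()                // prevent new work from being launched
			}
		}(job)
	}

	// wait for all in-flight workers to finish before returning
	wg.Wait()
}
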
@@ -890,7 +879,7 @@ loop:
 	for slab.numInflight > 0 && !done {
 		select {
 		case <-u.shutdownCtx.Done():
-			return nil, 0, 0, errors.New("upload stopped")
+			return nil, 0, 0, ErrShuttingDown
 		case <-ctx.Done():
 			return nil, 0, 0, ctx.Err()
 		case resp := <-respChan:
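
Returning a shared ErrShuttingDown sentinel instead of a fresh errors.New("upload stopped") lets callers detect a clean shutdown with errors.Is rather than by comparing error strings. The diff only shows the sentinel being returned, so the declaration in this sketch is an assumption:

package main

import (
	"errors"
	"fmt"
)

// ErrShuttingDown mirrors the sentinel used above; its exact declaration
// does not appear in the diff and is assumed here.
var ErrShuttingDown = errors.New("worker is shutting down")

func upload(shuttingDown bool) error {
	if shuttingDown {
		return fmt.Errorf("upload failed: %w", ErrShuttingDown)
	}
	return nil
}

func main() {
	// errors.Is sees through wrapping, so callers can treat shutdown
	// as an expected condition rather than an upload failure
	if err := upload(true); errors.Is(err, ErrShuttingDown) {
		fmt.Println("clean shutdown, not a real failure")
	}
}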
worker/upload_test.go (99 changes: 45 additions & 54 deletions)
@@ -4,13 +4,14 @@ import (
 	"bytes"
 	"context"
 	"errors"
-	"math"
+	"fmt"
 	"testing"
 	"time"
 
 	rhpv2 "go.sia.tech/core/rhp/v2"
 	"go.sia.tech/core/types"
 	"go.sia.tech/renterd/api"
+	"go.sia.tech/renterd/internal/test"
 	"go.sia.tech/renterd/object"
 	"lukechampine.com/frand"
 )
@@ -128,7 +129,7 @@ func TestUploadPackedSlab(t *testing.T) {
 	w := newTestWorker(t)
 
 	// add hosts to worker
-	w.AddHosts(testRedundancySettings.TotalShards * 2)
+	w.AddHosts(testRedundancySettings.TotalShards)
 
 	// convenience variables
 	os := w.os
@@ -140,9 +141,6 @@
 	params := testParameters(t.Name())
 	params.packing = true
 
-	// block aysnc packed slab uploads
-	w.BlockAsyncPackedSlabUploads(params)
-
 	// create test data
 	data := frand.Bytes(128)
@@ -208,67 +206,60 @@
 		t.Fatal("data mismatch")
 	}
 
-	// configure max buffer size
-	os.setSlabBufferMaxSizeSoft(128)
-
-	// upload 2x64 bytes using the worker
-	params.path = t.Name() + "2"
-	_, err = w.upload(context.Background(), bytes.NewReader(frand.Bytes(64)), w.Contracts(), params)
-	if err != nil {
-		t.Fatal(err)
-	}
-	params.path = t.Name() + "3"
-	_, err = w.upload(context.Background(), bytes.NewReader(frand.Bytes(64)), w.Contracts(), params)
-	if err != nil {
-		t.Fatal(err)
+	// define a helper that counts packed slabs
+	packedSlabsCount := func() int {
+		t.Helper()
+		os.mu.Lock()
+		cnt := len(os.partials)
+		os.mu.Unlock()
+		return cnt
 	}
 
-	// assert we still have two packed slabs (buffer limit not reached)
-	pss, err = os.PackedSlabsForUpload(context.Background(), 0, uint8(params.rs.MinShards), uint8(params.rs.TotalShards), testContractSet, math.MaxInt)
-	if err != nil {
-		t.Fatal(err)
-	} else if len(pss) != 2 {
-		t.Fatal("expected 2 packed slab")
+	// define a helper that uploads data using the worker
+	var c int
+	uploadBytes := func(n int) {
+		t.Helper()
+		params.path = fmt.Sprintf("%s_%d", t.Name(), c)
+		_, err := w.upload(context.Background(), bytes.NewReader(frand.Bytes(n)), w.Contracts(), params)
+		if err != nil {
+			t.Fatal(err)
+		}
+		c++
 	}
 
-	// upload one more byte (buffer limit reached)
-	params.path = t.Name() + "4"
-	_, err = w.upload(context.Background(), bytes.NewReader(frand.Bytes(1)), w.Contracts(), params)
-	if err != nil {
-		t.Fatal(err)
-	}
+	// block async packed slab uploads
+	w.BlockAsyncPackedSlabUploads(params)
 
-	// assert we still have two packed slabs (one got uploaded synchronously)
-	pss, err = os.PackedSlabsForUpload(context.Background(), 0, uint8(params.rs.MinShards), uint8(params.rs.TotalShards), testContractSet, math.MaxInt)
-	if err != nil {
-		t.Fatal(err)
-	} else if len(pss) != 2 {
-		t.Fatal("expected 2 packed slab")
+	// configure max buffer size
+	os.setSlabBufferMaxSizeSoft(128)
+
+	// upload 2x64 bytes using the worker and assert we still have two packed
+	// slabs (buffer limit not reached)
+	uploadBytes(64)
+	uploadBytes(64)
+	if packedSlabsCount() != 2 {
+		t.Fatal("expected 2 packed slabs")
 	}
 
-	// allow some time for the background thread to realise we blocked async
-	// packed slab uploads
-	time.Sleep(time.Second)
+	// upload one more byte and assert we still have two packed slabs (one got
+	// uploaded synchronously because buffer limit was reached)
+	uploadBytes(1)
+	if packedSlabsCount() != 2 {
+		t.Fatal("expected 2 packed slabs")
+	}
 
 	// unblock asynchronous uploads
 	w.UnblockAsyncPackedSlabUploads(params)
+	uploadBytes(129) // ensure background thread is running
 
-	// upload 1 byte using the worker
-	params.path = t.Name() + "5"
-	_, err = w.upload(context.Background(), bytes.NewReader(frand.Bytes(129)), w.Contracts(), params)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// allow some time for the thread to pick up the packed slabs
-	time.Sleep(time.Second)
-
-	// assert we uploaded all packed slabs
-	pss, err = os.PackedSlabsForUpload(context.Background(), 0, uint8(params.rs.MinShards), uint8(params.rs.TotalShards), testContractSet, 1)
-	if err != nil {
+	// assert packed slabs get uploaded asynchronously
+	if err := test.Retry(100, 100*time.Millisecond, func() error {
+		if packedSlabsCount() != 0 {
+			return errors.New("expected 0 packed slabs")
+		}
+		return nil
+	}); err != nil {
 		t.Fatal(err)
-	} else if len(pss) != 0 {
-		t.Fatal("expected 0 packed slab")
 	}
 }
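
Both new helpers call t.Helper() before doing any work, so a t.Fatal inside the closure is reported at the call site, and the captured counter c gives every upload a unique object path. A standalone sketch of that closure-helper pattern (hypothetical test, not from the PR):

package example_test

import (
	"fmt"
	"testing"
)

func TestHelperPattern(t *testing.T) {
	// the closure captures t and a counter, mirroring uploadBytes above
	var c int
	upload := func(n int) {
		t.Helper() // failures are reported at the caller's line, not here
		path := fmt.Sprintf("%s_%d", t.Name(), c)
		if n <= 0 {
			t.Fatalf("upload to %s: need at least 1 byte", path)
		}
		c++
	}

	upload(64)
	upload(64)
	upload(1)
}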
