Skip to content

Commit

Permalink
worker: extend TestUploadPackedSlab to test sync and async packed slab uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Feb 27, 2024
1 parent 8ca4580 commit 716091b
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 42 deletions.
45 changes: 36 additions & 9 deletions worker/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"math/big"
"sync"
"time"
Expand Down Expand Up @@ -346,24 +347,27 @@ var _ ObjectStore = (*objectStoreMock)(nil)

type (
	// objectStoreMock is an in-memory ObjectStore implementation used by the
	// worker tests. All exported-style accessors lock mu; helpers that are
	// only called with the lock held are documented as such.
	objectStoreMock struct {
		mu       sync.Mutex
		objects  map[string]map[string]object.Object // bucket -> path -> object
		partials map[string]*packedSlabMock          // slab encryption key -> buffered packed slab

		// slabBufferMaxSizeSoft is the soft cap on the total size of unlocked
		// partial slab buffers; once exceeded, AddPartialSlab reports that a
		// (synchronous) packed slab upload is required.
		slabBufferMaxSizeSoft int

		bufferIDCntr uint // allows marking packed slabs as uploaded
	}

	// packedSlabMock models a single buffered packed slab awaiting upload.
	packedSlabMock struct {
		parameterKey string // ([minshards]-[totalshards]-[contractset])
		bufferID     uint
		slabKey      object.EncryptionKey
		data         []byte
		lockedUntil  time.Time // zero until locked by PackedSlabsForUpload
	}
)

func newObjectStoreMock(bucket string) *objectStoreMock {
os := &objectStoreMock{
objects: make(map[string]map[string]object.Object),
partials: make(map[string]packedSlabMock),
objects: make(map[string]map[string]object.Object),
partials: make(map[string]*packedSlabMock),
slabBufferMaxSizeSoft: math.MaxInt64,
}
os.objects[bucket] = make(map[string]object.Object)
return os
Expand Down Expand Up @@ -421,15 +425,15 @@ func (os *objectStoreMock) AddPartialSlab(ctx context.Context, data []byte, minS
}

// update store
os.partials[ec.String()] = packedSlabMock{
os.partials[ec.String()] = &packedSlabMock{
parameterKey: fmt.Sprintf("%d-%d-%v", minShards, totalShards, contractSet),
bufferID: os.bufferIDCntr,
slabKey: ec,
data: data,
}
os.bufferIDCntr++

return []object.SlabSlice{ss}, false, nil
return []object.SlabSlice{ss}, os.totalSlabBufferSize() > os.slabBufferMaxSizeSoft, nil
}

func (os *objectStoreMock) Object(ctx context.Context, bucket, path string, opts api.GetObjectOptions) (api.ObjectsResponse, error) {
Expand Down Expand Up @@ -511,14 +515,22 @@ func (os *objectStoreMock) PackedSlabsForUpload(ctx context.Context, lockingDura
os.mu.Lock()
defer os.mu.Unlock()

if limit == -1 {
limit = math.MaxInt
}

parameterKey := fmt.Sprintf("%d-%d-%v", minShards, totalShards, set)
for _, ps := range os.partials {
if ps.parameterKey == parameterKey {
if ps.parameterKey == parameterKey && time.Now().After(ps.lockedUntil) {
ps.lockedUntil = time.Now().Add(lockingDuration)
pss = append(pss, api.PackedSlab{
BufferID: ps.bufferID,
Data: ps.data,
Key: ps.slabKey,
})
if len(pss) == limit {
break
}
}
}
return
Expand Down Expand Up @@ -557,6 +569,21 @@ func (os *objectStoreMock) MultipartUpload(ctx context.Context, uploadID string)
return api.MultipartUpload{}, nil
}

// totalSlabBufferSize returns the combined size of all partial slab buffers
// that are not currently locked for upload. Callers must hold os.mu.
func (os *objectStoreMock) totalSlabBufferSize() (total int) {
	// take a single time snapshot so every buffer is judged against the same
	// instant instead of calling time.Now() once per iteration
	now := time.Now()
	for _, p := range os.partials {
		if now.After(p.lockedUntil) {
			total += len(p.data)
		}
	}
	return
}

// setSlabBufferMaxSizeSoft updates the soft limit on the total size of
// buffered partial slabs, beyond which AddPartialSlab signals that packed
// slabs should be uploaded.
func (os *objectStoreMock) setSlabBufferMaxSizeSoft(n int) {
	os.mu.Lock()
	os.slabBufferMaxSizeSoft = n
	os.mu.Unlock()
}

func (os *objectStoreMock) forEachObject(fn func(bucket, path string, o object.Object)) {
for bucket, objects := range os.objects {
for path, object := range objects {
Expand Down
13 changes: 6 additions & 7 deletions worker/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,29 +203,28 @@ func (w *worker) upload(ctx context.Context, r io.Reader, contracts []api.Contra
}
}
}
}

// make sure there's a goroutine uploading the remainder of the packed slabs
go w.threadedUploadPackedSlabs(up.rs, up.contractSet, lockingPriorityBackgroundUpload)
// make sure there's a goroutine uploading the remainder of the packed slabs
go w.threadedUploadPackedSlabs(up.rs, up.contractSet, lockingPriorityBackgroundUpload)

This comment has been minimized.

Copy link
@freopen

freopen Apr 8, 2024

I'm sorry, I'm pretty new here. Why is this function now inside the soft limit block?

I was investigating why my instance has 1.5GiB of completed slabs in partial_slabs directory. As far as I see, after this change completed slab background upload will only trigger once we reach soft limit of 4GiB, then it will work until there are no more complete slabs and stop. After that no slabs will be uploaded until we accumulate 4GiB of slabs once again. Is that the intended behavior?

}

return eTag, nil
}

func (w *worker) threadedUploadPackedSlabs(rs api.RedundancySettings, contractSet string, lockPriority int) {
key := fmt.Sprintf("%d-%d_%s", rs.MinShards, rs.TotalShards, contractSet)

w.uploadsMu.Lock()
if w.uploadingPackedSlabs[key] {
if _, ok := w.uploadingPackedSlabs[key]; ok {
w.uploadsMu.Unlock()
return
}
w.uploadingPackedSlabs[key] = true
w.uploadingPackedSlabs[key] = struct{}{}
w.uploadsMu.Unlock()

// make sure we mark uploading packed slabs as false when we're done
defer func() {
w.uploadsMu.Lock()
w.uploadingPackedSlabs[key] = false
delete(w.uploadingPackedSlabs, key)
w.uploadsMu.Unlock()
}()

Expand Down
108 changes: 84 additions & 24 deletions worker/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"errors"
"math"
"testing"
"time"

Expand Down Expand Up @@ -33,10 +34,7 @@ func TestUpload(t *testing.T) {
ul := w.uploadManager

// create test data
data := make([]byte, 128)
if _, err := frand.Read(data); err != nil {
t.Fatal(err)
}
data := testData(128)

// create upload params
params := testParameters(t.Name())
Expand Down Expand Up @@ -130,24 +128,24 @@ func TestUploadPackedSlab(t *testing.T) {
w := newTestWorker(t)

// add hosts to worker
w.addHosts(testRedundancySettings.TotalShards * 2)
w.addHosts(testRedundancySettings.TotalShards)

// convenience variables
os := w.os
mm := w.ulmm
dl := w.downloadManager
ul := w.uploadManager

// create test data
data := make([]byte, 128)
if _, err := frand.Read(data); err != nil {
t.Fatal(err)
}

// create upload params
params := testParameters(t.Name())
params.packing = true

// block async packed slab uploads
w.blockAsyncPackedSlabUploads(params)

// create test data
data := testData(128)

// upload data
_, _, err := ul.Upload(context.Background(), bytes.NewReader(data), w.contracts(), params, lockingPriorityUpload)
if err != nil {
Expand Down Expand Up @@ -182,9 +180,9 @@ func TestUploadPackedSlab(t *testing.T) {
t.Fatal("expected 1 packed slab")
}
ps := pss[0]
mem := mm.AcquireMemory(context.Background(), params.rs.SlabSize())

// upload the packed slab
mem := mm.AcquireMemory(context.Background(), params.rs.SlabSize())
err = ul.UploadPackedSlab(context.Background(), params.rs, ps, mem, w.contracts(), 0, lockingPriorityUpload)
if err != nil {
t.Fatal(err)
Expand All @@ -209,6 +207,69 @@ func TestUploadPackedSlab(t *testing.T) {
} else if !bytes.Equal(data, buf.Bytes()) {
t.Fatal("data mismatch")
}

// configure max buffer size
os.setSlabBufferMaxSizeSoft(128)

// upload 2x64 bytes using the worker
params.path = t.Name() + "2"
_, err = w.upload(context.Background(), bytes.NewReader(testData(64)), w.contracts(), params)
if err != nil {
t.Fatal(err)
}
params.path = t.Name() + "3"
_, err = w.upload(context.Background(), bytes.NewReader(testData(64)), w.contracts(), params)
if err != nil {
t.Fatal(err)
}

// assert we still have two packed slabs (buffer limit not reached)
pss, err = os.PackedSlabsForUpload(context.Background(), 0, uint8(params.rs.MinShards), uint8(params.rs.TotalShards), testContractSet, math.MaxInt)
if err != nil {
t.Fatal(err)
} else if len(pss) != 2 {
t.Fatal("expected 2 packed slab")
}

// upload one more byte (buffer limit reached)
params.path = t.Name() + "4"
_, err = w.upload(context.Background(), bytes.NewReader(testData(1)), w.contracts(), params)
if err != nil {
t.Fatal(err)
}

// assert we still have two packed slabs (one got uploaded synchronously)
pss, err = os.PackedSlabsForUpload(context.Background(), 0, uint8(params.rs.MinShards), uint8(params.rs.TotalShards), testContractSet, math.MaxInt)
if err != nil {
t.Fatal(err)
} else if len(pss) != 2 {
t.Fatal("expected 2 packed slab")
}

// allow some time for the background thread to realise we blocked async
// packed slab uploads
time.Sleep(time.Second)

// unblock asynchronous uploads
w.unblockAsyncPackedSlabUploads(params)

// upload 1 byte using the worker
params.path = t.Name() + "5"
_, err = w.upload(context.Background(), bytes.NewReader(testData(129)), w.contracts(), params)
if err != nil {
t.Fatal(err)
}

// allow some time for the thread to pick up the packed slabs
time.Sleep(time.Second)

// assert we uploaded all packed slabs
pss, err = os.PackedSlabsForUpload(context.Background(), 0, uint8(params.rs.MinShards), uint8(params.rs.TotalShards), testContractSet, 1)
if err != nil {
t.Fatal(err)
} else if len(pss) != 0 {
t.Fatal("expected 0 packed slab")
}
}

func TestUploadShards(t *testing.T) {
Expand All @@ -225,10 +286,7 @@ func TestUploadShards(t *testing.T) {
ul := w.uploadManager

// create test data
data := make([]byte, 128)
if _, err := frand.Read(data); err != nil {
t.Fatal(err)
}
data := testData(128)

// create upload params
params := testParameters(t.Name())
Expand Down Expand Up @@ -343,10 +401,7 @@ func TestRefreshUploaders(t *testing.T) {
hm := w.hm

// create test data
data := make([]byte, 128)
if _, err := frand.Read(data); err != nil {
t.Fatal(err)
}
data := testData(128)

// create upload params
params := testParameters(t.Name())
Expand Down Expand Up @@ -444,10 +499,7 @@ func TestUploadRegression(t *testing.T) {
dl := w.downloadManager

// create test data
data := make([]byte, 128)
if _, err := frand.Read(data); err != nil {
t.Fatal(err)
}
data := testData(128)

// create upload params
params := testParameters(t.Name())
Expand Down Expand Up @@ -500,3 +552,11 @@ func testParameters(path string) uploadParameters {
rs: testRedundancySettings,
}
}

// testData returns a buffer of n random bytes for use in upload tests.
func testData(n int) []byte {
	buf := make([]byte, n)
	_, err := frand.Read(buf)
	if err != nil {
		panic(err)
	}
	return buf
}
4 changes: 2 additions & 2 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ type worker struct {
transportPoolV3 *transportPoolV3

uploadsMu sync.Mutex
uploadingPackedSlabs map[string]bool
uploadingPackedSlabs map[string]struct{}

contractSpendingRecorder ContractSpendingRecorder
contractLockingDuration time.Duration
Expand Down Expand Up @@ -1327,7 +1327,7 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush
masterKey: masterKey,
logger: l.Sugar(),
startTime: time.Now(),
uploadingPackedSlabs: make(map[string]bool),
uploadingPackedSlabs: make(map[string]struct{}),
shutdownCtx: ctx,
shutdownCtxCancel: cancel,
}
Expand Down
15 changes: 15 additions & 0 deletions worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package worker

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -93,6 +94,20 @@ func (w *testWorker) blockUploads() func() {
return func() { close(blockChan) }
}

// blockAsyncPackedSlabUploads marks the packed slab upload key derived from
// the given upload parameters as in-flight, preventing the background thread
// from picking up packed slabs for that redundancy/contract set combination.
func (w *testWorker) blockAsyncPackedSlabUploads(up uploadParameters) {
	key := fmt.Sprintf("%d-%d_%s", up.rs.MinShards, up.rs.TotalShards, up.contractSet)
	w.uploadsMu.Lock()
	w.uploadingPackedSlabs[key] = struct{}{}
	w.uploadsMu.Unlock()
}

// unblockAsyncPackedSlabUploads clears the in-flight marker set by
// blockAsyncPackedSlabUploads, allowing asynchronous packed slab uploads for
// the given upload parameters to proceed again.
func (w *testWorker) unblockAsyncPackedSlabUploads(up uploadParameters) {
	key := fmt.Sprintf("%d-%d_%s", up.rs.MinShards, up.rs.TotalShards, up.contractSet)
	w.uploadsMu.Lock()
	delete(w.uploadingPackedSlabs, key)
	w.uploadsMu.Unlock()
}

func (w *testWorker) contracts() []api.ContractMetadata {
metadatas, err := w.cs.Contracts(context.Background(), api.ContractsOpts{})
if err != nil {
Expand Down

0 comments on commit 716091b

Please sign in to comment.