
Merge pull request #980 from SiaFoundation/pj/fix-packed-slab-uploads
Only upload a single packed slab synchronously when the buffer limit is reached
ChrisSchinnerl authored Feb 28, 2024

2 parents e031cf4 + c956182 commit 1ef39bc
Showing 7 changed files with 215 additions and 109 deletions.
7 changes: 6 additions & 1 deletion api/setting.go
@@ -126,7 +126,12 @@ func (rs RedundancySettings) Redundancy() float64 {
return float64(rs.TotalShards) / float64(rs.MinShards)
}

// SlabSizeNoRedundancy returns the size of a slab without added redundancy.
// SlabSize returns the size of a slab.
func (rs RedundancySettings) SlabSize() uint64 {
return uint64(rs.TotalShards) * rhpv2.SectorSize
}

// SlabSizeNoRedundancy returns the size of a slab without redundancy.
func (rs RedundancySettings) SlabSizeNoRedundancy() uint64 {
return uint64(rs.MinShards) * rhpv2.SectorSize
}
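
The new SlabSize helper centralizes the TotalShards * rhpv2.SectorSize arithmetic that callers previously spelled out inline (see the pruning test and upload manager changes below). A minimal, self-contained sketch of how the two helpers relate; the sectorSize constant and the shard counts are illustrative stand-ins for rhpv2.SectorSize and real redundancy settings, not the renterd API:

package main

import "fmt"

// sectorSize stands in for rhpv2.SectorSize (4 MiB) in this sketch.
const sectorSize = 1 << 22

// RedundancySettings mirrors the shape of api.RedundancySettings for illustration.
type RedundancySettings struct {
    MinShards   int
    TotalShards int
}

// SlabSize is the on-network size of a slab, including redundancy.
func (rs RedundancySettings) SlabSize() uint64 {
    return uint64(rs.TotalShards) * sectorSize
}

// SlabSizeNoRedundancy is the amount of user data a slab holds.
func (rs RedundancySettings) SlabSizeNoRedundancy() uint64 {
    return uint64(rs.MinShards) * sectorSize
}

func main() {
    rs := RedundancySettings{MinShards: 10, TotalShards: 30}
    fmt.Println(rs.SlabSizeNoRedundancy()) // 41943040  (40 MiB of user data)
    fmt.Println(rs.SlabSize())             // 125829120 (120 MiB stored across hosts)
}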
3 changes: 1 addition & 2 deletions internal/test/e2e/pruning_test.go
@@ -9,7 +9,6 @@ import (
"testing"
"time"

rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/hostdb"
@@ -208,7 +207,7 @@ func TestSectorPruning(t *testing.T) {
tt.Retry(100, 100*time.Millisecond, func() error {
res, err = b.PrunableData(context.Background())
tt.OK(err)
if res.TotalPrunable != uint64(math.Ceil(float64(numObjects)/2))*uint64(rs.TotalShards)*rhpv2.SectorSize {
if res.TotalPrunable != uint64(math.Ceil(float64(numObjects)/2))*rs.SlabSize() {
return fmt.Errorf("unexpected prunable data %v", n)
}
return nil
45 changes: 36 additions & 9 deletions worker/mocks_test.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"math/big"
"sync"
"time"
@@ -346,24 +347,27 @@ var _ ObjectStore = (*objectStoreMock)(nil)

type (
objectStoreMock struct {
mu sync.Mutex
objects map[string]map[string]object.Object
partials map[string]packedSlabMock
bufferIDCntr uint // allows marking packed slabs as uploaded
mu sync.Mutex
objects map[string]map[string]object.Object
partials map[string]*packedSlabMock
slabBufferMaxSizeSoft int
bufferIDCntr uint // allows marking packed slabs as uploaded
}

packedSlabMock struct {
parameterKey string // ([minshards]-[totalshards]-[contractset])
bufferID uint
slabKey object.EncryptionKey
data []byte
lockedUntil time.Time
}
)

func newObjectStoreMock(bucket string) *objectStoreMock {
os := &objectStoreMock{
objects: make(map[string]map[string]object.Object),
partials: make(map[string]packedSlabMock),
objects: make(map[string]map[string]object.Object),
partials: make(map[string]*packedSlabMock),
slabBufferMaxSizeSoft: math.MaxInt64,
}
os.objects[bucket] = make(map[string]object.Object)
return os
@@ -421,15 +425,15 @@ func (os *objectStoreMock) AddPartialSlab(ctx context.Context, data []byte, minS
}

// update store
os.partials[ec.String()] = packedSlabMock{
os.partials[ec.String()] = &packedSlabMock{
parameterKey: fmt.Sprintf("%d-%d-%v", minShards, totalShards, contractSet),
bufferID: os.bufferIDCntr,
slabKey: ec,
data: data,
}
os.bufferIDCntr++

return []object.SlabSlice{ss}, false, nil
return []object.SlabSlice{ss}, os.totalSlabBufferSize() > os.slabBufferMaxSizeSoft, nil
}

func (os *objectStoreMock) Object(ctx context.Context, bucket, path string, opts api.GetObjectOptions) (api.ObjectsResponse, error) {
@@ -511,14 +515,22 @@ func (os *objectStoreMock) PackedSlabsForUpload(ctx context.Context, lockingDura
os.mu.Lock()
defer os.mu.Unlock()

if limit == -1 {
limit = math.MaxInt
}

parameterKey := fmt.Sprintf("%d-%d-%v", minShards, totalShards, set)
for _, ps := range os.partials {
if ps.parameterKey == parameterKey {
if ps.parameterKey == parameterKey && time.Now().After(ps.lockedUntil) {
ps.lockedUntil = time.Now().Add(lockingDuration)
pss = append(pss, api.PackedSlab{
BufferID: ps.bufferID,
Data: ps.data,
Key: ps.slabKey,
})
if len(pss) == limit {
break
}
}
}
return
@@ -557,6 +569,21 @@ func (os *objectStoreMock) MultipartUpload(ctx context.Context, uploadID string)
return api.MultipartUpload{}, nil
}

func (os *objectStoreMock) totalSlabBufferSize() (total int) {
for _, p := range os.partials {
if time.Now().After(p.lockedUntil) {
total += len(p.data)
}
}
return
}

func (os *objectStoreMock) setSlabBufferMaxSizeSoft(n int) {
os.mu.Lock()
defer os.mu.Unlock()
os.slabBufferMaxSizeSoft = n
}

func (os *objectStoreMock) forEachObject(fn func(bucket, path string, o object.Object)) {
for bucket, objects := range os.objects {
for path, object := range objects {
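
The mock now mirrors the bus behavior the fix depends on: AddPartialSlab reports the buffer limit as reached once the total size of unlocked buffered data exceeds a configurable soft cap, and PackedSlabsForUpload honors the locking duration and the limit. A stripped-down sketch of that soft-limit signal, using hypothetical names rather than the renterd types:

package main

import (
    "fmt"
    "time"
)

// bufferedSlab is a hypothetical stand-in for the mock's packedSlabMock.
type bufferedSlab struct {
    data        []byte
    lockedUntil time.Time
}

type slabBuffer struct {
    partials    map[string]*bufferedSlab
    maxSizeSoft int
}

// add buffers data and reports whether the soft size limit has been reached,
// counting only slabs that are not currently locked for upload.
func (b *slabBuffer) add(key string, data []byte) (limitReached bool) {
    b.partials[key] = &bufferedSlab{data: data}
    total := 0
    for _, p := range b.partials {
        if time.Now().After(p.lockedUntil) {
            total += len(p.data)
        }
    }
    return total > b.maxSizeSoft
}

func main() {
    b := &slabBuffer{partials: make(map[string]*bufferedSlab), maxSizeSoft: 128}
    fmt.Println(b.add("a", make([]byte, 64))) // false, 64 <= 128
    fmt.Println(b.add("b", make([]byte, 64))) // false, 128 <= 128
    fmt.Println(b.add("c", make([]byte, 1)))  // true, 129 > 128
}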
143 changes: 71 additions & 72 deletions worker/upload.go
@@ -177,119 +177,118 @@ func (w *worker) upload(ctx context.Context, r io.Reader, contracts []api.Contra
return "", err
}

// if packing was enabled try uploading packed slabs
if up.packing {
if err := w.tryUploadPackedSlabs(ctx, up.rs, up.contractSet, bufferSizeLimitReached); err != nil {
w.logger.Errorf("couldn't upload packed slabs, err: %v", err)
// return early if worker was shut down or if we don't have to consider
// packed uploads
if w.isStopped() || !up.packing {
return eTag, nil
}

// try and upload one slab synchronously
if bufferSizeLimitReached {
mem := w.uploadManager.mm.AcquireMemory(ctx, up.rs.SlabSize())
if mem != nil {
defer mem.Release()

// fetch packed slab to upload
packedSlabs, err := w.bus.PackedSlabsForUpload(ctx, defaultPackedSlabsLockDuration, uint8(up.rs.MinShards), uint8(up.rs.TotalShards), up.contractSet, 1)
if err != nil {
return "", fmt.Errorf("couldn't fetch packed slabs from bus: %v", err)
}

// upload packed slab
if len(packedSlabs) > 0 {
if err := w.tryUploadPackedSlab(ctx, mem, packedSlabs[0], up.rs, up.contractSet, lockingPriorityBlockedUpload); err != nil {
w.logger.Errorf("couldn't upload packed slabs, err: %v", err)
}
}
}

// make sure there's a goroutine uploading the remainder of the packed slabs
go w.threadedUploadPackedSlabs(up.rs, up.contractSet, lockingPriorityBackgroundUpload)
}

return eTag, nil
}

func (w *worker) threadedUploadPackedSlabs(rs api.RedundancySettings, contractSet string, lockPriority int) {
key := fmt.Sprintf("%d-%d_%s", rs.MinShards, rs.TotalShards, contractSet)

w.uploadsMu.Lock()
if w.uploadingPackedSlabs[key] {
if _, ok := w.uploadingPackedSlabs[key]; ok {
w.uploadsMu.Unlock()
return
}
w.uploadingPackedSlabs[key] = true
w.uploadingPackedSlabs[key] = struct{}{}
w.uploadsMu.Unlock()

// make sure we mark uploading packed slabs as false when we're done
defer func() {
w.uploadsMu.Lock()
w.uploadingPackedSlabs[key] = false
delete(w.uploadingPackedSlabs, key)
w.uploadsMu.Unlock()
}()

// keep uploading packed slabs until we're done
for {
uploaded, err := w.uploadPackedSlabs(w.shutdownCtx, defaultPackedSlabsLockDuration, rs, contractSet, lockPriority)
if err != nil {
w.logger.Errorf("couldn't upload packed slabs, err: %v", err)
return
} else if uploaded == 0 {
return
}
}
}

func (w *worker) tryUploadPackedSlabs(ctx context.Context, rs api.RedundancySettings, contractSet string, block bool) (err error) {
// if we want to block, try and upload one packed slab synchronously, we use
// a slightly higher upload priority to avoid reaching the context deadline
if block {
_, err = w.uploadPackedSlabs(ctx, defaultPackedSlabsLockDuration, rs, contractSet, lockingPriorityBlockedUpload)
}

// make sure there's a goroutine uploading the remainder of the packed slabs
go w.threadedUploadPackedSlabs(rs, contractSet, lockingPriorityBackgroundUpload)
return
}

func (w *worker) uploadPackedSlabs(ctx context.Context, lockingDuration time.Duration, rs api.RedundancySettings, contractSet string, lockPriority int) (uploaded int, err error) {
// upload packed slabs
var mu sync.Mutex
var errs error

var wg sync.WaitGroup
totalSize := uint64(rs.TotalShards) * rhpv2.SectorSize

// derive a context that we can use as an interrupt in case of an error.
interruptCtx, cancel := context.WithCancel(ctx)
defer cancel()
// derive a context that we can use as an interrupt in case of an error or shutdown.
interruptCtx, interruptCancel := context.WithCancel(w.shutdownCtx)
defer interruptCancel()

var wg sync.WaitGroup
for {
// block until we have memory for a slab or until we are interrupted
mem := w.uploadManager.mm.AcquireMemory(interruptCtx, totalSize)
// block until we have memory
mem := w.uploadManager.mm.AcquireMemory(interruptCtx, rs.SlabSize())
if mem == nil {
break // interrupted
}

// fetch packed slabs to upload
var packedSlabs []api.PackedSlab
packedSlabs, err = w.bus.PackedSlabsForUpload(ctx, lockingDuration, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet, 1)
// fetch packed slab to upload
packedSlabs, err := w.bus.PackedSlabsForUpload(interruptCtx, defaultPackedSlabsLockDuration, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet, 1)
if err != nil {
err = fmt.Errorf("couldn't fetch packed slabs from bus: %v", err)
mu.Lock()
errs = errors.Join(errs, fmt.Errorf("couldn't fetch packed slabs from bus: %v", err))
mu.Unlock()
}

// no more packed slabs to upload
if len(packedSlabs) == 0 {
mem.Release()
break
} else if len(packedSlabs) == 0 {
mem.Release()
break // no more slabs
}
ps := packedSlabs[0]

// launch upload for slab
wg.Add(1)
go func(ps api.PackedSlab) {
defer mem.Release()
defer wg.Done()
err := w.uploadPackedSlab(ctx, rs, ps, mem, contractSet, lockPriority)
mu.Lock()
if err != nil {
defer mem.Release()

// we use the background context here, but apply a sane timeout,
// this ensures ongoing uploads are handled gracefully during
// shutdown
ctx, cancel := context.WithTimeout(context.Background(), defaultPackedSlabsUploadTimeout)
defer cancel()

// try to upload the packed slab
if err := w.tryUploadPackedSlab(ctx, mem, ps, rs, contractSet, lockPriority); err != nil {
mu.Lock()
errs = errors.Join(errs, err)
cancel() // prevent new uploads from being launched
} else {
uploaded++
mu.Unlock()
interruptCancel() // prevent new uploads from being launched
}
mu.Unlock()
}(ps)
}(packedSlabs[0])
}

// wait for all threads to finish
wg.Wait()

// return collected errors
err = errors.Join(err, errs)
// log errors
if err := errors.Join(errs); err != nil {
w.logger.Errorf("couldn't upload packed slabs, err: %v", err)
}
return
}

func (w *worker) uploadPackedSlab(ctx context.Context, rs api.RedundancySettings, ps api.PackedSlab, mem Memory, contractSet string, lockPriority int) error {
// create a context with sane timeout
ctx, cancel := context.WithTimeout(ctx, defaultPackedSlabsUploadTimeout)
defer cancel()

func (w *worker) tryUploadPackedSlab(ctx context.Context, mem Memory, ps api.PackedSlab, rs api.RedundancySettings, contractSet string, lockPriority int) error {
// fetch contracts
contracts, err := w.bus.Contracts(ctx, api.ContractsOpts{ContractSet: contractSet})
if err != nil {
@@ -434,9 +433,9 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a
// channel to notify main thread of the number of slabs to wait for
numSlabsChan := make(chan int, 1)

// prepare slab size
size := int64(up.rs.MinShards) * rhpv2.SectorSize
redundantSize := uint64(up.rs.TotalShards) * rhpv2.SectorSize
// prepare slab sizes
slabSizeNoRedundancy := up.rs.SlabSizeNoRedundancy()
slabSize := up.rs.SlabSize()
var partialSlab []byte

// launch uploads in a separate goroutine
@@ -451,14 +450,14 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a
default:
}
// acquire memory
mem := mgr.mm.AcquireMemory(ctx, redundantSize)
mem := mgr.mm.AcquireMemory(ctx, slabSize)
if mem == nil {
return // interrupted
}

// read next slab's data
data := make([]byte, size)
length, err := io.ReadFull(io.LimitReader(cr, size), data)
data := make([]byte, slabSizeNoRedundancy)
length, err := io.ReadFull(io.LimitReader(cr, int64(slabSizeNoRedundancy)), data)
if err == io.EOF {
mem.Release()

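The behavioral change itself lives in worker.upload: when the bus reports that the slab buffer limit has been reached, the worker uploads exactly one packed slab synchronously (bounded by the memory it can acquire) and leaves the rest to a single background goroutine that is deduplicated per redundancy/contract-set key. A self-contained sketch of that control flow; uploadOne and drainInBackground are illustrative stand-ins for tryUploadPackedSlab and threadedUploadPackedSlabs, not the actual renterd functions:

package main

import (
    "fmt"
    "sync"
)

type worker struct {
    mu        sync.Mutex
    uploading map[string]struct{} // keys currently drained by a background goroutine
    wg        sync.WaitGroup
}

// uploadOne stands in for uploading a single packed slab synchronously.
func (w *worker) uploadOne(key string) { fmt.Println("synchronously uploaded one slab for", key) }

// drainInBackground stands in for threadedUploadPackedSlabs: at most one
// goroutine per key drains the remaining packed slabs.
func (w *worker) drainInBackground(key string) {
    w.mu.Lock()
    if _, ok := w.uploading[key]; ok {
        w.mu.Unlock()
        return // someone is already draining this key
    }
    w.uploading[key] = struct{}{}
    w.mu.Unlock()

    w.wg.Add(1)
    go func() {
        defer w.wg.Done()
        defer func() {
            w.mu.Lock()
            delete(w.uploading, key)
            w.mu.Unlock()
        }()
        fmt.Println("draining remaining packed slabs for", key)
    }()
}

// finishUpload mirrors the tail of worker.upload: one synchronous slab if the
// buffer limit was reached, then make sure a background drain is running.
func (w *worker) finishUpload(key string, bufferLimitReached bool) {
    if bufferLimitReached {
        w.uploadOne(key)
    }
    w.drainInBackground(key)
}

func main() {
    w := &worker{uploading: make(map[string]struct{})}
    w.finishUpload("10-30_autopilot", true) // hypothetical "[minShards]-[totalShards]_[contractSet]" key
    w.wg.Wait()
}
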
98 changes: 75 additions & 23 deletions worker/upload_test.go
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"errors"
"math"
"testing"
"time"

@@ -33,10 +34,7 @@ func TestUpload(t *testing.T) {
ul := w.uploadManager

// create test data
data := make([]byte, 128)
if _, err := frand.Read(data); err != nil {
t.Fatal(err)
}
data := frand.Bytes(128)

// create upload params
params := testParameters(t.Name())
@@ -138,16 +136,16 @@ func TestUploadPackedSlab(t *testing.T) {
dl := w.downloadManager
ul := w.uploadManager

// create test data
data := make([]byte, 128)
if _, err := frand.Read(data); err != nil {
t.Fatal(err)
}

// create upload params
params := testParameters(t.Name())
params.packing = true

// block async packed slab uploads
w.BlockAsyncPackedSlabUploads(params)

// create test data
data := frand.Bytes(128)

// upload data
_, _, err := ul.Upload(context.Background(), bytes.NewReader(data), w.Contracts(), params, lockingPriorityUpload)
if err != nil {
@@ -182,9 +180,9 @@ func TestUploadPackedSlab(t *testing.T) {
t.Fatal("expected 1 packed slab")
}
ps := pss[0]
mem := mm.AcquireMemory(context.Background(), uint64(params.rs.TotalShards*rhpv2.SectorSize))

// upload the packed slab
mem := mm.AcquireMemory(context.Background(), params.rs.SlabSize())
err = ul.UploadPackedSlab(context.Background(), params.rs, ps, mem, w.Contracts(), 0, lockingPriorityUpload)
if err != nil {
t.Fatal(err)
@@ -209,6 +207,69 @@ func TestUploadPackedSlab(t *testing.T) {
} else if !bytes.Equal(data, buf.Bytes()) {
t.Fatal("data mismatch")
}

// configure max buffer size
os.setSlabBufferMaxSizeSoft(128)

// upload 2x64 bytes using the worker
params.path = t.Name() + "2"
_, err = w.upload(context.Background(), bytes.NewReader(frand.Bytes(64)), w.Contracts(), params)
if err != nil {
t.Fatal(err)
}
params.path = t.Name() + "3"
_, err = w.upload(context.Background(), bytes.NewReader(frand.Bytes(64)), w.Contracts(), params)
if err != nil {
t.Fatal(err)
}

// assert we still have two packed slabs (buffer limit not reached)
pss, err = os.PackedSlabsForUpload(context.Background(), 0, uint8(params.rs.MinShards), uint8(params.rs.TotalShards), testContractSet, math.MaxInt)
if err != nil {
t.Fatal(err)
} else if len(pss) != 2 {
t.Fatal("expected 2 packed slab")
}

// upload one more byte (buffer limit reached)
params.path = t.Name() + "4"
_, err = w.upload(context.Background(), bytes.NewReader(frand.Bytes(1)), w.Contracts(), params)
if err != nil {
t.Fatal(err)
}

// assert we still have two packed slabs (one got uploaded synchronously)
pss, err = os.PackedSlabsForUpload(context.Background(), 0, uint8(params.rs.MinShards), uint8(params.rs.TotalShards), testContractSet, math.MaxInt)
if err != nil {
t.Fatal(err)
} else if len(pss) != 2 {
t.Fatal("expected 2 packed slab")
}

// allow some time for the background thread to realise we blocked async
// packed slab uploads
time.Sleep(time.Second)

// unblock asynchronous uploads
w.UnblockAsyncPackedSlabUploads(params)

// upload 129 bytes using the worker
params.path = t.Name() + "5"
_, err = w.upload(context.Background(), bytes.NewReader(frand.Bytes(129)), w.Contracts(), params)
if err != nil {
t.Fatal(err)
}

// allow some time for the thread to pick up the packed slabs
time.Sleep(time.Second)

// assert we uploaded all packed slabs
pss, err = os.PackedSlabsForUpload(context.Background(), 0, uint8(params.rs.MinShards), uint8(params.rs.TotalShards), testContractSet, 1)
if err != nil {
t.Fatal(err)
} else if len(pss) != 0 {
t.Fatal("expected 0 packed slab")
}
}

func TestUploadShards(t *testing.T) {
@@ -225,10 +286,7 @@ func TestUploadShards(t *testing.T) {
ul := w.uploadManager

// create test data
data := make([]byte, 128)
if _, err := frand.Read(data); err != nil {
t.Fatal(err)
}
data := frand.Bytes(128)

// create upload params
params := testParameters(t.Name())
@@ -343,10 +401,7 @@ func TestRefreshUploaders(t *testing.T) {
hm := w.hm

// create test data
data := make([]byte, 128)
if _, err := frand.Read(data); err != nil {
t.Fatal(err)
}
data := frand.Bytes(128)

// create upload params
params := testParameters(t.Name())
@@ -444,10 +499,7 @@ func TestUploadRegression(t *testing.T) {
dl := w.downloadManager

// create test data
data := make([]byte, 128)
if _, err := frand.Read(data); err != nil {
t.Fatal(err)
}
data := frand.Bytes(128)

// create upload params
params := testParameters(t.Name())
13 changes: 11 additions & 2 deletions worker/worker.go
@@ -211,7 +211,7 @@ type worker struct {
transportPoolV3 *transportPoolV3

uploadsMu sync.Mutex
uploadingPackedSlabs map[string]bool
uploadingPackedSlabs map[string]struct{}

contractSpendingRecorder ContractSpendingRecorder
contractLockingDuration time.Duration
@@ -222,6 +222,15 @@ type worker struct {
logger *zap.SugaredLogger
}

func (w *worker) isStopped() bool {
select {
case <-w.shutdownCtx.Done():
return true
default:
}
return false
}

func (w *worker) withRevision(ctx context.Context, fetchTimeout time.Duration, fcid types.FileContractID, hk types.PublicKey, siamuxAddr string, lockPriority int, fn func(rev types.FileContractRevision) error) error {
return w.withContractLock(ctx, fcid, lockPriority, func() error {
h := w.Host(hk, fcid, siamuxAddr)
@@ -1318,7 +1327,7 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush
masterKey: masterKey,
logger: l.Sugar(),
startTime: time.Now(),
uploadingPackedSlabs: make(map[string]bool),
uploadingPackedSlabs: make(map[string]struct{}),
shutdownCtx: ctx,
shutdownCtxCancel: cancel,
}
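
isStopped is a non-blocking check against the worker's shutdown context: the select with a default branch returns immediately instead of waiting for cancellation. A minimal standalone illustration of the idiom (the server type here is hypothetical, not the renterd worker):

package main

import (
    "context"
    "fmt"
)

type server struct {
    shutdownCtx context.Context
}

// isStopped reports whether shutdown has been initiated, without blocking.
func (s *server) isStopped() bool {
    select {
    case <-s.shutdownCtx.Done():
        return true
    default:
    }
    return false
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    s := &server{shutdownCtx: ctx}
    fmt.Println(s.isStopped()) // false
    cancel()
    fmt.Println(s.isStopped()) // true
}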
15 changes: 15 additions & 0 deletions worker/worker_test.go
@@ -2,6 +2,7 @@ package worker

import (
"context"
"fmt"
"time"

rhpv2 "go.sia.tech/core/rhp/v2"
@@ -93,6 +94,20 @@ func (w *testWorker) BlockUploads() func() {
return func() { close(blockChan) }
}

func (w *testWorker) BlockAsyncPackedSlabUploads(up uploadParameters) {
w.uploadsMu.Lock()
defer w.uploadsMu.Unlock()
key := fmt.Sprintf("%d-%d_%s", up.rs.MinShards, up.rs.TotalShards, up.contractSet)
w.uploadingPackedSlabs[key] = struct{}{}
}

func (w *testWorker) UnblockAsyncPackedSlabUploads(up uploadParameters) {
w.uploadsMu.Lock()
defer w.uploadsMu.Unlock()
key := fmt.Sprintf("%d-%d_%s", up.rs.MinShards, up.rs.TotalShards, up.contractSet)
delete(w.uploadingPackedSlabs, key)
}

func (w *testWorker) Contracts() []api.ContractMetadata {
metadatas, err := w.cs.Contracts(context.Background(), api.ContractsOpts{})
if err != nil {
