From 02b784a3204c3f2fb40ab1a5df57f3eed543921f Mon Sep 17 00:00:00 2001
From: PJ
Date: Tue, 19 Sep 2023 16:49:56 +0200
Subject: [PATCH 01/12] worker: upload packed slabs in goroutine

---
 worker/upload.go | 126 ++++++++++++++++++++++++++++++++++++++++++++++-
 worker/worker.go |  56 ++-------------------
 2 files changed, 129 insertions(+), 53 deletions(-)

diff --git a/worker/upload.go b/worker/upload.go
index 53cfcce09..79ef9e029 100644
--- a/worker/upload.go
+++ b/worker/upload.go
@@ -167,6 +167,128 @@ func (w *worker) initUploadManager(maxOverdrive uint64, overdriveTimeout time.Du
 	w.uploadManager = newUploadManager(w.bus, w, w, maxOverdrive, overdriveTimeout, logger)
 }
 
+func (w *worker) upload(ctx context.Context, r io.Reader, rs api.RedundancySettings, bucket, path, contractSet string, bh uint64, pack bool) (err error) {
+	// fetch contracts
+	contracts, err := w.bus.ContractSetContracts(ctx, contractSet)
+	if err != nil {
+		return fmt.Errorf("couldn't fetch contracts from bus: %w", err)
+	}
+
+	// perform the upload
+	obj, used, partialSlabData, err := w.uploadManager.Upload(ctx, r, rs, contracts, bh, pack)
+	if err != nil {
+		return fmt.Errorf("couldn't upload object: %w", err)
+	}
+
+	// add partial slabs
+	if len(partialSlabData) > 0 {
+		partialSlabs, err := w.bus.AddPartialSlab(ctx, partialSlabData, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet)
+		if err != nil {
+			return err
+		}
+		obj.PartialSlabs = partialSlabs
+	}
+
+	// persist the object
+	err = w.bus.AddObject(ctx, bucket, path, contractSet, obj, used)
+	if err != nil {
+		return fmt.Errorf("couldn't add object: %w", err)
+	}
+
+	// if packing was enabled try uploading packed slabs in a separate goroutine
+	if pack {
+		go w.tryUploadPackedSlabs(rs, contractSet)
+	}
+	return nil
+}
+
+func (w *worker) tryUploadPackedSlabs(rs api.RedundancySettings, contractSet string) {
+	key := fmt.Sprintf("%d-%d_%s", rs.MinShards, rs.TotalShards, contractSet)
+
+	w.uploadsMu.Lock()
+	if w.uploadingPackedSlabs[key] {
+		w.uploadsMu.Unlock()
+		return
+	}
+	w.uploadingPackedSlabs[key] = true
+	w.uploadsMu.Unlock()
+
+	// make sure we mark uploading packed slabs as false when we're done
+	defer func() {
+		w.uploadsMu.Lock()
+		w.uploadingPackedSlabs[key] = false
+		w.uploadsMu.Unlock()
+	}()
+
+	// keep uploading packed slabs until we're done
+	packedSlabsFound := true
+	for packedSlabsFound {
+		packedSlabsFound = func() bool {
+			// create a context with sane timeout
+			ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+			defer cancel()
+
+			// fetch contracts
+			contracts, err := w.bus.ContractSetContracts(ctx, contractSet)
+			if err != nil {
+				w.logger.Errorf("couldn't fetch contracts from bus: %v", err)
+				return false
+			}
+
+			// fetch consensus state
+			cs, err := w.bus.ConsensusState(ctx)
+			if err != nil {
+				w.logger.Errorf("couldn't fetch consensus state from bus: %v", err)
+				return false
+			}
+
+			// if partial uploads are enabled, check whether we have a full slab now
+			packedSlabs, err := w.bus.PackedSlabsForUpload(ctx, 5*time.Minute, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet, -1)
+			if err != nil {
+				w.logger.Errorf("couldn't fetch packed slabs from bus: %v", err)
+				return false
+			}
+
+			// if there are no packed slabs, we're done
+			if len(packedSlabs) == 0 {
+				return false
+			}
+
+			// upload packed slabs
+			for _, ps := range packedSlabs {
+				if err := func() error {
+					// create a context with sane timeout
+					ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
+					defer cancel()
+
+					// upload 
packed slab + shards := encryptPartialSlab(ps.Data, ps.Key, uint8(rs.MinShards), uint8(rs.TotalShards)) + sectors, used, err := w.uploadManager.Migrate(ctx, shards, contracts, cs.BlockHeight) + if err != nil { + w.logger.Errorf("couldn't upload packed slab, err: %v", err) + return err + } + + // mark packed slab as uploaded + if err = w.bus.MarkPackedSlabsUploaded(ctx, []api.UploadedPackedSlab{ + { + BufferID: ps.BufferID, + Shards: sectors, + }, + }, used); err != nil { + w.logger.Errorf("couldn't mark packed slabs uploaded, err: %v", err) + return err + } + return nil + }(); err != nil { + return false + } + } + return true + }() + } +} + func newDataPoints(halfLife time.Duration) *dataPoints { return &dataPoints{ size: 20, @@ -290,7 +412,7 @@ func (mgr *uploadManager) Stop() { } } -func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, rs api.RedundancySettings, contracts []api.ContractMetadata, bh uint64, uploadPacking bool) (_ object.Object, used map[types.PublicKey]types.FileContractID, partialSlab []byte, err error) { +func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, rs api.RedundancySettings, contracts []api.ContractMetadata, bh uint64, pack bool) (_ object.Object, used map[types.PublicKey]types.FileContractID, partialSlab []byte, err error) { // cancel all in-flight requests when the upload is done ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -355,7 +477,7 @@ loop: } else if err != nil && err != io.ErrUnexpectedEOF { return object.Object{}, nil, nil, err } - if uploadPacking && errors.Is(err, io.ErrUnexpectedEOF) { + if pack && errors.Is(err, io.ErrUnexpectedEOF) { // If uploadPacking is true, we return the partial slab without // uploading. partialSlab = data[:length] diff --git a/worker/worker.go b/worker/worker.go index f8c9e6ced..a430525c3 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -261,6 +261,9 @@ type worker struct { busFlushInterval time.Duration + uploadsMu sync.Mutex + uploadingPackedSlabs map[string]bool + interactionsMu sync.Mutex interactionsScans []hostdb.HostScan interactionsPriceTableUpdates []hostdb.PriceTableUpdate @@ -1130,61 +1133,11 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) { // attach gouging checker to the context ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) - // update uploader contracts - contracts, err := w.bus.ContractSetContracts(ctx, up.ContractSet) - if jc.Check("couldn't fetch contracts from bus", err) != nil { - return - } - // upload the object - obj, used, partialSlabData, err := w.uploadManager.Upload(ctx, jc.Request.Body, rs, contracts, up.CurrentHeight, up.UploadPacking) + err = w.upload(ctx, jc.Request.Body, rs, bucket, jc.PathParam("path"), up.ContractSet, up.CurrentHeight, up.UploadPacking) if jc.Check("couldn't upload object", err) != nil { return } - - if len(partialSlabData) > 0 { - partialSlabs, err := w.bus.AddPartialSlab(jc.Request.Context(), partialSlabData, uint8(rs.MinShards), uint8(rs.TotalShards), up.ContractSet) - if jc.Check("couldn't add partial slabs to bus", err) != nil { - return - } - obj.PartialSlabs = partialSlabs - } - - // persist the object - if jc.Check("couldn't add object", w.bus.AddObject(ctx, bucket, jc.PathParam("path"), up.ContractSet, obj, used)) != nil { - return - } - - // if partial uploads are not enabled we are done. 
- if !up.UploadPacking { - return - } - - // if partial uploads are enabled, check whether we have a full slab now - packedSlabs, err := w.bus.PackedSlabsForUpload(jc.Request.Context(), 5*time.Minute, uint8(rs.MinShards), uint8(rs.TotalShards), up.ContractSet, 2) - if jc.Check("couldn't fetch packed slabs from bus", err) != nil { - return - } - - for _, ps := range packedSlabs { - // upload packed slab. - shards := encryptPartialSlab(ps.Data, ps.Key, uint8(rs.MinShards), uint8(rs.TotalShards)) - sectors, used, err := w.uploadManager.Migrate(ctx, shards, contracts, up.CurrentHeight) - if jc.Check("couldn't upload packed slab", err) != nil { - return - } - - // mark packed slab as uploaded. - err = w.bus.MarkPackedSlabsUploaded(jc.Request.Context(), []api.UploadedPackedSlab{ - { - BufferID: ps.BufferID, - Shards: sectors, - }, - }, used) - if jc.Check("couldn't mark packed slabs uploaded", err) != nil { - return - } - } } func encryptPartialSlab(data []byte, key object.EncryptionKey, minShards, totalShards uint8) [][]byte { @@ -1302,6 +1255,7 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush busFlushInterval: busFlushInterval, logger: l.Sugar().Named("worker").Named(id), startTime: time.Now(), + uploadingPackedSlabs: make(map[string]bool), } w.initTransportPool() w.initAccounts(b) From 18c254f82b4ec9e3eb6cd366812428b4d08cfb50 Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 19 Sep 2023 18:03:53 +0200 Subject: [PATCH 02/12] worker: update upload config --- object/object.go | 3 +- worker/upload.go | 91 ++++++++++++++++++++++++++++++------------------ worker/worker.go | 12 ++++--- 3 files changed, 67 insertions(+), 39 deletions(-) diff --git a/object/object.go b/object/object.go index 64f2c1ae8..561540b11 100644 --- a/object/object.go +++ b/object/object.go @@ -5,7 +5,6 @@ import ( "crypto/cipher" "encoding/binary" "encoding/hex" - "errors" "fmt" "io" "math" @@ -52,7 +51,7 @@ func (k *EncryptionKey) UnmarshalText(b []byte) error { // given offset. 
func (k EncryptionKey) Encrypt(r io.Reader, offset uint64) (cipher.StreamReader, error) { if offset%64 != 0 { - return cipher.StreamReader{}, errors.New("offset must be a multiple of 64") + return cipher.StreamReader{}, fmt.Errorf("offset must be a multiple of 64, got %v", offset) } if k.IsNoopKey() { return cipher.StreamReader{S: &noOpStream{}, R: r}, nil diff --git a/worker/upload.go b/worker/upload.go index a4e722b45..c97576480 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -17,6 +17,7 @@ import ( rhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/core/types" "go.sia.tech/renterd/api" + "go.sia.tech/renterd/build" "go.sia.tech/renterd/object" "go.sia.tech/renterd/tracing" "go.uber.org/zap" @@ -37,6 +38,19 @@ var ( type uploadConfig struct { ec object.EncryptionKey encryptionOffset uint64 + + rs api.RedundancySettings + bh uint64 + contractSet string + pack bool +} + +func defaultConfig() uploadConfig { + return uploadConfig{ + ec: object.GenerateEncryptionKey(), // random key + encryptionOffset: 0, // from the beginning + rs: build.DefaultRedundancySettings, + } } type UploadOption func(*uploadConfig) @@ -53,6 +67,15 @@ func WithCustomEncryptionOffset(offset uint64) UploadOption { } } +func WithUploadParams(up api.UploadParams) UploadOption { + return func(cfg *uploadConfig) { + cfg.rs = up.RedundancySettings + cfg.bh = up.ConsensusState.BlockHeight + cfg.contractSet = up.ContractSet + cfg.pack = up.UploadPacking + } +} + type ( slabID [8]byte @@ -187,16 +210,22 @@ func (w *worker) initUploadManager(maxOverdrive uint64, overdriveTimeout time.Du w.uploadManager = newUploadManager(w.bus, w, w, maxOverdrive, overdriveTimeout, logger) } -func (w *worker) upload(ctx context.Context, r io.Reader, rs api.RedundancySettings, bucket, path, contractSet string, bh uint64, pack bool, opts ...UploadOption) (string, error) { +func (w *worker) upload(ctx context.Context, r io.Reader, bucket, path string, opts ...UploadOption) (string, error) { + // build upload config + cfg := defaultConfig() + for _, opt := range opts { + opt(&cfg) + } + // perform the upload - obj, used, partialSlabData, etag, err := w.uploadManager.Upload(ctx, r, rs, contractSet, bh, pack, opts...) 
+	obj, partialSlabData, used, etag, err := w.uploadManager.Upload(ctx, r, cfg)
 	if err != nil {
 		return "", fmt.Errorf("couldn't upload object: %w", err)
 	}
 
 	// add partial slabs
 	if len(partialSlabData) > 0 {
-		partialSlabs, err := w.bus.AddPartialSlab(ctx, partialSlabData, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet)
+		partialSlabs, err := w.bus.AddPartialSlab(ctx, partialSlabData, uint8(cfg.rs.MinShards), uint8(cfg.rs.TotalShards), cfg.contractSet)
 		if err != nil {
 			return "", err
 		}
@@ -204,28 +233,34 @@ func (w *worker) upload(ctx context.Context, r io.Reader, rs api.RedundancySetti
 	}
 
 	// persist the object
-	err = w.bus.AddObject(ctx, bucket, path, contractSet, obj, used)
+	err = w.bus.AddObject(ctx, bucket, path, cfg.contractSet, obj, used)
 	if err != nil {
 		return "", fmt.Errorf("couldn't add object: %w", err)
 	}
 
 	// if packing was enabled try uploading packed slabs in a separate goroutine
-	if pack {
-		go w.tryUploadPackedSlabs(rs, contractSet)
+	if cfg.pack {
+		go w.tryUploadPackedSlabs(cfg.rs, cfg.contractSet)
 	}
 	return etag, nil
 }
 
-func (w *worker) uploadMultiPart(ctx context.Context, r io.Reader, rs api.RedundancySettings, bucket, path, contractSet, uploadID string, partNumber int, bh uint64, pack bool, opts ...UploadOption) (string, error) {
+func (w *worker) uploadMultiPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, opts ...UploadOption) (string, error) {
+	// build upload config
+	cfg := defaultConfig()
+	for _, opt := range opts {
+		opt(&cfg)
+	}
+
 	// upload the part
-	obj, used, partialSlabData, etag, err := w.uploadManager.Upload(ctx, r, rs, contractSet, bh, pack, opts...)
+	obj, partialSlabData, used, etag, err := w.uploadManager.Upload(ctx, r, cfg)
 	if err != nil {
 		return "", fmt.Errorf("couldn't upload object: %w", err)
 	}
 
 	// add partial slabs
 	if len(partialSlabData) > 0 {
-		partialSlabs, err := w.bus.AddPartialSlab(ctx, partialSlabData, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet)
+		partialSlabs, err := w.bus.AddPartialSlab(ctx, partialSlabData, uint8(cfg.rs.MinShards), uint8(cfg.rs.TotalShards), cfg.contractSet)
 		if err != nil {
 			return "", err
 		}
@@ -233,14 +268,14 @@ func (w *worker) uploadMultiPart(ctx context.Context, r io.Reader, rs api.Redund
 	}
 
 	// persist the part
-	err = w.bus.AddMultipartPart(ctx, bucket, path, contractSet, uploadID, partNumber, obj.Slabs, obj.PartialSlabs, etag, used)
+	err = w.bus.AddMultipartPart(ctx, bucket, path, cfg.contractSet, uploadID, partNumber, obj.Slabs, obj.PartialSlabs, etag, used)
 	if err != nil {
 		return "", fmt.Errorf("couldn't add multipart part: %w", err)
 	}
 
 	// if packing was enabled try uploading packed slabs in a separate goroutine
-	if pack {
-		go w.tryUploadPackedSlabs(rs, contractSet)
+	if cfg.pack {
+		go w.tryUploadPackedSlabs(cfg.rs, cfg.contractSet)
 	}
 	return etag, nil
 }
 
@@ -455,7 +490,7 @@ func (mgr *uploadManager) Stop() {
 	}
 }
 
-func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, rs api.RedundancySettings, contractSet string, bh uint64, uploadPacking bool, opts ...UploadOption) (_ object.Object, used map[types.PublicKey]types.FileContractID, partialSlab []byte, etag string, err error) {
+func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, cfg uploadConfig) (_ object.Object, partialSlab []byte, used map[types.PublicKey]types.FileContractID, etag string, err error) {
 	// cancel all in-flight requests when the upload is done
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
@@ -467,36 +502,26 @@ func (mgr *uploadManager) Upload(ctx context.Context, 
r io.Reader, rs api.Redund span.End() }() - // apply options - uc := uploadConfig{ - ec: object.GenerateEncryptionKey(), // random key - encryptionOffset: 0, // from the beginning - } - for _, opt := range opts { - opt(&uc) - } - // wrap the reader to create an etag - tagger := newHashReader(r) - r = tagger + hr := newHashReader(r) // create the object - o := object.NewObject(uc.ec) + o := object.NewObject(cfg.ec) // create the cipher reader - cr, err := o.Encrypt(r, uc.encryptionOffset) + cr, err := o.Encrypt(hr, cfg.encryptionOffset) if err != nil { return object.Object{}, nil, nil, "", err } // fetch contracts - contracts, err := mgr.b.ContractSetContracts(ctx, contractSet) + contracts, err := mgr.b.ContractSetContracts(ctx, cfg.contractSet) if err != nil { return object.Object{}, nil, nil, "", fmt.Errorf("couldn't fetch contracts from bus: %w", err) } // create the upload - u, finishFn, err := mgr.newUpload(ctx, rs.TotalShards, contracts, bh) + u, finishFn, err := mgr.newUpload(ctx, cfg.rs.TotalShards, contracts, cfg.bh) if err != nil { return object.Object{}, nil, nil, "", err } @@ -515,7 +540,7 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, rs api.Redund numSlabs := -1 // prepare slab size - size := int64(rs.MinShards) * rhpv2.SectorSize + size := int64(cfg.rs.MinShards) * rhpv2.SectorSize loop: for { select { @@ -542,7 +567,7 @@ loop: } else if err != nil && err != io.ErrUnexpectedEOF { return object.Object{}, nil, nil, "", err } - if uploadPacking && errors.Is(err, io.ErrUnexpectedEOF) { + if cfg.pack && errors.Is(err, io.ErrUnexpectedEOF) { // If uploadPacking is true, we return the partial slab without // uploading. partialSlab = data[:length] @@ -551,7 +576,7 @@ loop: // Otherwise we upload it. go func(rs api.RedundancySettings, data []byte, length, slabIndex int) { u.uploadSlab(ctx, rs, data, length, slabIndex, respChan, nextSlabChan) - }(rs, data, length, slabIndex) + }(cfg.rs, data, length, slabIndex) } slabIndex++ case res := <-respChan: @@ -601,7 +626,7 @@ loop: } } } - return o, usedContracts, partialSlab, tagger.Etag(), nil + return o, partialSlab, usedContracts, hr.Hash(), nil } func (mgr *uploadManager) launch(req *sectorUploadReq) error { @@ -1586,7 +1611,7 @@ func (e *hashReader) Read(p []byte) (int, error) { return n, err } -func (e *hashReader) Etag() string { +func (e *hashReader) Hash() string { sum := e.h.Sum() return hex.EncodeToString(sum[:]) } diff --git a/worker/worker.go b/worker/worker.go index a5146bcc6..700952812 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1132,11 +1132,14 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) { return } + // built options + opts := []UploadOption{WithUploadParams(up)} + // attach gouging checker to the context ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) // upload the object - etag, err := w.upload(ctx, jc.Request.Body, rs, bucket, jc.PathParam("path"), up.ContractSet, up.CurrentHeight, up.UploadPacking) + etag, err := w.upload(ctx, jc.Request.Body, bucket, jc.PathParam("path"), opts...) 
if jc.Check("couldn't upload object", err) != nil { return } @@ -1209,8 +1212,6 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { return } - var opts []UploadOption - // make sure only one of the following is set var disablePreshardingEncryption bool if jc.DecodeForm("disablepreshardingencryption", &disablePreshardingEncryption) != nil { @@ -1228,6 +1229,9 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { if jc.DecodeForm("offset", &offset) != nil { return } + + // built options + opts := []UploadOption{WithUploadParams(up)} if disablePreshardingEncryption { opts = append(opts, WithCustomKey(object.NoOpKey)) } else { @@ -1238,7 +1242,7 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) // upload the multipart - etag, err := w.uploadMultiPart(ctx, jc.Request.Body, rs, bucket, jc.PathParam("path"), up.ContractSet, uploadID, partNumber, up.CurrentHeight, up.UploadPacking, opts...) + etag, err := w.uploadMultiPart(ctx, jc.Request.Body, bucket, jc.PathParam("path"), uploadID, partNumber, opts...) if jc.Check("couldn't upload object", err) != nil { return } From 781a8f41ba2aedf961337fb02c5de1975e013ad0 Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 19 Sep 2023 18:08:16 +0200 Subject: [PATCH 03/12] worker: fix consts --- worker/upload.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/worker/upload.go b/worker/upload.go index c97576480..021476a51 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -28,6 +28,10 @@ const ( statsDecayHalfTime = 10 * time.Minute statsDecayThreshold = 5 * time.Minute statsRecomputeMinInterval = 3 * time.Second + + defaultPackedSlabsLockDuration = 5 * time.Minute + defaultPackedSlabsUploadTimeout = 10 * time.Minute + defaultPackedSlabsLimit = 2 ) var ( @@ -321,7 +325,7 @@ func (w *worker) tryUploadPackedSlabs(rs api.RedundancySettings, contractSet str } // if partial uploads are enabled, check whether we have a full slab now - packedSlabs, err := w.bus.PackedSlabsForUpload(ctx, 5*time.Minute, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet, -1) + packedSlabs, err := w.bus.PackedSlabsForUpload(ctx, defaultPackedSlabsLockDuration, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet, defaultPackedSlabsLimit) if err != nil { w.logger.Errorf("couldn't fetch packed slabs from bus: %v", err) return false @@ -336,7 +340,7 @@ func (w *worker) tryUploadPackedSlabs(rs api.RedundancySettings, contractSet str for _, ps := range packedSlabs { if err := func() error { // create a context with sane timeout - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), defaultPackedSlabsUploadTimeout) defer cancel() // upload packed slab From 0a328469455c795f566e2fed9547e153fa1efbd0 Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 19 Sep 2023 18:16:54 +0200 Subject: [PATCH 04/12] worker: do not set etag for now on reg uploads --- worker/worker.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/worker/worker.go b/worker/worker.go index 700952812..af9f488c9 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1139,13 +1139,10 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) { ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) // upload the object - etag, err := w.upload(ctx, jc.Request.Body, bucket, jc.PathParam("path"), opts...) + _, err = w.upload(ctx, jc.Request.Body, bucket, jc.PathParam("path"), opts...) 
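+	// NOTE: the etag is deliberately discarded for now; regular uploads no
+	// longer set an ETag response header (see commit subject)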
if jc.Check("couldn't upload object", err) != nil { return } - - // set etag in header response. - jc.ResponseWriter.Header().Set("ETag", api.FormatEtag(etag)) } func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { From 995593c0f494088c45b0ca3430ff810cbdc2ec0e Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 19 Sep 2023 18:39:26 +0200 Subject: [PATCH 05/12] testing: fix TestUploadPacking --- internal/testing/cluster_test.go | 39 +++++++++++++++++++------------- worker/upload.go | 13 +++++++---- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/internal/testing/cluster_test.go b/internal/testing/cluster_test.go index 5a5ecfa77..b26397263 100644 --- a/internal/testing/cluster_test.go +++ b/internal/testing/cluster_test.go @@ -1932,25 +1932,32 @@ func TestUploadPacking(t *testing.T) { uploadDownload("file5", data5) download("file4", data4, 0, int64(len(data4))) - // check the object stats - os, err := b.ObjectsStats() + // check the object stats, we use a retry loop since packed slabs are upload + // in a separate goroutine so stats might lag a bit + err = Retry(60, time.Second, func() error { + os, err := b.ObjectsStats() + if err != nil { + t.Fatal(err) + } + if os.NumObjects != 5 { + return fmt.Errorf("expected 5 objects, got %v", os.NumObjects) + } + totalObjectSize := uint64(3 * slabSize) + totalRedundantSize := totalObjectSize * uint64(rs.TotalShards) / uint64(rs.MinShards) + if os.TotalObjectsSize != totalObjectSize { + return fmt.Errorf("expected totalObjectSize of %v, got %v", totalObjectSize, os.TotalObjectsSize) + } + if os.TotalSectorsSize != uint64(totalRedundantSize) { + return fmt.Errorf("expected totalSectorSize of %v, got %v", totalRedundantSize, os.TotalSectorsSize) + } + if os.TotalUploadedSize != uint64(totalRedundantSize) { + return fmt.Errorf("expected totalUploadedSize of %v, got %v", totalRedundantSize, os.TotalUploadedSize) + } + return nil + }) if err != nil { t.Fatal(err) } - if os.NumObjects != 5 { - t.Fatal("expected 5 objects, got", os.NumObjects) - } - totalObjectSize := uint64(3 * slabSize) - totalRedundantSize := totalObjectSize * uint64(rs.TotalShards) / uint64(rs.MinShards) - if os.TotalObjectsSize != totalObjectSize { - t.Fatalf("expected totalObjectSize of %v, got %v", totalObjectSize, os.TotalObjectsSize) - } - if os.TotalSectorsSize != uint64(totalRedundantSize) { - t.Errorf("expected totalSectorSize of %v, got %v", totalRedundantSize, os.TotalSectorsSize) - } - if os.TotalUploadedSize != uint64(totalRedundantSize) { - t.Errorf("expected totalUploadedSize of %v, got %v", totalRedundantSize, os.TotalUploadedSize) - } // ObjectsBySlabKey should return 2 objects for the slab of file1 since file1 // and file2 share the same slab. 
diff --git a/worker/upload.go b/worker/upload.go index 021476a51..befece620 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -279,7 +279,7 @@ func (w *worker) uploadMultiPart(ctx context.Context, r io.Reader, bucket, path, // if packing was enabled try uploading packed slabs in a separate goroutine if cfg.pack { - go w.tryUploadPackedSlabs(cfg.rs, cfg.contractSet) + w.tryUploadPackedSlabs(cfg.rs, cfg.contractSet) } return etag, nil } @@ -317,10 +317,10 @@ func (w *worker) tryUploadPackedSlabs(rs api.RedundancySettings, contractSet str return false } - // fetch consensus state - cs, err := w.bus.ConsensusState(ctx) + // fetch upload params + up, err := w.bus.UploadParams(ctx) if err != nil { - w.logger.Errorf("couldn't fetch consensus state from bus: %v", err) + w.logger.Errorf("couldn't fetch upload params from bus: %v", err) return false } @@ -343,9 +343,12 @@ func (w *worker) tryUploadPackedSlabs(rs api.RedundancySettings, contractSet str ctx, cancel := context.WithTimeout(context.Background(), defaultPackedSlabsUploadTimeout) defer cancel() + // attach gouging checker to the context + ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) + // upload packed slab shards := encryptPartialSlab(ps.Data, ps.Key, uint8(rs.MinShards), uint8(rs.TotalShards)) - sectors, used, err := w.uploadManager.Migrate(ctx, shards, contracts, cs.BlockHeight) + sectors, used, err := w.uploadManager.Migrate(ctx, shards, contracts, up.CurrentHeight) if err != nil { w.logger.Errorf("couldn't upload packed slab, err: %v", err) return err From 903d2356fa3212e2e6a2cf7d48b20516f73ecd81 Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 20 Sep 2023 09:58:05 +0200 Subject: [PATCH 06/12] testing: fix TestSlabBufferStats --- internal/testing/cluster_test.go | 73 ++++++++++++++++++++------------ 1 file changed, 46 insertions(+), 27 deletions(-) diff --git a/internal/testing/cluster_test.go b/internal/testing/cluster_test.go index b26397263..f48c704a2 100644 --- a/internal/testing/cluster_test.go +++ b/internal/testing/cluster_test.go @@ -2133,23 +2133,32 @@ func TestSlabBufferStats(t *testing.T) { t.Fatal(err) } - // check the object stats - os, err := b.ObjectsStats() + // check the object stats, we use a retry loop since packed slabs are upload + // in a separate goroutine so stats might lag a bit + err = Retry(60, time.Second, func() error { + os, err := b.ObjectsStats() + if err != nil { + t.Fatal(err) + } + if os.NumObjects != 1 { + return fmt.Errorf("expected 1 object, got %d", os.NumObjects) + } + if os.TotalObjectsSize != uint64(len(data1)) { + return fmt.Errorf("expected totalObjectSize of %d, got %d", len(data1), os.TotalObjectsSize) + } + if os.TotalSectorsSize != 0 { + return fmt.Errorf("expected totalSectorSize of 0, got %d", os.TotalSectorsSize) + } + if os.TotalUploadedSize != 0 { + return fmt.Errorf("expected totalUploadedSize of 0, got %d", os.TotalUploadedSize) + } + return nil + }) if err != nil { t.Fatal(err) } - if os.NumObjects != 1 { - t.Fatal("expected 1 object, got", os.NumObjects) - } - if os.TotalObjectsSize != uint64(len(data1)) { - t.Fatalf("expected totalObjectSize of %v, got %v", len(data1), os.TotalObjectsSize) - } - if os.TotalSectorsSize != 0 { - t.Fatal("expected totalSectorSize of 0, got", os.TotalSectorsSize) - } - if os.TotalUploadedSize != 0 { - t.Fatal("expected totalUploadedSize of 0, got", os.TotalUploadedSize) - } + + // check the slab buffers buffers, err := b.SlabBuffers() if err != nil { t.Fatal(err) @@ -2181,22 +2190,32 @@ func TestSlabBufferStats(t *testing.T) { 
t.Fatal(err) } - os, err = b.ObjectsStats() + // check the object stats again, we use a retry loop since packed slabs are upload + // in a separate goroutine so stats might lag a bit + err = Retry(60, time.Second, func() error { + os, err := b.ObjectsStats() + if err != nil { + t.Fatal(err) + } + if os.NumObjects != 2 { + return fmt.Errorf("expected 1 object, got %d", os.NumObjects) + } + if os.TotalObjectsSize != uint64(len(data1)+len(data2)) { + return fmt.Errorf("expected totalObjectSize of %d, got %d", len(data1)+len(data2), os.TotalObjectsSize) + } + if os.TotalSectorsSize != 3*rhpv2.SectorSize { + return fmt.Errorf("expected totalSectorSize of %d, got %d", 3*rhpv2.SectorSize, os.TotalSectorsSize) + } + if os.TotalUploadedSize != 3*rhpv2.SectorSize { + return fmt.Errorf("expected totalUploadedSize of %d, got %d", 3*rhpv2.SectorSize, os.TotalUploadedSize) + } + return nil + }) if err != nil { t.Fatal(err) } - if os.NumObjects != 2 { - t.Fatal("expected 1 object, got", os.NumObjects) - } - if os.TotalObjectsSize != uint64(len(data1)+len(data2)) { - t.Fatalf("expected totalObjectSize of %v, got %v", len(data1)+len(data2), os.TotalObjectsSize) - } - if os.TotalSectorsSize != 3*rhpv2.SectorSize { - t.Fatalf("expected totalSectorSize of %v, got %v", 3*rhpv2.SectorSize, os.TotalSectorsSize) - } - if os.TotalUploadedSize != 3*rhpv2.SectorSize { - t.Fatalf("expected totalUploadedSize of %v, got %v", 3*rhpv2.SectorSize, os.TotalUploadedSize) - } + + // check the slab buffers buffers, err = b.SlabBuffers() if err != nil { t.Fatal(err) From d0474b0c9435c5e9d69555355a3097b82768e321 Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 20 Sep 2023 10:04:23 +0200 Subject: [PATCH 07/12] testing: improve retry --- internal/testing/cluster_test.go | 49 ++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 15 deletions(-) diff --git a/internal/testing/cluster_test.go b/internal/testing/cluster_test.go index f48c704a2..851c49355 100644 --- a/internal/testing/cluster_test.go +++ b/internal/testing/cluster_test.go @@ -1932,16 +1932,23 @@ func TestUploadPacking(t *testing.T) { uploadDownload("file5", data5) download("file4", data4, 0, int64(len(data4))) - // check the object stats, we use a retry loop since packed slabs are upload - // in a separate goroutine so stats might lag a bit + // assert number of objects + os, err := b.ObjectsStats() + if err != nil { + t.Fatal(err) + } + if os.NumObjects != 5 { + t.Fatalf("expected 5 objects, got %v", os.NumObjects) + } + + // check the object size stats, we use a retry loop since packed slabs are + // uploaded in a separate goroutine, so the object stats might lag a bit err = Retry(60, time.Second, func() error { os, err := b.ObjectsStats() if err != nil { t.Fatal(err) } - if os.NumObjects != 5 { - return fmt.Errorf("expected 5 objects, got %v", os.NumObjects) - } + totalObjectSize := uint64(3 * slabSize) totalRedundantSize := totalObjectSize * uint64(rs.TotalShards) / uint64(rs.MinShards) if os.TotalObjectsSize != totalObjectSize { @@ -2133,16 +2140,22 @@ func TestSlabBufferStats(t *testing.T) { t.Fatal(err) } - // check the object stats, we use a retry loop since packed slabs are upload - // in a separate goroutine so stats might lag a bit + // assert number of objects + os, err := b.ObjectsStats() + if err != nil { + t.Fatal(err) + } + if os.NumObjects != 1 { + t.Fatalf("expected 1 object, got %d", os.NumObjects) + } + + // check the object size stats, we use a retry loop since packed slabs are + // uploaded in a separate goroutine, so the object stats 
might lag a bit
 	err = Retry(60, time.Second, func() error {
 		os, err := b.ObjectsStats()
 		if err != nil {
 			t.Fatal(err)
 		}
-		if os.NumObjects != 1 {
-			return fmt.Errorf("expected 1 object, got %d", os.NumObjects)
-		}
 		if os.TotalObjectsSize != uint64(len(data1)) {
 			return fmt.Errorf("expected totalObjectSize of %d, got %d", len(data1), os.TotalObjectsSize)
 		}
@@ -2203,16 +2216,22 @@ func TestSlabBufferStats(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	// check the object stats again, we use a retry loop since packed slabs are upload
-	// in a separate goroutine so stats might lag a bit
+	// assert number of objects
+	os, err = b.ObjectsStats()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if os.NumObjects != 2 {
+		t.Fatalf("expected 2 objects, got %d", os.NumObjects)
+	}
+
+	// check the object size stats, we use a retry loop since packed slabs are
+	// uploaded in a separate goroutine, so the object stats might lag a bit
 	err = Retry(60, time.Second, func() error {
 		os, err := b.ObjectsStats()
 		if err != nil {
 			t.Fatal(err)
 		}
-		if os.NumObjects != 2 {
-			return fmt.Errorf("expected 1 object, got %d", os.NumObjects)
-		}
 		if os.TotalObjectsSize != uint64(len(data1)+len(data2)) {

From b148fae0a717a152385e25008dc4e489004a25f3 Mon Sep 17 00:00:00 2001
From: PJ
Date: Wed, 20 Sep 2023 10:17:14 +0200
Subject: [PATCH 08/12] worker: use CurrentHeight

---
 worker/upload.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/worker/upload.go b/worker/upload.go
index befece620..c2d8a94e0 100644
--- a/worker/upload.go
+++ b/worker/upload.go
@@ -74,7 +74,7 @@ func WithCustomEncryptionOffset(offset uint64) UploadOption {
 func WithUploadParams(up api.UploadParams) UploadOption {
 	return func(cfg *uploadConfig) {
 		cfg.rs = up.RedundancySettings
-		cfg.bh = up.ConsensusState.BlockHeight
+		cfg.bh = up.CurrentHeight
 		cfg.contractSet = up.ContractSet
 		cfg.pack = up.UploadPacking
 	}

From 9c4ecf56172e18aa42a53d1c7f231526f0a417ee Mon Sep 17 00:00:00 2001
From: PJ
Date: Wed, 20 Sep 2023 17:18:43 +0200
Subject: [PATCH 09/12] worker: rename to uploadParameters

---
 worker/upload.go | 81 ++++++++++++++++++++++++++++--------------------
 worker/worker.go | 14 +++++++--
 2 files changed, 60 insertions(+), 35 deletions(-)

diff --git a/worker/upload.go b/worker/upload.go
index c2d8a94e0..f03998137 100644
--- a/worker/upload.go
+++ b/worker/upload.go
@@ -39,44 +39,59 @@ var (
 	errNotEnoughContracts = errors.New("not enough contracts to support requested redundancy")
 )
 
-type uploadConfig struct {
+type uploadParameters struct {
 	ec               object.EncryptionKey
 	encryptionOffset uint64
 
 	rs          api.RedundancySettings
 	bh          uint64
 	contractSet string
-	pack        bool
+	packing     bool
 }
 
-func defaultConfig() uploadConfig {
-	return uploadConfig{
+func defaultParameters() uploadParameters {
+	return uploadParameters{
 		ec:               object.GenerateEncryptionKey(), // random key
 		encryptionOffset: 0,                              // from the beginning
 		rs:               build.DefaultRedundancySettings,
 	}
 }
 
-type UploadOption func(*uploadConfig)
+type UploadOption func(*uploadParameters)
 
 func WithCustomKey(ec object.EncryptionKey) UploadOption {
-	return func(cfg *uploadConfig) {
-		cfg.ec = ec
+	return func(up *uploadParameters) {
+		up.ec = ec
 	}
 }
 
 func WithCustomEncryptionOffset(offset uint64) UploadOption {
-	return func(cfg *uploadConfig) {
-		cfg.encryptionOffset = offset
+	return func(up *uploadParameters) {
+		up.encryptionOffset = offset
 	}
 }
 
-func WithUploadParams(up api.UploadParams) UploadOption {
-	return func(cfg *uploadConfig) {
-		cfg.rs = up.RedundancySettings
-		cfg.bh = up.CurrentHeight
-		cfg.contractSet = up.ContractSet
-		cfg.pack = up.UploadPacking
+func WithRedundancySettings(rs api.RedundancySettings) UploadOption {
+	return func(up *uploadParameters) {
+		up.rs = rs
+	}
+}
+
+func WithBlockHeight(bh uint64) UploadOption {
+	return func(up *uploadParameters) {
+		up.bh = bh
+	}
+}
+
+func WithContractSet(contractSet string) UploadOption {
+	return func(up *uploadParameters) {
+		up.contractSet = contractSet
+	}
+}
+
+func WithPacking(packing bool) UploadOption {
+	return func(up *uploadParameters) {
+		up.packing = packing
 	}
 }
 
@@ -216,7 +231,7 @@ func (w *worker) initUploadManager(maxOverdrive uint64, overdriveTimeout time.Du
 
 func (w *worker) upload(ctx context.Context, r io.Reader, bucket, path string, opts ...UploadOption) (string, error) {
 	// build upload config
-	cfg := defaultConfig()
+	cfg := defaultParameters()
 	for _, opt := range opts {
 		opt(&cfg)
 	}
@@ -243,28 +258,28 @@ func (w *worker) upload(ctx context.Context, r io.Reader, bucket, path string, o
 
 func (w *worker) uploadMultiPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, opts ...UploadOption) (string, error) {
-	// build upload config
-	cfg := defaultConfig()
+	// build upload parameters
+	up := defaultParameters()
 	for _, opt := range opts {
-		opt(&cfg)
+		opt(&up)
 	}
 
 	// upload the part
-	obj, partialSlabData, used, etag, err := w.uploadManager.Upload(ctx, r, cfg)
+	obj, partialSlabData, used, etag, err := w.uploadManager.Upload(ctx, r, up)
 	if err != nil {
 		return "", fmt.Errorf("couldn't upload object: %w", err)
 	}
 
 	// add partial slabs
 	if len(partialSlabData) > 0 {
-		partialSlabs, err := w.bus.AddPartialSlab(ctx, partialSlabData, uint8(cfg.rs.MinShards), uint8(cfg.rs.TotalShards), cfg.contractSet)
+		partialSlabs, err := w.bus.AddPartialSlab(ctx, partialSlabData, uint8(up.rs.MinShards), uint8(up.rs.TotalShards), up.contractSet)
 		if err != nil {
 			return "", err
 		}
 		obj.PartialSlabs = partialSlabs
 	}
 
 	// persist the part
-	err = w.bus.AddMultipartPart(ctx, bucket, path, cfg.contractSet, uploadID, partNumber, obj.Slabs, obj.PartialSlabs, etag, used)
+	err = w.bus.AddMultipartPart(ctx, bucket, path, up.contractSet, uploadID, partNumber, obj.Slabs, obj.PartialSlabs, etag, used)
 	if err != nil {
 		return "", fmt.Errorf("couldn't add multipart part: %w", err)
 	}
 
 	// if packing was enabled try uploading packed slabs in a separate goroutine
-	if cfg.pack {
-		w.tryUploadPackedSlabs(cfg.rs, cfg.contractSet)
+	if up.packing {
+		go w.tryUploadPackedSlabs(up.rs, up.contractSet)
 	}
 	return etag, nil
 }
 
@@ -497,7 +512,7 @@ func (mgr *uploadManager) Stop() {
 	}
 }
 
-func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, cfg uploadConfig) (_ object.Object, partialSlab []byte, used map[types.PublicKey]types.FileContractID, etag string, err error) {
+func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, up uploadParameters) (_ object.Object, partialSlab []byte, used map[types.PublicKey]types.FileContractID, etag string, err error) {
 	// cancel all in-flight requests when the upload is done
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
@@ -528,22 +543,22 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, up uploadPara
 	// wrap the reader to create an etag
 	hr := newHashReader(r)
 
 	// create the 
object - o := object.NewObject(cfg.ec) + o := object.NewObject(up.ec) // create the cipher reader - cr, err := o.Encrypt(hr, cfg.encryptionOffset) + cr, err := o.Encrypt(hr, up.encryptionOffset) if err != nil { return object.Object{}, nil, nil, "", err } // fetch contracts - contracts, err := mgr.b.ContractSetContracts(ctx, cfg.contractSet) + contracts, err := mgr.b.ContractSetContracts(ctx, up.contractSet) if err != nil { return object.Object{}, nil, nil, "", fmt.Errorf("couldn't fetch contracts from bus: %w", err) } // create the upload - u, finishFn, err := mgr.newUpload(ctx, cfg.rs.TotalShards, contracts, cfg.bh) + u, finishFn, err := mgr.newUpload(ctx, up.rs.TotalShards, contracts, up.bh) if err != nil { return object.Object{}, nil, nil, "", err } @@ -547,7 +562,7 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, cfg uploadCon numSlabs := -1 // prepare slab size - size := int64(cfg.rs.MinShards) * rhpv2.SectorSize + size := int64(up.rs.MinShards) * rhpv2.SectorSize loop: for { select { @@ -574,7 +589,7 @@ loop: } else if err != nil && err != io.ErrUnexpectedEOF { return object.Object{}, nil, nil, "", err } - if cfg.pack && errors.Is(err, io.ErrUnexpectedEOF) { + if up.packing && errors.Is(err, io.ErrUnexpectedEOF) { // If uploadPacking is true, we return the partial slab without // uploading. partialSlab = data[:length] @@ -583,7 +598,7 @@ loop: // Otherwise we upload it. go func(rs api.RedundancySettings, data []byte, length, slabIndex int) { u.uploadSlab(ctx, rs, data, length, slabIndex, respChan, nextSlabChan) - }(cfg.rs, data, length, slabIndex) + }(up.rs, data, length, slabIndex) } slabIndex++ case res := <-respChan: diff --git a/worker/worker.go b/worker/worker.go index af9f488c9..27467ee7d 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1133,7 +1133,12 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) { } // built options - opts := []UploadOption{WithUploadParams(up)} + opts := []UploadOption{ + WithBlockHeight(up.CurrentHeight), + WithContractSet(up.ContractSet), + WithPacking(up.UploadPacking), + WithRedundancySettings(up.RedundancySettings), + } // attach gouging checker to the context ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) @@ -1228,7 +1233,12 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { } // built options - opts := []UploadOption{WithUploadParams(up)} + opts := []UploadOption{ + WithBlockHeight(up.CurrentHeight), + WithContractSet(up.ContractSet), + WithPacking(up.UploadPacking), + WithRedundancySettings(up.RedundancySettings), + } if disablePreshardingEncryption { opts = append(opts, WithCustomKey(object.NoOpKey)) } else { From 0f79ed91023ccd9093d90ad7ea002fd28457b009 Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 20 Sep 2023 17:21:32 +0200 Subject: [PATCH 10/12] worker: fix copy --- worker/upload.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/worker/upload.go b/worker/upload.go index f03998137..304c01571 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -230,21 +230,21 @@ func (w *worker) initUploadManager(maxOverdrive uint64, overdriveTimeout time.Du } func (w *worker) upload(ctx context.Context, r io.Reader, bucket, path string, opts ...UploadOption) (string, error) { - // build upload config - cfg := defaultParameters() + // build upload parameters + up := defaultParameters() for _, opt := range opts { - opt(&cfg) + opt(&up) } // perform the upload - obj, partialSlabData, used, etag, err := w.uploadManager.Upload(ctx, r, cfg) + obj, partialSlabData, used, 
etag, err := w.uploadManager.Upload(ctx, r, cfg)
+	obj, partialSlabData, used, etag, err := w.uploadManager.Upload(ctx, r, up)
 	if err != nil {
 		return "", fmt.Errorf("couldn't upload object: %w", err)
 	}
 
 	// add partial slabs
 	if len(partialSlabData) > 0 {
-		partialSlabs, err := w.bus.AddPartialSlab(ctx, partialSlabData, uint8(cfg.rs.MinShards), uint8(cfg.rs.TotalShards), cfg.contractSet)
+		partialSlabs, err := w.bus.AddPartialSlab(ctx, partialSlabData, uint8(up.rs.MinShards), uint8(up.rs.TotalShards), up.contractSet)
 		if err != nil {
 			return "", err
 		}
@@ -252,14 +252,14 @@ func (w *worker) upload(ctx context.Context, r io.Reader, bucket, path string, o
 	}
 
 	// persist the object
-	err = w.bus.AddObject(ctx, bucket, path, cfg.contractSet, obj, used)
+	err = w.bus.AddObject(ctx, bucket, path, up.contractSet, obj, used)
 	if err != nil {
 		return "", fmt.Errorf("couldn't add object: %w", err)
 	}
 
 	// if packing was enabled try uploading packed slabs in a separate goroutine
-	if cfg.packing {
-		go w.tryUploadPackedSlabs(cfg.rs, cfg.contractSet)
+	if up.packing {
+		go w.tryUploadPackedSlabs(up.rs, up.contractSet)
 	}
 	return etag, nil
 }

From 1ee41617020d89f61774160cdecf2f82440de0d3 Mon Sep 17 00:00:00 2001
From: PJ
Date: Thu, 21 Sep 2023 14:00:51 +0200
Subject: [PATCH 11/12] worker: cleanup loop

---
 worker/upload.go | 120 ++++++++++++++++++++++-------------------------
 1 file changed, 57 insertions(+), 63 deletions(-)

diff --git a/worker/upload.go b/worker/upload.go
index 304c01571..d796422f5 100644
--- a/worker/upload.go
+++ b/worker/upload.go
@@ -31,7 +31,7 @@ const (
 
 	defaultPackedSlabsLockDuration  = 5 * time.Minute
 	defaultPackedSlabsUploadTimeout = 10 * time.Minute
-	defaultPackedSlabsLimit         = 2
+	defaultPackedSlabsLimit         = 1
 )
 
 var (
@@ -318,75 +318,69 @@ func (w *worker) tryUploadPackedSlabs(rs api.RedundancySettings, contractSet str
 	}()
 
 	// keep uploading packed slabs until we're done
+	for {
+		// fetch packed slabs
+		ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+		packedSlabs, err := w.bus.PackedSlabsForUpload(ctx, defaultPackedSlabsLockDuration, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet, defaultPackedSlabsLimit)
+		if err != nil {
+			w.logger.Errorf("couldn't fetch packed slabs from bus: %v", err)
+			cancel()
+			return
+		}
+		cancel()
+
+		// if there are no packed slabs, we're done
+		if len(packedSlabs) == 0 {
+			return
+		}
+
+		// upload packed slabs
+		for _, ps := range packedSlabs {
+			if err := w.uploadPackedSlab(ps, rs, contractSet); err != nil {
+				w.logger.Errorf("failed to upload packed slab, err: %v", err)
+				return
+			}
+		}
+	}
+}
+
+func (w *worker) uploadPackedSlab(ps api.PackedSlab, rs api.RedundancySettings, contractSet string) error {
+	// create a context with sane timeout
+	ctx, cancel := context.WithTimeout(context.Background(), defaultPackedSlabsUploadTimeout)
+	defer cancel()
+
+	// fetch contracts
+	contracts, err := w.bus.ContractSetContracts(ctx, contractSet)
+	if err != nil {
+		return fmt.Errorf("couldn't fetch contracts from bus: %v", err)
+	}
+
+	// fetch upload params
+	up, err := w.bus.UploadParams(ctx)
+	if err != nil {
+		return fmt.Errorf("couldn't fetch upload params from bus: %v", err)
+	}
+
+	// attach gouging checker to the context
+	ctx = WithGougingChecker(ctx, w.bus, up.GougingParams)
+
+	// upload packed slab
+	shards := encryptPartialSlab(ps.Data, ps.Key, uint8(rs.MinShards), uint8(rs.TotalShards))
+	sectors, used, err := w.uploadManager.Migrate(ctx, shards, contracts, up.CurrentHeight)
+	if err != nil {
+		return fmt.Errorf("couldn't upload packed slab, err: %v", err)
+	}
+
+	// mark packed slab as uploaded
+	if err = w.bus.MarkPackedSlabsUploaded(ctx, []api.UploadedPackedSlab{
+		{
+			BufferID: ps.BufferID,
+			Shards:   sectors,
+		},
+	}, used); err != nil {
+		return fmt.Errorf("couldn't mark packed slabs uploaded, err: %v", err)
+	}
+	return nil
+}

From 5527f4af3eb04cdb40888ca5cdc8786b9ab55ed7 Mon Sep 17 00:00:00 2001
From: PJ
Date: Thu, 21 Sep 2023 14:03:24 +0200
Subject: [PATCH 12/12] worker: cleanup PR

---
 worker/upload.go | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/worker/upload.go b/worker/upload.go
index d796422f5..8738a988a 100644
--- a/worker/upload.go
+++ b/worker/upload.go
@@ -372,14 +372,12 @@ func (w *worker) uploadPackedSlab(ps api.PackedSlab, rs api.RedundancySettings,
 	}
 
 	// mark packed slab as uploaded
-	if err = w.bus.MarkPackedSlabsUploaded(ctx, []api.UploadedPackedSlab{
-		{
-			BufferID: ps.BufferID,
-			Shards:   sectors,
-		},
-	}, used); err != nil {
+	slab := api.UploadedPackedSlab{BufferID: ps.BufferID, Shards: sectors}
+	err = w.bus.MarkPackedSlabsUploaded(ctx, []api.UploadedPackedSlab{slab}, used)
+	if err != nil {
 		return fmt.Errorf("couldn't mark packed slabs uploaded, err: %v", err)
 	}
+
 	return nil
 }
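Note on the in-flight guard introduced in [PATCH 01/12]: tryUploadPackedSlabs
deduplicates background uploads per redundancy/contract-set key via the
uploadingPackedSlabs map, so at most one goroutine per key uploads packed
slabs at a time. A minimal, self-contained sketch of that pattern; keyedFlight
and its method names are illustrative only, not part of this series, and
"sync" is assumed to be imported:

	// keyedFlight admits at most one in-flight worker per key; callers that
	// lose the race return immediately, mirroring tryUploadPackedSlabs.
	type keyedFlight struct {
		mu       sync.Mutex
		inFlight map[string]bool
	}

	func newKeyedFlight() *keyedFlight {
		// the map must be initialized before tryAcquire writes to it
		return &keyedFlight{inFlight: make(map[string]bool)}
	}

	func (k *keyedFlight) tryAcquire(key string) bool {
		k.mu.Lock()
		defer k.mu.Unlock()
		if k.inFlight[key] {
			return false // another goroutine already owns this key
		}
		k.inFlight[key] = true
		return true
	}

	func (k *keyedFlight) release(key string) {
		k.mu.Lock()
		defer k.mu.Unlock()
		k.inFlight[key] = false
	}

Callers pair the two as: if !f.tryAcquire(key) { return } followed by
defer f.release(key), which is the shape tryUploadPackedSlabs uses inline.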