From b2fd6b9539daa71d88e9c40f9d02890f697b6258 Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 18 Sep 2023 11:46:50 +0200 Subject: [PATCH 1/8] worker: make sure used contracts map have renewed contract ids --- worker/migrations.go | 16 ++++---- worker/upload.go | 87 +++++++++++++++++++++++++++++++++++++++----- worker/worker.go | 50 +++++++++---------------- 3 files changed, 102 insertions(+), 51 deletions(-) diff --git a/worker/migrations.go b/worker/migrations.go index 80089d42a..b72480c3a 100644 --- a/worker/migrations.go +++ b/worker/migrations.go @@ -11,7 +11,7 @@ import ( "go.uber.org/zap" ) -func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *object.Slab, dlContracts, ulContracts []api.ContractMetadata, bh uint64, logger *zap.SugaredLogger) error { +func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *object.Slab, dlContracts, ulContracts []api.ContractMetadata, bh uint64, logger *zap.SugaredLogger) (map[types.PublicKey]types.FileContractID, error) { ctx, span := tracing.Tracer.Start(ctx, "migrateSlab") defer span.End() @@ -48,7 +48,7 @@ func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *o // if all shards are on good hosts, we're done if len(shardIndices) == 0 { - return nil + return nil, nil } // subtract the number of shards that are on hosts with contracts and might @@ -63,16 +63,16 @@ func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *o // perform some sanity checks if len(ulContracts) < int(s.MinShards) { - return fmt.Errorf("not enough hosts to repair unhealthy shard to minimum redundancy, %d<%d", len(ulContracts), int(s.MinShards)) + return nil, fmt.Errorf("not enough hosts to repair unhealthy shard to minimum redundancy, %d<%d", len(ulContracts), int(s.MinShards)) } if len(s.Shards)-missingShards < int(s.MinShards) { - return fmt.Errorf("not enough hosts to download unhealthy shard, %d<%d", len(s.Shards)-len(shardIndices), int(s.MinShards)) + return nil, fmt.Errorf("not enough hosts to download unhealthy shard, %d<%d", len(s.Shards)-len(shardIndices), int(s.MinShards)) } // download the slab shards, err := d.DownloadSlab(ctx, *s, dlContracts) if err != nil { - return fmt.Errorf("failed to download slab for migration: %w", err) + return nil, fmt.Errorf("failed to download slab for migration: %w", err) } s.Encrypt(shards) @@ -91,14 +91,14 @@ func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *o } // migrate the shards - uploaded, err := u.Migrate(ctx, shards, allowed, bh) + uploaded, used, err := u.Migrate(ctx, shards, allowed, bh) if err != nil { - return fmt.Errorf("failed to upload slab for migration: %w", err) + return nil, fmt.Errorf("failed to upload slab for migration: %w", err) } // overwrite the unhealthy shards with the newly migrated ones for i, si := range shardIndices { s.Shards[si] = uploaded[i] } - return nil + return used, nil } diff --git a/worker/upload.go b/worker/upload.go index 674980b33..53cfcce09 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -214,16 +214,44 @@ func (mgr *uploadManager) newUploader(c api.ContractMetadata) *uploader { } } -func (mgr *uploadManager) Migrate(ctx context.Context, shards [][]byte, contracts []api.ContractMetadata, bh uint64) ([]object.Sector, error) { +func (mgr *uploadManager) Migrate(ctx context.Context, shards [][]byte, contracts []api.ContractMetadata, bh uint64) ([]object.Sector, map[types.PublicKey]types.FileContractID, error) { // initiate the upload upload, finishFn, err := mgr.newUpload(ctx, len(shards), contracts, bh) if err != nil { - return nil, err + return nil, nil, err } defer finishFn() // upload the shards - return upload.uploadShards(ctx, shards, nil) + sectors, err := upload.uploadShards(ctx, shards, nil) + if err != nil { + return nil, nil, err + } + + // build host to contract map + h2c := make(map[types.PublicKey]types.FileContractID) + for _, contract := range contracts { + h2c[contract.HostKey] = contract.ID + } + + // ask the manager for the renewals + c2r := mgr.renewalsMap() + + // build used contracts list + usedContracts := make(map[types.PublicKey]types.FileContractID) + for _, sector := range sectors { + fcid, exists := h2c[sector.Host] + if !exists { + return nil, nil, fmt.Errorf("couldn't find contract for host %v", sector.Host) + } + if renewed, exists := c2r[fcid]; exists { + usedContracts[sector.Host] = renewed + } else { + usedContracts[sector.Host] = fcid + } + } + + return sectors, usedContracts, nil } func (mgr *uploadManager) Stats() uploadManagerStats { @@ -262,7 +290,7 @@ func (mgr *uploadManager) Stop() { } } -func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, rs api.RedundancySettings, contracts []api.ContractMetadata, bh uint64, uploadPacking bool) (_ object.Object, partialSlab []byte, err error) { +func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, rs api.RedundancySettings, contracts []api.ContractMetadata, bh uint64, uploadPacking bool) (_ object.Object, used map[types.PublicKey]types.FileContractID, partialSlab []byte, err error) { // cancel all in-flight requests when the upload is done ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -283,7 +311,7 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, rs api.Redund // create the upload u, finishFn, err := mgr.newUpload(ctx, rs.TotalShards, contracts, bh) if err != nil { - return object.Object{}, nil, err + return object.Object{}, nil, nil, err } defer finishFn() @@ -305,9 +333,9 @@ loop: for { select { case <-mgr.stopChan: - return object.Object{}, nil, errors.New("manager was stopped") + return object.Object{}, nil, nil, errors.New("manager was stopped") case <-ctx.Done(): - return object.Object{}, nil, errors.New("upload timed out") + return object.Object{}, nil, nil, errors.New("upload timed out") case nextSlabChan <- struct{}{}: // read next slab's data data := make([]byte, size) @@ -325,7 +353,7 @@ loop: } continue } else if err != nil && err != io.ErrUnexpectedEOF { - return object.Object{}, nil, err + return object.Object{}, nil, nil, err } if uploadPacking && errors.Is(err, io.ErrUnexpectedEOF) { // If uploadPacking is true, we return the partial slab without @@ -341,7 +369,7 @@ loop: slabIndex++ case res := <-respChan: if res.err != nil { - return object.Object{}, nil, res.err + return object.Object{}, nil, nil, res.err } // collect the response and potentially break out of the loop @@ -361,7 +389,32 @@ loop: for _, resp := range responses { o.Slabs = append(o.Slabs, resp.slab) } - return o, partialSlab, nil + + // build host to contract map + h2c := make(map[types.PublicKey]types.FileContractID) + for _, contract := range contracts { + h2c[contract.HostKey] = contract.ID + } + + // ask the manager for the renewals + c2r := mgr.renewalsMap() + + // build used contracts list + usedContracts := make(map[types.PublicKey]types.FileContractID) + for _, slab := range o.Slabs { + for _, sector := range slab.Shards { + fcid, exists := h2c[sector.Host] + if !exists { + return object.Object{}, nil, nil, fmt.Errorf("couldn't find contract for host %v", sector.Host) + } + if renewed, exists := c2r[fcid]; exists { + usedContracts[sector.Host] = renewed + } else { + usedContracts[sector.Host] = fcid + } + } + } + return o, usedContracts, partialSlab, nil } func (mgr *uploadManager) launch(req *sectorUploadReq) error { @@ -516,6 +569,20 @@ func (mgr *uploadManager) renewUploader(u *uploader) { u.SignalWork() } +func (mgr *uploadManager) renewalsMap() map[types.FileContractID]types.FileContractID { + mgr.mu.Lock() + defer mgr.mu.Unlock() + + renewals := make(map[types.FileContractID]types.FileContractID) + for _, u := range mgr.uploaders { + fcid, renewedFrom, _ := u.contractInfo() + if renewedFrom != (types.FileContractID{}) { + renewals[renewedFrom] = fcid + } + } + return renewals +} + func (mgr *uploadManager) refreshUploaders(contracts []api.ContractMetadata, bh uint64) { // build map c2m := make(map[types.FileContractID]api.ContractMetadata) diff --git a/worker/worker.go b/worker/worker.go index 877f0e61d..ecb2dc4bf 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -902,26 +902,32 @@ func (w *worker) slabMigrateHandler(jc jape.Context) { return } - err = migrateSlab(ctx, w.downloadManager, w.uploadManager, &slab, dlContracts, ulContracts, up.CurrentHeight, w.logger) + // migrate the slab + used, err := migrateSlab(ctx, w.downloadManager, w.uploadManager, &slab, dlContracts, ulContracts, up.CurrentHeight, w.logger) if jc.Check("couldn't migrate slabs", err) != nil { return } - usedContracts := make(map[types.PublicKey]types.FileContractID) + // build host to contract map + h2c := make(map[types.PublicKey]types.FileContractID) + for _, contract := range ulContracts { + h2c[contract.HostKey] = contract.ID + } + for _, ss := range slab.Shards { - if _, exists := usedContracts[ss.Host]; exists { + if _, exists := used[ss.Host]; exists { continue } - for _, c := range ulContracts { - if c.HostKey == ss.Host { - usedContracts[ss.Host] = c.ID - break - } + fcid, exists := h2c[ss.Host] + if !exists { + jc.Error(fmt.Errorf("couldn't find contract for host %v", ss.Host), http.StatusInternalServerError) + continue } + used[ss.Host] = fcid } - if jc.Check("couldn't update slab", w.bus.UpdateSlab(ctx, slab, up.ContractSet, usedContracts)) != nil { + if jc.Check("couldn't update slab", w.bus.UpdateSlab(ctx, slab, up.ContractSet, used)) != nil { return } } @@ -1132,23 +1138,11 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) { } // upload the object - obj, partialSlabData, err := w.uploadManager.Upload(ctx, jc.Request.Body, rs, contracts, up.CurrentHeight, up.UploadPacking) + obj, used, partialSlabData, err := w.uploadManager.Upload(ctx, jc.Request.Body, rs, contracts, up.CurrentHeight, up.UploadPacking) if jc.Check("couldn't upload object", err) != nil { return } - // build used contracts map - h2c := make(map[types.PublicKey]types.FileContractID) - for _, c := range contracts { - h2c[c.HostKey] = c.ID - } - used := make(map[types.PublicKey]types.FileContractID) - for _, s := range obj.Slabs { - for _, ss := range s.Shards { - used[ss.Host] = h2c[ss.Host] - } - } - if len(partialSlabData) > 0 { partialSlabs, err := w.bus.AddPartialSlab(jc.Request.Context(), partialSlabData, uint8(rs.MinShards), uint8(rs.TotalShards), up.ContractSet) if jc.Check("couldn't add partial slabs to bus", err) != nil { @@ -1176,21 +1170,11 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) { for _, ps := range packedSlabs { // upload packed slab. shards := encryptPartialSlab(ps.Data, ps.Key, uint8(rs.MinShards), uint8(rs.TotalShards)) - sectors, err := w.uploadManager.Migrate(ctx, shards, contracts, up.CurrentHeight) + sectors, used, err := w.uploadManager.Migrate(ctx, shards, contracts, up.CurrentHeight) if jc.Check("couldn't upload packed slab", err) != nil { return } - // build used contracts map - h2c := make(map[types.PublicKey]types.FileContractID) - for _, c := range contracts { - h2c[c.HostKey] = c.ID - } - used := make(map[types.PublicKey]types.FileContractID) - for _, s := range sectors { - used[s.Host] = h2c[s.Host] - } - // mark packed slab as uploaded. err = w.bus.MarkPackedSlabsUploaded(jc.Request.Context(), []api.UploadedPackedSlab{ { From 6c1070e71e7da5d66b7d71511cffb563a1dce4c1 Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 18 Sep 2023 12:05:35 +0200 Subject: [PATCH 2/8] stores: update fetchUsedContracts --- stores/metadata.go | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/stores/metadata.go b/stores/metadata.go index 1e6c812ed..2165ddacc 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -1070,24 +1070,31 @@ func pruneSlabs(tx *gorm.DB) error { } func fetchUsedContracts(tx *gorm.DB, usedContracts map[types.PublicKey]types.FileContractID) (map[types.PublicKey]dbContract, error) { - fcids := make([]fileContractID, 0, len(usedContracts)) - hostForFCID := make(map[types.FileContractID]types.PublicKey, len(usedContracts)) - for hk, fcid := range usedContracts { + // build fcids + var fcids []fileContractID + for _, fcid := range usedContracts { fcids = append(fcids, fileContractID(fcid)) - hostForFCID[fcid] = hk } + + // fetch all contracts var contracts []dbContract - err := tx.Model(&dbContract{}). - Where("fcid IN (?)", fcids). - Find(&contracts).Error - if err != nil { + if err := tx.Model(&dbContract{}). + Where("fcid IN (?) OR renewed_from IN (?)", fcids). + Preload("Host"). + Find(&contracts).Error; err != nil { return nil, err } - fetchedContracts := make(map[types.PublicKey]dbContract, len(contracts)) + + // build a host to contract map + h2c := make(map[types.PublicKey]dbContract, len(contracts)) for _, c := range contracts { - fetchedContracts[hostForFCID[types.FileContractID(c.FCID)]] = c + _, exists := usedContracts[types.PublicKey(c.Host.PublicKey)] + if exists { + h2c[types.PublicKey(c.Host.PublicKey)] = c + } } - return fetchedContracts, nil + + return h2c, nil } func (s *SQLStore) RenameObject(ctx context.Context, bucket, keyOld, keyNew string) error { From 3cd6800912f5c337c4ed890c46a81ca760b762df Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 18 Sep 2023 13:48:10 +0200 Subject: [PATCH 3/8] stores: fix arg --- stores/metadata.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stores/metadata.go b/stores/metadata.go index 2165ddacc..3ee8776fc 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -1079,7 +1079,7 @@ func fetchUsedContracts(tx *gorm.DB, usedContracts map[types.PublicKey]types.Fil // fetch all contracts var contracts []dbContract if err := tx.Model(&dbContract{}). - Where("fcid IN (?) OR renewed_from IN (?)", fcids). + Where("fcid IN (?) OR renewed_from IN (?)", fcids, fcids). Preload("Host"). Find(&contracts).Error; err != nil { return nil, err From d62a78b9bf434559b60692a8ae009fb81244a17f Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 18 Sep 2023 14:02:57 +0200 Subject: [PATCH 4/8] stores: undo changes --- stores/metadata.go | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/stores/metadata.go b/stores/metadata.go index 3ee8776fc..1e6c812ed 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -1070,31 +1070,24 @@ func pruneSlabs(tx *gorm.DB) error { } func fetchUsedContracts(tx *gorm.DB, usedContracts map[types.PublicKey]types.FileContractID) (map[types.PublicKey]dbContract, error) { - // build fcids - var fcids []fileContractID - for _, fcid := range usedContracts { + fcids := make([]fileContractID, 0, len(usedContracts)) + hostForFCID := make(map[types.FileContractID]types.PublicKey, len(usedContracts)) + for hk, fcid := range usedContracts { fcids = append(fcids, fileContractID(fcid)) + hostForFCID[fcid] = hk } - - // fetch all contracts var contracts []dbContract - if err := tx.Model(&dbContract{}). - Where("fcid IN (?) OR renewed_from IN (?)", fcids, fcids). - Preload("Host"). - Find(&contracts).Error; err != nil { + err := tx.Model(&dbContract{}). + Where("fcid IN (?)", fcids). + Find(&contracts).Error + if err != nil { return nil, err } - - // build a host to contract map - h2c := make(map[types.PublicKey]dbContract, len(contracts)) + fetchedContracts := make(map[types.PublicKey]dbContract, len(contracts)) for _, c := range contracts { - _, exists := usedContracts[types.PublicKey(c.Host.PublicKey)] - if exists { - h2c[types.PublicKey(c.Host.PublicKey)] = c - } + fetchedContracts[hostForFCID[types.FileContractID(c.FCID)]] = c } - - return h2c, nil + return fetchedContracts, nil } func (s *SQLStore) RenameObject(ctx context.Context, bucket, keyOld, keyNew string) error { From 3bf59a4f8ed75cd880dfa2ea728d66d048b3faf0 Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 18 Sep 2023 14:21:22 +0200 Subject: [PATCH 5/8] worker: update migrateSlab --- worker/migrations.go | 14 ++++++++++++++ worker/worker.go | 20 +------------------- 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/worker/migrations.go b/worker/migrations.go index b72480c3a..f79c33029 100644 --- a/worker/migrations.go +++ b/worker/migrations.go @@ -100,5 +100,19 @@ func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *o for i, si := range shardIndices { s.Shards[si] = uploaded[i] } + + // loop all shards and extend the used contracs map so it reflects all used + // contracts, not just the used contracts for the migrated shards + for _, sector := range s.Shards { + _, exists := used[sector.Host] + if !exists { + if fcid, exists := h2c[sector.Host]; !exists { + return nil, fmt.Errorf("couldn't find contract for host %v", sector.Host) + } else { + used[sector.Host] = fcid + } + } + } + return used, nil } diff --git a/worker/worker.go b/worker/worker.go index f5040431b..baf86c26e 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -908,25 +908,7 @@ func (w *worker) slabMigrateHandler(jc jape.Context) { return } - // build host to contract map - h2c := make(map[types.PublicKey]types.FileContractID) - for _, contract := range ulContracts { - h2c[contract.HostKey] = contract.ID - } - - for _, ss := range slab.Shards { - if _, exists := used[ss.Host]; exists { - continue - } - - fcid, exists := h2c[ss.Host] - if !exists { - jc.Error(fmt.Errorf("couldn't find contract for host %v", ss.Host), http.StatusInternalServerError) - continue - } - used[ss.Host] = fcid - } - + // update the slab if jc.Check("couldn't update slab", w.bus.UpdateSlab(ctx, slab, up.ContractSet, used)) != nil { return } From 35093d63cc6fd38902539f9073776fd263692703 Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 18 Sep 2023 14:28:18 +0200 Subject: [PATCH 6/8] worker: do not update if no migration took place --- worker/worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/worker.go b/worker/worker.go index baf86c26e..560a6b80c 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -909,7 +909,7 @@ func (w *worker) slabMigrateHandler(jc jape.Context) { } // update the slab - if jc.Check("couldn't update slab", w.bus.UpdateSlab(ctx, slab, up.ContractSet, used)) != nil { + if used != nil && jc.Check("couldn't update slab", w.bus.UpdateSlab(ctx, slab, up.ContractSet, used)) != nil { return } } From 6f32338bca7bbd1e55fb4996b45e275ab52ef430 Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 18 Sep 2023 14:28:45 +0200 Subject: [PATCH 7/8] worker: fix typo --- worker/migrations.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/migrations.go b/worker/migrations.go index f79c33029..866041cb8 100644 --- a/worker/migrations.go +++ b/worker/migrations.go @@ -101,7 +101,7 @@ func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *o s.Shards[si] = uploaded[i] } - // loop all shards and extend the used contracs map so it reflects all used + // loop all shards and extend the used contracts map so it reflects all used // contracts, not just the used contracts for the migrated shards for _, sector := range s.Shards { _, exists := used[sector.Host] From 25688470b80143ade6014c1818b5827e684a3d80 Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 18 Sep 2023 14:48:42 +0200 Subject: [PATCH 8/8] worker: fix analyzer warning --- worker/worker.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/worker/worker.go b/worker/worker.go index 560a6b80c..f8c9e6ced 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -908,8 +908,13 @@ func (w *worker) slabMigrateHandler(jc jape.Context) { return } + // no migration took place, return early + if used == nil { + return + } + // update the slab - if used != nil && jc.Check("couldn't update slab", w.bus.UpdateSlab(ctx, slab, up.ContractSet, used)) != nil { + if jc.Check("couldn't update slab", w.bus.UpdateSlab(ctx, slab, up.ContractSet, used)) != nil { return } }