Used Contracts #603

Merged · 9 commits · Sep 18, 2023
Changes from 8 commits
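For orientation: the diff below changes `migrateSlab`, `uploadManager.Migrate`, and `uploadManager.Upload` so that they return the contracts they actually used, as a map from host public key to file contract ID with renewals resolved, instead of leaving the caller in worker.go to reconstruct that map from the slab's shards. The core map-building pattern is sketched below with simplified stand-in types (the real code uses `types.PublicKey`, `types.FileContractID`, `api.ContractMetadata`, and `object.Sector`); this is an illustration of the pattern, not the actual implementation.

```go
package main

import "fmt"

// Simplified stand-ins for the real types used in the PR.
type (
	PublicKey      string
	FileContractID string
)

type ContractMetadata struct {
	HostKey PublicKey
	ID      FileContractID
}

type Sector struct {
	Host PublicKey
}

// usedContracts mirrors the pattern added in upload.go: map every sector's
// host to the contract storing it, preferring the renewed contract ID when
// the renewals map (old ID -> new ID) says the contract was renewed.
func usedContracts(sectors []Sector, contracts []ContractMetadata, renewals map[FileContractID]FileContractID) (map[PublicKey]FileContractID, error) {
	// build host -> contract map
	h2c := make(map[PublicKey]FileContractID)
	for _, c := range contracts {
		h2c[c.HostKey] = c.ID
	}

	// resolve every sector's host to a (possibly renewed) contract
	used := make(map[PublicKey]FileContractID)
	for _, sector := range sectors {
		fcid, exists := h2c[sector.Host]
		if !exists {
			return nil, fmt.Errorf("couldn't find contract for host %v", sector.Host)
		}
		if renewed, ok := renewals[fcid]; ok {
			used[sector.Host] = renewed
		} else {
			used[sector.Host] = fcid
		}
	}
	return used, nil
}

func main() {
	contracts := []ContractMetadata{{HostKey: "host1", ID: "fcid1"}}
	renewals := map[FileContractID]FileContractID{"fcid1": "fcid1-renewed"}

	used, err := usedContracts([]Sector{{Host: "host1"}}, contracts, renewals)
	if err != nil {
		panic(err)
	}
	fmt.Println(used) // map[host1:fcid1-renewed]
}
```

In the PR itself this pattern appears in both `Migrate` and `Upload`, and `migrateSlab` additionally extends the returned map so it also covers shards that were not migrated.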
30 changes: 22 additions & 8 deletions worker/migrations.go
@@ -11,7 +11,7 @@ import (
"go.uber.org/zap"
)

func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *object.Slab, dlContracts, ulContracts []api.ContractMetadata, bh uint64, logger *zap.SugaredLogger) error {
func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *object.Slab, dlContracts, ulContracts []api.ContractMetadata, bh uint64, logger *zap.SugaredLogger) (map[types.PublicKey]types.FileContractID, error) {
ctx, span := tracing.Tracer.Start(ctx, "migrateSlab")
defer span.End()

@@ -48,7 +48,7 @@ func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *o

// if all shards are on good hosts, we're done
if len(shardIndices) == 0 {
return nil
return nil, nil
}

// subtract the number of shards that are on hosts with contracts and might
@@ -63,16 +63,16 @@ func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *o

// perform some sanity checks
if len(ulContracts) < int(s.MinShards) {
return fmt.Errorf("not enough hosts to repair unhealthy shard to minimum redundancy, %d<%d", len(ulContracts), int(s.MinShards))
return nil, fmt.Errorf("not enough hosts to repair unhealthy shard to minimum redundancy, %d<%d", len(ulContracts), int(s.MinShards))
}
if len(s.Shards)-missingShards < int(s.MinShards) {
return fmt.Errorf("not enough hosts to download unhealthy shard, %d<%d", len(s.Shards)-len(shardIndices), int(s.MinShards))
return nil, fmt.Errorf("not enough hosts to download unhealthy shard, %d<%d", len(s.Shards)-len(shardIndices), int(s.MinShards))
}

// download the slab
shards, err := d.DownloadSlab(ctx, *s, dlContracts)
if err != nil {
return fmt.Errorf("failed to download slab for migration: %w", err)
return nil, fmt.Errorf("failed to download slab for migration: %w", err)
}
s.Encrypt(shards)

@@ -91,14 +91,28 @@ func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *o
}

// migrate the shards
uploaded, err := u.Migrate(ctx, shards, allowed, bh)
uploaded, used, err := u.Migrate(ctx, shards, allowed, bh)
if err != nil {
return fmt.Errorf("failed to upload slab for migration: %w", err)
return nil, fmt.Errorf("failed to upload slab for migration: %w", err)
}

// overwrite the unhealthy shards with the newly migrated ones
for i, si := range shardIndices {
s.Shards[si] = uploaded[i]
}
return nil

// loop all shards and extend the used contracts map so it reflects all used
// contracts, not just the used contracts for the migrated shards
for _, sector := range s.Shards {
_, exists := used[sector.Host]
if !exists {
if fcid, exists := h2c[sector.Host]; !exists {
return nil, fmt.Errorf("couldn't find contract for host %v", sector.Host)
} else {
used[sector.Host] = fcid
}
}
}

return used, nil
}
87 changes: 77 additions & 10 deletions worker/upload.go
@@ -214,16 +214,44 @@ func (mgr *uploadManager) newUploader(c api.ContractMetadata) *uploader {
}
}

func (mgr *uploadManager) Migrate(ctx context.Context, shards [][]byte, contracts []api.ContractMetadata, bh uint64) ([]object.Sector, error) {
func (mgr *uploadManager) Migrate(ctx context.Context, shards [][]byte, contracts []api.ContractMetadata, bh uint64) ([]object.Sector, map[types.PublicKey]types.FileContractID, error) {
// initiate the upload
upload, finishFn, err := mgr.newUpload(ctx, len(shards), contracts, bh)
if err != nil {
return nil, err
return nil, nil, err
}
defer finishFn()

// upload the shards
return upload.uploadShards(ctx, shards, nil)
sectors, err := upload.uploadShards(ctx, shards, nil)
if err != nil {
return nil, nil, err
}

// build host to contract map
h2c := make(map[types.PublicKey]types.FileContractID)
for _, contract := range contracts {
h2c[contract.HostKey] = contract.ID
}

// ask the manager for the renewals
c2r := mgr.renewalsMap()

// build used contracts list
usedContracts := make(map[types.PublicKey]types.FileContractID)
for _, sector := range sectors {
fcid, exists := h2c[sector.Host]
if !exists {
return nil, nil, fmt.Errorf("couldn't find contract for host %v", sector.Host)
}
if renewed, exists := c2r[fcid]; exists {
usedContracts[sector.Host] = renewed
} else {
usedContracts[sector.Host] = fcid
}
}

return sectors, usedContracts, nil
}

func (mgr *uploadManager) Stats() uploadManagerStats {
@@ -262,7 +290,7 @@ func (mgr *uploadManager) Stop() {
}
}

func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, rs api.RedundancySettings, contracts []api.ContractMetadata, bh uint64, uploadPacking bool) (_ object.Object, partialSlab []byte, err error) {
func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, rs api.RedundancySettings, contracts []api.ContractMetadata, bh uint64, uploadPacking bool) (_ object.Object, used map[types.PublicKey]types.FileContractID, partialSlab []byte, err error) {
// cancel all in-flight requests when the upload is done
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -283,7 +311,7 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, rs api.Redund
// create the upload
u, finishFn, err := mgr.newUpload(ctx, rs.TotalShards, contracts, bh)
if err != nil {
return object.Object{}, nil, err
return object.Object{}, nil, nil, err
}
defer finishFn()

@@ -305,9 +333,9 @@ loop:
for {
select {
case <-mgr.stopChan:
return object.Object{}, nil, errors.New("manager was stopped")
return object.Object{}, nil, nil, errors.New("manager was stopped")
case <-ctx.Done():
return object.Object{}, nil, errors.New("upload timed out")
return object.Object{}, nil, nil, errors.New("upload timed out")
case nextSlabChan <- struct{}{}:
// read next slab's data
data := make([]byte, size)
@@ -325,7 +353,7 @@ loop:
}
continue
} else if err != nil && err != io.ErrUnexpectedEOF {
return object.Object{}, nil, err
return object.Object{}, nil, nil, err
}
if uploadPacking && errors.Is(err, io.ErrUnexpectedEOF) {
// If uploadPacking is true, we return the partial slab without
@@ -341,7 +369,7 @@ loop:
slabIndex++
case res := <-respChan:
if res.err != nil {
return object.Object{}, nil, res.err
return object.Object{}, nil, nil, res.err
}

// collect the response and potentially break out of the loop
@@ -361,7 +389,32 @@ loop:
for _, resp := range responses {
o.Slabs = append(o.Slabs, resp.slab)
}
return o, partialSlab, nil

// build host to contract map
h2c := make(map[types.PublicKey]types.FileContractID)
for _, contract := range contracts {
h2c[contract.HostKey] = contract.ID
}

// ask the manager for the renewals
c2r := mgr.renewalsMap()

// build used contracts list
usedContracts := make(map[types.PublicKey]types.FileContractID)
for _, slab := range o.Slabs {
for _, sector := range slab.Shards {
fcid, exists := h2c[sector.Host]
if !exists {
return object.Object{}, nil, nil, fmt.Errorf("couldn't find contract for host %v", sector.Host)
}
if renewed, exists := c2r[fcid]; exists {
usedContracts[sector.Host] = renewed
} else {
usedContracts[sector.Host] = fcid
}
}
}
return o, usedContracts, partialSlab, nil
}

func (mgr *uploadManager) launch(req *sectorUploadReq) error {
@@ -516,6 +569,20 @@ func (mgr *uploadManager) renewUploader(u *uploader) {
u.SignalWork()
}

func (mgr *uploadManager) renewalsMap() map[types.FileContractID]types.FileContractID {
mgr.mu.Lock()
defer mgr.mu.Unlock()

renewals := make(map[types.FileContractID]types.FileContractID)
for _, u := range mgr.uploaders {
fcid, renewedFrom, _ := u.contractInfo()
if renewedFrom != (types.FileContractID{}) {
renewals[renewedFrom] = fcid
}
}
return renewals
}

func (mgr *uploadManager) refreshUploaders(contracts []api.ContractMetadata, bh uint64) {
// build map
c2m := make(map[types.FileContractID]api.ContractMetadata)
46 changes: 6 additions & 40 deletions worker/worker.go
@@ -902,26 +902,14 @@
return
}

err = migrateSlab(ctx, w.downloadManager, w.uploadManager, &slab, dlContracts, ulContracts, up.CurrentHeight, w.logger)
// migrate the slab
used, err := migrateSlab(ctx, w.downloadManager, w.uploadManager, &slab, dlContracts, ulContracts, up.CurrentHeight, w.logger)
if jc.Check("couldn't migrate slabs", err) != nil {
return
}

usedContracts := make(map[types.PublicKey]types.FileContractID)
for _, ss := range slab.Shards {
if _, exists := usedContracts[ss.Host]; exists {
continue
}

for _, c := range ulContracts {
if c.HostKey == ss.Host {
usedContracts[ss.Host] = c.ID
break
}
}
}

if jc.Check("couldn't update slab", w.bus.UpdateSlab(ctx, slab, up.ContractSet, usedContracts)) != nil {
// update the slab
if used != nil && jc.Check("couldn't update slab", w.bus.UpdateSlab(ctx, slab, up.ContractSet, used)) != nil {

Check failure on line 912 in worker/worker.go (GitHub Actions / test, ubuntu-latest, 1.21), analyzer warning in worker:
Weird condition expression; please stick to either (x != nil || y != nil || ...) or (x == nil && y == nil && ...)
return
}
}
@@ -1144,23 +1132,11 @@
}

// upload the object
obj, partialSlabData, err := w.uploadManager.Upload(ctx, jc.Request.Body, rs, contracts, up.CurrentHeight, up.UploadPacking)
obj, used, partialSlabData, err := w.uploadManager.Upload(ctx, jc.Request.Body, rs, contracts, up.CurrentHeight, up.UploadPacking)
if jc.Check("couldn't upload object", err) != nil {
return
}

// build used contracts map
h2c := make(map[types.PublicKey]types.FileContractID)
for _, c := range contracts {
h2c[c.HostKey] = c.ID
}
used := make(map[types.PublicKey]types.FileContractID)
for _, s := range obj.Slabs {
for _, ss := range s.Shards {
used[ss.Host] = h2c[ss.Host]
}
}

if len(partialSlabData) > 0 {
partialSlabs, err := w.bus.AddPartialSlab(jc.Request.Context(), partialSlabData, uint8(rs.MinShards), uint8(rs.TotalShards), up.ContractSet)
if jc.Check("couldn't add partial slabs to bus", err) != nil {
@@ -1188,21 +1164,11 @@
for _, ps := range packedSlabs {
// upload packed slab.
shards := encryptPartialSlab(ps.Data, ps.Key, uint8(rs.MinShards), uint8(rs.TotalShards))
sectors, err := w.uploadManager.Migrate(ctx, shards, contracts, up.CurrentHeight)
sectors, used, err := w.uploadManager.Migrate(ctx, shards, contracts, up.CurrentHeight)
if jc.Check("couldn't upload packed slab", err) != nil {
return
}

// build used contracts map
h2c := make(map[types.PublicKey]types.FileContractID)
for _, c := range contracts {
h2c[c.HostKey] = c.ID
}
used := make(map[types.PublicKey]types.FileContractID)
for _, s := range sectors {
used[s.Host] = h2c[s.Host]
}

// mark packed slab as uploaded.
err = w.bus.MarkPackedSlabsUploaded(jc.Request.Context(), []api.UploadedPackedSlab{
{