Skip to content

Commit

Permalink
Used Contracts (#603)
Browse files Browse the repository at this point in the history
* worker: make sure used contracts map have renewed contract ids

* stores: update fetchUsedContracts

* stores: fix arg

* stores: undo changes

* worker: update migrateSlab

* worker: do not update if no migration took place

* worker: fix typo

* worker: fix analyzer warning
  • Loading branch information
peterjan authored Sep 18, 2023
1 parent 6051137 commit 6afb988
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 56 deletions.
30 changes: 22 additions & 8 deletions worker/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"go.uber.org/zap"
)

func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *object.Slab, dlContracts, ulContracts []api.ContractMetadata, bh uint64, logger *zap.SugaredLogger) error {
func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *object.Slab, dlContracts, ulContracts []api.ContractMetadata, bh uint64, logger *zap.SugaredLogger) (map[types.PublicKey]types.FileContractID, error) {
ctx, span := tracing.Tracer.Start(ctx, "migrateSlab")
defer span.End()

Expand Down Expand Up @@ -48,7 +48,7 @@ func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *o

// if all shards are on good hosts, we're done
if len(shardIndices) == 0 {
return nil
return nil, nil
}

// subtract the number of shards that are on hosts with contracts and might
Expand All @@ -63,16 +63,16 @@ func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *o

// perform some sanity checks
if len(ulContracts) < int(s.MinShards) {
return fmt.Errorf("not enough hosts to repair unhealthy shard to minimum redundancy, %d<%d", len(ulContracts), int(s.MinShards))
return nil, fmt.Errorf("not enough hosts to repair unhealthy shard to minimum redundancy, %d<%d", len(ulContracts), int(s.MinShards))
}
if len(s.Shards)-missingShards < int(s.MinShards) {
return fmt.Errorf("not enough hosts to download unhealthy shard, %d<%d", len(s.Shards)-len(shardIndices), int(s.MinShards))
return nil, fmt.Errorf("not enough hosts to download unhealthy shard, %d<%d", len(s.Shards)-len(shardIndices), int(s.MinShards))
}

// download the slab
shards, err := d.DownloadSlab(ctx, *s, dlContracts)
if err != nil {
return fmt.Errorf("failed to download slab for migration: %w", err)
return nil, fmt.Errorf("failed to download slab for migration: %w", err)
}
s.Encrypt(shards)

Expand All @@ -91,14 +91,28 @@ func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *o
}

// migrate the shards
uploaded, err := u.Migrate(ctx, shards, allowed, bh)
uploaded, used, err := u.Migrate(ctx, shards, allowed, bh)
if err != nil {
return fmt.Errorf("failed to upload slab for migration: %w", err)
return nil, fmt.Errorf("failed to upload slab for migration: %w", err)
}

// overwrite the unhealthy shards with the newly migrated ones
for i, si := range shardIndices {
s.Shards[si] = uploaded[i]
}
return nil

// loop all shards and extend the used contracts map so it reflects all used
// contracts, not just the used contracts for the migrated shards
for _, sector := range s.Shards {
_, exists := used[sector.Host]
if !exists {
if fcid, exists := h2c[sector.Host]; !exists {
return nil, fmt.Errorf("couldn't find contract for host %v", sector.Host)
} else {
used[sector.Host] = fcid
}
}
}

return used, nil
}
87 changes: 77 additions & 10 deletions worker/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,16 +214,44 @@ func (mgr *uploadManager) newUploader(c api.ContractMetadata) *uploader {
}
}

func (mgr *uploadManager) Migrate(ctx context.Context, shards [][]byte, contracts []api.ContractMetadata, bh uint64) ([]object.Sector, error) {
func (mgr *uploadManager) Migrate(ctx context.Context, shards [][]byte, contracts []api.ContractMetadata, bh uint64) ([]object.Sector, map[types.PublicKey]types.FileContractID, error) {
// initiate the upload
upload, finishFn, err := mgr.newUpload(ctx, len(shards), contracts, bh)
if err != nil {
return nil, err
return nil, nil, err
}
defer finishFn()

// upload the shards
return upload.uploadShards(ctx, shards, nil)
sectors, err := upload.uploadShards(ctx, shards, nil)
if err != nil {
return nil, nil, err
}

// build host to contract map
h2c := make(map[types.PublicKey]types.FileContractID)
for _, contract := range contracts {
h2c[contract.HostKey] = contract.ID
}

// ask the manager for the renewals
c2r := mgr.renewalsMap()

// build used contracts list
usedContracts := make(map[types.PublicKey]types.FileContractID)
for _, sector := range sectors {
fcid, exists := h2c[sector.Host]
if !exists {
return nil, nil, fmt.Errorf("couldn't find contract for host %v", sector.Host)
}
if renewed, exists := c2r[fcid]; exists {
usedContracts[sector.Host] = renewed
} else {
usedContracts[sector.Host] = fcid
}
}

return sectors, usedContracts, nil
}

func (mgr *uploadManager) Stats() uploadManagerStats {
Expand Down Expand Up @@ -262,7 +290,7 @@ func (mgr *uploadManager) Stop() {
}
}

func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, rs api.RedundancySettings, contracts []api.ContractMetadata, bh uint64, uploadPacking bool) (_ object.Object, partialSlab []byte, err error) {
func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, rs api.RedundancySettings, contracts []api.ContractMetadata, bh uint64, uploadPacking bool) (_ object.Object, used map[types.PublicKey]types.FileContractID, partialSlab []byte, err error) {
// cancel all in-flight requests when the upload is done
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand All @@ -283,7 +311,7 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, rs api.Redund
// create the upload
u, finishFn, err := mgr.newUpload(ctx, rs.TotalShards, contracts, bh)
if err != nil {
return object.Object{}, nil, err
return object.Object{}, nil, nil, err
}
defer finishFn()

Expand All @@ -305,9 +333,9 @@ loop:
for {
select {
case <-mgr.stopChan:
return object.Object{}, nil, errors.New("manager was stopped")
return object.Object{}, nil, nil, errors.New("manager was stopped")
case <-ctx.Done():
return object.Object{}, nil, errors.New("upload timed out")
return object.Object{}, nil, nil, errors.New("upload timed out")
case nextSlabChan <- struct{}{}:
// read next slab's data
data := make([]byte, size)
Expand All @@ -325,7 +353,7 @@ loop:
}
continue
} else if err != nil && err != io.ErrUnexpectedEOF {
return object.Object{}, nil, err
return object.Object{}, nil, nil, err
}
if uploadPacking && errors.Is(err, io.ErrUnexpectedEOF) {
// If uploadPacking is true, we return the partial slab without
Expand All @@ -341,7 +369,7 @@ loop:
slabIndex++
case res := <-respChan:
if res.err != nil {
return object.Object{}, nil, res.err
return object.Object{}, nil, nil, res.err
}

// collect the response and potentially break out of the loop
Expand All @@ -361,7 +389,32 @@ loop:
for _, resp := range responses {
o.Slabs = append(o.Slabs, resp.slab)
}
return o, partialSlab, nil

// build host to contract map
h2c := make(map[types.PublicKey]types.FileContractID)
for _, contract := range contracts {
h2c[contract.HostKey] = contract.ID
}

// ask the manager for the renewals
c2r := mgr.renewalsMap()

// build used contracts list
usedContracts := make(map[types.PublicKey]types.FileContractID)
for _, slab := range o.Slabs {
for _, sector := range slab.Shards {
fcid, exists := h2c[sector.Host]
if !exists {
return object.Object{}, nil, nil, fmt.Errorf("couldn't find contract for host %v", sector.Host)
}
if renewed, exists := c2r[fcid]; exists {
usedContracts[sector.Host] = renewed
} else {
usedContracts[sector.Host] = fcid
}
}
}
return o, usedContracts, partialSlab, nil
}

func (mgr *uploadManager) launch(req *sectorUploadReq) error {
Expand Down Expand Up @@ -516,6 +569,20 @@ func (mgr *uploadManager) renewUploader(u *uploader) {
u.SignalWork()
}

func (mgr *uploadManager) renewalsMap() map[types.FileContractID]types.FileContractID {
mgr.mu.Lock()
defer mgr.mu.Unlock()

renewals := make(map[types.FileContractID]types.FileContractID)
for _, u := range mgr.uploaders {
fcid, renewedFrom, _ := u.contractInfo()
if renewedFrom != (types.FileContractID{}) {
renewals[renewedFrom] = fcid
}
}
return renewals
}

func (mgr *uploadManager) refreshUploaders(contracts []api.ContractMetadata, bh uint64) {
// build map
c2m := make(map[types.FileContractID]api.ContractMetadata)
Expand Down
47 changes: 9 additions & 38 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -902,26 +902,19 @@ func (w *worker) slabMigrateHandler(jc jape.Context) {
return
}

err = migrateSlab(ctx, w.downloadManager, w.uploadManager, &slab, dlContracts, ulContracts, up.CurrentHeight, w.logger)
// migrate the slab
used, err := migrateSlab(ctx, w.downloadManager, w.uploadManager, &slab, dlContracts, ulContracts, up.CurrentHeight, w.logger)
if jc.Check("couldn't migrate slabs", err) != nil {
return
}

usedContracts := make(map[types.PublicKey]types.FileContractID)
for _, ss := range slab.Shards {
if _, exists := usedContracts[ss.Host]; exists {
continue
}

for _, c := range ulContracts {
if c.HostKey == ss.Host {
usedContracts[ss.Host] = c.ID
break
}
}
// no migration took place, return early
if used == nil {
return
}

if jc.Check("couldn't update slab", w.bus.UpdateSlab(ctx, slab, up.ContractSet, usedContracts)) != nil {
// update the slab
if jc.Check("couldn't update slab", w.bus.UpdateSlab(ctx, slab, up.ContractSet, used)) != nil {
return
}
}
Expand Down Expand Up @@ -1144,23 +1137,11 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) {
}

// upload the object
obj, partialSlabData, err := w.uploadManager.Upload(ctx, jc.Request.Body, rs, contracts, up.CurrentHeight, up.UploadPacking)
obj, used, partialSlabData, err := w.uploadManager.Upload(ctx, jc.Request.Body, rs, contracts, up.CurrentHeight, up.UploadPacking)
if jc.Check("couldn't upload object", err) != nil {
return
}

// build used contracts map
h2c := make(map[types.PublicKey]types.FileContractID)
for _, c := range contracts {
h2c[c.HostKey] = c.ID
}
used := make(map[types.PublicKey]types.FileContractID)
for _, s := range obj.Slabs {
for _, ss := range s.Shards {
used[ss.Host] = h2c[ss.Host]
}
}

if len(partialSlabData) > 0 {
partialSlabs, err := w.bus.AddPartialSlab(jc.Request.Context(), partialSlabData, uint8(rs.MinShards), uint8(rs.TotalShards), up.ContractSet)
if jc.Check("couldn't add partial slabs to bus", err) != nil {
Expand Down Expand Up @@ -1188,21 +1169,11 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) {
for _, ps := range packedSlabs {
// upload packed slab.
shards := encryptPartialSlab(ps.Data, ps.Key, uint8(rs.MinShards), uint8(rs.TotalShards))
sectors, err := w.uploadManager.Migrate(ctx, shards, contracts, up.CurrentHeight)
sectors, used, err := w.uploadManager.Migrate(ctx, shards, contracts, up.CurrentHeight)
if jc.Check("couldn't upload packed slab", err) != nil {
return
}

// build used contracts map
h2c := make(map[types.PublicKey]types.FileContractID)
for _, c := range contracts {
h2c[c.HostKey] = c.ID
}
used := make(map[types.PublicKey]types.FileContractID)
for _, s := range sectors {
used[s.Host] = h2c[s.Host]
}

// mark packed slab as uploaded.
err = w.bus.MarkPackedSlabsUploaded(jc.Request.Context(), []api.UploadedPackedSlab{
{
Expand Down

0 comments on commit 6afb988

Please sign in to comment.