diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml
index 73b96965a..65e4f8ed4 100644
--- a/.github/workflows/publish.yml
+++ b/.github/workflows/publish.yml
@@ -1,13 +1,13 @@
 name: Publish
 
-# Controls when the action will run. 
+# Controls when the action will run.
 on:
   # Triggers the workflow on new SemVer tags
   push:
     branches:
       - master
       - dev
-    tags: 
+    tags:
       - 'v[0-9]+.[0-9]+.[0-9]+'
       - 'v[0-9]+.[0-9]+.[0-9]+-**'
 
@@ -116,7 +116,7 @@ jobs:
         with:
           name: renterd
           path: release/
-  build-mac: 
+  build-mac:
     runs-on: macos-latest
     strategy:
       matrix:
@@ -212,7 +212,7 @@ jobs:
         with:
           name: renterd
           path: release/
-  build-windows: 
+  build-windows:
     runs-on: windows-latest
     strategy:
       matrix:
@@ -253,23 +253,21 @@ jobs:
         with:
           name: renterd
           path: release/
-  dispatch:
+
+  dispatch-homebrew: # only runs on full releases
     if: startsWith(github.ref, 'refs/tags/v') && !contains(github.ref, '-')
     needs: [docker, build-linux, build-mac, build-windows]
-    strategy:
-      matrix:
-        repo: ['siafoundation/homebrew-sia', 'siafoundation/linux']
     runs-on: ubuntu-latest
     steps:
       - name: Extract Tag Name
        id: get_tag
        run: echo "tag_name=${GITHUB_REF#refs/tags/}" >> $GITHUB_ENV
 
-      - name: Repository Dispatch
+      - name: Dispatch
        uses: peter-evans/repository-dispatch@v3
        with:
          token: ${{ secrets.PAT_REPOSITORY_DISPATCH }}
-          repository: ${{ matrix.repo }}
+          repository: siafoundation/homebrew-sia
          event-type: release-tagged
          client-payload: >
            {
@@ -277,4 +275,40 @@ jobs:
              "tag": "${{ env.tag_name }}",
              "project": "renterd",
              "workflow_id": "${{ github.run_id }}"
-            }
\ No newline at end of file
+            }
+  dispatch-linux: # run on full releases, release candidates, and master branch
+    if: startsWith(github.ref, 'refs/tags/v') || endsWith(github.ref, 'master')
+    needs: [docker, build-linux, build-mac, build-windows]
+    runs-on: ubuntu-latest
+    steps:
+      - name: Build Dispatch Payload
+        id: get_payload
+        uses: actions/github-script@v7
+        with:
+          script: |
+            const isRelease = context.ref.startsWith('refs/tags/v'),
+              isBeta = isRelease && context.ref.includes('-beta'),
+              tag = isRelease ? context.ref.replace('refs/tags/', '') : 'master';
+
+            let component = 'nightly';
+            if (isBeta) {
+              component = 'beta';
+            } else if (isRelease) {
+              component = 'main';
+            }
+
+            return {
+              description: "renterd: The Next-Gen Sia Renter",
+              tag: tag,
+              project: "renterd",
+              workflow_id: context.runId,
+              component: component
+            };
+
+      - name: Dispatch
+        uses: peter-evans/repository-dispatch@v3
+        with:
+          token: ${{ secrets.PAT_REPOSITORY_DISPATCH }}
+          repository: siafoundation/linux
+          event-type: release-tagged
+          client-payload: ${{ steps.get_payload.outputs.result }}
\ No newline at end of file
diff --git a/autopilot/contractor.go b/autopilot/contractor.go
index 3ac9c1d05..82ef0269e 100644
--- a/autopilot/contractor.go
+++ b/autopilot/contractor.go
@@ -32,6 +32,10 @@ const (
 	// contract.
 	estimatedFileContractTransactionSetSize = 2048
 
+	// failedRenewalForgivenessPeriod is the amount of time we wait before
+	// punishing a contract for not being able to refresh
+	failedRefreshForgivenessPeriod = 24 * time.Hour
+
 	// leewayPctCandidateHosts is the leeway we apply when fetching candidate
 	// hosts, we fetch ~10% more than required
 	leewayPctCandidateHosts = 1.1
@@ -95,6 +99,8 @@ type (
 		revisionLastBroadcast    map[types.FileContractID]time.Time
 		revisionSubmissionBuffer uint64
 
+		firstRefreshFailure map[types.FileContractID]time.Time
+
 		mu sync.Mutex
 
 		pruning          bool
@@ -152,6 +158,8 @@ func newContractor(ap *Autopilot, revisionSubmissionBuffer uint64, revisionBroad
 		revisionLastBroadcast:    make(map[types.FileContractID]time.Time),
 		revisionSubmissionBuffer: revisionSubmissionBuffer,
 
+		firstRefreshFailure: make(map[types.FileContractID]time.Time),
+
 		resolver: newIPResolver(ap.shutdownCtx, resolverLookupTimeout, ap.logger.Named("resolver")),
 	}
 }
@@ -216,6 +224,9 @@ func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) (
 	contracts := resp.Contracts
 	c.logger.Infof("fetched %d contracts from the worker, took %v", len(resp.Contracts), time.Since(start))
 
+	// prune contract refresh failure map
+	c.pruneContractRefreshFailures(contracts)
+
 	// run revision broadcast
 	c.runRevisionBroadcast(ctx, w, contracts, isInCurrentSet)
 
@@ -1595,6 +1606,27 @@ func (c *contractor) hostForContract(ctx context.Context, fcid types.FileContrac
 	return
 }
 
+func (c *contractor) pruneContractRefreshFailures(contracts []api.Contract) {
+	contractMap := make(map[types.FileContractID]struct{})
+	for _, contract := range contracts {
+		contractMap[contract.ID] = struct{}{}
+	}
+	for fcid := range c.firstRefreshFailure {
+		if _, ok := contractMap[fcid]; !ok {
+			delete(c.firstRefreshFailure, fcid)
+		}
+	}
+}
+
+func (c *contractor) shouldForgiveFailedRefresh(fcid types.FileContractID) bool {
+	lastFailure, exists := c.firstRefreshFailure[fcid]
+	if !exists {
+		lastFailure = time.Now()
+		c.firstRefreshFailure[fcid] = lastFailure
+	}
+	return time.Since(lastFailure) < failedRefreshForgivenessPeriod
+}
+
 func addLeeway(n uint64, pct float64) uint64 {
 	if pct < 0 {
 		panic("given leeway percent has to be positive")
diff --git a/autopilot/contractor_test.go b/autopilot/contractor_test.go
index 575605612..9ce54daf5 100644
--- a/autopilot/contractor_test.go
+++ b/autopilot/contractor_test.go
@@ -3,8 +3,12 @@ package autopilot
 import (
 	"math"
 	"testing"
+	"time"
 
+	"go.sia.tech/core/types"
+	"go.sia.tech/renterd/api"
 	"go.uber.org/zap"
+	"lukechampine.com/frand"
 )
 
 func TestCalculateMinScore(t *testing.T) {
@@ -35,3 +39,30 @@ func TestCalculateMinScore(t *testing.T) {
 		t.Fatalf("expected minScore to be math.SmallestNonzeroFLoat64 but was %v", minScore)
 	}
 }
+
+func TestShouldForgiveFailedRenewal(t *testing.T) {
+	var fcid types.FileContractID
+	frand.Read(fcid[:])
+	c := &contractor{
+		firstRefreshFailure: make(map[types.FileContractID]time.Time),
+	}
+
+	// try twice since the first time will set the failure time
+	if !c.shouldForgiveFailedRefresh(fcid) {
+		t.Fatal("should forgive")
+	} else if !c.shouldForgiveFailedRefresh(fcid) {
+		t.Fatal("should forgive")
+	}
+
+	// set failure to be a full period in the past
+	c.firstRefreshFailure[fcid] = time.Now().Add(-failedRefreshForgivenessPeriod - time.Second)
+	if c.shouldForgiveFailedRefresh(fcid) {
+		t.Fatal("should not forgive")
+	}
+
+	// prune map
+	c.pruneContractRefreshFailures([]api.Contract{})
+	if len(c.firstRefreshFailure) != 0 {
+		t.Fatal("expected no failures")
+	}
+}
diff --git a/autopilot/hostfilter.go b/autopilot/hostfilter.go
index 61edbbe69..15a2147ab 100644
--- a/autopilot/hostfilter.go
+++ b/autopilot/hostfilter.go
@@ -185,8 +185,8 @@ func (c *contractor) isUsableContract(cfg api.AutopilotConfig, state state, ci c
 		}
 		if isOutOfFunds(cfg, pt, contract) {
 			reasons = append(reasons, errContractOutOfFunds.Error())
-			usable = false
-			recoverable = true
+			usable = usable && c.shouldForgiveFailedRefresh(contract.ID)
+			recoverable = !usable // only needs to be recoverable if !usable
 			refresh = true
 			renew = false
 		}
diff --git a/bus/bus.go b/bus/bus.go
index 24f319de6..a6b86c0e1 100644
--- a/bus/bus.go
+++ b/bus/bus.go
@@ -1019,7 +1019,7 @@ func (b *bus) contractsPrunableDataHandlerGET(jc jape.Context) {
 		// adjust the amount of prunable data with the pending uploads, due to
 		// how we record contract spending a contract's size might already
 		// include pending sectors
-		pending := b.uploadingSectors.pending(fcid)
+		pending := b.uploadingSectors.Pending(fcid)
 		if pending > size.Prunable {
 			size.Prunable = 0
 		} else {
@@ -1066,7 +1066,7 @@ func (b *bus) contractSizeHandlerGET(jc jape.Context) {
 	// adjust the amount of prunable data with the pending uploads, due to how
 	// we record contract spending a contract's size might already include
 	// pending sectors
-	pending := b.uploadingSectors.pending(id)
+	pending := b.uploadingSectors.Pending(id)
 	if pending > size.Prunable {
 		size.Prunable = 0
 	} else {
@@ -1143,6 +1143,7 @@ func (b *bus) contractIDRenewedHandlerPOST(jc jape.Context) {
 	if jc.Check("couldn't store contract", err) == nil {
 		jc.Encode(r)
 	}
+	b.uploadingSectors.HandleRenewal(req.Contract.ID(), req.RenewedFrom)
 }
 
 func (b *bus) contractIDRootsHandlerGET(jc jape.Context) {
@@ -1155,7 +1156,7 @@ func (b *bus) contractIDRootsHandlerGET(jc jape.Context) {
 	if jc.Check("couldn't fetch contract sectors", err) == nil {
 		jc.Encode(api.ContractRootsResponse{
 			Roots:     roots,
-			Uploading: b.uploadingSectors.sectors(id),
+			Uploading: b.uploadingSectors.Sectors(id),
 		})
 	}
 }
@@ -2016,7 +2017,7 @@ func (b *bus) stateHandlerGET(jc jape.Context) {
 func (b *bus) uploadTrackHandlerPOST(jc jape.Context) {
 	var id api.UploadID
 	if jc.DecodeParam("id", &id) == nil {
-		jc.Check("failed to track upload", b.uploadingSectors.trackUpload(id))
+		jc.Check("failed to track upload", b.uploadingSectors.StartUpload(id))
 	}
 }
 
@@ -2029,13 +2030,13 @@ func (b *bus) uploadAddSectorHandlerPOST(jc jape.Context) {
 	if jc.Decode(&req) != nil {
 		return
 	}
-	jc.Check("failed to add sector", b.uploadingSectors.addUploadingSector(id, req.ContractID, req.Root))
+	jc.Check("failed to add sector", b.uploadingSectors.AddSector(id, req.ContractID, req.Root))
 }
 
 func (b *bus) uploadFinishedHandlerDELETE(jc jape.Context) {
 	var id api.UploadID
 	if jc.DecodeParam("id", &id) == nil {
-		b.uploadingSectors.finishUpload(id)
+		b.uploadingSectors.FinishUpload(id)
 	}
 }
 
diff --git a/bus/uploadingsectors.go b/bus/uploadingsectors.go
index 6a3917d50..18c64a7c5 100644
--- a/bus/uploadingsectors.go
+++ b/bus/uploadingsectors.go
@@ -19,12 +19,12 @@ const (
 
 type (
 	uploadingSectorsCache struct {
-		mu      sync.Mutex
-		uploads map[api.UploadID]*ongoingUpload
+		mu        sync.Mutex
+		uploads   map[api.UploadID]*ongoingUpload
+		renewedTo map[types.FileContractID]types.FileContractID
 	}
 
 	ongoingUpload struct {
-		mu              sync.Mutex
 		started         time.Time
 		contractSectors map[types.FileContractID][]types.Hash256
 	}
@@ -32,82 +32,92 @@ type (
 
 func newUploadingSectorsCache() *uploadingSectorsCache {
 	return &uploadingSectorsCache{
-		uploads: make(map[api.UploadID]*ongoingUpload),
+		uploads:   make(map[api.UploadID]*ongoingUpload),
+		renewedTo: make(map[types.FileContractID]types.FileContractID),
 	}
 }
 
 func (ou *ongoingUpload) addSector(fcid types.FileContractID, root types.Hash256) {
-	ou.mu.Lock()
-	defer ou.mu.Unlock()
 	ou.contractSectors[fcid] = append(ou.contractSectors[fcid], root)
 }
 
 func (ou *ongoingUpload) sectors(fcid types.FileContractID) (roots []types.Hash256) {
-	ou.mu.Lock()
-	defer ou.mu.Unlock()
 	if sectors, exists := ou.contractSectors[fcid]; exists && time.Since(ou.started) < cacheExpiry {
 		roots = append(roots, sectors...)
 	}
 	return
 }
 
-func (usc *uploadingSectorsCache) addUploadingSector(uID api.UploadID, fcid types.FileContractID, root types.Hash256) error {
-	// fetch ongoing upload
+func (usc *uploadingSectorsCache) AddSector(uID api.UploadID, fcid types.FileContractID, root types.Hash256) error {
 	usc.mu.Lock()
-	ongoing, exists := usc.uploads[uID]
-	usc.mu.Unlock()
+	defer usc.mu.Unlock()
 
-	// add sector if upload exists
-	if exists {
-		ongoing.addSector(fcid, root)
-		return nil
+	ongoing, ok := usc.uploads[uID]
+	if !ok {
+		return fmt.Errorf("%w; id '%v'", api.ErrUnknownUpload, uID)
 	}
-	return fmt.Errorf("%w; id '%v'", api.ErrUnknownUpload, uID)
+	fcid = usc.latestFCID(fcid)
+	ongoing.addSector(fcid, root)
+	return nil
 }
 
-func (usc *uploadingSectorsCache) pending(fcid types.FileContractID) (size uint64) {
+func (usc *uploadingSectorsCache) FinishUpload(uID api.UploadID) {
 	usc.mu.Lock()
-	var uploads []*ongoingUpload
-	for _, ongoing := range usc.uploads {
-		uploads = append(uploads, ongoing)
+	defer usc.mu.Unlock()
+	delete(usc.uploads, uID)
+
+	// prune expired uploads
+	for uID, ongoing := range usc.uploads {
+		if time.Since(ongoing.started) > cacheExpiry {
+			delete(usc.uploads, uID)
+		}
 	}
-	usc.mu.Unlock()
 
-	for _, ongoing := range uploads {
-		size += uint64(len(ongoing.sectors(fcid))) * rhp.SectorSize
+	// prune renewed to map
+	for old, new := range usc.renewedTo {
+		if _, exists := usc.renewedTo[new]; exists {
+			delete(usc.renewedTo, old)
+		}
 	}
-	return
 }
 
-func (usc *uploadingSectorsCache) sectors(fcid types.FileContractID) (roots []types.Hash256) {
+func (usc *uploadingSectorsCache) HandleRenewal(fcid, renewedFrom types.FileContractID) {
 	usc.mu.Lock()
-	var uploads []*ongoingUpload
-	for _, ongoing := range usc.uploads {
-		uploads = append(uploads, ongoing)
+	defer usc.mu.Unlock()
+
+	for _, upload := range usc.uploads {
+		if _, exists := upload.contractSectors[renewedFrom]; exists {
+			upload.contractSectors[fcid] = upload.contractSectors[renewedFrom]
+			upload.contractSectors[renewedFrom] = nil
+		}
 	}
-	usc.mu.Unlock()
+	usc.renewedTo[renewedFrom] = fcid
+}
 
-	for _, ongoing := range uploads {
-		roots = append(roots, ongoing.sectors(fcid)...)
+func (usc *uploadingSectorsCache) Pending(fcid types.FileContractID) (size uint64) {
+	usc.mu.Lock()
+	defer usc.mu.Unlock()
+
+	fcid = usc.latestFCID(fcid)
+	for _, ongoing := range usc.uploads {
+		size += uint64(len(ongoing.sectors(fcid))) * rhp.SectorSize
 	}
 	return
 }
 
-func (usc *uploadingSectorsCache) finishUpload(uID api.UploadID) {
+func (usc *uploadingSectorsCache) Sectors(fcid types.FileContractID) (roots []types.Hash256) {
 	usc.mu.Lock()
 	defer usc.mu.Unlock()
-	delete(usc.uploads, uID)
 
-	// prune expired uploads
-	for uID, ongoing := range usc.uploads {
-		if time.Since(ongoing.started) > cacheExpiry {
-			delete(usc.uploads, uID)
-		}
+	fcid = usc.latestFCID(fcid)
+	for _, ongoing := range usc.uploads {
+		roots = append(roots, ongoing.sectors(fcid)...)
 	}
+	return
 }
 
-func (usc *uploadingSectorsCache) trackUpload(uID api.UploadID) error {
+func (usc *uploadingSectorsCache) StartUpload(uID api.UploadID) error {
 	usc.mu.Lock()
 	defer usc.mu.Unlock()
 
@@ -122,3 +132,10 @@
 	}
 	return nil
 }
+
+func (usc *uploadingSectorsCache) latestFCID(fcid types.FileContractID) types.FileContractID {
+	if latest, ok := usc.renewedTo[fcid]; ok {
+		return latest
+	}
+	return fcid
+}
diff --git a/bus/uploadingsectors_test.go b/bus/uploadingsectors_test.go
index 244280c70..b1c9b725a 100644
--- a/bus/uploadingsectors_test.go
+++ b/bus/uploadingsectors_test.go
@@ -4,6 +4,7 @@ import (
 	"errors"
 	"testing"
 
+	rhpv2 "go.sia.tech/core/rhp/v2"
 	"go.sia.tech/core/types"
 	"go.sia.tech/renterd/api"
 	"lukechampine.com/frand"
 )
@@ -15,20 +16,24 @@ func TestUploadingSectorsCache(t *testing.T) {
 	uID1 := newTestUploadID()
 	uID2 := newTestUploadID()
 
-	c.trackUpload(uID1)
-	c.trackUpload(uID2)
+	fcid1 := types.FileContractID{1}
+	fcid2 := types.FileContractID{2}
+	fcid3 := types.FileContractID{3}
 
-	_ = c.addUploadingSector(uID1, types.FileContractID{1}, types.Hash256{1})
-	_ = c.addUploadingSector(uID1, types.FileContractID{2}, types.Hash256{2})
-	_ = c.addUploadingSector(uID2, types.FileContractID{2}, types.Hash256{3})
+	c.StartUpload(uID1)
+	c.StartUpload(uID2)
 
-	if roots1 := c.sectors(types.FileContractID{1}); len(roots1) != 1 || roots1[0] != (types.Hash256{1}) {
+	_ = c.AddSector(uID1, fcid1, types.Hash256{1})
+	_ = c.AddSector(uID1, fcid2, types.Hash256{2})
+	_ = c.AddSector(uID2, fcid2, types.Hash256{3})
+
+	if roots1 := c.Sectors(fcid1); len(roots1) != 1 || roots1[0] != (types.Hash256{1}) {
 		t.Fatal("unexpected cached sectors")
 	}
-	if roots2 := c.sectors(types.FileContractID{2}); len(roots2) != 2 {
+	if roots2 := c.Sectors(fcid2); len(roots2) != 2 {
 		t.Fatal("unexpected cached sectors", roots2)
 	}
-	if roots3 := c.sectors(types.FileContractID{3}); len(roots3) != 0 {
+	if roots3 := c.Sectors(fcid3); len(roots3) != 0 {
 		t.Fatal("unexpected cached sectors")
 	}
 
@@ -39,28 +44,73 @@ func TestUploadingSectorsCache(t *testing.T) {
 		t.Fatal("unexpected")
 	}
 
-	c.finishUpload(uID1)
-	if roots1 := c.sectors(types.FileContractID{1}); len(roots1) != 0 {
+	c.FinishUpload(uID1)
+	if roots1 := c.Sectors(fcid1); len(roots1) != 0 {
 		t.Fatal("unexpected cached sectors")
 	}
-	if roots2 := c.sectors(types.FileContractID{2}); len(roots2) != 1 || roots2[0] != (types.Hash256{3}) {
+	if roots2 := c.Sectors(fcid2); len(roots2) != 1 || roots2[0] != (types.Hash256{3}) {
 		t.Fatal("unexpected cached sectors")
 	}
 
-	c.finishUpload(uID2)
-	if roots2 := c.sectors(types.FileContractID{1}); len(roots2) != 0 {
+	c.FinishUpload(uID2)
+	if roots2 := c.Sectors(fcid1); len(roots2) != 0 {
 		t.Fatal("unexpected cached sectors")
 	}
 
-	if err := c.addUploadingSector(uID1, types.FileContractID{1}, types.Hash256{1}); !errors.Is(err, api.ErrUnknownUpload) {
+	if err := c.AddSector(uID1, fcid1, types.Hash256{1}); !errors.Is(err, api.ErrUnknownUpload) {
 		t.Fatal("unexpected error", err)
 	}
-	if err := c.trackUpload(uID1); err != nil {
+	if err := c.StartUpload(uID1); err != nil {
 		t.Fatal("unexpected error", err)
 	}
-	if err := c.trackUpload(uID1); !errors.Is(err, api.ErrUploadAlreadyExists) {
+	if err := c.StartUpload(uID1); !errors.Is(err, api.ErrUploadAlreadyExists) {
 		t.Fatal("unexpected error", err)
 	}
+
+	// reset cache
+	c = newUploadingSectorsCache()
+
+	// track upload that uploads across two contracts
+	c.StartUpload(uID1)
+	c.AddSector(uID1, fcid1, types.Hash256{1})
+	c.AddSector(uID1, fcid1, types.Hash256{2})
+	c.HandleRenewal(fcid2, fcid1)
+	c.AddSector(uID1, fcid2, types.Hash256{3})
+	c.AddSector(uID1, fcid2, types.Hash256{4})
+
+	// assert pending sizes for both contracts should be 4 sectors
+	p1 := c.Pending(fcid1)
+	p2 := c.Pending(fcid2)
+	if p1 != p2 || p1 != 4*rhpv2.SectorSize {
+		t.Fatal("unexpected pending size", p1/rhpv2.SectorSize, p2/rhpv2.SectorSize)
+	}
+
+	// assert sectors for both contracts contain 4 sectors
+	s1 := c.Sectors(fcid1)
+	s2 := c.Sectors(fcid2)
+	if len(s1) != 4 || len(s2) != 4 {
+		t.Fatal("unexpected sectors", len(s1), len(s2))
+	}
+
+	// finish upload
+	c.FinishUpload(uID1)
+	s1 = c.Sectors(fcid1)
+	s2 = c.Sectors(fcid2)
+	if len(s1) != 0 || len(s2) != 0 {
+		t.Fatal("unexpected sectors", len(s1), len(s2))
+	}
+
+	// renew the contract
+	c.HandleRenewal(fcid3, fcid2)
+
+	// trigger pruning
+	c.StartUpload(uID2)
+	c.FinishUpload(uID2)
+
+	// assert renewedTo gets pruned
+	if len(c.renewedTo) != 1 {
+		t.Fatal("unexpected", len(c.renewedTo))
+	}
 }
 
 func newTestUploadID() api.UploadID {
diff --git a/internal/test/e2e/s3_test.go b/internal/test/e2e/s3_test.go
index 6c13e8426..daaefed5e 100644
--- a/internal/test/e2e/s3_test.go
+++ b/internal/test/e2e/s3_test.go
@@ -113,6 +113,12 @@ func TestS3Basic(t *testing.T) {
 		t.Fatal("unexpected ETag:", info.ETag)
 	}
 
+	// stat object that doesn't exist
+	_, err = s3.StatObject(context.Background(), bucket, "nonexistent", minio.StatObjectOptions{})
+	if err == nil || !strings.Contains(err.Error(), "The specified key does not exist") {
+		t.Fatal(err)
+	}
+
 	// add another bucket
 	tt.OK(s3.MakeBucket(context.Background(), bucket+"2", minio.MakeBucketOptions{}))
 
diff --git a/worker/client/client.go b/worker/client/client.go
index d658ac027..fe284469f 100644
--- a/worker/client/client.go
+++ b/worker/client/client.go
@@ -100,9 +100,13 @@ func (c *Client) HeadObject(ctx context.Context, bucket, path string, opts api.H
 		return nil, err
 	}
 	if resp.StatusCode != 200 && resp.StatusCode != 206 {
-		err, _ := io.ReadAll(resp.Body)
 		_ = resp.Body.Close()
-		return nil, errors.New(string(err))
+		switch resp.StatusCode {
+		case http.StatusNotFound:
+			return nil, api.ErrObjectNotFound
+		default:
+			return nil, errors.New(http.StatusText(resp.StatusCode))
+		}
 	}
 
 	head, err := parseObjectResponseHeaders(resp.Header)
diff --git a/worker/gouging.go b/worker/gouging.go
index e8e362040..0345385b4 100644
--- a/worker/gouging.go
+++ b/worker/gouging.go
@@ -272,7 +272,10 @@ func checkPriceGougingPT(gs api.GougingSettings, cs api.ConsensusState, txnFee t
 	}
 
 	// check LatestRevisionCost - expect sane value
-	maxRevisionCost := gs.MaxDownloadPrice.Div64(1 << 40).Mul64(4096)
+	maxRevisionCost, overflow := gs.MaxRPCPrice.AddWithOverflow(gs.MaxDownloadPrice.Div64(1 << 40).Mul64(2048))
+	if overflow {
+		maxRevisionCost = types.MaxCurrency
+	}
 	if pt.LatestRevisionCost.Cmp(maxRevisionCost) > 0 {
 		return fmt.Errorf("LatestRevisionCost of %v exceeds maximum cost of %v", pt.LatestRevisionCost, maxRevisionCost)
 	}
diff --git a/worker/migrations.go b/worker/migrations.go
index 6c25b789f..075642dd5 100644
--- a/worker/migrations.go
+++ b/worker/migrations.go
@@ -10,7 +10,7 @@ import (
 	"go.sia.tech/renterd/object"
 )
 
-func (w *worker) migrate(ctx context.Context, s *object.Slab, contractSet string, dlContracts, ulContracts []api.ContractMetadata, bh uint64) (int, bool, error) {
+func (w *worker) migrate(ctx context.Context, s object.Slab, contractSet string, dlContracts, ulContracts []api.ContractMetadata, bh uint64) (int, bool, error) {
 	// make a map of good hosts
 	goodHosts := make(map[types.PublicKey]map[types.FileContractID]bool)
 	for _, c := range ulContracts {
@@ -86,7 +86,7 @@ SHARDS:
 	defer mem.Release()
 
 	// download the slab
-	shards, surchargeApplied, err := w.downloadManager.DownloadSlab(ctx, *s, dlContracts)
+	shards, surchargeApplied, err := w.downloadManager.DownloadSlab(ctx, s, dlContracts)
 	if err != nil {
 		return 0, false, fmt.Errorf("failed to download slab for migration: %w", err)
 	}
diff --git a/worker/mocks_test.go b/worker/mocks_test.go
index c24b67df3..0e45b80df 100644
--- a/worker/mocks_test.go
+++ b/worker/mocks_test.go
@@ -390,6 +390,21 @@ func (os *objectStoreMock) TrackUpload(ctx context.Context, uID api.UploadID) er
 func (os *objectStoreMock) FinishUpload(ctx context.Context, uID api.UploadID) error { return nil }
 
 func (os *objectStoreMock) DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) error {
+	os.mu.Lock()
+	defer os.mu.Unlock()
+
+	for _, objects := range os.objects {
+		for _, object := range objects {
+			for _, slab := range object.Slabs {
+				for _, shard := range slab.Slab.Shards {
+					if shard.Root == root {
+						delete(shard.Contracts, hk)
+					}
+				}
+			}
+		}
+	}
+
 	return nil
 }
 
@@ -505,10 +520,33 @@ func (os *objectStoreMock) UpdateSlab(ctx context.Context, s object.Slab, contra
 	os.forEachObject(func(bucket, path string, o object.Object) {
 		for i, slab := range o.Slabs {
-			if slab.Key.String() == s.Key.String() {
-				os.objects[bucket][path].Slabs[i].Slab = s
-				return
+			if slab.Key.String() != s.Key.String() {
+				continue
+			}
+			// update slab
+			shards := os.objects[bucket][path].Slabs[i].Slab.Shards
+			for sI := range shards {
+				// overwrite latest host
+				shards[sI].LatestHost = s.Shards[sI].LatestHost
+
+				// merge contracts for each shard
+				existingContracts := make(map[types.FileContractID]struct{})
+				for _, fcids := range shards[sI].Contracts {
+					for _, fcid := range fcids {
+						existingContracts[fcid] = struct{}{}
+					}
+				}
+				for hk, fcids := range s.Shards[sI].Contracts {
+					for _, fcid := range fcids {
+						if _, exists := existingContracts[fcid]; exists {
+							continue
+						}
+						shards[sI].Contracts[hk] = append(shards[sI].Contracts[hk], fcids...)
+					}
+				}
 			}
+			os.objects[bucket][path].Slabs[i].Slab.Shards = shards
+			return
 		}
 	})
diff --git a/worker/upload.go b/worker/upload.go
index d2e2c1ec3..4e82f533e 100644
--- a/worker/upload.go
+++ b/worker/upload.go
@@ -603,7 +603,7 @@ func (mgr *uploadManager) UploadPackedSlab(ctx context.Context, rs api.Redundanc
 	return nil
 }
 
-func (mgr *uploadManager) UploadShards(ctx context.Context, s *object.Slab, shardIndices []int, shards [][]byte, contractSet string, contracts []api.ContractMetadata, bh uint64, lockPriority int, mem Memory) (err error) {
+func (mgr *uploadManager) UploadShards(ctx context.Context, s object.Slab, shardIndices []int, shards [][]byte, contractSet string, contracts []api.ContractMetadata, bh uint64, lockPriority int, mem Memory) (err error) {
 	// cancel all in-flight requests when the upload is done
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
@@ -641,27 +641,14 @@ func (mgr *uploadManager) UploadShards(ctx context.Context, s *object.Slab, shar
 	// overwrite the shards with the newly uploaded ones
 	for i, si := range shardIndices {
 		s.Shards[si].LatestHost = uploaded[i].LatestHost
-
-		knownContracts := make(map[types.FileContractID]struct{})
-		for _, fcids := range s.Shards[si].Contracts {
-			for _, fcid := range fcids {
-				knownContracts[fcid] = struct{}{}
-			}
-		}
+		s.Shards[si].Contracts = make(map[types.PublicKey][]types.FileContractID)
 		for hk, fcids := range uploaded[i].Contracts {
-			for _, fcid := range fcids {
-				if _, exists := knownContracts[fcid]; !exists {
-					if s.Shards[si].Contracts == nil {
-						s.Shards[si].Contracts = make(map[types.PublicKey][]types.FileContractID)
-					}
-					s.Shards[si].Contracts[hk] = append(s.Shards[si].Contracts[hk], fcid)
-				}
-			}
+			s.Shards[si].Contracts[hk] = append(s.Shards[si].Contracts[hk], fcids...)
 		}
 	}
 
 	// update the slab
-	return mgr.os.UpdateSlab(ctx, *s, contractSet)
+	return mgr.os.UpdateSlab(ctx, s, contractSet)
 }
 
 func (mgr *uploadManager) candidates(allowed map[types.PublicKey]struct{}) (candidates []*uploader) {
diff --git a/worker/upload_test.go b/worker/upload_test.go
index 1d441693f..0b6308ffe 100644
--- a/worker/upload_test.go
+++ b/worker/upload_test.go
@@ -263,6 +263,107 @@ func TestUploadPackedSlab(t *testing.T) {
 	}
 }
 
+func TestMigrateLostSector(t *testing.T) {
+	// create test worker
+	w := newTestWorker(t)
+
+	// add hosts to worker
+	w.AddHosts(testRedundancySettings.TotalShards * 2)
+
+	// convenience variables
+	os := w.os
+	mm := w.ulmm
+	dl := w.downloadManager
+	ul := w.uploadManager
+
+	// create test data
+	data := frand.Bytes(128)
+
+	// create upload params
+	params := testParameters(t.Name())
+
+	// upload data
+	_, _, err := ul.Upload(context.Background(), bytes.NewReader(data), w.Contracts(), params, lockingPriorityUpload)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// grab the slab
+	o, err := os.Object(context.Background(), testBucket, t.Name(), api.GetObjectOptions{})
+	if err != nil {
+		t.Fatal(err)
+	} else if len(o.Object.Object.Slabs) != 1 {
+		t.Fatal("expected 1 slab")
+	}
+	slab := o.Object.Object.Slabs[0]
+
+	// build usedHosts hosts
+	usedHosts := make(map[types.PublicKey]struct{})
+	for _, shard := range slab.Shards {
+		usedHosts[shard.LatestHost] = struct{}{}
+	}
+
+	// assume the host of the first shard lost its sector
+	badHost := slab.Shards[0].LatestHost
+	badContract := slab.Shards[0].Contracts[badHost][0]
+	err = os.DeleteHostSector(context.Background(), badHost, slab.Shards[0].Root)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// download the slab
+	shards, _, err := dl.DownloadSlab(context.Background(), slab.Slab, w.Contracts())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// encrypt the shards
+	o.Object.Object.Slabs[0].Slab.Encrypt(shards)
+
+	// filter it down to the shards we need to migrate
+	shards = shards[:1]
+
+	// recreate upload contracts
+	contracts := make([]api.ContractMetadata, 0)
+	for _, c := range w.Contracts() {
+		_, used := usedHosts[c.HostKey]
+		if !used && c.HostKey != badHost {
+			contracts = append(contracts, c)
+		}
+	}
+
+	// migrate the shard away from the bad host
+	mem := mm.AcquireMemory(context.Background(), rhpv2.SectorSize)
+	err = ul.UploadShards(context.Background(), o.Object.Object.Slabs[0].Slab, []int{0}, shards, testContractSet, contracts, 0, lockingPriorityUpload, mem)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// re-grab the slab
+	o, err = os.Object(context.Background(), testBucket, t.Name(), api.GetObjectOptions{})
+	if err != nil {
+		t.Fatal(err)
+	} else if len(o.Object.Object.Slabs) != 1 {
+		t.Fatal("expected 1 slab")
+	}
+	slab = o.Object.Object.Slabs[0]
+
+	// assert the bad shard is on a good host now
+	shard := slab.Shards[0]
+	if shard.LatestHost == badHost {
+		t.Fatal("latest host is bad")
+	} else if len(shard.Contracts) != 1 {
+		t.Fatal("expected 1 contract")
+	}
+	for _, fcids := range shard.Contracts {
+		for _, fcid := range fcids {
+			if fcid == badContract {
+				t.Fatal("contract belongs to bad host")
+			}
+		}
+	}
+}
+
 func TestUploadShards(t *testing.T) {
 	// create test worker
 	w := newTestWorker(t)
@@ -340,7 +441,7 @@ func TestUploadShards(t *testing.T) {
 
 	// migrate those shards away from bad hosts
 	mem := mm.AcquireMemory(context.Background(), uint64(len(badIndices))*rhpv2.SectorSize)
-	err = ul.UploadShards(context.Background(), &o.Object.Object.Slabs[0].Slab, badIndices, shards, testContractSet, contracts, 0, lockingPriorityUpload, mem)
+	err = ul.UploadShards(context.Background(), o.Object.Object.Slabs[0].Slab, badIndices, shards, testContractSet, contracts, 0, lockingPriorityUpload, mem)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -355,7 +456,12 @@ func TestUploadShards(t *testing.T) {
 	slab = o.Object.Object.Slabs[0]
 
 	// assert none of the shards are on bad hosts
-	for _, shard := range slab.Shards {
+	for i, shard := range slab.Shards {
+		if i%2 == 0 && len(shard.Contracts) != 1 {
+			t.Fatalf("expected 1 contract, got %v", len(shard.Contracts))
+		} else if i%2 != 0 && len(shard.Contracts) != 2 {
+			t.Fatalf("expected 2 contracts, got %v", len(shard.Contracts))
+		}
 		if _, bad := badHosts[shard.LatestHost]; bad {
 			t.Fatal("shard is on bad host", shard.LatestHost)
 		}
diff --git a/worker/worker.go b/worker/worker.go
index c78be49ea..89cc2f31e 100644
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -783,7 +783,7 @@ func (w *worker) slabMigrateHandler(jc jape.Context) {
 	}
 
 	// migrate the slab
-	numShardsMigrated, surchargeApplied, err := w.migrate(ctx, &slab, up.ContractSet, dlContracts, ulContracts, up.CurrentHeight)
+	numShardsMigrated, surchargeApplied, err := w.migrate(ctx, slab, up.ContractSet, dlContracts, ulContracts, up.CurrentHeight)
 	if err != nil {
 		jc.Encode(api.MigrateSlabResponse{
 			NumShardsMigrated: numShardsMigrated,
@@ -1435,26 +1435,31 @@ func (w *worker) scanHost(ctx context.Context, timeout time.Duration, hostKey ty
 	logger := w.logger.With("host", hostKey).With("hostIP", hostIP).With("timeout", timeout)
 	// prepare a helper for scanning
 	scan := func() (rhpv2.HostSettings, rhpv3.HostPriceTable, time.Duration, error) {
-		// apply timeout
-		scanCtx := ctx
-		var cancel context.CancelFunc
-		if timeout > 0 {
-			scanCtx, cancel = context.WithTimeout(scanCtx, timeout)
-			defer cancel()
-		}
-		// resolve hostIP. We don't want to scan hosts on private networks.
-		if !w.allowPrivateIPs {
-			host, _, err := net.SplitHostPort(hostIP)
-			if err != nil {
-				return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, err
+		// helper to prepare a context for scanning
+		withTimeoutCtx := func() (context.Context, context.CancelFunc) {
+			if timeout > 0 {
+				return context.WithTimeout(ctx, timeout)
 			}
-			addrs, err := (&net.Resolver{}).LookupIPAddr(scanCtx, host)
-			if err != nil {
-				return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, err
-			}
-			for _, addr := range addrs {
-				if isPrivateIP(addr.IP) {
-					return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, api.ErrHostOnPrivateNetwork
+			return ctx, func() {}
+		}
+		// resolve the address
+		{
+			scanCtx, cancel := withTimeoutCtx()
+			defer cancel()
+			// resolve hostIP. We don't want to scan hosts on private networks.
+			if !w.allowPrivateIPs {
+				host, _, err := net.SplitHostPort(hostIP)
+				if err != nil {
+					return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, err
+				}
+				addrs, err := (&net.Resolver{}).LookupIPAddr(scanCtx, host)
+				if err != nil {
+					return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, err
+				}
+				for _, addr := range addrs {
+					if isPrivateIP(addr.IP) {
+						return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, api.ErrHostOnPrivateNetwork
+					}
 				}
 			}
 		}
@@ -1462,37 +1467,49 @@ func (w *worker) scanHost(ctx context.Context, timeout time.Duration, hostKey ty
 		// fetch the host settings
 		start := time.Now()
 		var settings rhpv2.HostSettings
-		err := w.withTransportV2(scanCtx, hostKey, hostIP, func(t *rhpv2.Transport) error {
-			var err error
-			if settings, err = RPCSettings(scanCtx, t); err != nil {
-				return fmt.Errorf("failed to fetch host settings: %w", err)
+		{
+			scanCtx, cancel := withTimeoutCtx()
+			defer cancel()
+			err := w.withTransportV2(scanCtx, hostKey, hostIP, func(t *rhpv2.Transport) error {
+				var err error
+				if settings, err = RPCSettings(scanCtx, t); err != nil {
+					return fmt.Errorf("failed to fetch host settings: %w", err)
+				}
+				// NOTE: we overwrite the NetAddress with the host address here
+				// since we just used it to dial the host we know it's valid
+				settings.NetAddress = hostIP
+				return nil
+			})
+			if err != nil {
+				return settings, rhpv3.HostPriceTable{}, time.Since(start), err
 			}
-			// NOTE: we overwrite the NetAddress with the host address here
-			// since we just used it to dial the host we know it's valid
-			settings.NetAddress = hostIP
-			return nil
-		})
-		elapsed := time.Since(start)
-		if err != nil {
-			return settings, rhpv3.HostPriceTable{}, elapsed, err
 		}
 
 		// fetch the host pricetable
 		var pt rhpv3.HostPriceTable
-		err = w.transportPoolV3.withTransportV3(scanCtx, hostKey, settings.SiamuxAddr(), func(ctx context.Context, t *transportV3) error {
-			if hpt, err := RPCPriceTable(ctx, t, func(pt rhpv3.HostPriceTable) (rhpv3.PaymentMethod, error) { return nil, nil }); err != nil {
-				return fmt.Errorf("failed to fetch host price table: %w", err)
-			} else {
-				pt = hpt.HostPriceTable
-				return nil
+		{
+			scanCtx, cancel := withTimeoutCtx()
+			defer cancel()
+			err := w.transportPoolV3.withTransportV3(scanCtx, hostKey, settings.SiamuxAddr(), func(ctx context.Context, t *transportV3) error {
+				if hpt, err := RPCPriceTable(ctx, t, func(pt rhpv3.HostPriceTable) (rhpv3.PaymentMethod, error) { return nil, nil }); err != nil {
+					return fmt.Errorf("failed to fetch host price table: %w", err)
+				} else {
+					pt = hpt.HostPriceTable
+					return nil
+				}
+			})
+			if err != nil {
+				return settings, rhpv3.HostPriceTable{}, time.Since(start), err
 			}
-		})
-		return settings, pt, elapsed, err
+		}
+		return settings, pt, time.Since(start), nil
 	}
 
 	// scan: first try
 	settings, pt, duration, err := scan()
 	if err != nil {
+		logger = logger.With(zap.Error(err))
+
 		// scan: second try
 		select {
 		case <-ctx.Done():
@@ -1501,11 +1518,11 @@
 		}
 
 		settings, pt, duration, err = scan()
-		logger = logger.With("elapsed", duration)
+		logger = logger.With("elapsed", duration).With(zap.Error(err))
 		if err == nil {
 			logger.Info("successfully scanned host on second try")
 		} else if !isErrHostUnreachable(err) {
-			logger.Infow("failed to scan host", zap.Error(err))
+			logger.Infow("failed to scan host")
 		}