From 1cb3ac195455b9c561c8a71538d7952f0ffaa634 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Fri, 22 Mar 2024 09:40:45 +0100 Subject: [PATCH 01/10] client: fix HeadObject error response --- internal/test/e2e/s3_test.go | 6 ++++++ worker/client/client.go | 8 ++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/internal/test/e2e/s3_test.go b/internal/test/e2e/s3_test.go index 6c13e8426..daaefed5e 100644 --- a/internal/test/e2e/s3_test.go +++ b/internal/test/e2e/s3_test.go @@ -113,6 +113,12 @@ func TestS3Basic(t *testing.T) { t.Fatal("unexpected ETag:", info.ETag) } + // stat object that doesn't exist + _, err = s3.StatObject(context.Background(), bucket, "nonexistent", minio.StatObjectOptions{}) + if err == nil || !strings.Contains(err.Error(), "The specified key does not exist") { + t.Fatal(err) + } + // add another bucket tt.OK(s3.MakeBucket(context.Background(), bucket+"2", minio.MakeBucketOptions{})) diff --git a/worker/client/client.go b/worker/client/client.go index d658ac027..fe284469f 100644 --- a/worker/client/client.go +++ b/worker/client/client.go @@ -100,9 +100,13 @@ func (c *Client) HeadObject(ctx context.Context, bucket, path string, opts api.H return nil, err } if resp.StatusCode != 200 && resp.StatusCode != 206 { - err, _ := io.ReadAll(resp.Body) _ = resp.Body.Close() - return nil, errors.New(string(err)) + switch resp.StatusCode { + case http.StatusNotFound: + return nil, api.ErrObjectNotFound + default: + return nil, errors.New(http.StatusText(resp.StatusCode)) + } } head, err := parseObjectResponseHeaders(resp.Header) From e317b0fdb5fb26bb3d62ec94e93a122f903b9451 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Mon, 25 Mar 2024 10:18:30 +0100 Subject: [PATCH 02/10] worker: when migrating a slab, only set contracts for new shards --- worker/migrations.go | 4 ++-- worker/upload.go | 21 ++++----------------- worker/upload_test.go | 2 +- worker/worker.go | 2 +- 4 files changed, 8 insertions(+), 21 deletions(-) diff --git a/worker/migrations.go b/worker/migrations.go index 6c25b789f..075642dd5 100644 --- a/worker/migrations.go +++ b/worker/migrations.go @@ -10,7 +10,7 @@ import ( "go.sia.tech/renterd/object" ) -func (w *worker) migrate(ctx context.Context, s *object.Slab, contractSet string, dlContracts, ulContracts []api.ContractMetadata, bh uint64) (int, bool, error) { +func (w *worker) migrate(ctx context.Context, s object.Slab, contractSet string, dlContracts, ulContracts []api.ContractMetadata, bh uint64) (int, bool, error) { // make a map of good hosts goodHosts := make(map[types.PublicKey]map[types.FileContractID]bool) for _, c := range ulContracts { @@ -86,7 +86,7 @@ SHARDS: defer mem.Release() // download the slab - shards, surchargeApplied, err := w.downloadManager.DownloadSlab(ctx, *s, dlContracts) + shards, surchargeApplied, err := w.downloadManager.DownloadSlab(ctx, s, dlContracts) if err != nil { return 0, false, fmt.Errorf("failed to download slab for migration: %w", err) } diff --git a/worker/upload.go b/worker/upload.go index d146b920e..bc419d703 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -604,7 +604,7 @@ func (mgr *uploadManager) UploadPackedSlab(ctx context.Context, rs api.Redundanc return nil } -func (mgr *uploadManager) UploadShards(ctx context.Context, s *object.Slab, shardIndices []int, shards [][]byte, contractSet string, contracts []api.ContractMetadata, bh uint64, lockPriority int, mem Memory) (err error) { +func (mgr *uploadManager) UploadShards(ctx context.Context, s object.Slab, shardIndices []int, shards 
[][]byte, contractSet string, contracts []api.ContractMetadata, bh uint64, lockPriority int, mem Memory) (err error) { // cancel all in-flight requests when the upload is done ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -642,27 +642,14 @@ func (mgr *uploadManager) UploadShards(ctx context.Context, s *object.Slab, shar // overwrite the shards with the newly uploaded ones for i, si := range shardIndices { s.Shards[si].LatestHost = uploaded[i].LatestHost - - knownContracts := make(map[types.FileContractID]struct{}) - for _, fcids := range s.Shards[si].Contracts { - for _, fcid := range fcids { - knownContracts[fcid] = struct{}{} - } - } + s.Shards[si].Contracts = make(map[types.PublicKey][]types.FileContractID) for hk, fcids := range uploaded[i].Contracts { - for _, fcid := range fcids { - if _, exists := knownContracts[fcid]; !exists { - if s.Shards[si].Contracts == nil { - s.Shards[si].Contracts = make(map[types.PublicKey][]types.FileContractID) - } - s.Shards[si].Contracts[hk] = append(s.Shards[si].Contracts[hk], fcid) - } - } + s.Shards[si].Contracts[hk] = append(s.Shards[si].Contracts[hk], fcids...) } } // update the slab - return mgr.os.UpdateSlab(ctx, *s, contractSet) + return mgr.os.UpdateSlab(ctx, s, contractSet) } func (mgr *uploadManager) candidates(allowed map[types.PublicKey]struct{}) (candidates []*uploader) { diff --git a/worker/upload_test.go b/worker/upload_test.go index 1d441693f..cb4a7ee7b 100644 --- a/worker/upload_test.go +++ b/worker/upload_test.go @@ -340,7 +340,7 @@ func TestUploadShards(t *testing.T) { // migrate those shards away from bad hosts mem := mm.AcquireMemory(context.Background(), uint64(len(badIndices))*rhpv2.SectorSize) - err = ul.UploadShards(context.Background(), &o.Object.Object.Slabs[0].Slab, badIndices, shards, testContractSet, contracts, 0, lockingPriorityUpload, mem) + err = ul.UploadShards(context.Background(), o.Object.Object.Slabs[0].Slab, badIndices, shards, testContractSet, contracts, 0, lockingPriorityUpload, mem) if err != nil { t.Fatal(err) } diff --git a/worker/worker.go b/worker/worker.go index 89fe37a14..c6ce7a67a 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -784,7 +784,7 @@ func (w *worker) slabMigrateHandler(jc jape.Context) { } // migrate the slab - numShardsMigrated, surchargeApplied, err := w.migrate(ctx, &slab, up.ContractSet, dlContracts, ulContracts, up.CurrentHeight) + numShardsMigrated, surchargeApplied, err := w.migrate(ctx, slab, up.ContractSet, dlContracts, ulContracts, up.CurrentHeight) if err != nil { jc.Encode(api.MigrateSlabResponse{ NumShardsMigrated: numShardsMigrated, From f0eef4fd331397f5ba801636f17041a23c0ac41a Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Mon, 25 Mar 2024 12:20:27 +0100 Subject: [PATCH 03/10] worker: add TestMigrateLostSector --- worker/mocks_test.go | 44 +++++++++++++++-- worker/upload_test.go | 108 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 148 insertions(+), 4 deletions(-) diff --git a/worker/mocks_test.go b/worker/mocks_test.go index 7b3609c0b..b8257e95c 100644 --- a/worker/mocks_test.go +++ b/worker/mocks_test.go @@ -388,6 +388,21 @@ func (os *objectStoreMock) TrackUpload(ctx context.Context, uID api.UploadID) er func (os *objectStoreMock) FinishUpload(ctx context.Context, uID api.UploadID) error { return nil } func (os *objectStoreMock) DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) error { + os.mu.Lock() + defer os.mu.Unlock() + + for _, objects := range os.objects { + for _, object := range objects { + for _, slab 
:= range object.Slabs { + for _, shard := range slab.Slab.Shards { + if shard.Root == root { + delete(shard.Contracts, hk) + } + } + } + } + } + return nil } @@ -503,10 +518,33 @@ func (os *objectStoreMock) UpdateSlab(ctx context.Context, s object.Slab, contra os.forEachObject(func(bucket, path string, o object.Object) { for i, slab := range o.Slabs { - if slab.Key.String() == s.Key.String() { - os.objects[bucket][path].Slabs[i].Slab = s - return + if slab.Key.String() != s.Key.String() { + continue + } + // update slab + shards := os.objects[bucket][path].Slabs[i].Slab.Shards + for sI := range shards { + // overwrite latest host + shards[sI].LatestHost = s.Shards[sI].LatestHost + + // merge contracts for each shard + existingContracts := make(map[types.FileContractID]struct{}) + for _, fcids := range shards[sI].Contracts { + for _, fcid := range fcids { + existingContracts[fcid] = struct{}{} + } + } + for hk, fcids := range s.Shards[sI].Contracts { + for _, fcid := range fcids { + if _, exists := existingContracts[fcid]; exists { + continue + } + shards[sI].Contracts[hk] = append(shards[sI].Contracts[hk], fcids...) + } + } } + os.objects[bucket][path].Slabs[i].Slab.Shards = shards + return } }) diff --git a/worker/upload_test.go b/worker/upload_test.go index cb4a7ee7b..0b6308ffe 100644 --- a/worker/upload_test.go +++ b/worker/upload_test.go @@ -263,6 +263,107 @@ func TestUploadPackedSlab(t *testing.T) { } } +func TestMigrateLostSector(t *testing.T) { + // create test worker + w := newTestWorker(t) + + // add hosts to worker + w.AddHosts(testRedundancySettings.TotalShards * 2) + + // convenience variables + os := w.os + mm := w.ulmm + dl := w.downloadManager + ul := w.uploadManager + + // create test data + data := frand.Bytes(128) + + // create upload params + params := testParameters(t.Name()) + + // upload data + _, _, err := ul.Upload(context.Background(), bytes.NewReader(data), w.Contracts(), params, lockingPriorityUpload) + if err != nil { + t.Fatal(err) + } + + // grab the slab + o, err := os.Object(context.Background(), testBucket, t.Name(), api.GetObjectOptions{}) + if err != nil { + t.Fatal(err) + } else if len(o.Object.Object.Slabs) != 1 { + t.Fatal("expected 1 slab") + } + slab := o.Object.Object.Slabs[0] + + // build usedHosts hosts + usedHosts := make(map[types.PublicKey]struct{}) + for _, shard := range slab.Shards { + usedHosts[shard.LatestHost] = struct{}{} + } + + // assume the host of the first shard lost its sector + badHost := slab.Shards[0].LatestHost + badContract := slab.Shards[0].Contracts[badHost][0] + err = os.DeleteHostSector(context.Background(), badHost, slab.Shards[0].Root) + if err != nil { + t.Fatal(err) + } + + // download the slab + shards, _, err := dl.DownloadSlab(context.Background(), slab.Slab, w.Contracts()) + if err != nil { + t.Fatal(err) + } + + // encrypt the shards + o.Object.Object.Slabs[0].Slab.Encrypt(shards) + + // filter it down to the shards we need to migrate + shards = shards[:1] + + // recreate upload contracts + contracts := make([]api.ContractMetadata, 0) + for _, c := range w.Contracts() { + _, used := usedHosts[c.HostKey] + if !used && c.HostKey != badHost { + contracts = append(contracts, c) + } + } + + // migrate the shard away from the bad host + mem := mm.AcquireMemory(context.Background(), rhpv2.SectorSize) + err = ul.UploadShards(context.Background(), o.Object.Object.Slabs[0].Slab, []int{0}, shards, testContractSet, contracts, 0, lockingPriorityUpload, mem) + if err != nil { + t.Fatal(err) + } + + // re-grab the slab + o, err 
= os.Object(context.Background(), testBucket, t.Name(), api.GetObjectOptions{}) + if err != nil { + t.Fatal(err) + } else if len(o.Object.Object.Slabs) != 1 { + t.Fatal("expected 1 slab") + } + slab = o.Object.Object.Slabs[0] + + // assert the bad shard is on a good host now + shard := slab.Shards[0] + if shard.LatestHost == badHost { + t.Fatal("latest host is bad") + } else if len(shard.Contracts) != 1 { + t.Fatal("expected 1 contract") + } + for _, fcids := range shard.Contracts { + for _, fcid := range fcids { + if fcid == badContract { + t.Fatal("contract belongs to bad host") + } + } + } +} + func TestUploadShards(t *testing.T) { // create test worker w := newTestWorker(t) @@ -355,7 +456,12 @@ func TestUploadShards(t *testing.T) { slab = o.Object.Object.Slabs[0] // assert none of the shards are on bad hosts - for _, shard := range slab.Shards { + for i, shard := range slab.Shards { + if i%2 == 0 && len(shard.Contracts) != 1 { + t.Fatalf("expected 1 contract, got %v", len(shard.Contracts)) + } else if i%2 != 0 && len(shard.Contracts) != 2 { + t.Fatalf("expected 2 contracts, got %v", len(shard.Contracts)) + } if _, bad := badHosts[shard.LatestHost]; bad { t.Fatal("shard is on bad host", shard.LatestHost) } From 80eca756f0f280ae4c777a635266495d881ec35d Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Mon, 25 Mar 2024 15:46:53 +0100 Subject: [PATCH 04/10] worker: update logging in scanHost and apply timeout to each step of scanning --- worker/worker.go | 99 ++++++++++++++++++++++++++++-------------------- 1 file changed, 58 insertions(+), 41 deletions(-) diff --git a/worker/worker.go b/worker/worker.go index d0de33f71..a39fc608e 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1436,26 +1436,31 @@ func (w *worker) scanHost(ctx context.Context, timeout time.Duration, hostKey ty logger := w.logger.With("host", hostKey).With("hostIP", hostIP).With("timeout", timeout) // prepare a helper for scanning scan := func() (rhpv2.HostSettings, rhpv3.HostPriceTable, time.Duration, error) { - // apply timeout - scanCtx := ctx - var cancel context.CancelFunc - if timeout > 0 { - scanCtx, cancel = context.WithTimeout(scanCtx, timeout) - defer cancel() - } - // resolve hostIP. We don't want to scan hosts on private networks. - if !w.allowPrivateIPs { - host, _, err := net.SplitHostPort(hostIP) - if err != nil { - return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, err + // helper to prepare a context for scanning + withTimeoutCtx := func() (context.Context, context.CancelFunc) { + if timeout > 0 { + return context.WithTimeout(ctx, timeout) } - addrs, err := (&net.Resolver{}).LookupIPAddr(scanCtx, host) - if err != nil { - return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, err - } - for _, addr := range addrs { - if isPrivateIP(addr.IP) { - return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, api.ErrHostOnPrivateNetwork + return ctx, func() {} + } + // resolve the address + { + scanCtx, cancel := withTimeoutCtx() + defer cancel() + // resolve hostIP. We don't want to scan hosts on private networks. 
+ if !w.allowPrivateIPs { + host, _, err := net.SplitHostPort(hostIP) + if err != nil { + return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, err + } + addrs, err := (&net.Resolver{}).LookupIPAddr(scanCtx, host) + if err != nil { + return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, err + } + for _, addr := range addrs { + if isPrivateIP(addr.IP) { + return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, api.ErrHostOnPrivateNetwork + } } } } @@ -1463,37 +1468,49 @@ func (w *worker) scanHost(ctx context.Context, timeout time.Duration, hostKey ty // fetch the host settings start := time.Now() var settings rhpv2.HostSettings - err := w.withTransportV2(scanCtx, hostKey, hostIP, func(t *rhpv2.Transport) error { - var err error - if settings, err = RPCSettings(scanCtx, t); err != nil { - return fmt.Errorf("failed to fetch host settings: %w", err) + { + scanCtx, cancel := withTimeoutCtx() + defer cancel() + err := w.withTransportV2(scanCtx, hostKey, hostIP, func(t *rhpv2.Transport) error { + var err error + if settings, err = RPCSettings(scanCtx, t); err != nil { + return fmt.Errorf("failed to fetch host settings: %w", err) + } + // NOTE: we overwrite the NetAddress with the host address here + // since we just used it to dial the host we know it's valid + settings.NetAddress = hostIP + return nil + }) + if err != nil { + return settings, rhpv3.HostPriceTable{}, time.Since(start), err } - // NOTE: we overwrite the NetAddress with the host address here - // since we just used it to dial the host we know it's valid - settings.NetAddress = hostIP - return nil - }) - elapsed := time.Since(start) - if err != nil { - return settings, rhpv3.HostPriceTable{}, elapsed, err } // fetch the host pricetable var pt rhpv3.HostPriceTable - err = w.transportPoolV3.withTransportV3(scanCtx, hostKey, settings.SiamuxAddr(), func(ctx context.Context, t *transportV3) error { - if hpt, err := RPCPriceTable(ctx, t, func(pt rhpv3.HostPriceTable) (rhpv3.PaymentMethod, error) { return nil, nil }); err != nil { - return fmt.Errorf("failed to fetch host price table: %w", err) - } else { - pt = hpt.HostPriceTable - return nil + { + scanCtx, cancel := withTimeoutCtx() + defer cancel() + err := w.transportPoolV3.withTransportV3(scanCtx, hostKey, settings.SiamuxAddr(), func(ctx context.Context, t *transportV3) error { + if hpt, err := RPCPriceTable(ctx, t, func(pt rhpv3.HostPriceTable) (rhpv3.PaymentMethod, error) { return nil, nil }); err != nil { + return fmt.Errorf("failed to fetch host price table: %w", err) + } else { + pt = hpt.HostPriceTable + return nil + } + }) + if err != nil { + return settings, rhpv3.HostPriceTable{}, time.Since(start), err } - }) - return settings, pt, elapsed, err + } + return settings, pt, time.Since(start), nil } // scan: first try settings, pt, duration, err := scan() if err != nil { + logger = logger.With(zap.Error(err)) + // scan: second try select { case <-ctx.Done(): @@ -1502,11 +1519,11 @@ func (w *worker) scanHost(ctx context.Context, timeout time.Duration, hostKey ty } settings, pt, duration, err = scan() - logger = logger.With("elapsed", duration) + logger = logger.With("elapsed", duration).With(zap.Error(err)) if err == nil { logger.Info("successfully scanned host on second try") } else if !isErrHostUnreachable(err) { - logger.Infow("failed to scan host", zap.Error(err)) + logger.Infow("failed to scan host") } } From b30999c07087e1fab4611915e93e63a650499ddd Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Mon, 25 Mar 2024 17:20:33 +0100 Subject: [PATCH 05/10] contractor: add 
forgiveness period for failed refreshes --- autopilot/contractor.go | 33 +++++++++++++++++++++++++++++++++ autopilot/contractor_test.go | 31 +++++++++++++++++++++++++++++++ autopilot/hostfilter.go | 4 ++-- 3 files changed, 66 insertions(+), 2 deletions(-) diff --git a/autopilot/contractor.go b/autopilot/contractor.go index 7b2ea9863..82ea4e619 100644 --- a/autopilot/contractor.go +++ b/autopilot/contractor.go @@ -33,6 +33,10 @@ const ( // contract. estimatedFileContractTransactionSetSize = 2048 + // failedRenewalForgivenessPeriod is the amount of time we wait before + // punishing a contract for not being able to refresh + failedRefreshForgivenessPeriod = 24 * time.Hour + // leewayPctCandidateHosts is the leeway we apply when fetching candidate // hosts, we fetch ~10% more than required leewayPctCandidateHosts = 1.1 @@ -96,6 +100,8 @@ type ( revisionLastBroadcast map[types.FileContractID]time.Time revisionSubmissionBuffer uint64 + firstRefreshFailure map[types.FileContractID]time.Time + mu sync.Mutex pruning bool @@ -162,6 +168,8 @@ func newContractor(ap *Autopilot, revisionSubmissionBuffer uint64, revisionBroad revisionLastBroadcast: make(map[types.FileContractID]time.Time), revisionSubmissionBuffer: revisionSubmissionBuffer, + firstRefreshFailure: make(map[types.FileContractID]time.Time), + resolver: newIPResolver(ap.shutdownCtx, resolverLookupTimeout, ap.logger.Named("resolver")), } } @@ -226,6 +234,9 @@ func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) ( contracts := resp.Contracts c.logger.Infof("fetched %d contracts from the worker, took %v", len(resp.Contracts), time.Since(start)) + // prune contract refresh failure map + c.pruneContractRefreshFailures(contracts) + // run revision broadcast c.runRevisionBroadcast(ctx, w, contracts, isInCurrentSet) @@ -1624,6 +1635,28 @@ func (c *contractor) hostForContract(ctx context.Context, fcid types.FileContrac return } +func (c *contractor) pruneContractRefreshFailures(contracts []api.Contract) { + contractMap := make(map[types.FileContractID]struct{}) + for _, contract := range contracts { + contractMap[contract.ID] = struct{}{} + } + for fcid := range c.firstRefreshFailure { + if _, ok := contractMap[fcid]; !ok { + delete(c.firstRefreshFailure, fcid) + } + } +} + +func (c *contractor) shouldForgiveFailedRefresh(fcid types.FileContractID) bool { + lastFailure, exists := c.firstRefreshFailure[fcid] + if !exists { + lastFailure = time.Now() + c.firstRefreshFailure[fcid] = lastFailure + } + fmt.Println(time.Since(lastFailure)) + return time.Since(lastFailure) < failedRefreshForgivenessPeriod +} + func addLeeway(n uint64, pct float64) uint64 { if pct < 0 { panic("given leeway percent has to be positive") diff --git a/autopilot/contractor_test.go b/autopilot/contractor_test.go index 575605612..9ce54daf5 100644 --- a/autopilot/contractor_test.go +++ b/autopilot/contractor_test.go @@ -3,8 +3,12 @@ package autopilot import ( "math" "testing" + "time" + "go.sia.tech/core/types" + "go.sia.tech/renterd/api" "go.uber.org/zap" + "lukechampine.com/frand" ) func TestCalculateMinScore(t *testing.T) { @@ -35,3 +39,30 @@ func TestCalculateMinScore(t *testing.T) { t.Fatalf("expected minScore to be math.SmallestNonzeroFLoat64 but was %v", minScore) } } + +func TestShouldForgiveFailedRenewal(t *testing.T) { + var fcid types.FileContractID + frand.Read(fcid[:]) + c := &contractor{ + firstRefreshFailure: make(map[types.FileContractID]time.Time), + } + + // try twice since the first time will set the failure time + if 
!c.shouldForgiveFailedRefresh(fcid) { + t.Fatal("should forgive") + } else if !c.shouldForgiveFailedRefresh(fcid) { + t.Fatal("should forgive") + } + + // set failure to be a full period in the past + c.firstRefreshFailure[fcid] = time.Now().Add(-failedRefreshForgivenessPeriod - time.Second) + if c.shouldForgiveFailedRefresh(fcid) { + t.Fatal("should not forgive") + } + + // prune map + c.pruneContractRefreshFailures([]api.Contract{}) + if len(c.firstRefreshFailure) != 0 { + t.Fatal("expected no failures") + } +} diff --git a/autopilot/hostfilter.go b/autopilot/hostfilter.go index f41a20c94..d64c1f3e3 100644 --- a/autopilot/hostfilter.go +++ b/autopilot/hostfilter.go @@ -254,8 +254,8 @@ func (c *contractor) isUsableContract(cfg api.AutopilotConfig, state state, ci c } if isOutOfFunds(cfg, pt, contract) { reasons = append(reasons, errContractOutOfFunds.Error()) - usable = false - recoverable = true + usable = usable && c.shouldForgiveFailedRefresh(contract.ID) + recoverable = !usable // only needs to be recoverable if !usable refresh = true renew = false } From 73f0640f2d5edaa0e043c95f57f37c9bf0278efd Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Mon, 25 Mar 2024 17:10:59 -0700 Subject: [PATCH 06/10] ci: release nightlies on linux --- .github/workflows/publish.yml | 56 ++++++++++++++++++++++++++++------- 1 file changed, 45 insertions(+), 11 deletions(-) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 824f69231..8b11e8b62 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -1,13 +1,13 @@ name: Publish -# Controls when the action will run. +# Controls when the action will run. on: # Triggers the workflow on new SemVer tags push: branches: - master - dev - tags: + tags: - 'v[0-9]+.[0-9]+.[0-9]+' - 'v[0-9]+.[0-9]+.[0-9]+-**' @@ -116,7 +116,7 @@ jobs: with: name: renterd path: release/ - build-mac: + build-mac: runs-on: macos-latest strategy: matrix: @@ -212,7 +212,7 @@ jobs: with: name: renterd path: release/ - build-windows: + build-windows: runs-on: windows-latest strategy: matrix: @@ -253,23 +253,21 @@ jobs: with: name: renterd path: release/ - dispatch: + + dispatch-homebrew: # only runs on full releases if: startsWith(github.ref, 'refs/tags/v') && !contains(github.ref, '-') needs: [docker, build-linux, build-mac, build-windows] - strategy: - matrix: - repo: ['siafoundation/homebrew-sia', 'siafoundation/linux'] runs-on: ubuntu-latest steps: - name: Extract Tag Name id: get_tag run: echo "::set-output name=tag_name::${GITHUB_REF#refs/tags/}" - - name: Repository Dispatch + - name: Dispatch uses: peter-evans/repository-dispatch@v3 with: token: ${{ secrets.PAT_REPOSITORY_DISPATCH }} - repository: ${{ matrix.repo }} + repository: siafoundation/homebrew-sia event-type: release-tagged client-payload: > { @@ -277,4 +275,40 @@ jobs: "tag": "${{ steps.get_tag.outputs.tag_name }}", "project": "renterd", "workflow_id": "${{ github.run_id }}" - } \ No newline at end of file + } + dispatch-linux: # run on full releases, release candidates, and master branch + if: startsWith(github.ref, 'refs/tags/v') || endsWith(github.ref, 'master') + needs: [docker, build-linux, build-mac, build-windows] + runs-on: ubuntu-latest + steps: + - name: Build Dispatch Payload + id: get_payload + uses: actions/github-script@v7 + with: + script: | + const isRelease = context.ref.startsWith('refs/tags/v'), + isBeta = isRelease && context.ref.includes('-beta'), + tag = isRelease ? 
context.ref.replace('refs/tags/', '') : 'master'; + + let component = 'nightly'; + if (isBeta) { + component = 'beta'; + } else if (isRelease) { + component = 'main'; + } + + return { + description: "renterd: The Next-Gen Sia Renter", + tag: tag, + project: "renterd", + workflow_id: context.runId, + component: component + }; + + - name: Dispatch + uses: peter-evans/repository-dispatch@v3 + with: + token: ${{ secrets.PAT_REPOSITORY_DISPATCH }} + repository: siafoundation/linux + event-type: release-tagged + client-payload: ${{ steps.get_payload.outputs.result }} \ No newline at end of file From bcc4591e707b773eada22824c3dd16a30d04dd8f Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Wed, 27 Mar 2024 11:33:26 +0100 Subject: [PATCH 07/10] autopilot: remove debug logging --- autopilot/contractor.go | 1 - 1 file changed, 1 deletion(-) diff --git a/autopilot/contractor.go b/autopilot/contractor.go index 82ea4e619..49ba304ae 100644 --- a/autopilot/contractor.go +++ b/autopilot/contractor.go @@ -1653,7 +1653,6 @@ func (c *contractor) shouldForgiveFailedRefresh(fcid types.FileContractID) bool lastFailure = time.Now() c.firstRefreshFailure[fcid] = lastFailure } - fmt.Println(time.Since(lastFailure)) return time.Since(lastFailure) < failedRefreshForgivenessPeriod } From b9f6762ea653a6a0cb65cb9a3ac736d28617aaaa Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 26 Mar 2024 13:07:22 +0100 Subject: [PATCH 08/10] bus: take renewals into account in the sectors cache --- bus/bus.go | 13 ++--- bus/uploadingsectors.go | 97 +++++++++++++++++++++--------------- bus/uploadingsectors_test.go | 82 ++++++++++++++++++++++++------ 3 files changed, 130 insertions(+), 62 deletions(-) diff --git a/bus/bus.go b/bus/bus.go index d68e46309..8c7c99649 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -1017,7 +1017,7 @@ func (b *bus) contractsPrunableDataHandlerGET(jc jape.Context) { // adjust the amount of prunable data with the pending uploads, due to // how we record contract spending a contract's size might already // include pending sectors - pending := b.uploadingSectors.pending(fcid) + pending := b.uploadingSectors.Pending(fcid) if pending > size.Prunable { size.Prunable = 0 } else { @@ -1064,7 +1064,7 @@ func (b *bus) contractSizeHandlerGET(jc jape.Context) { // adjust the amount of prunable data with the pending uploads, due to how // we record contract spending a contract's size might already include // pending sectors - pending := b.uploadingSectors.pending(id) + pending := b.uploadingSectors.Pending(id) if pending > size.Prunable { size.Prunable = 0 } else { @@ -1141,6 +1141,7 @@ func (b *bus) contractIDRenewedHandlerPOST(jc jape.Context) { if jc.Check("couldn't store contract", err) == nil { jc.Encode(r) } + b.uploadingSectors.HandleRenewal(req.Contract.ID(), req.RenewedFrom) } func (b *bus) contractIDRootsHandlerGET(jc jape.Context) { @@ -1153,7 +1154,7 @@ func (b *bus) contractIDRootsHandlerGET(jc jape.Context) { if jc.Check("couldn't fetch contract sectors", err) == nil { jc.Encode(api.ContractRootsResponse{ Roots: roots, - Uploading: b.uploadingSectors.sectors(id), + Uploading: b.uploadingSectors.Sectors(id), }) } } @@ -1991,7 +1992,7 @@ func (b *bus) stateHandlerGET(jc jape.Context) { func (b *bus) uploadTrackHandlerPOST(jc jape.Context) { var id api.UploadID if jc.DecodeParam("id", &id) == nil { - jc.Check("failed to track upload", b.uploadingSectors.trackUpload(id)) + jc.Check("failed to track upload", b.uploadingSectors.StartUpload(id)) } } @@ -2004,13 +2005,13 @@ func (b *bus) uploadAddSectorHandlerPOST(jc 
jape.Context) { if jc.Decode(&req) != nil { return } - jc.Check("failed to add sector", b.uploadingSectors.addUploadingSector(id, req.ContractID, req.Root)) + jc.Check("failed to add sector", b.uploadingSectors.AddSector(id, req.ContractID, req.Root)) } func (b *bus) uploadFinishedHandlerDELETE(jc jape.Context) { var id api.UploadID if jc.DecodeParam("id", &id) == nil { - b.uploadingSectors.finishUpload(id) + b.uploadingSectors.FinishUpload(id) } } diff --git a/bus/uploadingsectors.go b/bus/uploadingsectors.go index 6a3917d50..18c64a7c5 100644 --- a/bus/uploadingsectors.go +++ b/bus/uploadingsectors.go @@ -19,12 +19,12 @@ const ( type ( uploadingSectorsCache struct { - mu sync.Mutex - uploads map[api.UploadID]*ongoingUpload + mu sync.Mutex + uploads map[api.UploadID]*ongoingUpload + renewedTo map[types.FileContractID]types.FileContractID } ongoingUpload struct { - mu sync.Mutex started time.Time contractSectors map[types.FileContractID][]types.Hash256 } @@ -32,82 +32,92 @@ type ( func newUploadingSectorsCache() *uploadingSectorsCache { return &uploadingSectorsCache{ - uploads: make(map[api.UploadID]*ongoingUpload), + uploads: make(map[api.UploadID]*ongoingUpload), + renewedTo: make(map[types.FileContractID]types.FileContractID), } } func (ou *ongoingUpload) addSector(fcid types.FileContractID, root types.Hash256) { - ou.mu.Lock() - defer ou.mu.Unlock() ou.contractSectors[fcid] = append(ou.contractSectors[fcid], root) } func (ou *ongoingUpload) sectors(fcid types.FileContractID) (roots []types.Hash256) { - ou.mu.Lock() - defer ou.mu.Unlock() if sectors, exists := ou.contractSectors[fcid]; exists && time.Since(ou.started) < cacheExpiry { roots = append(roots, sectors...) } return } -func (usc *uploadingSectorsCache) addUploadingSector(uID api.UploadID, fcid types.FileContractID, root types.Hash256) error { - // fetch ongoing upload +func (usc *uploadingSectorsCache) AddSector(uID api.UploadID, fcid types.FileContractID, root types.Hash256) error { usc.mu.Lock() - ongoing, exists := usc.uploads[uID] - usc.mu.Unlock() + defer usc.mu.Unlock() - // add sector if upload exists - if exists { - ongoing.addSector(fcid, root) - return nil + ongoing, ok := usc.uploads[uID] + if !ok { + return fmt.Errorf("%w; id '%v'", api.ErrUnknownUpload, uID) } - return fmt.Errorf("%w; id '%v'", api.ErrUnknownUpload, uID) + fcid = usc.latestFCID(fcid) + ongoing.addSector(fcid, root) + return nil } -func (usc *uploadingSectorsCache) pending(fcid types.FileContractID) (size uint64) { +func (usc *uploadingSectorsCache) FinishUpload(uID api.UploadID) { usc.mu.Lock() - var uploads []*ongoingUpload - for _, ongoing := range usc.uploads { - uploads = append(uploads, ongoing) + defer usc.mu.Unlock() + delete(usc.uploads, uID) + + // prune expired uploads + for uID, ongoing := range usc.uploads { + if time.Since(ongoing.started) > cacheExpiry { + delete(usc.uploads, uID) + } } - usc.mu.Unlock() - for _, ongoing := range uploads { - size += uint64(len(ongoing.sectors(fcid))) * rhp.SectorSize + // prune renewed to map + for old, new := range usc.renewedTo { + if _, exists := usc.renewedTo[new]; exists { + delete(usc.renewedTo, old) + } } - return } -func (usc *uploadingSectorsCache) sectors(fcid types.FileContractID) (roots []types.Hash256) { +func (usc *uploadingSectorsCache) HandleRenewal(fcid, renewedFrom types.FileContractID) { usc.mu.Lock() - var uploads []*ongoingUpload - for _, ongoing := range usc.uploads { - uploads = append(uploads, ongoing) + defer usc.mu.Unlock() + + for _, upload := range usc.uploads { + if _, 
exists := upload.contractSectors[renewedFrom]; exists { + upload.contractSectors[fcid] = upload.contractSectors[renewedFrom] + upload.contractSectors[renewedFrom] = nil + } } - usc.mu.Unlock() + usc.renewedTo[renewedFrom] = fcid +} - for _, ongoing := range uploads { - roots = append(roots, ongoing.sectors(fcid)...) +func (usc *uploadingSectorsCache) Pending(fcid types.FileContractID) (size uint64) { + usc.mu.Lock() + defer usc.mu.Unlock() + + fcid = usc.latestFCID(fcid) + for _, ongoing := range usc.uploads { + size += uint64(len(ongoing.sectors(fcid))) * rhp.SectorSize } return } -func (usc *uploadingSectorsCache) finishUpload(uID api.UploadID) { +func (usc *uploadingSectorsCache) Sectors(fcid types.FileContractID) (roots []types.Hash256) { usc.mu.Lock() defer usc.mu.Unlock() - delete(usc.uploads, uID) - // prune expired uploads - for uID, ongoing := range usc.uploads { - if time.Since(ongoing.started) > cacheExpiry { - delete(usc.uploads, uID) - } + fcid = usc.latestFCID(fcid) + for _, ongoing := range usc.uploads { + roots = append(roots, ongoing.sectors(fcid)...) } + return } -func (usc *uploadingSectorsCache) trackUpload(uID api.UploadID) error { +func (usc *uploadingSectorsCache) StartUpload(uID api.UploadID) error { usc.mu.Lock() defer usc.mu.Unlock() @@ -122,3 +132,10 @@ func (usc *uploadingSectorsCache) trackUpload(uID api.UploadID) error { } return nil } + +func (usc *uploadingSectorsCache) latestFCID(fcid types.FileContractID) types.FileContractID { + if latest, ok := usc.renewedTo[fcid]; ok { + return latest + } + return fcid +} diff --git a/bus/uploadingsectors_test.go b/bus/uploadingsectors_test.go index 244280c70..b1c9b725a 100644 --- a/bus/uploadingsectors_test.go +++ b/bus/uploadingsectors_test.go @@ -4,6 +4,7 @@ import ( "errors" "testing" + rhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/core/types" "go.sia.tech/renterd/api" "lukechampine.com/frand" @@ -15,20 +16,24 @@ func TestUploadingSectorsCache(t *testing.T) { uID1 := newTestUploadID() uID2 := newTestUploadID() - c.trackUpload(uID1) - c.trackUpload(uID2) + fcid1 := types.FileContractID{1} + fcid2 := types.FileContractID{2} + fcid3 := types.FileContractID{3} - _ = c.addUploadingSector(uID1, types.FileContractID{1}, types.Hash256{1}) - _ = c.addUploadingSector(uID1, types.FileContractID{2}, types.Hash256{2}) - _ = c.addUploadingSector(uID2, types.FileContractID{2}, types.Hash256{3}) + c.StartUpload(uID1) + c.StartUpload(uID2) - if roots1 := c.sectors(types.FileContractID{1}); len(roots1) != 1 || roots1[0] != (types.Hash256{1}) { + _ = c.AddSector(uID1, fcid1, types.Hash256{1}) + _ = c.AddSector(uID1, fcid2, types.Hash256{2}) + _ = c.AddSector(uID2, fcid2, types.Hash256{3}) + + if roots1 := c.Sectors(fcid1); len(roots1) != 1 || roots1[0] != (types.Hash256{1}) { t.Fatal("unexpected cached sectors") } - if roots2 := c.sectors(types.FileContractID{2}); len(roots2) != 2 { + if roots2 := c.Sectors(fcid2); len(roots2) != 2 { t.Fatal("unexpected cached sectors", roots2) } - if roots3 := c.sectors(types.FileContractID{3}); len(roots3) != 0 { + if roots3 := c.Sectors(fcid3); len(roots3) != 0 { t.Fatal("unexpected cached sectors") } @@ -39,28 +44,73 @@ func TestUploadingSectorsCache(t *testing.T) { t.Fatal("unexpected") } - c.finishUpload(uID1) - if roots1 := c.sectors(types.FileContractID{1}); len(roots1) != 0 { + c.FinishUpload(uID1) + if roots1 := c.Sectors(fcid1); len(roots1) != 0 { t.Fatal("unexpected cached sectors") } - if roots2 := c.sectors(types.FileContractID{2}); len(roots2) != 1 || roots2[0] != (types.Hash256{3}) { + 
if roots2 := c.Sectors(fcid2); len(roots2) != 1 || roots2[0] != (types.Hash256{3}) { t.Fatal("unexpected cached sectors") } - c.finishUpload(uID2) - if roots2 := c.sectors(types.FileContractID{1}); len(roots2) != 0 { + c.FinishUpload(uID2) + if roots2 := c.Sectors(fcid1); len(roots2) != 0 { t.Fatal("unexpected cached sectors") } - if err := c.addUploadingSector(uID1, types.FileContractID{1}, types.Hash256{1}); !errors.Is(err, api.ErrUnknownUpload) { + if err := c.AddSector(uID1, fcid1, types.Hash256{1}); !errors.Is(err, api.ErrUnknownUpload) { t.Fatal("unexpected error", err) } - if err := c.trackUpload(uID1); err != nil { + if err := c.StartUpload(uID1); err != nil { t.Fatal("unexpected error", err) } - if err := c.trackUpload(uID1); !errors.Is(err, api.ErrUploadAlreadyExists) { + if err := c.StartUpload(uID1); !errors.Is(err, api.ErrUploadAlreadyExists) { t.Fatal("unexpected error", err) } + + // reset cache + c = newUploadingSectorsCache() + + // track upload that uploads across two contracts + c.StartUpload(uID1) + c.AddSector(uID1, fcid1, types.Hash256{1}) + c.AddSector(uID1, fcid1, types.Hash256{2}) + c.HandleRenewal(fcid2, fcid1) + c.AddSector(uID1, fcid2, types.Hash256{3}) + c.AddSector(uID1, fcid2, types.Hash256{4}) + + // assert pending sizes for both contracts should be 4 sectors + p1 := c.Pending(fcid1) + p2 := c.Pending(fcid2) + if p1 != p2 || p1 != 4*rhpv2.SectorSize { + t.Fatal("unexpected pending size", p1/rhpv2.SectorSize, p2/rhpv2.SectorSize) + } + + // assert sectors for both contracts contain 4 sectors + s1 := c.Sectors(fcid1) + s2 := c.Sectors(fcid2) + if len(s1) != 4 || len(s2) != 4 { + t.Fatal("unexpected sectors", len(s1), len(s2)) + } + + // finish upload + c.FinishUpload(uID1) + s1 = c.Sectors(fcid1) + s2 = c.Sectors(fcid2) + if len(s1) != 0 || len(s2) != 0 { + t.Fatal("unexpected sectors", len(s1), len(s2)) + } + + // renew the contract + c.HandleRenewal(fcid3, fcid2) + + // trigger pruning + c.StartUpload(uID2) + c.FinishUpload(uID2) + + // assert renewedTo gets pruned + if len(c.renewedTo) != 1 { + t.Fatal("unexpected", len(c.renewedTo)) + } } func newTestUploadID() api.UploadID { From a6c306d7c11b21894872209863c095a76b5e7717 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Wed, 27 Mar 2024 13:41:45 +0100 Subject: [PATCH 09/10] worker: update maxRevisionCost check to match the way hostd computes LatestRevisionCost --- worker/gouging.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/gouging.go b/worker/gouging.go index e8e362040..38d4aa088 100644 --- a/worker/gouging.go +++ b/worker/gouging.go @@ -272,7 +272,7 @@ func checkPriceGougingPT(gs api.GougingSettings, cs api.ConsensusState, txnFee t } // check LatestRevisionCost - expect sane value - maxRevisionCost := gs.MaxDownloadPrice.Div64(1 << 40).Mul64(4096) + maxRevisionCost := gs.MaxRPCPrice.Add(gs.MaxDownloadPrice.Div64(1 << 40).Mul64(2048)) if pt.LatestRevisionCost.Cmp(maxRevisionCost) > 0 { return fmt.Errorf("LatestRevisionCost of %v exceeds maximum cost of %v", pt.LatestRevisionCost, maxRevisionCost) } From aaab735d70dac8cf7dfb6b9d5956ed8c054f6c4f Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Wed, 27 Mar 2024 14:11:09 +0100 Subject: [PATCH 10/10] e2e: fix TestGouging --- worker/gouging.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/worker/gouging.go b/worker/gouging.go index 38d4aa088..0345385b4 100644 --- a/worker/gouging.go +++ b/worker/gouging.go @@ -272,7 +272,10 @@ func checkPriceGougingPT(gs api.GougingSettings, cs api.ConsensusState, 
txnFee t } // check LatestRevisionCost - expect sane value - maxRevisionCost := gs.MaxRPCPrice.Add(gs.MaxDownloadPrice.Div64(1 << 40).Mul64(2048)) + maxRevisionCost, overflow := gs.MaxRPCPrice.AddWithOverflow(gs.MaxDownloadPrice.Div64(1 << 40).Mul64(2048)) + if overflow { + maxRevisionCost = types.MaxCurrency + } if pt.LatestRevisionCost.Cmp(maxRevisionCost) > 0 { return fmt.Errorf("LatestRevisionCost of %v exceeds maximum cost of %v", pt.LatestRevisionCost, maxRevisionCost) }
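
The hunk above (patches 09 and 10) replaces the old cap on LatestRevisionCost — the bandwidth price of 4096 bytes — with MaxRPCPrice plus the bandwidth price of 2048 bytes, clamped on overflow. A minimal standalone sketch of that arithmetic with made-up prices, to show why the change matters. It assumes MaxDownloadPrice is a per-TiB price (implied by the Div64(1 << 40) in the diff) and that a host folds a base RPC cost plus roughly 2 KiB of egress into LatestRevisionCost, as the patch 09 commit message suggests; big.Int stands in for types.Currency here.

package main

import (
	"fmt"
	"math/big"
)

func main() {
	// Illustrative gouging settings in hastings (1 SC = 10^24 H); the real
	// values come from the renter's GougingSettings.
	sc := new(big.Int).Exp(big.NewInt(10), big.NewInt(24), nil)
	maxRPCPrice := new(big.Int).Div(sc, big.NewInt(10))        // 0.1 SC per RPC
	maxDownloadPrice := new(big.Int).Mul(sc, big.NewInt(3000)) // 3000 SC per TiB

	perByte := new(big.Int).Div(maxDownloadPrice, big.NewInt(1<<40))

	// Old cap: bandwidth cost of 4096 bytes only - far below what a host
	// that includes its base RPC price in LatestRevisionCost will report.
	oldCap := new(big.Int).Mul(perByte, big.NewInt(4096))

	// New cap: base RPC price plus the bandwidth cost of ~2 KiB, mirroring
	// how the host side appears to price serving the latest revision.
	newCap := new(big.Int).Add(maxRPCPrice, new(big.Int).Mul(perByte, big.NewInt(2048)))

	fmt.Println("old cap (H):", oldCap) // ≈ 1.1e19 H, about 0.00001 SC
	fmt.Println("new cap (H):", newCap) // ≈ 1.0e23 H, dominated by MaxRPCPrice
}

Patch 10 then swaps Add for AddWithOverflow and clamps the result to types.MaxCurrency, since both settings are user-supplied and their sum could exceed the currency's range.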
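
Patch 04 restructures scanHost so the scan timeout applies to each step (address resolution, settings RPC, price table RPC) rather than one deadline covering the whole scan. A self-contained sketch of that pattern; the step names and durations are illustrative, not taken from the worker.

package main

import (
	"context"
	"fmt"
	"time"
)

// step simulates one RPC of a multi-step scan.
func step(ctx context.Context, name string, d time.Duration) error {
	select {
	case <-time.After(d):
		fmt.Println(name, "ok")
		return nil
	case <-ctx.Done():
		return fmt.Errorf("%s: %w", name, ctx.Err())
	}
}

func main() {
	parent := context.Background()
	timeout := 150 * time.Millisecond

	// A fresh timeout per step: each RPC gets the full budget instead of
	// sharing one deadline across resolve + settings + price table.
	withTimeoutCtx := func() (context.Context, context.CancelFunc) {
		if timeout > 0 {
			return context.WithTimeout(parent, timeout)
		}
		return parent, func() {}
	}

	for _, name := range []string{"resolve", "settings", "pricetable"} {
		ctx, cancel := withTimeoutCtx()
		err := step(ctx, name, 100*time.Millisecond) // each step takes 100ms
		cancel()
		if err != nil {
			fmt.Println(err)
			return
		}
	}
	// With a single 150ms deadline the second step would already time out;
	// with per-step deadlines all three complete.
}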
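
Patch 05 changes how an out-of-funds contract is scored in hostfilter.go: it is still flagged for refresh, but it only remains usable while the first failed refresh is younger than failedRefreshForgivenessPeriod (24h), after which it becomes unusable-but-recoverable. A reduced sketch of just that branch, ignoring the other usability checks that also feed into the result; the helper name is hypothetical.

package main

import (
	"fmt"
	"time"
)

const failedRefreshForgivenessPeriod = 24 * time.Hour

// decideOutOfFunds isolates the updated out-of-funds branch: the contract is
// always flagged for refresh, stays usable while the failure is still inside
// the forgiveness window, and only needs to be recoverable once it is not.
func decideOutOfFunds(firstFailure time.Time) (usable, recoverable, refresh bool) {
	usable = time.Since(firstFailure) < failedRefreshForgivenessPeriod
	recoverable = !usable
	refresh = true
	return
}

func main() {
	u, rec, ref := decideOutOfFunds(time.Now().Add(-time.Hour))
	fmt.Println("1h after first failure: ", u, rec, ref) // true false true

	u, rec, ref = decideOutOfFunds(time.Now().Add(-25 * time.Hour))
	fmt.Println("25h after first failure:", u, rec, ref) // false true true
}

Note that firstRefreshFailure is an in-memory map pruned against the active contract set, so the forgiveness window effectively restarts when the autopilot restarts.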