diff --git a/autopilot/contractor/contractor.go b/autopilot/contractor/contractor.go
index 2ca8c607b..b452a21c4 100644
--- a/autopilot/contractor/contractor.go
+++ b/autopilot/contractor/contractor.go
@@ -463,7 +463,9 @@ func activeContracts(ctx context.Context, bus Bus, logger *zap.SugaredLogger) ([
 	// fetch active contracts
 	logger.Info("fetching active contracts")
 	start := time.Now()
-	metadatas, err := bus.Contracts(ctx, api.ContractsOpts{FilterMode: api.ContractFilterModeActive})
+	metadatas, err := bus.Contracts(ctx, api.ContractsOpts{
+		FilterMode: api.ContractFilterModeActive,
+	})
 	if err != nil {
 		return nil, err
 	}
@@ -907,7 +909,9 @@ func performContractFormations(ctx *mCtx, bus Bus, cr contractReviser, hf hostFi
 	wanted := int(ctx.WantedContracts())
 
 	// fetch all active contracts
-	contracts, err := bus.Contracts(ctx, api.ContractsOpts{})
+	contracts, err := bus.Contracts(ctx, api.ContractsOpts{
+		FilterMode: api.ContractFilterModeActive,
+	})
 	if err != nil {
 		return 0, fmt.Errorf("failed to fetch contracts: %w", err)
 	}
@@ -1057,7 +1061,9 @@ func performHostChecks(ctx *mCtx, bus Bus, logger *zap.SugaredLogger) error {
 
 func performPostMaintenanceTasks(ctx *mCtx, bus Bus, alerter alerts.Alerter, cc contractChecker, rb revisionBroadcaster, logger *zap.SugaredLogger) error {
 	// fetch some contract and host info
-	allContracts, err := bus.Contracts(ctx, api.ContractsOpts{})
+	allContracts, err := bus.Contracts(ctx, api.ContractsOpts{
+		FilterMode: api.ContractFilterModeActive,
+	})
 	if err != nil {
 		return fmt.Errorf("failed to fetch all contracts: %w", err)
 	}
@@ -1122,7 +1128,7 @@ func performV2ContractMigration(ctx *mCtx, bus Bus, cr contractReviser, logger *
 	}
 
 	contracts, err := bus.Contracts(ctx, api.ContractsOpts{
-		FilterMode: api.ContractFilterModeAll, // TODO: change to usable
+		FilterMode: api.ContractFilterModeActive,
 	})
 	if err != nil {
 		logger.With(zap.Error(err)).Error("failed to fetch contracts for migration")
@@ -1154,7 +1160,7 @@ func performV2ContractMigration(ctx *mCtx, bus Bus, cr contractReviser, logger *
 		}
 
 		// form a new contract with the same host
-		contract, _, err := cr.formContract(ctx, bus, host, InitialContractFunding, logger)
+		_, _, err = cr.formContract(ctx, bus, host, InitialContractFunding, logger)
 		if err != nil {
 			logger.Errorf("failed to form a v2 contract with the host")
 			continue
diff --git a/bus/bus.go b/bus/bus.go
index d33594270..8c5898440 100644
--- a/bus/bus.go
+++ b/bus/bus.go
@@ -794,6 +794,13 @@ func (b *Bus) renewContractV1(ctx context.Context, cs consensus.State, gp api.Go
 	// derive the renter key
 	renterKey := b.masterKey.DeriveContractKey(c.HostKey)
 
+	// cap v1 renewals to the v2 require height since the host won't allow us to
+	// form contracts beyond that
+	v2ReqHeight := b.cm.TipState().Network.HardforkV2.RequireHeight
+	if endHeight >= v2ReqHeight {
+		endHeight = v2ReqHeight - 1
+	}
+
 	// fetch the revision
 	rev, err := b.rhp3Client.Revision(ctx, c.ID, c.HostKey, hs.SiamuxAddr())
 	if err != nil {
diff --git a/bus/routes.go b/bus/routes.go
index a5a56d7e4..04853a49c 100644
--- a/bus/routes.go
+++ b/bus/routes.go
@@ -2323,6 +2323,13 @@ func (b *Bus) contractsFormHandler(jc jape.Context) {
 		return
 	}
 
+	// cap v1 formations to the v2 require height since the host won't allow
+	// us to form contracts beyond that
+	v2ReqHeight := b.cm.TipState().Network.HardforkV2.RequireHeight
+	if rfr.EndHeight >= v2ReqHeight {
+		rfr.EndHeight = v2ReqHeight - 1
+	}
+
 	// check gouging
 	breakdown := gc.CheckSettings(settings)
 	if breakdown.Gouging() {
diff --git
a/cmd/renterd/config.go b/cmd/renterd/config.go index 464a8521b..e3bf4d0e6 100644 --- a/cmd/renterd/config.go +++ b/cmd/renterd/config.go @@ -95,6 +95,7 @@ func defaultConfig() config.Config { ID: "", AccountsRefillInterval: defaultAccountRefillInterval, BusFlushInterval: 5 * time.Second, + CacheExpiry: 5 * time.Minute, DownloadMaxOverdrive: 5, DownloadOverdriveTimeout: 3 * time.Second, diff --git a/config/config.go b/config/config.go index 86ef1d97f..f40554b11 100644 --- a/config/config.go +++ b/config/config.go @@ -124,6 +124,7 @@ type ( UploadMaxMemory uint64 `yaml:"uploadMaxMemory,omitempty"` UploadMaxOverdrive uint64 `yaml:"uploadMaxOverdrive,omitempty"` AllowUnauthenticatedDownloads bool `yaml:"allowUnauthenticatedDownloads,omitempty"` + CacheExpiry time.Duration `yaml:"cacheExpiry,omitempty"` } // Autopilot contains the configuration for an autopilot. diff --git a/go.mod b/go.mod index 6083dd99b..6a02e9ca6 100644 --- a/go.mod +++ b/go.mod @@ -14,10 +14,10 @@ require ( github.com/mattn/go-sqlite3 v1.14.24 github.com/montanaflynn/stats v0.7.1 github.com/shopspring/decimal v1.4.0 - go.sia.tech/core v0.8.0 - go.sia.tech/coreutils v0.8.1-0.20241217101542-5d6fc37cbb94 + go.sia.tech/core v0.8.1-0.20241217152409-7950a7ca324b + go.sia.tech/coreutils v0.8.1-0.20241217153531-b5e84c03d17f go.sia.tech/gofakes3 v0.0.5 - go.sia.tech/hostd v1.1.3-0.20241217094733-44bba16e129e + go.sia.tech/hostd v1.1.3-0.20241218083322-ae9c8a971fe0 go.sia.tech/jape v0.12.1 go.sia.tech/mux v1.3.0 go.sia.tech/web/renterd v0.69.0 diff --git a/go.sum b/go.sum index 7012eafab..7fcf27db2 100644 --- a/go.sum +++ b/go.sum @@ -55,14 +55,14 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= go.etcd.io/bbolt v1.3.11 h1:yGEzV1wPz2yVCLsD8ZAiGHhHVlczyC9d1rP43/VCRJ0= go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I= -go.sia.tech/core v0.8.0 h1:J6vZQlVhpj4bTVeuC2GKkfkGEs8jf0j651Kl1wwOxjg= -go.sia.tech/core v0.8.0/go.mod h1:Wj1qzvpMM2rqEQjwWJEbCBbe9VWX/mSJUu2Y2ABl1QA= -go.sia.tech/coreutils v0.8.1-0.20241217101542-5d6fc37cbb94 h1:1fbD59wfyA1+5LmLYNh+ukNpkbtEmQgcXYlRUZTdr+M= -go.sia.tech/coreutils v0.8.1-0.20241217101542-5d6fc37cbb94/go.mod h1:ml5MefDMWCvPKNeRVIGHmyF5tv27C9h1PiI/iOiTGLg= +go.sia.tech/core v0.8.1-0.20241217152409-7950a7ca324b h1:VRkb6OOX1KawLQwuqOEHLcjha8gxVX0tAyu2Dyoq8Ek= +go.sia.tech/core v0.8.1-0.20241217152409-7950a7ca324b/go.mod h1:Wj1qzvpMM2rqEQjwWJEbCBbe9VWX/mSJUu2Y2ABl1QA= +go.sia.tech/coreutils v0.8.1-0.20241217153531-b5e84c03d17f h1:TafvnqJgx/+0zX/QMSOOkf5HfMqaoe/73eO515fUucI= +go.sia.tech/coreutils v0.8.1-0.20241217153531-b5e84c03d17f/go.mod h1:xhIbFjjkzmCF8Dt73ZvquaBQCT2Dje7AKYBRAesn93w= go.sia.tech/gofakes3 v0.0.5 h1:vFhVBUFbKE9ZplvLE2w4TQxFMQyF8qvgxV4TaTph+Vw= go.sia.tech/gofakes3 v0.0.5/go.mod h1:LXEzwGw+OHysWLmagleCttX93cJZlT9rBu/icOZjQ54= -go.sia.tech/hostd v1.1.3-0.20241217094733-44bba16e129e h1:VWdrQiZKnoWxB3Qtxkolph+SL6/qharIX8dkqZ7i1d0= -go.sia.tech/hostd v1.1.3-0.20241217094733-44bba16e129e/go.mod h1:9jRImPfriQKypd7O6O46BQzRkyx+0tRabNKxQxJxDR8= +go.sia.tech/hostd v1.1.3-0.20241218083322-ae9c8a971fe0 h1:QtF8l+pHZq6gPyDyuQoMv8GdwU6lvz39y4I34S3cuvo= +go.sia.tech/hostd v1.1.3-0.20241218083322-ae9c8a971fe0/go.mod h1:9jRImPfriQKypd7O6O46BQzRkyx+0tRabNKxQxJxDR8= go.sia.tech/jape v0.12.1 h1:xr+o9V8FO8ScRqbSaqYf9bjj1UJ2eipZuNcI1nYousU= go.sia.tech/jape v0.12.1/go.mod h1:wU+h6Wh5olDjkPXjF0tbZ1GDgoZ6VTi4naFw91yyWC4= go.sia.tech/mux v1.3.0 
h1:hgR34IEkqvfBKUJkAzGi31OADeW2y7D6Bmy/Jcbop9c= diff --git a/internal/accounts/accounts.go b/internal/accounts/accounts.go index 2a67b7002..72d35ee9d 100644 --- a/internal/accounts/accounts.go +++ b/internal/accounts/accounts.go @@ -448,12 +448,8 @@ func (a *Manager) refillAccount(ctx context.Context, contract api.ContractMetada } func (a *Account) Token() rhpv4.AccountToken { - t := rhpv4.AccountToken{ - Account: rhpv4.Account(a.key.PublicKey()), - ValidUntil: time.Now().Add(5 * time.Minute), - } - t.Signature = a.key.SignHash(t.SigHash()) - return t + account := rhpv4.Account(a.key.PublicKey()) + return account.Token(a.key, a.acc.HostKey) } // WithSync syncs an accounts balance with the bus. To do so, the account is diff --git a/internal/bus/chainsubscriber.go b/internal/bus/chainsubscriber.go index 23da73cfa..0224ab0f3 100644 --- a/internal/bus/chainsubscriber.go +++ b/internal/bus/chainsubscriber.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math" + "strings" "sync" "time" @@ -71,6 +72,7 @@ type ( Wallet interface { FundV2Transaction(txn *types.V2Transaction, amount types.Currency, useUnconfirmed bool) (types.ChainIndex, []int, error) + ReleaseInputs(txns []types.Transaction, v2txns []types.V2Transaction) SignV2Inputs(txn *types.V2Transaction, toSign []int) UpdateChainState(tx wallet.UpdateTx, reverted []chain.RevertUpdate, applied []chain.ApplyUpdate) error } @@ -433,8 +435,15 @@ func (s *chainSubscriber) broadcastExpiredFileContractResolutions(tx sql.ChainUp // verify txn and broadcast it _, err = s.cm.AddV2PoolTransactions(basis, []types.V2Transaction{txn}) - if err != nil { - s.logger.Errorf("failed to broadcast contract expiration txn: %v", err) + if err != nil && + (strings.Contains(err.Error(), "has already been resolved") || + strings.Contains(err.Error(), "not present in the accumulator")) { + s.wallet.ReleaseInputs(nil, []types.V2Transaction{txn}) + s.logger.With(zap.Error(err)).Debug("failed to broadcast contract expiration txn") + continue + } else if err != nil { + s.logger.With(zap.Error(err)).Error("failed to broadcast contract expiration txn") + s.wallet.ReleaseInputs(nil, []types.V2Transaction{txn}) continue } s.s.BroadcastV2TransactionSet(basis, []types.V2Transaction{txn}) diff --git a/internal/test/e2e/cluster.go b/internal/test/e2e/cluster.go index 606177391..f5d34a454 100644 --- a/internal/test/e2e/cluster.go +++ b/internal/test/e2e/cluster.go @@ -636,7 +636,7 @@ func announceHosts(hosts []*Host) error { for _, host := range hosts { settings := defaultHostSettings settings.NetAddress = host.rhp4Listener.Addr().(*net.TCPAddr).IP.String() - if err := host.settings.UpdateSettings(settings); err != nil { + if err := host.UpdateSettings(settings); err != nil { return err } if err := host.settings.Announce(); err != nil { @@ -1000,6 +1000,7 @@ func testDBCfg() dbConfig { func testWorkerCfg() config.Worker { return config.Worker{ AccountsRefillInterval: 10 * time.Millisecond, + CacheExpiry: 100 * time.Millisecond, ID: "worker", BusFlushInterval: testBusFlushInterval, DownloadOverdriveTimeout: 500 * time.Millisecond, diff --git a/internal/test/e2e/cluster_test.go b/internal/test/e2e/cluster_test.go index b4dff8278..f6c989dd8 100644 --- a/internal/test/e2e/cluster_test.go +++ b/internal/test/e2e/cluster_test.go @@ -2602,7 +2602,7 @@ func TestDownloadAllHosts(t *testing.T) { // block the new host but unblock the old one for _, host := range cluster.hosts { if host.PublicKey() == newHost { - toBlock := []string{host.settings.Settings().NetAddress, host.RHPv4Addr()} + toBlock := 
[]string{host.RHPv2Addr(), host.RHPv4Addr()} tt.OK(b.UpdateHostBlocklist(context.Background(), toBlock, randomHost, false)) } } @@ -2615,6 +2615,7 @@ func TestDownloadAllHosts(t *testing.T) { tt.OK(host.UpdateSettings(settings)) } } + time.Sleep(testWorkerCfg().CacheExpiry) // expire cache // download the object dst = new(bytes.Buffer) @@ -2833,6 +2834,7 @@ func TestContractFundsReturnWhenHostOffline(t *testing.T) { // mine until the contract is expired cluster.mineBlocks(types.VoidAddress, contract.WindowEnd-cs.BlockHeight) + cluster.sync() expectedBalance := wallet.Confirmed.Add(contract.InitialRenterFunds).Sub(fee.Mul64(ibus.ContractResolutionTxnWeight)) cluster.tt.Retry(10, time.Second, func() error { @@ -2913,3 +2915,175 @@ func TestResyncAccounts(t *testing.T) { return w.DownloadObject(context.Background(), bytes.NewBuffer(nil), testBucket, path, api.DownloadObjectOptions{}) }) } + +func TestV1ToV2Transition(t *testing.T) { + // create a chain manager with a custom network that starts before the v2 + // allow height + network, genesis := testNetwork() + network.HardforkV2.AllowHeight = 100 + network.HardforkV2.RequireHeight = 200 // 100 blocks after the allow height + store, state, err := chain.NewDBStore(chain.NewMemDB(), network, genesis) + if err != nil { + t.Fatal(err) + } + cm := chain.NewManager(store, state) + + // custom autopilot config + apCfg := test.AutopilotConfig + apCfg.Contracts.Amount = 2 + apCfg.Contracts.Period = 1000 // make sure we handle trying to form contracts with a proof height after the v2 require height + apCfg.Contracts.RenewWindow = 50 + + // create a test cluster + nHosts := 3 + cluster := newTestCluster(t, testClusterOptions{ + autopilotConfig: &apCfg, + hosts: 0, // add hosts manually later + cm: cm, + uploadPacking: false, // disable to make sure we don't accidentally serve data from disk + }) + defer cluster.Shutdown() + tt := cluster.tt + + // add hosts and wait for contracts to form + cluster.AddHosts(nHosts) + + // make sure we are still before the v2 allow height + if cm.Tip().Height >= network.HardforkV2.AllowHeight { + t.Fatal("should be before the v2 allow height") + } + + // we should have 2 v1 contracts + var contracts []api.ContractMetadata + tt.Retry(100, 100*time.Millisecond, func() error { + contracts, err = cluster.Bus.Contracts(context.Background(), api.ContractsOpts{FilterMode: api.ContractFilterModeAll}) + tt.OK(err) + if len(contracts) != nHosts-1 { + return fmt.Errorf("expected %v contracts, got %v", nHosts-1, len(contracts)) + } + return nil + }) + contractHosts := make(map[types.PublicKey]struct{}) + for _, c := range contracts { + if c.V2 { + t.Fatal("should not have formed v2 contracts") + } else if c.EndHeight() != network.HardforkV2.RequireHeight-1 { + t.Fatalf("expected proof height to be %v, got %v", network.HardforkV2.RequireHeight-1, c.EndHeight()) + } + contractHosts[c.HostKey] = struct{}{} + } + + // sanity check number of hosts just to be safe + if len(contractHosts) != nHosts-1 { + t.Fatalf("expected %v unique hosts, got %v", nHosts-1, len(contractHosts)) + } + + // upload some data + data := frand.Bytes(100) + tt.OKAll(cluster.Worker.UploadObject(context.Background(), bytes.NewReader(data), testBucket, "foo", api.UploadObjectOptions{ + MinShards: 1, + TotalShards: nHosts - 1, + })) + + // mine until we reach the v2 allowheight + cluster.MineBlocks(network.HardforkV2.AllowHeight - cm.Tip().Height) + + // slowly mine a few more blocks to allow renter to react + for i := 0; i < 5; i++ { + cluster.MineBlocks(1) + 
time.Sleep(100 * time.Millisecond) + } + + // check that we have 1 archived contract for every contract we had before + var archivedContracts []api.ContractMetadata + tt.Retry(100, 100*time.Millisecond, func() error { + archivedContracts, err = cluster.Bus.Contracts(context.Background(), api.ContractsOpts{FilterMode: api.ContractFilterModeArchived}) + tt.OK(err) + if len(archivedContracts) != nHosts-1 { + return fmt.Errorf("expected %v archived contracts, got %v", nHosts-1, len(archivedContracts)) + } + return nil + }) + + // they should be on nHosts-1 unique hosts + usedHosts := make(map[types.PublicKey]struct{}) + for _, c := range archivedContracts { + if c.ArchivalReason != "migrated to v2" { + t.Fatalf("expected archival reason to be 'migrated to v2', got %v", c.ArchivalReason) + } + usedHosts[c.HostKey] = struct{}{} + } + if len(usedHosts) != nHosts-1 { + t.Fatalf("expected %v unique hosts, got %v", nHosts-1, len(usedHosts)) + } + + // we should have the same number of active contracts + activeContracts, err := cluster.Bus.Contracts(context.Background(), api.ContractsOpts{FilterMode: api.ContractFilterModeActive}) + tt.OK(err) + if len(activeContracts) != nHosts-1 { + t.Fatalf("expected %v active contracts, got %v", nHosts-1, len(activeContracts)) + } + + // they should be on the same hosts as before + for _, c := range activeContracts { + if _, ok := usedHosts[c.HostKey]; !ok { + t.Fatal("host not found in used hosts") + } else if !c.V2 { + t.Fatal("expected contract to be v2, got v1", c.ID, c.ArchivalReason) + } + delete(usedHosts, c.HostKey) + } + + tt.Retry(100, 100*time.Millisecond, func() error { + // check health is 1 + object, err := cluster.Bus.Object(context.Background(), testBucket, "foo", api.GetObjectOptions{}) + tt.OK(err) + if object.Health != 1 { + return fmt.Errorf("expected health to be 1, got %v", object.Health) + } + slab := object.Slabs[0] + + // check that the contracts now contain the data + activeContracts, err = cluster.Bus.Contracts(context.Background(), api.ContractsOpts{FilterMode: api.ContractFilterModeActive}) + tt.OK(err) + for _, c := range activeContracts { + // check revision + rev, err := cluster.Bus.ContractRevision(context.Background(), c.ID) + tt.OK(err) + if rev.Size != rhpv4.SectorSize { + return fmt.Errorf("expected revision size to be %v, got %v", rhpv4.SectorSize, rev.Size) + } + // check local metadata + if c.Size != rhpv4.SectorSize { + return fmt.Errorf("expected contract size to be %v, got %v", rhpv4.SectorSize, c.Size) + } + // one of the shards should be on this contract + var found bool + for _, shard := range slab.Shards { + for _, fcid := range shard.Contracts[c.HostKey] { + found = found || fcid == c.ID + } + } + if !found { + t.Fatal("expected contract to shard data") + } + } + return nil + }) + + // download file to make sure it's still available + // NOTE: 1st try fails since the accounts appear not to be funded since the + // test host has separate account managers for rhp3 and rhp4 + tt.FailAll(cluster.Worker.DownloadObject(context.Background(), bytes.NewBuffer(nil), testBucket, "foo", api.DownloadObjectOptions{})) + + // subsequent tries succeed + tt.Retry(100, 100*time.Millisecond, func() error { + buf := new(bytes.Buffer) + if err := cluster.Worker.DownloadObject(context.Background(), buf, testBucket, "foo", api.DownloadObjectOptions{}); err != nil { + return err + } else if !bytes.Equal(data, buf.Bytes()) { + t.Fatal("data mismatch") + } + return nil + }) +} diff --git a/internal/test/e2e/gouging_test.go 
b/internal/test/e2e/gouging_test.go index 440879972..dc52a39f7 100644 --- a/internal/test/e2e/gouging_test.go +++ b/internal/test/e2e/gouging_test.go @@ -93,14 +93,14 @@ func TestGouging(t *testing.T) { if err := h.UpdateSettings(settings); err != nil { t.Fatal(err) } - } - // make sure the price table expires so the worker is forced to fetch it - // again, this is necessary for the host to be considered price gouging - time.Sleep(defaultHostSettings.PriceTableValidity) + // scan the host + tt.OKAll(cluster.Bus.ScanHost(context.Background(), h.PublicKey(), time.Second)) + } + time.Sleep(testWorkerCfg().CacheExpiry) // wait for cache to refresh - // download the data - should still work - tt.OKAll(w.DownloadObject(context.Background(), io.Discard, testBucket, path, api.DownloadObjectOptions{})) + // download the data - won't work since the hosts are not usable anymore + tt.FailAll(w.DownloadObject(context.Background(), io.Discard, testBucket, path, api.DownloadObjectOptions{})) // try optimising gouging settings resp, err := cluster.Autopilot.EvaluateConfig(context.Background(), test.AutopilotConfig, gs, test.RedundancySettings) diff --git a/internal/upload/uploadmanager.go b/internal/upload/uploadmanager.go index 130be696e..357c9956b 100644 --- a/internal/upload/uploadmanager.go +++ b/internal/upload/uploadmanager.go @@ -471,9 +471,6 @@ func (mgr *Manager) UploadShards(ctx context.Context, s object.Slab, shardIndice // upload the shards uploaded, uploadSpeed, overdrivePct, err := upload.uploadShards(ctx, shards, mgr.candidates(upload.allowed), mem, mgr.maxOverdrive, mgr.overdriveTimeout) - if err != nil { - return err - } // build sectors var sectors []api.UploadedSector @@ -483,13 +480,22 @@ func (mgr *Manager) UploadShards(ctx context.Context, s object.Slab, shardIndice Root: sector.root, }) } + if len(sectors) > 0 { + if err := mgr.os.UpdateSlab(ctx, s.EncryptionKey, sectors); err != nil { + return fmt.Errorf("couldn't update slab: %w", err) + } + } + + // check error + if err != nil { + return err + } // track stats mgr.statsOverdrivePct.Track(overdrivePct) mgr.statsSlabUploadSpeedBytesPerMS.Track(float64(uploadSpeed)) - // update the slab - return mgr.os.UpdateSlab(ctx, s.EncryptionKey, sectors) + return nil } func (mgr *Manager) candidates(allowed map[types.PublicKey]struct{}) (candidates []*uploader.Uploader) { @@ -672,6 +678,9 @@ func (u *upload) uploadSlab(ctx context.Context, rs api.RedundancySettings, data return uploadSpeed, overdrivePct } +// uploadShards uploads the shards to the provided candidates. It returns an +// error if it fails to upload all shards but len(sectors) will be > 0 if some +// shards were uploaded successfully. 
func (u *upload) uploadShards(ctx context.Context, shards [][]byte, candidates []*uploader.Uploader, mem memory.Memory, maxOverdrive uint64, overdriveTimeout time.Duration) (sectors []uploadedSector, uploadSpeed int64, overdrivePct float64, err error) { // ensure inflight uploads get cancelled ctx, cancel := context.WithCancel(ctx) @@ -770,6 +779,13 @@ loop: } } + // collect the sectors + for _, sector := range slab.sectors { + if sector.isUploaded() { + sectors = append(sectors, sector.uploaded) + } + } + // calculate the upload speed bytes := slab.numUploaded * rhpv2.SectorSize ms := time.Since(start).Milliseconds() @@ -791,10 +807,6 @@ loop: return } - // collect the sectors - for _, sector := range slab.sectors { - sectors = append(sectors, sector.uploaded) - } return } diff --git a/internal/worker/cache.go b/internal/worker/cache.go index 3fc69184a..f0fedded8 100644 --- a/internal/worker/cache.go +++ b/internal/worker/cache.go @@ -12,14 +12,13 @@ import ( ) const ( - cacheEntryExpiry = 5 * time.Minute - cacheKeyUsableHosts = "usablehosts" ) type memoryCache struct { - items map[string]*cacheEntry - mu sync.RWMutex + cacheEntryExpiry time.Duration + items map[string]*cacheEntry + mu sync.RWMutex } type cacheEntry struct { @@ -27,9 +26,10 @@ type cacheEntry struct { expiry time.Time } -func newMemoryCache() *memoryCache { +func newMemoryCache(expiry time.Duration) *memoryCache { return &memoryCache{ - items: make(map[string]*cacheEntry), + cacheEntryExpiry: expiry, + items: make(map[string]*cacheEntry), } } @@ -59,7 +59,7 @@ func (c *memoryCache) Set(key string, value interface{}) { defer c.mu.Unlock() c.items[key] = &cacheEntry{ value: value, - expiry: time.Now().Add(cacheEntryExpiry), + expiry: time.Now().Add(c.cacheEntryExpiry), } } @@ -79,12 +79,12 @@ type cache struct { logger *zap.SugaredLogger } -func NewCache(b Bus, logger *zap.Logger) WorkerCache { +func NewCache(b Bus, expiry time.Duration, logger *zap.Logger) WorkerCache { logger = logger.Named("workercache") return &cache{ b: b, - cache: newMemoryCache(), + cache: newMemoryCache(expiry), logger: logger.Sugar(), } } diff --git a/stores/metadata_test.go b/stores/metadata_test.go index d03bf3203..90a386667 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -245,19 +245,7 @@ func TestObjectBasic(t *testing.T) { t.Fatal(err) } if !reflect.DeepEqual(*got.Object, want) { - t.Fatal("object mismatch", got.Object, want) - } - - // update the sector to have a non-consecutive slab index - _, err = ss.DB().Exec(context.Background(), "UPDATE sectors SET slab_index = 100 WHERE slab_index = 1") - if err != nil { - t.Fatalf("failed to update sector: %v", err) - } - - // fetch the object again and assert we receive an indication it was corrupted - _, err = ss.Object(context.Background(), testBucket, "/"+t.Name()) - if !errors.Is(err, api.ErrObjectCorrupted) { - t.Fatal("unexpected err", err) + t.Fatal("object mismatch", cmp.Diff(*got.Object, want, cmp.AllowUnexported(object.EncryptionKey{}))) } // create an object without slabs @@ -272,7 +260,7 @@ func TestObjectBasic(t *testing.T) { t.Fatal(err) } if !reflect.DeepEqual(*got2.Object, want2) { - t.Fatal("object mismatch", cmp.Diff(got2.Object, want2)) + t.Fatal("object mismatch", cmp.Diff(*got2.Object, want2, cmp.AllowUnexported(object.EncryptionKey{}))) } } @@ -2614,7 +2602,7 @@ func TestPartialSlab(t *testing.T) { t.Fatal(err) } if !reflect.DeepEqual(obj, *fetched.Object) { - t.Fatal("mismatch", cmp.Diff(obj, fetched.Object, cmp.AllowUnexported(object.EncryptionKey{}))) + 
t.Fatal("mismatch", cmp.Diff(obj, *fetched.Object, cmp.AllowUnexported(object.EncryptionKey{}))) } // Add the second slab. @@ -3636,6 +3624,63 @@ func TestUpdateSlabSanityChecks(t *testing.T) { } } +func TestSlabSectorOnHostButNotInContract(t *testing.T) { + ss := newTestSQLStore(t, defaultTestSQLStoreConfig) + defer ss.Close() + + // create host and 2 contracts with it + hk := types.PublicKey{1} + err := ss.addTestHost(hk) + if err != nil { + t.Fatal(err) + } + _, contracts, err := ss.addTestContracts([]types.PublicKey{hk, hk}) + if err != nil { + t.Fatal(err) + } + + // prepare a slab - it has one shard that is pinned to contract 0. + shard := newTestShard(hk, contracts[0].ID, types.Hash256{1}) + slab := object.Slab{ + EncryptionKey: object.GenerateEncryptionKey(object.EncryptionKeyTypeSalted), + Shards: []object.Sector{shard}, + Health: 1, + } + + // set slab. + _, err = ss.addTestObject("/"+t.Name(), object.Object{ + Key: object.GenerateEncryptionKey(object.EncryptionKeyTypeSalted), + Slabs: []object.SlabSlice{{Slab: slab}}, + }) + if err != nil { + t.Fatal(err) + } + + rSlab, err := ss.Slab(context.Background(), slab.EncryptionKey) + if err != nil { + t.Fatal(err) + } else if len(rSlab.Shards) != 1 { + t.Fatal("should have 1 shard", len(rSlab.Shards)) + } else if fcids, exists := rSlab.Shards[0].Contracts[hk]; !exists || len(fcids) != 1 { + t.Fatalf("unexpected contracts %v, exists %v", fcids, exists) + } + + // delete one of the contracts - this should cause the host to still be in + // the slab but the associated slice should be empty + if err := ss.ArchiveContract(context.Background(), contracts[0].ID, "test"); err != nil { + t.Fatal(err) + } + + rSlab, err = ss.Slab(context.Background(), slab.EncryptionKey) + if err != nil { + t.Fatal(err) + } else if len(rSlab.Shards) != 1 { + t.Fatal("should have 1 shard", len(rSlab.Shards)) + } else if fcids, exists := rSlab.Shards[0].Contracts[hk]; !exists || len(fcids) != 0 { + t.Fatalf("unexpected contracts %v, exists %v", fcids, exists) + } +} + func TestSlabHealthInvalidation(t *testing.T) { // create db ss := newTestSQLStore(t, defaultTestSQLStoreConfig) diff --git a/stores/sql/main.go b/stores/sql/main.go index b6b020fbe..c8f2f1d8c 100644 --- a/stores/sql/main.go +++ b/stores/sql/main.go @@ -192,20 +192,16 @@ func ArchiveContract(ctx context.Context, tx sql.Tx, fcid types.FileContractID, return fmt.Errorf("failed to delete contract_sectors: %w", err) } - // delete host sectors that are no longer associated with any contract + // delete all host_sectors for every host that we don't have an active + // contract with anymore _, err = tx.Exec(ctx, `DELETE FROM host_sectors -WHERE NOT EXISTS ( - SELECT 1 - FROM contract_sectors - WHERE contract_sectors.db_sector_id = host_sectors.db_sector_id -) -OR NOT EXISTS ( - SELECT 1 - FROM contracts - INNER JOIN hosts ON contracts.host_id = hosts.id - WHERE contracts.archival_reason IS NULL - AND hosts.id = host_sectors.db_host_id -)`) + WHERE NOT EXISTS ( + SELECT 1 + FROM contracts + INNER JOIN hosts ON contracts.host_id = hosts.id + WHERE contracts.archival_reason IS NULL + AND hosts.id = host_sectors.db_host_id + )`) if err != nil { return fmt.Errorf("failed to delete host_sectors: %w", err) } @@ -506,19 +502,24 @@ func DeleteBucket(ctx context.Context, tx sql.Tx, bucket string) error { } func DeleteHostSector(ctx context.Context, tx sql.Tx, hk types.PublicKey, root types.Hash256) (int, error) { + // fetch sector id + var sectorID int64 + if err := tx.QueryRow(ctx, "SELECT s.id FROM sectors s WHERE 
s.root = ?", Hash256(root)). + Scan(§orID); errors.Is(err, dsql.ErrNoRows) { + return 0, nil + } else if err != nil { + return 0, fmt.Errorf("failed to fetch sector id: %w", err) + } + // remove potential links between the host's contracts and the sector res, err := tx.Exec(ctx, ` DELETE FROM contract_sectors - WHERE db_sector_id = ( - SELECT s.id - FROM sectors s - WHERE root = ? - ) AND db_contract_id IN ( + WHERE db_sector_id = ? AND db_contract_id IN ( SELECT c.id FROM contracts c WHERE c.host_key = ? ) - `, Hash256(root), PublicKey(hk)) + `, sectorID, PublicKey(hk)) if err != nil { return 0, fmt.Errorf("failed to delete contract sectors: %w", err) } @@ -552,6 +553,13 @@ func DeleteHostSector(ctx context.Context, tx sql.Tx, hk types.PublicKey, root t if err != nil { return 0, fmt.Errorf("failed to update lost sectors: %w", err) } + + // remove sector from host_sectors + _, err = tx.Exec(ctx, "DELETE FROM host_sectors WHERE db_sector_id = ?", sectorID) + if err != nil { + return 0, fmt.Errorf("failed to delete host sector: %w", err) + } + return int(deletedSectors), nil } @@ -890,6 +898,14 @@ LEFT JOIN host_checks hc ON hc.db_host_id = h.id // fill in v2 addresses err = fillInV2Addresses(ctx, tx, hostIDs, func(i int, addrs []string) { hosts[i].V2SiamuxAddresses = addrs + + // NOTE: a v2 host might have been scanned before the v2 height so strictly + // speaking it is scanned but since it hasn't been scanned since, the + // settings aren't set so we treat it as not scanned + if hosts[i].IsV2() && hosts[i].V2Settings == (rhp.HostSettings{}) { + hosts[i].Scanned = false + } + i++ }) if err != nil { @@ -1627,7 +1643,7 @@ func RecordHostScans(ctx context.Context, tx sql.Tx, scans []api.HostScan) error uptime = CASE WHEN ? AND last_scan > 0 AND last_scan < ? THEN uptime + ? - last_scan ELSE uptime END, last_scan = ?, settings = CASE WHEN ? THEN ? ELSE settings END, - v2_settings = CASE WHEN ? THEN ? ELSE settings END, + v2_settings = CASE WHEN ? THEN ? ELSE v2_settings END, price_table = CASE WHEN ? THEN ? ELSE price_table END, price_table_expiry = CASE WHEN ? THEN ? ELSE price_table_expiry END, successful_interactions = CASE WHEN ? THEN successful_interactions + 1 ELSE successful_interactions END, @@ -1843,7 +1859,44 @@ func Slab(ctx context.Context, tx sql.Tx, key object.EncryptionKey) (object.Slab } defer stmt.Close() + // fetch hosts for each sector + hostStmt, err := tx.Prepare(ctx, ` + SELECT h.public_key + FROM host_sectors hs + INNER JOIN hosts h ON h.id = hs.db_host_id + WHERE hs.db_sector_id = ? 
+ `) + if err != nil { + return object.Slab{}, fmt.Errorf("failed to prepare statement to fetch hosts: %w", err) + } + defer hostStmt.Close() + for i, sectorID := range sectorIDs { + slab.Shards[i].Contracts = make(map[types.PublicKey][]types.FileContractID) + + // hosts + rows, err = hostStmt.Query(ctx, sectorID) + if err != nil { + return object.Slab{}, fmt.Errorf("failed to fetch hosts: %w", err) + } + if err := func() error { + defer rows.Close() + + for rows.Next() { + var pk types.PublicKey + if err := rows.Scan((*PublicKey)(&pk)); err != nil { + return fmt.Errorf("failed to scan host: %w", err) + } + if _, exists := slab.Shards[i].Contracts[pk]; !exists { + slab.Shards[i].Contracts[pk] = []types.FileContractID{} + } + } + return nil + }(); err != nil { + return object.Slab{}, err + } + + // contracts rows, err := stmt.Query(ctx, sectorID) if err != nil { return object.Slab{}, fmt.Errorf("failed to fetch contracts: %w", err) @@ -1851,14 +1904,18 @@ func Slab(ctx context.Context, tx sql.Tx, key object.EncryptionKey) (object.Slab if err := func() error { defer rows.Close() - slab.Shards[i].Contracts = make(map[types.PublicKey][]types.FileContractID) for rows.Next() { var pk types.PublicKey var fcid types.FileContractID if err := rows.Scan((*PublicKey)(&pk), (*FileContractID)(&fcid)); err != nil { return fmt.Errorf("failed to scan contract: %w", err) } - slab.Shards[i].Contracts[pk] = append(slab.Shards[i].Contracts[pk], fcid) + if _, exists := slab.Shards[i].Contracts[pk]; !exists { + slab.Shards[i].Contracts[pk] = []types.FileContractID{} + } + if fcid != (types.FileContractID{}) { + slab.Shards[i].Contracts[pk] = append(slab.Shards[i].Contracts[pk], fcid) + } } return nil }(); err != nil { @@ -2180,10 +2237,10 @@ EXISTS ( h.settings, h.v2_settings FROM hosts h - INNER JOIN contracts c on c.host_id = h.id and c.archival_reason IS NULL + INNER JOIN contracts c on c.host_id = h.id and c.archival_reason IS NULL AND c.usability = ? INNER JOIN host_checks hc on hc.db_host_id = h.id WHERE %s - GROUP by h.id`, strings.Join(whereExprs, " AND "))) + GROUP by h.id`, strings.Join(whereExprs, " AND ")), contractUsabilityGood) if err != nil { return nil, fmt.Errorf("failed to fetch hosts: %w", err) } @@ -2527,81 +2584,32 @@ func Object(ctx context.Context, tx Tx, bucket, key string) (api.Object, error) // fetch slab slices rows, err = tx.Query(ctx, ` - SELECT sla.db_buffered_slab_id IS NOT NULL, sli.object_index, sli.offset, sli.length, sla.health, sla.key, sla.min_shards, COALESCE(sec.slab_index, 0), COALESCE(sec.root, ?), COALESCE(c.fcid, ?), COALESCE(c.host_key, ?) + SELECT sla.health, sla.key, sla.min_shards, sli.offset, sli.length FROM slices sli INNER JOIN slabs sla ON sli.db_slab_id = sla.id - LEFT JOIN sectors sec ON sec.db_slab_id = sla.id - LEFT JOIN contract_sectors csec ON csec.db_sector_id = sec.id - LEFT JOIN contracts c ON c.id = csec.db_contract_id WHERE sli.db_object_id = ? 
- ORDER BY sli.object_index ASC, sec.slab_index ASC - `, Hash256{}, FileContractID{}, PublicKey{}, objID) + ORDER BY sli.object_index ASC + `, objID) if err != nil { return api.Object{}, fmt.Errorf("failed to fetch slabs: %w", err) } defer rows.Close() slabSlices := object.SlabSlices{} - var current *object.SlabSlice - var currObjIdx, currSlaIdx int64 for rows.Next() { - var bufferedSlab bool - var objectIndex int64 - var slabIndex int64 var ss object.SlabSlice - var sector object.Sector - var fcid types.FileContractID - var hk types.PublicKey - if err := rows.Scan(&bufferedSlab, // whether the slab is buffered - &objectIndex, &ss.Offset, &ss.Length, // slice info - &ss.Health, (*EncryptionKey)(&ss.EncryptionKey), &ss.MinShards, // slab info - &slabIndex, (*Hash256)(§or.Root), // sector info - (*PublicKey)(&fcid), // contract info - (*PublicKey)(&hk), // host info - ); err != nil { + if err := rows.Scan(&ss.Health, (*EncryptionKey)(&ss.EncryptionKey), &ss.MinShards, &ss.Offset, &ss.Length); err != nil { return api.Object{}, fmt.Errorf("failed to scan slab slice: %w", err) } - - // sanity check object for corruption - isFirst := current == nil && objectIndex == 1 && slabIndex == 1 - isBuffered := bufferedSlab && objectIndex == currObjIdx+1 && slabIndex == 0 - isNewSlab := isFirst || isBuffered || (current != nil && objectIndex == currObjIdx+1 && slabIndex == 1) - isNewShard := isNewSlab || (objectIndex == currObjIdx && slabIndex == currSlaIdx+1) - isNewContract := isNewShard || (objectIndex == currObjIdx && slabIndex == currSlaIdx) - if !isFirst && !isBuffered && !isNewSlab && !isNewShard && !isNewContract { - return api.Object{}, fmt.Errorf("%w: object index %d, slab index %d, current object index %d, current slab index %d", api.ErrObjectCorrupted, objectIndex, slabIndex, currObjIdx, currSlaIdx) - } - - // update indices - currObjIdx = objectIndex - currSlaIdx = slabIndex - - if isNewSlab { - if current != nil { - slabSlices = append(slabSlices, *current) - } - current = &ss - } - - // if the slab is buffered there are no sectors/contracts to add - if bufferedSlab { - continue - } - - if isNewShard { - current.Shards = append(current.Shards, sector) - } - if isNewContract { - if current.Shards[len(current.Shards)-1].Contracts == nil { - current.Shards[len(current.Shards)-1].Contracts = make(map[types.PublicKey][]types.FileContractID) - } - current.Shards[len(current.Shards)-1].Contracts[hk] = append(current.Shards[len(current.Shards)-1].Contracts[hk], fcid) - } + slabSlices = append(slabSlices, ss) } - // add last slab slice - if current != nil { - slabSlices = append(slabSlices, *current) + // fill in the shards + for i := range slabSlices { + slabSlices[i].Slab, err = Slab(ctx, tx, slabSlices[i].EncryptionKey) + if err != nil { + return api.Object{}, fmt.Errorf("failed to fetch slab: %w", err) + } } return api.Object{ diff --git a/worker/worker.go b/worker/worker.go index aca411d76..6720be179 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -599,6 +599,9 @@ func New(cfg config.Worker, masterKey [32]byte, b Bus, l *zap.Logger) (*Worker, if cfg.UploadMaxMemory == 0 { return nil, errors.New("uploadMaxMemory cannot be 0") } + if cfg.CacheExpiry == 0 { + return nil, errors.New("cache expiry cannot be 0") + } a := alerts.WithOrigin(b, fmt.Sprintf("worker.%s", cfg.ID)) shutdownCtx, shutdownCancel := context.WithCancel(context.Background()) @@ -606,7 +609,7 @@ func New(cfg config.Worker, masterKey [32]byte, b Bus, l *zap.Logger) (*Worker, dialer := rhp.NewFallbackDialer(b, net.Dialer{}, 
l)
 	w := &Worker{
 		alerts:    a,
-		cache:     iworker.NewCache(b, l),
+		cache:     iworker.NewCache(b, cfg.CacheExpiry, l),
 		id:        cfg.ID,
 		bus:       b,
 		masterKey: masterKey,
diff --git a/worker/worker_test.go b/worker/worker_test.go
index 0c731766b..c32e51e1e 100644
--- a/worker/worker_test.go
+++ b/worker/worker_test.go
@@ -156,6 +156,7 @@ func (w *testWorker) UsableHosts() []api.HostInfo {
 func newTestWorkerCfg() config.Worker {
 	return config.Worker{
 		AccountsRefillInterval:   time.Second,
+		CacheExpiry:              100 * time.Millisecond,
 		ID:                       "test",
 		BusFlushInterval:         time.Second,
 		DownloadOverdriveTimeout: time.Second,
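
Note on the bus/bus.go and bus/routes.go hunks: both clamp the requested end height of a v1 contract to one block below the v2 hardfork require height, since hosts won't accept v1 contracts that end at or beyond it. A minimal standalone sketch of that rule in Go; capV1EndHeight and the example heights are illustrative, not part of the diff:

    package main

    import "fmt"

    // capV1EndHeight mirrors the clamping added to renewContractV1 and
    // contractsFormHandler: a v1 contract must end before the v2 require height.
    func capV1EndHeight(endHeight, v2RequireHeight uint64) uint64 {
        if endHeight >= v2RequireHeight {
            return v2RequireHeight - 1
        }
        return endHeight
    }

    func main() {
        const requireHeight = 200 // the custom network in TestV1ToV2Transition uses 200
        for _, requested := range []uint64{150, 199, 200, 1200} {
            fmt.Printf("requested end height %d -> capped to %d\n", requested, capV1EndHeight(requested, requireHeight))
        }
    }

With the 1000-block autopilot period used in TestV1ToV2Transition, every v1 formation lands on RequireHeight-1, which is exactly what the test asserts via c.EndHeight().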
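
Note on the cache changes: the hard-coded 5-minute cacheEntryExpiry in internal/worker/cache.go becomes a constructor parameter, wired through the new config.Worker.CacheExpiry field (5m by default, 100ms in the test configs, rejected if zero in worker.New). A rough, simplified sketch of the resulting TTL-cache shape; only the usablehosts key and the field names echo the diff, the rest is illustrative:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    type cacheEntry struct {
        value  interface{}
        expiry time.Time
    }

    // memoryCache is a tiny TTL cache with a configurable per-entry expiry,
    // following the shape of the worker cache after this change.
    type memoryCache struct {
        mu    sync.RWMutex
        ttl   time.Duration
        items map[string]*cacheEntry
    }

    func newMemoryCache(ttl time.Duration) *memoryCache {
        return &memoryCache{ttl: ttl, items: make(map[string]*cacheEntry)}
    }

    func (c *memoryCache) Set(key string, v interface{}) {
        c.mu.Lock()
        defer c.mu.Unlock()
        c.items[key] = &cacheEntry{value: v, expiry: time.Now().Add(c.ttl)}
    }

    func (c *memoryCache) Get(key string) (interface{}, bool) {
        c.mu.RLock()
        defer c.mu.RUnlock()
        e, ok := c.items[key]
        if !ok || time.Now().After(e.expiry) {
            return nil, false
        }
        return e.value, true
    }

    func main() {
        c := newMemoryCache(100 * time.Millisecond) // testWorkerCfg().CacheExpiry
        c.Set("usablehosts", []string{"host-a"})
        if v, ok := c.Get("usablehosts"); ok {
            fmt.Println("hit:", v)
        }
        time.Sleep(150 * time.Millisecond)
        if _, ok := c.Get("usablehosts"); !ok {
            fmt.Println("expired")
        }
    }

This is also why the e2e tests now sleep for testWorkerCfg().CacheExpiry (in TestDownloadAllHosts and TestGouging) instead of waiting out a price-table validity window to force a refresh.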
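
Note on internal/bus/chainsubscriber.go: when AddV2PoolTransactions rejects an expiration transaction, the wallet inputs funded for it are now released so they don't stay locked, and rejections caused by the contract already being resolved (or its element missing from the accumulator) are logged at debug level only. A hedged sketch of that control flow using stand-in types (the real code uses coreutils' chain manager and wallet):

    package main

    import (
        "errors"
        "fmt"
        "strings"
    )

    // Stand-ins for the narrow wallet/chain-manager surface the subscriber uses.
    type txn struct{ id string }

    type wallet interface{ ReleaseInputs(txns []txn) }

    type chainManager interface{ AddPoolTransactions(txns []txn) error }

    // broadcastResolution mirrors the new error handling: any pool rejection
    // releases the inputs; only unexpected failures are treated as errors.
    func broadcastResolution(cm chainManager, w wallet, t txn) bool {
        if err := cm.AddPoolTransactions([]txn{t}); err != nil {
            w.ReleaseInputs([]txn{t})
            if strings.Contains(err.Error(), "has already been resolved") ||
                strings.Contains(err.Error(), "not present in the accumulator") {
                fmt.Println("debug: expiration already handled:", err)
            } else {
                fmt.Println("error: failed to broadcast contract expiration txn:", err)
            }
            return false
        }
        return true // caller would now broadcast the transaction set
    }

    type fakeWallet struct{}

    func (fakeWallet) ReleaseInputs(txns []txn) { fmt.Println("released inputs of", len(txns), "txn(s)") }

    type fakeCM struct{ err error }

    func (c fakeCM) AddPoolTransactions([]txn) error { return c.err }

    func main() {
        broadcastResolution(fakeCM{err: errors.New("contract has already been resolved")}, fakeWallet{}, txn{id: "fc1"})
        broadcastResolution(fakeCM{}, fakeWallet{}, txn{id: "fc2"})
    }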
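
Note on internal/upload/uploadmanager.go: UploadShards no longer returns early when uploadShards fails; sectors that did make it to hosts are first persisted through UpdateSlab, and only then is the upload error surfaced (uploadShards itself now only collects sectors whose isUploaded() is true). A simplified sketch of that ordering with assumed stand-in types; the error message is illustrative:

    package main

    import (
        "errors"
        "fmt"
    )

    type sector struct {
        root     string
        uploaded bool
    }

    // persistThenFail records whatever succeeded before surfacing the upload
    // error, mirroring the reordering in Manager.UploadShards.
    func persistThenFail(sectors []sector, uploadErr error, updateSlab func(roots []string) error) error {
        var roots []string
        for _, s := range sectors {
            if s.uploaded {
                roots = append(roots, s.root)
            }
        }
        if len(roots) > 0 {
            if err := updateSlab(roots); err != nil {
                return fmt.Errorf("couldn't update slab: %w", err)
            }
        }
        return uploadErr // nil when every shard made it
    }

    func main() {
        updateSlab := func(roots []string) error {
            fmt.Println("persisted", len(roots), "uploaded sector(s)")
            return nil
        }
        err := persistThenFail([]sector{{"root-1", true}, {"root-2", false}}, errors.New("upload failed"), updateSlab)
        fmt.Println("returned:", err)
    }

The practical effect is that a partially failed slab upload still records the shards that succeeded instead of discarding them along with the error.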
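
Note on stores/sql/main.go: Slab() now seeds each shard's Contracts map from host_sectors, so a host that still stores a sector but has no active contract referencing it appears with an empty contract-ID slice (TestSlabSectorOnHostButNotInContract pins this down), and Object() is rebuilt on top of Slab() instead of the old corruption-checking join, which is why the ErrObjectCorrupted assertions were dropped from TestObjectBasic. A small sketch of how a reader of that map can tell the two cases apart; publicKey and fileContractID are stand-ins for the types package:

    package main

    import "fmt"

    type publicKey string
    type fileContractID string

    // classify distinguishes "sector reachable via an active contract" from
    // "sector known to be on the host, but no active contract covers it".
    func classify(contracts map[publicKey][]fileContractID) {
        for hk, fcids := range contracts {
            if len(fcids) > 0 {
                fmt.Printf("host %s: reachable via %d active contract(s)\n", hk, len(fcids))
            } else {
                fmt.Printf("host %s: stores the sector but has no active contract for it\n", hk)
            }
        }
    }

    func main() {
        classify(map[publicKey][]fileContractID{
            "host-a": {"fcid-1"},
            "host-b": {}, // e.g. its contract was archived, as in the new store test
        })
    }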