diff --git a/.golangci.yml b/.golangci.yml index 34c87cfb9a..d439ef1779 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -58,7 +58,6 @@ linters-settings: # diagnostic - commentedOutCode - uncheckedInlineErr - - builtinShadow # style - exitAfterDefer diff --git a/api/bus.go b/api/bus.go index c172f10016..248dfd3132 100644 --- a/api/bus.go +++ b/api/bus.go @@ -7,10 +7,9 @@ import ( type ( // ConsensusState holds the current blockheight and whether we are synced or not. ConsensusState struct { - BlockHeight uint64 `json:"blockHeight"` - LastBlockTime TimeRFC3339 `json:"lastBlockTime"` - SubscriberHeight uint64 `json:"subscriberHeight"` - Synced bool `json:"synced"` + BlockHeight uint64 `json:"blockHeight"` + LastBlockTime TimeRFC3339 `json:"lastBlockTime"` + Synced bool `json:"synced"` } // ConsensusNetwork holds the name of the network. diff --git a/autopilot/autopilot.go b/autopilot/autopilot.go index 2509fce014..092fc1afb1 100644 --- a/autopilot/autopilot.go +++ b/autopilot/autopilot.go @@ -121,7 +121,7 @@ type Autopilot struct { } // New initializes an Autopilot. -func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat time.Duration, scannerScanInterval time.Duration, scannerBatchSize, scannerNumThreads uint64, migrationHealthCutoff float64, accountsRefillInterval time.Duration, contractConfirmationDeadline, revisionSubmissionBuffer, migratorParallelSlabsPerWorker uint64, revisionBroadcastInterval time.Duration) (*Autopilot, error) { +func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat time.Duration, scannerScanInterval time.Duration, scannerBatchSize, scannerNumThreads uint64, migrationHealthCutoff float64, accountsRefillInterval time.Duration, revisionSubmissionBuffer, migratorParallelSlabsPerWorker uint64, revisionBroadcastInterval time.Duration) (*Autopilot, error) { shutdownCtx, shutdownCtxCancel := context.WithCancel(context.Background()) ap := &Autopilot{ @@ -149,7 +149,7 @@ func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat tim } ap.s = scanner - ap.c = contractor.New(bus, bus, ap.logger, contractConfirmationDeadline, revisionSubmissionBuffer, revisionBroadcastInterval) + ap.c = contractor.New(bus, bus, ap.logger, revisionSubmissionBuffer, revisionBroadcastInterval) ap.m = newMigrator(ap, migrationHealthCutoff, migratorParallelSlabsPerWorker) ap.a = newAccounts(ap, ap.bus, ap.bus, ap.workers, ap.logger, accountsRefillInterval) diff --git a/autopilot/contractor/contractor.go b/autopilot/contractor/contractor.go index caf605e4b7..873394e7b0 100644 --- a/autopilot/contractor/contractor.go +++ b/autopilot/contractor/contractor.go @@ -120,8 +120,6 @@ type ( revisionLastBroadcast map[types.FileContractID]time.Time revisionSubmissionBuffer uint64 - contractConfirmationDeadline uint64 - firstRefreshFailure map[types.FileContractID]time.Time mu sync.Mutex @@ -171,7 +169,7 @@ type ( } ) -func New(bus Bus, alerter alerts.Alerter, logger *zap.SugaredLogger, contractConfirmationDeadline, revisionSubmissionBuffer uint64, revisionBroadcastInterval time.Duration) *Contractor { +func New(bus Bus, alerter alerts.Alerter, logger *zap.SugaredLogger, revisionSubmissionBuffer uint64, revisionBroadcastInterval time.Duration) *Contractor { logger = logger.Named("contractor") ctx, cancel := context.WithCancel(context.Background()) return &Contractor{ @@ -184,8 +182,6 @@ func New(bus Bus, alerter alerts.Alerter, logger *zap.SugaredLogger, contractCon revisionLastBroadcast: make(map[types.FileContractID]time.Time), revisionSubmissionBuffer: revisionSubmissionBuffer, - contractConfirmationDeadline: contractConfirmationDeadline, - firstRefreshFailure: make(map[types.FileContractID]time.Time), resolver: newIPResolver(ctx, resolverLookupTimeout, logger.Named("resolver")), @@ -649,14 +645,6 @@ LOOP: fcid := contract.ID hk := contract.HostKey - // refresh contract metadata - metadata, err := c.bus.Contract(ctx, contract.ID) - if err != nil { - c.logger.Errorf("couldn't fetch contract %v from database, err: %v", contract.ID, err) - break LOOP - } - contract.ContractMetadata = metadata - // check if contract is ready to be archived. if bh > contract.EndHeight()-c.revisionSubmissionBuffer { toArchive[fcid] = errContractExpired.Error() @@ -664,7 +652,7 @@ LOOP: toArchive[fcid] = errContractMaxRevisionNumber.Error() } else if contract.RevisionNumber == math.MaxUint64 { toArchive[fcid] = errContractMaxRevisionNumber.Error() - } else if c.contractConfirmationDeadline > 0 && contract.State == api.ContractStatePending && bh-contract.StartHeight > c.contractConfirmationDeadline { + } else if contract.State == api.ContractStatePending && bh-contract.StartHeight > ContractConfirmationDeadline { toArchive[fcid] = errContractNotConfirmed.Error() } if _, archived := toArchive[fcid]; archived { diff --git a/autopilot/contractor/hostfilter.go b/autopilot/contractor/hostfilter.go index 9c9481367c..31d53eb130 100644 --- a/autopilot/contractor/hostfilter.go +++ b/autopilot/contractor/hostfilter.go @@ -14,6 +14,10 @@ import ( ) const ( + // ContractConfirmationDeadline is the number of blocks since its start + // height we wait for a contract to appear on chain. + ContractConfirmationDeadline = 18 + // minContractFundUploadThreshold is the percentage of contract funds // remaining at which the contract gets marked as not good for upload minContractFundUploadThreshold = float64(0.05) // 5% diff --git a/bus/bus.go b/bus/bus.go index d590103d14..ea313094b4 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -33,6 +33,9 @@ import ( "go.uber.org/zap" ) +// blockInterval is the expected wall clock time between consecutive blocks. +const blockInterval = 10 * time.Minute + // Client re-exports the client from the client package. type Client struct { *client.Client @@ -1778,8 +1781,6 @@ func (b *bus) paramsHandlerUploadGET(jc jape.Context) { } func (b *bus) consensusState(ctx context.Context) (api.ConsensusState, error) { - tip := b.cm.TipState() - index, err := b.cs.ChainIndex(ctx) if err != nil { return api.ConsensusState{}, err @@ -1787,15 +1788,14 @@ func (b *bus) consensusState(ctx context.Context) (api.ConsensusState, error) { var synced bool block, found := b.cm.Block(index.ID) - if found && time.Since(block.Timestamp) < 2*tip.BlockInterval() { + if found && time.Since(block.Timestamp) < 2*blockInterval { synced = true } return api.ConsensusState{ - BlockHeight: tip.Index.Height, - LastBlockTime: api.TimeRFC3339(tip.PrevTimestamps[0]), - Synced: synced, - SubscriberHeight: index.Height, + BlockHeight: index.Height, + LastBlockTime: api.TimeRFC3339(block.Timestamp), + Synced: synced, }, nil } diff --git a/chain/subscriber.go b/chain/subscriber.go index 1c578f531b..781dda70a2 100644 --- a/chain/subscriber.go +++ b/chain/subscriber.go @@ -11,6 +11,7 @@ import ( "go.sia.tech/coreutils/chain" "go.sia.tech/coreutils/wallet" "go.sia.tech/renterd/api" + "go.sia.tech/renterd/internal/utils" "go.uber.org/zap" ) @@ -48,14 +49,9 @@ type ( wallet.UpdateTx } - ContractStore interface { - ContractExists(ctx context.Context, fcid types.FileContractID) (bool, error) - } - Subscriber struct { cm ChainManager cs ChainStore - css ContractStore logger *zap.SugaredLogger announcementMaxAge time.Duration @@ -89,7 +85,7 @@ type ( } ) -func NewSubscriber(cm ChainManager, cs ChainStore, css ContractStore, walletAddress types.Address, announcementMaxAge time.Duration, logger *zap.Logger) (_ *Subscriber, err error) { +func NewSubscriber(cm ChainManager, cs ChainStore, walletAddress types.Address, announcementMaxAge time.Duration, logger *zap.Logger) (_ *Subscriber, err error) { if announcementMaxAge == 0 { return nil, errors.New("announcementMaxAge must be non-zero") } @@ -98,7 +94,6 @@ func NewSubscriber(cm ChainManager, cs ChainStore, css ContractStore, walletAddr return &Subscriber{ cm: cm, cs: cs, - css: css, logger: logger.Sugar(), announcementMaxAge: announcementMaxAge, @@ -144,7 +139,7 @@ func (s *Subscriber) Run() (func(), error) { if err := s.sync(); errors.Is(err, errClosed) || errors.Is(err, context.Canceled) { return } else if err != nil { - s.logger.Errorf("failed to sync: %v", err) + s.logger.Panicf("failed to sync: %v", err) } } }() @@ -298,7 +293,7 @@ func (s *Subscriber) processUpdates(ctx context.Context, crus []chain.RevertUpda return nil }); err != nil { - return types.ChainIndex{}, fmt.Errorf("failed to process chain update: %w", err) + return types.ChainIndex{}, err } return index, nil } @@ -310,16 +305,19 @@ func (s *Subscriber) updateContract(tx ChainUpdateTx, index types.ChainIndex, fc } // ignore unknown contracts - if known, err := s.isKnownContract(fcid); err != nil { - return fmt.Errorf("failed to check if contract exists: %w", err) - } else if !known { + if !s.isKnownContract(fcid) { return nil } // fetch contract state state, err := tx.ContractState(fcid) - if err != nil { + if err != nil && utils.IsErr(err, api.ErrContractNotFound) { + s.updateKnownContracts(fcid, false) // ignore unknown contracts + return nil + } else if err != nil { return fmt.Errorf("failed to get contract state: %w", err) + } else { + s.updateKnownContracts(fcid, true) // update known contracts } // handle reverts @@ -423,24 +421,20 @@ func (s *Subscriber) isClosed() bool { return false } -func (s *Subscriber) isKnownContract(fcid types.FileContractID) (bool, error) { +func (s *Subscriber) isKnownContract(fcid types.FileContractID) bool { s.mu.Lock() defer s.mu.Unlock() - - // return result from cache known, ok := s.knownContracts[fcid] - if ok { - return known, nil - } - - // check if contract exists in the store - known, err := s.css.ContractExists(s.shutdownCtx, fcid) - if err != nil { - return false, err + if !ok { + return true // assume known } + return known +} +func (s *Subscriber) updateKnownContracts(fcid types.FileContractID, known bool) { + s.mu.Lock() + defer s.mu.Unlock() s.knownContracts[fcid] = known - return known, nil } func v1ContractUpdate(fce types.FileContractElement, rev *types.FileContractElement, resolved, valid bool) contractUpdate { diff --git a/cmd/renterd/main.go b/cmd/renterd/main.go index 393b5bb49f..816fe7099a 100644 --- a/cmd/renterd/main.go +++ b/cmd/renterd/main.go @@ -125,11 +125,10 @@ var ( }, Autopilot: config.Autopilot{ Enabled: true, + RevisionSubmissionBuffer: 144, AccountsRefillInterval: defaultAccountRefillInterval, Heartbeat: 30 * time.Minute, MigrationHealthCutoff: 0.75, - ContractConfirmationDeadline: 18, - RevisionSubmissionBuffer: 144, RevisionBroadcastInterval: 7 * 24 * time.Hour, ScannerBatchSize: 1000, ScannerInterval: 24 * time.Hour, @@ -248,7 +247,6 @@ func main() { flag.StringVar(&cfg.Directory, "dir", cfg.Directory, "Directory for storing node state") flag.BoolVar(&disableStdin, "env", false, "disable stdin prompts for environment variables (default false)") flag.BoolVar(&cfg.AutoOpenWebUI, "openui", cfg.AutoOpenWebUI, "automatically open the web UI on startup") - flag.DurationVar(&cfg.Bus.PersistInterval, "bus.persistInterval", cfg.Bus.PersistInterval, "(deprecated) Interval for persisting consensus updates") // logger flag.StringVar(&cfg.Log.Level, "log.level", cfg.Log.Level, "Global logger level (debug|info|warn|error). Defaults to 'info' (overrides with RENTERD_LOG_LEVEL)") @@ -273,6 +271,7 @@ func main() { flag.Uint64Var(&cfg.Bus.AnnouncementMaxAgeHours, "bus.announcementMaxAgeHours", cfg.Bus.AnnouncementMaxAgeHours, "Max age for announcements") flag.BoolVar(&cfg.Bus.Bootstrap, "bus.bootstrap", cfg.Bus.Bootstrap, "Bootstraps gateway and consensus modules") flag.StringVar(&cfg.Bus.GatewayAddr, "bus.gatewayAddr", cfg.Bus.GatewayAddr, "Address for Sia peer connections (overrides with RENTERD_BUS_GATEWAY_ADDR)") + flag.DurationVar(&cfg.Bus.PersistInterval, "bus.persistInterval", cfg.Bus.PersistInterval, "(deprecated) Interval for persisting consensus updates") flag.DurationVar(&cfg.Bus.UsedUTXOExpiry, "bus.usedUTXOExpiry", cfg.Bus.UsedUTXOExpiry, "Expiry for used UTXOs in transactions") flag.Int64Var(&cfg.Bus.SlabBufferCompletionThreshold, "bus.slabBufferCompletionThreshold", cfg.Bus.SlabBufferCompletionThreshold, "Threshold for slab buffer upload (overrides with RENTERD_BUS_SLAB_BUFFER_COMPLETION_THRESHOLD)") @@ -294,7 +293,6 @@ func main() { flag.Float64Var(&cfg.Autopilot.MigrationHealthCutoff, "autopilot.migrationHealthCutoff", cfg.Autopilot.MigrationHealthCutoff, "Threshold for migrating slabs based on health") flag.DurationVar(&cfg.Autopilot.RevisionBroadcastInterval, "autopilot.revisionBroadcastInterval", cfg.Autopilot.RevisionBroadcastInterval, "Interval for broadcasting contract revisions (overrides with RENTERD_AUTOPILOT_REVISION_BROADCAST_INTERVAL)") flag.Uint64Var(&cfg.Autopilot.RevisionSubmissionBuffer, "autopilot.revisionSubmissionBuffer", cfg.Autopilot.RevisionSubmissionBuffer, "Amount of blocks buffer applied on the contract's end height before archiving it (overrides with RENTERD_AUTOPILOT_REVISION_SUBMISSION_BUFFER)") - flag.Uint64Var(&cfg.Autopilot.ContractConfirmationDeadline, "autopilot.contractConfirmationDeadline", cfg.Autopilot.ContractConfirmationDeadline, "Maximum number of blocks we wait for the contract to appear on chain (overrides with RENTERD_AUTOPILOT_CONTRACT_CONFIRMATION_DEADLINE)") flag.Uint64Var(&cfg.Autopilot.ScannerBatchSize, "autopilot.scannerBatchSize", cfg.Autopilot.ScannerBatchSize, "Batch size for host scanning") flag.DurationVar(&cfg.Autopilot.ScannerInterval, "autopilot.scannerInterval", cfg.Autopilot.ScannerInterval, "Interval for scanning hosts") flag.Uint64Var(&cfg.Autopilot.ScannerNumThreads, "autopilot.scannerNumThreads", cfg.Autopilot.ScannerNumThreads, "Number of threads for scanning hosts") diff --git a/config/config.go b/config/config.go index 018cdc4eca..1602bf75f9 100644 --- a/config/config.go +++ b/config/config.go @@ -132,7 +132,6 @@ type ( AccountsRefillInterval time.Duration `yaml:"accountsRefillInterval,omitempty"` Heartbeat time.Duration `yaml:"heartbeat,omitempty"` MigrationHealthCutoff float64 `yaml:"migrationHealthCutoff,omitempty"` - ContractConfirmationDeadline uint64 `yaml:"contractConfirmationDeadline,omitempty"` RevisionBroadcastInterval time.Duration `yaml:"revisionBroadcastInterval,omitempty"` RevisionSubmissionBuffer uint64 `yaml:"revisionSubmissionBuffer,omitempty"` ScannerInterval time.Duration `yaml:"scannerInterval,omitempty"` diff --git a/internal/node/node.go b/internal/node/node.go index a9482e3a68..557bc50077 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -186,7 +186,7 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, logger *zap.Logger return nil, nil, nil, nil, err } - cs, err := chain.NewSubscriber(cm, sqlStore, sqlStore, types.StandardUnlockHash(seed.PublicKey()), time.Duration(cfg.AnnouncementMaxAgeHours)*time.Hour, logger.Named("chainsubscriber")) + cs, err := chain.NewSubscriber(cm, sqlStore, types.StandardUnlockHash(seed.PublicKey()), time.Duration(cfg.AnnouncementMaxAgeHours)*time.Hour, logger.Named("chainsubscriber")) if err != nil { return nil, nil, nil, nil, err } @@ -254,7 +254,7 @@ func NewWorker(cfg config.Worker, s3Opts s3.Opts, b Bus, seed types.PrivateKey, } func NewAutopilot(cfg AutopilotConfig, b autopilot.Bus, workers []autopilot.Worker, l *zap.Logger) (http.Handler, RunFn, ShutdownFn, error) { - ap, err := autopilot.New(cfg.ID, b, workers, l, cfg.Heartbeat, cfg.ScannerInterval, cfg.ScannerBatchSize, cfg.ScannerNumThreads, cfg.MigrationHealthCutoff, cfg.AccountsRefillInterval, cfg.ContractConfirmationDeadline, cfg.RevisionSubmissionBuffer, cfg.MigratorParallelSlabsPerWorker, cfg.RevisionBroadcastInterval) + ap, err := autopilot.New(cfg.ID, b, workers, l, cfg.Heartbeat, cfg.ScannerInterval, cfg.ScannerBatchSize, cfg.ScannerNumThreads, cfg.MigrationHealthCutoff, cfg.AccountsRefillInterval, cfg.RevisionSubmissionBuffer, cfg.MigratorParallelSlabsPerWorker, cfg.RevisionBroadcastInterval) if err != nil { return nil, nil, nil, err } diff --git a/internal/test/e2e/__debug_bin1717411325 b/internal/test/e2e/__debug_bin1717411325 deleted file mode 100755 index 0353a696ed..0000000000 Binary files a/internal/test/e2e/__debug_bin1717411325 and /dev/null differ diff --git a/internal/test/e2e/cluster.go b/internal/test/e2e/cluster.go index bdd9d411c7..6800fb51ca 100644 --- a/internal/test/e2e/cluster.go +++ b/internal/test/e2e/cluster.go @@ -526,25 +526,28 @@ func (c *TestCluster) MineToRenewWindow() { c.MineBlocksBlocking(renewWindowStart - cs.BlockHeight) } -// sync blocks until the cluster is synced. -func (c *TestCluster) sync(hosts []*Host) { +// MineBlocksBlocking mines n blocks and blocks until the cluster is synced. +func (c *TestCluster) MineBlocksBlocking(n uint64) { c.tt.Helper() - // fetch current block height to ensure sync height catches up - start, err := c.Bus.ConsensusState(context.Background()) - c.tt.OK(err) + // mine 'n' blocks + c.MineBlocks(n) + + // fetch tip + tip := c.cm.Tip() + // wait until we've caught up with the chain manager's tip c.tt.Retry(300, 100*time.Millisecond, func() error { cs, err := c.Bus.ConsensusState(context.Background()) if err != nil { return err } else if !cs.Synced { - return fmt.Errorf("bus is not synced, last block %v at %v", cs.BlockHeight, cs.LastBlockTime) // can't be synced if bus itself isn't synced - } else if cs.SubscriberHeight < start.BlockHeight { - return fmt.Errorf("bus is not synced, sync height %v < block height %v", cs.SubscriberHeight, cs.BlockHeight) + return errors.New("bus is not synced") + } else if cs.BlockHeight < tip.Height { + return fmt.Errorf("subscriber hasn't caught up, %d < %d", cs.BlockHeight, tip.Height) } - for _, h := range hosts { + for _, h := range c.hosts { if hh := h.cs.Height(); uint64(hh) < cs.BlockHeight { return fmt.Errorf("host %v is not synced, %v < %v", h.PublicKey(), hh, cs.BlockHeight) } @@ -553,13 +556,6 @@ func (c *TestCluster) sync(hosts []*Host) { }) } -// MineBlocksBlocking mines n blocks and blocks until the cluster is synced. -func (c *TestCluster) MineBlocksBlocking(n uint64) { - c.tt.Helper() - c.MineBlocks(n) - c.sync(c.hosts) -} - // MineBlocks mines n blocks func (c *TestCluster) MineBlocks(n uint64) { c.tt.Helper() @@ -956,7 +952,6 @@ func testApCfg() node.AutopilotConfig { Heartbeat: time.Second, MigrationHealthCutoff: 0.99, MigratorParallelSlabsPerWorker: 1, - ContractConfirmationDeadline: 144, RevisionSubmissionBuffer: 0, ScannerInterval: time.Second, ScannerBatchSize: 10, diff --git a/internal/test/e2e/cluster_test.go b/internal/test/e2e/cluster_test.go index 7260b2f3ff..1c252d134e 100644 --- a/internal/test/e2e/cluster_test.go +++ b/internal/test/e2e/cluster_test.go @@ -24,6 +24,7 @@ import ( "go.sia.tech/coreutils/wallet" "go.sia.tech/renterd/alerts" "go.sia.tech/renterd/api" + "go.sia.tech/renterd/autopilot/contractor" "go.sia.tech/renterd/internal/test" "go.sia.tech/renterd/object" "lukechampine.com/frand" @@ -790,14 +791,14 @@ func TestUploadDownloadSpending(t *testing.T) { large := make([]byte, rhpv2.SectorSize*3) files := [][]byte{small, large} - uploadDownload := func(ttt string) { + uploadDownload := func() { t.Helper() for _, data := range files { // prepare some data - make sure it's more than one sector tt.OKAll(frand.Read(data)) // upload the data - path := fmt.Sprintf("data_%v_%s", len(data), ttt) + path := fmt.Sprintf("data_%v", len(data)) tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), api.DefaultBucketName, path, api.UploadObjectOptions{})) // Should be registered in bus. @@ -827,7 +828,7 @@ func TestUploadDownloadSpending(t *testing.T) { } // run uploads once - uploadDownload("1") + uploadDownload() // Fuzzy search for uploaded data in various ways. objects, err := cluster.Bus.SearchObjects(context.Background(), api.DefaultBucketName, api.SearchObjectOptions{}) @@ -880,9 +881,9 @@ func TestUploadDownloadSpending(t *testing.T) { } return nil }) - time.Sleep(10 * time.Second) + // run uploads again - uploadDownload("2") + uploadDownload() // check that the spending was recorded tt.Retry(100, testBusFlushInterval, func() error { @@ -1373,9 +1374,7 @@ func TestUnconfirmedContractArchival(t *testing.T) { } // create a test cluster - apCfg := testApCfg() - apCfg.ContractConfirmationDeadline = 20 - cluster := newTestCluster(t, testClusterOptions{hosts: 1, autopilotCfg: &apCfg}) + cluster := newTestCluster(t, testClusterOptions{hosts: 1}) defer cluster.Shutdown() tt := cluster.tt @@ -1421,7 +1420,7 @@ func TestUnconfirmedContractArchival(t *testing.T) { } // mine enough blocks to ensure we're passed the confirmation deadline - cluster.MineBlocksBlocking(apCfg.ContractConfirmationDeadline + 1) + cluster.MineBlocksBlocking(contractor.ContractConfirmationDeadline + 1) tt.Retry(100, 100*time.Millisecond, func() error { contracts, err := cluster.Bus.Contracts(context.Background(), api.ContractsOpts{}) diff --git a/stores/chain.go b/stores/chain.go index 48c91c0758..8233448415 100644 --- a/stores/chain.go +++ b/stores/chain.go @@ -117,30 +117,31 @@ func (u *chainUpdateTx) ApplyIndex(index types.ChainIndex, created, spent []type // ContractState returns the state of a file contract. func (u *chainUpdateTx) ContractState(fcid types.FileContractID) (api.ContractState, error) { - var state contractState - // try regular contracts - err := u.tx. - Select("state"). + var c dbContract + if err := u.tx. Model(&dbContract{}). Where("fcid", fileContractID(fcid)). - Scan(&state). - Error - - // try archived contracts - if errors.Is(err, gorm.ErrRecordNotFound) { - err = u.tx. - Select("state"). - Model(&dbArchivedContract{}). - Where("fcid", fileContractID(fcid)). - Scan(&state). - Error + Take(&c). + Error; err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { + return "", err + } else if err == nil { + return api.ContractState(c.State.String()), nil } - if err != nil { + // try archived contracts + var ac dbArchivedContract + if err := u.tx. + Model(&dbArchivedContract{}). + Where("fcid", fileContractID(fcid)). + Take(&ac). + Error; err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { return "", err + } else if err == nil { + return api.ContractState(ac.State.String()), nil } - return api.ContractState(state.String()), nil + + return "", api.ErrContractNotFound } // RevertIndex is called with the chain index that is being reverted. Any @@ -281,24 +282,27 @@ func (u *chainUpdateTx) UpdateContractState(fcid types.FileContractID, state api } // try regular contract - res := u.tx. + if res := u.tx. Model(&dbContract{}). Where("fcid", fileContractID(fcid)). - Update("state", cs) + Update("state", cs); res.Error != nil { + return res.Error + } else if res.RowsAffected > 0 { + return nil + } // try archived contract - if errors.Is(res.Error, gorm.ErrRecordNotFound) || (res.Error == nil && res.RowsAffected == 0) { - res = u.tx. - Model(&dbArchivedContract{}). - Where("fcid", fileContractID(fcid)). - Update("state", cs) + if res := u.tx. + Model(&dbArchivedContract{}). + Where("fcid", fileContractID(fcid)). + Update("state", cs); res.Error != nil { + return res.Error + } else if res.RowsAffected > 0 { + return nil } // wrap ErrContractNotFound - if errors.Is(res.Error, gorm.ErrRecordNotFound) || (res.Error == nil && res.RowsAffected == 0) { - return fmt.Errorf("%v %w", fcid, api.ErrContractNotFound) - } - return res.Error + return fmt.Errorf("%v %w", fcid, api.ErrContractNotFound) } // UpdateContractProofHeight updates the proof height of the contract with given @@ -307,24 +311,27 @@ func (u *chainUpdateTx) UpdateContractProofHeight(fcid types.FileContractID, pro u.debug("update contract proof height", "fcid", fcid, "proof_height", proofHeight) // try regular contract - res := u.tx. + if res := u.tx. Model(&dbContract{}). Where("fcid", fileContractID(fcid)). - Update("proof_height", proofHeight) + Update("proof_height", proofHeight); res.Error != nil { + return res.Error + } else if res.RowsAffected > 0 { + return nil + } // try archived contract - if errors.Is(res.Error, gorm.ErrRecordNotFound) || (res.Error == nil && res.RowsAffected == 0) { - res = u.tx. - Model(&dbArchivedContract{}). - Where("fcid", fileContractID(fcid)). - Update("proof_height", proofHeight) + if res := u.tx. + Model(&dbArchivedContract{}). + Where("fcid", fileContractID(fcid)). + Update("proof_height", proofHeight); res.Error != nil { + return res.Error + } else if res.RowsAffected > 0 { + return nil } // wrap api.ErrContractNotFound - if errors.Is(res.Error, gorm.ErrRecordNotFound) || (res.Error == nil && res.RowsAffected == 0) { - return fmt.Errorf("%v %w", fcid, api.ErrContractNotFound) - } - return res.Error + return fmt.Errorf("%v %w", fcid, api.ErrContractNotFound) } // UpdateFailedContracts marks active contract as failed if the current diff --git a/stores/chain_test.go b/stores/chain_test.go index 25b9205571..addb290ec5 100644 --- a/stores/chain_test.go +++ b/stores/chain_test.go @@ -34,7 +34,7 @@ func TestProcessChainUpdate(t *testing.T) { if state, err := tx.ContractState(fcid); err != nil { return err } else if state != api.ContractStatePending { - return fmt.Errorf("unexpected state %v", state) + return fmt.Errorf("unexpected state '%v'", state) } else { return nil } diff --git a/stores/metadata.go b/stores/metadata.go index 4416b3ecaf..458c7f713c 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -727,24 +727,6 @@ func (s *SQLStore) AddContract(ctx context.Context, c rhpv2.ContractRevision, co return added.convert(), nil } -func (s *SQLStore) ContractExists(ctx context.Context, fcid types.FileContractID) (bool, error) { - var exists bool - if err := s.db.WithContext(ctx). - Raw(`SELECT EXISTS(SELECT 1 FROM contracts WHERE fcid = ?)`, fileContractID(fcid)). - Scan(&exists). - Error; err != nil { - return false, err - } else if !exists { - if err := s.db.WithContext(ctx). - Raw(`SELECT EXISTS(SELECT 1 FROM archived_contracts WHERE fcid = ?)`, fileContractID(fcid)). - Scan(&exists). - Error; err != nil { - return false, err - } - } - return exists, nil -} - func (s *SQLStore) Contracts(ctx context.Context, opts api.ContractsOpts) ([]api.ContractMetadata, error) { db := s.db.WithContext(ctx) @@ -1458,8 +1440,10 @@ func (s *SQLStore) RecordContractSpending(ctx context.Context, records []api.Con return err } } - if err := s.RecordContractMetric(ctx, metrics...); err != nil { - s.logger.Errorw("failed to record contract metrics", zap.Error(err)) + if len(metrics) > 0 { + if err := s.RecordContractMetric(ctx, metrics...); err != nil { + s.logger.Errorw("failed to record contract metrics", zap.Error(err)) + } } return nil } diff --git a/stores/metadata_test.go b/stores/metadata_test.go index 4510abde8c..c8dd9b5ad0 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -810,57 +810,6 @@ func TestRenewedContract(t *testing.T) { } } -func TestContractExists(t *testing.T) { - ss := newTestSQLStore(t, defaultTestSQLStoreConfig) - defer ss.Close() - - // add test host - hk1 := types.PublicKey{1} - err := ss.addTestHost(hk1) - if err != nil { - t.Fatal(err) - } - - // define a helper to assert if a contract exists - assertExists := func(fcid types.FileContractID, expected bool) { - t.Helper() - exists, err := ss.ContractExists(context.Background(), fcid) - if err != nil { - t.Fatal(err) - } else if exists != expected { - t.Fatalf("expected %v but got %v", expected, exists) - } - } - - // define two fcids - fcid1 := types.FileContractID{1} - fcid2 := types.FileContractID{2} - - // both contracts don't exist - assertExists(fcid1, false) - assertExists(fcid2, false) - - // add contract - _, err = ss.addTestContract(fcid1, hk1) - if err != nil { - t.Fatal(err) - } - - // assert contract exists - assertExists(fcid1, true) - assertExists(fcid2, false) - - // renew contract - _, err = ss.addTestRenewedContract(fcid2, fcid1, hk1, 1) - if err != nil { - t.Fatal(err) - } - - // assert both exist - assertExists(fcid1, true) - assertExists(fcid2, true) -} - // TestAncestorsContracts verifies that AncestorContracts returns the right // ancestors in the correct order. func TestAncestorsContracts(t *testing.T) { diff --git a/stores/metrics.go b/stores/metrics.go index 91e411bd5e..6405b2ade5 100644 --- a/stores/metrics.go +++ b/stores/metrics.go @@ -226,10 +226,6 @@ func (s *SQLStore) PerformanceMetrics(ctx context.Context, start time.Time, n ui } func (s *SQLStore) RecordContractMetric(ctx context.Context, metrics ...api.ContractMetric) error { - if len(metrics) == 0 { - return nil - } - dbMetrics := make([]dbContractMetric, len(metrics)) for i, metric := range metrics { dbMetrics[i] = dbContractMetric{