From a5e1b26041ba7e2e72495b3e3b7bc4deec79230d Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Tue, 14 Nov 2023 11:30:35 -0800 Subject: [PATCH 1/5] sqlite: recalculate collateral metrics --- persist/sqlite/migrations.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/persist/sqlite/migrations.go b/persist/sqlite/migrations.go index 0d6b5636..573b3b9d 100644 --- a/persist/sqlite/migrations.go +++ b/persist/sqlite/migrations.go @@ -9,6 +9,33 @@ import ( "go.sia.tech/hostd/host/contracts" ) +// migrateVersion22 recalculates the locked and risked collateral metrics +func migrateVersion22(tx txn) error { + rows, err := tx.Query(`SELECT locked_collateral, risked_collateral FROM contracts WHERE contract_status IN (?, ?);`, contracts.ContractStatusPending, contracts.ContractStatusActive) + if err != nil { + return fmt.Errorf("failed to query contracts: %w", err) + } + defer rows.Close() + + var totalLocked, totalRisked types.Currency + for rows.Next() { + var locked, risked types.Currency + if err := rows.Scan((*sqlCurrency)(&locked), (*sqlCurrency)(&risked)); err != nil { + return fmt.Errorf("failed to scan contract: %w", err) + } + + totalLocked = totalLocked.Add(locked) + totalRisked = totalRisked.Add(risked) + } + + if err := setCurrencyStat(tx, metricLockedCollateral, totalLocked, time.Now()); err != nil { + return fmt.Errorf("failed to increment locked collateral: %w", err) + } else if err := setCurrencyStat(tx, metricRiskedCollateral, totalRisked, time.Now()); err != nil { + return fmt.Errorf("failed to increment risked collateral: %w", err) + } + return nil +} + func migrateVersion21(tx txn) error { const query = ` ALTER TABLE global_settings ADD COLUMN last_announce_key BLOB; @@ -526,4 +553,5 @@ var migrations = []func(tx txn) error{ migrateVersion19, migrateVersion20, migrateVersion21, + migrateVersion22, } From 9addd6adf5f6c8b87b6dc5ea10d2f61111aef859 Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Tue, 14 Nov 2023 11:48:44 -0800 Subject: [PATCH 2/5] sqlite: update contracts logic --- persist/sqlite/contracts.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/persist/sqlite/contracts.go b/persist/sqlite/contracts.go index 6842ad9f..3efb0c9b 100644 --- a/persist/sqlite/contracts.go +++ b/persist/sqlite/contracts.go @@ -58,23 +58,25 @@ func (u *updateContractsTxn) ConfirmFormation(id types.FileContractID) error { contract, err := getContract(u.tx, dbID) if err != nil { return fmt.Errorf("failed to get contract: %w", err) - } else if contract.Status == contracts.ContractStatusRejected { - // rejected contracts have already had their collateral and revenue - // removed, need to re-add it if the contract is now confirmed + } + + // only update the status if the contract is pending or rejected + if contract.Status != contracts.ContractStatusPending && contract.Status != contracts.ContractStatusRejected { + return nil + } + + if err := setContractStatus(u.tx, id, contracts.ContractStatusActive); err != nil { + return fmt.Errorf("failed to set contract status to active: %w", err) + } + // rejected contracts have already had their collateral and revenue removed, + // need to re-add it if the contract is now confirmed + if contract.Status == contracts.ContractStatusRejected { if err := incrementCurrencyStat(u.tx, metricLockedCollateral, contract.LockedCollateral, false, time.Now()); err != nil { return fmt.Errorf("failed to increment locked collateral stat: %w", err) } else if err := incrementCurrencyStat(u.tx, metricRiskedCollateral, contract.Usage.RiskedCollateral, false, time.Now()); err != nil { return fmt.Errorf("failed to increment risked collateral stat: %w", err) } } - - // skip updating the status for contracts that are already marked as - // successful or failed - if contract.Status != contracts.ContractStatusSuccessful && contract.Status != contracts.ContractStatusFailed { - if err := setContractStatus(u.tx, id, contracts.ContractStatusActive); err != nil { - return fmt.Errorf("failed to set contract status to active: %w", err) - } - } return nil } @@ -434,8 +436,6 @@ func (s *Store) ExpireContract(id types.FileContractID, status contracts.Contrac return nil } - // successful, failed, and rejected contracts should have already had their - // collateral removed from the metrics if contract.Status == contracts.ContractStatusActive || contract.Status == contracts.ContractStatusPending { // successful, failed and rejected contracts should have already had // their collateral removed from the metrics From 4b4bedc5830066996890b23139c8651e929af620 Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Tue, 14 Nov 2023 13:01:29 -0800 Subject: [PATCH 3/5] sqlite: don't mark contracts as active when confirming the revision --- persist/sqlite/contracts.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/persist/sqlite/contracts.go b/persist/sqlite/contracts.go index 3efb0c9b..acccfffd 100644 --- a/persist/sqlite/contracts.go +++ b/persist/sqlite/contracts.go @@ -54,7 +54,7 @@ func (u *updateContractsTxn) ConfirmFormation(id types.FileContractID) error { return fmt.Errorf("failed to confirm formation: %w", err) } - // check if the contract is currently "rejected" + // get the contract's status contract, err := getContract(u.tx, dbID) if err != nil { return fmt.Errorf("failed to get contract: %w", err) @@ -87,8 +87,6 @@ func (u *updateContractsTxn) ConfirmRevision(revision types.FileContractRevision err := u.tx.QueryRow(query, sqlUint64(revision.RevisionNumber), sqlHash256(revision.ParentID)).Scan(&dbID) if err != nil { return fmt.Errorf("failed to confirm revision: %w", err) - } else if err := setContractStatus(u.tx, revision.ParentID, contracts.ContractStatusActive); err != nil { - return fmt.Errorf("failed to set contract status to active: %w", err) } return nil } From 3a0f238be6e3bcaf3625ae66c921fe412e752095 Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Tue, 14 Nov 2023 13:01:53 -0800 Subject: [PATCH 4/5] sqlite: recalculate revenue metrics for bugged nodes --- persist/sqlite/migrations.go | 52 ++++++++++++++++++++++++++++++------ 1 file changed, 44 insertions(+), 8 deletions(-) diff --git a/persist/sqlite/migrations.go b/persist/sqlite/migrations.go index 573b3b9d..9837832b 100644 --- a/persist/sqlite/migrations.go +++ b/persist/sqlite/migrations.go @@ -9,29 +9,65 @@ import ( "go.sia.tech/hostd/host/contracts" ) -// migrateVersion22 recalculates the locked and risked collateral metrics +// migrateVersion22 recalculates the locked and risked collateral and the +// potential and earned revenue metrics which will be bugged if the host rescans +// the blockchain. func migrateVersion22(tx txn) error { - rows, err := tx.Query(`SELECT locked_collateral, risked_collateral FROM contracts WHERE contract_status IN (?, ?);`, contracts.ContractStatusPending, contracts.ContractStatusActive) + rows, err := tx.Query(`SELECT contract_status, locked_collateral, risked_collateral, rpc_revenue, storage_revenue, ingress_revenue, egress_revenue, account_funding, registry_read, registry_write FROM contracts WHERE contract_status IN (?, ?, ?);`, contracts.ContractStatusPending, contracts.ContractStatusActive, contracts.ContractStatusSuccessful) if err != nil { return fmt.Errorf("failed to query contracts: %w", err) } defer rows.Close() - var totalLocked, totalRisked types.Currency + var totalLocked types.Currency + var totalPending, totalEarned contracts.Usage for rows.Next() { - var locked, risked types.Currency - if err := rows.Scan((*sqlCurrency)(&locked), (*sqlCurrency)(&risked)); err != nil { + var status contracts.ContractStatus + var lockedCollateral types.Currency + var usage contracts.Usage + + if err := rows.Scan(&status, (*sqlCurrency)(&lockedCollateral), (*sqlCurrency)(&usage.RiskedCollateral), (*sqlCurrency)(&usage.RPCRevenue), (*sqlCurrency)(&usage.StorageRevenue), (*sqlCurrency)(&usage.IngressRevenue), (*sqlCurrency)(&usage.EgressRevenue), (*sqlCurrency)(&usage.AccountFunding), (*sqlCurrency)(&usage.RegistryRead), (*sqlCurrency)(&usage.RegistryWrite)); err != nil { return fmt.Errorf("failed to scan contract: %w", err) } - totalLocked = totalLocked.Add(locked) - totalRisked = totalRisked.Add(risked) + switch status { + case contracts.ContractStatusPending, contracts.ContractStatusActive: + totalLocked = totalLocked.Add(lockedCollateral) + totalPending = totalPending.Add(usage) + case contracts.ContractStatusSuccessful: + totalEarned = totalEarned.Add(usage) + } + } if err := setCurrencyStat(tx, metricLockedCollateral, totalLocked, time.Now()); err != nil { return fmt.Errorf("failed to increment locked collateral: %w", err) - } else if err := setCurrencyStat(tx, metricRiskedCollateral, totalRisked, time.Now()); err != nil { + } else if err := setCurrencyStat(tx, metricRiskedCollateral, totalPending.RiskedCollateral, time.Now()); err != nil { return fmt.Errorf("failed to increment risked collateral: %w", err) + } else if err := setCurrencyStat(tx, metricPotentialRPCRevenue, totalPending.RPCRevenue, time.Now()); err != nil { + return fmt.Errorf("failed to increment rpc revenue: %w", err) + } else if err := setCurrencyStat(tx, metricPotentialStorageRevenue, totalPending.StorageRevenue, time.Now()); err != nil { + return fmt.Errorf("failed to increment storage revenue: %w", err) + } else if err := setCurrencyStat(tx, metricPotentialIngressRevenue, totalPending.IngressRevenue, time.Now()); err != nil { + return fmt.Errorf("failed to increment ingress revenue: %w", err) + } else if err := setCurrencyStat(tx, metricPotentialEgressRevenue, totalPending.EgressRevenue, time.Now()); err != nil { + return fmt.Errorf("failed to increment egress revenue: %w", err) + } else if err := setCurrencyStat(tx, metricPotentialRegistryReadRevenue, totalPending.RegistryRead, time.Now()); err != nil { + return fmt.Errorf("failed to increment read registry revenue: %w", err) + } else if err := setCurrencyStat(tx, metricPotentialRegistryWriteRevenue, totalPending.RegistryWrite, time.Now()); err != nil { + return fmt.Errorf("failed to increment write registry revenue: %w", err) + } else if err := setCurrencyStat(tx, metricEarnedRPCRevenue, totalEarned.RPCRevenue, time.Now()); err != nil { + return fmt.Errorf("failed to increment rpc revenue: %w", err) + } else if err := setCurrencyStat(tx, metricEarnedStorageRevenue, totalEarned.StorageRevenue, time.Now()); err != nil { + return fmt.Errorf("failed to increment storage revenue: %w", err) + } else if err := setCurrencyStat(tx, metricEarnedIngressRevenue, totalEarned.IngressRevenue, time.Now()); err != nil { + return fmt.Errorf("failed to increment ingress revenue: %w", err) + } else if err := setCurrencyStat(tx, metricEarnedEgressRevenue, totalEarned.EgressRevenue, time.Now()); err != nil { + return fmt.Errorf("failed to increment egress revenue: %w", err) + } else if err := setCurrencyStat(tx, metricEarnedRegistryReadRevenue, totalEarned.RegistryRead, time.Now()); err != nil { + return fmt.Errorf("failed to increment read registry revenue: %w", err) + } else if err := setCurrencyStat(tx, metricEarnedRegistryWriteRevenue, totalEarned.RegistryWrite, time.Now()); err != nil { + return fmt.Errorf("failed to increment write registry revenue: %w", err) } return nil } From f8cd1365015dd8d59c08bf532f840489114d63f1 Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Tue, 14 Nov 2023 13:32:55 -0800 Subject: [PATCH 5/5] sqlite: pass log to migration func --- persist/sqlite/init.go | 2 +- persist/sqlite/migrations.go | 48 +++++++++++++++++++----------------- 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/persist/sqlite/init.go b/persist/sqlite/init.go index 6f289696..b2772e49 100644 --- a/persist/sqlite/init.go +++ b/persist/sqlite/init.go @@ -50,7 +50,7 @@ func (s *Store) upgradeDatabase(current, target int64) error { for _, fn := range migrations[current-1:] { current++ start := time.Now() - if err := fn(tx); err != nil { + if err := fn(tx, log.With(zap.Int64("version", current))); err != nil { return fmt.Errorf("failed to migrate database to version %v: %w", current, err) } // check that no foreign key constraints were violated diff --git a/persist/sqlite/migrations.go b/persist/sqlite/migrations.go index 9837832b..e7e04886 100644 --- a/persist/sqlite/migrations.go +++ b/persist/sqlite/migrations.go @@ -7,12 +7,13 @@ import ( "go.sia.tech/core/types" "go.sia.tech/hostd/host/contracts" + "go.uber.org/zap" ) // migrateVersion22 recalculates the locked and risked collateral and the // potential and earned revenue metrics which will be bugged if the host rescans // the blockchain. -func migrateVersion22(tx txn) error { +func migrateVersion22(tx txn, log *zap.Logger) error { rows, err := tx.Query(`SELECT contract_status, locked_collateral, risked_collateral, rpc_revenue, storage_revenue, ingress_revenue, egress_revenue, account_funding, registry_read, registry_write FROM contracts WHERE contract_status IN (?, ?, ?);`, contracts.ContractStatusPending, contracts.ContractStatusActive, contracts.ContractStatusSuccessful) if err != nil { return fmt.Errorf("failed to query contracts: %w", err) @@ -37,9 +38,10 @@ func migrateVersion22(tx txn) error { case contracts.ContractStatusSuccessful: totalEarned = totalEarned.Add(usage) } - } + log.Debug("resetting metrics", zap.Stringer("lockedCollateral", totalLocked), zap.Stringer("riskedCollateral", totalPending.RiskedCollateral)) + if err := setCurrencyStat(tx, metricLockedCollateral, totalLocked, time.Now()); err != nil { return fmt.Errorf("failed to increment locked collateral: %w", err) } else if err := setCurrencyStat(tx, metricRiskedCollateral, totalPending.RiskedCollateral, time.Now()); err != nil { @@ -72,7 +74,7 @@ func migrateVersion22(tx txn) error { return nil } -func migrateVersion21(tx txn) error { +func migrateVersion21(tx txn, _ *zap.Logger) error { const query = ` ALTER TABLE global_settings ADD COLUMN last_announce_key BLOB; ALTER TABLE global_settings ADD COLUMN settings_last_processed_change BLOB; @@ -86,13 +88,13 @@ ALTER TABLE global_settings ADD COLUMN last_announce_address TEXT; } // migrateVersion20 adds a compound index to the volume_sectors table -func migrateVersion20(tx txn) error { +func migrateVersion20(tx txn, _ *zap.Logger) error { _, err := tx.Exec(`CREATE INDEX volume_sectors_volume_id_sector_id_volume_index_set_compound ON volume_sectors (volume_id, sector_id, volume_index) WHERE sector_id IS NOT NULL;`) return err } // migrateVersion19 adds a compound index to the volume_sectors table -func migrateVersion19(tx txn) error { +func migrateVersion19(tx txn, _ *zap.Logger) error { const query = ` DROP INDEX storage_volumes_read_only_available; CREATE INDEX storage_volumes_id_available_read_only ON storage_volumes(id, available, read_only); @@ -104,7 +106,7 @@ CREATE INDEX volume_sectors_volume_id_sector_id_volume_index_compound ON volume_ // migrateVersion18 adds an index to the volume_sectors table to speed up // empty sector selection. -func migrateVersion18(tx txn) error { +func migrateVersion18(tx txn, _ *zap.Logger) error { const query = `CREATE INDEX volume_sectors_volume_id_sector_id ON volume_sectors(volume_id, sector_id);` _, err := tx.Exec(query) return err @@ -113,7 +115,7 @@ func migrateVersion18(tx txn) error { // migrateVersion17 recalculates the indices of all contract sector roots. // Fixes a bug where the indices were not being properly updated if more than // one root was trimmed. -func migrateVersion17(tx txn) error { +func migrateVersion17(tx txn, _ *zap.Logger) error { const query = ` -- create a temp table that contains the new indices CREATE TEMP TABLE temp_contract_sector_roots AS @@ -131,7 +133,7 @@ DROP TABLE temp_contract_sector_roots;` } // migrateVersion16 recalculates the contract and physical sector metrics. -func migrateVersion16(tx txn) error { +func migrateVersion16(tx txn, _ *zap.Logger) error { // recalculate the contract sectors metric var contractSectorCount int64 if err := tx.QueryRow(`SELECT COUNT(*) FROM contract_sector_roots`).Scan(&contractSectorCount); err != nil { @@ -176,7 +178,7 @@ func migrateVersion16(tx txn) error { // migrateVersion15 adds the registry usage fields to the contracts table, // removes the usage fields from the accounts table, and refactors the // contract_account_funding table. -func migrateVersion15(tx txn) error { +func migrateVersion15(tx txn, _ *zap.Logger) error { const query = ` -- drop the tables that are being removed or refactored DROP TABLE account_financial_records; @@ -280,7 +282,7 @@ CREATE INDEX contracts_formation_confirmed_negotiation_height ON contracts(forma // migrateVersion14 adds the locked_sectors table, recalculates the contract // sectors metric, and recalculates the physical sectors metric. -func migrateVersion14(tx txn) error { +func migrateVersion14(tx txn, _ *zap.Logger) error { // create the new locked sectors table const lockedSectorsTableQuery = `CREATE TABLE locked_sectors ( -- should be cleared at startup. currently persisted for simplicity, but may be moved to memory id INTEGER PRIMARY KEY, @@ -311,19 +313,19 @@ CREATE INDEX locked_sectors_sector_id ON locked_sectors(sector_id);` } // migrateVersion13 adds an index to the storage table to speed up location selection -func migrateVersion13(tx txn) error { +func migrateVersion13(tx txn, _ *zap.Logger) error { _, err := tx.Exec(`CREATE INDEX storage_volumes_read_only_available_used_sectors ON storage_volumes(available, read_only, used_sectors);`) return err } // migrateVersion12 adds an index to the contracts table to speed up sector pruning -func migrateVersion12(tx txn) error { +func migrateVersion12(tx txn, _ *zap.Logger) error { _, err := tx.Exec(`CREATE INDEX contracts_window_end ON contracts(window_end);`) return err } // migrateVersion11 recalculates the contract collateral metrics for existing contracts. -func migrateVersion11(tx txn) error { +func migrateVersion11(tx txn, _ *zap.Logger) error { rows, err := tx.Query(`SELECT locked_collateral, risked_collateral FROM contracts WHERE contract_status IN (?, ?)`, contracts.ContractStatusPending, contracts.ContractStatusActive) if err != nil { return fmt.Errorf("failed to query contracts: %w", err) @@ -348,13 +350,13 @@ func migrateVersion11(tx txn) error { } // migrateVersion10 drops the log_lines table. -func migrateVersion10(tx txn) error { +func migrateVersion10(tx txn, _ *zap.Logger) error { _, err := tx.Exec(`DROP TABLE log_lines;`) return err } // migrateVersion9 recalculates the contract metrics for existing contracts. -func migrateVersion9(tx txn) error { +func migrateVersion9(tx txn, _ *zap.Logger) error { rows, err := tx.Query(`SELECT contract_status, COUNT(*) FROM contracts GROUP BY contract_status`) if err != nil { return fmt.Errorf("failed to query contracts: %w", err) @@ -392,7 +394,7 @@ func migrateVersion9(tx txn) error { // migrateVersion8 sets the initial values for the locked and risked collateral // metrics for existing hosts -func migrateVersion8(tx txn) error { +func migrateVersion8(tx txn, _ *zap.Logger) error { rows, err := tx.Query(`SELECT locked_collateral, risked_collateral FROM contracts WHERE contract_status IN (?, ?)`, contracts.ContractStatusPending, contracts.ContractStatusActive) if err != nil { return fmt.Errorf("failed to query contracts: %w", err) @@ -421,14 +423,14 @@ func migrateVersion8(tx txn) error { } // migrateVersion7 adds the sector_cache_size column to the host_settings table -func migrateVersion7(tx txn) error { +func migrateVersion7(tx txn, _ *zap.Logger) error { _, err := tx.Exec(`ALTER TABLE host_settings ADD COLUMN sector_cache_size INTEGER NOT NULL DEFAULT 0;`) return err } // migrateVersion6 fixes a bug where the physical sectors metric was not being // properly decreased when a volume is force removed. -func migrateVersion6(tx txn) error { +func migrateVersion6(tx txn, _ *zap.Logger) error { var count int64 if err := tx.QueryRow(`SELECT COUNT(id) FROM volume_sectors WHERE sector_id IS NOT NULL`).Scan(&count); err != nil { return fmt.Errorf("failed to count volume sectors: %w", err) @@ -441,7 +443,7 @@ func migrateVersion6(tx txn) error { // the contract sectors metric will drastically increase for existing hosts. // This is unavoidable, as we have no way of knowing how many sectors were // previously renewed. -func migrateVersion5(tx txn) error { +func migrateVersion5(tx txn, _ *zap.Logger) error { var count int64 if err := tx.QueryRow(`SELECT COUNT(*) FROM contract_sector_roots`).Scan(&count); err != nil { return fmt.Errorf("failed to count contract sector roots: %w", err) @@ -451,7 +453,7 @@ func migrateVersion5(tx txn) error { } // migrateVersion4 changes the collateral setting to collateral_multiplier -func migrateVersion4(tx txn) error { +func migrateVersion4(tx txn, _ *zap.Logger) error { const ( newSettingsSchema = `CREATE TABLE host_settings ( id INTEGER PRIMARY KEY NOT NULL DEFAULT 0 CHECK (id = 0), -- enforce a single row @@ -507,13 +509,13 @@ egress_limit, ddns_provider, ddns_update_v4, ddns_update_v6, ddns_opts, registry // migrateVersion3 adds a wallet hash to the global settings table to detect // when the private key has changed. -func migrateVersion3(tx txn) error { +func migrateVersion3(tx txn, _ *zap.Logger) error { _, err := tx.Exec(`ALTER TABLE global_settings ADD COLUMN wallet_hash BLOB;`) return err } // migrateVersion2 removes the min prefix from the price columns in host_settings -func migrateVersion2(tx txn) error { +func migrateVersion2(tx txn, _ *zap.Logger) error { const ( newSettingsSchema = `CREATE TABLE host_settings ( id INTEGER PRIMARY KEY NOT NULL DEFAULT 0 CHECK (id = 0), -- enforce a single row @@ -568,7 +570,7 @@ egress_limit, dyn_dns_provider, dns_update_v4, dns_update_v6, dyn_dns_opts, regi // migrations is a list of functions that are run to migrate the database from // one version to the next. Migrations are used to update existing databases to // match the schema in init.sql. -var migrations = []func(tx txn) error{ +var migrations = []func(tx txn, log *zap.Logger) error{ migrateVersion2, migrateVersion3, migrateVersion4,