Skip to content

Commit

Permalink
Merge pull request #208 from SiaFoundation/nate/recalc-collateral
Browse files Browse the repository at this point in the history
Recalculate collateral metrics
  • Loading branch information
n8maninger authored Nov 14, 2023
2 parents d09df8c + f8cd136 commit da44153
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 38 deletions.
30 changes: 14 additions & 16 deletions persist/sqlite/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,29 @@ func (u *updateContractsTxn) ConfirmFormation(id types.FileContractID) error {
return fmt.Errorf("failed to confirm formation: %w", err)
}

// check if the contract is currently "rejected"
// get the contract's status
contract, err := getContract(u.tx, dbID)
if err != nil {
return fmt.Errorf("failed to get contract: %w", err)
} else if contract.Status == contracts.ContractStatusRejected {
// rejected contracts have already had their collateral and revenue
// removed, need to re-add it if the contract is now confirmed
}

// only update the status if the contract is pending or rejected
if contract.Status != contracts.ContractStatusPending && contract.Status != contracts.ContractStatusRejected {
return nil
}

if err := setContractStatus(u.tx, id, contracts.ContractStatusActive); err != nil {
return fmt.Errorf("failed to set contract status to active: %w", err)
}
// rejected contracts have already had their collateral and revenue removed,
// need to re-add it if the contract is now confirmed
if contract.Status == contracts.ContractStatusRejected {
if err := incrementCurrencyStat(u.tx, metricLockedCollateral, contract.LockedCollateral, false, time.Now()); err != nil {
return fmt.Errorf("failed to increment locked collateral stat: %w", err)
} else if err := incrementCurrencyStat(u.tx, metricRiskedCollateral, contract.Usage.RiskedCollateral, false, time.Now()); err != nil {
return fmt.Errorf("failed to increment risked collateral stat: %w", err)
}
}

// skip updating the status for contracts that are already marked as
// successful or failed
if contract.Status != contracts.ContractStatusSuccessful && contract.Status != contracts.ContractStatusFailed {
if err := setContractStatus(u.tx, id, contracts.ContractStatusActive); err != nil {
return fmt.Errorf("failed to set contract status to active: %w", err)
}
}
return nil
}

Expand All @@ -85,8 +87,6 @@ func (u *updateContractsTxn) ConfirmRevision(revision types.FileContractRevision
err := u.tx.QueryRow(query, sqlUint64(revision.RevisionNumber), sqlHash256(revision.ParentID)).Scan(&dbID)
if err != nil {
return fmt.Errorf("failed to confirm revision: %w", err)
} else if err := setContractStatus(u.tx, revision.ParentID, contracts.ContractStatusActive); err != nil {
return fmt.Errorf("failed to set contract status to active: %w", err)
}
return nil
}
Expand Down Expand Up @@ -434,8 +434,6 @@ func (s *Store) ExpireContract(id types.FileContractID, status contracts.Contrac
return nil
}

// successful, failed, and rejected contracts should have already had their
// collateral removed from the metrics
if contract.Status == contracts.ContractStatusActive || contract.Status == contracts.ContractStatusPending {
// successful, failed and rejected contracts should have already had
// their collateral removed from the metrics
Expand Down
2 changes: 1 addition & 1 deletion persist/sqlite/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *Store) upgradeDatabase(current, target int64) error {
for _, fn := range migrations[current-1:] {
current++
start := time.Now()
if err := fn(tx); err != nil {
if err := fn(tx, log.With(zap.Int64("version", current))); err != nil {
return fmt.Errorf("failed to migrate database to version %v: %w", current, err)
}
// check that no foreign key constraints were violated
Expand Down
108 changes: 87 additions & 21 deletions persist/sqlite/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,74 @@ import (

"go.sia.tech/core/types"
"go.sia.tech/hostd/host/contracts"
"go.uber.org/zap"
)

func migrateVersion21(tx txn) error {
// migrateVersion22 recalculates the locked and risked collateral and the
// potential and earned revenue metrics which will be bugged if the host rescans
// the blockchain.
func migrateVersion22(tx txn, log *zap.Logger) error {
rows, err := tx.Query(`SELECT contract_status, locked_collateral, risked_collateral, rpc_revenue, storage_revenue, ingress_revenue, egress_revenue, account_funding, registry_read, registry_write FROM contracts WHERE contract_status IN (?, ?, ?);`, contracts.ContractStatusPending, contracts.ContractStatusActive, contracts.ContractStatusSuccessful)
if err != nil {
return fmt.Errorf("failed to query contracts: %w", err)
}
defer rows.Close()

var totalLocked types.Currency
var totalPending, totalEarned contracts.Usage
for rows.Next() {
var status contracts.ContractStatus
var lockedCollateral types.Currency
var usage contracts.Usage

if err := rows.Scan(&status, (*sqlCurrency)(&lockedCollateral), (*sqlCurrency)(&usage.RiskedCollateral), (*sqlCurrency)(&usage.RPCRevenue), (*sqlCurrency)(&usage.StorageRevenue), (*sqlCurrency)(&usage.IngressRevenue), (*sqlCurrency)(&usage.EgressRevenue), (*sqlCurrency)(&usage.AccountFunding), (*sqlCurrency)(&usage.RegistryRead), (*sqlCurrency)(&usage.RegistryWrite)); err != nil {
return fmt.Errorf("failed to scan contract: %w", err)
}

switch status {
case contracts.ContractStatusPending, contracts.ContractStatusActive:
totalLocked = totalLocked.Add(lockedCollateral)
totalPending = totalPending.Add(usage)
case contracts.ContractStatusSuccessful:
totalEarned = totalEarned.Add(usage)
}
}

log.Debug("resetting metrics", zap.Stringer("lockedCollateral", totalLocked), zap.Stringer("riskedCollateral", totalPending.RiskedCollateral))

if err := setCurrencyStat(tx, metricLockedCollateral, totalLocked, time.Now()); err != nil {
return fmt.Errorf("failed to increment locked collateral: %w", err)
} else if err := setCurrencyStat(tx, metricRiskedCollateral, totalPending.RiskedCollateral, time.Now()); err != nil {
return fmt.Errorf("failed to increment risked collateral: %w", err)
} else if err := setCurrencyStat(tx, metricPotentialRPCRevenue, totalPending.RPCRevenue, time.Now()); err != nil {
return fmt.Errorf("failed to increment rpc revenue: %w", err)
} else if err := setCurrencyStat(tx, metricPotentialStorageRevenue, totalPending.StorageRevenue, time.Now()); err != nil {
return fmt.Errorf("failed to increment storage revenue: %w", err)
} else if err := setCurrencyStat(tx, metricPotentialIngressRevenue, totalPending.IngressRevenue, time.Now()); err != nil {
return fmt.Errorf("failed to increment ingress revenue: %w", err)
} else if err := setCurrencyStat(tx, metricPotentialEgressRevenue, totalPending.EgressRevenue, time.Now()); err != nil {
return fmt.Errorf("failed to increment egress revenue: %w", err)
} else if err := setCurrencyStat(tx, metricPotentialRegistryReadRevenue, totalPending.RegistryRead, time.Now()); err != nil {
return fmt.Errorf("failed to increment read registry revenue: %w", err)
} else if err := setCurrencyStat(tx, metricPotentialRegistryWriteRevenue, totalPending.RegistryWrite, time.Now()); err != nil {
return fmt.Errorf("failed to increment write registry revenue: %w", err)
} else if err := setCurrencyStat(tx, metricEarnedRPCRevenue, totalEarned.RPCRevenue, time.Now()); err != nil {
return fmt.Errorf("failed to increment rpc revenue: %w", err)
} else if err := setCurrencyStat(tx, metricEarnedStorageRevenue, totalEarned.StorageRevenue, time.Now()); err != nil {
return fmt.Errorf("failed to increment storage revenue: %w", err)
} else if err := setCurrencyStat(tx, metricEarnedIngressRevenue, totalEarned.IngressRevenue, time.Now()); err != nil {
return fmt.Errorf("failed to increment ingress revenue: %w", err)
} else if err := setCurrencyStat(tx, metricEarnedEgressRevenue, totalEarned.EgressRevenue, time.Now()); err != nil {
return fmt.Errorf("failed to increment egress revenue: %w", err)
} else if err := setCurrencyStat(tx, metricEarnedRegistryReadRevenue, totalEarned.RegistryRead, time.Now()); err != nil {
return fmt.Errorf("failed to increment read registry revenue: %w", err)
} else if err := setCurrencyStat(tx, metricEarnedRegistryWriteRevenue, totalEarned.RegistryWrite, time.Now()); err != nil {
return fmt.Errorf("failed to increment write registry revenue: %w", err)
}
return nil
}

func migrateVersion21(tx txn, _ *zap.Logger) error {
const query = `
ALTER TABLE global_settings ADD COLUMN last_announce_key BLOB;
ALTER TABLE global_settings ADD COLUMN settings_last_processed_change BLOB;
Expand All @@ -23,13 +88,13 @@ ALTER TABLE global_settings ADD COLUMN last_announce_address TEXT;
}

// migrateVersion20 adds a compound index to the volume_sectors table
func migrateVersion20(tx txn) error {
func migrateVersion20(tx txn, _ *zap.Logger) error {
_, err := tx.Exec(`CREATE INDEX volume_sectors_volume_id_sector_id_volume_index_set_compound ON volume_sectors (volume_id, sector_id, volume_index) WHERE sector_id IS NOT NULL;`)
return err
}

// migrateVersion19 adds a compound index to the volume_sectors table
func migrateVersion19(tx txn) error {
func migrateVersion19(tx txn, _ *zap.Logger) error {
const query = `
DROP INDEX storage_volumes_read_only_available;
CREATE INDEX storage_volumes_id_available_read_only ON storage_volumes(id, available, read_only);
Expand All @@ -41,7 +106,7 @@ CREATE INDEX volume_sectors_volume_id_sector_id_volume_index_compound ON volume_

// migrateVersion18 adds an index to the volume_sectors table to speed up
// empty sector selection.
func migrateVersion18(tx txn) error {
func migrateVersion18(tx txn, _ *zap.Logger) error {
const query = `CREATE INDEX volume_sectors_volume_id_sector_id ON volume_sectors(volume_id, sector_id);`
_, err := tx.Exec(query)
return err
Expand All @@ -50,7 +115,7 @@ func migrateVersion18(tx txn) error {
// migrateVersion17 recalculates the indices of all contract sector roots.
// Fixes a bug where the indices were not being properly updated if more than
// one root was trimmed.
func migrateVersion17(tx txn) error {
func migrateVersion17(tx txn, _ *zap.Logger) error {
const query = `
-- create a temp table that contains the new indices
CREATE TEMP TABLE temp_contract_sector_roots AS
Expand All @@ -68,7 +133,7 @@ DROP TABLE temp_contract_sector_roots;`
}

// migrateVersion16 recalculates the contract and physical sector metrics.
func migrateVersion16(tx txn) error {
func migrateVersion16(tx txn, _ *zap.Logger) error {
// recalculate the contract sectors metric
var contractSectorCount int64
if err := tx.QueryRow(`SELECT COUNT(*) FROM contract_sector_roots`).Scan(&contractSectorCount); err != nil {
Expand Down Expand Up @@ -113,7 +178,7 @@ func migrateVersion16(tx txn) error {
// migrateVersion15 adds the registry usage fields to the contracts table,
// removes the usage fields from the accounts table, and refactors the
// contract_account_funding table.
func migrateVersion15(tx txn) error {
func migrateVersion15(tx txn, _ *zap.Logger) error {
const query = `
-- drop the tables that are being removed or refactored
DROP TABLE account_financial_records;
Expand Down Expand Up @@ -217,7 +282,7 @@ CREATE INDEX contracts_formation_confirmed_negotiation_height ON contracts(forma

// migrateVersion14 adds the locked_sectors table, recalculates the contract
// sectors metric, and recalculates the physical sectors metric.
func migrateVersion14(tx txn) error {
func migrateVersion14(tx txn, _ *zap.Logger) error {
// create the new locked sectors table
const lockedSectorsTableQuery = `CREATE TABLE locked_sectors ( -- should be cleared at startup. currently persisted for simplicity, but may be moved to memory
id INTEGER PRIMARY KEY,
Expand Down Expand Up @@ -248,19 +313,19 @@ CREATE INDEX locked_sectors_sector_id ON locked_sectors(sector_id);`
}

// migrateVersion13 adds an index to the storage table to speed up location selection
func migrateVersion13(tx txn) error {
func migrateVersion13(tx txn, _ *zap.Logger) error {
_, err := tx.Exec(`CREATE INDEX storage_volumes_read_only_available_used_sectors ON storage_volumes(available, read_only, used_sectors);`)
return err
}

// migrateVersion12 adds an index to the contracts table to speed up sector pruning
func migrateVersion12(tx txn) error {
func migrateVersion12(tx txn, _ *zap.Logger) error {
_, err := tx.Exec(`CREATE INDEX contracts_window_end ON contracts(window_end);`)
return err
}

// migrateVersion11 recalculates the contract collateral metrics for existing contracts.
func migrateVersion11(tx txn) error {
func migrateVersion11(tx txn, _ *zap.Logger) error {
rows, err := tx.Query(`SELECT locked_collateral, risked_collateral FROM contracts WHERE contract_status IN (?, ?)`, contracts.ContractStatusPending, contracts.ContractStatusActive)
if err != nil {
return fmt.Errorf("failed to query contracts: %w", err)
Expand All @@ -285,13 +350,13 @@ func migrateVersion11(tx txn) error {
}

// migrateVersion10 drops the log_lines table.
func migrateVersion10(tx txn) error {
func migrateVersion10(tx txn, _ *zap.Logger) error {
_, err := tx.Exec(`DROP TABLE log_lines;`)
return err
}

// migrateVersion9 recalculates the contract metrics for existing contracts.
func migrateVersion9(tx txn) error {
func migrateVersion9(tx txn, _ *zap.Logger) error {
rows, err := tx.Query(`SELECT contract_status, COUNT(*) FROM contracts GROUP BY contract_status`)
if err != nil {
return fmt.Errorf("failed to query contracts: %w", err)
Expand Down Expand Up @@ -329,7 +394,7 @@ func migrateVersion9(tx txn) error {

// migrateVersion8 sets the initial values for the locked and risked collateral
// metrics for existing hosts
func migrateVersion8(tx txn) error {
func migrateVersion8(tx txn, _ *zap.Logger) error {
rows, err := tx.Query(`SELECT locked_collateral, risked_collateral FROM contracts WHERE contract_status IN (?, ?)`, contracts.ContractStatusPending, contracts.ContractStatusActive)
if err != nil {
return fmt.Errorf("failed to query contracts: %w", err)
Expand Down Expand Up @@ -358,14 +423,14 @@ func migrateVersion8(tx txn) error {
}

// migrateVersion7 adds the sector_cache_size column to the host_settings table
func migrateVersion7(tx txn) error {
func migrateVersion7(tx txn, _ *zap.Logger) error {
_, err := tx.Exec(`ALTER TABLE host_settings ADD COLUMN sector_cache_size INTEGER NOT NULL DEFAULT 0;`)
return err
}

// migrateVersion6 fixes a bug where the physical sectors metric was not being
// properly decreased when a volume is force removed.
func migrateVersion6(tx txn) error {
func migrateVersion6(tx txn, _ *zap.Logger) error {
var count int64
if err := tx.QueryRow(`SELECT COUNT(id) FROM volume_sectors WHERE sector_id IS NOT NULL`).Scan(&count); err != nil {
return fmt.Errorf("failed to count volume sectors: %w", err)
Expand All @@ -378,7 +443,7 @@ func migrateVersion6(tx txn) error {
// the contract sectors metric will drastically increase for existing hosts.
// This is unavoidable, as we have no way of knowing how many sectors were
// previously renewed.
func migrateVersion5(tx txn) error {
func migrateVersion5(tx txn, _ *zap.Logger) error {
var count int64
if err := tx.QueryRow(`SELECT COUNT(*) FROM contract_sector_roots`).Scan(&count); err != nil {
return fmt.Errorf("failed to count contract sector roots: %w", err)
Expand All @@ -388,7 +453,7 @@ func migrateVersion5(tx txn) error {
}

// migrateVersion4 changes the collateral setting to collateral_multiplier
func migrateVersion4(tx txn) error {
func migrateVersion4(tx txn, _ *zap.Logger) error {
const (
newSettingsSchema = `CREATE TABLE host_settings (
id INTEGER PRIMARY KEY NOT NULL DEFAULT 0 CHECK (id = 0), -- enforce a single row
Expand Down Expand Up @@ -444,13 +509,13 @@ egress_limit, ddns_provider, ddns_update_v4, ddns_update_v6, ddns_opts, registry

// migrateVersion3 adds a wallet hash to the global settings table to detect
// when the private key has changed.
func migrateVersion3(tx txn) error {
func migrateVersion3(tx txn, _ *zap.Logger) error {
_, err := tx.Exec(`ALTER TABLE global_settings ADD COLUMN wallet_hash BLOB;`)
return err
}

// migrateVersion2 removes the min prefix from the price columns in host_settings
func migrateVersion2(tx txn) error {
func migrateVersion2(tx txn, _ *zap.Logger) error {
const (
newSettingsSchema = `CREATE TABLE host_settings (
id INTEGER PRIMARY KEY NOT NULL DEFAULT 0 CHECK (id = 0), -- enforce a single row
Expand Down Expand Up @@ -505,7 +570,7 @@ egress_limit, dyn_dns_provider, dns_update_v4, dns_update_v6, dyn_dns_opts, regi
// migrations is a list of functions that are run to migrate the database from
// one version to the next. Migrations are used to update existing databases to
// match the schema in init.sql.
var migrations = []func(tx txn) error{
var migrations = []func(tx txn, log *zap.Logger) error{
migrateVersion2,
migrateVersion3,
migrateVersion4,
Expand All @@ -526,4 +591,5 @@ var migrations = []func(tx txn) error{
migrateVersion19,
migrateVersion20,
migrateVersion21,
migrateVersion22,
}

0 comments on commit da44153

Please sign in to comment.