From e9e7f5d40d5e2a68ed4a4cf7dfaacf79f60f9162 Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 28 Nov 2023 13:07:15 +0100 Subject: [PATCH] stores: add wallet metric --- api/metrcis.go | 15 ++ bus/bus.go | 14 ++ internal/node/node.go | 3 + stores/hostdb_test.go | 2 +- stores/metrics.go | 375 +++++++++++++++++++++-------------- stores/metrics_test.go | 47 +++++ stores/migrations.go | 306 ++++++++++------------------ stores/migrations_metrics.go | 63 ++++++ stores/migrations_utils.go | 69 +++++++ stores/sql.go | 6 +- stores/sql_test.go | 2 - stores/types.go | 26 ++- wallet/wallet.go | 34 ++++ wallet/wallet_test.go | 5 + 14 files changed, 606 insertions(+), 361 deletions(-) create mode 100644 stores/migrations_metrics.go create mode 100644 stores/migrations_utils.go diff --git a/api/metrcis.go b/api/metrcis.go index 63c77365b..34479eaaa 100644 --- a/api/metrcis.go +++ b/api/metrcis.go @@ -13,6 +13,7 @@ const ( MetricContractSet = "contractset" MetricContractSetChurn = "churn" MetricContract = "contract" + MetricWallet = "wallet" ) type ( @@ -75,6 +76,20 @@ type ( ContractID types.FileContractID HostKey types.PublicKey } + + WalletMetric struct { + Timestamp time.Time `json:"timestamp"` + + Address types.Address `json:"address"` + + ConfirmedBalance types.Currency `json:"confirmedBalance"` + SpendableBalance types.Currency `json:"spendableBalance"` + UnconfirmedBalance types.Currency `json:"unconfirmedBalance"` + } + + WalletMetricsQueryOpts struct { + Address types.Address + } ) type ( diff --git a/bus/bus.go b/bus/bus.go index d1ffb5e37..423241e15 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -201,6 +201,8 @@ type ( ContractSetChurnMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetChurnMetricsQueryOpts) ([]api.ContractSetChurnMetric, error) RecordContractSetChurnMetric(ctx context.Context, metrics ...api.ContractSetChurnMetric) error + + WalletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, 
opts api.WalletMetricsQueryOpts) ([]api.WalletMetric, error) } ) @@ -2059,6 +2061,16 @@ func (b *bus) metricsHandlerGET(jc jape.Context) { jc.Encode(metrics) return } + case api.MetricWallet: + var opts api.WalletMetricsQueryOpts + if jc.DecodeForm("address", &opts.Address) != nil { + return + } else if metrics, err := b.metrics(jc.Request.Context(), key, start, n, interval, opts); jc.Check("failed to get wallet metrics", err) != nil { + return + } else { + jc.Encode(metrics) + return + } default: jc.Error(fmt.Errorf("unknown metric '%s'", key), http.StatusBadRequest) return @@ -2073,6 +2085,8 @@ func (b *bus) metrics(ctx context.Context, key string, start time.Time, n uint64 return b.mtrcs.ContractSetMetrics(ctx, start, n, interval, opts.(api.ContractSetMetricsQueryOpts)) case api.MetricContractSetChurn: return b.mtrcs.ContractSetChurnMetrics(ctx, start, n, interval, opts.(api.ContractSetChurnMetricsQueryOpts)) + case api.MetricWallet: + return b.mtrcs.WalletMetrics(ctx, start, n, interval, opts.(api.WalletMetricsQueryOpts)) } return nil, nil } diff --git a/internal/node/node.go b/internal/node/node.go index 1864effc8..95234ac2f 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -140,6 +140,9 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht w := wallet.NewSingleAddressWallet(seed, sqlStore, cfg.UsedUTXOExpiry, zap.NewNop().Sugar()) tp.TransactionPoolSubscribe(w) + if err := cs.ConsensusSetSubscribe(w, modules.ConsensusChangeRecent, nil); err != nil { + return nil, nil, err + } if m := cfg.Miner; m != nil { if err := cs.ConsensusSetSubscribe(m, ccid, nil); err != nil { diff --git a/stores/hostdb_test.go b/stores/hostdb_test.go index a61f9eea3..d4bbf13c6 100644 --- a/stores/hostdb_test.go +++ b/stores/hostdb_test.go @@ -1104,7 +1104,7 @@ func (s *SQLStore) addCustomTestHost(hk types.PublicKey, na string) error { announcement: hostdb.Announcement{NetAddress: na}, }}...) 
s.lastSave = time.Now().Add(s.persistInterval * -2) - return s.applyUpdates(false) + return s.applyUpdates(false, false) } // hosts returns all hosts in the db. Only used in testing since preloading all diff --git a/stores/metrics.go b/stores/metrics.go index 5fca763f6..ca5cd4988 100644 --- a/stores/metrics.go +++ b/stores/metrics.go @@ -75,52 +75,72 @@ type ( Origin string `gorm:"index;NOT NULL"` Duration time.Duration `gorm:"index;NOT NULL"` } + + // dbWalletMetric tracks information about a specific wallet. + dbWalletMetric struct { + Model + Timestamp unixTimeMS `gorm:"index;NOT NULL"` + + Address address `gorm:"index;size:32;NOT NULL"` + + ConfirmedBalanceLo unsigned64 `gorm:"index:idx_confirmed_balance;NOT NULL"` + ConfirmedBalanceHi unsigned64 `gorm:"index:idx_confirmed_balance;NOT NULL"` + SpendableBalanceLo unsigned64 `gorm:"index:idx_spendable_balance;NOT NULL"` + SpendableBalanceHi unsigned64 `gorm:"index:idx_spendable_balance;NOT NULL"` + UnconfirmedBalanceLo unsigned64 `gorm:"index:idx_unconfirmed_balance;NOT NULL"` + UnconfirmedBalanceHi unsigned64 `gorm:"index:idx_unconfirmed_balance;NOT NULL"` + } ) func (dbContractMetric) TableName() string { return "contracts" } func (dbContractSetMetric) TableName() string { return "contract_sets" } func (dbContractSetChurnMetric) TableName() string { return "contract_sets_churn" } func (dbPerformanceMetric) TableName() string { return "performance" } +func (dbWalletMetric) TableName() string { return "wallets" } -// findPeriods is the core of all methods retrieving metrics. By using integer -// division rounding combined with a GROUP BY operation, all rows of a table are -// split into intervals and the row with the lowest timestamp for each interval -// is returned. The result is then joined with the original table to retrieve -// only the metrics we want. 
-func (s *SQLStore) findPeriods(tx *gorm.DB, dst interface{}, start time.Time, n uint64, interval time.Duration) error { - end := start.Add(time.Duration(n) * interval) - // inner groups all metrics within the requested time range into periods of - // 'interval' length and gives us the min timestamp of each period. - inner := tx.Model(dst). - Select("MIN(timestamp) AS min_time, (timestamp - ?) / ? * ? AS period", unixTimeMS(start), interval.Milliseconds(), interval.Milliseconds()). - Where("timestamp >= ? AND timestamp < ?", unixTimeMS(start), unixTimeMS(end)). - Group("period") - // mid then joins the result with the original table. This might yield - // duplicates if multiple rows have the same timestamp so we attach a - // row number. We order the rows by id to make the result deterministic. - mid := s.dbMetrics.Model(dst). - Joins("INNER JOIN (?) periods ON timestamp = periods.min_time", inner). - Select("*, ROW_NUMBER() OVER (PARTITION BY periods.min_time ORDER BY id) AS row_num") - // lastly we select all metrics with row number 1 - return s.dbMetrics.Table("(?) numbered", mid). - Where("numbered.row_num = 1"). - Find(dst). 
- Error -} - -func (s *SQLStore) contractSetMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) ([]dbContractSetMetric, error) { - tx := s.dbMetrics - if opts.Name != "" { - tx = tx.Where("name", opts.Name) +func (s *SQLStore) ContractMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractMetricsQueryOpts) ([]api.ContractMetric, error) { + metrics, err := s.contractMetrics(ctx, start, n, interval, opts) + if err != nil { + return nil, err + } + resp := make([]api.ContractMetric, len(metrics)) + toCurr := func(lo, hi unsigned64) types.Currency { + return types.NewCurrency(uint64(lo), uint64(hi)) + } + for i := range resp { + resp[i] = api.ContractMetric{ + Timestamp: time.Time(metrics[i].Timestamp).UTC(), + ContractID: types.FileContractID(metrics[i].FCID), + HostKey: types.PublicKey(metrics[i].Host), + RemainingCollateral: toCurr(metrics[i].RemainingCollateralLo, metrics[i].RemainingCollateralHi), + RemainingFunds: toCurr(metrics[i].RemainingFundsLo, metrics[i].RemainingFundsHi), + RevisionNumber: uint64(metrics[i].RevisionNumber), + UploadSpending: toCurr(metrics[i].UploadSpendingLo, metrics[i].UploadSpendingHi), + DownloadSpending: toCurr(metrics[i].DownloadSpendingLo, metrics[i].DownloadSpendingHi), + FundAccountSpending: toCurr(metrics[i].FundAccountSpendingLo, metrics[i].FundAccountSpendingHi), + DeleteSpending: toCurr(metrics[i].DeleteSpendingLo, metrics[i].DeleteSpendingHi), + ListSpending: toCurr(metrics[i].ListSpendingLo, metrics[i].ListSpendingHi), + } } + return resp, nil +} - var metrics []dbContractSetMetric - err := s.findPeriods(tx, &metrics, start, n, interval) +func (s *SQLStore) ContractSetChurnMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetChurnMetricsQueryOpts) ([]api.ContractSetChurnMetric, error) { + metrics, err := s.contractSetChurnMetrics(ctx, start, n, interval, opts) if err != nil 
{ - return nil, fmt.Errorf("failed to fetch contract set metrics: %w", err) + return nil, err } - - return metrics, nil + resp := make([]api.ContractSetChurnMetric, len(metrics)) + for i := range resp { + resp[i] = api.ContractSetChurnMetric{ + Direction: metrics[i].Direction, + ContractID: types.FileContractID(metrics[i].FCID), + Name: metrics[i].Name, + Reason: metrics[i].Reason, + Timestamp: time.Time(metrics[i].Timestamp).UTC(), + } + } + return resp, nil } func (s *SQLStore) ContractSetMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) ([]api.ContractSetMetric, error) { @@ -139,56 +159,51 @@ func (s *SQLStore) ContractSetMetrics(ctx context.Context, start time.Time, n ui return resp, nil } -func (s *SQLStore) RecordContractSetMetric(ctx context.Context, metrics ...api.ContractSetMetric) error { - dbMetrics := make([]dbContractSetMetric, len(metrics)) - for i, metric := range metrics { - dbMetrics[i] = dbContractSetMetric{ - Contracts: metric.Contracts, - Name: metric.Name, - Timestamp: unixTimeMS(metric.Timestamp), - } - } - return s.dbMetrics.Create(&dbMetrics).Error -} - -func (s *SQLStore) contractSetChurnMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetChurnMetricsQueryOpts) ([]dbContractSetChurnMetric, error) { - tx := s.dbMetrics - if opts.Name != "" { - tx = tx.Where("name", opts.Name) - } - if opts.Direction != "" { - tx = tx.Where("direction", opts.Direction) - } - if opts.Reason != "" { - tx = tx.Where("reason", opts.Reason) - } - var metrics []dbContractSetChurnMetric - err := s.findPeriods(tx, &metrics, start, n, interval) - if err != nil { - return nil, fmt.Errorf("failed to fetch contract set churn metrics: %w", err) - } - - return metrics, nil -} - -func (s *SQLStore) ContractSetChurnMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetChurnMetricsQueryOpts) 
([]api.ContractSetChurnMetric, error) { - metrics, err := s.contractSetChurnMetrics(ctx, start, n, interval, opts) +func (s *SQLStore) PerformanceMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts) ([]api.PerformanceMetric, error) { + metrics, err := s.performanceMetrics(ctx, start, n, interval, opts) if err != nil { return nil, err } - resp := make([]api.ContractSetChurnMetric, len(metrics)) + resp := make([]api.PerformanceMetric, len(metrics)) for i := range resp { - resp[i] = api.ContractSetChurnMetric{ - Direction: metrics[i].Direction, - ContractID: types.FileContractID(metrics[i].FCID), - Name: metrics[i].Name, - Reason: metrics[i].Reason, - Timestamp: time.Time(metrics[i].Timestamp).UTC(), + resp[i] = api.PerformanceMetric{ + Action: metrics[i].Action, + HostKey: types.PublicKey(metrics[i].Host), + Origin: metrics[i].Origin, + Duration: metrics[i].Duration, + Timestamp: time.Time(metrics[i].Timestamp).UTC(), } } return resp, nil } +func (s *SQLStore) RecordContractMetric(ctx context.Context, metrics ...api.ContractMetric) error { + dbMetrics := make([]dbContractMetric, len(metrics)) + for i, metric := range metrics { + dbMetrics[i] = dbContractMetric{ + Timestamp: unixTimeMS(metric.Timestamp), + FCID: fileContractID(metric.ContractID), + Host: publicKey(metric.HostKey), + RemainingCollateralLo: unsigned64(metric.RemainingCollateral.Lo), + RemainingCollateralHi: unsigned64(metric.RemainingCollateral.Hi), + RemainingFundsLo: unsigned64(metric.RemainingFunds.Lo), + RemainingFundsHi: unsigned64(metric.RemainingFunds.Hi), + RevisionNumber: unsigned64(metric.RevisionNumber), + UploadSpendingLo: unsigned64(metric.UploadSpending.Lo), + UploadSpendingHi: unsigned64(metric.UploadSpending.Hi), + DownloadSpendingLo: unsigned64(metric.DownloadSpending.Lo), + DownloadSpendingHi: unsigned64(metric.DownloadSpending.Hi), + FundAccountSpendingLo: unsigned64(metric.FundAccountSpending.Lo), + 
FundAccountSpendingHi: unsigned64(metric.FundAccountSpending.Hi), + DeleteSpendingLo: unsigned64(metric.DeleteSpending.Lo), + DeleteSpendingHi: unsigned64(metric.DeleteSpending.Hi), + ListSpendingLo: unsigned64(metric.ListSpending.Lo), + ListSpendingHi: unsigned64(metric.ListSpending.Hi), + } + } + return s.dbMetrics.Create(&dbMetrics).Error +} + func (s *SQLStore) RecordContractSetChurnMetric(ctx context.Context, metrics ...api.ContractSetChurnMetric) error { dbMetrics := make([]dbContractSetChurnMetric, len(metrics)) for i, metric := range metrics { @@ -203,43 +218,33 @@ func (s *SQLStore) RecordContractSetChurnMetric(ctx context.Context, metrics ... return s.dbMetrics.Create(&dbMetrics).Error } -func (s *SQLStore) performanceMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts) ([]dbPerformanceMetric, error) { - tx := s.dbMetrics - if opts.Action != "" { - tx = tx.Where("action", opts.Action) - } - if opts.HostKey != (types.PublicKey{}) { - tx = tx.Where("host", publicKey(opts.HostKey)) - } - if opts.Origin != "" { - tx = tx.Where("origin", opts.Origin) - } - - var metrics []dbPerformanceMetric - err := s.findPeriods(tx, &metrics, start, n, interval) - if err != nil { - return nil, fmt.Errorf("failed to fetch performance metrics: %w", err) +func (s *SQLStore) RecordContractSetMetric(ctx context.Context, metrics ...api.ContractSetMetric) error { + dbMetrics := make([]dbContractSetMetric, len(metrics)) + for i, metric := range metrics { + dbMetrics[i] = dbContractSetMetric{ + Contracts: metric.Contracts, + Name: metric.Name, + Timestamp: unixTimeMS(metric.Timestamp), + } } - - return metrics, nil + return s.dbMetrics.Create(&dbMetrics).Error } -func (s *SQLStore) PerformanceMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts) ([]api.PerformanceMetric, error) { - metrics, err := s.performanceMetrics(ctx, start, n, interval, opts) - if err 
!= nil { - return nil, err - } - resp := make([]api.PerformanceMetric, len(metrics)) - for i := range resp { - resp[i] = api.PerformanceMetric{ - Action: metrics[i].Action, - HostKey: types.PublicKey(metrics[i].Host), - Origin: metrics[i].Origin, - Duration: metrics[i].Duration, - Timestamp: time.Time(metrics[i].Timestamp).UTC(), +func (s *SQLStore) RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error { + dbMetrics := make([]dbWalletMetric, len(metrics)) + for i, metric := range metrics { + dbMetrics[i] = dbWalletMetric{ + Timestamp: unixTimeMS(metric.Timestamp), + Address: address(metric.Address), + ConfirmedBalanceLo: unsigned64(metric.ConfirmedBalance.Lo), + ConfirmedBalanceHi: unsigned64(metric.ConfirmedBalance.Hi), + SpendableBalanceLo: unsigned64(metric.SpendableBalance.Lo), + SpendableBalanceHi: unsigned64(metric.SpendableBalance.Hi), + UnconfirmedBalanceLo: unsigned64(metric.UnconfirmedBalance.Lo), + UnconfirmedBalanceHi: unsigned64(metric.UnconfirmedBalance.Hi), } } - return resp, nil + return s.dbMetrics.Create(&dbMetrics).Error } func (s *SQLStore) RecordPerformanceMetric(ctx context.Context, metrics ...api.PerformanceMetric) error { @@ -256,6 +261,27 @@ func (s *SQLStore) RecordPerformanceMetric(ctx context.Context, metrics ...api.P return s.dbMetrics.Create(dbMetrics).Error } +func (s *SQLStore) WalletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) ([]api.WalletMetric, error) { + metrics, err := s.walletMetrics(ctx, start, n, interval, opts) + if err != nil { + return nil, err + } + resp := make([]api.WalletMetric, len(metrics)) + toCurr := func(lo, hi unsigned64) types.Currency { + return types.NewCurrency(uint64(lo), uint64(hi)) + } + for i := range resp { + resp[i] = api.WalletMetric{ + Timestamp: time.Time(metrics[i].Timestamp).UTC(), + Address: types.Address(metrics[i].Address), + ConfirmedBalance: toCurr(metrics[i].ConfirmedBalanceLo, 
metrics[i].ConfirmedBalanceHi), + SpendableBalance: toCurr(metrics[i].SpendableBalanceLo, metrics[i].SpendableBalanceHi), + UnconfirmedBalance: toCurr(metrics[i].UnconfirmedBalanceLo, metrics[i].UnconfirmedBalanceHi), + } + } + return resp, nil +} + func (s *SQLStore) contractMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractMetricsQueryOpts) ([]dbContractMetric, error) { tx := s.dbMetrics if opts.ContractID != (types.FileContractID{}) { @@ -274,56 +300,99 @@ func (s *SQLStore) contractMetrics(ctx context.Context, start time.Time, n uint6 return metrics, nil } -func (s *SQLStore) ContractMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractMetricsQueryOpts) ([]api.ContractMetric, error) { - metrics, err := s.contractMetrics(ctx, start, n, interval, opts) +func (s *SQLStore) contractSetChurnMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetChurnMetricsQueryOpts) ([]dbContractSetChurnMetric, error) { + tx := s.dbMetrics + if opts.Name != "" { + tx = tx.Where("name", opts.Name) + } + if opts.Direction != "" { + tx = tx.Where("direction", opts.Direction) + } + if opts.Reason != "" { + tx = tx.Where("reason", opts.Reason) + } + var metrics []dbContractSetChurnMetric + err := s.findPeriods(tx, &metrics, start, n, interval) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to fetch contract set churn metrics: %w", err) } - resp := make([]api.ContractMetric, len(metrics)) - toCurr := func(lo, hi unsigned64) types.Currency { - return types.NewCurrency(uint64(lo), uint64(hi)) + + return metrics, nil +} + +func (s *SQLStore) contractSetMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) ([]dbContractSetMetric, error) { + tx := s.dbMetrics + if opts.Name != "" { + tx = tx.Where("name", opts.Name) } - for i := range resp { - resp[i] = api.ContractMetric{ - 
Timestamp: time.Time(metrics[i].Timestamp).UTC(), - ContractID: types.FileContractID(metrics[i].FCID), - HostKey: types.PublicKey(metrics[i].Host), - RemainingCollateral: toCurr(metrics[i].RemainingCollateralLo, metrics[i].RemainingCollateralHi), - RemainingFunds: toCurr(metrics[i].RemainingFundsLo, metrics[i].RemainingFundsHi), - RevisionNumber: uint64(metrics[i].RevisionNumber), - UploadSpending: toCurr(metrics[i].UploadSpendingLo, metrics[i].UploadSpendingHi), - DownloadSpending: toCurr(metrics[i].DownloadSpendingLo, metrics[i].DownloadSpendingHi), - FundAccountSpending: toCurr(metrics[i].FundAccountSpendingLo, metrics[i].FundAccountSpendingHi), - DeleteSpending: toCurr(metrics[i].DeleteSpendingLo, metrics[i].DeleteSpendingHi), - ListSpending: toCurr(metrics[i].ListSpendingLo, metrics[i].ListSpendingHi), - } + + var metrics []dbContractSetMetric + err := s.findPeriods(tx, &metrics, start, n, interval) + if err != nil { + return nil, fmt.Errorf("failed to fetch contract set metrics: %w", err) } - return resp, nil + + return metrics, nil } -func (s *SQLStore) RecordContractMetric(ctx context.Context, metrics ...api.ContractMetric) error { - dbMetrics := make([]dbContractMetric, len(metrics)) - for i, metric := range metrics { - dbMetrics[i] = dbContractMetric{ - Timestamp: unixTimeMS(metric.Timestamp), - FCID: fileContractID(metric.ContractID), - Host: publicKey(metric.HostKey), - RemainingCollateralLo: unsigned64(metric.RemainingCollateral.Lo), - RemainingCollateralHi: unsigned64(metric.RemainingCollateral.Hi), - RemainingFundsLo: unsigned64(metric.RemainingFunds.Lo), - RemainingFundsHi: unsigned64(metric.RemainingFunds.Hi), - RevisionNumber: unsigned64(metric.RevisionNumber), - UploadSpendingLo: unsigned64(metric.UploadSpending.Lo), - UploadSpendingHi: unsigned64(metric.UploadSpending.Hi), - DownloadSpendingLo: unsigned64(metric.DownloadSpending.Lo), - DownloadSpendingHi: unsigned64(metric.DownloadSpending.Hi), - FundAccountSpendingLo: 
unsigned64(metric.FundAccountSpending.Lo), - FundAccountSpendingHi: unsigned64(metric.FundAccountSpending.Hi), - DeleteSpendingLo: unsigned64(metric.DeleteSpending.Lo), - DeleteSpendingHi: unsigned64(metric.DeleteSpending.Hi), - ListSpendingLo: unsigned64(metric.ListSpending.Lo), - ListSpendingHi: unsigned64(metric.ListSpending.Hi), - } +// findPeriods is the core of all methods retrieving metrics. By using integer +// division rounding combined with a GROUP BY operation, all rows of a table are +// split into intervals and the row with the lowest timestamp for each interval +// is returned. The result is then joined with the original table to retrieve +// only the metrics we want. +func (s *SQLStore) findPeriods(tx *gorm.DB, dst interface{}, start time.Time, n uint64, interval time.Duration) error { + end := start.Add(time.Duration(n) * interval) + // inner groups all metrics within the requested time range into periods of + // 'interval' length and gives us the min timestamp of each period. + inner := tx.Model(dst). + Select("MIN(timestamp) AS min_time, (timestamp - ?) / ? * ? AS period", unixTimeMS(start), interval.Milliseconds(), interval.Milliseconds()). + Where("timestamp >= ? AND timestamp < ?", unixTimeMS(start), unixTimeMS(end)). + Group("period") + // mid then joins the result with the original table. This might yield + // duplicates if multiple rows have the same timestamp so we attach a + // row number. We order the rows by id to make the result deterministic. + mid := s.dbMetrics.Model(dst). + Joins("INNER JOIN (?) periods ON timestamp = periods.min_time", inner). + Select("*, ROW_NUMBER() OVER (PARTITION BY periods.min_time ORDER BY id) AS row_num") + // lastly we select all metrics with row number 1 + return s.dbMetrics.Table("(?) numbered", mid). + Where("numbered.row_num = 1"). + Find(dst). 
+ Error +} + +func (s *SQLStore) walletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) ([]dbWalletMetric, error) { + tx := s.dbMetrics + if opts.Address != (types.Address{}) { + tx = tx.Where("address", address(opts.Address)) } - return s.dbMetrics.Create(&dbMetrics).Error + + var metrics []dbWalletMetric + err := s.findPeriods(tx, &metrics, start, n, interval) + if err != nil { + return nil, fmt.Errorf("failed to fetch wallet metrics: %w", err) + } + + return metrics, nil +} + +func (s *SQLStore) performanceMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts) ([]dbPerformanceMetric, error) { + tx := s.dbMetrics + if opts.Action != "" { + tx = tx.Where("action", opts.Action) + } + if opts.HostKey != (types.PublicKey{}) { + tx = tx.Where("host", publicKey(opts.HostKey)) + } + if opts.Origin != "" { + tx = tx.Where("origin", opts.Origin) + } + + var metrics []dbPerformanceMetric + err := s.findPeriods(tx, &metrics, start, n, interval) + if err != nil { + return nil, fmt.Errorf("failed to fetch performance metrics: %w", err) + } + + return metrics, nil } diff --git a/stores/metrics_test.go b/stores/metrics_test.go index ccb172f24..fa911adbf 100644 --- a/stores/metrics_test.go +++ b/stores/metrics_test.go @@ -303,3 +303,50 @@ func TestContractMetrics(t *testing.T) { } }) } + +func TestWalletMetrics(t *testing.T) { + ss := newTestSQLStore(t, defaultTestSQLStoreConfig) + defer ss.Close() + + // Create metrics to query. 
+ times := []time.Time{time.UnixMilli(3), time.UnixMilli(1), time.UnixMilli(2)} + for _, recordedTime := range times { + metric := api.WalletMetric{ + Timestamp: recordedTime, + Address: types.Address{1}, + ConfirmedBalance: types.NewCurrency(frand.Uint64n(math.MaxUint64), frand.Uint64n(math.MaxUint64)), + UnconfirmedBalance: types.NewCurrency(frand.Uint64n(math.MaxUint64), frand.Uint64n(math.MaxUint64)), + SpendableBalance: types.NewCurrency(frand.Uint64n(math.MaxUint64), frand.Uint64n(math.MaxUint64)), + } + if err := ss.RecordWalletMetric(context.Background(), metric); err != nil { + t.Fatal(err) + } + } + + // Fetch all metrics + metrics, err := ss.WalletMetrics(context.Background(), time.UnixMilli(1), 3, time.Millisecond, api.WalletMetricsQueryOpts{}) + if err != nil { + t.Fatal(err) + } else if len(metrics) != 3 { + t.Fatalf("expected 3 metrics, got %v", len(metrics)) + } else if !sort.SliceIsSorted(metrics, func(i, j int) bool { + return time.Time(metrics[i].Timestamp).Before(time.Time(metrics[j].Timestamp)) + }) { + t.Fatal("expected metrics to be sorted by time") + } + + // Query by address + metrics, err = ss.WalletMetrics(context.Background(), time.UnixMilli(1), 3, time.Millisecond, api.WalletMetricsQueryOpts{Address: types.Address{1}}) + if err != nil { + t.Fatal(err) + } else if len(metrics) != 3 { + t.Fatalf("expected 3 metrics, got %v", len(metrics)) + } + + metrics, err = ss.WalletMetrics(context.Background(), time.UnixMilli(1), 3, time.Millisecond, api.WalletMetricsQueryOpts{Address: types.Address{2}}) + if err != nil { + t.Fatal(err) + } else if len(metrics) != 0 { + t.Fatalf("expected 0 metrics, got %v", len(metrics)) + } +} diff --git a/stores/migrations.go b/stores/migrations.go index b3302e44e..cc179715c 100644 --- a/stores/migrations.go +++ b/stores/migrations.go @@ -5,8 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "reflect" - "strings" "github.com/go-gormigrate/gormigrate/v2" "go.sia.tech/renterd/api" @@ -52,90 +50,43 @@ var ( // 
webhooks.WebhookStore tables &dbWebhook{}, } - metricsTables = []interface{}{ - &dbContractMetric{}, - &dbContractSetMetric{}, - &dbContractSetChurnMetric{}, - &dbPerformanceMetric{}, - } ) -// migrateShards performs the migrations necessary for removing the 'shards' -// table. -func migrateShards(ctx context.Context, db *gorm.DB, logger *zap.SugaredLogger) error { - m := db.Migrator() - - // add columns - if !m.HasColumn(&dbSlice{}, "db_slab_id") { - logger.Info(ctx, "adding column db_slab_id to table 'slices'") - if err := m.AddColumn(&dbSlice{}, "db_slab_id"); err != nil { - return err - } - logger.Info(ctx, "done adding column db_slab_id to table 'slices'") +// initSchema is executed only on a clean database. Otherwise the individual +// migrations are executed. +func initSchema(tx *gorm.DB) error { + // Setup join tables. + err := setupJoinTables(tx) + if err != nil { + return fmt.Errorf("failed to setup join tables: %w", err) } - if !m.HasColumn(&dbSector{}, "db_slab_id") { - logger.Info(ctx, "adding column db_slab_id to table 'sectors'") - if err := m.AddColumn(&dbSector{}, "db_slab_id"); err != nil { - return err - } - logger.Info(ctx, "done adding column db_slab_id to table 'sectors'") + + // Run auto migrations. + err = tx.AutoMigrate(tables...) + if err != nil { + return fmt.Errorf("failed to init schema: %w", err) } - // populate new columns - var err error - if m.HasColumn(&dbSlab{}, "db_slice_id") { - logger.Info(ctx, "populating column 'db_slab_id' in table 'slices'") - if isSQLite(db) { - err = db.Exec(`UPDATE slices SET db_slab_id = (SELECT slabs.id FROM slabs WHERE slabs.db_slice_id = slices.id)`).Error - } else { - err = db.Exec(`UPDATE slices sli - INNER JOIN slabs sla ON sli.id=sla.db_slice_id - SET sli.db_slab_id=sla.id`).Error + // Change the collation of columns that we need to be case sensitive. 
+ if !isSQLite(tx) { + err = tx.Exec("ALTER TABLE objects MODIFY COLUMN object_id VARCHAR(766) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin;").Error + if err != nil { + return fmt.Errorf("failed to change object_id collation: %w", err) } + err = tx.Exec("ALTER TABLE buckets MODIFY COLUMN name VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin;").Error if err != nil { - return err + return fmt.Errorf("failed to change buckets_name collation: %w", err) + } + err = tx.Exec("ALTER TABLE multipart_uploads MODIFY COLUMN object_id VARCHAR(766) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin;").Error + if err != nil { + return fmt.Errorf("failed to change object_id collation: %w", err) } - logger.Info(ctx, "done populating column 'db_slab_id' in table 'slices'") - } - logger.Info(ctx, "populating column 'db_slab_id' in table 'sectors'") - if isSQLite(db) { - err = db.Exec(`UPDATE sectors SET db_slab_id = (SELECT shards.db_slab_id FROM shards WHERE shards.db_sector_id = sectors.id)`).Error - } else { - err = db.Exec(`UPDATE sectors sec - INNER JOIN shards sha ON sec.id=sha.db_sector_id - SET sec.db_slab_id=sha.db_slab_id`).Error - } - if err != nil { - return err - } - logger.Info(ctx, "done populating column 'db_slab_id' in table 'sectors'") - - // drop column db_slice_id from slabs - logger.Info(ctx, "dropping constraint 'fk_slices_slab' from table 'slabs'") - if err := m.DropConstraint(&dbSlab{}, "fk_slices_slab"); err != nil { - return err - } - logger.Info(ctx, "done dropping constraint 'fk_slices_slab' from table 'slabs'") - logger.Info(ctx, "dropping column 'db_slice_id' from table 'slabs'") - if err := m.DropColumn(&dbSlab{}, "db_slice_id"); err != nil { - return err - } - logger.Info(ctx, "done dropping column 'db_slice_id' from table 'slabs'") - - // delete any sectors that are not referenced by a slab - logger.Info(ctx, "pruning dangling sectors") - if err := db.Exec(`DELETE FROM sectors WHERE db_slab_id IS NULL`).Error; err != nil { - return err } - logger.Info(ctx, 
"done pruning dangling sectors") - // drop table shards - logger.Info(ctx, "dropping table 'shards'") - if err := m.DropTable("shards"); err != nil { - return err - } - logger.Info(ctx, "done dropping table 'shards'") - return nil + // Add default bucket. + return tx.Create(&dbBucket{ + Name: api.DefaultBucketName, + }).Error } func performMigrations(db *gorm.DB, logger *zap.SugaredLogger) error { @@ -367,131 +318,6 @@ func performMigrations(db *gorm.DB, logger *zap.SugaredLogger) error { return nil } -func performMetricsMigrations(db *gorm.DB, logger *zap.SugaredLogger) error { - migrations := []*gormigrate.Migration{} - // Create migrator. - m := gormigrate.New(db, gormigrate.DefaultOptions, migrations) - - // Set init function. - m.InitSchema(initMetricsSchema) - - // Perform migrations. - if err := m.Migrate(); err != nil { - return fmt.Errorf("failed to migrate: %v", err) - } - return nil -} - -// initSchema is executed only on a clean database. Otherwise the individual -// migrations are executed. -func initSchema(tx *gorm.DB) error { - // Setup join tables. - err := setupJoinTables(tx) - if err != nil { - return fmt.Errorf("failed to setup join tables: %w", err) - } - - // Run auto migrations. - err = tx.AutoMigrate(tables...) - if err != nil { - return fmt.Errorf("failed to init schema: %w", err) - } - - // Change the collation of columns that we need to be case sensitive. 
- if !isSQLite(tx) { - err = tx.Exec("ALTER TABLE objects MODIFY COLUMN object_id VARCHAR(766) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin;").Error - if err != nil { - return fmt.Errorf("failed to change object_id collation: %w", err) - } - err = tx.Exec("ALTER TABLE buckets MODIFY COLUMN name VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin;").Error - if err != nil { - return fmt.Errorf("failed to change buckets_name collation: %w", err) - } - err = tx.Exec("ALTER TABLE multipart_uploads MODIFY COLUMN object_id VARCHAR(766) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin;").Error - if err != nil { - return fmt.Errorf("failed to change object_id collation: %w", err) - } - } - - // Add default bucket. - return tx.Create(&dbBucket{ - Name: api.DefaultBucketName, - }).Error -} - -// initMetricsSchema is executed only on a clean database. Otherwise the individual -// migrations are executed. -func initMetricsSchema(tx *gorm.DB) error { - // Run auto migrations. - err := tx.AutoMigrate(metricsTables...) 
- if err != nil { - return fmt.Errorf("failed to init schema: %w", err) - } - return nil -} - -func detectMissingIndicesOnType(tx *gorm.DB, table interface{}, t reflect.Type, f func(dst interface{}, name string)) { - if t.Kind() == reflect.Ptr { - t = t.Elem() - } - for i := 0; i < t.NumField(); i++ { - field := t.Field(i) - if field.Anonymous { - detectMissingIndicesOnType(tx, table, field.Type, f) - continue - } - if !strings.Contains(field.Tag.Get("gorm"), "index") { - continue // no index tag - } - if !tx.Migrator().HasIndex(table, field.Name) { - f(table, field.Name) - } - } -} - -func detectMissingIndices(tx *gorm.DB, f func(dst interface{}, name string)) { - for _, table := range tables { - detectMissingIndicesOnType(tx, table, reflect.TypeOf(table), f) - } -} - -func setupJoinTables(tx *gorm.DB) error { - jointables := []struct { - model interface{} - joinTable interface{ TableName() string } - field string - }{ - { - &dbAllowlistEntry{}, - &dbHostAllowlistEntryHost{}, - "Hosts", - }, - { - &dbBlocklistEntry{}, - &dbHostBlocklistEntryHost{}, - "Hosts", - }, - { - &dbSector{}, - &dbContractSector{}, - "Contracts", - }, - { - &dbContractSet{}, - &dbContractSetContract{}, - "Contracts", - }, - } - for _, t := range jointables { - if err := tx.SetupJoinTable(t.model, t.field, t.joinTable); err != nil { - return fmt.Errorf("failed to setup join table '%s': %w", t.joinTable.TableName(), err) - } - } - return nil -} - -// performMigration00001_gormigrate performs the first migration before -// introducing gormigrate. func performMigration00001_gormigrate(txn *gorm.DB, logger *zap.SugaredLogger) error { ctx := context.Background() m := txn.Migrator() @@ -593,6 +419,84 @@ func performMigration00001_gormigrate(txn *gorm.DB, logger *zap.SugaredLogger) e return nil } +// migrateShards performs the migrations necessary for removing the 'shards' +// table. 
+func migrateShards(ctx context.Context, db *gorm.DB, logger *zap.SugaredLogger) error { + m := db.Migrator() + + // add columns + if !m.HasColumn(&dbSlice{}, "db_slab_id") { + logger.Info(ctx, "adding column db_slab_id to table 'slices'") + if err := m.AddColumn(&dbSlice{}, "db_slab_id"); err != nil { + return err + } + logger.Info(ctx, "done adding column db_slab_id to table 'slices'") + } + if !m.HasColumn(&dbSector{}, "db_slab_id") { + logger.Info(ctx, "adding column db_slab_id to table 'sectors'") + if err := m.AddColumn(&dbSector{}, "db_slab_id"); err != nil { + return err + } + logger.Info(ctx, "done adding column db_slab_id to table 'sectors'") + } + + // populate new columns + var err error + if m.HasColumn(&dbSlab{}, "db_slice_id") { + logger.Info(ctx, "populating column 'db_slab_id' in table 'slices'") + if isSQLite(db) { + err = db.Exec(`UPDATE slices SET db_slab_id = (SELECT slabs.id FROM slabs WHERE slabs.db_slice_id = slices.id)`).Error + } else { + err = db.Exec(`UPDATE slices sli + INNER JOIN slabs sla ON sli.id=sla.db_slice_id + SET sli.db_slab_id=sla.id`).Error + } + if err != nil { + return err + } + logger.Info(ctx, "done populating column 'db_slab_id' in table 'slices'") + } + logger.Info(ctx, "populating column 'db_slab_id' in table 'sectors'") + if isSQLite(db) { + err = db.Exec(`UPDATE sectors SET db_slab_id = (SELECT shards.db_slab_id FROM shards WHERE shards.db_sector_id = sectors.id)`).Error + } else { + err = db.Exec(`UPDATE sectors sec + INNER JOIN shards sha ON sec.id=sha.db_sector_id + SET sec.db_slab_id=sha.db_slab_id`).Error + } + if err != nil { + return err + } + logger.Info(ctx, "done populating column 'db_slab_id' in table 'sectors'") + + // drop column db_slice_id from slabs + logger.Info(ctx, "dropping constraint 'fk_slices_slab' from table 'slabs'") + if err := m.DropConstraint(&dbSlab{}, "fk_slices_slab"); err != nil { + return err + } + logger.Info(ctx, "done dropping constraint 'fk_slices_slab' from table 'slabs'") + 
logger.Info(ctx, "dropping column 'db_slice_id' from table 'slabs'") + if err := m.DropColumn(&dbSlab{}, "db_slice_id"); err != nil { + return err + } + logger.Info(ctx, "done dropping column 'db_slice_id' from table 'slabs'") + + // delete any sectors that are not referenced by a slab + logger.Info(ctx, "pruning dangling sectors") + if err := db.Exec(`DELETE FROM sectors WHERE db_slab_id IS NULL`).Error; err != nil { + return err + } + logger.Info(ctx, "done pruning dangling sectors") + + // drop table shards + logger.Info(ctx, "dropping table 'shards'") + if err := m.DropTable("shards"); err != nil { + return err + } + logger.Info(ctx, "done dropping table 'shards'") + return nil +} + func performMigration00002_dropconstraintslabcsid(txn *gorm.DB, logger *zap.SugaredLogger) error { ctx := context.Background() m := txn.Migrator() diff --git a/stores/migrations_metrics.go b/stores/migrations_metrics.go new file mode 100644 index 000000000..fd2918bc3 --- /dev/null +++ b/stores/migrations_metrics.go @@ -0,0 +1,63 @@ +package stores + +import ( + "fmt" + + "github.com/go-gormigrate/gormigrate/v2" + "go.uber.org/zap" + "gorm.io/gorm" +) + +var ( + metricsTables = []interface{}{ + &dbContractMetric{}, + &dbContractSetMetric{}, + &dbContractSetChurnMetric{}, + &dbPerformanceMetric{}, + &dbWalletMetric{}, + } +) + +// initMetricsSchema is executed only on a clean database. Otherwise the individual +// migrations are executed. +func initMetricsSchema(tx *gorm.DB) error { + // Run auto migrations. + err := tx.AutoMigrate(metricsTables...) + if err != nil { + return fmt.Errorf("failed to init schema: %w", err) + } + return nil +} + +func performMetricsMigrations(db *gorm.DB, logger *zap.SugaredLogger) error { + migrations := []*gormigrate.Migration{ + { + ID: "00001_wallet_metrics", + Migrate: func(tx *gorm.DB) error { + return performMigration00001_wallet_metrics(tx, logger) + }, + Rollback: nil, + }, + } + + // Create migrator. 
+ m := gormigrate.New(db, gormigrate.DefaultOptions, migrations) + + // Set init function. + m.InitSchema(initMetricsSchema) + + // Perform migrations. + if err := m.Migrate(); err != nil { + return fmt.Errorf("failed to migrate: %v", err) + } + return nil +} + +func performMigration00001_wallet_metrics(txn *gorm.DB, logger *zap.SugaredLogger) error { + logger.Info("performing migration 00001_wallet_metrics") + if err := txn.Migrator().AutoMigrate(&dbWalletMetric{}); err != nil { + return err + } + logger.Info("migration 00001_wallet_metrics complete") + return nil +} diff --git a/stores/migrations_utils.go b/stores/migrations_utils.go new file mode 100644 index 000000000..fdf7e6e44 --- /dev/null +++ b/stores/migrations_utils.go @@ -0,0 +1,69 @@ +package stores + +import ( + "fmt" + "reflect" + "strings" + + "gorm.io/gorm" +) + +func detectMissingIndices(tx *gorm.DB, f func(dst interface{}, name string)) { + for _, table := range tables { + detectMissingIndicesOnType(tx, table, reflect.TypeOf(table), f) + } +} + +func detectMissingIndicesOnType(tx *gorm.DB, table interface{}, t reflect.Type, f func(dst interface{}, name string)) { + if t.Kind() == reflect.Ptr { + t = t.Elem() + } + for i := 0; i < t.NumField(); i++ { + field := t.Field(i) + if field.Anonymous { + detectMissingIndicesOnType(tx, table, field.Type, f) + continue + } + if !strings.Contains(field.Tag.Get("gorm"), "index") { + continue // no index tag + } + if !tx.Migrator().HasIndex(table, field.Name) { + f(table, field.Name) + } + } +} + +func setupJoinTables(tx *gorm.DB) error { + jointables := []struct { + model interface{} + joinTable interface{ TableName() string } + field string + }{ + { + &dbAllowlistEntry{}, + &dbHostAllowlistEntryHost{}, + "Hosts", + }, + { + &dbBlocklistEntry{}, + &dbHostBlocklistEntryHost{}, + "Hosts", + }, + { + &dbSector{}, + &dbContractSector{}, + "Contracts", + }, + { + &dbContractSet{}, + &dbContractSetContract{}, + "Contracts", + }, + } + for _, t := range jointables { 
+ if err := tx.SetupJoinTable(t.model, t.field, t.joinTable); err != nil { + return fmt.Errorf("failed to setup join table '%s': %w", t.joinTable.TableName(), err) + } + } + return nil +} diff --git a/stores/sql.go b/stores/sql.go index 5a8a414c7..6127d72ea 100644 --- a/stores/sql.go +++ b/stores/sql.go @@ -346,7 +346,7 @@ func (ss *SQLStore) ProcessConsensusChange(cc modules.ConsensusChange) { } // Try to apply the updates. - if err := ss.applyUpdates(false); err != nil { + if err := ss.applyUpdates(false, cc.Synced); err != nil { ss.logger.Error(fmt.Sprintf("failed to apply updates, err: %v", err)) } @@ -368,14 +368,14 @@ func (ss *SQLStore) ProcessConsensusChange(cc modules.ConsensusChange) { ss.persistMu.Lock() defer ss.persistMu.Unlock() - if err := ss.applyUpdates(true); err != nil { + if err := ss.applyUpdates(true, cc.Synced); err != nil { ss.logger.Error(fmt.Sprintf("failed to apply updates, err: %v", err)) } }) } // applyUpdates applies all unapplied updates to the database. -func (ss *SQLStore) applyUpdates(force bool) (err error) { +func (ss *SQLStore) applyUpdates(force, synced bool) (err error) { // Check if we need to apply changes persistIntervalPassed := time.Since(ss.lastSave) > ss.persistInterval // enough time has passed since last persist softLimitReached := len(ss.unappliedAnnouncements) >= announcementBatchSoftLimit // enough announcements have accumulated diff --git a/stores/sql_test.go b/stores/sql_test.go index b91125cf4..56b96b12f 100644 --- a/stores/sql_test.go +++ b/stores/sql_test.go @@ -41,8 +41,6 @@ type testSQLStore struct { type testSQLStoreConfig struct { dbName string dbMetricsName string - dbConn gorm.Dialector - dbMetricsConn gorm.Dialector dir string persistent bool skipMigrate bool diff --git a/stores/types.go b/stores/types.go index 729a4fbc2..864de3c3a 100644 --- a/stores/types.go +++ b/stores/types.go @@ -18,6 +18,7 @@ import ( var zeroCurrency = currency(types.ZeroCurrency) type ( + address types.Address unixTimeMS 
time.Time datetime time.Time currency types.Currency @@ -31,6 +32,29 @@ type ( secretKey []byte ) +// GormDataType implements gorm.GormDataTypeInterface. +func (address) GormDataType() string { + return "bytes" +} + +// Scan scan value into address, implements sql.Scanner interface. +func (a *address) Scan(value interface{}) error { + bytes, ok := value.([]byte) + if !ok { + return errors.New(fmt.Sprint("failed to unmarshal address value:", value)) + } + if len(bytes) < len(address{}) { + return fmt.Errorf("failed to unmarshal address value due to insufficient bytes %v < %v: %v", len(bytes), len(address{}), value) + } + *a = *(*address)(bytes) + return nil +} + +// Value returns an address value, implements driver.Valuer interface. +func (a address) Value() (driver.Value, error) { + return a[:], nil +} + // GormDataType implements gorm.GormDataTypeInterface. func (secretKey) GormDataType() string { return "bytes" @@ -98,7 +122,7 @@ func (fcid *fileContractID) Scan(value interface{}) error { return nil } -// Value returns a currency value, implements driver.Valuer interface. +// Value returns a fileContractID value, implements driver.Valuer interface. 
func (fcid fileContractID) Value() (driver.Value, error) { return fcid[:], nil } diff --git a/wallet/wallet.go b/wallet/wallet.go index 07ac0db35..e0cf8f0af 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -2,6 +2,7 @@ package wallet import ( "bytes" + "context" "errors" "fmt" "reflect" @@ -12,6 +13,7 @@ import ( "gitlab.com/NebulousLabs/encoding" "go.sia.tech/core/consensus" "go.sia.tech/core/types" + "go.sia.tech/renterd/api" "go.sia.tech/siad/modules" "go.uber.org/zap" "lukechampine.com/frand" @@ -112,6 +114,7 @@ type SingleAddressStore interface { Height() uint64 UnspentSiacoinElements(matured bool) ([]SiacoinElement, error) Transactions(before, since time.Time, offset, limit int) ([]Transaction, error) + RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error } // A TransactionPool contains transactions that have not yet been included in a @@ -418,6 +421,37 @@ func (w *SingleAddressWallet) isOutputUsed(id types.Hash256) bool { return time.Since(lastUsed) <= w.usedUTXOExpiry || inPool } +// ProcessConsensusChange implements modules.ConsensusSetSubscriber. +func (w *SingleAddressWallet) ProcessConsensusChange(cc modules.ConsensusChange) { + // only record when we are synced + if !cc.Synced { + return + } + + // apply sane timeout + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + // fetch balance + spendable, confirmed, unconfirmed, err := w.Balance() + if err != nil { + w.log.Errorf("failed to fetch wallet balance, err: %v", err) + return + } + + // record wallet metric + if err := w.store.RecordWalletMetric(ctx, api.WalletMetric{ + Timestamp: time.Now().UTC(), + Address: w.addr, + ConfirmedBalance: confirmed, + UnconfirmedBalance: unconfirmed, + SpendableBalance: spendable, + }); err != nil { + w.log.Errorf("failed to record wallet metric, err: %v", err) + return + } +} + // ReceiveUpdatedUnconfirmedTransactions implements modules.TransactionPoolSubscriber. 
func (w *SingleAddressWallet) ReceiveUpdatedUnconfirmedTransactions(diff *modules.TransactionPoolDiff) { siacoinOutputs := make(map[types.SiacoinOutputID]SiacoinElement) diff --git a/wallet/wallet_test.go b/wallet/wallet_test.go index 881e1eecc..a6b02fcc0 100644 --- a/wallet/wallet_test.go +++ b/wallet/wallet_test.go @@ -1,12 +1,14 @@ package wallet_test import ( + "context" "strings" "testing" "time" "go.sia.tech/core/consensus" "go.sia.tech/core/types" + "go.sia.tech/renterd/api" "go.sia.tech/renterd/wallet" "go.uber.org/zap" "lukechampine.com/frand" @@ -26,6 +28,9 @@ func (s *mockStore) UnspentSiacoinElements(bool) ([]wallet.SiacoinElement, error func (s *mockStore) Transactions(before, since time.Time, offset, limit int) ([]wallet.Transaction, error) { return nil, nil } +func (s *mockStore) RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error { + return nil +} var cs = consensus.State{ Index: types.ChainIndex{