From e9e7f5d40d5e2a68ed4a4cf7dfaacf79f60f9162 Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 28 Nov 2023 13:07:15 +0100 Subject: [PATCH 1/7] stores: add wallet metric --- api/metrcis.go | 15 ++ bus/bus.go | 14 ++ internal/node/node.go | 3 + stores/hostdb_test.go | 2 +- stores/metrics.go | 375 +++++++++++++++++++++-------------- stores/metrics_test.go | 47 +++++ stores/migrations.go | 306 ++++++++++------------------ stores/migrations_metrics.go | 63 ++++++ stores/migrations_utils.go | 69 +++++++ stores/sql.go | 6 +- stores/sql_test.go | 2 - stores/types.go | 26 ++- wallet/wallet.go | 34 ++++ wallet/wallet_test.go | 5 + 14 files changed, 606 insertions(+), 361 deletions(-) create mode 100644 stores/migrations_metrics.go create mode 100644 stores/migrations_utils.go diff --git a/api/metrcis.go b/api/metrcis.go index 63c77365b..34479eaaa 100644 --- a/api/metrcis.go +++ b/api/metrcis.go @@ -13,6 +13,7 @@ const ( MetricContractSet = "contractset" MetricContractSetChurn = "churn" MetricContract = "contract" + MetricWallet = "wallet" ) type ( @@ -75,6 +76,20 @@ type ( ContractID types.FileContractID HostKey types.PublicKey } + + WalletMetric struct { + Timestamp time.Time `json:"timestamp"` + + Address types.Address `json:"address"` + + ConfirmedBalance types.Currency `json:"confirmedBalance"` + SpendableBalance types.Currency `json:"spendableBalance"` + UnconfirmedBalance types.Currency `json:"unconfirmedBalance"` + } + + WalletMetricsQueryOpts struct { + Address types.Address + } ) type ( diff --git a/bus/bus.go b/bus/bus.go index d1ffb5e37..423241e15 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -201,6 +201,8 @@ type ( ContractSetChurnMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetChurnMetricsQueryOpts) ([]api.ContractSetChurnMetric, error) RecordContractSetChurnMetric(ctx context.Context, metrics ...api.ContractSetChurnMetric) error + + WalletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) ([]api.WalletMetric, error) } ) @@ -2059,6 +2061,16 @@ func (b *bus) metricsHandlerGET(jc jape.Context) { jc.Encode(metrics) return } + case api.MetricWallet: + var opts api.WalletMetricsQueryOpts + if jc.DecodeForm("address", &opts.Address) != nil { + return + } else if metrics, err := b.metrics(jc.Request.Context(), key, start, n, interval, opts); jc.Check("failed to get wallet metrics", err) != nil { + return + } else { + jc.Encode(metrics) + return + } default: jc.Error(fmt.Errorf("unknown metric '%s'", key), http.StatusBadRequest) return @@ -2073,6 +2085,8 @@ func (b *bus) metrics(ctx context.Context, key string, start time.Time, n uint64 return b.mtrcs.ContractSetMetrics(ctx, start, n, interval, opts.(api.ContractSetMetricsQueryOpts)) case api.MetricContractSetChurn: return b.mtrcs.ContractSetChurnMetrics(ctx, start, n, interval, opts.(api.ContractSetChurnMetricsQueryOpts)) + case api.MetricWallet: + return b.mtrcs.WalletMetrics(ctx, start, n, interval, opts.(api.WalletMetricsQueryOpts)) } return nil, nil } diff --git a/internal/node/node.go b/internal/node/node.go index 1864effc8..95234ac2f 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -140,6 +140,9 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht w := wallet.NewSingleAddressWallet(seed, sqlStore, cfg.UsedUTXOExpiry, zap.NewNop().Sugar()) tp.TransactionPoolSubscribe(w) + if err := cs.ConsensusSetSubscribe(w, modules.ConsensusChangeRecent, nil); err != nil { + return nil, nil, err + } if m := cfg.Miner; m != nil { if err := cs.ConsensusSetSubscribe(m, ccid, nil); err != nil { diff --git a/stores/hostdb_test.go b/stores/hostdb_test.go index a61f9eea3..d4bbf13c6 100644 --- a/stores/hostdb_test.go +++ b/stores/hostdb_test.go @@ -1104,7 +1104,7 @@ func (s *SQLStore) addCustomTestHost(hk types.PublicKey, na string) error { announcement: hostdb.Announcement{NetAddress: na}, }}...) s.lastSave = time.Now().Add(s.persistInterval * -2) - return s.applyUpdates(false) + return s.applyUpdates(false, false) } // hosts returns all hosts in the db. Only used in testing since preloading all diff --git a/stores/metrics.go b/stores/metrics.go index 5fca763f6..ca5cd4988 100644 --- a/stores/metrics.go +++ b/stores/metrics.go @@ -75,52 +75,72 @@ type ( Origin string `gorm:"index;NOT NULL"` Duration time.Duration `gorm:"index;NOT NULL"` } + + // dbWalletMetric tracks information about a specific wallet. + dbWalletMetric struct { + Model + Timestamp unixTimeMS `gorm:"index;NOT NULL"` + + Address address `gorm:"index;size:32;NOT NULL"` + + ConfirmedBalanceLo unsigned64 `gorm:"index:idx_confirmed_balance;NOT NULL"` + ConfirmedBalanceHi unsigned64 `gorm:"index:idx_confirmed_balance;NOT NULL"` + SpendableBalanceLo unsigned64 `gorm:"index:idx_spendable_balance;NOT NULL"` + SpendableBalanceHi unsigned64 `gorm:"index:idx_spendable_balance;NOT NULL"` + UnconfirmedBalanceLo unsigned64 `gorm:"index:idx_unconfirmed_balance;NOT NULL"` + UnconfirmedBalanceHi unsigned64 `gorm:"index:idx_unconfirmed_balance;NOT NULL"` + } ) func (dbContractMetric) TableName() string { return "contracts" } func (dbContractSetMetric) TableName() string { return "contract_sets" } func (dbContractSetChurnMetric) TableName() string { return "contract_sets_churn" } func (dbPerformanceMetric) TableName() string { return "performance" } +func (dbWalletMetric) TableName() string { return "wallets" } -// findPeriods is the core of all methods retrieving metrics. By using integer -// division rounding combined with a GROUP BY operation, all rows of a table are -// split into intervals and the row with the lowest timestamp for each interval -// is returned. The result is then joined with the original table to retrieve -// only the metrics we want. -func (s *SQLStore) findPeriods(tx *gorm.DB, dst interface{}, start time.Time, n uint64, interval time.Duration) error { - end := start.Add(time.Duration(n) * interval) - // inner groups all metrics within the requested time range into periods of - // 'interval' length and gives us the min timestamp of each period. - inner := tx.Model(dst). - Select("MIN(timestamp) AS min_time, (timestamp - ?) / ? * ? AS period", unixTimeMS(start), interval.Milliseconds(), interval.Milliseconds()). - Where("timestamp >= ? AND timestamp < ?", unixTimeMS(start), unixTimeMS(end)). - Group("period") - // mid then joins the result with the original table. This might yield - // duplicates if multiple rows have the same timestamp so we attach a - // row number. We order the rows by id to make the result deterministic. - mid := s.dbMetrics.Model(dst). - Joins("INNER JOIN (?) periods ON timestamp = periods.min_time", inner). - Select("*, ROW_NUMBER() OVER (PARTITION BY periods.min_time ORDER BY id) AS row_num") - // lastly we select all metrics with row number 1 - return s.dbMetrics.Table("(?) numbered", mid). - Where("numbered.row_num = 1"). - Find(dst). - Error -} - -func (s *SQLStore) contractSetMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) ([]dbContractSetMetric, error) { - tx := s.dbMetrics - if opts.Name != "" { - tx = tx.Where("name", opts.Name) +func (s *SQLStore) ContractMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractMetricsQueryOpts) ([]api.ContractMetric, error) { + metrics, err := s.contractMetrics(ctx, start, n, interval, opts) + if err != nil { + return nil, err + } + resp := make([]api.ContractMetric, len(metrics)) + toCurr := func(lo, hi unsigned64) types.Currency { + return types.NewCurrency(uint64(lo), uint64(hi)) + } + for i := range resp { + resp[i] = api.ContractMetric{ + Timestamp: time.Time(metrics[i].Timestamp).UTC(), + ContractID: types.FileContractID(metrics[i].FCID), + HostKey: types.PublicKey(metrics[i].Host), + RemainingCollateral: toCurr(metrics[i].RemainingCollateralLo, metrics[i].RemainingCollateralHi), + RemainingFunds: toCurr(metrics[i].RemainingFundsLo, metrics[i].RemainingFundsHi), + RevisionNumber: uint64(metrics[i].RevisionNumber), + UploadSpending: toCurr(metrics[i].UploadSpendingLo, metrics[i].UploadSpendingHi), + DownloadSpending: toCurr(metrics[i].DownloadSpendingLo, metrics[i].DownloadSpendingHi), + FundAccountSpending: toCurr(metrics[i].FundAccountSpendingLo, metrics[i].FundAccountSpendingHi), + DeleteSpending: toCurr(metrics[i].DeleteSpendingLo, metrics[i].DeleteSpendingHi), + ListSpending: toCurr(metrics[i].ListSpendingLo, metrics[i].ListSpendingHi), + } } + return resp, nil +} - var metrics []dbContractSetMetric - err := s.findPeriods(tx, &metrics, start, n, interval) +func (s *SQLStore) ContractSetChurnMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetChurnMetricsQueryOpts) ([]api.ContractSetChurnMetric, error) { + metrics, err := s.contractSetChurnMetrics(ctx, start, n, interval, opts) if err != nil { - return nil, fmt.Errorf("failed to fetch contract set metrics: %w", err) + return nil, err } - - return metrics, nil + resp := make([]api.ContractSetChurnMetric, len(metrics)) + for i := range resp { + resp[i] = api.ContractSetChurnMetric{ + Direction: metrics[i].Direction, + ContractID: types.FileContractID(metrics[i].FCID), + Name: metrics[i].Name, + Reason: metrics[i].Reason, + Timestamp: time.Time(metrics[i].Timestamp).UTC(), + } + } + return resp, nil } func (s *SQLStore) ContractSetMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) ([]api.ContractSetMetric, error) { @@ -139,56 +159,51 @@ func (s *SQLStore) ContractSetMetrics(ctx context.Context, start time.Time, n ui return resp, nil } -func (s *SQLStore) RecordContractSetMetric(ctx context.Context, metrics ...api.ContractSetMetric) error { - dbMetrics := make([]dbContractSetMetric, len(metrics)) - for i, metric := range metrics { - dbMetrics[i] = dbContractSetMetric{ - Contracts: metric.Contracts, - Name: metric.Name, - Timestamp: unixTimeMS(metric.Timestamp), - } - } - return s.dbMetrics.Create(&dbMetrics).Error -} - -func (s *SQLStore) contractSetChurnMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetChurnMetricsQueryOpts) ([]dbContractSetChurnMetric, error) { - tx := s.dbMetrics - if opts.Name != "" { - tx = tx.Where("name", opts.Name) - } - if opts.Direction != "" { - tx = tx.Where("direction", opts.Direction) - } - if opts.Reason != "" { - tx = tx.Where("reason", opts.Reason) - } - var metrics []dbContractSetChurnMetric - err := s.findPeriods(tx, &metrics, start, n, interval) - if err != nil { - return nil, fmt.Errorf("failed to fetch contract set churn metrics: %w", err) - } - - return metrics, nil -} - -func (s *SQLStore) ContractSetChurnMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetChurnMetricsQueryOpts) ([]api.ContractSetChurnMetric, error) { - metrics, err := s.contractSetChurnMetrics(ctx, start, n, interval, opts) +func (s *SQLStore) PerformanceMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts) ([]api.PerformanceMetric, error) { + metrics, err := s.performanceMetrics(ctx, start, n, interval, opts) if err != nil { return nil, err } - resp := make([]api.ContractSetChurnMetric, len(metrics)) + resp := make([]api.PerformanceMetric, len(metrics)) for i := range resp { - resp[i] = api.ContractSetChurnMetric{ - Direction: metrics[i].Direction, - ContractID: types.FileContractID(metrics[i].FCID), - Name: metrics[i].Name, - Reason: metrics[i].Reason, - Timestamp: time.Time(metrics[i].Timestamp).UTC(), + resp[i] = api.PerformanceMetric{ + Action: metrics[i].Action, + HostKey: types.PublicKey(metrics[i].Host), + Origin: metrics[i].Origin, + Duration: metrics[i].Duration, + Timestamp: time.Time(metrics[i].Timestamp).UTC(), } } return resp, nil } +func (s *SQLStore) RecordContractMetric(ctx context.Context, metrics ...api.ContractMetric) error { + dbMetrics := make([]dbContractMetric, len(metrics)) + for i, metric := range metrics { + dbMetrics[i] = dbContractMetric{ + Timestamp: unixTimeMS(metric.Timestamp), + FCID: fileContractID(metric.ContractID), + Host: publicKey(metric.HostKey), + RemainingCollateralLo: unsigned64(metric.RemainingCollateral.Lo), + RemainingCollateralHi: unsigned64(metric.RemainingCollateral.Hi), + RemainingFundsLo: unsigned64(metric.RemainingFunds.Lo), + RemainingFundsHi: unsigned64(metric.RemainingFunds.Hi), + RevisionNumber: unsigned64(metric.RevisionNumber), + UploadSpendingLo: unsigned64(metric.UploadSpending.Lo), + UploadSpendingHi: unsigned64(metric.UploadSpending.Hi), + DownloadSpendingLo: unsigned64(metric.DownloadSpending.Lo), + DownloadSpendingHi: unsigned64(metric.DownloadSpending.Hi), + FundAccountSpendingLo: unsigned64(metric.FundAccountSpending.Lo), + FundAccountSpendingHi: unsigned64(metric.FundAccountSpending.Hi), + DeleteSpendingLo: unsigned64(metric.DeleteSpending.Lo), + DeleteSpendingHi: unsigned64(metric.DeleteSpending.Hi), + ListSpendingLo: unsigned64(metric.ListSpending.Lo), + ListSpendingHi: unsigned64(metric.ListSpending.Hi), + } + } + return s.dbMetrics.Create(&dbMetrics).Error +} + func (s *SQLStore) RecordContractSetChurnMetric(ctx context.Context, metrics ...api.ContractSetChurnMetric) error { dbMetrics := make([]dbContractSetChurnMetric, len(metrics)) for i, metric := range metrics { @@ -203,43 +218,33 @@ func (s *SQLStore) RecordContractSetChurnMetric(ctx context.Context, metrics ... return s.dbMetrics.Create(&dbMetrics).Error } -func (s *SQLStore) performanceMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts) ([]dbPerformanceMetric, error) { - tx := s.dbMetrics - if opts.Action != "" { - tx = tx.Where("action", opts.Action) - } - if opts.HostKey != (types.PublicKey{}) { - tx = tx.Where("host", publicKey(opts.HostKey)) - } - if opts.Origin != "" { - tx = tx.Where("origin", opts.Origin) - } - - var metrics []dbPerformanceMetric - err := s.findPeriods(tx, &metrics, start, n, interval) - if err != nil { - return nil, fmt.Errorf("failed to fetch performance metrics: %w", err) +func (s *SQLStore) RecordContractSetMetric(ctx context.Context, metrics ...api.ContractSetMetric) error { + dbMetrics := make([]dbContractSetMetric, len(metrics)) + for i, metric := range metrics { + dbMetrics[i] = dbContractSetMetric{ + Contracts: metric.Contracts, + Name: metric.Name, + Timestamp: unixTimeMS(metric.Timestamp), + } } - - return metrics, nil + return s.dbMetrics.Create(&dbMetrics).Error } -func (s *SQLStore) PerformanceMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts) ([]api.PerformanceMetric, error) { - metrics, err := s.performanceMetrics(ctx, start, n, interval, opts) - if err != nil { - return nil, err - } - resp := make([]api.PerformanceMetric, len(metrics)) - for i := range resp { - resp[i] = api.PerformanceMetric{ - Action: metrics[i].Action, - HostKey: types.PublicKey(metrics[i].Host), - Origin: metrics[i].Origin, - Duration: metrics[i].Duration, - Timestamp: time.Time(metrics[i].Timestamp).UTC(), +func (s *SQLStore) RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error { + dbMetrics := make([]dbWalletMetric, len(metrics)) + for i, metric := range metrics { + dbMetrics[i] = dbWalletMetric{ + Timestamp: unixTimeMS(metric.Timestamp), + Address: address(metric.Address), + ConfirmedBalanceLo: unsigned64(metric.ConfirmedBalance.Lo), + ConfirmedBalanceHi: unsigned64(metric.ConfirmedBalance.Hi), + SpendableBalanceLo: unsigned64(metric.SpendableBalance.Lo), + SpendableBalanceHi: unsigned64(metric.SpendableBalance.Hi), + UnconfirmedBalanceLo: unsigned64(metric.UnconfirmedBalance.Lo), + UnconfirmedBalanceHi: unsigned64(metric.UnconfirmedBalance.Hi), } } - return resp, nil + return s.dbMetrics.Create(&dbMetrics).Error } func (s *SQLStore) RecordPerformanceMetric(ctx context.Context, metrics ...api.PerformanceMetric) error { @@ -256,6 +261,27 @@ func (s *SQLStore) RecordPerformanceMetric(ctx context.Context, metrics ...api.P return s.dbMetrics.Create(dbMetrics).Error } +func (s *SQLStore) WalletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) ([]api.WalletMetric, error) { + metrics, err := s.walletMetrics(ctx, start, n, interval, opts) + if err != nil { + return nil, err + } + resp := make([]api.WalletMetric, len(metrics)) + toCurr := func(lo, hi unsigned64) types.Currency { + return types.NewCurrency(uint64(lo), uint64(hi)) + } + for i := range resp { + resp[i] = api.WalletMetric{ + Timestamp: time.Time(metrics[i].Timestamp).UTC(), + Address: types.Address(metrics[i].Address), + ConfirmedBalance: toCurr(metrics[i].ConfirmedBalanceLo, metrics[i].ConfirmedBalanceHi), + SpendableBalance: toCurr(metrics[i].SpendableBalanceLo, metrics[i].SpendableBalanceHi), + UnconfirmedBalance: toCurr(metrics[i].UnconfirmedBalanceLo, metrics[i].UnconfirmedBalanceHi), + } + } + return resp, nil +} + func (s *SQLStore) contractMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractMetricsQueryOpts) ([]dbContractMetric, error) { tx := s.dbMetrics if opts.ContractID != (types.FileContractID{}) { @@ -274,56 +300,99 @@ func (s *SQLStore) contractMetrics(ctx context.Context, start time.Time, n uint6 return metrics, nil } -func (s *SQLStore) ContractMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractMetricsQueryOpts) ([]api.ContractMetric, error) { - metrics, err := s.contractMetrics(ctx, start, n, interval, opts) +func (s *SQLStore) contractSetChurnMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetChurnMetricsQueryOpts) ([]dbContractSetChurnMetric, error) { + tx := s.dbMetrics + if opts.Name != "" { + tx = tx.Where("name", opts.Name) + } + if opts.Direction != "" { + tx = tx.Where("direction", opts.Direction) + } + if opts.Reason != "" { + tx = tx.Where("reason", opts.Reason) + } + var metrics []dbContractSetChurnMetric + err := s.findPeriods(tx, &metrics, start, n, interval) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to fetch contract set churn metrics: %w", err) } - resp := make([]api.ContractMetric, len(metrics)) - toCurr := func(lo, hi unsigned64) types.Currency { - return types.NewCurrency(uint64(lo), uint64(hi)) + + return metrics, nil +} + +func (s *SQLStore) contractSetMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) ([]dbContractSetMetric, error) { + tx := s.dbMetrics + if opts.Name != "" { + tx = tx.Where("name", opts.Name) } - for i := range resp { - resp[i] = api.ContractMetric{ - Timestamp: time.Time(metrics[i].Timestamp).UTC(), - ContractID: types.FileContractID(metrics[i].FCID), - HostKey: types.PublicKey(metrics[i].Host), - RemainingCollateral: toCurr(metrics[i].RemainingCollateralLo, metrics[i].RemainingCollateralHi), - RemainingFunds: toCurr(metrics[i].RemainingFundsLo, metrics[i].RemainingFundsHi), - RevisionNumber: uint64(metrics[i].RevisionNumber), - UploadSpending: toCurr(metrics[i].UploadSpendingLo, metrics[i].UploadSpendingHi), - DownloadSpending: toCurr(metrics[i].DownloadSpendingLo, metrics[i].DownloadSpendingHi), - FundAccountSpending: toCurr(metrics[i].FundAccountSpendingLo, metrics[i].FundAccountSpendingHi), - DeleteSpending: toCurr(metrics[i].DeleteSpendingLo, metrics[i].DeleteSpendingHi), - ListSpending: toCurr(metrics[i].ListSpendingLo, metrics[i].ListSpendingHi), - } + + var metrics []dbContractSetMetric + err := s.findPeriods(tx, &metrics, start, n, interval) + if err != nil { + return nil, fmt.Errorf("failed to fetch contract set metrics: %w", err) } - return resp, nil + + return metrics, nil } -func (s *SQLStore) RecordContractMetric(ctx context.Context, metrics ...api.ContractMetric) error { - dbMetrics := make([]dbContractMetric, len(metrics)) - for i, metric := range metrics { - dbMetrics[i] = dbContractMetric{ - Timestamp: unixTimeMS(metric.Timestamp), - FCID: fileContractID(metric.ContractID), - Host: publicKey(metric.HostKey), - RemainingCollateralLo: unsigned64(metric.RemainingCollateral.Lo), - RemainingCollateralHi: unsigned64(metric.RemainingCollateral.Hi), - RemainingFundsLo: unsigned64(metric.RemainingFunds.Lo), - RemainingFundsHi: unsigned64(metric.RemainingFunds.Hi), - RevisionNumber: unsigned64(metric.RevisionNumber), - UploadSpendingLo: unsigned64(metric.UploadSpending.Lo), - UploadSpendingHi: unsigned64(metric.UploadSpending.Hi), - DownloadSpendingLo: unsigned64(metric.DownloadSpending.Lo), - DownloadSpendingHi: unsigned64(metric.DownloadSpending.Hi), - FundAccountSpendingLo: unsigned64(metric.FundAccountSpending.Lo), - FundAccountSpendingHi: unsigned64(metric.FundAccountSpending.Hi), - DeleteSpendingLo: unsigned64(metric.DeleteSpending.Lo), - DeleteSpendingHi: unsigned64(metric.DeleteSpending.Hi), - ListSpendingLo: unsigned64(metric.ListSpending.Lo), - ListSpendingHi: unsigned64(metric.ListSpending.Hi), - } +// findPeriods is the core of all methods retrieving metrics. By using integer +// division rounding combined with a GROUP BY operation, all rows of a table are +// split into intervals and the row with the lowest timestamp for each interval +// is returned. The result is then joined with the original table to retrieve +// only the metrics we want. +func (s *SQLStore) findPeriods(tx *gorm.DB, dst interface{}, start time.Time, n uint64, interval time.Duration) error { + end := start.Add(time.Duration(n) * interval) + // inner groups all metrics within the requested time range into periods of + // 'interval' length and gives us the min timestamp of each period. + inner := tx.Model(dst). + Select("MIN(timestamp) AS min_time, (timestamp - ?) / ? * ? AS period", unixTimeMS(start), interval.Milliseconds(), interval.Milliseconds()). + Where("timestamp >= ? AND timestamp < ?", unixTimeMS(start), unixTimeMS(end)). + Group("period") + // mid then joins the result with the original table. This might yield + // duplicates if multiple rows have the same timestamp so we attach a + // row number. We order the rows by id to make the result deterministic. + mid := s.dbMetrics.Model(dst). + Joins("INNER JOIN (?) periods ON timestamp = periods.min_time", inner). + Select("*, ROW_NUMBER() OVER (PARTITION BY periods.min_time ORDER BY id) AS row_num") + // lastly we select all metrics with row number 1 + return s.dbMetrics.Table("(?) numbered", mid). + Where("numbered.row_num = 1"). + Find(dst). + Error +} + +func (s *SQLStore) walletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) ([]dbWalletMetric, error) { + tx := s.dbMetrics + if opts.Address != (types.Address{}) { + tx = tx.Where("address", address(opts.Address)) } - return s.dbMetrics.Create(&dbMetrics).Error + + var metrics []dbWalletMetric + err := s.findPeriods(tx, &metrics, start, n, interval) + if err != nil { + return nil, fmt.Errorf("failed to fetch contract metrics: %w", err) + } + + return metrics, nil +} + +func (s *SQLStore) performanceMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts) ([]dbPerformanceMetric, error) { + tx := s.dbMetrics + if opts.Action != "" { + tx = tx.Where("action", opts.Action) + } + if opts.HostKey != (types.PublicKey{}) { + tx = tx.Where("host", publicKey(opts.HostKey)) + } + if opts.Origin != "" { + tx = tx.Where("origin", opts.Origin) + } + + var metrics []dbPerformanceMetric + err := s.findPeriods(tx, &metrics, start, n, interval) + if err != nil { + return nil, fmt.Errorf("failed to fetch performance metrics: %w", err) + } + + return metrics, nil } diff --git a/stores/metrics_test.go b/stores/metrics_test.go index ccb172f24..fa911adbf 100644 --- a/stores/metrics_test.go +++ b/stores/metrics_test.go @@ -303,3 +303,50 @@ func TestContractMetrics(t *testing.T) { } }) } + +func TestWalletMetrics(t *testing.T) { + ss := newTestSQLStore(t, defaultTestSQLStoreConfig) + defer ss.Close() + + // Create metrics to query. + times := []time.Time{time.UnixMilli(3), time.UnixMilli(1), time.UnixMilli(2)} + for _, recordedTime := range times { + metric := api.WalletMetric{ + Timestamp: recordedTime, + Address: types.Address{1}, + ConfirmedBalance: types.NewCurrency(frand.Uint64n(math.MaxUint64), frand.Uint64n(math.MaxUint64)), + UnconfirmedBalance: types.NewCurrency(frand.Uint64n(math.MaxUint64), frand.Uint64n(math.MaxUint64)), + SpendableBalance: types.NewCurrency(frand.Uint64n(math.MaxUint64), frand.Uint64n(math.MaxUint64)), + } + if err := ss.RecordWalletMetric(context.Background(), metric); err != nil { + t.Fatal(err) + } + } + + // Fetch all metrcis + metrics, err := ss.WalletMetrics(context.Background(), time.UnixMilli(1), 3, time.Millisecond, api.WalletMetricsQueryOpts{}) + if err != nil { + t.Fatal(err) + } else if len(metrics) != 3 { + t.Fatalf("expected 3 metrics, got %v", len(metrics)) + } else if !sort.SliceIsSorted(metrics, func(i, j int) bool { + return time.Time(metrics[i].Timestamp).Before(time.Time(metrics[j].Timestamp)) + }) { + t.Fatal("expected metrics to be sorted by time") + } + + // Query by address + metrics, err = ss.WalletMetrics(context.Background(), time.UnixMilli(1), 3, time.Millisecond, api.WalletMetricsQueryOpts{Address: types.Address{1}}) + if err != nil { + t.Fatal(err) + } else if len(metrics) != 3 { + t.Fatalf("expected 3 metrics, got %v", len(metrics)) + } + + metrics, err = ss.WalletMetrics(context.Background(), time.UnixMilli(1), 3, time.Millisecond, api.WalletMetricsQueryOpts{Address: types.Address{2}}) + if err != nil { + t.Fatal(err) + } else if len(metrics) != 0 { + t.Fatalf("expected 0 metrics, got %v", len(metrics)) + } +} diff --git a/stores/migrations.go b/stores/migrations.go index b3302e44e..cc179715c 100644 --- a/stores/migrations.go +++ b/stores/migrations.go @@ -5,8 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "reflect" - "strings" "github.com/go-gormigrate/gormigrate/v2" "go.sia.tech/renterd/api" @@ -52,90 +50,43 @@ var ( // webhooks.WebhookStore tables &dbWebhook{}, } - metricsTables = []interface{}{ - &dbContractMetric{}, - &dbContractSetMetric{}, - &dbContractSetChurnMetric{}, - &dbPerformanceMetric{}, - } ) -// migrateShards performs the migrations necessary for removing the 'shards' -// table. -func migrateShards(ctx context.Context, db *gorm.DB, logger *zap.SugaredLogger) error { - m := db.Migrator() - - // add columns - if !m.HasColumn(&dbSlice{}, "db_slab_id") { - logger.Info(ctx, "adding column db_slab_id to table 'slices'") - if err := m.AddColumn(&dbSlice{}, "db_slab_id"); err != nil { - return err - } - logger.Info(ctx, "done adding column db_slab_id to table 'slices'") +// initSchema is executed only on a clean database. Otherwise the individual +// migrations are executed. +func initSchema(tx *gorm.DB) error { + // Setup join tables. + err := setupJoinTables(tx) + if err != nil { + return fmt.Errorf("failed to setup join tables: %w", err) } - if !m.HasColumn(&dbSector{}, "db_slab_id") { - logger.Info(ctx, "adding column db_slab_id to table 'sectors'") - if err := m.AddColumn(&dbSector{}, "db_slab_id"); err != nil { - return err - } - logger.Info(ctx, "done adding column db_slab_id to table 'sectors'") + + // Run auto migrations. + err = tx.AutoMigrate(tables...) + if err != nil { + return fmt.Errorf("failed to init schema: %w", err) } - // populate new columns - var err error - if m.HasColumn(&dbSlab{}, "db_slice_id") { - logger.Info(ctx, "populating column 'db_slab_id' in table 'slices'") - if isSQLite(db) { - err = db.Exec(`UPDATE slices SET db_slab_id = (SELECT slabs.id FROM slabs WHERE slabs.db_slice_id = slices.id)`).Error - } else { - err = db.Exec(`UPDATE slices sli - INNER JOIN slabs sla ON sli.id=sla.db_slice_id - SET sli.db_slab_id=sla.id`).Error + // Change the collation of columns that we need to be case sensitive. + if !isSQLite(tx) { + err = tx.Exec("ALTER TABLE objects MODIFY COLUMN object_id VARCHAR(766) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin;").Error + if err != nil { + return fmt.Errorf("failed to change object_id collation: %w", err) } + err = tx.Exec("ALTER TABLE buckets MODIFY COLUMN name VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin;").Error if err != nil { - return err + return fmt.Errorf("failed to change buckets_name collation: %w", err) + } + err = tx.Exec("ALTER TABLE multipart_uploads MODIFY COLUMN object_id VARCHAR(766) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin;").Error + if err != nil { + return fmt.Errorf("failed to change object_id collation: %w", err) } - logger.Info(ctx, "done populating column 'db_slab_id' in table 'slices'") - } - logger.Info(ctx, "populating column 'db_slab_id' in table 'sectors'") - if isSQLite(db) { - err = db.Exec(`UPDATE sectors SET db_slab_id = (SELECT shards.db_slab_id FROM shards WHERE shards.db_sector_id = sectors.id)`).Error - } else { - err = db.Exec(`UPDATE sectors sec - INNER JOIN shards sha ON sec.id=sha.db_sector_id - SET sec.db_slab_id=sha.db_slab_id`).Error - } - if err != nil { - return err - } - logger.Info(ctx, "done populating column 'db_slab_id' in table 'sectors'") - - // drop column db_slice_id from slabs - logger.Info(ctx, "dropping constraint 'fk_slices_slab' from table 'slabs'") - if err := m.DropConstraint(&dbSlab{}, "fk_slices_slab"); err != nil { - return err - } - logger.Info(ctx, "done dropping constraint 'fk_slices_slab' from table 'slabs'") - logger.Info(ctx, "dropping column 'db_slice_id' from table 'slabs'") - if err := m.DropColumn(&dbSlab{}, "db_slice_id"); err != nil { - return err - } - logger.Info(ctx, "done dropping column 'db_slice_id' from table 'slabs'") - - // delete any sectors that are not referenced by a slab - logger.Info(ctx, "pruning dangling sectors") - if err := db.Exec(`DELETE FROM sectors WHERE db_slab_id IS NULL`).Error; err != nil { - return err } - logger.Info(ctx, "done pruning dangling sectors") - // drop table shards - logger.Info(ctx, "dropping table 'shards'") - if err := m.DropTable("shards"); err != nil { - return err - } - logger.Info(ctx, "done dropping table 'shards'") - return nil + // Add default bucket. + return tx.Create(&dbBucket{ + Name: api.DefaultBucketName, + }).Error } func performMigrations(db *gorm.DB, logger *zap.SugaredLogger) error { @@ -367,131 +318,6 @@ func performMigrations(db *gorm.DB, logger *zap.SugaredLogger) error { return nil } -func performMetricsMigrations(db *gorm.DB, logger *zap.SugaredLogger) error { - migrations := []*gormigrate.Migration{} - // Create migrator. - m := gormigrate.New(db, gormigrate.DefaultOptions, migrations) - - // Set init function. - m.InitSchema(initMetricsSchema) - - // Perform migrations. - if err := m.Migrate(); err != nil { - return fmt.Errorf("failed to migrate: %v", err) - } - return nil -} - -// initSchema is executed only on a clean database. Otherwise the individual -// migrations are executed. -func initSchema(tx *gorm.DB) error { - // Setup join tables. - err := setupJoinTables(tx) - if err != nil { - return fmt.Errorf("failed to setup join tables: %w", err) - } - - // Run auto migrations. - err = tx.AutoMigrate(tables...) - if err != nil { - return fmt.Errorf("failed to init schema: %w", err) - } - - // Change the collation of columns that we need to be case sensitive. - if !isSQLite(tx) { - err = tx.Exec("ALTER TABLE objects MODIFY COLUMN object_id VARCHAR(766) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin;").Error - if err != nil { - return fmt.Errorf("failed to change object_id collation: %w", err) - } - err = tx.Exec("ALTER TABLE buckets MODIFY COLUMN name VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin;").Error - if err != nil { - return fmt.Errorf("failed to change buckets_name collation: %w", err) - } - err = tx.Exec("ALTER TABLE multipart_uploads MODIFY COLUMN object_id VARCHAR(766) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin;").Error - if err != nil { - return fmt.Errorf("failed to change object_id collation: %w", err) - } - } - - // Add default bucket. - return tx.Create(&dbBucket{ - Name: api.DefaultBucketName, - }).Error -} - -// initMetricsSchema is executed only on a clean database. Otherwise the individual -// migrations are executed. -func initMetricsSchema(tx *gorm.DB) error { - // Run auto migrations. - err := tx.AutoMigrate(metricsTables...) - if err != nil { - return fmt.Errorf("failed to init schema: %w", err) - } - return nil -} - -func detectMissingIndicesOnType(tx *gorm.DB, table interface{}, t reflect.Type, f func(dst interface{}, name string)) { - if t.Kind() == reflect.Ptr { - t = t.Elem() - } - for i := 0; i < t.NumField(); i++ { - field := t.Field(i) - if field.Anonymous { - detectMissingIndicesOnType(tx, table, field.Type, f) - continue - } - if !strings.Contains(field.Tag.Get("gorm"), "index") { - continue // no index tag - } - if !tx.Migrator().HasIndex(table, field.Name) { - f(table, field.Name) - } - } -} - -func detectMissingIndices(tx *gorm.DB, f func(dst interface{}, name string)) { - for _, table := range tables { - detectMissingIndicesOnType(tx, table, reflect.TypeOf(table), f) - } -} - -func setupJoinTables(tx *gorm.DB) error { - jointables := []struct { - model interface{} - joinTable interface{ TableName() string } - field string - }{ - { - &dbAllowlistEntry{}, - &dbHostAllowlistEntryHost{}, - "Hosts", - }, - { - &dbBlocklistEntry{}, - &dbHostBlocklistEntryHost{}, - "Hosts", - }, - { - &dbSector{}, - &dbContractSector{}, - "Contracts", - }, - { - &dbContractSet{}, - &dbContractSetContract{}, - "Contracts", - }, - } - for _, t := range jointables { - if err := tx.SetupJoinTable(t.model, t.field, t.joinTable); err != nil { - return fmt.Errorf("failed to setup join table '%s': %w", t.joinTable.TableName(), err) - } - } - return nil -} - -// performMigration00001_gormigrate performs the first migration before -// introducing gormigrate. func performMigration00001_gormigrate(txn *gorm.DB, logger *zap.SugaredLogger) error { ctx := context.Background() m := txn.Migrator() @@ -593,6 +419,84 @@ func performMigration00001_gormigrate(txn *gorm.DB, logger *zap.SugaredLogger) e return nil } +// migrateShards performs the migrations necessary for removing the 'shards' +// table. +func migrateShards(ctx context.Context, db *gorm.DB, logger *zap.SugaredLogger) error { + m := db.Migrator() + + // add columns + if !m.HasColumn(&dbSlice{}, "db_slab_id") { + logger.Info(ctx, "adding column db_slab_id to table 'slices'") + if err := m.AddColumn(&dbSlice{}, "db_slab_id"); err != nil { + return err + } + logger.Info(ctx, "done adding column db_slab_id to table 'slices'") + } + if !m.HasColumn(&dbSector{}, "db_slab_id") { + logger.Info(ctx, "adding column db_slab_id to table 'sectors'") + if err := m.AddColumn(&dbSector{}, "db_slab_id"); err != nil { + return err + } + logger.Info(ctx, "done adding column db_slab_id to table 'sectors'") + } + + // populate new columns + var err error + if m.HasColumn(&dbSlab{}, "db_slice_id") { + logger.Info(ctx, "populating column 'db_slab_id' in table 'slices'") + if isSQLite(db) { + err = db.Exec(`UPDATE slices SET db_slab_id = (SELECT slabs.id FROM slabs WHERE slabs.db_slice_id = slices.id)`).Error + } else { + err = db.Exec(`UPDATE slices sli + INNER JOIN slabs sla ON sli.id=sla.db_slice_id + SET sli.db_slab_id=sla.id`).Error + } + if err != nil { + return err + } + logger.Info(ctx, "done populating column 'db_slab_id' in table 'slices'") + } + logger.Info(ctx, "populating column 'db_slab_id' in table 'sectors'") + if isSQLite(db) { + err = db.Exec(`UPDATE sectors SET db_slab_id = (SELECT shards.db_slab_id FROM shards WHERE shards.db_sector_id = sectors.id)`).Error + } else { + err = db.Exec(`UPDATE sectors sec + INNER JOIN shards sha ON sec.id=sha.db_sector_id + SET sec.db_slab_id=sha.db_slab_id`).Error + } + if err != nil { + return err + } + logger.Info(ctx, "done populating column 'db_slab_id' in table 'sectors'") + + // drop column db_slice_id from slabs + logger.Info(ctx, "dropping constraint 'fk_slices_slab' from table 'slabs'") + if err := m.DropConstraint(&dbSlab{}, "fk_slices_slab"); err != nil { + return err + } + logger.Info(ctx, "done dropping constraint 'fk_slices_slab' from table 'slabs'") + logger.Info(ctx, "dropping column 'db_slice_id' from table 'slabs'") + if err := m.DropColumn(&dbSlab{}, "db_slice_id"); err != nil { + return err + } + logger.Info(ctx, "done dropping column 'db_slice_id' from table 'slabs'") + + // delete any sectors that are not referenced by a slab + logger.Info(ctx, "pruning dangling sectors") + if err := db.Exec(`DELETE FROM sectors WHERE db_slab_id IS NULL`).Error; err != nil { + return err + } + logger.Info(ctx, "done pruning dangling sectors") + + // drop table shards + logger.Info(ctx, "dropping table 'shards'") + if err := m.DropTable("shards"); err != nil { + return err + } + logger.Info(ctx, "done dropping table 'shards'") + return nil +} + func performMigration00002_dropconstraintslabcsid(txn *gorm.DB, logger *zap.SugaredLogger) error { ctx := context.Background() m := txn.Migrator() diff --git a/stores/migrations_metrics.go b/stores/migrations_metrics.go new file mode 100644 index 000000000..fd2918bc3 --- /dev/null +++ b/stores/migrations_metrics.go @@ -0,0 +1,63 @@ +package stores + +import ( + "fmt" + + "github.com/go-gormigrate/gormigrate/v2" + "go.uber.org/zap" + "gorm.io/gorm" +) + +var ( + metricsTables = []interface{}{ + &dbContractMetric{}, + &dbContractSetMetric{}, + &dbContractSetChurnMetric{}, + &dbPerformanceMetric{}, + &dbWalletMetric{}, + } +) + +// initMetricsSchema is executed only on a clean database. Otherwise the individual +// migrations are executed. +func initMetricsSchema(tx *gorm.DB) error { + // Run auto migrations. + err := tx.AutoMigrate(metricsTables...) + if err != nil { + return fmt.Errorf("failed to init schema: %w", err) + } + return nil +} + +func performMetricsMigrations(db *gorm.DB, logger *zap.SugaredLogger) error { + migrations := []*gormigrate.Migration{ + { + ID: "00001_wallet_metrics", + Migrate: func(tx *gorm.DB) error { + return performMigration00001_wallet_metrics(tx, logger) + }, + Rollback: nil, + }, + } + + // Create migrator. + m := gormigrate.New(db, gormigrate.DefaultOptions, migrations) + + // Set init function. + m.InitSchema(initMetricsSchema) + + // Perform migrations. + if err := m.Migrate(); err != nil { + return fmt.Errorf("failed to migrate: %v", err) + } + return nil +} + +func performMigration00001_wallet_metrics(txn *gorm.DB, logger *zap.SugaredLogger) error { + logger.Info("performing migration 00001_wallet_metrics") + if err := txn.Migrator().AutoMigrate(&dbWalletMetric{}); err != nil { + return err + } + logger.Info("migration 00001_wallet_metrics complete") + return nil +} diff --git a/stores/migrations_utils.go b/stores/migrations_utils.go new file mode 100644 index 000000000..fdf7e6e44 --- /dev/null +++ b/stores/migrations_utils.go @@ -0,0 +1,69 @@ +package stores + +import ( + "fmt" + "reflect" + "strings" + + "gorm.io/gorm" +) + +func detectMissingIndices(tx *gorm.DB, f func(dst interface{}, name string)) { + for _, table := range tables { + detectMissingIndicesOnType(tx, table, reflect.TypeOf(table), f) + } +} + +func detectMissingIndicesOnType(tx *gorm.DB, table interface{}, t reflect.Type, f func(dst interface{}, name string)) { + if t.Kind() == reflect.Ptr { + t = t.Elem() + } + for i := 0; i < t.NumField(); i++ { + field := t.Field(i) + if field.Anonymous { + detectMissingIndicesOnType(tx, table, field.Type, f) + continue + } + if !strings.Contains(field.Tag.Get("gorm"), "index") { + continue // no index tag + } + if !tx.Migrator().HasIndex(table, field.Name) { + f(table, field.Name) + } + } +} + +func setupJoinTables(tx *gorm.DB) error { + jointables := []struct { + model interface{} + joinTable interface{ TableName() string } + field string + }{ + { + &dbAllowlistEntry{}, + &dbHostAllowlistEntryHost{}, + "Hosts", + }, + { + &dbBlocklistEntry{}, + &dbHostBlocklistEntryHost{}, + "Hosts", + }, + { + &dbSector{}, + &dbContractSector{}, + "Contracts", + }, + { + &dbContractSet{}, + &dbContractSetContract{}, + "Contracts", + }, + } + for _, t := range jointables { + if err := tx.SetupJoinTable(t.model, t.field, t.joinTable); err != nil { + return fmt.Errorf("failed to setup join table '%s': %w", t.joinTable.TableName(), err) + } + } + return nil +} diff --git a/stores/sql.go b/stores/sql.go index 5a8a414c7..6127d72ea 100644 --- a/stores/sql.go +++ b/stores/sql.go @@ -346,7 +346,7 @@ func (ss *SQLStore) ProcessConsensusChange(cc modules.ConsensusChange) { } // Try to apply the updates. - if err := ss.applyUpdates(false); err != nil { + if err := ss.applyUpdates(false, cc.Synced); err != nil { ss.logger.Error(fmt.Sprintf("failed to apply updates, err: %v", err)) } @@ -368,14 +368,14 @@ func (ss *SQLStore) ProcessConsensusChange(cc modules.ConsensusChange) { ss.persistMu.Lock() defer ss.persistMu.Unlock() - if err := ss.applyUpdates(true); err != nil { + if err := ss.applyUpdates(true, cc.Synced); err != nil { ss.logger.Error(fmt.Sprintf("failed to apply updates, err: %v", err)) } }) } // applyUpdates applies all unapplied updates to the database. -func (ss *SQLStore) applyUpdates(force bool) (err error) { +func (ss *SQLStore) applyUpdates(force, synced bool) (err error) { // Check if we need to apply changes persistIntervalPassed := time.Since(ss.lastSave) > ss.persistInterval // enough time has passed since last persist softLimitReached := len(ss.unappliedAnnouncements) >= announcementBatchSoftLimit // enough announcements have accumulated diff --git a/stores/sql_test.go b/stores/sql_test.go index b91125cf4..56b96b12f 100644 --- a/stores/sql_test.go +++ b/stores/sql_test.go @@ -41,8 +41,6 @@ type testSQLStore struct { type testSQLStoreConfig struct { dbName string dbMetricsName string - dbConn gorm.Dialector - dbMetricsConn gorm.Dialector dir string persistent bool skipMigrate bool diff --git a/stores/types.go b/stores/types.go index 729a4fbc2..864de3c3a 100644 --- a/stores/types.go +++ b/stores/types.go @@ -18,6 +18,7 @@ import ( var zeroCurrency = currency(types.ZeroCurrency) type ( + address types.Address unixTimeMS time.Time datetime time.Time currency types.Currency @@ -31,6 +32,29 @@ type ( secretKey []byte ) +// GormDataType implements gorm.GormDataTypeInterface. +func (address) GormDataType() string { + return "bytes" +} + +// Scan scan value into address, implements sql.Scanner interface. +func (a *address) Scan(value interface{}) error { + bytes, ok := value.([]byte) + if !ok { + return errors.New(fmt.Sprint("failed to unmarshal address value:", value)) + } + if len(bytes) < len(address{}) { + return fmt.Errorf("failed to unmarshal address value due to insufficient bytes %v < %v: %v", len(bytes), len(address{}), value) + } + *a = *(*address)(bytes) + return nil +} + +// Value returns an address value, implements driver.Valuer interface. +func (a address) Value() (driver.Value, error) { + return a[:], nil +} + // GormDataType implements gorm.GormDataTypeInterface. func (secretKey) GormDataType() string { return "bytes" @@ -98,7 +122,7 @@ func (fcid *fileContractID) Scan(value interface{}) error { return nil } -// Value returns a currency value, implements driver.Valuer interface. +// Value returns a fileContractID value, implements driver.Valuer interface. func (fcid fileContractID) Value() (driver.Value, error) { return fcid[:], nil } diff --git a/wallet/wallet.go b/wallet/wallet.go index 07ac0db35..e0cf8f0af 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -2,6 +2,7 @@ package wallet import ( "bytes" + "context" "errors" "fmt" "reflect" @@ -12,6 +13,7 @@ import ( "gitlab.com/NebulousLabs/encoding" "go.sia.tech/core/consensus" "go.sia.tech/core/types" + "go.sia.tech/renterd/api" "go.sia.tech/siad/modules" "go.uber.org/zap" "lukechampine.com/frand" @@ -112,6 +114,7 @@ type SingleAddressStore interface { Height() uint64 UnspentSiacoinElements(matured bool) ([]SiacoinElement, error) Transactions(before, since time.Time, offset, limit int) ([]Transaction, error) + RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error } // A TransactionPool contains transactions that have not yet been included in a @@ -418,6 +421,37 @@ func (w *SingleAddressWallet) isOutputUsed(id types.Hash256) bool { return time.Since(lastUsed) <= w.usedUTXOExpiry || inPool } +// ProcessConsensusChange implements modules.ConsensusSetSubscriber. +func (w *SingleAddressWallet) ProcessConsensusChange(cc modules.ConsensusChange) { + // only record when we are synced + if !cc.Synced { + return + } + + // apply sane timeout + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + // fetch balance + spendable, confirmed, unconfirmed, err := w.Balance() + if err != nil { + w.log.Errorf("failed to fetch wallet balance, err: %v", err) + return + } + + // record wallet metric + if err := w.store.RecordWalletMetric(ctx, api.WalletMetric{ + Timestamp: time.Now().UTC(), + Address: w.addr, + ConfirmedBalance: confirmed, + UnconfirmedBalance: unconfirmed, + SpendableBalance: spendable, + }); err != nil { + w.log.Errorf("failed to record wallet metric, err: %v", err) + return + } +} + // ReceiveUpdatedUnconfirmedTransactions implements modules.TransactionPoolSubscriber. func (w *SingleAddressWallet) ReceiveUpdatedUnconfirmedTransactions(diff *modules.TransactionPoolDiff) { siacoinOutputs := make(map[types.SiacoinOutputID]SiacoinElement) diff --git a/wallet/wallet_test.go b/wallet/wallet_test.go index 881e1eecc..a6b02fcc0 100644 --- a/wallet/wallet_test.go +++ b/wallet/wallet_test.go @@ -1,12 +1,14 @@ package wallet_test import ( + "context" "strings" "testing" "time" "go.sia.tech/core/consensus" "go.sia.tech/core/types" + "go.sia.tech/renterd/api" "go.sia.tech/renterd/wallet" "go.uber.org/zap" "lukechampine.com/frand" @@ -26,6 +28,9 @@ func (s *mockStore) UnspentSiacoinElements(bool) ([]wallet.SiacoinElement, error func (s *mockStore) Transactions(before, since time.Time, offset, limit int) ([]wallet.Transaction, error) { return nil, nil } +func (s *mockStore) RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error { + return nil +} var cs = consensus.State{ Index: types.ChainIndex{ From 2f53ee3dd8637bb29a5b1da6c5b258f29aaaee66 Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 29 Nov 2023 10:13:10 +0100 Subject: [PATCH 2/7] stores: remove _Balance --- api/metrcis.go | 6 +++--- stores/metrics.go | 38 +++++++++++++++++++------------------- stores/metrics_test.go | 10 +++++----- wallet/wallet.go | 10 +++++----- 4 files changed, 32 insertions(+), 32 deletions(-) diff --git a/api/metrcis.go b/api/metrcis.go index 34479eaaa..536dc968c 100644 --- a/api/metrcis.go +++ b/api/metrcis.go @@ -82,9 +82,9 @@ type ( Address types.Address `json:"address"` - ConfirmedBalance types.Currency `json:"confirmedBalance"` - SpendableBalance types.Currency `json:"spendableBalance"` - UnconfirmedBalance types.Currency `json:"unconfirmedBalance"` + Confirmed types.Currency `json:"confirmed"` + Spendable types.Currency `json:"spendable"` + Unconfirmed types.Currency `json:"unconfirmed"` } WalletMetricsQueryOpts struct { diff --git a/stores/metrics.go b/stores/metrics.go index ca5cd4988..a46858041 100644 --- a/stores/metrics.go +++ b/stores/metrics.go @@ -83,12 +83,12 @@ type ( Address address `gorm:"index;size:32;NOT NULL"` - ConfirmedBalanceLo unsigned64 `gorm:"index:idx_confirmed_balance;NOT NULL"` - ConfirmedBalanceHi unsigned64 `gorm:"index:idx_confirmed_balance;NOT NULL"` - SpendableBalanceLo unsigned64 `gorm:"index:idx_spendable_balance;NOT NULL"` - SpendableBalanceHi unsigned64 `gorm:"index:idx_spendable_balance;NOT NULL"` - UnconfirmedBalanceLo unsigned64 `gorm:"index:idx_unconfirmed_balance;NOT NULL"` - UnconfirmedBalanceHi unsigned64 `gorm:"index:idx_unconfirmed_balance;NOT NULL"` + ConfirmedLo unsigned64 `gorm:"index:idx_confirmed;NOT NULL"` + ConfirmedHi unsigned64 `gorm:"index:idx_confirmed;NOT NULL"` + SpendableLo unsigned64 `gorm:"index:idx_spendable;NOT NULL"` + SpendableHi unsigned64 `gorm:"index:idx_spendable;NOT NULL"` + UnconfirmedLo unsigned64 `gorm:"index:idx_unconfirmed;NOT NULL"` + UnconfirmedHi unsigned64 `gorm:"index:idx_unconfirmed;NOT NULL"` } ) @@ -234,14 +234,14 @@ func (s *SQLStore) RecordWalletMetric(ctx context.Context, metrics ...api.Wallet dbMetrics := make([]dbWalletMetric, len(metrics)) for i, metric := range metrics { dbMetrics[i] = dbWalletMetric{ - Timestamp: unixTimeMS(metric.Timestamp), - Address: address(metric.Address), - ConfirmedBalanceLo: unsigned64(metric.ConfirmedBalance.Lo), - ConfirmedBalanceHi: unsigned64(metric.ConfirmedBalance.Hi), - SpendableBalanceLo: unsigned64(metric.SpendableBalance.Lo), - SpendableBalanceHi: unsigned64(metric.SpendableBalance.Hi), - UnconfirmedBalanceLo: unsigned64(metric.UnconfirmedBalance.Lo), - UnconfirmedBalanceHi: unsigned64(metric.UnconfirmedBalance.Hi), + Timestamp: unixTimeMS(metric.Timestamp), + Address: address(metric.Address), + ConfirmedLo: unsigned64(metric.Confirmed.Lo), + ConfirmedHi: unsigned64(metric.Confirmed.Hi), + SpendableLo: unsigned64(metric.Spendable.Lo), + SpendableHi: unsigned64(metric.Spendable.Hi), + UnconfirmedLo: unsigned64(metric.Unconfirmed.Lo), + UnconfirmedHi: unsigned64(metric.Unconfirmed.Hi), } } return s.dbMetrics.Create(&dbMetrics).Error @@ -272,11 +272,11 @@ func (s *SQLStore) WalletMetrics(ctx context.Context, start time.Time, n uint64, } for i := range resp { resp[i] = api.WalletMetric{ - Timestamp: time.Time(metrics[i].Timestamp).UTC(), - Address: types.Address(metrics[i].Address), - ConfirmedBalance: toCurr(metrics[i].ConfirmedBalanceLo, metrics[i].ConfirmedBalanceHi), - SpendableBalance: toCurr(metrics[i].SpendableBalanceLo, metrics[i].SpendableBalanceHi), - UnconfirmedBalance: toCurr(metrics[i].UnconfirmedBalanceLo, metrics[i].UnconfirmedBalanceHi), + Timestamp: time.Time(metrics[i].Timestamp).UTC(), + Address: types.Address(metrics[i].Address), + Confirmed: toCurr(metrics[i].ConfirmedLo, metrics[i].ConfirmedHi), + Spendable: toCurr(metrics[i].SpendableLo, metrics[i].SpendableHi), + Unconfirmed: toCurr(metrics[i].UnconfirmedLo, metrics[i].UnconfirmedHi), } } return resp, nil diff --git a/stores/metrics_test.go b/stores/metrics_test.go index fa911adbf..07a502e4d 100644 --- a/stores/metrics_test.go +++ b/stores/metrics_test.go @@ -312,11 +312,11 @@ func TestWalletMetrics(t *testing.T) { times := []time.Time{time.UnixMilli(3), time.UnixMilli(1), time.UnixMilli(2)} for _, recordedTime := range times { metric := api.WalletMetric{ - Timestamp: recordedTime, - Address: types.Address{1}, - ConfirmedBalance: types.NewCurrency(frand.Uint64n(math.MaxUint64), frand.Uint64n(math.MaxUint64)), - UnconfirmedBalance: types.NewCurrency(frand.Uint64n(math.MaxUint64), frand.Uint64n(math.MaxUint64)), - SpendableBalance: types.NewCurrency(frand.Uint64n(math.MaxUint64), frand.Uint64n(math.MaxUint64)), + Timestamp: recordedTime, + Address: types.Address{1}, + Confirmed: types.NewCurrency(frand.Uint64n(math.MaxUint64), frand.Uint64n(math.MaxUint64)), + Unconfirmed: types.NewCurrency(frand.Uint64n(math.MaxUint64), frand.Uint64n(math.MaxUint64)), + Spendable: types.NewCurrency(frand.Uint64n(math.MaxUint64), frand.Uint64n(math.MaxUint64)), } if err := ss.RecordWalletMetric(context.Background(), metric); err != nil { t.Fatal(err) diff --git a/wallet/wallet.go b/wallet/wallet.go index e0cf8f0af..b76e9715c 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -441,11 +441,11 @@ func (w *SingleAddressWallet) ProcessConsensusChange(cc modules.ConsensusChange) // record wallet metric if err := w.store.RecordWalletMetric(ctx, api.WalletMetric{ - Timestamp: time.Now().UTC(), - Address: w.addr, - ConfirmedBalance: confirmed, - UnconfirmedBalance: unconfirmed, - SpendableBalance: spendable, + Timestamp: time.Now().UTC(), + Address: w.addr, + Confirmed: confirmed, + Unconfirmed: unconfirmed, + Spendable: spendable, }); err != nil { w.log.Errorf("failed to record wallet metric, err: %v", err) return From b140865c8e153a2e74b6f3af4f7e84a844d3852e Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 29 Nov 2023 10:14:35 +0100 Subject: [PATCH 3/7] bus: fix params --- bus/bus.go | 4 ++-- bus/client/metrics.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/bus/bus.go b/bus/bus.go index 423241e15..9085b2157 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -2027,9 +2027,9 @@ func (b *bus) metricsHandlerGET(jc jape.Context) { switch key := jc.PathParam("key"); key { case api.MetricContract: var opts api.ContractMetricsQueryOpts - if jc.DecodeForm("fcid", &opts.ContractID) != nil { + if jc.DecodeForm("contractID", &opts.ContractID) != nil { return - } else if jc.DecodeForm("host", &opts.HostKey) != nil { + } else if jc.DecodeForm("hostKey", &opts.HostKey) != nil { return } else if metrics, err := b.metrics(jc.Request.Context(), key, start, n, interval, opts); jc.Check("failed to get contract metrics", err) != nil { return diff --git a/bus/client/metrics.go b/bus/client/metrics.go index f8b1e4270..757f30ded 100644 --- a/bus/client/metrics.go +++ b/bus/client/metrics.go @@ -20,7 +20,7 @@ func (c *Client) ContractMetrics(ctx context.Context, start time.Time, n uint64, values.Set("n", fmt.Sprint(n)) values.Set("interval", api.DurationMS(interval).String()) if opts.ContractID != (types.FileContractID{}) { - values.Set("fcid", opts.ContractID.String()) + values.Set("contractID", opts.ContractID.String()) } if opts.HostKey != (types.PublicKey{}) { values.Set("hostKey", opts.HostKey.String()) From 1a40582afd75b83e2b953062b23d2ce6670f72f5 Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 29 Nov 2023 15:50:17 +0100 Subject: [PATCH 4/7] stores: remove address --- api/metrcis.go | 6 +----- bus/bus.go | 4 +--- stores/metrics.go | 17 +++-------------- stores/metrics_test.go | 16 ---------------- stores/migrations_metrics.go | 15 ++++++++++++++- stores/types.go | 24 ------------------------ wallet/wallet.go | 1 - 7 files changed, 19 insertions(+), 64 deletions(-) diff --git a/api/metrcis.go b/api/metrcis.go index 536dc968c..88337359f 100644 --- a/api/metrcis.go +++ b/api/metrcis.go @@ -80,16 +80,12 @@ type ( WalletMetric struct { Timestamp time.Time `json:"timestamp"` - Address types.Address `json:"address"` - Confirmed types.Currency `json:"confirmed"` Spendable types.Currency `json:"spendable"` Unconfirmed types.Currency `json:"unconfirmed"` } - WalletMetricsQueryOpts struct { - Address types.Address - } + WalletMetricsQueryOpts struct{} ) type ( diff --git a/bus/bus.go b/bus/bus.go index 9085b2157..5e8bc00b7 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -2063,9 +2063,7 @@ func (b *bus) metricsHandlerGET(jc jape.Context) { } case api.MetricWallet: var opts api.WalletMetricsQueryOpts - if jc.DecodeForm("address", &opts.Address) != nil { - return - } else if metrics, err := b.metrics(jc.Request.Context(), key, start, n, interval, opts); jc.Check("failed to get wallet metrics", err) != nil { + if metrics, err := b.metrics(jc.Request.Context(), key, start, n, interval, opts); jc.Check("failed to get wallet metrics", err) != nil { return } else { jc.Encode(metrics) diff --git a/stores/metrics.go b/stores/metrics.go index a46858041..419ca4602 100644 --- a/stores/metrics.go +++ b/stores/metrics.go @@ -81,8 +81,6 @@ type ( Model Timestamp unixTimeMS `gorm:"index;NOT NULL"` - Address address `gorm:"index;size:32;NOT NULL"` - ConfirmedLo unsigned64 `gorm:"index:idx_confirmed;NOT NULL"` ConfirmedHi unsigned64 `gorm:"index:idx_confirmed;NOT NULL"` SpendableLo unsigned64 `gorm:"index:idx_spendable;NOT NULL"` @@ -235,7 +233,6 @@ func (s *SQLStore) RecordWalletMetric(ctx context.Context, metrics ...api.Wallet for i, metric := range metrics { dbMetrics[i] = dbWalletMetric{ Timestamp: unixTimeMS(metric.Timestamp), - Address: address(metric.Address), ConfirmedLo: unsigned64(metric.Confirmed.Lo), ConfirmedHi: unsigned64(metric.Confirmed.Hi), SpendableLo: unsigned64(metric.Spendable.Lo), @@ -273,7 +270,6 @@ func (s *SQLStore) WalletMetrics(ctx context.Context, start time.Time, n uint64, for i := range resp { resp[i] = api.WalletMetric{ Timestamp: time.Time(metrics[i].Timestamp).UTC(), - Address: types.Address(metrics[i].Address), Confirmed: toCurr(metrics[i].ConfirmedLo, metrics[i].ConfirmedHi), Spendable: toCurr(metrics[i].SpendableLo, metrics[i].SpendableHi), Unconfirmed: toCurr(metrics[i].UnconfirmedLo, metrics[i].UnconfirmedHi), @@ -361,19 +357,12 @@ func (s *SQLStore) findPeriods(tx *gorm.DB, dst interface{}, start time.Time, n Error } -func (s *SQLStore) walletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) ([]dbWalletMetric, error) { - tx := s.dbMetrics - if opts.Address != (types.Address{}) { - tx = tx.Where("address", address(opts.Address)) - } - - var metrics []dbWalletMetric - err := s.findPeriods(tx, &metrics, start, n, interval) +func (s *SQLStore) walletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) (metrics []dbWalletMetric, err error) { + err = s.findPeriods(s.dbMetrics, &metrics, start, n, interval) if err != nil { return nil, fmt.Errorf("failed to fetch contract metrics: %w", err) } - - return metrics, nil + return } func (s *SQLStore) performanceMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts) ([]dbPerformanceMetric, error) { diff --git a/stores/metrics_test.go b/stores/metrics_test.go index 07a502e4d..473a5d666 100644 --- a/stores/metrics_test.go +++ b/stores/metrics_test.go @@ -313,7 +313,6 @@ func TestWalletMetrics(t *testing.T) { for _, recordedTime := range times { metric := api.WalletMetric{ Timestamp: recordedTime, - Address: types.Address{1}, Confirmed: types.NewCurrency(frand.Uint64n(math.MaxUint64), frand.Uint64n(math.MaxUint64)), Unconfirmed: types.NewCurrency(frand.Uint64n(math.MaxUint64), frand.Uint64n(math.MaxUint64)), Spendable: types.NewCurrency(frand.Uint64n(math.MaxUint64), frand.Uint64n(math.MaxUint64)), @@ -334,19 +333,4 @@ func TestWalletMetrics(t *testing.T) { }) { t.Fatal("expected metrics to be sorted by time") } - - // Query by address - metrics, err = ss.WalletMetrics(context.Background(), time.UnixMilli(1), 3, time.Millisecond, api.WalletMetricsQueryOpts{Address: types.Address{1}}) - if err != nil { - t.Fatal(err) - } else if len(metrics) != 3 { - t.Fatalf("expected 3 metrics, got %v", len(metrics)) - } - - metrics, err = ss.WalletMetrics(context.Background(), time.UnixMilli(1), 3, time.Millisecond, api.WalletMetricsQueryOpts{Address: types.Address{2}}) - if err != nil { - t.Fatal(err) - } else if len(metrics) != 0 { - t.Fatalf("expected 0 metrics, got %v", len(metrics)) - } } diff --git a/stores/migrations_metrics.go b/stores/migrations_metrics.go index fd2918bc3..581ad8198 100644 --- a/stores/migrations_metrics.go +++ b/stores/migrations_metrics.go @@ -2,6 +2,7 @@ package stores import ( "fmt" + "time" "github.com/go-gormigrate/gormigrate/v2" "go.uber.org/zap" @@ -55,7 +56,19 @@ func performMetricsMigrations(db *gorm.DB, logger *zap.SugaredLogger) error { func performMigration00001_wallet_metrics(txn *gorm.DB, logger *zap.SugaredLogger) error { logger.Info("performing migration 00001_wallet_metrics") - if err := txn.Migrator().AutoMigrate(&dbWalletMetric{}); err != nil { + if err := txn.Table("wallets").Migrator().AutoMigrate(&struct { + ID uint `gorm:"primarykey"` + CreatedAt time.Time + + Timestamp unixTimeMS `gorm:"index;NOT NULL"` + + ConfirmedLo unsigned64 `gorm:"index:idx_confirmed;NOT NULL"` + ConfirmedHi unsigned64 `gorm:"index:idx_confirmed;NOT NULL"` + SpendableLo unsigned64 `gorm:"index:idx_spendable;NOT NULL"` + SpendableHi unsigned64 `gorm:"index:idx_spendable;NOT NULL"` + UnconfirmedLo unsigned64 `gorm:"index:idx_unconfirmed;NOT NULL"` + UnconfirmedHi unsigned64 `gorm:"index:idx_unconfirmed;NOT NULL"` + }{}); err != nil { return err } logger.Info("migration 00001_wallet_metrics complete") diff --git a/stores/types.go b/stores/types.go index 864de3c3a..bbbf19594 100644 --- a/stores/types.go +++ b/stores/types.go @@ -18,7 +18,6 @@ import ( var zeroCurrency = currency(types.ZeroCurrency) type ( - address types.Address unixTimeMS time.Time datetime time.Time currency types.Currency @@ -32,29 +31,6 @@ type ( secretKey []byte ) -// GormDataType implements gorm.GormDataTypeInterface. -func (address) GormDataType() string { - return "bytes" -} - -// Scan scan value into address, implements sql.Scanner interface. -func (a *address) Scan(value interface{}) error { - bytes, ok := value.([]byte) - if !ok { - return errors.New(fmt.Sprint("failed to unmarshal address value:", value)) - } - if len(bytes) < len(address{}) { - return fmt.Errorf("failed to unmarshal address value due to insufficient bytes %v < %v: %v", len(bytes), len(address{}), value) - } - *a = *(*address)(bytes) - return nil -} - -// Value returns an address value, implements driver.Valuer interface. -func (a address) Value() (driver.Value, error) { - return a[:], nil -} - // GormDataType implements gorm.GormDataTypeInterface. func (secretKey) GormDataType() string { return "bytes" diff --git a/wallet/wallet.go b/wallet/wallet.go index b76e9715c..35da64287 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -442,7 +442,6 @@ func (w *SingleAddressWallet) ProcessConsensusChange(cc modules.ConsensusChange) // record wallet metric if err := w.store.RecordWalletMetric(ctx, api.WalletMetric{ Timestamp: time.Now().UTC(), - Address: w.addr, Confirmed: confirmed, Unconfirmed: unconfirmed, Spendable: spendable, From 5dd22fe2794b78baca6ce3de64eaac6dd73a8971 Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 29 Nov 2023 17:32:14 +0100 Subject: [PATCH 5/7] stores: remove synced bool --- stores/hostdb_test.go | 2 +- stores/sql.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/stores/hostdb_test.go b/stores/hostdb_test.go index d4bbf13c6..a61f9eea3 100644 --- a/stores/hostdb_test.go +++ b/stores/hostdb_test.go @@ -1104,7 +1104,7 @@ func (s *SQLStore) addCustomTestHost(hk types.PublicKey, na string) error { announcement: hostdb.Announcement{NetAddress: na}, }}...) s.lastSave = time.Now().Add(s.persistInterval * -2) - return s.applyUpdates(false, false) + return s.applyUpdates(false) } // hosts returns all hosts in the db. Only used in testing since preloading all diff --git a/stores/sql.go b/stores/sql.go index 6127d72ea..5a8a414c7 100644 --- a/stores/sql.go +++ b/stores/sql.go @@ -346,7 +346,7 @@ func (ss *SQLStore) ProcessConsensusChange(cc modules.ConsensusChange) { } // Try to apply the updates. - if err := ss.applyUpdates(false, cc.Synced); err != nil { + if err := ss.applyUpdates(false); err != nil { ss.logger.Error(fmt.Sprintf("failed to apply updates, err: %v", err)) } @@ -368,14 +368,14 @@ func (ss *SQLStore) ProcessConsensusChange(cc modules.ConsensusChange) { ss.persistMu.Lock() defer ss.persistMu.Unlock() - if err := ss.applyUpdates(true, cc.Synced); err != nil { + if err := ss.applyUpdates(true); err != nil { ss.logger.Error(fmt.Sprintf("failed to apply updates, err: %v", err)) } }) } // applyUpdates applies all unapplied updates to the database. -func (ss *SQLStore) applyUpdates(force, synced bool) (err error) { +func (ss *SQLStore) applyUpdates(force bool) (err error) { // Check if we need to apply changes persistIntervalPassed := time.Since(ss.lastSave) > ss.persistInterval // enough time has passed since last persist softLimitReached := len(ss.unappliedAnnouncements) >= announcementBatchSoftLimit // enough announcements have accumulated From 9c7d9731431043492dade00f8ea676b9b2d78f14 Mon Sep 17 00:00:00 2001 From: PJ Date: Thu, 30 Nov 2023 10:21:02 +0100 Subject: [PATCH 6/7] bus: add client method --- bus/client/metrics.go | 13 +++++++++++++ wallet/wallet.go | 2 +- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/bus/client/metrics.go b/bus/client/metrics.go index 757f30ded..9037a1b17 100644 --- a/bus/client/metrics.go +++ b/bus/client/metrics.go @@ -71,6 +71,19 @@ func (c *Client) ContractSetMetrics(ctx context.Context, start time.Time, n uint return resp, nil } +func (c *Client) WalletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) ([]api.WalletMetric, error) { + values := url.Values{} + values.Set("start", api.TimeRFC3339(start).String()) + values.Set("n", fmt.Sprint(n)) + values.Set("interval", api.DurationMS(interval).String()) + + var resp []api.WalletMetric + if err := c.metric(ctx, api.MetricWallet, values, &resp); err != nil { + return nil, err + } + return resp, nil +} + func (c *Client) RecordContractSetChurnMetric(ctx context.Context, metrics ...api.ContractSetChurnMetric) error { return c.c.WithContext(ctx).PUT(fmt.Sprintf("/metric/%s", api.MetricContractSetChurn), api.ContractSetChurnMetricRequestPUT{ Metrics: metrics, diff --git a/wallet/wallet.go b/wallet/wallet.go index 35da64287..0a30246fd 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -441,7 +441,7 @@ func (w *SingleAddressWallet) ProcessConsensusChange(cc modules.ConsensusChange) // record wallet metric if err := w.store.RecordWalletMetric(ctx, api.WalletMetric{ - Timestamp: time.Now().UTC(), + Timestamp: time.Now(), Confirmed: confirmed, Unconfirmed: unconfirmed, Spendable: spendable, From 290887e6b819c4a98ee12918ec93c14fb97fe98e Mon Sep 17 00:00:00 2001 From: PJ Date: Thu, 30 Nov 2023 11:03:53 +0100 Subject: [PATCH 7/7] store: fix error copy --- stores/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stores/metrics.go b/stores/metrics.go index 419ca4602..3bb06bb2a 100644 --- a/stores/metrics.go +++ b/stores/metrics.go @@ -360,7 +360,7 @@ func (s *SQLStore) findPeriods(tx *gorm.DB, dst interface{}, start time.Time, n func (s *SQLStore) walletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) (metrics []dbWalletMetric, err error) { err = s.findPeriods(s.dbMetrics, &metrics, start, n, interval) if err != nil { - return nil, fmt.Errorf("failed to fetch contract metrics: %w", err) + return nil, fmt.Errorf("failed to fetch wallet metrics: %w", err) } return }