Skip to content

Commit

Permalink
Migrate PerformanceMetrics to raw SQL (#1312)
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan authored Jun 20, 2024
1 parent 9225732 commit b2404ee
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 53 deletions.
65 changes: 12 additions & 53 deletions stores/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,22 +150,12 @@ func (s *SQLStore) ContractSetMetrics(ctx context.Context, start time.Time, n ui
return
}

func (s *SQLStore) PerformanceMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts) ([]api.PerformanceMetric, error) {
metrics, err := s.performanceMetrics(ctx, start, n, interval, opts)
if err != nil {
return nil, err
}
resp := make([]api.PerformanceMetric, len(metrics))
for i := range resp {
resp[i] = api.PerformanceMetric{
Action: metrics[i].Action,
HostKey: types.PublicKey(metrics[i].Host),
Origin: metrics[i].Origin,
Duration: metrics[i].Duration,
Timestamp: api.TimeRFC3339(time.Time(metrics[i].Timestamp).UTC()),
}
}
return resp, nil
func (s *SQLStore) PerformanceMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts) (metrics []api.PerformanceMetric, err error) {
err = s.bMetrics.Transaction(ctx, func(tx sql.MetricsDatabaseTx) (txErr error) {
metrics, txErr = tx.PerformanceMetrics(ctx, start, n, interval, opts)
return
})
return
}

func (s *SQLStore) RecordContractMetric(ctx context.Context, metrics ...api.ContractMetric) error {
Expand All @@ -192,6 +182,12 @@ func (s *SQLStore) RecordContractSetMetric(ctx context.Context, metrics ...api.C
})
}

func (s *SQLStore) RecordPerformanceMetric(ctx context.Context, metrics ...api.PerformanceMetric) error {
return s.bMetrics.Transaction(ctx, func(tx sql.MetricsDatabaseTx) error {
return tx.RecordPerformanceMetric(ctx, metrics...)
})
}

func (s *SQLStore) RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error {
dbMetrics := make([]dbWalletMetric, len(metrics))
for i, metric := range metrics {
Expand All @@ -210,22 +206,6 @@ func (s *SQLStore) RecordWalletMetric(ctx context.Context, metrics ...api.Wallet
})
}

func (s *SQLStore) RecordPerformanceMetric(ctx context.Context, metrics ...api.PerformanceMetric) error {
dbMetrics := make([]dbPerformanceMetric, len(metrics))
for i, metric := range metrics {
dbMetrics[i] = dbPerformanceMetric{
Action: metric.Action,
Duration: metric.Duration,
Host: publicKey(metric.HostKey),
Origin: metric.Origin,
Timestamp: unixTimeMS(metric.Timestamp),
}
}
return s.dbMetrics.Transaction(func(tx *gorm.DB) error {
return tx.Create(&dbMetrics).Error
})
}

func (s *SQLStore) WalletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) ([]api.WalletMetric, error) {
metrics, err := s.walletMetrics(ctx, start, n, interval, opts)
if err != nil {
Expand Down Expand Up @@ -338,24 +318,3 @@ func (s *SQLStore) walletMetrics(ctx context.Context, start time.Time, n uint64,
}
return
}

func (s *SQLStore) performanceMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts) ([]dbPerformanceMetric, error) {
whereExpr := gorm.Expr("TRUE")
if opts.Action != "" {
whereExpr = gorm.Expr("? AND action = ?", whereExpr, opts.Action)
}
if opts.HostKey != (types.PublicKey{}) {
whereExpr = gorm.Expr("? AND host = ?", whereExpr, publicKey(opts.HostKey))
}
if opts.Origin != "" {
whereExpr = gorm.Expr("? AND origin = ?", whereExpr, opts.Origin)
}

var metrics []dbPerformanceMetric
err := s.findPeriods(ctx, dbPerformanceMetric{}.TableName(), &metrics, start, n, interval, whereExpr)
if err != nil {
return nil, fmt.Errorf("failed to fetch performance metrics: %w", err)
}

return metrics, nil
}
6 changes: 6 additions & 0 deletions stores/sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,9 @@ type (
// time range and options.
ContractSetMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) ([]api.ContractSetMetric, error)

// PerformanceMetrics returns performance metrics for the given time range
PerformanceMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts) ([]api.PerformanceMetric, error)

// RecordContractMetric records contract metrics.
RecordContractMetric(ctx context.Context, metrics ...api.ContractMetric) error

Expand All @@ -286,6 +289,9 @@ type (

// RecordContractSetMetric records contract set metrics.
RecordContractSetMetric(ctx context.Context, metrics ...api.ContractSetMetric) error

// RecordPerformanceMetric records performance metrics.
RecordPerformanceMetric(ctx context.Context, metrics ...api.PerformanceMetric) error
}

UsedContract struct {
Expand Down
65 changes: 65 additions & 0 deletions stores/sql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,29 @@ func ContractSetMetrics(ctx context.Context, tx sql.Tx, start time.Time, n uint6
})
}

func PerformanceMetrics(ctx context.Context, tx sql.Tx, start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts) ([]api.PerformanceMetric, error) {
return queryPeriods(ctx, tx, start, n, interval, opts, func(rows *sql.LoggedRows) (m api.PerformanceMetric, err error) {
var placeHolder int64
var placeHolderTime time.Time
var timestamp UnixTimeMS
err = rows.Scan(
&placeHolder,
&placeHolderTime,
&timestamp,
&m.Action,
(*PublicKey)(&m.HostKey),
&m.Origin,
&m.Duration,
)
if err != nil {
err = fmt.Errorf("failed to scan contract set metric: %w", err)
return
}
m.Timestamp = api.TimeRFC3339(normaliseTimestamp(start, interval, timestamp))
return
})
}

func RecordContractMetric(ctx context.Context, tx sql.Tx, metrics ...api.ContractMetric) error {
insertStmt, err := tx.Prepare(ctx, "INSERT INTO contracts (created_at, timestamp, fcid, host, remaining_collateral_lo, remaining_collateral_hi, remaining_funds_lo, remaining_funds_hi, revision_number, upload_spending_lo, upload_spending_hi, download_spending_lo, download_spending_hi, fund_account_spending_lo, fund_account_spending_hi, delete_spending_lo, delete_spending_hi, list_spending_lo, list_spending_hi) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
if err != nil {
Expand Down Expand Up @@ -284,6 +307,34 @@ func RecordContractSetMetric(ctx context.Context, tx sql.Tx, metrics ...api.Cont
return nil
}

func RecordPerformanceMetric(ctx context.Context, tx sql.Tx, metrics ...api.PerformanceMetric) error {
insertStmt, err := tx.Prepare(ctx, "INSERT INTO performance (created_at, timestamp, action, host, origin, duration) VALUES (?, ?, ?, ?, ?, ?)")
if err != nil {
return fmt.Errorf("failed to prepare statement to insert performance metric: %w", err)
}
defer insertStmt.Close()

for _, metric := range metrics {
res, err := insertStmt.Exec(ctx,
time.Now().UTC(),
UnixTimeMS(metric.Timestamp),
metric.Action,
PublicKey(metric.HostKey),
metric.Origin,
metric.Duration,
)
if err != nil {
return fmt.Errorf("failed to insert performance metric: %w", err)
} else if n, err := res.RowsAffected(); err != nil {
return fmt.Errorf("failed to get rows affected: %w", err)
} else if n == 0 {
return fmt.Errorf("failed to insert performance metric: no rows affected")
}
}

return nil
}

func queryPeriods[T any](ctx context.Context, tx sql.Tx, start time.Time, n uint64, interval time.Duration, opts interface{}, scanRowFn func(*sql.LoggedRows) (T, error)) ([]T, error) {
if n > api.MetricMaxIntervals {
return nil, api.ErrMaxIntervalsExceeded
Expand Down Expand Up @@ -459,6 +510,20 @@ func whereClauseFromQueryOpts(opts interface{}) (where whereClause, _ error) {
where.query += " AND name = ?"
where.params = append(where.params, opts.Name)
}
case api.PerformanceMetricsQueryOpts:
where.table = "performance"
if opts.Action != "" {
where.query += " AND action = ?"
where.params = append(where.params, opts.Action)
}
if opts.HostKey != (types.PublicKey{}) {
where.query += " AND host = ?"
where.params = append(where.params, PublicKey(opts.HostKey))
}
if opts.Origin != "" {
where.query += " AND origin = ?"
where.params = append(where.params, opts.Origin)
}
default:
return whereClause{}, fmt.Errorf("unknown query opts type: %T", opts)
}
Expand Down
8 changes: 8 additions & 0 deletions stores/sql/mysql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ func (tx *MetricsDatabaseTx) ContractSetMetrics(ctx context.Context, start time.
return ssql.ContractSetMetrics(ctx, tx, start, n, interval, opts)
}

func (tx *MetricsDatabaseTx) PerformanceMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts) ([]api.PerformanceMetric, error) {
return ssql.PerformanceMetrics(ctx, tx, start, n, interval, opts)
}

func (tx *MetricsDatabaseTx) RecordContractMetric(ctx context.Context, metrics ...api.ContractMetric) error {
return ssql.RecordContractMetric(ctx, tx, metrics...)
}
Expand All @@ -103,3 +107,7 @@ func (tx *MetricsDatabaseTx) RecordContractSetChurnMetric(ctx context.Context, m
func (tx *MetricsDatabaseTx) RecordContractSetMetric(ctx context.Context, metrics ...api.ContractSetMetric) error {
return ssql.RecordContractSetMetric(ctx, tx, metrics...)
}

func (tx *MetricsDatabaseTx) RecordPerformanceMetric(ctx context.Context, metrics ...api.PerformanceMetric) error {
return ssql.RecordPerformanceMetric(ctx, tx, metrics...)
}
8 changes: 8 additions & 0 deletions stores/sql/sqlite/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ func (tx *MetricsDatabaseTx) ContractSetMetrics(ctx context.Context, start time.
return ssql.ContractSetMetrics(ctx, tx, start, n, interval, opts)
}

func (tx *MetricsDatabaseTx) PerformanceMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts) ([]api.PerformanceMetric, error) {
return ssql.PerformanceMetrics(ctx, tx, start, n, interval, opts)
}

func (tx *MetricsDatabaseTx) RecordContractMetric(ctx context.Context, metrics ...api.ContractMetric) error {
return ssql.RecordContractMetric(ctx, tx, metrics...)
}
Expand All @@ -102,3 +106,7 @@ func (tx *MetricsDatabaseTx) RecordContractSetChurnMetric(ctx context.Context, m
func (tx *MetricsDatabaseTx) RecordContractSetMetric(ctx context.Context, metrics ...api.ContractSetMetric) error {
return ssql.RecordContractSetMetric(ctx, tx, metrics...)
}

func (tx *MetricsDatabaseTx) RecordPerformanceMetric(ctx context.Context, metrics ...api.PerformanceMetric) error {
return ssql.RecordPerformanceMetric(ctx, tx, metrics...)
}

0 comments on commit b2404ee

Please sign in to comment.