diff --git a/api/metrics.go b/api/metrics.go index c697193f0..85684a7b5 100644 --- a/api/metrics.go +++ b/api/metrics.go @@ -1,12 +1,19 @@ package api import ( + "fmt" "time" "go.sia.tech/core/types" ) +var ( + ErrMaxIntervalsExceeded = fmt.Errorf("max number of intervals exceeds maximum of %v", MetricMaxIntervals) +) + const ( + MetricMaxIntervals = 1000 + ChurnDirAdded = "added" ChurnDirRemoved = "removed" diff --git a/bus/bus.go b/bus/bus.go index 4850c74b8..d11550595 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -2083,19 +2083,18 @@ func (b *bus) metricsHandlerGET(jc jape.Context) { } // parse optional query parameters - switch key := jc.PathParam("key"); key { + var metrics interface{} + var err error + key := jc.PathParam("key") + switch key { case api.MetricContract: var opts api.ContractMetricsQueryOpts if jc.DecodeForm("contractID", &opts.ContractID) != nil { return } else if jc.DecodeForm("hostKey", &opts.HostKey) != nil { return - } else if metrics, err := b.metrics(jc.Request.Context(), key, start, n, interval, opts); jc.Check("failed to get contract metrics", err) != nil { - return - } else { - jc.Encode(metrics) - return } + metrics, err = b.metrics(jc.Request.Context(), key, start, n, interval, opts) case api.MetricContractPrune: var opts api.ContractPruneMetricsQueryOpts if jc.DecodeForm("contractID", &opts.ContractID) != nil { @@ -2104,22 +2103,14 @@ func (b *bus) metricsHandlerGET(jc jape.Context) { return } else if jc.DecodeForm("hostVersion", &opts.HostVersion) != nil { return - } else if metrics, err := b.metrics(jc.Request.Context(), key, start, n, interval, opts); jc.Check("failed to get contract prune metrics", err) != nil { - return - } else { - jc.Encode(metrics) - return } + metrics, err = b.metrics(jc.Request.Context(), key, start, n, interval, opts) case api.MetricContractSet: var opts api.ContractSetMetricsQueryOpts if jc.DecodeForm("name", &opts.Name) != nil { return - } else if metrics, err := b.metrics(jc.Request.Context(), key, start, n, interval, opts); jc.Check("failed to get contract set metrics", err) != nil { - return - } else { - jc.Encode(metrics) - return } + metrics, err = b.metrics(jc.Request.Context(), key, start, n, interval, opts) case api.MetricContractSetChurn: var opts api.ContractSetChurnMetricsQueryOpts if jc.DecodeForm("name", &opts.Name) != nil { @@ -2128,24 +2119,22 @@ func (b *bus) metricsHandlerGET(jc jape.Context) { return } else if jc.DecodeForm("reason", &opts.Reason) != nil { return - } else if metrics, err := b.metrics(jc.Request.Context(), key, start, n, interval, opts); jc.Check("failed to get contract churn metrics", err) != nil { - return - } else { - jc.Encode(metrics) - return } + metrics, err = b.metrics(jc.Request.Context(), key, start, n, interval, opts) case api.MetricWallet: var opts api.WalletMetricsQueryOpts - if metrics, err := b.metrics(jc.Request.Context(), key, start, n, interval, opts); jc.Check("failed to get wallet metrics", err) != nil { - return - } else { - jc.Encode(metrics) - return - } + metrics, err = b.metrics(jc.Request.Context(), key, start, n, interval, opts) default: jc.Error(fmt.Errorf("unknown metric '%s'", key), http.StatusBadRequest) return } + if errors.Is(err, api.ErrMaxIntervalsExceeded) { + jc.Error(err, http.StatusBadRequest) + return + } else if jc.Check(fmt.Sprintf("failed to fetch '%s' metrics", key), err) != nil { + return + } + jc.Encode(metrics) } func (b *bus) metrics(ctx context.Context, key string, start time.Time, n uint64, interval time.Duration, opts interface{}) (interface{}, error) { diff --git a/internal/testing/cluster_test.go b/internal/testing/cluster_test.go index 7112f9574..b0de2946e 100644 --- a/internal/testing/cluster_test.go +++ b/internal/testing/cluster_test.go @@ -2210,7 +2210,7 @@ func TestBusRecordedMetrics(t *testing.T) { defer cluster.Shutdown() // Get contract set metrics. - csMetrics, err := cluster.Bus.ContractSetMetrics(context.Background(), startTime, math.MaxUint32, time.Second, api.ContractSetMetricsQueryOpts{}) + csMetrics, err := cluster.Bus.ContractSetMetrics(context.Background(), startTime, api.MetricMaxIntervals, time.Second, api.ContractSetMetricsQueryOpts{}) cluster.tt.OK(err) for i := 0; i < len(csMetrics); i++ { @@ -2234,7 +2234,7 @@ func TestBusRecordedMetrics(t *testing.T) { } // Get churn metrics. Should have 1 for the new contract. - cscMetrics, err := cluster.Bus.ContractSetChurnMetrics(context.Background(), startTime, math.MaxUint32, time.Second, api.ContractSetChurnMetricsQueryOpts{}) + cscMetrics, err := cluster.Bus.ContractSetChurnMetrics(context.Background(), startTime, api.MetricMaxIntervals, time.Second, api.ContractSetChurnMetricsQueryOpts{}) cluster.tt.OK(err) if len(cscMetrics) != 1 { @@ -2253,7 +2253,7 @@ func TestBusRecordedMetrics(t *testing.T) { var cMetrics []api.ContractMetric cluster.tt.Retry(100, 100*time.Millisecond, func() error { // Retry fetching metrics since they are buffered. - cMetrics, err = cluster.Bus.ContractMetrics(context.Background(), startTime, math.MaxUint32, time.Second, api.ContractMetricsQueryOpts{}) + cMetrics, err = cluster.Bus.ContractMetrics(context.Background(), startTime, api.MetricMaxIntervals, time.Second, api.ContractMetricsQueryOpts{}) cluster.tt.OK(err) if len(cMetrics) != 1 { return fmt.Errorf("expected 1 metric, got %v", len(cMetrics)) @@ -2290,7 +2290,7 @@ func TestBusRecordedMetrics(t *testing.T) { // Prune one of the metrics if err := cluster.Bus.PruneMetrics(context.Background(), api.MetricContract, time.Now()); err != nil { t.Fatal(err) - } else if cMetrics, err = cluster.Bus.ContractMetrics(context.Background(), startTime, math.MaxUint32, time.Second, api.ContractMetricsQueryOpts{}); err != nil { + } else if cMetrics, err = cluster.Bus.ContractMetrics(context.Background(), startTime, api.MetricMaxIntervals, time.Second, api.ContractMetricsQueryOpts{}); err != nil { t.Fatal(err) } else if len(cMetrics) > 0 { t.Fatalf("expected 0 metrics, got %v", len(cscMetrics)) diff --git a/stores/metrics.go b/stores/metrics.go index a50edcf39..a4a642bba 100644 --- a/stores/metrics.go +++ b/stores/metrics.go @@ -418,12 +418,12 @@ func (s *SQLStore) PruneMetrics(ctx context.Context, metric string, cutoff time. } func (s *SQLStore) contractMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractMetricsQueryOpts) ([]dbContractMetric, error) { - tx := s.dbMetrics + whereExpr := gorm.Expr("TRUE") if opts.ContractID != (types.FileContractID{}) { - tx = tx.Where("fcid", fileContractID(opts.ContractID)) + whereExpr = gorm.Expr("? AND fcid = ?", whereExpr, fileContractID(opts.ContractID)) } if opts.HostKey != (types.PublicKey{}) { - tx = tx.Where("host", publicKey(opts.HostKey)) + whereExpr = gorm.Expr("? AND host = ?", whereExpr, publicKey(opts.HostKey)) } var metrics []dbContractMetric @@ -435,7 +435,7 @@ func (s *SQLStore) contractMetrics(ctx context.Context, start time.Time, n uint6 } else { // otherwise we return the first metric for each period like we usually // do - err = s.findPeriods(tx, &metrics, start, n, interval) + err = s.findPeriods(dbContractMetric{}.TableName(), &metrics, start, n, interval, whereExpr) } if err != nil { return nil, fmt.Errorf("failed to fetch contract metrics: %w", err) @@ -447,19 +447,19 @@ func (s *SQLStore) contractMetrics(ctx context.Context, start time.Time, n uint6 } func (s *SQLStore) contractPruneMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractPruneMetricsQueryOpts) ([]dbContractPruneMetric, error) { - tx := s.dbMetrics + whereExpr := gorm.Expr("TRUE") if opts.ContractID != (types.FileContractID{}) { - tx = tx.Where("fcid", fileContractID(opts.ContractID)) + whereExpr = gorm.Expr("? AND fcid = ?", whereExpr, fileContractID(opts.ContractID)) } if opts.HostKey != (types.PublicKey{}) { - tx = tx.Where("host", publicKey(opts.HostKey)) + whereExpr = gorm.Expr("? AND host = ?", whereExpr, publicKey(opts.HostKey)) } if opts.HostVersion != "" { - tx = tx.Where("host_version", opts.HostVersion) + whereExpr = gorm.Expr("? AND host_version = ?", whereExpr, opts.HostVersion) } var metrics []dbContractPruneMetric - err := s.findPeriods(tx, &metrics, start, n, interval) + err := s.findPeriods(dbContractPruneMetric{}.TableName(), &metrics, start, n, interval, whereExpr) if err != nil { return nil, fmt.Errorf("failed to fetch contract metrics: %w", err) } @@ -468,18 +468,18 @@ func (s *SQLStore) contractPruneMetrics(ctx context.Context, start time.Time, n } func (s *SQLStore) contractSetChurnMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetChurnMetricsQueryOpts) ([]dbContractSetChurnMetric, error) { - tx := s.dbMetrics + whereExpr := gorm.Expr("TRUE") if opts.Name != "" { - tx = tx.Where("name", opts.Name) + whereExpr = gorm.Expr("? AND name = ?", whereExpr, opts.Name) } if opts.Direction != "" { - tx = tx.Where("direction", opts.Direction) + whereExpr = gorm.Expr("? AND direction = ?", whereExpr, opts.Direction) } if opts.Reason != "" { - tx = tx.Where("reason", opts.Reason) + whereExpr = gorm.Expr("? AND reason = ?", whereExpr, opts.Reason) } var metrics []dbContractSetChurnMetric - err := s.findPeriods(tx, &metrics, start, n, interval) + err := s.findPeriods(dbContractSetChurnMetric{}.TableName(), &metrics, start, n, interval, whereExpr) if err != nil { return nil, fmt.Errorf("failed to fetch contract set churn metrics: %w", err) } @@ -490,13 +490,13 @@ func (s *SQLStore) contractSetChurnMetrics(ctx context.Context, start time.Time, } func (s *SQLStore) contractSetMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) ([]dbContractSetMetric, error) { - tx := s.dbMetrics + whereExpr := gorm.Expr("TRUE") if opts.Name != "" { - tx = tx.Where("name", opts.Name) + whereExpr = gorm.Expr("name = ?", opts.Name) } var metrics []dbContractSetMetric - err := s.findPeriods(tx, &metrics, start, n, interval) + err := s.findPeriods(dbContractSetMetric{}.TableName(), &metrics, start, n, interval, whereExpr) if err != nil { return nil, fmt.Errorf("failed to fetch contract set metrics: %w", err) } @@ -517,36 +517,49 @@ func normaliseTimestamp(start time.Time, interval time.Duration, t unixTimeMS) u return unixTimeMS(time.UnixMilli(normalizedMS)) } -func roundPeriodExpr(db *gorm.DB, start time.Time, interval time.Duration) clause.Expr { - if !isSQLite(db) { - return gorm.Expr("CAST(FLOOR((timestamp - ?) / ?) * ? AS SIGNED)", unixTimeMS(start), interval.Milliseconds(), interval.Milliseconds()) - } else { - return gorm.Expr("(timestamp - ?) / ? * ?", unixTimeMS(start), interval.Milliseconds(), interval.Milliseconds()) - } -} - func (s *SQLStore) findAggregatedContractPeriods(start time.Time, n uint64, interval time.Duration) ([]dbContractMetric, error) { + if n > api.MetricMaxIntervals { + return nil, api.ErrMaxIntervalsExceeded + } end := start.Add(time.Duration(n) * interval) var metricsWithPeriod []struct { Metric dbContractMetric `gorm:"embedded"` Period int64 } - currentPeriod := int64(math.MinInt64) err := s.dbMetrics.Raw(` - SELECT * FROM contracts + WITH RECURSIVE periods AS ( + SELECT ? AS period_start + UNION ALL + SELECT period_start + ? + FROM periods + WHERE period_start < ? - ? + ) + SELECT contracts.*, i.Period FROM contracts INNER JOIN ( - SELECT fcid, MIN(timestamp) as timestamp, ? AS Period - FROM contracts - WHERE timestamp >= ? AND timestamp < ? - GROUP BY Period, fcid) i - ON contracts.fcid = i.fcid AND contracts.timestamp = i.timestamp - `, roundPeriodExpr(s.dbMetrics, start, interval), - unixTimeMS(start), unixTimeMS(end)). + SELECT + p.period_start as Period, + MIN(c.id) AS id + FROM + periods p + INNER JOIN + contracts c ON c.timestamp >= p.period_start AND c.timestamp < p.period_start + ? + GROUP BY + p.period_start, c.fcid + ORDER BY + p.period_start ASC + ) i ON contracts.id = i.id + `, unixTimeMS(start), + interval.Milliseconds(), + unixTimeMS(end), + interval.Milliseconds(), + interval.Milliseconds(), + ). Scan(&metricsWithPeriod). Error if err != nil { return nil, fmt.Errorf("failed to fetch aggregate metrics: %w", err) } + currentPeriod := int64(math.MinInt64) var metrics []dbContractMetric for _, m := range metricsWithPeriod { m.Metric.FCID = fileContractID{} @@ -567,29 +580,47 @@ func (s *SQLStore) findAggregatedContractPeriods(start time.Time, n uint64, inte // split into intervals and the row with the lowest timestamp for each interval // is returned. The result is then joined with the original table to retrieve // only the metrics we want. -func (s *SQLStore) findPeriods(tx *gorm.DB, dst interface{}, start time.Time, n uint64, interval time.Duration) error { +func (s *SQLStore) findPeriods(table string, dst interface{}, start time.Time, n uint64, interval time.Duration, whereExpr clause.Expr) error { + if n > api.MetricMaxIntervals { + return api.ErrMaxIntervalsExceeded + } end := start.Add(time.Duration(n) * interval) - // inner groups all metrics within the requested time range into periods of - // 'interval' length and gives us the min timestamp of each period. - inner := tx.Model(dst). - Select("MIN(timestamp) AS min_time, ? AS period", roundPeriodExpr(tx, start, interval)). - Where("timestamp >= ? AND timestamp < ?", unixTimeMS(start), unixTimeMS(end)). - Group("period") - // mid then joins the result with the original table. This might yield - // duplicates if multiple rows have the same timestamp so we attach a - // row number. We order the rows by id to make the result deterministic. - mid := s.dbMetrics.Model(dst). - Joins("INNER JOIN (?) periods ON timestamp = periods.min_time", inner). - Select("*, ROW_NUMBER() OVER (PARTITION BY periods.min_time ORDER BY id) AS row_num") - // lastly we select all metrics with row number 1 - return s.dbMetrics.Table("(?) numbered", mid). - Where("numbered.row_num = 1"). - Find(dst). + return s.dbMetrics.Raw(fmt.Sprintf(` + WITH RECURSIVE periods AS ( + SELECT ? AS period_start + UNION ALL + SELECT period_start + ? + FROM periods + WHERE period_start < ? - ? + ) + SELECT %s.* FROM %s + INNER JOIN ( + SELECT + p.period_start as Period, + MIN(obj.id) AS id + FROM + periods p + INNER JOIN + %s obj ON obj.timestamp >= p.period_start AND obj.timestamp < p.period_start + ? + WHERE ? + GROUP BY + p.period_start + ORDER BY + p.period_start ASC + ) i ON %s.id = i.id + `, table, table, table, table), + unixTimeMS(start), + interval.Milliseconds(), + unixTimeMS(end), + interval.Milliseconds(), + interval.Milliseconds(), + whereExpr, + ).Scan(dst). Error } func (s *SQLStore) walletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) (metrics []dbWalletMetric, err error) { - err = s.findPeriods(s.dbMetrics, &metrics, start, n, interval) + err = s.findPeriods(dbWalletMetric{}.TableName(), &metrics, start, n, interval, gorm.Expr("TRUE")) if err != nil { return nil, fmt.Errorf("failed to fetch wallet metrics: %w", err) } @@ -600,19 +631,19 @@ func (s *SQLStore) walletMetrics(ctx context.Context, start time.Time, n uint64, } func (s *SQLStore) performanceMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts) ([]dbPerformanceMetric, error) { - tx := s.dbMetrics + whereExpr := gorm.Expr("TRUE") if opts.Action != "" { - tx = tx.Where("action", opts.Action) + whereExpr = gorm.Expr("? AND action = ?", whereExpr, opts.Action) } if opts.HostKey != (types.PublicKey{}) { - tx = tx.Where("host", publicKey(opts.HostKey)) + whereExpr = gorm.Expr("? AND host = ?", whereExpr, publicKey(opts.HostKey)) } if opts.Origin != "" { - tx = tx.Where("origin", opts.Origin) + whereExpr = gorm.Expr("? AND origin = ?", whereExpr, opts.Origin) } var metrics []dbPerformanceMetric - err := s.findPeriods(tx, &metrics, start, n, interval) + err := s.findPeriods(dbPerformanceMetric{}.TableName(), &metrics, start, n, interval, whereExpr) if err != nil { return nil, fmt.Errorf("failed to fetch performance metrics: %w", err) }