Skip to content

Commit

Permalink
Merge pull request #907 from SiaFoundation/chris/improve-contract-met…
Browse files Browse the repository at this point in the history
…rics-query

Improve metrics queries
  • Loading branch information
ChrisSchinnerl authored Jan 19, 2024
2 parents cef20e4 + 08f1abd commit 4affd8a
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 88 deletions.
7 changes: 7 additions & 0 deletions api/metrics.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
package api

import (
"fmt"
"time"

"go.sia.tech/core/types"
)

var (
ErrMaxIntervalsExceeded = fmt.Errorf("max number of intervals exceeds maximum of %v", MetricMaxIntervals)
)

const (
MetricMaxIntervals = 1000

ChurnDirAdded = "added"
ChurnDirRemoved = "removed"

Expand Down
43 changes: 16 additions & 27 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -2083,19 +2083,18 @@ func (b *bus) metricsHandlerGET(jc jape.Context) {
}

// parse optional query parameters
switch key := jc.PathParam("key"); key {
var metrics interface{}
var err error
key := jc.PathParam("key")
switch key {
case api.MetricContract:
var opts api.ContractMetricsQueryOpts
if jc.DecodeForm("contractID", &opts.ContractID) != nil {
return
} else if jc.DecodeForm("hostKey", &opts.HostKey) != nil {
return
} else if metrics, err := b.metrics(jc.Request.Context(), key, start, n, interval, opts); jc.Check("failed to get contract metrics", err) != nil {
return
} else {
jc.Encode(metrics)
return
}
metrics, err = b.metrics(jc.Request.Context(), key, start, n, interval, opts)
case api.MetricContractPrune:
var opts api.ContractPruneMetricsQueryOpts
if jc.DecodeForm("contractID", &opts.ContractID) != nil {
Expand All @@ -2104,22 +2103,14 @@ func (b *bus) metricsHandlerGET(jc jape.Context) {
return
} else if jc.DecodeForm("hostVersion", &opts.HostVersion) != nil {
return
} else if metrics, err := b.metrics(jc.Request.Context(), key, start, n, interval, opts); jc.Check("failed to get contract prune metrics", err) != nil {
return
} else {
jc.Encode(metrics)
return
}
metrics, err = b.metrics(jc.Request.Context(), key, start, n, interval, opts)
case api.MetricContractSet:
var opts api.ContractSetMetricsQueryOpts
if jc.DecodeForm("name", &opts.Name) != nil {
return
} else if metrics, err := b.metrics(jc.Request.Context(), key, start, n, interval, opts); jc.Check("failed to get contract set metrics", err) != nil {
return
} else {
jc.Encode(metrics)
return
}
metrics, err = b.metrics(jc.Request.Context(), key, start, n, interval, opts)
case api.MetricContractSetChurn:
var opts api.ContractSetChurnMetricsQueryOpts
if jc.DecodeForm("name", &opts.Name) != nil {
Expand All @@ -2128,24 +2119,22 @@ func (b *bus) metricsHandlerGET(jc jape.Context) {
return
} else if jc.DecodeForm("reason", &opts.Reason) != nil {
return
} else if metrics, err := b.metrics(jc.Request.Context(), key, start, n, interval, opts); jc.Check("failed to get contract churn metrics", err) != nil {
return
} else {
jc.Encode(metrics)
return
}
metrics, err = b.metrics(jc.Request.Context(), key, start, n, interval, opts)
case api.MetricWallet:
var opts api.WalletMetricsQueryOpts
if metrics, err := b.metrics(jc.Request.Context(), key, start, n, interval, opts); jc.Check("failed to get wallet metrics", err) != nil {
return
} else {
jc.Encode(metrics)
return
}
metrics, err = b.metrics(jc.Request.Context(), key, start, n, interval, opts)
default:
jc.Error(fmt.Errorf("unknown metric '%s'", key), http.StatusBadRequest)
return
}
if errors.Is(err, api.ErrMaxIntervalsExceeded) {
jc.Error(err, http.StatusBadRequest)
return
} else if jc.Check(fmt.Sprintf("failed to fetch '%s' metrics", key), err) != nil {
return
}
jc.Encode(metrics)
}

func (b *bus) metrics(ctx context.Context, key string, start time.Time, n uint64, interval time.Duration, opts interface{}) (interface{}, error) {
Expand Down
8 changes: 4 additions & 4 deletions internal/testing/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2210,7 +2210,7 @@ func TestBusRecordedMetrics(t *testing.T) {
defer cluster.Shutdown()

// Get contract set metrics.
csMetrics, err := cluster.Bus.ContractSetMetrics(context.Background(), startTime, math.MaxUint32, time.Second, api.ContractSetMetricsQueryOpts{})
csMetrics, err := cluster.Bus.ContractSetMetrics(context.Background(), startTime, api.MetricMaxIntervals, time.Second, api.ContractSetMetricsQueryOpts{})
cluster.tt.OK(err)

for i := 0; i < len(csMetrics); i++ {
Expand All @@ -2234,7 +2234,7 @@ func TestBusRecordedMetrics(t *testing.T) {
}

// Get churn metrics. Should have 1 for the new contract.
cscMetrics, err := cluster.Bus.ContractSetChurnMetrics(context.Background(), startTime, math.MaxUint32, time.Second, api.ContractSetChurnMetricsQueryOpts{})
cscMetrics, err := cluster.Bus.ContractSetChurnMetrics(context.Background(), startTime, api.MetricMaxIntervals, time.Second, api.ContractSetChurnMetricsQueryOpts{})
cluster.tt.OK(err)

if len(cscMetrics) != 1 {
Expand All @@ -2253,7 +2253,7 @@ func TestBusRecordedMetrics(t *testing.T) {
var cMetrics []api.ContractMetric
cluster.tt.Retry(100, 100*time.Millisecond, func() error {
// Retry fetching metrics since they are buffered.
cMetrics, err = cluster.Bus.ContractMetrics(context.Background(), startTime, math.MaxUint32, time.Second, api.ContractMetricsQueryOpts{})
cMetrics, err = cluster.Bus.ContractMetrics(context.Background(), startTime, api.MetricMaxIntervals, time.Second, api.ContractMetricsQueryOpts{})
cluster.tt.OK(err)
if len(cMetrics) != 1 {
return fmt.Errorf("expected 1 metric, got %v", len(cMetrics))
Expand Down Expand Up @@ -2290,7 +2290,7 @@ func TestBusRecordedMetrics(t *testing.T) {
// Prune one of the metrics
if err := cluster.Bus.PruneMetrics(context.Background(), api.MetricContract, time.Now()); err != nil {
t.Fatal(err)
} else if cMetrics, err = cluster.Bus.ContractMetrics(context.Background(), startTime, math.MaxUint32, time.Second, api.ContractMetricsQueryOpts{}); err != nil {
} else if cMetrics, err = cluster.Bus.ContractMetrics(context.Background(), startTime, api.MetricMaxIntervals, time.Second, api.ContractMetricsQueryOpts{}); err != nil {
t.Fatal(err)
} else if len(cMetrics) > 0 {
t.Fatalf("expected 0 metrics, got %v", len(cscMetrics))
Expand Down
145 changes: 88 additions & 57 deletions stores/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,12 +418,12 @@ func (s *SQLStore) PruneMetrics(ctx context.Context, metric string, cutoff time.
}

func (s *SQLStore) contractMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractMetricsQueryOpts) ([]dbContractMetric, error) {
tx := s.dbMetrics
whereExpr := gorm.Expr("TRUE")
if opts.ContractID != (types.FileContractID{}) {
tx = tx.Where("fcid", fileContractID(opts.ContractID))
whereExpr = gorm.Expr("? AND fcid = ?", whereExpr, fileContractID(opts.ContractID))
}
if opts.HostKey != (types.PublicKey{}) {
tx = tx.Where("host", publicKey(opts.HostKey))
whereExpr = gorm.Expr("? AND host = ?", whereExpr, publicKey(opts.HostKey))
}

var metrics []dbContractMetric
Expand All @@ -435,7 +435,7 @@ func (s *SQLStore) contractMetrics(ctx context.Context, start time.Time, n uint6
} else {
// otherwise we return the first metric for each period like we usually
// do
err = s.findPeriods(tx, &metrics, start, n, interval)
err = s.findPeriods(dbContractMetric{}.TableName(), &metrics, start, n, interval, whereExpr)
}
if err != nil {
return nil, fmt.Errorf("failed to fetch contract metrics: %w", err)
Expand All @@ -447,19 +447,19 @@ func (s *SQLStore) contractMetrics(ctx context.Context, start time.Time, n uint6
}

func (s *SQLStore) contractPruneMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractPruneMetricsQueryOpts) ([]dbContractPruneMetric, error) {
tx := s.dbMetrics
whereExpr := gorm.Expr("TRUE")
if opts.ContractID != (types.FileContractID{}) {
tx = tx.Where("fcid", fileContractID(opts.ContractID))
whereExpr = gorm.Expr("? AND fcid = ?", whereExpr, fileContractID(opts.ContractID))
}
if opts.HostKey != (types.PublicKey{}) {
tx = tx.Where("host", publicKey(opts.HostKey))
whereExpr = gorm.Expr("? AND host = ?", whereExpr, publicKey(opts.HostKey))
}
if opts.HostVersion != "" {
tx = tx.Where("host_version", opts.HostVersion)
whereExpr = gorm.Expr("? AND host_version = ?", whereExpr, opts.HostVersion)
}

var metrics []dbContractPruneMetric
err := s.findPeriods(tx, &metrics, start, n, interval)
err := s.findPeriods(dbContractPruneMetric{}.TableName(), &metrics, start, n, interval, whereExpr)
if err != nil {
return nil, fmt.Errorf("failed to fetch contract metrics: %w", err)
}
Expand All @@ -468,18 +468,18 @@ func (s *SQLStore) contractPruneMetrics(ctx context.Context, start time.Time, n
}

func (s *SQLStore) contractSetChurnMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetChurnMetricsQueryOpts) ([]dbContractSetChurnMetric, error) {
tx := s.dbMetrics
whereExpr := gorm.Expr("TRUE")
if opts.Name != "" {
tx = tx.Where("name", opts.Name)
whereExpr = gorm.Expr("? AND name = ?", whereExpr, opts.Name)
}
if opts.Direction != "" {
tx = tx.Where("direction", opts.Direction)
whereExpr = gorm.Expr("? AND direction = ?", whereExpr, opts.Direction)
}
if opts.Reason != "" {
tx = tx.Where("reason", opts.Reason)
whereExpr = gorm.Expr("? AND reason = ?", whereExpr, opts.Reason)
}
var metrics []dbContractSetChurnMetric
err := s.findPeriods(tx, &metrics, start, n, interval)
err := s.findPeriods(dbContractSetChurnMetric{}.TableName(), &metrics, start, n, interval, whereExpr)
if err != nil {
return nil, fmt.Errorf("failed to fetch contract set churn metrics: %w", err)
}
Expand All @@ -490,13 +490,13 @@ func (s *SQLStore) contractSetChurnMetrics(ctx context.Context, start time.Time,
}

func (s *SQLStore) contractSetMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) ([]dbContractSetMetric, error) {
tx := s.dbMetrics
whereExpr := gorm.Expr("TRUE")
if opts.Name != "" {
tx = tx.Where("name", opts.Name)
whereExpr = gorm.Expr("name = ?", opts.Name)
}

var metrics []dbContractSetMetric
err := s.findPeriods(tx, &metrics, start, n, interval)
err := s.findPeriods(dbContractSetMetric{}.TableName(), &metrics, start, n, interval, whereExpr)
if err != nil {
return nil, fmt.Errorf("failed to fetch contract set metrics: %w", err)
}
Expand All @@ -517,36 +517,49 @@ func normaliseTimestamp(start time.Time, interval time.Duration, t unixTimeMS) u
return unixTimeMS(time.UnixMilli(normalizedMS))
}

func roundPeriodExpr(db *gorm.DB, start time.Time, interval time.Duration) clause.Expr {
if !isSQLite(db) {
return gorm.Expr("CAST(FLOOR((timestamp - ?) / ?) * ? AS SIGNED)", unixTimeMS(start), interval.Milliseconds(), interval.Milliseconds())
} else {
return gorm.Expr("(timestamp - ?) / ? * ?", unixTimeMS(start), interval.Milliseconds(), interval.Milliseconds())
}
}

func (s *SQLStore) findAggregatedContractPeriods(start time.Time, n uint64, interval time.Duration) ([]dbContractMetric, error) {
if n > api.MetricMaxIntervals {
return nil, api.ErrMaxIntervalsExceeded
}
end := start.Add(time.Duration(n) * interval)
var metricsWithPeriod []struct {
Metric dbContractMetric `gorm:"embedded"`
Period int64
}
currentPeriod := int64(math.MinInt64)
err := s.dbMetrics.Raw(`
SELECT * FROM contracts
WITH RECURSIVE periods AS (
SELECT ? AS period_start
UNION ALL
SELECT period_start + ?
FROM periods
WHERE period_start < ? - ?
)
SELECT contracts.*, i.Period FROM contracts
INNER JOIN (
SELECT fcid, MIN(timestamp) as timestamp, ? AS Period
FROM contracts
WHERE timestamp >= ? AND timestamp < ?
GROUP BY Period, fcid) i
ON contracts.fcid = i.fcid AND contracts.timestamp = i.timestamp
`, roundPeriodExpr(s.dbMetrics, start, interval),
unixTimeMS(start), unixTimeMS(end)).
SELECT
p.period_start as Period,
MIN(c.id) AS id
FROM
periods p
INNER JOIN
contracts c ON c.timestamp >= p.period_start AND c.timestamp < p.period_start + ?
GROUP BY
p.period_start, c.fcid
ORDER BY
p.period_start ASC
) i ON contracts.id = i.id
`, unixTimeMS(start),
interval.Milliseconds(),
unixTimeMS(end),
interval.Milliseconds(),
interval.Milliseconds(),
).
Scan(&metricsWithPeriod).
Error
if err != nil {
return nil, fmt.Errorf("failed to fetch aggregate metrics: %w", err)
}
currentPeriod := int64(math.MinInt64)
var metrics []dbContractMetric
for _, m := range metricsWithPeriod {
m.Metric.FCID = fileContractID{}
Expand All @@ -567,29 +580,47 @@ func (s *SQLStore) findAggregatedContractPeriods(start time.Time, n uint64, inte
// split into intervals and the row with the lowest timestamp for each interval
// is returned. The result is then joined with the original table to retrieve
// only the metrics we want.
func (s *SQLStore) findPeriods(tx *gorm.DB, dst interface{}, start time.Time, n uint64, interval time.Duration) error {
func (s *SQLStore) findPeriods(table string, dst interface{}, start time.Time, n uint64, interval time.Duration, whereExpr clause.Expr) error {
if n > api.MetricMaxIntervals {
return api.ErrMaxIntervalsExceeded
}
end := start.Add(time.Duration(n) * interval)
// inner groups all metrics within the requested time range into periods of
// 'interval' length and gives us the min timestamp of each period.
inner := tx.Model(dst).
Select("MIN(timestamp) AS min_time, ? AS period", roundPeriodExpr(tx, start, interval)).
Where("timestamp >= ? AND timestamp < ?", unixTimeMS(start), unixTimeMS(end)).
Group("period")
// mid then joins the result with the original table. This might yield
// duplicates if multiple rows have the same timestamp so we attach a
// row number. We order the rows by id to make the result deterministic.
mid := s.dbMetrics.Model(dst).
Joins("INNER JOIN (?) periods ON timestamp = periods.min_time", inner).
Select("*, ROW_NUMBER() OVER (PARTITION BY periods.min_time ORDER BY id) AS row_num")
// lastly we select all metrics with row number 1
return s.dbMetrics.Table("(?) numbered", mid).
Where("numbered.row_num = 1").
Find(dst).
return s.dbMetrics.Raw(fmt.Sprintf(`
WITH RECURSIVE periods AS (
SELECT ? AS period_start
UNION ALL
SELECT period_start + ?
FROM periods
WHERE period_start < ? - ?
)
SELECT %s.* FROM %s
INNER JOIN (
SELECT
p.period_start as Period,
MIN(obj.id) AS id
FROM
periods p
INNER JOIN
%s obj ON obj.timestamp >= p.period_start AND obj.timestamp < p.period_start + ?
WHERE ?
GROUP BY
p.period_start
ORDER BY
p.period_start ASC
) i ON %s.id = i.id
`, table, table, table, table),
unixTimeMS(start),
interval.Milliseconds(),
unixTimeMS(end),
interval.Milliseconds(),
interval.Milliseconds(),
whereExpr,
).Scan(dst).
Error
}

func (s *SQLStore) walletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) (metrics []dbWalletMetric, err error) {
err = s.findPeriods(s.dbMetrics, &metrics, start, n, interval)
err = s.findPeriods(dbWalletMetric{}.TableName(), &metrics, start, n, interval, gorm.Expr("TRUE"))
if err != nil {
return nil, fmt.Errorf("failed to fetch wallet metrics: %w", err)
}
Expand All @@ -600,19 +631,19 @@ func (s *SQLStore) walletMetrics(ctx context.Context, start time.Time, n uint64,
}

func (s *SQLStore) performanceMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts) ([]dbPerformanceMetric, error) {
tx := s.dbMetrics
whereExpr := gorm.Expr("TRUE")
if opts.Action != "" {
tx = tx.Where("action", opts.Action)
whereExpr = gorm.Expr("? AND action = ?", whereExpr, opts.Action)
}
if opts.HostKey != (types.PublicKey{}) {
tx = tx.Where("host", publicKey(opts.HostKey))
whereExpr = gorm.Expr("? AND host = ?", whereExpr, publicKey(opts.HostKey))
}
if opts.Origin != "" {
tx = tx.Where("origin", opts.Origin)
whereExpr = gorm.Expr("? AND origin = ?", whereExpr, opts.Origin)
}

var metrics []dbPerformanceMetric
err := s.findPeriods(tx, &metrics, start, n, interval)
err := s.findPeriods(dbPerformanceMetric{}.TableName(), &metrics, start, n, interval, whereExpr)
if err != nil {
return nil, fmt.Errorf("failed to fetch performance metrics: %w", err)
}
Expand Down

0 comments on commit 4affd8a

Please sign in to comment.