Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Normalise timestamps on returned metrics to match beginning of intervals #789

Merged
merged 6 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions internal/testing/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2182,7 +2182,7 @@ func TestWalletFormUnconfirmed(t *testing.T) {
}

func TestBusRecordedMetrics(t *testing.T) {
startTime := time.Now()
startTime := time.Now().UTC().Round(time.Second)

cluster := newTestCluster(t, testClusterOptions{
hosts: 1,
Expand All @@ -2208,8 +2208,8 @@ func TestBusRecordedMetrics(t *testing.T) {
t.Fatalf("expected 1 contract, got %v", m.Contracts)
} else if m.Name != testContractSet {
t.Fatalf("expected contract set %v, got %v", testContractSet, m.Name)
} else if !m.Timestamp.Std().After(startTime) {
t.Fatal("expected time to be after start time")
} else if m.Timestamp.Std().Before(startTime) {
t.Fatalf("expected time to be after start time %v, got %v", startTime, m.Timestamp.Std())
}
}

Expand All @@ -2225,8 +2225,8 @@ func TestBusRecordedMetrics(t *testing.T) {
t.Fatal("expected non-zero FCID")
} else if m.Name != testContractSet {
t.Fatalf("expected contract set %v, got %v", testContractSet, m.Name)
} else if !m.Timestamp.Std().After(startTime) {
t.Fatal("expected time to be after start time")
} else if m.Timestamp.Std().Before(startTime) {
t.Fatalf("expected time to be after start time %v, got %v", startTime, m.Timestamp.Std())
}

// Get contract metrics.
Expand All @@ -2243,8 +2243,8 @@ func TestBusRecordedMetrics(t *testing.T) {

if len(cMetrics) != 1 {
t.Fatalf("expected 1 metric, got %v", len(cMetrics))
} else if m := cMetrics[0]; !startTime.Before(m.Timestamp.Std()) {
t.Fatalf("expected time to be after start time, got %v", m.Timestamp)
} else if m := cMetrics[0]; m.Timestamp.Std().Before(startTime) {
t.Fatalf("expected time to be after start time %v, got %v", startTime, m.Timestamp.Std())
} else if m.ContractID == (types.FileContractID{}) {
t.Fatal("expected non-zero FCID")
} else if m.HostKey == (types.PublicKey{}) {
Expand Down
32 changes: 28 additions & 4 deletions stores/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,9 @@ func (s *SQLStore) contractMetrics(ctx context.Context, start time.Time, n uint6
if err != nil {
return nil, fmt.Errorf("failed to fetch contract metrics: %w", err)
}

for i, m := range metrics {
metrics[i].Timestamp = normaliseTimestamp(start, interval, m.Timestamp)
}
return metrics, nil
}

Expand All @@ -312,7 +314,9 @@ func (s *SQLStore) contractSetChurnMetrics(ctx context.Context, start time.Time,
if err != nil {
return nil, fmt.Errorf("failed to fetch contract set churn metrics: %w", err)
}

for i, m := range metrics {
metrics[i].Timestamp = normaliseTimestamp(start, interval, m.Timestamp)
}
return metrics, nil
}

Expand All @@ -327,10 +331,23 @@ func (s *SQLStore) contractSetMetrics(ctx context.Context, start time.Time, n ui
if err != nil {
return nil, fmt.Errorf("failed to fetch contract set metrics: %w", err)
}

for i, m := range metrics {
metrics[i].Timestamp = normaliseTimestamp(start, interval, m.Timestamp)
}
return metrics, nil
}

func normaliseTimestamp(start time.Time, interval time.Duration, t unixTimeMS) unixTimeMS {
startMS := start.UnixMilli()
toNormaliseMS := time.Time(t).UnixMilli()
intervalMS := interval.Milliseconds()
if startMS > toNormaliseMS {
return unixTimeMS(start)
}
normalizedMS := (toNormaliseMS-startMS)/intervalMS*intervalMS + start.UnixMilli()
return unixTimeMS(time.UnixMilli(normalizedMS))
}

// findPeriods is the core of all methods retrieving metrics. By using integer
// division rounding combined with a GROUP BY operation, all rows of a table are
// split into intervals and the row with the lowest timestamp for each interval
Expand All @@ -340,8 +357,12 @@ func (s *SQLStore) findPeriods(tx *gorm.DB, dst interface{}, start time.Time, n
end := start.Add(time.Duration(n) * interval)
// inner groups all metrics within the requested time range into periods of
// 'interval' length and gives us the min timestamp of each period.
floorExpr := "(timestamp - ?) / ? * ?"
if !isSQLite(s.dbMetrics) {
floorExpr = "FLOOR((timestamp - ?) / ?) * ?"
}
inner := tx.Model(dst).
Select("MIN(timestamp) AS min_time, (timestamp - ?) / ? * ? AS period", unixTimeMS(start), interval.Milliseconds(), interval.Milliseconds()).
Select(fmt.Sprintf("MIN(timestamp) AS min_time, %s AS period", floorExpr), unixTimeMS(start), interval.Milliseconds(), interval.Milliseconds()).
Where("timestamp >= ? AND timestamp < ?", unixTimeMS(start), unixTimeMS(end)).
Group("period")
// mid then joins the result with the original table. This might yield
Expand All @@ -362,6 +383,9 @@ func (s *SQLStore) walletMetrics(ctx context.Context, start time.Time, n uint64,
if err != nil {
return nil, fmt.Errorf("failed to fetch wallet metrics: %w", err)
}
for i, m := range metrics {
metrics[i].Timestamp = normaliseTimestamp(start, interval, m.Timestamp)
}
return
}

Expand Down
46 changes: 41 additions & 5 deletions stores/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,42 @@ import (
"lukechampine.com/frand"
)

func TestNormaliseTimestamp(t *testing.T) {
tests := []struct {
start time.Time
interval time.Duration
ti time.Time
result time.Time
}{
{
start: time.UnixMilli(100),
interval: 10 * time.Millisecond,
ti: time.UnixMilli(105),
result: time.UnixMilli(100),
},
{
start: time.UnixMilli(100),
interval: 10 * time.Millisecond,
ti: time.UnixMilli(115),
result: time.UnixMilli(110),
},
{
start: time.UnixMilli(100),
interval: 10 * time.Millisecond,
ti: time.UnixMilli(125),
result: time.UnixMilli(120),
},
}

for _, test := range tests {
if result := time.Time(normaliseTimestamp(test.start, test.interval, unixTimeMS(test.ti))); !result.Equal(test.result) {
t.Fatalf("expected %v, got %v", test.result, result)
}
}
}

func TestContractSetMetrics(t *testing.T) {
testStart := time.Now()
testStart := time.Now().Round(time.Millisecond).UTC()
ss := newTestSQLStore(t, defaultTestSQLStoreConfig)
defer ss.Close()

Expand All @@ -25,8 +59,8 @@ func TestContractSetMetrics(t *testing.T) {
t.Fatal(err)
} else if m := metrics[0]; m.Contracts != 0 {
t.Fatalf("expected 0 contracts, got %v", m.Contracts)
} else if !time.Time(m.Timestamp).After(testStart) {
t.Fatal("expected time to be after test start")
} else if ti := time.Time(m.Timestamp); !ti.Equal(testStart) {
t.Fatal("expected time to match start time")
} else if m.Name != testContractSet {
t.Fatalf("expected name to be %v, got %v", testContractSet, m.Name)
}
Expand Down Expand Up @@ -277,8 +311,10 @@ func TestContractMetrics(t *testing.T) {
t.Fatal("expected metrics to be sorted by time")
}
for _, m := range metrics {
if !cmp.Equal(m, fcid2Metric[m.ContractID], cmp.Comparer(api.CompareTimeRFC3339)) {
t.Fatal("unexpected metric", cmp.Diff(m, fcid2Metric[m.ContractID]))
expectedMetric := fcid2Metric[m.ContractID]
expectedMetric.Timestamp = api.TimeRFC3339(normaliseTimestamp(start, interval, unixTimeMS(expectedMetric.Timestamp)))
if !cmp.Equal(m, expectedMetric, cmp.Comparer(api.CompareTimeRFC3339)) {
t.Fatal("unexpected metric", cmp.Diff(m, expectedMetric, cmp.Comparer(api.CompareTimeRFC3339)))
}
cmpFn(m)
}
Expand Down