Skip to content

Commit

Permalink
Merge branch 'chris/aggregate-contract-speedup' into chris/refresh-he…
Browse files Browse the repository at this point in the history
…alth-lockup
  • Loading branch information
ChrisSchinnerl committed Feb 28, 2024
2 parents 6d19929 + 7db0f92 commit 47939ba
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 36 deletions.
83 changes: 51 additions & 32 deletions stores/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import (
"gorm.io/gorm/clause"
)

const (
contractMetricGranularity = 5 * time.Minute
)

type (
// dbContractMetric tracks information about a contract's funds. It is
// supposed to be reported by a worker every time a contract is revised.
Expand Down Expand Up @@ -246,6 +250,21 @@ func (s *SQLStore) RecordContractMetric(ctx context.Context, metrics ...api.Cont
}
}
return s.dbMetrics.Transaction(func(tx *gorm.DB) error {
// delete any existing metric for the same contract that has happened
// within the same 30 seconds window by diving the timestamp by 30 seconds and use integer division.
for _, metric := range metrics {
intervalStart := metric.Timestamp.Std().Truncate(contractMetricGranularity)
intervalEnd := intervalStart.Add(contractMetricGranularity)
err := tx.
Where("timestamp >= ?", unixTimeMS(intervalStart)).
Where("timestamp < ?", unixTimeMS(intervalEnd)).
Where("fcid", fileContractID(metric.ContractID)).
Delete(&dbContractMetric{}).
Error
if err != nil {
return err
}
}
return tx.Create(&dbMetrics).Error
})
}
Expand Down Expand Up @@ -522,43 +541,43 @@ func (s *SQLStore) findAggregatedContractPeriods(start time.Time, n uint64, inte
return nil, api.ErrMaxIntervalsExceeded
}
end := start.Add(time.Duration(n) * interval)
var metricsWithPeriod []struct {

type metricWithPeriod struct {
Metric dbContractMetric `gorm:"embedded"`
Period int64
}
err := s.dbMetrics.Raw(`
WITH RECURSIVE periods AS (
SELECT ? AS period_start
UNION ALL
SELECT period_start + ?
FROM periods
WHERE period_start < ? - ?
)
SELECT contracts.*, i.Period FROM contracts
INNER JOIN (
SELECT
p.period_start as Period,
MIN(c.id) AS id
FROM
periods p
INNER JOIN
contracts c ON c.timestamp >= p.period_start AND c.timestamp < p.period_start + ?
GROUP BY
p.period_start, c.fcid
ORDER BY
p.period_start ASC
) i ON contracts.id = i.id
`, unixTimeMS(start),
interval.Milliseconds(),
unixTimeMS(end),
interval.Milliseconds(),
interval.Milliseconds(),
).
Scan(&metricsWithPeriod).
Error
var metricsWithPeriod []metricWithPeriod

err := s.dbMetrics.Transaction(func(tx *gorm.DB) error {
var fcids []fileContractID
if err := tx.Raw("SELECT DISTINCT fcid FROM contracts WHERE contracts.timestamp >= ? AND contracts.timestamp < ?", unixTimeMS(start), unixTimeMS(end)).
Scan(&fcids).Error; err != nil {
return fmt.Errorf("failed to fetch distinct contract ids: %w", err)
}

for intervalStart := start; intervalStart.Before(end); intervalStart = intervalStart.Add(interval) {
intervalEnd := intervalStart.Add(interval)
for _, fcid := range fcids {
var metrics []dbContractMetric
err := tx.Raw("SELECT * FROM contracts WHERE contracts.timestamp >= ? AND contracts.timestamp < ? AND contracts.fcid = ? LIMIT 1", unixTimeMS(intervalStart), unixTimeMS(intervalEnd), fileContractID(fcid)).
Scan(&metrics).Error
if err != nil {
return fmt.Errorf("failed to fetch contract metrics: %w", err)
} else if len(metrics) == 0 {
continue
}
metricsWithPeriod = append(metricsWithPeriod, metricWithPeriod{
Metric: metrics[0],
Period: intervalStart.UnixMilli(),
})
}
}
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to fetch aggregate metrics: %w", err)
return nil, err
}

currentPeriod := int64(math.MinInt64)
var metrics []dbContractMetric
for _, m := range metricsWithPeriod {
Expand Down
24 changes: 24 additions & 0 deletions stores/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,30 @@ func TestContractMetrics(t *testing.T) {
} else if len(metrics) != 1 {
t.Fatalf("expected 1 metric, got %v", len(metrics))
}

// Drop all metrics.
if err := ss.dbMetrics.Where("TRUE").Delete(&dbContractMetric{}).Error; err != nil {
t.Fatal(err)
}

// Record multiple metrics for the same contract - one per second over 10 minutes
for i := int64(0); i < 600; i++ {
err := ss.RecordContractMetric(context.Background(), api.ContractMetric{
ContractID: types.FileContractID{1},
Timestamp: api.TimeRFC3339(time.Unix(i, 0)),
})
if err != nil {
t.Fatal(err)
}
}

// Check how many metrics were recorded.
var n int64
if err := ss.dbMetrics.Model(&dbContractMetric{}).Count(&n).Error; err != nil {
t.Fatal(err)
} else if n != 2 {
t.Fatalf("expected 2 metrics, got %v", n)
}
}

func TestWalletMetrics(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE INDEX `idx_contracts_fcid_timestamp` ON `contracts`(`fcid`,`timestamp`);
3 changes: 2 additions & 1 deletion stores/migrations/mysql/metrics/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ CREATE TABLE `contracts` (
KEY `idx_contracts_timestamp` (`timestamp`),
KEY `idx_remaining_funds` (`remaining_funds_lo`,`remaining_funds_hi`),
KEY `idx_delete_spending` (`delete_spending_lo`,`delete_spending_hi`),
KEY `idx_list_spending` (`list_spending_lo`,`list_spending_hi`)
KEY `idx_list_spending` (`list_spending_lo`,`list_spending_hi`),
KEY `idx_contracts_fcid_timestamp` (`fcid`,`timestamp`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

-- dbPerformanceMetric
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE INDEX `idx_contracts_fcid_timestamp` ON `contracts`(`fcid`,`timestamp`);
1 change: 1 addition & 0 deletions stores/migrations/sqlite/metrics/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ CREATE INDEX `idx_download_spending` ON `contracts`(`download_spending_lo`,`down
CREATE INDEX `idx_upload_spending` ON `contracts`(`upload_spending_lo`,`upload_spending_hi`);
CREATE INDEX `idx_contracts_revision_number` ON `contracts`(`revision_number`);
CREATE INDEX `idx_remaining_funds` ON `contracts`(`remaining_funds_lo`,`remaining_funds_hi`);
CREATE INDEX `idx_contracts_fcid_timestamp` ON `contracts`(`fcid`,`timestamp`);

-- dbContractPruneMetric
CREATE TABLE `contract_prunes` (`id` integer PRIMARY KEY AUTOINCREMENT,`created_at` datetime,`timestamp` BIGINT NOT NULL,`fcid` blob NOT NULL,`host` blob NOT NULL,`host_version` text,`pruned` BIGINT NOT NULL,`remaining` BIGINT NOT NULL,`duration` integer NOT NULL);
Expand Down
12 changes: 9 additions & 3 deletions stores/migrations_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,26 @@ import (
"gorm.io/gorm"
)

func performMetricsMigrations(tx *gorm.DB, logger *zap.SugaredLogger) error {
func performMetricsMigrations(db *gorm.DB, logger *zap.SugaredLogger) error {
dbIdentifier := "metrics"
migrations := []*gormigrate.Migration{
{
ID: "00001_init",
Migrate: func(tx *gorm.DB) error { return errRunV072 },
},
{
ID: "00001_idx_contracts_fcid_timestamp",
Migrate: func(tx *gorm.DB) error {
return performMigration(tx, dbIdentifier, "00001_idx_contracts_fcid_timestamp", logger)
},
},
}

// Create migrator.
m := gormigrate.New(tx, gormigrate.DefaultOptions, migrations)
m := gormigrate.New(db, gormigrate.DefaultOptions, migrations)

// Set init function.
m.InitSchema(initSchema(tx, dbIdentifier, logger))
m.InitSchema(initSchema(db, dbIdentifier, logger))

// Perform migrations.
if err := m.Migrate(); err != nil {
Expand Down

0 comments on commit 47939ba

Please sign in to comment.