diff --git a/cmd/hostd/monitor.go b/cmd/hostd/monitor.go deleted file mode 100644 index 80f18891..00000000 --- a/cmd/hostd/monitor.go +++ /dev/null @@ -1,27 +0,0 @@ -package main - -import "go.sia.tech/hostd/persist/sqlite" - -type ( - // rhp2MonitorStore wraps the sqlite store and implements the - // DataRecorderStore interface - rhp2MonitorStore struct { - store *sqlite.Store - } - - // rhp3MonitorStore wraps the sqlite store and implements the - // DataRecorderStore interface - rhp3MonitorStore struct { - store *sqlite.Store - } -) - -// IncrementDataUsage implements the DataRecorderStore interface -func (r2 *rhp2MonitorStore) IncrementDataUsage(ingress, egress uint64) error { - return r2.store.IncrementRHP2DataUsage(ingress, egress) -} - -// IncrementDataUsage implements the DataRecorderStore interface -func (r3 *rhp3MonitorStore) IncrementDataUsage(ingress, egress uint64) error { - return r3.store.IncrementRHP3DataUsage(ingress, egress) -} diff --git a/cmd/hostd/node.go b/cmd/hostd/node.go index 9d533cb9..50ab10d9 100644 --- a/cmd/hostd/node.go +++ b/cmd/hostd/node.go @@ -45,18 +45,16 @@ type node struct { registry *registry.Manager storage *storage.VolumeManager - sessions *rhp.SessionReporter - rhp2Monitor *rhp.DataRecorder - rhp2 *rhp2.SessionHandler - rhp3Monitor *rhp.DataRecorder - rhp3 *rhp3.SessionHandler + sessions *rhp.SessionReporter + data *rhp.DataRecorder + rhp2 *rhp2.SessionHandler + rhp3 *rhp3.SessionHandler } func (n *node) Close() error { n.rhp3.Close() n.rhp2.Close() - n.rhp2Monitor.Close() - n.rhp3Monitor.Close() + n.data.Close() n.storage.Close() n.contracts.Close() n.w.Close() @@ -191,14 +189,13 @@ func newNode(walletKey types.PrivateKey, logger *zap.Logger) (*node, types.Priva sessions := rhp.NewSessionReporter() - rhp2Monitor := rhp.NewDataRecorder(&rhp2MonitorStore{db}, logger.Named("rhp2Monitor")) - rhp2, err := startRHP2(rhp2Listener, hostKey, rhp3Listener.Addr().String(), cm, tp, w, contractManager, sr, sm, rhp2Monitor, sessions, logger.Named("rhp2")) + dm := rhp.NewDataRecorder(db, logger.Named("data")) + rhp2, err := startRHP2(rhp2Listener, hostKey, rhp3Listener.Addr().String(), cm, tp, w, contractManager, sr, sm, dm, sessions, logger.Named("rhp2")) if err != nil { return nil, types.PrivateKey{}, fmt.Errorf("failed to start rhp2: %w", err) } - rhp3Monitor := rhp.NewDataRecorder(&rhp3MonitorStore{db}, logger.Named("rhp3Monitor")) - rhp3, err := startRHP3(rhp3Listener, hostKey, cm, tp, w, accountManager, contractManager, registryManager, sr, sm, rhp3Monitor, sessions, logger.Named("rhp3")) + rhp3, err := startRHP3(rhp3Listener, hostKey, cm, tp, w, accountManager, contractManager, registryManager, sr, sm, dm, sessions, logger.Named("rhp3")) if err != nil { return nil, types.PrivateKey{}, fmt.Errorf("failed to start rhp3: %w", err) } @@ -219,10 +216,9 @@ func newNode(walletKey types.PrivateKey, logger *zap.Logger) (*node, types.Priva storage: sm, registry: registryManager, - sessions: sessions, - rhp2Monitor: rhp2Monitor, - rhp2: rhp2, - rhp3Monitor: rhp3Monitor, - rhp3: rhp3, + sessions: sessions, + data: dm, + rhp2: rhp2, + rhp3: rhp3, }, hostKey, nil } diff --git a/host/metrics/types.go b/host/metrics/types.go index 5b80b71f..1b5c6b1c 100644 --- a/host/metrics/types.go +++ b/host/metrics/types.go @@ -29,8 +29,13 @@ type ( RegistryWrite types.Currency `json:"registryWrite"` } - // Data is a collection of metrics related to data usage. - Data struct { + // DataMetrics is a collection of metrics related to data usage. + DataMetrics struct { + RHP RHPData `json:"rhp"` + } + + // RHPData is a collection of data metrics related to the RHP. + RHPData struct { // Ingress returns the number of bytes received by the host. Ingress uint64 `json:"ingress"` // Egress returns the number of bytes sent by the host. @@ -95,12 +100,6 @@ type ( Earned Revenue `json:"earned"` } - // DataMetrics is a collection of metrics related to data usage. - DataMetrics struct { - RHP2 Data `json:"rhp2"` - RHP3 Data `json:"rhp3"` - } - // Metrics is a collection of metrics for the host. Metrics struct { Accounts Accounts `json:"accounts"` diff --git a/persist/sqlite/metrics.go b/persist/sqlite/metrics.go index 9ba0d8ae..ab9a6fc7 100644 --- a/persist/sqlite/metrics.go +++ b/persist/sqlite/metrics.go @@ -42,10 +42,21 @@ const ( metricRegistryWrites = "registryWrites" // bandwidth + metricDataRHPIngress = "dataIngress" + metricDataRHPEgress = "dataEgress" + + // metricRHP2Ingress + // Deprecated: combined into metricDataRHPIngress metricRHP2Ingress = "rhp2Ingress" - metricRHP2Egress = "rhp2Egress" + // metricRHP2Egress + // Deprecated: combined into metricDataRHPEgress + metricRHP2Egress = "rhp2Egress" + // metricRHP3Ingress + // Deprecated: combined into metricDataRHPIngress metricRHP3Ingress = "rhp3Ingress" - metricRHP3Egress = "rhp3Egress" + // metricRHP3Egress + // Deprecated: combined into metricDataRHPEgress + metricRHP3Egress = "rhp3Egress" // pricing metricContractPrice = "contractPrice" @@ -227,33 +238,16 @@ JOIN ( return } -// IncrementRHP2DataUsage increments the RHP2 ingress and egress metrics. -func (s *Store) IncrementRHP2DataUsage(ingress, egress uint64) error { - return s.transaction(func(tx txn) error { - if ingress > 0 { - if err := incrementNumericStat(tx, metricRHP2Ingress, int(ingress), time.Now()); err != nil { - return fmt.Errorf("failed to track ingress: %w", err) - } - } - if egress > 0 { - if err := incrementNumericStat(tx, metricRHP2Egress, int(egress), time.Now()); err != nil { - return fmt.Errorf("failed to track egress: %w", err) - } - } - return nil - }) -} - -// IncrementRHP3DataUsage increments the RHP3 ingress and egress metrics. -func (s *Store) IncrementRHP3DataUsage(ingress, egress uint64) error { +// IncrementRHPDataUsage increments the RHP3 ingress and egress metrics. +func (s *Store) IncrementRHPDataUsage(ingress, egress uint64) error { return s.transaction(func(tx txn) error { if ingress > 0 { - if err := incrementNumericStat(tx, metricRHP3Ingress, int(ingress), time.Now()); err != nil { + if err := incrementNumericStat(tx, metricDataRHPIngress, int(ingress), time.Now()); err != nil { return fmt.Errorf("failed to track ingress: %w", err) } } if egress > 0 { - if err := incrementNumericStat(tx, metricRHP3Egress, int(egress), time.Now()); err != nil { + if err := incrementNumericStat(tx, metricDataRHPEgress, int(egress), time.Now()); err != nil { return fmt.Errorf("failed to track egress: %w", err) } } @@ -390,14 +384,10 @@ func mustParseMetricValue(stat string, buf []byte, m *metrics.Metrics) { case metricRegistryWrites: m.Registry.Writes = mustScanUint64(buf) // bandwidth - case metricRHP2Ingress: - m.Data.RHP2.Ingress = mustScanUint64(buf) - case metricRHP2Egress: - m.Data.RHP2.Egress = mustScanUint64(buf) - case metricRHP3Ingress: - m.Data.RHP3.Ingress = mustScanUint64(buf) - case metricRHP3Egress: - m.Data.RHP3.Egress = mustScanUint64(buf) + case metricDataRHPIngress: + m.Data.RHP.Ingress = mustScanUint64(buf) + case metricDataRHPEgress: + m.Data.RHP.Egress = mustScanUint64(buf) // potential revenue case metricPotentialRPCRevenue: m.Revenue.Potential.RPC = mustScanCurrency(buf) diff --git a/persist/sqlite/migrations.go b/persist/sqlite/migrations.go index 310c50f6..33f16dba 100644 --- a/persist/sqlite/migrations.go +++ b/persist/sqlite/migrations.go @@ -10,6 +10,89 @@ import ( "go.uber.org/zap" ) +// migrateVersion24 combines the rhp2 and rhp3 data metrics +func migrateVersion24(tx txn, log *zap.Logger) error { + rows, err := tx.Query(`SELECT date_created, stat, stat_value FROM host_stats WHERE stat IN (?, ?, ?, ?) ORDER BY date_created ASC`, metricRHP2Ingress, metricRHP2Egress, metricRHP3Ingress, metricRHP3Egress) + if err != nil { + return fmt.Errorf("failed to query host stats: %w", err) + } + defer rows.Close() + + type combinedStat struct { + timestamp time.Time + value uint64 + } + + var lastRHP2Ingress, lastRHP2Egress uint64 + var lastRHP3Ingress, lastRHP3Egress uint64 + var dataPointsIngress []combinedStat + var dataPointsEgress []combinedStat + + for rows.Next() { + var timestamp time.Time + var stat string + var value uint64 + if err := rows.Scan((*sqlTime)(×tamp), &stat, (*sqlUint64)(&value)); err != nil { + return fmt.Errorf("failed to scan host stat: %w", err) + } + + switch stat { + case metricRHP2Ingress: + lastRHP2Ingress = value + stat = metricDataRHPIngress + case metricRHP2Egress: + lastRHP2Egress = value + stat = metricDataRHPEgress + case metricRHP3Ingress: + lastRHP3Ingress = value + stat = metricDataRHPIngress + case metricRHP3Egress: + lastRHP3Egress = value + stat = metricDataRHPEgress + } + + switch stat { + case metricDataRHPIngress: + if len(dataPointsIngress) == 0 || !dataPointsIngress[len(dataPointsIngress)-1].timestamp.Equal(timestamp) { + dataPointsIngress = append(dataPointsIngress, combinedStat{timestamp, lastRHP2Ingress + lastRHP3Ingress}) + } else { + dataPointsIngress[len(dataPointsIngress)-1].value = lastRHP2Ingress + lastRHP3Ingress + } + case metricDataRHPEgress: + if len(dataPointsEgress) == 0 || !dataPointsEgress[len(dataPointsEgress)-1].timestamp.Equal(timestamp) { + dataPointsEgress = append(dataPointsEgress, combinedStat{timestamp, lastRHP2Egress + lastRHP3Egress}) + } else { + dataPointsEgress[len(dataPointsEgress)-1].value = lastRHP2Egress + lastRHP3Egress + } + } + } + + if err := rows.Close(); err != nil { + return fmt.Errorf("failed to close rows: %w", err) + } + + // add the new data points + for _, dataPoint := range dataPointsIngress { + if err := setNumericStat(tx, metricDataRHPIngress, dataPoint.value, dataPoint.timestamp); err != nil { + return fmt.Errorf("failed to set data ingress metric: %w", err) + } + log.Debug("added ingress datapoint", zap.Time("timestamp", dataPoint.timestamp), zap.Uint64("value", dataPoint.value)) + } + + for _, dataPoint := range dataPointsEgress { + if err := setNumericStat(tx, metricDataRHPEgress, dataPoint.value, dataPoint.timestamp); err != nil { + return fmt.Errorf("failed to set data egress metric: %w", err) + } + log.Debug("added egress datapoint", zap.Time("timestamp", dataPoint.timestamp), zap.Uint64("value", dataPoint.value)) + } + + // delete the old data points + if _, err := tx.Exec(`DELETE FROM host_stats WHERE stat IN (?, ?, ?, ?)`, metricRHP2Ingress, metricRHP2Egress, metricRHP3Ingress, metricRHP3Egress); err != nil { + return fmt.Errorf("failed to delete old data points: %w", err) + } + return nil +} + // migrateVersion23 creates the webhooks table. func migrateVersion23(tx txn, _ *zap.Logger) error { const query = `CREATE TABLE webhooks ( @@ -606,4 +689,5 @@ var migrations = []func(tx txn, log *zap.Logger) error{ migrateVersion21, migrateVersion22, migrateVersion23, + migrateVersion24, } diff --git a/rhp/recorder.go b/rhp/recorder.go index 83079475..0fff34ab 100644 --- a/rhp/recorder.go +++ b/rhp/recorder.go @@ -7,12 +7,12 @@ import ( "go.uber.org/zap" ) -const persistInterval = 30 * time.Second +const persistInterval = time.Minute type ( // A DataRecorderStore persists data usage DataRecorderStore interface { - IncrementDataUsage(ingress, egress uint64) error + IncrementRHPDataUsage(ingress, egress uint64) error } // A DataRecorder records the amount of data read and written across @@ -59,7 +59,7 @@ func (dr *DataRecorder) persistUsage() { return } - if err := dr.store.IncrementDataUsage(r, w); err != nil { + if err := dr.store.IncrementRHPDataUsage(r, w); err != nil { dr.log.Error("failed to persist data usage", zap.Error(err)) return }