From e8ee7c5faf0c703f328ce58cf26fa1249388f55c Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Thu, 14 Dec 2023 14:03:21 -0800 Subject: [PATCH 1/5] sqlite: remove separate rhp data metrics --- persist/sqlite/metrics.go | 50 +++++++++------------ persist/sqlite/migrations.go | 84 ++++++++++++++++++++++++++++++++++++ 2 files changed, 104 insertions(+), 30 deletions(-) diff --git a/persist/sqlite/metrics.go b/persist/sqlite/metrics.go index 9ba0d8ae..2a7bc2ef 100644 --- a/persist/sqlite/metrics.go +++ b/persist/sqlite/metrics.go @@ -42,10 +42,21 @@ const ( metricRegistryWrites = "registryWrites" // bandwidth + metricDataIngress = "dataIngress" + metricDataEgress = "dataEgress" + + // metricRHP2Ingress + // Deprecated: combined into metricDataIngress metricRHP2Ingress = "rhp2Ingress" - metricRHP2Egress = "rhp2Egress" + // metricRHP2Egress + // Deprecated: combined into metricDataEgress + metricRHP2Egress = "rhp2Egress" + // metricRHP3Ingress + // Deprecated: combined into metricDataIngress metricRHP3Ingress = "rhp3Ingress" - metricRHP3Egress = "rhp3Egress" + // metricRHP3Egress + // Deprecated: combined into metricDataEgress + metricRHP3Egress = "rhp3Egress" // pricing metricContractPrice = "contractPrice" @@ -227,33 +238,16 @@ JOIN ( return } -// IncrementRHP2DataUsage increments the RHP2 ingress and egress metrics. -func (s *Store) IncrementRHP2DataUsage(ingress, egress uint64) error { - return s.transaction(func(tx txn) error { - if ingress > 0 { - if err := incrementNumericStat(tx, metricRHP2Ingress, int(ingress), time.Now()); err != nil { - return fmt.Errorf("failed to track ingress: %w", err) - } - } - if egress > 0 { - if err := incrementNumericStat(tx, metricRHP2Egress, int(egress), time.Now()); err != nil { - return fmt.Errorf("failed to track egress: %w", err) - } - } - return nil - }) -} - // IncrementRHP3DataUsage increments the RHP3 ingress and egress metrics. -func (s *Store) IncrementRHP3DataUsage(ingress, egress uint64) error { +func (s *Store) IncrementDataUsage(ingress, egress uint64) error { return s.transaction(func(tx txn) error { if ingress > 0 { - if err := incrementNumericStat(tx, metricRHP3Ingress, int(ingress), time.Now()); err != nil { + if err := incrementNumericStat(tx, metricDataIngress, int(ingress), time.Now()); err != nil { return fmt.Errorf("failed to track ingress: %w", err) } } if egress > 0 { - if err := incrementNumericStat(tx, metricRHP3Egress, int(egress), time.Now()); err != nil { + if err := incrementNumericStat(tx, metricDataEgress, int(egress), time.Now()); err != nil { return fmt.Errorf("failed to track egress: %w", err) } } @@ -390,14 +384,10 @@ func mustParseMetricValue(stat string, buf []byte, m *metrics.Metrics) { case metricRegistryWrites: m.Registry.Writes = mustScanUint64(buf) // bandwidth - case metricRHP2Ingress: - m.Data.RHP2.Ingress = mustScanUint64(buf) - case metricRHP2Egress: - m.Data.RHP2.Egress = mustScanUint64(buf) - case metricRHP3Ingress: - m.Data.RHP3.Ingress = mustScanUint64(buf) - case metricRHP3Egress: - m.Data.RHP3.Egress = mustScanUint64(buf) + case metricDataIngress: + m.Data.Ingress = mustScanUint64(buf) + case metricDataEgress: + m.Data.Egress = mustScanUint64(buf) // potential revenue case metricPotentialRPCRevenue: m.Revenue.Potential.RPC = mustScanCurrency(buf) diff --git a/persist/sqlite/migrations.go b/persist/sqlite/migrations.go index 310c50f6..614cf648 100644 --- a/persist/sqlite/migrations.go +++ b/persist/sqlite/migrations.go @@ -10,6 +10,89 @@ import ( "go.uber.org/zap" ) +// migrateVersion24 combines the rhp2 and rhp3 data metrics +func migrateVersion24(tx txn, log *zap.Logger) error { + rows, err := tx.Query(`SELECT date_created, stat, stat_value FROM host_stats WHERE stat IN (?, ?, ?, ?) ORDER BY date_created ASC`, metricRHP2Ingress, metricRHP2Egress, metricRHP3Ingress, metricRHP3Egress) + if err != nil { + return fmt.Errorf("failed to query host stats: %w", err) + } + defer rows.Close() + + type combinedStat struct { + timestamp time.Time + value uint64 + } + + var lastRHP2Ingress, lastRHP2Egress uint64 + var lastRHP3Ingress, lastRHP3Egress uint64 + var dataPointsIngress []combinedStat + var dataPointsEgress []combinedStat + + for rows.Next() { + var timestamp time.Time + var stat string + var value uint64 + if err := rows.Scan((*sqlTime)(×tamp), &stat, (*sqlUint64)(&value)); err != nil { + return fmt.Errorf("failed to scan host stat: %w", err) + } + + switch stat { + case metricRHP2Ingress: + lastRHP2Ingress = value + stat = metricDataIngress + case metricRHP2Egress: + lastRHP2Egress = value + stat = metricDataEgress + case metricRHP3Ingress: + lastRHP3Ingress = value + stat = metricDataIngress + case metricRHP3Egress: + lastRHP3Egress = value + stat = metricDataEgress + } + + switch stat { + case metricDataIngress: + if len(dataPointsIngress) == 0 || !dataPointsIngress[len(dataPointsIngress)-1].timestamp.Equal(timestamp) { + dataPointsIngress = append(dataPointsIngress, combinedStat{timestamp, lastRHP2Ingress + lastRHP3Ingress}) + } else { + dataPointsIngress[len(dataPointsIngress)-1].value = lastRHP2Ingress + lastRHP3Ingress + } + case metricDataEgress: + if len(dataPointsEgress) == 0 || !dataPointsEgress[len(dataPointsEgress)-1].timestamp.Equal(timestamp) { + dataPointsEgress = append(dataPointsEgress, combinedStat{timestamp, lastRHP2Egress + lastRHP3Egress}) + } else { + dataPointsEgress[len(dataPointsEgress)-1].value = lastRHP2Egress + lastRHP3Egress + } + } + } + + if err := rows.Close(); err != nil { + return fmt.Errorf("failed to close rows: %w", err) + } + + // add the new data points + for _, dataPoint := range dataPointsIngress { + if err := setNumericStat(tx, metricDataIngress, dataPoint.value, dataPoint.timestamp); err != nil { + return fmt.Errorf("failed to set data ingress metric: %w", err) + } + log.Debug("added ingress datapoint", zap.Time("timestamp", dataPoint.timestamp), zap.Uint64("value", dataPoint.value)) + } + + for _, dataPoint := range dataPointsEgress { + if err := setNumericStat(tx, metricDataEgress, dataPoint.value, dataPoint.timestamp); err != nil { + return fmt.Errorf("failed to set data egress metric: %w", err) + } + log.Debug("added egress datapoint", zap.Time("timestamp", dataPoint.timestamp), zap.Uint64("value", dataPoint.value)) + } + + // delete the old data points + if _, err := tx.Exec(`DELETE FROM host_stats WHERE stat IN (?, ?, ?, ?)`, metricRHP2Ingress, metricRHP2Egress, metricRHP3Ingress, metricRHP3Egress); err != nil { + return fmt.Errorf("failed to delete old data points: %w", err) + } + return nil +} + // migrateVersion23 creates the webhooks table. func migrateVersion23(tx txn, _ *zap.Logger) error { const query = `CREATE TABLE webhooks ( @@ -606,4 +689,5 @@ var migrations = []func(tx txn, log *zap.Logger) error{ migrateVersion21, migrateVersion22, migrateVersion23, + migrateVersion24, } From 8b1fc8b948e9e7234b7f2aedbfc7ccff9d0b66dc Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Thu, 14 Dec 2023 14:03:34 -0800 Subject: [PATCH 2/5] rhp: increase data recorder interval --- rhp/recorder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rhp/recorder.go b/rhp/recorder.go index 83079475..d60519b1 100644 --- a/rhp/recorder.go +++ b/rhp/recorder.go @@ -7,7 +7,7 @@ import ( "go.uber.org/zap" ) -const persistInterval = 30 * time.Second +const persistInterval = time.Minute type ( // A DataRecorderStore persists data usage From 64dde41798f071c41975c2161d4a02efee486028 Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Thu, 14 Dec 2023 14:04:27 -0800 Subject: [PATCH 3/5] metrics: remove separate rhp metrics --- host/metrics/types.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/host/metrics/types.go b/host/metrics/types.go index 5b80b71f..cb29b626 100644 --- a/host/metrics/types.go +++ b/host/metrics/types.go @@ -29,8 +29,8 @@ type ( RegistryWrite types.Currency `json:"registryWrite"` } - // Data is a collection of metrics related to data usage. - Data struct { + // DataMetrics is a collection of metrics related to data usage. + DataMetrics struct { // Ingress returns the number of bytes received by the host. Ingress uint64 `json:"ingress"` // Egress returns the number of bytes sent by the host. @@ -95,12 +95,6 @@ type ( Earned Revenue `json:"earned"` } - // DataMetrics is a collection of metrics related to data usage. - DataMetrics struct { - RHP2 Data `json:"rhp2"` - RHP3 Data `json:"rhp3"` - } - // Metrics is a collection of metrics for the host. Metrics struct { Accounts Accounts `json:"accounts"` From cdbd51162796bbecd61716146dfa2456b8a0f7e5 Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Thu, 14 Dec 2023 14:04:39 -0800 Subject: [PATCH 4/5] cmd: use same data recorder for rhp2 and rhp3 --- cmd/hostd/monitor.go | 27 --------------------------- cmd/hostd/node.go | 28 ++++++++++++---------------- 2 files changed, 12 insertions(+), 43 deletions(-) delete mode 100644 cmd/hostd/monitor.go diff --git a/cmd/hostd/monitor.go b/cmd/hostd/monitor.go deleted file mode 100644 index 80f18891..00000000 --- a/cmd/hostd/monitor.go +++ /dev/null @@ -1,27 +0,0 @@ -package main - -import "go.sia.tech/hostd/persist/sqlite" - -type ( - // rhp2MonitorStore wraps the sqlite store and implements the - // DataRecorderStore interface - rhp2MonitorStore struct { - store *sqlite.Store - } - - // rhp3MonitorStore wraps the sqlite store and implements the - // DataRecorderStore interface - rhp3MonitorStore struct { - store *sqlite.Store - } -) - -// IncrementDataUsage implements the DataRecorderStore interface -func (r2 *rhp2MonitorStore) IncrementDataUsage(ingress, egress uint64) error { - return r2.store.IncrementRHP2DataUsage(ingress, egress) -} - -// IncrementDataUsage implements the DataRecorderStore interface -func (r3 *rhp3MonitorStore) IncrementDataUsage(ingress, egress uint64) error { - return r3.store.IncrementRHP3DataUsage(ingress, egress) -} diff --git a/cmd/hostd/node.go b/cmd/hostd/node.go index 9d533cb9..50ab10d9 100644 --- a/cmd/hostd/node.go +++ b/cmd/hostd/node.go @@ -45,18 +45,16 @@ type node struct { registry *registry.Manager storage *storage.VolumeManager - sessions *rhp.SessionReporter - rhp2Monitor *rhp.DataRecorder - rhp2 *rhp2.SessionHandler - rhp3Monitor *rhp.DataRecorder - rhp3 *rhp3.SessionHandler + sessions *rhp.SessionReporter + data *rhp.DataRecorder + rhp2 *rhp2.SessionHandler + rhp3 *rhp3.SessionHandler } func (n *node) Close() error { n.rhp3.Close() n.rhp2.Close() - n.rhp2Monitor.Close() - n.rhp3Monitor.Close() + n.data.Close() n.storage.Close() n.contracts.Close() n.w.Close() @@ -191,14 +189,13 @@ func newNode(walletKey types.PrivateKey, logger *zap.Logger) (*node, types.Priva sessions := rhp.NewSessionReporter() - rhp2Monitor := rhp.NewDataRecorder(&rhp2MonitorStore{db}, logger.Named("rhp2Monitor")) - rhp2, err := startRHP2(rhp2Listener, hostKey, rhp3Listener.Addr().String(), cm, tp, w, contractManager, sr, sm, rhp2Monitor, sessions, logger.Named("rhp2")) + dm := rhp.NewDataRecorder(db, logger.Named("data")) + rhp2, err := startRHP2(rhp2Listener, hostKey, rhp3Listener.Addr().String(), cm, tp, w, contractManager, sr, sm, dm, sessions, logger.Named("rhp2")) if err != nil { return nil, types.PrivateKey{}, fmt.Errorf("failed to start rhp2: %w", err) } - rhp3Monitor := rhp.NewDataRecorder(&rhp3MonitorStore{db}, logger.Named("rhp3Monitor")) - rhp3, err := startRHP3(rhp3Listener, hostKey, cm, tp, w, accountManager, contractManager, registryManager, sr, sm, rhp3Monitor, sessions, logger.Named("rhp3")) + rhp3, err := startRHP3(rhp3Listener, hostKey, cm, tp, w, accountManager, contractManager, registryManager, sr, sm, dm, sessions, logger.Named("rhp3")) if err != nil { return nil, types.PrivateKey{}, fmt.Errorf("failed to start rhp3: %w", err) } @@ -219,10 +216,9 @@ func newNode(walletKey types.PrivateKey, logger *zap.Logger) (*node, types.Priva storage: sm, registry: registryManager, - sessions: sessions, - rhp2Monitor: rhp2Monitor, - rhp2: rhp2, - rhp3Monitor: rhp3Monitor, - rhp3: rhp3, + sessions: sessions, + data: dm, + rhp2: rhp2, + rhp3: rhp3, }, hostKey, nil } From 7e004f913df78de5115ba6cd08fc0a8cebf9e201 Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Thu, 14 Dec 2023 14:14:01 -0800 Subject: [PATCH 5/5] metrics,sqlite,rhp: add RHP designation for future-proofing --- host/metrics/types.go | 5 +++++ persist/sqlite/metrics.go | 28 ++++++++++++++-------------- persist/sqlite/migrations.go | 16 ++++++++-------- rhp/recorder.go | 4 ++-- 4 files changed, 29 insertions(+), 24 deletions(-) diff --git a/host/metrics/types.go b/host/metrics/types.go index cb29b626..1b5c6b1c 100644 --- a/host/metrics/types.go +++ b/host/metrics/types.go @@ -31,6 +31,11 @@ type ( // DataMetrics is a collection of metrics related to data usage. DataMetrics struct { + RHP RHPData `json:"rhp"` + } + + // RHPData is a collection of data metrics related to the RHP. + RHPData struct { // Ingress returns the number of bytes received by the host. Ingress uint64 `json:"ingress"` // Egress returns the number of bytes sent by the host. diff --git a/persist/sqlite/metrics.go b/persist/sqlite/metrics.go index 2a7bc2ef..ab9a6fc7 100644 --- a/persist/sqlite/metrics.go +++ b/persist/sqlite/metrics.go @@ -42,20 +42,20 @@ const ( metricRegistryWrites = "registryWrites" // bandwidth - metricDataIngress = "dataIngress" - metricDataEgress = "dataEgress" + metricDataRHPIngress = "dataIngress" + metricDataRHPEgress = "dataEgress" // metricRHP2Ingress - // Deprecated: combined into metricDataIngress + // Deprecated: combined into metricDataRHPIngress metricRHP2Ingress = "rhp2Ingress" // metricRHP2Egress - // Deprecated: combined into metricDataEgress + // Deprecated: combined into metricDataRHPEgress metricRHP2Egress = "rhp2Egress" // metricRHP3Ingress - // Deprecated: combined into metricDataIngress + // Deprecated: combined into metricDataRHPIngress metricRHP3Ingress = "rhp3Ingress" // metricRHP3Egress - // Deprecated: combined into metricDataEgress + // Deprecated: combined into metricDataRHPEgress metricRHP3Egress = "rhp3Egress" // pricing @@ -238,16 +238,16 @@ JOIN ( return } -// IncrementRHP3DataUsage increments the RHP3 ingress and egress metrics. -func (s *Store) IncrementDataUsage(ingress, egress uint64) error { +// IncrementRHPDataUsage increments the RHP3 ingress and egress metrics. +func (s *Store) IncrementRHPDataUsage(ingress, egress uint64) error { return s.transaction(func(tx txn) error { if ingress > 0 { - if err := incrementNumericStat(tx, metricDataIngress, int(ingress), time.Now()); err != nil { + if err := incrementNumericStat(tx, metricDataRHPIngress, int(ingress), time.Now()); err != nil { return fmt.Errorf("failed to track ingress: %w", err) } } if egress > 0 { - if err := incrementNumericStat(tx, metricDataEgress, int(egress), time.Now()); err != nil { + if err := incrementNumericStat(tx, metricDataRHPEgress, int(egress), time.Now()); err != nil { return fmt.Errorf("failed to track egress: %w", err) } } @@ -384,10 +384,10 @@ func mustParseMetricValue(stat string, buf []byte, m *metrics.Metrics) { case metricRegistryWrites: m.Registry.Writes = mustScanUint64(buf) // bandwidth - case metricDataIngress: - m.Data.Ingress = mustScanUint64(buf) - case metricDataEgress: - m.Data.Egress = mustScanUint64(buf) + case metricDataRHPIngress: + m.Data.RHP.Ingress = mustScanUint64(buf) + case metricDataRHPEgress: + m.Data.RHP.Egress = mustScanUint64(buf) // potential revenue case metricPotentialRPCRevenue: m.Revenue.Potential.RPC = mustScanCurrency(buf) diff --git a/persist/sqlite/migrations.go b/persist/sqlite/migrations.go index 614cf648..33f16dba 100644 --- a/persist/sqlite/migrations.go +++ b/persist/sqlite/migrations.go @@ -39,26 +39,26 @@ func migrateVersion24(tx txn, log *zap.Logger) error { switch stat { case metricRHP2Ingress: lastRHP2Ingress = value - stat = metricDataIngress + stat = metricDataRHPIngress case metricRHP2Egress: lastRHP2Egress = value - stat = metricDataEgress + stat = metricDataRHPEgress case metricRHP3Ingress: lastRHP3Ingress = value - stat = metricDataIngress + stat = metricDataRHPIngress case metricRHP3Egress: lastRHP3Egress = value - stat = metricDataEgress + stat = metricDataRHPEgress } switch stat { - case metricDataIngress: + case metricDataRHPIngress: if len(dataPointsIngress) == 0 || !dataPointsIngress[len(dataPointsIngress)-1].timestamp.Equal(timestamp) { dataPointsIngress = append(dataPointsIngress, combinedStat{timestamp, lastRHP2Ingress + lastRHP3Ingress}) } else { dataPointsIngress[len(dataPointsIngress)-1].value = lastRHP2Ingress + lastRHP3Ingress } - case metricDataEgress: + case metricDataRHPEgress: if len(dataPointsEgress) == 0 || !dataPointsEgress[len(dataPointsEgress)-1].timestamp.Equal(timestamp) { dataPointsEgress = append(dataPointsEgress, combinedStat{timestamp, lastRHP2Egress + lastRHP3Egress}) } else { @@ -73,14 +73,14 @@ func migrateVersion24(tx txn, log *zap.Logger) error { // add the new data points for _, dataPoint := range dataPointsIngress { - if err := setNumericStat(tx, metricDataIngress, dataPoint.value, dataPoint.timestamp); err != nil { + if err := setNumericStat(tx, metricDataRHPIngress, dataPoint.value, dataPoint.timestamp); err != nil { return fmt.Errorf("failed to set data ingress metric: %w", err) } log.Debug("added ingress datapoint", zap.Time("timestamp", dataPoint.timestamp), zap.Uint64("value", dataPoint.value)) } for _, dataPoint := range dataPointsEgress { - if err := setNumericStat(tx, metricDataEgress, dataPoint.value, dataPoint.timestamp); err != nil { + if err := setNumericStat(tx, metricDataRHPEgress, dataPoint.value, dataPoint.timestamp); err != nil { return fmt.Errorf("failed to set data egress metric: %w", err) } log.Debug("added egress datapoint", zap.Time("timestamp", dataPoint.timestamp), zap.Uint64("value", dataPoint.value)) diff --git a/rhp/recorder.go b/rhp/recorder.go index d60519b1..0fff34ab 100644 --- a/rhp/recorder.go +++ b/rhp/recorder.go @@ -12,7 +12,7 @@ const persistInterval = time.Minute type ( // A DataRecorderStore persists data usage DataRecorderStore interface { - IncrementDataUsage(ingress, egress uint64) error + IncrementRHPDataUsage(ingress, egress uint64) error } // A DataRecorder records the amount of data read and written across @@ -59,7 +59,7 @@ func (dr *DataRecorder) persistUsage() { return } - if err := dr.store.IncrementDataUsage(r, w); err != nil { + if err := dr.store.IncrementRHPDataUsage(r, w); err != nil { dr.log.Error("failed to persist data usage", zap.Error(err)) return }