Skip to content

Commit

Permalink
stores: record wallet metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Feb 28, 2024
1 parent e5b55d4 commit 014ffc9
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 29 deletions.
13 changes: 9 additions & 4 deletions internal/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"go.sia.tech/coreutils"
"go.sia.tech/coreutils/chain"
"go.sia.tech/coreutils/syncer"
"go.sia.tech/coreutils/wallet"
cwallet "go.sia.tech/coreutils/wallet"
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/autopilot"
"go.sia.tech/renterd/bus"
Expand All @@ -32,7 +32,7 @@ import (

// TODOs:
// - pass last tip to AddSubscriber
// - all wallet metrics support
// - extend wallet metric with immature
// - add UPNP support

type BusConfig struct {
Expand Down Expand Up @@ -122,7 +122,7 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, logger *zap.Logger
cm := chain.NewManager(store, state)

// create wallet
w, err := wallet.NewSingleAddressWallet(seed, cm, sqlStore, wallet.WithReservationDuration(cfg.UsedUTXOExpiry))
w, err := NewSingleAddressWallet(seed, cm, sqlStore, sqlStore, logger.Named("wallet").Sugar(), cwallet.WithReservationDuration(cfg.UsedUTXOExpiry))
if err != nil {
return nil, nil, nil, err
}
Expand All @@ -147,7 +147,7 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, logger *zap.Logger
}
s := syncer.New(l, cm, sqlStore, header, syncer.WithSyncInterval(100*time.Millisecond), syncer.WithLogger(logger.Named("syncer")))

b, err := bus.New(alertsMgr, wh, cm, s, w, sqlStore, sqlStore, sqlStore, sqlStore, sqlStore, sqlStore, logger)
b, err := bus.New(alertsMgr, wh, cm, s, w.SingleAddressWallet, sqlStore, sqlStore, sqlStore, sqlStore, sqlStore, sqlStore, logger)
if err != nil {
return nil, nil, nil, err
}
Expand All @@ -169,6 +169,11 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, logger *zap.Logger
return nil, nil, nil, err
}

err = cm.AddSubscriber(w, types.ChainIndex{})
if err != nil {
return nil, nil, nil, err
}

shutdownFn := func(ctx context.Context) error {
return errors.Join(
l.Close(),
Expand Down
75 changes: 75 additions & 0 deletions internal/node/wallet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package node

import (
"context"
"time"

"go.sia.tech/core/types"
"go.sia.tech/coreutils/chain"
"go.sia.tech/coreutils/wallet"
"go.sia.tech/renterd/api"
"go.uber.org/zap"
)

type metricRecorder interface {
RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error
}

type singleAddressWallet struct {
*wallet.SingleAddressWallet

cm *chain.Manager
mr metricRecorder
logger *zap.SugaredLogger
}

func NewSingleAddressWallet(seed types.PrivateKey, cm *chain.Manager, store wallet.SingleAddressStore, mr metricRecorder, l *zap.SugaredLogger, opts ...wallet.Option) (*singleAddressWallet, error) {
w, err := wallet.NewSingleAddressWallet(seed, cm, store, opts...)
if err != nil {
return nil, err
}

return &singleAddressWallet{w, cm, mr, l}, nil
}

func (w *singleAddressWallet) ProcessChainApplyUpdate(cau *chain.ApplyUpdate, mayCommit bool) error {
// escape early if we're not synced
if !w.isSynced() {
return nil
}

// fetch balance
balance, err := w.Balance()
if err != nil {
w.logger.Errorf("failed to fetch wallet balance, err: %v", err)
return nil
}

// apply sane timeout
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

// record wallet metric
err = w.mr.RecordWalletMetric(ctx, api.WalletMetric{
Timestamp: api.TimeNow(),
Confirmed: balance.Confirmed,
Unconfirmed: balance.Unconfirmed,
Spendable: balance.Spendable,
})
if err != nil {
w.logger.Errorf("failed to record wallet metric, err: %v", err)
return nil
}

return nil
}

func (w *singleAddressWallet) ProcessChainRevertUpdate(cru *chain.RevertUpdate) error { return nil }

func (w *singleAddressWallet) isSynced() bool {
var synced bool
if block, ok := w.cm.Block(w.cm.Tip().ID); ok && time.Since(block.Timestamp) < 2*w.cm.TipState().BlockInterval() {
synced = true
}
return synced
}
29 changes: 9 additions & 20 deletions stores/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,20 +73,13 @@ type (
dbMetrics *gorm.DB
logger *zap.SugaredLogger

// HostDB related fields
announcementMaxAge time.Duration

// ObjectDB related fields
slabBufferMgr *SlabBufferManager
slabPruneSigChan chan struct{}
slabBufferMgr *SlabBufferManager

// SettingsDB related fields
settingsMu sync.Mutex
settings map[string]string

// WalletDB related fields.
walletAddress types.Address

retryTransactionIntervals []time.Duration

shutdownCtx context.Context
Expand Down Expand Up @@ -221,18 +214,14 @@ func NewSQLStore(cfg Config) (*SQLStore, error) {

shutdownCtx, shutdownCtxCancel := context.WithCancel(context.Background())
ss := &SQLStore{
alerts: cfg.Alerts,
cs: cs,
db: db,
dbMetrics: dbMetrics,
logger: l,
hasAllowlist: allowlistCnt > 0,
hasBlocklist: blocklistCnt > 0,
settings: make(map[string]string),
slabPruneSigChan: make(chan struct{}, 1),
walletAddress: cfg.WalletAddress,

announcementMaxAge: cfg.AnnouncementMaxAge,
alerts: cfg.Alerts,
cs: cs,
db: db,
dbMetrics: dbMetrics,
logger: l,
hasAllowlist: allowlistCnt > 0,
hasBlocklist: blocklistCnt > 0,
settings: make(map[string]string),

retryTransactionIntervals: cfg.RetryTransactionIntervals,

Expand Down
13 changes: 8 additions & 5 deletions stores/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ type (
logger *zap.SugaredLogger
persistInterval time.Duration
retryIntervals []time.Duration
walletAddress types.Address

// WalletDB related fields.
walletAddress types.Address

// buffered state
mu sync.Mutex
Expand All @@ -45,7 +47,7 @@ type (
}
)

func NewChainSubscriber(db *gorm.DB, logger *zap.SugaredLogger, intvls []time.Duration, persistInterval time.Duration, addr types.Address, ancmtMaxAge time.Duration) (*chainSubscriber, error) {
func NewChainSubscriber(db *gorm.DB, logger *zap.SugaredLogger, intvls []time.Duration, persistInterval time.Duration, walletAddress types.Address, ancmtMaxAge time.Duration) (*chainSubscriber, error) {
var activeFCIDs, archivedFCIDs []fileContractID
if err := db.Model(&dbContract{}).
Select("fcid").
Expand All @@ -68,9 +70,10 @@ func NewChainSubscriber(db *gorm.DB, logger *zap.SugaredLogger, intvls []time.Du
db: db,
logger: logger,
retryIntervals: intvls,
walletAddress: addr,
lastSave: time.Now(),
persistInterval: persistInterval,

walletAddress: walletAddress,
lastSave: time.Now(),
persistInterval: persistInterval,

contractState: make(map[types.Hash256]contractState),
outputs: make(map[types.Hash256]outputChange),
Expand Down

0 comments on commit 014ffc9

Please sign in to comment.