diff --git a/api/api.go b/api/api.go index 99232dbc..ff2d9b6b 100644 --- a/api/api.go +++ b/api/api.go @@ -14,6 +14,7 @@ import ( "go.sia.tech/hostd/host/metrics" "go.sia.tech/hostd/host/settings" "go.sia.tech/hostd/host/storage" + "go.sia.tech/hostd/rhp" "go.sia.tech/hostd/wallet" "go.sia.tech/jape" "go.sia.tech/siad/modules" @@ -106,6 +107,14 @@ type ( AcceptTransactionSet(txns []types.Transaction) error } + // A RHPSessionReporter reports on RHP session lifecycle events + RHPSessionReporter interface { + Subscribe(rhp.SessionSubscriber) + Unsubscribe(rhp.SessionSubscriber) + + Active() []rhp.Session + } + // An api provides an HTTP API for the host api struct { hostKey types.PublicKey @@ -123,6 +132,7 @@ type ( wallet Wallet metrics Metrics settings Settings + sessions RHPSessionReporter volumeJobs volumeJobs checks integrityCheckJobs @@ -130,7 +140,7 @@ type ( ) // NewServer initializes the API -func NewServer(name string, hostKey types.PublicKey, a Alerts, g Syncer, chain ChainManager, tp TPool, cm ContractManager, am AccountManager, vm VolumeManager, m Metrics, s Settings, w Wallet, log *zap.Logger) http.Handler { +func NewServer(name string, hostKey types.PublicKey, a Alerts, g Syncer, chain ChainManager, tp TPool, cm ContractManager, am AccountManager, vm VolumeManager, rsr RHPSessionReporter, m Metrics, s Settings, w Wallet, log *zap.Logger) http.Handler { api := &api{ hostKey: hostKey, name: name, @@ -145,6 +155,7 @@ func NewServer(name string, hostKey types.PublicKey, a Alerts, g Syncer, chain C metrics: m, settings: s, wallet: w, + sessions: rsr, log: log, checks: integrityCheckJobs{ @@ -195,6 +206,9 @@ func NewServer(name string, hostKey types.PublicKey, a Alerts, g Syncer, chain C "DELETE /volumes/:id": api.handleDeleteVolume, "DELETE /volumes/:id/cancel": api.handleDELETEVolumeCancelOp, "PUT /volumes/:id/resize": api.handlePUTVolumeResize, + // session endpoints + "GET /sessions": api.handleGETSessions, + "GET /sessions/subscribe": api.handleGETSessionsSubscribe, // tpool endpoints "GET /tpool/fee": api.handleGETTPoolFee, // wallet endpoints diff --git a/api/rhpsessions.go b/api/rhpsessions.go new file mode 100644 index 00000000..5bebf072 --- /dev/null +++ b/api/rhpsessions.go @@ -0,0 +1,45 @@ +package api + +import ( + "context" + "encoding/json" + + "go.sia.tech/hostd/rhp" + "go.sia.tech/jape" + "go.uber.org/zap" + "nhooyr.io/websocket" +) + +type rhpSessionSubscriber struct { + conn *websocket.Conn +} + +func (rs *rhpSessionSubscriber) ReceiveSessionEvent(event rhp.SessionEvent) { + buf, err := json.Marshal(event) + if err != nil { + return + } + rs.conn.Write(context.Background(), websocket.MessageText, buf) +} + +func (a *api) handleGETSessions(c jape.Context) { + c.Encode(a.sessions.Active()) +} + +func (a *api) handleGETSessionsSubscribe(c jape.Context) { + wsc, err := websocket.Accept(c.ResponseWriter, c.Request, &websocket.AcceptOptions{ + OriginPatterns: []string{"*"}, + }) + if err != nil { + a.log.Warn("failed to accept websocket connection", zap.Error(err)) + return + } + defer wsc.Close(websocket.StatusNormalClosure, "") + + // subscribe the websocket conn + sub := &rhpSessionSubscriber{ + conn: wsc, + } + a.sessions.Subscribe(sub) + defer a.sessions.Unsubscribe(sub) +} diff --git a/cmd/hostd/main.go b/cmd/hostd/main.go index 67059a12..a4907665 100644 --- a/cmd/hostd/main.go +++ b/cmd/hostd/main.go @@ -328,7 +328,7 @@ func main() { auth := jape.BasicAuth(cfg.HTTP.Password) web := http.Server{ Handler: webRouter{ - api: auth(api.NewServer(cfg.Name, hostKey.PublicKey(), node.a, node.g, node.cm, node.tp, node.contracts, node.accounts, node.storage, node.metrics, node.settings, node.w, log.Named("api"))), + api: auth(api.NewServer(cfg.Name, hostKey.PublicKey(), node.a, node.g, node.cm, node.tp, node.contracts, node.accounts, node.storage, node.sessions, node.metrics, node.settings, node.w, log.Named("api"))), ui: hostd.Handler(), }, ReadTimeout: 30 * time.Second, diff --git a/cmd/hostd/node.go b/cmd/hostd/node.go index b3404432..46a60df6 100644 --- a/cmd/hostd/node.go +++ b/cmd/hostd/node.go @@ -43,6 +43,7 @@ type node struct { registry *registry.Manager storage *storage.VolumeManager + sessions *rhp.SessionReporter rhp2Monitor *rhp.DataRecorder rhp2 *rhpv2.SessionHandler rhp3Monitor *rhp.DataRecorder @@ -64,8 +65,8 @@ func (n *node) Close() error { return nil } -func startRHP2(l net.Listener, hostKey types.PrivateKey, rhp3Addr string, cs rhpv2.ChainManager, tp rhpv2.TransactionPool, w rhpv2.Wallet, cm rhpv2.ContractManager, sr rhpv2.SettingsReporter, sm rhpv2.StorageManager, monitor rhp.DataMonitor, log *zap.Logger) (*rhpv2.SessionHandler, error) { - rhp2, err := rhpv2.NewSessionHandler(l, hostKey, rhp3Addr, cs, tp, w, cm, sr, sm, monitor, discardMetricReporter{}, log) +func startRHP2(l net.Listener, hostKey types.PrivateKey, rhp3Addr string, cs rhpv2.ChainManager, tp rhpv2.TransactionPool, w rhpv2.Wallet, cm rhpv2.ContractManager, sr rhpv2.SettingsReporter, sm rhpv2.StorageManager, monitor rhp.DataMonitor, sessions *rhp.SessionReporter, log *zap.Logger) (*rhpv2.SessionHandler, error) { + rhp2, err := rhpv2.NewSessionHandler(l, hostKey, rhp3Addr, cs, tp, w, cm, sr, sm, monitor, sessions, log) if err != nil { return nil, err } @@ -73,8 +74,8 @@ func startRHP2(l net.Listener, hostKey types.PrivateKey, rhp3Addr string, cs rhp return rhp2, nil } -func startRHP3(l net.Listener, hostKey types.PrivateKey, cs rhpv3.ChainManager, tp rhpv3.TransactionPool, w rhpv3.Wallet, am rhpv3.AccountManager, cm rhpv3.ContractManager, rm rhpv3.RegistryManager, sr rhpv3.SettingsReporter, sm rhpv3.StorageManager, monitor rhp.DataMonitor, log *zap.Logger) (*rhpv3.SessionHandler, error) { - rhp3, err := rhpv3.NewSessionHandler(l, hostKey, cs, tp, w, am, cm, rm, sm, sr, monitor, discardMetricReporter{}, log) +func startRHP3(l net.Listener, hostKey types.PrivateKey, cs rhpv3.ChainManager, tp rhpv3.TransactionPool, w rhpv3.Wallet, am rhpv3.AccountManager, cm rhpv3.ContractManager, rm rhpv3.RegistryManager, sr rhpv3.SettingsReporter, sm rhpv3.StorageManager, monitor rhp.DataMonitor, sessions *rhp.SessionReporter, log *zap.Logger) (*rhpv3.SessionHandler, error) { + rhp3, err := rhpv3.NewSessionHandler(l, hostKey, cs, tp, w, am, cm, rm, sm, sr, monitor, sessions, log) if err != nil { return nil, err } @@ -179,14 +180,16 @@ func newNode(walletKey types.PrivateKey, logger *zap.Logger) (*node, types.Priva } registryManager := registry.NewManager(hostKey, db, logger.Named("registry")) + sessions := rhp.NewSessionReporter() + rhp2Monitor := rhp.NewDataRecorder(&rhp2MonitorStore{db}, logger.Named("rhp2Monitor")) - rhp2, err := startRHP2(rhp2Listener, hostKey, rhp3Listener.Addr().String(), cm, tp, w, contractManager, sr, sm, rhp2Monitor, logger.Named("rhpv2")) + rhp2, err := startRHP2(rhp2Listener, hostKey, rhp3Listener.Addr().String(), cm, tp, w, contractManager, sr, sm, rhp2Monitor, sessions, logger.Named("rhpv2")) if err != nil { return nil, types.PrivateKey{}, fmt.Errorf("failed to start rhp2: %w", err) } rhp3Monitor := rhp.NewDataRecorder(&rhp3MonitorStore{db}, logger.Named("rhp3Monitor")) - rhp3, err := startRHP3(rhp3Listener, hostKey, cm, tp, w, accountManager, contractManager, registryManager, sr, sm, rhp3Monitor, logger.Named("rhpv3")) + rhp3, err := startRHP3(rhp3Listener, hostKey, cm, tp, w, accountManager, contractManager, registryManager, sr, sm, rhp3Monitor, sessions, logger.Named("rhpv3")) if err != nil { return nil, types.PrivateKey{}, fmt.Errorf("failed to start rhp3: %w", err) } @@ -206,6 +209,7 @@ func newNode(walletKey types.PrivateKey, logger *zap.Logger) (*node, types.Priva storage: sm, registry: registryManager, + sessions: sessions, rhp2Monitor: rhp2Monitor, rhp2: rhp2, rhp3Monitor: rhp3Monitor, diff --git a/host/financials/types.go b/host/financials/types.go deleted file mode 100644 index 6ef807ae..00000000 --- a/host/financials/types.go +++ /dev/null @@ -1,86 +0,0 @@ -package financials - -import ( - "fmt" - "time" - - "go.sia.tech/core/types" -) - -const ( - // FundSourceContract identifies a contract as the source of funds - FundSourceContract = "contract" - // FundSourceAccount identifies an account as the source of funds - FundSourceAccount = "account" -) - -type ( - // A FundSource identifies the source of a financial record - FundSource struct { - ID types.Hash256 - Type string - } - - // A FundingRecord records a transfer of funds between a source contract and - // a destination account. - FundingRecord struct { - // Source is the source of the funds. It must be a contract. - Source FundSource `json:"source"` - // Destination is the destination of the funds. It must be an account. - Destination FundSource `json:"destination"` - Amount types.Currency `json:"amount"` - // Reverted indicates whether the funding source was reverted due to a - // block reorg or other consensus issue. - Reverted bool `json:"reverted"` - Timestamp time.Time `json:"timestamp"` - } - - // A Record records spending from a funding source - Record struct { - Source FundSource `json:"source"` - Egress types.Currency `json:"egress"` - Ingress types.Currency `json:"ingress"` - Fees types.Currency `json:"fees"` - Storage types.Currency `json:"storage"` - RegistryRead types.Currency `json:"registryRead"` - RegistryWrite types.Currency `json:"registryWrite"` - - Timestamp time.Time `json:"timestamp"` - } - - // Revenue tracks the host's earnings from all possible sources for a given - // period - Revenue struct { - Storage types.Currency `json:"storage"` - Ingress types.Currency `json:"ingress"` - Egress types.Currency `json:"egress"` - RegistryRead types.Currency `json:"registryRead"` - RegistryWrite types.Currency `json:"registryWrite"` - Fees types.Currency `json:"fees"` - - // AccountDrift tracks funds that were transferred to an account but are - // no longer backed by a contract due to a reorg or other consensus - // event. Once funds are transferred to an ephemeral account, the - // financial records cannot be cleanly reverted. The revenue is, - // basically, lost. - AccountDrift types.Currency `json:"accountDrift"` - - Timestamp time.Time `json:"timestamp"` - } -) - -// String returns a string representation of the fund source -func (fs FundSource) String() string { - return fmt.Sprintf("%s:%x", fs.Type, fs.ID) -} - -// UnmarshalText unmarshals a fund source from a string -func (fs FundSource) UnmarshalText(text []byte) error { - _, err := fmt.Sscanf(string(text), "%s:%x", &fs.Type, &fs.ID) - return err -} - -// MarshalText marshals a fund source to a string -func (fs FundSource) MarshalText() ([]byte, error) { - return []byte(fs.String()), nil -} diff --git a/internal/test/host.go b/internal/test/host.go index f8692eb5..95d51857 100644 --- a/internal/test/host.go +++ b/internal/test/host.go @@ -18,6 +18,7 @@ import ( "go.sia.tech/hostd/host/settings" "go.sia.tech/hostd/host/storage" "go.sia.tech/hostd/persist/sqlite" + "go.sia.tech/hostd/rhp" rhpv2 "go.sia.tech/hostd/rhp/v2" rhpv3 "go.sia.tech/hostd/rhp/v3" "go.sia.tech/hostd/wallet" @@ -26,10 +27,6 @@ import ( const blocksPerMonth = 144 * 30 -type stubMetricReporter struct{} - -func (stubMetricReporter) Report(any) (_ error) { return } - type stubDataMonitor struct{} func (stubDataMonitor) ReadBytes(n int) {} @@ -214,13 +211,15 @@ func NewHost(privKey types.PrivateKey, dir string, node *Node, log *zap.Logger) registry := registry.NewManager(privKey, db, log.Named("registry")) accounts := accounts.NewManager(db, settings) - rhpv2, err := rhpv2.NewSessionHandler(rhp2Listener, privKey, rhp3Listener.Addr().String(), node.cm, node.tp, wallet, contracts, settings, storage, stubDataMonitor{}, stubMetricReporter{}, log.Named("rhpv2")) + sessions := rhp.NewSessionReporter() + + rhpv2, err := rhpv2.NewSessionHandler(rhp2Listener, privKey, rhp3Listener.Addr().String(), node.cm, node.tp, wallet, contracts, settings, storage, stubDataMonitor{}, sessions, log.Named("rhpv2")) if err != nil { return nil, fmt.Errorf("failed to create rhpv2 session handler: %w", err) } go rhpv2.Serve() - rhpv3, err := rhpv3.NewSessionHandler(rhp3Listener, privKey, node.cm, node.tp, wallet, accounts, contracts, registry, storage, settings, stubDataMonitor{}, stubMetricReporter{}, log.Named("rhpv3")) + rhpv3, err := rhpv3.NewSessionHandler(rhp3Listener, privKey, node.cm, node.tp, wallet, accounts, contracts, registry, storage, settings, stubDataMonitor{}, sessions, log.Named("rhpv3")) if err != nil { return nil, fmt.Errorf("failed to create rhpv3 session handler: %w", err) } diff --git a/rhp/reporter.go b/rhp/reporter.go new file mode 100644 index 00000000..253f4762 --- /dev/null +++ b/rhp/reporter.go @@ -0,0 +1,211 @@ +package rhp + +import ( + "encoding/hex" + "sync" + "time" + + "go.sia.tech/core/types" + "go.sia.tech/hostd/host/contracts" + "lukechampine.com/frand" +) + +// SessionEventType is the type of a session event. +const ( + SessionEventTypeStart = "sessionStart" + SessionEventTypeEnd = "sessionEnd" + SessionEventTypeRPCStart = "rpcStart" + SessionEventTypeRPCEnd = "rpcEnd" +) + +// SessionProtocol is the protocol used by a session. +const ( + SessionProtocolTCP = "tcp" + SessionProtocolWS = "websocket" +) + +type ( + // UID is a unique identifier for a session or RPC. + UID [8]byte + + // A Session is an open connection between a host and a renter. + Session struct { + conn *Conn + + ID UID `json:"id"` + Protocol string `json:"protocol"` + RHPVersion int `json:"rhpVersion"` + PeerAddress string `json:"peerAddress"` + Ingress uint64 `json:"ingress"` + Egress uint64 `json:"egress"` + Usage contracts.Usage `json:"usage"` + SuccessfulRPCs uint64 `json:"successfulRPCs"` + FailedRPCs uint64 `json:"failedRPCs"` + + Timestamp time.Time `json:"timestamp"` + } + + // An RPC is an RPC call made by a renter to a host. + RPC struct { + ID UID `json:"ID"` + SessionID UID `json:"sessionID"` + RPC types.Specifier `json:"rpc"` + Usage contracts.Usage `json:"usage"` + Error error `json:"error,omitempty"` + Elapsed time.Duration `json:"timestamp"` + } + + // A SessionSubscriber receives session events. + SessionSubscriber interface { + ReceiveSessionEvent(SessionEvent) + } + + // A SessionReporter manages open sessions and reports session events to + // subscribers. + SessionReporter struct { + mu sync.Mutex + sessions map[UID]Session + subscribers map[SessionSubscriber]struct{} + } + + // A SessionEvent is an event that occurs during a session. + SessionEvent struct { + Type string `json:"type"` + Session Session `json:"session"` + RPC any `json:"rpc,omitempty"` + } +) + +// String returns the hex-encoded string representation of the UID. +func (u UID) String() string { + return hex.EncodeToString(u[:]) +} + +func (sr *SessionReporter) updateSubscribers(sessionID UID, eventType string, rpc any) { + sess, ok := sr.sessions[sessionID] + if !ok { + return + } + + sess.Ingress, sess.Egress = sess.conn.Usage() + sr.sessions[sessionID] = sess + + for sub := range sr.subscribers { + sub.ReceiveSessionEvent(SessionEvent{ + Type: eventType, + Session: sess, + RPC: rpc, + }) + } +} + +// Subscribe subscribes to session events. +func (sr *SessionReporter) Subscribe(sub SessionSubscriber) { + sr.mu.Lock() + defer sr.mu.Unlock() + + sr.subscribers[sub] = struct{}{} +} + +// Unsubscribe unsubscribes from session events. +func (sr *SessionReporter) Unsubscribe(sub SessionSubscriber) { + sr.mu.Lock() + defer sr.mu.Unlock() + + delete(sr.subscribers, sub) +} + +// StartSession starts a new session and returns a function that should be +// called when the session ends. +func (sr *SessionReporter) StartSession(conn *Conn, proto string, version int) (sessionID UID, end func()) { + sr.mu.Lock() + defer sr.mu.Unlock() + + copy(sessionID[:], frand.Bytes(8)) + sr.sessions[sessionID] = Session{ + conn: conn, + + ID: sessionID, + RHPVersion: version, + Protocol: proto, + PeerAddress: conn.RemoteAddr().String(), + Timestamp: time.Now(), + } + sr.updateSubscribers(sessionID, SessionEventTypeStart, nil) + return sessionID, func() { + sr.mu.Lock() + defer sr.mu.Unlock() + + sr.updateSubscribers(sessionID, SessionEventTypeEnd, nil) + delete(sr.sessions, sessionID) + } +} + +// StartRPC starts a new RPC and returns a function that should be called when +// the RPC ends. +func (sr *SessionReporter) StartRPC(sessionID UID, rpc types.Specifier) (rpcID UID, end func(contracts.Usage, error)) { + sr.mu.Lock() + defer sr.mu.Unlock() + + copy(rpcID[:], frand.Bytes(8)) + _, ok := sr.sessions[sessionID] + if !ok { + return rpcID, func(contracts.Usage, error) {} + } + + event := RPC{ + ID: rpcID, + SessionID: sessionID, + RPC: rpc, + } + rpcStart := time.Now() + sr.updateSubscribers(sessionID, SessionEventTypeRPCStart, event) + return rpcID, func(usage contracts.Usage, err error) { + // update event + event.Error = err + event.Elapsed = time.Since(rpcStart) + event.Usage = usage + + sr.mu.Lock() + defer sr.mu.Unlock() + + sess, ok := sr.sessions[sessionID] + if !ok { + return + } + + // update session + if err == nil { + sess.SuccessfulRPCs++ + } else { + sess.FailedRPCs++ + } + sess.Usage = sess.Usage.Add(usage) + sr.sessions[sessionID] = sess + // update subscribers + sr.updateSubscribers(sessionID, SessionEventTypeRPCEnd, event) + } +} + +// Active returns a snapshot of the currently active sessions. +func (sr *SessionReporter) Active() []Session { + sr.mu.Lock() + defer sr.mu.Unlock() + + sessions := make([]Session, 0, len(sr.sessions)) + for _, sess := range sr.sessions { + // update session usage + sess.Ingress, sess.Egress = sess.conn.Usage() + sr.sessions[sess.ID] = sess + // append to slice + sessions = append(sessions, sess) + } + return sessions +} + +// NewSessionReporter returns a new SessionReporter. +func NewSessionReporter() *SessionReporter { + return &SessionReporter{ + sessions: make(map[UID]Session), + } +} diff --git a/rhp/v2/metrics.go b/rhp/v2/metrics.go deleted file mode 100644 index b90eb51c..00000000 --- a/rhp/v2/metrics.go +++ /dev/null @@ -1,125 +0,0 @@ -package rhp - -import ( - "encoding/hex" - "time" - - "go.sia.tech/core/types" - "lukechampine.com/frand" -) - -// A UniqueID is a unique identifier for an RPC or Session. -type UniqueID [8]byte - -// String returns a string representation of the UniqueID. -func (u UniqueID) String() string { - return hex.EncodeToString(u[:]) -} - -// MarshalJSON marshals the UniqueID to JSON. -func (u UniqueID) MarshalJSON() ([]byte, error) { - return []byte(`"` + u.String() + `"`), nil -} - -// generateUniqueID returns a random UniqueID. -func generateUniqueID() (id UniqueID) { - frand.Read(id[:]) - return -} - -type ( - // EventSessionStart records the start of a new renter session. - EventSessionStart struct { - UID UniqueID `json:"uid"` - RenterIP string `json:"renterIP"` - Timestamp time.Time `json:"timestamp"` - } - - // EventSessionEnd records the end of a renter session. - EventSessionEnd struct { - UID UniqueID `json:"uid"` - Timestamp time.Time `json:"timestamp"` - Elapsed time.Duration `json:"elapsed"` - } - - // EventRPCStart records the start of an RPC. - EventRPCStart struct { - RPC types.Specifier `json:"rpc"` - SessionUID UniqueID `json:"sessionUID"` - Timestamp time.Time `json:"timestamp"` - } - - // EventRPCEnd records the end of an RPC. - EventRPCEnd struct { - RPC types.Specifier `json:"rpc"` - SessionUID UniqueID `json:"sessionUID"` - Error error `json:"error"` - - Spending types.Currency `json:"spending"` - ReadBytes uint64 `json:"readBytes"` - WriteBytes uint64 `json:"writeBytes"` - - Elapsed time.Duration `json:"elapsed"` - Timestamp time.Time `json:"timestamp"` - } - - // EventContractFormed records the formation of a new contract - EventContractFormed struct { - SessionUID UniqueID `json:"sessionUID"` - ContractID types.FileContractID `json:"contractID"` - Contract types.FileContractRevision `json:"contract"` - } - - // EventContractRenewed records the renewal of a contract. - EventContractRenewed struct { - SessionUID UniqueID `json:"sessionUID"` - ContractID types.FileContractID `json:"contractID"` - FinalizedContractID types.FileContractID `json:"finalizedContractID"` - Contract types.FileContractRevision `json:"contract"` - FinalizedContract types.FileContractRevision `json:"finalizedContract"` - } -) - -func (sh *SessionHandler) recordSessionStart(s *session) func() { - start := time.Now() - s.uid = generateUniqueID() - sh.metrics.Report(EventSessionStart{ - UID: s.uid, - RenterIP: s.conn.RemoteAddr().String(), - Timestamp: start, - }) - return func() { - sh.metrics.Report(EventSessionEnd{ - UID: s.uid, - Elapsed: time.Since(start), - Timestamp: time.Now(), - }) - } -} - -func (sh *SessionHandler) recordRPC(id types.Specifier, s *session) func(error) { - start := time.Now() - sh.metrics.Report(EventRPCStart{ - RPC: id, - SessionUID: s.uid, - Timestamp: start, - }) - rs, ws := s.conn.Usage() - spent := s.spent - return func(err error) { - re, we := s.conn.Usage() - - sh.metrics.Report(EventRPCEnd{ - RPC: id, - SessionUID: s.uid, - Error: err, - - Spending: s.spent.Sub(spent), - ReadBytes: re - rs, - WriteBytes: we - ws, - - Elapsed: time.Since(start), - Timestamp: time.Now(), - }) - } -} diff --git a/rhp/v2/rhp.go b/rhp/v2/rhp.go index addb97ec..74c28481 100644 --- a/rhp/v2/rhp.go +++ b/rhp/v2/rhp.go @@ -12,7 +12,6 @@ import ( rhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/core/types" "go.sia.tech/hostd/host/contracts" - "go.sia.tech/hostd/host/financials" "go.sia.tech/hostd/host/settings" "go.sia.tech/hostd/internal/threadgroup" "go.sia.tech/hostd/rhp" @@ -88,14 +87,10 @@ type ( BandwidthLimiters() (ingress, egress *rate.Limiter) } - // MetricReporter records metrics from the host - MetricReporter interface { - Report(any) error - } - - // A FinancialReporter records financial transactions on the host. - FinancialReporter interface { - Add(financials.Record) error + // SessionReporter reports session metrics + SessionReporter interface { + StartSession(conn *rhp.Conn, proto string, version int) (sessionID rhp.UID, end func()) + StartRPC(sessionID rhp.UID, rpc types.Specifier) (rpcID rhp.UID, end func(contracts.Usage, error)) } // A SessionHandler handles the host side of the renter-host protocol and @@ -113,14 +108,14 @@ type ( wallet Wallet contracts ContractManager - metrics MetricReporter + sessions SessionReporter settings SettingsReporter storage StorageManager log *zap.Logger } ) -func (sh *SessionHandler) rpcLoop(sess *session) error { +func (sh *SessionHandler) rpcLoop(sess *session, log *zap.Logger) error { done, err := sh.tg.Add() if err != nil { return err @@ -132,8 +127,7 @@ func (sh *SessionHandler) rpcLoop(sess *session) error { return fmt.Errorf("failed to read RPC ID: %w", err) } - var rpcFn func(*session, *zap.Logger) error - rpcFn, ok := map[types.Specifier]func(*session, *zap.Logger) error{ + rpcFn, ok := map[types.Specifier]func(*session, *zap.Logger) (contracts.Usage, error){ rhpv2.RPCFormContractID: sh.rpcFormContract, rhpv2.RPCRenewClearContractID: sh.rpcRenewAndClearContract, rhpv2.RPCLockID: sh.rpcLock, @@ -149,11 +143,11 @@ func (sh *SessionHandler) rpcLoop(sess *session) error { return err } start := time.Now() - recordEnd := sh.recordRPC(id, sess) - log := sh.log.Named(id.String()).With(zap.String("peerAddr", sess.conn.RemoteAddr().String())) + rpcID, end := sh.sessions.StartRPC(sess.id, id) + log = log.Named(id.String()).With(zap.Stringer("rpcID", rpcID)) log.Debug("RPC start") - err = rpcFn(sess, log) - recordEnd(err) + usage, err := rpcFn(sess, log) + end(usage, err) if err != nil { log.Warn("RPC error", zap.Error(err), zap.Duration("elapsed", time.Since(start))) return fmt.Errorf("RPC %q error: %w", id, err) @@ -166,30 +160,33 @@ func (sh *SessionHandler) rpcLoop(sess *session) error { func (sh *SessionHandler) upgrade(conn net.Conn) error { // wrap the conn with the bandwidth limiters ingressLimiter, egressLimiter := sh.settings.BandwidthLimiters() - conn = rhp.NewConn(conn, sh.monitor, ingressLimiter, egressLimiter) + rhpConn := rhp.NewConn(conn, sh.monitor, ingressLimiter, egressLimiter) - t, err := rhpv2.NewHostTransport(conn, sh.privateKey) + t, err := rhpv2.NewHostTransport(rhpConn, sh.privateKey) if err != nil { return err } + sessionID, end := sh.sessions.StartSession(rhpConn, rhp.SessionProtocolTCP, 2) + defer end() + sess := &session{ - conn: conn.(*rhp.Conn), - t: t, - metrics: sh.metrics, + id: sessionID, + conn: rhpConn, + t: t, } defer t.Close() - recordEnd := sh.recordSessionStart(sess) - defer recordEnd() defer func() { if sess.contract.Revision.ParentID != (types.FileContractID{}) { sh.contracts.Unlock(sess.contract.Revision.ParentID) } }() + log := sh.log.With(zap.Stringer("sessionID", sessionID), zap.Stringer("peerAddr", conn.RemoteAddr())) + for { - if err := sh.rpcLoop(sess); err != nil { + if err := sh.rpcLoop(sess, log); err != nil { return err } } @@ -285,7 +282,7 @@ func (sh *SessionHandler) LocalAddr() string { } // NewSessionHandler creates a new RHP2 SessionHandler -func NewSessionHandler(l net.Listener, hostKey types.PrivateKey, rhp3Addr string, cm ChainManager, tpool TransactionPool, wallet Wallet, contracts ContractManager, settings SettingsReporter, storage StorageManager, monitor rhp.DataMonitor, metrics MetricReporter, log *zap.Logger) (*SessionHandler, error) { +func NewSessionHandler(l net.Listener, hostKey types.PrivateKey, rhp3Addr string, cm ChainManager, tpool TransactionPool, wallet Wallet, contracts ContractManager, settings SettingsReporter, storage StorageManager, monitor rhp.DataMonitor, sessions SessionReporter, log *zap.Logger) (*SessionHandler, error) { _, rhp3Port, err := net.SplitHostPort(rhp3Addr) if err != nil { return nil, fmt.Errorf("failed to parse rhp3 addr: %w", err) @@ -303,7 +300,7 @@ func NewSessionHandler(l net.Listener, hostKey types.PrivateKey, rhp3Addr string wallet: wallet, contracts: contracts, - metrics: metrics, + sessions: sessions, settings: settings, storage: storage, log: log, diff --git a/rhp/v2/rpc.go b/rhp/v2/rpc.go index 00bcecd7..ca26a01e 100644 --- a/rhp/v2/rpc.go +++ b/rhp/v2/rpc.go @@ -36,33 +36,33 @@ var ( ErrNotAcceptingContracts = errors.New("host is not accepting contracts") ) -func (sh *SessionHandler) rpcSettings(s *session, log *zap.Logger) error { +func (sh *SessionHandler) rpcSettings(s *session, log *zap.Logger) (contracts.Usage, error) { settings, err := sh.Settings() if err != nil { s.t.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to get host settings: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to get host settings: %w", err) } js, err := json.Marshal(settings) if err != nil { s.t.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to marshal settings: %v", err) + return contracts.Usage{}, fmt.Errorf("failed to marshal settings: %v", err) } - return s.writeResponse(&rhpv2.RPCSettingsResponse{ + return contracts.Usage{}, s.writeResponse(&rhpv2.RPCSettingsResponse{ Settings: js, }, 30*time.Second) } -func (sh *SessionHandler) rpcLock(s *session, log *zap.Logger) error { +func (sh *SessionHandler) rpcLock(s *session, log *zap.Logger) (contracts.Usage, error) { var req rhpv2.RPCLockRequest if err := s.readRequest(&req, minMessageSize, 30*time.Second); err != nil { - return err + return contracts.Usage{}, err } // Check if a contract is already locked. if s.contract.Revision.ParentID != (types.FileContractID{}) { err := ErrContractAlreadyLocked s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) @@ -72,7 +72,7 @@ func (sh *SessionHandler) rpcLock(s *session, log *zap.Logger) error { if err != nil { err := fmt.Errorf("failed to lock contract: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } // verify the renter's challenge signature @@ -81,7 +81,7 @@ func (sh *SessionHandler) rpcLock(s *session, log *zap.Logger) error { sh.contracts.Unlock(contract.Revision.ParentID) err := fmt.Errorf("challenge failed: %w", ErrInvalidRenterSignature) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } // set the contract @@ -95,32 +95,32 @@ func (sh *SessionHandler) rpcLock(s *session, log *zap.Logger) error { // avoid holding lock during network round trip if err := s.writeResponse(lockResp, 30*time.Second); err != nil { sh.contracts.Unlock(contract.Revision.ParentID) - return fmt.Errorf("failed to write lock response: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to write lock response: %w", err) } - return nil + return contracts.Usage{}, nil } // rpcUnlock unlocks the contract associated with the session. -func (sh *SessionHandler) rpcUnlock(s *session, log *zap.Logger) error { +func (sh *SessionHandler) rpcUnlock(s *session, log *zap.Logger) (contracts.Usage, error) { // check if a contract is locked if s.contract.Revision.ParentID == (types.FileContractID{}) { - return ErrNoContractLocked + return contracts.Usage{}, ErrNoContractLocked } sh.contracts.Unlock(s.contract.Revision.ParentID) s.contract = contracts.SignedRevision{} - return nil + return contracts.Usage{}, nil } // rpcFormContract is an RPC that forms a contract between a renter and the // host. -func (sh *SessionHandler) rpcFormContract(s *session, log *zap.Logger) error { +func (sh *SessionHandler) rpcFormContract(s *session, log *zap.Logger) (contracts.Usage, error) { if !sh.settings.Settings().AcceptingContracts { s.t.WriteResponseErr(ErrNotAcceptingContracts) - return ErrNotAcceptingContracts + return contracts.Usage{}, ErrNotAcceptingContracts } var req rhpv2.RPCFormContractRequest if err := s.readRequest(&req, 10*minMessageSize, time.Minute); err != nil { - return err + return contracts.Usage{}, err } formationTxnSet := req.Transactions // if the transaction set does not contain any transaction or if the @@ -128,11 +128,11 @@ func (sh *SessionHandler) rpcFormContract(s *session, log *zap.Logger) error { if len(formationTxnSet) == 0 || len(formationTxnSet[len(formationTxnSet)-1].FileContracts) != 1 { err := ErrTxnMissingContract s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } else if req.RenterKey.Algorithm != types.SpecifierEd25519 { err := errors.New("unsupported renter key algorithm") s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } renterPub := *(*types.PublicKey)(req.RenterKey.Key) // get the host's public key, current block height, and settings @@ -140,7 +140,7 @@ func (sh *SessionHandler) rpcFormContract(s *session, log *zap.Logger) error { settings, err := sh.Settings() if err != nil { s.t.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to get host settings: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to get host settings: %w", err) } currentHeight := sh.cm.TipState().Index.Height // get the contract from the transaction set @@ -152,7 +152,7 @@ func (sh *SessionHandler) rpcFormContract(s *session, log *zap.Logger) error { if err != nil { err := fmt.Errorf("contract rejected: validation failed: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } // calculate the host's collateral and add the inputs to the transaction @@ -160,7 +160,7 @@ func (sh *SessionHandler) rpcFormContract(s *session, log *zap.Logger) error { toSign, discard, err := sh.wallet.FundTransaction(formationTxn, hostCollateral) if err != nil { s.t.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to fund formation transaction: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to fund formation transaction: %w", err) } defer discard() @@ -175,17 +175,17 @@ func (sh *SessionHandler) rpcFormContract(s *session, log *zap.Logger) error { Outputs: formationTxn.SiacoinOutputs[renterOutputs:], } if err := s.writeResponse(hostAdditionsResp, 30*time.Second); err != nil { - return fmt.Errorf("failed to write host additions: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to write host additions: %w", err) } // read and validate the renter's signatures var renterSignaturesResp rhpv2.RPCFormContractSignatures if err := s.readResponse(&renterSignaturesResp, 10*minMessageSize, 30*time.Second); err != nil { - return fmt.Errorf("failed to read renter signatures: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to read renter signatures: %w", err) } else if err := validateRenterRevisionSignature(renterSignaturesResp.RevisionSignature, initialRevision.ParentID, sigHash, renterPub); err != nil { err := fmt.Errorf("contract rejected: validation failed: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } // add the renter's signatures to the transaction and contract revision renterTxnSigs := len(renterSignaturesResp.ContractSignatures) @@ -194,13 +194,13 @@ func (sh *SessionHandler) rpcFormContract(s *session, log *zap.Logger) error { // sign and broadcast the formation transaction if err = sh.wallet.SignTransaction(sh.cm.TipState(), formationTxn, toSign, types.CoveredFields{WholeTransaction: true}); err != nil { s.t.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to sign formation transaction: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to sign formation transaction: %w", err) } else if err = sh.tpool.AcceptTransactionSet(formationTxnSet); err != nil { err = fmt.Errorf("failed to broadcast formation transaction: %w", err) buf, _ := json.Marshal(formationTxnSet) log.Error("failed to broadcast formation transaction", zap.Error(err), zap.String("txnset", string(buf))) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } signedRevision := contracts.SignedRevision{ @@ -213,18 +213,9 @@ func (sh *SessionHandler) rpcFormContract(s *session, log *zap.Logger) error { } if err := sh.contracts.AddContract(signedRevision, formationTxnSet, hostCollateral, usage); err != nil { s.t.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to add contract to store: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to add contract to store: %w", err) } - // add the contract fee to the amount spent by the renter - s.Spend(settings.ContractPrice) - // log the formation event - sh.metrics.Report(EventContractFormed{ - SessionUID: s.uid, - ContractID: formationTxn.FileContractID(0), - Contract: initialRevision, - }) - // send the host signatures to the renter hostSignaturesResp := &rhpv2.RPCFormContractSignatures{ ContractSignatures: formationTxn.Signatures[renterTxnSigs:], @@ -234,23 +225,20 @@ func (sh *SessionHandler) rpcFormContract(s *session, log *zap.Logger) error { CoveredFields: types.CoveredFields{FileContractRevisions: []uint64{0}}, }, } - if err := s.writeResponse(hostSignaturesResp, 30*time.Second); err != nil { - return fmt.Errorf("failed to write host signatures: %w", err) - } - return nil + return usage, s.writeResponse(hostSignaturesResp, 30*time.Second) } // rpcRenewAndClearContract is an RPC that renews a contract and clears the // existing contract -func (sh *SessionHandler) rpcRenewAndClearContract(s *session, log *zap.Logger) error { +func (sh *SessionHandler) rpcRenewAndClearContract(s *session, log *zap.Logger) (contracts.Usage, error) { state := sh.cm.TipState() settings, err := sh.Settings() if err != nil { s.t.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to get host settings: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to get host settings: %w", err) } else if !settings.AcceptingContracts { s.t.WriteResponseErr(ErrNotAcceptingContracts) - return ErrNotAcceptingContracts + return contracts.Usage{}, ErrNotAcceptingContracts } hostUnlockKey := sh.privateKey.PublicKey().UnlockKey() @@ -259,26 +247,26 @@ func (sh *SessionHandler) rpcRenewAndClearContract(s *session, log *zap.Logger) if err := s.ContractRevisable(state.Index.Height); err != nil { err := fmt.Errorf("contract not revisable: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } var req rhpv2.RPCRenewAndClearContractRequest if err := s.readRequest(&req, 10*minMessageSize, time.Minute); err != nil { - return fmt.Errorf("failed to read renew request: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to read renew request: %w", err) } renterKey, err := convertToPublicKey(req.RenterKey) if err != nil { err = fmt.Errorf("failed to convert renter key: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } renewalTxnSet := req.Transactions if len(renewalTxnSet) == 0 || len(renewalTxnSet[len(renewalTxnSet)-1].FileContracts) != 1 { err := ErrTxnMissingContract s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } renewalParents := renewalTxnSet[:len(renewalTxnSet)-1] renewalTxn := renewalTxnSet[len(renewalTxnSet)-1] @@ -289,7 +277,7 @@ func (sh *SessionHandler) rpcRenewAndClearContract(s *session, log *zap.Logger) if err != nil { err = fmt.Errorf("failed to create clearing revision: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } expectedExchange := settings.BaseRPCPrice // if the contract has less than the host's current base RPC price, cap @@ -301,7 +289,7 @@ func (sh *SessionHandler) rpcRenewAndClearContract(s *session, log *zap.Logger) if err != nil { err = fmt.Errorf("invalid clearing revision: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } clearingUsage := contracts.Usage{ RPCRevenue: finalPayment, @@ -323,7 +311,7 @@ func (sh *SessionHandler) rpcRenewAndClearContract(s *session, log *zap.Logger) if err != nil { err = fmt.Errorf("invalid contract renewal: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } renewalUsage := contracts.Usage{ RPCRevenue: settings.ContractPrice, @@ -335,7 +323,7 @@ func (sh *SessionHandler) rpcRenewAndClearContract(s *session, log *zap.Logger) toSign, discard, err := sh.wallet.FundTransaction(&renewalTxn, lockedCollateral) if err != nil { s.t.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to fund renewal transaction: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to fund renewal transaction: %w", err) } defer discard() @@ -345,15 +333,15 @@ func (sh *SessionHandler) rpcRenewAndClearContract(s *session, log *zap.Logger) Outputs: renewalTxn.SiacoinOutputs[renterOutputs:], } if err = s.writeResponse(hostAdditionsResp, 30*time.Second); err != nil { - return fmt.Errorf("failed to write host additions: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to write host additions: %w", err) } // read the renter's signatures for the renewal var renterSigsResp rhpv2.RPCRenewAndClearContractSignatures if err = s.readResponse(&renterSigsResp, minMessageSize, 30*time.Second); err != nil { - return fmt.Errorf("failed to read renter signatures: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to read renter signatures: %w", err) } else if len(renterSigsResp.RevisionSignature.Signature) != 64 { - return fmt.Errorf("invalid renter signature length: %w", ErrInvalidRenterSignature) + return contracts.Usage{}, fmt.Errorf("invalid renter signature length: %w", ErrInvalidRenterSignature) } // add the renter's signatures to the formation transaction @@ -361,7 +349,7 @@ func (sh *SessionHandler) rpcRenewAndClearContract(s *session, log *zap.Logger) // sign the transaction if err = sh.wallet.SignTransaction(state, &renewalTxn, toSign, types.CoveredFields{WholeTransaction: true}); err != nil { s.t.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to sign renewal transaction: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to sign renewal transaction: %w", err) } // create the initial revision @@ -373,7 +361,7 @@ func (sh *SessionHandler) rpcRenewAndClearContract(s *session, log *zap.Logger) if !s.contract.RenterKey().VerifyHash(clearingRevSigHash, renterSigsResp.FinalRevisionSignature) { err := fmt.Errorf("failed to verify clearing revision signature: %w", ErrInvalidRenterSignature) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } // verify the renewal revision signature @@ -382,7 +370,7 @@ func (sh *SessionHandler) rpcRenewAndClearContract(s *session, log *zap.Logger) if !renterKey.VerifyHash(renewalSigHash, renterRenewalSig) { err := fmt.Errorf("failed to verify renewal revision signature: %w", ErrInvalidRenterSignature) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } signedClearing := contracts.SignedRevision{ @@ -401,12 +389,12 @@ func (sh *SessionHandler) rpcRenewAndClearContract(s *session, log *zap.Logger) if err = sh.tpool.AcceptTransactionSet(renewalTxnSet); err != nil { err = fmt.Errorf("failed to broadcast renewal transaction: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } // update the existing contract and add the renewed contract to the store if err := sh.contracts.RenewContract(signedRenewal, signedClearing, renewalTxnSet, lockedCollateral, clearingUsage, renewalUsage); err != nil { s.t.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to renew contract: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to renew contract: %w", err) } // send the host signatures to the renter @@ -415,30 +403,27 @@ func (sh *SessionHandler) rpcRenewAndClearContract(s *session, log *zap.Logger) RevisionSignature: signedRenewal.Signatures()[0], FinalRevisionSignature: signedClearing.HostSignature, } - if err := s.writeResponse(hostSigsResp, 30*time.Second); err != nil { - return fmt.Errorf("failed to write host signatures: %w", err) - } - return nil + return clearingUsage.Add(renewalUsage), s.writeResponse(hostSigsResp, 30*time.Second) } // rpcSectorRoots returns the Merkle roots of the sectors in a contract -func (sh *SessionHandler) rpcSectorRoots(s *session, log *zap.Logger) error { +func (sh *SessionHandler) rpcSectorRoots(s *session, log *zap.Logger) (contracts.Usage, error) { currentHeight := sh.cm.TipState().Index.Height if err := s.ContractRevisable(currentHeight); err != nil { err := fmt.Errorf("contract not revisable: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } var req rhpv2.RPCSectorRootsRequest if err := s.readRequest(&req, minMessageSize, 30*time.Second); err != nil { - return fmt.Errorf("failed to read sector roots request: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to read sector roots request: %w", err) } settings, err := sh.Settings() if err != nil { s.t.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to get host settings: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to get host settings: %w", err) } costs := rpcSectorRootsCost(req.NumRoots, req.RootOffset, settings) @@ -449,14 +434,14 @@ func (sh *SessionHandler) rpcSectorRoots(s *session, log *zap.Logger) error { if err != nil { err := fmt.Errorf("failed to revise contract: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } payment, _, err := rhp.ValidateRevision(s.contract.Revision, revision, cost, types.ZeroCurrency) if err != nil { err := fmt.Errorf("failed to validate revision: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } // validate the renter's signature @@ -464,24 +449,24 @@ func (sh *SessionHandler) rpcSectorRoots(s *session, log *zap.Logger) error { if !s.contract.RenterKey().VerifyHash(sigHash, req.Signature) { err := fmt.Errorf("failed to validate revision: %w", ErrInvalidRenterSignature) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } hostSig := sh.privateKey.SignHash(sigHash) if req.NumRoots > math.MaxInt { err := errors.New("too many requested sector roots") s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } else if req.RootOffset > math.MaxInt { err := errors.New("sector root offset is too large") s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } roots, err := sh.contracts.SectorRoots(s.contract.Revision.ParentID, int(req.NumRoots), int(req.RootOffset)) if err != nil { s.t.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to get sector roots: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to get sector roots: %w", err) } // commit the revision @@ -493,7 +478,7 @@ func (sh *SessionHandler) rpcSectorRoots(s *session, log *zap.Logger) error { updater, err := sh.contracts.ReviseContract(revision.ParentID) if err != nil { s.t.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to revise contract: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to revise contract: %w", err) } defer updater.Close() @@ -503,9 +488,10 @@ func (sh *SessionHandler) rpcSectorRoots(s *session, log *zap.Logger) error { costs.Egress = costs.Egress.Add(excess) } - if err := updater.Commit(signedRevision, costs.ToUsage()); err != nil { + usage := costs.ToUsage() + if err := updater.Commit(signedRevision, usage); err != nil { s.t.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to commit contract revision: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to commit contract revision: %w", err) } s.contract = signedRevision @@ -514,31 +500,26 @@ func (sh *SessionHandler) rpcSectorRoots(s *session, log *zap.Logger) error { MerkleProof: rhpv2.BuildSectorRangeProof(roots, req.RootOffset, req.RootOffset+req.NumRoots), Signature: hostSig, } - if err := s.writeResponse(sectorRootsResp, 2*time.Minute); err != nil { - return fmt.Errorf("failed to write sector roots response: %w", err) - } - - s.Spend(cost) - return nil + return usage, s.writeResponse(sectorRootsResp, 2*time.Minute) } -func (sh *SessionHandler) rpcWrite(s *session, log *zap.Logger) error { +func (sh *SessionHandler) rpcWrite(s *session, log *zap.Logger) (contracts.Usage, error) { currentHeight := sh.cm.TipState().Index.Height // get the locked contract and check that it is revisable if err := s.ContractRevisable(currentHeight); err != nil { err := fmt.Errorf("contract not revisable: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } settings, err := sh.Settings() if err != nil { s.t.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to get settings: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to get settings: %w", err) } var req rhpv2.RPCWriteRequest if err := s.readRequest(&req, 5*rhpv2.SectorSize, 5*time.Minute); err != nil { - return fmt.Errorf("failed to read write request: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to read write request: %w", err) } remainingDuration := uint64(s.contract.Revision.WindowEnd) - currentHeight @@ -548,7 +529,7 @@ func (sh *SessionHandler) rpcWrite(s *session, log *zap.Logger) error { if err != nil { err := fmt.Errorf("failed to validate write actions: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } cost, collateral := costs.Total() @@ -557,20 +538,20 @@ func (sh *SessionHandler) rpcWrite(s *session, log *zap.Logger) error { if err != nil { err := fmt.Errorf("failed to revise contract: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } payment, risked, err := rhp.ValidateRevision(s.contract.Revision, revision, cost, collateral) if err != nil { err := fmt.Errorf("failed to validate revision: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } contractUpdater, err := sh.contracts.ReviseContract(revision.ParentID) if err != nil { s.t.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to revise contract: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to revise contract: %w", err) } defer contractUpdater.Close() @@ -581,7 +562,7 @@ func (sh *SessionHandler) rpcWrite(s *session, log *zap.Logger) error { if len(action.Data) != rhpv2.SectorSize { err := fmt.Errorf("append action: invalid sector size: %v", len(action.Data)) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } sector := (*[rhpv2.SectorSize]byte)(action.Data) root := rhpv2.SectorRoot(sector) @@ -589,7 +570,7 @@ func (sh *SessionHandler) rpcWrite(s *session, log *zap.Logger) error { if err != nil { err := fmt.Errorf("append action: failed to write sector: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } defer release() contractUpdater.AppendSector(root) @@ -597,37 +578,37 @@ func (sh *SessionHandler) rpcWrite(s *session, log *zap.Logger) error { if err := contractUpdater.TrimSectors(action.A); err != nil { err := fmt.Errorf("trim action: failed to trim sectors: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } case rhpv2.RPCWriteActionSwap: if err := contractUpdater.SwapSectors(action.A, action.B); err != nil { err := fmt.Errorf("swap action: failed to swap sectors: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } case rhpv2.RPCWriteActionUpdate: root, err := contractUpdater.SectorRoot(action.A) if err != nil { err := fmt.Errorf("update action: failed to get sector root: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } sector, err := sh.storage.Read(root) if err != nil { s.t.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to read sector %v: %w", root, err) + return contracts.Usage{}, fmt.Errorf("failed to read sector %v: %w", root, err) } i, offset := action.A, action.B if offset > rhpv2.SectorSize { err := fmt.Errorf("update action: invalid offset %v bytes", offset) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } else if offset+uint64(len(action.Data)) > rhpv2.SectorSize { err := errors.New("update action: offset + data exceeds sector size") s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } copy(sector[offset:], action.Data) @@ -636,13 +617,13 @@ func (sh *SessionHandler) rpcWrite(s *session, log *zap.Logger) error { if err := contractUpdater.UpdateSector(newRoot, i); err != nil { err := fmt.Errorf("update action: failed to update sector: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } release, err := sh.storage.Write(root, sector) if err != nil { err := fmt.Errorf("append action: failed to write sector: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } defer release() } @@ -656,7 +637,7 @@ func (sh *SessionHandler) rpcWrite(s *session, log *zap.Logger) error { writeResp.OldSubtreeHashes, writeResp.OldLeafHashes = rhpv2.BuildDiffProof(req.Actions, oldRoots) } if err := s.writeResponse(writeResp, time.Minute); err != nil { - return fmt.Errorf("failed to write merkle proof: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to write merkle proof: %w", err) } // apply the new merkle root and file size to the revision @@ -666,7 +647,7 @@ func (sh *SessionHandler) rpcWrite(s *session, log *zap.Logger) error { // read the renter's signature var renterSigResponse rhpv2.RPCWriteResponse if err := s.readResponse(&renterSigResponse, minMessageSize, 30*time.Second); err != nil { - return fmt.Errorf("failed to read renter signature: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to read renter signature: %w", err) } // validate the contract signature @@ -675,7 +656,7 @@ func (sh *SessionHandler) rpcWrite(s *session, log *zap.Logger) error { if !s.contract.RenterKey().VerifyHash(sigHash, renterSigResponse.Signature) { err := fmt.Errorf("failed to verify renter signature: %w", ErrInvalidRenterSignature) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } hostSig := sh.privateKey.SignHash(sigHash) signedRevision := contracts.SignedRevision{ @@ -687,7 +668,7 @@ func (sh *SessionHandler) rpcWrite(s *session, log *zap.Logger) error { // sync the storage manager if err := sh.storage.Sync(); err != nil { s.t.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to sync storage manager: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to sync storage manager: %w", err) } // adjust the revenue to account for the full transfer by the renter @@ -700,48 +681,46 @@ func (sh *SessionHandler) rpcWrite(s *session, log *zap.Logger) error { costs.Collateral = risked // commit the contract modifications - if err := contractUpdater.Commit(signedRevision, costs.ToUsage()); err != nil { + usage := costs.ToUsage() + if err := contractUpdater.Commit(signedRevision, usage); err != nil { s.t.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to commit contract modifications: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to commit contract modifications: %w", err) } // update the session contract s.contract = signedRevision // send the host signature hostSigResp := &rhpv2.RPCWriteResponse{Signature: hostSig} - if err := s.writeResponse(hostSigResp, 30*time.Second); err != nil { - return fmt.Errorf("failed to write host signature: %w", err) - } - return nil + return usage, s.writeResponse(hostSigResp, 30*time.Second) } -func (sh *SessionHandler) rpcRead(s *session, log *zap.Logger) error { +func (sh *SessionHandler) rpcRead(s *session, log *zap.Logger) (contracts.Usage, error) { currentHeight := sh.cm.TipState().Index.Height // get the locked contract and check that it is revisable if err := s.ContractRevisable(currentHeight); err != nil { err := fmt.Errorf("contract not revisable: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } // get the host's current settings settings, err := sh.Settings() if err != nil { s.t.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to get host settings: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to get host settings: %w", err) } // read the read request var req rhpv2.RPCReadRequest if err := s.readRequest(&req, 4*minMessageSize, time.Minute); err != nil { - return fmt.Errorf("failed to read read request: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to read read request: %w", err) } // validate the request sections and calculate the cost costs, err := validateReadActions(req.Sections, req.MerkleProof, settings) if err != nil { s.t.WriteResponseErr(err) - return fmt.Errorf("failed to validate read request: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to validate read request: %w", err) } cost, _ := costs.Total() @@ -750,7 +729,7 @@ func (sh *SessionHandler) rpcRead(s *session, log *zap.Logger) error { if err != nil { err := fmt.Errorf("failed to revise contract: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } // validate the renter's signature and transfer @@ -758,14 +737,14 @@ func (sh *SessionHandler) rpcRead(s *session, log *zap.Logger) error { if !s.contract.RenterKey().VerifyHash(sigHash, req.Signature) { err := fmt.Errorf("failed to validate revision: %w", ErrInvalidRenterSignature) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } payment, _, err := rhp.ValidateRevision(s.contract.Revision, revision, cost, types.ZeroCurrency) if err != nil { err := fmt.Errorf("failed to validate revision: %w", err) s.t.WriteResponseErr(err) - return err + return contracts.Usage{}, err } // sign and commit the new revision @@ -779,7 +758,7 @@ func (sh *SessionHandler) rpcRead(s *session, log *zap.Logger) error { updater, err := sh.contracts.ReviseContract(revision.ParentID) if err != nil { s.t.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to revise contract: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to revise contract: %w", err) } defer updater.Close() @@ -788,15 +767,14 @@ func (sh *SessionHandler) rpcRead(s *session, log *zap.Logger) error { if !underflow { costs.Egress = costs.Egress.Add(excess) } + usage := costs.ToUsage() // commit the contract revision - if err := updater.Commit(signedRevision, costs.ToUsage()); err != nil { + if err := updater.Commit(signedRevision, usage); err != nil { s.t.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to commit contract revision: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to commit contract revision: %w", err) } // update the session contract s.contract = signedRevision - // add the cost to the amount spent - s.Spend(cost) // listen for RPCLoopReadStop stopSignal := make(chan error, 1) @@ -819,7 +797,7 @@ func (sh *SessionHandler) rpcRead(s *session, log *zap.Logger) error { if err != nil { err := fmt.Errorf("failed to get sector: %w", err) s.t.WriteResponseErr(err) - return err + return usage, err } resp := &rhpv2.RPCReadResponse{ @@ -835,10 +813,10 @@ func (sh *SessionHandler) rpcRead(s *session, log *zap.Logger) error { select { case err := <-stopSignal: if err != nil { - return err + return usage, err } resp.Signature = hostSig - return s.writeResponse(resp, 30*time.Second) + return usage, s.writeResponse(resp, 30*time.Second) default: } @@ -846,10 +824,10 @@ func (sh *SessionHandler) rpcRead(s *session, log *zap.Logger) error { resp.Signature = hostSig } if err := s.writeResponse(resp, 30*time.Second); err != nil { - return fmt.Errorf("failed to write read response: %w", err) + return usage, fmt.Errorf("failed to write read response: %w", err) } } - return <-stopSignal + return usage, <-stopSignal } func convertToPublicKey(uc types.UnlockKey) (types.PublicKey, error) { diff --git a/rhp/v2/session.go b/rhp/v2/session.go index db70c891..5a814e57 100644 --- a/rhp/v2/session.go +++ b/rhp/v2/session.go @@ -16,14 +16,11 @@ const minMessageSize = 4096 // A session is an ongoing exchange of RPCs via the renter-host protocol. type session struct { + id rhp.UID conn *rhp.Conn t *rhpv2.Transport contract contracts.SignedRevision - - uid UniqueID - spent types.Currency - metrics MetricReporter } func (s *session) readRequest(req rhpv2.ProtocolObject, maxSize uint64, timeout time.Duration) error { @@ -41,11 +38,6 @@ func (s *session) writeResponse(resp rhpv2.ProtocolObject, timeout time.Duration return s.t.WriteResponse(resp) } -// spend increments the session's spent amount -func (s *session) Spend(n types.Currency) { - s.spent = s.spent.Add(n) -} - // ContractRevisable returns an error if a contract is not locked or can't be // revised. A contract is revisable if the revision number is not the max uint64 // value and it is not close to the proof window. diff --git a/rhp/v3/execute.go b/rhp/v3/execute.go index 04b8f5a8..299caa3c 100644 --- a/rhp/v3/execute.go +++ b/rhp/v3/execute.go @@ -89,7 +89,7 @@ func (pe *programExecutor) payForExecution(cost rhpv3.ResourceCost, usage accoun return nil } -func (pe *programExecutor) executeAppendSector(instr *rhpv3.InstrAppendSector) ([]byte, []types.Hash256, error) { +func (pe *programExecutor) executeAppendSector(instr *rhpv3.InstrAppendSector, log *zap.Logger) ([]byte, []types.Hash256, error) { root, sector, err := pe.programData.Sector(instr.SectorDataOffset) if err != nil { return nil, nil, fmt.Errorf("failed to read sector: %w", err) @@ -110,12 +110,15 @@ func (pe *programExecutor) executeAppendSector(instr *rhpv3.InstrAppendSector) ( if !instr.ProofRequired { return nil, nil, nil } + + proofStart := time.Now() roots := pe.updater.SectorRoots() proof, _ := rhpv2.BuildDiffProof([]rhpv2.RPCWriteAction{{Type: rhpv2.RPCWriteActionAppend}}, roots[:len(roots)-1]) // TODO: add rhp3 proof methods + log.Debug("built proof", zap.Duration("duration", time.Since(proofStart))) return nil, proof, nil } -func (pe *programExecutor) executeAppendSectorRoot(instr *rhpv3.InstrAppendSectorRoot) ([]byte, []types.Hash256, error) { +func (pe *programExecutor) executeAppendSectorRoot(instr *rhpv3.InstrAppendSectorRoot, log *zap.Logger) ([]byte, []types.Hash256, error) { root, err := pe.programData.Hash(instr.MerkleRootOffset) if err != nil { return nil, nil, fmt.Errorf("failed to read sector root: %w", err) @@ -136,12 +139,14 @@ func (pe *programExecutor) executeAppendSectorRoot(instr *rhpv3.InstrAppendSecto if !instr.ProofRequired { return nil, nil, nil } + proofStart := time.Now() roots := pe.updater.SectorRoots() proof, _ := rhpv2.BuildDiffProof([]rhpv2.RPCWriteAction{{Type: rhpv2.RPCWriteActionAppend}}, roots[:len(roots)-1]) // TODO: add rhp3 proof methods + log.Debug("built proof", zap.Duration("duration", time.Since(proofStart))) return nil, proof, nil } -func (pe *programExecutor) executeDropSectors(instr *rhpv3.InstrDropSectors) ([]byte, []types.Hash256, error) { +func (pe *programExecutor) executeDropSectors(instr *rhpv3.InstrDropSectors, log *zap.Logger) ([]byte, []types.Hash256, error) { count, err := pe.programData.Uint64(instr.SectorCountOffset) if err != nil { return nil, nil, fmt.Errorf("failed to read sector count: %w", err) @@ -155,7 +160,9 @@ func (pe *programExecutor) executeDropSectors(instr *rhpv3.InstrDropSectors) ([] // construct the proof before updating the roots var proof []types.Hash256 if instr.ProofRequired { + proofStart := time.Now() proof = rhpv2.BuildSectorRangeProof(pe.updater.SectorRoots(), pe.updater.SectorCount()-count, pe.updater.SectorCount()) // TODO: add rhp3 proof methods + log.Debug("built proof", zap.Duration("duration", time.Since(proofStart))) } // trim the sectors @@ -192,7 +199,7 @@ func (pe *programExecutor) executeHasSector(instr *rhpv3.InstrHasSector) ([]byte return output, nil, nil } -func (pe *programExecutor) executeReadOffset(instr *rhpv3.InstrReadOffset) ([]byte, []types.Hash256, error) { +func (pe *programExecutor) executeReadOffset(instr *rhpv3.InstrReadOffset, log *zap.Logger) ([]byte, []types.Hash256, error) { offset, err := pe.programData.Uint64(instr.OffsetOffset) if err != nil { return nil, nil, fmt.Errorf("failed to read offset: %w", err) @@ -225,13 +232,15 @@ func (pe *programExecutor) executeReadOffset(instr *rhpv3.InstrReadOffset) ([]by return sector[relOffset : relOffset+length], nil, nil } + proofStartTime := time.Now() proofStart := relOffset / rhpv2.LeafSize proofEnd := (relOffset + length) / rhpv2.LeafSize proof := rhpv2.BuildProof(sector, proofStart, proofEnd, nil) + log.Debug("built proof", zap.Duration("duration", time.Since(proofStartTime))) return sector[relOffset : relOffset+length], proof, nil } -func (pe *programExecutor) executeReadSector(instr *rhpv3.InstrReadSector) ([]byte, []types.Hash256, error) { +func (pe *programExecutor) executeReadSector(instr *rhpv3.InstrReadSector, log *zap.Logger) ([]byte, []types.Hash256, error) { root, err := pe.programData.Hash(instr.MerkleRootOffset) if err != nil { return nil, nil, fmt.Errorf("failed to read sector root: %w", err) @@ -270,13 +279,16 @@ func (pe *programExecutor) executeReadSector(instr *rhpv3.InstrReadSector) ([]by if !instr.ProofRequired { return sector[offset : offset+length], nil, nil } + + proofStartTime := time.Now() proofStart := offset / rhpv2.LeafSize proofEnd := (offset + length) / rhpv2.LeafSize proof := rhpv2.BuildProof(sector, proofStart, proofEnd, nil) + log.Debug("built proof", zap.Duration("duration", time.Since(proofStartTime))) return sector[offset : offset+length], proof, nil } -func (pe *programExecutor) executeSwapSector(instr *rhpv3.InstrSwapSector) ([]byte, []types.Hash256, error) { +func (pe *programExecutor) executeSwapSector(instr *rhpv3.InstrSwapSector, log *zap.Logger) ([]byte, []types.Hash256, error) { // read the swap params a, err := pe.programData.Uint64(instr.Sector1Offset) if err != nil { @@ -298,6 +310,7 @@ func (pe *programExecutor) executeSwapSector(instr *rhpv3.InstrSwapSector) ([]by var output []byte var proof []types.Hash256 if instr.ProofRequired { + proofStart := time.Now() var oldLeafHashes []types.Hash256 // build the proof before updating the roots proof, oldLeafHashes = rhpv2.BuildDiffProof([]rhpv2.RPCWriteAction{{Type: rhpv2.RPCWriteActionSwap, A: a, B: b}}, pe.updater.SectorRoots()) // TODO: add rhp3 proof methods @@ -312,6 +325,7 @@ func (pe *programExecutor) executeSwapSector(instr *rhpv3.InstrSwapSector) ([]by return nil, nil, fmt.Errorf("failed to encode old leaf hashes: %w", err) } output = buf.Bytes() + log.Debug("built proof", zap.Duration("duration", time.Since(proofStart))) } if err := pe.updater.SwapSectors(a, b); err != nil { @@ -547,7 +561,7 @@ func (pe *programExecutor) executeProgram(ctx context.Context) <-chan rhpv3.RPCE var output []byte var proof []types.Hash256 var err error - for _, instruction := range pe.instructions { + for i, instruction := range pe.instructions { select { case <-ctx.Done(): outputs <- pe.instructionOutput(nil, nil, ctx.Err()) @@ -555,23 +569,25 @@ func (pe *programExecutor) executeProgram(ctx context.Context) <-chan rhpv3.RPCE default: } + log := pe.log.Named(instrLabel(instruction)).With(zap.Int("instruction", i+1), zap.Int("total", len(pe.instructions))) + start := time.Now() // execute the instruction switch instr := instruction.(type) { case *rhpv3.InstrAppendSector: - output, proof, err = pe.executeAppendSector(instr) + output, proof, err = pe.executeAppendSector(instr, log) case *rhpv3.InstrAppendSectorRoot: - output, proof, err = pe.executeAppendSectorRoot(instr) + output, proof, err = pe.executeAppendSectorRoot(instr, log) case *rhpv3.InstrDropSectors: - output, proof, err = pe.executeDropSectors(instr) + output, proof, err = pe.executeDropSectors(instr, log) case *rhpv3.InstrHasSector: output, proof, err = pe.executeHasSector(instr) case *rhpv3.InstrReadOffset: - output, proof, err = pe.executeReadOffset(instr) + output, proof, err = pe.executeReadOffset(instr, log) case *rhpv3.InstrReadSector: - output, proof, err = pe.executeReadSector(instr) + output, proof, err = pe.executeReadSector(instr, log) case *rhpv3.InstrSwapSector: - output, proof, err = pe.executeSwapSector(instr) + output, proof, err = pe.executeSwapSector(instr, log) case *rhpv3.InstrUpdateSector: output, proof, err = pe.executeUpdateSector(instr) case *rhpv3.InstrStoreSector: @@ -596,7 +612,7 @@ func (pe *programExecutor) executeProgram(ctx context.Context) <-chan rhpv3.RPCE outputs <- pe.instructionOutput(nil, nil, fmt.Errorf("failed to execute instruction %q: %w", instrLabel(instruction), err)) return } - pe.log.Debug("executed instruction", zap.String("instruction", instrLabel(instruction)), zap.Duration("elapsed", time.Since(start))) + log.Debug("executed instruction", zap.Duration("elapsed", time.Since(start))) outputs <- pe.instructionOutput(output, proof, err) } }() @@ -637,6 +653,9 @@ func (pe *programExecutor) rollback() error { if err := pe.budget.Commit(); err != nil { return fmt.Errorf("failed to commit budget: %w", err) } + // zero out the usage + pe.usage.StorageRevenue = types.ZeroCurrency + pe.cost.Collateral = types.ZeroCurrency return nil } @@ -818,10 +837,21 @@ func (pe *programExecutor) Execute(ctx context.Context, s *rhpv3.Stream) error { if err := pe.commit(s); err != nil { return fmt.Errorf("failed to commit program: %w", err) } - return nil } +// Usage returns the program's usage. +func (pe *programExecutor) Usage() (usage contracts.Usage) { + usage.RPCRevenue = pe.usage.RPCRevenue + usage.StorageRevenue = pe.usage.StorageRevenue + usage.IngressRevenue = pe.usage.IngressRevenue + usage.EgressRevenue = pe.usage.EgressRevenue + usage.RegistryRead = pe.usage.RegistryRead + usage.RegistryWrite = pe.usage.RegistryWrite + usage.RiskedCollateral = pe.cost.Collateral + return usage +} + func (sh *SessionHandler) newExecutor(instructions []rhpv3.Instruction, data []byte, pt rhpv3.HostPriceTable, budget *accounts.Budget, revision *contracts.SignedRevision, finalize bool, log *zap.Logger) (*programExecutor, error) { ex := &programExecutor{ hostKey: sh.privateKey, diff --git a/rhp/v3/rhp.go b/rhp/v3/rhp.go index bcde5d3a..bae35a6b 100644 --- a/rhp/v3/rhp.go +++ b/rhp/v3/rhp.go @@ -13,7 +13,6 @@ import ( "go.sia.tech/core/types" "go.sia.tech/hostd/host/accounts" "go.sia.tech/hostd/host/contracts" - "go.sia.tech/hostd/host/financials" "go.sia.tech/hostd/host/settings" "go.sia.tech/hostd/host/storage" "go.sia.tech/hostd/internal/threadgroup" @@ -108,14 +107,10 @@ type ( BandwidthLimiters() (ingress, egress *rate.Limiter) } - // MetricReporter records metrics from the host - MetricReporter interface { - Report(any) error - } - - // A FinancialReporter records financial transactions on the host. - FinancialReporter interface { - Add(financials.Record) error + // SessionReporter reports session metrics + SessionReporter interface { + StartSession(conn *rhp.Conn, proto string, version int) (sessionID rhp.UID, end func()) + StartRPC(sessionID rhp.UID, rpc types.Specifier) (rpcID rhp.UID, end func(contracts.Usage, error)) } // A SessionHandler handles the host side of the renter-host protocol and @@ -129,7 +124,7 @@ type ( accounts AccountManager contracts ContractManager - metrics MetricReporter + sessions SessionReporter registry RegistryManager storage StorageManager log *zap.Logger @@ -176,7 +171,7 @@ var ( ) // handleHostStream handles streams routed to the "host" subscriber -func (sh *SessionHandler) handleHostStream(remoteAddr string, s *rhpv3.Stream) { +func (sh *SessionHandler) handleHostStream(s *rhpv3.Stream, sessionID rhp.UID, log *zap.Logger) { defer s.Close() // close the stream when the RPC has completed done, err := sh.tg.Add() // add the RPC to the threadgroup @@ -186,13 +181,12 @@ func (sh *SessionHandler) handleHostStream(remoteAddr string, s *rhpv3.Stream) { defer done() s.SetDeadline(time.Now().Add(30 * time.Second)) // set an initial timeout - rpcID, err := s.ReadID() + rpc, err := s.ReadID() if err != nil { - sh.log.Debug("failed to read RPC ID", zap.Error(err)) + log.Debug("failed to read RPC ID", zap.Error(err)) return } - - rpcs := map[types.Specifier]func(*rhpv3.Stream, *zap.Logger) error{ + rpcs := map[types.Specifier]func(*rhpv3.Stream, *zap.Logger) (contracts.Usage, error){ rhpv3.RPCAccountBalanceID: sh.handleRPCAccountBalance, rhpv3.RPCUpdatePriceTableID: sh.handleRPCPriceTable, rhpv3.RPCExecuteProgramID: sh.handleRPCExecute, @@ -200,20 +194,24 @@ func (sh *SessionHandler) handleHostStream(remoteAddr string, s *rhpv3.Stream) { rhpv3.RPCLatestRevisionID: sh.handleRPCLatestRevision, rhpv3.RPCRenewContractID: sh.handleRPCRenew, } - rpcFn, ok := rpcs[rpcID] + rpcFn, ok := rpcs[rpc] if !ok { - sh.log.Debug("unrecognized RPC ID", zap.String("rpc", rpcID.String())) + log.Debug("unrecognized RPC ID", zap.String("rpc", rpc.String())) return } - log := sh.log.Named(rpcID.String()).With(zap.String("peerAddr", remoteAddr)) - start := time.Now() + rpcStart := time.Now() s.SetDeadline(time.Now().Add(time.Minute)) // set the initial deadline, may be overwritten by the handler - if err = rpcFn(s, log); err != nil { - log.Warn("RPC failed", zap.Error(err), zap.Duration("elapsed", time.Since(start))) + + rpcID, end := sh.sessions.StartRPC(sessionID, rpc) + log = log.Named(rpc.String()).With(zap.Stringer("rpcID", rpcID)) + usage, err := rpcFn(s, log) + end(usage, err) + if err != nil { + log.Warn("RPC failed", zap.Error(err), zap.Duration("elapsed", time.Since(rpcStart))) return } - log.Info("RPC success", zap.Duration("elapsed", time.Since(start))) + log.Info("RPC success", zap.Duration("elapsed", time.Since(rpcStart))) } // HostKey returns the host's ed25519 public key @@ -239,10 +237,22 @@ func (sh *SessionHandler) Serve() error { go func() { defer conn.Close() + + // wrap the conn with the bandwidth limiters ingress, egress := sh.settings.BandwidthLimiters() - t, err := rhpv3.NewHostTransport(rhp.NewConn(conn, sh.monitor, ingress, egress), sh.privateKey) + rhpConn := rhp.NewConn(conn, sh.monitor, ingress, egress) + defer rhpConn.Close() + + // initiate the session + sessionID, end := sh.sessions.StartSession(rhpConn, rhp.SessionProtocolTCP, 3) + defer end() + + log := sh.log.With(zap.Stringer("sessionID", sessionID), zap.String("peerAddress", conn.RemoteAddr().String())) + + // upgrade the connection to RHP3 + t, err := rhpv3.NewHostTransport(rhpConn, sh.privateKey) if err != nil { - sh.log.Debug("failed to upgrade conn", zap.Error(err), zap.String("remoteAddress", conn.RemoteAddr().String())) + log.Debug("failed to upgrade conn", zap.Error(err)) return } defer t.Close() @@ -251,11 +261,12 @@ func (sh *SessionHandler) Serve() error { stream, err := t.AcceptStream() if err != nil { if !isStreamClosedErr(err) { - sh.log.Debug("failed to accept stream", zap.Error(err), zap.String("remoteAddress", conn.RemoteAddr().String())) + log.Debug("failed to accept stream", zap.Error(err)) } return } - go sh.handleHostStream(conn.RemoteAddr().String(), stream) + + go sh.handleHostStream(stream, sessionID, log) } }() } @@ -267,7 +278,7 @@ func (sh *SessionHandler) LocalAddr() string { } // NewSessionHandler creates a new SessionHandler -func NewSessionHandler(l net.Listener, hostKey types.PrivateKey, chain ChainManager, tpool TransactionPool, wallet Wallet, accounts AccountManager, contracts ContractManager, registry RegistryManager, storage StorageManager, settings SettingsReporter, monitor rhp.DataMonitor, metrics MetricReporter, log *zap.Logger) (*SessionHandler, error) { +func NewSessionHandler(l net.Listener, hostKey types.PrivateKey, chain ChainManager, tpool TransactionPool, wallet Wallet, accounts AccountManager, contracts ContractManager, registry RegistryManager, storage StorageManager, settings SettingsReporter, monitor rhp.DataMonitor, sessions SessionReporter, log *zap.Logger) (*SessionHandler, error) { sh := &SessionHandler{ privateKey: hostKey, @@ -281,7 +292,7 @@ func NewSessionHandler(l net.Listener, hostKey types.PrivateKey, chain ChainMana accounts: accounts, contracts: contracts, - metrics: metrics, + sessions: sessions, registry: registry, settings: settings, storage: storage, diff --git a/rhp/v3/rpc.go b/rhp/v3/rpc.go index 06418ef8..1ac8aa85 100644 --- a/rhp/v3/rpc.go +++ b/rhp/v3/rpc.go @@ -43,69 +43,67 @@ var ( ) // handleRPCPriceTable sends the host's price table to the renter. -func (sh *SessionHandler) handleRPCPriceTable(s *rhpv3.Stream, log *zap.Logger) error { +func (sh *SessionHandler) handleRPCPriceTable(s *rhpv3.Stream, log *zap.Logger) (contracts.Usage, error) { pt, err := sh.PriceTable() if err != nil { s.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to get price table: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to get price table: %w", err) } buf, err := json.Marshal(pt) if err != nil { s.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to marshal price table: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to marshal price table: %w", err) } resp := &rhpv3.RPCUpdatePriceTableResponse{ PriceTableJSON: buf, } if err := s.WriteResponse(resp); err != nil { - return fmt.Errorf("failed to send price table: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to send price table: %w", err) } // process the payment, catch connection closed errors since the renter // likely did not intend to pay budget, err := sh.processPayment(s, &pt) if isNonPaymentErr(err) { - return nil + return contracts.Usage{}, nil } else if err != nil { err = fmt.Errorf("failed to process payment: %w", err) s.WriteResponseErr(err) - return err + return contracts.Usage{}, err } defer budget.Rollback() if err := budget.Spend(accounts.Usage{RPCRevenue: pt.UpdatePriceTableCost}); err != nil { err = fmt.Errorf("failed to pay %v for price table: %w", pt.UpdatePriceTableCost, err) s.WriteResponseErr(err) - return err - } - - if err := budget.Commit(); err != nil { + return contracts.Usage{}, err + } else if err := budget.Commit(); err != nil { s.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to commit payment: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to commit payment: %w", err) } // register the price table for future use sh.priceTables.Register(pt) - if err := s.WriteResponse(&rhpv3.RPCPriceTableResponse{}); err != nil { - return fmt.Errorf("failed to send tracking response: %w", err) + usage := contracts.Usage{ + RPCRevenue: pt.UpdatePriceTableCost, } - return nil + return usage, s.WriteResponse(&rhpv3.RPCPriceTableResponse{}) } -func (sh *SessionHandler) handleRPCFundAccount(s *rhpv3.Stream, log *zap.Logger) error { +func (sh *SessionHandler) handleRPCFundAccount(s *rhpv3.Stream, log *zap.Logger) (contracts.Usage, error) { s.SetDeadline(time.Now().Add(time.Minute)) // read the price table ID from the stream pt, err := sh.readPriceTable(s) if err != nil { err = fmt.Errorf("failed to read price table: %w", err) s.WriteResponseErr(err) - return err + return contracts.Usage{}, err } // read the fund request from the stream var fundReq rhpv3.RPCFundAccountRequest if err := s.ReadRequest(&fundReq, 32); err != nil { - return fmt.Errorf("failed to read fund account request: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to read fund account request: %w", err) } // process the payment for funding the account @@ -113,7 +111,7 @@ func (sh *SessionHandler) handleRPCFundAccount(s *rhpv3.Stream, log *zap.Logger) if err != nil { err = fmt.Errorf("failed to process payment: %w", err) s.WriteResponseErr(err) - return err + return contracts.Usage{}, err } fundResp := &rhpv3.RPCFundAccountResponse{ @@ -129,21 +127,21 @@ func (sh *SessionHandler) handleRPCFundAccount(s *rhpv3.Stream, log *zap.Logger) fundResp.Receipt.EncodeTo(h.E) fundResp.Signature = sh.privateKey.SignHash(h.Sum()) - // send the response - if err := s.WriteResponse(fundResp); err != nil { - return fmt.Errorf("failed to send fund account response: %w", err) + usage := contracts.Usage{ + RPCRevenue: pt.FundAccountCost, + AccountFunding: fundAmount, } - return nil + return usage, s.WriteResponse(fundResp) } -func (sh *SessionHandler) handleRPCAccountBalance(s *rhpv3.Stream, log *zap.Logger) error { +func (sh *SessionHandler) handleRPCAccountBalance(s *rhpv3.Stream, log *zap.Logger) (contracts.Usage, error) { s.SetDeadline(time.Now().Add(time.Minute)) // get the price table to use for payment pt, err := sh.readPriceTable(s) if err != nil { err = fmt.Errorf("failed to read price table: %w", err) s.WriteResponseErr(err) - return err + return contracts.Usage{}, err } // read the payment from the stream @@ -151,7 +149,7 @@ func (sh *SessionHandler) handleRPCAccountBalance(s *rhpv3.Stream, log *zap.Logg if err != nil { err = fmt.Errorf("failed to process payment: %w", err) s.WriteResponseErr(err) - return err + return contracts.Usage{}, err } defer budget.Rollback() @@ -159,88 +157,92 @@ func (sh *SessionHandler) handleRPCAccountBalance(s *rhpv3.Stream, log *zap.Logg if err := budget.Spend(accounts.Usage{RPCRevenue: pt.AccountBalanceCost}); err != nil { err = fmt.Errorf("failed to pay %v for account balance: %w", pt.AccountBalanceCost, err) s.WriteResponseErr(err) - return err + return contracts.Usage{}, err } // read the account balance request from the stream var req rhpv3.RPCAccountBalanceRequest if err := s.ReadRequest(&req, 32); err != nil { - return fmt.Errorf("failed to read account balance request: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to read account balance request: %w", err) } // get the account balance balance, err := sh.accounts.Balance(req.Account) if err != nil { s.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to get account balance: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to get account balance: %w", err) } resp := &rhpv3.RPCAccountBalanceResponse{ Balance: balance, } if err := budget.Commit(); err != nil { - return fmt.Errorf("failed to commit payment: %w", err) - } else if err := s.WriteResponse(resp); err != nil { - return fmt.Errorf("failed to send account balance response: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to commit payment: %w", err) } - return nil + usage := contracts.Usage{ + RPCRevenue: pt.AccountBalanceCost, + } + return usage, s.WriteResponse(resp) } -func (sh *SessionHandler) handleRPCLatestRevision(s *rhpv3.Stream, log *zap.Logger) error { +func (sh *SessionHandler) handleRPCLatestRevision(s *rhpv3.Stream, log *zap.Logger) (contracts.Usage, error) { s.SetDeadline(time.Now().Add(time.Minute)) var req rhpv3.RPCLatestRevisionRequest if err := s.ReadRequest(&req, maxRequestSize); err != nil { - return fmt.Errorf("failed to read latest revision request: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to read latest revision request: %w", err) } contract, err := sh.contracts.Contract(req.ContractID) if err != nil { err := fmt.Errorf("failed to get contract %q: %w", req.ContractID, err) s.WriteResponseErr(err) - return err + return contracts.Usage{}, err } resp := &rhpv3.RPCLatestRevisionResponse{ Revision: contract.Revision, } if err := s.WriteResponse(resp); err != nil { - return fmt.Errorf("failed to send latest revision response: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to send latest revision response: %w", err) } pt, err := sh.readPriceTable(s) if isNonPaymentErr(err) { - return nil + return contracts.Usage{}, nil } else if err != nil { err = fmt.Errorf("failed to read price table: %w", err) s.WriteResponseErr(err) - return err + return contracts.Usage{}, err } budget, err := sh.processPayment(s, &pt) if isNonPaymentErr(err) { - return nil + return contracts.Usage{}, nil } else if err != nil { err = fmt.Errorf("failed to process payment: %w", err) s.WriteResponseErr(err) - return err + return contracts.Usage{}, err } defer budget.Rollback() if err := budget.Spend(accounts.Usage{RPCRevenue: pt.LatestRevisionCost}); err != nil { err = fmt.Errorf("failed to pay %v for latest revision: %w", pt.LatestRevisionCost, err) s.WriteResponseErr(err) - return err + return contracts.Usage{}, err } else if err := budget.Commit(); err != nil { - return fmt.Errorf("failed to commit payment: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to commit payment: %w", err) } - return nil + usage := contracts.Usage{ + RPCRevenue: pt.LatestRevisionCost, + } + return usage, nil } -func (sh *SessionHandler) handleRPCRenew(s *rhpv3.Stream, log *zap.Logger) error { +func (sh *SessionHandler) handleRPCRenew(s *rhpv3.Stream, log *zap.Logger) (contracts.Usage, error) { s.SetDeadline(time.Now().Add(2 * time.Minute)) if !sh.settings.Settings().AcceptingContracts { s.WriteResponseErr(ErrNotAcceptingContracts) - return ErrNotAcceptingContracts + return contracts.Usage{}, ErrNotAcceptingContracts } pt, err := sh.readPriceTable(s) if errors.Is(err, ErrNoPriceTable) { @@ -248,34 +250,34 @@ func (sh *SessionHandler) handleRPCRenew(s *rhpv3.Stream, log *zap.Logger) error pt, err = sh.PriceTable() if err != nil { s.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to get price table: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to get price table: %w", err) } buf, err := json.Marshal(pt) if err != nil { s.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to marshal price table: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to marshal price table: %w", err) } ptResp := &rhpv3.RPCUpdatePriceTableResponse{ PriceTableJSON: buf, } if err := s.WriteResponse(ptResp); err != nil { - return fmt.Errorf("failed to send price table response: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to send price table response: %w", err) } } else if err != nil { - return fmt.Errorf("failed to read price table: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to read price table: %w", err) } var req rhpv3.RPCRenewContractRequest if err := s.ReadRequest(&req, 10*maxRequestSize); err != nil { - return fmt.Errorf("failed to read renew contract request: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to read renew contract request: %w", err) } else if err := validRenewalTxnSet(req.TransactionSet); err != nil { err = fmt.Errorf("invalid renewal transaction set: %w", err) s.WriteResponseErr(err) - return err + return contracts.Usage{}, err } else if req.RenterKey.Algorithm != types.SpecifierEd25519 || len(req.RenterKey.Key) != ed25519.PublicKeySize { err = errors.New("renter key must be an ed25519 public key") s.WriteResponseErr(err) - return err + return contracts.Usage{}, err } renterKey := *(*types.PublicKey)(req.RenterKey.Key) @@ -293,7 +295,7 @@ func (sh *SessionHandler) handleRPCRenew(s *rhpv3.Stream, log *zap.Logger) error if err != nil { err := fmt.Errorf("failed to lock contract %v: %w", clearingRevision.ParentID, err) s.WriteResponseErr(err) - return err + return contracts.Usage{}, err } defer sh.contracts.Unlock(clearingRevision.ParentID) @@ -302,13 +304,13 @@ func (sh *SessionHandler) handleRPCRenew(s *rhpv3.Stream, log *zap.Logger) error if err != nil { err := fmt.Errorf("failed to validate clearing revision: %w", err) s.WriteResponseErr(err) - return err + return contracts.Usage{}, err } finalRevisionSigHash := hashFinalRevision(clearingRevision, renewal) if !existing.RenterKey().VerifyHash(finalRevisionSigHash, req.FinalRevisionSignature) { // important to verify using the existing contract's renter key err := fmt.Errorf("failed to verify final revision signature: %w", ErrInvalidRenterSignature) s.WriteResponseErr(err) - return err + return contracts.Usage{}, err } // sign the clearing revision signedClearingRevision := contracts.SignedRevision{ @@ -332,13 +334,13 @@ func (sh *SessionHandler) handleRPCRenew(s *rhpv3.Stream, log *zap.Logger) error if err != nil { err := fmt.Errorf("failed to validate renewal: %w", err) s.WriteResponseErr(err) - return err + return contracts.Usage{}, err } renterInputs, renterOutputs := len(renewalTxn.SiacoinInputs), len(renewalTxn.SiacoinOutputs) toSign, release, err := sh.wallet.FundTransaction(&renewalTxn, lockedCollateral) if err != nil { s.WriteResponseErr(fmt.Errorf("failed to fund renewal transaction: %w", ErrHostInternalError)) - return fmt.Errorf("failed to fund renewal transaction: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to fund renewal transaction: %w", err) } defer release() @@ -348,12 +350,12 @@ func (sh *SessionHandler) handleRPCRenew(s *rhpv3.Stream, log *zap.Logger) error FinalRevisionSignature: signedClearingRevision.HostSignature, } if err := s.WriteResponse(hostAdditions); err != nil { - return fmt.Errorf("failed to write host additions: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to write host additions: %w", err) } var renterSigsResp rhpv3.RPCRenewSignatures if err := s.ReadRequest(&renterSigsResp, 10*maxRequestSize); err != nil { - return fmt.Errorf("failed to read renter signatures: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to read renter signatures: %w", err) } // create the initial revision and verify the renter's signature @@ -362,7 +364,7 @@ func (sh *SessionHandler) handleRPCRenew(s *rhpv3.Stream, log *zap.Logger) error if err := validateRenterRevisionSignature(renterSigsResp.RevisionSignature, renewalRevision.ParentID, renewalSigHash, renterKey); err != nil { err := fmt.Errorf("failed to verify renter revision signature: %w", ErrInvalidRenterSignature) s.WriteResponseErr(err) - return err + return contracts.Usage{}, err } signedRenewal := contracts.SignedRevision{ Revision: renewalRevision, @@ -395,13 +397,13 @@ func (sh *SessionHandler) handleRPCRenew(s *rhpv3.Stream, log *zap.Logger) error // sign and broadcast the transaction if err := sh.wallet.SignTransaction(sh.chain.TipState(), &renewalTxn, toSign, wallet.ExplicitCoveredFields(renewalTxn)); err != nil { s.WriteResponseErr(fmt.Errorf("failed to sign renewal transaction: %w", ErrHostInternalError)) - return fmt.Errorf("failed to sign renewal transaction: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to sign renewal transaction: %w", err) } renewalTxnSet := append(parents, renewalTxn) if err := sh.tpool.AcceptTransactionSet(renewalTxnSet); err != nil { err = fmt.Errorf("failed to broadcast renewal transaction: %w", err) s.WriteResponseErr(err) - return err + return contracts.Usage{}, err } // calculate the usage @@ -417,7 +419,7 @@ func (sh *SessionHandler) handleRPCRenew(s *rhpv3.Stream, log *zap.Logger) error err = sh.contracts.RenewContract(signedRenewal, signedClearingRevision, renewalTxnSet, lockedCollateral, finalRevisionUsage, renewalUsage) if err != nil { s.WriteResponseErr(fmt.Errorf("failed to renew contract: %w", ErrHostInternalError)) - return fmt.Errorf("failed to renew contract: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to renew contract: %w", err) } // send the signatures to the renter @@ -432,21 +434,18 @@ func (sh *SessionHandler) handleRPCRenew(s *rhpv3.Stream, log *zap.Logger) error Signature: signedRenewal.HostSignature[:], }, } - if err := s.WriteResponse(hostSigs); err != nil { - return fmt.Errorf("failed to write host signatures: %w", err) - } - return nil + return finalRevisionUsage.Add(renewalUsage), s.WriteResponse(hostSigs) } // handleRPCExecute handles an RPCExecuteProgram request. -func (sh *SessionHandler) handleRPCExecute(s *rhpv3.Stream, log *zap.Logger) error { +func (sh *SessionHandler) handleRPCExecute(s *rhpv3.Stream, log *zap.Logger) (contracts.Usage, error) { s.SetDeadline(time.Now().Add(5 * time.Minute)) // read the price table pt, err := sh.readPriceTable(s) if err != nil { err = fmt.Errorf("failed to read price table: %w", err) s.WriteResponseErr(err) - return err + return contracts.Usage{}, err } // create the program budget @@ -454,24 +453,26 @@ func (sh *SessionHandler) handleRPCExecute(s *rhpv3.Stream, log *zap.Logger) err if err != nil { err = fmt.Errorf("failed to process payment: %w", err) s.WriteResponseErr(err) - return err + return contracts.Usage{}, err } // note: the budget is committed by the executor, no need to commit it in the handler. defer budget.Rollback() // read the program request + readReqStart := time.Now() var executeReq rhpv3.RPCExecuteProgramRequest if err := s.ReadRequest(&executeReq, maxProgramRequestSize); err != nil { - return fmt.Errorf("failed to read execute request: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to read execute request: %w", err) } instructions := executeReq.Program + log.Debug("read program request", zap.Duration("elapsed", time.Since(readReqStart))) // pay for the execution executeCost, _ := pt.BaseCost().Total() if err := budget.Spend(accounts.Usage{RPCRevenue: executeCost}); err != nil { err = fmt.Errorf("failed to pay program init cost: %w", err) s.WriteResponseErr(err) - return err + return contracts.Usage{}, err } var requiresContract, requiresFinalization bool @@ -486,9 +487,10 @@ func (sh *SessionHandler) handleRPCExecute(s *rhpv3.Stream, log *zap.Logger) err if executeReq.FileContractID == (types.FileContractID{}) { err = ErrContractRequired s.WriteResponseErr(err) - return err + return contracts.Usage{}, err } + contractLockStart := time.Now() ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -496,18 +498,19 @@ func (sh *SessionHandler) handleRPCExecute(s *rhpv3.Stream, log *zap.Logger) err if err != nil { err = fmt.Errorf("failed to lock contract %v: %w", executeReq.FileContractID, err) s.WriteResponseErr(err) - return err + return contracts.Usage{}, err } defer sh.contracts.Unlock(contract.Revision.ParentID) revision = &contract log = log.With(zap.String("contractID", contract.Revision.ParentID.String())) // attach the contract ID to the logger + log.Debug("locked contract", zap.Duration("elapsed", time.Since(contractLockStart))) } // generate a cancellation token and write it to the stream. Currently just // a placeholder. cancelToken := types.Specifier(frand.Entropy128()) if err := s.WriteResponse(&cancelToken); err != nil { - return fmt.Errorf("failed to write cancel token: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to write cancel token: %w", err) } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) @@ -519,12 +522,11 @@ func (sh *SessionHandler) handleRPCExecute(s *rhpv3.Stream, log *zap.Logger) err executor, err := sh.newExecutor(instructions, executeReq.ProgramData, pt, budget, revision, requiresFinalization, log) if err != nil { s.WriteResponseErr(ErrHostInternalError) - return fmt.Errorf("failed to create program executor: %w", err) - } else if err := executor.Execute(ctx, s); err != nil { - return fmt.Errorf("failed to execute program: %w", err) + return contracts.Usage{}, fmt.Errorf("failed to create program executor: %w", err) } - - return nil + err = executor.Execute(ctx, s) + usage := executor.Usage() + return usage, err } // isStreamClosedErr is a helper function that returns true if the stream was diff --git a/rhp/v3/websockets.go b/rhp/v3/websockets.go index 59ba8a57..87f8ec5c 100644 --- a/rhp/v3/websockets.go +++ b/rhp/v3/websockets.go @@ -12,19 +12,33 @@ import ( // handleWebSockets handles websocket connections to the host. func (sh *SessionHandler) handleWebSockets(w http.ResponseWriter, r *http.Request) { - log := sh.log.Named("websockets").With(zap.String("remoteAddr", r.RemoteAddr)) + log := sh.log.Named("websockets").With(zap.String("peerAddr", r.RemoteAddr)) wsConn, err := websocket.Accept(w, r, &websocket.AcceptOptions{ OriginPatterns: []string{"*"}, }) if err != nil { - log.Error("failed to accept websocket connection", zap.Error(err)) + log.Warn("failed to accept websocket connection", zap.Error(err)) return } defer wsConn.Close(websocket.StatusNormalClosure, "") + // wrap the websocket connection conn := websocket.NetConn(context.Background(), wsConn, websocket.MessageBinary) + defer conn.Close() + + // wrap the connection with a rate limiter ingress, egress := sh.settings.BandwidthLimiters() - t, err := rhpv3.NewHostTransport(rhp.NewConn(conn, sh.monitor, ingress, egress), sh.privateKey) + rhpConn := rhp.NewConn(conn, sh.monitor, ingress, egress) + defer rhpConn.Close() + + // initiate the session + sessionID, end := sh.sessions.StartSession(rhpConn, rhp.SessionProtocolWS, 3) + defer end() + + log = log.With(zap.String("sessionID", sessionID.String())) + + // upgrade the connection + t, err := rhpv3.NewHostTransport(rhpConn, sh.privateKey) if err != nil { sh.log.Debug("failed to upgrade conn", zap.Error(err), zap.String("remoteAddress", conn.RemoteAddr().String())) return @@ -37,7 +51,8 @@ func (sh *SessionHandler) handleWebSockets(w http.ResponseWriter, r *http.Reques log.Debug("failed to accept stream", zap.Error(err)) return } - go sh.handleHostStream(conn.RemoteAddr().String(), stream) + + go sh.handleHostStream(stream, sessionID, log) } }