Skip to content

Commit

Permalink
refactor metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl committed Jun 21, 2023
1 parent ec76520 commit 961fb69
Show file tree
Hide file tree
Showing 9 changed files with 301 additions and 292 deletions.
8 changes: 6 additions & 2 deletions hostdb/hostdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@ type hostAnnouncement struct {
Signature types.Signature
}

type ErrorResult struct {
Error string `json:"error,omitempty"`
}

type ScanResult struct {
Error string `json:"error,omitempty"`
ErrorResult
PriceTable rhpv3.HostPriceTable `json:"priceTable,omitempty"`
Settings rhpv2.HostSettings `json:"settings,omitempty"`
}

type PriceTableUpdateResult struct {
Error string `json:"error,omitempty"`
ErrorResult
PriceTable HostPriceTable `json:"priceTable,omitempty"`
}

Expand Down
8 changes: 7 additions & 1 deletion metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@ package metrics

import (
"context"
"time"

"go.sia.tech/core/types"
)

// A Metric contains metadata pertaining to a particular operation.
type Metric interface {
IsMetric()
HostKey() types.PublicKey
Result() interface{}
IsSuccess() bool
Timestamp() time.Time
Type() string
}

// A MetricsRecorder records metrics.
Expand Down
236 changes: 236 additions & 0 deletions worker/interactions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
package worker

import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"time"

rhpv2 "go.sia.tech/core/rhp/v2"
rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/core/types"
"go.sia.tech/renterd/hostdb"
"go.sia.tech/renterd/metrics"
"go.sia.tech/renterd/tracing"
)

func errToStr(err error) string {
if err != nil {
return err.Error()
}
return ""
}

// recordInteractions adds some interactions to the worker's interaction buffer
// which is periodically flushed to the bus.
func (w *worker) recordInteractions(interactions []hostdb.Interaction) {
w.interactionsMu.Lock()
defer w.interactionsMu.Unlock()

// Append interactions to buffer.
w.interactions = append(w.interactions, interactions...)

// If a thread was scheduled to flush the buffer we are done.
if w.interactionsFlushTimer != nil {
return
}
// Otherwise we schedule a flush.
w.interactionsFlushTimer = time.AfterFunc(w.busFlushInterval, func() {
w.interactionsMu.Lock()
w.flushInteractions()
w.interactionsMu.Unlock()
})
}

// flushInteractions flushes the worker's interaction buffer to the bus.
func (w *worker) flushInteractions() {
if len(w.interactions) > 0 {
ctx, span := tracing.Tracer.Start(context.Background(), "worker: flushInteractions")
defer span.End()
if err := w.bus.RecordInteractions(ctx, w.interactions); err != nil {
w.logger.Errorw(fmt.Sprintf("failed to record interactions: %v", err))
} else {
w.interactions = nil
}
}
w.interactionsFlushTimer = nil
}

// recordPriceTableUpdate records a price table metric.
func recordPriceTableUpdate(mr metrics.MetricsRecorder, siamuxAddr string, hostKey types.PublicKey, pt hostdb.HostPriceTable, err *error) func() {
startTime := time.Now()
return func() {
now := time.Now()
mr.RecordMetric(MetricPriceTableUpdate{
metricCommon: metricCommon{
address: siamuxAddr,
hostKey: hostKey,
timestamp: now,
elapsed: now.Sub(startTime),
err: *err,
},
pt: pt,
})
}
}

// recordScan records a scan metric.
func recordScan(mr metrics.MetricsRecorder, elapsed time.Duration, hostIP string, hostKey types.PublicKey, pt rhpv3.HostPriceTable, settings rhpv2.HostSettings, err error) {
mr.RecordMetric(MetricHostScan{
metricCommon: metricCommon{
address: hostIP,
hostKey: hostKey,
timestamp: time.Now(),
elapsed: elapsed,
err: err,
},
pt: pt,
settings: settings,
})
}

// ephemeralMetricsRecorder can be used to record metrics in memory.
type ephemeralMetricsRecorder struct {
ms []metrics.Metric
mu sync.Mutex
}

func (mr *ephemeralMetricsRecorder) RecordMetric(m metrics.Metric) {
mr.mu.Lock()
defer mr.mu.Unlock()
mr.ms = append(mr.ms, m)
}

func (mr *ephemeralMetricsRecorder) interactions() []hostdb.Interaction {
// TODO: merge/filter metrics?
var his []hostdb.Interaction
mr.mu.Lock()
defer mr.mu.Unlock()
for _, m := range mr.ms {
his = append(his, metricToInteraction(m))
}
return his
}

// metricCommon contains the common fields of all metrics.
type metricCommon struct {
hostKey types.PublicKey
address string
timestamp time.Time
elapsed time.Duration
err error
}

type metricResultCommon struct {
Address string `json:"address"`
Timestamp time.Time `json:"timestamp"`
Elapsed time.Duration `json:"elapsed"`
}

func (m metricCommon) commonResult() metricResultCommon {
return metricResultCommon{
Address: m.address,
Timestamp: m.timestamp,
Elapsed: m.elapsed,
}
}
func (m metricCommon) HostKey() types.PublicKey { return m.hostKey }
func (m metricCommon) Timestamp() time.Time { return m.timestamp }

func (m metricCommon) IsSuccess() bool {
return isSuccessfulInteraction(m.err)
}

// MetricHostDial is a metric that contains the result of a dial attempt.
type MetricHostDial struct {
metricCommon
}

func (m MetricHostDial) Result() interface{} {
return m.commonResult()
}

func (m MetricHostDial) Type() string { return "dial" }

// MetricPriceTableUpdate is a metric that contains the result of fetching a
// price table.
type MetricPriceTableUpdate struct {
metricCommon
pt hostdb.HostPriceTable
}

func (m MetricPriceTableUpdate) Result() interface{} {
cr := m.commonResult()
er := hostdb.ErrorResult{Error: errToStr(m.err)}
if m.err != nil {
return struct {
metricResultCommon
hostdb.ErrorResult
}{cr, er}
} else {
return struct {
metricResultCommon
hostdb.PriceTableUpdateResult
}{cr, hostdb.PriceTableUpdateResult{ErrorResult: er, PriceTable: m.pt}}
}
}

func (m MetricPriceTableUpdate) Type() string {
return hostdb.InteractionTypePriceTableUpdate
}

// MetricHostScan is a metric that contains the result of a host scan.
type MetricHostScan struct {
metricCommon
pt rhpv3.HostPriceTable
settings rhpv2.HostSettings
}

func (m MetricHostScan) Result() interface{} {
cr := m.commonResult()
er := hostdb.ErrorResult{Error: errToStr(m.err)}
if m.err != nil {
return struct {
metricResultCommon
hostdb.ErrorResult
}{cr, er}
} else {
return struct {
metricResultCommon
hostdb.ScanResult
}{cr, hostdb.ScanResult{ErrorResult: er, PriceTable: m.pt, Settings: m.settings}}
}
}

func (m MetricHostScan) Type() string {
return hostdb.InteractionTypeScan
}

func isSuccessfulInteraction(err error) bool {
// No error always means success.
if err == nil {
return true
}
// List of errors that are considered successful interactions.
if errors.Is(err, ErrInsufficientFunds) || strings.Contains(err.Error(), ErrInsufficientFunds.Error()) {
return true
}
if isBalanceInsufficient(err) {
return true
}
return false
}

func metricToInteraction(m metrics.Metric) hostdb.Interaction {
res, _ := json.Marshal(m.Result())
return hostdb.Interaction{
Host: m.HostKey(),
Result: res,
Success: m.IsSuccess(),
Timestamp: m.Timestamp(),
Type: m.Type(),
}
}
93 changes: 0 additions & 93 deletions worker/rhpv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"strings"
"time"

rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/renterd/metrics"
)

var (
Expand Down Expand Up @@ -78,62 +75,6 @@ func wrapErr(err *error, fnName string) {
}
}

// wrapResponseErr formats RPC response errors nicely, wrapping them in either
// readCtx or rejectCtx depending on whether we encountered an I/O error or the
// host sent an explicit error message.
func wrapResponseErr(err error, readCtx, rejectCtx string) error {
if errors.As(err, new(*rhpv2.RPCError)) {
return fmt.Errorf("%s: %w", rejectCtx, err)
}
if err != nil {
return fmt.Errorf("%s: %w", readCtx, err)
}
return nil
}

// MetricRPC contains metrics relating to a single RPC.
type MetricRPC struct {
HostKey types.PublicKey
RPC types.Specifier
Timestamp time.Time
Elapsed time.Duration
Contract types.FileContractID // possibly empty
Uploaded uint64
Downloaded uint64
Cost types.Currency
Collateral types.Currency
Err error
}

// IsMetric implements metrics.Metric.
func (MetricRPC) IsMetric() {}

// IsSuccess implements metrics.Metric.
func (m MetricRPC) IsSuccess() bool { return m.Err == nil }

// helper type for ensuring that we always write in multiples of LeafSize,
// which is required by e.g. (renter.EncryptionKey).XORKeyStream
type segWriter struct {
w io.Writer
buf [rhpv2.LeafSize * 64]byte
len int
}

func (sw *segWriter) Write(p []byte) (int, error) {
lenp := len(p)
for len(p) > 0 {
n := copy(sw.buf[sw.len:], p)
sw.len += n
p = p[n:]
segs := sw.buf[:sw.len-(sw.len%rhpv2.LeafSize)]
if _, err := sw.w.Write(segs); err != nil {
return 0, err
}
sw.len = copy(sw.buf[:], sw.buf[len(segs):sw.len])
}
return lenp, nil
}

func hashRevision(rev types.FileContractRevision) types.Hash256 {
h := types.NewHasher()
rev.EncodeTo(h.E)
Expand Down Expand Up @@ -174,43 +115,9 @@ func updateRevisionOutputs(rev *types.FileContractRevision, cost, collateral typ
[]types.Currency{rev.MissedProofOutputs[0].Value, rev.MissedProofOutputs[1].Value, rev.MissedProofOutputs[2].Value}, nil
}

func recordRPC(ctx context.Context, t *rhpv2.Transport, c rhpv2.ContractRevision, id types.Specifier, err *error) func() {
startTime := time.Now()
contractID := c.ID()
var startFunds types.Currency
if len(c.Revision.ValidProofOutputs) > 0 {
startFunds = c.Revision.ValidProofOutputs[0].Value
}
var startCollateral types.Currency
if len(c.Revision.MissedProofOutputs) > 1 {
startCollateral = c.Revision.MissedProofOutputs[1].Value
}
startW, startR := t.BytesWritten(), t.BytesRead()
return func() {
m := MetricRPC{
HostKey: t.HostKey(),
RPC: id,
Timestamp: startTime,
Elapsed: time.Since(startTime),
Contract: contractID,
Uploaded: t.BytesWritten() - startW,
Downloaded: t.BytesRead() - startR,
Err: *err,
}
if len(c.Revision.ValidProofOutputs) > 0 && startFunds.Cmp(c.Revision.ValidProofOutputs[0].Value) > 0 {
m.Cost = startFunds.Sub(c.Revision.ValidProofOutputs[0].Value)
}
if len(c.Revision.MissedProofOutputs) > 1 && startCollateral.Cmp(c.Revision.MissedProofOutputs[1].Value) > 0 {
m.Collateral = startCollateral.Sub(c.Revision.MissedProofOutputs[1].Value)
}
metrics.Record(ctx, m)
}
}

// RPCSettings calls the Settings RPC, returning the host's reported settings.
func RPCSettings(ctx context.Context, t *rhpv2.Transport) (settings rhpv2.HostSettings, err error) {
defer wrapErr(&err, "Settings")
defer recordRPC(ctx, t, rhpv2.ContractRevision{}, rhpv2.RPCSettingsID, &err)()

var resp rhpv2.RPCSettingsResponse
if err := t.Call(rhpv2.RPCSettingsID, nil, &resp); err != nil {
Expand Down
Loading

0 comments on commit 961fb69

Please sign in to comment.