diff --git a/autopilot/autopilot.go b/autopilot/autopilot.go index f10865de2..0446892dc 100644 --- a/autopilot/autopilot.go +++ b/autopilot/autopilot.go @@ -174,7 +174,7 @@ func (wp *workerPool) withWorkers(workerFunc func([]Worker)) { // Handler returns an HTTP handler that serves the autopilot api. func (ap *Autopilot) Handler() http.Handler { - return jape.Mux(tracing.TracedRoutes(api.DefaultAutopilotID, map[string]jape.Handler{ + return jape.Mux(tracing.TracingMiddleware(api.DefaultAutopilotID, map[string]jape.Handler{ "GET /config": ap.configHandlerGET, "PUT /config": ap.configHandlerPUT, "POST /hosts": ap.hostsHandlerPOST, diff --git a/autopilot/contractor.go b/autopilot/contractor.go index 7b2de9130..743a8506c 100644 --- a/autopilot/contractor.go +++ b/autopilot/contractor.go @@ -666,7 +666,7 @@ func (c *contractor) runContractChecks(ctx context.Context, w Worker, contracts toArchive[fcid] = errContractMaxRevisionNumber.Error() } else if contract.RevisionNumber == math.MaxUint64 { toArchive[fcid] = errContractMaxRevisionNumber.Error() - } else if contract.State == api.ContractStatePending && cs.BlockHeight-contract.StartHeight > 18 { + } else if contract.State == api.ContractStatePending && cs.BlockHeight-contract.StartHeight > contractConfirmationDeadline { toArchive[fcid] = errContractNotConfirmed.Error() } if _, archived := toArchive[fcid]; archived { diff --git a/bus/bus.go b/bus/bus.go index 37c5f3f72..aacfb0f0a 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -233,7 +233,7 @@ type bus struct { // Handler returns an HTTP handler that serves the bus API. func (b *bus) Handler() http.Handler { - return jape.Mux(tracing.TracedRoutes("bus", map[string]jape.Handler{ + return jape.Mux(tracing.TracingMiddleware("bus", map[string]jape.Handler{ "GET /accounts": b.accountsHandlerGET, "POST /account/:id": b.accountHandlerGET, "POST /account/:id/add": b.accountsAddHandlerPOST, diff --git a/hostdb/hostdb.go b/hostdb/hostdb.go index 935240dd1..c1b4769d6 100644 --- a/hostdb/hostdb.go +++ b/hostdb/hostdb.go @@ -23,30 +23,6 @@ type hostAnnouncement struct { Signature types.Signature } -type ErrorResult struct { - Error string `json:"error,omitempty"` -} - -type MetricResultCommon struct { - Address string `json:"address"` - Timestamp time.Time `json:"timestamp"` - Elapsed time.Duration `json:"elapsed"` -} - -type ScanResult struct { - ErrorResult - PriceTable rhpv3.HostPriceTable `json:"priceTable,omitempty"` - Settings rhpv2.HostSettings `json:"settings,omitempty"` -} - -type PriceTableUpdateResult struct { - ErrorResult - PriceTable HostPriceTable `json:"priceTable,omitempty"` -} - -const InteractionTypeScan = "scan" -const InteractionTypePriceTableUpdate = "pricetableupdate" - // ForEachAnnouncement calls fn on each host announcement in a block. func ForEachAnnouncement(b types.Block, height uint64, fn func(types.PublicKey, Announcement)) { for _, txn := range b.Transactions { diff --git a/internal/testing/interactions_test.go b/internal/testing/interactions_test.go new file mode 100644 index 000000000..686003e02 --- /dev/null +++ b/internal/testing/interactions_test.go @@ -0,0 +1,91 @@ +package testing + +import ( + "context" + "errors" + "testing" + "time" + + rhpv3 "go.sia.tech/core/rhp/v3" +) + +func TestInteractions(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + + // create a new test cluster + cluster := newTestCluster(t, clusterOptsDefault) + defer cluster.Shutdown() + b := cluster.Bus + w := cluster.Worker + tt := cluster.tt + + // add a host + hosts := cluster.AddHosts(1) + h1 := hosts[0] + + // wait for contracts + cluster.WaitForContracts() + + // shut down the autopilot to prevent it from interfering with the test + cluster.ShutdownAutopilot(context.Background()) + time.Sleep(time.Second) + + // fetch the host + h, err := b.Host(context.Background(), h1.PublicKey()) + tt.OK(err) + + // assert the host got scanned at least once + ts := h.Interactions.TotalScans + if ts == 0 { + t.Fatal("expected at least one scan") + } + + // assert the price table was set + ptUID := h.PriceTable.UID + if ptUID == (rhpv3.SettingsID{}) { + t.Fatal("expected pt UID to be set") + } + + // assert price table gets updated + var ptUpdates int + tt.Retry(100, 100*time.Millisecond, func() error { + // fetch contracts (this registers host interactions) + tt.OKAll(w.Contracts(context.Background(), time.Minute)) + + // fetch the host + h, err := b.Host(context.Background(), h1.PublicKey()) + tt.OK(err) + + // make sure it did not get scanned again + if h.Interactions.TotalScans != ts { + t.Fatal("expected no new scans", h.Interactions.TotalScans, ts) + } + + // keep track of pt updates + if h.PriceTable.UID != ptUID { + ptUID = h.PriceTable.UID + ptUpdates++ + } + + // assert the price table was updated + if ptUpdates < 2 { + return errors.New("price table should be updated from time to time") + } + return nil + }) + + // scan the host manually + tt.OKAll(w.RHPScan(context.Background(), h1.PublicKey(), h.NetAddress, 0)) + time.Sleep(3 * testBusFlushInterval) + + // fetch the host + h, err = b.Host(context.Background(), h1.PublicKey()) + tt.OK(err) + + // assert the scan was registered + if ts+1 != h.Interactions.TotalScans { + t.Fatal("expected one new scan") + } +} diff --git a/metrics/metrics.go b/metrics/metrics.go deleted file mode 100644 index ac6ca61f9..000000000 --- a/metrics/metrics.go +++ /dev/null @@ -1,38 +0,0 @@ -package metrics - -import ( - "context" - "time" - - "go.sia.tech/core/types" -) - -// A Metric contains metadata pertaining to a particular operation. -type Metric interface { - HostKey() types.PublicKey - Result() interface{} - IsSuccess() bool - Timestamp() time.Time - Type() string -} - -// A MetricsRecorder records metrics. -type MetricsRecorder interface { - RecordMetric(m Metric) -} - -type contextKey string - -const keyMetricsRecorder contextKey = "MetricsRecorder" - -// WithRecorder stores mr in ctx. -func WithRecorder(ctx context.Context, mr MetricsRecorder) context.Context { - return context.WithValue(ctx, keyMetricsRecorder, mr) -} - -// Record records m using the MetricsRecorder stored in ctx, if it exists. -func Record(ctx context.Context, m Metric) { - if mr, ok := ctx.Value(keyMetricsRecorder).(MetricsRecorder); ok { - mr.RecordMetric(m) - } -} diff --git a/stores/types.go b/stores/types.go index a3e6e7869..6b74f7563 100644 --- a/stores/types.go +++ b/stores/types.go @@ -116,11 +116,11 @@ func (currency) GormDataType() string { // Scan scan value into currency, implements sql.Scanner interface. func (c *currency) Scan(value interface{}) error { var s string - switch value.(type) { + switch value := value.(type) { case string: - s = value.(string) + s = value case []byte: - s = string(value.([]byte)) + s = string(value) default: return fmt.Errorf("failed to unmarshal currency value: %v %t", value, value) } @@ -202,11 +202,11 @@ func (balance) GormDataType() string { // Scan scan value into balance, implements sql.Scanner interface. func (hs *balance) Scan(value interface{}) error { var s string - switch value.(type) { + switch value := value.(type) { case string: - s = value.(string) + s = value case []byte: - s = string(value.([]byte)) + s = string(value) default: return fmt.Errorf("failed to unmarshal balance value: %v %t", value, value) } diff --git a/tracing/tracing.go b/tracing/tracing.go index 109a11014..8f4f0a8dc 100644 --- a/tracing/tracing.go +++ b/tracing/tracing.go @@ -69,8 +69,8 @@ func Init(serviceInstanceId string) (func(ctx context.Context) error, error) { return provider.Shutdown, nil } -// TracedHandler attaches a tracing handler to http routes. -func TracedRoutes(component string, routes map[string]jape.Handler) map[string]jape.Handler { +// TracingMiddleware attaches a tracing handler to http routes. +func TracingMiddleware(component string, routes map[string]jape.Handler) map[string]jape.Handler { adapt := func(route string, h jape.Handler) jape.Handler { return jape.Adapt(func(h http.Handler) http.Handler { return otelhttp.NewHandler(h, fmt.Sprintf("%s: %s", component, route)) diff --git a/worker/gouging.go b/worker/gouging.go index ef26fd1d7..a655821f8 100644 --- a/worker/gouging.go +++ b/worker/gouging.go @@ -449,3 +449,10 @@ func sectorUploadCostRHPv3(pt rhpv3.HostPriceTable) (types.Currency, bool) { } return total, false } + +func errToStr(err error) string { + if err != nil { + return err.Error() + } + return "" +} diff --git a/worker/interactions.go b/worker/interactions.go index 785b0b26c..1e8a62f89 100644 --- a/worker/interactions.go +++ b/worker/interactions.go @@ -3,38 +3,69 @@ package worker import ( "context" "fmt" - "sync" + "net/http" "time" - rhpv2 "go.sia.tech/core/rhp/v2" - rhpv3 "go.sia.tech/core/rhp/v3" - "go.sia.tech/core/types" + "go.sia.tech/jape" "go.sia.tech/renterd/hostdb" - "go.sia.tech/renterd/metrics" "go.sia.tech/renterd/tracing" ) -func errToStr(err error) string { - if err != nil { - return err.Error() +const ( + keyInteractionRecorder contextKey = "InteractionRecorder" +) + +type ( + InteractionRecorder interface { + RecordHostScan(...hostdb.HostScan) + RecordPriceTableUpdate(...hostdb.PriceTableUpdate) } - return "" +) + +var _ InteractionRecorder = &worker{} + +func interactionMiddleware(ir InteractionRecorder, routes map[string]jape.Handler) map[string]jape.Handler { + for route, handler := range routes { + routes[route] = jape.Adapt(func(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := context.WithValue(r.Context(), keyInteractionRecorder, ir) + h.ServeHTTP(w, r.WithContext(ctx)) + }) + })(handler) + } + return routes } -// recordInteractions adds some interactions to the worker's interaction buffer -// which is periodically flushed to the bus. -func (w *worker) recordInteractions(scans []hostdb.HostScan, priceTableUpdates []hostdb.PriceTableUpdate) { +func InteractionRecorderFromContext(ctx context.Context) InteractionRecorder { + ir, ok := ctx.Value(keyInteractionRecorder).(InteractionRecorder) + if !ok { + panic("no interaction recorder attached to the context") // developer error + } + return ir +} + +func (w *worker) RecordHostScan(scans ...hostdb.HostScan) { w.interactionsMu.Lock() defer w.interactionsMu.Unlock() - // Append interactions to buffer. w.interactionsScans = append(w.interactionsScans, scans...) - w.interactionsPriceTableUpdates = append(w.interactionsPriceTableUpdates, priceTableUpdates...) + w.tryFlushInteractionsBuffer() +} + +func (w *worker) RecordPriceTableUpdate(ptUpdates ...hostdb.PriceTableUpdate) { + w.interactionsMu.Lock() + defer w.interactionsMu.Unlock() + w.interactionsPriceTableUpdates = append(w.interactionsPriceTableUpdates, ptUpdates...) + w.tryFlushInteractionsBuffer() +} + +func (w *worker) tryFlushInteractionsBuffer() { // If a thread was scheduled to flush the buffer we are done. if w.interactionsFlushTimer != nil { return } + // Otherwise we schedule a flush. w.interactionsFlushTimer = time.AfterFunc(w.busFlushInterval, func() { w.interactionsMu.Lock() @@ -66,124 +97,6 @@ func (w *worker) flushInteractions() { w.interactionsFlushTimer = nil } -// recordPriceTableUpdate records a price table metric. -func recordPriceTableUpdate(ctx context.Context, siamuxAddr string, hostKey types.PublicKey, pt *hostdb.HostPriceTable, err *error) func() { - startTime := time.Now() - return func() { - now := time.Now() - metrics.Record(ctx, MetricPriceTableUpdate{ - metricCommon: metricCommon{ - address: siamuxAddr, - hostKey: hostKey, - timestamp: now, - elapsed: now.Sub(startTime), - err: *err, - }, - pt: *pt, - }) - } -} - -// ephemeralMetricsRecorder can be used to record metrics in memory. -type ephemeralMetricsRecorder struct { - ms []metrics.Metric - mu sync.Mutex -} - -func (mr *ephemeralMetricsRecorder) RecordMetric(m metrics.Metric) { - mr.mu.Lock() - defer mr.mu.Unlock() - mr.ms = append(mr.ms, m) -} - -func (mr *ephemeralMetricsRecorder) interactions() []any { - // TODO: merge/filter metrics? - var his []any - mr.mu.Lock() - defer mr.mu.Unlock() - for _, m := range mr.ms { - his = append(his, metricToInteraction(m)) - } - return his -} - -// metricCommon contains the common fields of all metrics. -type metricCommon struct { - hostKey types.PublicKey - address string - timestamp time.Time - elapsed time.Duration - err error -} - -func (m metricCommon) commonResult() hostdb.MetricResultCommon { - return hostdb.MetricResultCommon{ - Address: m.address, - Timestamp: m.timestamp, - Elapsed: m.elapsed, - } -} -func (m metricCommon) HostKey() types.PublicKey { return m.hostKey } -func (m metricCommon) Timestamp() time.Time { return m.timestamp } - -func (m metricCommon) IsSuccess() bool { - return isSuccessfulInteraction(m.err) -} - -// MetricPriceTableUpdate is a metric that contains the result of fetching a -// price table. -type MetricPriceTableUpdate struct { - metricCommon - pt hostdb.HostPriceTable -} - -func (m MetricPriceTableUpdate) Result() interface{} { - cr := m.commonResult() - er := hostdb.ErrorResult{Error: errToStr(m.err)} - if m.err != nil { - return struct { - hostdb.MetricResultCommon - hostdb.ErrorResult - }{cr, er} - } else { - return struct { - hostdb.MetricResultCommon - hostdb.PriceTableUpdateResult - }{cr, hostdb.PriceTableUpdateResult{ErrorResult: er, PriceTable: m.pt}} - } -} - -func (m MetricPriceTableUpdate) Type() string { - return hostdb.InteractionTypePriceTableUpdate -} - -// MetricHostScan is a metric that contains the result of a host scan. -type MetricHostScan struct { - metricCommon - pt rhpv3.HostPriceTable - settings rhpv2.HostSettings -} - -func (m MetricHostScan) Result() interface{} { - cr := m.commonResult() - er := hostdb.ErrorResult{Error: errToStr(m.err)} - if m.err != nil { - return struct { - hostdb.MetricResultCommon - hostdb.ErrorResult - }{cr, er} - } else { - return struct { - hostdb.MetricResultCommon - hostdb.ScanResult - }{cr, hostdb.ScanResult{ErrorResult: er, PriceTable: m.pt, Settings: m.settings}} - } -} - -func (m MetricHostScan) Type() string { - return hostdb.InteractionTypeScan -} - func isSuccessfulInteraction(err error) bool { // No error always means success. if err == nil { @@ -198,7 +111,3 @@ func isSuccessfulInteraction(err error) bool { } return false } - -func metricToInteraction(m metrics.Metric) any { - return nil -} diff --git a/worker/rhpv3.go b/worker/rhpv3.go index b285cca47..7dfd1a48b 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -21,7 +21,6 @@ import ( "go.sia.tech/mux/v1" "go.sia.tech/renterd/api" "go.sia.tech/renterd/hostdb" - "go.sia.tech/renterd/metrics" "go.sia.tech/siad/crypto" "go.uber.org/zap" "lukechampine.com/frand" @@ -206,12 +205,6 @@ func dialTransport(ctx context.Context, siamuxAddr string, hostKey types.PublicK } func (p *transportPoolV3) withTransportV3(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, fn func(context.Context, *transportV3) error) (err error) { - var mr ephemeralMetricsRecorder - defer func() { - // TODO: record metrics - }() - ctx = metrics.WithRecorder(ctx, &mr) - // Create or fetch transport. p.mu.Lock() t, found := p.pool[siamuxAddr] @@ -425,7 +418,6 @@ type ( contractSpendingRecorder *contractSpendingRecorder fcid types.FileContractID logger *zap.SugaredLogger - mr *ephemeralMetricsRecorder siamuxAddr string renterKey types.PrivateKey accountKey types.PrivateKey @@ -961,7 +953,14 @@ func (h *host) FetchPriceTable(ctx context.Context, rev *types.FileContractRevis // fetchPT is a helper function that performs the RPC given a payment function fetchPT := func(paymentFn PriceTablePaymentFunc) (hpt hostdb.HostPriceTable, err error) { err = h.transportPool.withTransportV3(ctx, h.HostKey(), h.siamuxAddr, func(ctx context.Context, t *transportV3) (err error) { - defer recordPriceTableUpdate(ctx, h.siamuxAddr, h.HostKey(), &hpt, &err)() + defer func() { + InteractionRecorderFromContext(ctx).RecordPriceTableUpdate(hostdb.PriceTableUpdate{ + HostKey: h.HostKey(), + Success: isSuccessfulInteraction(err), + Timestamp: time.Now(), + PriceTable: hpt, + }) + }() pt, err := RPCPriceTable(ctx, t, paymentFn) if err != nil { diff --git a/worker/upload.go b/worker/upload.go index cb625e9db..a836a2c25 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -836,20 +836,6 @@ func (mgr *uploadManager) renewUploader(u *uploader) { u.SignalWork() } -func (mgr *uploadManager) renewalsMap() map[types.FileContractID]types.FileContractID { - mgr.mu.Lock() - defer mgr.mu.Unlock() - - renewals := make(map[types.FileContractID]types.FileContractID) - for _, u := range mgr.uploaders { - fcid, renewedFrom, _ := u.contractInfo() - if renewedFrom != (types.FileContractID{}) { - renewals[renewedFrom] = fcid - } - } - return renewals -} - func (mgr *uploadManager) refreshUploaders(contracts []api.ContractMetadata, bh uint64) { // build map c2m := make(map[types.FileContractID]api.ContractMetadata) diff --git a/worker/worker.go b/worker/worker.go index 0a9f4784c..2c98df5cc 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -25,7 +25,6 @@ import ( "go.sia.tech/renterd/api" "go.sia.tech/renterd/build" "go.sia.tech/renterd/hostdb" - "go.sia.tech/renterd/metrics" "go.sia.tech/renterd/object" "go.sia.tech/renterd/tracing" "go.sia.tech/renterd/webhooks" @@ -271,11 +270,6 @@ func dial(ctx context.Context, hostIP string) (net.Conn, error) { } func (w *worker) withTransportV2(ctx context.Context, hostKey types.PublicKey, hostIP string, fn func(*rhpv2.Transport) error) (err error) { - var mr ephemeralMetricsRecorder - defer func() { - // TODO record metrics - }() - ctx = metrics.WithRecorder(ctx, &mr) conn, err := dial(ctx, hostIP) if err != nil { return err @@ -307,7 +301,6 @@ func (w *worker) newHostV3(contractID types.FileContractID, hostKey types.Public acc: w.accounts.ForHost(hostKey), bus: w.bus, contractSpendingRecorder: w.contractSpendingRecorder, - mr: &ephemeralMetricsRecorder{}, logger: w.logger.Named(hostKey.String()[:4]), fcid: contractID, siamuxAddr: siamuxAddr, @@ -352,20 +345,23 @@ func (w *worker) registerAlert(a alerts.Alert) { } func (w *worker) rhpScanHandler(jc jape.Context) { + ctx := jc.Request.Context() + + // decode the request var rsr api.RHPScanRequest if jc.Decode(&rsr) != nil { return } - ctx := jc.Request.Context() + // apply the timeout if rsr.Timeout > 0 { var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(jc.Request.Context(), time.Duration(rsr.Timeout)) + ctx, cancel = context.WithTimeout(ctx, time.Duration(rsr.Timeout)) defer cancel() } // only scan hosts if we are online - peers, err := w.bus.SyncerPeers(jc.Request.Context()) + peers, err := w.bus.SyncerPeers(ctx) if jc.Check("failed to fetch peers from bus", err) != nil { return } @@ -381,20 +377,6 @@ func (w *worker) rhpScanHandler(jc jape.Context) { errStr = err.Error() } - // record scan - err = w.bus.RecordHostScans(jc.Request.Context(), []hostdb.HostScan{{ - HostKey: rsr.HostKey, - Success: err == nil, - Timestamp: time.Now(), - Settings: settings, - PriceTable: priceTable, - }}) - if jc.Check("failed to record scan", err) != nil { - return - } - - // TODO: record metric - jc.Encode(api.RHPScanResponse{ Ping: api.DurationMS(elapsed), PriceTable: priceTable, @@ -465,30 +447,49 @@ func (w *worker) fetchPriceTable(ctx context.Context, hk types.PublicKey, siamux } func (w *worker) rhpPriceTableHandler(jc jape.Context) { + ctx := jc.Request.Context() + + // decode the request var rptr api.RHPPriceTableRequest if jc.Decode(&rptr) != nil { return } - ctx := jc.Request.Context() + // apply timeout if rptr.Timeout > 0 { var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(jc.Request.Context(), time.Duration(rptr.Timeout)) + ctx, cancel = context.WithTimeout(ctx, time.Duration(rptr.Timeout)) defer cancel() } - var pt rhpv3.HostPriceTable - if jc.Check("could not get price table", w.transportPoolV3.withTransportV3(ctx, rptr.HostKey, rptr.SiamuxAddr, func(ctx context.Context, t *transportV3) (err error) { - pt, err = RPCPriceTable(ctx, t, func(pt rhpv3.HostPriceTable) (rhpv3.PaymentMethod, error) { return nil, nil }) - return - })) != nil { - return - } + // defer interaction recording + var err error + var hpt hostdb.HostPriceTable + defer func() { + InteractionRecorderFromContext(ctx).RecordPriceTableUpdate(hostdb.PriceTableUpdate{ + HostKey: rptr.HostKey, + Success: isSuccessfulInteraction(err), + Timestamp: time.Now(), + PriceTable: hpt, + }) + }() - jc.Encode(hostdb.HostPriceTable{ - HostPriceTable: pt, - Expiry: time.Now().Add(pt.Validity), + err = w.transportPoolV3.withTransportV3(ctx, rptr.HostKey, rptr.SiamuxAddr, func(ctx context.Context, t *transportV3) error { + if pt, err := RPCPriceTable(ctx, t, func(pt rhpv3.HostPriceTable) (rhpv3.PaymentMethod, error) { return nil, nil }); err != nil { + return err + } else { + hpt = hostdb.HostPriceTable{ + HostPriceTable: pt, + Expiry: time.Now().Add(pt.Validity), + } + } + return nil }) + + if jc.Check("could not get price table", err) != nil { + return + } + jc.Encode(hpt) } func (w *worker) discardTxnOnErr(ctx context.Context, txn types.Transaction, errContext string, err *error) { @@ -497,6 +498,8 @@ func (w *worker) discardTxnOnErr(ctx context.Context, txn types.Transaction, err func (w *worker) rhpFormHandler(jc jape.Context) { ctx := jc.Request.Context() + + // decode the request var rfr api.RHPFormRequest if jc.Decode(&rfr) != nil { return @@ -558,7 +561,7 @@ func (w *worker) rhpFormHandler(jc jape.Context) { } // broadcast the transaction set - err = w.bus.BroadcastTransaction(jc.Request.Context(), txnSet) + err = w.bus.BroadcastTransaction(ctx, txnSet) if err != nil && !isErrDuplicateTransactionSet(err) { w.logger.Errorf("failed to broadcast formation txn set: %v", err) } @@ -571,13 +574,15 @@ func (w *worker) rhpFormHandler(jc jape.Context) { } func (w *worker) rhpBroadcastHandler(jc jape.Context) { + ctx := jc.Request.Context() + + // decode the fcid var fcid types.FileContractID if jc.DecodeParam("id", &fcid) != nil { return } // Acquire lock before fetching revision. - ctx := jc.Request.Context() unlocker, err := w.acquireContractLock(ctx, fcid, lockingPriorityActiveContractRevision) if jc.Check("could not acquire revision lock", err) != nil { return @@ -624,6 +629,8 @@ func (w *worker) rhpBroadcastHandler(jc jape.Context) { } func (w *worker) rhpPruneContractHandlerPOST(jc jape.Context) { + ctx := jc.Request.Context() + // decode fcid var fcid types.FileContractID if jc.DecodeParam("id", &fcid) != nil { @@ -637,10 +644,9 @@ func (w *worker) rhpPruneContractHandlerPOST(jc jape.Context) { } // apply timeout - ctx := jc.Request.Context() if pcr.Timeout > 0 { var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(jc.Request.Context(), time.Duration(pcr.Timeout)) + ctx, cancel = context.WithTimeout(ctx, time.Duration(pcr.Timeout)) defer cancel() } @@ -687,6 +693,8 @@ func (w *worker) rhpPruneContractHandlerPOST(jc jape.Context) { } func (w *worker) rhpContractRootsHandlerGET(jc jape.Context) { + ctx := jc.Request.Context() + // decode fcid var id types.FileContractID if jc.DecodeParam("id", &id) != nil { @@ -694,7 +702,6 @@ func (w *worker) rhpContractRootsHandlerGET(jc jape.Context) { } // fetch the contract from the bus - ctx := jc.Request.Context() c, err := w.bus.Contract(ctx, id) if errors.Is(err, api.ErrContractNotFound) { jc.Error(err, http.StatusNotFound) @@ -748,7 +755,7 @@ func (w *worker) rhpRenewHandler(jc jape.Context) { } // broadcast the transaction set - err = w.bus.BroadcastTransaction(jc.Request.Context(), txnSet) + err = w.bus.BroadcastTransaction(ctx, txnSet) if err != nil && !isErrDuplicateTransactionSet(err) { w.logger.Errorf("failed to broadcast renewal txn set: %v", err) } @@ -998,9 +1005,10 @@ func (w *worker) uploadsStatsHandlerGET(jc jape.Context) { } func (w *worker) objectsHandlerGET(jc jape.Context) { - ctx := jc.Request.Context() jc.Custom(nil, []api.ObjectMetadata{}) + ctx := jc.Request.Context() + bucket := api.DefaultBucketName if jc.DecodeForm("bucket", &bucket) != nil { return @@ -1190,6 +1198,9 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { return } + // attach gouging checker to the context + ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) + // cancel the upload if no contract set is specified if up.ContractSet == "" { jc.Error(api.ErrContractSetNotSpecified, http.StatusBadRequest) @@ -1278,7 +1289,7 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { if disablePreshardingEncryption { opts = append(opts, WithCustomKey(object.NoOpKey)) } else { - upload, err := w.bus.MultipartUpload(jc.Request.Context(), uploadID) + upload, err := w.bus.MultipartUpload(ctx, uploadID) if err != nil { jc.Error(err, http.StatusBadRequest) return @@ -1340,6 +1351,8 @@ func (w *worker) objectsHandlerDELETE(jc jape.Context) { func (w *worker) rhpContractsHandlerGET(jc jape.Context) { ctx := jc.Request.Context() + + // fetch contracts busContracts, err := w.bus.Contracts(ctx) if jc.Check("failed to fetch contracts from bus", err) != nil { return @@ -1452,7 +1465,7 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush // Handler returns an HTTP handler that serves the worker API. func (w *worker) Handler() http.Handler { - return jape.Mux(tracing.TracedRoutes("worker", map[string]jape.Handler{ + return jape.Mux(tracing.TracingMiddleware("worker", interactionMiddleware(w, map[string]jape.Handler{ "GET /account/:hostkey": w.accountHandlerGET, "GET /id": w.idHandlerGET, @@ -1482,7 +1495,7 @@ func (w *worker) Handler() http.Handler { "PUT /multipart/*path": w.multipartUploadHandlerPUT, "GET /state": w.stateHandlerGET, - })) + }))) } // Shutdown shuts down the worker. @@ -1590,7 +1603,18 @@ func (w *worker) acquireContractLock(ctx context.Context, fcid types.FileContrac return newContractLock(fcid, lockID, w.contractLockingDuration, w.bus, w.logger), nil } -func (w *worker) scanHost(ctx context.Context, hostKey types.PublicKey, hostIP string) (rhpv2.HostSettings, rhpv3.HostPriceTable, time.Duration, error) { +func (w *worker) scanHost(ctx context.Context, hostKey types.PublicKey, hostIP string) (settings rhpv2.HostSettings, hpt rhpv3.HostPriceTable, elapsed time.Duration, err error) { + // record host scan + defer func() { + InteractionRecorderFromContext(ctx).RecordHostScan(hostdb.HostScan{ + HostKey: hostKey, + Success: isSuccessfulInteraction(err), + Timestamp: time.Now(), + Settings: settings, + PriceTable: hpt, + }) + }() + // resolve hostIP. We don't want to scan hosts on private networks. if !w.allowPrivateIPs { host, _, err := net.SplitHostPort(hostIP) @@ -1610,8 +1634,7 @@ func (w *worker) scanHost(ctx context.Context, hostKey types.PublicKey, hostIP s // fetch the host settings start := time.Now() - var settings rhpv2.HostSettings - err := w.withTransportV2(ctx, hostKey, hostIP, func(t *rhpv2.Transport) (err error) { + err = w.withTransportV2(ctx, hostKey, hostIP, func(t *rhpv2.Transport) (err error) { if settings, err = RPCSettings(ctx, t); err == nil { // NOTE: we overwrite the NetAddress with the host address here since we // just used it to dial the host we know it's valid @@ -1619,17 +1642,16 @@ func (w *worker) scanHost(ctx context.Context, hostKey types.PublicKey, hostIP s } return err }) - elapsed := time.Since(start) + elapsed = time.Since(start) // fetch the host pricetable - var pt rhpv3.HostPriceTable if err == nil { err = w.transportPoolV3.withTransportV3(ctx, hostKey, settings.SiamuxAddr(), func(ctx context.Context, t *transportV3) (err error) { - pt, err = RPCPriceTable(ctx, t, func(pt rhpv3.HostPriceTable) (rhpv3.PaymentMethod, error) { return nil, nil }) + hpt, err = RPCPriceTable(ctx, t, func(pt rhpv3.HostPriceTable) (rhpv3.PaymentMethod, error) { return nil, nil }) return err }) } - return settings, pt, elapsed, err + return } // PartialSlab fetches the data of a partial slab from the bus. It will fall