From 5a157a24cdb0947c560f4c5de499a064fa62c09a Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 28 Nov 2023 17:51:27 +0100 Subject: [PATCH 01/12] metrics: add contract pruning metrics --- .github/workflows/test.yml | 12 +- api/metrcis.go | 25 +++ autopilot/alerts.go | 16 +- autopilot/autopilot.go | 1 + autopilot/contract_pruning.go | 189 ++++++++++++++++++ ...ractorspending.go => contract_spending.go} | 0 autopilot/contractor.go | 100 +-------- bus/bus.go | 36 +++- bus/client/metrics.go | 61 +++++- stores/metrics.go | 87 ++++++++ stores/metrics_test.go | 81 ++++++++ stores/migrations.go | 21 +- 12 files changed, 509 insertions(+), 120 deletions(-) create mode 100644 autopilot/contract_pruning.go rename autopilot/{contractorspending.go => contract_spending.go} (100%) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e19d0dbbc..8de7f6ee1 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -43,15 +43,15 @@ jobs: autopilot bus bus/client worker worker/client - # - name: Test - # uses: n8maninger/action-golang-test@v1 - # with: - # args: "-race;-short" + - name: Test + uses: n8maninger/action-golang-test@v1 + with: + args: "-race;-short" - name: Test Integration uses: n8maninger/action-golang-test@v1 with: package: "./internal/testing/..." - args: "-race;-tags=testing;-timeout=180m;-count=100;-run=^TestParallelDownload$" + args: "-race;-tags='testing';-timeout=30m" - name: Test Integration - MySQL if: matrix.os == 'ubuntu-latest' uses: n8maninger/action-golang-test@v1 @@ -61,6 +61,6 @@ jobs: RENTERD_DB_PASSWORD: test with: package: "./internal/testing/..." - args: "-race;-tags=testing;-timeout=180m;-count=100;-run=^TestParallelDownload$" + args: "-race;-tags='testing';-timeout=30m" - name: Build run: go build -o bin/ ./cmd/renterd diff --git a/api/metrcis.go b/api/metrcis.go index 63c77365b..b782d4223 100644 --- a/api/metrcis.go +++ b/api/metrcis.go @@ -10,6 +10,7 @@ const ( ChurnDirAdded = "added" ChurnDirRemoved = "removed" + MetricContractPrune = "contractprune" MetricContractSet = "contractset" MetricContractSetChurn = "churn" MetricContract = "contract" @@ -75,9 +76,33 @@ type ( ContractID types.FileContractID HostKey types.PublicKey } + + ContractPruneMetric struct { + Timestamp time.Time `json:"timestamp"` + + ContractID types.FileContractID `json:"contractID"` + HostKey types.PublicKey `json:"hostKey"` + HostVersion string `json:"hostVersion"` + + Pruned uint64 `json:"pruned"` + Remaining uint64 `json:"remaining"` + Duration time.Duration `json:"duration"` + + Error string `json:"error,omitempty"` + } + + ContractPruneMetricsQueryOpts struct { + ContractID types.FileContractID + HostKey types.PublicKey + HostVersion string + } ) type ( + ContractPruneMetricRequestPUT struct { + Metrics []ContractPruneMetric `json:"metrics"` + } + ContractSetChurnMetricRequestPUT struct { Metrics []ContractSetChurnMetric `json:"metrics"` } diff --git a/autopilot/alerts.go b/autopilot/alerts.go index 4581f6b32..eb0a6a4fe 100644 --- a/autopilot/alerts.go +++ b/autopilot/alerts.go @@ -26,8 +26,8 @@ func alertIDForAccount(alertID [32]byte, id rhpv3.Account) types.Hash256 { return types.HashBytes(append(alertID[:], id[:]...)) } -func alertIDForContract(alertID [32]byte, contract api.ContractMetadata) types.Hash256 { - return types.HashBytes(append(alertID[:], contract.ID[:]...)) +func alertIDForContract(alertID [32]byte, fcid types.FileContractID) types.Hash256 { + return types.HashBytes(append(alertID[:], fcid[:]...)) } func alertIDForHost(alertID [32]byte, hk types.PublicKey) types.Hash256 { @@ -103,7 +103,7 @@ func newContractRenewalFailedAlert(contract api.ContractMetadata, interrupted bo } return alerts.Alert{ - ID: alertIDForContract(alertRenewalFailedID, contract), + ID: alertIDForContract(alertRenewalFailedID, contract.ID), Severity: severity, Message: "Contract renewal failed", Data: map[string]interface{}{ @@ -116,15 +116,15 @@ func newContractRenewalFailedAlert(contract api.ContractMetadata, interrupted bo } } -func newContractPruningFailedAlert(contract api.ContractMetadata, err error) alerts.Alert { - return alerts.Alert{ - ID: alertIDForContract(alertPruningID, contract), +func newContractPruningFailedAlert(hk types.PublicKey, fcid types.FileContractID, err error) *alerts.Alert { + return &alerts.Alert{ + ID: alertIDForContract(alertPruningID, fcid), Severity: alerts.SeverityWarning, Message: "Contract pruning failed", Data: map[string]interface{}{ "error": err.Error(), - "contractID": contract.ID.String(), - "hostKey": contract.HostKey.String(), + "contractID": fcid.String(), + "hostKey": hk.String(), }, Timestamp: time.Now(), } diff --git a/autopilot/autopilot.go b/autopilot/autopilot.go index 50494a987..740f9b724 100644 --- a/autopilot/autopilot.go +++ b/autopilot/autopilot.go @@ -63,6 +63,7 @@ type Bus interface { // metrics RecordContractSetChurnMetric(ctx context.Context, metrics ...api.ContractSetChurnMetric) error + RecordContractPruneMetric(ctx context.Context, metrics ...api.ContractPruneMetric) error // objects ObjectsBySlabKey(ctx context.Context, bucket string, key object.EncryptionKey) (objects []api.ObjectMetadata, err error) diff --git a/autopilot/contract_pruning.go b/autopilot/contract_pruning.go new file mode 100644 index 000000000..0f8b2a6d3 --- /dev/null +++ b/autopilot/contract_pruning.go @@ -0,0 +1,189 @@ +package autopilot + +import ( + "context" + "errors" + "fmt" + "time" + + "go.sia.tech/core/types" + "go.sia.tech/renterd/alerts" + "go.sia.tech/renterd/api" + "go.sia.tech/siad/build" +) + +var ( + errConnectionRefused = errors.New("connection refused") + errConnectionTimedOut = errors.New("connection timed out") + errInvalidHandshakeSignature = errors.New("host's handshake signature was invalid") + errInvalidMerkleProof = errors.New("host supplied invalid Merkle proof") + errNoRouteToHost = errors.New("no route to host") + errNoSuchHost = errors.New("no such host") +) + +type ( + pruneResult struct { + ts time.Time + + fcid types.FileContractID + hk types.PublicKey + version string + + pruned uint64 + remaining uint64 + duration time.Duration + + err error + } + + pruneMetrics []api.ContractPruneMetric +) + +func (pr pruneResult) String() string { + msg := fmt.Sprintf("contract %v, pruned %d bytes, remaining %d bytes, elapsed %v", pr.fcid, pr.pruned, pr.remaining, pr.duration) + if pr.err != nil { + msg += fmt.Sprintf(", err: %v", pr.err) + } + return msg +} + +func (pm pruneMetrics) String() string { + var total uint64 + for _, m := range pm { + total += m.Pruned + } + return fmt.Sprintf("pruned %d (%s) from %v contracts", total, humanReadableSize(int(total)), len(pm)) +} + +func (pr pruneResult) toAlert() (id types.Hash256, alert *alerts.Alert) { + id = alertIDForContract(alertPruningID, pr.fcid) + + if shouldTrigger := pr.err != nil && !((isErr(pr.err, errInvalidMerkleProof) && build.VersionCmp(pr.version, "1.6.0") < 0) || + isErr(pr.err, errConnectionRefused) || + isErr(pr.err, errConnectionTimedOut) || + isErr(pr.err, errInvalidHandshakeSignature) || + isErr(pr.err, errNoRouteToHost) || + isErr(pr.err, errNoSuchHost)); shouldTrigger { + alert = newContractPruningFailedAlert(pr.hk, pr.fcid, pr.err) + } + return +} + +func (pr pruneResult) toMetric() (metric api.ContractPruneMetric) { + metric = api.ContractPruneMetric{ + Timestamp: pr.ts, + ContractID: pr.fcid, + HostKey: pr.hk, + Pruned: pr.pruned, + Remaining: pr.remaining, + Duration: pr.duration, + } + if pr.err != nil { + metric.Error = pr.err.Error() + } + return +} + +func humanReadableSize(b int) string { + const unit = 1024 + if b < unit { + return fmt.Sprintf("%d B", b) + } + div, exp := int64(unit), 0 + for n := b / unit; n >= unit; n /= unit { + div *= unit + exp++ + } + return fmt.Sprintf("%.1f %ciB", + float64(b)/float64(div), "KMGTPE"[exp]) +} + +func (c *contractor) performContractPruning(wp *workerPool) { + c.logger.Info("performing contract pruning") + + // fetch prunable data + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + res, err := c.ap.bus.PrunableData(ctx) + cancel() + + // return early if we couldn't fetch the prunable data + if err != nil { + c.logger.Error(err) + return + } + + // return early if there's no prunable data + if res.TotalPrunable == 0 { + c.logger.Info("no contracts to prune") + return + } + + // prune every contract individually, one at a time and for a maximum + // duration of 'timeoutPruneContract' to limit the amount of time we lock + // the contract as contracts on old hosts can take a long time to prune + var metrics pruneMetrics + wp.withWorker(func(w Worker) { + for _, contract := range res.Contracts { + result := c.pruneContract(w, contract.ID) + if result.err != nil { + c.logger.Error(result) + } else { + c.logger.Info(result) + } + + // handle alert + id, alert := result.toAlert() + if alert != nil { + c.ap.RegisterAlert(ctx, *alert) + } else { + c.ap.DismissAlert(ctx, id) + } + + // handle metrics + metrics = append(metrics, result.toMetric()) + } + }) + + // record metrics + ctx, cancel = context.WithTimeout(context.Background(), time.Minute) + if err := c.ap.bus.RecordContractPruneMetric(ctx, metrics...); err != nil { + c.logger.Error(err) + } + cancel() + + // log metrics + c.logger.Info(metrics) +} + +func (c *contractor) pruneContract(w Worker, fcid types.FileContractID) pruneResult { + // create a sane timeout + ctx, cancel := context.WithTimeout(context.Background(), 2*timeoutPruneContract) + defer cancel() + + // fetch the host + host, contract, err := c.hostForContract(ctx, fcid) + if err != nil { + return pruneResult{err: err} + } + + // prune the contract + start := time.Now() + pruned, remaining, err := w.RHPPruneContract(ctx, fcid, timeoutPruneContract) + if err != nil && pruned == 0 { + return pruneResult{err: err} + } + + return pruneResult{ + ts: start, + + fcid: contract.ID, + hk: contract.HostKey, + version: host.Settings.Version, + + pruned: pruned, + remaining: remaining, + duration: time.Since(start), + + err: err, + } +} diff --git a/autopilot/contractorspending.go b/autopilot/contract_spending.go similarity index 100% rename from autopilot/contractorspending.go rename to autopilot/contract_spending.go diff --git a/autopilot/contractor.go b/autopilot/contractor.go index e93c74110..96bfaa850 100644 --- a/autopilot/contractor.go +++ b/autopilot/contractor.go @@ -21,7 +21,6 @@ import ( "go.sia.tech/renterd/tracing" "go.sia.tech/renterd/wallet" "go.sia.tech/renterd/worker" - "go.sia.tech/siad/build" "go.uber.org/zap" ) @@ -86,15 +85,6 @@ const ( timeoutPruneContract = 10 * time.Minute ) -var ( - errConnectionRefused = errors.New("connection refused") - errConnectionTimedOut = errors.New("connection timed out") - errInvalidHandshakeSignature = errors.New("host's handshake signature was invalid") - errInvalidMerkleProof = errors.New("host supplied invalid Merkle proof") - errNoRouteToHost = errors.New("no route to host") - errNoSuchHost = errors.New("no such host") -) - type ( contractor struct { ap *Autopilot @@ -1043,7 +1033,7 @@ func (c *contractor) runContractRenewals(ctx context.Context, w Worker, toRenew toKeep = append(toKeep, toRenew[i]) } } else { - c.ap.DismissAlert(ctx, alertIDForContract(alertRenewalFailedID, contract)) + c.ap.DismissAlert(ctx, alertIDForContract(alertRenewalFailedID, contract.ID)) renewals = append(renewals, renewal{from: contract.ID, to: renewed.ID, ci: toRenew[i]}) } @@ -1595,80 +1585,6 @@ func (c *contractor) tryPerformPruning(ctx context.Context, wp *workerPool) { }() } -func (c *contractor) performContractPruning(wp *workerPool) { - c.logger.Info("performing contract pruning") - - // fetch prunable data - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - res, err := c.ap.bus.PrunableData(ctx) - cancel() - - // return early if we couldn't fetch the prunable data - if err != nil { - c.logger.Error(err) - return - } - - // return early if there's no prunable data - if res.TotalPrunable == 0 { - c.logger.Info("no contracts to prune") - return - } - - // prune every contract individually, one at a time and for a maximum - // duration of 'timeoutPruneContract' to limit the amount of time we lock - // the contract as contracts on old hosts can take a long time to prune - var total uint64 - wp.withWorker(func(w Worker) { - for _, contract := range res.Contracts { - c.logger.Debugf("pruning %d bytes from contract %v", contract.Prunable, contract.ID) - start := time.Now() - pruned, remaining, err := c.pruneContract(w, contract.ID) - if err != nil { - if pruned == 0 { - c.logger.Errorf("pruning contract %v failed after %v, err: %v", contract.ID, time.Since(start), err) - } else { - c.logger.Debugf("pruning contract %v errored out, pruned %d bytes, remaining %d bytes, took %v", contract.ID, pruned, remaining, time.Since(start)) - } - } else { - c.logger.Debugf("pruning contract %v succeeded, pruned %d bytes, remaining %d bytes, took %v", contract.ID, pruned, remaining, time.Since(start)) - } - total += pruned - } - }) - - c.logger.Infof("pruned %d (%s) from %v contracts", total, humanReadableSize(int(total)), len(res.Contracts)) -} - -func (c *contractor) pruneContract(w Worker, fcid types.FileContractID) (pruned uint64, remaining uint64, err error) { - c.logger.Debugf("pruning contract %v", fcid) - - ctx, cancel := context.WithTimeout(context.Background(), 2*timeoutPruneContract) - defer cancel() - - // fetch the host - host, contract, err := c.hostForContract(ctx, fcid) - if err != nil { - return 0, 0, err - } - - // prune the contract - pruned, remaining, err = w.RHPPruneContract(ctx, fcid, timeoutPruneContract) - - // handle alert - if err != nil && !((isErr(err, errInvalidMerkleProof) && build.VersionCmp(host.Settings.Version, "1.6.0") < 0) || - isErr(err, errConnectionRefused) || - isErr(err, errConnectionTimedOut) || - isErr(err, errInvalidHandshakeSignature) || - isErr(err, errNoRouteToHost) || - isErr(err, errNoSuchHost)) { - c.ap.RegisterAlert(ctx, newContractPruningFailedAlert(contract, err)) - } else { - c.ap.DismissAlert(ctx, alertIDForContract(alertPruningID, contract)) - } - return -} - func (c *contractor) hostForContract(ctx context.Context, fcid types.FileContractID) (host hostdb.HostInfo, metadata api.ContractMetadata, err error) { // fetch the contract metadata, err = c.ap.bus.Contract(ctx, fcid) @@ -1692,20 +1608,6 @@ func endHeight(cfg api.AutopilotConfig, currentPeriod uint64) uint64 { return currentPeriod + cfg.Contracts.Period + cfg.Contracts.RenewWindow } -func humanReadableSize(b int) string { - const unit = 1024 - if b < unit { - return fmt.Sprintf("%d B", b) - } - div, exp := int64(unit), 0 - for n := b / unit; n >= unit; n /= unit { - div *= unit - exp++ - } - return fmt.Sprintf("%.1f %ciB", - float64(b)/float64(div), "KMGTPE"[exp]) -} - func initialContractFunding(settings rhpv2.HostSettings, txnFee, min, max types.Currency) types.Currency { if !max.IsZero() && min.Cmp(max) > 0 { panic("given min is larger than max") // developer error diff --git a/bus/bus.go b/bus/bus.go index d1ffb5e37..88a3ba477 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -196,6 +196,9 @@ type ( MetricsStore interface { ContractSetMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) ([]api.ContractSetMetric, error) + ContractPruneMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractPruneMetricsQueryOpts) ([]api.ContractPruneMetric, error) + RecordContractPruneMetric(ctx context.Context, metrics ...api.ContractPruneMetric) error + ContractMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractMetricsQueryOpts) ([]api.ContractMetric, error) RecordContractMetric(ctx context.Context, metrics ...api.ContractMetric) error @@ -1993,11 +1996,24 @@ func (b *bus) webhookHandlerPost(jc jape.Context) { } func (b *bus) metricsHandlerPUT(jc jape.Context) { + jc.Custom((*interface{})(nil), nil) + key := jc.PathParam("key") switch key { + case api.MetricContractPrune: + // TODO: jape hack - remove once jape can handle decoding multiple different request types + var req api.ContractPruneMetricRequestPUT + if err := json.NewDecoder(jc.Request.Body).Decode(&req); err != nil { + jc.Error(fmt.Errorf("couldn't decode request type (%T): %w", req, err), http.StatusBadRequest) + return + } else if jc.Check("failed to record contract prune metric", b.mtrcs.RecordContractPruneMetric(jc.Request.Context(), req.Metrics...)) != nil { + return + } case api.MetricContractSetChurn: + // TODO: jape hack - remove once jape can handle decoding multiple different request types var req api.ContractSetChurnMetricRequestPUT - if jc.Decode(&req) != nil { + if err := json.NewDecoder(jc.Request.Body).Decode(&req); err != nil { + jc.Error(fmt.Errorf("couldn't decode request type (%T): %w", req, err), http.StatusBadRequest) return } else if jc.Check("failed to record contract churn metric", b.mtrcs.RecordContractSetChurnMetric(jc.Request.Context(), req.Metrics...)) != nil { return @@ -2025,9 +2041,9 @@ func (b *bus) metricsHandlerGET(jc jape.Context) { switch key := jc.PathParam("key"); key { case api.MetricContract: var opts api.ContractMetricsQueryOpts - if jc.DecodeForm("fcid", &opts.ContractID) != nil { + if jc.DecodeForm("contractID", &opts.ContractID) != nil { return - } else if jc.DecodeForm("host", &opts.HostKey) != nil { + } else if jc.DecodeForm("hostKey", &opts.HostKey) != nil { return } else if metrics, err := b.metrics(jc.Request.Context(), key, start, n, interval, opts); jc.Check("failed to get contract metrics", err) != nil { return @@ -2035,6 +2051,20 @@ func (b *bus) metricsHandlerGET(jc jape.Context) { jc.Encode(metrics) return } + case api.MetricContractPrune: + var opts api.ContractPruneMetricsQueryOpts + if jc.DecodeForm("contractID", &opts.ContractID) != nil { + return + } else if jc.DecodeForm("hostKey", &opts.HostKey) != nil { + return + } else if jc.DecodeForm("hostVersion", &opts.HostVersion) != nil { + return + } else if metrics, err := b.metrics(jc.Request.Context(), key, start, n, interval, opts); jc.Check("failed to get contract prune metrics", err) != nil { + return + } else { + jc.Encode(metrics) + return + } case api.MetricContractSet: var opts api.ContractSetMetricsQueryOpts if jc.DecodeForm("name", &opts.Name) != nil { diff --git a/bus/client/metrics.go b/bus/client/metrics.go index f8b1e4270..b9f5eb64c 100644 --- a/bus/client/metrics.go +++ b/bus/client/metrics.go @@ -1,6 +1,7 @@ package client import ( + "bytes" "context" "encoding/json" "errors" @@ -33,6 +34,28 @@ func (c *Client) ContractMetrics(ctx context.Context, start time.Time, n uint64, return resp, nil } +func (c *Client) ContractPruneMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractPruneMetricsQueryOpts) ([]api.ContractPruneMetric, error) { + values := url.Values{} + values.Set("start", api.TimeRFC3339(start).String()) + values.Set("n", fmt.Sprint(n)) + values.Set("interval", api.DurationMS(interval).String()) + if opts.ContractID != (types.FileContractID{}) { + values.Set("fcid", opts.ContractID.String()) + } + if opts.HostKey != (types.PublicKey{}) { + values.Set("hostKey", opts.HostKey.String()) + } + if opts.HostVersion != "" { + values.Set("hostVersion", opts.HostVersion) + } + + var resp []api.ContractPruneMetric + if err := c.metric(ctx, api.MetricContractPrune, values, &resp); err != nil { + return nil, err + } + return resp, nil +} + func (c *Client) ContractSetChurnMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetChurnMetricsQueryOpts) ([]api.ContractSetChurnMetric, error) { values := url.Values{} values.Set("start", api.TimeRFC3339(start).String()) @@ -72,9 +95,41 @@ func (c *Client) ContractSetMetrics(ctx context.Context, start time.Time, n uint } func (c *Client) RecordContractSetChurnMetric(ctx context.Context, metrics ...api.ContractSetChurnMetric) error { - return c.c.WithContext(ctx).PUT(fmt.Sprintf("/metric/%s", api.MetricContractSetChurn), api.ContractSetChurnMetricRequestPUT{ - Metrics: metrics, - }) + return c.recordMetric(ctx, api.MetricContractSetChurn, api.ContractSetChurnMetricRequestPUT{Metrics: metrics}) +} + +func (c *Client) RecordContractPruneMetric(ctx context.Context, metrics ...api.ContractPruneMetric) error { + return c.recordMetric(ctx, api.MetricContractPrune, api.ContractPruneMetricRequestPUT{Metrics: metrics}) +} + +func (c *Client) recordMetric(ctx context.Context, key string, d interface{}) error { + c.c.Custom("PUT", fmt.Sprintf("/metric/%s", key), (interface{})(nil), nil) + + js, err := json.Marshal(d) + if err != nil { + return err + } + + u, err := url.Parse(fmt.Sprintf("%s/metric/%s", c.c.BaseURL, key)) + if err != nil { + panic(err) + } + req, err := http.NewRequestWithContext(ctx, "PUT", u.String(), bytes.NewReader(js)) + if err != nil { + panic(err) + } + req.SetBasicAuth("", c.c.WithContext(ctx).Password) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer io.Copy(io.Discard, resp.Body) + defer resp.Body.Close() + if resp.StatusCode != 200 { + err, _ := io.ReadAll(resp.Body) + return errors.New(string(err)) + } + return nil } func (c *Client) metric(ctx context.Context, key string, values url.Values, res interface{}) error { diff --git a/stores/metrics.go b/stores/metrics.go index 5fca763f6..f6ab7fa56 100644 --- a/stores/metrics.go +++ b/stores/metrics.go @@ -39,6 +39,26 @@ type ( ListSpendingHi unsigned64 `gorm:"index:idx_list_spending;NOT NULL"` } + // dbContractPruneMetric tracks information about contract pruning. Such as + // the number of bytes pruned, how much data there is left to prune and how + // long it took, along with potential errors that occurred while trying to + // prune the contract. + dbContractPruneMetric struct { + Model + + Timestamp unixTimeMS `gorm:"index;NOT NULL"` + + FCID fileContractID `gorm:"index;size:32;NOT NULL;column:fcid"` + Host publicKey `gorm:"index;size:32;NOT NULL"` + HostVersion string `gorm:"index"` + + Pruned unsigned64 `gorm:"index;NOT NULL"` + Remaining unsigned64 `gorm:"index;NOT NULL"` + Duration time.Duration `gorm:"index;NOT NULL"` + + Error string `gorm:"index"` + } + // dbContractSetMetric tracks information about a specific contract set. // Such as the number of contracts it contains. Intended to be reported by // the bus every time the set is updated. @@ -78,6 +98,7 @@ type ( ) func (dbContractMetric) TableName() string { return "contracts" } +func (dbContractPruneMetric) TableName() string { return "contract_prunes" } func (dbContractSetMetric) TableName() string { return "contract_sets" } func (dbContractSetChurnMetric) TableName() string { return "contract_sets_churn" } func (dbPerformanceMetric) TableName() string { return "performance" } @@ -327,3 +348,69 @@ func (s *SQLStore) RecordContractMetric(ctx context.Context, metrics ...api.Cont } return s.dbMetrics.Create(&dbMetrics).Error } + +func (s *SQLStore) ContractPruneMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractPruneMetricsQueryOpts) ([]api.ContractPruneMetric, error) { + metrics, err := s.contractPruneMetrics(ctx, start, n, interval, opts) + if err != nil { + return nil, err + } + + resp := make([]api.ContractPruneMetric, len(metrics)) + for i := range resp { + resp[i] = api.ContractPruneMetric{ + Timestamp: time.Time(metrics[i].Timestamp).UTC(), + + ContractID: types.FileContractID(metrics[i].FCID), + HostKey: types.PublicKey(metrics[i].Host), + HostVersion: metrics[i].HostVersion, + + Pruned: uint64(metrics[i].Pruned), + Remaining: uint64(metrics[i].Remaining), + Duration: metrics[i].Duration, + + Error: metrics[i].Error, + } + } + return resp, nil +} + +func (s *SQLStore) RecordContractPruneMetric(ctx context.Context, metrics ...api.ContractPruneMetric) error { + dbMetrics := make([]dbContractPruneMetric, len(metrics)) + for i, metric := range metrics { + dbMetrics[i] = dbContractPruneMetric{ + Timestamp: unixTimeMS(metric.Timestamp), + + FCID: fileContractID(metric.ContractID), + Host: publicKey(metric.HostKey), + HostVersion: metric.HostVersion, + + Pruned: unsigned64(metric.Pruned), + Remaining: unsigned64(metric.Remaining), + Duration: metric.Duration, + + Error: metric.Error, + } + } + return s.dbMetrics.Create(&dbMetrics).Error +} + +func (s *SQLStore) contractPruneMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractPruneMetricsQueryOpts) ([]dbContractPruneMetric, error) { + tx := s.dbMetrics + if opts.ContractID != (types.FileContractID{}) { + tx = tx.Where("fcid", fileContractID(opts.ContractID)) + } + if opts.HostKey != (types.PublicKey{}) { + tx = tx.Where("host", publicKey(opts.HostKey)) + } + if opts.HostVersion != "" { + tx = tx.Where("host_version", opts.HostVersion) + } + + var metrics []dbContractPruneMetric + err := s.findPeriods(tx, &metrics, start, n, interval) + if err != nil { + return nil, fmt.Errorf("failed to fetch contract metrics: %w", err) + } + + return metrics, nil +} diff --git a/stores/metrics_test.go b/stores/metrics_test.go index ccb172f24..281e28b1f 100644 --- a/stores/metrics_test.go +++ b/stores/metrics_test.go @@ -13,6 +13,87 @@ import ( "lukechampine.com/frand" ) +func TestContractPruneMetrics(t *testing.T) { + ss := newTestSQLStore(t, defaultTestSQLStoreConfig) + defer ss.Close() + + // add some metrics + hosts := []types.PublicKey{{1}, {2}} + hostVersions := []string{"1.5.9", "1.6.0"} + times := []time.Time{time.UnixMilli(3), time.UnixMilli(1), time.UnixMilli(2)} + var i byte + fcid2Metric := make(map[types.FileContractID]api.ContractPruneMetric) + for hi, host := range hosts { + for _, recordedTime := range times { + metric := api.ContractPruneMetric{ + Timestamp: recordedTime, + + ContractID: types.FileContractID{i}, + HostKey: host, + HostVersion: hostVersions[hi], + + Pruned: math.MaxUint64, + Remaining: math.MaxUint64, + Duration: time.Second, + + Error: "a", + } + fcid2Metric[metric.ContractID] = metric + if err := ss.RecordContractPruneMetric(context.Background(), metric); err != nil { + t.Fatal(err) + } + i++ + } + } + + assertMetrics := func(start time.Time, n uint64, interval time.Duration, opts api.ContractPruneMetricsQueryOpts, expected int, cmpFn func(api.ContractPruneMetric)) { + t.Helper() + metrics, err := ss.ContractPruneMetrics(context.Background(), start, n, interval, opts) + if err != nil { + t.Fatal(err) + } + if len(metrics) != expected { + t.Fatalf("expected %v metrics, got %v", expected, len(metrics)) + } else if !sort.SliceIsSorted(metrics, func(i, j int) bool { + return time.Time(metrics[i].Timestamp).Before(time.Time(metrics[j].Timestamp)) + }) { + t.Fatal("expected metrics to be sorted by time") + } + for _, m := range metrics { + if !cmp.Equal(m, fcid2Metric[m.ContractID]) { + t.Fatal("unexpected metric", cmp.Diff(m, fcid2Metric[m.ContractID])) + } + cmpFn(m) + } + } + + // Query without any filters. + start := time.UnixMilli(1) + assertMetrics(start, 3, time.Millisecond, api.ContractPruneMetricsQueryOpts{}, 3, func(m api.ContractPruneMetric) {}) + + // Query by host. + assertMetrics(start, 3, time.Millisecond, api.ContractPruneMetricsQueryOpts{HostKey: hosts[0]}, 3, func(m api.ContractPruneMetric) { + if m.HostKey != hosts[0] { + t.Fatalf("expected host to be %v, got %v", hosts[0], m.HostKey) + } + }) + + // Query by host version. + assertMetrics(start, 3, time.Millisecond, api.ContractPruneMetricsQueryOpts{HostVersion: hostVersions[0]}, 3, func(m api.ContractPruneMetric) { + if m.HostKey != hosts[0] { + t.Fatalf("expected host to be %v, got %v", hosts[0], m.HostKey) + } + }) + + // Query by fcid. + fcid := types.FileContractID{2} + assertMetrics(start, 3, time.Millisecond, api.ContractPruneMetricsQueryOpts{ContractID: fcid}, 1, func(m api.ContractPruneMetric) { + if m.ContractID != fcid { + t.Fatalf("expected fcid to be %v, got %v", fcid, m.ContractID) + } + }) +} + func TestContractSetMetrics(t *testing.T) { testStart := time.Now() ss := newTestSQLStore(t, defaultTestSQLStoreConfig) diff --git a/stores/migrations.go b/stores/migrations.go index b3302e44e..7fe66767b 100644 --- a/stores/migrations.go +++ b/stores/migrations.go @@ -54,6 +54,7 @@ var ( } metricsTables = []interface{}{ &dbContractMetric{}, + &dbContractPruneMetric{}, &dbContractSetMetric{}, &dbContractSetChurnMetric{}, &dbPerformanceMetric{}, @@ -368,7 +369,16 @@ func performMigrations(db *gorm.DB, logger *zap.SugaredLogger) error { } func performMetricsMigrations(db *gorm.DB, logger *zap.SugaredLogger) error { - migrations := []*gormigrate.Migration{} + migrations := []*gormigrate.Migration{ + { + ID: "00001_contract_prune_metrics", + Migrate: func(tx *gorm.DB) error { + return performMigration00001_contract_prune_metrics(tx, logger) + }, + Rollback: nil, + }, + } + // Create migrator. m := gormigrate.New(db, gormigrate.DefaultOptions, migrations) @@ -382,6 +392,15 @@ func performMetricsMigrations(db *gorm.DB, logger *zap.SugaredLogger) error { return nil } +func performMigration00001_contract_prune_metrics(txn *gorm.DB, logger *zap.SugaredLogger) error { + logger.Info("performing migration 00001_contract_prune_metrics") + if err := txn.Migrator().AutoMigrate(&dbContractPruneMetric{}); err != nil { + return err + } + logger.Info("migration 00001_contract_prune_metrics complete") + return nil +} + // initSchema is executed only on a clean database. Otherwise the individual // migrations are executed. func initSchema(tx *gorm.DB) error { From 379230e09a8c4056db4d331e5103addd47978fa5 Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 29 Nov 2023 15:28:52 +0100 Subject: [PATCH 02/12] worker: add prunable --- api/worker.go | 1 + autopilot/autopilot.go | 2 +- autopilot/contract_pruning.go | 6 ++++-- worker/client/rhp.go | 3 ++- worker/rhpv2.go | 6 +++++- worker/worker.go | 3 ++- 6 files changed, 15 insertions(+), 6 deletions(-) diff --git a/api/worker.go b/api/worker.go index bce931639..dc9b37cc6 100644 --- a/api/worker.go +++ b/api/worker.go @@ -92,6 +92,7 @@ type ( // RHPPruneContractResponse is the response type for the /rhp/contract/:id/prune // endpoint. RHPPruneContractResponse struct { + Prunable uint64 `json:"prunable"` Pruned uint64 `json:"pruned"` Remaining uint64 `json:"remaining"` Error string `json:"error,omitempty"` diff --git a/autopilot/autopilot.go b/autopilot/autopilot.go index 740f9b724..c61721a33 100644 --- a/autopilot/autopilot.go +++ b/autopilot/autopilot.go @@ -101,7 +101,7 @@ type Worker interface { RHPForm(ctx context.Context, endHeight uint64, hk types.PublicKey, hostIP string, renterAddress types.Address, renterFunds types.Currency, hostCollateral types.Currency) (rhpv2.ContractRevision, []types.Transaction, error) RHPFund(ctx context.Context, contractID types.FileContractID, hostKey types.PublicKey, hostIP, siamuxAddr string, balance types.Currency) (err error) RHPPriceTable(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, timeout time.Duration) (hostdb.HostPriceTable, error) - RHPPruneContract(ctx context.Context, fcid types.FileContractID, timeout time.Duration) (pruned, remaining uint64, err error) + RHPPruneContract(ctx context.Context, fcid types.FileContractID, timeout time.Duration) (prunable, pruned, remaining uint64, err error) RHPRenew(ctx context.Context, fcid types.FileContractID, endHeight uint64, hk types.PublicKey, hostIP string, hostAddress, renterAddress types.Address, renterFunds, newCollateral types.Currency, windowSize uint64) (rhpv2.ContractRevision, []types.Transaction, error) RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, timeout time.Duration) (api.RHPScanResponse, error) RHPSync(ctx context.Context, contractID types.FileContractID, hostKey types.PublicKey, hostIP, siamuxAddr string) (err error) diff --git a/autopilot/contract_pruning.go b/autopilot/contract_pruning.go index fe7f73c4b..e201a20fd 100644 --- a/autopilot/contract_pruning.go +++ b/autopilot/contract_pruning.go @@ -29,6 +29,7 @@ type ( hk types.PublicKey version string + prunable uint64 pruned uint64 remaining uint64 duration time.Duration @@ -40,7 +41,7 @@ type ( ) func (pr pruneResult) String() string { - msg := fmt.Sprintf("contract %v, pruned %d bytes, remaining %d bytes, elapsed %v", pr.fcid, pr.pruned, pr.remaining, pr.duration) + msg := fmt.Sprintf("contract %v, prunable %d bytes, pruned %d bytes, remaining %d bytes, elapsed %v", pr.fcid, pr.prunable, pr.pruned, pr.remaining, pr.duration) if pr.err != nil { msg += fmt.Sprintf(", err: %v", pr.err) } @@ -175,7 +176,7 @@ func (c *contractor) pruneContract(w Worker, fcid types.FileContractID) pruneRes // prune the contract start := time.Now() - pruned, remaining, err := w.RHPPruneContract(ctx, fcid, timeoutPruneContract) + prunable, pruned, remaining, err := w.RHPPruneContract(ctx, fcid, timeoutPruneContract) if err != nil && pruned == 0 { return pruneResult{err: err} } else if err != nil && isErr(err, context.DeadlineExceeded) { @@ -189,6 +190,7 @@ func (c *contractor) pruneContract(w Worker, fcid types.FileContractID) pruneRes hk: contract.HostKey, version: host.Settings.Version, + prunable: prunable, pruned: pruned, remaining: remaining, duration: time.Since(start), diff --git a/worker/client/rhp.go b/worker/client/rhp.go index 0a263a20a..64da44eb1 100644 --- a/worker/client/rhp.go +++ b/worker/client/rhp.go @@ -65,7 +65,7 @@ func (c *Client) RHPPriceTable(ctx context.Context, hostKey types.PublicKey, sia } // RHPPruneContract prunes deleted sectors from the contract with given id. -func (c *Client) RHPPruneContract(ctx context.Context, contractID types.FileContractID, timeout time.Duration) (pruned, remaining uint64, err error) { +func (c *Client) RHPPruneContract(ctx context.Context, contractID types.FileContractID, timeout time.Duration) (prunable, pruned, remaining uint64, err error) { var res api.RHPPruneContractResponse if err = c.c.WithContext(ctx).POST(fmt.Sprintf("/rhp/contract/%s/prune", contractID), api.RHPPruneContractRequest{ Timeout: api.DurationMS(timeout), @@ -75,6 +75,7 @@ func (c *Client) RHPPruneContract(ctx context.Context, contractID types.FileCont err = errors.New(res.Error) } + prunable = res.Prunable pruned = res.Pruned remaining = res.Remaining return diff --git a/worker/rhpv2.go b/worker/rhpv2.go index ec592d9aa..046828298 100644 --- a/worker/rhpv2.go +++ b/worker/rhpv2.go @@ -48,6 +48,10 @@ var ( // question has reached its maximum revision number, meaning the contract // can no longer be revised. ErrContractFinalized = errors.New("contract cannot be revised further") + + // ErrNoSectorsToPrune is returned when we try to prune a contract that has + // no sectors to prune. + ErrNoSectorsToPrune = errors.New("no sectors to prune") ) // A HostErrorSet is a collection of errors from various hosts. @@ -300,7 +304,7 @@ func (w *worker) PruneContract(ctx context.Context, hostIP string, hostKey types indices = append(indices, uint64(i)) } if len(indices) == 0 { - return fmt.Errorf("no sectors to prune, database holds %d (%d pending), contract contains %d", len(want)+len(pending), len(pending), len(got)) + return fmt.Errorf("%w: database holds %d (%d pending), contract contains %d", ErrNoSectorsToPrune, len(want)+len(pending), len(pending), len(got)) } // delete the roots from the contract diff --git a/worker/worker.go b/worker/worker.go index aeb3e734e..56f89defd 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -663,13 +663,14 @@ func (w *worker) rhpPruneContractHandlerPOST(jc jape.Context) { // prune the contract pruned, remaining, err := w.PruneContract(ctx, contract.HostIP, contract.HostKey, fcid, contract.RevisionNumber) - if err != nil && pruned == 0 { + if err != nil && !errors.Is(err, ErrNoSectorsToPrune) && pruned == 0 { err = fmt.Errorf("failed to prune contract %v; %w", fcid, err) jc.Error(err, http.StatusInternalServerError) return } res := api.RHPPruneContractResponse{ + Prunable: size.Prunable, Pruned: pruned, Remaining: remaining, } From c02e1f0b69b37fe3f36658b7f41169c254bd627d Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 29 Nov 2023 16:23:09 +0100 Subject: [PATCH 03/12] autopilot: update alert --- autopilot/alerts.go | 25 ++++++++++++++++--------- autopilot/contract_pruning.go | 16 ++++++++++++---- 2 files changed, 28 insertions(+), 13 deletions(-) diff --git a/autopilot/alerts.go b/autopilot/alerts.go index eb0a6a4fe..7b42991e1 100644 --- a/autopilot/alerts.go +++ b/autopilot/alerts.go @@ -116,16 +116,23 @@ func newContractRenewalFailedAlert(contract api.ContractMetadata, interrupted bo } } -func newContractPruningFailedAlert(hk types.PublicKey, fcid types.FileContractID, err error) *alerts.Alert { +func newContractPruningFailedAlert(hk types.PublicKey, version string, fcid types.FileContractID, err error) *alerts.Alert { + data := map[string]interface{}{"error": err.Error()} + if hk != (types.PublicKey{}) { + data["hostKey"] = hk.String() + } + if version != "" { + data["hostVersion"] = version + } + if fcid != (types.FileContractID{}) { + data["contractID"] = fcid.String() + } + return &alerts.Alert{ - ID: alertIDForContract(alertPruningID, fcid), - Severity: alerts.SeverityWarning, - Message: "Contract pruning failed", - Data: map[string]interface{}{ - "error": err.Error(), - "contractID": fcid.String(), - "hostKey": hk.String(), - }, + ID: alertIDForContract(alertPruningID, fcid), + Severity: alerts.SeverityWarning, + Message: "Contract pruning failed", + Data: data, Timestamp: time.Now(), } } diff --git a/autopilot/contract_pruning.go b/autopilot/contract_pruning.go index e201a20fd..98bf1e232 100644 --- a/autopilot/contract_pruning.go +++ b/autopilot/contract_pruning.go @@ -65,7 +65,7 @@ func (pr pruneResult) toAlert() (id types.Hash256, alert *alerts.Alert) { isErr(pr.err, errInvalidHandshakeSignature) || isErr(pr.err, errNoRouteToHost) || isErr(pr.err, errNoSuchHost)); shouldTrigger { - alert = newContractPruningFailedAlert(pr.hk, pr.fcid, pr.err) + alert = newContractPruningFailedAlert(pr.hk, pr.version, pr.fcid, pr.err) } return } @@ -171,14 +171,22 @@ func (c *contractor) pruneContract(w Worker, fcid types.FileContractID) pruneRes // fetch the host host, contract, err := c.hostForContract(ctx, fcid) if err != nil { - return pruneResult{err: err} + return pruneResult{ + fcid: fcid, + err: err, + } } // prune the contract start := time.Now() prunable, pruned, remaining, err := w.RHPPruneContract(ctx, fcid, timeoutPruneContract) if err != nil && pruned == 0 { - return pruneResult{err: err} + return pruneResult{ + fcid: fcid, + hk: contract.HostKey, + version: host.Settings.Version, + err: err, + } } else if err != nil && isErr(err, context.DeadlineExceeded) { err = nil } @@ -186,7 +194,7 @@ func (c *contractor) pruneContract(w Worker, fcid types.FileContractID) pruneRes return pruneResult{ ts: start, - fcid: contract.ID, + fcid: fcid, hk: contract.HostKey, version: host.Settings.Version, From 2d74e9cf509a5735aaad10b8c369cb30d48a1794 Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 29 Nov 2023 16:37:16 +0100 Subject: [PATCH 04/12] contractor: add host version --- autopilot/contract_pruning.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autopilot/contract_pruning.go b/autopilot/contract_pruning.go index 98bf1e232..243db548a 100644 --- a/autopilot/contract_pruning.go +++ b/autopilot/contract_pruning.go @@ -41,7 +41,7 @@ type ( ) func (pr pruneResult) String() string { - msg := fmt.Sprintf("contract %v, prunable %d bytes, pruned %d bytes, remaining %d bytes, elapsed %v", pr.fcid, pr.prunable, pr.pruned, pr.remaining, pr.duration) + msg := fmt.Sprintf("contract %v, prunable %d bytes, pruned %d bytes, remaining %d bytes, elapsed %v, host version %v", pr.fcid, pr.prunable, pr.pruned, pr.remaining, pr.duration, pr.version) if pr.err != nil { msg += fmt.Sprintf(", err: %v", pr.err) } From f9ff83b76219a125e9231e4177f41ac8cb086807 Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 29 Nov 2023 17:19:10 +0100 Subject: [PATCH 05/12] autopilot: add stop ctx --- autopilot/autopilot.go | 131 ++++++++++++---------------------- autopilot/contract_pruning.go | 7 +- autopilot/scanner_test.go | 7 +- autopilot/workerpool.go | 63 ++++++++++++++++ 4 files changed, 119 insertions(+), 89 deletions(-) create mode 100644 autopilot/workerpool.go diff --git a/autopilot/autopilot.go b/autopilot/autopilot.go index c61721a33..77fd67d63 100644 --- a/autopilot/autopilot.go +++ b/autopilot/autopilot.go @@ -24,7 +24,6 @@ import ( "go.sia.tech/renterd/wallet" "go.sia.tech/renterd/webhooks" "go.uber.org/zap" - "lukechampine.com/frand" ) type Bus interface { @@ -91,22 +90,6 @@ type Bus interface { WalletRedistribute(ctx context.Context, outputs int, amount types.Currency) (id types.TransactionID, err error) } -type Worker interface { - Account(ctx context.Context, hostKey types.PublicKey) (rhpv3.Account, error) - Contracts(ctx context.Context, hostTimeout time.Duration) (api.ContractsResponse, error) - ID(ctx context.Context) (string, error) - MigrateSlab(ctx context.Context, s object.Slab, set string) (api.MigrateSlabResponse, error) - - RHPBroadcast(ctx context.Context, fcid types.FileContractID) (err error) - RHPForm(ctx context.Context, endHeight uint64, hk types.PublicKey, hostIP string, renterAddress types.Address, renterFunds types.Currency, hostCollateral types.Currency) (rhpv2.ContractRevision, []types.Transaction, error) - RHPFund(ctx context.Context, contractID types.FileContractID, hostKey types.PublicKey, hostIP, siamuxAddr string, balance types.Currency) (err error) - RHPPriceTable(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, timeout time.Duration) (hostdb.HostPriceTable, error) - RHPPruneContract(ctx context.Context, fcid types.FileContractID, timeout time.Duration) (prunable, pruned, remaining uint64, err error) - RHPRenew(ctx context.Context, fcid types.FileContractID, endHeight uint64, hk types.PublicKey, hostIP string, hostAddress, renterAddress types.Address, renterFunds, newCollateral types.Currency, windowSize uint64) (rhpv2.ContractRevision, []types.Transaction, error) - RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, timeout time.Duration) (api.RHPScanResponse, error) - RHPSync(ctx context.Context, contractID types.FileContractID, hostKey types.PublicKey, hostIP, siamuxAddr string) (err error) -} - type Autopilot struct { id string @@ -126,11 +109,13 @@ type Autopilot struct { stateMu sync.Mutex state state - startStopMu sync.Mutex - startTime time.Time - stopChan chan struct{} - ticker *time.Ticker - triggerChan chan bool + startStopMu sync.Mutex + startTime time.Time + stopCtx context.Context + stopCtxCancel context.CancelFunc + stopChan chan struct{} + ticker *time.Ticker + triggerChan chan bool } // state holds a bunch of variables that are used by the autopilot and updated @@ -144,36 +129,35 @@ type state struct { period uint64 } -// workerPool contains all workers known to the autopilot. Users can call -// withWorker to execute a function with a worker of the pool or withWorkers to -// sequentially run a function on all workers. Due to the RWMutex this will -// never block during normal operations. However, during an update of the -// workerpool, this allows us to guarantee that all workers have finished their -// tasks by calling acquiring an exclusive lock on the pool before updating it. -// That way the caller who updated the pool can rely on the autopilot not using -// a worker that was removed during the update after the update operation -// returns. -type workerPool struct { - mu sync.RWMutex - workers []Worker -} +// New initializes an Autopilot. +func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat time.Duration, scannerScanInterval time.Duration, scannerBatchSize, scannerNumThreads uint64, migrationHealthCutoff float64, accountsRefillInterval time.Duration, revisionSubmissionBuffer, migratorParallelSlabsPerWorker uint64, revisionBroadcastInterval time.Duration) (*Autopilot, error) { + ap := &Autopilot{ + alerts: alerts.WithOrigin(bus, fmt.Sprintf("autopilot.%s", id)), + id: id, + bus: bus, + logger: logger.Sugar().Named(api.DefaultAutopilotID), + workers: newWorkerPool(workers), -func newWorkerPool(workers []Worker) *workerPool { - return &workerPool{ - workers: workers, + tickerDuration: heartbeat, + } + scanner, err := newScanner( + ap, + scannerBatchSize, + scannerNumThreads, + scannerScanInterval, + scannerTimeoutInterval, + scannerTimeoutMinTimeout, + ) + if err != nil { + return nil, err } -} -func (wp *workerPool) withWorker(workerFunc func(Worker)) { - wp.mu.RLock() - defer wp.mu.RUnlock() - workerFunc(wp.workers[frand.Intn(len(wp.workers))]) -} + ap.s = scanner + ap.c = newContractor(ap, revisionSubmissionBuffer, revisionBroadcastInterval) + ap.m = newMigrator(ap, migrationHealthCutoff, migratorParallelSlabsPerWorker) + ap.a = newAccounts(ap, ap.bus, ap.bus, ap.workers, ap.logger, accountsRefillInterval) -func (wp *workerPool) withWorkers(workerFunc func([]Worker)) { - wp.mu.RLock() - defer wp.mu.RUnlock() - workerFunc(wp.workers) + return ap, nil } // Handler returns an HTTP handler that serves the autopilot api. @@ -194,6 +178,7 @@ func (ap *Autopilot) Run() error { ap.startStopMu.Unlock() return errors.New("already running") } + ap.stopCtx, ap.stopCtxCancel = context.WithCancel(context.Background()) ap.startTime = time.Now() ap.stopChan = make(chan struct{}) ap.triggerChan = make(chan bool, 1) @@ -338,12 +323,19 @@ func (ap *Autopilot) Shutdown(_ context.Context) error { ap.ticker.Stop() close(ap.stopChan) close(ap.triggerChan) + ap.stopCtxCancel() ap.wg.Wait() ap.startTime = time.Time{} } return nil } +func (ap *Autopilot) StartTime() time.Time { + ap.startStopMu.Lock() + defer ap.startStopMu.Unlock() + return ap.startTime +} + func (ap *Autopilot) State() state { ap.stateMu.Lock() defer ap.stateMu.Unlock() @@ -362,12 +354,6 @@ func (ap *Autopilot) Trigger(forceScan bool) bool { } } -func (ap *Autopilot) StartTime() time.Time { - ap.startStopMu.Lock() - defer ap.startStopMu.Unlock() - return ap.startTime -} - func (ap *Autopilot) Uptime() (dur time.Duration) { ap.startStopMu.Lock() defer ap.startStopMu.Unlock() @@ -585,6 +571,12 @@ func (ap *Autopilot) updateState(ctx context.Context) error { return nil } +func (ap *Autopilot) withTimeout(fn func(ctx context.Context), timeout time.Duration) { + ctx, cancel := context.WithTimeout(ap.stopCtx, timeout) + defer cancel() + fn(ctx) +} + func (ap *Autopilot) isStopped() bool { select { case <-ap.stopChan: @@ -643,37 +635,6 @@ func (ap *Autopilot) triggerHandlerPOST(jc jape.Context) { }) } -// New initializes an Autopilot. -func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat time.Duration, scannerScanInterval time.Duration, scannerBatchSize, scannerNumThreads uint64, migrationHealthCutoff float64, accountsRefillInterval time.Duration, revisionSubmissionBuffer, migratorParallelSlabsPerWorker uint64, revisionBroadcastInterval time.Duration) (*Autopilot, error) { - ap := &Autopilot{ - alerts: alerts.WithOrigin(bus, fmt.Sprintf("autopilot.%s", id)), - id: id, - bus: bus, - logger: logger.Sugar().Named(api.DefaultAutopilotID), - workers: newWorkerPool(workers), - - tickerDuration: heartbeat, - } - scanner, err := newScanner( - ap, - scannerBatchSize, - scannerNumThreads, - scannerScanInterval, - scannerTimeoutInterval, - scannerTimeoutMinTimeout, - ) - if err != nil { - return nil, err - } - - ap.s = scanner - ap.c = newContractor(ap, revisionSubmissionBuffer, revisionBroadcastInterval) - ap.m = newMigrator(ap, migrationHealthCutoff, migratorParallelSlabsPerWorker) - ap.a = newAccounts(ap, ap.bus, ap.bus, ap.workers, ap.logger, accountsRefillInterval) - - return ap, nil -} - func (ap *Autopilot) hostHandlerGET(jc jape.Context) { var hostKey types.PublicKey if jc.DecodeParam("hostKey", &hostKey) != nil { diff --git a/autopilot/contract_pruning.go b/autopilot/contract_pruning.go index 243db548a..7230fcae4 100644 --- a/autopilot/contract_pruning.go +++ b/autopilot/contract_pruning.go @@ -125,6 +125,11 @@ func (c *contractor) performContractPruning(wp *workerPool) { var metrics pruneMetrics wp.withWorker(func(w Worker) { for _, contract := range res.Contracts { + // return if we're stopped + if c.ap.isStopped() { + return + } + // skip if there's nothing to prune if contract.Prunable == 0 { continue @@ -165,7 +170,7 @@ func (c *contractor) performContractPruning(wp *workerPool) { func (c *contractor) pruneContract(w Worker, fcid types.FileContractID) pruneResult { // create a sane timeout - ctx, cancel := context.WithTimeout(context.Background(), 2*timeoutPruneContract) + ctx, cancel := context.WithTimeout(c.ap.stopCtx, 2*timeoutPruneContract) defer cancel() // fetch the host diff --git a/autopilot/scanner_test.go b/autopilot/scanner_test.go index b30aea014..6d6f51df0 100644 --- a/autopilot/scanner_test.go +++ b/autopilot/scanner_test.go @@ -140,9 +140,10 @@ func (s *scanner) isScanning() bool { } func newTestScanner(b *mockBus, w *mockWorker) *scanner { - ap := &Autopilot{ - stopChan: make(chan struct{}), - } + ap := &Autopilot{} + ap.stopChan = make(chan struct{}) + ap.stopCtx, ap.stopCtxCancel = context.WithCancel(context.Background()) + return &scanner{ ap: ap, bus: b, diff --git a/autopilot/workerpool.go b/autopilot/workerpool.go new file mode 100644 index 000000000..d50cdc397 --- /dev/null +++ b/autopilot/workerpool.go @@ -0,0 +1,63 @@ +package autopilot + +import ( + "context" + "sync" + "time" + + rhpv2 "go.sia.tech/core/rhp/v2" + rhpv3 "go.sia.tech/core/rhp/v3" + "go.sia.tech/core/types" + "go.sia.tech/renterd/api" + "go.sia.tech/renterd/hostdb" + "go.sia.tech/renterd/object" + "lukechampine.com/frand" +) + +type Worker interface { + Account(ctx context.Context, hostKey types.PublicKey) (rhpv3.Account, error) + Contracts(ctx context.Context, hostTimeout time.Duration) (api.ContractsResponse, error) + ID(ctx context.Context) (string, error) + MigrateSlab(ctx context.Context, s object.Slab, set string) (api.MigrateSlabResponse, error) + + RHPBroadcast(ctx context.Context, fcid types.FileContractID) (err error) + RHPForm(ctx context.Context, endHeight uint64, hk types.PublicKey, hostIP string, renterAddress types.Address, renterFunds types.Currency, hostCollateral types.Currency) (rhpv2.ContractRevision, []types.Transaction, error) + RHPFund(ctx context.Context, contractID types.FileContractID, hostKey types.PublicKey, hostIP, siamuxAddr string, balance types.Currency) (err error) + RHPPriceTable(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, timeout time.Duration) (hostdb.HostPriceTable, error) + RHPPruneContract(ctx context.Context, fcid types.FileContractID, timeout time.Duration) (prunable, pruned, remaining uint64, err error) + RHPRenew(ctx context.Context, fcid types.FileContractID, endHeight uint64, hk types.PublicKey, hostIP string, hostAddress, renterAddress types.Address, renterFunds, newCollateral types.Currency, windowSize uint64) (rhpv2.ContractRevision, []types.Transaction, error) + RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, timeout time.Duration) (api.RHPScanResponse, error) + RHPSync(ctx context.Context, contractID types.FileContractID, hostKey types.PublicKey, hostIP, siamuxAddr string) (err error) +} + +// workerPool contains all workers known to the autopilot. Users can call +// withWorker to execute a function with a worker of the pool or withWorkers to +// sequentially run a function on all workers. Due to the RWMutex this will +// never block during normal operations. However, during an update of the +// workerpool, this allows us to guarantee that all workers have finished their +// tasks by calling acquiring an exclusive lock on the pool before updating it. +// That way the caller who updated the pool can rely on the autopilot not using +// a worker that was removed during the update after the update operation +// returns. +type workerPool struct { + mu sync.RWMutex + workers []Worker +} + +func newWorkerPool(workers []Worker) *workerPool { + return &workerPool{ + workers: workers, + } +} + +func (wp *workerPool) withWorker(workerFunc func(Worker)) { + wp.mu.RLock() + defer wp.mu.RUnlock() + workerFunc(wp.workers[frand.Intn(len(wp.workers))]) +} + +func (wp *workerPool) withWorkers(workerFunc func([]Worker)) { + wp.mu.RLock() + defer wp.mu.RUnlock() + workerFunc(wp.workers) +} From 0aa15006890c676314876548c3f3c9d80f92b7c0 Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 29 Nov 2023 18:06:07 +0100 Subject: [PATCH 06/12] stores: remove err from metric --- api/metrcis.go | 2 -- autopilot/contract_pruning.go | 10 ++++------ stores/metrics.go | 6 ------ stores/metrics_test.go | 2 -- 4 files changed, 4 insertions(+), 16 deletions(-) diff --git a/api/metrcis.go b/api/metrcis.go index b782d4223..79ff8511b 100644 --- a/api/metrcis.go +++ b/api/metrcis.go @@ -87,8 +87,6 @@ type ( Pruned uint64 `json:"pruned"` Remaining uint64 `json:"remaining"` Duration time.Duration `json:"duration"` - - Error string `json:"error,omitempty"` } ContractPruneMetricsQueryOpts struct { diff --git a/autopilot/contract_pruning.go b/autopilot/contract_pruning.go index 7230fcae4..4dd4664cb 100644 --- a/autopilot/contract_pruning.go +++ b/autopilot/contract_pruning.go @@ -70,8 +70,8 @@ func (pr pruneResult) toAlert() (id types.Hash256, alert *alerts.Alert) { return } -func (pr pruneResult) toMetric() (metric api.ContractPruneMetric) { - metric = api.ContractPruneMetric{ +func (pr pruneResult) toMetric() api.ContractPruneMetric { + return api.ContractPruneMetric{ Timestamp: pr.ts, ContractID: pr.fcid, HostKey: pr.hk, @@ -79,10 +79,6 @@ func (pr pruneResult) toMetric() (metric api.ContractPruneMetric) { Remaining: pr.remaining, Duration: pr.duration, } - if pr.err != nil { - metric.Error = pr.err.Error() - } - return } func humanReadableSize(b int) string { @@ -134,9 +130,11 @@ func (c *contractor) performContractPruning(wp *workerPool) { if contract.Prunable == 0 { continue } + fmt.Printf("DEBUG PJ: contract %v has %d bytes to prune\n", contract.ID, contract.Prunable) // prune contract result := c.pruneContract(w, contract.ID) + fmt.Printf("DEBUG PJ: res %+v", result) if result.err != nil { c.logger.Error(result) } else { diff --git a/stores/metrics.go b/stores/metrics.go index f6ab7fa56..71453fad3 100644 --- a/stores/metrics.go +++ b/stores/metrics.go @@ -55,8 +55,6 @@ type ( Pruned unsigned64 `gorm:"index;NOT NULL"` Remaining unsigned64 `gorm:"index;NOT NULL"` Duration time.Duration `gorm:"index;NOT NULL"` - - Error string `gorm:"index"` } // dbContractSetMetric tracks information about a specific contract set. @@ -367,8 +365,6 @@ func (s *SQLStore) ContractPruneMetrics(ctx context.Context, start time.Time, n Pruned: uint64(metrics[i].Pruned), Remaining: uint64(metrics[i].Remaining), Duration: metrics[i].Duration, - - Error: metrics[i].Error, } } return resp, nil @@ -387,8 +383,6 @@ func (s *SQLStore) RecordContractPruneMetric(ctx context.Context, metrics ...api Pruned: unsigned64(metric.Pruned), Remaining: unsigned64(metric.Remaining), Duration: metric.Duration, - - Error: metric.Error, } } return s.dbMetrics.Create(&dbMetrics).Error diff --git a/stores/metrics_test.go b/stores/metrics_test.go index 281e28b1f..adc6a8ff6 100644 --- a/stores/metrics_test.go +++ b/stores/metrics_test.go @@ -35,8 +35,6 @@ func TestContractPruneMetrics(t *testing.T) { Pruned: math.MaxUint64, Remaining: math.MaxUint64, Duration: time.Second, - - Error: "a", } fcid2Metric[metric.ContractID] = metric if err := ss.RecordContractPruneMetric(context.Background(), metric); err != nil { From aaa3cda4a84d597154e10568b2376456eea0d72b Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 29 Nov 2023 18:19:17 +0100 Subject: [PATCH 07/12] autopilot: add contract size --- api/worker.go | 1 - autopilot/contract_pruning.go | 26 ++++++++++++++------------ autopilot/workerpool.go | 2 +- worker/client/rhp.go | 3 +-- worker/worker.go | 1 - 5 files changed, 16 insertions(+), 17 deletions(-) diff --git a/api/worker.go b/api/worker.go index dc9b37cc6..bce931639 100644 --- a/api/worker.go +++ b/api/worker.go @@ -92,7 +92,6 @@ type ( // RHPPruneContractResponse is the response type for the /rhp/contract/:id/prune // endpoint. RHPPruneContractResponse struct { - Prunable uint64 `json:"prunable"` Pruned uint64 `json:"pruned"` Remaining uint64 `json:"remaining"` Error string `json:"error,omitempty"` diff --git a/autopilot/contract_pruning.go b/autopilot/contract_pruning.go index 4dd4664cb..720f54f34 100644 --- a/autopilot/contract_pruning.go +++ b/autopilot/contract_pruning.go @@ -28,8 +28,8 @@ type ( fcid types.FileContractID hk types.PublicKey version string + size api.ContractSize - prunable uint64 pruned uint64 remaining uint64 duration time.Duration @@ -41,7 +41,7 @@ type ( ) func (pr pruneResult) String() string { - msg := fmt.Sprintf("contract %v, prunable %d bytes, pruned %d bytes, remaining %d bytes, elapsed %v, host version %v", pr.fcid, pr.prunable, pr.pruned, pr.remaining, pr.duration, pr.version) + msg := fmt.Sprintf("contract %v, size %d bytes, prunable %d bytes, pruned %d bytes, remaining %d bytes, elapsed %v, host version %v", pr.fcid, pr.size.Size, pr.size.Prunable, pr.pruned, pr.remaining, pr.duration, pr.version) if pr.err != nil { msg += fmt.Sprintf(", err: %v", pr.err) } @@ -133,7 +133,7 @@ func (c *contractor) performContractPruning(wp *workerPool) { fmt.Printf("DEBUG PJ: contract %v has %d bytes to prune\n", contract.ID, contract.Prunable) // prune contract - result := c.pruneContract(w, contract.ID) + result := c.pruneContract(w, contract) fmt.Printf("DEBUG PJ: res %+v", result) if result.err != nil { c.logger.Error(result) @@ -166,27 +166,29 @@ func (c *contractor) performContractPruning(wp *workerPool) { c.logger.Info(metrics) } -func (c *contractor) pruneContract(w Worker, fcid types.FileContractID) pruneResult { +func (c *contractor) pruneContract(w Worker, contract api.ContractPrunableData) pruneResult { // create a sane timeout ctx, cancel := context.WithTimeout(c.ap.stopCtx, 2*timeoutPruneContract) defer cancel() // fetch the host - host, contract, err := c.hostForContract(ctx, fcid) + host, _, err := c.hostForContract(ctx, contract.ID) if err != nil { return pruneResult{ - fcid: fcid, + size: contract.ContractSize, + fcid: contract.ID, err: err, } } // prune the contract start := time.Now() - prunable, pruned, remaining, err := w.RHPPruneContract(ctx, fcid, timeoutPruneContract) + pruned, remaining, err := w.RHPPruneContract(ctx, contract.ID, timeoutPruneContract) if err != nil && pruned == 0 { return pruneResult{ - fcid: fcid, - hk: contract.HostKey, + size: contract.ContractSize, + fcid: contract.ID, + hk: host.PublicKey, version: host.Settings.Version, err: err, } @@ -197,11 +199,11 @@ func (c *contractor) pruneContract(w Worker, fcid types.FileContractID) pruneRes return pruneResult{ ts: start, - fcid: fcid, - hk: contract.HostKey, + fcid: contract.ID, + hk: host.PublicKey, version: host.Settings.Version, + size: contract.ContractSize, - prunable: prunable, pruned: pruned, remaining: remaining, duration: time.Since(start), diff --git a/autopilot/workerpool.go b/autopilot/workerpool.go index d50cdc397..ce42854c2 100644 --- a/autopilot/workerpool.go +++ b/autopilot/workerpool.go @@ -24,7 +24,7 @@ type Worker interface { RHPForm(ctx context.Context, endHeight uint64, hk types.PublicKey, hostIP string, renterAddress types.Address, renterFunds types.Currency, hostCollateral types.Currency) (rhpv2.ContractRevision, []types.Transaction, error) RHPFund(ctx context.Context, contractID types.FileContractID, hostKey types.PublicKey, hostIP, siamuxAddr string, balance types.Currency) (err error) RHPPriceTable(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, timeout time.Duration) (hostdb.HostPriceTable, error) - RHPPruneContract(ctx context.Context, fcid types.FileContractID, timeout time.Duration) (prunable, pruned, remaining uint64, err error) + RHPPruneContract(ctx context.Context, fcid types.FileContractID, timeout time.Duration) (pruned, remaining uint64, err error) RHPRenew(ctx context.Context, fcid types.FileContractID, endHeight uint64, hk types.PublicKey, hostIP string, hostAddress, renterAddress types.Address, renterFunds, newCollateral types.Currency, windowSize uint64) (rhpv2.ContractRevision, []types.Transaction, error) RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, timeout time.Duration) (api.RHPScanResponse, error) RHPSync(ctx context.Context, contractID types.FileContractID, hostKey types.PublicKey, hostIP, siamuxAddr string) (err error) diff --git a/worker/client/rhp.go b/worker/client/rhp.go index 64da44eb1..0a263a20a 100644 --- a/worker/client/rhp.go +++ b/worker/client/rhp.go @@ -65,7 +65,7 @@ func (c *Client) RHPPriceTable(ctx context.Context, hostKey types.PublicKey, sia } // RHPPruneContract prunes deleted sectors from the contract with given id. -func (c *Client) RHPPruneContract(ctx context.Context, contractID types.FileContractID, timeout time.Duration) (prunable, pruned, remaining uint64, err error) { +func (c *Client) RHPPruneContract(ctx context.Context, contractID types.FileContractID, timeout time.Duration) (pruned, remaining uint64, err error) { var res api.RHPPruneContractResponse if err = c.c.WithContext(ctx).POST(fmt.Sprintf("/rhp/contract/%s/prune", contractID), api.RHPPruneContractRequest{ Timeout: api.DurationMS(timeout), @@ -75,7 +75,6 @@ func (c *Client) RHPPruneContract(ctx context.Context, contractID types.FileCont err = errors.New(res.Error) } - prunable = res.Prunable pruned = res.Pruned remaining = res.Remaining return diff --git a/worker/worker.go b/worker/worker.go index 56f89defd..ab84ec315 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -670,7 +670,6 @@ func (w *worker) rhpPruneContractHandlerPOST(jc jape.Context) { } res := api.RHPPruneContractResponse{ - Prunable: size.Prunable, Pruned: pruned, Remaining: remaining, } From 9e3bcca8a4ca7c928a3d184242f4c467b36a68df Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 29 Nov 2023 18:24:06 +0100 Subject: [PATCH 08/12] stores: update migration --- stores/migrations.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/stores/migrations.go b/stores/migrations.go index 7fe66767b..08f060ced 100644 --- a/stores/migrations.go +++ b/stores/migrations.go @@ -7,6 +7,7 @@ import ( "fmt" "reflect" "strings" + "time" "github.com/go-gormigrate/gormigrate/v2" "go.sia.tech/renterd/api" @@ -394,7 +395,20 @@ func performMetricsMigrations(db *gorm.DB, logger *zap.SugaredLogger) error { func performMigration00001_contract_prune_metrics(txn *gorm.DB, logger *zap.SugaredLogger) error { logger.Info("performing migration 00001_contract_prune_metrics") - if err := txn.Migrator().AutoMigrate(&dbContractPruneMetric{}); err != nil { + if err := txn.Table("contract_prunes").Migrator().AutoMigrate(&struct { + ID uint `gorm:"primarykey"` + CreatedAt time.Time + + Timestamp unixTimeMS `gorm:"index;NOT NULL"` + + FCID fileContractID `gorm:"index;size:32;NOT NULL;column:fcid"` + Host publicKey `gorm:"index;size:32;NOT NULL"` + HostVersion string `gorm:"index"` + + Pruned unsigned64 `gorm:"index;NOT NULL"` + Remaining unsigned64 `gorm:"index;NOT NULL"` + Duration time.Duration `gorm:"index;NOT NULL"` + }{}); err != nil { return err } logger.Info("migration 00001_contract_prune_metrics complete") From c7596239f51d6daf189e85ad2bc80b3dc4a8f5ba Mon Sep 17 00:00:00 2001 From: PJ Date: Thu, 30 Nov 2023 15:27:52 +0100 Subject: [PATCH 09/12] autopilot: cleanup PR --- autopilot/autopilot.go | 7 ------- autopilot/contract_pruning.go | 3 +-- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/autopilot/autopilot.go b/autopilot/autopilot.go index 2f4b0def3..a7ad8e280 100644 --- a/autopilot/autopilot.go +++ b/autopilot/autopilot.go @@ -321,7 +321,6 @@ func (ap *Autopilot) Shutdown(_ context.Context) error { ap.ticker.Stop() ap.stopCtxCancel() close(ap.triggerChan) - ap.stopCtxCancel() ap.wg.Wait() ap.startTime = time.Time{} } @@ -569,12 +568,6 @@ func (ap *Autopilot) updateState(ctx context.Context) error { return nil } -func (ap *Autopilot) withTimeout(fn func(ctx context.Context), timeout time.Duration) { - ctx, cancel := context.WithTimeout(ap.stopCtx, timeout) - defer cancel() - fn(ctx) -} - func (ap *Autopilot) isStopped() bool { select { case <-ap.stopCtx.Done(): diff --git a/autopilot/contract_pruning.go b/autopilot/contract_pruning.go index f5cb32144..902748b5d 100644 --- a/autopilot/contract_pruning.go +++ b/autopilot/contract_pruning.go @@ -28,7 +28,6 @@ type ( fcid types.FileContractID hk types.PublicKey version string - size api.ContractSize pruned uint64 remaining uint64 @@ -41,7 +40,7 @@ type ( ) func (pr pruneResult) String() string { - msg := fmt.Sprintf("contract %v, size %d bytes, prunable %d bytes, pruned %d bytes, remaining %d bytes, elapsed %v, host version %v", pr.fcid, pr.size.Size, pr.size.Prunable, pr.pruned, pr.remaining, pr.duration, pr.version) + msg := fmt.Sprintf("contract %v, pruned %d bytes, remaining %d bytes, elapsed %v, host version %v", pr.fcid, pr.pruned, pr.remaining, pr.duration, pr.version) if pr.err != nil { msg += fmt.Sprintf(", err: %v", pr.err) } From f43beae8426cc4c03a3be80b2f9c9d15fb7b917b Mon Sep 17 00:00:00 2001 From: PJ Date: Thu, 30 Nov 2023 16:14:41 +0100 Subject: [PATCH 10/12] testing: add integration test --- bus/bus.go | 4 +- internal/testing/metrics_test.go | 86 ++++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+), 1 deletion(-) create mode 100644 internal/testing/metrics_test.go diff --git a/bus/bus.go b/bus/bus.go index 3a2101f0f..6fb44c02e 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -2109,6 +2109,8 @@ func (b *bus) metrics(ctx context.Context, key string, start time.Time, n uint64 switch key { case api.MetricContract: return b.mtrcs.ContractMetrics(ctx, start, n, interval, opts.(api.ContractMetricsQueryOpts)) + case api.MetricContractPrune: + return b.mtrcs.ContractPruneMetrics(ctx, start, n, interval, opts.(api.ContractPruneMetricsQueryOpts)) case api.MetricContractSet: return b.mtrcs.ContractSetMetrics(ctx, start, n, interval, opts.(api.ContractSetMetricsQueryOpts)) case api.MetricContractSetChurn: @@ -2116,7 +2118,7 @@ func (b *bus) metrics(ctx context.Context, key string, start time.Time, n uint64 case api.MetricWallet: return b.mtrcs.WalletMetrics(ctx, start, n, interval, opts.(api.WalletMetricsQueryOpts)) } - return nil, nil + return nil, fmt.Errorf("unknown metric '%s'", key) } func (b *bus) multipartHandlerCreatePOST(jc jape.Context) { diff --git a/internal/testing/metrics_test.go b/internal/testing/metrics_test.go new file mode 100644 index 000000000..b69125a7a --- /dev/null +++ b/internal/testing/metrics_test.go @@ -0,0 +1,86 @@ +package testing + +import ( + "bytes" + "context" + "errors" + "io" + "testing" + "time" + + rhpv2 "go.sia.tech/core/rhp/v2" + "go.sia.tech/renterd/api" + "lukechampine.com/frand" +) + +func TestMetrics(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + + // register start time + start := time.Now() + + // create a test cluster + cluster := newTestCluster(t, testClusterOptions{ + hosts: testRedundancySettings.TotalShards, + }) + defer cluster.Shutdown() + + // convenience variables + b := cluster.Bus + w := cluster.Worker + tt := cluster.tt + + // upload, download, delete + data := frand.Bytes(rhpv2.SectorSize) + tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), api.DefaultBucketName, "foo", api.UploadObjectOptions{})) + tt.OK(w.DownloadObject(context.Background(), io.Discard, api.DefaultBucketName, "foo", api.DownloadObjectOptions{})) + tt.OK(w.DeleteObject(context.Background(), api.DefaultBucketName, "foo", api.DeleteObjectOptions{})) + + tt.Retry(30, time.Second, func() (err error) { + defer func() { + if err != nil { + cluster.MineBlocks(1) + } + }() + + // check contract metrics + cm, err := b.ContractMetrics(context.Background(), start, 10, time.Minute, api.ContractMetricsQueryOpts{}) + tt.OK(err) + if len(cm) == 0 { + return errors.New("no contract metrics") + } + + // check contract prune metrics + cpm, err := b.ContractPruneMetrics(context.Background(), start, 10, time.Minute, api.ContractPruneMetricsQueryOpts{}) + tt.OK(err) + if len(cpm) == 0 { + return errors.New("no contract prune metrics") + } + + // check contract set metrics + csm, err := b.ContractSetMetrics(context.Background(), start, 10, time.Minute, api.ContractSetMetricsQueryOpts{}) + tt.OK(err) + if len(csm) == 0 { + return errors.New("no contract set metrics") + } + + // check contract set metrics + cscm, err := b.ContractSetChurnMetrics(context.Background(), start, 10, time.Minute, api.ContractSetChurnMetricsQueryOpts{}) + tt.OK(err) + if len(cscm) == 0 { + return errors.New("no contract set churn metrics") + } + + // check wallet metrics + wm, err := b.WalletMetrics(context.Background(), start, 10, time.Minute, api.WalletMetricsQueryOpts{}) + tt.OK(err) + if len(wm) == 0 { + return errors.New("no wallet metrics") + } + + return nil + }) + +} From 1f2791526e86e7bda3dfaa7101d11d5aaee5cfdb Mon Sep 17 00:00:00 2001 From: PJ Date: Thu, 30 Nov 2023 16:16:27 +0100 Subject: [PATCH 11/12] testing: fix lint --- internal/testing/metrics_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/testing/metrics_test.go b/internal/testing/metrics_test.go index b69125a7a..87a108cfb 100644 --- a/internal/testing/metrics_test.go +++ b/internal/testing/metrics_test.go @@ -82,5 +82,4 @@ func TestMetrics(t *testing.T) { return nil }) - } From a0b5c8182c72cac5f84f96b9586f865a4e7b4876 Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 5 Dec 2023 09:53:43 +0100 Subject: [PATCH 12/12] testing: fix TestMetrics --- internal/testing/cluster.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/testing/cluster.go b/internal/testing/cluster.go index f1748d015..55798a8c2 100644 --- a/internal/testing/cluster.go +++ b/internal/testing/cluster.go @@ -59,7 +59,8 @@ var ( Upload: rhpv2.SectorSize * 500, Storage: rhpv2.SectorSize * 5e3, - Set: testContractSet, + Set: testContractSet, + Prune: true, }, Hosts: api.HostsConfig{ MaxDowntimeHours: 10,