From 7065f3becdf1f002d3318081cce84404fec27889 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Fri, 8 Dec 2023 11:54:15 +0100 Subject: [PATCH 1/5] bus: add endpoint to prune metrics --- api/metrics.go | 1 + bus/bus.go | 30 +++++++++++++++++++++-- stores/metrics.go | 30 +++++++++++++++++++++++ stores/metrics_test.go | 54 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 113 insertions(+), 2 deletions(-) diff --git a/api/metrics.go b/api/metrics.go index 0d982076b..c697193f0 100644 --- a/api/metrics.go +++ b/api/metrics.go @@ -14,6 +14,7 @@ const ( MetricContractSet = "contractset" MetricContractSetChurn = "churn" MetricContract = "contract" + MetricPerformance = "performance" MetricWallet = "wallet" ) diff --git a/bus/bus.go b/bus/bus.go index 305ac839a..d13ceb08f 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -202,6 +202,7 @@ type ( ContractMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractMetricsQueryOpts) ([]api.ContractMetric, error) RecordContractMetric(ctx context.Context, metrics ...api.ContractMetric) error + PruneMetrics(ctx context.Context, metric string, cutoff time.Time) error ContractSetChurnMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetChurnMetricsQueryOpts) ([]api.ContractSetChurnMetric, error) RecordContractSetChurnMetric(ctx context.Context, metrics ...api.ContractSetChurnMetric) error @@ -298,8 +299,9 @@ func (b *bus) Handler() http.Handler { "GET /host/:hostkey": b.hostsPubkeyHandlerGET, "POST /host/:hostkey/resetlostsectors": b.hostsResetLostSectorsPOST, - "PUT /metric/:key": b.metricsHandlerPUT, - "GET /metric/:key": b.metricsHandlerGET, + "PUT /metric/:key": b.metricsHandlerPUT, + "GET /metric/:key": b.metricsHandlerGET, + "DELETE /metric/:key": b.metricsHandlerDELETE, "POST /multipart/create": b.multipartHandlerCreatePOST, "POST /multipart/abort": b.multipartHandlerAbortPOST, @@ -1997,6 +1999,30 @@ func (b *bus) webhookHandlerPost(jc jape.Context) { } } +func (b *bus) metricsHandlerDELETE(jc jape.Context) { + // parse mandatory query parameters + var cutoff time.Time + if jc.DecodeForm("start", (*api.TimeRFC3339)(&cutoff)) != nil { + return + } else if cutoff.IsZero() { + jc.Error(errors.New("parameter 'cutoff' is required"), http.StatusBadRequest) + return + } + + var metric string + if jc.DecodeForm("n", &metric) != nil { + return + } else if metric == "" { + jc.Error(errors.New("parameter 'metric' is required"), http.StatusBadRequest) + return + } + + err := b.mtrcs.PruneMetrics(jc.Request.Context(), metric, cutoff) + if jc.Check("failed to prune metrics", err) != nil { + return + } +} + func (b *bus) metricsHandlerPUT(jc jape.Context) { jc.Custom((*interface{})(nil), nil) diff --git a/stores/metrics.go b/stores/metrics.go index ba46b601a..60fc162ee 100644 --- a/stores/metrics.go +++ b/stores/metrics.go @@ -2,6 +2,7 @@ package stores import ( "context" + "errors" "fmt" "time" @@ -490,3 +491,32 @@ func (s *SQLStore) performanceMetrics(ctx context.Context, start time.Time, n ui return metrics, nil } + +func (s *SQLStore) PruneMetrics(ctx context.Context, metric string, cutoff time.Time) error { + if metric == "" { + return errors.New("metric must be set") + } else if cutoff.IsZero() { + return errors.New("cutoff time must be set") + } + var model interface{} + switch metric { + case api.MetricContractPrune: + model = &dbContractPruneMetric{} + case api.MetricContractSet: + model = &dbContractSetMetric{} + case api.MetricContractSetChurn: + model = &dbContractSetChurnMetric{} + case api.MetricContract: + model = &dbContractMetric{} + case api.MetricPerformance: + model = &dbPerformanceMetric{} + case api.MetricWallet: + model = &dbWalletMetric{} + default: + return fmt.Errorf("unknown metric '%s'", metric) + } + return s.dbMetrics.Model(model). + Where("timestamp < ?", unixTimeMS(cutoff)). + Delete(model). + Error +} diff --git a/stores/metrics_test.go b/stores/metrics_test.go index 4a09f5110..2ddd6212b 100644 --- a/stores/metrics_test.go +++ b/stores/metrics_test.go @@ -124,6 +124,15 @@ func TestContractPruneMetrics(t *testing.T) { t.Fatalf("expected fcid to be %v, got %v", fcid, m.ContractID) } }) + + // Prune metrics + if err := ss.PruneMetrics(context.Background(), api.MetricContractPrune, time.UnixMilli(3)); err != nil { + t.Fatal(err) + } else if metrics, err := ss.ContractPruneMetrics(context.Background(), time.UnixMilli(1), 3, time.Millisecond, api.ContractPruneMetricsQueryOpts{}); err != nil { + t.Fatal(err) + } else if len(metrics) != 1 { + t.Fatalf("expected 1 metric, got %v", len(metrics)) + } } func TestContractSetMetrics(t *testing.T) { @@ -192,6 +201,15 @@ func TestContractSetMetrics(t *testing.T) { // Query the metric in the middle of the 3 we added. assertMetrics(time.UnixMilli(150), 1, 50*time.Millisecond, api.ContractSetMetricsQueryOpts{Name: cs}, 1, []int{3}) + + // Prune metrics + if err := ss.PruneMetrics(context.Background(), api.MetricContractSet, time.UnixMilli(200)); err != nil { + t.Fatal(err) + } else if metrics, err := ss.ContractSetMetrics(context.Background(), time.UnixMilli(100), 3, 50*time.Millisecond, api.ContractSetMetricsQueryOpts{}); err != nil { + t.Fatal(err) + } else if len(metrics) != 1 { + t.Fatalf("expected 1 metric, got %v", len(metrics)) + } } func TestContractChurnSetMetrics(t *testing.T) { @@ -266,6 +284,15 @@ func TestContractChurnSetMetrics(t *testing.T) { t.Fatalf("expected reason to be %v, got %v", reasons[0], m.Reason) } }) + + // Prune metrics + if err := ss.PruneMetrics(context.Background(), api.MetricContractSetChurn, time.UnixMilli(3)); err != nil { + t.Fatal(err) + } else if metrics, err := ss.ContractSetChurnMetrics(context.Background(), time.UnixMilli(1), 3, time.Millisecond, api.ContractSetChurnMetricsQueryOpts{}); err != nil { + t.Fatal(err) + } else if len(metrics) != 1 { + t.Fatalf("expected 1 metric, got %v", len(metrics)) + } } func TestPerformanceMetrics(t *testing.T) { @@ -342,6 +369,15 @@ func TestPerformanceMetrics(t *testing.T) { t.Fatalf("expected origin to be %v, got %v", origins[0], m.Origin) } }) + + // Prune metrics + if err := ss.PruneMetrics(context.Background(), api.MetricPerformance, time.UnixMilli(3)); err != nil { + t.Fatal(err) + } else if metrics, err := ss.PerformanceMetrics(context.Background(), time.UnixMilli(1), 3, time.Millisecond, api.PerformanceMetricsQueryOpts{}); err != nil { + t.Fatal(err) + } else if len(metrics) != 1 { + t.Fatalf("expected 1 metric, got %v", len(metrics)) + } } func TestContractMetrics(t *testing.T) { @@ -417,6 +453,15 @@ func TestContractMetrics(t *testing.T) { t.Fatalf("expected fcid to be %v, got %v", fcid, m.ContractID) } }) + + // Prune metrics + if err := ss.PruneMetrics(context.Background(), api.MetricContract, time.UnixMilli(3)); err != nil { + t.Fatal(err) + } else if metrics, err := ss.ContractMetrics(context.Background(), time.UnixMilli(1), 3, time.Millisecond, api.ContractMetricsQueryOpts{}); err != nil { + t.Fatal(err) + } else if len(metrics) != 1 { + t.Fatalf("expected 1 metric, got %v", len(metrics)) + } } func TestWalletMetrics(t *testing.T) { @@ -448,4 +493,13 @@ func TestWalletMetrics(t *testing.T) { }) { t.Fatal("expected metrics to be sorted by time") } + + // Prune metrics + if err := ss.PruneMetrics(context.Background(), api.MetricWallet, time.UnixMilli(3)); err != nil { + t.Fatal(err) + } else if metrics, err := ss.WalletMetrics(context.Background(), time.UnixMilli(1), 3, time.Millisecond, api.WalletMetricsQueryOpts{}); err != nil { + t.Fatal(err) + } else if len(metrics) != 1 { + t.Fatalf("expected 1 metric, got %v", len(metrics)) + } } From 87d7ea36f77920e3af3a87f60136f96e95ebaa8e Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Fri, 8 Dec 2023 12:04:21 +0100 Subject: [PATCH 2/5] client: add client method --- bus/bus.go | 2 +- bus/client/metrics.go | 6 +++++ stores/metrics.go | 58 +++++++++++++++++++++---------------------- 3 files changed, 36 insertions(+), 30 deletions(-) diff --git a/bus/bus.go b/bus/bus.go index d13ceb08f..9efdb968f 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -2009,7 +2009,7 @@ func (b *bus) metricsHandlerDELETE(jc jape.Context) { return } - var metric string + metric := jc.PathParam("key") if jc.DecodeForm("n", &metric) != nil { return } else if metric == "" { diff --git a/bus/client/metrics.go b/bus/client/metrics.go index 4a4bf9a29..d502ba6c8 100644 --- a/bus/client/metrics.go +++ b/bus/client/metrics.go @@ -115,6 +115,12 @@ func (c *Client) RecordContractPruneMetric(ctx context.Context, metrics ...api.C return c.recordMetric(ctx, api.MetricContractPrune, api.ContractPruneMetricRequestPUT{Metrics: metrics}) } +func (c *Client) PruneMetrics(ctx context.Context, metric string, cutoff time.Time) error { + values := url.Values{} + values.Set("cutoff", api.TimeRFC3339(cutoff).String()) + return c.c.WithContext(ctx).DELETE(fmt.Sprintf("/metric/%s?%s", metric, values.Encode())) +} + func (c *Client) recordMetric(ctx context.Context, key string, d interface{}) error { c.c.Custom("PUT", fmt.Sprintf("/metric/%s", key), (interface{})(nil), nil) diff --git a/stores/metrics.go b/stores/metrics.go index 60fc162ee..313f74e3e 100644 --- a/stores/metrics.go +++ b/stores/metrics.go @@ -339,6 +339,35 @@ func (s *SQLStore) WalletMetrics(ctx context.Context, start time.Time, n uint64, return resp, nil } +func (s *SQLStore) PruneMetrics(ctx context.Context, metric string, cutoff time.Time) error { + if metric == "" { + return errors.New("metric must be set") + } else if cutoff.IsZero() { + return errors.New("cutoff time must be set") + } + var model interface{} + switch metric { + case api.MetricContractPrune: + model = &dbContractPruneMetric{} + case api.MetricContractSet: + model = &dbContractSetMetric{} + case api.MetricContractSetChurn: + model = &dbContractSetChurnMetric{} + case api.MetricContract: + model = &dbContractMetric{} + case api.MetricPerformance: + model = &dbPerformanceMetric{} + case api.MetricWallet: + model = &dbWalletMetric{} + default: + return fmt.Errorf("unknown metric '%s'", metric) + } + return s.dbMetrics.Model(model). + Where("timestamp < ?", unixTimeMS(cutoff)). + Delete(model). + Error +} + func (s *SQLStore) contractMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractMetricsQueryOpts) ([]dbContractMetric, error) { tx := s.dbMetrics if opts.ContractID != (types.FileContractID{}) { @@ -491,32 +520,3 @@ func (s *SQLStore) performanceMetrics(ctx context.Context, start time.Time, n ui return metrics, nil } - -func (s *SQLStore) PruneMetrics(ctx context.Context, metric string, cutoff time.Time) error { - if metric == "" { - return errors.New("metric must be set") - } else if cutoff.IsZero() { - return errors.New("cutoff time must be set") - } - var model interface{} - switch metric { - case api.MetricContractPrune: - model = &dbContractPruneMetric{} - case api.MetricContractSet: - model = &dbContractSetMetric{} - case api.MetricContractSetChurn: - model = &dbContractSetChurnMetric{} - case api.MetricContract: - model = &dbContractMetric{} - case api.MetricPerformance: - model = &dbPerformanceMetric{} - case api.MetricWallet: - model = &dbWalletMetric{} - default: - return fmt.Errorf("unknown metric '%s'", metric) - } - return s.dbMetrics.Model(model). - Where("timestamp < ?", unixTimeMS(cutoff)). - Delete(model). - Error -} From 51cdbf1290a9414d002119d5d321984ed4454af5 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Fri, 8 Dec 2023 13:16:05 +0100 Subject: [PATCH 3/5] bus: fix jape --- bus/bus.go | 17 ++++++++--------- bus/client/metrics.go | 2 +- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/bus/bus.go b/bus/bus.go index 9efdb968f..afcb0112e 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -2000,20 +2000,19 @@ func (b *bus) webhookHandlerPost(jc jape.Context) { } func (b *bus) metricsHandlerDELETE(jc jape.Context) { - // parse mandatory query parameters - var cutoff time.Time - if jc.DecodeForm("start", (*api.TimeRFC3339)(&cutoff)) != nil { + metric := jc.PathParam("key") + if jc.DecodeForm("cutoff", &metric) != nil { return - } else if cutoff.IsZero() { - jc.Error(errors.New("parameter 'cutoff' is required"), http.StatusBadRequest) + } else if metric == "" { + jc.Error(errors.New("parameter 'metric' is required"), http.StatusBadRequest) return } - metric := jc.PathParam("key") - if jc.DecodeForm("n", &metric) != nil { + var cutoff time.Time + if jc.DecodeForm("start", (*api.TimeRFC3339)(&cutoff)) != nil { return - } else if metric == "" { - jc.Error(errors.New("parameter 'metric' is required"), http.StatusBadRequest) + } else if cutoff.IsZero() { + jc.Error(errors.New("parameter 'cutoff' is required"), http.StatusBadRequest) return } diff --git a/bus/client/metrics.go b/bus/client/metrics.go index d502ba6c8..33d9a6286 100644 --- a/bus/client/metrics.go +++ b/bus/client/metrics.go @@ -118,7 +118,7 @@ func (c *Client) RecordContractPruneMetric(ctx context.Context, metrics ...api.C func (c *Client) PruneMetrics(ctx context.Context, metric string, cutoff time.Time) error { values := url.Values{} values.Set("cutoff", api.TimeRFC3339(cutoff).String()) - return c.c.WithContext(ctx).DELETE(fmt.Sprintf("/metric/%s?%s", metric, values.Encode())) + return c.c.WithContext(ctx).DELETE(fmt.Sprintf("/metric/%s?"+values.Encode(), metric)) } func (c *Client) recordMetric(ctx context.Context, key string, d interface{}) error { From 7c724d28e476493956ea70d2d36d859a3de5f9fd Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Fri, 8 Dec 2023 14:39:11 +0100 Subject: [PATCH 4/5] testing: extend TestBusRecordedMetrics --- bus/bus.go | 6 ++---- bus/client/metrics.go | 2 +- internal/testing/cluster_test.go | 9 +++++++++ 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/bus/bus.go b/bus/bus.go index afcb0112e..cfc3d9aa1 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -2001,15 +2001,13 @@ func (b *bus) webhookHandlerPost(jc jape.Context) { func (b *bus) metricsHandlerDELETE(jc jape.Context) { metric := jc.PathParam("key") - if jc.DecodeForm("cutoff", &metric) != nil { - return - } else if metric == "" { + if metric == "" { jc.Error(errors.New("parameter 'metric' is required"), http.StatusBadRequest) return } var cutoff time.Time - if jc.DecodeForm("start", (*api.TimeRFC3339)(&cutoff)) != nil { + if jc.DecodeForm("cutoff", (*api.TimeRFC3339)(&cutoff)) != nil { return } else if cutoff.IsZero() { jc.Error(errors.New("parameter 'cutoff' is required"), http.StatusBadRequest) diff --git a/bus/client/metrics.go b/bus/client/metrics.go index 33d9a6286..017a4ce84 100644 --- a/bus/client/metrics.go +++ b/bus/client/metrics.go @@ -118,7 +118,7 @@ func (c *Client) RecordContractPruneMetric(ctx context.Context, metrics ...api.C func (c *Client) PruneMetrics(ctx context.Context, metric string, cutoff time.Time) error { values := url.Values{} values.Set("cutoff", api.TimeRFC3339(cutoff).String()) - return c.c.WithContext(ctx).DELETE(fmt.Sprintf("/metric/%s?"+values.Encode(), metric)) + return c.c.WithContext(ctx).DELETE(fmt.Sprintf("/metric/%s?", metric) + values.Encode()) } func (c *Client) recordMetric(ctx context.Context, key string, d interface{}) error { diff --git a/internal/testing/cluster_test.go b/internal/testing/cluster_test.go index 645db0bd0..e596da598 100644 --- a/internal/testing/cluster_test.go +++ b/internal/testing/cluster_test.go @@ -2266,6 +2266,15 @@ func TestBusRecordedMetrics(t *testing.T) { } else if !m.ListSpending.IsZero() { t.Fatal("expected zero ListSpending") } + + // Prune one of the metrics + if err := cluster.Bus.PruneMetrics(context.Background(), api.MetricContract, time.Now()); err != nil { + t.Fatal(err) + } else if cMetrics, err = cluster.Bus.ContractMetrics(context.Background(), startTime, math.MaxUint32, time.Second, api.ContractMetricsQueryOpts{}); err != nil { + t.Fatal(err) + } else if len(cMetrics) > 0 { + t.Fatalf("expected 0 metrics, got %v", len(cscMetrics)) + } } func TestMultipartUploadWrappedByPartialSlabs(t *testing.T) { From 068d4d3364309538e32825e68db89e591b7e8fc9 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Fri, 8 Dec 2023 14:48:50 +0100 Subject: [PATCH 5/5] client: add c.c.Custom to client method --- bus/client/metrics.go | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/bus/client/metrics.go b/bus/client/metrics.go index 017a4ce84..dce120ca8 100644 --- a/bus/client/metrics.go +++ b/bus/client/metrics.go @@ -118,7 +118,28 @@ func (c *Client) RecordContractPruneMetric(ctx context.Context, metrics ...api.C func (c *Client) PruneMetrics(ctx context.Context, metric string, cutoff time.Time) error { values := url.Values{} values.Set("cutoff", api.TimeRFC3339(cutoff).String()) - return c.c.WithContext(ctx).DELETE(fmt.Sprintf("/metric/%s?", metric) + values.Encode()) + c.c.Custom("DELETE", fmt.Sprintf("/metric/%s?"+values.Encode(), metric), nil, nil) + + u, err := url.Parse(fmt.Sprintf("%s/metric/%s", c.c.BaseURL, metric)) + if err != nil { + panic(err) + } + u.RawQuery = values.Encode() + req, err := http.NewRequestWithContext(ctx, "DELETE", u.String(), nil) + if err != nil { + panic(err) + } + req.SetBasicAuth("", c.c.WithContext(ctx).Password) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + err, _ := io.ReadAll(resp.Body) + return errors.New(string(err)) + } + return nil } func (c *Client) recordMetric(ctx context.Context, key string, d interface{}) error {