Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add endpoint to prune metrics #806

Merged
merged 6 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
MetricContractSet = "contractset"
MetricContractSetChurn = "churn"
MetricContract = "contract"
MetricPerformance = "performance"
Copy link
Member

@peterjan peterjan Dec 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's odd we have to add this metric here, and only use it in newly added code. There should be several places (switch cases) that we have to extend to add this missing metric. Edit: checked and there's indeed 3 instances where a performance is missing.

MetricWallet = "wallet"
)

Expand Down
27 changes: 25 additions & 2 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ type (
ContractMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractMetricsQueryOpts) ([]api.ContractMetric, error)
RecordContractMetric(ctx context.Context, metrics ...api.ContractMetric) error

PruneMetrics(ctx context.Context, metric string, cutoff time.Time) error
ContractSetChurnMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetChurnMetricsQueryOpts) ([]api.ContractSetChurnMetric, error)
RecordContractSetChurnMetric(ctx context.Context, metrics ...api.ContractSetChurnMetric) error

Expand Down Expand Up @@ -298,8 +299,9 @@ func (b *bus) Handler() http.Handler {
"GET /host/:hostkey": b.hostsPubkeyHandlerGET,
"POST /host/:hostkey/resetlostsectors": b.hostsResetLostSectorsPOST,

"PUT /metric/:key": b.metricsHandlerPUT,
"GET /metric/:key": b.metricsHandlerGET,
"PUT /metric/:key": b.metricsHandlerPUT,
"GET /metric/:key": b.metricsHandlerGET,
"DELETE /metric/:key": b.metricsHandlerDELETE,

"POST /multipart/create": b.multipartHandlerCreatePOST,
"POST /multipart/abort": b.multipartHandlerAbortPOST,
Expand Down Expand Up @@ -1997,6 +1999,27 @@ func (b *bus) webhookHandlerPost(jc jape.Context) {
}
}

func (b *bus) metricsHandlerDELETE(jc jape.Context) {
metric := jc.PathParam("key")
if metric == "" {
jc.Error(errors.New("parameter 'metric' is required"), http.StatusBadRequest)
return
}

var cutoff time.Time
if jc.DecodeForm("cutoff", (*api.TimeRFC3339)(&cutoff)) != nil {
return
} else if cutoff.IsZero() {
jc.Error(errors.New("parameter 'cutoff' is required"), http.StatusBadRequest)
return
}

err := b.mtrcs.PruneMetrics(jc.Request.Context(), metric, cutoff)
if jc.Check("failed to prune metrics", err) != nil {
return
}
}

func (b *bus) metricsHandlerPUT(jc jape.Context) {
jc.Custom((*interface{})(nil), nil)

Expand Down
27 changes: 27 additions & 0 deletions bus/client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,33 @@ func (c *Client) RecordContractPruneMetric(ctx context.Context, metrics ...api.C
return c.recordMetric(ctx, api.MetricContractPrune, api.ContractPruneMetricRequestPUT{Metrics: metrics})
}

func (c *Client) PruneMetrics(ctx context.Context, metric string, cutoff time.Time) error {
values := url.Values{}
values.Set("cutoff", api.TimeRFC3339(cutoff).String())
c.c.Custom("DELETE", fmt.Sprintf("/metric/%s?"+values.Encode(), metric), nil, nil)

u, err := url.Parse(fmt.Sprintf("%s/metric/%s", c.c.BaseURL, metric))
if err != nil {
panic(err)
}
u.RawQuery = values.Encode()
req, err := http.NewRequestWithContext(ctx, "DELETE", u.String(), nil)
if err != nil {
panic(err)
}
req.SetBasicAuth("", c.c.WithContext(ctx).Password)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
err, _ := io.ReadAll(resp.Body)
return errors.New(string(err))
}
return nil
}

func (c *Client) recordMetric(ctx context.Context, key string, d interface{}) error {
c.c.Custom("PUT", fmt.Sprintf("/metric/%s", key), (interface{})(nil), nil)

Expand Down
9 changes: 9 additions & 0 deletions internal/testing/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2266,6 +2266,15 @@ func TestBusRecordedMetrics(t *testing.T) {
} else if !m.ListSpending.IsZero() {
t.Fatal("expected zero ListSpending")
}

// Prune one of the metrics
if err := cluster.Bus.PruneMetrics(context.Background(), api.MetricContract, time.Now()); err != nil {
t.Fatal(err)
} else if cMetrics, err = cluster.Bus.ContractMetrics(context.Background(), startTime, math.MaxUint32, time.Second, api.ContractMetricsQueryOpts{}); err != nil {
t.Fatal(err)
} else if len(cMetrics) > 0 {
t.Fatalf("expected 0 metrics, got %v", len(cscMetrics))
}
}

func TestMultipartUploadWrappedByPartialSlabs(t *testing.T) {
Expand Down
30 changes: 30 additions & 0 deletions stores/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stores

import (
"context"
"errors"
"fmt"
"math"
"math/bits"
Expand Down Expand Up @@ -379,6 +380,35 @@ func (m dbContractMetric) Aggregate(o dbContractMetric) (out dbContractMetric) {
return
}

func (s *SQLStore) PruneMetrics(ctx context.Context, metric string, cutoff time.Time) error {
if metric == "" {
return errors.New("metric must be set")
} else if cutoff.IsZero() {
return errors.New("cutoff time must be set")
}
var model interface{}
switch metric {
case api.MetricContractPrune:
model = &dbContractPruneMetric{}
case api.MetricContractSet:
model = &dbContractSetMetric{}
case api.MetricContractSetChurn:
model = &dbContractSetChurnMetric{}
case api.MetricContract:
model = &dbContractMetric{}
case api.MetricPerformance:
model = &dbPerformanceMetric{}
case api.MetricWallet:
model = &dbWalletMetric{}
default:
return fmt.Errorf("unknown metric '%s'", metric)
}
return s.dbMetrics.Model(model).
Where("timestamp < ?", unixTimeMS(cutoff)).
Delete(model).
Error
}

func (s *SQLStore) contractMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractMetricsQueryOpts) ([]dbContractMetric, error) {
tx := s.dbMetrics
if opts.ContractID != (types.FileContractID{}) {
Expand Down
55 changes: 54 additions & 1 deletion stores/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,15 @@ func TestContractPruneMetrics(t *testing.T) {
t.Fatalf("expected fcid to be %v, got %v", fcid, m.ContractID)
}
})

// Prune metrics
if err := ss.PruneMetrics(context.Background(), api.MetricContractPrune, time.UnixMilli(3)); err != nil {
t.Fatal(err)
} else if metrics, err := ss.ContractPruneMetrics(context.Background(), time.UnixMilli(1), 3, time.Millisecond, api.ContractPruneMetricsQueryOpts{}); err != nil {
t.Fatal(err)
} else if len(metrics) != 1 {
t.Fatalf("expected 1 metric, got %v", len(metrics))
}
}

func TestContractSetMetrics(t *testing.T) {
Expand Down Expand Up @@ -192,6 +201,15 @@ func TestContractSetMetrics(t *testing.T) {

// Query the metric in the middle of the 3 we added.
assertMetrics(time.UnixMilli(150), 1, 50*time.Millisecond, api.ContractSetMetricsQueryOpts{Name: cs}, 1, []int{3})

// Prune metrics
if err := ss.PruneMetrics(context.Background(), api.MetricContractSet, time.UnixMilli(200)); err != nil {
t.Fatal(err)
} else if metrics, err := ss.ContractSetMetrics(context.Background(), time.UnixMilli(100), 3, 50*time.Millisecond, api.ContractSetMetricsQueryOpts{}); err != nil {
t.Fatal(err)
} else if len(metrics) != 1 {
t.Fatalf("expected 1 metric, got %v", len(metrics))
}
}

func TestContractChurnSetMetrics(t *testing.T) {
Expand Down Expand Up @@ -266,6 +284,15 @@ func TestContractChurnSetMetrics(t *testing.T) {
t.Fatalf("expected reason to be %v, got %v", reasons[0], m.Reason)
}
})

// Prune metrics
if err := ss.PruneMetrics(context.Background(), api.MetricContractSetChurn, time.UnixMilli(3)); err != nil {
t.Fatal(err)
} else if metrics, err := ss.ContractSetChurnMetrics(context.Background(), time.UnixMilli(1), 3, time.Millisecond, api.ContractSetChurnMetricsQueryOpts{}); err != nil {
t.Fatal(err)
} else if len(metrics) != 1 {
t.Fatalf("expected 1 metric, got %v", len(metrics))
}
}

func TestPerformanceMetrics(t *testing.T) {
Expand Down Expand Up @@ -342,6 +369,15 @@ func TestPerformanceMetrics(t *testing.T) {
t.Fatalf("expected origin to be %v, got %v", origins[0], m.Origin)
}
})

// Prune metrics
if err := ss.PruneMetrics(context.Background(), api.MetricPerformance, time.UnixMilli(3)); err != nil {
t.Fatal(err)
} else if metrics, err := ss.PerformanceMetrics(context.Background(), time.UnixMilli(1), 3, time.Millisecond, api.PerformanceMetricsQueryOpts{}); err != nil {
t.Fatal(err)
} else if len(metrics) != 1 {
t.Fatalf("expected 1 metric, got %v", len(metrics))
}
}

func TestContractMetrics(t *testing.T) {
Expand Down Expand Up @@ -419,7 +455,6 @@ func TestContractMetrics(t *testing.T) {
t.Fatalf("expected fcid to be %v, got %v", fcid, m.ContractID)
}
})

// Query without any filters. This will cause aggregate values to be returned.
metrics, err := ss.ContractMetrics(context.Background(), start, 3, time.Millisecond, api.ContractMetricsQueryOpts{})
if err != nil {
Expand All @@ -444,6 +479,15 @@ func TestContractMetrics(t *testing.T) {
t.Fatal(i, "unexpected metric", cmp.Diff(m, expectedMetric, cmp.Comparer(api.CompareTimeRFC3339)))
}
}

// Prune metrics
if err := ss.PruneMetrics(context.Background(), api.MetricContract, time.UnixMilli(3)); err != nil {
t.Fatal(err)
} else if metrics, err := ss.ContractMetrics(context.Background(), time.UnixMilli(1), 3, time.Millisecond, api.ContractMetricsQueryOpts{}); err != nil {
t.Fatal(err)
} else if len(metrics) != 1 {
t.Fatalf("expected 1 metric, got %v", len(metrics))
}
}

func TestWalletMetrics(t *testing.T) {
Expand Down Expand Up @@ -475,4 +519,13 @@ func TestWalletMetrics(t *testing.T) {
}) {
t.Fatal("expected metrics to be sorted by time")
}

// Prune metrics
if err := ss.PruneMetrics(context.Background(), api.MetricWallet, time.UnixMilli(3)); err != nil {
t.Fatal(err)
} else if metrics, err := ss.WalletMetrics(context.Background(), time.UnixMilli(1), 3, time.Millisecond, api.WalletMetricsQueryOpts{}); err != nil {
t.Fatal(err)
} else if len(metrics) != 1 {
t.Fatalf("expected 1 metric, got %v", len(metrics))
}
}