diff --git a/metrics/engine/engine_test.go b/metrics/engine/engine_test.go
index a46e409f39d..06267cfa3c8 100644
--- a/metrics/engine/engine_test.go
+++ b/metrics/engine/engine_test.go
@@ -1,459 +1,186 @@
 package engine
 
-// TODO: refactor and move the tests that still make sense in other packages
-
-/*
-
-const isWindows = runtime.GOOS == "windows"
-
-func TestEngineRun(t *testing.T) {
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.k6.io/k6/lib"
+	"go.k6.io/k6/lib/testutils"
+	"go.k6.io/k6/metrics"
+)
+
+func TestNewMetricsEngineWithThresholds(t *testing.T) {
 	t.Parallel()
-	logrus.SetLevel(logrus.DebugLevel)
-	t.Run("exits with context", func(t *testing.T) {
-		t.Parallel()
-		done := make(chan struct{})
-		runner := &minirunner.MiniRunner{
-			Fn: func(ctx context.Context, _ *lib.State, _ chan<- metrics.SampleContainer) error {
-				<-ctx.Done()
-				close(done)
-				return nil
-			},
-		}
-		duration := 100 * time.Millisecond
-		test := newTestEngine(t, &duration, runner, nil, lib.Options{})
-		defer test.wait()
-
-		startTime := time.Now()
-		assert.ErrorContains(t, test.run(), "context deadline exceeded")
-		assert.WithinDuration(t, startTime.Add(duration), time.Now(), 100*time.Millisecond)
-		<-done
-	})
-	t.Run("exits with executor", func(t *testing.T) {
-		t.Parallel()
-		test := newTestEngine(t, nil, nil, nil, lib.Options{
-			VUs:        null.IntFrom(10),
-			Iterations: null.IntFrom(100),
-		})
-		defer test.wait()
-		assert.NoError(t, test.run())
-		assert.Equal(t, uint64(100), test.engine.ExecutionScheduler.GetState().GetFullIterationCount())
-	})
-	// Make sure samples are discarded after context close (using "cutoff" timestamp in local.go)
-	t.Run("collects samples", func(t *testing.T) {
-		t.Parallel()
-
-		piState := getTestPreInitState(t)
-		testMetric, err := piState.Registry.NewMetric("test_metric", metrics.Trend)
-		require.NoError(t, err)
-
-		signalChan := make(chan interface{})
-
-		runner := &minirunner.MiniRunner{
-			Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error {
-				metrics.PushIfNotDone(ctx, out, metrics.Sample{
-					TimeSeries: metrics.TimeSeries{Metric: testMetric, Tags: piState.Registry.RootTagSet()},
-					Time:       time.Now(),
-					Value:      1,
-				})
-				close(signalChan)
-				<-ctx.Done()
-				metrics.PushIfNotDone(ctx, out, metrics.Sample{
-					TimeSeries: metrics.TimeSeries{Metric: testMetric, Tags: piState.Registry.RootTagSet()},
-					Time:       time.Now(), Value: 1,
-				})
-				return nil
+	trs := &lib.TestRunState{
+		TestPreInitState: &lib.TestPreInitState{
+			Logger:   testutils.NewLogger(t),
+			Registry: metrics.NewRegistry(),
+		},
+		Options: lib.Options{
+			Thresholds: map[string]metrics.Thresholds{
+				"metric1": {Thresholds: []*metrics.Threshold{}},
+				"metric2": {Thresholds: []*metrics.Threshold{}},
 			},
-		}
-
-		mockOutput := mockoutput.New()
-		test := newTestEngineWithTestPreInitState(t, nil, runner, []output.Output{mockOutput}, lib.Options{
-			VUs:        null.IntFrom(1),
-			Iterations: null.IntFrom(1),
-		}, piState)
-
-		errC := make(chan error)
-		go func() { errC <- test.run() }()
-		<-signalChan
-		test.runAbort(fmt.Errorf("custom error"))
-		assert.ErrorContains(t, <-errC, "custom error")
-		test.wait()
-
-		found := 0
-		for _, s := range mockOutput.Samples {
-			if s.Metric != testMetric {
-				continue
-			}
-			found++
-			assert.Equal(t, 1.0, s.Value, "wrong value")
-		}
-		assert.Equal(t, 1, found, "wrong number of samples")
-	})
-}
-
-func TestEngineOutput(t *testing.T) {
-	t.Parallel()
+		},
+	}
+	_, err := trs.Registry.NewMetric("metric1", metrics.Counter)
+	require.NoError(t, err)
 
-	piState := getTestPreInitState(t)
-	testMetric, err := piState.Registry.NewMetric("test_metric", metrics.Trend)
+	_, err = trs.Registry.NewMetric("metric2", metrics.Counter)
 	require.NoError(t, err)
 
-	runner := &minirunner.MiniRunner{
-		Fn: func(_ context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error {
-			out <- metrics.Sample{TimeSeries: metrics.TimeSeries{Metric: testMetric}}
-			return nil
-		},
-	}
+	me, err := NewMetricsEngine(trs)
+	require.NoError(t, err)
+	require.NotNil(t, me)
 
-	mockOutput := mockoutput.New()
-	test := newTestEngineWithTestPreInitState(t, nil, runner, []output.Output{mockOutput}, lib.Options{
-		VUs:        null.IntFrom(1),
-		Iterations: null.IntFrom(1),
-	}, piState)
-
-	assert.NoError(t, test.run())
-	test.wait()
-
-	cSamples := []metrics.Sample{}
-	for _, sample := range mockOutput.Samples {
-		if sample.Metric == testMetric {
-			cSamples = append(cSamples, sample)
-		}
-	}
-	metric := test.engine.MetricsEngine.ObservedMetrics["test_metric"]
-	if assert.NotNil(t, metric) {
-		sink := metric.Sink.(*metrics.TrendSink) //nolint:forcetypeassert
-		if assert.NotNil(t, sink) {
-			numOutputSamples := len(cSamples)
-			numEngineSamples := len(sink.Values)
-			assert.Equal(t, numEngineSamples, numOutputSamples)
-		}
-	}
+	assert.Len(t, me.metricsWithThresholds, 2)
 }
 
-func TestEngine_processSamples(t *testing.T) {
+func TestMetricsEngineGetThresholdMetricOrSubmetricError(t *testing.T) {
 	t.Parallel()
 
-	t.Run("metric", func(t *testing.T) {
-		t.Parallel()
-
-		piState := getTestPreInitState(t)
-		metric, err := piState.Registry.NewMetric("my_metric", metrics.Gauge)
-		require.NoError(t, err)
-
-		done := make(chan struct{})
-		runner := &minirunner.MiniRunner{
-			Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error {
-				out <- metrics.Sample{
-					TimeSeries: metrics.TimeSeries{
-						Metric: metric,
-						Tags:   piState.Registry.RootTagSet().WithTagsFromMap(map[string]string{"a": "1"}),
-					},
-					Value: 1.25,
-					Time:  time.Now(),
-				}
-				close(done)
-				return nil
-			},
-		}
-		test := newTestEngineWithTestPreInitState(t, nil, runner, nil, lib.Options{}, piState)
-
-		go func() {
-			assert.NoError(t, test.run())
-		}()
-
-		select {
-		case <-done:
-			return
-		case <-time.After(10 * time.Second):
-			assert.Fail(t, "Test should have completed within 10 seconds")
-		}
-
-		test.wait()
-
-		assert.IsType(t, &metrics.GaugeSink{}, test.engine.MetricsEngine.ObservedMetrics["my_metric"].Sink)
-	})
-	t.Run("submetric", func(t *testing.T) {
-		t.Parallel()
-
-		piState := getTestPreInitState(t)
-		metric, err := piState.Registry.NewMetric("my_metric", metrics.Gauge)
-		require.NoError(t, err)
-
-		ths := metrics.NewThresholds([]string{`value<2`})
-		gotParseErr := ths.Parse()
-		require.NoError(t, gotParseErr)
-
-		done := make(chan struct{})
-		runner := &minirunner.MiniRunner{
-			Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error {
-				out <- metrics.Sample{
-					TimeSeries: metrics.TimeSeries{
-						Metric: metric,
-						Tags:   piState.Registry.RootTagSet().WithTagsFromMap(map[string]string{"a": "1", "b": "2"}),
-					},
-					Value: 1.25,
-					Time:  time.Now(),
-				}
-				close(done)
-				return nil
-			},
-		}
-		test := newTestEngineWithTestPreInitState(t, nil, runner, nil, lib.Options{
-			Thresholds: map[string]metrics.Thresholds{
-				"my_metric{a:1}": ths,
-			},
-		}, piState)
-
-		go func() {
-			assert.NoError(t, test.run())
-		}()
+	cases := []struct {
+		metricDefinition string
+		expErr           string
+	}{
+		{metricDefinition: "metric1{test:a", expErr: "missing ending bracket"},
+		{metricDefinition: "metric2", expErr: "'metric2' does not exist in the script"},
+		{metricDefinition: "metric1{}", expErr: "submetric criteria for metric 'metric1' cannot be empty"},
+	}
 
-		select {
-		case <-done:
-			return
-		case <-time.After(10 * time.Second):
-			assert.Fail(t, "Test should have completed within 10 seconds")
-		}
-		test.wait()
+	for _, tc := range cases {
+		tc := tc
+		t.Run("", func(t *testing.T) {
+			t.Parallel()
 
-		assert.Len(t, test.engine.MetricsEngine.ObservedMetrics, 2)
-		sms := test.engine.MetricsEngine.ObservedMetrics["my_metric{a:1}"]
-		assert.EqualValues(t, map[string]string{"a": "1"}, sms.Sub.Tags.Map())
+			me := newTestMetricsEngine(t)
+			_, err := me.test.Registry.NewMetric("metric1", metrics.Counter)
+			require.NoError(t, err)
 
-		assert.IsType(t, &metrics.GaugeSink{}, test.engine.MetricsEngine.ObservedMetrics["my_metric"].Sink)
-		assert.IsType(t, &metrics.GaugeSink{}, test.engine.MetricsEngine.ObservedMetrics["my_metric{a:1}"].Sink)
-	})
+			_, err = me.getThresholdMetricOrSubmetric(tc.metricDefinition)
+			assert.ErrorContains(t, err, tc.expErr)
+		})
+	}
 }
 
-func TestEngineThresholdsWillAbort(t *testing.T) {
+func TestNewMetricsEngineNoThresholds(t *testing.T) {
 	t.Parallel()
 
-	piState := getTestPreInitState(t)
-	metric, err := piState.Registry.NewMetric("my_metric", metrics.Gauge)
-	require.NoError(t, err)
-
-	// The incoming samples for the metric set it to 1.25. Considering
-	// the metric is of type Gauge, value > 1.25 should always fail, and
-	// trigger an abort.
-	ths := metrics.NewThresholds([]string{"value>1.25"})
-	gotParseErr := ths.Parse()
-	require.NoError(t, gotParseErr)
-	ths.Thresholds[0].AbortOnFail = true
-
-	thresholds := map[string]metrics.Thresholds{metric.Name: ths}
-
-	done := make(chan struct{})
-	runner := &minirunner.MiniRunner{
-		Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error {
-			out <- metrics.Sample{
-				TimeSeries: metrics.TimeSeries{
-					Metric: metric,
-					Tags:   piState.Registry.RootTagSet().WithTagsFromMap(map[string]string{"a": "1"}),
-				},
-				Time:  time.Now(),
-				Value: 1.25,
-			}
-			close(done)
-			return nil
+	trs := &lib.TestRunState{
+		TestPreInitState: &lib.TestPreInitState{
+			Logger: testutils.NewLogger(t),
 		},
 	}
-	test := newTestEngineWithTestPreInitState(t, nil, runner, nil, lib.Options{Thresholds: thresholds}, piState)
 
-	go func() {
-		assert.NoError(t, test.run())
-	}()
+	me, err := NewMetricsEngine(trs)
+	require.NoError(t, err)
+	require.NotNil(t, me)
 
-	select {
-	case <-done:
-		return
-	case <-time.After(10 * time.Second):
-		assert.Fail(t, "Test should have completed within 10 seconds")
-	}
-	test.wait()
-	assert.True(t, test.engine.IsTainted())
+	assert.Empty(t, me.metricsWithThresholds)
 }
 
-func TestEngineAbortedByThresholds(t *testing.T) {
+func TestMetricsEngineCreateIngester(t *testing.T) {
 	t.Parallel()
 
-	piState := getTestPreInitState(t)
-	metric, err := piState.Registry.NewMetric("my_metric", metrics.Gauge)
-	require.NoError(t, err)
-
-	// The MiniRunner sets the value of the metric to 1.25. Considering
-	// the metric is of type Gauge, value > 1.25 should always fail, and
-	// trigger an abort.
-	// **N.B**: a threshold returning an error, won't trigger an abort.
-	ths := metrics.NewThresholds([]string{"value>1.25"})
-	gotParseErr := ths.Parse()
-	require.NoError(t, gotParseErr)
-	ths.Thresholds[0].AbortOnFail = true
-
-	thresholds := map[string]metrics.Thresholds{metric.Name: ths}
-
-	doneIter := make(chan struct{})
-	doneRun := make(chan struct{})
-	runner := &minirunner.MiniRunner{
-		Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error {
-			out <- metrics.Sample{
-				TimeSeries: metrics.TimeSeries{
-					Metric: metric,
-					Tags:   piState.Registry.RootTagSet().WithTagsFromMap(map[string]string{"a": "1"}),
-				},
-				Time:  time.Now(),
-				Value: 1.25,
-			}
-			<-ctx.Done()
-			close(doneIter)
-			return nil
-		},
-	}
-
-	test := newTestEngineWithTestPreInitState(t, nil, runner, nil, lib.Options{Thresholds: thresholds}, piState)
-	defer test.wait()
-
-	go func() {
-		defer close(doneRun)
-		t.Logf("test run done with err '%s'", err)
-		assert.ErrorContains(t, test.run(), "thresholds on metrics 'my_metric' were breached")
-	}()
-
-	select {
-	case <-doneIter:
-	case <-time.After(10 * time.Second):
-		assert.Fail(t, "Iteration should have completed within 10 seconds")
-	}
-	select {
-	case <-doneRun:
-	case <-time.After(10 * time.Second):
-		assert.Fail(t, "Test should have completed within 10 seconds")
+	me := MetricsEngine{
+		logger: testutils.NewLogger(t),
 	}
+	ingester := me.CreateIngester()
+	assert.NotNil(t, ingester)
+	require.NoError(t, ingester.Start())
+	require.NoError(t, ingester.Stop())
 }
 
-func TestEngine_processThresholds(t *testing.T) {
+func TestMetricsEngineEvaluateThresholdNoAbort(t *testing.T) {
 	t.Parallel()
 
-	testdata := map[string]struct {
-		pass bool
-		ths  map[string][]string
+	cases := []struct {
+		threshold   string
+		abortOnFail bool
+		expBreached []string
 	}{
-		"passing":  {true, map[string][]string{"my_metric": {"value<2"}}},
-		"failing":  {false, map[string][]string{"my_metric": {"value>1.25"}}},
-
-		"submetric,match,passing":   {true, map[string][]string{"my_metric{a:1}": {"value<2"}}},
-		"submetric,match,failing":   {false, map[string][]string{"my_metric{a:1}": {"value>1.25"}}},
-		"submetric,nomatch,passing": {true, map[string][]string{"my_metric{a:2}": {"value<2"}}},
-		"submetric,nomatch,failing": {false, map[string][]string{"my_metric{a:2}": {"value>1.25"}}},
-
-		"unused,passing":      {true, map[string][]string{"unused_counter": {"count==0"}}},
-		"unused,failing":      {false, map[string][]string{"unused_counter": {"count>1"}}},
-		"unused,subm,passing": {true, map[string][]string{"unused_counter{a:2}": {"count<1"}}},
-		"unused,subm,failing": {false, map[string][]string{"unused_counter{a:2}": {"count>1"}}},
-
-		"used,passing":               {true, map[string][]string{"used_counter": {"count==2"}}},
-		"used,failing":               {false, map[string][]string{"used_counter": {"count<1"}}},
-		"used,subm,passing":          {true, map[string][]string{"used_counter{b:1}": {"count==2"}}},
-		"used,not-subm,passing":      {true, map[string][]string{"used_counter{b:2}": {"count==0"}}},
-		"used,invalid-subm,passing1": {true, map[string][]string{"used_counter{c:''}": {"count==0"}}},
-		"used,invalid-subm,failing1": {false, map[string][]string{"used_counter{c:''}": {"count>0"}}},
-		"used,invalid-subm,passing2": {true, map[string][]string{"used_counter{c:}": {"count==0"}}},
-		"used,invalid-subm,failing2": {false, map[string][]string{"used_counter{c:}": {"count>0"}}},
+		{threshold: "count>5", expBreached: nil},
+		{threshold: "count<5", expBreached: []string{"m1"}},
+		{threshold: "count<5", expBreached: []string{"m1"}, abortOnFail: true},
 	}
 
-	for name, data := range testdata {
-		name, data := name, data
-		t.Run(name, func(t *testing.T) {
+	for _, tc := range cases {
+		tc := tc
+		t.Run(tc.threshold, func(t *testing.T) {
 			t.Parallel()
+			me := newTestMetricsEngine(t)
 
-			piState := getTestPreInitState(t)
-			gaugeMetric, err := piState.Registry.NewMetric("my_metric", metrics.Gauge)
-			require.NoError(t, err)
-			counterMetric, err := piState.Registry.NewMetric("used_counter", metrics.Counter)
+			m1, err := me.test.Registry.NewMetric("m1", metrics.Counter)
 			require.NoError(t, err)
-			_, err = piState.Registry.NewMetric("unused_counter", metrics.Counter)
+			m2, err := me.test.Registry.NewMetric("m2", metrics.Counter)
 			require.NoError(t, err)
 
-			thresholds := make(map[string]metrics.Thresholds, len(data.ths))
-			for m, srcs := range data.ths {
-				ths := metrics.NewThresholds(srcs)
-				gotParseErr := ths.Parse()
-				require.NoError(t, gotParseErr)
-				thresholds[m] = ths
-			}
-
-			test := newTestEngineWithTestPreInitState(
-				t, nil, nil, nil, lib.Options{Thresholds: thresholds}, piState,
-			)
+			ths := metrics.NewThresholds([]string{tc.threshold})
+			require.NoError(t, ths.Parse())
+			m1.Thresholds = ths
+			m1.Thresholds.Thresholds[0].AbortOnFail = tc.abortOnFail
 
-			tag1 := piState.Registry.RootTagSet().With("a", "1")
-			tag2 := piState.Registry.RootTagSet().With("b", "1")
+			me.metricsWithThresholds = []*metrics.Metric{m1, m2}
+			m1.Sink.Add(metrics.Sample{Value: 6.0})
 
-			test.engine.OutputManager.AddMetricSamples(
-				[]metrics.SampleContainer{
-					metrics.Sample{
-						TimeSeries: metrics.TimeSeries{
-							Metric: gaugeMetric,
-							Tags:   tag1,
-						},
-						Time:  time.Now(),
-						Value: 1.25,
-					},
-					metrics.Sample{
-						TimeSeries: metrics.TimeSeries{
-							Metric: counterMetric,
-							Tags:   tag2,
-						},
-						Time:  time.Now(),
-						Value: 2,
-					},
-				},
-			)
-
-			require.NoError(t, test.run())
-			test.wait()
-
-			assert.Equal(t, data.pass, !test.engine.IsTainted())
+			breached, abort := me.evaluateThresholds(false, zeroTestRunDuration)
+			require.Equal(t, tc.abortOnFail, abort)
+			assert.Equal(t, tc.expBreached, breached)
 		})
 	}
 }
 
-func getMetricSum(mo *mockoutput.MockOutput, name string) (result float64) {
-	for _, sc := range mo.SampleContainers {
-		for _, s := range sc.GetSamples() {
-			if s.Metric.Name == name {
-				result += s.Value
-			}
-		}
-	}
-	return
+func TestMetricsEngineEvaluateIgnoreEmptySink(t *testing.T) {
+	t.Parallel()
+
+	me := newTestMetricsEngine(t)
+
+	m1, err := me.test.Registry.NewMetric("m1", metrics.Counter)
+	require.NoError(t, err)
+	m2, err := me.test.Registry.NewMetric("m2", metrics.Counter)
+	require.NoError(t, err)
+
+	ths := metrics.NewThresholds([]string{"count>5"})
+	require.NoError(t, ths.Parse())
+	m1.Thresholds = ths
+	m1.Thresholds.Thresholds[0].AbortOnFail = true
+
+	me.metricsWithThresholds = []*metrics.Metric{m1, m2}
+
+	breached, abort := me.evaluateThresholds(false, zeroTestRunDuration)
+	require.True(t, abort)
+	require.Equal(t, []string{"m1"}, breached)
+
+	breached, abort = me.evaluateThresholds(true, zeroTestRunDuration)
+	require.False(t, abort)
+	assert.Empty(t, breached)
 }
 
-func getMetricCount(mo *mockoutput.MockOutput, name string) (result uint) {
-	for _, sc := range mo.SampleContainers {
-		for _, s := range sc.GetSamples() {
-			if s.Metric.Name == name {
-				result++
-			}
-		}
-	}
+func newTestMetricsEngine(t *testing.T) MetricsEngine {
+	trs := &lib.TestRunState{
+		TestPreInitState: &lib.TestPreInitState{
+			Logger:   testutils.NewLogger(t),
+			Registry: metrics.NewRegistry(),
+		},
+	}
-	return
-}
 
-func getMetricMax(mo *mockoutput.MockOutput, name string) (result float64) {
-	for _, sc := range mo.SampleContainers {
-		for _, s := range sc.GetSamples() {
-			if s.Metric.Name == name && s.Value > result {
-				result = s.Value
-			}
-		}
+	return MetricsEngine{
+		logger: trs.Logger,
+		test:   trs,
 	}
-	return
 }
 
-const expectedHeaderMaxLength = 550
+func zeroTestRunDuration() time.Duration {
+	return 0
+}
 
-// FIXME: This test is too brittle, consider simplifying.
+/*
+// FIXME: These tests are too brittle;
+// move them to e2e tests and consider simplifying them.
+//
 func TestSentReceivedMetrics(t *testing.T) {
 	t.Parallel()
 	tb := httpmultibin.NewHTTPMultiBin(t)
diff --git a/metrics/engine/ingester_test.go b/metrics/engine/ingester_test.go
new file mode 100644
index 00000000000..3bb155243bd
--- /dev/null
+++ b/metrics/engine/ingester_test.go
@@ -0,0 +1,109 @@
+package engine
+
+import (
+	"testing"
+
+	"github.com/sirupsen/logrus"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.k6.io/k6/lib"
+	"go.k6.io/k6/lib/testutils"
+	"go.k6.io/k6/metrics"
+)
+
+func TestIngesterOutputFlushMetrics(t *testing.T) {
+	t.Parallel()
+
+	piState := newTestPreInitState(t)
+	testMetric, err := piState.Registry.NewMetric("test_metric", metrics.Trend)
+	require.NoError(t, err)
+
+	ingester := outputIngester{
+		logger: piState.Logger,
+		metricsEngine: &MetricsEngine{
+			ObservedMetrics: make(map[string]*metrics.Metric),
+		},
+	}
+	require.NoError(t, ingester.Start())
+	ingester.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{
+		TimeSeries: metrics.TimeSeries{Metric: testMetric},
+		Value:      21,
+	}})
+	ingester.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{
+		TimeSeries: metrics.TimeSeries{Metric: testMetric},
+		Value:      21,
+	}})
+	require.NoError(t, ingester.Stop())
+
+	require.Len(t, ingester.metricsEngine.ObservedMetrics, 1)
+	metric := ingester.metricsEngine.ObservedMetrics["test_metric"]
+	require.NotNil(t, metric)
+	require.NotNil(t, metric.Sink)
+	assert.Equal(t, testMetric, metric)
+
+	sink := metric.Sink.(*metrics.TrendSink) //nolint:forcetypeassert
+	assert.Equal(t, 42.0, sink.Sum)
+}
+
+func TestIngesterOutputFlushSubmetrics(t *testing.T) {
+	t.Parallel()
+
+	piState := newTestPreInitState(t)
+	testMetric, err := piState.Registry.NewMetric("test_metric", metrics.Gauge)
+	require.NoError(t, err)
+
+	me := &MetricsEngine{
+		test: &lib.TestRunState{
+			TestPreInitState: piState,
+		},
+		ObservedMetrics: make(map[string]*metrics.Metric),
+	}
+	_, err = me.getThresholdMetricOrSubmetric("test_metric{a:1}")
+	require.NoError(t, err)
+
+	// assert that the observed metrics are empty before starting
+	require.Empty(t, me.ObservedMetrics)
+
+	ingester := outputIngester{
+		logger:        piState.Logger,
+		metricsEngine: me,
+	}
+	require.NoError(t, ingester.Start())
+	ingester.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{
+		TimeSeries: metrics.TimeSeries{
+			Metric: testMetric,
+			Tags: piState.Registry.RootTagSet().WithTagsFromMap(
+				map[string]string{"a": "1", "b": "2"}),
+		},
+		Value: 21,
+	}})
+	require.NoError(t, ingester.Stop())
+
+	require.Len(t, ingester.metricsEngine.ObservedMetrics, 2)
+
+	// assert that the parent metric has been observed
+	metric := ingester.metricsEngine.ObservedMetrics["test_metric"]
+	require.NotNil(t, metric)
+	require.NotNil(t, metric.Sink)
+	assert.IsType(t, &metrics.GaugeSink{}, metric.Sink)
+
+	// assert that the submetric has been observed
+	metric = ingester.metricsEngine.ObservedMetrics["test_metric{a:1}"]
+	require.NotNil(t, metric)
+	require.NotNil(t, metric.Sink)
+	require.NotNil(t, metric.Sub)
+	assert.EqualValues(t, map[string]string{"a": "1"}, metric.Sub.Tags.Map())
+	assert.IsType(t, &metrics.GaugeSink{}, metric.Sink)
+}
+
+func newTestPreInitState(tb testing.TB) *lib.TestPreInitState {
+	reg := metrics.NewRegistry()
+	logger := testutils.NewLogger(tb)
+	logger.SetLevel(logrus.DebugLevel)
+	return &lib.TestPreInitState{
+		Logger:         logger,
+		RuntimeOptions: lib.RuntimeOptions{},
+		Registry:       reg,
+		BuiltinMetrics: metrics.RegisterBuiltinMetrics(reg),
+	}
+}