Refactor various metric APIs to make distributed execution easier #3191

Merged
merged 4 commits on Jul 17, 2023
2 changes: 1 addition & 1 deletion api/v1/group_routes_test.go
@@ -44,7 +44,7 @@ func getControlSurface(tb testing.TB, testState *lib.TestRunState) *ControlSurfa
execScheduler, err := execution.NewScheduler(testState)
require.NoError(tb, err)

me, err := engine.NewMetricsEngine(testState)
me, err := engine.NewMetricsEngine(testState.Registry, testState.Logger)
require.NoError(tb, err)

ctx, cancel := context.WithCancel(context.Background())
2 changes: 1 addition & 1 deletion api/v1/setup_teardown_routes_test.go
@@ -140,7 +140,7 @@ func TestSetupData(t *testing.T) {

execScheduler, err := execution.NewScheduler(testState)
require.NoError(t, err)
metricsEngine, err := engine.NewMetricsEngine(testState)
metricsEngine, err := engine.NewMetricsEngine(testState.Registry, testState.Logger)
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
2 changes: 1 addition & 1 deletion api/v1/status_routes_test.go
@@ -115,7 +115,7 @@ func TestPatchStatus(t *testing.T) {
execScheduler, err := execution.NewScheduler(testState)
require.NoError(t, err)

metricsEngine, err := engine.NewMetricsEngine(testState)
metricsEngine, err := engine.NewMetricsEngine(testState.Registry, testState.Logger)
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
25 changes: 19 additions & 6 deletions cmd/run.go
@@ -147,17 +147,28 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
return err
}

executionState := execScheduler.GetState()
metricsEngine, err := engine.NewMetricsEngine(executionState.Test)
metricsEngine, err := engine.NewMetricsEngine(testRunState.Registry, logger)
if err != nil {
return err
}
if !testRunState.RuntimeOptions.NoSummary.Bool || !testRunState.RuntimeOptions.NoThresholds.Bool {

// We'll need to pipe metrics to the MetricsEngine and process them if any
// of these are enabled: thresholds, end-of-test summary
shouldProcessMetrics := (!testRunState.RuntimeOptions.NoSummary.Bool ||
!testRunState.RuntimeOptions.NoThresholds.Bool)
var metricsIngester *engine.OutputIngester
if shouldProcessMetrics {
err = metricsEngine.InitSubMetricsAndThresholds(conf.Options, testRunState.RuntimeOptions.NoThresholds.Bool)
if err != nil {
return err
}
// We'll need to pipe metrics to the MetricsEngine if either the
// thresholds or the end-of-test summary are enabled.
outputs = append(outputs, metricsEngine.CreateIngester())
metricsIngester = metricsEngine.CreateIngester()
outputs = append(outputs, metricsIngester)
}

executionState := execScheduler.GetState()
if !testRunState.RuntimeOptions.NoSummary.Bool {
defer func() {
logger.Debug("Generating the end-of-test summary...")
@@ -208,7 +219,9 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
}()

if !testRunState.RuntimeOptions.NoThresholds.Bool {
finalizeThresholds := metricsEngine.StartThresholdCalculations(runAbort, executionState.GetCurrentTestRunDuration)
finalizeThresholds := metricsEngine.StartThresholdCalculations(
metricsIngester, runAbort, executionState.GetCurrentTestRunDuration,
)
handleFinalThresholdCalculation := func() {
// This gets called after the Samples channel has been closed and
// the OutputManager has flushed all of the cached samples to
@@ -283,7 +296,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
}

printExecutionDescription(
c.gs, "local", args[0], "", conf, execScheduler.GetState().ExecutionTuple, executionPlan, outputs,
c.gs, "local", args[0], "", conf, executionState.ExecutionTuple, executionPlan, outputs,
)

// Trap Interrupts, SIGINTs and SIGTERMs.
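Taken together, the cmd/run.go changes above amount to the following wiring. This is a condensed sketch, not the literal file: the `setupMetrics` helper, its parameter list, and the plain `lib.Options` argument are illustrative, while the engine and ingester calls mirror the diff.

```go
package sketch

import (
	"github.com/sirupsen/logrus"

	"go.k6.io/k6/lib"
	"go.k6.io/k6/metrics/engine"
	"go.k6.io/k6/output"
)

// setupMetrics is a hypothetical helper that condenses the new wiring from the
// diff above into one place.
func setupMetrics(
	testRunState *lib.TestRunState, options lib.Options, logger logrus.FieldLogger,
	runAbort func(error), executionState *lib.ExecutionState, outputs []output.Output,
) ([]output.Output, func() []string, error) {
	// The engine now needs only the metric registry and a logger, not the
	// whole *lib.TestRunState.
	metricsEngine, err := engine.NewMetricsEngine(testRunState.Registry, logger)
	if err != nil {
		return nil, nil, err
	}

	// Sub-metrics and thresholds are initialized explicitly by the caller, and
	// only when the end-of-test summary or the thresholds are actually needed.
	var metricsIngester *engine.OutputIngester
	if !testRunState.RuntimeOptions.NoSummary.Bool || !testRunState.RuntimeOptions.NoThresholds.Bool {
		onlyLogErrors := testRunState.RuntimeOptions.NoThresholds.Bool
		if err := metricsEngine.InitSubMetricsAndThresholds(options, onlyLogErrors); err != nil {
			return nil, nil, err
		}
		metricsIngester = metricsEngine.CreateIngester()
		outputs = append(outputs, metricsIngester)
	}

	// The ingester is handed to StartThresholdCalculations explicitly instead
	// of being kept as hidden state inside the MetricsEngine.
	var finalizeThresholds func() []string
	if !testRunState.RuntimeOptions.NoThresholds.Bool {
		finalizeThresholds = metricsEngine.StartThresholdCalculations(
			metricsIngester, runAbort, executionState.GetCurrentTestRunDuration,
		)
	}

	return outputs, finalizeThresholds, nil
}
```

The notable shift is that the MetricsEngine no longer reaches into the whole `*lib.TestRunState`, and the ingester is threaded explicitly from `CreateIngester()` to `StartThresholdCalculations()`, which should make it easier to run the engine without a local ingester in a distributed setup.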
14 changes: 7 additions & 7 deletions execution/scheduler.go
@@ -377,7 +377,7 @@ func (e *Scheduler) runExecutor(
// and emission of the `vus` and `vus_max` metrics.
func (e *Scheduler) Init(
runCtx context.Context, samplesOut chan<- metrics.SampleContainer,
) (stopVUEmission func(), err error) {
) (stopVUEmission func(), initErr error) {
logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-init")

execSchedRunCtx, execSchedRunCancel := context.WithCancel(runCtx)
@@ -390,11 +390,11 @@ func (e *Scheduler) Init(

defer func() {
if interruptErr := GetCancelReasonIfTestAborted(runCtx); interruptErr != nil {
logger.Debugf("The test run was interrupted, returning '%s' instead of '%s'", interruptErr, err)
logger.Debugf("The test run was interrupted, returning '%s' instead of '%s'", interruptErr, initErr)
e.state.SetExecutionStatus(lib.ExecutionStatusInterrupted)
err = interruptErr
initErr = interruptErr
}
if err != nil {
if initErr != nil {
stopVUEmission()
}
}()
@@ -406,14 +406,14 @@ func (e *Scheduler) Init(
// out channel.
//
//nolint:funlen
func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- metrics.SampleContainer) (err error) {
func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- metrics.SampleContainer) (runErr error) {
logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-run")

defer func() {
if interruptErr := GetCancelReasonIfTestAborted(runCtx); interruptErr != nil {
logger.Debugf("The test run was interrupted, returning '%s' instead of '%s'", interruptErr, err)
logger.Debugf("The test run was interrupted, returning '%s' instead of '%s'", interruptErr, runErr)
e.state.SetExecutionStatus(lib.ExecutionStatusInterrupted)
err = interruptErr
runErr = interruptErr
}
}()

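The execution/scheduler.go changes are purely a rename: the named return value `err` becomes `initErr` and `runErr`, so it is obvious which variable the deferred interrupt handling overwrites. A standalone toy example of that Go mechanism (not k6 code):

```go
package main

import (
	"errors"
	"fmt"
)

var errInterrupted = errors.New("test run was interrupted")

// doWork mimics the Scheduler methods: a deferred closure may replace the
// named return value with the interrupt reason.
func doWork(interrupted bool) (runErr error) {
	defer func() {
		if interrupted {
			runErr = errInterrupted
		}
	}()
	return errors.New("some internal error")
}

func main() {
	fmt.Println(doWork(true))  // test run was interrupted
	fmt.Println(doWork(false)) // some internal error
}
```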
9 changes: 1 addition & 8 deletions js/runner_test.go
@@ -47,7 +47,6 @@ import (
"go.k6.io/k6/lib/testutils/mockoutput"
"go.k6.io/k6/lib/types"
"go.k6.io/k6/metrics"
"go.k6.io/k6/metrics/engine"
"go.k6.io/k6/output"
)

@@ -387,23 +386,17 @@ func TestDataIsolation(t *testing.T) {
execScheduler, err := execution.NewScheduler(testRunState)
require.NoError(t, err)

metricsEngine, err := engine.NewMetricsEngine(testRunState)
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
defer globalCancel()
runCtx, runAbort := execution.NewTestRunContext(globalCtx, testRunState.Logger)

mockOutput := mockoutput.New()
outputManager := output.NewManager([]output.Output{mockOutput, metricsEngine.CreateIngester()}, testRunState.Logger, runAbort)
outputManager := output.NewManager([]output.Output{mockOutput}, testRunState.Logger, runAbort)
samples := make(chan metrics.SampleContainer, 1000)
waitForMetricsFlushed, stopOutputs, err := outputManager.Start(samples)
require.NoError(t, err)
defer stopOutputs(nil)

finalizeThresholds := metricsEngine.StartThresholdCalculations(runAbort, execScheduler.GetState().GetCurrentTestRunDuration)
require.Nil(t, finalizeThresholds)

require.Empty(t, runner.defaultGroup.Groups)

stopEmission, err := execScheduler.Init(runCtx, samples)
2 changes: 1 addition & 1 deletion js/summary_test.go
@@ -150,7 +150,7 @@ func createTestMetrics(t *testing.T) (map[string]*metrics.Metric, *lib.Group) {
require.NoError(t, err)
checksMetric.Tainted = null.BoolFrom(false)
checksMetric.Thresholds = metrics.Thresholds{Thresholds: []*metrics.Threshold{{Source: "rate>70", LastFailed: false}}}
sink := &metrics.TrendSink{}
sink := metrics.NewTrendSink()

samples := []float64{10.0, 15.0, 20.0}
for _, s := range samples {
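The one-line js/summary_test.go change follows `TrendSink` gaining a constructor; presumably the zero value is no longer safe to use directly. A generic illustration of that pattern with hypothetical types (not the actual k6 `TrendSink`):

```go
package main

import "fmt"

// trendSink is a stand-in for the real metrics.TrendSink; the field names and
// the Add method here are hypothetical.
type trendSink struct {
	values   []float64
	min, max float64
	count    int
}

// newTrendSink mirrors the constructor pattern: internal state is set up once,
// so callers can't accidentally use a half-initialized zero value.
func newTrendSink() *trendSink {
	return &trendSink{values: make([]float64, 0, 64)}
}

func (t *trendSink) Add(v float64) {
	if t.count == 0 || v < t.min {
		t.min = v
	}
	if v > t.max {
		t.max = v
	}
	t.values = append(t.values, v)
	t.count++
}

func main() {
	sink := newTrendSink()
	for _, s := range []float64{10.0, 15.0, 20.0} {
		sink.Add(s)
	}
	fmt.Println(sink.min, sink.max, sink.count) // 10 20 3
}
```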
48 changes: 20 additions & 28 deletions metrics/engine/engine.go
@@ -15,7 +15,6 @@ import (
"go.k6.io/k6/errext/exitcodes"
"go.k6.io/k6/lib"
"go.k6.io/k6/metrics"
"go.k6.io/k6/output"
"gopkg.in/guregu/null.v3"
)

@@ -25,13 +24,11 @@ const thresholdsRate = 2 * time.Second
// aggregated metric sample values. They are used to generate the end-of-test
// summary and to evaluate the test thresholds.
type MetricsEngine struct {
logger logrus.FieldLogger
test *lib.TestRunState
outputIngester *outputIngester
registry *metrics.Registry
logger logrus.FieldLogger

// These can be both top-level metrics or sub-metrics
metricsWithThresholds []*metrics.Metric

metricsWithThresholds []*metrics.Metric
breachedThresholdsCount uint32

// TODO: completely refactor:
@@ -44,39 +41,31 @@ type MetricsEngine struct {
}

// NewMetricsEngine creates a new metrics Engine with the given parameters.
func NewMetricsEngine(runState *lib.TestRunState) (*MetricsEngine, error) {
func NewMetricsEngine(registry *metrics.Registry, logger logrus.FieldLogger) (*MetricsEngine, error) {
me := &MetricsEngine{
test: runState,
logger: runState.Logger.WithField("component", "metrics-engine"),
registry: registry,
logger: logger.WithField("component", "metrics-engine"),
ObservedMetrics: make(map[string]*metrics.Metric),
}

if !(me.test.RuntimeOptions.NoSummary.Bool && me.test.RuntimeOptions.NoThresholds.Bool) {
err := me.initSubMetricsAndThresholds()
if err != nil {
return nil, err
}
}

return me, nil
}

// CreateIngester returns a pseudo-Output that uses the given metric samples to
// update the engine's inner state.
func (me *MetricsEngine) CreateIngester() output.Output {
me.outputIngester = &outputIngester{
func (me *MetricsEngine) CreateIngester() *OutputIngester {
return &OutputIngester{
logger: me.logger.WithField("component", "metrics-engine-ingester"),
metricsEngine: me,
cardinality: newCardinalityControl(),
}
return me.outputIngester
}

func (me *MetricsEngine) getThresholdMetricOrSubmetric(name string) (*metrics.Metric, error) {
// TODO: replace with strings.Cut after Go 1.18
nameParts := strings.SplitN(name, "{", 2)

metric := me.test.Registry.Get(nameParts[0])
metric := me.registry.Get(nameParts[0])
if metric == nil {
return nil, fmt.Errorf("metric '%s' does not exist in the script", nameParts[0])
}
@@ -125,11 +114,14 @@ func (me *MetricsEngine) markObserved(metric *metrics.Metric) {
}
}

func (me *MetricsEngine) initSubMetricsAndThresholds() error {
for metricName, thresholds := range me.test.Options.Thresholds {
// InitSubMetricsAndThresholds parses the thresholds from the test Options and
// initializes both the thresholds themselves, as well as any submetrics that
// were referenced in them.
func (me *MetricsEngine) InitSubMetricsAndThresholds(options lib.Options, onlyLogErrors bool) error {
for metricName, thresholds := range options.Thresholds {
metric, err := me.getThresholdMetricOrSubmetric(metricName)

if me.test.RuntimeOptions.NoThresholds.Bool {
if onlyLogErrors {
if err != nil {
me.logger.WithError(err).Warnf("Invalid metric '%s' in threshold definitions", metricName)
}
@@ -154,7 +146,7 @@

// TODO: refactor out of here when https://github.com/grafana/k6/issues/1321
// lands and there is a better way to enable a metric with tag
if me.test.Options.SystemTags.Has(metrics.TagExpectedResponse) {
if options.SystemTags.Has(metrics.TagExpectedResponse) {
_, err := me.getThresholdMetricOrSubmetric("http_req_duration{expected_response:true}")
if err != nil {
return err // shouldn't happen, but ¯\_(ツ)_/¯
@@ -167,10 +159,10 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error {
// StartThresholdCalculations spins up a new goroutine to crunch thresholds and
// returns a callback that will stop the goroutine and finalizes calculations.
func (me *MetricsEngine) StartThresholdCalculations(
ingester *OutputIngester,

Contributor commented:

This is already an improvement. In the final picture, I think we should try to reverse the control here, passing the MetricsEngine as a dependency to the Ingester (the Output).

Just sharing this with you to collect thoughts, not really a request for change.

Member (Author) replied:

Yeah, this might be a better approach 🤔

Though the MetricsEngine and the OutputIngester also have other issues that I didn't fully fix when I extracted them from the core.Engine (e.g. #572 (comment) probably being the biggest one), so we can probably refactor things here even further 🤔

Still, this PR at least somewhat further decouples the metrics crunching from the test execution, so any such refactoring should hopefully be easier 🤞


abortRun func(error),
getCurrentTestRunDuration func() time.Duration,
) (finalize func() (breached []string),
) {
) (finalize func() (breached []string)) {
if len(me.metricsWithThresholds) == 0 {
return nil // no thresholds were defined
}
@@ -205,9 +197,9 @@
}()

return func() []string {
if me.outputIngester != nil {
if ingester != nil {
// Stop the ingester so we don't get any more metrics
err := me.outputIngester.Stop()
err := ingester.Stop()
if err != nil {
me.logger.WithError(err).Warnf("There was a problem stopping the output ingester.")
}
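For the inversion-of-control idea raised in the review thread above, a rough sketch of what it could look like: the ingester would be constructed with the engine (through a narrow interface) instead of being created by `MetricsEngine.CreateIngester()`. All names and signatures here are hypothetical, not part of this PR.

```go
package sketch

import (
	"github.com/sirupsen/logrus"

	"go.k6.io/k6/metrics"
)

// MetricsSink is the narrow view of the engine an ingester would need; the
// interface and its AddSamples method are hypothetical.
type MetricsSink interface {
	AddSamples(samples []metrics.SampleContainer)
}

// Ingester is built *with* the engine (via MetricsSink), reversing the current
// direction where MetricsEngine.CreateIngester() builds the ingester itself.
type Ingester struct {
	logger logrus.FieldLogger
	sink   MetricsSink
}

func NewIngester(sink MetricsSink, logger logrus.FieldLogger) *Ingester {
	return &Ingester{
		logger: logger.WithField("component", "metrics-engine-ingester"),
		sink:   sink,
	}
}

// AddMetricSamples forwards sample batches to the engine; receiving batches
// like this is what an output does in k6.
func (i *Ingester) AddMetricSamples(samples []metrics.SampleContainer) {
	i.sink.AddSamples(samples)
}
```

With that direction, the MetricsEngine would not need to know about outputs at all, which fits the reviewer's goal of further decoupling metrics crunching from sample delivery.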
40 changes: 13 additions & 27 deletions metrics/engine/engine_test.go
@@ -32,10 +32,12 @@ func TestNewMetricsEngineWithThresholds(t *testing.T) {
_, err = trs.Registry.NewMetric("metric2", metrics.Counter)
require.NoError(t, err)

me, err := NewMetricsEngine(trs)
me, err := NewMetricsEngine(trs.Registry, trs.Logger)
require.NoError(t, err)
require.NotNil(t, me)

require.NoError(t, me.InitSubMetricsAndThresholds(trs.Options, false))

assert.Len(t, me.metricsWithThresholds, 2)
}

Expand All @@ -57,7 +59,7 @@ func TestMetricsEngineGetThresholdMetricOrSubmetricError(t *testing.T) {
t.Parallel()

me := newTestMetricsEngine(t)
_, err := me.test.Registry.NewMetric("metric1", metrics.Counter)
_, err := me.registry.NewMetric("metric1", metrics.Counter)
require.NoError(t, err)

_, err = me.getThresholdMetricOrSubmetric(tc.metricDefinition)
@@ -69,16 +71,8 @@ func TestNewMetricsEngineNoThresholds(t *testing.T) {
func TestNewMetricsEngineNoThresholds(t *testing.T) {
t.Parallel()

trs := &lib.TestRunState{
TestPreInitState: &lib.TestPreInitState{
Logger: testutils.NewLogger(t),
},
}

me, err := NewMetricsEngine(trs)
require.NoError(t, err)
me := newTestMetricsEngine(t)
require.NotNil(t, me)

assert.Empty(t, me.metricsWithThresholds)
}

@@ -113,9 +107,9 @@ func TestMetricsEngineEvaluateThresholdNoAbort(t *testing.T) {
t.Parallel()
me := newTestMetricsEngine(t)

m1, err := me.test.Registry.NewMetric("m1", metrics.Counter)
m1, err := me.registry.NewMetric("m1", metrics.Counter)
require.NoError(t, err)
m2, err := me.test.Registry.NewMetric("m2", metrics.Counter)
m2, err := me.registry.NewMetric("m2", metrics.Counter)
require.NoError(t, err)

ths := metrics.NewThresholds([]string{tc.threshold})
@@ -138,9 +132,9 @@ func TestMetricsEngineEvaluateIgnoreEmptySink(t *testing.T) {

me := newTestMetricsEngine(t)

m1, err := me.test.Registry.NewMetric("m1", metrics.Counter)
m1, err := me.registry.NewMetric("m1", metrics.Counter)
require.NoError(t, err)
m2, err := me.test.Registry.NewMetric("m2", metrics.Counter)
m2, err := me.registry.NewMetric("m2", metrics.Counter)
require.NoError(t, err)

ths := metrics.NewThresholds([]string{"count>5"})
@@ -159,18 +153,10 @@
assert.Empty(t, breached)
}

func newTestMetricsEngine(t *testing.T) MetricsEngine {
trs := &lib.TestRunState{
TestPreInitState: &lib.TestPreInitState{
Logger: testutils.NewLogger(t),
Registry: metrics.NewRegistry(),
},
}

return MetricsEngine{
logger: trs.Logger,
test: trs,
}
func newTestMetricsEngine(t *testing.T) *MetricsEngine {
m, err := NewMetricsEngine(metrics.NewRegistry(), testutils.NewLogger(t))
require.NoError(t, err)
return m
}

func zeroTestRunDuration() time.Duration {