From e2bfd964dd782b6c098e91c772533fbfda78ead5 Mon Sep 17 00:00:00 2001 From: Povilas Versockas Date: Tue, 9 Jul 2024 20:17:04 +0300 Subject: [PATCH] feat: [TKC-2194] improve workflow execution telemetry --- pkg/telemetry/payload.go | 25 +- .../testworkflowexecutor/executor.go | 43 +--- .../testworkflowmetrics.go | 234 ++++++++++++++++++ 3 files changed, 259 insertions(+), 43 deletions(-) create mode 100644 pkg/testworkflows/testworkflowexecutor/testworkflowmetrics.go diff --git a/pkg/telemetry/payload.go b/pkg/telemetry/payload.go index ece02f9a3a1..84b113212c3 100644 --- a/pkg/telemetry/payload.go +++ b/pkg/telemetry/payload.go @@ -34,9 +34,16 @@ type Params struct { ErrorType string `json:"error_type,omitempty"` ErrorStackTrace string `json:"error_stacktrace,omitempty"` TestWorkflowSteps int32 `json:"test_workflow_steps,omitempty"` + TestWorkflowExecuteCount int32 `json:"test_workflow_execute_count,omitempty"` + TestWorkflowParallelUsed bool `json:"test_workflow_parallel_used,omitempty"` + TestWorkflowMatrixUsed bool `json:"test_workflow_matrix_used,omitempty"` + TestWorkflowServicesUsed bool `json:"test_workflow_services_used,omitempty"` + TestWorkflowIsSample bool `json:"test_workflow_is_sample,omitempty"` + TestWorkflowTemplates []string `json:"testWorkflowTemplates"` + TestWorkflowImages []string `json:"testWorkflowImages"` TestWorkflowTemplateUsed bool `json:"test_workflow_template_used,omitempty"` - TestWorkflowImage string `json:"test_workflow_image,omitempty"` TestWorkflowArtifactUsed bool `json:"test_workflow_artifact_used,omitempty"` + TestWorkflowImage string `json:"test_workflow_image,omitempty"` TestWorkflowKubeshopGitURI bool `json:"test_workflow_kubeshop_git_uri,omitempty"` License string `json:"license,omitempty"` Step string `json:"step,omitempty"` @@ -84,9 +91,16 @@ type RunContext struct { type WorkflowParams struct { TestWorkflowSteps int32 - TestWorkflowTemplateUsed bool + TestWorkflowExecuteCount int32 TestWorkflowImage string TestWorkflowArtifactUsed bool + TestWorkflowParallelUsed bool + TestWorkflowMatrixUsed bool + TestWorkflowServicesUsed bool + TestWorkflowTemplateUsed bool + TestWorkflowIsSample bool + TestWorkflowTemplates []string + TestWorkflowImages []string TestWorkflowKubeshopGitURI bool } @@ -290,7 +304,14 @@ func NewRunWorkflowPayload(name, clusterType string, params RunWorkflowParams) P ClusterType: clusterType, Context: getAgentContext(), TestWorkflowSteps: params.TestWorkflowSteps, + TestWorkflowExecuteCount: params.TestWorkflowExecuteCount, + TestWorkflowParallelUsed: params.TestWorkflowParallelUsed, TestWorkflowTemplateUsed: params.TestWorkflowTemplateUsed, + TestWorkflowMatrixUsed: params.TestWorkflowMatrixUsed, + TestWorkflowServicesUsed: params.TestWorkflowServicesUsed, + TestWorkflowIsSample: params.TestWorkflowIsSample, + TestWorkflowTemplates: params.TestWorkflowTemplates, + TestWorkflowImages: params.TestWorkflowImages, TestWorkflowImage: params.TestWorkflowImage, TestWorkflowArtifactUsed: params.TestWorkflowArtifactUsed, TestWorkflowKubeshopGitURI: params.TestWorkflowKubeshopGitURI, diff --git a/pkg/testworkflows/testworkflowexecutor/executor.go b/pkg/testworkflows/testworkflowexecutor/executor.go index 3dea3b6d0f8..d3f00502c13 100644 --- a/pkg/testworkflows/testworkflowexecutor/executor.go +++ b/pkg/testworkflows/testworkflowexecutor/executor.go @@ -29,13 +29,10 @@ import ( configRepo "github.com/kubeshop/testkube/pkg/repository/config" "github.com/kubeshop/testkube/pkg/repository/result" "github.com/kubeshop/testkube/pkg/repository/testworkflow" - "github.com/kubeshop/testkube/pkg/telemetry" - "github.com/kubeshop/testkube/pkg/testworkflows" "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowcontroller" "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor" "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/constants" "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowresolver" - "github.com/kubeshop/testkube/pkg/version" ) //go:generate mockgen -destination=./mock_executor.go -package=testworkflowexecutor "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowexecutor" TestWorkflowExecutor @@ -294,6 +291,8 @@ func (e *executor) Control(ctx context.Context, testWorkflow *testworkflowsv1.Te // TODO: Consider AppendOutput ($push) instead _ = e.repository.UpdateOutput(ctx, execution.Id, execution.Output) if execution.Result.IsFinished() { + e.sendRunWorkflowTelemetry(ctx, testWorkflow, execution) + if execution.Result.IsPassed() { e.emitter.Notify(testkube.NewEventEndTestWorkflowSuccess(execution)) } else if execution.Result.IsAborted() { @@ -537,8 +536,6 @@ func (e *executor) Execute(ctx context.Context, workflow testworkflowsv1.TestWor return execution, errors.Wrap(err, "deploying required resources") } - e.sendRunWorkflowTelemetry(ctx, &workflow) - // Start to control the results go func() { err = e.Control(context.Background(), initialWorkflow, &execution) @@ -550,39 +547,3 @@ func (e *executor) Execute(ctx context.Context, workflow testworkflowsv1.TestWor return execution, nil } - -func (e *executor) sendRunWorkflowTelemetry(ctx context.Context, workflow *testworkflowsv1.TestWorkflow) { - if workflow == nil { - log.DefaultLogger.Debug("empty workflow passed to telemetry event") - return - } - telemetryEnabled, err := e.configMap.GetTelemetryEnabled(ctx) - if err != nil { - log.DefaultLogger.Debugf("getting telemetry enabled error", "error", err) - } - if !telemetryEnabled { - return - } - - out, err := telemetry.SendRunWorkflowEvent("testkube_api_run_test_workflow", telemetry.RunWorkflowParams{ - RunParams: telemetry.RunParams{ - AppVersion: version.Version, - DataSource: testworkflows.GetDataSource(workflow.Spec.Content), - Host: testworkflows.GetHostname(), - ClusterID: testworkflows.GetClusterID(ctx, e.configMap), - }, - WorkflowParams: telemetry.WorkflowParams{ - TestWorkflowSteps: int32(len(workflow.Spec.Setup) + len(workflow.Spec.Steps) + len(workflow.Spec.After)), - TestWorkflowImage: testworkflows.GetImage(workflow.Spec.Container), - TestWorkflowArtifactUsed: testworkflows.HasWorkflowStepLike(workflow.Spec, testworkflows.HasArtifacts), - TestWorkflowKubeshopGitURI: testworkflows.IsKubeshopGitURI(workflow.Spec.Content) || - testworkflows.HasWorkflowStepLike(workflow.Spec, testworkflows.HasKubeshopGitURI), - }, - }) - - if err != nil { - log.DefaultLogger.Debugf("sending run test workflow telemetry event error", "error", err) - } else { - log.DefaultLogger.Debugf("sending run test workflow telemetry event", "output", out) - } -} diff --git a/pkg/testworkflows/testworkflowexecutor/testworkflowmetrics.go b/pkg/testworkflows/testworkflowexecutor/testworkflowmetrics.go new file mode 100644 index 00000000000..dae361f581b --- /dev/null +++ b/pkg/testworkflows/testworkflowexecutor/testworkflowmetrics.go @@ -0,0 +1,234 @@ +package testworkflowexecutor + +import ( + "context" + "strings" + + testworkflowsv1 "github.com/kubeshop/testkube-operator/api/testworkflows/v1" + + "github.com/kubeshop/testkube/pkg/api/v1/testkube" + "github.com/kubeshop/testkube/pkg/log" + "github.com/kubeshop/testkube/pkg/telemetry" + "github.com/kubeshop/testkube/pkg/testworkflows" + "github.com/kubeshop/testkube/pkg/version" +) + +type stepStats struct { + numSteps int + numExecute int + hasArtifacts bool + hasMatrix bool + hasParallel bool + hasTemplate bool + hasServices bool + imagesUsed map[string]struct{} + templatesUsed map[string]struct{} +} + +func (ss *stepStats) Merge(stats *stepStats) { + ss.numSteps += stats.numSteps + ss.numExecute += stats.numExecute + + if stats.hasArtifacts { + ss.hasArtifacts = true + } + if stats.hasMatrix { + ss.hasMatrix = true + } + if stats.hasParallel { + ss.hasParallel = true + } + if stats.hasServices { + ss.hasServices = true + } + if stats.hasTemplate { + ss.hasTemplate = true + } + for image := range stats.imagesUsed { + ss.imagesUsed[image] = struct{}{} + } + for tmpl := range stats.templatesUsed { + ss.templatesUsed[tmpl] = struct{}{} + } +} + +func getStepInfo(step testworkflowsv1.Step) *stepStats { + res := &stepStats{ + imagesUsed: make(map[string]struct{}), + templatesUsed: make(map[string]struct{}), + } + if step.Execute != nil { + res.numExecute++ + } + if step.Artifacts != nil { + res.hasArtifacts = true + } + if len(step.Use) > 0 { + res.hasTemplate = true + for _, tmpl := range step.Use { + res.templatesUsed[tmpl.Name] = struct{}{} + } + } + if step.Template != nil { + res.hasTemplate = true + res.templatesUsed[step.Template.Name] = struct{}{} + } + if len(step.Services) > 0 { + res.hasServices = true + } + + if step.Run != nil && step.Run.Image != "" { + res.imagesUsed[step.Run.Image] = struct{}{} + } + if step.Container != nil && step.Container.Image != "" { + res.imagesUsed[step.Container.Image] = struct{}{} + } + + for _, step := range step.Steps { + res.Merge(getStepInfo(step)) + } + + if step.Parallel != nil { + res.hasParallel = true + + if len(step.Parallel.Matrix) != 0 { + res.hasMatrix = true + } + if step.Parallel.Artifacts != nil { + res.hasArtifacts = true + } + if step.Parallel.Execute != nil { + res.numExecute++ + } + if len(step.Parallel.Use) > 0 { + res.hasTemplate = true + for _, tmpl := range step.Parallel.Use { + res.templatesUsed[tmpl.Name] = struct{}{} + } + } + if step.Parallel.Template != nil { + res.hasTemplate = true + res.templatesUsed[step.Parallel.Template.Name] = struct{}{} + } + + if len(step.Parallel.Services) > 0 { + res.hasServices = true + } + + if step.Parallel.Run != nil && step.Parallel.Run.Image != "" { + res.imagesUsed[step.Parallel.Run.Image] = struct{}{} + } + if step.Parallel.Container != nil && step.Parallel.Container.Image != "" { + res.imagesUsed[step.Parallel.Container.Image] = struct{}{} + } + + for _, step := range step.Parallel.Steps { + res.Merge(getStepInfo(step)) + } + } + + return res +} + +func (e *executor) sendRunWorkflowTelemetry(ctx context.Context, workflow *testworkflowsv1.TestWorkflow, execution *testkube.TestWorkflowExecution) { + if workflow == nil { + log.DefaultLogger.Debug("empty workflow passed to telemetry event") + return + } + telemetryEnabled, err := e.configMap.GetTelemetryEnabled(ctx) + if err != nil { + log.DefaultLogger.Debugf("getting telemetry enabled error", "error", err) + } + if !telemetryEnabled { + return + } + + properties := make(map[string]any) + properties["name"] = workflow.Name + stats := stepStats{ + imagesUsed: make(map[string]struct{}), + templatesUsed: make(map[string]struct{}), + } + + var isSample bool + if workflow.Labels != nil && workflow.Labels["docs"] == "example" && strings.HasSuffix(workflow.Name, "-sample") { + isSample = true + } else { + isSample = false + } + + spec := workflow.Spec + for _, step := range spec.Steps { + stats.Merge(getStepInfo(step)) + } + if spec.Container != nil { + stats.imagesUsed[spec.Container.Image] = struct{}{} + } + if len(spec.Services) != 0 { + stats.hasServices = true + } + if len(spec.Use) > 0 { + stats.hasTemplate = true + for _, tmpl := range spec.Use { + stats.templatesUsed[tmpl.Name] = struct{}{} + } + } + + var images []string + for image := range stats.imagesUsed { + if image == "" { + continue + } + images = append(images, image) + } + + var templates []string + for t := range stats.templatesUsed { + if t == "" { + continue + } + templates = append(templates, t) + } + var ( + status string + durationMs int32 + ) + if execution.Result != nil { + if execution.Result.Status != nil { + status = string(*execution.Result.Status) + } + durationMs = execution.Result.DurationMs + } + + out, err := telemetry.SendRunWorkflowEvent("testkube_api_run_test_workflow", telemetry.RunWorkflowParams{ + RunParams: telemetry.RunParams{ + AppVersion: version.Version, + DataSource: testworkflows.GetDataSource(workflow.Spec.Content), + Host: testworkflows.GetHostname(), + ClusterID: testworkflows.GetClusterID(ctx, e.configMap), + DurationMs: durationMs, + Status: status, + }, + WorkflowParams: telemetry.WorkflowParams{ + TestWorkflowSteps: int32(stats.numSteps), + TestWorkflowExecuteCount: int32(stats.numExecute), + TestWorkflowImage: testworkflows.GetImage(workflow.Spec.Container), + TestWorkflowArtifactUsed: stats.hasArtifacts, + TestWorkflowParallelUsed: stats.hasParallel, + TestWorkflowMatrixUsed: stats.hasMatrix, + TestWorkflowServicesUsed: stats.hasServices, + TestWorkflowTemplateUsed: stats.hasTemplate, + TestWorkflowIsSample: isSample, + TestWorkflowTemplates: templates, + TestWorkflowImages: images, + TestWorkflowKubeshopGitURI: testworkflows.IsKubeshopGitURI(workflow.Spec.Content) || + testworkflows.HasWorkflowStepLike(workflow.Spec, testworkflows.HasKubeshopGitURI), + }, + }) + + if err != nil { + log.DefaultLogger.Debugf("sending run test workflow telemetry event error", "error", err) + } else { + log.DefaultLogger.Debugf("sending run test workflow telemetry event", "output", out) + } +}