From 82371d48dce9c70ec2bcad8a466147c808712c73 Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Tue, 9 Jul 2024 20:17:04 +0300
Subject: [PATCH] feat: [TKC-2194] improve workflow execution telemetry
---
pkg/telemetry/payload.go | 25 +-
.../testworkflowexecutor/executor.go | 43 +---
.../testworkflowmetrics.go | 234 ++++++++++++++++++
3 files changed, 259 insertions(+), 43 deletions(-)
create mode 100644 pkg/testworkflows/testworkflowexecutor/testworkflowmetrics.go
diff --git a/pkg/telemetry/payload.go b/pkg/telemetry/payload.go
index ece02f9a3a1..84b113212c3 100644
--- a/pkg/telemetry/payload.go
+++ b/pkg/telemetry/payload.go
@@ -34,9 +34,16 @@ type Params struct {
ErrorType string `json:"error_type,omitempty"`
ErrorStackTrace string `json:"error_stacktrace,omitempty"`
TestWorkflowSteps int32 `json:"test_workflow_steps,omitempty"`
+ TestWorkflowExecuteCount int32 `json:"test_workflow_execute_count,omitempty"`
+ TestWorkflowParallelUsed bool `json:"test_workflow_parallel_used,omitempty"`
+ TestWorkflowMatrixUsed bool `json:"test_workflow_matrix_used,omitempty"`
+ TestWorkflowServicesUsed bool `json:"test_workflow_services_used,omitempty"`
+ TestWorkflowIsSample bool `json:"test_workflow_is_sample,omitempty"`
+ TestWorkflowTemplates []string `json:"testWorkflowTemplates"`
+ TestWorkflowImages []string `json:"testWorkflowImages"`
TestWorkflowTemplateUsed bool `json:"test_workflow_template_used,omitempty"`
- TestWorkflowImage string `json:"test_workflow_image,omitempty"`
TestWorkflowArtifactUsed bool `json:"test_workflow_artifact_used,omitempty"`
+ TestWorkflowImage string `json:"test_workflow_image,omitempty"`
TestWorkflowKubeshopGitURI bool `json:"test_workflow_kubeshop_git_uri,omitempty"`
License string `json:"license,omitempty"`
Step string `json:"step,omitempty"`
@@ -84,9 +91,16 @@ type RunContext struct {
type WorkflowParams struct {
TestWorkflowSteps int32
- TestWorkflowTemplateUsed bool
+ TestWorkflowExecuteCount int32
TestWorkflowImage string
TestWorkflowArtifactUsed bool
+ TestWorkflowParallelUsed bool
+ TestWorkflowMatrixUsed bool
+ TestWorkflowServicesUsed bool
+ TestWorkflowTemplateUsed bool
+ TestWorkflowIsSample bool
+ TestWorkflowTemplates []string
+ TestWorkflowImages []string
TestWorkflowKubeshopGitURI bool
}
@@ -290,7 +304,14 @@ func NewRunWorkflowPayload(name, clusterType string, params RunWorkflowParams) P
ClusterType: clusterType,
Context: getAgentContext(),
TestWorkflowSteps: params.TestWorkflowSteps,
+ TestWorkflowExecuteCount: params.TestWorkflowExecuteCount,
+ TestWorkflowParallelUsed: params.TestWorkflowParallelUsed,
TestWorkflowTemplateUsed: params.TestWorkflowTemplateUsed,
+ TestWorkflowMatrixUsed: params.TestWorkflowMatrixUsed,
+ TestWorkflowServicesUsed: params.TestWorkflowServicesUsed,
+ TestWorkflowIsSample: params.TestWorkflowIsSample,
+ TestWorkflowTemplates: params.TestWorkflowTemplates,
+ TestWorkflowImages: params.TestWorkflowImages,
TestWorkflowImage: params.TestWorkflowImage,
TestWorkflowArtifactUsed: params.TestWorkflowArtifactUsed,
TestWorkflowKubeshopGitURI: params.TestWorkflowKubeshopGitURI,
diff --git a/pkg/testworkflows/testworkflowexecutor/executor.go b/pkg/testworkflows/testworkflowexecutor/executor.go
index 3dea3b6d0f8..d3f00502c13 100644
--- a/pkg/testworkflows/testworkflowexecutor/executor.go
+++ b/pkg/testworkflows/testworkflowexecutor/executor.go
@@ -29,13 +29,10 @@ import (
configRepo "github.com/kubeshop/testkube/pkg/repository/config"
"github.com/kubeshop/testkube/pkg/repository/result"
"github.com/kubeshop/testkube/pkg/repository/testworkflow"
- "github.com/kubeshop/testkube/pkg/telemetry"
- "github.com/kubeshop/testkube/pkg/testworkflows"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowcontroller"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/constants"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowresolver"
- "github.com/kubeshop/testkube/pkg/version"
)
//go:generate mockgen -destination=./mock_executor.go -package=testworkflowexecutor "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowexecutor" TestWorkflowExecutor
@@ -294,6 +291,8 @@ func (e *executor) Control(ctx context.Context, testWorkflow *testworkflowsv1.Te
// TODO: Consider AppendOutput ($push) instead
_ = e.repository.UpdateOutput(ctx, execution.Id, execution.Output)
if execution.Result.IsFinished() {
+ e.sendRunWorkflowTelemetry(ctx, testWorkflow, execution)
+
if execution.Result.IsPassed() {
e.emitter.Notify(testkube.NewEventEndTestWorkflowSuccess(execution))
} else if execution.Result.IsAborted() {
@@ -537,8 +536,6 @@ func (e *executor) Execute(ctx context.Context, workflow testworkflowsv1.TestWor
return execution, errors.Wrap(err, "deploying required resources")
}
- e.sendRunWorkflowTelemetry(ctx, &workflow)
-
// Start to control the results
go func() {
err = e.Control(context.Background(), initialWorkflow, &execution)
@@ -550,39 +547,3 @@ func (e *executor) Execute(ctx context.Context, workflow testworkflowsv1.TestWor
return execution, nil
}
-
-func (e *executor) sendRunWorkflowTelemetry(ctx context.Context, workflow *testworkflowsv1.TestWorkflow) {
- if workflow == nil {
- log.DefaultLogger.Debug("empty workflow passed to telemetry event")
- return
- }
- telemetryEnabled, err := e.configMap.GetTelemetryEnabled(ctx)
- if err != nil {
- log.DefaultLogger.Debugf("getting telemetry enabled error", "error", err)
- }
- if !telemetryEnabled {
- return
- }
-
- out, err := telemetry.SendRunWorkflowEvent("testkube_api_run_test_workflow", telemetry.RunWorkflowParams{
- RunParams: telemetry.RunParams{
- AppVersion: version.Version,
- DataSource: testworkflows.GetDataSource(workflow.Spec.Content),
- Host: testworkflows.GetHostname(),
- ClusterID: testworkflows.GetClusterID(ctx, e.configMap),
- },
- WorkflowParams: telemetry.WorkflowParams{
- TestWorkflowSteps: int32(len(workflow.Spec.Setup) + len(workflow.Spec.Steps) + len(workflow.Spec.After)),
- TestWorkflowImage: testworkflows.GetImage(workflow.Spec.Container),
- TestWorkflowArtifactUsed: testworkflows.HasWorkflowStepLike(workflow.Spec, testworkflows.HasArtifacts),
- TestWorkflowKubeshopGitURI: testworkflows.IsKubeshopGitURI(workflow.Spec.Content) ||
- testworkflows.HasWorkflowStepLike(workflow.Spec, testworkflows.HasKubeshopGitURI),
- },
- })
-
- if err != nil {
- log.DefaultLogger.Debugf("sending run test workflow telemetry event error", "error", err)
- } else {
- log.DefaultLogger.Debugf("sending run test workflow telemetry event", "output", out)
- }
-}
diff --git a/pkg/testworkflows/testworkflowexecutor/testworkflowmetrics.go b/pkg/testworkflows/testworkflowexecutor/testworkflowmetrics.go
new file mode 100644
index 00000000000..aa1f89f0f94
--- /dev/null
+++ b/pkg/testworkflows/testworkflowexecutor/testworkflowmetrics.go
@@ -0,0 +1,234 @@
+package testworkflowexecutor
+
+import (
+ "context"
+ "strings"
+
+ testworkflowsv1 "github.com/kubeshop/testkube-operator/api/testworkflows/v1"
+
+ "github.com/kubeshop/testkube/pkg/api/v1/testkube"
+ "github.com/kubeshop/testkube/pkg/log"
+ "github.com/kubeshop/testkube/pkg/telemetry"
+ "github.com/kubeshop/testkube/pkg/testworkflows"
+ "github.com/kubeshop/testkube/pkg/version"
+)
+
+type stepStats struct {
+ numSteps int
+ numExecute int
+ hasArtifacts bool
+ hasMatrix bool
+ hasParallel bool
+ hasTemplate bool
+ hasServices bool
+ imagesUsed map[string]struct{}
+ templatesUsed map[string]struct{}
+}
+
+func (ss *stepStats) Merge(stats *stepStats) {
+ ss.numSteps += stats.numSteps
+ ss.numExecute += stats.numExecute
+
+ if stats.hasArtifacts {
+ ss.hasArtifacts = true
+ }
+ if stats.hasMatrix {
+ ss.hasMatrix = true
+ }
+ if stats.hasParallel {
+ ss.hasParallel = true
+ }
+ if stats.hasServices {
+ ss.hasServices = true
+ }
+ if stats.hasTemplate {
+ ss.hasTemplate = true
+ }
+ for image := range stats.imagesUsed {
+ ss.imagesUsed[image] = struct{}{}
+ }
+ for tmpl := range stats.templatesUsed {
+ ss.templatesUsed[tmpl] = struct{}{}
+ }
+}
+
+func getStepInfo(step testworkflowsv1.Step) *stepStats {
+ res := &stepStats{
+ imagesUsed: make(map[string]struct{}),
+ templatesUsed: make(map[string]struct{}),
+ }
+ if step.Execute != nil {
+ res.numExecute++
+ }
+ if step.Artifacts != nil {
+ res.hasArtifacts = true
+ }
+ if len(step.Use) > 0 {
+ res.hasTemplate = true
+ for _, tmpl := range step.Use {
+ res.templatesUsed[tmpl.Name] = struct{}{}
+ }
+ }
+ if step.Template != nil {
+ res.hasTemplate = true
+ res.templatesUsed[step.Template.Name] = struct{}{}
+ }
+ if len(step.Services) > 0 {
+ res.hasServices = true
+ }
+
+ if step.Run != nil && step.Run.Image != "" {
+ res.imagesUsed[step.Run.Image] = struct{}{}
+ }
+ if step.Container != nil && step.Container.Image != "" {
+ res.imagesUsed[step.Container.Image] = struct{}{}
+ }
+
+ for _, step := range step.Steps {
+ res.Merge(getStepInfo(step))
+ }
+
+ if step.Parallel != nil {
+ res.hasParallel = true
+
+ if len(step.Parallel.Matrix) != 0 {
+ res.hasMatrix = true
+ }
+ if step.Parallel.Artifacts != nil {
+ res.hasArtifacts = true
+ }
+ if step.Parallel.Execute != nil {
+ res.numExecute++
+ }
+ if len(step.Parallel.Use) > 0 {
+ res.hasTemplate = true
+ for _, tmpl := range step.Parallel.Use {
+ res.templatesUsed[tmpl.Name] = struct{}{}
+ }
+ }
+ if step.Parallel.Template != nil {
+ res.hasTemplate = true
+ res.templatesUsed[step.Parallel.Template.Name] = struct{}{}
+ }
+
+ if len(step.Parallel.Services) > 0 {
+ res.hasServices = true
+ }
+
+ if step.Parallel.Run != nil && step.Parallel.Run.Image != "" {
+ res.imagesUsed[step.Parallel.Run.Image] = struct{}{}
+ }
+ if step.Parallel.Container != nil && step.Parallel.Container.Image != "" {
+ res.imagesUsed[step.Parallel.Container.Image] = struct{}{}
+ }
+
+ for _, step := range step.Parallel.Steps {
+ res.Merge(getStepInfo(step))
+ }
+ }
+
+ return res
+}
+
+func (e *executor) sendRunWorkflowTelemetry(ctx context.Context, workflow *testworkflowsv1.TestWorkflow, execution *testkube.TestWorkflowExecution) {
+ if workflow == nil {
+ log.DefaultLogger.Debug("empty workflow passed to telemetry event")
+ return
+ }
+ telemetryEnabled, err := e.configMap.GetTelemetryEnabled(ctx)
+ if err != nil {
+ log.DefaultLogger.Debugf("getting telemetry enabled error", "error", err)
+ }
+ if !telemetryEnabled {
+ return
+ }
+
+ properties := make(map[string]any)
+ properties["name"] = workflow.Name
+ stats := stepStats{
+ imagesUsed: make(map[string]struct{}),
+ templatesUsed: make(map[string]struct{}),
+ }
+
+ var isSample bool
+ if workflow.Labels != nil && workflow.Labels["docs"] == "example" && strings.HasSuffix(workflow.Name, "-sample") {
+ isSample = true
+ } else {
+ isSample = false
+ }
+
+ spec := workflow.Spec
+ for _, step := range spec.Steps {
+ stats.Merge(getStepInfo(step))
+ }
+ if spec.Container != nil {
+ stats.imagesUsed[spec.Container.Image] = struct{}{}
+ }
+ if len(spec.Services) != 0 {
+ stats.hasServices = true
+ }
+ if len(spec.Use) > 0 {
+ stats.hasTemplate = true
+ for _, tmpl := range spec.Use {
+ stats.templatesUsed[tmpl.Name] = struct{}{}
+ }
+ }
+
+ var images []string
+ for image := range stats.imagesUsed {
+ if image == "" {
+ continue
+ }
+ images = append(images, image)
+ }
+
+ var templates []string
+ for t := range stats.templatesUsed {
+ if t == "" {
+ continue
+ }
+ templates = append(templates, t)
+ }
+ var (
+ status string
+ durationMs int32
+ )
+ if execution.Result != nil {
+ if execution.Result.Status != nil {
+ status = string(*execution.Result.Status)
+ }
+ durationMs = execution.Result.DurationMs
+ }
+
+ out, err := telemetry.SendRunWorkflowEvent("testkube_api_run_test_workflow", telemetry.RunWorkflowParams{
+ RunParams: telemetry.RunParams{
+ AppVersion: version.Version,
+ DataSource: testworkflows.GetDataSource(workflow.Spec.Content),
+ Host: testworkflows.GetHostname(),
+ ClusterID: testworkflows.GetClusterID(ctx, e.configMap),
+ DurationMs: durationMs,
+ Status: status,
+ },
+ WorkflowParams: telemetry.WorkflowParams{
+ TestWorkflowSteps: int32(stats.numSteps),
+ TestWorkflowExecuteCount: int32(stats.numExecute),
+ TestWorkflowImage: testworkflows.GetImage(workflow.Spec.Container),
+ TestWorkflowArtifactUsed: stats.hasArtifacts,
+ TestWorkflowParallelUsed: stats.hasParallel,
+ TestWorkflowMatrixUsed: stats.hasMatrix,
+ TestWorkflowServicesUsed: stats.hasServices,
+ TestWorkflowTemplateUsed: stats.hasTemplate,
+ IsSample: isSample,
+ Templates: templates,
+ Images: images,
+ TestWorkflowKubeshopGitURI: testworkflows.IsKubeshopGitURI(workflow.Spec.Content) ||
+ testworkflows.HasWorkflowStepLike(workflow.Spec, testworkflows.HasKubeshopGitURI),
+ },
+ })
+
+ if err != nil {
+ log.DefaultLogger.Debugf("sending run test workflow telemetry event error", "error", err)
+ } else {
+ log.DefaultLogger.Debugf("sending run test workflow telemetry event", "output", out)
+ }
+}