Skip to content

Commit

Permalink
feat: [TKC-2194] improve workflow execution telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
povilasv committed Jul 10, 2024
1 parent 56b2224 commit e2bfd96
Show file tree
Hide file tree
Showing 3 changed files with 259 additions and 43 deletions.
25 changes: 23 additions & 2 deletions pkg/telemetry/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,16 @@ type Params struct {
ErrorType string `json:"error_type,omitempty"`
ErrorStackTrace string `json:"error_stacktrace,omitempty"`
TestWorkflowSteps int32 `json:"test_workflow_steps,omitempty"`
TestWorkflowExecuteCount int32 `json:"test_workflow_execute_count,omitempty"`
TestWorkflowParallelUsed bool `json:"test_workflow_parallel_used,omitempty"`
TestWorkflowMatrixUsed bool `json:"test_workflow_matrix_used,omitempty"`
TestWorkflowServicesUsed bool `json:"test_workflow_services_used,omitempty"`
TestWorkflowIsSample bool `json:"test_workflow_is_sample,omitempty"`
TestWorkflowTemplates []string `json:"testWorkflowTemplates"`
TestWorkflowImages []string `json:"testWorkflowImages"`
TestWorkflowTemplateUsed bool `json:"test_workflow_template_used,omitempty"`
TestWorkflowImage string `json:"test_workflow_image,omitempty"`
TestWorkflowArtifactUsed bool `json:"test_workflow_artifact_used,omitempty"`
TestWorkflowImage string `json:"test_workflow_image,omitempty"`
TestWorkflowKubeshopGitURI bool `json:"test_workflow_kubeshop_git_uri,omitempty"`
License string `json:"license,omitempty"`
Step string `json:"step,omitempty"`
Expand Down Expand Up @@ -84,9 +91,16 @@ type RunContext struct {

type WorkflowParams struct {
TestWorkflowSteps int32
TestWorkflowTemplateUsed bool
TestWorkflowExecuteCount int32
TestWorkflowImage string
TestWorkflowArtifactUsed bool
TestWorkflowParallelUsed bool
TestWorkflowMatrixUsed bool
TestWorkflowServicesUsed bool
TestWorkflowTemplateUsed bool
TestWorkflowIsSample bool
TestWorkflowTemplates []string
TestWorkflowImages []string
TestWorkflowKubeshopGitURI bool
}

Expand Down Expand Up @@ -290,7 +304,14 @@ func NewRunWorkflowPayload(name, clusterType string, params RunWorkflowParams) P
ClusterType: clusterType,
Context: getAgentContext(),
TestWorkflowSteps: params.TestWorkflowSteps,
TestWorkflowExecuteCount: params.TestWorkflowExecuteCount,
TestWorkflowParallelUsed: params.TestWorkflowParallelUsed,
TestWorkflowTemplateUsed: params.TestWorkflowTemplateUsed,
TestWorkflowMatrixUsed: params.TestWorkflowMatrixUsed,
TestWorkflowServicesUsed: params.TestWorkflowServicesUsed,
TestWorkflowIsSample: params.TestWorkflowIsSample,
TestWorkflowTemplates: params.TestWorkflowTemplates,
TestWorkflowImages: params.TestWorkflowImages,
TestWorkflowImage: params.TestWorkflowImage,
TestWorkflowArtifactUsed: params.TestWorkflowArtifactUsed,
TestWorkflowKubeshopGitURI: params.TestWorkflowKubeshopGitURI,
Expand Down
43 changes: 2 additions & 41 deletions pkg/testworkflows/testworkflowexecutor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,10 @@ import (
configRepo "github.com/kubeshop/testkube/pkg/repository/config"
"github.com/kubeshop/testkube/pkg/repository/result"
"github.com/kubeshop/testkube/pkg/repository/testworkflow"
"github.com/kubeshop/testkube/pkg/telemetry"
"github.com/kubeshop/testkube/pkg/testworkflows"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowcontroller"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/constants"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowresolver"
"github.com/kubeshop/testkube/pkg/version"
)

//go:generate mockgen -destination=./mock_executor.go -package=testworkflowexecutor "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowexecutor" TestWorkflowExecutor
Expand Down Expand Up @@ -294,6 +291,8 @@ func (e *executor) Control(ctx context.Context, testWorkflow *testworkflowsv1.Te
// TODO: Consider AppendOutput ($push) instead
_ = e.repository.UpdateOutput(ctx, execution.Id, execution.Output)
if execution.Result.IsFinished() {
e.sendRunWorkflowTelemetry(ctx, testWorkflow, execution)

if execution.Result.IsPassed() {
e.emitter.Notify(testkube.NewEventEndTestWorkflowSuccess(execution))
} else if execution.Result.IsAborted() {
Expand Down Expand Up @@ -537,8 +536,6 @@ func (e *executor) Execute(ctx context.Context, workflow testworkflowsv1.TestWor
return execution, errors.Wrap(err, "deploying required resources")
}

e.sendRunWorkflowTelemetry(ctx, &workflow)

// Start to control the results
go func() {
err = e.Control(context.Background(), initialWorkflow, &execution)
Expand All @@ -550,39 +547,3 @@ func (e *executor) Execute(ctx context.Context, workflow testworkflowsv1.TestWor

return execution, nil
}

func (e *executor) sendRunWorkflowTelemetry(ctx context.Context, workflow *testworkflowsv1.TestWorkflow) {
if workflow == nil {
log.DefaultLogger.Debug("empty workflow passed to telemetry event")
return
}
telemetryEnabled, err := e.configMap.GetTelemetryEnabled(ctx)
if err != nil {
log.DefaultLogger.Debugf("getting telemetry enabled error", "error", err)
}
if !telemetryEnabled {
return
}

out, err := telemetry.SendRunWorkflowEvent("testkube_api_run_test_workflow", telemetry.RunWorkflowParams{
RunParams: telemetry.RunParams{
AppVersion: version.Version,
DataSource: testworkflows.GetDataSource(workflow.Spec.Content),
Host: testworkflows.GetHostname(),
ClusterID: testworkflows.GetClusterID(ctx, e.configMap),
},
WorkflowParams: telemetry.WorkflowParams{
TestWorkflowSteps: int32(len(workflow.Spec.Setup) + len(workflow.Spec.Steps) + len(workflow.Spec.After)),
TestWorkflowImage: testworkflows.GetImage(workflow.Spec.Container),
TestWorkflowArtifactUsed: testworkflows.HasWorkflowStepLike(workflow.Spec, testworkflows.HasArtifacts),
TestWorkflowKubeshopGitURI: testworkflows.IsKubeshopGitURI(workflow.Spec.Content) ||
testworkflows.HasWorkflowStepLike(workflow.Spec, testworkflows.HasKubeshopGitURI),
},
})

if err != nil {
log.DefaultLogger.Debugf("sending run test workflow telemetry event error", "error", err)
} else {
log.DefaultLogger.Debugf("sending run test workflow telemetry event", "output", out)
}
}
234 changes: 234 additions & 0 deletions pkg/testworkflows/testworkflowexecutor/testworkflowmetrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
package testworkflowexecutor

import (
"context"
"strings"

testworkflowsv1 "github.com/kubeshop/testkube-operator/api/testworkflows/v1"

"github.com/kubeshop/testkube/pkg/api/v1/testkube"
"github.com/kubeshop/testkube/pkg/log"
"github.com/kubeshop/testkube/pkg/telemetry"
"github.com/kubeshop/testkube/pkg/testworkflows"
"github.com/kubeshop/testkube/pkg/version"
)

type stepStats struct {
numSteps int
numExecute int
hasArtifacts bool
hasMatrix bool
hasParallel bool
hasTemplate bool
hasServices bool
imagesUsed map[string]struct{}
templatesUsed map[string]struct{}
}

func (ss *stepStats) Merge(stats *stepStats) {
ss.numSteps += stats.numSteps
ss.numExecute += stats.numExecute

if stats.hasArtifacts {
ss.hasArtifacts = true
}
if stats.hasMatrix {
ss.hasMatrix = true
}
if stats.hasParallel {
ss.hasParallel = true
}
if stats.hasServices {
ss.hasServices = true
}
if stats.hasTemplate {
ss.hasTemplate = true
}
for image := range stats.imagesUsed {
ss.imagesUsed[image] = struct{}{}
}
for tmpl := range stats.templatesUsed {
ss.templatesUsed[tmpl] = struct{}{}
}
}

func getStepInfo(step testworkflowsv1.Step) *stepStats {
res := &stepStats{
imagesUsed: make(map[string]struct{}),
templatesUsed: make(map[string]struct{}),
}
if step.Execute != nil {
res.numExecute++
}
if step.Artifacts != nil {
res.hasArtifacts = true
}
if len(step.Use) > 0 {
res.hasTemplate = true
for _, tmpl := range step.Use {
res.templatesUsed[tmpl.Name] = struct{}{}
}
}
if step.Template != nil {
res.hasTemplate = true
res.templatesUsed[step.Template.Name] = struct{}{}
}
if len(step.Services) > 0 {
res.hasServices = true
}

if step.Run != nil && step.Run.Image != "" {
res.imagesUsed[step.Run.Image] = struct{}{}
}
if step.Container != nil && step.Container.Image != "" {
res.imagesUsed[step.Container.Image] = struct{}{}
}

for _, step := range step.Steps {
res.Merge(getStepInfo(step))
}

if step.Parallel != nil {
res.hasParallel = true

if len(step.Parallel.Matrix) != 0 {
res.hasMatrix = true
}
if step.Parallel.Artifacts != nil {
res.hasArtifacts = true
}
if step.Parallel.Execute != nil {
res.numExecute++
}
if len(step.Parallel.Use) > 0 {
res.hasTemplate = true
for _, tmpl := range step.Parallel.Use {
res.templatesUsed[tmpl.Name] = struct{}{}
}
}
if step.Parallel.Template != nil {
res.hasTemplate = true
res.templatesUsed[step.Parallel.Template.Name] = struct{}{}
}

if len(step.Parallel.Services) > 0 {
res.hasServices = true
}

if step.Parallel.Run != nil && step.Parallel.Run.Image != "" {
res.imagesUsed[step.Parallel.Run.Image] = struct{}{}
}
if step.Parallel.Container != nil && step.Parallel.Container.Image != "" {
res.imagesUsed[step.Parallel.Container.Image] = struct{}{}
}

for _, step := range step.Parallel.Steps {
res.Merge(getStepInfo(step))
}
}

return res
}

func (e *executor) sendRunWorkflowTelemetry(ctx context.Context, workflow *testworkflowsv1.TestWorkflow, execution *testkube.TestWorkflowExecution) {
if workflow == nil {
log.DefaultLogger.Debug("empty workflow passed to telemetry event")
return
}
telemetryEnabled, err := e.configMap.GetTelemetryEnabled(ctx)
if err != nil {
log.DefaultLogger.Debugf("getting telemetry enabled error", "error", err)
}
if !telemetryEnabled {
return
}

properties := make(map[string]any)
properties["name"] = workflow.Name
stats := stepStats{
imagesUsed: make(map[string]struct{}),
templatesUsed: make(map[string]struct{}),
}

var isSample bool
if workflow.Labels != nil && workflow.Labels["docs"] == "example" && strings.HasSuffix(workflow.Name, "-sample") {
isSample = true
} else {
isSample = false
}

spec := workflow.Spec
for _, step := range spec.Steps {
stats.Merge(getStepInfo(step))
}
if spec.Container != nil {
stats.imagesUsed[spec.Container.Image] = struct{}{}
}
if len(spec.Services) != 0 {
stats.hasServices = true
}
if len(spec.Use) > 0 {
stats.hasTemplate = true
for _, tmpl := range spec.Use {
stats.templatesUsed[tmpl.Name] = struct{}{}
}
}

var images []string
for image := range stats.imagesUsed {
if image == "" {
continue
}
images = append(images, image)
}

var templates []string
for t := range stats.templatesUsed {
if t == "" {
continue
}
templates = append(templates, t)
}
var (
status string
durationMs int32
)
if execution.Result != nil {
if execution.Result.Status != nil {
status = string(*execution.Result.Status)
}
durationMs = execution.Result.DurationMs
}

out, err := telemetry.SendRunWorkflowEvent("testkube_api_run_test_workflow", telemetry.RunWorkflowParams{
RunParams: telemetry.RunParams{
AppVersion: version.Version,
DataSource: testworkflows.GetDataSource(workflow.Spec.Content),
Host: testworkflows.GetHostname(),
ClusterID: testworkflows.GetClusterID(ctx, e.configMap),
DurationMs: durationMs,
Status: status,
},
WorkflowParams: telemetry.WorkflowParams{
TestWorkflowSteps: int32(stats.numSteps),
TestWorkflowExecuteCount: int32(stats.numExecute),
TestWorkflowImage: testworkflows.GetImage(workflow.Spec.Container),
TestWorkflowArtifactUsed: stats.hasArtifacts,
TestWorkflowParallelUsed: stats.hasParallel,
TestWorkflowMatrixUsed: stats.hasMatrix,
TestWorkflowServicesUsed: stats.hasServices,
TestWorkflowTemplateUsed: stats.hasTemplate,
TestWorkflowIsSample: isSample,
TestWorkflowTemplates: templates,
TestWorkflowImages: images,
TestWorkflowKubeshopGitURI: testworkflows.IsKubeshopGitURI(workflow.Spec.Content) ||
testworkflows.HasWorkflowStepLike(workflow.Spec, testworkflows.HasKubeshopGitURI),
},
})

if err != nil {
log.DefaultLogger.Debugf("sending run test workflow telemetry event error", "error", err)
} else {
log.DefaultLogger.Debugf("sending run test workflow telemetry event", "output", out)
}
}

0 comments on commit e2bfd96

Please sign in to comment.