Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: [TKC-2194] improve workflow execution telemetry #5648

Merged
merged 1 commit into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions pkg/telemetry/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,16 @@ type Params struct {
ErrorType string `json:"error_type,omitempty"`
ErrorStackTrace string `json:"error_stacktrace,omitempty"`
TestWorkflowSteps int32 `json:"test_workflow_steps,omitempty"`
TestWorkflowExecuteCount int32 `json:"test_workflow_execute_count,omitempty"`
TestWorkflowParallelUsed bool `json:"test_workflow_parallel_used,omitempty"`
TestWorkflowMatrixUsed bool `json:"test_workflow_matrix_used,omitempty"`
TestWorkflowServicesUsed bool `json:"test_workflow_services_used,omitempty"`
TestWorkflowIsSample bool `json:"test_workflow_is_sample,omitempty"`
TestWorkflowTemplates []string `json:"testWorkflowTemplates"`
TestWorkflowImages []string `json:"testWorkflowImages"`
TestWorkflowTemplateUsed bool `json:"test_workflow_template_used,omitempty"`
TestWorkflowImage string `json:"test_workflow_image,omitempty"`
TestWorkflowArtifactUsed bool `json:"test_workflow_artifact_used,omitempty"`
TestWorkflowImage string `json:"test_workflow_image,omitempty"`
TestWorkflowKubeshopGitURI bool `json:"test_workflow_kubeshop_git_uri,omitempty"`
License string `json:"license,omitempty"`
Step string `json:"step,omitempty"`
Expand Down Expand Up @@ -84,9 +91,16 @@ type RunContext struct {

type WorkflowParams struct {
TestWorkflowSteps int32
TestWorkflowTemplateUsed bool
TestWorkflowExecuteCount int32
TestWorkflowImage string
TestWorkflowArtifactUsed bool
TestWorkflowParallelUsed bool
TestWorkflowMatrixUsed bool
TestWorkflowServicesUsed bool
TestWorkflowTemplateUsed bool
TestWorkflowIsSample bool
TestWorkflowTemplates []string
TestWorkflowImages []string
TestWorkflowKubeshopGitURI bool
}

Expand Down Expand Up @@ -290,7 +304,14 @@ func NewRunWorkflowPayload(name, clusterType string, params RunWorkflowParams) P
ClusterType: clusterType,
Context: getAgentContext(),
TestWorkflowSteps: params.TestWorkflowSteps,
TestWorkflowExecuteCount: params.TestWorkflowExecuteCount,
TestWorkflowParallelUsed: params.TestWorkflowParallelUsed,
TestWorkflowTemplateUsed: params.TestWorkflowTemplateUsed,
TestWorkflowMatrixUsed: params.TestWorkflowMatrixUsed,
TestWorkflowServicesUsed: params.TestWorkflowServicesUsed,
TestWorkflowIsSample: params.TestWorkflowIsSample,
TestWorkflowTemplates: params.TestWorkflowTemplates,
TestWorkflowImages: params.TestWorkflowImages,
TestWorkflowImage: params.TestWorkflowImage,
TestWorkflowArtifactUsed: params.TestWorkflowArtifactUsed,
TestWorkflowKubeshopGitURI: params.TestWorkflowKubeshopGitURI,
Expand Down
43 changes: 2 additions & 41 deletions pkg/testworkflows/testworkflowexecutor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,10 @@ import (
configRepo "github.com/kubeshop/testkube/pkg/repository/config"
"github.com/kubeshop/testkube/pkg/repository/result"
"github.com/kubeshop/testkube/pkg/repository/testworkflow"
"github.com/kubeshop/testkube/pkg/telemetry"
"github.com/kubeshop/testkube/pkg/testworkflows"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowcontroller"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/constants"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowresolver"
"github.com/kubeshop/testkube/pkg/version"
)

//go:generate mockgen -destination=./mock_executor.go -package=testworkflowexecutor "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowexecutor" TestWorkflowExecutor
Expand Down Expand Up @@ -294,6 +291,8 @@ func (e *executor) Control(ctx context.Context, testWorkflow *testworkflowsv1.Te
// TODO: Consider AppendOutput ($push) instead
_ = e.repository.UpdateOutput(ctx, execution.Id, execution.Output)
if execution.Result.IsFinished() {
e.sendRunWorkflowTelemetry(ctx, testWorkflow, execution)

if execution.Result.IsPassed() {
e.emitter.Notify(testkube.NewEventEndTestWorkflowSuccess(execution))
} else if execution.Result.IsAborted() {
Expand Down Expand Up @@ -537,8 +536,6 @@ func (e *executor) Execute(ctx context.Context, workflow testworkflowsv1.TestWor
return execution, errors.Wrap(err, "deploying required resources")
}

e.sendRunWorkflowTelemetry(ctx, &workflow)

// Start to control the results
go func() {
err = e.Control(context.Background(), initialWorkflow, &execution)
Expand All @@ -550,39 +547,3 @@ func (e *executor) Execute(ctx context.Context, workflow testworkflowsv1.TestWor

return execution, nil
}

func (e *executor) sendRunWorkflowTelemetry(ctx context.Context, workflow *testworkflowsv1.TestWorkflow) {
if workflow == nil {
log.DefaultLogger.Debug("empty workflow passed to telemetry event")
return
}
telemetryEnabled, err := e.configMap.GetTelemetryEnabled(ctx)
if err != nil {
log.DefaultLogger.Debugf("getting telemetry enabled error", "error", err)
}
if !telemetryEnabled {
return
}

out, err := telemetry.SendRunWorkflowEvent("testkube_api_run_test_workflow", telemetry.RunWorkflowParams{
RunParams: telemetry.RunParams{
AppVersion: version.Version,
DataSource: testworkflows.GetDataSource(workflow.Spec.Content),
Host: testworkflows.GetHostname(),
ClusterID: testworkflows.GetClusterID(ctx, e.configMap),
},
WorkflowParams: telemetry.WorkflowParams{
TestWorkflowSteps: int32(len(workflow.Spec.Setup) + len(workflow.Spec.Steps) + len(workflow.Spec.After)),
TestWorkflowImage: testworkflows.GetImage(workflow.Spec.Container),
TestWorkflowArtifactUsed: testworkflows.HasWorkflowStepLike(workflow.Spec, testworkflows.HasArtifacts),
TestWorkflowKubeshopGitURI: testworkflows.IsKubeshopGitURI(workflow.Spec.Content) ||
testworkflows.HasWorkflowStepLike(workflow.Spec, testworkflows.HasKubeshopGitURI),
},
})

if err != nil {
log.DefaultLogger.Debugf("sending run test workflow telemetry event error", "error", err)
} else {
log.DefaultLogger.Debugf("sending run test workflow telemetry event", "output", out)
}
}
234 changes: 234 additions & 0 deletions pkg/testworkflows/testworkflowexecutor/testworkflowmetrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
package testworkflowexecutor

import (
"context"
"strings"

testworkflowsv1 "github.com/kubeshop/testkube-operator/api/testworkflows/v1"

"github.com/kubeshop/testkube/pkg/api/v1/testkube"
"github.com/kubeshop/testkube/pkg/log"
"github.com/kubeshop/testkube/pkg/telemetry"
"github.com/kubeshop/testkube/pkg/testworkflows"
"github.com/kubeshop/testkube/pkg/version"
)

type stepStats struct {
numSteps int
numExecute int
hasArtifacts bool
hasMatrix bool
hasParallel bool
hasTemplate bool
hasServices bool
imagesUsed map[string]struct{}
templatesUsed map[string]struct{}
}

func (ss *stepStats) Merge(stats *stepStats) {
ss.numSteps += stats.numSteps
ss.numExecute += stats.numExecute

if stats.hasArtifacts {
ss.hasArtifacts = true
}
if stats.hasMatrix {
ss.hasMatrix = true
}
if stats.hasParallel {
ss.hasParallel = true
}
if stats.hasServices {
ss.hasServices = true
}
if stats.hasTemplate {
ss.hasTemplate = true
}
for image := range stats.imagesUsed {
ss.imagesUsed[image] = struct{}{}
}
for tmpl := range stats.templatesUsed {
ss.templatesUsed[tmpl] = struct{}{}
}
}

func getStepInfo(step testworkflowsv1.Step) *stepStats {
res := &stepStats{
imagesUsed: make(map[string]struct{}),
templatesUsed: make(map[string]struct{}),
}
if step.Execute != nil {
res.numExecute++
}
if step.Artifacts != nil {
res.hasArtifacts = true
}
if len(step.Use) > 0 {
res.hasTemplate = true
for _, tmpl := range step.Use {
res.templatesUsed[tmpl.Name] = struct{}{}
}
}
if step.Template != nil {
res.hasTemplate = true
res.templatesUsed[step.Template.Name] = struct{}{}
}
if len(step.Services) > 0 {
res.hasServices = true
}

if step.Run != nil && step.Run.Image != "" {
res.imagesUsed[step.Run.Image] = struct{}{}
}
if step.Container != nil && step.Container.Image != "" {
res.imagesUsed[step.Container.Image] = struct{}{}
}

for _, step := range step.Steps {
res.Merge(getStepInfo(step))
}

if step.Parallel != nil {
res.hasParallel = true

if len(step.Parallel.Matrix) != 0 {
res.hasMatrix = true
}
if step.Parallel.Artifacts != nil {
res.hasArtifacts = true
}
if step.Parallel.Execute != nil {
res.numExecute++
}
if len(step.Parallel.Use) > 0 {
res.hasTemplate = true
for _, tmpl := range step.Parallel.Use {
res.templatesUsed[tmpl.Name] = struct{}{}
}
}
if step.Parallel.Template != nil {
res.hasTemplate = true
res.templatesUsed[step.Parallel.Template.Name] = struct{}{}
}

if len(step.Parallel.Services) > 0 {
res.hasServices = true
}

if step.Parallel.Run != nil && step.Parallel.Run.Image != "" {
res.imagesUsed[step.Parallel.Run.Image] = struct{}{}
}
if step.Parallel.Container != nil && step.Parallel.Container.Image != "" {
res.imagesUsed[step.Parallel.Container.Image] = struct{}{}
}

for _, step := range step.Parallel.Steps {
res.Merge(getStepInfo(step))
}
}

return res
}

func (e *executor) sendRunWorkflowTelemetry(ctx context.Context, workflow *testworkflowsv1.TestWorkflow, execution *testkube.TestWorkflowExecution) {
if workflow == nil {
log.DefaultLogger.Debug("empty workflow passed to telemetry event")
return
}
telemetryEnabled, err := e.configMap.GetTelemetryEnabled(ctx)
if err != nil {
log.DefaultLogger.Debugf("getting telemetry enabled error", "error", err)
}
if !telemetryEnabled {
return
}

properties := make(map[string]any)
properties["name"] = workflow.Name
stats := stepStats{
imagesUsed: make(map[string]struct{}),
templatesUsed: make(map[string]struct{}),
}

var isSample bool
if workflow.Labels != nil && workflow.Labels["docs"] == "example" && strings.HasSuffix(workflow.Name, "-sample") {
isSample = true
} else {
isSample = false
}

spec := workflow.Spec
for _, step := range spec.Steps {
stats.Merge(getStepInfo(step))
}
if spec.Container != nil {
stats.imagesUsed[spec.Container.Image] = struct{}{}
}
if len(spec.Services) != 0 {
stats.hasServices = true
}
if len(spec.Use) > 0 {
stats.hasTemplate = true
for _, tmpl := range spec.Use {
stats.templatesUsed[tmpl.Name] = struct{}{}
}
}

var images []string
for image := range stats.imagesUsed {
if image == "" {
continue
}
images = append(images, image)
}

var templates []string
for t := range stats.templatesUsed {
if t == "" {
continue
}
templates = append(templates, t)
}
var (
status string
durationMs int32
)
if execution.Result != nil {
if execution.Result.Status != nil {
status = string(*execution.Result.Status)
}
durationMs = execution.Result.DurationMs
}

out, err := telemetry.SendRunWorkflowEvent("testkube_api_run_test_workflow", telemetry.RunWorkflowParams{
RunParams: telemetry.RunParams{
AppVersion: version.Version,
DataSource: testworkflows.GetDataSource(workflow.Spec.Content),
Host: testworkflows.GetHostname(),
ClusterID: testworkflows.GetClusterID(ctx, e.configMap),
DurationMs: durationMs,
Status: status,
},
WorkflowParams: telemetry.WorkflowParams{
TestWorkflowSteps: int32(stats.numSteps),
TestWorkflowExecuteCount: int32(stats.numExecute),
TestWorkflowImage: testworkflows.GetImage(workflow.Spec.Container),
TestWorkflowArtifactUsed: stats.hasArtifacts,
TestWorkflowParallelUsed: stats.hasParallel,
TestWorkflowMatrixUsed: stats.hasMatrix,
TestWorkflowServicesUsed: stats.hasServices,
TestWorkflowTemplateUsed: stats.hasTemplate,
TestWorkflowIsSample: isSample,
TestWorkflowTemplates: templates,
TestWorkflowImages: images,
TestWorkflowKubeshopGitURI: testworkflows.IsKubeshopGitURI(workflow.Spec.Content) ||
testworkflows.HasWorkflowStepLike(workflow.Spec, testworkflows.HasKubeshopGitURI),
},
})

if err != nil {
log.DefaultLogger.Debugf("sending run test workflow telemetry event error", "error", err)
} else {
log.DefaultLogger.Debugf("sending run test workflow telemetry event", "output", out)
}
}
Loading