From 2f8d1064d321f7413edf26728990919151cf6320 Mon Sep 17 00:00:00 2001 From: Dawid Rusnak Date: Thu, 16 May 2024 14:47:42 +0200 Subject: [PATCH] feat(testworkflows): TKC-1764 - support accompanying services (#5436) * feat(testworkflows): support `services` in the TestWorkflow objects * feat(testworkflows): support `use` in `services` * fix(testworkflows): orchestration fixes * feat(testworkflows): TKC-1764 - add support for accompanying services * chore: update integration tests to use provided URLs * chore: update integration tests to use provided Minio URLs * chore: fix NATS URIs in tests * chore: hardcode again Minio buckets for tests * fix: add missing environment variables for integration tests * fix(testworkflows): cloning the repository * fix(testworkflows): merging securityContext * fix(testworkflows): sort out proper group/fsGroup/user for the services * feat(testworkflows): add example integration tests for Testkube * fix: adjust make integration-tests to fill the access/secret keys * chore: update testkube-operator * feat: expose PWD environment variable for the processing --- .github/workflows/test.yaml | 3 + Makefile | 2 +- api/v1/testkube.yaml | 124 ++++++ cmd/tcl/testworkflow-init/data/expressions.go | 6 + cmd/tcl/testworkflow-init/main.go | 8 + .../testworkflow-toolkit/commands/clone.go | 6 +- cmd/tcl/testworkflow-toolkit/commands/kill.go | 98 +++++ .../testworkflow-toolkit/commands/parallel.go | 4 +- cmd/tcl/testworkflow-toolkit/commands/root.go | 2 + .../testworkflow-toolkit/commands/services.go | 388 ++++++++++++++++++ cmd/tcl/testworkflow-toolkit/spawn/utils.go | 24 +- go.mod | 2 +- go.sum | 4 +- pkg/api/v1/testkube/model_exec_action.go | 15 + pkg/api/v1/testkube/model_grpc_action.go | 15 + pkg/api/v1/testkube/model_http_get_action.go | 18 + pkg/api/v1/testkube/model_http_header.go | 15 + pkg/api/v1/testkube/model_probe.go | 24 ++ .../v1/testkube/model_tcp_socket_action.go | 15 + ..._test_workflow_independent_service_spec.go | 42 ++ .../model_test_workflow_independent_step.go | 5 +- ...test_workflow_independent_step_parallel.go | 21 +- .../model_test_workflow_service_spec.go | 43 ++ .../v1/testkube/model_test_workflow_spec.go | 1 + .../v1/testkube/model_test_workflow_step.go | 5 +- .../model_test_workflow_step_parallel.go | 1 + .../model_test_workflow_template_spec.go | 19 +- pkg/event/bus/nats_integration_test.go | 9 +- pkg/event/bus/testserver.go | 2 +- pkg/event/emitter_integration_test.go | 7 +- .../scraper/minio_scraper_integration_test.go | 49 ++- .../minio_uploader_integration_test.go | 60 +-- pkg/logs/adapter/minio_test.go | 15 +- pkg/logs/adapter/minio_v2_test.go | 2 +- pkg/repository/config/mongo_test.go | 8 +- .../result/mongo_integration_test.go | 8 +- pkg/repository/testresult/mongo_test.go | 8 +- .../artifacts_storage_integration_test.go | 23 +- .../mapperstcl/testworkflows/kube_openapi.go | 108 +++++ .../mapperstcl/testworkflows/openapi_kube.go | 139 ++++++- .../testworkflow/mongo_integration_test.go | 7 +- .../testworkflowcontroller/cleanup.go | 26 ++ .../constants/constants.go | 1 + .../testworkflowprocessor/container.go | 8 +- .../testworkflowprocessor/operations.go | 60 +++ .../testworkflowprocessor/processor.go | 43 +- .../testworkflowprocessor/utils.go | 20 +- .../testworkflowresolver/analyze.go | 10 + .../testworkflowresolver/apply.go | 56 +++ .../testworkflowresolver/merge.go | 5 + .../testkube-integration-tests.yaml | 77 ++++ .../executor-tests/crd-workflow/smoke.yaml | 103 +++++ 52 files changed, 1635 insertions(+), 129 deletions(-) create mode 100644 cmd/tcl/testworkflow-toolkit/commands/kill.go create mode 100644 cmd/tcl/testworkflow-toolkit/commands/services.go create mode 100644 pkg/api/v1/testkube/model_exec_action.go create mode 100644 pkg/api/v1/testkube/model_grpc_action.go create mode 100644 pkg/api/v1/testkube/model_http_get_action.go create mode 100644 pkg/api/v1/testkube/model_http_header.go create mode 100644 pkg/api/v1/testkube/model_probe.go create mode 100644 pkg/api/v1/testkube/model_tcp_socket_action.go create mode 100644 pkg/api/v1/testkube/model_test_workflow_independent_service_spec.go create mode 100644 pkg/api/v1/testkube/model_test_workflow_service_spec.go create mode 100644 test/integration/crd-workflow/testkube-integration-tests.yaml diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index b6b460079b8..3f6f3935349 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -152,6 +152,9 @@ jobs: run: sudo apt-get install -y git - name: Integration Tests + env: + STORAGE_ACCESSKEYID: minio99 + STORAGE_SECRETACCESSKEY: minio123 run: INTEGRATION=y gotestsum --format pkgname --junitfile integration-tests.xml --jsonfile integration-tests.json -- -run _Integration -coverprofile=coverage.out -covermode=atomic ./... - name: Integration Test Summary diff --git a/Makefile b/Makefile index e1d17208924..2ccee6a0cdb 100644 --- a/Makefile +++ b/Makefile @@ -164,7 +164,7 @@ unit-tests: .PHONY: integration-tests integration-tests: - INTEGRATION="true" gotestsum --format pkgname -- -tags=integration -cover ./... + INTEGRATION="true" STORAGE_ACCESSKEYID="minio99" STORAGE_SECRETACCESSKEY="minio123" gotestsum --format pkgname -- -tags=integration -cover ./... test-e2e: go test --tags=e2e -v ./test/e2e diff --git a/api/v1/testkube.yaml b/api/v1/testkube.yaml index c5df5ed391a..b0603f0a211 100644 --- a/api/v1/testkube.yaml +++ b/api/v1/testkube.yaml @@ -7942,6 +7942,40 @@ components: items: $ref: "#/components/schemas/TestWorkflowStepParallelFetch" + TestWorkflowIndependentServiceSpec: + type: object + allOf: + - $ref: "#/components/schemas/TestWorkflowStepExecuteStrategy" + - $ref: "#/components/schemas/TestWorkflowStepRun" + - properties: + timeout: + type: string + description: "maximum time until reaching readiness" + transfer: + type: array + description: list of files to send to parallel steps + items: + $ref: "#/components/schemas/TestWorkflowStepParallelTransfer" + content: + $ref: "#/components/schemas/TestWorkflowContent" + pod: + $ref: "#/components/schemas/TestWorkflowPodConfig" + restartPolicy: + type: string + readinessProbe: + $ref: "#/components/schemas/Probe" + + TestWorkflowServiceSpec: + type: object + allOf: + - $ref: "#/components/schemas/TestWorkflowStepExecuteStrategy" + - $ref: "#/components/schemas/TestWorkflowIndependentServiceSpec" + - properties: + use: + type: array + items: + $ref: "#/components/schemas/TestWorkflowTemplateRef" + TestWorkflowStepExecuteStrategy: type: object properties: @@ -8011,6 +8045,10 @@ components: $ref: "#/components/schemas/TestWorkflowConfigSchema" content: $ref: "#/components/schemas/TestWorkflowContent" + services: + type: object + additionalProperties: + $ref: "#/components/schemas/TestWorkflowServiceSpec" container: $ref: "#/components/schemas/TestWorkflowContainerConfig" job: @@ -8041,6 +8079,10 @@ components: $ref: "#/components/schemas/TestWorkflowConfigSchema" content: $ref: "#/components/schemas/TestWorkflowContent" + services: + type: object + additionalProperties: + $ref: "#/components/schemas/TestWorkflowIndependentServiceSpec" container: $ref: "#/components/schemas/TestWorkflowContainerConfig" job: @@ -8130,6 +8172,10 @@ components: description: delay before the step content: $ref: "#/components/schemas/TestWorkflowContent" + services: + type: object + additionalProperties: + $ref: "#/components/schemas/TestWorkflowIndependentServiceSpec" shell: type: string description: script to run in a default shell for the container @@ -8193,6 +8239,10 @@ components: description: delay before the step content: $ref: "#/components/schemas/TestWorkflowContent" + services: + type: object + additionalProperties: + $ref: "#/components/schemas/TestWorkflowServiceSpec" shell: type: string description: script to run in a default shell for the container @@ -9171,6 +9221,80 @@ components: value: $ref: "#/components/schemas/BoxedString" + Probe: + type: object + description: "Probe describes a health check to be performed against a container to determine whether it is alive or ready to receive traffic." + properties: + initialDelaySeconds: + type: integer + timeoutSeconds: + type: integer + periodSeconds: + type: integer + successThreshold: + type: integer + failureThreshold: + type: integer + terminationGracePeriodSeconds: + $ref: "#/components/schemas/BoxedInteger" + exec: + $ref: "#/components/schemas/ExecAction" + httpGet: + $ref: "#/components/schemas/HTTPGetAction" + tcpSocket: + $ref: "#/components/schemas/TCPSocketAction" + grpc: + $ref: "#/components/schemas/GRPCAction" + + ExecAction: + type: object + properties: + command: + type: array + description: "Command is the command line to execute inside the container, the working directory for the command is root ('/') in the container's filesystem. Exit status of 0 is treated as live/healthy and non-zero is unhealthy." + items: + type: string + + HTTPGetAction: + type: object + properties: + path: + type: string + port: + type: string + host: + type: string + scheme: + type: string + httpHeaders: + type: array + items: + $ref: "#/components/schemas/HTTPHeader" + + HTTPHeader: + type: object + properties: + name: + type: string + value: + type: string + + TCPSocketAction: + type: object + properties: + port: + type: string + host: + type: string + + GRPCAction: + type: object + properties: + port: + type: integer + service: + $ref: "#/components/schemas/BoxedString" + VolumeMount: description: VolumeMount describes a mounting of a Volume within a container. diff --git a/cmd/tcl/testworkflow-init/data/expressions.go b/cmd/tcl/testworkflow-init/data/expressions.go index 763bcbe16af..b347b95f727 100644 --- a/cmd/tcl/testworkflow-init/data/expressions.go +++ b/cmd/tcl/testworkflow-init/data/expressions.go @@ -71,6 +71,12 @@ var StateMachine = expressionstcl.NewMachine(). return State.GetOutput(name[7:]) } return nil, false, nil + }). + RegisterAccessorExt(func(name string) (interface{}, bool, error) { + if strings.HasPrefix(name, "services.") { + return State.GetOutput(name) + } + return nil, false, nil }) var EnvMachine = expressionstcl.NewMachine(). diff --git a/cmd/tcl/testworkflow-init/main.go b/cmd/tcl/testworkflow-init/main.go index 31cdc15bebe..5fb59afe09e 100644 --- a/cmd/tcl/testworkflow-init/main.go +++ b/cmd/tcl/testworkflow-init/main.go @@ -111,6 +111,14 @@ func main() { } } + // Configure PWD variable, to make it similar to shell environment variables + if os.Getenv("PWD") == "" { + cwd, err := os.Getwd() + if err == nil { + _ = os.Setenv("PWD", cwd) + } + } + // Compute environment variables for _, name := range computed { initial := os.Getenv(name) diff --git a/cmd/tcl/testworkflow-toolkit/commands/clone.go b/cmd/tcl/testworkflow-toolkit/commands/clone.go index 08b9df2f064..17d4186efca 100644 --- a/cmd/tcl/testworkflow-toolkit/commands/clone.go +++ b/cmd/tcl/testworkflow-toolkit/commands/clone.go @@ -69,7 +69,11 @@ func NewCloneCmd() *cobra.Command { // Clone repository if len(paths) == 0 { ui.Debug("full checkout") - err = Run("git", "clone", configArgs, authArgs, "--depth", 1, "--verbose", uri.String(), outputPath) + if revision == "" { + err = Run("git", "clone", configArgs, authArgs, "--depth", 1, "--verbose", uri.String(), outputPath) + } else { + err = Run("git", "clone", configArgs, authArgs, "--depth", 1, "--branch", revision, "--verbose", uri.String(), outputPath) + } ui.ExitOnError("cloning repository", err) } else { ui.Debug("sparse checkout") diff --git a/cmd/tcl/testworkflow-toolkit/commands/kill.go b/cmd/tcl/testworkflow-toolkit/commands/kill.go new file mode 100644 index 00000000000..eae094c4c1e --- /dev/null +++ b/cmd/tcl/testworkflow-toolkit/commands/kill.go @@ -0,0 +1,98 @@ +// Copyright 2024 Testkube. +// +// Licensed as a Testkube Pro file under the Testkube Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/kubeshop/testkube/blob/main/licenses/TCL.txt + +package commands + +import ( + "context" + "fmt" + "os" + "slices" + + "github.com/spf13/cobra" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/kubeshop/testkube/cmd/tcl/testworkflow-init/data" + "github.com/kubeshop/testkube/cmd/tcl/testworkflow-toolkit/artifacts" + "github.com/kubeshop/testkube/cmd/tcl/testworkflow-toolkit/common" + "github.com/kubeshop/testkube/cmd/tcl/testworkflow-toolkit/env" + "github.com/kubeshop/testkube/cmd/tcl/testworkflow-toolkit/spawn" + "github.com/kubeshop/testkube/pkg/tcl/testworkflowstcl/testworkflowcontroller" + "github.com/kubeshop/testkube/pkg/tcl/testworkflowstcl/testworkflowprocessor/constants" + "github.com/kubeshop/testkube/pkg/ui" +) + +func NewKillCmd() *cobra.Command { + var ( + logs []string + ) + cmd := &cobra.Command{ + Use: "kill ", + Short: "Kill accompanying service(s)", + Args: cobra.ExactArgs(1), + + Run: func(cmd *cobra.Command, args []string) { + groupRef := args[0] + clientSet := env.Kubernetes() + + // Fast-track + if len(logs) == 0 { + os.Exit(0) + } + + // Fetch the services when needed + if len(logs) > 0 { + jobs, err := clientSet.BatchV1().Jobs(env.Namespace()).List(context.Background(), metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", constants.GroupIdLabelName, groupRef), + }) + ui.ExitOnError("listing service resources", err) + + services := make(map[string]int64) + ids := make([]string, 0) + for _, job := range jobs.Items { + service, _ := spawn.GetServiceByResourceId(job.Name) + if slices.Contains(logs, service) { + services[service]++ + ids = append(ids, job.Name) + } + } + + // Inform about detected services + for name, count := range services { + fmt.Printf("%s: fetching logs of %d instances\n", common.ServiceLabel(name), count) + } + + // Fetch logs for them + storage := artifacts.InternalStorage() + for _, id := range ids { + service, index := spawn.GetServiceByResourceId(id) + count := index + 1 + if services[service] > count { + count = services[service] + } + log := spawn.CreateLogger(service, "", index, count) + + logsFilePath, err := spawn.SaveLogs(context.Background(), clientSet, storage, env.Namespace(), id, service+"/", index) + if err == nil { + data.PrintOutput(env.Ref(), "service", ServiceInfo{Group: groupRef, Name: service, Index: index, Logs: storage.FullPath(logsFilePath)}) + log("saved logs") + } else { + log("warning", "problem saving the logs", err.Error()) + } + } + } + + err := testworkflowcontroller.CleanupGroup(context.Background(), clientSet, env.Namespace(), groupRef) + ui.ExitOnError("cleaning up resources", err) + }, + } + + cmd.Flags().StringArrayVarP(&logs, "logs", "l", nil, "fetch the logs for specific services") + + return cmd +} diff --git a/cmd/tcl/testworkflow-toolkit/commands/parallel.go b/cmd/tcl/testworkflow-toolkit/commands/parallel.go index a9870daa8a7..e7e7da8efd5 100644 --- a/cmd/tcl/testworkflow-toolkit/commands/parallel.go +++ b/cmd/tcl/testworkflow-toolkit/commands/parallel.go @@ -177,7 +177,7 @@ func NewParallelCmd() *cobra.Command { run := func(index int64, spec *testworkflowsv1.TestWorkflowSpec) bool { clientSet := env.Kubernetes() log := spawn.CreateLogger("worker", descriptions[index], index, params.Count) - id, machine := spawn.CreateExecutionMachine(index) + id, machine := spawn.CreateExecutionMachine("", index) updates <- Update{index: index} @@ -209,7 +209,7 @@ func NewParallelCmd() *cobra.Command { // Save logs if shouldSaveLogs { - logsFilePath, err := spawn.SaveLogs(context.Background(), clientSet, storage, namespace, id, index) + logsFilePath, err := spawn.SaveLogs(context.Background(), clientSet, storage, namespace, id, "", index) if err == nil { data.PrintOutput(env.Ref(), "parallel", ParallelStatus{Index: int(index), Logs: storage.FullPath(logsFilePath)}) log("saved logs") diff --git a/cmd/tcl/testworkflow-toolkit/commands/root.go b/cmd/tcl/testworkflow-toolkit/commands/root.go index 29d28de7fe3..00a4358f7f6 100644 --- a/cmd/tcl/testworkflow-toolkit/commands/root.go +++ b/cmd/tcl/testworkflow-toolkit/commands/root.go @@ -30,6 +30,8 @@ func init() { RootCmd.AddCommand(NewExecuteCmd()) RootCmd.AddCommand(NewArtifactsCmd()) RootCmd.AddCommand(NewParallelCmd()) + RootCmd.AddCommand(NewServicesCmd()) + RootCmd.AddCommand(NewKillCmd()) } var RootCmd = &cobra.Command{ diff --git a/cmd/tcl/testworkflow-toolkit/commands/services.go b/cmd/tcl/testworkflow-toolkit/commands/services.go new file mode 100644 index 00000000000..9f921f4dc65 --- /dev/null +++ b/cmd/tcl/testworkflow-toolkit/commands/services.go @@ -0,0 +1,388 @@ +// Copyright 2024 Testkube. +// +// Licensed as a Testkube Pro file under the Testkube Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/kubeshop/testkube/blob/main/licenses/TCL.txt + +package commands + +import ( + "context" + "encoding/json" + "fmt" + "math" + "os" + "strings" + "time" + + "github.com/pkg/errors" + "github.com/spf13/cobra" + corev1 "k8s.io/api/core/v1" + + testworkflowsv1 "github.com/kubeshop/testkube-operator/api/testworkflows/v1" + "github.com/kubeshop/testkube/cmd/tcl/testworkflow-init/data" + "github.com/kubeshop/testkube/cmd/tcl/testworkflow-toolkit/common" + "github.com/kubeshop/testkube/cmd/tcl/testworkflow-toolkit/env" + "github.com/kubeshop/testkube/cmd/tcl/testworkflow-toolkit/spawn" + "github.com/kubeshop/testkube/cmd/tcl/testworkflow-toolkit/transfer" + common2 "github.com/kubeshop/testkube/internal/common" + "github.com/kubeshop/testkube/pkg/tcl/expressionstcl" + "github.com/kubeshop/testkube/pkg/tcl/testworkflowstcl/testworkflowcontroller" + "github.com/kubeshop/testkube/pkg/tcl/testworkflowstcl/testworkflowprocessor" + "github.com/kubeshop/testkube/pkg/tcl/testworkflowstcl/testworkflowprocessor/constants" + "github.com/kubeshop/testkube/pkg/ui" +) + +type ServiceInstance struct { + Index int64 + Name string + Description string + Timeout *time.Duration + RestartPolicy corev1.RestartPolicy + ReadinessProbe *corev1.Probe + Spec testworkflowsv1.TestWorkflowSpec +} + +type ServiceState struct { + Ip string `json:"ip"` +} + +type ServiceStatus string + +const ( + ServiceStatusQueued ServiceStatus = "queued" + ServiceStatusRunning ServiceStatus = "running" + ServiceStatusReady ServiceStatus = "passed" + ServiceStatusFailed ServiceStatus = "failed" +) + +type ServiceInfo struct { + Group string `json:"group"` + Index int64 `json:"index"` + Name string `json:"name"` + Description string `json:"description,omitempty"` + Logs string `json:"logs,omitempty"` + Status ServiceStatus `json:"status,omitempty"` +} + +func NewServicesCmd() *cobra.Command { + var ( + groupRef string + ) + cmd := &cobra.Command{ + Use: "services ", + Short: "Start accompanying service(s)", + + Run: func(cmd *cobra.Command, pairs []string) { + // Initialize basic adapters + baseMachine := spawn.CreateBaseMachine() + inspector := env.ImageInspector() + transferSrv := transfer.NewServer(constants.DefaultTransferDirPath, env.IP(), constants.DefaultTransferPort) + + // Validate data + if groupRef == "" { + ui.Fail(errors.New("missing required --group for starting the services")) + } + + // Read the services to start + services := make(map[string]testworkflowsv1.ServiceSpec, len(pairs)) + for i := range pairs { + name, v, found := strings.Cut(pairs[i], "=") + if !found { + ui.Fail(fmt.Errorf("invalid service declaration: %s", pairs[i])) + } + var svc *testworkflowsv1.ServiceSpec + err := json.Unmarshal([]byte(v), &svc) + ui.ExitOnError("parsing service spec", err) + services[name] = *svc + + // Initialize empty array of details for each of the services + data.PrintHintDetails(env.Ref(), fmt.Sprintf("services.%s", name), []ServiceState{}) + } + + // Analyze instances to run + state := make(map[string][]ServiceState) + instances := make([]ServiceInstance, 0) + svcParams := make(map[string]*common.ParamsSpec) + for name, svc := range services { + // Resolve the params + params, err := common.GetParamsSpec(svc.Matrix, svc.Shards, svc.Count, svc.MaxCount, baseMachine) + ui.ExitOnError(fmt.Sprintf("%s: compute matrix and sharding", common.ServiceLabel(name)), err) + svcParams[name] = params + + // Ignore no instances + if params.Count == 0 { + fmt.Printf("%s: 0 instances requested (combinations=%d, count=%d), skipping\n", common.ServiceLabel(name), params.MatrixCount, params.ShardCount) + continue + } + + // Print information about the number of params + fmt.Printf("%s: %s\n", common.ServiceLabel(name), params.String(math.MaxInt64)) + + svcInstances := make([]ServiceInstance, params.Count) + for index := int64(0); index < params.Count; index++ { + machines := []expressionstcl.Machine{baseMachine, params.MachineAt(index)} + + // Clone the spec + svcSpec := svc.DeepCopy() + err = expressionstcl.Simplify(&svcSpec, machines...) + ui.ExitOnError(fmt.Sprintf("%s: %d: error", common.ServiceLabel(name), index), err) + + // Build the spec + spec := testworkflowsv1.TestWorkflowSpec{ + TestWorkflowSpecBase: testworkflowsv1.TestWorkflowSpecBase{ + Content: svcSpec.Content, + Container: common2.Ptr(svcSpec.ContainerConfig), + Pod: svcSpec.Pod, + }, + Steps: []testworkflowsv1.Step{ + {StepOperations: testworkflowsv1.StepOperations{Run: common2.Ptr(svcSpec.StepRun)}}, + }, + } + + // Transfer the data + if spec.Content == nil { + spec.Content = &testworkflowsv1.Content{} + } + tarballs, err := spawn.ProcessTransfer(transferSrv, svcSpec.Transfer, machines...) + ui.ExitOnError(fmt.Sprintf("%s: %d: error: transfer", common.ServiceLabel(name), index), err) + spec.Content.Tarball = append(spec.Content.Tarball, tarballs...) + + // Save the instance + svcInstances[index] = ServiceInstance{ + Index: index, + Name: name, + RestartPolicy: corev1.RestartPolicy(svcSpec.RestartPolicy), + ReadinessProbe: svcSpec.ReadinessProbe, + Spec: spec, + } + + // Save the timeout + if svcSpec.Timeout != "" { + v, err := expressionstcl.EvalTemplate(svcSpec.Timeout, machines...) + ui.ExitOnError(fmt.Sprintf("%s: %d: error: timeout expression", common.ServiceLabel(name), index), err) + d, err := time.ParseDuration(strings.ReplaceAll(v, " ", "")) + ui.ExitOnError(fmt.Sprintf("%s: %d: error: invalid timeout: %s:", common.ServiceLabel(name), index, v), err) + svcInstances[index].Timeout = &d + } + } + instances = append(instances, svcInstances...) + + // Update the state + state[name] = make([]ServiceState, len(svcInstances)) + data.PrintHintDetails(env.Ref(), fmt.Sprintf("services.%s", name), state) + } + + // Inform about each service instance + for _, instance := range instances { + data.PrintOutput(env.Ref(), "service", ServiceInfo{ + Group: groupRef, + Index: instance.Index, + Name: instance.Name, + Description: instance.Description, + Status: ServiceStatusQueued, + }) + } + + // Initialize transfer server if expected + if transferSrv.Count() > 0 || transferSrv.RequestsCount() > 0 { + infos := make([]string, 0) + if transferSrv.Count() > 0 { + infos = append(infos, fmt.Sprintf("sending %d tarballs", transferSrv.Count())) + } + if transferSrv.RequestsCount() > 0 { + infos = append(infos, fmt.Sprintf("fetching %d requests", transferSrv.RequestsCount())) + } + fmt.Printf("Starting transfer server for %s...\n", strings.Join(infos, " and ")) + if _, err := transferSrv.Listen(); err != nil { + ui.Fail(errors.Wrap(err, "failed to start transfer server")) + } + fmt.Printf("Transfer server started.\n") + } + + // Validate if there is anything to run + if len(instances) == 0 { + ui.SuccessAndExit("nothing to run") + } + + run := func(_ int64, instance *ServiceInstance) bool { + info := ServiceInfo{ + Group: groupRef, + Index: instance.Index, + Name: instance.Name, + Description: instance.Description, + Status: ServiceStatusQueued, + } + index := instance.Index + id, machine := spawn.CreateExecutionMachine(instance.Name+"-", index) + params := svcParams[instance.Name] + log := spawn.CreateLogger(instance.Name, instance.Description, index, params.Count) + clientSet := env.Kubernetes() + + // Build the resources bundle + scheduledAt := time.Now() + bundle, err := testworkflowprocessor.NewFullFeatured(inspector). + Bundle(context.Background(), &testworkflowsv1.TestWorkflow{Spec: instance.Spec}, machine, baseMachine, params.MachineAt(index)) + if err != nil { + log("error", "failed to build the service", err.Error()) + return false + } + ui.ExitOnError(fmt.Sprintf("%s: %d: failed to prepare resources", common.InstanceLabel(instance.Name, index, params.Count), index), err) + + // Apply the service specific data + // TODO: Handle RestartPolicy: Always? + if instance.RestartPolicy == "Never" { + bundle.Job.Spec.BackoffLimit = common2.Ptr(int32(0)) + bundle.Job.Spec.Template.Spec.RestartPolicy = "Never" + } else { + // TODO: Throw errors from the pod containers? Atm it will just end with "Success"... + bundle.Job.Spec.BackoffLimit = nil + bundle.Job.Spec.Template.Spec.RestartPolicy = "OnFailure" + } + bundle.Job.Spec.Template.Spec.Containers[0].ReadinessProbe = instance.ReadinessProbe + + // Add group recognition + testworkflowprocessor.AnnotateGroupId(&bundle.Job, groupRef) + for i := range bundle.ConfigMaps { + testworkflowprocessor.AnnotateGroupId(&bundle.ConfigMaps[i], groupRef) + } + for i := range bundle.Secrets { + testworkflowprocessor.AnnotateGroupId(&bundle.Secrets[i], groupRef) + } + + // Compute the bundle instructions + namespace := bundle.Job.Namespace + if namespace == "" { + namespace = env.Namespace() + } + mainRef := bundle.Job.Spec.Template.Spec.Containers[0].Name + + // Deploy the resources + // TODO: Avoid using Job + err = bundle.Deploy(context.Background(), clientSet, namespace) + if err != nil { + log("problem deploying", err.Error()) + return false + } + + // Handle timeout + timeoutCtx, timeoutCtxCancel := context.WithCancel(context.Background()) + defer timeoutCtxCancel() + if instance.Timeout != nil { + go func() { + select { + case <-timeoutCtx.Done(): + case <-time.After(*instance.Timeout): + log("timed out", instance.Timeout.String()+" elapsed") + timeoutCtxCancel() + } + }() + } + + // Control the execution + // TODO: Consider aggregated controller to limit number of watchers + ctx, ctxCancel := context.WithCancel(timeoutCtx) + defer ctxCancel() + ctrl, err := testworkflowcontroller.New(ctx, clientSet, namespace, id, scheduledAt, testworkflowcontroller.ControllerOptions{ + Timeout: spawn.ControllerTimeout, + }) + if err != nil { + log("error", "failed to connect to the job", err.Error()) + return false + } + log("created") + scheduled := false + started := false + for v := range ctrl.WatchLightweight(ctx) { + // Handle error + if v.Error != nil { + log("error", v.Error.Error()) + continue + } + + // Inform about the node assignment + if !scheduled && v.NodeName != "" { + scheduled = true + log(fmt.Sprintf("assigned to %s node", ui.LightBlue(v.NodeName))) + } + + if state[instance.Name][index].Ip == "" && v.PodIP != "" { + state[instance.Name][index].Ip = v.PodIP + log(fmt.Sprintf("assigned to %s IP", ui.LightBlue(v.PodIP))) + info.Status = ServiceStatusRunning + data.PrintOutput(env.Ref(), "service", info) + } + + if v.Current == mainRef { + started = true + if instance.ReadinessProbe == nil { + log("container started") + } else { + log("container started, waiting for readiness") + } + ctxCancel() + break + } + } + + // Fail if the container has not started + if !started { + info.Status = ServiceStatusFailed + log("container failed") + data.PrintOutput(env.Ref(), "service", info) + return false + } + + // Watch for container readiness + ready := instance.ReadinessProbe == nil + if !ready { + podWatcher := testworkflowcontroller.WatchMainPod(timeoutCtx, clientSet, namespace, id, 0) + for pod := range podWatcher.Channel() { + if pod.Error != nil { + log("error", pod.Error.Error()) + continue + } + + ready = pod.Value.Status.ContainerStatuses[0].Ready + if ready { + break + } + } + } + + if !ready { + log("container did not reach readiness") + info.Status = ServiceStatusFailed + } else { + log("container ready") + info.Status = ServiceStatusReady + } + data.PrintOutput(env.Ref(), "service", info) + + return ready + } + + // Start all the services + failed := spawn.ExecuteParallel(run, instances, int64(len(instances))) + + // Inform about the services state + for k := range state { + data.PrintHintDetails(env.Ref(), fmt.Sprintf("services.%s", k), state[k]) + } + + // Notify the results + if failed == 0 { + fmt.Printf("Successfully started %d workers.\n", len(instances)) + } else { + fmt.Printf("Failed to start %d out of %d expected workers.\n", failed, len(instances)) + os.Exit(1) + } + }, + } + + cmd.Flags().StringVarP(&groupRef, "group", "g", "", "services group reference") + + return cmd +} diff --git a/cmd/tcl/testworkflow-toolkit/spawn/utils.go b/cmd/tcl/testworkflow-toolkit/spawn/utils.go index 94887fd80f9..16d5f5baf9c 100644 --- a/cmd/tcl/testworkflow-toolkit/spawn/utils.go +++ b/cmd/tcl/testworkflow-toolkit/spawn/utils.go @@ -12,6 +12,7 @@ import ( "context" "encoding/json" "fmt" + "regexp" "strconv" "strings" "sync" @@ -166,9 +167,9 @@ func ProcessFetch(transferSrv transfer.Server, fetch []testworkflowsv1.StepParal }, nil } -func CreateExecutionMachine(index int64) (string, expressionstcl.Machine) { - id := fmt.Sprintf("%s-%d", env.ExecutionId(), index) - fsPrefix := fmt.Sprintf("%s/%d", env.Ref(), index+1) +func CreateExecutionMachine(prefix string, index int64) (string, expressionstcl.Machine) { + id := fmt.Sprintf("%s-%s%d", env.ExecutionId(), prefix, index) + fsPrefix := fmt.Sprintf("%s/%s%d", env.Ref(), prefix, index+1) if env.Config().Execution.FSPrefix != "" { fsPrefix = fmt.Sprintf("%s/%s", env.Config().Execution.FSPrefix, fsPrefix) } @@ -180,6 +181,19 @@ func CreateExecutionMachine(index int64) (string, expressionstcl.Machine) { Register("workflow.name", env.WorkflowName()) } +func GetServiceByResourceId(jobName string) (string, int64) { + regex := regexp.MustCompile(`-(.+?)-(\d+)$`) + v := regex.FindSubmatch([]byte(jobName)) + if v == nil { + return "", 0 + } + index, err := strconv.ParseInt(string(v[2]), 10, 64) + if err != nil { + return "", 0 + } + return string(v[1]), index +} + func ExecuteParallel[T any](run func(int64, *T) bool, items []T, parallelism int64) int64 { var wg sync.WaitGroup wg.Add(len(items)) @@ -201,8 +215,8 @@ func ExecuteParallel[T any](run func(int64, *T) bool, items []T, parallelism int return int64(len(items)) - success.Load() } -func SaveLogs(ctx context.Context, clientSet kubernetes.Interface, storage artifacts.InternalArtifactStorage, namespace, id string, index int64) (string, error) { - filePath := fmt.Sprintf("logs/%d.log", index) +func SaveLogs(ctx context.Context, clientSet kubernetes.Interface, storage artifacts.InternalArtifactStorage, namespace, id, prefix string, index int64) (string, error) { + filePath := fmt.Sprintf("logs/%s%d.log", prefix, index) ctrl, err := testworkflowcontroller.New(ctx, clientSet, namespace, id, time.Time{}, testworkflowcontroller.ControllerOptions{ Timeout: ControllerTimeout, }) diff --git a/go.mod b/go.mod index 75cf29e17ec..a0b217d611d 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 github.com/kelseyhightower/envconfig v1.4.0 github.com/kubepug/kubepug v1.7.1 - github.com/kubeshop/testkube-operator v1.17.33 + github.com/kubeshop/testkube-operator v1.15.2-beta1.0.20240516121220-6e48b6c3ec3b github.com/minio/minio-go/v7 v7.0.47 github.com/montanaflynn/stats v0.6.6 github.com/moogar0880/problems v0.1.1 diff --git a/go.sum b/go.sum index d62bce5d036..71d7abfd61b 100644 --- a/go.sum +++ b/go.sum @@ -366,8 +366,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kubepug/kubepug v1.7.1 h1:LKhfSxS8Y5mXs50v+3Lpyec+cogErDLcV7CMUuiaisw= github.com/kubepug/kubepug v1.7.1/go.mod h1:lv+HxD0oTFL7ZWjj0u6HKhMbbTIId3eG7aWIW0gyF8g= -github.com/kubeshop/testkube-operator v1.17.33 h1:wzE8odtQBLJw8jvUNAQHHXGhm8zA78gS4R7KjChMQ9I= -github.com/kubeshop/testkube-operator v1.17.33/go.mod h1:P47tw1nKQFufdsZndyq2HG2MSa0zK/lU0XpRfZtEmIk= +github.com/kubeshop/testkube-operator v1.15.2-beta1.0.20240516121220-6e48b6c3ec3b h1:f6GsEyh+oi6DeBhbhWf8cOiRGrMSXX65+DrNoikdarM= +github.com/kubeshop/testkube-operator v1.15.2-beta1.0.20240516121220-6e48b6c3ec3b/go.mod h1:P47tw1nKQFufdsZndyq2HG2MSa0zK/lU0XpRfZtEmIk= github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= github.com/lithammer/fuzzysearch v1.1.8 h1:/HIuJnjHuXS8bKaiTMeeDlW2/AyIWk2brx1V8LFgLN4= diff --git a/pkg/api/v1/testkube/model_exec_action.go b/pkg/api/v1/testkube/model_exec_action.go new file mode 100644 index 00000000000..2ca69e60351 --- /dev/null +++ b/pkg/api/v1/testkube/model_exec_action.go @@ -0,0 +1,15 @@ +/* + * Testkube API + * + * Testkube provides a Kubernetes-native framework for test definition, execution and results + * + * API version: 1.0.0 + * Contact: testkube@kubeshop.io + * Generated by: Swagger Codegen (https://github.com/swagger-api/swagger-codegen.git) + */ +package testkube + +type ExecAction struct { + // Command is the command line to execute inside the container, the working directory for the command is root ('/') in the container's filesystem. Exit status of 0 is treated as live/healthy and non-zero is unhealthy. + Command []string `json:"command,omitempty"` +} diff --git a/pkg/api/v1/testkube/model_grpc_action.go b/pkg/api/v1/testkube/model_grpc_action.go new file mode 100644 index 00000000000..257d7d8f797 --- /dev/null +++ b/pkg/api/v1/testkube/model_grpc_action.go @@ -0,0 +1,15 @@ +/* + * Testkube API + * + * Testkube provides a Kubernetes-native framework for test definition, execution and results + * + * API version: 1.0.0 + * Contact: testkube@kubeshop.io + * Generated by: Swagger Codegen (https://github.com/swagger-api/swagger-codegen.git) + */ +package testkube + +type GrpcAction struct { + Port int32 `json:"port,omitempty"` + Service *BoxedString `json:"service,omitempty"` +} diff --git a/pkg/api/v1/testkube/model_http_get_action.go b/pkg/api/v1/testkube/model_http_get_action.go new file mode 100644 index 00000000000..7459a4011ee --- /dev/null +++ b/pkg/api/v1/testkube/model_http_get_action.go @@ -0,0 +1,18 @@ +/* + * Testkube API + * + * Testkube provides a Kubernetes-native framework for test definition, execution and results + * + * API version: 1.0.0 + * Contact: testkube@kubeshop.io + * Generated by: Swagger Codegen (https://github.com/swagger-api/swagger-codegen.git) + */ +package testkube + +type HttpGetAction struct { + Path string `json:"path,omitempty"` + Port string `json:"port,omitempty"` + Host string `json:"host,omitempty"` + Scheme string `json:"scheme,omitempty"` + HttpHeaders []HttpHeader `json:"httpHeaders,omitempty"` +} diff --git a/pkg/api/v1/testkube/model_http_header.go b/pkg/api/v1/testkube/model_http_header.go new file mode 100644 index 00000000000..b8169990ece --- /dev/null +++ b/pkg/api/v1/testkube/model_http_header.go @@ -0,0 +1,15 @@ +/* + * Testkube API + * + * Testkube provides a Kubernetes-native framework for test definition, execution and results + * + * API version: 1.0.0 + * Contact: testkube@kubeshop.io + * Generated by: Swagger Codegen (https://github.com/swagger-api/swagger-codegen.git) + */ +package testkube + +type HttpHeader struct { + Name string `json:"name,omitempty"` + Value string `json:"value,omitempty"` +} diff --git a/pkg/api/v1/testkube/model_probe.go b/pkg/api/v1/testkube/model_probe.go new file mode 100644 index 00000000000..16f47899bbf --- /dev/null +++ b/pkg/api/v1/testkube/model_probe.go @@ -0,0 +1,24 @@ +/* + * Testkube API + * + * Testkube provides a Kubernetes-native framework for test definition, execution and results + * + * API version: 1.0.0 + * Contact: testkube@kubeshop.io + * Generated by: Swagger Codegen (https://github.com/swagger-api/swagger-codegen.git) + */ +package testkube + +// Probe describes a health check to be performed against a container to determine whether it is alive or ready to receive traffic. +type Probe struct { + InitialDelaySeconds int32 `json:"initialDelaySeconds,omitempty"` + TimeoutSeconds int32 `json:"timeoutSeconds,omitempty"` + PeriodSeconds int32 `json:"periodSeconds,omitempty"` + SuccessThreshold int32 `json:"successThreshold,omitempty"` + FailureThreshold int32 `json:"failureThreshold,omitempty"` + TerminationGracePeriodSeconds *BoxedInteger `json:"terminationGracePeriodSeconds,omitempty"` + Exec *ExecAction `json:"exec,omitempty"` + HttpGet *HttpGetAction `json:"httpGet,omitempty"` + TcpSocket *TcpSocketAction `json:"tcpSocket,omitempty"` + Grpc *GrpcAction `json:"grpc,omitempty"` +} diff --git a/pkg/api/v1/testkube/model_tcp_socket_action.go b/pkg/api/v1/testkube/model_tcp_socket_action.go new file mode 100644 index 00000000000..afba7555b0d --- /dev/null +++ b/pkg/api/v1/testkube/model_tcp_socket_action.go @@ -0,0 +1,15 @@ +/* + * Testkube API + * + * Testkube provides a Kubernetes-native framework for test definition, execution and results + * + * API version: 1.0.0 + * Contact: testkube@kubeshop.io + * Generated by: Swagger Codegen (https://github.com/swagger-api/swagger-codegen.git) + */ +package testkube + +type TcpSocketAction struct { + Port string `json:"port,omitempty"` + Host string `json:"host,omitempty"` +} diff --git a/pkg/api/v1/testkube/model_test_workflow_independent_service_spec.go b/pkg/api/v1/testkube/model_test_workflow_independent_service_spec.go new file mode 100644 index 00000000000..087d2ebdd45 --- /dev/null +++ b/pkg/api/v1/testkube/model_test_workflow_independent_service_spec.go @@ -0,0 +1,42 @@ +/* + * Testkube API + * + * Testkube provides a Kubernetes-native framework for test definition, execution and results + * + * API version: 1.0.0 + * Contact: testkube@kubeshop.io + * Generated by: Swagger Codegen (https://github.com/swagger-api/swagger-codegen.git) + */ +package testkube + +type TestWorkflowIndependentServiceSpec struct { + WorkingDir *BoxedString `json:"workingDir,omitempty"` + // image to be used for the container + Image string `json:"image,omitempty"` + ImagePullPolicy *ImagePullPolicy `json:"imagePullPolicy,omitempty"` + // environment variables to append to the container + Env []EnvVar `json:"env,omitempty"` + // external environment variables to append to the container + EnvFrom []EnvFromSource `json:"envFrom,omitempty"` + Command *BoxedStringList `json:"command,omitempty"` + Args *BoxedStringList `json:"args,omitempty"` + Shell *BoxedString `json:"shell,omitempty"` + Resources *TestWorkflowResources `json:"resources,omitempty"` + SecurityContext *SecurityContext `json:"securityContext,omitempty"` + // volumes to mount to the container + VolumeMounts []VolumeMount `json:"volumeMounts,omitempty"` + // maximum time until reaching readiness + Timeout string `json:"timeout,omitempty"` + // list of files to send to parallel steps + Transfer []TestWorkflowStepParallelTransfer `json:"transfer,omitempty"` + Content *TestWorkflowContent `json:"content,omitempty"` + Pod *TestWorkflowPodConfig `json:"pod,omitempty"` + RestartPolicy string `json:"restartPolicy,omitempty"` + ReadinessProbe *Probe `json:"readinessProbe,omitempty"` + Count *BoxedString `json:"count,omitempty"` + MaxCount *BoxedString `json:"maxCount,omitempty"` + // matrix of parameters to spawn instances + Matrix map[string]interface{} `json:"matrix,omitempty"` + // parameters that should be distributed across sharded instances + Shards map[string]interface{} `json:"shards,omitempty"` +} diff --git a/pkg/api/v1/testkube/model_test_workflow_independent_step.go b/pkg/api/v1/testkube/model_test_workflow_independent_step.go index 1773ce1a4bf..8e733d0466e 100644 --- a/pkg/api/v1/testkube/model_test_workflow_independent_step.go +++ b/pkg/api/v1/testkube/model_test_workflow_independent_step.go @@ -24,8 +24,9 @@ type TestWorkflowIndependentStep struct { // maximum time this step may take Timeout string `json:"timeout,omitempty"` // delay before the step - Delay string `json:"delay,omitempty"` - Content *TestWorkflowContent `json:"content,omitempty"` + Delay string `json:"delay,omitempty"` + Content *TestWorkflowContent `json:"content,omitempty"` + Services map[string]TestWorkflowIndependentServiceSpec `json:"services,omitempty"` // script to run in a default shell for the container Shell string `json:"shell,omitempty"` Run *TestWorkflowStepRun `json:"run,omitempty"` diff --git a/pkg/api/v1/testkube/model_test_workflow_independent_step_parallel.go b/pkg/api/v1/testkube/model_test_workflow_independent_step_parallel.go index bfceb5f2a7a..3ca736332e0 100644 --- a/pkg/api/v1/testkube/model_test_workflow_independent_step_parallel.go +++ b/pkg/api/v1/testkube/model_test_workflow_independent_step_parallel.go @@ -40,14 +40,15 @@ type TestWorkflowIndependentStepParallel struct { // list of files to send to parallel steps Transfer []TestWorkflowStepParallelTransfer `json:"transfer,omitempty"` // list of files to fetch from parallel steps - Fetch []TestWorkflowStepParallelFetch `json:"fetch,omitempty"` - Config map[string]TestWorkflowParameterSchema `json:"config,omitempty"` - Content *TestWorkflowContent `json:"content,omitempty"` - Container *TestWorkflowContainerConfig `json:"container,omitempty"` - Job *TestWorkflowJobConfig `json:"job,omitempty"` - Pod *TestWorkflowPodConfig `json:"pod,omitempty"` - Setup []TestWorkflowIndependentStep `json:"setup,omitempty"` - Steps []TestWorkflowIndependentStep `json:"steps,omitempty"` - After []TestWorkflowIndependentStep `json:"after,omitempty"` - Events []TestWorkflowEvent `json:"events,omitempty"` + Fetch []TestWorkflowStepParallelFetch `json:"fetch,omitempty"` + Config map[string]TestWorkflowParameterSchema `json:"config,omitempty"` + Content *TestWorkflowContent `json:"content,omitempty"` + Services map[string]TestWorkflowIndependentServiceSpec `json:"services,omitempty"` + Container *TestWorkflowContainerConfig `json:"container,omitempty"` + Job *TestWorkflowJobConfig `json:"job,omitempty"` + Pod *TestWorkflowPodConfig `json:"pod,omitempty"` + Setup []TestWorkflowIndependentStep `json:"setup,omitempty"` + Steps []TestWorkflowIndependentStep `json:"steps,omitempty"` + After []TestWorkflowIndependentStep `json:"after,omitempty"` + Events []TestWorkflowEvent `json:"events,omitempty"` } diff --git a/pkg/api/v1/testkube/model_test_workflow_service_spec.go b/pkg/api/v1/testkube/model_test_workflow_service_spec.go new file mode 100644 index 00000000000..e1174c2d1ba --- /dev/null +++ b/pkg/api/v1/testkube/model_test_workflow_service_spec.go @@ -0,0 +1,43 @@ +/* + * Testkube API + * + * Testkube provides a Kubernetes-native framework for test definition, execution and results + * + * API version: 1.0.0 + * Contact: testkube@kubeshop.io + * Generated by: Swagger Codegen (https://github.com/swagger-api/swagger-codegen.git) + */ +package testkube + +type TestWorkflowServiceSpec struct { + WorkingDir *BoxedString `json:"workingDir,omitempty"` + // image to be used for the container + Image string `json:"image,omitempty"` + ImagePullPolicy *ImagePullPolicy `json:"imagePullPolicy,omitempty"` + // environment variables to append to the container + Env []EnvVar `json:"env,omitempty"` + // external environment variables to append to the container + EnvFrom []EnvFromSource `json:"envFrom,omitempty"` + Command *BoxedStringList `json:"command,omitempty"` + Args *BoxedStringList `json:"args,omitempty"` + Shell *BoxedString `json:"shell,omitempty"` + Resources *TestWorkflowResources `json:"resources,omitempty"` + SecurityContext *SecurityContext `json:"securityContext,omitempty"` + // volumes to mount to the container + VolumeMounts []VolumeMount `json:"volumeMounts,omitempty"` + // maximum time until reaching readiness + Timeout string `json:"timeout,omitempty"` + // list of files to send to parallel steps + Transfer []TestWorkflowStepParallelTransfer `json:"transfer,omitempty"` + Content *TestWorkflowContent `json:"content,omitempty"` + Pod *TestWorkflowPodConfig `json:"pod,omitempty"` + RestartPolicy string `json:"restartPolicy,omitempty"` + ReadinessProbe *Probe `json:"readinessProbe,omitempty"` + Use []TestWorkflowTemplateRef `json:"use,omitempty"` + Count *BoxedString `json:"count,omitempty"` + MaxCount *BoxedString `json:"maxCount,omitempty"` + // matrix of parameters to spawn instances + Matrix map[string]interface{} `json:"matrix,omitempty"` + // parameters that should be distributed across sharded instances + Shards map[string]interface{} `json:"shards,omitempty"` +} diff --git a/pkg/api/v1/testkube/model_test_workflow_spec.go b/pkg/api/v1/testkube/model_test_workflow_spec.go index 66df4198014..dace346a30d 100644 --- a/pkg/api/v1/testkube/model_test_workflow_spec.go +++ b/pkg/api/v1/testkube/model_test_workflow_spec.go @@ -13,6 +13,7 @@ type TestWorkflowSpec struct { Use []TestWorkflowTemplateRef `json:"use,omitempty"` Config map[string]TestWorkflowParameterSchema `json:"config,omitempty"` Content *TestWorkflowContent `json:"content,omitempty"` + Services map[string]TestWorkflowServiceSpec `json:"services,omitempty"` Container *TestWorkflowContainerConfig `json:"container,omitempty"` Job *TestWorkflowJobConfig `json:"job,omitempty"` Pod *TestWorkflowPodConfig `json:"pod,omitempty"` diff --git a/pkg/api/v1/testkube/model_test_workflow_step.go b/pkg/api/v1/testkube/model_test_workflow_step.go index d1f1a5ecb6b..c390747c359 100644 --- a/pkg/api/v1/testkube/model_test_workflow_step.go +++ b/pkg/api/v1/testkube/model_test_workflow_step.go @@ -27,8 +27,9 @@ type TestWorkflowStep struct { // maximum time this step may take Timeout string `json:"timeout,omitempty"` // delay before the step - Delay string `json:"delay,omitempty"` - Content *TestWorkflowContent `json:"content,omitempty"` + Delay string `json:"delay,omitempty"` + Content *TestWorkflowContent `json:"content,omitempty"` + Services map[string]TestWorkflowServiceSpec `json:"services,omitempty"` // script to run in a default shell for the container Shell string `json:"shell,omitempty"` Run *TestWorkflowStepRun `json:"run,omitempty"` diff --git a/pkg/api/v1/testkube/model_test_workflow_step_parallel.go b/pkg/api/v1/testkube/model_test_workflow_step_parallel.go index 158c63b97ee..d8b4de8e7ff 100644 --- a/pkg/api/v1/testkube/model_test_workflow_step_parallel.go +++ b/pkg/api/v1/testkube/model_test_workflow_step_parallel.go @@ -45,6 +45,7 @@ type TestWorkflowStepParallel struct { Use []TestWorkflowTemplateRef `json:"use,omitempty"` Config map[string]TestWorkflowParameterSchema `json:"config,omitempty"` Content *TestWorkflowContent `json:"content,omitempty"` + Services map[string]TestWorkflowServiceSpec `json:"services,omitempty"` Container *TestWorkflowContainerConfig `json:"container,omitempty"` Job *TestWorkflowJobConfig `json:"job,omitempty"` Pod *TestWorkflowPodConfig `json:"pod,omitempty"` diff --git a/pkg/api/v1/testkube/model_test_workflow_template_spec.go b/pkg/api/v1/testkube/model_test_workflow_template_spec.go index fc3d2f9498d..d920ce791a0 100644 --- a/pkg/api/v1/testkube/model_test_workflow_template_spec.go +++ b/pkg/api/v1/testkube/model_test_workflow_template_spec.go @@ -10,13 +10,14 @@ package testkube type TestWorkflowTemplateSpec struct { - Config map[string]TestWorkflowParameterSchema `json:"config,omitempty"` - Content *TestWorkflowContent `json:"content,omitempty"` - Container *TestWorkflowContainerConfig `json:"container,omitempty"` - Job *TestWorkflowJobConfig `json:"job,omitempty"` - Pod *TestWorkflowPodConfig `json:"pod,omitempty"` - Setup []TestWorkflowIndependentStep `json:"setup,omitempty"` - Steps []TestWorkflowIndependentStep `json:"steps,omitempty"` - After []TestWorkflowIndependentStep `json:"after,omitempty"` - Events []TestWorkflowEvent `json:"events,omitempty"` + Config map[string]TestWorkflowParameterSchema `json:"config,omitempty"` + Content *TestWorkflowContent `json:"content,omitempty"` + Services map[string]TestWorkflowIndependentServiceSpec `json:"services,omitempty"` + Container *TestWorkflowContainerConfig `json:"container,omitempty"` + Job *TestWorkflowJobConfig `json:"job,omitempty"` + Pod *TestWorkflowPodConfig `json:"pod,omitempty"` + Setup []TestWorkflowIndependentStep `json:"setup,omitempty"` + Steps []TestWorkflowIndependentStep `json:"steps,omitempty"` + After []TestWorkflowIndependentStep `json:"after,omitempty"` + Events []TestWorkflowEvent `json:"events,omitempty"` } diff --git a/pkg/event/bus/nats_integration_test.go b/pkg/event/bus/nats_integration_test.go index 578260bf38b..e783c45bdd3 100644 --- a/pkg/event/bus/nats_integration_test.go +++ b/pkg/event/bus/nats_integration_test.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "testing" + "github.com/kubeshop/testkube/internal/config" "github.com/kubeshop/testkube/pkg/utils/test" "github.com/nats-io/nats.go" @@ -15,11 +16,15 @@ import ( "github.com/kubeshop/testkube/pkg/api/v1/testkube" ) +var ( + cfg, _ = config.Get() +) + func TestMultipleMessages_Integration(t *testing.T) { test.IntegrationTest(t) // given NATS connection - nc, err := nats.Connect("localhost") + nc, err := nats.Connect(cfg.NatsURI) assert.NoError(t, err) defer nc.Close() @@ -81,7 +86,7 @@ func TestNATS_Integration(t *testing.T) { event.Id = "123" // and connection - nc, err := nats.Connect("localhost") + nc, err := nats.Connect(cfg.NatsURI) assert.NoError(t, err) defer nc.Close() diff --git a/pkg/event/bus/testserver.go b/pkg/event/bus/testserver.go index 9087f3bfb6c..0c765c5db1a 100644 --- a/pkg/event/bus/testserver.go +++ b/pkg/event/bus/testserver.go @@ -5,7 +5,7 @@ import ( "github.com/nats-io/nats-server/v2/server" natsserver "github.com/nats-io/nats-server/v2/test" - nats "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go" ) func TestServerWithConnection() (*server.Server, *nats.Conn) { diff --git a/pkg/event/emitter_integration_test.go b/pkg/event/emitter_integration_test.go index b699e5ede6e..138e5e88556 100644 --- a/pkg/event/emitter_integration_test.go +++ b/pkg/event/emitter_integration_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/kubeshop/testkube/pkg/logs/config" "github.com/kubeshop/testkube/pkg/utils/test" "github.com/stretchr/testify/assert" @@ -15,13 +16,17 @@ import ( "github.com/kubeshop/testkube/pkg/event/kind/dummy" ) +var ( + cfg, _ = config.Get() +) + // tests based on real NATS event bus func GetTestNATSEmitter() *Emitter { os.Setenv("DEBUG", "true") // configure NATS event bus nc, err := bus.NewNATSEncodedConnection(bus.ConnectionConfig{ - NatsURI: "http://localhost:4222", + NatsURI: cfg.NatsURI, }) if err != nil { diff --git a/pkg/executor/scraper/minio_scraper_integration_test.go b/pkg/executor/scraper/minio_scraper_integration_test.go index 1192eec0411..7c33085d2a0 100644 --- a/pkg/executor/scraper/minio_scraper_integration_test.go +++ b/pkg/executor/scraper/minio_scraper_integration_test.go @@ -8,6 +8,7 @@ import ( "path/filepath" "testing" + "github.com/kubeshop/testkube/internal/config" "github.com/kubeshop/testkube/pkg/utils/test" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -21,6 +22,10 @@ import ( "github.com/kubeshop/testkube/pkg/storage/minio" ) +var ( + cfg, _ = config.Get() +) + func TestMinIOScraper_Archive_Integration(t *testing.T) { test.IntegrationTest(t) t.Parallel() @@ -52,17 +57,17 @@ func TestMinIOScraper_Archive_Integration(t *testing.T) { extractor := scraper.NewArchiveFilesystemExtractor(filesystem.NewOSFileSystem()) loader, err := scraper.NewMinIOUploader( - "localhost:9000", - "minio99", - "minio123", - "us-east-1", - "", + cfg.StorageEndpoint, + cfg.StorageAccessKeyID, + cfg.StorageSecretAccessKey, + cfg.StorageRegion, + cfg.StorageToken, "test-bucket-asdf", - false, - false, - "", - "", - "", + cfg.StorageSSL, + cfg.StorageSkipVerify, + cfg.StorageCertFile, + cfg.StorageKeyFile, + cfg.StorageCAFile, ) if err != nil { t.Fatalf("error creating minio loader: %v", err) @@ -89,7 +94,7 @@ func TestMinIOScraper_Archive_Integration(t *testing.T) { t.Fatalf("error scraping: %v", err) } - c := minio.NewClient("localhost:9000", "minio99", "minio123", "us-east-1", "", "test-bucket-asdf") + c := minio.NewClient(cfg.StorageEndpoint, cfg.StorageAccessKeyID, cfg.StorageSecretAccessKey, cfg.StorageRegion, cfg.StorageToken, "test-bucket-asdf") assert.NoError(t, c.Connect()) artifacts, err := c.ListFiles(context.Background(), "test-bucket-asdf") if err != nil { @@ -132,17 +137,17 @@ func TestMinIOScraper_Recursive_Integration(t *testing.T) { bucketName := "test-bucket-asdf1" loader, err := scraper.NewMinIOUploader( - "localhost:9000", - "minio99", - "minio123", - "us-east-1", - "", + cfg.StorageEndpoint, + cfg.StorageAccessKeyID, + cfg.StorageSecretAccessKey, + cfg.StorageRegion, + cfg.StorageToken, bucketName, - false, - false, - "", - "", - "", + cfg.StorageSSL, + cfg.StorageSkipVerify, + cfg.StorageCertFile, + cfg.StorageKeyFile, + cfg.StorageCAFile, ) if err != nil { t.Fatalf("error creating minio loader: %v", err) @@ -169,7 +174,7 @@ func TestMinIOScraper_Recursive_Integration(t *testing.T) { t.Fatalf("error scraping: %v", err) } - c := minio.NewClient("localhost:9000", "minio99", "minio123", "us-east-1", "", bucketName) + c := minio.NewClient(cfg.StorageEndpoint, cfg.StorageAccessKeyID, cfg.StorageSecretAccessKey, cfg.StorageRegion, cfg.StorageToken, bucketName) assert.NoError(t, c.Connect()) artifacts, err := c.ListFiles(context.Background(), bucketName) if err != nil { diff --git a/pkg/executor/scraper/minio_uploader_integration_test.go b/pkg/executor/scraper/minio_uploader_integration_test.go index 51434803139..92f5fc909b3 100644 --- a/pkg/executor/scraper/minio_uploader_integration_test.go +++ b/pkg/executor/scraper/minio_uploader_integration_test.go @@ -23,17 +23,17 @@ func TestMinIOUploader_Upload_Tarball_Integration(t *testing.T) { // Create a new MinIO uploader with the appropriate configuration uploader, err := scraper.NewMinIOUploader( - "localhost:9000", - "minio99", - "minio123", - "us-east-1", - "", + cfg.StorageEndpoint, + cfg.StorageAccessKeyID, + cfg.StorageSecretAccessKey, + cfg.StorageRegion, + cfg.StorageToken, "test-bucket-fsgds", - false, - false, - "", - "", - "", + cfg.StorageSSL, + cfg.StorageSkipVerify, + cfg.StorageCertFile, + cfg.StorageKeyFile, + cfg.StorageCAFile, ) if err != nil { t.Fatalf("failed to create MinIO loader: %v", err) @@ -66,11 +66,11 @@ func TestMinIOUploader_Upload_Tarball_Integration(t *testing.T) { } m := minio.NewClient( - "localhost:9000", - "minio99", - "minio123", - "us-east-1", - "", + cfg.StorageEndpoint, + cfg.StorageAccessKeyID, + cfg.StorageSecretAccessKey, + cfg.StorageRegion, + cfg.StorageToken, "test-bucket-fsgds", ) if err := m.Connect(); err != nil { @@ -89,17 +89,17 @@ func TestMinIOUploader_Upload_Raw_Integration(t *testing.T) { // Create a new MinIO loader with the appropriate configuration uploader, err := scraper.NewMinIOUploader( - "localhost:9000", - "minio99", - "minio123", - "us-east-1", - "", + cfg.StorageEndpoint, + cfg.StorageAccessKeyID, + cfg.StorageSecretAccessKey, + cfg.StorageRegion, + cfg.StorageToken, "test-bucket-hgfhfg", - false, - false, - "", - "", - "", + cfg.StorageSSL, + cfg.StorageSkipVerify, + cfg.StorageCertFile, + cfg.StorageKeyFile, + cfg.StorageCAFile, ) if err != nil { t.Fatalf("failed to create MinIO loader: %v", err) @@ -125,11 +125,11 @@ func TestMinIOUploader_Upload_Raw_Integration(t *testing.T) { } m := minio.NewClient( - "localhost:9000", - "minio99", - "minio123", - "us-east-1", - "", + cfg.StorageEndpoint, + cfg.StorageAccessKeyID, + cfg.StorageSecretAccessKey, + cfg.StorageRegion, + cfg.StorageToken, "test-bucket-hgfhfg", ) if err := m.Connect(); err != nil { diff --git a/pkg/logs/adapter/minio_test.go b/pkg/logs/adapter/minio_test.go index eb941de82c7..127b5c62d5b 100644 --- a/pkg/logs/adapter/minio_test.go +++ b/pkg/logs/adapter/minio_test.go @@ -16,12 +16,17 @@ import ( "github.com/minio/minio-go/v7" "github.com/stretchr/testify/assert" + "github.com/kubeshop/testkube/pkg/logs/config" "github.com/kubeshop/testkube/pkg/logs/events" "github.com/kubeshop/testkube/pkg/utils" ) const hugeString = "82vbUcyQ0chpR665zbXY2mySOk7DGDFQCF1iLFjDNUYtNV8oQNaX3IYgJR30zBVhmDVjoZDJXO479tGSirHilZWEbzhjKJOdUwGb2HWOSOOjGh5r5wH0EHxRiOp8mBJv2rwdB2SoKF7JTBFgRt9M8F0JKp2Zx5kqh8eOGB1DGj64NLmwIpfuevJSv0wbDLrls5kEL5hHkszXPsuufVjJBsjNrxCoafuk93L2jE3ivVrMlkmLd9XAWKdop0oo0yRMJ9Vs1T5SZTkM6KXJB5hY3c14NsoPiG9Ay4EZmXrGpzGWI3RLAU6snXL8kV9sVLCG5DuRDnW047VR8eb78fpVj8YY3o9xpZd7xYPAhsmK0SwznHfrb0etAqdjQO6LFS9Blwre3G94DG5scVFH8RfteVNgKJXa8lTp8kKjtQLKNNA9mqyWfJ7uy8yjnVKwl7rodKqdtU6wjH2hf597MXA3roIS2xVhFpsCAVDybo9TVvZpoGfE9povhApoUR6Rmae9zvXPRoDbClOrvDElFkfgkJFzuoY2rPoV3dKuiTNwhYgPm36WPRk3SeFf2NiBQnWJBvjbRMIk5DsGfxcEiXQBfDvY4hgFctjwZ3USvWGriqT1cPsJ90LMLxbp38TRD1KVJ8ZgpqdvKTTi8dBqgEtob7okhdrkOahHJ3EKPtqV4PmaHvXSaIJvDG9c8jza64wxYBwMkHGt22i3HhCcIi8KmmfVo1ruqQLqKvINJg8eD5rKGV1mX9IipQcnrqADYnAj1wls7NSxsL0VZZm2pxRaGN494o2LCicHGEcOYkVLHufXY4Gv3friOIZSrT1r3NUgDBufpXWiG2b02TrRyFhgwRSS1a2OyMjHkT9tALmlIwFGF5HdaZphN6Mo5TFGdJyp65YU1scnlSGAVXzVdhsoD0RDZPSetdK2fzJC20kncaujAujHtSKnXrJNIhObnOjgMhCkx5E4z0oIH26DlfrbxS7k5SBQb1Zo3papQOk4uTNIdMBW4cE3V7AB8r6v4en3" +var ( + cfg, _ = config.Get() +) + func init() { rand.New(rand.NewSource(time.Now().UnixNano())) } @@ -39,7 +44,7 @@ func RandString(n int) string { func TestLogs(t *testing.T) { t.Skip("skipping test") ctx := context.Background() - consumer, _ := NewMinioAdapter("localhost:9000", "minio", "minio123", "", "", "test-1", false, false, "", "", "") + consumer, _ := NewMinioAdapter(cfg.StorageEndpoint, cfg.StorageAccessKeyID, cfg.StorageSecretAccessKey, cfg.StorageRegion, cfg.StorageToken, "test-1", cfg.StorageSSL, cfg.StorageSkipVerify, cfg.StorageCertFile, cfg.StorageKeyFile, cfg.StorageCAFile) id := "test-bla" for i := 0; i < 1000; i++ { fmt.Println("sending", i) @@ -56,8 +61,8 @@ func BenchmarkLogs(b *testing.B) { ctx := context.Background() randomString := RandString(5) bucket := "test-bench" - consumer, _ := NewMinioAdapter("localhost:9000", "minio", "minio123", "", "", bucket, false, false, "", "", "") - id := "test-bench" + "-" + randomString + "-" + strconv.Itoa(b.N) + consumer, _ := NewMinioAdapter(cfg.StorageEndpoint, cfg.StorageAccessKeyID, cfg.StorageSecretAccessKey, cfg.StorageRegion, cfg.StorageToken, bucket, cfg.StorageSSL, cfg.StorageSkipVerify, cfg.StorageCertFile, cfg.StorageKeyFile, cfg.StorageCAFile) + id := bucket + "-" + randomString + "-" + strconv.Itoa(b.N) totalSize := 0 for i := 0; i < b.N; i++ { consumer.Notify(ctx, id, events.Log{Time: time.Now(), @@ -73,7 +78,7 @@ func BenchmarkLogs(b *testing.B) { func BenchmarkLogs2(b *testing.B) { bucket := "test-bench" - consumer, _ := NewMinioAdapter("localhost:9000", "minio", "minio123", "", "", bucket, false, false, "", "", "") + consumer, _ := NewMinioAdapter(cfg.StorageEndpoint, cfg.StorageAccessKeyID, cfg.StorageSecretAccessKey, cfg.StorageRegion, cfg.StorageToken, bucket, cfg.StorageSSL, cfg.StorageSkipVerify, cfg.StorageCertFile, cfg.StorageKeyFile, cfg.StorageCAFile) idChan := make(chan string, 100) go verifyConsumer(idChan, bucket, consumer.minioClient) var counter atomic.Int32 @@ -160,7 +165,7 @@ func verifyConsumer(idChan chan string, bucket string, minioClient *minio.Client func DoRunBenchmark() { numberOfConsumers := 100 bucket := "test-bench" - consumer, _ := NewMinioAdapter("testkube-minio-service-testkube:9000", "minio", "minio123", "", "", bucket, false, false, "", "", "") + consumer, _ := NewMinioAdapter(cfg.StorageEndpoint, cfg.StorageAccessKeyID, cfg.StorageSecretAccessKey, cfg.StorageRegion, cfg.StorageToken, bucket, cfg.StorageSSL, cfg.StorageSkipVerify, cfg.StorageCertFile, cfg.StorageKeyFile, cfg.StorageCAFile) idChan := make(chan string, numberOfConsumers) DoRunBenchmark2(idChan, numberOfConsumers, consumer) diff --git a/pkg/logs/adapter/minio_v2_test.go b/pkg/logs/adapter/minio_v2_test.go index 28325e033ca..106acb98dd8 100644 --- a/pkg/logs/adapter/minio_v2_test.go +++ b/pkg/logs/adapter/minio_v2_test.go @@ -26,7 +26,7 @@ func generateWideLine(sizeKb int) string { func TestLogsV2Local(t *testing.T) { t.Skip("only local") ctx := context.Background() - consumer, _ := NewMinioV2Adapter("127.0.0.1:9000", "minioadmin", "minioadmin", "", "", "test-1", false, false, "", "", "") + consumer, _ := NewMinioV2Adapter(cfg.StorageEndpoint, cfg.StorageAccessKeyID, cfg.StorageSecretAccessKey, cfg.StorageRegion, cfg.StorageToken, "test-1", cfg.StorageSSL, cfg.StorageSkipVerify, cfg.StorageCertFile, cfg.StorageKeyFile, cfg.StorageCAFile) consumer.WithPath("./") id := "test-bla" err := consumer.Init(ctx, id) diff --git a/pkg/repository/config/mongo_test.go b/pkg/repository/config/mongo_test.go index 5ec0fa172fb..dcc37362c39 100644 --- a/pkg/repository/config/mongo_test.go +++ b/pkg/repository/config/mongo_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + "github.com/kubeshop/testkube/internal/config" "github.com/kubeshop/testkube/pkg/utils/test" "github.com/kubeshop/testkube/pkg/repository/storage" @@ -14,12 +15,15 @@ import ( ) const ( - mongoDns = "mongodb://localhost:27017" mongoDbName = "testkube-test" ) +var ( + cfg, _ = config.Get() +) + func getRepository() (*MongoRepository, error) { - db, err := storage.GetMongoDatabase(mongoDns, mongoDbName, storage.TypeMongoDB, false, nil) + db, err := storage.GetMongoDatabase(cfg.APIMongoDSN, mongoDbName, storage.TypeMongoDB, false, nil) repository := NewMongoRepository(db) return repository, err } diff --git a/pkg/repository/result/mongo_integration_test.go b/pkg/repository/result/mongo_integration_test.go index 3681f993f62..3b9a6cfcb98 100644 --- a/pkg/repository/result/mongo_integration_test.go +++ b/pkg/repository/result/mongo_integration_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/assert" + "github.com/kubeshop/testkube/internal/config" "github.com/kubeshop/testkube/pkg/utils/test" "github.com/kubeshop/testkube/pkg/datefilter" @@ -23,10 +24,13 @@ import ( ) const ( - mongoDns = "mongodb://localhost:27017" mongoDbName = "testkube-test" ) +var ( + cfg, _ = config.Get() +) + func TestStorage_Integration(t *testing.T) { test.IntegrationTest(t) assert := require.New(t) @@ -369,7 +373,7 @@ func TestTestExecutionsMetrics_Integration(t *testing.T) { } func getRepository() (*MongoRepository, error) { - db, err := storage.GetMongoDatabase(mongoDns, mongoDbName, storage.TypeMongoDB, false, nil) + db, err := storage.GetMongoDatabase(cfg.APIMongoDSN, mongoDbName, storage.TypeMongoDB, false, nil) repository := NewMongoRepository(db, true, false) return repository, err } diff --git a/pkg/repository/testresult/mongo_test.go b/pkg/repository/testresult/mongo_test.go index 6f5d6b56344..0da4414bb28 100644 --- a/pkg/repository/testresult/mongo_test.go +++ b/pkg/repository/testresult/mongo_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/kubeshop/testkube/internal/config" "github.com/kubeshop/testkube/pkg/repository/storage" "github.com/stretchr/testify/require" @@ -16,10 +17,13 @@ import ( ) const ( - mongoDns = "mongodb://localhost:27017" mongoDbName = "testkube-test" ) +var ( + cfg, _ = config.Get() +) + func TestTestExecutionsMetrics(t *testing.T) { assert := require.New(t) @@ -106,7 +110,7 @@ func TestTestExecutionsMetrics(t *testing.T) { } func getRepository() (*MongoRepository, error) { - db, err := storage.GetMongoDatabase(mongoDns, mongoDbName, storage.TypeMongoDB, false, nil) + db, err := storage.GetMongoDatabase(cfg.APIMongoDSN, mongoDbName, storage.TypeMongoDB, false, nil) repository := NewMongoRepository(db, true, false) return repository, err } diff --git a/pkg/storage/minio/artifacts_storage_integration_test.go b/pkg/storage/minio/artifacts_storage_integration_test.go index 85712de8b64..a49a9195d3a 100644 --- a/pkg/storage/minio/artifacts_storage_integration_test.go +++ b/pkg/storage/minio/artifacts_storage_integration_test.go @@ -7,6 +7,7 @@ import ( "os" "testing" + "github.com/kubeshop/testkube/internal/config" "github.com/kubeshop/testkube/pkg/utils/test" "github.com/minio/minio-go/v7" @@ -14,19 +15,23 @@ import ( "github.com/stretchr/testify/assert" ) +var ( + cfg, _ = config.Get() +) + func TestArtifactClient(t *testing.T) { test.IntegrationTest(t) t.Parallel() - directMinioClient, err := minio.New("localhost:9000", &minio.Options{ - Creds: credentials.NewStaticV4("minio99", "minio123", ""), - Secure: false, + directMinioClient, err := minio.New(cfg.StorageEndpoint, &minio.Options{ + Creds: credentials.NewStaticV4(cfg.StorageAccessKeyID, cfg.StorageSecretAccessKey, cfg.StorageToken), + Secure: cfg.StorageSSL, }) if err != nil { t.Fatalf("unable to create direct minio client: %v", err) } // Prepare MinIO client - minioClient := NewClient("localhost:9000", "minio99", "minio123", "us-east-1", "", "test-bucket-fsdf") + minioClient := NewClient(cfg.StorageEndpoint, cfg.StorageAccessKeyID, cfg.StorageSecretAccessKey, cfg.StorageRegion, cfg.StorageToken, cfg.StorageBucket) if err := minioClient.Connect(); err != nil { t.Fatalf("unable to connect to minio: %v", err) } @@ -41,7 +46,7 @@ func TestArtifactClient(t *testing.T) { t.Run("ListFiles", func(t *testing.T) { t.Parallel() // Upload a test file - _, err = directMinioClient.PutObject(ctx, "test-bucket-fsdf", "test-execution-id-1/test-file", bytes.NewReader([]byte("test-content")), 12, minio.PutObjectOptions{}) + _, err = directMinioClient.PutObject(ctx, cfg.StorageBucket, "test-execution-id-1/test-file", bytes.NewReader([]byte("test-content")), 12, minio.PutObjectOptions{}) if err != nil { t.Fatalf("unable to upload file: %v", err) } @@ -58,7 +63,7 @@ func TestArtifactClient(t *testing.T) { t.Run("DownloadFile", func(t *testing.T) { t.Parallel() // Upload a test file - _, err = directMinioClient.PutObject(ctx, "test-bucket-fsdf", "test-execution-id-2/test-file", bytes.NewReader([]byte("test-content")), 12, minio.PutObjectOptions{}) + _, err = directMinioClient.PutObject(ctx, cfg.StorageBucket, "test-execution-id-2/test-file", bytes.NewReader([]byte("test-content")), 12, minio.PutObjectOptions{}) if err != nil { t.Fatalf("unable to upload file: %v", err) } @@ -83,7 +88,7 @@ func TestArtifactClient(t *testing.T) { } // Check if the file is uploaded - obj, err := directMinioClient.GetObject(ctx, "test-bucket-fsdf", "test-execution-id-3/test-file", minio.GetObjectOptions{}) + obj, err := directMinioClient.GetObject(ctx, cfg.StorageBucket, "test-execution-id-3/test-file", minio.GetObjectOptions{}) if err != nil { t.Fatalf("unable to get object from minio: %v", err) } @@ -103,11 +108,11 @@ func TestArtifactClient(t *testing.T) { } defer os.RemoveAll(tempDir) // Upload test files - _, err = directMinioClient.PutObject(ctx, "test-bucket-fsdf", "test-execution-id-4/test-file1", bytes.NewReader([]byte("test-content")), 12, minio.PutObjectOptions{}) + _, err = directMinioClient.PutObject(ctx, cfg.StorageBucket, "test-execution-id-4/test-file1", bytes.NewReader([]byte("test-content")), 12, minio.PutObjectOptions{}) if err != nil { t.Fatalf("unable to upload file: %v", err) } - _, err = directMinioClient.PutObject(ctx, "test-bucket-fsdf", "test-execution-id-4/test-file2", bytes.NewReader([]byte("test-content")), 12, minio.PutObjectOptions{}) + _, err = directMinioClient.PutObject(ctx, cfg.StorageBucket, "test-execution-id-4/test-file2", bytes.NewReader([]byte("test-content")), 12, minio.PutObjectOptions{}) if err != nil { t.Fatalf("unable to upload file: %v", err) } diff --git a/pkg/tcl/mapperstcl/testworkflows/kube_openapi.go b/pkg/tcl/mapperstcl/testworkflows/kube_openapi.go index f39123b484b..9b117452f2e 100644 --- a/pkg/tcl/mapperstcl/testworkflows/kube_openapi.go +++ b/pkg/tcl/mapperstcl/testworkflows/kube_openapi.go @@ -928,6 +928,111 @@ func MapIndependentStepParallelKubeToAPI(v testworkflowsv1.IndependentStepParall } } +func MapExecActionKubeToAPI(v corev1.ExecAction) testkube.ExecAction { + return testkube.ExecAction{ + Command: v.Command, + } +} + +func MapHTTPHeaderKubeToAPI(v corev1.HTTPHeader) testkube.HttpHeader { + return testkube.HttpHeader{ + Name: v.Name, + Value: v.Value, + } +} + +func MapHTTPGetActionKubeToAPI(v corev1.HTTPGetAction) testkube.HttpGetAction { + return testkube.HttpGetAction{ + Path: v.Path, + Port: MapIntOrStringToString(v.Port), + Host: v.Host, + Scheme: string(v.Scheme), + HttpHeaders: common.MapSlice(v.HTTPHeaders, MapHTTPHeaderKubeToAPI), + } +} + +func MapTCPSocketActionKubeToAPI(v corev1.TCPSocketAction) testkube.TcpSocketAction { + return testkube.TcpSocketAction{ + Port: MapIntOrStringToString(v.Port), + Host: v.Host, + } +} + +func MapGRPCActionKubeToAPI(v corev1.GRPCAction) testkube.GrpcAction { + return testkube.GrpcAction{ + Port: v.Port, + Service: MapStringToBoxedString(v.Service), + } +} + +func MapProbeKubeToAPI(v corev1.Probe) testkube.Probe { + return testkube.Probe{ + InitialDelaySeconds: v.InitialDelaySeconds, + TimeoutSeconds: v.TimeoutSeconds, + PeriodSeconds: v.PeriodSeconds, + SuccessThreshold: v.SuccessThreshold, + FailureThreshold: v.FailureThreshold, + TerminationGracePeriodSeconds: MapInt64ToBoxedInteger(v.TerminationGracePeriodSeconds), + Exec: common.MapPtr(v.Exec, MapExecActionKubeToAPI), + HttpGet: common.MapPtr(v.HTTPGet, MapHTTPGetActionKubeToAPI), + TcpSocket: common.MapPtr(v.TCPSocket, MapTCPSocketActionKubeToAPI), + Grpc: common.MapPtr(v.GRPC, MapGRPCActionKubeToAPI), + } +} + +func MapIndependentServiceSpecKubeToAPI(v testworkflowsv1.IndependentServiceSpec) testkube.TestWorkflowIndependentServiceSpec { + return testkube.TestWorkflowIndependentServiceSpec{ + Count: MapIntOrStringToBoxedString(v.Count), + MaxCount: MapIntOrStringToBoxedString(v.MaxCount), + Matrix: MapDynamicListMapKubeToAPI(v.Matrix), + Shards: MapDynamicListMapKubeToAPI(v.Shards), + Pod: common.MapPtr(v.Pod, MapPodConfigKubeToAPI), + WorkingDir: MapStringToBoxedString(v.WorkingDir), + Image: v.Image, + ImagePullPolicy: MapImagePullPolicyKubeToAPI(v.ImagePullPolicy), + Env: common.MapSlice(v.Env, MapEnvVarKubeToAPI), + EnvFrom: common.MapSlice(v.EnvFrom, MapEnvFromSourceKubeToAPI), + Command: MapStringSliceToBoxedStringList(v.Command), + Args: MapStringSliceToBoxedStringList(v.Args), + Shell: MapStringToBoxedString(v.Shell), + Resources: common.MapPtr(v.Resources, MapResourcesKubeToAPI), + SecurityContext: MapSecurityContextKubeToAPI(v.SecurityContext), + VolumeMounts: common.MapSlice(v.VolumeMounts, MapVolumeMountKubeToAPI), + Timeout: v.Timeout, + Transfer: common.MapSlice(v.Transfer, MapStepParallelTransferKubeToAPI), + Content: common.MapPtr(v.Content, MapContentKubeToAPI), + RestartPolicy: string(v.RestartPolicy), + ReadinessProbe: common.MapPtr(v.ReadinessProbe, MapProbeKubeToAPI), + } +} + +func MapServiceSpecKubeToAPI(v testworkflowsv1.ServiceSpec) testkube.TestWorkflowServiceSpec { + return testkube.TestWorkflowServiceSpec{ + Count: MapIntOrStringToBoxedString(v.Count), + MaxCount: MapIntOrStringToBoxedString(v.MaxCount), + Matrix: MapDynamicListMapKubeToAPI(v.Matrix), + Shards: MapDynamicListMapKubeToAPI(v.Shards), + Use: common.MapSlice(v.Use, MapTemplateRefKubeToAPI), + Pod: common.MapPtr(v.Pod, MapPodConfigKubeToAPI), + WorkingDir: MapStringToBoxedString(v.WorkingDir), + Image: v.Image, + ImagePullPolicy: MapImagePullPolicyKubeToAPI(v.ImagePullPolicy), + Env: common.MapSlice(v.Env, MapEnvVarKubeToAPI), + EnvFrom: common.MapSlice(v.EnvFrom, MapEnvFromSourceKubeToAPI), + Command: MapStringSliceToBoxedStringList(v.Command), + Args: MapStringSliceToBoxedStringList(v.Args), + Shell: MapStringToBoxedString(v.Shell), + Resources: common.MapPtr(v.Resources, MapResourcesKubeToAPI), + SecurityContext: MapSecurityContextKubeToAPI(v.SecurityContext), + VolumeMounts: common.MapSlice(v.VolumeMounts, MapVolumeMountKubeToAPI), + Timeout: v.Timeout, + Transfer: common.MapSlice(v.Transfer, MapStepParallelTransferKubeToAPI), + Content: common.MapPtr(v.Content, MapContentKubeToAPI), + RestartPolicy: string(v.RestartPolicy), + ReadinessProbe: common.MapPtr(v.ReadinessProbe, MapProbeKubeToAPI), + } +} + func MapStepKubeToAPI(v testworkflowsv1.Step) testkube.TestWorkflowStep { return testkube.TestWorkflowStep{ Name: v.Name, @@ -941,6 +1046,7 @@ func MapStepKubeToAPI(v testworkflowsv1.Step) testkube.TestWorkflowStep { Timeout: v.Timeout, Delay: v.Delay, Content: common.MapPtr(v.Content, MapContentKubeToAPI), + Services: common.MapMap(v.Services, MapServiceSpecKubeToAPI), Shell: v.Shell, Run: common.MapPtr(v.Run, MapStepRunKubeToAPI), WorkingDir: MapStringToBoxedString(v.WorkingDir), @@ -964,6 +1070,7 @@ func MapIndependentStepKubeToAPI(v testworkflowsv1.IndependentStep) testkube.Tes Timeout: v.Timeout, Delay: v.Delay, Content: common.MapPtr(v.Content, MapContentKubeToAPI), + Services: common.MapMap(v.Services, MapIndependentServiceSpecKubeToAPI), Shell: v.Shell, Run: common.MapPtr(v.Run, MapStepRunKubeToAPI), WorkingDir: MapStringToBoxedString(v.WorkingDir), @@ -981,6 +1088,7 @@ func MapSpecKubeToAPI(v testworkflowsv1.TestWorkflowSpec) testkube.TestWorkflowS Use: common.MapSlice(v.Use, MapTemplateRefKubeToAPI), Config: common.MapMap(v.Config, MapParameterSchemaKubeToAPI), Content: common.MapPtr(v.Content, MapContentKubeToAPI), + Services: common.MapMap(v.Services, MapServiceSpecKubeToAPI), Container: common.MapPtr(v.Container, MapContainerConfigKubeToAPI), Job: common.MapPtr(v.Job, MapJobConfigKubeToAPI), Pod: common.MapPtr(v.Pod, MapPodConfigKubeToAPI), diff --git a/pkg/tcl/mapperstcl/testworkflows/openapi_kube.go b/pkg/tcl/mapperstcl/testworkflows/openapi_kube.go index 5f87d1c1747..90389ced6be 100644 --- a/pkg/tcl/mapperstcl/testworkflows/openapi_kube.go +++ b/pkg/tcl/mapperstcl/testworkflows/openapi_kube.go @@ -984,6 +984,127 @@ func MapIndependentStepParallelAPIToKube(v testkube.TestWorkflowIndependentStepP } } +func MapExecActionAPIToKube(v testkube.ExecAction) corev1.ExecAction { + return corev1.ExecAction{ + Command: v.Command, + } +} + +func MapHTTPHeaderAPIToKube(v testkube.HttpHeader) corev1.HTTPHeader { + return corev1.HTTPHeader{ + Name: v.Name, + Value: v.Value, + } +} + +func MapHTTPGetActionAPIToKube(v testkube.HttpGetAction) corev1.HTTPGetAction { + return corev1.HTTPGetAction{ + Path: v.Path, + Port: MapStringToIntOrString(v.Port), + Host: v.Host, + Scheme: corev1.URIScheme(v.Scheme), + HTTPHeaders: common.MapSlice(v.HttpHeaders, MapHTTPHeaderAPIToKube), + } +} + +func MapTCPSocketActionAPIToKube(v testkube.TcpSocketAction) corev1.TCPSocketAction { + return corev1.TCPSocketAction{ + Port: MapStringToIntOrString(v.Port), + Host: v.Host, + } +} + +func MapGRPCActionAPIToKube(v testkube.GrpcAction) corev1.GRPCAction { + return corev1.GRPCAction{ + Port: v.Port, + Service: MapBoxedStringToString(v.Service), + } +} + +func MapProbeAPIToKube(v testkube.Probe) corev1.Probe { + return corev1.Probe{ + InitialDelaySeconds: v.InitialDelaySeconds, + TimeoutSeconds: v.TimeoutSeconds, + PeriodSeconds: v.PeriodSeconds, + SuccessThreshold: v.SuccessThreshold, + FailureThreshold: v.FailureThreshold, + TerminationGracePeriodSeconds: MapBoxedIntegerToInt64(v.TerminationGracePeriodSeconds), + ProbeHandler: corev1.ProbeHandler{ + Exec: common.MapPtr(v.Exec, MapExecActionAPIToKube), + HTTPGet: common.MapPtr(v.HttpGet, MapHTTPGetActionAPIToKube), + TCPSocket: common.MapPtr(v.TcpSocket, MapTCPSocketActionAPIToKube), + GRPC: common.MapPtr(v.Grpc, MapGRPCActionAPIToKube), + }, + } +} + +func MapIndependentServiceSpecAPIToKube(v testkube.TestWorkflowIndependentServiceSpec) testworkflowsv1.IndependentServiceSpec { + return testworkflowsv1.IndependentServiceSpec{ + StepExecuteStrategy: testworkflowsv1.StepExecuteStrategy{ + Count: MapBoxedStringToIntOrString(v.Count), + MaxCount: MapBoxedStringToIntOrString(v.MaxCount), + Matrix: MapDynamicListMapAPIToKube(v.Matrix), + Shards: MapDynamicListMapAPIToKube(v.Shards), + }, + Pod: common.MapPtr(v.Pod, MapPodConfigAPIToKube), + StepRun: testworkflowsv1.StepRun{ + ContainerConfig: testworkflowsv1.ContainerConfig{ + WorkingDir: MapBoxedStringToString(v.WorkingDir), + Image: v.Image, + ImagePullPolicy: MapImagePullPolicyAPIToKube(v.ImagePullPolicy), + Env: common.MapSlice(v.Env, MapEnvVarAPIToKube), + EnvFrom: common.MapSlice(v.EnvFrom, MapEnvFromSourceAPIToKube), + Command: MapBoxedStringListToStringSlice(v.Command), + Args: MapBoxedStringListToStringSlice(v.Args), + Resources: common.MapPtr(v.Resources, MapResourcesAPIToKube), + SecurityContext: MapSecurityContextAPIToKube(v.SecurityContext), + VolumeMounts: common.MapSlice(v.VolumeMounts, MapVolumeMountAPIToKube), + }, + Shell: MapBoxedStringToString(v.Shell), + }, + Timeout: v.Timeout, + Transfer: common.MapSlice(v.Transfer, MapStepParallelTransferAPIToKube), + Content: common.MapPtr(v.Content, MapContentAPIToKube), + RestartPolicy: testworkflowsv1.ServiceRestartPolicy(v.RestartPolicy), + ReadinessProbe: common.MapPtr(v.ReadinessProbe, MapProbeAPIToKube), + } +} + +func MapServiceSpecAPIToKube(v testkube.TestWorkflowServiceSpec) testworkflowsv1.ServiceSpec { + return testworkflowsv1.ServiceSpec{ + Use: common.MapSlice(v.Use, MapTemplateRefAPIToKube), + IndependentServiceSpec: testworkflowsv1.IndependentServiceSpec{ + StepExecuteStrategy: testworkflowsv1.StepExecuteStrategy{ + Count: MapBoxedStringToIntOrString(v.Count), + MaxCount: MapBoxedStringToIntOrString(v.MaxCount), + Matrix: MapDynamicListMapAPIToKube(v.Matrix), + Shards: MapDynamicListMapAPIToKube(v.Shards), + }, + Pod: common.MapPtr(v.Pod, MapPodConfigAPIToKube), + StepRun: testworkflowsv1.StepRun{ + ContainerConfig: testworkflowsv1.ContainerConfig{ + WorkingDir: MapBoxedStringToString(v.WorkingDir), + Image: v.Image, + ImagePullPolicy: MapImagePullPolicyAPIToKube(v.ImagePullPolicy), + Env: common.MapSlice(v.Env, MapEnvVarAPIToKube), + EnvFrom: common.MapSlice(v.EnvFrom, MapEnvFromSourceAPIToKube), + Command: MapBoxedStringListToStringSlice(v.Command), + Args: MapBoxedStringListToStringSlice(v.Args), + Resources: common.MapPtr(v.Resources, MapResourcesAPIToKube), + SecurityContext: MapSecurityContextAPIToKube(v.SecurityContext), + VolumeMounts: common.MapSlice(v.VolumeMounts, MapVolumeMountAPIToKube), + }, + Shell: MapBoxedStringToString(v.Shell), + }, + Timeout: v.Timeout, + Transfer: common.MapSlice(v.Transfer, MapStepParallelTransferAPIToKube), + Content: common.MapPtr(v.Content, MapContentAPIToKube), + RestartPolicy: testworkflowsv1.ServiceRestartPolicy(v.RestartPolicy), + ReadinessProbe: common.MapPtr(v.ReadinessProbe, MapProbeAPIToKube), + }, + } +} + func MapStepAPIToKube(v testkube.TestWorkflowStep) testworkflowsv1.Step { return testworkflowsv1.Step{ StepMeta: testworkflowsv1.StepMeta{ @@ -997,6 +1118,7 @@ func MapStepAPIToKube(v testkube.TestWorkflowStep) testworkflowsv1.Step { Retry: common.MapPtr(v.Retry, MapRetryPolicyAPIToKube), Timeout: v.Timeout, }, + Services: common.MapMap(v.Services, MapServiceSpecAPIToKube), StepSource: testworkflowsv1.StepSource{ Content: common.MapPtr(v.Content, MapContentAPIToKube), }, @@ -1032,6 +1154,7 @@ func MapIndependentStepAPIToKube(v testkube.TestWorkflowIndependentStep) testwor Retry: common.MapPtr(v.Retry, MapRetryPolicyAPIToKube), Timeout: v.Timeout, }, + Services: common.MapMap(v.Services, MapIndependentServiceSpecAPIToKube), StepSource: testworkflowsv1.StepSource{ Content: common.MapPtr(v.Content, MapContentAPIToKube), }, @@ -1062,10 +1185,11 @@ func MapSpecAPIToKube(v testkube.TestWorkflowSpec) testworkflowsv1.TestWorkflowS Pod: common.MapPtr(v.Pod, MapPodConfigAPIToKube), Events: common.MapSlice(v.Events, MapEventAPIToKube), }, - Use: common.MapSlice(v.Use, MapTemplateRefAPIToKube), - Setup: common.MapSlice(v.Setup, MapStepAPIToKube), - Steps: common.MapSlice(v.Steps, MapStepAPIToKube), - After: common.MapSlice(v.After, MapStepAPIToKube), + Services: common.MapMap(v.Services, MapServiceSpecAPIToKube), + Use: common.MapSlice(v.Use, MapTemplateRefAPIToKube), + Setup: common.MapSlice(v.Setup, MapStepAPIToKube), + Steps: common.MapSlice(v.Steps, MapStepAPIToKube), + After: common.MapSlice(v.After, MapStepAPIToKube), } } @@ -1079,9 +1203,10 @@ func MapTemplateSpecAPIToKube(v testkube.TestWorkflowTemplateSpec) testworkflows Pod: common.MapPtr(v.Pod, MapPodConfigAPIToKube), Events: common.MapSlice(v.Events, MapEventAPIToKube), }, - Setup: common.MapSlice(v.Setup, MapIndependentStepAPIToKube), - Steps: common.MapSlice(v.Steps, MapIndependentStepAPIToKube), - After: common.MapSlice(v.After, MapIndependentStepAPIToKube), + Services: common.MapMap(v.Services, MapIndependentServiceSpecAPIToKube), + Setup: common.MapSlice(v.Setup, MapIndependentStepAPIToKube), + Steps: common.MapSlice(v.Steps, MapIndependentStepAPIToKube), + After: common.MapSlice(v.After, MapIndependentStepAPIToKube), } } diff --git a/pkg/tcl/repositorytcl/testworkflow/mongo_integration_test.go b/pkg/tcl/repositorytcl/testworkflow/mongo_integration_test.go index 7606d6ea5c3..f233041c1ac 100644 --- a/pkg/tcl/repositorytcl/testworkflow/mongo_integration_test.go +++ b/pkg/tcl/repositorytcl/testworkflow/mongo_integration_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + "github.com/kubeshop/testkube/internal/config" "github.com/kubeshop/testkube/pkg/utils/test" "github.com/stretchr/testify/assert" @@ -13,12 +14,16 @@ import ( "github.com/kubeshop/testkube/pkg/api/v1/testkube" ) +var ( + cfg, _ = config.Get() +) + func TestNewMongoRepository_UpdateReport_Integration(t *testing.T) { test.IntegrationTest(t) ctx := context.Background() - client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017")) + client, err := mongo.Connect(ctx, options.Client().ApplyURI(cfg.APIMongoDSN)) if err != nil { t.Fatalf("error connecting to mongo: %v", err) } diff --git a/pkg/tcl/testworkflowstcl/testworkflowcontroller/cleanup.go b/pkg/tcl/testworkflowstcl/testworkflowcontroller/cleanup.go index 3c974dbc3a8..33d12ae9446 100644 --- a/pkg/tcl/testworkflowstcl/testworkflowcontroller/cleanup.go +++ b/pkg/tcl/testworkflowstcl/testworkflowcontroller/cleanup.go @@ -94,3 +94,29 @@ func Cleanup(ctx context.Context, clientSet kubernetes.Interface, namespace, id wg.Wait() return errors.Join(errs...) } + +func CleanupGroup(ctx context.Context, clientSet kubernetes.Interface, namespace, id string) error { + var errs []error + var errsMu sync.Mutex + var wg sync.WaitGroup + ops := []func(context.Context, kubernetes.Interface, string, string) error{ + cleanupJobs(constants.GroupIdLabelName), + cleanupPods(constants.GroupIdLabelName), + cleanupConfigMaps(constants.GroupIdLabelName), + cleanupSecrets(constants.GroupIdLabelName), + } + wg.Add(len(ops)) + for _, op := range ops { + go func(op func(context.Context, kubernetes.Interface, string, string) error) { + err := op(ctx, clientSet, namespace, id) + if err != nil { + errsMu.Lock() + errs = append(errs, err) + errsMu.Unlock() + } + wg.Done() + }(op) + } + wg.Wait() + return errors.Join(errs...) +} diff --git a/pkg/tcl/testworkflowstcl/testworkflowprocessor/constants/constants.go b/pkg/tcl/testworkflowstcl/testworkflowprocessor/constants/constants.go index f1a2922034a..5a2a8e61e5e 100644 --- a/pkg/tcl/testworkflowstcl/testworkflowprocessor/constants/constants.go +++ b/pkg/tcl/testworkflowstcl/testworkflowprocessor/constants/constants.go @@ -27,6 +27,7 @@ const ( DefaultFsGroup = int64(1001) ResourceIdLabelName = "testworkflowid" RootResourceIdLabelName = "testworkflowid-root" + GroupIdLabelName = "testworkflowid-group" SignatureAnnotationName = "testworkflows.testkube.io/signature" ) diff --git a/pkg/tcl/testworkflowstcl/testworkflowprocessor/container.go b/pkg/tcl/testworkflowstcl/testworkflowprocessor/container.go index 8c4f99b1bb3..31cdc902bdd 100644 --- a/pkg/tcl/testworkflowstcl/testworkflowprocessor/container.go +++ b/pkg/tcl/testworkflowstcl/testworkflowprocessor/container.go @@ -209,13 +209,13 @@ func (c *container) Resources() (r testworkflowsv1.Resources) { } func (c *container) SecurityContext() *corev1.SecurityContext { - if c.Cr.SecurityContext != nil { + if c.parent == nil || c.parent.SecurityContext() == nil { return c.Cr.SecurityContext } - if c.parent == nil { - return nil + if c.Cr.SecurityContext == nil { + return c.parent.SecurityContext() } - return c.parent.SecurityContext() + return testworkflowresolver.MergeSecurityContext(c.parent.SecurityContext().DeepCopy(), c.Cr.SecurityContext) } func (c *container) HasVolumeAt(path string) bool { diff --git a/pkg/tcl/testworkflowstcl/testworkflowprocessor/operations.go b/pkg/tcl/testworkflowstcl/testworkflowprocessor/operations.go index b38d68bb171..9bf60b0db83 100644 --- a/pkg/tcl/testworkflowstcl/testworkflowprocessor/operations.go +++ b/pkg/tcl/testworkflowstcl/testworkflowprocessor/operations.go @@ -357,6 +357,66 @@ func ProcessParallel(_ InternalProcessor, layer Intermediate, container Containe return stage, nil } +func ProcessServicesStart(_ InternalProcessor, layer Intermediate, container Container, step testworkflowsv1.Step) (Stage, error) { + if len(step.Services) == 0 { + return nil, nil + } + + // TODO: Think of better way to pass the data between steps + podsRef := layer.NextRef() + container.AppendEnv(corev1.EnvVar{Name: "TK_SVC_REF", Value: podsRef}) + + stage := NewContainerStage(layer.NextRef(), container.CreateChild()) + stage.SetCategory("Start services") + + stage.Container(). + SetImage(constants.DefaultToolkitImage). + SetImagePullPolicy(corev1.PullIfNotPresent). + SetCommand("/toolkit", "services", "-g", "{{env.TK_SVC_REF}}"). + EnableToolkit(stage.Ref()). + AppendVolumeMounts(layer.AddEmptyDirVolume(nil, constants.DefaultTransferDirPath)) + + args := make([]string, 0, len(step.Services)) + for name := range step.Services { + v, err := json.Marshal(step.Services[name]) + if err != nil { + return nil, errors.Wrapf(err, "services[%s]: marshalling error", name) + } + args = append(args, fmt.Sprintf("%s=%s", name, expressionstcl.NewStringValue(string(v)).Template())) + } + stage.Container().SetArgs(args...) + + return stage, nil +} + +func ProcessServicesStop(_ InternalProcessor, layer Intermediate, container Container, step testworkflowsv1.Step) (Stage, error) { + if len(step.Services) == 0 { + return nil, nil + } + + stage := NewContainerStage(layer.NextRef(), container.CreateChild()) + stage.SetCondition("always") // TODO: actually, it's enough to do it when "Start services" is not skipped + stage.SetOptional(true) + stage.SetCategory("Stop services") + + stage.Container(). + SetImage(constants.DefaultToolkitImage). + SetImagePullPolicy(corev1.PullIfNotPresent). + SetCommand("/toolkit", "kill", "{{env.TK_SVC_REF}}"). + EnableToolkit(stage.Ref()). + AppendVolumeMounts(layer.AddEmptyDirVolume(nil, constants.DefaultTransferDirPath)) + + args := make([]string, 0) + for name, v := range step.Services { + if v.Logs { + args = append(args, "-l", name) + } + } + stage.Container().SetArgs(args...) + + return stage, nil +} + func ProcessArtifacts(_ InternalProcessor, layer Intermediate, container Container, step testworkflowsv1.Step) (Stage, error) { if step.Artifacts == nil { return nil, nil diff --git a/pkg/tcl/testworkflowstcl/testworkflowprocessor/processor.go b/pkg/tcl/testworkflowstcl/testworkflowprocessor/processor.go index 2e0d0497692..7307197cb90 100644 --- a/pkg/tcl/testworkflowstcl/testworkflowprocessor/processor.go +++ b/pkg/tcl/testworkflowstcl/testworkflowprocessor/processor.go @@ -55,12 +55,14 @@ func NewFullFeatured(inspector imageinspector.Inspector) Processor { Register(ProcessContentFiles). Register(ProcessContentGit). Register(ProcessContentTarball). + Register(ProcessServicesStart). Register(ProcessNestedSetupSteps). Register(ProcessRunCommand). Register(ProcessShellCommand). Register(ProcessExecute). Register(ProcessParallel). Register(ProcessNestedSteps). + Register(ProcessServicesStop). Register(ProcessArtifacts) } @@ -126,6 +128,7 @@ func (p *processor) Bundle(ctx context.Context, workflow *testworkflowsv1.TestWo StepSource: testworkflowsv1.StepSource{ Content: workflow.Spec.Content, }, + Services: workflow.Spec.Services, StepDefaults: testworkflowsv1.StepDefaults{ Container: workflow.Spec.Container, }, @@ -217,8 +220,44 @@ func (p *processor) Bundle(ctx context.Context, workflow *testworkflowsv1.TestWo return nil, errors.Wrap(err, "applying image data") } + // Adjust the security context in case it's a single container besides the Testkube' containers + // TODO: Consider flag argument, that would be used only for services? + containerStages := root.ContainerStages() + var otherContainers []ContainerStage + for _, c := range containerStages { + if c.Container().Image() != constants.DefaultInitImage && c.Container().Image() != constants.DefaultToolkitImage { + otherContainers = append(otherContainers, c) + } + } + if len(otherContainers) == 1 { + image := otherContainers[0].Container().Image() + if _, ok := images[image]; ok { + sc := otherContainers[0].Container().SecurityContext() + if sc == nil { + sc = &corev1.SecurityContext{} + } + if podConfig.SecurityContext == nil { + podConfig.SecurityContext = &corev1.PodSecurityContext{} + } + if sc.RunAsGroup == nil && podConfig.SecurityContext.RunAsGroup == nil { + sc.RunAsGroup = common.Ptr(images[image].Group) + otherContainers[0].Container().SetSecurityContext(sc) + } + if podConfig.SecurityContext.FSGroup == nil { + podConfig.SecurityContext.FSGroup = sc.RunAsGroup + } + } + } + containerStages = nil + + // Determine FS Group for the containers + fsGroup := common.Ptr(constants.DefaultFsGroup) + if podConfig.SecurityContext != nil && podConfig.SecurityContext.FSGroup != nil { + fsGroup = podConfig.SecurityContext.FSGroup + } + // Build list of the containers - containers, err := buildKubernetesContainers(root, NewInitProcess().SetRef(root.Ref()), machines...) + containers, err := buildKubernetesContainers(root, NewInitProcess().SetRef(root.Ref()), fsGroup, machines...) if err != nil { return nil, errors.Wrap(err, "building Kubernetes containers") } @@ -297,7 +336,7 @@ func (p *processor) Bundle(ctx context.Context, workflow *testworkflowsv1.TestWo Args: []string{constants.InitScript}, VolumeMounts: layer.ContainerDefaults().VolumeMounts(), SecurityContext: &corev1.SecurityContext{ - RunAsGroup: common.Ptr(constants.DefaultFsGroup), + RunAsGroup: fsGroup, }, } err = expressionstcl.FinalizeForce(&initContainer, machines...) diff --git a/pkg/tcl/testworkflowstcl/testworkflowprocessor/utils.go b/pkg/tcl/testworkflowstcl/testworkflowprocessor/utils.go index f6d5e1f5856..6f8a4348c13 100644 --- a/pkg/tcl/testworkflowstcl/testworkflowprocessor/utils.go +++ b/pkg/tcl/testworkflowstcl/testworkflowprocessor/utils.go @@ -36,6 +36,20 @@ func AnnotateControlledBy(obj metav1.Object, rootId, id string) { } } +func AnnotateGroupId(obj metav1.Object, id string) { + labels := obj.GetLabels() + if labels == nil { + labels = map[string]string{} + } + labels[constants.GroupIdLabelName] = id + obj.SetLabels(labels) + + // Annotate Pod template in the Job + if v, ok := obj.(*batchv1.Job); ok { + AnnotateGroupId(&v.Spec.Template, id) + } +} + func getRef(stage Stage) string { return stage.Ref() } @@ -44,7 +58,7 @@ func isNotOptional(stage Stage) bool { return !stage.Optional() } -func buildKubernetesContainers(stage Stage, init *initProcess, machines ...expressionstcl.Machine) (containers []corev1.Container, err error) { +func buildKubernetesContainers(stage Stage, init *initProcess, fsGroup *int64, machines ...expressionstcl.Machine) (containers []corev1.Container, err error) { if stage.Paused() { init.SetPaused(stage.Paused()) } @@ -89,7 +103,7 @@ func buildKubernetesContainers(stage Stage, init *initProcess, machines ...expre init.ResetCondition().SetPaused(false) } // Pass down to another group or container - sub, serr := buildKubernetesContainers(ch, init.Children(ch.Ref()), machines...) + sub, serr := buildKubernetesContainers(ch, init.Children(ch.Ref()), fsGroup, machines...) if serr != nil { return nil, fmt.Errorf("%s: %s: resolving children: %s", stage.Ref(), stage.Name(), serr.Error()) } @@ -142,7 +156,7 @@ func buildKubernetesContainers(stage Stage, init *initProcess, machines ...expre cr.SecurityContext = &corev1.SecurityContext{} } if cr.SecurityContext.RunAsGroup == nil { - cr.SecurityContext.RunAsGroup = common.Ptr(constants.DefaultFsGroup) + cr.SecurityContext.RunAsGroup = fsGroup } containers = []corev1.Container{cr} diff --git a/pkg/tcl/testworkflowstcl/testworkflowresolver/analyze.go b/pkg/tcl/testworkflowstcl/testworkflowresolver/analyze.go index 39c8076063e..d79165d8a78 100644 --- a/pkg/tcl/testworkflowstcl/testworkflowresolver/analyze.go +++ b/pkg/tcl/testworkflowstcl/testworkflowresolver/analyze.go @@ -31,6 +31,11 @@ func listStepTemplates(cr testworkflowsv1.Step) map[string]struct{} { for i := range cr.Use { v[GetInternalTemplateName(cr.Use[i].Name)] = struct{}{} } + for name := range cr.Services { + for _, tpl := range cr.Services[name].Use { + v[GetInternalTemplateName(tpl.Name)] = struct{}{} + } + } if cr.Parallel != nil { maps.Copy(v, listSpecTemplates(cr.Parallel.TestWorkflowSpec)) if cr.Parallel.Template != nil { @@ -51,6 +56,11 @@ func listSpecTemplates(spec testworkflowsv1.TestWorkflowSpec) map[string]struct{ for i := range spec.Use { v[GetInternalTemplateName(spec.Use[i].Name)] = struct{}{} } + for name := range spec.Services { + for _, tpl := range spec.Services[name].Use { + v[GetInternalTemplateName(tpl.Name)] = struct{}{} + } + } for i := range spec.Setup { maps.Copy(v, listStepTemplates(spec.Setup[i])) } diff --git a/pkg/tcl/testworkflowstcl/testworkflowresolver/apply.go b/pkg/tcl/testworkflowstcl/testworkflowresolver/apply.go index c61987f4d7c..a58ab5eae6c 100644 --- a/pkg/tcl/testworkflowstcl/testworkflowresolver/apply.go +++ b/pkg/tcl/testworkflowstcl/testworkflowresolver/apply.go @@ -61,6 +61,7 @@ func injectTemplateToSpec(spec *testworkflowsv1.TestWorkflowSpec, template testw // Apply basic configuration spec.Content = MergeContent(template.Spec.Content, spec.Content) + spec.Services = MergeMap(common.MapMap(template.Spec.Services, ConvertIndependentServiceToService), spec.Services) spec.Container = MergeContainerConfig(template.Spec.Container, spec.Container) // Include the steps from the template @@ -87,6 +88,7 @@ func InjectStepTemplate(step *testworkflowsv1.Step, template testworkflowsv1.Tes // Apply basic configuration step.Content = MergeContent(template.Spec.Content, step.Content) + step.Services = MergeMap(common.MapMap(template.Spec.Services, ConvertIndependentServiceToService), step.Services) step.Container = MergeContainerConfig(template.Spec.Container, step.Container) // Fast-track when the template doesn't contain any steps to run @@ -105,6 +107,16 @@ func InjectStepTemplate(step *testworkflowsv1.Step, template testworkflowsv1.Tes return nil } +func InjectServiceTemplate(svc *testworkflowsv1.ServiceSpec, template testworkflowsv1.TestWorkflowTemplate) error { + if svc == nil { + return nil + } + svc.Pod = MergePodConfig(template.Spec.Pod, svc.Pod) + svc.Content = MergeContent(template.Spec.Content, svc.Content) + svc.ContainerConfig = *MergeContainerConfig(template.Spec.Container, &svc.ContainerConfig) + return nil +} + func applyTemplatesToStep(step testworkflowsv1.Step, templates map[string]testworkflowsv1.TestWorkflowTemplate) (testworkflowsv1.Step, error) { // Apply regular templates for i, ref := range step.Use { @@ -142,6 +154,28 @@ func applyTemplatesToStep(step testworkflowsv1.Step, templates map[string]testwo step.Template = nil } + // Apply templates to the services + for name, svc := range step.Services { + for i, ref := range svc.Use { + tpl, err := getConfiguredTemplate(ref.Name, ref.Config, templates) + if err != nil { + return step, errors.Wrap(err, fmt.Sprintf("services[%s].use[%d]: resolving template", name, i)) + } + if len(tpl.Spec.Setup) > 0 || len(tpl.Spec.Steps) > 0 || len(tpl.Spec.After) > 0 { + return step, fmt.Errorf("services[%s].use[%d]: steps in template used for the service are not supported", name, i) + } + if len(tpl.Spec.Services) > 0 { + return step, fmt.Errorf("services[%s].use[%d]: additional services in template used for the service are not supported", name, i) + } + err = InjectServiceTemplate(&svc, tpl) + if err != nil { + return step, errors.Wrap(err, fmt.Sprintf("services[%s].use[%d]: injecting template", name, i)) + } + } + svc.Use = nil + step.Services[name] = svc + } + // Apply templates in the parallel steps if step.Parallel != nil { // Move the template operation alias along with other operations, @@ -229,6 +263,28 @@ func applyTemplatesToSpec(spec *testworkflowsv1.TestWorkflowSpec, templates map[ } spec.Use = nil + // Apply templates to the services + for name, svc := range spec.Services { + for i, ref := range svc.Use { + tpl, err := getConfiguredTemplate(ref.Name, ref.Config, templates) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("services[%s].use[%d]: resolving template", name, i)) + } + if len(tpl.Spec.Setup) > 0 || len(tpl.Spec.Steps) > 0 || len(tpl.Spec.After) > 0 { + return fmt.Errorf("services[%s].use[%d]: steps in template used for the service are not supported", name, i) + } + if len(tpl.Spec.Services) > 0 { + return fmt.Errorf("services[%s].use[%d]: additional services in template used for the service are not supported", name, i) + } + err = InjectServiceTemplate(&svc, tpl) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("services[%s].use[%d]: injecting template", name, i)) + } + } + svc.Use = nil + spec.Services[name] = svc + } + // Apply templates on the step level for i := range spec.Setup { spec.Setup[i], err = applyTemplatesToStep(spec.Setup[i], templates) diff --git a/pkg/tcl/testworkflowstcl/testworkflowresolver/merge.go b/pkg/tcl/testworkflowstcl/testworkflowresolver/merge.go index 3901d4279ae..a13dc33828b 100644 --- a/pkg/tcl/testworkflowstcl/testworkflowresolver/merge.go +++ b/pkg/tcl/testworkflowstcl/testworkflowresolver/merge.go @@ -341,6 +341,10 @@ func MergeMap[T comparable, U any](dst, include map[T]U) map[T]U { return dst } +func ConvertIndependentServiceToService(svc testworkflowsv1.IndependentServiceSpec) testworkflowsv1.ServiceSpec { + return testworkflowsv1.ServiceSpec{IndependentServiceSpec: svc} +} + func ConvertIndependentStepParallelToStepParallel(step testworkflowsv1.IndependentStepParallel) testworkflowsv1.StepParallel { return testworkflowsv1.StepParallel{ Parallelism: step.Parallelism, @@ -363,6 +367,7 @@ func ConvertIndependentStepParallelToStepParallel(step testworkflowsv1.Independe func ConvertIndependentStepToStep(step testworkflowsv1.IndependentStep) (res testworkflowsv1.Step) { res.StepMeta = step.StepMeta res.StepControl = step.StepControl + res.Services = common.MapMap(step.Services, ConvertIndependentServiceToService) res.StepSource = step.StepSource res.StepDefaults = step.StepDefaults res.StepOperations = step.StepOperations diff --git a/test/integration/crd-workflow/testkube-integration-tests.yaml b/test/integration/crd-workflow/testkube-integration-tests.yaml new file mode 100644 index 00000000000..855a3ee6199 --- /dev/null +++ b/test/integration/crd-workflow/testkube-integration-tests.yaml @@ -0,0 +1,77 @@ +apiVersion: testworkflows.testkube.io/v1 +kind: TestWorkflow +metadata: + name: testkube-integration + labels: + core-tests: workflows +spec: +# config: +# revision: +# type: string +# default: develop + content: + git: + uri: https://github.com/kubeshop/testkube.git + revision: 'dawid/feat/accompanying-services' + services: + db: + image: mongo + env: + - name: MONGO_INITDB_ROOT_USERNAME + value: root + - name: MONGO_INITDB_ROOT_PASSWORD + value: p4ssw0rd + readinessProbe: + tcpSocket: + port: 27017 + periodSeconds: 1 + nats: + image: bitnami/nats + readinessProbe: + tcpSocket: + port: 4222 + periodSeconds: 1 + minio: + image: bitnami/minio + env: + - name: MINIO_ROOT_USER + value: minio99 + - name: MINIO_ROOT_PASSWORD + value: minio123 + readinessProbe: + tcpSocket: + port: 9000 + periodSeconds: 1 + pod: + volumes: + - name: gocache + hostPath: + path: /go-cache/{{ workflow.name }} + steps: + - name: Run integration tests + run: + image: golang:1.22.1-bookworm + workingDir: /data/repo + volumeMounts: + - name: gocache + mountPath: /go-cache + env: + - name: GOCACHE + value: /go-cache + - name: API_MONGO_DSN + value: mongodb://root:p4ssw0rd@{{services.db.0.ip}}:27017 + - name: NATS_URI + value: nats://{{services.nats.0.ip}}:4222 + - name: STORAGE_ENDPOINT + value: '{{services.minio.0.ip}}:9000' + - name: STORAGE_ACCESSKEYID + value: minio99 + - name: STORAGE_SECRETACCESSKEY + value: minio123 + shell: | + apt-get update + apt-get install -y ca-certificates libssl3 git skopeo + go install gotest.tools/gotestsum@v1.9.0 + + # TODO: Support job executor tests too (./...) + INTEGRATION=y gotestsum --format short-verbose -- -count 1 -run _Integration -cover ./pkg/... diff --git a/test/jmeter/executor-tests/crd-workflow/smoke.yaml b/test/jmeter/executor-tests/crd-workflow/smoke.yaml index 8e9bee29a06..efcfd68d557 100644 --- a/test/jmeter/executor-tests/crd-workflow/smoke.yaml +++ b/test/jmeter/executor-tests/crd-workflow/smoke.yaml @@ -64,3 +64,106 @@ spec: events: - cronjob: cron: "11 */4 * * *" +--- +apiVersion: testworkflows.testkube.io/v1 +kind: TestWorkflow +metadata: + name: distributed-jmeter-workflow-smoke + labels: + core-tests: workflows +spec: + content: + git: + uri: https://github.com/kubeshop/testkube + revision: main + paths: + - test/jmeter/executor-tests/jmeter-executor-smoke.jmx + container: + workingDir: /data/repo/test/jmeter/executor-tests + services: + slave: + use: + - name: distribute/evenly + count: 5 + timeout: 30s # initialization timeout + logs: true + image: justb4/jmeter:5.5 + command: + - jmeter-server + - -Dserver.rmi.localport=60000 + - -Dserver_port=1099 + - -Jserver.rmi.ssl.disable=true + resources: + requests: + cpu: 128m + memory: 128Mi + readinessProbe: + tcpSocket: + port: 1099 + periodSeconds: 1 + steps: + - name: Run tests + run: + image: justb4/jmeter:5.5 + command: + - jmeter + args: + - -n + - -X + - -Jserver.rmi.ssl.disable=true + - -Jclient.rmi.localport=7000 + - -R + - '{{services.slave.*.ip}}' + - -t + - jmeter-executor-smoke.jmx +--- +apiVersion: testworkflows.testkube.io/v1 +kind: TestWorkflow +metadata: + name: distributed-jmeter-workflow-smoke-shell-artifacts + labels: + core-tests: workflows +spec: + content: + git: + uri: https://github.com/kubeshop/testkube + revision: main + paths: + - test/jmeter/executor-tests/jmeter-executor-smoke.jmx + container: + workingDir: /data/repo/test/jmeter/executor-tests + + pod: + serviceAccountName: testkube-api-server + services: + slave: + use: + - name: distribute/evenly + count: 5 + timeout: 30s # initialization timeout + logs: true + image: justb4/jmeter:5.5 + command: + - jmeter-server + - -Dserver.rmi.localport=60000 + - -Dserver_port=1099 + - -Jserver.rmi.ssl.disable=true + resources: + requests: + cpu: 128m + memory: 128Mi + readinessProbe: + tcpSocket: + port: 1099 + periodSeconds: 1 + steps: + - name: Run tests + run: + image: justb4/jmeter:5.5 + shell: jmeter -n -X -Jserver.rmi.ssl.disable=true -Jclient.rmi.localport=7000 -R {{services.slave.*.ip}} -t jmeter-executor-smoke.jmx -j /data/artifacts/jmeter.log -o /data/artifacts/report -l /data/artifacts/jtl-report.jtl -e + steps: + - name: Save artifacts + workingDir: /data/artifacts + artifacts: + paths: + - '**/*'