Skip to content

Commit

Permalink
feat: [TKC-2882] stream service and parallel step logs (#6052)
Browse files Browse the repository at this point in the history
* feat: api methods for service logs

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: client for get service logs

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: change log

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: disable hints

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: routing

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: show service logs

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: check service name

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: check service name

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: log comment

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: check for testworkflow service

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: friendly error

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: add spinner

Signed-off-by: Vladislav Sukhin <[email protected]>

* feat: proto for service notifications

Signed-off-by: Vladislav Sukhin <[email protected]>

* feat: add cloud grpc method for server notifications

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: change timeeout

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: waiting for service pod

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: typo

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: waiting for service pod

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: add service name check

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: adjust help

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: add method to parallel step

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: use retry library

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: rename const

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: 0 attempts

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: use option

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: remove ctx

Signed-off-by: Vladislav Sukhin <[email protected]>

* feat: add cli support for parallel steps

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: rename url

Signed-off-by: Vladislav Sukhin <[email protected]>

* feat: api methods for parallel steps

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: tune parallel step detection

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: typo

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: cli

Signed-off-by: Vladislav Sukhin <[email protected]>

* feat: add proto for parallel step logs

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: lint

Signed-off-by: Vladislav Sukhin <[email protected]>

* add: grpc method for parallel steps

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: comments

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: comments

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: typo

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: change proto

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: rename fields

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: use const

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: check for empty result

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: move methods to agent package

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: send org and env ids

Signed-off-by: Vladislav Sukhin <[email protected]>

* fix: add org and env ids

Signed-off-by: Vladislav Sukhin <[email protected]>

---------

Signed-off-by: Vladislav Sukhin <[email protected]>
  • Loading branch information
vsukhin authored Dec 9, 2024
1 parent b60b697 commit 8f3f8de
Show file tree
Hide file tree
Showing 24 changed files with 1,880 additions and 255 deletions.
26 changes: 14 additions & 12 deletions cmd/api-server/commons/commons.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,18 +282,20 @@ func ReadDefaultExecutors(cfg *config.Config) (executors []testkube.ExecutorDeta

func ReadProContext(ctx context.Context, cfg *config.Config, grpcClient cloud.TestKubeCloudAPIClient) config.ProContext {
proContext := config.ProContext{
APIKey: cfg.TestkubeProAPIKey,
URL: cfg.TestkubeProURL,
TLSInsecure: cfg.TestkubeProTLSInsecure,
WorkerCount: cfg.TestkubeProWorkerCount,
LogStreamWorkerCount: cfg.TestkubeProLogStreamWorkerCount,
WorkflowNotificationsWorkerCount: cfg.TestkubeProWorkflowNotificationsWorkerCount,
SkipVerify: cfg.TestkubeProSkipVerify,
EnvID: cfg.TestkubeProEnvID,
OrgID: cfg.TestkubeProOrgID,
Migrate: cfg.TestkubeProMigrate,
ConnectionTimeout: cfg.TestkubeProConnectionTimeout,
DashboardURI: cfg.TestkubeDashboardURI,
APIKey: cfg.TestkubeProAPIKey,
URL: cfg.TestkubeProURL,
TLSInsecure: cfg.TestkubeProTLSInsecure,
WorkerCount: cfg.TestkubeProWorkerCount,
LogStreamWorkerCount: cfg.TestkubeProLogStreamWorkerCount,
WorkflowNotificationsWorkerCount: cfg.TestkubeProWorkflowNotificationsWorkerCount,
WorkflowServiceNotificationsWorkerCount: cfg.TestkubeProWorkflowServiceNotificationsWorkerCount,
WorkflowParallelStepNotificationsWorkerCount: cfg.TestkubeProWorkflowParallelStepNotificationsWorkerCount,
SkipVerify: cfg.TestkubeProSkipVerify,
EnvID: cfg.TestkubeProEnvID,
OrgID: cfg.TestkubeProOrgID,
Migrate: cfg.TestkubeProMigrate,
ConnectionTimeout: cfg.TestkubeProConnectionTimeout,
DashboardURI: cfg.TestkubeDashboardURI,
}

if cfg.TestkubeProAPIKey == "" || grpcClient == nil {
Expand Down
29 changes: 5 additions & 24 deletions cmd/api-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"strings"
Expand All @@ -22,12 +21,10 @@ import (
"github.com/kubeshop/testkube/pkg/event/kind/k8sevent"
"github.com/kubeshop/testkube/pkg/event/kind/webhook"
ws "github.com/kubeshop/testkube/pkg/event/kind/websocket"
"github.com/kubeshop/testkube/pkg/executor/output"
"github.com/kubeshop/testkube/pkg/secretmanager"
"github.com/kubeshop/testkube/pkg/server"
"github.com/kubeshop/testkube/pkg/tcl/checktcl"
"github.com/kubeshop/testkube/pkg/tcl/schedulertcl"
"github.com/kubeshop/testkube/pkg/testworkflows/executionworker/executionworkertypes"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/presets"

"github.com/kubeshop/testkube/internal/common"
Expand Down Expand Up @@ -308,26 +305,8 @@ func main() {
api.Init(httpServer)

log.DefaultLogger.Info("starting agent service")
getTestWorkflowNotificationsStream := func(ctx context.Context, executionID string) (<-chan testkube.TestWorkflowExecutionNotification, error) {
execution, err := testWorkflowResultsRepository.Get(ctx, executionID)
if err != nil {
return nil, err
}
notifications := executionWorker.Notifications(ctx, execution.Id, executionworkertypes.NotificationsOptions{
Hints: executionworkertypes.Hints{
Namespace: execution.Namespace,
Signature: execution.Signature,
ScheduledAt: common.Ptr(execution.ScheduledAt),
},
})
if notifications.Err() != nil {
return nil, notifications.Err()
}
return notifications.Channel(), nil
}
getDeprecatedLogStream := func(ctx context.Context, executionID string) (chan output.Output, error) {
return nil, errors.New("deprecated features have been disabled")
}

getDeprecatedLogStream := agent.GetDeprecatedLogStream
if deprecatedSystem != nil && deprecatedSystem.StreamLogs != nil {
getDeprecatedLogStream = deprecatedSystem.StreamLogs
}
Expand All @@ -336,7 +315,9 @@ func main() {
httpServer.Mux.Handler(),
grpcClient,
getDeprecatedLogStream,
getTestWorkflowNotificationsStream,
agent.GetTestWorkflowNotificationsStream(testWorkflowResultsRepository, executionWorker),
agent.GetTestWorkflowServiceNotificationsStream(testWorkflowResultsRepository, executionWorker),
agent.GetTestWorkflowParallelStepNotificationsStream(testWorkflowResultsRepository, executionWorker),
clusterId,
cfg.TestkubeClusterName,
features,
Expand Down
2 changes: 1 addition & 1 deletion cmd/kubectl-testkube/commands/testworkflows/executions.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewGetTestWorkflowExecutionsCmd() *cobra.Command {
ui.Info("Getting logs for test workflow execution", executionID)

logs, err := client.GetTestWorkflowExecutionLogs(executionID)
ui.ExitOnError("getting logs from executor", err)
ui.ExitOnError("getting logs from test workflow", err)

sigs := flattenSignatures(execution.Signature)

Expand Down
156 changes: 145 additions & 11 deletions cmd/kubectl-testkube/commands/testworkflows/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
const (
LogTimestampLength = 30 // time.RFC3339Nano without 00:00 timezone
apiErrorMessage = "processing error:"
logsCheckDelay = 100 * time.Millisecond
)

var (
Expand All @@ -47,6 +48,10 @@ func NewRunTestWorkflowCmd() *cobra.Command {
masks []string
tags map[string]string
selectors []string
serviceName string
parallelStepName string
serviceIndex int
parallelStepIndex int
)

cmd := &cobra.Command{
Expand Down Expand Up @@ -146,7 +151,15 @@ func NewRunTestWorkflowCmd() *cobra.Command {
ui.NL()
if !execution.FailedToInitialize() {
if watchEnabled && len(args) > 0 {
exitCode = uiWatch(execution, client)
var pServiceName, pParallelStepName *string
if cmd.Flag("service-name").Changed || cmd.Flag("service-index").Changed {
pServiceName = &serviceName
}
if cmd.Flag("parallel-step-name").Changed || cmd.Flag("parallel-step-index").Changed {
pParallelStepName = &parallelStepName
}

exitCode = uiWatch(execution, pServiceName, serviceIndex, pParallelStepName, parallelStepIndex, client)
ui.NL()
if downloadArtifactsEnabled {
tests.DownloadTestWorkflowArtifacts(execution.Id, downloadDir, format, masks, client, outputPretty)
Expand Down Expand Up @@ -181,12 +194,46 @@ func NewRunTestWorkflowCmd() *cobra.Command {
cmd.Flags().StringArrayVarP(&masks, "mask", "", []string{}, "regexp to filter downloaded files, single or comma separated, like report/.* or .*\\.json,.*\\.js$")
cmd.Flags().StringToStringVarP(&tags, "tag", "", map[string]string{}, "execution tags in a form of name1=val1 passed to executor")
cmd.Flags().StringSliceVarP(&selectors, "label", "l", nil, "label key value pair: --label key1=value1 or label expression")
cmd.Flags().StringVar(&serviceName, "service-name", "", "test workflow service name")
cmd.Flags().IntVar(&serviceIndex, "service-index", 0, "test workflow service index starting from 0")
cmd.Flags().StringVar(&parallelStepName, "parallel-step-name", "", "test workflow parallel step name or reference")
cmd.Flags().IntVar(&parallelStepIndex, "parallel-step-index", 0, "test workflow parallel step index starting from 0")

return cmd
}

func uiWatch(execution testkube.TestWorkflowExecution, client apiclientv1.Client) int {
result, err := watchTestWorkflowLogs(execution.Id, execution.Signature, client)
func uiWatch(execution testkube.TestWorkflowExecution, serviceName *string, serviceIndex int,
parallelStepName *string, parallelStepIndex int, client apiclientv1.Client) int {
var result *testkube.TestWorkflowResult
var err error

switch {
case serviceName != nil:
found := false
if execution.Workflow != nil {
found = execution.Workflow.HasService(*serviceName)
}

if !found {
ui.Failf("unknown service '%s' for test workflow execution %s", *serviceName, execution.Id)
}

result, err = watchTestWorkflowServiceLogs(execution.Id, *serviceName, serviceIndex, execution.Signature, client)
case parallelStepName != nil:
ref := execution.GetParallelStepReference(*parallelStepName)
if ref == "" {
ui.Failf("unknown parallel step '%s' for test workflow execution %s", *parallelStepName, execution.Id)
}

result, err = watchTestWorkflowParallelStepLogs(execution.Id, ref, parallelStepIndex, execution.Signature, client)
default:
result, err = watchTestWorkflowLogs(execution.Id, execution.Signature, client)
}

if result == nil && err == nil {
err = errors.New("no result found")
}

ui.ExitOnError("reading test workflow execution logs", err)

// Apply the result in the execution
Expand Down Expand Up @@ -283,15 +330,10 @@ func getTimestampLength(line string) int {
return 0
}

func watchTestWorkflowLogs(id string, signature []testkube.TestWorkflowSignature, client apiclientv1.Client) (*testkube.TestWorkflowResult, error) {
ui.Info("Getting logs from test workflow job", id)

notifications, err := client.GetTestWorkflowExecutionNotifications(id)
ui.ExitOnError("getting logs from executor", err)

func printTestWorkflowLogs(signature []testkube.TestWorkflowSignature,
notifications chan testkube.TestWorkflowExecutionNotification) (result *testkube.TestWorkflowResult) {
steps := flattenSignatures(signature)

var result *testkube.TestWorkflowResult
var isLineBeginning = true
for l := range notifications {
if l.Output != nil {
Expand All @@ -309,8 +351,100 @@ func watchTestWorkflowLogs(id string, signature []testkube.TestWorkflowSignature
}

ui.NL()
return result
}

func watchTestWorkflowLogs(id string, signature []testkube.TestWorkflowSignature, client apiclientv1.Client) (*testkube.TestWorkflowResult, error) {
ui.Info("Getting logs from test workflow job", id)

notifications, err := client.GetTestWorkflowExecutionNotifications(id)
if err != nil {
return nil, err
}

return printTestWorkflowLogs(signature, notifications), nil
}

func watchTestWorkflowServiceLogs(id, serviceName string, serviceIndex int,
signature []testkube.TestWorkflowSignature, client apiclientv1.Client) (*testkube.TestWorkflowResult, error) {
ui.Info("Getting logs from test workflow service job", fmt.Sprintf("%s-%s-%d", id, serviceName, serviceIndex))

var (
notifications chan testkube.TestWorkflowExecutionNotification
nErr error
)

spinner := ui.NewSpinner("Waiting for service logs")
for {
notifications, nErr = client.GetTestWorkflowExecutionServiceNotifications(id, serviceName, serviceIndex)
if nErr != nil {
execution, cErr := client.GetTestWorkflowExecution(id)
if cErr != nil {
spinner.Fail()
return nil, cErr
}

if execution.Result != nil {
if execution.Result.IsFinished() {
nErr = errors.New("test workflow execution is finished")
} else {
time.Sleep(logsCheckDelay)
continue
}
}
}

if nErr != nil {
spinner.Fail()
return nil, nErr
}

break
}

spinner.Success()
return printTestWorkflowLogs(signature, notifications), nil
}

func watchTestWorkflowParallelStepLogs(id, ref string, workerIndex int,
signature []testkube.TestWorkflowSignature, client apiclientv1.Client) (*testkube.TestWorkflowResult, error) {
ui.Info("Getting logs from test workflow parallel step job", fmt.Sprintf("%s-%s-%d", id, ref, workerIndex))

var (
notifications chan testkube.TestWorkflowExecutionNotification
nErr error
)

spinner := ui.NewSpinner("Waiting for parallel step logs")
for {
notifications, nErr = client.GetTestWorkflowExecutionParallelStepNotifications(id, ref, workerIndex)
if nErr != nil {
execution, cErr := client.GetTestWorkflowExecution(id)
if cErr != nil {
spinner.Fail()
return nil, cErr
}

if execution.Result != nil {
if execution.Result.IsFinished() {
nErr = errors.New("test workflow execution is finished")
} else {
time.Sleep(logsCheckDelay)
continue
}
}
}

if nErr != nil {
spinner.Fail()
return nil, nErr
}

break
}

return result, err
spinner.Success()
return printTestWorkflowLogs(signature, notifications), nil
}

func printStatusHeader(i, n int, name string) {
Expand Down
22 changes: 21 additions & 1 deletion cmd/kubectl-testkube/commands/testworkflows/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ import (
)

func NewWatchTestWorkflowExecutionCmd() *cobra.Command {
var (
serviceName string
parallelStepName string
serviceIndex int
parallelStepIndex int
)

cmd := &cobra.Command{
Use: "testworkflowexecution <executionName>",
Aliases: []string{"testworkflowexecutions", "twe", "tw"},
Expand All @@ -31,7 +38,15 @@ func NewWatchTestWorkflowExecutionCmd() *cobra.Command {
ui.ExitOnError("render test workflow execution", err)

ui.NL()
exitCode := uiWatch(execution, client)
var pServiceName, pParallelStepName *string
if cmd.Flag("service-name").Changed || cmd.Flag("service-index").Changed {
pServiceName = &serviceName
}
if cmd.Flag("parallel-step-name").Changed || cmd.Flag("parallel-step-index").Changed {
pParallelStepName = &parallelStepName
}

exitCode := uiWatch(execution, pServiceName, serviceIndex, pParallelStepName, parallelStepIndex, client)
ui.NL()

execution, err = client.GetTestWorkflowExecution(execution.Id)
Expand All @@ -43,5 +58,10 @@ func NewWatchTestWorkflowExecutionCmd() *cobra.Command {
},
}

cmd.Flags().StringVar(&serviceName, "service-name", "", "test workflow service name")
cmd.Flags().IntVar(&serviceIndex, "service-index", 0, "test workflow service index starting from 0")
cmd.Flags().StringVar(&parallelStepName, "parallel-step-name", "", "test workflow parallel step name or reference")
cmd.Flags().IntVar(&parallelStepIndex, "parallel-step-index", 0, "test workflow parallel step index starting from 0")

return cmd
}
4 changes: 4 additions & 0 deletions internal/app/api/v1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,11 @@ func (s *TestkubeAPI) Init(server server.HTTPServer) {
testWorkflowExecutions.Post("/", s.ExecuteTestWorkflowHandler())
testWorkflowExecutions.Get("/:executionID", s.GetTestWorkflowExecutionHandler())
testWorkflowExecutions.Get("/:executionID/notifications", s.StreamTestWorkflowExecutionNotificationsHandler())
testWorkflowExecutions.Get("/:executionID/notifications/services/:serviceName/:serviceIndex<int>", s.StreamTestWorkflowExecutionServiceNotificationsHandler())
testWorkflowExecutions.Get("/:executionID/notifications/parallel-steps/:ref/:workerIndex<int>", s.StreamTestWorkflowExecutionParallelStepNotificationsHandler())
testWorkflowExecutions.Get("/:executionID/notifications/stream", s.StreamTestWorkflowExecutionNotificationsWebSocketHandler())
testWorkflowExecutions.Get("/:executionID/notifications/stream/services/:serviceName/:serviceIndex<int>", s.StreamTestWorkflowExecutionServiceNotificationsWebSocketHandler())
testWorkflowExecutions.Get("/:executionID/notifications/stream/parallel-steps/:ref/:workerIndex<int>", s.StreamTestWorkflowExecutionParallelStepNotificationsWebSocketHandler())
testWorkflowExecutions.Post("/:executionID/abort", s.AbortTestWorkflowExecutionHandler())
testWorkflowExecutions.Post("/:executionID/pause", s.PauseTestWorkflowExecutionHandler())
testWorkflowExecutions.Post("/:executionID/resume", s.ResumeTestWorkflowExecutionHandler())
Expand Down
Loading

0 comments on commit 8f3f8de

Please sign in to comment.