Skip to content

Commit

Permalink
chore: review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
rangoo94 committed Dec 19, 2024
1 parent 8f71eea commit b39ab7e
Showing 3 changed files with 26 additions and 11 deletions.
4 changes: 3 additions & 1 deletion cmd/tcl/testworkflow-toolkit/commands/execute.go
Original file line number Diff line number Diff line change
@@ -44,6 +44,8 @@ const (

GetExecutionRetryOnFailureMaxAttempts = 30
GetExecutionRetryOnFailureDelay = 500 * time.Millisecond

ExecutionResultPollingTime = 200 * time.Millisecond
)

type testExecutionDetails struct {
@@ -218,7 +220,7 @@ func buildWorkflowExecution(workflow testworkflowsv1.StepExecuteWorkflow, async
loop:
for {
// TODO: Consider real-time Notifications without logs instead
time.Sleep(200 * time.Millisecond)
time.Sleep(ExecutionResultPollingTime)
for i := 0; i < GetExecutionRetryOnFailureMaxAttempts; i++ {
var resp []byte
resp, err = c.Execute(context.Background(), testworkflow.CmdTestWorkflowExecutionGet, testworkflow.ExecutionGetRequest{ID: exec.Id})
9 changes: 7 additions & 2 deletions pkg/runner/executionsaver.go
Original file line number Diff line number Diff line change
@@ -13,6 +13,11 @@ import (
"github.com/kubeshop/testkube/pkg/testworkflows/executionworker/controller/store"
)

const (
ExecutionSaverUpdateRetryCount = 10
ExecutionSaverUpdateRetryDelay = 300 * time.Millisecond
)

//go:generate mockgen -destination=./mock_executionsaver.go -package=runner "github.com/kubeshop/testkube/pkg/runner" ExecutionSaver
type ExecutionSaver interface {
UpdateResult(result testkube.TestWorkflowResult)
@@ -67,7 +72,7 @@ func (s *executionSaver) watchResultUpdates() {
if !ok {
return
}
for i := 0; i < 10; i++ {
for i := 0; i < ExecutionSaverUpdateRetryCount; i++ {
s.resultMu.Lock()
next := s.result
s.resultMu.Unlock()
@@ -81,7 +86,7 @@ func (s *executionSaver) watchResultUpdates() {
select {
case <-s.ctx.Done():
return
case <-time.After(300 * time.Millisecond):
case <-time.After(ExecutionSaverUpdateRetryDelay):
}
}
}
24 changes: 16 additions & 8 deletions pkg/runner/runner.go
Original file line number Diff line number Diff line change
@@ -17,6 +17,14 @@ import (
"github.com/kubeshop/testkube/pkg/testworkflows/executionworker/registry"
)

const (
GetNotificationsRetryCount = 10
GetNotificationsRetryDelay = 500 * time.Millisecond

SaveEndResultRetryCount = 100
SaveEndResultRetryBaseDelay = 500 * time.Millisecond
)

//go:generate mockgen -destination=./mock_runner.go -package=runner "github.com/kubeshop/testkube/pkg/runner" Runner
type Runner interface {
Monitor(ctx context.Context, id string) error
@@ -66,7 +74,7 @@ func (r *runner) monitor(ctx context.Context, execution testkube.TestWorkflowExe
defer r.watching.Delete(execution.Id)

var notifications executionworkertypes.NotificationsWatcher
for i := 0; i < 10; i++ {
for i := 0; i < GetNotificationsRetryCount; i++ {
notifications = r.worker.Notifications(ctx, execution.Id, executionworkertypes.NotificationsOptions{})
if notifications.Err() == nil {
break
@@ -75,7 +83,7 @@ func (r *runner) monitor(ctx context.Context, execution testkube.TestWorkflowExe
// TODO: should it mark as job was aborted then?
return registry.ErrResourceNotFound
}
time.Sleep(500 * time.Millisecond)
time.Sleep(GetNotificationsRetryDelay)
}
if notifications.Err() != nil {
return errors.Wrapf(notifications.Err(), "failed to listen for '%s' execution notifications", execution.Id)
@@ -112,14 +120,14 @@ func (r *runner) monitor(ctx context.Context, execution testkube.TestWorkflowExe
currentRef = n.Ref
err = logs.WriteStart(n.Ref)
if err != nil {
// FIXME: what to do then?
panic("logs write ref error")
log.DefaultLogger.Errorw("failed to write start logs", "id", execution.Id, "ref", n.Ref)
continue
}
}
_, err = logs.Write([]byte(n.Log))
if err != nil {
// FIXME: what to do then?
panic("logs write error")
log.DefaultLogger.Errorw("failed to write logs", "id", execution.Id, "ref", n.Ref)
continue
}
}
}
@@ -155,13 +163,13 @@ func (r *runner) monitor(ctx context.Context, execution testkube.TestWorkflowExe
}
}

for i := 0; i < 100; i++ {
for i := 0; i < SaveEndResultRetryCount; i++ {
err = saver.End(ctx, *lastResult)
if err == nil {
break
}
log.DefaultLogger.Warnw("failed to save execution data", "id", execution.Id, "error", err)
time.Sleep(time.Duration(i/10) * 500 * time.Millisecond)
time.Sleep(time.Duration(i/10) * SaveEndResultRetryBaseDelay)
}

// Handle fatal error

0 comments on commit b39ab7e

Please sign in to comment.