diff --git a/cmd/testworkflow-init/main.go b/cmd/testworkflow-init/main.go index 7ed32ecb50..f89f593114 100644 --- a/cmd/testworkflow-init/main.go +++ b/cmd/testworkflow-init/main.go @@ -3,10 +3,12 @@ package main import ( "encoding/json" "errors" + "fmt" "os" "os/signal" "slices" "strconv" + "sync/atomic" "syscall" "time" @@ -310,6 +312,7 @@ func main() { } // Configure timeout finalizer + var hasTimeout, hasOwnTimeout atomic.Bool finalizeTimeout := func() { // Check timed out steps in leaf timedOut := orchestration.GetTimedOut(leaf...) @@ -321,6 +324,10 @@ func main() { for _, r := range timedOut { r.SetStatus(constants.StepStatusTimeout) sub := state.GetSubSteps(r.Ref) + hasTimeout.Store(true) + if step.Ref == r.Ref { + hasOwnTimeout.Store(true) + } for i := range sub { if sub[i].IsFinished() { continue @@ -331,7 +338,6 @@ func main() { sub[i].SetStatus(constants.StepStatusSkipped) } } - stdoutUnsafe.Println("Timed out.") } _ = orchestration.Executions.Kill() @@ -358,6 +364,8 @@ func main() { } // Register timeouts + hasTimeout.Store(false) + hasOwnTimeout.Store(false) stopTimeoutWatcher := orchestration.WatchTimeout(finalizeTimeout, leaf...) // Run the command @@ -366,12 +374,17 @@ func main() { // Stop timer listener stopTimeoutWatcher() + // Handle timeout gracefully + if hasOwnTimeout.Load() { + orchestration.Executions.ClearAbortedStatus() + } + // Ensure there won't be any hanging processes after the command is executed _ = orchestration.Executions.Kill() // TODO: Handle retry policy in tree independently // Verify if there may be any other iteration - if step.Iteration >= step.Retry.Count { + if step.Iteration >= step.Retry.Count || (!hasOwnTimeout.Load() && hasTimeout.Load()) { break } @@ -393,7 +406,15 @@ func main() { // Continue with the next iteration step.Iteration++ stdout.HintDetails(step.Ref, constants.InstructionIteration, step.Iteration) - stdoutUnsafe.Printf("\nExit code: %d • Retrying: attempt #%d (of %d):\n", step.ExitCode, step.Iteration, step.Retry.Count) + message := fmt.Sprintf("Exit code: %d", step.ExitCode) + if hasOwnTimeout.Load() { + message = "Timed out" + } + stdoutUnsafe.Printf("\n%s • Retrying: attempt #%d (of %d):\n", message, step.Iteration, step.Retry.Count) + + // Restart start time for the next iteration to allow retries + now := time.Now() + step.StartedAt = &now } } diff --git a/cmd/testworkflow-init/orchestration/executions.go b/cmd/testworkflow-init/orchestration/executions.go index 5bf0eadd57..f1911bfcf4 100644 --- a/cmd/testworkflow-init/orchestration/executions.go +++ b/cmd/testworkflow-init/orchestration/executions.go @@ -34,6 +34,8 @@ type executionGroup struct { paused atomic.Bool pauseMu sync.Mutex + + softKillProgress atomic.Bool } func newExecutionGroup(outStream io.Writer, errStream io.Writer) *executionGroup { @@ -142,6 +144,10 @@ func (e *executionGroup) IsAborted() bool { return e.aborted.Load() } +func (e *executionGroup) ClearAbortedStatus() { + e.aborted.Store(false) +} + type execution struct { cmd *exec.Cmd cmdMu sync.Mutex @@ -203,7 +209,7 @@ func (e *execution) Run() (*executionResult, error) { // Mark the execution group as aborted when this process was aborted. // In Kubernetes, when that child process is killed, it may mean OOM Kill. - if aborted && !e.group.aborted.Load() { + if aborted && !e.group.aborted.Load() && !e.group.softKillProgress.Load() { e.group.Abort() }