From c593dff795f36e1a1f9275340d6c26183f7d0fcf Mon Sep 17 00:00:00 2001 From: Christopher Hlubek Date: Mon, 4 Apr 2022 19:46:40 +0200 Subject: [PATCH] FEATURE: Graceful shutdown and process groups for child processes Implements shutdown via SIGINT (graceful) and SIGTERM (cancelling jobs). Also fix an issue with child processes from scripts by setting setpgid to start processes (which is needed to prevent signals being forwarded). Closes #14 --- README.md | 73 ++------------ app/app.go | 45 ++++++++- examples/pipelines.yml | 34 +------ go.mod | 4 +- prunner.go | 91 ++++++++++++++++- prunner_test.go | 107 +++++++++++++++++++- taskctl/executor.go | 204 +++++++++++++++++++++++++++++++++++++++ taskctl/executor_test.go | 56 +++++++++++ taskctl/runner.go | 43 ++++++--- taskctl/scheduler.go | 9 +- 10 files changed, 542 insertions(+), 124 deletions(-) create mode 100644 taskctl/executor.go create mode 100644 taskctl/executor_test.go diff --git a/README.md b/README.md index c89c408..89345e7 100644 --- a/README.md +++ b/README.md @@ -264,85 +264,28 @@ You can also combine the two options. Then, deletion occurs with whatever comes If a pipeline does not exist at all anymore (i.e. if you renamed `do_something` to `another_name` above), its persisted logs and task data is removed automatically on saving to disk. +### Handling of child processes -### Interruptible Jobs +Prunner starts child processes with `setpgid` to use a new process group for each task of a pipeline job. +This means that if a job is cancelled, all child processes are killed - even if they were run by a shell script. -When terminating a task, unfortunately, only the top-level executed script is terminated - and the script needs -to take care to kill its children appropriately and not hang. This is very likely because of the following invocation chain: +Note: If prunner is killed hard (e.g. SIGKILL) without SIGINT / SIGTERM, the child processes of running jobs will not be terminated. -- prunner -- taskctl -- mvdan/sh [DefaultExecHandler](https://github.com/mvdan/sh/blob/master/interp/handler.go#L100-L104) +### Graceful shutdown --> This kills only the process itself, **but not its children**. Unfortunately for us, Bash [does not forward signals like -SIGTERM to processes it is currently waiting on](https://unix.stackexchange.com/questions/146756/forward-sigterm-to-child-in-bash). - -Currently, killing children recursively is not implemented, but can be done manually in a bash script [as explained here](https://newbedev.com/forward-sigterm-to-child-in-bash): - -Instead of doing: - -``` -#!/usr/bin/env bash - -/usr/local/bin/my_long_running_process -``` - -You can do the following to pass along signals: - -```bash -#!/usr/bin/env bash -# taken from https://newbedev.com/forward-sigterm-to-child-in-bash - -prep_term() -{ - unset term_child_pid - unset term_kill_needed - trap 'handle_term' TERM INT -} - -handle_term() -{ - if [ "${term_child_pid}" ]; then - kill -TERM "${term_child_pid}" 2>/dev/null - else - term_kill_needed="yes" - fi -} - -wait_term() -{ - term_child_pid=$! - if [ "${term_kill_needed}" ]; then - kill -TERM "${term_child_pid}" 2>/dev/null - fi - wait ${term_child_pid} 2>/dev/null - trap - TERM INT - wait ${term_child_pid} 2>/dev/null -} - - -prep_term -/usr/local/bin/my_long_running_process & -wait_term -``` - -**This is a workaround; and at some point in the future it would be nice to solve it as [explained here](https://medium.com/@felixge/killing-a-child-process-and-all-of-its-children-in-go-54079af94773)**. +Prunner will handle a SIGINT signal and perform a graceful shutdown and wait for all running jobs to be completed. +Sending a SIGTERM signal to prunner will cancel all running jobs (and interrupt / kill child processes). ### Persistent Job State - - - - ## Development ### Requirements -* Go (>= 1.16) +* Go (>= 1.18) ### Running locally - ```bash go run cmd/prunner/main.go ``` diff --git a/app/app.go b/app/app.go index ac724ee..a971c89 100644 --- a/app/app.go +++ b/app/app.go @@ -5,8 +5,10 @@ import ( "io" "net/http" "os" + "os/signal" "path" "path/filepath" + "syscall" "github.com/apex/log" logfmt_handler "github.com/apex/log/handlers/logfmt" @@ -174,7 +176,7 @@ func appAction(c *cli.Context) error { return errors.Wrap(err, "building output store") } - store, err := store.NewJSONDataStore(path.Join(c.String("data"))) + dataStore, err := store.NewJSONDataStore(path.Join(c.String("data"))) if err != nil { return errors.Wrap(err, "building pipeline runner store") } @@ -189,7 +191,7 @@ func appAction(c *cli.Context) error { taskRunner.Stderr = io.Discard return taskRunner - }, store, outputStore) + }, dataStore, outputStore) if err != nil { return err } @@ -202,10 +204,43 @@ func appAction(c *cli.Context) error { ) // Set up a simple REST API for listing jobs and scheduling pipelines - + address := c.String("address") log. - Infof("HTTP API Listening on %s", c.String("address")) - return http.ListenAndServe(c.String("address"), srv) + Infof("HTTP API listening on %s", address) + + // Start http server and handle graceful shutdown + httpSrv := http.Server{ + Addr: address, + Handler: srv, + } + go func() { + if err := httpSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("Error starting HTTP API: %s", err) + } + }() + + // How signals are handled: + // - SIGINT: Shutdown gracefully and wait for jobs to be finished completely + // - SIGTERM: Cancel running jobs + + ctx, cancel := signal.NotifyContext(c.Context, syscall.SIGINT, syscall.SIGTERM) + defer cancel() + // Cancel the shutdown context on SIGTERM to prevent waiting for jobs and client connections + shutdownCtx, shutdownCancel := signal.NotifyContext(c.Context, syscall.SIGTERM) + defer shutdownCancel() + + // Wait for SIGINT or SIGTERM + <-ctx.Done() + + log.Info("Received signal, waiting until jobs are finished...") + _ = pRunner.Shutdown(shutdownCtx) + + log.Debugf("Shutting down HTTP API...") + _ = httpSrv.Shutdown(shutdownCtx) + + log.Info("Shutdown complete") + + return nil } func createLogFormatter(c *cli.Context) middleware.LogFormatter { diff --git a/examples/pipelines.yml b/examples/pipelines.yml index 8f1391d..7dfb0a6 100644 --- a/examples/pipelines.yml +++ b/examples/pipelines.yml @@ -3,7 +3,7 @@ pipelines: tasks: lint: script: - - staticcheck $(go list ./... | grep -v /vendor/) + # - staticcheck $(go list ./... | grep -v /vendor/) - go vet $(go list ./... | grep -v /vendor/) test: script: @@ -11,37 +11,9 @@ pipelines: do_something_long: script: - echo Tick - - sleep 1 - - echo Tock - - sleep 1 - - echo Tick - - sleep 1 - - echo Tock - - sleep 1 - - echo Tick - - sleep 1 - - echo Tock - - sleep 1 - - echo Tick - - sleep 1 - - echo Tock - - sleep 1 - - echo Tick - - sleep 1 - - echo Tock - - sleep 1 - - echo Tick - - sleep 1 + - sleep 20 - echo Tock - - sleep 1 - - echo Tick - - sleep 1 - - echo Tock - - sleep 1 - - echo Tick - - sleep 1 - - echo Tock - - sleep 1 + - sleep 20 - echo Dong build: script: diff --git a/go.mod b/go.mod index dc6d61a..b8caca5 100644 --- a/go.mod +++ b/go.mod @@ -13,11 +13,11 @@ require ( github.com/liamylian/jsontime/v2 v2.0.0 github.com/mattn/go-isatty v0.0.14 github.com/mattn/go-zglob v0.0.3 - github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.7.1 github.com/taskctl/taskctl v1.3.1-0.20210426182424-d8747985c906 github.com/urfave/cli/v2 v2.4.0 gopkg.in/yaml.v2 v2.4.0 + mvdan.cc/sh/v3 v3.4.3 ) require ( @@ -42,6 +42,7 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/sirupsen/logrus v1.8.1 // indirect golang.org/x/crypto v0.0.0-20220331220935-ae2d96664a29 // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20220403205710-6acee93ad0eb // indirect @@ -49,5 +50,4 @@ require ( golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect - mvdan.cc/sh/v3 v3.4.3 // indirect ) diff --git a/prunner.go b/prunner.go index 7d1ff8b..6b42f54 100644 --- a/prunner.go +++ b/prunner.go @@ -47,6 +47,11 @@ type PipelineRunner struct { // Wait group for waiting for asynchronous operations like job.Cancel wg sync.WaitGroup + // Flag if the runner is shutting down + isShuttingDown bool + + // Poll interval for completed jobs for graceful shutdown + ShutdownPollInterval time.Duration } // NewPipelineRunner creates the central data structure which controls the full runner state; so this knows what is currently running @@ -61,9 +66,10 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat waitListByPipeline: make(map[string][]*PipelineJob), store: store, outputStore: outputStore, - // Use channel buffered with one extra slot so we can keep save requests while a save is running without blocking - persistRequests: make(chan struct{}, 1), - createTaskRunner: createTaskRunner, + // Use channel buffered with one extra slot, so we can keep save requests while a save is running without blocking + persistRequests: make(chan struct{}, 1), + createTaskRunner: createTaskRunner, + ShutdownPollInterval: 3 * time.Second, } if store != nil { @@ -76,6 +82,7 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat for { select { case <-ctx.Done(): + log.Debugf("PipelineRunner: context done, stopping persist loop") return case <-pRunner.persistRequests: pRunner.SaveToStore() @@ -175,11 +182,16 @@ var errQueueFull = errors.New("concurrency exceeded and queue limit reached for var ErrJobNotFound = errors.New("job not found") var errJobAlreadyCompleted = errors.New("job is already completed") var errJobNotStarted = errors.New("job is not started") +var ErrShuttingDown = errors.New("runner is shutting down") func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) (*PipelineJob, error) { r.mx.Lock() defer r.mx.Unlock() + if r.isShuttingDown { + return nil, ErrShuttingDown + } + pipelineDef, ok := r.defs.Pipelines[pipeline] if !ok { return nil, errors.Errorf("pipeline %q is not defined", pipeline) @@ -367,7 +379,9 @@ func (r *PipelineRunner) startJob(job *PipelineJob) { job.Start = &now // Run graph asynchronously + r.wg.Add(1) go func() { + defer r.wg.Done() lastErr := job.sched.Schedule(graph) r.JobCompleted(job.ID, lastErr) }() @@ -685,6 +699,9 @@ func (r *PipelineRunner) initialLoadFromStore() error { } func (r *PipelineRunner) SaveToStore() { + r.wg.Add(1) + defer r.wg.Done() + log. WithField("component", "runner"). Debugf("Saving job state to data store") @@ -774,6 +791,74 @@ func (r *PipelineRunner) SaveToStore() { } } +func (r *PipelineRunner) Shutdown(ctx context.Context) error { + defer func() { + log. + WithField("component", "runner"). + Debugf("Shutting down, waiting for pending operations...") + r.wg.Wait() + }() + + r.mx.Lock() + r.isShuttingDown = true + // Cancel all jobs on wait list + for pipelineName, jobs := range r.waitListByPipeline { + for _, job := range jobs { + job.Canceled = true + log. + WithField("component", "runner"). + WithField("jobID", job.ID). + WithField("pipeline", pipelineName). + Warnf("Shutting down, marking job on wait list as canceled") + } + delete(r.waitListByPipeline, pipelineName) + } + r.mx.Unlock() + + for { + // Poll pipelines to check for running jobs + r.mx.RLock() + hasRunningPipelines := false + for pipelineName := range r.jobsByPipeline { + if r.isRunning(pipelineName) { + hasRunningPipelines = true + log. + WithField("component", "runner"). + WithField("pipeline", pipelineName). + Debugf("Shutting down, waiting for pipeline to finish...") + } + } + r.mx.RUnlock() + + if !hasRunningPipelines { + log. + WithField("component", "runner"). + Debugf("Shutting down, all pipelines finished") + break + } + + // Wait a few seconds before checking pipeline status again, or cancel jobs if the context is done + select { + case <-time.After(r.ShutdownPollInterval): + case <-ctx.Done(): + log. + WithField("component", "runner"). + Warnf("Forced shutdown, cancelling all jobs") + + r.mx.Lock() + + for jobID := range r.jobsByID { + _ = r.cancelJobInternal(jobID) + } + r.mx.Unlock() + + return ctx.Err() + } + } + + return nil +} + // taken from https://stackoverflow.com/a/37335777 func removeJobFromList(jobs []*PipelineJob, jobToRemove *PipelineJob) []*PipelineJob { for index, job := range jobs { diff --git a/prunner_test.go b/prunner_test.go index 56fd9f2..9f422a4 100644 --- a/prunner_test.go +++ b/prunner_test.go @@ -175,7 +175,7 @@ func TestPipelineRunner_ScheduleAsync_WithEnvVars(t *testing.T) { Concurrency: 1, QueueLimit: nil, Env: map[string]string{ - "MY_VAR": "from pipeline", + "MY_VAR": "from pipeline", "OTHER_VAR": "from pipeline", }, Tasks: map[string]definition.TaskDef{ @@ -746,7 +746,7 @@ func TestPipelineRunner_ScheduleAsync_WithStartDelayNoQueueAndReplaceWillNotRunC pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner { return &test.MockRunner{ OnRun: func(tsk *task.Task) error { - time.Sleep(100*time.Millisecond) + time.Sleep(100 * time.Millisecond) return nil }, } @@ -787,6 +787,109 @@ func TestPipelineRunner_ScheduleAsync_WithStartDelayNoQueueAndReplaceWillNotRunC } } +func TestPipelineRunner_Shutdown_WithRunningJob_Graceful(t *testing.T) { + var defs = &definition.PipelinesDef{ + Pipelines: map[string]definition.PipelineDef{ + "long_running": { + // Concurrency of 1 is the default for a single concurrent execution + Concurrency: 1, + QueueLimit: nil, + Tasks: map[string]definition.TaskDef{ + "sleep": { + Script: []string{"sleep 10"}, + }, + }, + SourcePath: "fixtures", + }, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var ( + jobFinished bool + jobCanceled bool + ) + + pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner { + return &test.MockRunner{ + OnRun: func(tsk *task.Task) error { + time.Sleep(100 * time.Millisecond) + jobFinished = true + return nil + }, + OnCancel: func() { + jobCanceled = true + }, + } + }, nil, test.NewMockOutputStore()) + pRunner.ShutdownPollInterval = 100 * time.Millisecond + require.NoError(t, err) + + job, err := pRunner.ScheduleAsync("long_running", ScheduleOpts{}) + require.NoError(t, err) + + jobID := job.ID + + waitForStartedJobTask(t, pRunner, jobID, "sleep") + + shutdownCtx := context.Background() + + err = pRunner.Shutdown(shutdownCtx) + require.NoError(t, err) + + assert.False(t, job.Canceled, "job was not marked as canceled") + assert.True(t, jobFinished, "job was finished by runner") + assert.False(t, jobCanceled, "job was not canceled by runner") + assert.True(t, job.Completed, "job was marked as completed") +} + +func TestPipelineRunner_Shutdown_WithRunningJob_Forced(t *testing.T) { + var defs = &definition.PipelinesDef{ + Pipelines: map[string]definition.PipelineDef{ + "long_running": { + // Concurrency of 1 is the default for a single concurrent execution + Concurrency: 1, + QueueLimit: nil, + Tasks: map[string]definition.TaskDef{ + "sleep": { + Script: []string{"sleep 1"}, + }, + }, + SourcePath: "fixtures", + }, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner { + // Use a real runner here to test the actual processing of a task.Task + taskRunner, _ := taskctl.NewTaskRunner(test.NewMockOutputStore()) + return taskRunner + }, nil, test.NewMockOutputStore()) + pRunner.ShutdownPollInterval = 100 * time.Millisecond + require.NoError(t, err) + + job, err := pRunner.ScheduleAsync("long_running", ScheduleOpts{}) + require.NoError(t, err) + + jobID := job.ID + + waitForStartedJobTask(t, pRunner, jobID, "sleep") + + // Force cancel of job after 100ms + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer shutdownCancel() + + err = pRunner.Shutdown(shutdownCtx) + require.ErrorIs(t, err, context.DeadlineExceeded) + + assert.True(t, job.Canceled, "job was marked as canceled") +} + func intPtr(i int) *int { return &i } diff --git a/taskctl/executor.go b/taskctl/executor.go new file mode 100644 index 0000000..1b99452 --- /dev/null +++ b/taskctl/executor.go @@ -0,0 +1,204 @@ +package taskctl + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "os/exec" + "strings" + "syscall" + "time" + + "github.com/apex/log" + "github.com/taskctl/taskctl/pkg/executor" + "mvdan.cc/sh/v3/expand" + + "mvdan.cc/sh/v3/interp" + "mvdan.cc/sh/v3/syntax" + + "github.com/taskctl/taskctl/pkg/utils" +) + +// PgidExecutor is an executor that starts commands in a new process group +// +// It is based on executor.DefaultExecutor but uses an exec handler that sets setpgid to +// start child processes in a new process group (with negative pid). It is used to prevent +// signals from propagating to children and to kill all descendant processes e.g. when forked by a shell process. +type PgidExecutor struct { + dir string + env []string + interp *interp.Runner + buf bytes.Buffer +} + +// NewPgidExecutor creates new pgid executor +func NewPgidExecutor(stdin io.Reader, stdout, stderr io.Writer, killTimeout time.Duration) (*PgidExecutor, error) { + var err error + e := &PgidExecutor{ + env: os.Environ(), + } + + e.dir, err = os.Getwd() + if err != nil { + return nil, err + } + + if stdout == nil { + stdout = io.Discard + } + + if stderr == nil { + stderr = io.Discard + } + + e.interp, err = interp.New( + interp.StdIO(stdin, io.MultiWriter(&e.buf, stdout), io.MultiWriter(&e.buf, stderr)), + interp.ExecHandler(func(ctx context.Context, args []string) error { + hc := interp.HandlerCtx(ctx) + path, err := interp.LookPathDir(hc.Dir, hc.Env, args[0]) + if err != nil { + _, _ = fmt.Fprintln(hc.Stderr, err) + return interp.NewExitStatus(127) + } + cmd := exec.Cmd{ + Path: path, + Args: args, + Env: execEnv(hc.Env), + Dir: hc.Dir, + Stdin: hc.Stdin, + Stdout: hc.Stdout, + Stderr: hc.Stderr, + SysProcAttr: &syscall.SysProcAttr{ + // Added for prunner: Using a process group for the new child process fixes 2 things: + // 1. We can handle signals like SIGINT in the prunner main process and gracefully shutdown running tasks + // 2. Bash scripts will not forward signals by default, so using process groups can be used to send signals to all children in the group + // (https://unix.stackexchange.com/questions/146756/forward-sigterm-to-child-in-bash) + Setpgid: true, + }, + } + + err = cmd.Start() + if err == nil { + if done := ctx.Done(); done != nil { + go func() { + <-done + + if killTimeout <= 0 { + _ = syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) + return + } + + // TODO: don't temporarily leak this goroutine if the program stops itself with the interrupt. (from github.com/mvdan/sh) + go func() { + time.Sleep(killTimeout) + _ = syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) + }() + _ = syscall.Kill(-cmd.Process.Pid, syscall.SIGINT) + }() + } + + err = cmd.Wait() + } + + switch x := err.(type) { + case *exec.ExitError: + // started, but errored - default to 1 if OS doesn't have exit statuses + if status, ok := x.Sys().(syscall.WaitStatus); ok { + if status.Signaled() { + if ctx.Err() != nil { + return ctx.Err() + } + return interp.NewExitStatus(uint8(128 + status.Signal())) + } + return interp.NewExitStatus(uint8(status.ExitStatus())) + } + return interp.NewExitStatus(1) + case *exec.Error: + // did not start + _, _ = fmt.Fprintf(hc.Stderr, "%v\n", err) + return interp.NewExitStatus(127) + default: + return err + } + }), + ) + if err != nil { + return nil, err + } + + return e, nil +} + +// Execute executes given job with provided context +// Returns job output +func (e *PgidExecutor) Execute(ctx context.Context, job *executor.Job) ([]byte, error) { + command, err := utils.RenderString(job.Command, job.Vars.Map()) + if err != nil { + return nil, err + } + + cmd, err := syntax.NewParser(syntax.KeepComments(true)).Parse(strings.NewReader(command), "") + if err != nil { + return nil, err + } + + env := e.env + env = append(env, utils.ConvertEnv(utils.ConvertToMapOfStrings(job.Env.Map()))...) + + if job.Dir == "" { + job.Dir = e.dir + } + + jobID := job.Vars.Get(JobIDVariableName).(string) + + log. + WithField("component", "executor"). + WithField("jobID", jobID). + Debugf("Executing \"%s\"", command) + + e.interp.Dir = job.Dir + e.interp.Env = expand.ListEnviron(env...) + + var cancelFn context.CancelFunc + if job.Timeout != nil { + ctx, cancelFn = context.WithTimeout(ctx, *job.Timeout) + } + defer func() { + if cancelFn != nil { + cancelFn() + } + }() + + offset := e.buf.Len() + err = e.interp.Run(ctx, cmd) + if err != nil { + return e.buf.Bytes()[offset:], err + } + + return e.buf.Bytes()[offset:], nil +} + +func execEnv(env expand.Environ) []string { + list := make([]string, 0, 64) + env.Each(func(name string, vr expand.Variable) bool { + if !vr.IsSet() { + // If a variable is set globally but unset in the + // runner, we need to ensure it's not part of the final + // list. Seems like zeroing the element is enough. + // This is a linear search, but this scenario should be + // rare, and the number of variables shouldn't be large. + for i, kv := range list { + if strings.HasPrefix(kv, name+"=") { + list[i] = "" + } + } + } + if vr.Exported && vr.Kind == expand.String { + list = append(list, name+"="+vr.String()) + } + return true + }) + return list +} diff --git a/taskctl/executor_test.go b/taskctl/executor_test.go new file mode 100644 index 0000000..8ca0f70 --- /dev/null +++ b/taskctl/executor_test.go @@ -0,0 +1,56 @@ +package taskctl_test + +import ( + "bytes" + "context" + "io/ioutil" + "testing" + "time" + + "github.com/taskctl/taskctl/pkg/executor" + + "github.com/Flowpack/prunner/taskctl" +) + +func TestDefaultExecutor_Execute(t *testing.T) { + e, err := taskctl.NewPgidExecutor(nil, ioutil.Discard, ioutil.Discard, 2*time.Second) + if err != nil { + t.Fatal(err) + } + + job1 := executor.NewJobFromCommand("echo 'success'") + to := 1 * time.Minute + job1.Timeout = &to + + output, err := e.Execute(context.Background(), job1) + if err != nil { + t.Fatal(err) + } + + if !bytes.Contains(output, []byte("success")) { + t.Error() + } + + job1 = executor.NewJobFromCommand("exit 1") + + _, err = e.Execute(context.Background(), job1) + if err == nil { + t.Error() + } + + if _, ok := executor.IsExitStatus(err); !ok { + t.Error() + } + + job2 := executor.NewJobFromCommand("echo {{ .Fail }}") + _, err = e.Execute(context.Background(), job2) + if err == nil { + t.Error() + } + + job3 := executor.NewJobFromCommand("printf '%s\\nLine-2\\n' '=========== Line 1 ==================' ") + _, err = e.Execute(context.Background(), job3) + if err != nil { + t.Error() + } +} diff --git a/taskctl/runner.go b/taskctl/runner.go index 58928c2..b0f9990 100644 --- a/taskctl/runner.go +++ b/taskctl/runner.go @@ -9,7 +9,7 @@ import ( "sync" "time" - "github.com/sirupsen/logrus" + "github.com/apex/log" "github.com/taskctl/taskctl/pkg/executor" "github.com/taskctl/taskctl/pkg/output" "github.com/taskctl/taskctl/pkg/runner" @@ -52,6 +52,8 @@ type TaskRunner struct { outputStore OutputStore onTaskChange func(t *task.Task) + + killTimeout time.Duration } var _ Runner = &TaskRunner{} @@ -68,6 +70,8 @@ func NewTaskRunner(outputStore OutputStore, opts ...Opts) (*TaskRunner, error) { env: variables.NewVariables(), outputStore: outputStore, + + killTimeout: 2 * time.Second, } r.ctx, r.cancelFunc = context.WithCancel(context.Background()) @@ -125,7 +129,7 @@ func (r *TaskRunner) Run(t *task.Task) error { defer func() { err = execContext.After() if err != nil { - logrus.Error(err) + log.Errorf("Error executing after tasks: %v", err) } if !t.Errored && !t.Skipped { @@ -139,13 +143,18 @@ func (r *TaskRunner) Run(t *task.Task) error { env = env.With("TASK_NAME", t.Name) env = env.Merge(t.Env) + jobID := t.Variables.Get(JobIDVariableName).(string) + meets, err := r.checkTaskCondition(t) if err != nil { return err } if !meets { - logrus.Infof("task %s was skipped", t.Name) + log. + WithField("component", "runner"). + WithField("jobID", jobID). + Infof("Task %s was skipped", t.Name) t.Skipped = true return nil } @@ -155,8 +164,6 @@ func (r *TaskRunner) Run(t *task.Task) error { return err } - jobID := t.Variables.Get(JobIDVariableName).(string) - var ( stdoutWriter = []io.Writer{&t.Log.Stdout} stderrWriter = []io.Writer{&t.Log.Stderr} @@ -214,7 +221,9 @@ func (r *TaskRunner) Cancel() { r.cancelMutex.Lock() if !r.canceling { r.canceling = true - defer logrus.Debug("runner has been cancelled") + defer log. + WithField("component", "runner"). + Debug("Task runner has been cancelled") r.cancelFunc() } r.cancelMutex.Unlock() @@ -255,7 +264,7 @@ func (r *TaskRunner) before(ctx context.Context, t *task.Task, env, vars variabl return fmt.Errorf("\"before\" command compilation failed: %w", err) } - exec, err := executor.NewDefaultExecutor(job.Stdin, job.Stdout, job.Stderr) + exec, err := NewPgidExecutor(job.Stdin, job.Stdout, job.Stderr, r.killTimeout) if err != nil { return err } @@ -285,14 +294,17 @@ func (r *TaskRunner) after(ctx context.Context, t *task.Task, env, vars variable return fmt.Errorf("\"after\" command compilation failed: %w", err) } - exec, err := executor.NewDefaultExecutor(job.Stdin, job.Stdout, job.Stderr) + exec, err := NewPgidExecutor(job.Stdin, job.Stdout, job.Stderr, r.killTimeout) if err != nil { return err } _, err = exec.Execute(ctx, job) if err != nil { - logrus.Warning(err) + log. + WithField("component", "runner"). + WithField("command", command). + Warnf("After command failed: %v", err) } } @@ -340,7 +352,7 @@ func (r *TaskRunner) checkTaskCondition(t *task.Task) (bool, error) { return false, err } - exec, err := executor.NewDefaultExecutor(job.Stdin, job.Stdout, job.Stderr) + exec, err := NewPgidExecutor(job.Stdin, job.Stdout, job.Stderr, r.killTimeout) if err != nil { return false, err } @@ -370,7 +382,7 @@ func (r *TaskRunner) storeTaskOutput(t *task.Task) { } func (r *TaskRunner) execute(ctx context.Context, t *task.Task, job *executor.Job) error { - exec, err := executor.NewDefaultExecutor(job.Stdin, job.Stdout, job.Stderr) + exec, err := NewPgidExecutor(job.Stdin, job.Stdout, job.Stderr, r.killTimeout) if err != nil { return err } @@ -385,7 +397,6 @@ func (r *TaskRunner) execute(ctx context.Context, t *task.Task, job *executor.Jo prevOutput, err = exec.Execute(ctx, nextJob) if err != nil { - t.End = time.Now() if status, ok := executor.IsExitStatus(err); ok { t.ExitCode = int16(status) if t.AllowFailure { @@ -428,7 +439,6 @@ func WithEnv(env variables.Container) Opts { } } - // WithVariables adds provided variables to task runner func WithVariables(variables variables.Container) Opts { return func(runner *TaskRunner) { @@ -437,3 +447,10 @@ func WithVariables(variables variables.Container) Opts { // runner.compiler.variables = variables } } + +// WithKillTimeout sets the kill timeout for execution of commands +func WithKillTimeout(killTimeout time.Duration) Opts { + return func(runner *TaskRunner) { + runner.killTimeout = killTimeout + } +} diff --git a/taskctl/scheduler.go b/taskctl/scheduler.go index d2fead1..7ae2271 100644 --- a/taskctl/scheduler.go +++ b/taskctl/scheduler.go @@ -6,7 +6,7 @@ import ( "sync/atomic" "time" - "github.com/sirupsen/logrus" + "github.com/apex/log" "github.com/taskctl/taskctl/pkg/runner" "github.com/taskctl/taskctl/pkg/scheduler" "github.com/taskctl/taskctl/pkg/utils" @@ -61,7 +61,9 @@ func (s *Scheduler) Schedule(g *scheduler.ExecutionGraph) error { if stage.Condition != "" { meets, err := checkStageCondition(stage.Condition) if err != nil { - logrus.Error(err) + log. + WithField("component", "runner"). + Errorf("Failed to check stage condition: %v", err) stage.UpdateStatus(scheduler.StatusError) s.notifyStageChange(stage) s.Cancel() @@ -174,7 +176,8 @@ func checkStatus(p *scheduler.ExecutionGraph, stage *scheduler.Stage) (ready boo for _, dep := range p.To(stage.Name) { depStage, err := p.Node(dep) if err != nil { - logrus.Fatal(err) + // This should not happen unless the graph is inconsistent + panic(err) } switch depStage.ReadStatus() {