From c593dff795f36e1a1f9275340d6c26183f7d0fcf Mon Sep 17 00:00:00 2001 From: Christopher Hlubek Date: Mon, 4 Apr 2022 19:46:40 +0200 Subject: [PATCH 1/5] FEATURE: Graceful shutdown and process groups for child processes Implements shutdown via SIGINT (graceful) and SIGTERM (cancelling jobs). Also fix an issue with child processes from scripts by setting setpgid to start processes (which is needed to prevent signals being forwarded). Closes #14 --- README.md | 73 ++------------ app/app.go | 45 ++++++++- examples/pipelines.yml | 34 +------ go.mod | 4 +- prunner.go | 91 ++++++++++++++++- prunner_test.go | 107 +++++++++++++++++++- taskctl/executor.go | 204 +++++++++++++++++++++++++++++++++++++++ taskctl/executor_test.go | 56 +++++++++++ taskctl/runner.go | 43 ++++++--- taskctl/scheduler.go | 9 +- 10 files changed, 542 insertions(+), 124 deletions(-) create mode 100644 taskctl/executor.go create mode 100644 taskctl/executor_test.go diff --git a/README.md b/README.md index c89c408..89345e7 100644 --- a/README.md +++ b/README.md @@ -264,85 +264,28 @@ You can also combine the two options. Then, deletion occurs with whatever comes If a pipeline does not exist at all anymore (i.e. if you renamed `do_something` to `another_name` above), its persisted logs and task data is removed automatically on saving to disk. +### Handling of child processes -### Interruptible Jobs +Prunner starts child processes with `setpgid` to use a new process group for each task of a pipeline job. +This means that if a job is cancelled, all child processes are killed - even if they were run by a shell script. -When terminating a task, unfortunately, only the top-level executed script is terminated - and the script needs -to take care to kill its children appropriately and not hang. This is very likely because of the following invocation chain: +Note: If prunner is killed hard (e.g. SIGKILL) without SIGINT / SIGTERM, the child processes of running jobs will not be terminated. -- prunner -- taskctl -- mvdan/sh [DefaultExecHandler](https://github.com/mvdan/sh/blob/master/interp/handler.go#L100-L104) +### Graceful shutdown --> This kills only the process itself, **but not its children**. Unfortunately for us, Bash [does not forward signals like -SIGTERM to processes it is currently waiting on](https://unix.stackexchange.com/questions/146756/forward-sigterm-to-child-in-bash). - -Currently, killing children recursively is not implemented, but can be done manually in a bash script [as explained here](https://newbedev.com/forward-sigterm-to-child-in-bash): - -Instead of doing: - -``` -#!/usr/bin/env bash - -/usr/local/bin/my_long_running_process -``` - -You can do the following to pass along signals: - -```bash -#!/usr/bin/env bash -# taken from https://newbedev.com/forward-sigterm-to-child-in-bash - -prep_term() -{ - unset term_child_pid - unset term_kill_needed - trap 'handle_term' TERM INT -} - -handle_term() -{ - if [ "${term_child_pid}" ]; then - kill -TERM "${term_child_pid}" 2>/dev/null - else - term_kill_needed="yes" - fi -} - -wait_term() -{ - term_child_pid=$! - if [ "${term_kill_needed}" ]; then - kill -TERM "${term_child_pid}" 2>/dev/null - fi - wait ${term_child_pid} 2>/dev/null - trap - TERM INT - wait ${term_child_pid} 2>/dev/null -} - - -prep_term -/usr/local/bin/my_long_running_process & -wait_term -``` - -**This is a workaround; and at some point in the future it would be nice to solve it as [explained here](https://medium.com/@felixge/killing-a-child-process-and-all-of-its-children-in-go-54079af94773)**. 
+Prunner will handle a SIGINT signal and perform a graceful shutdown and wait for all running jobs to be completed. +Sending a SIGTERM signal to prunner will cancel all running jobs (and interrupt / kill child processes). ### Persistent Job State - - - - ## Development ### Requirements -* Go (>= 1.16) +* Go (>= 1.18) ### Running locally - ```bash go run cmd/prunner/main.go ``` diff --git a/app/app.go b/app/app.go index ac724ee..a971c89 100644 --- a/app/app.go +++ b/app/app.go @@ -5,8 +5,10 @@ import ( "io" "net/http" "os" + "os/signal" "path" "path/filepath" + "syscall" "github.com/apex/log" logfmt_handler "github.com/apex/log/handlers/logfmt" @@ -174,7 +176,7 @@ func appAction(c *cli.Context) error { return errors.Wrap(err, "building output store") } - store, err := store.NewJSONDataStore(path.Join(c.String("data"))) + dataStore, err := store.NewJSONDataStore(path.Join(c.String("data"))) if err != nil { return errors.Wrap(err, "building pipeline runner store") } @@ -189,7 +191,7 @@ func appAction(c *cli.Context) error { taskRunner.Stderr = io.Discard return taskRunner - }, store, outputStore) + }, dataStore, outputStore) if err != nil { return err } @@ -202,10 +204,43 @@ func appAction(c *cli.Context) error { ) // Set up a simple REST API for listing jobs and scheduling pipelines - + address := c.String("address") log. - Infof("HTTP API Listening on %s", c.String("address")) - return http.ListenAndServe(c.String("address"), srv) + Infof("HTTP API listening on %s", address) + + // Start http server and handle graceful shutdown + httpSrv := http.Server{ + Addr: address, + Handler: srv, + } + go func() { + if err := httpSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("Error starting HTTP API: %s", err) + } + }() + + // How signals are handled: + // - SIGINT: Shutdown gracefully and wait for jobs to be finished completely + // - SIGTERM: Cancel running jobs + + ctx, cancel := signal.NotifyContext(c.Context, syscall.SIGINT, syscall.SIGTERM) + defer cancel() + // Cancel the shutdown context on SIGTERM to prevent waiting for jobs and client connections + shutdownCtx, shutdownCancel := signal.NotifyContext(c.Context, syscall.SIGTERM) + defer shutdownCancel() + + // Wait for SIGINT or SIGTERM + <-ctx.Done() + + log.Info("Received signal, waiting until jobs are finished...") + _ = pRunner.Shutdown(shutdownCtx) + + log.Debugf("Shutting down HTTP API...") + _ = httpSrv.Shutdown(shutdownCtx) + + log.Info("Shutdown complete") + + return nil } func createLogFormatter(c *cli.Context) middleware.LogFormatter { diff --git a/examples/pipelines.yml b/examples/pipelines.yml index 8f1391d..7dfb0a6 100644 --- a/examples/pipelines.yml +++ b/examples/pipelines.yml @@ -3,7 +3,7 @@ pipelines: tasks: lint: script: - - staticcheck $(go list ./... | grep -v /vendor/) + # - staticcheck $(go list ./... | grep -v /vendor/) - go vet $(go list ./... 
| grep -v /vendor/) test: script: @@ -11,37 +11,9 @@ pipelines: do_something_long: script: - echo Tick - - sleep 1 - - echo Tock - - sleep 1 - - echo Tick - - sleep 1 - - echo Tock - - sleep 1 - - echo Tick - - sleep 1 - - echo Tock - - sleep 1 - - echo Tick - - sleep 1 - - echo Tock - - sleep 1 - - echo Tick - - sleep 1 - - echo Tock - - sleep 1 - - echo Tick - - sleep 1 + - sleep 20 - echo Tock - - sleep 1 - - echo Tick - - sleep 1 - - echo Tock - - sleep 1 - - echo Tick - - sleep 1 - - echo Tock - - sleep 1 + - sleep 20 - echo Dong build: script: diff --git a/go.mod b/go.mod index dc6d61a..b8caca5 100644 --- a/go.mod +++ b/go.mod @@ -13,11 +13,11 @@ require ( github.com/liamylian/jsontime/v2 v2.0.0 github.com/mattn/go-isatty v0.0.14 github.com/mattn/go-zglob v0.0.3 - github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.7.1 github.com/taskctl/taskctl v1.3.1-0.20210426182424-d8747985c906 github.com/urfave/cli/v2 v2.4.0 gopkg.in/yaml.v2 v2.4.0 + mvdan.cc/sh/v3 v3.4.3 ) require ( @@ -42,6 +42,7 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/sirupsen/logrus v1.8.1 // indirect golang.org/x/crypto v0.0.0-20220331220935-ae2d96664a29 // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20220403205710-6acee93ad0eb // indirect @@ -49,5 +50,4 @@ require ( golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect - mvdan.cc/sh/v3 v3.4.3 // indirect ) diff --git a/prunner.go b/prunner.go index 7d1ff8b..6b42f54 100644 --- a/prunner.go +++ b/prunner.go @@ -47,6 +47,11 @@ type PipelineRunner struct { // Wait group for waiting for asynchronous operations like job.Cancel wg sync.WaitGroup + // Flag if the runner is shutting down + isShuttingDown bool + + // Poll interval for completed jobs for graceful shutdown + ShutdownPollInterval time.Duration } // NewPipelineRunner creates the central data structure which controls the full runner state; so this knows what is currently running @@ -61,9 +66,10 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat waitListByPipeline: make(map[string][]*PipelineJob), store: store, outputStore: outputStore, - // Use channel buffered with one extra slot so we can keep save requests while a save is running without blocking - persistRequests: make(chan struct{}, 1), - createTaskRunner: createTaskRunner, + // Use channel buffered with one extra slot, so we can keep save requests while a save is running without blocking + persistRequests: make(chan struct{}, 1), + createTaskRunner: createTaskRunner, + ShutdownPollInterval: 3 * time.Second, } if store != nil { @@ -76,6 +82,7 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat for { select { case <-ctx.Done(): + log.Debugf("PipelineRunner: context done, stopping persist loop") return case <-pRunner.persistRequests: pRunner.SaveToStore() @@ -175,11 +182,16 @@ var errQueueFull = errors.New("concurrency exceeded and queue limit reached for var ErrJobNotFound = errors.New("job not found") var errJobAlreadyCompleted = errors.New("job is already completed") var errJobNotStarted = errors.New("job is not started") +var ErrShuttingDown = errors.New("runner is shutting down") func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) 
(*PipelineJob, error) { r.mx.Lock() defer r.mx.Unlock() + if r.isShuttingDown { + return nil, ErrShuttingDown + } + pipelineDef, ok := r.defs.Pipelines[pipeline] if !ok { return nil, errors.Errorf("pipeline %q is not defined", pipeline) @@ -367,7 +379,9 @@ func (r *PipelineRunner) startJob(job *PipelineJob) { job.Start = &now // Run graph asynchronously + r.wg.Add(1) go func() { + defer r.wg.Done() lastErr := job.sched.Schedule(graph) r.JobCompleted(job.ID, lastErr) }() @@ -685,6 +699,9 @@ func (r *PipelineRunner) initialLoadFromStore() error { } func (r *PipelineRunner) SaveToStore() { + r.wg.Add(1) + defer r.wg.Done() + log. WithField("component", "runner"). Debugf("Saving job state to data store") @@ -774,6 +791,74 @@ func (r *PipelineRunner) SaveToStore() { } } +func (r *PipelineRunner) Shutdown(ctx context.Context) error { + defer func() { + log. + WithField("component", "runner"). + Debugf("Shutting down, waiting for pending operations...") + r.wg.Wait() + }() + + r.mx.Lock() + r.isShuttingDown = true + // Cancel all jobs on wait list + for pipelineName, jobs := range r.waitListByPipeline { + for _, job := range jobs { + job.Canceled = true + log. + WithField("component", "runner"). + WithField("jobID", job.ID). + WithField("pipeline", pipelineName). + Warnf("Shutting down, marking job on wait list as canceled") + } + delete(r.waitListByPipeline, pipelineName) + } + r.mx.Unlock() + + for { + // Poll pipelines to check for running jobs + r.mx.RLock() + hasRunningPipelines := false + for pipelineName := range r.jobsByPipeline { + if r.isRunning(pipelineName) { + hasRunningPipelines = true + log. + WithField("component", "runner"). + WithField("pipeline", pipelineName). + Debugf("Shutting down, waiting for pipeline to finish...") + } + } + r.mx.RUnlock() + + if !hasRunningPipelines { + log. + WithField("component", "runner"). + Debugf("Shutting down, all pipelines finished") + break + } + + // Wait a few seconds before checking pipeline status again, or cancel jobs if the context is done + select { + case <-time.After(r.ShutdownPollInterval): + case <-ctx.Done(): + log. + WithField("component", "runner"). 
+ Warnf("Forced shutdown, cancelling all jobs") + + r.mx.Lock() + + for jobID := range r.jobsByID { + _ = r.cancelJobInternal(jobID) + } + r.mx.Unlock() + + return ctx.Err() + } + } + + return nil +} + // taken from https://stackoverflow.com/a/37335777 func removeJobFromList(jobs []*PipelineJob, jobToRemove *PipelineJob) []*PipelineJob { for index, job := range jobs { diff --git a/prunner_test.go b/prunner_test.go index 56fd9f2..9f422a4 100644 --- a/prunner_test.go +++ b/prunner_test.go @@ -175,7 +175,7 @@ func TestPipelineRunner_ScheduleAsync_WithEnvVars(t *testing.T) { Concurrency: 1, QueueLimit: nil, Env: map[string]string{ - "MY_VAR": "from pipeline", + "MY_VAR": "from pipeline", "OTHER_VAR": "from pipeline", }, Tasks: map[string]definition.TaskDef{ @@ -746,7 +746,7 @@ func TestPipelineRunner_ScheduleAsync_WithStartDelayNoQueueAndReplaceWillNotRunC pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner { return &test.MockRunner{ OnRun: func(tsk *task.Task) error { - time.Sleep(100*time.Millisecond) + time.Sleep(100 * time.Millisecond) return nil }, } @@ -787,6 +787,109 @@ func TestPipelineRunner_ScheduleAsync_WithStartDelayNoQueueAndReplaceWillNotRunC } } +func TestPipelineRunner_Shutdown_WithRunningJob_Graceful(t *testing.T) { + var defs = &definition.PipelinesDef{ + Pipelines: map[string]definition.PipelineDef{ + "long_running": { + // Concurrency of 1 is the default for a single concurrent execution + Concurrency: 1, + QueueLimit: nil, + Tasks: map[string]definition.TaskDef{ + "sleep": { + Script: []string{"sleep 10"}, + }, + }, + SourcePath: "fixtures", + }, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var ( + jobFinished bool + jobCanceled bool + ) + + pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner { + return &test.MockRunner{ + OnRun: func(tsk *task.Task) error { + time.Sleep(100 * time.Millisecond) + jobFinished = true + return nil + }, + OnCancel: func() { + jobCanceled = true + }, + } + }, nil, test.NewMockOutputStore()) + pRunner.ShutdownPollInterval = 100 * time.Millisecond + require.NoError(t, err) + + job, err := pRunner.ScheduleAsync("long_running", ScheduleOpts{}) + require.NoError(t, err) + + jobID := job.ID + + waitForStartedJobTask(t, pRunner, jobID, "sleep") + + shutdownCtx := context.Background() + + err = pRunner.Shutdown(shutdownCtx) + require.NoError(t, err) + + assert.False(t, job.Canceled, "job was not marked as canceled") + assert.True(t, jobFinished, "job was finished by runner") + assert.False(t, jobCanceled, "job was not canceled by runner") + assert.True(t, job.Completed, "job was marked as completed") +} + +func TestPipelineRunner_Shutdown_WithRunningJob_Forced(t *testing.T) { + var defs = &definition.PipelinesDef{ + Pipelines: map[string]definition.PipelineDef{ + "long_running": { + // Concurrency of 1 is the default for a single concurrent execution + Concurrency: 1, + QueueLimit: nil, + Tasks: map[string]definition.TaskDef{ + "sleep": { + Script: []string{"sleep 1"}, + }, + }, + SourcePath: "fixtures", + }, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner { + // Use a real runner here to test the actual processing of a task.Task + taskRunner, _ := taskctl.NewTaskRunner(test.NewMockOutputStore()) + return taskRunner + }, nil, test.NewMockOutputStore()) + pRunner.ShutdownPollInterval = 100 * time.Millisecond + require.NoError(t, 
err) + + job, err := pRunner.ScheduleAsync("long_running", ScheduleOpts{}) + require.NoError(t, err) + + jobID := job.ID + + waitForStartedJobTask(t, pRunner, jobID, "sleep") + + // Force cancel of job after 100ms + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer shutdownCancel() + + err = pRunner.Shutdown(shutdownCtx) + require.ErrorIs(t, err, context.DeadlineExceeded) + + assert.True(t, job.Canceled, "job was marked as canceled") +} + func intPtr(i int) *int { return &i } diff --git a/taskctl/executor.go b/taskctl/executor.go new file mode 100644 index 0000000..1b99452 --- /dev/null +++ b/taskctl/executor.go @@ -0,0 +1,204 @@ +package taskctl + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "os/exec" + "strings" + "syscall" + "time" + + "github.com/apex/log" + "github.com/taskctl/taskctl/pkg/executor" + "mvdan.cc/sh/v3/expand" + + "mvdan.cc/sh/v3/interp" + "mvdan.cc/sh/v3/syntax" + + "github.com/taskctl/taskctl/pkg/utils" +) + +// PgidExecutor is an executor that starts commands in a new process group +// +// It is based on executor.DefaultExecutor but uses an exec handler that sets setpgid to +// start child processes in a new process group (with negative pid). It is used to prevent +// signals from propagating to children and to kill all descendant processes e.g. when forked by a shell process. +type PgidExecutor struct { + dir string + env []string + interp *interp.Runner + buf bytes.Buffer +} + +// NewPgidExecutor creates new pgid executor +func NewPgidExecutor(stdin io.Reader, stdout, stderr io.Writer, killTimeout time.Duration) (*PgidExecutor, error) { + var err error + e := &PgidExecutor{ + env: os.Environ(), + } + + e.dir, err = os.Getwd() + if err != nil { + return nil, err + } + + if stdout == nil { + stdout = io.Discard + } + + if stderr == nil { + stderr = io.Discard + } + + e.interp, err = interp.New( + interp.StdIO(stdin, io.MultiWriter(&e.buf, stdout), io.MultiWriter(&e.buf, stderr)), + interp.ExecHandler(func(ctx context.Context, args []string) error { + hc := interp.HandlerCtx(ctx) + path, err := interp.LookPathDir(hc.Dir, hc.Env, args[0]) + if err != nil { + _, _ = fmt.Fprintln(hc.Stderr, err) + return interp.NewExitStatus(127) + } + cmd := exec.Cmd{ + Path: path, + Args: args, + Env: execEnv(hc.Env), + Dir: hc.Dir, + Stdin: hc.Stdin, + Stdout: hc.Stdout, + Stderr: hc.Stderr, + SysProcAttr: &syscall.SysProcAttr{ + // Added for prunner: Using a process group for the new child process fixes 2 things: + // 1. We can handle signals like SIGINT in the prunner main process and gracefully shutdown running tasks + // 2. Bash scripts will not forward signals by default, so using process groups can be used to send signals to all children in the group + // (https://unix.stackexchange.com/questions/146756/forward-sigterm-to-child-in-bash) + Setpgid: true, + }, + } + + err = cmd.Start() + if err == nil { + if done := ctx.Done(); done != nil { + go func() { + <-done + + if killTimeout <= 0 { + _ = syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) + return + } + + // TODO: don't temporarily leak this goroutine if the program stops itself with the interrupt. 
(from github.com/mvdan/sh) + go func() { + time.Sleep(killTimeout) + _ = syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) + }() + _ = syscall.Kill(-cmd.Process.Pid, syscall.SIGINT) + }() + } + + err = cmd.Wait() + } + + switch x := err.(type) { + case *exec.ExitError: + // started, but errored - default to 1 if OS doesn't have exit statuses + if status, ok := x.Sys().(syscall.WaitStatus); ok { + if status.Signaled() { + if ctx.Err() != nil { + return ctx.Err() + } + return interp.NewExitStatus(uint8(128 + status.Signal())) + } + return interp.NewExitStatus(uint8(status.ExitStatus())) + } + return interp.NewExitStatus(1) + case *exec.Error: + // did not start + _, _ = fmt.Fprintf(hc.Stderr, "%v\n", err) + return interp.NewExitStatus(127) + default: + return err + } + }), + ) + if err != nil { + return nil, err + } + + return e, nil +} + +// Execute executes given job with provided context +// Returns job output +func (e *PgidExecutor) Execute(ctx context.Context, job *executor.Job) ([]byte, error) { + command, err := utils.RenderString(job.Command, job.Vars.Map()) + if err != nil { + return nil, err + } + + cmd, err := syntax.NewParser(syntax.KeepComments(true)).Parse(strings.NewReader(command), "") + if err != nil { + return nil, err + } + + env := e.env + env = append(env, utils.ConvertEnv(utils.ConvertToMapOfStrings(job.Env.Map()))...) + + if job.Dir == "" { + job.Dir = e.dir + } + + jobID := job.Vars.Get(JobIDVariableName).(string) + + log. + WithField("component", "executor"). + WithField("jobID", jobID). + Debugf("Executing \"%s\"", command) + + e.interp.Dir = job.Dir + e.interp.Env = expand.ListEnviron(env...) + + var cancelFn context.CancelFunc + if job.Timeout != nil { + ctx, cancelFn = context.WithTimeout(ctx, *job.Timeout) + } + defer func() { + if cancelFn != nil { + cancelFn() + } + }() + + offset := e.buf.Len() + err = e.interp.Run(ctx, cmd) + if err != nil { + return e.buf.Bytes()[offset:], err + } + + return e.buf.Bytes()[offset:], nil +} + +func execEnv(env expand.Environ) []string { + list := make([]string, 0, 64) + env.Each(func(name string, vr expand.Variable) bool { + if !vr.IsSet() { + // If a variable is set globally but unset in the + // runner, we need to ensure it's not part of the final + // list. Seems like zeroing the element is enough. + // This is a linear search, but this scenario should be + // rare, and the number of variables shouldn't be large. 
+ for i, kv := range list { + if strings.HasPrefix(kv, name+"=") { + list[i] = "" + } + } + } + if vr.Exported && vr.Kind == expand.String { + list = append(list, name+"="+vr.String()) + } + return true + }) + return list +} diff --git a/taskctl/executor_test.go b/taskctl/executor_test.go new file mode 100644 index 0000000..8ca0f70 --- /dev/null +++ b/taskctl/executor_test.go @@ -0,0 +1,56 @@ +package taskctl_test + +import ( + "bytes" + "context" + "io/ioutil" + "testing" + "time" + + "github.com/taskctl/taskctl/pkg/executor" + + "github.com/Flowpack/prunner/taskctl" +) + +func TestDefaultExecutor_Execute(t *testing.T) { + e, err := taskctl.NewPgidExecutor(nil, ioutil.Discard, ioutil.Discard, 2*time.Second) + if err != nil { + t.Fatal(err) + } + + job1 := executor.NewJobFromCommand("echo 'success'") + to := 1 * time.Minute + job1.Timeout = &to + + output, err := e.Execute(context.Background(), job1) + if err != nil { + t.Fatal(err) + } + + if !bytes.Contains(output, []byte("success")) { + t.Error() + } + + job1 = executor.NewJobFromCommand("exit 1") + + _, err = e.Execute(context.Background(), job1) + if err == nil { + t.Error() + } + + if _, ok := executor.IsExitStatus(err); !ok { + t.Error() + } + + job2 := executor.NewJobFromCommand("echo {{ .Fail }}") + _, err = e.Execute(context.Background(), job2) + if err == nil { + t.Error() + } + + job3 := executor.NewJobFromCommand("printf '%s\\nLine-2\\n' '=========== Line 1 ==================' ") + _, err = e.Execute(context.Background(), job3) + if err != nil { + t.Error() + } +} diff --git a/taskctl/runner.go b/taskctl/runner.go index 58928c2..b0f9990 100644 --- a/taskctl/runner.go +++ b/taskctl/runner.go @@ -9,7 +9,7 @@ import ( "sync" "time" - "github.com/sirupsen/logrus" + "github.com/apex/log" "github.com/taskctl/taskctl/pkg/executor" "github.com/taskctl/taskctl/pkg/output" "github.com/taskctl/taskctl/pkg/runner" @@ -52,6 +52,8 @@ type TaskRunner struct { outputStore OutputStore onTaskChange func(t *task.Task) + + killTimeout time.Duration } var _ Runner = &TaskRunner{} @@ -68,6 +70,8 @@ func NewTaskRunner(outputStore OutputStore, opts ...Opts) (*TaskRunner, error) { env: variables.NewVariables(), outputStore: outputStore, + + killTimeout: 2 * time.Second, } r.ctx, r.cancelFunc = context.WithCancel(context.Background()) @@ -125,7 +129,7 @@ func (r *TaskRunner) Run(t *task.Task) error { defer func() { err = execContext.After() if err != nil { - logrus.Error(err) + log.Errorf("Error executing after tasks: %v", err) } if !t.Errored && !t.Skipped { @@ -139,13 +143,18 @@ func (r *TaskRunner) Run(t *task.Task) error { env = env.With("TASK_NAME", t.Name) env = env.Merge(t.Env) + jobID := t.Variables.Get(JobIDVariableName).(string) + meets, err := r.checkTaskCondition(t) if err != nil { return err } if !meets { - logrus.Infof("task %s was skipped", t.Name) + log. + WithField("component", "runner"). + WithField("jobID", jobID). + Infof("Task %s was skipped", t.Name) t.Skipped = true return nil } @@ -155,8 +164,6 @@ func (r *TaskRunner) Run(t *task.Task) error { return err } - jobID := t.Variables.Get(JobIDVariableName).(string) - var ( stdoutWriter = []io.Writer{&t.Log.Stdout} stderrWriter = []io.Writer{&t.Log.Stderr} @@ -214,7 +221,9 @@ func (r *TaskRunner) Cancel() { r.cancelMutex.Lock() if !r.canceling { r.canceling = true - defer logrus.Debug("runner has been cancelled") + defer log. + WithField("component", "runner"). 
+ Debug("Task runner has been cancelled") r.cancelFunc() } r.cancelMutex.Unlock() @@ -255,7 +264,7 @@ func (r *TaskRunner) before(ctx context.Context, t *task.Task, env, vars variabl return fmt.Errorf("\"before\" command compilation failed: %w", err) } - exec, err := executor.NewDefaultExecutor(job.Stdin, job.Stdout, job.Stderr) + exec, err := NewPgidExecutor(job.Stdin, job.Stdout, job.Stderr, r.killTimeout) if err != nil { return err } @@ -285,14 +294,17 @@ func (r *TaskRunner) after(ctx context.Context, t *task.Task, env, vars variable return fmt.Errorf("\"after\" command compilation failed: %w", err) } - exec, err := executor.NewDefaultExecutor(job.Stdin, job.Stdout, job.Stderr) + exec, err := NewPgidExecutor(job.Stdin, job.Stdout, job.Stderr, r.killTimeout) if err != nil { return err } _, err = exec.Execute(ctx, job) if err != nil { - logrus.Warning(err) + log. + WithField("component", "runner"). + WithField("command", command). + Warnf("After command failed: %v", err) } } @@ -340,7 +352,7 @@ func (r *TaskRunner) checkTaskCondition(t *task.Task) (bool, error) { return false, err } - exec, err := executor.NewDefaultExecutor(job.Stdin, job.Stdout, job.Stderr) + exec, err := NewPgidExecutor(job.Stdin, job.Stdout, job.Stderr, r.killTimeout) if err != nil { return false, err } @@ -370,7 +382,7 @@ func (r *TaskRunner) storeTaskOutput(t *task.Task) { } func (r *TaskRunner) execute(ctx context.Context, t *task.Task, job *executor.Job) error { - exec, err := executor.NewDefaultExecutor(job.Stdin, job.Stdout, job.Stderr) + exec, err := NewPgidExecutor(job.Stdin, job.Stdout, job.Stderr, r.killTimeout) if err != nil { return err } @@ -385,7 +397,6 @@ func (r *TaskRunner) execute(ctx context.Context, t *task.Task, job *executor.Jo prevOutput, err = exec.Execute(ctx, nextJob) if err != nil { - t.End = time.Now() if status, ok := executor.IsExitStatus(err); ok { t.ExitCode = int16(status) if t.AllowFailure { @@ -428,7 +439,6 @@ func WithEnv(env variables.Container) Opts { } } - // WithVariables adds provided variables to task runner func WithVariables(variables variables.Container) Opts { return func(runner *TaskRunner) { @@ -437,3 +447,10 @@ func WithVariables(variables variables.Container) Opts { // runner.compiler.variables = variables } } + +// WithKillTimeout sets the kill timeout for execution of commands +func WithKillTimeout(killTimeout time.Duration) Opts { + return func(runner *TaskRunner) { + runner.killTimeout = killTimeout + } +} diff --git a/taskctl/scheduler.go b/taskctl/scheduler.go index d2fead1..7ae2271 100644 --- a/taskctl/scheduler.go +++ b/taskctl/scheduler.go @@ -6,7 +6,7 @@ import ( "sync/atomic" "time" - "github.com/sirupsen/logrus" + "github.com/apex/log" "github.com/taskctl/taskctl/pkg/runner" "github.com/taskctl/taskctl/pkg/scheduler" "github.com/taskctl/taskctl/pkg/utils" @@ -61,7 +61,9 @@ func (s *Scheduler) Schedule(g *scheduler.ExecutionGraph) error { if stage.Condition != "" { meets, err := checkStageCondition(stage.Condition) if err != nil { - logrus.Error(err) + log. + WithField("component", "runner"). 
+ Errorf("Failed to check stage condition: %v", err) stage.UpdateStatus(scheduler.StatusError) s.notifyStageChange(stage) s.Cancel() @@ -174,7 +176,8 @@ func checkStatus(p *scheduler.ExecutionGraph, stage *scheduler.Stage) (ready boo for _, dep := range p.To(stage.Name) { depStage, err := p.Node(dep) if err != nil { - logrus.Fatal(err) + // This should not happen unless the graph is inconsistent + panic(err) } switch depStage.ReadStatus() { From 2e0d132a6ab132cec571aa2ece918d2b6a6a4457 Mon Sep 17 00:00:00 2001 From: Christopher Hlubek Date: Mon, 4 Apr 2022 20:00:58 +0200 Subject: [PATCH 2/5] Do a final save on shutdown after waiting for everything --- examples/pipelines.yml | 24 ++++++++++++++++++++++-- prunner.go | 7 +++++-- prunner_test.go | 8 ++++++-- 3 files changed, 33 insertions(+), 6 deletions(-) diff --git a/examples/pipelines.yml b/examples/pipelines.yml index 7dfb0a6..cb70203 100644 --- a/examples/pipelines.yml +++ b/examples/pipelines.yml @@ -11,9 +11,29 @@ pipelines: do_something_long: script: - echo Tick - - sleep 20 + - sleep 1 + - echo Tock + - sleep 1 + - echo Tick + - sleep 1 + - echo Tock + - sleep 1 + - echo Tick + - sleep 1 + - echo Tock + - sleep 1 + - echo Tick + - sleep 1 + - echo Tock + - sleep 1 + - echo Tick + - sleep 1 - echo Tock - - sleep 20 + - sleep 1 + - echo Tick + - sleep 1 + - echo Tock + - sleep 1 - echo Dong build: script: diff --git a/prunner.go b/prunner.go index 6b42f54..d079079 100644 --- a/prunner.go +++ b/prunner.go @@ -680,8 +680,8 @@ func (r *PipelineRunner) initialLoadFromStore() error { Warnf("Found running job when restoring state, marked as canceled") } - // Cancel jobs which have been scheduled on wait list but never been started - if job.Start == nil { + // Cancel jobs which have been scheduled on wait list but never been started or canceled + if job.Start == nil && !job.Canceled { job.Canceled = true log. @@ -797,6 +797,9 @@ func (r *PipelineRunner) Shutdown(ctx context.Context) error { WithField("component", "runner"). 
Debugf("Shutting down, waiting for pending operations...") r.wg.Wait() + // Do a final save to include recently completed jobs + r.SaveToStore() + r.wg.Wait() }() r.mx.Lock() diff --git a/prunner_test.go b/prunner_test.go index 9f422a4..3295fba 100644 --- a/prunner_test.go +++ b/prunner_test.go @@ -812,6 +812,8 @@ func TestPipelineRunner_Shutdown_WithRunningJob_Graceful(t *testing.T) { jobCanceled bool ) + store := test.NewMockStore() + pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner { return &test.MockRunner{ OnRun: func(tsk *task.Task) error { @@ -823,7 +825,7 @@ func TestPipelineRunner_Shutdown_WithRunningJob_Graceful(t *testing.T) { jobCanceled = true }, } - }, nil, test.NewMockOutputStore()) + }, store, test.NewMockOutputStore()) pRunner.ShutdownPollInterval = 100 * time.Millisecond require.NoError(t, err) @@ -865,11 +867,13 @@ func TestPipelineRunner_Shutdown_WithRunningJob_Forced(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + store := test.NewMockStore() + pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner { // Use a real runner here to test the actual processing of a task.Task taskRunner, _ := taskctl.NewTaskRunner(test.NewMockOutputStore()) return taskRunner - }, nil, test.NewMockOutputStore()) + }, store, test.NewMockOutputStore()) pRunner.ShutdownPollInterval = 100 * time.Millisecond require.NoError(t, err) From 484df8bd2e8406fb775d997b86e681f922758684 Mon Sep 17 00:00:00 2001 From: Christopher Hlubek Date: Tue, 5 Apr 2022 08:57:20 +0200 Subject: [PATCH 3/5] Fix small code issues --- prunner.go | 1 - server/server.go | 4 ++++ taskctl/executor.go | 2 +- taskctl/executor_test.go | 2 +- taskctl/runner.go | 3 --- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/prunner.go b/prunner.go index d079079..621d5d6 100644 --- a/prunner.go +++ b/prunner.go @@ -82,7 +82,6 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat for { select { case <-ctx.Done(): - log.Debugf("PipelineRunner: context done, stopping persist loop") return case <-pRunner.persistRequests: pRunner.SaveToStore() diff --git a/server/server.go b/server/server.go index 02a7dfd..8593558 100644 --- a/server/server.go +++ b/server/server.go @@ -129,6 +129,10 @@ func (s *server) pipelinesSchedule(w http.ResponseWriter, r *http.Request) { pJob, err := s.pRunner.ScheduleAsync(in.Body.Pipeline, prunner.ScheduleOpts{Variables: in.Body.Variables, User: user}) if err != nil { // TODO Send JSON error and include expected errors (see resolveScheduleAction) + if errors.Is(err, prunner.ErrShuttingDown) { + s.sendError(w, http.StatusServiceUnavailable, "Server is shutting down") + return + } s.sendError(w, http.StatusBadRequest, fmt.Sprintf("Error scheduling pipeline: %v", err)) return diff --git a/taskctl/executor.go b/taskctl/executor.go index 1b99452..9a9aa17 100644 --- a/taskctl/executor.go +++ b/taskctl/executor.go @@ -156,7 +156,7 @@ func (e *PgidExecutor) Execute(ctx context.Context, job *executor.Job) ([]byte, log. WithField("component", "executor"). WithField("jobID", jobID). - Debugf("Executing \"%s\"", command) + Debugf("Executing %q", command) e.interp.Dir = job.Dir e.interp.Env = expand.ListEnviron(env...) 
diff --git a/taskctl/executor_test.go b/taskctl/executor_test.go index 8ca0f70..86d1a86 100644 --- a/taskctl/executor_test.go +++ b/taskctl/executor_test.go @@ -12,7 +12,7 @@ import ( "github.com/Flowpack/prunner/taskctl" ) -func TestDefaultExecutor_Execute(t *testing.T) { +func TestPgidExecutor_Execute(t *testing.T) { e, err := taskctl.NewPgidExecutor(nil, ioutil.Discard, ioutil.Discard, 2*time.Second) if err != nil { t.Fatal(err) diff --git a/taskctl/runner.go b/taskctl/runner.go index b0f9990..74f2878 100644 --- a/taskctl/runner.go +++ b/taskctl/runner.go @@ -221,9 +221,6 @@ func (r *TaskRunner) Cancel() { r.cancelMutex.Lock() if !r.canceling { r.canceling = true - defer log. - WithField("component", "runner"). - Debug("Task runner has been cancelled") r.cancelFunc() } r.cancelMutex.Unlock() From 0e3199787687be8c23598d1f03ce21f951e3e79c Mon Sep 17 00:00:00 2001 From: Christopher Hlubek Date: Tue, 5 Apr 2022 09:33:40 +0200 Subject: [PATCH 4/5] DOCS: Overhaul readme --- README.md | 42 ++++++++++++++++++------------------------ 1 file changed, 18 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index 89345e7..93fd3e7 100644 --- a/README.md +++ b/README.md @@ -31,9 +31,9 @@ A minimalistic React UI to start and view pipelines, job and task details. A Neos/Flow PHP package providing a backend module embedding prunner-ui and a PHP API for interacting with the prunner Rest API. -## User Guide +## User guide -### Main Concepts +### Main concepts prunner controls a set of *pipelines*, which are defined in *YAML* files (typically `pipelines.yml`). The pipelines consist of *tasks*, which are executed as part of the pipeline. Each task has a `script` @@ -57,7 +57,7 @@ pipelines: ``` -### Task Dependencies +### Task dependencies In case you need to ensure certain steps are executed in-order, you can use **task dependencies** to order tasks using the `depends_on` key: @@ -82,14 +82,14 @@ This is an intended limitation to keep complexity low; so we do not plan to supp or anything like this. In case you need to store information from one task to the next, it is recommended that you -do this outside prunner, and pass in a Job Argument with an identifier to every task +do this outside prunner, and pass in a job argument with an identifier to every task (explained in the next section). -### Job Variables +### Job variables When starting a job, (i.e. `do_something` in the example below), you can send additional -**variables** as JSON. The script is passed through the [text/template](https://pkg.go.dev/text/template) +**variables** as JSON. The script is passed through the Go [text/template](https://pkg.go.dev/text/template) templating language, where you can access the variables. This way, you can pass the variable contents to your scripts. @@ -136,9 +136,10 @@ pipelines: #### Dotenv files -Prunner will override the process environment from files `.env` and `.env.local` by default. The files are configurable via the `env-files` flag. +Prunner will override the process environment from files `.env` and `.env.local` by default. +The files are configurable via the `env-files` flag. -### Limiting Concurrency +### Limiting concurrency Certain pipelines, like deployment pipelines, usually should only run only once, and never be started concurrently. 
Prunner supports this via a configurable *concurrency*: @@ -156,7 +157,7 @@ tasks in the pipeline run concurrently.* Now, when the concurrency limit is reached and you schedule the pipeline again while it is running, **the job is queued** to be worked on later - it is added to the wait list by default. -### The Wait List +### The wait list By default, if you limit concurrency, and the limit is exceeded, further jobs are added to the waitlist of the pipeline. @@ -219,7 +220,7 @@ pipelines: ``` -### Disabling Fail-Fast Behavior +### Disabling fail-fast behavior By default, if a task in a pipeline fails, all other concurrently running tasks are directly aborted. Sometimes this is not desirable, e.g. if certain deployment tasks should continue running if already started. @@ -235,7 +236,7 @@ pipelines: ``` -### Configuring Retention Period +### Configuring retention period By default, we never delete any runs. For many projects, it is useful to configure this to keep the consumed disk space under control. This can be done on a per-pipeline level; using one of the two configuration @@ -276,7 +277,7 @@ Note: If prunner is killed hard (e.g. SIGKILL) without SIGINT / SIGTERM, the chi Prunner will handle a SIGINT signal and perform a graceful shutdown and wait for all running jobs to be completed. Sending a SIGTERM signal to prunner will cancel all running jobs (and interrupt / kill child processes). -### Persistent Job State +### Persistent job state ## Development @@ -287,7 +288,7 @@ Sending a SIGTERM signal to prunner will cancel all running jobs (and interrupt ### Running locally ```bash -go run cmd/prunner/main.go +go run ./cmd/prunner ``` > Note: for development a live reload wrapper like https://github.com/markbates/refresh is recommended. @@ -296,17 +297,17 @@ The API should now be accessible at http://localhost:9009/. The log will contain For interacting with the API, you need a JWT token which you can generate for developing using: ```bash -go run cmd/prunner/main.go debug +go run ./cmd/prunner debug ``` ### Building for different operating systems. Using the standard `GOOS` environment variable, you can build for different operating systems. This is helpful when you -want to use prunner inside a Docker container, but are developing on OSX. For this, a compile step like the following is useful: +want to use prunner inside a Docker container, but are developing on macOS. For this, a compile step like the following is useful: ```bash # after building, copy the executable inside the docker container; so it can be directly used. -GOOS=linux go build cmd/prunner/main.go && docker cp main cms_neos_1:/app/main +GOOS=linux go build ./cmd/prunner -o bin/prunner && docker cp bin/prunner my_container:/app/prunner ``` ### Running Tests @@ -332,7 +333,7 @@ golangci-lint run ### Generate OpenAPI (Swagger) spec -An OpenAPI 2.0 spec is generated from the Go types and annoations in source code using the `go-swagger` tool (it is not +An OpenAPI 2.0 spec is generated from the Go types and annotations in source code using the `go-swagger` tool (it is not bundled in this module). See https://goswagger.io/install.html for installation instructions. ```bash @@ -351,13 +352,6 @@ and then all platforms are built automatically. * An application that wants to embed prunner should read the shared secret and generate a JWT auth token for accessing the API by doing internal HTTP requests. This way custom policies can be implemented for ensuring access to prunner. 
-### UI - -* Show auth errors in UI - -* Idea: content ranges for polling streams - - ## License GPL, because we are building on [taskctl](https://github.com/taskctl/taskctl) - see [LICENSE](LICENSE). From e6e26db1120b175577b40c6639fab8234e5b2b4e Mon Sep 17 00:00:00 2001 From: Christopher Hlubek Date: Thu, 7 Apr 2022 09:59:14 +0200 Subject: [PATCH 5/5] Remove unnecessary Wait, stop persist loop on shutdown --- app/app.go | 28 ++++++++++++++-------------- prunner.go | 8 ++++++-- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/app/app.go b/app/app.go index a971c89..3f6e256 100644 --- a/app/app.go +++ b/app/app.go @@ -181,8 +181,18 @@ func appAction(c *cli.Context) error { return errors.Wrap(err, "building pipeline runner store") } + // How signals are handled: + // - SIGINT: Shutdown gracefully and wait for jobs to be finished completely + // - SIGTERM: Cancel running jobs + + gracefulShutdownCtx, gracefulCancel := signal.NotifyContext(c.Context, syscall.SIGINT, syscall.SIGTERM) + defer gracefulCancel() + // Cancel the shutdown context on SIGTERM to prevent waiting for jobs and client connections + forcedShutdownCtx, forcedCancel := signal.NotifyContext(c.Context, syscall.SIGTERM) + defer forcedCancel() + // Set up pipeline runner - pRunner, err := prunner.NewPipelineRunner(c.Context, defs, func(j *prunner.PipelineJob) taskctl.Runner { + pRunner, err := prunner.NewPipelineRunner(gracefulShutdownCtx, defs, func(j *prunner.PipelineJob) taskctl.Runner { // taskctl.NewTaskRunner never actually returns an error taskRunner, _ := taskctl.NewTaskRunner(outputStore, taskctl.WithEnv(variables.FromMap(j.Env))) @@ -219,24 +229,14 @@ func appAction(c *cli.Context) error { } }() - // How signals are handled: - // - SIGINT: Shutdown gracefully and wait for jobs to be finished completely - // - SIGTERM: Cancel running jobs - - ctx, cancel := signal.NotifyContext(c.Context, syscall.SIGINT, syscall.SIGTERM) - defer cancel() - // Cancel the shutdown context on SIGTERM to prevent waiting for jobs and client connections - shutdownCtx, shutdownCancel := signal.NotifyContext(c.Context, syscall.SIGTERM) - defer shutdownCancel() - // Wait for SIGINT or SIGTERM - <-ctx.Done() + <-gracefulShutdownCtx.Done() log.Info("Received signal, waiting until jobs are finished...") - _ = pRunner.Shutdown(shutdownCtx) + _ = pRunner.Shutdown(forcedShutdownCtx) log.Debugf("Shutting down HTTP API...") - _ = httpSrv.Shutdown(shutdownCtx) + _ = httpSrv.Shutdown(forcedShutdownCtx) log.Info("Shutdown complete") diff --git a/prunner.go b/prunner.go index 621d5d6..92641d4 100644 --- a/prunner.go +++ b/prunner.go @@ -82,6 +82,9 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat for { select { case <-ctx.Done(): + log. + WithField("component", "runner"). + Debug("Stopping persist loop") return case <-pRunner.persistRequests: pRunner.SaveToStore() @@ -795,10 +798,11 @@ func (r *PipelineRunner) Shutdown(ctx context.Context) error { log. WithField("component", "runner"). Debugf("Shutting down, waiting for pending operations...") + // Wait for all running jobs to have called JobCompleted r.wg.Wait() - // Do a final save to include recently completed jobs + + // Do a final save to include the state of recently completed jobs r.SaveToStore() - r.wg.Wait() }() r.mx.Lock()
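
Patch 5 ties the pieces together: the runner is constructed with the graceful context, and shutdown is driven by two nested `signal.NotifyContext` contexts, one completed by SIGINT or SIGTERM (start the shutdown) and one completed by SIGTERM only (force-cancel jobs instead of waiting). A condensed, hedged sketch of that pattern, with placeholder work instead of prunner's real job handling:

```go
// Sketch of the two-context shutdown pattern; the shutdown function below is a
// stand-in for PipelineRunner.Shutdown / httpSrv.Shutdown, not real prunner code.
package main

import (
	"context"
	"log"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	// Graceful context: done on SIGINT or SIGTERM; waiting on it starts the shutdown.
	gracefulCtx, gracefulCancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer gracefulCancel()

	// Forced context: done on SIGTERM only. Passing it into shutdown means a
	// SIGTERM cancels running work instead of waiting for it to finish.
	forcedCtx, forcedCancel := signal.NotifyContext(context.Background(), syscall.SIGTERM)
	defer forcedCancel()

	<-gracefulCtx.Done()
	log.Println("received signal, waiting for running work...")

	if err := shutdown(forcedCtx); err != nil {
		log.Printf("forced shutdown: %v", err)
	}
	log.Println("shutdown complete")
}

// shutdown waits for (simulated) running work unless ctx is cancelled first,
// mirroring how the runner's Shutdown polls for running jobs until the forced
// context is done.
func shutdown(ctx context.Context) error {
	select {
	case <-time.After(5 * time.Second): // placeholder for jobs finishing
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
```

With SIGINT only the graceful context fires, so `shutdown` waits for the work to complete; with SIGTERM both contexts fire, so `shutdown` returns immediately with `context.Canceled` and running jobs would be cancelled.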