diff --git a/README.md b/README.md index c89c408..93fd3e7 100644 --- a/README.md +++ b/README.md @@ -31,9 +31,9 @@ A minimalistic React UI to start and view pipelines, job and task details. A Neos/Flow PHP package providing a backend module embedding prunner-ui and a PHP API for interacting with the prunner Rest API. -## User Guide +## User guide -### Main Concepts +### Main concepts prunner controls a set of *pipelines*, which are defined in *YAML* files (typically `pipelines.yml`). The pipelines consist of *tasks*, which are executed as part of the pipeline. Each task has a `script` @@ -57,7 +57,7 @@ pipelines: ``` -### Task Dependencies +### Task dependencies In case you need to ensure certain steps are executed in-order, you can use **task dependencies** to order tasks using the `depends_on` key: @@ -82,14 +82,14 @@ This is an intended limitation to keep complexity low; so we do not plan to supp or anything like this. In case you need to store information from one task to the next, it is recommended that you -do this outside prunner, and pass in a Job Argument with an identifier to every task +do this outside prunner, and pass in a job argument with an identifier to every task (explained in the next section). -### Job Variables +### Job variables When starting a job, (i.e. `do_something` in the example below), you can send additional -**variables** as JSON. The script is passed through the [text/template](https://pkg.go.dev/text/template) +**variables** as JSON. The script is passed through the Go [text/template](https://pkg.go.dev/text/template) templating language, where you can access the variables. This way, you can pass the variable contents to your scripts. @@ -136,9 +136,10 @@ pipelines: #### Dotenv files -Prunner will override the process environment from files `.env` and `.env.local` by default. The files are configurable via the `env-files` flag. +Prunner will override the process environment from files `.env` and `.env.local` by default. +The files are configurable via the `env-files` flag. -### Limiting Concurrency +### Limiting concurrency Certain pipelines, like deployment pipelines, usually should only run only once, and never be started concurrently. Prunner supports this via a configurable *concurrency*: @@ -156,7 +157,7 @@ tasks in the pipeline run concurrently.* Now, when the concurrency limit is reached and you schedule the pipeline again while it is running, **the job is queued** to be worked on later - it is added to the wait list by default. -### The Wait List +### The wait list By default, if you limit concurrency, and the limit is exceeded, further jobs are added to the waitlist of the pipeline. @@ -219,7 +220,7 @@ pipelines: ``` -### Disabling Fail-Fast Behavior +### Disabling fail-fast behavior By default, if a task in a pipeline fails, all other concurrently running tasks are directly aborted. Sometimes this is not desirable, e.g. if certain deployment tasks should continue running if already started. @@ -235,7 +236,7 @@ pipelines: ``` -### Configuring Retention Period +### Configuring retention period By default, we never delete any runs. For many projects, it is useful to configure this to keep the consumed disk space under control. This can be done on a per-pipeline level; using one of the two configuration @@ -264,87 +265,30 @@ You can also combine the two options. Then, deletion occurs with whatever comes If a pipeline does not exist at all anymore (i.e. 
if you renamed `do_something` to `another_name` above), its persisted logs and task data is removed automatically on saving to disk. +### Handling of child processes -### Interruptible Jobs - -When terminating a task, unfortunately, only the top-level executed script is terminated - and the script needs -to take care to kill its children appropriately and not hang. This is very likely because of the following invocation chain: - -- prunner -- taskctl -- mvdan/sh [DefaultExecHandler](https://github.com/mvdan/sh/blob/master/interp/handler.go#L100-L104) - --> This kills only the process itself, **but not its children**. Unfortunately for us, Bash [does not forward signals like -SIGTERM to processes it is currently waiting on](https://unix.stackexchange.com/questions/146756/forward-sigterm-to-child-in-bash). - -Currently, killing children recursively is not implemented, but can be done manually in a bash script [as explained here](https://newbedev.com/forward-sigterm-to-child-in-bash): - -Instead of doing: - -``` -#!/usr/bin/env bash - -/usr/local/bin/my_long_running_process -``` - -You can do the following to pass along signals: - -```bash -#!/usr/bin/env bash -# taken from https://newbedev.com/forward-sigterm-to-child-in-bash - -prep_term() -{ - unset term_child_pid - unset term_kill_needed - trap 'handle_term' TERM INT -} - -handle_term() -{ - if [ "${term_child_pid}" ]; then - kill -TERM "${term_child_pid}" 2>/dev/null - else - term_kill_needed="yes" - fi -} - -wait_term() -{ - term_child_pid=$! - if [ "${term_kill_needed}" ]; then - kill -TERM "${term_child_pid}" 2>/dev/null - fi - wait ${term_child_pid} 2>/dev/null - trap - TERM INT - wait ${term_child_pid} 2>/dev/null -} - - -prep_term -/usr/local/bin/my_long_running_process & -wait_term -``` - -**This is a workaround; and at some point in the future it would be nice to solve it as [explained here](https://medium.com/@felixge/killing-a-child-process-and-all-of-its-children-in-go-54079af94773)**. - -### Persistent Job State +Prunner starts child processes with `setpgid` to use a new process group for each task of a pipeline job. +This means that if a job is cancelled, all child processes are killed - even if they were run by a shell script. +Note: If prunner is killed hard (e.g. SIGKILL) without SIGINT / SIGTERM, the child processes of running jobs will not be terminated. +### Graceful shutdown +Prunner will handle a SIGINT signal and perform a graceful shutdown and wait for all running jobs to be completed. +Sending a SIGTERM signal to prunner will cancel all running jobs (and interrupt / kill child processes). +### Persistent job state ## Development ### Requirements -* Go (>= 1.16) +* Go (>= 1.18) ### Running locally - ```bash -go run cmd/prunner/main.go +go run ./cmd/prunner ``` > Note: for development a live reload wrapper like https://github.com/markbates/refresh is recommended. @@ -353,17 +297,17 @@ The API should now be accessible at http://localhost:9009/. The log will contain For interacting with the API, you need a JWT token which you can generate for developing using: ```bash -go run cmd/prunner/main.go debug +go run ./cmd/prunner debug ``` ### Building for different operating systems. Using the standard `GOOS` environment variable, you can build for different operating systems. This is helpful when you -want to use prunner inside a Docker container, but are developing on OSX. For this, a compile step like the following is useful: +want to use prunner inside a Docker container, but are developing on macOS. 
 For this, a compile step like the following is useful:
 
 ```bash
 # after building, copy the executable inside the docker container; so it can be directly used.
-GOOS=linux go build cmd/prunner/main.go && docker cp main cms_neos_1:/app/main
+GOOS=linux go build -o bin/prunner ./cmd/prunner && docker cp bin/prunner my_container:/app/prunner
 ```
 
 ### Running Tests
@@ -389,7 +333,7 @@ golangci-lint run
 
 ### Generate OpenAPI (Swagger) spec
 
-An OpenAPI 2.0 spec is generated from the Go types and annoations in source code using the `go-swagger` tool (it is not
+An OpenAPI 2.0 spec is generated from the Go types and annotations in source code using the `go-swagger` tool (it is not
 bundled in this module). See https://goswagger.io/install.html for installation instructions.
 
 ```bash
@@ -408,13 +352,6 @@ and then all platforms are built automatically.
 
 * An application that wants to embed prunner should read the shared secret and generate a JWT auth token for accessing
   the API by doing internal HTTP requests. This way custom policies can be implemented for ensuring access to prunner.
-### UI
-
-* Show auth errors in UI
-
-* Idea: content ranges for polling streams
-
-
 ## License
 
 GPL, because we are building on [taskctl](https://github.com/taskctl/taskctl) - see [LICENSE](LICENSE).
diff --git a/app/app.go b/app/app.go
index ac724ee..3f6e256 100644
--- a/app/app.go
+++ b/app/app.go
@@ -5,8 +5,10 @@ import (
 	"io"
 	"net/http"
 	"os"
+	"os/signal"
 	"path"
 	"path/filepath"
+	"syscall"
 
 	"github.com/apex/log"
 	logfmt_handler "github.com/apex/log/handlers/logfmt"
@@ -174,13 +176,23 @@ func appAction(c *cli.Context) error {
 		return errors.Wrap(err, "building output store")
 	}
 
-	store, err := store.NewJSONDataStore(path.Join(c.String("data")))
+	dataStore, err := store.NewJSONDataStore(path.Join(c.String("data")))
 	if err != nil {
 		return errors.Wrap(err, "building pipeline runner store")
 	}
 
+	// How signals are handled:
+	// - SIGINT: Shutdown gracefully and wait for jobs to be finished completely
+	// - SIGTERM: Cancel running jobs
+
+	gracefulShutdownCtx, gracefulCancel := signal.NotifyContext(c.Context, syscall.SIGINT, syscall.SIGTERM)
+	defer gracefulCancel()
+	// Cancel the shutdown context on SIGTERM to prevent waiting for jobs and client connections
+	forcedShutdownCtx, forcedCancel := signal.NotifyContext(c.Context, syscall.SIGTERM)
+	defer forcedCancel()
+
 	// Set up pipeline runner
-	pRunner, err := prunner.NewPipelineRunner(c.Context, defs, func(j *prunner.PipelineJob) taskctl.Runner {
+	pRunner, err := prunner.NewPipelineRunner(gracefulShutdownCtx, defs, func(j *prunner.PipelineJob) taskctl.Runner {
 		// taskctl.NewTaskRunner never actually returns an error
 		taskRunner, _ := taskctl.NewTaskRunner(outputStore, taskctl.WithEnv(variables.FromMap(j.Env)))
 
@@ -189,7 +201,7 @@ func appAction(c *cli.Context) error {
 		taskRunner.Stderr = io.Discard
 
 		return taskRunner
-	}, store, outputStore)
+	}, dataStore, outputStore)
 	if err != nil {
 		return err
 	}
@@ -202,10 +214,33 @@ func appAction(c *cli.Context) error {
 	)
 
 	// Set up a simple REST API for listing jobs and scheduling pipelines
-
+	address := c.String("address")
 	log.
- Infof("HTTP API Listening on %s", c.String("address")) - return http.ListenAndServe(c.String("address"), srv) + Infof("HTTP API listening on %s", address) + + // Start http server and handle graceful shutdown + httpSrv := http.Server{ + Addr: address, + Handler: srv, + } + go func() { + if err := httpSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("Error starting HTTP API: %s", err) + } + }() + + // Wait for SIGINT or SIGTERM + <-gracefulShutdownCtx.Done() + + log.Info("Received signal, waiting until jobs are finished...") + _ = pRunner.Shutdown(forcedShutdownCtx) + + log.Debugf("Shutting down HTTP API...") + _ = httpSrv.Shutdown(forcedShutdownCtx) + + log.Info("Shutdown complete") + + return nil } func createLogFormatter(c *cli.Context) middleware.LogFormatter { diff --git a/examples/pipelines.yml b/examples/pipelines.yml index 8f1391d..cb70203 100644 --- a/examples/pipelines.yml +++ b/examples/pipelines.yml @@ -3,7 +3,7 @@ pipelines: tasks: lint: script: - - staticcheck $(go list ./... | grep -v /vendor/) + # - staticcheck $(go list ./... | grep -v /vendor/) - go vet $(go list ./... | grep -v /vendor/) test: script: @@ -34,14 +34,6 @@ pipelines: - sleep 1 - echo Tock - sleep 1 - - echo Tick - - sleep 1 - - echo Tock - - sleep 1 - - echo Tick - - sleep 1 - - echo Tock - - sleep 1 - echo Dong build: script: diff --git a/go.mod b/go.mod index dc6d61a..b8caca5 100644 --- a/go.mod +++ b/go.mod @@ -13,11 +13,11 @@ require ( github.com/liamylian/jsontime/v2 v2.0.0 github.com/mattn/go-isatty v0.0.14 github.com/mattn/go-zglob v0.0.3 - github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.7.1 github.com/taskctl/taskctl v1.3.1-0.20210426182424-d8747985c906 github.com/urfave/cli/v2 v2.4.0 gopkg.in/yaml.v2 v2.4.0 + mvdan.cc/sh/v3 v3.4.3 ) require ( @@ -42,6 +42,7 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/sirupsen/logrus v1.8.1 // indirect golang.org/x/crypto v0.0.0-20220331220935-ae2d96664a29 // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20220403205710-6acee93ad0eb // indirect @@ -49,5 +50,4 @@ require ( golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect - mvdan.cc/sh/v3 v3.4.3 // indirect ) diff --git a/prunner.go b/prunner.go index 7d1ff8b..92641d4 100644 --- a/prunner.go +++ b/prunner.go @@ -47,6 +47,11 @@ type PipelineRunner struct { // Wait group for waiting for asynchronous operations like job.Cancel wg sync.WaitGroup + // Flag if the runner is shutting down + isShuttingDown bool + + // Poll interval for completed jobs for graceful shutdown + ShutdownPollInterval time.Duration } // NewPipelineRunner creates the central data structure which controls the full runner state; so this knows what is currently running @@ -61,9 +66,10 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat waitListByPipeline: make(map[string][]*PipelineJob), store: store, outputStore: outputStore, - // Use channel buffered with one extra slot so we can keep save requests while a save is running without blocking - persistRequests: make(chan struct{}, 1), - createTaskRunner: createTaskRunner, + // Use channel buffered with one extra slot, so we can keep save requests while a save is running without blocking + persistRequests: 
make(chan struct{}, 1), + createTaskRunner: createTaskRunner, + ShutdownPollInterval: 3 * time.Second, } if store != nil { @@ -76,6 +82,9 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat for { select { case <-ctx.Done(): + log. + WithField("component", "runner"). + Debug("Stopping persist loop") return case <-pRunner.persistRequests: pRunner.SaveToStore() @@ -175,11 +184,16 @@ var errQueueFull = errors.New("concurrency exceeded and queue limit reached for var ErrJobNotFound = errors.New("job not found") var errJobAlreadyCompleted = errors.New("job is already completed") var errJobNotStarted = errors.New("job is not started") +var ErrShuttingDown = errors.New("runner is shutting down") func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) (*PipelineJob, error) { r.mx.Lock() defer r.mx.Unlock() + if r.isShuttingDown { + return nil, ErrShuttingDown + } + pipelineDef, ok := r.defs.Pipelines[pipeline] if !ok { return nil, errors.Errorf("pipeline %q is not defined", pipeline) @@ -367,7 +381,9 @@ func (r *PipelineRunner) startJob(job *PipelineJob) { job.Start = &now // Run graph asynchronously + r.wg.Add(1) go func() { + defer r.wg.Done() lastErr := job.sched.Schedule(graph) r.JobCompleted(job.ID, lastErr) }() @@ -666,8 +682,8 @@ func (r *PipelineRunner) initialLoadFromStore() error { Warnf("Found running job when restoring state, marked as canceled") } - // Cancel jobs which have been scheduled on wait list but never been started - if job.Start == nil { + // Cancel jobs which have been scheduled on wait list but never been started or canceled + if job.Start == nil && !job.Canceled { job.Canceled = true log. @@ -685,6 +701,9 @@ func (r *PipelineRunner) initialLoadFromStore() error { } func (r *PipelineRunner) SaveToStore() { + r.wg.Add(1) + defer r.wg.Done() + log. WithField("component", "runner"). Debugf("Saving job state to data store") @@ -774,6 +793,78 @@ func (r *PipelineRunner) SaveToStore() { } } +func (r *PipelineRunner) Shutdown(ctx context.Context) error { + defer func() { + log. + WithField("component", "runner"). + Debugf("Shutting down, waiting for pending operations...") + // Wait for all running jobs to have called JobCompleted + r.wg.Wait() + + // Do a final save to include the state of recently completed jobs + r.SaveToStore() + }() + + r.mx.Lock() + r.isShuttingDown = true + // Cancel all jobs on wait list + for pipelineName, jobs := range r.waitListByPipeline { + for _, job := range jobs { + job.Canceled = true + log. + WithField("component", "runner"). + WithField("jobID", job.ID). + WithField("pipeline", pipelineName). + Warnf("Shutting down, marking job on wait list as canceled") + } + delete(r.waitListByPipeline, pipelineName) + } + r.mx.Unlock() + + for { + // Poll pipelines to check for running jobs + r.mx.RLock() + hasRunningPipelines := false + for pipelineName := range r.jobsByPipeline { + if r.isRunning(pipelineName) { + hasRunningPipelines = true + log. + WithField("component", "runner"). + WithField("pipeline", pipelineName). + Debugf("Shutting down, waiting for pipeline to finish...") + } + } + r.mx.RUnlock() + + if !hasRunningPipelines { + log. + WithField("component", "runner"). + Debugf("Shutting down, all pipelines finished") + break + } + + // Wait a few seconds before checking pipeline status again, or cancel jobs if the context is done + select { + case <-time.After(r.ShutdownPollInterval): + case <-ctx.Done(): + log. + WithField("component", "runner"). 
+ Warnf("Forced shutdown, cancelling all jobs") + + r.mx.Lock() + + for jobID := range r.jobsByID { + _ = r.cancelJobInternal(jobID) + } + r.mx.Unlock() + + return ctx.Err() + } + } + + return nil +} + // taken from https://stackoverflow.com/a/37335777 func removeJobFromList(jobs []*PipelineJob, jobToRemove *PipelineJob) []*PipelineJob { for index, job := range jobs { diff --git a/prunner_test.go b/prunner_test.go index 56fd9f2..3295fba 100644 --- a/prunner_test.go +++ b/prunner_test.go @@ -175,7 +175,7 @@ func TestPipelineRunner_ScheduleAsync_WithEnvVars(t *testing.T) { Concurrency: 1, QueueLimit: nil, Env: map[string]string{ - "MY_VAR": "from pipeline", + "MY_VAR": "from pipeline", "OTHER_VAR": "from pipeline", }, Tasks: map[string]definition.TaskDef{ @@ -746,7 +746,7 @@ func TestPipelineRunner_ScheduleAsync_WithStartDelayNoQueueAndReplaceWillNotRunC pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner { return &test.MockRunner{ OnRun: func(tsk *task.Task) error { - time.Sleep(100*time.Millisecond) + time.Sleep(100 * time.Millisecond) return nil }, } @@ -787,6 +787,113 @@ func TestPipelineRunner_ScheduleAsync_WithStartDelayNoQueueAndReplaceWillNotRunC } } +func TestPipelineRunner_Shutdown_WithRunningJob_Graceful(t *testing.T) { + var defs = &definition.PipelinesDef{ + Pipelines: map[string]definition.PipelineDef{ + "long_running": { + // Concurrency of 1 is the default for a single concurrent execution + Concurrency: 1, + QueueLimit: nil, + Tasks: map[string]definition.TaskDef{ + "sleep": { + Script: []string{"sleep 10"}, + }, + }, + SourcePath: "fixtures", + }, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var ( + jobFinished bool + jobCanceled bool + ) + + store := test.NewMockStore() + + pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner { + return &test.MockRunner{ + OnRun: func(tsk *task.Task) error { + time.Sleep(100 * time.Millisecond) + jobFinished = true + return nil + }, + OnCancel: func() { + jobCanceled = true + }, + } + }, store, test.NewMockOutputStore()) + pRunner.ShutdownPollInterval = 100 * time.Millisecond + require.NoError(t, err) + + job, err := pRunner.ScheduleAsync("long_running", ScheduleOpts{}) + require.NoError(t, err) + + jobID := job.ID + + waitForStartedJobTask(t, pRunner, jobID, "sleep") + + shutdownCtx := context.Background() + + err = pRunner.Shutdown(shutdownCtx) + require.NoError(t, err) + + assert.False(t, job.Canceled, "job was not marked as canceled") + assert.True(t, jobFinished, "job was finished by runner") + assert.False(t, jobCanceled, "job was not canceled by runner") + assert.True(t, job.Completed, "job was marked as completed") +} + +func TestPipelineRunner_Shutdown_WithRunningJob_Forced(t *testing.T) { + var defs = &definition.PipelinesDef{ + Pipelines: map[string]definition.PipelineDef{ + "long_running": { + // Concurrency of 1 is the default for a single concurrent execution + Concurrency: 1, + QueueLimit: nil, + Tasks: map[string]definition.TaskDef{ + "sleep": { + Script: []string{"sleep 1"}, + }, + }, + SourcePath: "fixtures", + }, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := test.NewMockStore() + + pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner { + // Use a real runner here to test the actual processing of a task.Task + taskRunner, _ := taskctl.NewTaskRunner(test.NewMockOutputStore()) + return taskRunner + }, store, test.NewMockOutputStore()) + 
pRunner.ShutdownPollInterval = 100 * time.Millisecond + require.NoError(t, err) + + job, err := pRunner.ScheduleAsync("long_running", ScheduleOpts{}) + require.NoError(t, err) + + jobID := job.ID + + waitForStartedJobTask(t, pRunner, jobID, "sleep") + + // Force cancel of job after 100ms + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer shutdownCancel() + + err = pRunner.Shutdown(shutdownCtx) + require.ErrorIs(t, err, context.DeadlineExceeded) + + assert.True(t, job.Canceled, "job was marked as canceled") +} + func intPtr(i int) *int { return &i } diff --git a/server/server.go b/server/server.go index 02a7dfd..8593558 100644 --- a/server/server.go +++ b/server/server.go @@ -129,6 +129,10 @@ func (s *server) pipelinesSchedule(w http.ResponseWriter, r *http.Request) { pJob, err := s.pRunner.ScheduleAsync(in.Body.Pipeline, prunner.ScheduleOpts{Variables: in.Body.Variables, User: user}) if err != nil { // TODO Send JSON error and include expected errors (see resolveScheduleAction) + if errors.Is(err, prunner.ErrShuttingDown) { + s.sendError(w, http.StatusServiceUnavailable, "Server is shutting down") + return + } s.sendError(w, http.StatusBadRequest, fmt.Sprintf("Error scheduling pipeline: %v", err)) return diff --git a/taskctl/executor.go b/taskctl/executor.go new file mode 100644 index 0000000..9a9aa17 --- /dev/null +++ b/taskctl/executor.go @@ -0,0 +1,204 @@ +package taskctl + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "os/exec" + "strings" + "syscall" + "time" + + "github.com/apex/log" + "github.com/taskctl/taskctl/pkg/executor" + "mvdan.cc/sh/v3/expand" + + "mvdan.cc/sh/v3/interp" + "mvdan.cc/sh/v3/syntax" + + "github.com/taskctl/taskctl/pkg/utils" +) + +// PgidExecutor is an executor that starts commands in a new process group +// +// It is based on executor.DefaultExecutor but uses an exec handler that sets setpgid to +// start child processes in a new process group (with negative pid). It is used to prevent +// signals from propagating to children and to kill all descendant processes e.g. when forked by a shell process. +type PgidExecutor struct { + dir string + env []string + interp *interp.Runner + buf bytes.Buffer +} + +// NewPgidExecutor creates new pgid executor +func NewPgidExecutor(stdin io.Reader, stdout, stderr io.Writer, killTimeout time.Duration) (*PgidExecutor, error) { + var err error + e := &PgidExecutor{ + env: os.Environ(), + } + + e.dir, err = os.Getwd() + if err != nil { + return nil, err + } + + if stdout == nil { + stdout = io.Discard + } + + if stderr == nil { + stderr = io.Discard + } + + e.interp, err = interp.New( + interp.StdIO(stdin, io.MultiWriter(&e.buf, stdout), io.MultiWriter(&e.buf, stderr)), + interp.ExecHandler(func(ctx context.Context, args []string) error { + hc := interp.HandlerCtx(ctx) + path, err := interp.LookPathDir(hc.Dir, hc.Env, args[0]) + if err != nil { + _, _ = fmt.Fprintln(hc.Stderr, err) + return interp.NewExitStatus(127) + } + cmd := exec.Cmd{ + Path: path, + Args: args, + Env: execEnv(hc.Env), + Dir: hc.Dir, + Stdin: hc.Stdin, + Stdout: hc.Stdout, + Stderr: hc.Stderr, + SysProcAttr: &syscall.SysProcAttr{ + // Added for prunner: Using a process group for the new child process fixes 2 things: + // 1. We can handle signals like SIGINT in the prunner main process and gracefully shutdown running tasks + // 2. 
Bash scripts will not forward signals by default, so using process groups can be used to send signals to all children in the group + // (https://unix.stackexchange.com/questions/146756/forward-sigterm-to-child-in-bash) + Setpgid: true, + }, + } + + err = cmd.Start() + if err == nil { + if done := ctx.Done(); done != nil { + go func() { + <-done + + if killTimeout <= 0 { + _ = syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) + return + } + + // TODO: don't temporarily leak this goroutine if the program stops itself with the interrupt. (from github.com/mvdan/sh) + go func() { + time.Sleep(killTimeout) + _ = syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) + }() + _ = syscall.Kill(-cmd.Process.Pid, syscall.SIGINT) + }() + } + + err = cmd.Wait() + } + + switch x := err.(type) { + case *exec.ExitError: + // started, but errored - default to 1 if OS doesn't have exit statuses + if status, ok := x.Sys().(syscall.WaitStatus); ok { + if status.Signaled() { + if ctx.Err() != nil { + return ctx.Err() + } + return interp.NewExitStatus(uint8(128 + status.Signal())) + } + return interp.NewExitStatus(uint8(status.ExitStatus())) + } + return interp.NewExitStatus(1) + case *exec.Error: + // did not start + _, _ = fmt.Fprintf(hc.Stderr, "%v\n", err) + return interp.NewExitStatus(127) + default: + return err + } + }), + ) + if err != nil { + return nil, err + } + + return e, nil +} + +// Execute executes given job with provided context +// Returns job output +func (e *PgidExecutor) Execute(ctx context.Context, job *executor.Job) ([]byte, error) { + command, err := utils.RenderString(job.Command, job.Vars.Map()) + if err != nil { + return nil, err + } + + cmd, err := syntax.NewParser(syntax.KeepComments(true)).Parse(strings.NewReader(command), "") + if err != nil { + return nil, err + } + + env := e.env + env = append(env, utils.ConvertEnv(utils.ConvertToMapOfStrings(job.Env.Map()))...) + + if job.Dir == "" { + job.Dir = e.dir + } + + jobID := job.Vars.Get(JobIDVariableName).(string) + + log. + WithField("component", "executor"). + WithField("jobID", jobID). + Debugf("Executing %q", command) + + e.interp.Dir = job.Dir + e.interp.Env = expand.ListEnviron(env...) + + var cancelFn context.CancelFunc + if job.Timeout != nil { + ctx, cancelFn = context.WithTimeout(ctx, *job.Timeout) + } + defer func() { + if cancelFn != nil { + cancelFn() + } + }() + + offset := e.buf.Len() + err = e.interp.Run(ctx, cmd) + if err != nil { + return e.buf.Bytes()[offset:], err + } + + return e.buf.Bytes()[offset:], nil +} + +func execEnv(env expand.Environ) []string { + list := make([]string, 0, 64) + env.Each(func(name string, vr expand.Variable) bool { + if !vr.IsSet() { + // If a variable is set globally but unset in the + // runner, we need to ensure it's not part of the final + // list. Seems like zeroing the element is enough. + // This is a linear search, but this scenario should be + // rare, and the number of variables shouldn't be large. 
+ for i, kv := range list { + if strings.HasPrefix(kv, name+"=") { + list[i] = "" + } + } + } + if vr.Exported && vr.Kind == expand.String { + list = append(list, name+"="+vr.String()) + } + return true + }) + return list +} diff --git a/taskctl/executor_test.go b/taskctl/executor_test.go new file mode 100644 index 0000000..86d1a86 --- /dev/null +++ b/taskctl/executor_test.go @@ -0,0 +1,56 @@ +package taskctl_test + +import ( + "bytes" + "context" + "io/ioutil" + "testing" + "time" + + "github.com/taskctl/taskctl/pkg/executor" + + "github.com/Flowpack/prunner/taskctl" +) + +func TestPgidExecutor_Execute(t *testing.T) { + e, err := taskctl.NewPgidExecutor(nil, ioutil.Discard, ioutil.Discard, 2*time.Second) + if err != nil { + t.Fatal(err) + } + + job1 := executor.NewJobFromCommand("echo 'success'") + to := 1 * time.Minute + job1.Timeout = &to + + output, err := e.Execute(context.Background(), job1) + if err != nil { + t.Fatal(err) + } + + if !bytes.Contains(output, []byte("success")) { + t.Error() + } + + job1 = executor.NewJobFromCommand("exit 1") + + _, err = e.Execute(context.Background(), job1) + if err == nil { + t.Error() + } + + if _, ok := executor.IsExitStatus(err); !ok { + t.Error() + } + + job2 := executor.NewJobFromCommand("echo {{ .Fail }}") + _, err = e.Execute(context.Background(), job2) + if err == nil { + t.Error() + } + + job3 := executor.NewJobFromCommand("printf '%s\\nLine-2\\n' '=========== Line 1 ==================' ") + _, err = e.Execute(context.Background(), job3) + if err != nil { + t.Error() + } +} diff --git a/taskctl/runner.go b/taskctl/runner.go index 58928c2..74f2878 100644 --- a/taskctl/runner.go +++ b/taskctl/runner.go @@ -9,7 +9,7 @@ import ( "sync" "time" - "github.com/sirupsen/logrus" + "github.com/apex/log" "github.com/taskctl/taskctl/pkg/executor" "github.com/taskctl/taskctl/pkg/output" "github.com/taskctl/taskctl/pkg/runner" @@ -52,6 +52,8 @@ type TaskRunner struct { outputStore OutputStore onTaskChange func(t *task.Task) + + killTimeout time.Duration } var _ Runner = &TaskRunner{} @@ -68,6 +70,8 @@ func NewTaskRunner(outputStore OutputStore, opts ...Opts) (*TaskRunner, error) { env: variables.NewVariables(), outputStore: outputStore, + + killTimeout: 2 * time.Second, } r.ctx, r.cancelFunc = context.WithCancel(context.Background()) @@ -125,7 +129,7 @@ func (r *TaskRunner) Run(t *task.Task) error { defer func() { err = execContext.After() if err != nil { - logrus.Error(err) + log.Errorf("Error executing after tasks: %v", err) } if !t.Errored && !t.Skipped { @@ -139,13 +143,18 @@ func (r *TaskRunner) Run(t *task.Task) error { env = env.With("TASK_NAME", t.Name) env = env.Merge(t.Env) + jobID := t.Variables.Get(JobIDVariableName).(string) + meets, err := r.checkTaskCondition(t) if err != nil { return err } if !meets { - logrus.Infof("task %s was skipped", t.Name) + log. + WithField("component", "runner"). + WithField("jobID", jobID). 
+ Infof("Task %s was skipped", t.Name) t.Skipped = true return nil } @@ -155,8 +164,6 @@ func (r *TaskRunner) Run(t *task.Task) error { return err } - jobID := t.Variables.Get(JobIDVariableName).(string) - var ( stdoutWriter = []io.Writer{&t.Log.Stdout} stderrWriter = []io.Writer{&t.Log.Stderr} @@ -214,7 +221,6 @@ func (r *TaskRunner) Cancel() { r.cancelMutex.Lock() if !r.canceling { r.canceling = true - defer logrus.Debug("runner has been cancelled") r.cancelFunc() } r.cancelMutex.Unlock() @@ -255,7 +261,7 @@ func (r *TaskRunner) before(ctx context.Context, t *task.Task, env, vars variabl return fmt.Errorf("\"before\" command compilation failed: %w", err) } - exec, err := executor.NewDefaultExecutor(job.Stdin, job.Stdout, job.Stderr) + exec, err := NewPgidExecutor(job.Stdin, job.Stdout, job.Stderr, r.killTimeout) if err != nil { return err } @@ -285,14 +291,17 @@ func (r *TaskRunner) after(ctx context.Context, t *task.Task, env, vars variable return fmt.Errorf("\"after\" command compilation failed: %w", err) } - exec, err := executor.NewDefaultExecutor(job.Stdin, job.Stdout, job.Stderr) + exec, err := NewPgidExecutor(job.Stdin, job.Stdout, job.Stderr, r.killTimeout) if err != nil { return err } _, err = exec.Execute(ctx, job) if err != nil { - logrus.Warning(err) + log. + WithField("component", "runner"). + WithField("command", command). + Warnf("After command failed: %v", err) } } @@ -340,7 +349,7 @@ func (r *TaskRunner) checkTaskCondition(t *task.Task) (bool, error) { return false, err } - exec, err := executor.NewDefaultExecutor(job.Stdin, job.Stdout, job.Stderr) + exec, err := NewPgidExecutor(job.Stdin, job.Stdout, job.Stderr, r.killTimeout) if err != nil { return false, err } @@ -370,7 +379,7 @@ func (r *TaskRunner) storeTaskOutput(t *task.Task) { } func (r *TaskRunner) execute(ctx context.Context, t *task.Task, job *executor.Job) error { - exec, err := executor.NewDefaultExecutor(job.Stdin, job.Stdout, job.Stderr) + exec, err := NewPgidExecutor(job.Stdin, job.Stdout, job.Stderr, r.killTimeout) if err != nil { return err } @@ -385,7 +394,6 @@ func (r *TaskRunner) execute(ctx context.Context, t *task.Task, job *executor.Jo prevOutput, err = exec.Execute(ctx, nextJob) if err != nil { - t.End = time.Now() if status, ok := executor.IsExitStatus(err); ok { t.ExitCode = int16(status) if t.AllowFailure { @@ -428,7 +436,6 @@ func WithEnv(env variables.Container) Opts { } } - // WithVariables adds provided variables to task runner func WithVariables(variables variables.Container) Opts { return func(runner *TaskRunner) { @@ -437,3 +444,10 @@ func WithVariables(variables variables.Container) Opts { // runner.compiler.variables = variables } } + +// WithKillTimeout sets the kill timeout for execution of commands +func WithKillTimeout(killTimeout time.Duration) Opts { + return func(runner *TaskRunner) { + runner.killTimeout = killTimeout + } +} diff --git a/taskctl/scheduler.go b/taskctl/scheduler.go index d2fead1..7ae2271 100644 --- a/taskctl/scheduler.go +++ b/taskctl/scheduler.go @@ -6,7 +6,7 @@ import ( "sync/atomic" "time" - "github.com/sirupsen/logrus" + "github.com/apex/log" "github.com/taskctl/taskctl/pkg/runner" "github.com/taskctl/taskctl/pkg/scheduler" "github.com/taskctl/taskctl/pkg/utils" @@ -61,7 +61,9 @@ func (s *Scheduler) Schedule(g *scheduler.ExecutionGraph) error { if stage.Condition != "" { meets, err := checkStageCondition(stage.Condition) if err != nil { - logrus.Error(err) + log. + WithField("component", "runner"). 
+ Errorf("Failed to check stage condition: %v", err) stage.UpdateStatus(scheduler.StatusError) s.notifyStageChange(stage) s.Cancel() @@ -174,7 +176,8 @@ func checkStatus(p *scheduler.ExecutionGraph, stage *scheduler.Stage) (ready boo for _, dep := range p.To(stage.Name) { depStage, err := p.Node(dep) if err != nil { - logrus.Fatal(err) + // This should not happen unless the graph is inconsistent + panic(err) } switch depStage.ReadStatus() {
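
The "Handling of child processes" section added to the README above relies on each task running in its own process group. As a minimal standalone sketch of that technique (not prunner's actual code; the command and the 2-second grace period are only illustrative), starting the child with `Setpgid` and signalling the negative PID also terminates children forked by a shell:

```go
package main

import (
	"os/exec"
	"syscall"
	"time"
)

func main() {
	// A shell that forks its own child; killing only the shell would leave "sleep" running.
	cmd := exec.Command("bash", "-c", "sleep 30 & wait")
	// Put the child and everything it forks into a new process group.
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
	if err := cmd.Start(); err != nil {
		panic(err)
	}

	time.Sleep(time.Second)

	// Signal the whole group via the negative PID: SIGINT first, SIGKILL after a grace period.
	_ = syscall.Kill(-cmd.Process.Pid, syscall.SIGINT)
	time.AfterFunc(2*time.Second, func() {
		_ = syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
	})

	_ = cmd.Wait()
}
```

The `WithKillTimeout` option introduced in `taskctl/runner.go` configures exactly this grace period between the initial SIGINT to the process group and the final SIGKILL (2 seconds by default).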
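
The "Graceful shutdown" behaviour described above (SIGINT waits for running jobs, SIGTERM stops waiting and cancels them) follows the two-context pattern used in the updated `app/app.go`. The sketch below is a simplified illustration, not the actual prunner code; `waitForJobs` is a hypothetical stand-in for the runner's own shutdown logic:

```go
package main

import (
	"context"
	"log"
	"os/signal"
	"syscall"
)

func main() {
	// Done on SIGINT or SIGTERM: start shutting down.
	gracefulCtx, stopGraceful := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stopGraceful()

	// Done on SIGTERM only: stop waiting and cancel running jobs.
	forcedCtx, stopForced := signal.NotifyContext(context.Background(), syscall.SIGTERM)
	defer stopForced()

	<-gracefulCtx.Done()
	log.Println("Received signal, waiting for running jobs to finish (SIGTERM forces cancellation)...")

	select {
	case <-waitForJobs():
		log.Println("All jobs finished")
	case <-forcedCtx.Done():
		log.Println("Forced shutdown, cancelling running jobs")
	}
}

// waitForJobs is a placeholder: it would return a channel that is closed once all jobs have completed.
func waitForJobs() <-chan struct{} {
	ch := make(chan struct{})
	close(ch)
	return ch
}
```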
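
Relating to the "Job variables" section of the user guide: task scripts are rendered through Go's [text/template](https://pkg.go.dev/text/template) before execution, with the variables sent as JSON when scheduling a job. A minimal sketch of that rendering step (the variable names are illustrative):

```go
package main

import (
	"os"
	"text/template"
)

func main() {
	// A task script as it could appear in pipelines.yml.
	const script = "echo {{ .message }} > {{ .output_file }}"
	tpl := template.Must(template.New("script").Parse(script))

	// Variables as they would be sent when starting the job.
	vars := map[string]interface{}{
		"message":     "hello",
		"output_file": "out.txt",
	}
	if err := tpl.Execute(os.Stdout, vars); err != nil {
		panic(err)
	}
}
```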