Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEATURE: Graceful shutdown and process groups for child processes #33

Merged
merged 5 commits into from
Apr 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 26 additions & 89 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ A minimalistic React UI to start and view pipelines, job and task details.

A Neos/Flow PHP package providing a backend module embedding prunner-ui and a PHP API for interacting with the prunner Rest API.

## User Guide
## User guide

### Main Concepts
### Main concepts

prunner controls a set of *pipelines*, which are defined in *YAML* files (typically `pipelines.yml`).
The pipelines consist of *tasks*, which are executed as part of the pipeline. Each task has a `script`
Expand All @@ -57,7 +57,7 @@ pipelines:
```


### Task Dependencies
### Task dependencies

In case you need to ensure certain steps are executed in-order, you can use
**task dependencies** to order tasks using the `depends_on` key:
Expand All @@ -82,14 +82,14 @@ This is an intended limitation to keep complexity low; so we do not plan to supp
or anything like this.

In case you need to store information from one task to the next, it is recommended that you
do this outside prunner, and pass in a Job Argument with an identifier to every task
do this outside prunner, and pass in a job argument with an identifier to every task
(explained in the next section).


### Job Variables
### Job variables

When starting a job, (i.e. `do_something` in the example below), you can send additional
**variables** as JSON. The script is passed through the [text/template](https://pkg.go.dev/text/template)
**variables** as JSON. The script is passed through the Go [text/template](https://pkg.go.dev/text/template)
templating language, where you can access the variables. This way, you can pass the variable
contents to your scripts.

Expand Down Expand Up @@ -136,9 +136,10 @@ pipelines:

#### Dotenv files

Prunner will override the process environment from files `.env` and `.env.local` by default. The files are configurable via the `env-files` flag.
Prunner will override the process environment from files `.env` and `.env.local` by default.
The files are configurable via the `env-files` flag.

### Limiting Concurrency
### Limiting concurrency

Certain pipelines, like deployment pipelines, usually should only run only once, and never be started
concurrently. Prunner supports this via a configurable *concurrency*:
Expand All @@ -156,7 +157,7 @@ tasks in the pipeline run concurrently.*
Now, when the concurrency limit is reached and you schedule the pipeline again while it is running,
**the job is queued** to be worked on later - it is added to the wait list by default.

### The Wait List
### The wait list

By default, if you limit concurrency, and the limit is exceeded, further jobs are added to the
waitlist of the pipeline.
Expand Down Expand Up @@ -219,7 +220,7 @@ pipelines:
```


### Disabling Fail-Fast Behavior
### Disabling fail-fast behavior

By default, if a task in a pipeline fails, all other concurrently running tasks are directly aborted.
Sometimes this is not desirable, e.g. if certain deployment tasks should continue running if already started.
Expand All @@ -235,7 +236,7 @@ pipelines:
```


### Configuring Retention Period
### Configuring retention period

By default, we never delete any runs. For many projects, it is useful to configure this to keep the
consumed disk space under control. This can be done on a per-pipeline level; using one of the two configuration
Expand Down Expand Up @@ -264,87 +265,30 @@ You can also combine the two options. Then, deletion occurs with whatever comes
If a pipeline does not exist at all anymore (i.e. if you renamed `do_something` to `another_name` above),
its persisted logs and task data is removed automatically on saving to disk.

### Handling of child processes

### Interruptible Jobs

When terminating a task, unfortunately, only the top-level executed script is terminated - and the script needs
to take care to kill its children appropriately and not hang. This is very likely because of the following invocation chain:

- prunner
- taskctl
- mvdan/sh [DefaultExecHandler](https://github.com/mvdan/sh/blob/master/interp/handler.go#L100-L104)

-> This kills only the process itself, **but not its children**. Unfortunately for us, Bash [does not forward signals like
SIGTERM to processes it is currently waiting on](https://unix.stackexchange.com/questions/146756/forward-sigterm-to-child-in-bash).

Currently, killing children recursively is not implemented, but can be done manually in a bash script [as explained here](https://newbedev.com/forward-sigterm-to-child-in-bash):

Instead of doing:

```
#!/usr/bin/env bash

/usr/local/bin/my_long_running_process
```

You can do the following to pass along signals:

```bash
#!/usr/bin/env bash
# taken from https://newbedev.com/forward-sigterm-to-child-in-bash

prep_term()
{
unset term_child_pid
unset term_kill_needed
trap 'handle_term' TERM INT
}

handle_term()
{
if [ "${term_child_pid}" ]; then
kill -TERM "${term_child_pid}" 2>/dev/null
else
term_kill_needed="yes"
fi
}

wait_term()
{
term_child_pid=$!
if [ "${term_kill_needed}" ]; then
kill -TERM "${term_child_pid}" 2>/dev/null
fi
wait ${term_child_pid} 2>/dev/null
trap - TERM INT
wait ${term_child_pid} 2>/dev/null
}


prep_term
/usr/local/bin/my_long_running_process &
wait_term
```

**This is a workaround; and at some point in the future it would be nice to solve it as [explained here](https://medium.com/@felixge/killing-a-child-process-and-all-of-its-children-in-go-54079af94773)**.

### Persistent Job State
Prunner starts child processes with `setpgid` to use a new process group for each task of a pipeline job.
This means that if a job is cancelled, all child processes are killed - even if they were run by a shell script.

Note: If prunner is killed hard (e.g. SIGKILL) without SIGINT / SIGTERM, the child processes of running jobs will not be terminated.

### Graceful shutdown

Prunner will handle a SIGINT signal and perform a graceful shutdown and wait for all running jobs to be completed.
Sending a SIGTERM signal to prunner will cancel all running jobs (and interrupt / kill child processes).

### Persistent job state

## Development

### Requirements

* Go (>= 1.16)
* Go (>= 1.18)

### Running locally


```bash
go run cmd/prunner/main.go
go run ./cmd/prunner
```
> Note: for development a live reload wrapper like https://github.com/markbates/refresh is recommended.

Expand All @@ -353,17 +297,17 @@ The API should now be accessible at http://localhost:9009/. The log will contain
For interacting with the API, you need a JWT token which you can generate for developing using:

```bash
go run cmd/prunner/main.go debug
go run ./cmd/prunner debug
```

### Building for different operating systems.

Using the standard `GOOS` environment variable, you can build for different operating systems. This is helpful when you
want to use prunner inside a Docker container, but are developing on OSX. For this, a compile step like the following is useful:
want to use prunner inside a Docker container, but are developing on macOS. For this, a compile step like the following is useful:

```bash
# after building, copy the executable inside the docker container; so it can be directly used.
GOOS=linux go build cmd/prunner/main.go && docker cp main cms_neos_1:/app/main
GOOS=linux go build ./cmd/prunner -o bin/prunner && docker cp bin/prunner my_container:/app/prunner
```

### Running Tests
Expand All @@ -389,7 +333,7 @@ golangci-lint run

### Generate OpenAPI (Swagger) spec

An OpenAPI 2.0 spec is generated from the Go types and annoations in source code using the `go-swagger` tool (it is not
An OpenAPI 2.0 spec is generated from the Go types and annotations in source code using the `go-swagger` tool (it is not
bundled in this module). See https://goswagger.io/install.html for installation instructions.

```bash
Expand All @@ -408,13 +352,6 @@ and then all platforms are built automatically.
* An application that wants to embed prunner should read the shared secret and generate a JWT auth token for accessing the API by
doing internal HTTP requests. This way custom policies can be implemented for ensuring access to prunner.

### UI

* Show auth errors in UI

* Idea: content ranges for polling streams


## License

GPL, because we are building on [taskctl](https://github.com/taskctl/taskctl) - see [LICENSE](LICENSE).
47 changes: 41 additions & 6 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"io"
"net/http"
"os"
"os/signal"
"path"
"path/filepath"
"syscall"

"github.com/apex/log"
logfmt_handler "github.com/apex/log/handlers/logfmt"
Expand Down Expand Up @@ -174,13 +176,23 @@ func appAction(c *cli.Context) error {
return errors.Wrap(err, "building output store")
}

store, err := store.NewJSONDataStore(path.Join(c.String("data")))
dataStore, err := store.NewJSONDataStore(path.Join(c.String("data")))
if err != nil {
return errors.Wrap(err, "building pipeline runner store")
}

// How signals are handled:
// - SIGINT: Shutdown gracefully and wait for jobs to be finished completely
// - SIGTERM: Cancel running jobs

gracefulShutdownCtx, gracefulCancel := signal.NotifyContext(c.Context, syscall.SIGINT, syscall.SIGTERM)
defer gracefulCancel()
// Cancel the shutdown context on SIGTERM to prevent waiting for jobs and client connections
forcedShutdownCtx, forcedCancel := signal.NotifyContext(c.Context, syscall.SIGTERM)
defer forcedCancel()

// Set up pipeline runner
pRunner, err := prunner.NewPipelineRunner(c.Context, defs, func(j *prunner.PipelineJob) taskctl.Runner {
pRunner, err := prunner.NewPipelineRunner(gracefulShutdownCtx, defs, func(j *prunner.PipelineJob) taskctl.Runner {
// taskctl.NewTaskRunner never actually returns an error
taskRunner, _ := taskctl.NewTaskRunner(outputStore, taskctl.WithEnv(variables.FromMap(j.Env)))

Expand All @@ -189,7 +201,7 @@ func appAction(c *cli.Context) error {
taskRunner.Stderr = io.Discard

return taskRunner
}, store, outputStore)
}, dataStore, outputStore)
if err != nil {
return err
}
Expand All @@ -202,10 +214,33 @@ func appAction(c *cli.Context) error {
)

// Set up a simple REST API for listing jobs and scheduling pipelines

address := c.String("address")
log.
Infof("HTTP API Listening on %s", c.String("address"))
return http.ListenAndServe(c.String("address"), srv)
Infof("HTTP API listening on %s", address)

// Start http server and handle graceful shutdown
httpSrv := http.Server{
Addr: address,
Handler: srv,
}
go func() {
if err := httpSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Error starting HTTP API: %s", err)
}
}()

// Wait for SIGINT or SIGTERM
<-gracefulShutdownCtx.Done()

log.Info("Received signal, waiting until jobs are finished...")
_ = pRunner.Shutdown(forcedShutdownCtx)

log.Debugf("Shutting down HTTP API...")
_ = httpSrv.Shutdown(forcedShutdownCtx)

log.Info("Shutdown complete")

return nil
}

func createLogFormatter(c *cli.Context) middleware.LogFormatter {
Expand Down
10 changes: 1 addition & 9 deletions examples/pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pipelines:
tasks:
lint:
script:
- staticcheck $(go list ./... | grep -v /vendor/)
# - staticcheck $(go list ./... | grep -v /vendor/)
- go vet $(go list ./... | grep -v /vendor/)
test:
script:
Expand Down Expand Up @@ -34,14 +34,6 @@ pipelines:
- sleep 1
- echo Tock
- sleep 1
- echo Tick
- sleep 1
- echo Tock
- sleep 1
- echo Tick
- sleep 1
- echo Tock
- sleep 1
- echo Dong
build:
script:
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ require (
github.com/liamylian/jsontime/v2 v2.0.0
github.com/mattn/go-isatty v0.0.14
github.com/mattn/go-zglob v0.0.3
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.1
github.com/taskctl/taskctl v1.3.1-0.20210426182424-d8747985c906
github.com/urfave/cli/v2 v2.4.0
gopkg.in/yaml.v2 v2.4.0
mvdan.cc/sh/v3 v3.4.3
)

require (
Expand All @@ -42,12 +42,12 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
golang.org/x/crypto v0.0.0-20220331220935-ae2d96664a29 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220403205710-6acee93ad0eb // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
mvdan.cc/sh/v3 v3.4.3 // indirect
)
Loading