FEATURE: Graceful shutdown and process groups for child processes
Implements shutdown via SIGINT (graceful) and SIGTERM (cancelling jobs).
Also fixes an issue with child processes spawned from scripts by starting
processes with setpgid (which is needed to prevent signals from being forwarded).

Closes #14
hlubek committed Apr 4, 2022
1 parent 4b39ed5 commit c593dff
Showing 10 changed files with 542 additions and 124 deletions.
73 changes: 8 additions & 65 deletions README.md
@@ -264,85 +264,28 @@ You can also combine the two options. Then, deletion occurs with whatever comes first.
If a pipeline no longer exists at all (i.e. if you renamed `do_something` to `another_name` above),
its persisted logs and task data are removed automatically on saving to disk.

### Handling of child processes

### Interruptible Jobs
Prunner starts child processes with `setpgid`, so each task of a pipeline job runs in its own process group.
This means that if a job is cancelled, all child processes are killed - even those spawned by a shell script.
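
To illustrate the mechanism, here is a minimal, self-contained Go sketch of starting a command in its own process group and terminating the whole group (a hypothetical standalone example for Unix-like systems; prunner's actual implementation goes through taskctl and mvdan/sh):

```go
package main

import (
	"os/exec"
	"syscall"
	"time"
)

func main() {
	// A shell command that spawns its own children, similar to a task script.
	cmd := exec.Command("/bin/sh", "-c", "sleep 300 & wait")

	// Setpgid places the child in a new process group, so it no longer shares
	// the parent's group and does not receive signals delivered to that group
	// (e.g. Ctrl+C in a terminal).
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
	if err := cmd.Start(); err != nil {
		panic(err)
	}

	time.Sleep(time.Second)

	// Signalling the negative PID targets the whole process group,
	// including children spawned by the shell script.
	_ = syscall.Kill(-cmd.Process.Pid, syscall.SIGTERM)
	_ = cmd.Wait()
}
```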

When terminating a task, unfortunately only the top-level executed script is terminated, and the script needs
to take care of killing its children appropriately so it does not hang. This is most likely due to the following invocation chain:
Note: If prunner is killed hard (e.g. SIGKILL) without SIGINT / SIGTERM, the child processes of running jobs will not be terminated.

- prunner
- taskctl
- mvdan/sh [DefaultExecHandler](https://github.com/mvdan/sh/blob/master/interp/handler.go#L100-L104)
### Graceful shutdown

-> This kills only the process itself, **but not its children**. Unfortunately for us, Bash [does not forward signals like
SIGTERM to processes it is currently waiting on](https://unix.stackexchange.com/questions/146756/forward-sigterm-to-child-in-bash).

Currently, killing children recursively is not implemented, but can be done manually in a bash script [as explained here](https://newbedev.com/forward-sigterm-to-child-in-bash):

Instead of doing:

```
#!/usr/bin/env bash
/usr/local/bin/my_long_running_process
```

You can do the following to pass along signals:

```bash
#!/usr/bin/env bash
# taken from https://newbedev.com/forward-sigterm-to-child-in-bash
prep_term()
{
    unset term_child_pid
    unset term_kill_needed
    trap 'handle_term' TERM INT
}
handle_term()
{
    if [ "${term_child_pid}" ]; then
        kill -TERM "${term_child_pid}" 2>/dev/null
    else
        term_kill_needed="yes"
    fi
}
wait_term()
{
    term_child_pid=$!
    if [ "${term_kill_needed}" ]; then
        kill -TERM "${term_child_pid}" 2>/dev/null
    fi
    wait ${term_child_pid} 2>/dev/null
    trap - TERM INT
    wait ${term_child_pid} 2>/dev/null
}
prep_term
/usr/local/bin/my_long_running_process &
wait_term
```

**This is a workaround; at some point in the future it would be nice to solve it properly as [explained here](https://medium.com/@felixge/killing-a-child-process-and-all-of-its-children-in-go-54079af94773).**
Prunner handles a SIGINT signal by performing a graceful shutdown: it waits for all running jobs to be completed.
Sending a SIGTERM signal to prunner will cancel all running jobs (and interrupt / kill their child processes).
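
The pattern behind this - one context that triggers shutdown on SIGINT or SIGTERM, and a second one that is cancelled only by SIGTERM to force job cancellation - is introduced in `app/app.go` further down in this commit. A condensed, hypothetical sketch of the idea (the job-waiting loop is replaced by a placeholder timer):

```go
package main

import (
	"context"
	"fmt"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	// Shutdown is triggered by either SIGINT or SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// Only SIGTERM cancels this context, turning a graceful shutdown
	// into a forced one that cancels running jobs.
	forceCtx, forceStop := signal.NotifyContext(context.Background(), syscall.SIGTERM)
	defer forceStop()

	<-ctx.Done()
	fmt.Println("signal received, shutting down")

	select {
	case <-time.After(10 * time.Second): // stand-in for waiting on running jobs
		fmt.Println("all jobs finished, exiting")
	case <-forceCtx.Done():
		fmt.Println("SIGTERM received, cancelling running jobs")
	}
}
```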

### Persistent Job State





## Development

### Requirements

* Go (>= 1.16)
* Go (>= 1.18)

### Running locally


```bash
go run cmd/prunner/main.go
```
45 changes: 40 additions & 5 deletions app/app.go
@@ -5,8 +5,10 @@ import (
"io"
"net/http"
"os"
"os/signal"
"path"
"path/filepath"
"syscall"

"github.com/apex/log"
logfmt_handler "github.com/apex/log/handlers/logfmt"
@@ -174,7 +176,7 @@ func appAction(c *cli.Context) error {
return errors.Wrap(err, "building output store")
}

store, err := store.NewJSONDataStore(path.Join(c.String("data")))
dataStore, err := store.NewJSONDataStore(path.Join(c.String("data")))
if err != nil {
return errors.Wrap(err, "building pipeline runner store")
}
@@ -189,7 +191,7 @@ func appAction(c *cli.Context) error {
taskRunner.Stderr = io.Discard

return taskRunner
}, store, outputStore)
}, dataStore, outputStore)
if err != nil {
return err
}
@@ -202,10 +204,43 @@ func appAction(c *cli.Context) error {
)

// Set up a simple REST API for listing jobs and scheduling pipelines

address := c.String("address")
log.
Infof("HTTP API Listening on %s", c.String("address"))
return http.ListenAndServe(c.String("address"), srv)
Infof("HTTP API listening on %s", address)

// Start http server and handle graceful shutdown
httpSrv := http.Server{
Addr: address,
Handler: srv,
}
go func() {
if err := httpSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Error starting HTTP API: %s", err)
}
}()

// How signals are handled:
// - SIGINT: Shutdown gracefully and wait for jobs to be finished completely
// - SIGTERM: Cancel running jobs

ctx, cancel := signal.NotifyContext(c.Context, syscall.SIGINT, syscall.SIGTERM)
defer cancel()
// Cancel the shutdown context on SIGTERM to prevent waiting for jobs and client connections
shutdownCtx, shutdownCancel := signal.NotifyContext(c.Context, syscall.SIGTERM)
defer shutdownCancel()

// Wait for SIGINT or SIGTERM
<-ctx.Done()

log.Info("Received signal, waiting until jobs are finished...")
_ = pRunner.Shutdown(shutdownCtx)

log.Debugf("Shutting down HTTP API...")
_ = httpSrv.Shutdown(shutdownCtx)

log.Info("Shutdown complete")

return nil
}

func createLogFormatter(c *cli.Context) middleware.LogFormatter {
34 changes: 3 additions & 31 deletions examples/pipelines.yml
@@ -3,45 +3,17 @@ pipelines:
tasks:
lint:
script:
- staticcheck $(go list ./... | grep -v /vendor/)
# - staticcheck $(go list ./... | grep -v /vendor/)
- go vet $(go list ./... | grep -v /vendor/)
test:
script:
- go test ./...
do_something_long:
script:
- echo Tick
- sleep 1
- echo Tock
- sleep 1
- echo Tick
- sleep 1
- echo Tock
- sleep 1
- echo Tick
- sleep 1
- echo Tock
- sleep 1
- echo Tick
- sleep 1
- echo Tock
- sleep 1
- echo Tick
- sleep 1
- echo Tock
- sleep 1
- echo Tick
- sleep 1
- sleep 20
- echo Tock
- sleep 1
- echo Tick
- sleep 1
- echo Tock
- sleep 1
- echo Tick
- sleep 1
- echo Tock
- sleep 1
- sleep 20
- echo Dong
build:
script:
4 changes: 2 additions & 2 deletions go.mod
@@ -13,11 +13,11 @@ require (
github.com/liamylian/jsontime/v2 v2.0.0
github.com/mattn/go-isatty v0.0.14
github.com/mattn/go-zglob v0.0.3
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.1
github.com/taskctl/taskctl v1.3.1-0.20210426182424-d8747985c906
github.com/urfave/cli/v2 v2.4.0
gopkg.in/yaml.v2 v2.4.0
mvdan.cc/sh/v3 v3.4.3
)

require (
@@ -42,12 +42,12 @@
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
golang.org/x/crypto v0.0.0-20220331220935-ae2d96664a29 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220403205710-6acee93ad0eb // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
mvdan.cc/sh/v3 v3.4.3 // indirect
)
91 changes: 88 additions & 3 deletions prunner.go
@@ -47,6 +47,11 @@ type PipelineRunner struct {

// Wait group for waiting for asynchronous operations like job.Cancel
wg sync.WaitGroup
// Flag if the runner is shutting down
isShuttingDown bool

// Poll interval for completed jobs for graceful shutdown
ShutdownPollInterval time.Duration
}

// NewPipelineRunner creates the central data structure which controls the full runner state; so this knows what is currently running
@@ -61,9 +66,10 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat
waitListByPipeline: make(map[string][]*PipelineJob),
store: store,
outputStore: outputStore,
// Use channel buffered with one extra slot so we can keep save requests while a save is running without blocking
persistRequests: make(chan struct{}, 1),
createTaskRunner: createTaskRunner,
// Use channel buffered with one extra slot, so we can keep save requests while a save is running without blocking
persistRequests: make(chan struct{}, 1),
createTaskRunner: createTaskRunner,
ShutdownPollInterval: 3 * time.Second,
}

if store != nil {
@@ -76,6 +82,7 @@
for {
select {
case <-ctx.Done():
log.Debugf("PipelineRunner: context done, stopping persist loop")
return
case <-pRunner.persistRequests:
pRunner.SaveToStore()
@@ -175,11 +182,16 @@ var errQueueFull = errors.New("concurrency exceeded and queue limit reached for
var ErrJobNotFound = errors.New("job not found")
var errJobAlreadyCompleted = errors.New("job is already completed")
var errJobNotStarted = errors.New("job is not started")
var ErrShuttingDown = errors.New("runner is shutting down")

func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) (*PipelineJob, error) {
r.mx.Lock()
defer r.mx.Unlock()

if r.isShuttingDown {
return nil, ErrShuttingDown
}

pipelineDef, ok := r.defs.Pipelines[pipeline]
if !ok {
return nil, errors.Errorf("pipeline %q is not defined", pipeline)
@@ -367,7 +379,9 @@ func (r *PipelineRunner) startJob(job *PipelineJob) {
job.Start = &now

// Run graph asynchronously
r.wg.Add(1)
go func() {
defer r.wg.Done()
lastErr := job.sched.Schedule(graph)
r.JobCompleted(job.ID, lastErr)
}()
@@ -685,6 +699,9 @@ func (r *PipelineRunner) initialLoadFromStore() error {
}

func (r *PipelineRunner) SaveToStore() {
r.wg.Add(1)
defer r.wg.Done()

log.
WithField("component", "runner").
Debugf("Saving job state to data store")
@@ -774,6 +791,74 @@ func (r *PipelineRunner) SaveToStore() {
}
}

func (r *PipelineRunner) Shutdown(ctx context.Context) error {
defer func() {
log.
WithField("component", "runner").
Debugf("Shutting down, waiting for pending operations...")
r.wg.Wait()
}()

r.mx.Lock()
r.isShuttingDown = true
// Cancel all jobs on wait list
for pipelineName, jobs := range r.waitListByPipeline {
for _, job := range jobs {
job.Canceled = true
log.
WithField("component", "runner").
WithField("jobID", job.ID).
WithField("pipeline", pipelineName).
Warnf("Shutting down, marking job on wait list as canceled")
}
delete(r.waitListByPipeline, pipelineName)
}
r.mx.Unlock()

for {
// Poll pipelines to check for running jobs
r.mx.RLock()
hasRunningPipelines := false
for pipelineName := range r.jobsByPipeline {
if r.isRunning(pipelineName) {
hasRunningPipelines = true
log.
WithField("component", "runner").
WithField("pipeline", pipelineName).
Debugf("Shutting down, waiting for pipeline to finish...")
}
}
r.mx.RUnlock()

if !hasRunningPipelines {
log.
WithField("component", "runner").
Debugf("Shutting down, all pipelines finished")
break
}

// Wait a few seconds before checking pipeline status again, or cancel jobs if the context is done
select {
case <-time.After(r.ShutdownPollInterval):
case <-ctx.Done():
log.
WithField("component", "runner").
Warnf("Forced shutdown, cancelling all jobs")

r.mx.Lock()

for jobID := range r.jobsByID {
_ = r.cancelJobInternal(jobID)
}
r.mx.Unlock()

return ctx.Err()
}
}

return nil
}

// taken from https://stackoverflow.com/a/37335777
func removeJobFromList(jobs []*PipelineJob, jobToRemove *PipelineJob) []*PipelineJob {
for index, job := range jobs {
