-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
grace.go
214 lines (182 loc) · 5.13 KB
/
grace.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
package grace
import (
"context"
"errors"
"fmt"
"log"
"os"
"os/signal"
"runtime/debug"
"sync"
"syscall"
)
var (
// ErrShutdownSignalReceived is used when a shutdown signal is received.
// It will cause a graceful shutdown.
ErrShutdownSignalReceived = errors.New("shutdown signal received")
// ErrShutdownRequestReceived is used when a shutdown is initiated without a signal.
// It will cause a graceful shutdown.
ErrShutdownRequestReceived = errors.New("shutdown request received")
// ErrImmediateShutdownSignalReceived is used when a shutdown signal is received for the second time.
// It will cause an immediate shutdown.
ErrImmediateShutdownSignalReceived = errors.New("immediate shutdown signal received")
)
// RecoveredPanicError represents a recovered panic.
type RecoveredPanicError struct {
// Err is the recovered error.
Err interface{}
// Stack is the stack trace from the recover.
Stack []byte
}
// Error returns the recovered panic as a string
func (p RecoveredPanicError) Error() string {
return fmt.Sprintf("%v", p.Err)
}
// Init returns a new and initialised instance of Grace.
func Init(ctx context.Context, options ...InitOption) *Grace {
ctx, cancel := context.WithCancel(ctx)
g := &Grace{
wg: &sync.WaitGroup{},
ctx: ctx,
cancelFn: cancel,
errCh: make(chan error),
LogFn: log.Printf,
}
g.ErrHandler = DefaultErrorHandler(g)
for _, option := range options {
option.Apply(g)
}
go g.handleErrors()
go g.handleShutdownSignals()
return g
}
// Grace manages Runners and handles errors.
type Grace struct {
// ErrHandler is the active error handler grace will pass errors to.
ErrHandler ErrHandlerFn
// LogFn is the log function that grace will use.
LogFn func(format string, v ...interface{})
wg *sync.WaitGroup
ctx context.Context
cancelFn context.CancelFunc
errCh chan error
}
// ErrHandlerFn is an error handler used to handle errors returned from Runners.
// Returns true if a graceful shutdown should be triggered as a result of the error.
type ErrHandlerFn func(err error) bool
// Context returns the grace context.
// Use this throughout your application to be notified when a graceful shutdown has started.
func (l *Grace) Context() context.Context {
return l.ctx
}
// Wait will block until all Runners have stopped running.
func (l *Grace) Wait() {
l.wg.Wait()
}
// Run runs a new Runner.
func (l *Grace) Run(runner Runner) {
l.wg.Add(1)
go l.run(runner)
}
// runRunner executes the given runner and returns any panics as a RecoveredPanicError.
// If a runner panics it will cause a graceful shutdown.
func (l *Grace) runRunner(ctx context.Context, runner Runner) (err error) {
defer func() {
if r := recover(); r != nil {
err = RecoveredPanicError{Err: r, Stack: debug.Stack()}
}
}()
err = nil
err = runner.Run(ctx)
return
}
func (l *Grace) run(runner Runner) {
defer l.wg.Done()
runErrs := make(chan error)
runWg := &sync.WaitGroup{}
runWg.Add(1)
go func() {
defer func() {
runWg.Done()
}()
err := l.runRunner(l.ctx, runner)
if err != nil {
// If the context is already done, we won't be able to write to runErrs
// as nothing is waiting to read from it anymore.
// We may want to handle this slightly differently to ensure all errors are handled, but this can
// be safely ignored for now.
select {
case runErrs <- err:
default:
}
}
}()
select {
case runErr := <-runErrs:
// The runner returned an error.
l.errCh <- runErr
case <-l.ctx.Done():
// We've been told to shutdown.
// The runner should shutdown on ctx.Done() so we can just wait for it to finish.
runWg.Wait()
}
}
func (l *Grace) handleShutdownSignals() {
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL)
go func() {
count := 0
for sig := range signals {
count++
if count > 1 || sig == syscall.SIGKILL {
l.errCh <- ErrImmediateShutdownSignalReceived
continue
}
l.errCh <- ErrShutdownSignalReceived
}
}()
}
// DefaultErrorHandler returns a standard error handler that will log errors and trigger a shutdown.
func DefaultErrorHandler(g *Grace) func(err error) bool {
return func(err error) bool {
if err == ErrImmediateShutdownSignalReceived {
os.Exit(1)
}
if g.LogFn != nil {
g.LogFn("default error handler: %s\n", err.Error())
}
// Always shutdown on error.
return true
}
}
func (l *Grace) handleErrors() {
go func() {
for err := range l.errCh {
if l.ErrHandler(err) {
l.cancelFn()
}
}
}()
}
// Shutdown will start a manual shutdown.
func (l *Grace) Shutdown() {
l.errCh <- ErrShutdownRequestReceived
}
// InitOption allows you to modify the way grace is initialised.
type InitOption interface {
// Apply allows you to apply options to the grace instance as it is being initialised.
Apply(g *Grace)
}
type initOptionFn struct {
apply func(g *Grace)
}
// Apply allows you to apply options to the grace instance as it is being initialised.
func (o *initOptionFn) Apply(g *Grace) {
o.apply(g)
}
// InitOptionFn returns an InitOption that applies the given function.
func InitOptionFn(fn func(g *Grace)) InitOption {
return &initOptionFn{
apply: fn,
}
}