-
-
Notifications
You must be signed in to change notification settings - Fork 12
/
errsizedgroup.go
170 lines (145 loc) · 3.66 KB
/
errsizedgroup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package syncs
import (
"fmt"
"strings"
"sync"
)
// ErrSizedGroup is a SizedGroup with error control. Works the same as errgrp.Group, i.e. returns first error.
// Can work as regular errgrp.Group or with early termination. Thread safe.
// ErrSizedGroup interface enforces constructor usage and doesn't allow direct creation of errSizedGroup
type ErrSizedGroup struct {
options
wg sync.WaitGroup
sema Locker
err *MultiError
errLock sync.RWMutex
errOnce sync.Once
}
// NewErrSizedGroup makes wait group with limited size alive goroutines.
// By default, all goroutines will be started but will wait inside.
// For limited number of goroutines use Preemptive() options.
// TermOnErr will skip (won't start) all other goroutines if any error returned.
func NewErrSizedGroup(size int, options ...GroupOption) *ErrSizedGroup {
res := ErrSizedGroup{
sema: NewSemaphore(size),
err: new(MultiError),
}
for _, opt := range options {
opt(&res.options)
}
return &res
}
// Go calls the given function in a new goroutine.
// The first call to return a non-nil error cancels the group if termOnError; its error will be
// returned by Wait. If no termOnError all errors will be collected in multierror.
func (g *ErrSizedGroup) Go(f func() error) {
canceled := func() bool {
if g.ctx == nil {
return false
}
select {
case <-g.ctx.Done():
return true
default:
return false
}
}
if canceled() {
g.errOnce.Do(func() {
// don't repeat this error
g.err.append(g.ctx.Err())
})
return
}
g.wg.Add(1)
isLocked := false
if g.preLock {
lockOk := g.sema.TryLock()
if lockOk {
isLocked = true
}
if !lockOk && g.discardIfFull {
// lock failed and discardIfFull is set, discard this goroutine
g.wg.Done()
return
}
if !lockOk && !g.discardIfFull {
g.sema.Lock() // make sure we have block until lock is acquired
isLocked = true
}
}
go func() {
defer g.wg.Done()
// terminated will be true if any error happened before and g.termOnError
terminated := func() bool {
if !g.termOnError {
return false
}
g.errLock.RLock()
defer g.errLock.RUnlock()
return g.err.ErrorOrNil() != nil
}
defer func() {
if isLocked {
g.sema.Unlock()
}
}()
if terminated() {
return // terminated due prev error, don't run anything in this group anymore
}
if !g.preLock {
g.sema.Lock()
isLocked = true
}
if err := f(); err != nil {
g.errLock.Lock()
g.err = g.err.append(err)
g.errLock.Unlock()
}
}()
}
// Wait blocks until all function calls from the Go method have returned, then
// returns all errors (if any) wrapped with multierror from them.
func (g *ErrSizedGroup) Wait() error {
g.wg.Wait()
return g.err.ErrorOrNil()
}
// MultiError is a thread safe container for multi-error type that implements error interface
type MultiError struct {
errors []error
lock sync.Mutex
}
func (m *MultiError) append(err error) *MultiError {
m.lock.Lock()
m.errors = append(m.errors, err)
m.lock.Unlock()
return m
}
// ErrorOrNil returns nil if no errors or multierror if errors occurred
func (m *MultiError) ErrorOrNil() error {
m.lock.Lock()
defer m.lock.Unlock()
if len(m.errors) == 0 {
return nil
}
return m
}
// Error returns multi-error string
func (m *MultiError) Error() string {
m.lock.Lock()
defer m.lock.Unlock()
if len(m.errors) == 0 {
return ""
}
errs := []string{}
for n, e := range m.errors {
errs = append(errs, fmt.Sprintf("[%d] {%s}", n, e.Error()))
}
return fmt.Sprintf("%d error(s) occurred: %s", len(m.errors), strings.Join(errs, ", "))
}
// Errors returns all errors collected
func (m *MultiError) Errors() []error {
m.lock.Lock()
defer m.lock.Unlock()
return m.errors
}