-
Notifications
You must be signed in to change notification settings - Fork 0
/
environment.go
388 lines (334 loc) · 10.4 KB
/
environment.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
// Copyright 2024 HUMAN Security.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package envite
import (
"context"
"errors"
"fmt"
"golang.org/x/sync/errgroup"
"sort"
"strings"
)
// Environment represents a collection of components that can be managed together.
// Components within an environment can be started, stopped, and configured collectively or individually.
type Environment struct {
id string
components []map[string]Component
componentsByID map[string]Component
outputManager *outputManager
Logger Logger
}
// NewEnvironment creates and initializes a new Environment with the specified id and component graph.
// It returns an error if the id is empty, the graph is nil, or if any components are misconfigured.
func NewEnvironment(id string, componentGraph *ComponentGraph, options ...Option) (*Environment, error) {
if id == "" {
return nil, ErrEmptyEnvID
}
if componentGraph == nil {
return nil, ErrNilGraph
}
id = strings.ReplaceAll(id, " ", "_")
om := newOutputManager()
b := &Environment{
id: id,
components: componentGraph.components,
componentsByID: make(map[string]Component),
outputManager: om,
}
for _, layer := range componentGraph.components {
for componentID, component := range layer {
if componentID == "" {
return nil, ErrInvalidComponentID{msg: "component id may not be empty"}
}
if strings.Contains(componentID, "|") || strings.Contains(componentID, " ") {
return nil, ErrInvalidComponentID{id: componentID, msg: "component id may not contain '|' or ' '"}
}
_, exists := b.componentsByID[componentID]
if exists {
return nil, ErrInvalidComponentID{id: componentID, msg: "duplicate component id"}
}
err := component.AttachEnvironment(context.Background(), b, om.writer(componentID))
if err != nil {
return nil, err
}
b.componentsByID[componentID] = component
}
}
for _, option := range options {
option(b)
}
if b.Logger == nil {
b.Logger = func(LogLevel, string) {}
}
return b, nil
}
// Components returns a slice of all components within the environment.
func (b *Environment) Components() []Component {
result := make([]Component, 0, len(b.componentsByID))
for _, component := range b.componentsByID {
result = append(result, component)
}
return result
}
// Apply applies the specified configuration to the environment, enabling only the components with IDs in
// enabledComponentIDs.
// It returns an error if applying the configuration fails.
func (b *Environment) Apply(ctx context.Context, enabledComponentIDs []string) error {
b.Logger(LogLevelInfo, "applying state")
enabledComponents := make(map[string]struct{}, len(enabledComponentIDs))
for _, id := range enabledComponentIDs {
enabledComponents[id] = struct{}{}
}
err := b.apply(ctx, enabledComponents)
if err != nil {
return err
}
b.Logger(LogLevelInfo, "finished applying state")
return nil
}
// StartAll starts all components in the environment concurrently.
// It returns an error if starting any component fails.
func (b *Environment) StartAll(ctx context.Context) error {
b.Logger(LogLevelInfo, "starting all")
all := make(map[string]struct{}, len(b.componentsByID))
for id := range b.componentsByID {
all[id] = struct{}{}
}
err := b.apply(ctx, all)
if err != nil {
return err
}
b.Logger(LogLevelInfo, "finished starting all")
return nil
}
// StopAll stops all components in the environment in reverse order of their startup.
// It returns an error if stopping any component fails.
func (b *Environment) StopAll(ctx context.Context) error {
b.Logger(LogLevelInfo, "stopping all")
for i := len(b.components) - 1; i >= 0; i-- {
layer := b.components[i]
g, ctx := errgroup.WithContext(ctx)
for id, component := range layer {
id := id
component := component
g.Go(func() error {
b.Logger(LogLevelInfo, fmt.Sprintf("stopping %s", id))
err := component.Stop(ctx)
if err != nil {
return fmt.Errorf("could not stop %s: %w", id, err)
}
return nil
})
}
err := g.Wait()
if err != nil {
return err
}
}
b.Logger(LogLevelInfo, "finished stopping all")
return nil
}
// StartComponent starts a single component identified by componentID.
// It does nothing if the component is already running.
// Returns an error if the component fails to start.
func (b *Environment) StartComponent(ctx context.Context, componentID string) error {
component, err := b.componentByID(componentID)
if err != nil {
return err
}
status, err := component.Status(ctx)
if err != nil {
return err
}
if status == ComponentStatusRunning || status == ComponentStatusStarting {
return nil
}
b.Logger(LogLevelInfo, fmt.Sprintf("preparing %s", componentID))
err = component.Prepare(ctx)
if err != nil {
return err
}
b.Logger(LogLevelInfo, fmt.Sprintf("starting %s", componentID))
err = component.Start(ctx)
if err != nil {
return err
}
b.Logger(LogLevelInfo, fmt.Sprintf("finished starting %s", componentID))
return nil
}
// StopComponent stops a single component identified by componentID.
// Returns an error if the component fails to stop.
func (b *Environment) StopComponent(ctx context.Context, componentID string) error {
component, err := b.componentByID(componentID)
if err != nil {
return err
}
b.Logger(LogLevelInfo, fmt.Sprintf("stopping %s", componentID))
err = component.Stop(ctx)
if err != nil {
return err
}
b.Logger(LogLevelInfo, fmt.Sprintf("finished stopping %s", componentID))
return nil
}
// Status returns the current status of all components within the environment.
func (b *Environment) Status(ctx context.Context) (GetStatusResponse, error) {
result := GetStatusResponse{ID: b.id, Components: make([][]GetStatusResponseComponent, len(b.components))}
for i, layer := range b.components {
components := make([]GetStatusResponseComponent, 0, len(layer))
for id, component := range layer {
status, err := component.Status(ctx)
if err != nil {
return GetStatusResponse{}, fmt.Errorf("could not get status for %s: %w", id, err)
}
info, err := buildComponentInfo(component)
if err != nil {
return GetStatusResponse{}, err
}
components = append(components, GetStatusResponseComponent{
ID: id,
Type: component.Type(),
Status: status,
Config: info,
})
}
// since components of each layer are stored in a map,
// their order is shuffled each time we produce status.
// to allow the ordering of components to be consistent,
// we sort them lexicographically by their ID:
sort.Slice(components, func(i, j int) bool {
return components[i].ID < components[j].ID
})
result.Components[i] = components
}
return result, nil
}
// Output returns a reader for the environment's combined output from all components.
func (b *Environment) Output() *Reader {
return b.outputManager.reader()
}
// Cleanup performs cleanup operations for all components within the environment.
// It returns an error if cleaning up any component fails.
func (b *Environment) Cleanup(ctx context.Context) error {
b.Logger(LogLevelInfo, "cleaning up")
g, ctx := errgroup.WithContext(ctx)
for _, layer := range b.components {
for id, component := range layer {
id := id
component := component
g.Go(func() error {
b.Logger(LogLevelInfo, fmt.Sprintf("cleaning up %s", id))
err := component.Cleanup(ctx)
if err != nil {
return fmt.Errorf("could not cleanup %s: %w", id, err)
}
return nil
})
}
}
err := g.Wait()
if err != nil {
return err
}
b.Logger(LogLevelInfo, "finished cleaning up")
return nil
}
func (b *Environment) apply(ctx context.Context, enabledComponentIDs map[string]struct{}) error {
err := b.prepare(ctx, enabledComponentIDs)
if err != nil {
return err
}
for _, layer := range b.components {
g, ctx := errgroup.WithContext(ctx)
for id, component := range layer {
id := id
component := component
_, ok := enabledComponentIDs[id]
if ok {
g.Go(func() error {
status, err := component.Status(ctx)
if err != nil {
return fmt.Errorf("could not get status for %s: %w", id, err)
}
if status == ComponentStatusRunning || status == ComponentStatusStarting {
return nil
}
b.Logger(LogLevelInfo, fmt.Sprintf("starting %s", id))
err = component.Start(ctx)
if err != nil {
return fmt.Errorf("could not start %s: %w", id, err)
}
b.Logger(LogLevelInfo, fmt.Sprintf("finished starting %s", id))
return nil
})
} else {
g.Go(func() error {
b.Logger(LogLevelInfo, fmt.Sprintf("stopping %s", id))
err := component.Stop(ctx)
if err != nil {
return fmt.Errorf("could not stop %s: %w", id, err)
}
b.Logger(LogLevelInfo, fmt.Sprintf("finished stopping %s", id))
return nil
})
}
}
err := g.Wait()
if err != nil {
return err
}
}
return nil
}
func (b *Environment) prepare(ctx context.Context, enabledComponentIDs map[string]struct{}) error {
g, ctx := errgroup.WithContext(ctx)
for _, layer := range b.components {
for id, component := range layer {
_, ok := enabledComponentIDs[id]
if !ok {
continue
}
id := id
component := component
g.Go(func() error {
status, err := component.Status(ctx)
if err != nil {
return fmt.Errorf("could not get status for %s: %w", id, err)
}
if status == ComponentStatusRunning || status == ComponentStatusStarting {
return nil
}
b.Logger(LogLevelInfo, fmt.Sprintf("preparing %s", id))
err = component.Prepare(ctx)
if err != nil {
return fmt.Errorf("could not prepare %s: %w", id, err)
}
b.Logger(LogLevelInfo, fmt.Sprintf("finished preparing %s", id))
return nil
})
}
}
return g.Wait()
}
func (b *Environment) componentByID(componentID string) (Component, error) {
component := b.componentsByID[componentID]
if component == nil {
return nil, ErrInvalidComponentID{id: componentID, msg: "not found"}
}
return component, nil
}
var (
// ErrEmptyEnvID indicates that an empty environment ID was provided.
ErrEmptyEnvID = errors.New("environment ID cannot be empty")
// ErrNilGraph indicates that a nil component graph was provided.
ErrNilGraph = errors.New("environment component graph cannot be nil")
)
// ErrInvalidComponentID represents an error when a component ID is invalid.
type ErrInvalidComponentID struct {
id string
msg string
}
func (e ErrInvalidComponentID) Error() string {
return fmt.Sprintf("component id '%s' is invalid: %s", e.id, e.msg)
}