-
Notifications
You must be signed in to change notification settings - Fork 8
/
scheduler.go
91 lines (79 loc) · 2.57 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package cff
import (
"sync/atomic"
"go.uber.org/cff/scheduler"
)
// We re-export things here so that users of cff don't have to add other
// packages as dependencies to their BUILD.bazel.
// Job is a job prepared to be enqueued to the cff scheduler.
//
// This is intended to be used by cff's generated code.
// Do not use directly.
// This can change without warning.
type Job = scheduler.Job
// AtomicBool is a type-safe means of reading and writing boolean values.
//
// This is intended to be used by cff's generated code.
// Do not use directly.
// This can change without warning.
type AtomicBool = atomic.Bool
// TODO(abg): For Go 1.19 or newer, we can use sync/atomic.Bool
// which drops one more dependency for users.
// ScheduledJob is a job that has been scheduled for execution with the cff
// scheduler.
//
// This is intended to be used by cff's generated code.
// Do not use directly.
// This can change without warning.
type ScheduledJob = scheduler.ScheduledJob
// SchedulerParams configures the cff scheduler.
//
// This is intended to be used by cff's generated code.
// Do not use directly.
// This can change without warning.
type SchedulerParams struct {
// Concurrency specifies the number of concurrent workers
// used by the scheduler to run jobs.
Concurrency int
// Emitter provides an emitter for the scheduler.
Emitter SchedulerEmitter
// ContinueOnError when true directs the scheduler to continue running
// through job errors.
ContinueOnError bool
}
// NewScheduler starts up a cff scheduler for use by Flow or Parallel.
//
// sched := cff.NewScheduler(..)
// j1 := sched.Enqueue(cff.Job{...}
// j2 := sched.Enqueue(cff.Job{..., Dependencies: []*cff.ScheduledJob{j1}}
// // ...
// err := sched.Wait()
//
// This is intended to be used by cff's generated code.
// Do not use directly.
// This can change without warning.
func NewScheduler(p SchedulerParams) *scheduler.Scheduler {
cfg := scheduler.Config{
Concurrency: p.Concurrency,
Emitter: adaptSchedulerEmitter(p.Emitter),
ContinueOnError: p.ContinueOnError,
}
return cfg.New()
}
// schedulerAdapter adapts a SchedulerEmitter into a scheduler.Emitter.
type schedulerAdapter struct {
emitter SchedulerEmitter
}
func adaptSchedulerEmitter(e SchedulerEmitter) scheduler.Emitter {
if _, isNop := e.(*nopEmitter); e == nil || isNop {
// Avoid the cost of a live ticker in the scheduler if we're using
// no emitter or a no-op emitter.
return nil
}
return schedulerAdapter{
emitter: e,
}
}
func (s schedulerAdapter) Emit(state scheduler.State) {
s.emitter.EmitScheduler(SchedulerState(state))
}