forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 1
/
goshim.go
137 lines (118 loc) · 2.9 KB
/
goshim.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package shim
import (
"context"
"fmt"
"io"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/serializers/influx"
)
// empty is a zero-size type that carries no data. It is the element type of
// Shim.gatherPromptCh.
type empty struct{}
// forever is a duration of one hundred 365-day years.
var forever = 100 * 365 * 24 * time.Hour

// envVarEscaper backslash-escapes double quotes and backslashes in a string:
// `"` becomes `\"` and `\` becomes `\\`.
// NOTE(review): the name suggests it is applied to environment variable
// values; the call site is not in this part of the file — confirm.
var envVarEscaper = strings.NewReplacer(
	`"`, `\"`,
	`\`, `\\`,
)
// PollIntervalDisabled is the poll interval to pass when polling should be
// disabled. Its value is the zero Duration.
// NOTE(review): the previous comment contrasted this with "duration 0 meaning
// poll constantly", yet the constant itself is 0 — confirm how the code that
// consumes the poll interval treats a zero value.
const PollIntervalDisabled time.Duration = 0
// Shim wraps a single externally built plugin so that it can be run as if it
// were part of Telegraf. Set one of Input, Processor or Output before calling
// Run; Run uses the first of them, in that order, that is non-nil.
type Shim struct {
	// The wrapped plugin.
	Input     telegraf.Input
	Processor telegraf.StreamingProcessor
	Output    telegraf.Output

	// log is the logger returned by Log.
	log *Logger

	// Streams the shim reads from and writes to.
	stdin          io.Reader
	stdout, stderr io.Writer

	// metricCh carries outgoing metrics; writeProcessedMetrics serializes
	// everything received on it to stdout.
	metricCh chan telegraf.Metric

	// gatherPromptCh is used for inputs only.
	gatherPromptCh chan empty
}
// New returns a Shim connected to the process's standard input, output and
// error streams, with a logger from NewLogger and a metric channel that
// buffers one metric. No plugin is attached; the caller sets Input, Processor
// or Output afterwards.
func New() *Shim {
	shim := &Shim{
		log:    NewLogger(),
		stdin:  os.Stdin,
		stdout: os.Stdout,
		stderr: os.Stderr,
	}
	shim.metricCh = make(chan telegraf.Metric, 1)
	return shim
}
// watchForShutdown calls cancel once the process receives SIGINT or SIGTERM.
// It returns immediately; the wait happens on a background goroutine.
//
// The goroutine only ends when a signal arrives, and the signal registration
// is never removed, so both live for the rest of the process otherwise.
func (s *Shim) watchForShutdown(cancel context.CancelFunc) {
	// Buffered so a signal delivered before the goroutine is scheduled is kept.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

	cancelOnSignal := func() {
		<-signals // user-triggered quit
		cancel()
	}
	go cancelOnSignal()
}
// Run starts the plugin attached to the shim. The first non-nil field among
// Input, Processor and Output (checked in that order) decides which runner is
// called; pollInterval is passed on to RunInput only.
//
// It returns the runner's error wrapped with the runner's name, an error when
// no plugin is attached, and nil otherwise.
func (s *Shim) Run(pollInterval time.Duration) error {
	switch {
	case s.Input != nil:
		if err := s.RunInput(pollInterval); err != nil {
			return fmt.Errorf("RunInput error: %w", err)
		}
	case s.Processor != nil:
		if err := s.RunProcessor(); err != nil {
			return fmt.Errorf("RunProcessor error: %w", err)
		}
	case s.Output != nil:
		if err := s.RunOutput(); err != nil {
			return fmt.Errorf("RunOutput error: %w", err)
		}
	default:
		return fmt.Errorf("nothing to run")
	}
	return nil
}
// hasQuit reports whether ctx is already done, i.e. whether ctx.Err returns a
// non-nil error.
func hasQuit(ctx context.Context) bool {
	if err := ctx.Err(); err != nil {
		return true
	}
	return false
}
// writeProcessedMetrics serializes every metric received on s.metricCh with
// the influx serializer and writes the result to s.stdout. It blocks until
// the channel is closed, at which point it returns nil.
//
// On a serialization or write failure it stops and returns the error wrapped
// with %w, so callers can inspect the cause with errors.Is / errors.As.
func (s *Shim) writeProcessedMetrics() error {
	serializer := influx.NewSerializer()
	for { //nolint:gosimple // for-select used on purpose
		select {
		case m, open := <-s.metricCh:
			if !open {
				return nil
			}
			b, err := serializer.Serialize(m)
			if err != nil {
				return fmt.Errorf("failed to serialize metric: %w", err)
			}
			// Write the serialized bytes to stdout directly; no string copy needed.
			if _, err := s.stdout.Write(b); err != nil {
				return fmt.Errorf("failed to write metric: %w", err)
			}
		}
	}
}
// LogName is part of the MetricMaker interface. The shim always returns the
// empty string.
func (s *Shim) LogName() string {
	const noName = ""
	return noName
}
// MakeMetric is part of the MetricMaker interface. The shim returns m as it
// was given.
func (s *Shim) MakeMetric(m telegraf.Metric) telegraf.Metric {
	// Nothing to add or filter: hand the metric straight back.
	return m
}
// Log is part of the MetricMaker interface. It returns the shim's logger.
func (s *Shim) Log() telegraf.Logger {
	logger := s.log
	return logger
}