diff --git a/plugins/inputs/p4runtime/p4runtime_test.go b/plugins/inputs/p4runtime/p4runtime_test.go index 58dbb8336ceaa..2972963fc0fed 100644 --- a/plugins/inputs/p4runtime/p4runtime_test.go +++ b/plugins/inputs/p4runtime/p4runtime_test.go @@ -43,7 +43,7 @@ func createEntityCounterEntry( } } -func NewTestP4RuntimeClient( +func newTestP4RuntimeClient( p4RuntimeClient *fakeP4RuntimeClient, addr string, t *testing.T, @@ -102,7 +102,7 @@ func TestErrorGetP4Info(t *testing.T) { listener, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) - plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t) + plugin := newTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t) var acc testutil.Accumulator require.Error(t, plugin.Gather(&acc)) @@ -245,7 +245,7 @@ func TestOneCounterRead(t *testing.T) { listener, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) - plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t) + plugin := newTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t) var acc testutil.Accumulator require.NoError(t, plugin.Gather(&acc)) @@ -333,7 +333,7 @@ func TestMultipleEntitiesSingleCounterRead(t *testing.T) { listener, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) - plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t) + plugin := newTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t) var acc testutil.Accumulator require.NoError(t, plugin.Gather(&acc)) @@ -425,7 +425,7 @@ func TestSingleEntitiesMultipleCounterRead(t *testing.T) { listener, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) - plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t) + plugin := newTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t) var acc testutil.Accumulator require.NoError(t, plugin.Gather(&acc)) @@ -457,7 +457,7 @@ func TestNoCountersAvailable(t *testing.T) { listener, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) - plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t) + plugin := newTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t) var acc testutil.Accumulator require.NoError(t, plugin.Gather(&acc)) @@ -484,7 +484,7 @@ func TestFilterCounters(t *testing.T) { listener, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) - plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t) + plugin := newTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t) plugin.CounterNamesInclude = []string{"oof"} @@ -534,7 +534,7 @@ func TestFailReadCounterEntryFromEntry(t *testing.T) { listener, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) - plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t) + plugin := newTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t) var acc testutil.Accumulator require.NoError(t, plugin.Gather(&acc)) @@ -577,7 +577,7 @@ func TestFailReadAllEntries(t *testing.T) { listener, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) - plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t) + plugin := newTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t) var acc testutil.Accumulator require.NoError(t, plugin.Gather(&acc)) diff --git a/plugins/inputs/passenger/passenger.go b/plugins/inputs/passenger/passenger.go index 7123cc70b012d..0175a5000ced5 100644 --- a/plugins/inputs/passenger/passenger.go +++ b/plugins/inputs/passenger/passenger.go @@ -19,22 +19,8 @@ import ( //go:embed sample.conf var sampleConfig string -type passenger struct { - Command string -} - -func (p *passenger) parseCommand() (string, []string) { - var arguments []string - if !strings.Contains(p.Command, " ") { - return p.Command, arguments - } - - arguments = strings.Split(p.Command, " ") - if len(arguments) == 1 { - return arguments[0], arguments[1:] - } - - return arguments[0], arguments[1:] +type Passenger struct { + Command string `toml:"command"` } type info struct { @@ -91,6 +77,39 @@ type process struct { ProcessGroupID string `xml:"process_group_id"` } +func (*Passenger) SampleConfig() string { + return sampleConfig +} + +func (p *Passenger) Gather(acc telegraf.Accumulator) error { + if p.Command == "" { + p.Command = "passenger-status -v --show=xml" + } + + cmd, args := p.parseCommand() + out, err := exec.Command(cmd, args...).Output() + + if err != nil { + return err + } + + return importMetric(out, acc) +} + +func (p *Passenger) parseCommand() (string, []string) { + var arguments []string + if !strings.Contains(p.Command, " ") { + return p.Command, arguments + } + + arguments = strings.Split(p.Command, " ") + if len(arguments) == 1 { + return arguments[0], arguments[1:] + } + + return arguments[0], arguments[1:] +} + func (p *process) getUptime() int64 { if p.Uptime == "" { return 0 @@ -131,25 +150,6 @@ func (p *process) getUptime() int64 { return uptime } -func (*passenger) SampleConfig() string { - return sampleConfig -} - -func (p *passenger) Gather(acc telegraf.Accumulator) error { - if p.Command == "" { - p.Command = "passenger-status -v --show=xml" - } - - cmd, args := p.parseCommand() - out, err := exec.Command(cmd, args...).Output() - - if err != nil { - return err - } - - return importMetric(out, acc) -} - func importMetric(stat []byte, acc telegraf.Accumulator) error { var p info @@ -231,6 +231,6 @@ func importMetric(stat []byte, acc telegraf.Accumulator) error { func init() { inputs.Add("passenger", func() telegraf.Input { - return &passenger{} + return &Passenger{} }) } diff --git a/plugins/inputs/passenger/passenger_test.go b/plugins/inputs/passenger/passenger_test.go index 6c53578d7e636..49411d04919d5 100644 --- a/plugins/inputs/passenger/passenger_test.go +++ b/plugins/inputs/passenger/passenger_test.go @@ -39,7 +39,7 @@ func teardown(tempFilePath string) { } func Test_Invalid_Passenger_Status_Cli(t *testing.T) { - r := &passenger{ + r := &Passenger{ Command: "an-invalid-command passenger-status", } @@ -55,7 +55,7 @@ func Test_Invalid_Xml(t *testing.T) { require.NoError(t, err) defer teardown(tempFilePath) - r := &passenger{ + r := &Passenger{ Command: tempFilePath, } @@ -72,7 +72,7 @@ func Test_Default_Config_Load_Default_Command(t *testing.T) { require.NoError(t, err) defer teardown(tempFilePath) - r := &passenger{} + r := &Passenger{} var acc testutil.Accumulator @@ -87,7 +87,7 @@ func TestPassengerGenerateMetric(t *testing.T) { defer teardown(tempFilePath) // Now we tested again above server, with our authentication data - r := &passenger{ + r := &Passenger{ Command: tempFilePath, } diff --git a/plugins/inputs/pf/pf.go b/plugins/inputs/pf/pf.go index 204c30a5dbc96..20709aaf750d9 100644 --- a/plugins/inputs/pf/pf.go +++ b/plugins/inputs/pf/pf.go @@ -18,26 +18,81 @@ import ( //go:embed sample.conf var sampleConfig string -const measurement = "pf" -const pfctlCommand = "pfctl" +var ( + errParseHeader = fmt.Errorf("cannot find header in %s output", pfctlCommand) + anyTableHeaderRE = regexp.MustCompile("^[A-Z]") + stateTableRE = regexp.MustCompile(`^ (.*?)\s+(\d+)`) + counterTableRE = regexp.MustCompile(`^ (.*?)\s+(\d+)`) + execLookPath = exec.LookPath + execCommand = exec.Command + pfctlOutputStanzas = []*pfctlOutputStanza{ + { + headerRE: regexp.MustCompile("^State Table"), + parseFunc: parseStateTable, + }, + { + headerRE: regexp.MustCompile("^Counters"), + parseFunc: parseCounterTable, + }, + } + stateTable = []*entry{ + {"entries", "current entries", -1}, + {"searches", "searches", -1}, + {"inserts", "inserts", -1}, + {"removals", "removals", -1}, + } + counterTable = []*entry{ + {"match", "match", -1}, + {"bad-offset", "bad-offset", -1}, + {"fragment", "fragment", -1}, + {"short", "short", -1}, + {"normalize", "normalize", -1}, + {"memory", "memory", -1}, + {"bad-timestamp", "bad-timestamp", -1}, + {"congestion", "congestion", -1}, + {"ip-option", "ip-option", -1}, + {"proto-cksum", "proto-cksum", -1}, + {"state-mismatch", "state-mismatch", -1}, + {"state-insert", "state-insert", -1}, + {"state-limit", "state-limit", -1}, + {"src-limit", "src-limit", -1}, + {"synproxy", "synproxy", -1}, + } +) + +const ( + measurement = "pf" + pfctlCommand = "pfctl" +) type PF struct { - PfctlCommand string - PfctlArgs []string - UseSudo bool - StateTable []*Entry + UseSudo bool `toml:"use_sudo"` + + pfctlCommand string + pfctlArgs []string infoFunc func() (string, error) } +type pfctlOutputStanza struct { + headerRE *regexp.Regexp + parseFunc func([]string, map[string]interface{}) error + found bool +} + +type entry struct { + field string + pfctlTitle string + value int64 +} + func (*PF) SampleConfig() string { return sampleConfig } -// Gather is the entrypoint for the plugin. func (pf *PF) Gather(acc telegraf.Accumulator) error { - if pf.PfctlCommand == "" { + if pf.pfctlCommand == "" { var err error - if pf.PfctlCommand, pf.PfctlArgs, err = pf.buildPfctlCmd(); err != nil { + if pf.pfctlCommand, pf.pfctlArgs, err = pf.buildPfctlCmd(); err != nil { acc.AddError(fmt.Errorf("can't construct pfctl commandline: %w", err)) return nil } @@ -55,38 +110,17 @@ func (pf *PF) Gather(acc telegraf.Accumulator) error { return nil } -var errParseHeader = fmt.Errorf("cannot find header in %s output", pfctlCommand) - func errMissingData(tag string) error { return fmt.Errorf("struct data for tag %q not found in %s output", tag, pfctlCommand) } -type pfctlOutputStanza struct { - HeaderRE *regexp.Regexp - ParseFunc func([]string, map[string]interface{}) error - Found bool -} - -var pfctlOutputStanzas = []*pfctlOutputStanza{ - { - HeaderRE: regexp.MustCompile("^State Table"), - ParseFunc: parseStateTable, - }, - { - HeaderRE: regexp.MustCompile("^Counters"), - ParseFunc: parseCounterTable, - }, -} - -var anyTableHeaderRE = regexp.MustCompile("^[A-Z]") - func (pf *PF) parsePfctlOutput(pfoutput string, acc telegraf.Accumulator) error { fields := make(map[string]interface{}) scanner := bufio.NewScanner(strings.NewReader(pfoutput)) for scanner.Scan() { line := scanner.Text() for _, s := range pfctlOutputStanzas { - if s.HeaderRE.MatchString(line) { + if s.headerRE.MatchString(line) { var stanzaLines []string scanner.Scan() line = scanner.Text() @@ -98,15 +132,15 @@ func (pf *PF) parsePfctlOutput(pfoutput string, acc telegraf.Accumulator) error } line = scanner.Text() } - if perr := s.ParseFunc(stanzaLines, fields); perr != nil { + if perr := s.parseFunc(stanzaLines, fields); perr != nil { return perr } - s.Found = true + s.found = true } } } for _, s := range pfctlOutputStanzas { - if !s.Found { + if !s.found { return errParseHeader } } @@ -115,57 +149,22 @@ func (pf *PF) parsePfctlOutput(pfoutput string, acc telegraf.Accumulator) error return nil } -type Entry struct { - Field string - PfctlTitle string - Value int64 -} - -var StateTable = []*Entry{ - {"entries", "current entries", -1}, - {"searches", "searches", -1}, - {"inserts", "inserts", -1}, - {"removals", "removals", -1}, -} - -var stateTableRE = regexp.MustCompile(`^ (.*?)\s+(\d+)`) - func parseStateTable(lines []string, fields map[string]interface{}) error { - return storeFieldValues(lines, stateTableRE, fields, StateTable) + return storeFieldValues(lines, stateTableRE, fields, stateTable) } -var CounterTable = []*Entry{ - {"match", "match", -1}, - {"bad-offset", "bad-offset", -1}, - {"fragment", "fragment", -1}, - {"short", "short", -1}, - {"normalize", "normalize", -1}, - {"memory", "memory", -1}, - {"bad-timestamp", "bad-timestamp", -1}, - {"congestion", "congestion", -1}, - {"ip-option", "ip-option", -1}, - {"proto-cksum", "proto-cksum", -1}, - {"state-mismatch", "state-mismatch", -1}, - {"state-insert", "state-insert", -1}, - {"state-limit", "state-limit", -1}, - {"src-limit", "src-limit", -1}, - {"synproxy", "synproxy", -1}, -} - -var counterTableRE = regexp.MustCompile(`^ (.*?)\s+(\d+)`) - func parseCounterTable(lines []string, fields map[string]interface{}) error { - return storeFieldValues(lines, counterTableRE, fields, CounterTable) + return storeFieldValues(lines, counterTableRE, fields, counterTable) } -func storeFieldValues(lines []string, regex *regexp.Regexp, fields map[string]interface{}, entryTable []*Entry) error { +func storeFieldValues(lines []string, regex *regexp.Regexp, fields map[string]interface{}, entryTable []*entry) error { for _, v := range lines { entries := regex.FindStringSubmatch(v) if entries != nil { for _, f := range entryTable { - if f.PfctlTitle == entries[1] { + if f.pfctlTitle == entries[1] { var err error - if f.Value, err = strconv.ParseInt(entries[2], 10, 64); err != nil { + if f.value, err = strconv.ParseInt(entries[2], 10, 64); err != nil { return err } } @@ -174,17 +173,17 @@ func storeFieldValues(lines []string, regex *regexp.Regexp, fields map[string]in } for _, v := range entryTable { - if v.Value == -1 { - return errMissingData(v.PfctlTitle) + if v.value == -1 { + return errMissingData(v.pfctlTitle) } - fields[v.Field] = v.Value + fields[v.field] = v.value } return nil } func (pf *PF) callPfctl() (string, error) { - cmd := execCommand(pf.PfctlCommand, pf.PfctlArgs...) + cmd := execCommand(pf.pfctlCommand, pf.pfctlArgs...) out, oerr := cmd.Output() if oerr != nil { var ee *exec.ExitError @@ -196,9 +195,6 @@ func (pf *PF) callPfctl() (string, error) { return string(out), oerr } -var execLookPath = exec.LookPath -var execCommand = exec.Command - func (pf *PF) buildPfctlCmd() (string, []string, error) { cmd, err := execLookPath(pfctlCommand) if err != nil { diff --git a/plugins/inputs/pgbouncer/pgbouncer.go b/plugins/inputs/pgbouncer/pgbouncer.go index 4d079e1731f0a..2c6ccf43bc4bd 100644 --- a/plugins/inputs/pgbouncer/pgbouncer.go +++ b/plugins/inputs/pgbouncer/pgbouncer.go @@ -16,6 +16,11 @@ import ( //go:embed sample.conf var sampleConfig string +var ignoredColumns = map[string]bool{"user": true, "database": true, "pool_mode": true, + "avg_req": true, "avg_recv": true, "avg_sent": true, "avg_query": true, + "force_user": true, "host": true, "port": true, "name": true, +} + type PgBouncer struct { ShowCommands []string `toml:"show_commands"` postgresql.Config @@ -23,11 +28,6 @@ type PgBouncer struct { service *postgresql.Service } -var ignoredColumns = map[string]bool{"user": true, "database": true, "pool_mode": true, - "avg_req": true, "avg_recv": true, "avg_sent": true, "avg_query": true, - "force_user": true, "host": true, "port": true, "name": true, -} - func (*PgBouncer) SampleConfig() string { return sampleConfig } @@ -58,10 +58,6 @@ func (p *PgBouncer) Start(_ telegraf.Accumulator) error { return p.service.Start() } -func (p *PgBouncer) Stop() { - p.service.Stop() -} - func (p *PgBouncer) Gather(acc telegraf.Accumulator) error { for _, cmd := range p.ShowCommands { switch cmd { @@ -87,6 +83,10 @@ func (p *PgBouncer) Gather(acc telegraf.Accumulator) error { return nil } +func (p *PgBouncer) Stop() { + p.service.Stop() +} + func (p *PgBouncer) accRow(row *sql.Rows, columns []string) (map[string]string, map[string]*interface{}, error) { var dbname bytes.Buffer diff --git a/plugins/inputs/phpfpm/child.go b/plugins/inputs/phpfpm/child.go index 3448db40be4a9..f921dc4bf13d2 100644 --- a/plugins/inputs/phpfpm/child.go +++ b/plugins/inputs/phpfpm/child.go @@ -10,10 +10,8 @@ import ( "errors" "fmt" "io" - "net" "net/http" "net/http/cgi" - "os" "strings" "sync" "time" @@ -164,13 +162,13 @@ var errCloseConn = errors.New("fcgi: connection should be closed") var emptyBody = io.NopCloser(strings.NewReader("")) -// ErrRequestAborted is returned by Read when a handler attempts to read the +// errRequestAborted is returned by Read when a handler attempts to read the // body of a request that has been aborted by the web server. -var ErrRequestAborted = errors.New("fcgi: request aborted by web server") +var errRequestAborted = errors.New("fcgi: request aborted by web server") -// ErrConnClosed is returned by Read when a handler attempts to read the body of +// errConnClosed is returned by Read when a handler attempts to read the body of // a request after the connection to the web server has been closed. -var ErrConnClosed = errors.New("fcgi: connection to web server closed") +var errConnClosed = errors.New("fcgi: connection to web server closed") func (c *child) handleRecord(rec *record) error { c.mu.Lock() @@ -249,7 +247,7 @@ func (c *child) handleRecord(rec *record) error { return err } if req.pw != nil { - req.pw.CloseWithError(ErrRequestAborted) + req.pw.CloseWithError(errRequestAborted) } if !req.keepConn { // connection will close upon return @@ -306,34 +304,7 @@ func (c *child) cleanUp() { if req.pw != nil { // race with call to Close in c.serveRequest doesn't matter because // Pipe(Reader|Writer).Close are idempotent - req.pw.CloseWithError(ErrConnClosed) + req.pw.CloseWithError(errConnClosed) } } } - -// Serve accepts incoming FastCGI connections on the listener l, creating a new -// goroutine for each. The goroutine reads requests and then calls handler -// to reply to them. -// If l is nil, Serve accepts connections from os.Stdin. -// If handler is nil, http.DefaultServeMux is used. -func Serve(l net.Listener, handler http.Handler) error { - if l == nil { - var err error - l, err = net.FileListener(os.Stdin) - if err != nil { - return err - } - defer l.Close() - } - if handler == nil { - handler = http.DefaultServeMux - } - for { - rw, err := l.Accept() - if err != nil { - return err - } - c := newChild(rw, handler) - go c.serve() - } -} diff --git a/plugins/inputs/phpfpm/fcgi_client.go b/plugins/inputs/phpfpm/fcgi_client.go index f33b68d0af9a5..e982471b3d0e6 100644 --- a/plugins/inputs/phpfpm/fcgi_client.go +++ b/plugins/inputs/phpfpm/fcgi_client.go @@ -44,7 +44,7 @@ func newFcgiClient(timeout time.Duration, h string, args ...interface{}) (*conn, return &conn{rwc: con}, nil } -func (c *conn) Request(env map[string]string, requestData string) (retout, reterr []byte, err error) { +func (c *conn) request(env map[string]string, requestData string) (retout, reterr []byte, err error) { defer c.rwc.Close() var reqID uint16 = 1 diff --git a/plugins/inputs/phpfpm/fcgi_test.go b/plugins/inputs/phpfpm/fcgi_test.go index f96c22b6fec90..d039685bb05f8 100644 --- a/plugins/inputs/phpfpm/fcgi_test.go +++ b/plugins/inputs/phpfpm/fcgi_test.go @@ -206,7 +206,7 @@ var cleanUpTests = []struct { makeRecord(typeAbortRequest, nil), }, nil), - ErrRequestAborted, + errRequestAborted, }, // confirm that child.serve closes all pipes after error reading record { @@ -215,7 +215,7 @@ var cleanUpTests = []struct { nil, }, nil), - ErrConnClosed, + errConnClosed, }, } diff --git a/plugins/inputs/phpfpm/phpfpm.go b/plugins/inputs/phpfpm/phpfpm.go index e1b3ce515fd30..9b3c5dc2704c4 100644 --- a/plugins/inputs/phpfpm/phpfpm.go +++ b/plugins/inputs/phpfpm/phpfpm.go @@ -26,22 +26,31 @@ import ( var sampleConfig string const ( - PfPool = "pool" - PfProcessManager = "process manager" - PfStartSince = "start since" - PfAcceptedConn = "accepted conn" - PfListenQueue = "listen queue" - PfMaxListenQueue = "max listen queue" - PfListenQueueLen = "listen queue len" - PfIdleProcesses = "idle processes" - PfActiveProcesses = "active processes" - PfTotalProcesses = "total processes" - PfMaxActiveProcesses = "max active processes" - PfMaxChildrenReached = "max children reached" - PfSlowRequests = "slow requests" + pfPool = "pool" + pfStartSince = "start since" + pfAcceptedConn = "accepted conn" + pfListenQueue = "listen queue" + pfMaxListenQueue = "max listen queue" + pfListenQueueLen = "listen queue len" + pfIdleProcesses = "idle processes" + pfActiveProcesses = "active processes" + pfTotalProcesses = "total processes" + pfMaxActiveProcesses = "max active processes" + pfMaxChildrenReached = "max children reached" + pfSlowRequests = "slow requests" ) -type JSONMetrics struct { +type Phpfpm struct { + Format string `toml:"format"` + Timeout config.Duration `toml:"timeout"` + Urls []string `toml:"urls"` + Log telegraf.Logger `toml:"-"` + tls.ClientConfig + + client *http.Client +} + +type jsonMetrics struct { Pool string `json:"pool"` ProcessManager string `json:"process manager"` StartTime int `json:"start time"` @@ -76,21 +85,11 @@ type JSONMetrics struct { type metricStat map[string]int64 type poolStat map[string]metricStat -type phpfpm struct { - Format string `toml:"format"` - Timeout config.Duration `toml:"timeout"` - Urls []string `toml:"urls"` - Log telegraf.Logger `toml:"-"` - tls.ClientConfig - - client *http.Client -} - -func (*phpfpm) SampleConfig() string { +func (*Phpfpm) SampleConfig() string { return sampleConfig } -func (p *phpfpm) Init() error { +func (p *Phpfpm) Init() error { if len(p.Urls) == 0 { p.Urls = []string{"http://127.0.0.1/status"} } @@ -118,9 +117,7 @@ func (p *phpfpm) Init() error { return nil } -// Reads stats from all configured servers accumulates stats. -// Returns one of the errors encountered while gather stats (if any). -func (p *phpfpm) Gather(acc telegraf.Accumulator) error { +func (p *Phpfpm) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup for _, serv := range expandUrls(acc, p.Urls) { wg.Add(1) @@ -136,7 +133,7 @@ func (p *phpfpm) Gather(acc telegraf.Accumulator) error { } // Request status page to get stat raw data and import it -func (p *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error { +func (p *Phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error { if strings.HasPrefix(addr, "http://") || strings.HasPrefix(addr, "https://") { return p.gatherHTTP(addr, acc) } @@ -187,8 +184,8 @@ func (p *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error { } // Gather stat using fcgi protocol -func (p *phpfpm) gatherFcgi(fcgi *conn, statusPath string, acc telegraf.Accumulator, addr string) error { - fpmOutput, fpmErr, err := fcgi.Request(map[string]string{ +func (p *Phpfpm) gatherFcgi(fcgi *conn, statusPath string, acc telegraf.Accumulator, addr string) error { + fpmOutput, fpmErr, err := fcgi.request(map[string]string{ "SCRIPT_NAME": "/" + statusPath, "SCRIPT_FILENAME": statusPath, "REQUEST_METHOD": "GET", @@ -206,7 +203,7 @@ func (p *phpfpm) gatherFcgi(fcgi *conn, statusPath string, acc telegraf.Accumula } // Gather stat using http protocol -func (p *phpfpm) gatherHTTP(addr string, acc telegraf.Accumulator) error { +func (p *Phpfpm) gatherHTTP(addr string, acc telegraf.Accumulator) error { u, err := url.Parse(addr) if err != nil { return fmt.Errorf("unable parse server address %q: %w", addr, err) @@ -232,7 +229,7 @@ func (p *phpfpm) gatherHTTP(addr string, acc telegraf.Accumulator) error { } // Import stat data into Telegraf system -func (p *phpfpm) importMetric(r io.Reader, acc telegraf.Accumulator, addr string) { +func (p *Phpfpm) importMetric(r io.Reader, acc telegraf.Accumulator, addr string) { if p.Format == "json" { p.parseJSON(r, acc, addr) } else { @@ -254,7 +251,7 @@ func parseLines(r io.Reader, acc telegraf.Accumulator, addr string) { } fieldName := strings.Trim(keyvalue[0], " ") // We start to gather data for a new pool here - if fieldName == PfPool { + if fieldName == pfPool { currentPool = strings.Trim(keyvalue[1], " ") stats[currentPool] = make(metricStat) continue @@ -262,17 +259,17 @@ func parseLines(r io.Reader, acc telegraf.Accumulator, addr string) { // Start to parse metric for current pool switch fieldName { - case PfStartSince, - PfAcceptedConn, - PfListenQueue, - PfMaxListenQueue, - PfListenQueueLen, - PfIdleProcesses, - PfActiveProcesses, - PfTotalProcesses, - PfMaxActiveProcesses, - PfMaxChildrenReached, - PfSlowRequests: + case pfStartSince, + pfAcceptedConn, + pfListenQueue, + pfMaxListenQueue, + pfListenQueueLen, + pfIdleProcesses, + pfActiveProcesses, + pfTotalProcesses, + pfMaxActiveProcesses, + pfMaxChildrenReached, + pfSlowRequests: fieldValue, err := strconv.ParseInt(strings.Trim(keyvalue[1], " "), 10, 64) if err == nil { stats[currentPool][fieldName] = fieldValue @@ -294,8 +291,8 @@ func parseLines(r io.Reader, acc telegraf.Accumulator, addr string) { } } -func (p *phpfpm) parseJSON(r io.Reader, acc telegraf.Accumulator, addr string) { - var metrics JSONMetrics +func (p *Phpfpm) parseJSON(r io.Reader, acc telegraf.Accumulator, addr string) { + var metrics jsonMetrics if err := json.NewDecoder(r).Decode(&metrics); err != nil { p.Log.Errorf("Unable to decode JSON response: %s", err) return @@ -402,6 +399,6 @@ func isNetworkURL(addr string) bool { func init() { inputs.Add("phpfpm", func() telegraf.Input { - return &phpfpm{} + return &Phpfpm{} }) } diff --git a/plugins/inputs/phpfpm/phpfpm_test.go b/plugins/inputs/phpfpm/phpfpm_test.go index 92b3affa7ad08..802c761532ccc 100644 --- a/plugins/inputs/phpfpm/phpfpm_test.go +++ b/plugins/inputs/phpfpm/phpfpm_test.go @@ -56,7 +56,7 @@ func TestPhpFpmGeneratesMetrics_From_Http(t *testing.T) { defer ts.Close() url := ts.URL + "?test=ok" - r := &phpfpm{ + r := &Phpfpm{ Urls: []string{url}, Log: &testutil.Logger{}, } @@ -106,7 +106,7 @@ func TestPhpFpmGeneratesJSONMetrics_From_Http(t *testing.T) { expected, err := testutil.ParseMetricsFromFile("testdata/expected.out", parser) require.NoError(t, err) - input := &phpfpm{ + input := &Phpfpm{ Urls: []string{server.URL + "?full&json"}, Format: "json", Log: &testutil.Logger{}, @@ -128,7 +128,7 @@ func TestPhpFpmGeneratesMetrics_From_Fcgi(t *testing.T) { go fcgi.Serve(tcp, s) //nolint:errcheck // ignore the returned error as we cannot do anything about it anyway // Now we tested again above server - r := &phpfpm{ + r := &Phpfpm{ Urls: []string{"fcgi://" + tcp.Addr().String() + "/status"}, Log: &testutil.Logger{}, } @@ -179,7 +179,7 @@ func TestPhpFpmTimeout_From_Fcgi(t *testing.T) { }() // Now we tested again above server - r := &phpfpm{ + r := &Phpfpm{ Urls: []string{"fcgi://" + tcp.Addr().String() + "/status"}, Timeout: config.Duration(timeout), Log: &testutil.Logger{}, @@ -211,7 +211,7 @@ func TestPhpFpmCrashWithTimeout_From_Fcgi(t *testing.T) { const timeout = 200 * time.Millisecond // Now we tested again above server - r := &phpfpm{ + r := &Phpfpm{ Urls: []string{"fcgi://" + tcpAddress + "/status"}, Timeout: config.Duration(timeout), Log: &testutil.Logger{}, @@ -237,7 +237,7 @@ func TestPhpFpmGeneratesMetrics_From_Socket(t *testing.T) { s := statServer{} go fcgi.Serve(tcp, s) //nolint:errcheck // ignore the returned error as we cannot do anything about it anyway - r := &phpfpm{ + r := &Phpfpm{ Urls: []string{tcp.Addr().String()}, Log: &testutil.Logger{}, } @@ -289,7 +289,7 @@ func TestPhpFpmGeneratesMetrics_From_Multiple_Sockets_With_Glob(t *testing.T) { go fcgi.Serve(tcp1, s) //nolint:errcheck // ignore the returned error as we cannot do anything about it anyway go fcgi.Serve(tcp2, s) //nolint:errcheck // ignore the returned error as we cannot do anything about it anyway - r := &phpfpm{ + r := &Phpfpm{ Urls: []string{"/tmp/test-fpm[\\-0-9]*.sock"}, Log: &testutil.Logger{}, } @@ -340,7 +340,7 @@ func TestPhpFpmGeneratesMetrics_From_Socket_Custom_Status_Path(t *testing.T) { s := statServer{} go fcgi.Serve(tcp, s) //nolint:errcheck // ignore the returned error as we cannot do anything about it anyway - r := &phpfpm{ + r := &Phpfpm{ Urls: []string{tcp.Addr().String() + ":custom-status-path"}, Log: &testutil.Logger{}, } @@ -374,7 +374,7 @@ func TestPhpFpmGeneratesMetrics_From_Socket_Custom_Status_Path(t *testing.T) { // When not passing server config, we default to localhost // We just want to make sure we did request stat from localhost func TestPhpFpmDefaultGetFromLocalhost(t *testing.T) { - r := &phpfpm{ + r := &Phpfpm{ Urls: []string{"http://bad.localhost:62001/status"}, Log: &testutil.Logger{}, } @@ -389,7 +389,7 @@ func TestPhpFpmGeneratesMetrics_Throw_Error_When_Fpm_Status_Is_Not_Responding(t t.Skip("Skipping long test in short mode") } - r := &phpfpm{ + r := &Phpfpm{ Urls: []string{"http://aninvalidone"}, Log: &testutil.Logger{}, } @@ -402,7 +402,7 @@ func TestPhpFpmGeneratesMetrics_Throw_Error_When_Fpm_Status_Is_Not_Responding(t } func TestPhpFpmGeneratesMetrics_Throw_Error_When_Socket_Path_Is_Invalid(t *testing.T) { - r := &phpfpm{ + r := &Phpfpm{ Urls: []string{"/tmp/invalid.sock"}, Log: &testutil.Logger{}, } @@ -435,7 +435,7 @@ var outputSampleJSON []byte func TestPhpFpmParseJSON_Log_Error_Without_Panic_When_When_JSON_Is_Invalid(t *testing.T) { // Capture the logging output for checking logger := &testutil.CaptureLogger{Name: "inputs.phpfpm"} - plugin := &phpfpm{Log: logger} + plugin := &Phpfpm{Log: logger} require.NoError(t, plugin.Init()) // parse valid JSON without panic and without log output @@ -459,7 +459,7 @@ func TestGatherDespiteUnavailable(t *testing.T) { go fcgi.Serve(tcp, s) //nolint:errcheck // ignore the returned error as we cannot do anything about it anyway // Now we tested again above server - r := &phpfpm{ + r := &Phpfpm{ Urls: []string{"fcgi://" + tcp.Addr().String() + "/status", "/lala"}, Log: &testutil.Logger{}, } diff --git a/plugins/inputs/ping/ping.go b/plugins/inputs/ping/ping.go index 9f2e692f1cf70..8538d394bc809 100644 --- a/plugins/inputs/ping/ping.go +++ b/plugins/inputs/ping/ping.go @@ -28,71 +28,69 @@ const ( defaultPingDataBytesSize = 56 ) -// HostPinger is a function that runs the "ping" function using a list of -// passed arguments. This can be easily switched with a mocked ping function -// for unit test purposes (see ping_test.go) -type HostPinger func(binary string, timeout float64, args ...string) (string, error) - type Ping struct { - // wg is used to wait for ping with multiple URLs - wg sync.WaitGroup - - // Pre-calculated interval and timeout - calcInterval time.Duration - calcTimeout time.Duration - - sourceAddress string - - Log telegraf.Logger `toml:"-"` - - // Interval at which to ping (ping -i ) - PingInterval float64 `toml:"ping_interval"` - - // Number of pings to send (ping -c ) - Count int - - // Per-ping timeout, in seconds. 0 means no timeout (ping -W ) - Timeout float64 - - // Ping deadline, in seconds. 0 means no deadline. (ping -w ) - Deadline int - - // Interface or source address to send ping from (ping -I/-S ) - Interface string - - // URLs to ping - Urls []string - - // Method defines how to ping (native or exec) - Method string + Urls []string `toml:"urls"` // URLs to ping + Method string `toml:"method"` // Method defines how to ping (native or exec) + Count int `toml:"count"` // Number of pings to send (ping -c ) + PingInterval float64 `toml:"ping_interval"` // Interval at which to ping (ping -i ) + Timeout float64 `toml:"timeout"` // Per-ping timeout, in seconds. 0 means no timeout (ping -W ) + Deadline int `toml:"deadline"` // Ping deadline, in seconds. 0 means no deadline. (ping -w ) + Interface string `toml:"interface"` // Interface or source address to send ping from (ping -I/-S ) + Percentiles []int `toml:"percentiles"` // Calculate the given percentiles when using native method + Binary string `toml:"binary"` // Ping executable binary + // Arguments for ping command. When arguments are not empty, system binary will be used and other options (ping_interval, timeout, etc.) will be ignored + Arguments []string `toml:"arguments"` + IPv4 bool `toml:"ipv4"` // Whether to resolve addresses using ipv4 or not. + IPv6 bool `toml:"ipv6"` // Whether to resolve addresses using ipv6 or not. + Size *int `toml:"size"` // Packet size + Log telegraf.Logger `toml:"-"` + + wg sync.WaitGroup // wg is used to wait for ping with multiple URLs + calcInterval time.Duration // Pre-calculated interval and timeout + calcTimeout time.Duration + sourceAddress string + pingHost hostPingerFunc // host ping function + nativePingFunc nativePingFunc +} - // Ping executable binary - Binary string +// hostPingerFunc is a function that runs the "ping" function using a list of +// passed arguments. This can be easily switched with a mocked ping function +// for unit test purposes (see ping_test.go) +type hostPingerFunc func(binary string, timeout float64, args ...string) (string, error) - // Arguments for ping command. When arguments is not empty, system binary will be used and - // other options (ping_interval, timeout, etc.) will be ignored - Arguments []string +type nativePingFunc func(destination string) (*pingStats, error) - // Whether to resolve addresses using ipv4 or not. - IPv4 bool +type durationSlice []time.Duration - // Whether to resolve addresses using ipv6 or not. - IPv6 bool +type pingStats struct { + ping.Statistics + ttl int +} - // host ping function - pingHost HostPinger +func (*Ping) SampleConfig() string { + return sampleConfig +} - nativePingFunc NativePingFunc +func (p *Ping) Init() error { + if p.Count < 1 { + return errors.New("bad number of packets to transmit") + } - // Calculate the given percentiles when using native method - Percentiles []int + // The interval cannot be below 0.2 seconds, matching ping implementation: https://linux.die.net/man/8/ping + if p.PingInterval < 0.2 { + p.calcInterval = time.Duration(.2 * float64(time.Second)) + } else { + p.calcInterval = time.Duration(p.PingInterval * float64(time.Second)) + } - // Packet size - Size *int -} + // If no timeout is given default to 5 seconds, matching original implementation + if p.Timeout == 0 { + p.calcTimeout = time.Duration(5) * time.Second + } else { + p.calcTimeout = time.Duration(p.Timeout) * time.Second + } -func (*Ping) SampleConfig() string { - return sampleConfig + return nil } func (p *Ping) Gather(acc telegraf.Accumulator) error { @@ -115,13 +113,6 @@ func (p *Ping) Gather(acc telegraf.Accumulator) error { return nil } -type pingStats struct { - ping.Statistics - ttl int -} - -type NativePingFunc func(destination string) (*pingStats, error) - func (p *Ping) nativePing(destination string) (*pingStats, error) { ps := &pingStats{} @@ -259,11 +250,11 @@ func (p *Ping) pingToURLNative(destination string, acc telegraf.Accumulator) { acc.AddFields("ping", fields, tags) } -type durationSlice []time.Duration +func (p durationSlice) Len() int { return len(p) } -func (p durationSlice) Len() int { return len(p) } func (p durationSlice) Less(i, j int) bool { return p[i] < p[j] } -func (p durationSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +func (p durationSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } // R7 from Hyndman and Fan (1996), which matches Excel func percentile(values durationSlice, perc int) time.Duration { @@ -292,29 +283,6 @@ func percentile(values durationSlice, perc int) time.Duration { return lower + time.Duration(rankFraction*float64(upper-lower)) } -// Init ensures the plugin is configured correctly. -func (p *Ping) Init() error { - if p.Count < 1 { - return errors.New("bad number of packets to transmit") - } - - // The interval cannot be below 0.2 seconds, matching ping implementation: https://linux.die.net/man/8/ping - if p.PingInterval < 0.2 { - p.calcInterval = time.Duration(.2 * float64(time.Second)) - } else { - p.calcInterval = time.Duration(p.PingInterval * float64(time.Second)) - } - - // If no timeout is given default to 5 seconds, matching original implementation - if p.Timeout == 0 { - p.calcTimeout = time.Duration(5) * time.Second - } else { - p.calcTimeout = time.Duration(p.Timeout) * time.Second - } - - return nil -} - func hostPinger(binary string, timeout float64, args ...string) (string, error) { bin, err := exec.LookPath(binary) if err != nil { diff --git a/plugins/inputs/ping/ping_windows_test.go b/plugins/inputs/ping/ping_windows_test.go index 4517bf8f33736..93b2bd04ff99a 100644 --- a/plugins/inputs/ping/ping_windows_test.go +++ b/plugins/inputs/ping/ping_windows_test.go @@ -261,7 +261,7 @@ func TestFatalPingGather(t *testing.T) { "Fatal ping should not have packet measurements") } -var UnreachablePingOutput = ` +var unreachablePingOutput = ` Pinging www.google.pl [8.8.8.8] with 32 bytes of data: Request timed out. Request timed out. @@ -273,7 +273,7 @@ Ping statistics for 8.8.8.8: ` func mockUnreachableHostPinger(string, float64, ...string) (string, error) { - return UnreachablePingOutput, errors.New("so very bad") + return unreachablePingOutput, errors.New("so very bad") } // Reply from 185.28.251.217: TTL expired in transit. @@ -312,7 +312,7 @@ func TestUnreachablePingGather(t *testing.T) { "Fatal ping should not have packet measurements") } -var TTLExpiredPingOutput = ` +var ttlExpiredPingOutput = ` Pinging www.google.pl [8.8.8.8] with 32 bytes of data: Request timed out. Request timed out. @@ -324,7 +324,7 @@ Ping statistics for 8.8.8.8: ` func mockTTLExpiredPinger(string, float64, ...string) (string, error) { - return TTLExpiredPingOutput, errors.New("so very bad") + return ttlExpiredPingOutput, errors.New("so very bad") } // in case 'Destination net unreachable' ping app return receive packet which is not what we need diff --git a/plugins/inputs/postfix/postfix.go b/plugins/inputs/postfix/postfix.go index cc5c7024c57e8..f657404d2882e 100644 --- a/plugins/inputs/postfix/postfix.go +++ b/plugins/inputs/postfix/postfix.go @@ -21,6 +21,36 @@ import ( //go:embed sample.conf var sampleConfig string +type Postfix struct { + QueueDirectory string `toml:"queue_directory"` +} + +func (*Postfix) SampleConfig() string { + return sampleConfig +} + +func (p *Postfix) Gather(acc telegraf.Accumulator) error { + if p.QueueDirectory == "" { + var err error + p.QueueDirectory, err = getQueueDirectory() + if err != nil { + return fmt.Errorf("unable to determine queue directory: %w", err) + } + } + + for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} { + fields, err := qScan(filepath.Join(p.QueueDirectory, q), acc) + if err != nil { + acc.AddError(fmt.Errorf("error scanning queue %q: %w", q, err)) + continue + } + + acc.AddFields("postfix_queue", fields, map[string]string{"queue": q}) + } + + return nil +} + func getQueueDirectory() (string, error) { qd, err := exec.Command("postconf", "-h", "queue_directory").Output() if err != nil { @@ -75,36 +105,6 @@ func qScan(path string, acc telegraf.Accumulator) (map[string]interface{}, error return fields, nil } -type Postfix struct { - QueueDirectory string -} - -func (*Postfix) SampleConfig() string { - return sampleConfig -} - -func (p *Postfix) Gather(acc telegraf.Accumulator) error { - if p.QueueDirectory == "" { - var err error - p.QueueDirectory, err = getQueueDirectory() - if err != nil { - return fmt.Errorf("unable to determine queue directory: %w", err) - } - } - - for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} { - fields, err := qScan(filepath.Join(p.QueueDirectory, q), acc) - if err != nil { - acc.AddError(fmt.Errorf("error scanning queue %q: %w", q, err)) - continue - } - - acc.AddFields("postfix_queue", fields, map[string]string{"queue": q}) - } - - return nil -} - func init() { inputs.Add("postfix", func() telegraf.Input { return &Postfix{ diff --git a/plugins/inputs/postfix/postfix_windows.go b/plugins/inputs/postfix/postfix_windows.go index 3b027f24a2ade..9831787ff7194 100644 --- a/plugins/inputs/postfix/postfix_windows.go +++ b/plugins/inputs/postfix/postfix_windows.go @@ -16,11 +16,13 @@ type Postfix struct { Log telegraf.Logger `toml:"-"` } +func (*Postfix) SampleConfig() string { return sampleConfig } + func (p *Postfix) Init() error { - p.Log.Warn("current platform is not supported") + p.Log.Warn("Current platform is not supported") return nil } -func (*Postfix) SampleConfig() string { return sampleConfig } + func (*Postfix) Gather(_ telegraf.Accumulator) error { return nil } func init() { diff --git a/plugins/inputs/postgresql/postgresql.go b/plugins/inputs/postgresql/postgresql.go index dc8f37ca8d6d3..46b2354874cb2 100644 --- a/plugins/inputs/postgresql/postgresql.go +++ b/plugins/inputs/postgresql/postgresql.go @@ -16,6 +16,8 @@ import ( //go:embed sample.conf var sampleConfig string +var ignoredColumns = map[string]bool{"stats_reset": true} + type Postgresql struct { Databases []string `toml:"databases"` IgnoredDatabases []string `toml:"ignored_databases"` @@ -25,8 +27,6 @@ type Postgresql struct { service *postgresql.Service } -var ignoredColumns = map[string]bool{"stats_reset": true} - func (*Postgresql) SampleConfig() string { return sampleConfig } @@ -47,10 +47,6 @@ func (p *Postgresql) Start(_ telegraf.Accumulator) error { return p.service.Start() } -func (p *Postgresql) Stop() { - p.service.Stop() -} - func (p *Postgresql) Gather(acc telegraf.Accumulator) error { var query string if len(p.Databases) == 0 && len(p.IgnoredDatabases) == 0 { @@ -106,6 +102,10 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error { return bgWriterRow.Err() } +func (p *Postgresql) Stop() { + p.service.Stop() +} + func (p *Postgresql) accRow(row *sql.Rows, acc telegraf.Accumulator, columns []string) error { var dbname bytes.Buffer diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible.go b/plugins/inputs/postgresql_extensible/postgresql_extensible.go index a4d867b8435c6..cb10f266bcedd 100644 --- a/plugins/inputs/postgresql_extensible/postgresql_extensible.go +++ b/plugins/inputs/postgresql_extensible/postgresql_extensible.go @@ -21,6 +21,8 @@ import ( //go:embed sample.conf var sampleConfig string +var ignoredColumns = map[string]bool{"stats_reset": true} + type Postgresql struct { Databases []string `deprecated:"1.22.4;use the sqlquery option to specify database to use"` Query []query `toml:"query"` @@ -45,7 +47,9 @@ type query struct { additionalTags map[string]bool } -var ignoredColumns = map[string]bool{"stats_reset": true} +type scanner interface { + Scan(dest ...interface{}) error +} func (*Postgresql) SampleConfig() string { return sampleConfig @@ -102,10 +106,6 @@ func (p *Postgresql) Start(_ telegraf.Accumulator) error { return p.service.Start() } -func (p *Postgresql) Stop() { - p.service.Stop() -} - func (p *Postgresql) Gather(acc telegraf.Accumulator) error { // Retrieving the database version query := `SELECT setting::integer / 100 AS version FROM pg_settings WHERE name = 'server_version_num'` @@ -128,6 +128,10 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error { return nil } +func (p *Postgresql) Stop() { + p.service.Stop() +} + func (p *Postgresql) gatherMetricsFromQuery(acc telegraf.Accumulator, q query, timestamp time.Time) error { rows, err := p.service.DB.Query(q.Sqlquery) if err != nil { @@ -150,10 +154,6 @@ func (p *Postgresql) gatherMetricsFromQuery(acc telegraf.Accumulator, q query, t return nil } -type scanner interface { - Scan(dest ...interface{}) error -} - func (p *Postgresql) accRow(acc telegraf.Accumulator, row scanner, columns []string, q query, timestamp time.Time) error { // this is where we'll store the column name with its *interface{} columnMap := make(map[string]*interface{}) diff --git a/plugins/inputs/powerdns/powerdns.go b/plugins/inputs/powerdns/powerdns.go index 44c765348a646..5ac8397e077fc 100644 --- a/plugins/inputs/powerdns/powerdns.go +++ b/plugins/inputs/powerdns/powerdns.go @@ -19,14 +19,13 @@ import ( //go:embed sample.conf var sampleConfig string -type Powerdns struct { - UnixSockets []string +const defaultTimeout = 5 * time.Second - Log telegraf.Logger `toml:"-"` +type Powerdns struct { + UnixSockets []string `toml:"unix_sockets"` + Log telegraf.Logger `toml:"-"` } -var defaultTimeout = 5 * time.Second - func (*Powerdns) SampleConfig() string { return sampleConfig } diff --git a/plugins/inputs/powerdns_recursor/powerdns_recursor.go b/plugins/inputs/powerdns_recursor/powerdns_recursor.go index 48e83179a4746..48a77518f5a6a 100644 --- a/plugins/inputs/powerdns_recursor/powerdns_recursor.go +++ b/plugins/inputs/powerdns_recursor/powerdns_recursor.go @@ -14,6 +14,8 @@ import ( //go:embed sample.conf var sampleConfig string +const defaultTimeout = 5 * time.Second + type PowerdnsRecursor struct { UnixSockets []string `toml:"unix_sockets"` SocketDir string `toml:"socket_dir"` @@ -26,8 +28,6 @@ type PowerdnsRecursor struct { gatherFromServer func(address string, acc telegraf.Accumulator) error } -var defaultTimeout = 5 * time.Second - func (*PowerdnsRecursor) SampleConfig() string { return sampleConfig } diff --git a/plugins/inputs/processes/processes_notwindows.go b/plugins/inputs/processes/processes_notwindows.go index c574238fd5a23..e476e8ff2454f 100644 --- a/plugins/inputs/processes/processes_notwindows.go +++ b/plugins/inputs/processes/processes_notwindows.go @@ -19,15 +19,13 @@ import ( ) type Processes struct { - UseSudo bool `toml:"use_sudo"` + UseSudo bool `toml:"use_sudo"` + Log telegraf.Logger `toml:"-"` execPS func(UseSudo bool) ([]byte, error) readProcFile func(filename string) ([]byte, error) - - Log telegraf.Logger - - forcePS bool - forceProc bool + forcePS bool + forceProc bool } func (p *Processes) Gather(acc telegraf.Accumulator) error { diff --git a/plugins/inputs/procstat/filter.go b/plugins/inputs/procstat/filter.go index 3c090549c0718..d8f621048b77e 100644 --- a/plugins/inputs/procstat/filter.go +++ b/plugins/inputs/procstat/filter.go @@ -7,13 +7,13 @@ import ( "strconv" "strings" - "github.com/shirou/gopsutil/v4/process" + gopsprocess "github.com/shirou/gopsutil/v4/process" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/filter" + telegraf_filter "github.com/influxdata/telegraf/filter" ) -type Filter struct { +type filter struct { Name string `toml:"name"` PidFiles []string `toml:"pid_files"` SystemdUnits []string `toml:"systemd_units"` @@ -29,13 +29,13 @@ type Filter struct { filterSupervisorUnit string filterCmds []*regexp.Regexp - filterUser filter.Filter - filterExecutable filter.Filter - filterProcessName filter.Filter + filterUser telegraf_filter.Filter + filterExecutable telegraf_filter.Filter + filterProcessName telegraf_filter.Filter finder *processFinder } -func (f *Filter) Init() error { +func (f *filter) init() error { if f.Name == "" { return errors.New("filter must be named") } @@ -74,13 +74,13 @@ func (f *Filter) Init() error { f.filterSupervisorUnit = strings.TrimSpace(strings.Join(f.SupervisorUnits, " ")) var err error - if f.filterUser, err = filter.Compile(f.Users); err != nil { + if f.filterUser, err = telegraf_filter.Compile(f.Users); err != nil { return fmt.Errorf("compiling users filter for %q failed: %w", f.Name, err) } - if f.filterExecutable, err = filter.Compile(f.Executables); err != nil { + if f.filterExecutable, err = telegraf_filter.Compile(f.Executables); err != nil { return fmt.Errorf("compiling executables filter for %q failed: %w", f.Name, err) } - if f.filterProcessName, err = filter.Compile(f.ProcessNames); err != nil { + if f.filterProcessName, err = telegraf_filter.Compile(f.ProcessNames); err != nil { return fmt.Errorf("compiling process-names filter for %q failed: %w", f.Name, err) } @@ -89,7 +89,7 @@ func (f *Filter) Init() error { return nil } -func (f *Filter) ApplyFilter() ([]processGroup, error) { +func (f *filter) applyFilter() ([]processGroup, error) { // Determine processes on service level. if there is no constraint on the // services, use all processes for matching. var groups []processGroup @@ -125,7 +125,7 @@ func (f *Filter) ApplyFilter() ([]processGroup, error) { } groups = append(groups, g...) default: - procs, err := process.Processes() + procs, err := gopsprocess.Processes() if err != nil { return nil, err } @@ -135,7 +135,7 @@ func (f *Filter) ApplyFilter() ([]processGroup, error) { // Filter by additional properties such as users, patterns etc result := make([]processGroup, 0, len(groups)) for _, g := range groups { - var matched []*process.Process + var matched []*gopsprocess.Process for _, p := range g.processes { // Users if f.filterUser != nil { @@ -218,13 +218,13 @@ func (f *Filter) ApplyFilter() ([]processGroup, error) { return result, nil } -func getChildren(p *process.Process) ([]*process.Process, error) { +func getChildren(p *gopsprocess.Process) ([]*gopsprocess.Process, error) { children, err := p.Children() // Check for cases that do not really mean error but rather means that there // is no match. switch { case err == nil, - errors.Is(err, process.ErrorNoChildren), + errors.Is(err, gopsprocess.ErrorNoChildren), strings.Contains(err.Error(), "exit status 1"): return children, nil } diff --git a/plugins/inputs/procstat/native_finder.go b/plugins/inputs/procstat/native_finder.go index 5f9812782b094..192a431acd503 100644 --- a/plugins/inputs/procstat/native_finder.go +++ b/plugins/inputs/procstat/native_finder.go @@ -7,16 +7,16 @@ import ( "strconv" "strings" - "github.com/shirou/gopsutil/v4/process" + gopsprocess "github.com/shirou/gopsutil/v4/process" ) // NativeFinder uses gopsutil to find processes type NativeFinder struct{} // Uid will return all pids for the given user -func (pg *NativeFinder) UID(user string) ([]PID, error) { - var dst []PID - procs, err := process.Processes() +func (pg *NativeFinder) uid(user string) ([]pid, error) { + var dst []pid + procs, err := gopsprocess.Processes() if err != nil { return dst, err } @@ -27,35 +27,35 @@ func (pg *NativeFinder) UID(user string) ([]PID, error) { continue } if username == user { - dst = append(dst, PID(p.Pid)) + dst = append(dst, pid(p.Pid)) } } return dst, nil } // PidFile returns the pid from the pid file given. -func (pg *NativeFinder) PidFile(path string) ([]PID, error) { - var pids []PID +func (pg *NativeFinder) pidFile(path string) ([]pid, error) { + var pids []pid pidString, err := os.ReadFile(path) if err != nil { return pids, fmt.Errorf("failed to read pidfile %q: %w", path, err) } - pid, err := strconv.ParseInt(strings.TrimSpace(string(pidString)), 10, 32) + processID, err := strconv.ParseInt(strings.TrimSpace(string(pidString)), 10, 32) if err != nil { return pids, err } - pids = append(pids, PID(pid)) + pids = append(pids, pid(processID)) return pids, nil } // FullPattern matches on the command line when the process was executed -func (pg *NativeFinder) FullPattern(pattern string) ([]PID, error) { - var pids []PID +func (pg *NativeFinder) fullPattern(pattern string) ([]pid, error) { + var pids []pid regxPattern, err := regexp.Compile(pattern) if err != nil { return pids, err } - procs, err := pg.FastProcessList() + procs, err := pg.fastProcessList() if err != nil { return pids, err } @@ -66,18 +66,18 @@ func (pg *NativeFinder) FullPattern(pattern string) ([]PID, error) { continue } if regxPattern.MatchString(cmd) { - pids = append(pids, PID(p.Pid)) + pids = append(pids, pid(p.Pid)) } } return pids, err } // Children matches children pids on the command line when the process was executed -func (pg *NativeFinder) Children(pid PID) ([]PID, error) { +func (pg *NativeFinder) children(processID pid) ([]pid, error) { // Get all running processes - p, err := process.NewProcess(int32(pid)) + p, err := gopsprocess.NewProcess(int32(processID)) if err != nil { - return nil, fmt.Errorf("getting process %d failed: %w", pid, err) + return nil, fmt.Errorf("getting process %d failed: %w", processID, err) } // Get all children of the current process @@ -85,35 +85,35 @@ func (pg *NativeFinder) Children(pid PID) ([]PID, error) { if err != nil { return nil, fmt.Errorf("unable to get children of process %d: %w", p.Pid, err) } - pids := make([]PID, 0, len(children)) + pids := make([]pid, 0, len(children)) for _, child := range children { - pids = append(pids, PID(child.Pid)) + pids = append(pids, pid(child.Pid)) } return pids, err } -func (pg *NativeFinder) FastProcessList() ([]*process.Process, error) { - pids, err := process.Pids() +func (pg *NativeFinder) fastProcessList() ([]*gopsprocess.Process, error) { + pids, err := gopsprocess.Pids() if err != nil { return nil, err } - result := make([]*process.Process, 0, len(pids)) + result := make([]*gopsprocess.Process, 0, len(pids)) for _, pid := range pids { - result = append(result, &process.Process{Pid: pid}) + result = append(result, &gopsprocess.Process{Pid: pid}) } return result, nil } // Pattern matches on the process name -func (pg *NativeFinder) Pattern(pattern string) ([]PID, error) { - var pids []PID +func (pg *NativeFinder) pattern(pattern string) ([]pid, error) { + var pids []pid regxPattern, err := regexp.Compile(pattern) if err != nil { return pids, err } - procs, err := pg.FastProcessList() + procs, err := pg.fastProcessList() if err != nil { return pids, err } @@ -124,7 +124,7 @@ func (pg *NativeFinder) Pattern(pattern string) ([]PID, error) { continue } if regxPattern.MatchString(name) { - pids = append(pids, PID(p.Pid)) + pids = append(pids, pid(p.Pid)) } } return pids, err diff --git a/plugins/inputs/procstat/native_finder_test.go b/plugins/inputs/procstat/native_finder_test.go index 1e6c6d84ade0c..e4e6e0bb8726d 100644 --- a/plugins/inputs/procstat/native_finder_test.go +++ b/plugins/inputs/procstat/native_finder_test.go @@ -14,7 +14,7 @@ import ( func BenchmarkPattern(b *testing.B) { finder := &NativeFinder{} for n := 0; n < b.N; n++ { - _, err := finder.Pattern(".*") + _, err := finder.pattern(".*") require.NoError(b, err) } } @@ -22,7 +22,7 @@ func BenchmarkPattern(b *testing.B) { func BenchmarkFullPattern(b *testing.B) { finder := &NativeFinder{} for n := 0; n < b.N; n++ { - _, err := finder.FullPattern(".*") + _, err := finder.fullPattern(".*") require.NoError(b, err) } } @@ -37,26 +37,26 @@ func TestChildPattern(t *testing.T) { require.NoError(t, err) // Spawn two child processes and get their PIDs - expected := make([]PID, 0, 2) + expected := make([]pid, 0, 2) ctx, cancel := context.WithCancel(context.Background()) defer cancel() // First process cmd1 := exec.CommandContext(ctx, "/bin/sh") require.NoError(t, cmd1.Start(), "starting first command failed") - expected = append(expected, PID(cmd1.Process.Pid)) + expected = append(expected, pid(cmd1.Process.Pid)) // Second process cmd2 := exec.CommandContext(ctx, "/bin/sh") require.NoError(t, cmd2.Start(), "starting first command failed") - expected = append(expected, PID(cmd2.Process.Pid)) + expected = append(expected, pid(cmd2.Process.Pid)) // Use the plugin to find the children finder := &NativeFinder{} - parent, err := finder.Pattern(parentName) + parent, err := finder.pattern(parentName) require.NoError(t, err) require.Len(t, parent, 1) - children, err := finder.Children(parent[0]) + children, err := finder.children(parent[0]) require.NoError(t, err) require.ElementsMatch(t, expected, children) } @@ -66,7 +66,7 @@ func TestGather_RealPatternIntegration(t *testing.T) { t.Skip("Skipping integration test in short mode") } pg := &NativeFinder{} - pids, err := pg.Pattern(`procstat`) + pids, err := pg.pattern(`procstat`) require.NoError(t, err) require.NotEmpty(t, pids) } @@ -79,7 +79,7 @@ func TestGather_RealFullPatternIntegration(t *testing.T) { t.Skip("Skipping integration test on Non-Windows OS") } pg := &NativeFinder{} - pids, err := pg.FullPattern(`%procstat%`) + pids, err := pg.fullPattern(`%procstat%`) require.NoError(t, err) require.NotEmpty(t, pids) } @@ -92,7 +92,7 @@ func TestGather_RealUserIntegration(t *testing.T) { require.NoError(t, err) pg := &NativeFinder{} - pids, err := pg.UID(currentUser.Username) + pids, err := pg.uid(currentUser.Username) require.NoError(t, err) require.NotEmpty(t, pids) } diff --git a/plugins/inputs/procstat/os_linux.go b/plugins/inputs/procstat/os_linux.go index 6c9d906faa276..cec134ee33232 100644 --- a/plugins/inputs/procstat/os_linux.go +++ b/plugins/inputs/procstat/os_linux.go @@ -13,15 +13,15 @@ import ( "github.com/coreos/go-systemd/v22/dbus" "github.com/prometheus/procfs" - "github.com/shirou/gopsutil/v4/net" - "github.com/shirou/gopsutil/v4/process" + gopsnet "github.com/shirou/gopsutil/v4/net" + gopsprocess "github.com/shirou/gopsutil/v4/process" "github.com/vishvananda/netlink" "golang.org/x/sys/unix" "github.com/influxdata/telegraf/internal" ) -func processName(p *process.Process) (string, error) { +func processName(p *gopsprocess.Process) (string, error) { return p.Exe() } @@ -29,7 +29,7 @@ func queryPidWithWinServiceName(_ string) (uint32, error) { return 0, errors.New("os not supporting win_service option") } -func collectMemmap(proc Process, prefix string, fields map[string]any) { +func collectMemmap(proc process, prefix string, fields map[string]any) { memMapStats, err := proc.MemoryMaps(true) if err == nil && len(*memMapStats) == 1 { memMap := (*memMapStats)[0] @@ -70,12 +70,12 @@ func findBySystemdUnits(units []string) ([]processGroup, error) { if !ok { return nil, fmt.Errorf("failed to parse PID %v of unit %q: invalid type %T", raw, u, raw) } - p, err := process.NewProcess(int32(pid)) + p, err := gopsprocess.NewProcess(int32(pid)) if err != nil { return nil, fmt.Errorf("failed to find process for PID %d of unit %q: %w", pid, u, err) } groups = append(groups, processGroup{ - processes: []*process.Process{p}, + processes: []*gopsprocess.Process{p}, tags: map[string]string{"systemd_unit": u.Name}, }) } @@ -87,14 +87,14 @@ func findByWindowsServices(_ []string) ([]processGroup, error) { return nil, nil } -func collectTotalReadWrite(proc Process) (r, w uint64, err error) { +func collectTotalReadWrite(proc process) (r, w uint64, err error) { path := internal.GetProcPath() fs, err := procfs.NewFS(path) if err != nil { return 0, 0, err } - p, err := fs.Proc(int(proc.PID())) + p, err := fs.Proc(int(proc.pid())) if err != nil { return 0, 0, err } @@ -177,7 +177,7 @@ func mapFdToInode(pid int32, fd uint32) (uint32, error) { return uint32(inode), nil } -func statsTCP(conns []net.ConnectionStat, family uint8) ([]map[string]interface{}, error) { +func statsTCP(conns []gopsnet.ConnectionStat, family uint8) ([]map[string]interface{}, error) { if len(conns) == 0 { return nil, nil } @@ -185,7 +185,7 @@ func statsTCP(conns []net.ConnectionStat, family uint8) ([]map[string]interface{ // For TCP we need the inode for each connection to relate the connection // statistics to the actual process socket. Therefore, map the // file-descriptors to inodes using the /proc//fd entries. - inodes := make(map[uint32]net.ConnectionStat, len(conns)) + inodes := make(map[uint32]gopsnet.ConnectionStat, len(conns)) for _, c := range conns { inode, err := mapFdToInode(c.Pid, c.Fd) if err != nil { @@ -240,7 +240,7 @@ func statsTCP(conns []net.ConnectionStat, family uint8) ([]map[string]interface{ return fieldslist, nil } -func statsUDP(conns []net.ConnectionStat, family uint8) ([]map[string]interface{}, error) { +func statsUDP(conns []gopsnet.ConnectionStat, family uint8) ([]map[string]interface{}, error) { if len(conns) == 0 { return nil, nil } @@ -248,7 +248,7 @@ func statsUDP(conns []net.ConnectionStat, family uint8) ([]map[string]interface{ // For UDP we need the inode for each connection to relate the connection // statistics to the actual process socket. Therefore, map the // file-descriptors to inodes using the /proc//fd entries. - inodes := make(map[uint32]net.ConnectionStat, len(conns)) + inodes := make(map[uint32]gopsnet.ConnectionStat, len(conns)) for _, c := range conns { inode, err := mapFdToInode(c.Pid, c.Fd) if err != nil { @@ -299,7 +299,7 @@ func statsUDP(conns []net.ConnectionStat, family uint8) ([]map[string]interface{ return fieldslist, nil } -func statsUnix(conns []net.ConnectionStat) ([]map[string]interface{}, error) { +func statsUnix(conns []gopsnet.ConnectionStat) ([]map[string]interface{}, error) { if len(conns) == 0 { return nil, nil } @@ -307,7 +307,7 @@ func statsUnix(conns []net.ConnectionStat) ([]map[string]interface{}, error) { // We need to read the inode for each connection to relate the connection // statistics to the actual process socket. Therefore, map the // file-descriptors to inodes using the /proc//fd entries. - inodes := make(map[uint32]net.ConnectionStat, len(conns)) + inodes := make(map[uint32]gopsnet.ConnectionStat, len(conns)) for _, c := range conns { inode, err := mapFdToInode(c.Pid, c.Fd) if err != nil { diff --git a/plugins/inputs/procstat/os_others.go b/plugins/inputs/procstat/os_others.go index 62334f885ccda..ba34038072a21 100644 --- a/plugins/inputs/procstat/os_others.go +++ b/plugins/inputs/procstat/os_others.go @@ -6,11 +6,11 @@ import ( "errors" "syscall" - "github.com/shirou/gopsutil/v4/net" - "github.com/shirou/gopsutil/v4/process" + gopsnet "github.com/shirou/gopsutil/v4/net" + gopsprocess "github.com/shirou/gopsutil/v4/process" ) -func processName(p *process.Process) (string, error) { +func processName(p *gopsprocess.Process) (string, error) { return p.Exe() } @@ -18,7 +18,7 @@ func queryPidWithWinServiceName(string) (uint32, error) { return 0, errors.New("os not supporting win_service option") } -func collectMemmap(Process, string, map[string]any) {} +func collectMemmap(process, string, map[string]any) {} func findBySystemdUnits([]string) ([]processGroup, error) { return nil, nil @@ -28,11 +28,11 @@ func findByWindowsServices([]string) ([]processGroup, error) { return nil, nil } -func collectTotalReadWrite(Process) (r, w uint64, err error) { +func collectTotalReadWrite(process) (r, w uint64, err error) { return 0, 0, errors.ErrUnsupported } -func statsTCP(conns []net.ConnectionStat, _ uint8) ([]map[string]interface{}, error) { +func statsTCP(conns []gopsnet.ConnectionStat, _ uint8) ([]map[string]interface{}, error) { if len(conns) == 0 { return nil, nil } @@ -65,7 +65,7 @@ func statsTCP(conns []net.ConnectionStat, _ uint8) ([]map[string]interface{}, er return fieldslist, nil } -func statsUDP(conns []net.ConnectionStat, _ uint8) ([]map[string]interface{}, error) { +func statsUDP(conns []gopsnet.ConnectionStat, _ uint8) ([]map[string]interface{}, error) { if len(conns) == 0 { return nil, nil } @@ -98,6 +98,6 @@ func statsUDP(conns []net.ConnectionStat, _ uint8) ([]map[string]interface{}, er return fieldslist, nil } -func statsUnix([]net.ConnectionStat) ([]map[string]interface{}, error) { +func statsUnix([]gopsnet.ConnectionStat) ([]map[string]interface{}, error) { return nil, errors.ErrUnsupported } diff --git a/plugins/inputs/procstat/os_windows.go b/plugins/inputs/procstat/os_windows.go index 05ada5a4748bc..b15e424d405f7 100644 --- a/plugins/inputs/procstat/os_windows.go +++ b/plugins/inputs/procstat/os_windows.go @@ -8,13 +8,13 @@ import ( "syscall" "unsafe" - "github.com/shirou/gopsutil/v4/net" - "github.com/shirou/gopsutil/v4/process" + gopsnet "github.com/shirou/gopsutil/v4/net" + gopsprocess "github.com/shirou/gopsutil/v4/process" "golang.org/x/sys/windows" "golang.org/x/sys/windows/svc/mgr" ) -func processName(p *process.Process) (string, error) { +func processName(p *gopsprocess.Process) (string, error) { return p.Name() } @@ -57,7 +57,7 @@ func queryPidWithWinServiceName(winServiceName string) (uint32, error) { return p.ProcessId, nil } -func collectMemmap(Process, string, map[string]any) {} +func collectMemmap(process, string, map[string]any) {} func findBySystemdUnits([]string) ([]processGroup, error) { return nil, nil @@ -71,13 +71,13 @@ func findByWindowsServices(services []string) ([]processGroup, error) { return nil, fmt.Errorf("failed to query PID of service %q: %w", service, err) } - p, err := process.NewProcess(int32(pid)) + p, err := gopsprocess.NewProcess(int32(pid)) if err != nil { return nil, fmt.Errorf("failed to find process for PID %d of service %q: %w", pid, service, err) } groups = append(groups, processGroup{ - processes: []*process.Process{p}, + processes: []*gopsprocess.Process{p}, tags: map[string]string{"win_service": service}, }) } @@ -85,11 +85,11 @@ func findByWindowsServices(services []string) ([]processGroup, error) { return groups, nil } -func collectTotalReadWrite(Process) (r, w uint64, err error) { +func collectTotalReadWrite(process) (r, w uint64, err error) { return 0, 0, errors.ErrUnsupported } -func statsTCP(conns []net.ConnectionStat, _ uint8) ([]map[string]interface{}, error) { +func statsTCP(conns []gopsnet.ConnectionStat, _ uint8) ([]map[string]interface{}, error) { if len(conns) == 0 { return nil, nil } @@ -122,7 +122,7 @@ func statsTCP(conns []net.ConnectionStat, _ uint8) ([]map[string]interface{}, er return fieldslist, nil } -func statsUDP(conns []net.ConnectionStat, _ uint8) ([]map[string]interface{}, error) { +func statsUDP(conns []gopsnet.ConnectionStat, _ uint8) ([]map[string]interface{}, error) { if len(conns) == 0 { return nil, nil } @@ -155,6 +155,6 @@ func statsUDP(conns []net.ConnectionStat, _ uint8) ([]map[string]interface{}, er return fieldslist, nil } -func statsUnix([]net.ConnectionStat) ([]map[string]interface{}, error) { +func statsUnix([]gopsnet.ConnectionStat) ([]map[string]interface{}, error) { return nil, nil } diff --git a/plugins/inputs/procstat/pgrep.go b/plugins/inputs/procstat/pgrep.go index 8451210e94530..add3a2dfb120d 100644 --- a/plugins/inputs/procstat/pgrep.go +++ b/plugins/inputs/procstat/pgrep.go @@ -11,54 +11,54 @@ import ( ) // Implementation of PIDGatherer that execs pgrep to find processes -type Pgrep struct { +type pgrep struct { path string } -func newPgrepFinder() (PIDFinder, error) { +func newPgrepFinder() (pidFinder, error) { path, err := exec.LookPath("pgrep") if err != nil { return nil, fmt.Errorf("could not find pgrep binary: %w", err) } - return &Pgrep{path}, nil + return &pgrep{path}, nil } -func (pg *Pgrep) PidFile(path string) ([]PID, error) { - var pids []PID +func (pg *pgrep) pidFile(path string) ([]pid, error) { + var pids []pid pidString, err := os.ReadFile(path) if err != nil { return pids, fmt.Errorf("failed to read pidfile %q: %w", path, err) } - pid, err := strconv.ParseInt(strings.TrimSpace(string(pidString)), 10, 32) + processID, err := strconv.ParseInt(strings.TrimSpace(string(pidString)), 10, 32) if err != nil { return pids, err } - pids = append(pids, PID(pid)) + pids = append(pids, pid(processID)) return pids, nil } -func (pg *Pgrep) Pattern(pattern string) ([]PID, error) { +func (pg *pgrep) pattern(pattern string) ([]pid, error) { args := []string{pattern} return pg.find(args) } -func (pg *Pgrep) UID(user string) ([]PID, error) { +func (pg *pgrep) uid(user string) ([]pid, error) { args := []string{"-u", user} return pg.find(args) } -func (pg *Pgrep) FullPattern(pattern string) ([]PID, error) { +func (pg *pgrep) fullPattern(pattern string) ([]pid, error) { args := []string{"-f", pattern} return pg.find(args) } -func (pg *Pgrep) Children(pid PID) ([]PID, error) { +func (pg *pgrep) children(pid pid) ([]pid, error) { args := []string{"-P", strconv.FormatInt(int64(pid), 10)} return pg.find(args) } -func (pg *Pgrep) find(args []string) ([]PID, error) { +func (pg *pgrep) find(args []string) ([]pid, error) { // Execute pgrep with the given arguments buf, err := exec.Command(pg.path, args...).Output() if err != nil { @@ -73,13 +73,13 @@ func (pg *Pgrep) find(args []string) ([]PID, error) { // Parse the command output to extract the PIDs fields := strings.Fields(out) - pids := make([]PID, 0, len(fields)) + pids := make([]pid, 0, len(fields)) for _, field := range fields { - pid, err := strconv.ParseInt(field, 10, 32) + processID, err := strconv.ParseInt(field, 10, 32) if err != nil { return nil, err } - pids = append(pids, PID(pid)) + pids = append(pids, pid(processID)) } return pids, nil } diff --git a/plugins/inputs/procstat/process.go b/plugins/inputs/procstat/process.go index a0e8e60c880f0..c5eeb831d8b73 100644 --- a/plugins/inputs/procstat/process.go +++ b/plugins/inputs/procstat/process.go @@ -9,41 +9,41 @@ import ( "time" gopsnet "github.com/shirou/gopsutil/v4/net" - "github.com/shirou/gopsutil/v4/process" + gopsprocess "github.com/shirou/gopsutil/v4/process" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" ) -type Process interface { - PID() PID +type process interface { Name() (string, error) - SetTag(string, string) - MemoryMaps(bool) (*[]process.MemoryMapsStat, error) - Metrics(string, *collectionConfig, time.Time) ([]telegraf.Metric, error) + MemoryMaps(bool) (*[]gopsprocess.MemoryMapsStat, error) + pid() pid + setTag(string, string) + metrics(string, *collectionConfig, time.Time) ([]telegraf.Metric, error) } -type PIDFinder interface { - PidFile(path string) ([]PID, error) - Pattern(pattern string) ([]PID, error) - UID(user string) ([]PID, error) - FullPattern(path string) ([]PID, error) - Children(pid PID) ([]PID, error) +type pidFinder interface { + pidFile(path string) ([]pid, error) + pattern(pattern string) ([]pid, error) + uid(user string) ([]pid, error) + fullPattern(path string) ([]pid, error) + children(pid pid) ([]pid, error) } -type Proc struct { +type proc struct { hasCPUTimes bool tags map[string]string - *process.Process + *gopsprocess.Process } -func newProc(pid PID) (Process, error) { - p, err := process.NewProcess(int32(pid)) +func newProc(pid pid) (process, error) { + p, err := gopsprocess.NewProcess(int32(pid)) if err != nil { return nil, err } - proc := &Proc{ + proc := &proc{ Process: p, hasCPUTimes: false, tags: make(map[string]string), @@ -51,15 +51,15 @@ func newProc(pid PID) (Process, error) { return proc, nil } -func (p *Proc) PID() PID { - return PID(p.Process.Pid) +func (p *proc) pid() pid { + return pid(p.Process.Pid) } -func (p *Proc) SetTag(k, v string) { +func (p *proc) setTag(k, v string) { p.tags[k] = v } -func (p *Proc) percent(_ time.Duration) (float64, error) { +func (p *proc) percent(_ time.Duration) (float64, error) { cpuPerc, err := p.Process.Percent(time.Duration(0)) if !p.hasCPUTimes && err == nil { p.hasCPUTimes = true @@ -68,8 +68,8 @@ func (p *Proc) percent(_ time.Duration) (float64, error) { return cpuPerc, err } -// Add metrics a single Process -func (p *Proc) Metrics(prefix string, cfg *collectionConfig, t time.Time) ([]telegraf.Metric, error) { +// Add metrics a single process +func (p *proc) metrics(prefix string, cfg *collectionConfig, t time.Time) ([]telegraf.Metric, error) { if prefix != "" { prefix += "_" } @@ -163,27 +163,27 @@ func (p *Proc) Metrics(prefix string, cfg *collectionConfig, t time.Time) ([]tel for _, rlim := range rlims { var name string switch rlim.Resource { - case process.RLIMIT_CPU: + case gopsprocess.RLIMIT_CPU: name = "cpu_time" - case process.RLIMIT_DATA: + case gopsprocess.RLIMIT_DATA: name = "memory_data" - case process.RLIMIT_STACK: + case gopsprocess.RLIMIT_STACK: name = "memory_stack" - case process.RLIMIT_RSS: + case gopsprocess.RLIMIT_RSS: name = "memory_rss" - case process.RLIMIT_NOFILE: + case gopsprocess.RLIMIT_NOFILE: name = "num_fds" - case process.RLIMIT_MEMLOCK: + case gopsprocess.RLIMIT_MEMLOCK: name = "memory_locked" - case process.RLIMIT_AS: + case gopsprocess.RLIMIT_AS: name = "memory_vms" - case process.RLIMIT_LOCKS: + case gopsprocess.RLIMIT_LOCKS: name = "file_locks" - case process.RLIMIT_SIGPENDING: + case gopsprocess.RLIMIT_SIGPENDING: name = "signals_pending" - case process.RLIMIT_NICE: + case gopsprocess.RLIMIT_NICE: name = "nice_priority" - case process.RLIMIT_RTPRIO: + case gopsprocess.RLIMIT_RTPRIO: name = "realtime_priority" default: continue diff --git a/plugins/inputs/procstat/procstat.go b/plugins/inputs/procstat/procstat.go index 4e3e4df6d38c0..6bf1e8402dc69 100644 --- a/plugins/inputs/procstat/procstat.go +++ b/plugins/inputs/procstat/procstat.go @@ -15,7 +15,7 @@ import ( "strings" "time" - "github.com/shirou/gopsutil/v4/process" + gopsprocess "github.com/shirou/gopsutil/v4/process" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/choice" @@ -28,14 +28,7 @@ var sampleConfig string // execCommand is so tests can mock out exec.Command usage. var execCommand = exec.Command -type PID int32 - -type collectionConfig struct { - solarisMode bool - tagging map[string]bool - features map[string]bool - socketProtos []string -} +type pid int32 type Procstat struct { PidFinder string `toml:"pid_finder"` @@ -57,24 +50,31 @@ type Procstat struct { Properties []string `toml:"properties"` SocketProtocols []string `toml:"socket_protocols"` TagWith []string `toml:"tag_with"` - Filter []Filter `toml:"filter"` + Filter []filter `toml:"filter"` Log telegraf.Logger `toml:"-"` - finder PIDFinder - processes map[PID]Process + finder pidFinder + processes map[pid]process cfg collectionConfig oldMode bool - createProcess func(PID) (Process, error) + createProcess func(pid) (process, error) +} + +type collectionConfig struct { + solarisMode bool + tagging map[string]bool + features map[string]bool + socketProtos []string } -type PidsTags struct { - PIDs []PID +type pidsTags struct { + PIDs []pid Tags map[string]string } type processGroup struct { - processes []*process.Process + processes []*gopsprocess.Process tags map[string]string } @@ -196,14 +196,14 @@ func (p *Procstat) Init() error { // New-style operations for i := range p.Filter { p.Filter[i].Log = p.Log - if err := p.Filter[i].Init(); err != nil { + if err := p.Filter[i].init(); err != nil { return fmt.Errorf("initializing filter %d failed: %w", i, err) } } } // Initialize the running process cache - p.processes = make(map[PID]Process) + p.processes = make(map[pid]process) return nil } @@ -240,7 +240,7 @@ func (p *Procstat) gatherOld(acc telegraf.Accumulator) error { } var count int - running := make(map[PID]bool) + running := make(map[pid]bool) for _, r := range results { if len(r.PIDs) < 1 && len(p.SupervisorUnits) > 0 { continue @@ -271,16 +271,16 @@ func (p *Procstat) gatherOld(acc telegraf.Accumulator) error { // Add initial tags for k, v := range r.Tags { - proc.SetTag(k, v) + proc.setTag(k, v) } if p.ProcessName != "" { - proc.SetTag("process_name", p.ProcessName) + proc.setTag("process_name", p.ProcessName) } p.processes[pid] = proc } running[pid] = true - metrics, err := proc.Metrics(p.Prefix, &p.cfg, now) + metrics, err := proc.metrics(p.Prefix, &p.cfg, now) if err != nil { // Continue after logging an error as there might still be // metrics available @@ -324,9 +324,9 @@ func (p *Procstat) gatherOld(acc telegraf.Accumulator) error { func (p *Procstat) gatherNew(acc telegraf.Accumulator) error { now := time.Now() - running := make(map[PID]bool) + running := make(map[pid]bool) for _, f := range p.Filter { - groups, err := f.ApplyFilter() + groups, err := f.applyFilter() if err != nil { // Add lookup error-metric acc.AddFields( @@ -357,8 +357,8 @@ func (p *Procstat) gatherNew(acc telegraf.Accumulator) error { // Use the cached processes as we need the existing instances // to compute delta-metrics (e.g. cpu-usage). - pid := PID(gp.Pid) - proc, found := p.processes[pid] + pid := pid(gp.Pid) + process, found := p.processes[pid] if !found { //nolint:errcheck // Assumption: if a process has no name, it probably does not exist if name, _ := gp.Name(); name == "" { @@ -372,19 +372,19 @@ func (p *Procstat) gatherNew(acc telegraf.Accumulator) error { tags[k] = v } if p.ProcessName != "" { - proc.SetTag("process_name", p.ProcessName) + process.setTag("process_name", p.ProcessName) } tags["filter"] = f.Name - proc = &Proc{ + process = &proc{ Process: gp, hasCPUTimes: false, tags: tags, } - p.processes[pid] = proc + p.processes[pid] = process } running[pid] = true - metrics, err := proc.Metrics(p.Prefix, &p.cfg, now) + metrics, err := process.metrics(p.Prefix, &p.cfg, now) if err != nil { // Continue after logging an error as there might still be // metrics available @@ -422,7 +422,7 @@ func (p *Procstat) gatherNew(acc telegraf.Accumulator) error { } // Get matching PIDs and their initial tags -func (p *Procstat) findPids() ([]PidsTags, error) { +func (p *Procstat) findPids() ([]pidsTags, error) { switch { case len(p.SupervisorUnits) > 0: return p.findSupervisorUnits() @@ -434,65 +434,65 @@ func (p *Procstat) findPids() ([]PidsTags, error) { return nil, err } tags := map[string]string{"win_service": p.WinService} - return []PidsTags{{pids, tags}}, nil + return []pidsTags{{pids, tags}}, nil case p.CGroup != "": return p.cgroupPIDs() case p.PidFile != "": - pids, err := p.finder.PidFile(p.PidFile) + pids, err := p.finder.pidFile(p.PidFile) if err != nil { return nil, err } tags := map[string]string{"pidfile": p.PidFile} - return []PidsTags{{pids, tags}}, nil + return []pidsTags{{pids, tags}}, nil case p.Exe != "": - pids, err := p.finder.Pattern(p.Exe) + pids, err := p.finder.pattern(p.Exe) if err != nil { return nil, err } tags := map[string]string{"exe": p.Exe} - return []PidsTags{{pids, tags}}, nil + return []pidsTags{{pids, tags}}, nil case p.Pattern != "": - pids, err := p.finder.FullPattern(p.Pattern) + pids, err := p.finder.fullPattern(p.Pattern) if err != nil { return nil, err } tags := map[string]string{"pattern": p.Pattern} - return []PidsTags{{pids, tags}}, nil + return []pidsTags{{pids, tags}}, nil case p.User != "": - pids, err := p.finder.UID(p.User) + pids, err := p.finder.uid(p.User) if err != nil { return nil, err } tags := map[string]string{"user": p.User} - return []PidsTags{{pids, tags}}, nil + return []pidsTags{{pids, tags}}, nil } return nil, errors.New("no filter option set") } -func (p *Procstat) findSupervisorUnits() ([]PidsTags, error) { +func (p *Procstat) findSupervisorUnits() ([]pidsTags, error) { groups, groupsTags, err := p.supervisorPIDs() if err != nil { return nil, fmt.Errorf("getting supervisor PIDs failed: %w", err) } // According to the PID, find the system process number and get the child processes - pidTags := make([]PidsTags, 0, len(groups)) + pidTags := make([]pidsTags, 0, len(groups)) for _, group := range groups { grppid := groupsTags[group]["pid"] if grppid == "" { - pidTags = append(pidTags, PidsTags{nil, groupsTags[group]}) + pidTags = append(pidTags, pidsTags{nil, groupsTags[group]}) continue } - pid, err := strconv.ParseInt(grppid, 10, 32) + processID, err := strconv.ParseInt(grppid, 10, 32) if err != nil { return nil, fmt.Errorf("converting PID %q failed: %w", grppid, err) } // Get all children of the supervisor unit - pids, err := p.finder.Children(PID(pid)) + pids, err := p.finder.children(pid(processID)) if err != nil { - return nil, fmt.Errorf("getting children for %d failed: %w", pid, err) + return nil, fmt.Errorf("getting children for %d failed: %w", processID, err) } tags := map[string]string{"pattern": p.Pattern, "parent_pid": p.Pattern} @@ -510,7 +510,7 @@ func (p *Procstat) findSupervisorUnits() ([]PidsTags, error) { } // Remove duplicate pid tags delete(tags, "pid") - pidTags = append(pidTags, PidsTags{pids, tags}) + pidTags = append(pidTags, pidsTags{pids, tags}) } return pidTags, nil } @@ -559,30 +559,30 @@ func (p *Procstat) supervisorPIDs() ([]string, map[string]map[string]string, err return p.SupervisorUnits, mainPids, nil } -func (p *Procstat) systemdUnitPIDs() ([]PidsTags, error) { +func (p *Procstat) systemdUnitPIDs() ([]pidsTags, error) { if p.IncludeSystemdChildren { p.CGroup = "systemd/system.slice/" + p.SystemdUnit return p.cgroupPIDs() } - var pidTags []PidsTags + var pidTags []pidsTags pids, err := p.simpleSystemdUnitPIDs() if err != nil { return nil, err } tags := map[string]string{"systemd_unit": p.SystemdUnit} - pidTags = append(pidTags, PidsTags{pids, tags}) + pidTags = append(pidTags, pidsTags{pids, tags}) return pidTags, nil } -func (p *Procstat) simpleSystemdUnitPIDs() ([]PID, error) { +func (p *Procstat) simpleSystemdUnitPIDs() ([]pid, error) { out, err := execCommand("systemctl", "show", p.SystemdUnit).Output() if err != nil { return nil, err } lines := bytes.Split(out, []byte{'\n'}) - pids := make([]PID, 0, len(lines)) + pids := make([]pid, 0, len(lines)) for _, line := range lines { kv := bytes.SplitN(line, []byte{'='}, 2) if len(kv) != 2 { @@ -594,17 +594,17 @@ func (p *Procstat) simpleSystemdUnitPIDs() ([]PID, error) { if len(kv[1]) == 0 || bytes.Equal(kv[1], []byte("0")) { return nil, nil } - pid, err := strconv.ParseInt(string(kv[1]), 10, 32) + processID, err := strconv.ParseInt(string(kv[1]), 10, 32) if err != nil { return nil, fmt.Errorf("invalid pid %q", kv[1]) } - pids = append(pids, PID(pid)) + pids = append(pids, pid(processID)) } return pids, nil } -func (p *Procstat) cgroupPIDs() ([]PidsTags, error) { +func (p *Procstat) cgroupPIDs() ([]pidsTags, error) { procsPath := p.CGroup if procsPath[0] != '/' { procsPath = "/sys/fs/cgroup/" + procsPath @@ -615,20 +615,20 @@ func (p *Procstat) cgroupPIDs() ([]PidsTags, error) { return nil, fmt.Errorf("glob failed: %w", err) } - pidTags := make([]PidsTags, 0, len(items)) + pidTags := make([]pidsTags, 0, len(items)) for _, item := range items { pids, err := p.singleCgroupPIDs(item) if err != nil { return nil, err } tags := map[string]string{"cgroup": p.CGroup, "cgroup_full": item} - pidTags = append(pidTags, PidsTags{pids, tags}) + pidTags = append(pidTags, pidsTags{pids, tags}) } return pidTags, nil } -func (p *Procstat) singleCgroupPIDs(path string) ([]PID, error) { +func (p *Procstat) singleCgroupPIDs(path string) ([]pid, error) { ok, err := isDir(path) if err != nil { return nil, err @@ -643,16 +643,16 @@ func (p *Procstat) singleCgroupPIDs(path string) ([]PID, error) { } lines := bytes.Split(out, []byte{'\n'}) - pids := make([]PID, 0, len(lines)) + pids := make([]pid, 0, len(lines)) for _, pidBS := range lines { if len(pidBS) == 0 { continue } - pid, err := strconv.ParseInt(string(pidBS), 10, 32) + processID, err := strconv.ParseInt(string(pidBS), 10, 32) if err != nil { return nil, fmt.Errorf("invalid pid %q", pidBS) } - pids = append(pids, PID(pid)) + pids = append(pids, pid(processID)) } return pids, nil @@ -666,15 +666,15 @@ func isDir(path string) (bool, error) { return result.IsDir(), nil } -func (p *Procstat) winServicePIDs() ([]PID, error) { - var pids []PID +func (p *Procstat) winServicePIDs() ([]pid, error) { + var pids []pid - pid, err := queryPidWithWinServiceName(p.WinService) + processID, err := queryPidWithWinServiceName(p.WinService) if err != nil { return pids, err } - pids = append(pids, PID(pid)) + pids = append(pids, pid(processID)) return pids, nil } diff --git a/plugins/inputs/procstat/procstat_test.go b/plugins/inputs/procstat/procstat_test.go index aa833a86f9b24..4256f08e24234 100644 --- a/plugins/inputs/procstat/procstat_test.go +++ b/plugins/inputs/procstat/procstat_test.go @@ -12,7 +12,7 @@ import ( "testing" "time" - "github.com/shirou/gopsutil/v4/process" + gopsprocess "github.com/shirou/gopsutil/v4/process" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" @@ -77,73 +77,69 @@ TestGather_STARTINGsupervisorUnitPIDs STARTING`) } type testPgrep struct { - pids []PID + pids []pid err error } -func newTestFinder(pids []PID) PIDFinder { +func newTestFinder(pids []pid) pidFinder { return &testPgrep{ pids: pids, err: nil, } } -func (pg *testPgrep) PidFile(_ string) ([]PID, error) { +func (pg *testPgrep) pidFile(_ string) ([]pid, error) { return pg.pids, pg.err } -func (p *testProc) Cmdline() (string, error) { - return "test_proc", nil -} - -func (pg *testPgrep) Pattern(_ string) ([]PID, error) { +func (pg *testPgrep) pattern(_ string) ([]pid, error) { return pg.pids, pg.err } -func (pg *testPgrep) UID(_ string) ([]PID, error) { +func (pg *testPgrep) uid(_ string) ([]pid, error) { return pg.pids, pg.err } -func (pg *testPgrep) FullPattern(_ string) ([]PID, error) { +func (pg *testPgrep) fullPattern(_ string) ([]pid, error) { return pg.pids, pg.err } -func (pg *testPgrep) Children(_ PID) ([]PID, error) { - pids := []PID{7311, 8111, 8112} +func (pg *testPgrep) children(_ pid) ([]pid, error) { + pids := []pid{7311, 8111, 8112} return pids, pg.err } type testProc struct { - pid PID - tags map[string]string + procID pid + tags map[string]string } -func newTestProc(pid PID) (Process, error) { +func newTestProc(pid pid) (process, error) { proc := &testProc{ - pid: pid, - tags: make(map[string]string), + procID: pid, + tags: make(map[string]string), } return proc, nil } -func (p *testProc) PID() PID { - return p.pid +func (p *testProc) pid() pid { + return p.procID } func (p *testProc) Name() (string, error) { return "test_proc", nil } -func (p *testProc) SetTag(k, v string) { +func (p *testProc) setTag(k, v string) { p.tags[k] = v } -func (p *testProc) MemoryMaps(bool) (*[]process.MemoryMapsStat, error) { - stats := make([]process.MemoryMapsStat, 0) +func (p *testProc) MemoryMaps(bool) (*[]gopsprocess.MemoryMapsStat, error) { + stats := make([]gopsprocess.MemoryMapsStat, 0) return &stats, nil } -func (p *testProc) Metrics(prefix string, cfg *collectionConfig, t time.Time) ([]telegraf.Metric, error) { +func (p *testProc) metrics(prefix string, cfg *collectionConfig, t time.Time) ([]telegraf.Metric, error) { if prefix != "" { prefix += "_" } @@ -190,9 +186,9 @@ func (p *testProc) Metrics(prefix string, cfg *collectionConfig, t time.Time) ([ } if cfg.tagging["pid"] { - tags["pid"] = strconv.Itoa(int(p.pid)) + tags["pid"] = strconv.Itoa(int(p.procID)) } else { - fields["pid"] = int32(p.pid) + fields["pid"] = int32(p.procID) } if cfg.tagging["ppid"] { @@ -216,7 +212,7 @@ func (p *testProc) Metrics(prefix string, cfg *collectionConfig, t time.Time) ([ return []telegraf.Metric{metric.New("procstat", tags, fields, t)}, nil } -var pid = PID(42) +var processID = pid(42) var exe = "foo" func TestInitInvalidFinder(t *testing.T) { @@ -277,8 +273,8 @@ func TestGather_CreateProcessErrorOk(t *testing.T) { PidFinder: "test", Properties: []string{"cpu", "memory", "mmap"}, Log: testutil.Logger{}, - finder: newTestFinder([]PID{pid}), - createProcess: func(PID) (Process, error) { + finder: newTestFinder([]pid{processID}), + createProcess: func(pid) (process, error) { return nil, errors.New("createProcess error") }, } @@ -350,7 +346,7 @@ func TestGather_ProcessName(t *testing.T) { PidFinder: "test", Properties: []string{"cpu", "memory", "mmap"}, Log: testutil.Logger{}, - finder: newTestFinder([]PID{pid}), + finder: newTestFinder([]pid{processID}), createProcess: newTestProc, } require.NoError(t, p.Init()) @@ -362,14 +358,14 @@ func TestGather_ProcessName(t *testing.T) { } func TestGather_NoProcessNameUsesReal(t *testing.T) { - pid := PID(os.Getpid()) + processID := pid(os.Getpid()) p := Procstat{ Exe: exe, PidFinder: "test", Properties: []string{"cpu", "memory", "mmap"}, Log: testutil.Logger{}, - finder: newTestFinder([]PID{pid}), + finder: newTestFinder([]pid{processID}), createProcess: newTestProc, } require.NoError(t, p.Init()) @@ -386,7 +382,7 @@ func TestGather_NoPidTag(t *testing.T) { PidFinder: "test", Properties: []string{"cpu", "memory", "mmap"}, Log: testutil.Logger{}, - finder: newTestFinder([]PID{pid}), + finder: newTestFinder([]pid{processID}), createProcess: newTestProc, } require.NoError(t, p.Init()) @@ -405,7 +401,7 @@ func TestGather_PidTag(t *testing.T) { PidFinder: "test", Properties: []string{"cpu", "memory", "mmap"}, Log: testutil.Logger{}, - finder: newTestFinder([]PID{pid}), + finder: newTestFinder([]pid{processID}), createProcess: newTestProc, } require.NoError(t, p.Init()) @@ -424,7 +420,7 @@ func TestGather_Prefix(t *testing.T) { PidFinder: "test", Properties: []string{"cpu", "memory", "mmap"}, Log: testutil.Logger{}, - finder: newTestFinder([]PID{pid}), + finder: newTestFinder([]pid{processID}), createProcess: newTestProc, } require.NoError(t, p.Init()) @@ -441,7 +437,7 @@ func TestGather_Exe(t *testing.T) { PidFinder: "test", Properties: []string{"cpu", "memory", "mmap"}, Log: testutil.Logger{}, - finder: newTestFinder([]PID{pid}), + finder: newTestFinder([]pid{processID}), createProcess: newTestProc, } require.NoError(t, p.Init()) @@ -460,7 +456,7 @@ func TestGather_User(t *testing.T) { PidFinder: "test", Properties: []string{"cpu", "memory", "mmap"}, Log: testutil.Logger{}, - finder: newTestFinder([]PID{pid}), + finder: newTestFinder([]pid{processID}), createProcess: newTestProc, } require.NoError(t, p.Init()) @@ -479,7 +475,7 @@ func TestGather_Pattern(t *testing.T) { PidFinder: "test", Properties: []string{"cpu", "memory", "mmap"}, Log: testutil.Logger{}, - finder: newTestFinder([]PID{pid}), + finder: newTestFinder([]pid{processID}), createProcess: newTestProc, } require.NoError(t, p.Init()) @@ -498,7 +494,7 @@ func TestGather_PidFile(t *testing.T) { PidFinder: "test", Properties: []string{"cpu", "memory", "mmap"}, Log: testutil.Logger{}, - finder: newTestFinder([]PID{pid}), + finder: newTestFinder([]pid{processID}), createProcess: newTestProc, } require.NoError(t, p.Init()) @@ -510,7 +506,7 @@ func TestGather_PidFile(t *testing.T) { } func TestGather_PercentFirstPass(t *testing.T) { - pid := PID(os.Getpid()) + processID := pid(os.Getpid()) p := Procstat{ Pattern: "foo", @@ -518,7 +514,7 @@ func TestGather_PercentFirstPass(t *testing.T) { PidFinder: "test", Properties: []string{"cpu", "memory", "mmap"}, Log: testutil.Logger{}, - finder: newTestFinder([]PID{pid}), + finder: newTestFinder([]pid{processID}), createProcess: newProc, } require.NoError(t, p.Init()) @@ -531,7 +527,7 @@ func TestGather_PercentFirstPass(t *testing.T) { } func TestGather_PercentSecondPass(t *testing.T) { - pid := PID(os.Getpid()) + processID := pid(os.Getpid()) p := Procstat{ Pattern: "foo", @@ -539,7 +535,7 @@ func TestGather_PercentSecondPass(t *testing.T) { PidFinder: "test", Properties: []string{"cpu", "memory", "mmap"}, Log: testutil.Logger{}, - finder: newTestFinder([]PID{pid}), + finder: newTestFinder([]pid{processID}), createProcess: newProc, } require.NoError(t, p.Init()) @@ -558,7 +554,7 @@ func TestGather_systemdUnitPIDs(t *testing.T) { PidFinder: "test", Properties: []string{"cpu", "memory", "mmap"}, Log: testutil.Logger{}, - finder: newTestFinder([]PID{pid}), + finder: newTestFinder([]pid{processID}), } require.NoError(t, p.Init()) @@ -566,7 +562,7 @@ func TestGather_systemdUnitPIDs(t *testing.T) { require.NoError(t, err) for _, pidsTag := range pidsTags { - require.Equal(t, []PID{11408}, pidsTag.PIDs) + require.Equal(t, []pid{11408}, pidsTag.PIDs) require.Equal(t, "TestGather_systemdUnitPIDs", pidsTag.Tags["systemd_unit"]) } } @@ -585,14 +581,14 @@ func TestGather_cgroupPIDs(t *testing.T) { PidFinder: "test", Properties: []string{"cpu", "memory", "mmap"}, Log: testutil.Logger{}, - finder: newTestFinder([]PID{pid}), + finder: newTestFinder([]pid{processID}), } require.NoError(t, p.Init()) pidsTags, err := p.findPids() require.NoError(t, err) for _, pidsTag := range pidsTags { - require.Equal(t, []PID{1234, 5678}, pidsTag.PIDs) + require.Equal(t, []pid{1234, 5678}, pidsTag.PIDs) require.Equal(t, td, pidsTag.Tags["cgroup"]) } } @@ -603,7 +599,7 @@ func TestProcstatLookupMetric(t *testing.T) { PidFinder: "test", Properties: []string{"cpu", "memory", "mmap"}, Log: testutil.Logger{}, - finder: newTestFinder([]PID{543}), + finder: newTestFinder([]pid{543}), createProcess: newProc, } require.NoError(t, p.Init()) @@ -621,7 +617,7 @@ func TestGather_SameTimestamps(t *testing.T) { PidFinder: "test", Properties: []string{"cpu", "memory", "mmap"}, Log: testutil.Logger{}, - finder: newTestFinder([]PID{pid}), + finder: newTestFinder([]pid{processID}), createProcess: newTestProc, } require.NoError(t, p.Init()) @@ -641,14 +637,14 @@ func TestGather_supervisorUnitPIDs(t *testing.T) { PidFinder: "test", Properties: []string{"cpu", "memory", "mmap"}, Log: testutil.Logger{}, - finder: newTestFinder([]PID{pid}), + finder: newTestFinder([]pid{processID}), } require.NoError(t, p.Init()) pidsTags, err := p.findPids() require.NoError(t, err) for _, pidsTag := range pidsTags { - require.Equal(t, []PID{7311, 8111, 8112}, pidsTag.PIDs) + require.Equal(t, []pid{7311, 8111, 8112}, pidsTag.PIDs) require.Equal(t, "TestGather_supervisorUnitPIDs", pidsTag.Tags["supervisor_unit"]) } } @@ -659,7 +655,7 @@ func TestGather_MoresupervisorUnitPIDs(t *testing.T) { PidFinder: "test", Properties: []string{"cpu", "memory", "mmap"}, Log: testutil.Logger{}, - finder: newTestFinder([]PID{pid}), + finder: newTestFinder([]pid{processID}), } require.NoError(t, p.Init()) diff --git a/plugins/inputs/procstat/service_finders.go b/plugins/inputs/procstat/service_finders.go index 169c64f70957c..df9bc039d326f 100644 --- a/plugins/inputs/procstat/service_finders.go +++ b/plugins/inputs/procstat/service_finders.go @@ -8,8 +8,9 @@ import ( "strconv" "strings" + gopsprocess "github.com/shirou/gopsutil/v4/process" + "github.com/influxdata/telegraf" - "github.com/shirou/gopsutil/v4/process" ) type processFinder struct { @@ -36,13 +37,13 @@ func (f *processFinder) findByPidFiles(paths []string) ([]processGroup, error) { return nil, fmt.Errorf("failed to parse PID in file %q: %w", path, err) } - p, err := process.NewProcess(int32(pid)) + p, err := gopsprocess.NewProcess(int32(pid)) if err != nil && !f.errPidFiles[path] { f.log.Errorf("failed to find process for PID %d of file %q: %v", pid, path, err) f.errPidFiles[path] = true } groups = append(groups, processGroup{ - processes: []*process.Process{p}, + processes: []*gopsprocess.Process{p}, tags: map[string]string{"pidfile": path}, }) } @@ -76,7 +77,7 @@ func findByCgroups(cgroups []string) ([]processGroup, error) { return nil, err } lines := bytes.Split(buf, []byte{'\n'}) - procs := make([]*process.Process, 0, len(lines)) + procs := make([]*gopsprocess.Process, 0, len(lines)) for _, l := range lines { l := strings.TrimSpace(string(l)) if len(l) == 0 { @@ -86,7 +87,7 @@ func findByCgroups(cgroups []string) ([]processGroup, error) { if err != nil { return nil, fmt.Errorf("failed to parse PID %q in file %q", l, fpath) } - p, err := process.NewProcess(int32(pid)) + p, err := gopsprocess.NewProcess(int32(pid)) if err != nil { return nil, fmt.Errorf("failed to find process for PID %d of %q: %w", pid, fpath, err) } @@ -130,7 +131,7 @@ func findBySupervisorUnits(units string) ([]processGroup, error) { "status": status, } - var procs []*process.Process + var procs []*gopsprocess.Process switch status { case "FATAL", "EXITED", "BACKOFF", "STOPPING": tags["error"] = strings.Join(kv[2:], " ") @@ -141,7 +142,7 @@ func findBySupervisorUnits(units string) ([]processGroup, error) { if err != nil { return nil, fmt.Errorf("failed to parse group PID %q: %w", rawpid, err) } - p, err := process.NewProcess(int32(grouppid)) + p, err := gopsprocess.NewProcess(int32(grouppid)) if err != nil { return nil, fmt.Errorf("failed to find process for PID %d of unit %q: %w", grouppid, name, err) } diff --git a/plugins/inputs/prometheus/consul.go b/plugins/inputs/prometheus/consul.go index 431e3231996e0..9020929f598ab 100644 --- a/plugins/inputs/prometheus/consul.go +++ b/plugins/inputs/prometheus/consul.go @@ -14,17 +14,17 @@ import ( "github.com/influxdata/telegraf/config" ) -type ConsulConfig struct { +type consulConfig struct { // Address of the Consul agent. The address must contain a hostname or an IP address // and optionally a port (format: "host:port"). Enabled bool `toml:"enabled"` Agent string `toml:"agent"` QueryInterval config.Duration `toml:"query_interval"` - Queries []*ConsulQuery `toml:"query"` + Queries []*consulQuery `toml:"query"` } // One Consul service discovery query -type ConsulQuery struct { +type consulQuery struct { // A name of the searched services (not ID) ServiceName string `toml:"name"` @@ -128,7 +128,7 @@ func (p *Prometheus) startConsul(ctx context.Context) error { } func (p *Prometheus) refreshConsulServices(c *api.Catalog) error { - consulServiceURLs := make(map[string]URLAndAddress) + consulServiceURLs := make(map[string]urlAndAddress) p.Log.Debugf("Refreshing Consul services") @@ -165,8 +165,8 @@ func (p *Prometheus) refreshConsulServices(c *api.Catalog) error { p.Log.Infof("Created scrape URLs from Consul for Service (%s, %s)", q.ServiceName, q.ServiceTag) } q.lastQueryFailed = false - p.Log.Debugf("Adding scrape URL from Consul for Service (%s, %s): %s", q.ServiceName, q.ServiceTag, uaa.URL.String()) - consulServiceURLs[uaa.URL.String()] = *uaa + p.Log.Debugf("Adding scrape URL from Consul for Service (%s, %s): %s", q.ServiceName, q.ServiceTag, uaa.url.String()) + consulServiceURLs[uaa.url.String()] = *uaa } } @@ -177,7 +177,7 @@ func (p *Prometheus) refreshConsulServices(c *api.Catalog) error { return nil } -func (p *Prometheus) getConsulServiceURL(q *ConsulQuery, s *api.CatalogService) (*URLAndAddress, error) { +func (p *Prometheus) getConsulServiceURL(q *consulQuery, s *api.CatalogService) (*urlAndAddress, error) { var buffer bytes.Buffer buffer.Reset() err := q.serviceURLTemplate.Execute(&buffer, s) @@ -201,9 +201,9 @@ func (p *Prometheus) getConsulServiceURL(q *ConsulQuery, s *api.CatalogService) p.Log.Debugf("Will scrape metrics from Consul Service %s", serviceURL.String()) - return &URLAndAddress{ - URL: serviceURL, - OriginalURL: serviceURL, - Tags: extraTags, + return &urlAndAddress{ + url: serviceURL, + originalURL: serviceURL, + tags: extraTags, }, nil } diff --git a/plugins/inputs/prometheus/kubernetes.go b/plugins/inputs/prometheus/kubernetes.go index eefe5a215a8cf..2c4ef136c18ca 100644 --- a/plugins/inputs/prometheus/kubernetes.go +++ b/plugins/inputs/prometheus/kubernetes.go @@ -124,11 +124,11 @@ func shouldScrapePod(pod *corev1.Pod, p *Prometheus) bool { var shouldScrape bool switch p.MonitorKubernetesPodsMethod { - case MonitorMethodAnnotations: // must have 'true' annotation to be scraped + case monitorMethodAnnotations: // must have 'true' annotation to be scraped shouldScrape = pod.Annotations != nil && pod.Annotations["prometheus.io/scrape"] == "true" - case MonitorMethodSettings: // will be scraped regardless of annotation + case monitorMethodSettings: // will be scraped regardless of annotation shouldScrape = true - case MonitorMethodSettingsAndAnnotations: // will be scraped unless opts out with 'false' annotation + case monitorMethodSettingsAndAnnotations: // will be scraped unless opts out with 'false' annotation shouldScrape = pod.Annotations == nil || pod.Annotations["prometheus.io/scrape"] != "false" } @@ -194,7 +194,7 @@ func (p *Prometheus) watchPod(ctx context.Context, clientset *kubernetes.Clients if err != nil { p.Log.Errorf("getting key from cache %s", err.Error()) } - podID := PodID(key) + podID := podID(key) if shouldScrapePod(newPod, p) { // When Informers re-Lists, pod might already be registered, // do nothing if it is, register otherwise @@ -209,7 +209,7 @@ func (p *Prometheus) watchPod(ctx context.Context, clientset *kubernetes.Clients DeleteFunc: func(oldObj interface{}) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(oldObj) if err == nil { - unregisterPod(PodID(key), p) + unregisterPod(podID(key), p) } }, }) @@ -280,7 +280,7 @@ func updateCadvisorPodList(p *Prometheus, req *http.Request) error { // Updating pod list to be latest cadvisor response p.lock.Lock() - p.kubernetesPods = make(map[PodID]URLAndAddress) + p.kubernetesPods = make(map[podID]urlAndAddress) // Register pod only if it has an annotation to scrape, if it is ready, // and if namespace and selectors are specified and match @@ -419,7 +419,7 @@ func registerPod(pod *corev1.Pod, p *Prometheus) { tags[k] = v } } - podURL := p.AddressToURL(targetURL, targetURL.Hostname()) + podURL := p.addressToURL(targetURL, targetURL.Hostname()) // Locks earlier if using cAdvisor calls - makes a new list each time // rather than updating and removing from the same list @@ -427,12 +427,12 @@ func registerPod(pod *corev1.Pod, p *Prometheus) { p.lock.Lock() defer p.lock.Unlock() } - p.kubernetesPods[PodID(pod.GetNamespace()+"/"+pod.GetName())] = URLAndAddress{ - URL: podURL, - Address: targetURL.Hostname(), - OriginalURL: targetURL, - Tags: tags, - Namespace: pod.GetNamespace(), + p.kubernetesPods[podID(pod.GetNamespace()+"/"+pod.GetName())] = urlAndAddress{ + url: podURL, + address: targetURL.Hostname(), + originalURL: targetURL, + tags: tags, + namespace: pod.GetNamespace(), } } @@ -446,15 +446,15 @@ func getScrapeURL(pod *corev1.Pod, p *Prometheus) (*url.URL, error) { var scheme, pathAndQuery, port string - if p.MonitorKubernetesPodsMethod == MonitorMethodSettings || - p.MonitorKubernetesPodsMethod == MonitorMethodSettingsAndAnnotations { + if p.MonitorKubernetesPodsMethod == monitorMethodSettings || + p.MonitorKubernetesPodsMethod == monitorMethodSettingsAndAnnotations { scheme = p.MonitorKubernetesPodsScheme pathAndQuery = p.MonitorKubernetesPodsPath port = strconv.Itoa(p.MonitorKubernetesPodsPort) } - if p.MonitorKubernetesPodsMethod == MonitorMethodAnnotations || - p.MonitorKubernetesPodsMethod == MonitorMethodSettingsAndAnnotations { + if p.MonitorKubernetesPodsMethod == monitorMethodAnnotations || + p.MonitorKubernetesPodsMethod == monitorMethodSettingsAndAnnotations { if ann := pod.Annotations["prometheus.io/scheme"]; ann != "" { scheme = ann } @@ -489,12 +489,12 @@ func getScrapeURL(pod *corev1.Pod, p *Prometheus) (*url.URL, error) { return base, nil } -func unregisterPod(podID PodID, p *Prometheus) { +func unregisterPod(podID podID, p *Prometheus) { p.lock.Lock() defer p.lock.Unlock() if v, ok := p.kubernetesPods[podID]; ok { p.Log.Debugf("registered a delete request for %s", podID) delete(p.kubernetesPods, podID) - p.Log.Debugf("will stop scraping for %q", v.URL.String()) + p.Log.Debugf("will stop scraping for %q", v.url.String()) } } diff --git a/plugins/inputs/prometheus/kubernetes_test.go b/plugins/inputs/prometheus/kubernetes_test.go index 98be067b395d1..5e2e2e3ca8cfb 100644 --- a/plugins/inputs/prometheus/kubernetes_test.go +++ b/plugins/inputs/prometheus/kubernetes_test.go @@ -18,8 +18,8 @@ func initPrometheus() *Prometheus { prom.MonitorKubernetesPodsScheme = "http" prom.MonitorKubernetesPodsPort = 9102 prom.MonitorKubernetesPodsPath = "/metrics" - prom.MonitorKubernetesPodsMethod = MonitorMethodAnnotations - prom.kubernetesPods = map[PodID]URLAndAddress{} + prom.MonitorKubernetesPodsMethod = monitorMethodAnnotations + prom.kubernetesPods = map[podID]urlAndAddress{} return prom } @@ -34,7 +34,7 @@ func TestScrapeURLNoAnnotations(t *testing.T) { func TestScrapeURLNoAnnotationsScrapeConfig(t *testing.T) { prom := initPrometheus() - prom.MonitorKubernetesPodsMethod = MonitorMethodSettingsAndAnnotations + prom.MonitorKubernetesPodsMethod = monitorMethodSettingsAndAnnotations p := pod() p.Annotations = map[string]string{} @@ -45,7 +45,7 @@ func TestScrapeURLNoAnnotationsScrapeConfig(t *testing.T) { func TestScrapeURLScrapeConfigCustom(t *testing.T) { prom := initPrometheus() - prom.MonitorKubernetesPodsMethod = MonitorMethodSettingsAndAnnotations + prom.MonitorKubernetesPodsMethod = monitorMethodSettingsAndAnnotations prom.MonitorKubernetesPodsScheme = "https" prom.MonitorKubernetesPodsPort = 9999 @@ -66,7 +66,7 @@ func TestScrapeURLAnnotations(t *testing.T) { func TestScrapeURLAnnotationsScrapeConfig(t *testing.T) { prom := initPrometheus() - prom.MonitorKubernetesPodsMethod = MonitorMethodSettingsAndAnnotations + prom.MonitorKubernetesPodsMethod = monitorMethodSettingsAndAnnotations p := pod() url, err := getScrapeURL(p, prom) require.NoError(t, err) @@ -84,7 +84,7 @@ func TestScrapeURLAnnotationsCustomPort(t *testing.T) { func TestScrapeURLAnnotationsCustomPortScrapeConfig(t *testing.T) { prom := initPrometheus() - prom.MonitorKubernetesPodsMethod = MonitorMethodSettingsAndAnnotations + prom.MonitorKubernetesPodsMethod = monitorMethodSettingsAndAnnotations p := pod() p.Annotations = map[string]string{"prometheus.io/port": "9000"} url, err := getScrapeURL(p, prom) @@ -129,7 +129,7 @@ func TestScrapeURLAnnotationsCustomPathWithFragment(t *testing.T) { } func TestAddPod(t *testing.T) { - prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}} + prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[podID]urlAndAddress{}} p := pod() p.Annotations = map[string]string{"prometheus.io/scrape": "true"} @@ -139,7 +139,7 @@ func TestAddPod(t *testing.T) { func TestAddPodScrapeConfig(t *testing.T) { prom := initPrometheus() - prom.MonitorKubernetesPodsMethod = MonitorMethodSettingsAndAnnotations + prom.MonitorKubernetesPodsMethod = monitorMethodSettingsAndAnnotations p := pod() p.Annotations = map[string]string{} @@ -148,7 +148,7 @@ func TestAddPodScrapeConfig(t *testing.T) { } func TestAddMultipleDuplicatePods(t *testing.T) { - prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}} + prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[podID]urlAndAddress{}} p := pod() p.Annotations = map[string]string{"prometheus.io/scrape": "true"} @@ -156,13 +156,13 @@ func TestAddMultipleDuplicatePods(t *testing.T) { p.Name = "Pod2" registerPod(p, prom) - urls, err := prom.GetAllURLs() + urls, err := prom.getAllURLs() require.NoError(t, err) require.Len(t, urls, 1) } func TestAddMultiplePods(t *testing.T) { - prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}} + prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[podID]urlAndAddress{}} p := pod() p.Annotations = map[string]string{"prometheus.io/scrape": "true"} @@ -174,41 +174,41 @@ func TestAddMultiplePods(t *testing.T) { } func TestDeletePods(t *testing.T) { - prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}} + prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[podID]urlAndAddress{}} p := pod() p.Annotations = map[string]string{"prometheus.io/scrape": "true"} registerPod(p, prom) - podID, err := cache.MetaNamespaceKeyFunc(p) + id, err := cache.MetaNamespaceKeyFunc(p) require.NoError(t, err) - unregisterPod(PodID(podID), prom) + unregisterPod(podID(id), prom) require.Empty(t, prom.kubernetesPods) } func TestKeepDefaultNamespaceLabelName(t *testing.T) { - prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}} + prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[podID]urlAndAddress{}} p := pod() p.Annotations = map[string]string{"prometheus.io/scrape": "true"} registerPod(p, prom) - podID, err := cache.MetaNamespaceKeyFunc(p) + id, err := cache.MetaNamespaceKeyFunc(p) require.NoError(t, err) - tags := prom.kubernetesPods[PodID(podID)].Tags + tags := prom.kubernetesPods[podID(id)].tags require.Equal(t, "default", tags["namespace"]) } func TestChangeNamespaceLabelName(t *testing.T) { - prom := &Prometheus{Log: testutil.Logger{}, PodNamespaceLabelName: "pod_namespace", kubernetesPods: map[PodID]URLAndAddress{}} + prom := &Prometheus{Log: testutil.Logger{}, PodNamespaceLabelName: "pod_namespace", kubernetesPods: map[podID]urlAndAddress{}} p := pod() p.Annotations = map[string]string{"prometheus.io/scrape": "true"} registerPod(p, prom) - podID, err := cache.MetaNamespaceKeyFunc(p) + id, err := cache.MetaNamespaceKeyFunc(p) require.NoError(t, err) - tags := prom.kubernetesPods[PodID(podID)].Tags + tags := prom.kubernetesPods[podID(id)].tags require.Equal(t, "default", tags["pod_namespace"]) require.Equal(t, "", tags["namespace"]) } @@ -300,14 +300,14 @@ func TestAnnotationFilters(t *testing.T) { for _, tc := range cases { t.Run(tc.desc, func(t *testing.T) { - prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}} + prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[podID]urlAndAddress{}} prom.PodAnnotationInclude = tc.include prom.PodAnnotationExclude = tc.exclude require.NoError(t, prom.initFilters()) registerPod(p, prom) for _, pd := range prom.kubernetesPods { for _, tagKey := range tc.expectedTags { - require.Contains(t, pd.Tags, tagKey) + require.Contains(t, pd.tags, tagKey) } } }) @@ -345,14 +345,14 @@ func TestLabelFilters(t *testing.T) { for _, tc := range cases { t.Run(tc.desc, func(t *testing.T) { - prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}} + prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[podID]urlAndAddress{}} prom.PodLabelInclude = tc.include prom.PodLabelExclude = tc.exclude require.NoError(t, prom.initFilters()) registerPod(p, prom) for _, pd := range prom.kubernetesPods { for _, tagKey := range tc.expectedTags { - require.Contains(t, pd.Tags, tagKey) + require.Contains(t, pd.tags, tagKey) } } }) diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index 191d27dd29a58..8b557a9cab979 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -34,18 +34,14 @@ import ( //go:embed sample.conf var sampleConfig string -const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3` - -type MonitorMethod string - const ( - MonitorMethodNone MonitorMethod = "" - MonitorMethodAnnotations MonitorMethod = "annotations" - MonitorMethodSettings MonitorMethod = "settings" - MonitorMethodSettingsAndAnnotations MonitorMethod = "settings+annotations" -) + acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3` -type PodID string + monitorMethodNone monitorMethod = "" + monitorMethodAnnotations monitorMethod = "annotations" + monitorMethodSettings monitorMethod = "settings" + monitorMethodSettingsAndAnnotations monitorMethod = "settings+annotations" +) type Prometheus struct { URLs []string `toml:"urls"` @@ -72,7 +68,7 @@ type Prometheus struct { KubeConfig string `toml:"kube_config"` KubernetesLabelSelector string `toml:"kubernetes_label_selector"` KubernetesFieldSelector string `toml:"kubernetes_field_selector"` - MonitorKubernetesPodsMethod MonitorMethod `toml:"monitor_kubernetes_pods_method"` + MonitorKubernetesPodsMethod monitorMethod `toml:"monitor_kubernetes_pods_method"` MonitorKubernetesPodsScheme string `toml:"monitor_kubernetes_pods_scheme"` MonitorKubernetesPodsPath string `toml:"monitor_kubernetes_pods_path"` MonitorKubernetesPodsPort int `toml:"monitor_kubernetes_pods_port"` @@ -85,7 +81,7 @@ type Prometheus struct { CacheRefreshInterval int `toml:"cache_refresh_interval"` // Consul discovery - ConsulConfig ConsulConfig `toml:"consul"` + ConsulConfig consulConfig `toml:"consul"` Log telegraf.Logger `toml:"-"` common_http.HTTPClientConfig @@ -100,7 +96,7 @@ type Prometheus struct { // Should we scrape Kubernetes services for prometheus annotations lock sync.Mutex - kubernetesPods map[PodID]URLAndAddress + kubernetesPods map[podID]urlAndAddress cancel context.CancelFunc wg sync.WaitGroup @@ -114,9 +110,21 @@ type Prometheus struct { podLabelExcludeFilter filter.Filter // List of consul services to scrape - consulServices map[string]URLAndAddress + consulServices map[string]urlAndAddress } +type urlAndAddress struct { + originalURL *url.URL + url *url.URL + address string + tags map[string]string + namespace string +} + +type monitorMethod string + +type podID string + func (*Prometheus) SampleConfig() string { return sampleConfig } @@ -164,8 +172,8 @@ func (p *Prometheus) Init() error { p.Log.Infof("Using pod scrape scope at node level to get pod list using cAdvisor.") } - if p.MonitorKubernetesPodsMethod == MonitorMethodNone { - p.MonitorKubernetesPodsMethod = MonitorMethodAnnotations + if p.MonitorKubernetesPodsMethod == monitorMethodNone { + p.MonitorKubernetesPodsMethod = monitorMethodAnnotations } // Parse label and field selectors - will be used to filter pods after cAdvisor call @@ -239,11 +247,65 @@ func (p *Prometheus) Init() error { "Accept": acceptHeader, } - p.kubernetesPods = make(map[PodID]URLAndAddress) + p.kubernetesPods = make(map[podID]urlAndAddress) return nil } +// Start will start the Kubernetes and/or Consul scraping if enabled in the configuration +func (p *Prometheus) Start(_ telegraf.Accumulator) error { + var ctx context.Context + p.wg = sync.WaitGroup{} + ctx, p.cancel = context.WithCancel(context.Background()) + + if p.ConsulConfig.Enabled && len(p.ConsulConfig.Queries) > 0 { + if err := p.startConsul(ctx); err != nil { + return err + } + } + if p.MonitorPods { + if err := p.startK8s(ctx); err != nil { + return err + } + } + return nil +} + +func (p *Prometheus) Gather(acc telegraf.Accumulator) error { + var wg sync.WaitGroup + + allURLs, err := p.getAllURLs() + if err != nil { + return err + } + for _, URL := range allURLs { + wg.Add(1) + go func(serviceURL urlAndAddress) { + defer wg.Done() + requestFields, tags, err := p.gatherURL(serviceURL, acc) + acc.AddError(err) + + // Add metrics + if p.EnableRequestMetrics { + acc.AddFields("prometheus_request", requestFields, tags) + } + }(URL) + } + + wg.Wait() + + return nil +} + +func (p *Prometheus) Stop() { + p.cancel() + p.wg.Wait() + + if p.client != nil { + p.client.CloseIdleConnections() + } +} + func (p *Prometheus) initFilters() error { if p.PodAnnotationExclude != nil { podAnnotationExclude, err := filter.Compile(p.PodAnnotationExclude) @@ -276,7 +338,7 @@ func (p *Prometheus) initFilters() error { return nil } -func (p *Prometheus) AddressToURL(u *url.URL, address string) *url.URL { +func (p *Prometheus) addressToURL(u *url.URL, address string) *url.URL { host := address if u.Port() != "" { host = address + ":" + u.Port() @@ -295,23 +357,15 @@ func (p *Prometheus) AddressToURL(u *url.URL, address string) *url.URL { return reconstructedURL } -type URLAndAddress struct { - OriginalURL *url.URL - URL *url.URL - Address string - Tags map[string]string - Namespace string -} - -func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) { - allURLs := make(map[string]URLAndAddress, len(p.URLs)+len(p.consulServices)+len(p.kubernetesPods)) +func (p *Prometheus) getAllURLs() (map[string]urlAndAddress, error) { + allURLs := make(map[string]urlAndAddress, len(p.URLs)+len(p.consulServices)+len(p.kubernetesPods)) for _, u := range p.URLs { address, err := url.Parse(u) if err != nil { p.Log.Errorf("Could not parse %q, skipping it. Error: %s", u, err.Error()) continue } - allURLs[address.String()] = URLAndAddress{URL: address, OriginalURL: address} + allURLs[address.String()] = urlAndAddress{url: address, originalURL: address} } p.lock.Lock() @@ -322,8 +376,8 @@ func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) { } // loop through all pods scraped via the prometheus annotation on the pods for _, v := range p.kubernetesPods { - if namespaceAnnotationMatch(v.Namespace, p) { - allURLs[v.URL.String()] = v + if namespaceAnnotationMatch(v.namespace, p) { + allURLs[v.url.String()] = v } } @@ -339,62 +393,34 @@ func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) { continue } for _, resolved := range resolvedAddresses { - serviceURL := p.AddressToURL(address, resolved) - allURLs[serviceURL.String()] = URLAndAddress{ - URL: serviceURL, - Address: resolved, - OriginalURL: address, + serviceURL := p.addressToURL(address, resolved) + allURLs[serviceURL.String()] = urlAndAddress{ + url: serviceURL, + address: resolved, + originalURL: address, } } } return allURLs, nil } -// Reads stats from all configured servers accumulates stats. -// Returns one of the errors encountered while gather stats (if any). -func (p *Prometheus) Gather(acc telegraf.Accumulator) error { - var wg sync.WaitGroup - - allURLs, err := p.GetAllURLs() - if err != nil { - return err - } - for _, URL := range allURLs { - wg.Add(1) - go func(serviceURL URLAndAddress) { - defer wg.Done() - requestFields, tags, err := p.gatherURL(serviceURL, acc) - acc.AddError(err) - - // Add metrics - if p.EnableRequestMetrics { - acc.AddFields("prometheus_request", requestFields, tags) - } - }(URL) - } - - wg.Wait() - - return nil -} - -func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) (map[string]interface{}, map[string]string, error) { +func (p *Prometheus) gatherURL(u urlAndAddress, acc telegraf.Accumulator) (map[string]interface{}, map[string]string, error) { var req *http.Request var uClient *http.Client requestFields := make(map[string]interface{}) - tags := make(map[string]string, len(u.Tags)+2) + tags := make(map[string]string, len(u.tags)+2) if p.URLTag != "" { - tags[p.URLTag] = u.OriginalURL.String() + tags[p.URLTag] = u.originalURL.String() } - if u.Address != "" { - tags["address"] = u.Address + if u.address != "" { + tags["address"] = u.address } - for k, v := range u.Tags { + for k, v := range u.tags { tags[k] = v } - if u.URL.Scheme == "unix" { - path := u.URL.Query().Get("path") + if u.url.Scheme == "unix" { + path := u.url.Query().Get("path") if path == "" { path = "/metrics" } @@ -413,19 +439,19 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) (map[s TLSClientConfig: tlsCfg, DisableKeepAlives: true, Dial: func(string, string) (net.Conn, error) { - c, err := net.Dial("unix", u.URL.Path) + c, err := net.Dial("unix", u.url.Path) return c, err }, }, } } else { - if u.URL.Path == "" { - u.URL.Path = "/metrics" + if u.url.Path == "" { + u.url.Path = "/metrics" } var err error - req, err = http.NewRequest("GET", u.URL.String(), nil) + req, err = http.NewRequest("GET", u.url.String(), nil) if err != nil { - return nil, nil, fmt.Errorf("unable to create new request %q: %w", u.URL.String(), err) + return nil, nil, fmt.Errorf("unable to create new request %q: %w", u.url.String(), err) } } @@ -469,7 +495,7 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) (map[s var err error var resp *http.Response var start time.Time - if u.URL.Scheme != "unix" { + if u.url.Scheme != "unix" { start = time.Now() //nolint:bodyclose // False positive (because of if-else) - body will be closed in `defer` resp, err = p.client.Do(req) @@ -480,14 +506,14 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) (map[s } end := time.Since(start).Seconds() if err != nil { - return requestFields, tags, fmt.Errorf("error making HTTP request to %q: %w", u.URL, err) + return requestFields, tags, fmt.Errorf("error making HTTP request to %q: %w", u.url, err) } requestFields["response_time"] = end defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return requestFields, tags, fmt.Errorf("%q returned HTTP status %q", u.URL, resp.Status) + return requestFields, tags, fmt.Errorf("%q returned HTTP status %q", u.url, resp.Status) } var body []byte @@ -504,7 +530,7 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) (map[s return requestFields, tags, fmt.Errorf("error reading body: %w", err) } if int64(len(body)) > limit { - p.Log.Infof("skipping %s: content length exceeded maximum body size (%d)", u.URL, limit) + p.Log.Infof("skipping %s: content length exceeded maximum body size (%d)", u.url, limit) return requestFields, tags, nil } } else { @@ -539,20 +565,20 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) (map[s } metrics, err := metricParser.Parse(body) if err != nil { - return requestFields, tags, fmt.Errorf("error reading metrics for %q: %w", u.URL, err) + return requestFields, tags, fmt.Errorf("error reading metrics for %q: %w", u.url, err) } for _, metric := range metrics { tags := metric.Tags() // strip user and password from URL - u.OriginalURL.User = nil + u.originalURL.User = nil if p.URLTag != "" { - tags[p.URLTag] = u.OriginalURL.String() + tags[p.URLTag] = u.originalURL.String() } - if u.Address != "" { - tags["address"] = u.Address + if u.address != "" { + tags["address"] = u.address } - for k, v := range u.Tags { + for k, v := range u.tags { tags[k] = v } @@ -603,39 +629,11 @@ func fieldSelectorIsSupported(fieldSelector fields.Selector) (bool, string) { return true, "" } -// Start will start the Kubernetes and/or Consul scraping if enabled in the configuration -func (p *Prometheus) Start(_ telegraf.Accumulator) error { - var ctx context.Context - p.wg = sync.WaitGroup{} - ctx, p.cancel = context.WithCancel(context.Background()) - - if p.ConsulConfig.Enabled && len(p.ConsulConfig.Queries) > 0 { - if err := p.startConsul(ctx); err != nil { - return err - } - } - if p.MonitorPods { - if err := p.startK8s(ctx); err != nil { - return err - } - } - return nil -} - -func (p *Prometheus) Stop() { - p.cancel() - p.wg.Wait() - - if p.client != nil { - p.client.CloseIdleConnections() - } -} - func init() { inputs.Add("prometheus", func() telegraf.Input { return &Prometheus{ - kubernetesPods: make(map[PodID]URLAndAddress), - consulServices: make(map[string]URLAndAddress), + kubernetesPods: make(map[podID]urlAndAddress), + consulServices: make(map[string]urlAndAddress), URLTag: "url", } }) diff --git a/plugins/inputs/prometheus/prometheus_test.go b/plugins/inputs/prometheus/prometheus_test.go index 995ec9c8dcb8c..cdb723da3d357 100644 --- a/plugins/inputs/prometheus/prometheus_test.go +++ b/plugins/inputs/prometheus/prometheus_test.go @@ -630,7 +630,7 @@ func TestInitConfigSelectors(t *testing.T) { URLs: nil, URLTag: "url", MonitorPods: true, - MonitorKubernetesPodsMethod: MonitorMethodSettings, + MonitorKubernetesPodsMethod: monitorMethodSettings, PodScrapeInterval: 60, KubernetesLabelSelector: "app=test", KubernetesFieldSelector: "spec.nodeName=node-0", diff --git a/plugins/inputs/proxmox/proxmox.go b/plugins/inputs/proxmox/proxmox.go index 22729e5ce76c2..7d4cebf2b9a95 100644 --- a/plugins/inputs/proxmox/proxmox.go +++ b/plugins/inputs/proxmox/proxmox.go @@ -23,18 +23,6 @@ func (*Proxmox) SampleConfig() string { return sampleConfig } -func (px *Proxmox) Gather(acc telegraf.Accumulator) error { - err := getNodeSearchDomain(px) - if err != nil { - return err - } - - gatherLxcData(px, acc) - gatherQemuData(px, acc) - - return nil -} - func (px *Proxmox) Init() error { // Set hostname as default node name for backwards compatibility if px.NodeName == "" { @@ -57,12 +45,16 @@ func (px *Proxmox) Init() error { return nil } -func init() { - inputs.Add("proxmox", func() telegraf.Input { - return &Proxmox{ - requestFunction: performRequest, - } - }) +func (px *Proxmox) Gather(acc telegraf.Accumulator) error { + err := getNodeSearchDomain(px) + if err != nil { + return err + } + + gatherLxcData(px, acc) + gatherQemuData(px, acc) + + return nil } func getNodeSearchDomain(px *Proxmox) error { @@ -274,3 +266,11 @@ func getTags(px *Proxmox, name string, vmConfig vmConfig, rt resourceType) map[s "vm_type": string(rt), } } + +func init() { + inputs.Add("proxmox", func() telegraf.Input { + return &Proxmox{ + requestFunction: performRequest, + } + }) +} diff --git a/plugins/inputs/proxmox/structs.go b/plugins/inputs/proxmox/structs.go index 941af52fb8a2b..47b6856f61e86 100644 --- a/plugins/inputs/proxmox/structs.go +++ b/plugins/inputs/proxmox/structs.go @@ -10,28 +10,28 @@ import ( "github.com/influxdata/telegraf/plugins/common/tls" ) +var ( + qemu resourceType = "qemu" + lxc resourceType = "lxc" +) + type Proxmox struct { BaseURL string `toml:"base_url"` APIToken string `toml:"api_token"` ResponseTimeout config.Duration `toml:"response_timeout"` NodeName string `toml:"node_name"` - tls.ClientConfig - httpClient *http.Client - nodeSearchDomain string + Log telegraf.Logger `toml:"-"` - requestFunction func(px *Proxmox, apiUrl string, method string, data url.Values) ([]byte, error) - Log telegraf.Logger `toml:"-"` + httpClient *http.Client + + nodeSearchDomain string + requestFunction func(px *Proxmox, apiUrl string, method string, data url.Values) ([]byte, error) } type resourceType string -var ( - qemu resourceType = "qemu" - lxc resourceType = "lxc" -) - type vmStats struct { Data []vmStat `json:"data"` } diff --git a/plugins/inputs/puppetagent/puppetagent.go b/plugins/inputs/puppetagent/puppetagent.go index f4332858d9d29..d7cc5d882d877 100644 --- a/plugins/inputs/puppetagent/puppetagent.go +++ b/plugins/inputs/puppetagent/puppetagent.go @@ -17,12 +17,11 @@ import ( //go:embed sample.conf var sampleConfig string -// PuppetAgent is a PuppetAgent plugin type PuppetAgent struct { - Location string + Location string `toml:"location"` } -type State struct { +type state struct { Events event Resources resource Changes change @@ -101,7 +100,7 @@ func (pa *PuppetAgent) Gather(acc telegraf.Accumulator) error { return err } - var puppetState State + var puppetState state err = yaml.Unmarshal(fh, &puppetState) if err != nil { @@ -114,7 +113,7 @@ func (pa *PuppetAgent) Gather(acc telegraf.Accumulator) error { return nil } -func structPrinter(s *State, acc telegraf.Accumulator, tags map[string]string) { +func structPrinter(s *state, acc telegraf.Accumulator, tags map[string]string) { e := reflect.ValueOf(s).Elem() fields := make(map[string]interface{})