From 942d2b3f6fda6281bfbde8c102eb292ce443a92b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20=C5=BBak?= Date: Tue, 12 Nov 2024 18:26:23 +0100 Subject: [PATCH] chore: Fix linter findings for `revive:exported` in `plugins/inputs/l*` (#16167) --- internal/env.go | 19 + plugins/inputs/bond/bond.go | 22 +- plugins/inputs/dpdk/mocks/conn.go | 58 +- .../intel_baseband/{mock => mocks}/conn.go | 58 +- .../intel_baseband/sock_connector_test.go | 2 +- ...{mock_ras_reader.go => ras_reader_mock.go} | 29 +- plugins/inputs/lanz/lanz.go | 24 +- plugins/inputs/lanz/lanz_test.go | 10 +- plugins/inputs/leofs/leofs.go | 63 +- plugins/inputs/leofs/leofs_test.go | 15 +- plugins/inputs/libvirt/libvirt.go | 78 +- plugins/inputs/libvirt/libvirt_test.go | 88 +- plugins/inputs/libvirt/libvirt_utils.go | 30 +- plugins/inputs/libvirt/libvirt_utils_mock.go | 85 +- plugins/inputs/linux_cpu/linux_cpu.go | 24 +- .../inputs/linux_cpu/linux_cpu_nonlinux.go | 6 +- .../inputs/linux_sysctl_fs/linux_sysctl_fs.go | 71 +- plugins/inputs/logparser/logparser.go | 182 ++- plugins/inputs/logparser/logparser_test.go | 22 +- plugins/inputs/logstash/logstash.go | 147 ++- plugins/inputs/lustre2/lustre2.go | 1015 ++++++++--------- plugins/inputs/lustre2/lustre2_notlinux.go | 6 +- plugins/inputs/lvm/lvm_test.go | 3 +- plugins/inputs/mdstat/mdstat.go | 18 +- plugins/inputs/net/net.go | 6 +- .../inputs/processes/processes_notwindows.go | 4 +- plugins/inputs/procstat/os_linux.go | 14 +- plugins/inputs/slab/slab.go | 10 +- plugins/inputs/synproxy/synproxy.go | 12 +- plugins/inputs/temp/temp_linux.go | 6 +- plugins/inputs/wireless/wireless_linux.go | 29 +- 31 files changed, 1112 insertions(+), 1044 deletions(-) create mode 100644 internal/env.go rename plugins/inputs/intel_baseband/{mock => mocks}/conn.go (67%) rename plugins/inputs/intel_dlb/{mock_ras_reader.go => ras_reader_mock.go} (74%) diff --git a/internal/env.go b/internal/env.go new file mode 100644 index 0000000000000..e91c4f0440991 --- /dev/null +++ b/internal/env.go @@ -0,0 +1,19 @@ +package internal + +import "os" + +// GetProcPath returns the path stored in HOST_PROC env variable, or /proc if HOST_PROC has not been set. +func GetProcPath() string { + if hostProc := os.Getenv("HOST_PROC"); hostProc != "" { + return hostProc + } + return "/proc" +} + +// GetSysPath returns the path stored in HOST_SYS env variable, or /sys if HOST_SYS has not been set. +func GetSysPath() string { + if hostSys := os.Getenv("HOST_SYS"); hostSys != "" { + return hostSys + } + return "/sys" +} diff --git a/plugins/inputs/bond/bond.go b/plugins/inputs/bond/bond.go index fb5bded85492e..a5c244ad3ce3e 100644 --- a/plugins/inputs/bond/bond.go +++ b/plugins/inputs/bond/bond.go @@ -11,19 +11,13 @@ import ( "strings" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" ) //go:embed sample.conf var sampleConfig string -const ( - defaultHostProc = "/proc" - defaultHostSys = "/sys" - envProc = "HOST_PROC" - envSys = "HOST_SYS" -) - type Bond struct { HostProc string `toml:"host_proc"` HostSys string `toml:"host_sys"` @@ -286,21 +280,11 @@ func (bond *Bond) gatherSlavePart(bondName, rawFile string, acc telegraf.Accumul // if it is empty then try read from env variable func (bond *Bond) loadPaths() { if bond.HostProc == "" { - bond.HostProc = proc(envProc, defaultHostProc) + bond.HostProc = internal.GetProcPath() } if bond.HostSys == "" { - bond.HostSys = proc(envSys, defaultHostSys) - } -} - -// proc can be used to read file paths from env -func proc(env, path string) string { - // try to read full file path - if p := os.Getenv(env); p != "" { - return p + bond.HostSys = internal.GetSysPath() } - // return default path - return path } func (bond *Bond) listInterfaces() ([]string, error) { diff --git a/plugins/inputs/dpdk/mocks/conn.go b/plugins/inputs/dpdk/mocks/conn.go index 58961039dce86..4f8d231e2756c 100644 --- a/plugins/inputs/dpdk/mocks/conn.go +++ b/plugins/inputs/dpdk/mocks/conn.go @@ -1,4 +1,4 @@ -// Code generated by mockery v0.0.0-dev. DO NOT EDIT. +// Code generated by mockery v2.46.3. DO NOT EDIT. package mocks @@ -19,6 +19,10 @@ type Conn struct { func (_m *Conn) Close() error { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Close") + } + var r0 error if rf, ok := ret.Get(0).(func() error); ok { r0 = rf() @@ -33,6 +37,10 @@ func (_m *Conn) Close() error { func (_m *Conn) LocalAddr() net.Addr { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for LocalAddr") + } + var r0 net.Addr if rf, ok := ret.Get(0).(func() net.Addr); ok { r0 = rf() @@ -49,14 +57,21 @@ func (_m *Conn) LocalAddr() net.Addr { func (_m *Conn) Read(b []byte) (int, error) { ret := _m.Called(b) + if len(ret) == 0 { + panic("no return value specified for Read") + } + var r0 int + var r1 error + if rf, ok := ret.Get(0).(func([]byte) (int, error)); ok { + return rf(b) + } if rf, ok := ret.Get(0).(func([]byte) int); ok { r0 = rf(b) } else { r0 = ret.Get(0).(int) } - var r1 error if rf, ok := ret.Get(1).(func([]byte) error); ok { r1 = rf(b) } else { @@ -70,6 +85,10 @@ func (_m *Conn) Read(b []byte) (int, error) { func (_m *Conn) RemoteAddr() net.Addr { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for RemoteAddr") + } + var r0 net.Addr if rf, ok := ret.Get(0).(func() net.Addr); ok { r0 = rf() @@ -86,6 +105,10 @@ func (_m *Conn) RemoteAddr() net.Addr { func (_m *Conn) SetDeadline(t time.Time) error { ret := _m.Called(t) + if len(ret) == 0 { + panic("no return value specified for SetDeadline") + } + var r0 error if rf, ok := ret.Get(0).(func(time.Time) error); ok { r0 = rf(t) @@ -100,6 +123,10 @@ func (_m *Conn) SetDeadline(t time.Time) error { func (_m *Conn) SetReadDeadline(t time.Time) error { ret := _m.Called(t) + if len(ret) == 0 { + panic("no return value specified for SetReadDeadline") + } + var r0 error if rf, ok := ret.Get(0).(func(time.Time) error); ok { r0 = rf(t) @@ -114,6 +141,10 @@ func (_m *Conn) SetReadDeadline(t time.Time) error { func (_m *Conn) SetWriteDeadline(t time.Time) error { ret := _m.Called(t) + if len(ret) == 0 { + panic("no return value specified for SetWriteDeadline") + } + var r0 error if rf, ok := ret.Get(0).(func(time.Time) error); ok { r0 = rf(t) @@ -128,14 +159,21 @@ func (_m *Conn) SetWriteDeadline(t time.Time) error { func (_m *Conn) Write(b []byte) (int, error) { ret := _m.Called(b) + if len(ret) == 0 { + panic("no return value specified for Write") + } + var r0 int + var r1 error + if rf, ok := ret.Get(0).(func([]byte) (int, error)); ok { + return rf(b) + } if rf, ok := ret.Get(0).(func([]byte) int); ok { r0 = rf(b) } else { r0 = ret.Get(0).(int) } - var r1 error if rf, ok := ret.Get(1).(func([]byte) error); ok { r1 = rf(b) } else { @@ -144,3 +182,17 @@ func (_m *Conn) Write(b []byte) (int, error) { return r0, r1 } + +// NewConn creates a new instance of Conn. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewConn(t interface { + mock.TestingT + Cleanup(func()) +}) *Conn { + mock := &Conn{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/plugins/inputs/intel_baseband/mock/conn.go b/plugins/inputs/intel_baseband/mocks/conn.go similarity index 67% rename from plugins/inputs/intel_baseband/mock/conn.go rename to plugins/inputs/intel_baseband/mocks/conn.go index 58961039dce86..4f8d231e2756c 100644 --- a/plugins/inputs/intel_baseband/mock/conn.go +++ b/plugins/inputs/intel_baseband/mocks/conn.go @@ -1,4 +1,4 @@ -// Code generated by mockery v0.0.0-dev. DO NOT EDIT. +// Code generated by mockery v2.46.3. DO NOT EDIT. package mocks @@ -19,6 +19,10 @@ type Conn struct { func (_m *Conn) Close() error { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Close") + } + var r0 error if rf, ok := ret.Get(0).(func() error); ok { r0 = rf() @@ -33,6 +37,10 @@ func (_m *Conn) Close() error { func (_m *Conn) LocalAddr() net.Addr { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for LocalAddr") + } + var r0 net.Addr if rf, ok := ret.Get(0).(func() net.Addr); ok { r0 = rf() @@ -49,14 +57,21 @@ func (_m *Conn) LocalAddr() net.Addr { func (_m *Conn) Read(b []byte) (int, error) { ret := _m.Called(b) + if len(ret) == 0 { + panic("no return value specified for Read") + } + var r0 int + var r1 error + if rf, ok := ret.Get(0).(func([]byte) (int, error)); ok { + return rf(b) + } if rf, ok := ret.Get(0).(func([]byte) int); ok { r0 = rf(b) } else { r0 = ret.Get(0).(int) } - var r1 error if rf, ok := ret.Get(1).(func([]byte) error); ok { r1 = rf(b) } else { @@ -70,6 +85,10 @@ func (_m *Conn) Read(b []byte) (int, error) { func (_m *Conn) RemoteAddr() net.Addr { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for RemoteAddr") + } + var r0 net.Addr if rf, ok := ret.Get(0).(func() net.Addr); ok { r0 = rf() @@ -86,6 +105,10 @@ func (_m *Conn) RemoteAddr() net.Addr { func (_m *Conn) SetDeadline(t time.Time) error { ret := _m.Called(t) + if len(ret) == 0 { + panic("no return value specified for SetDeadline") + } + var r0 error if rf, ok := ret.Get(0).(func(time.Time) error); ok { r0 = rf(t) @@ -100,6 +123,10 @@ func (_m *Conn) SetDeadline(t time.Time) error { func (_m *Conn) SetReadDeadline(t time.Time) error { ret := _m.Called(t) + if len(ret) == 0 { + panic("no return value specified for SetReadDeadline") + } + var r0 error if rf, ok := ret.Get(0).(func(time.Time) error); ok { r0 = rf(t) @@ -114,6 +141,10 @@ func (_m *Conn) SetReadDeadline(t time.Time) error { func (_m *Conn) SetWriteDeadline(t time.Time) error { ret := _m.Called(t) + if len(ret) == 0 { + panic("no return value specified for SetWriteDeadline") + } + var r0 error if rf, ok := ret.Get(0).(func(time.Time) error); ok { r0 = rf(t) @@ -128,14 +159,21 @@ func (_m *Conn) SetWriteDeadline(t time.Time) error { func (_m *Conn) Write(b []byte) (int, error) { ret := _m.Called(b) + if len(ret) == 0 { + panic("no return value specified for Write") + } + var r0 int + var r1 error + if rf, ok := ret.Get(0).(func([]byte) (int, error)); ok { + return rf(b) + } if rf, ok := ret.Get(0).(func([]byte) int); ok { r0 = rf(b) } else { r0 = ret.Get(0).(int) } - var r1 error if rf, ok := ret.Get(1).(func([]byte) error); ok { r1 = rf(b) } else { @@ -144,3 +182,17 @@ func (_m *Conn) Write(b []byte) (int, error) { return r0, r1 } + +// NewConn creates a new instance of Conn. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewConn(t interface { + mock.TestingT + Cleanup(func()) +}) *Conn { + mock := &Conn{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/plugins/inputs/intel_baseband/sock_connector_test.go b/plugins/inputs/intel_baseband/sock_connector_test.go index 98fe93f758e98..0b0ab3540905f 100644 --- a/plugins/inputs/intel_baseband/sock_connector_test.go +++ b/plugins/inputs/intel_baseband/sock_connector_test.go @@ -10,7 +10,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - mocks "github.com/influxdata/telegraf/plugins/inputs/intel_baseband/mock" + "github.com/influxdata/telegraf/plugins/inputs/intel_baseband/mocks" ) func TestWriteCommandToSocket(t *testing.T) { diff --git a/plugins/inputs/intel_dlb/mock_ras_reader.go b/plugins/inputs/intel_dlb/ras_reader_mock.go similarity index 74% rename from plugins/inputs/intel_dlb/mock_ras_reader.go rename to plugins/inputs/intel_dlb/ras_reader_mock.go index 4d36e54f25990..ec92cac20f046 100644 --- a/plugins/inputs/intel_dlb/mock_ras_reader.go +++ b/plugins/inputs/intel_dlb/ras_reader_mock.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.46.3. DO NOT EDIT. package intel_dlb @@ -13,7 +13,15 @@ type mockRasReader struct { func (_m *mockRasReader) gatherPaths(path string) ([]string, error) { ret := _m.Called(path) + if len(ret) == 0 { + panic("no return value specified for gatherPaths") + } + var r0 []string + var r1 error + if rf, ok := ret.Get(0).(func(string) ([]string, error)); ok { + return rf(path) + } if rf, ok := ret.Get(0).(func(string) []string); ok { r0 = rf(path) } else { @@ -22,7 +30,6 @@ func (_m *mockRasReader) gatherPaths(path string) ([]string, error) { } } - var r1 error if rf, ok := ret.Get(1).(func(string) error); ok { r1 = rf(path) } else { @@ -36,7 +43,15 @@ func (_m *mockRasReader) gatherPaths(path string) ([]string, error) { func (_m *mockRasReader) readFromFile(filePath string) ([]byte, error) { ret := _m.Called(filePath) + if len(ret) == 0 { + panic("no return value specified for readFromFile") + } + var r0 []byte + var r1 error + if rf, ok := ret.Get(0).(func(string) ([]byte, error)); ok { + return rf(filePath) + } if rf, ok := ret.Get(0).(func(string) []byte); ok { r0 = rf(filePath) } else { @@ -45,7 +60,6 @@ func (_m *mockRasReader) readFromFile(filePath string) ([]byte, error) { } } - var r1 error if rf, ok := ret.Get(1).(func(string) error); ok { r1 = rf(filePath) } else { @@ -55,13 +69,12 @@ func (_m *mockRasReader) readFromFile(filePath string) ([]byte, error) { return r0, r1 } -type mockConstructorTestingTnewMockRasReader interface { +// newMockRasReader creates a new instance of mockRasReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func newMockRasReader(t interface { mock.TestingT Cleanup(func()) -} - -// newMockRasReader creates a new instance of mockRasReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func newMockRasReader(t mockConstructorTestingTnewMockRasReader) *mockRasReader { +}) *mockRasReader { mock := &mockRasReader{} mock.Mock.Test(t) diff --git a/plugins/inputs/lanz/lanz.go b/plugins/inputs/lanz/lanz.go index c87c180469d53..bb2040d317cc9 100644 --- a/plugins/inputs/lanz/lanz.go +++ b/plugins/inputs/lanz/lanz.go @@ -18,30 +18,16 @@ import ( //go:embed sample.conf var sampleConfig string -func init() { - inputs.Add("lanz", func() telegraf.Input { - return NewLanz() - }) -} - type Lanz struct { Servers []string `toml:"servers"` clients []lanz.Client wg sync.WaitGroup } -func NewLanz() *Lanz { - return &Lanz{} -} - func (*Lanz) SampleConfig() string { return sampleConfig } -func (l *Lanz) Gather(_ telegraf.Accumulator) error { - return nil -} - func (l *Lanz) Start(acc telegraf.Accumulator) error { if len(l.Servers) == 0 { l.Servers = append(l.Servers, "tcp://127.0.0.1:50001") @@ -72,6 +58,10 @@ func (l *Lanz) Start(acc telegraf.Accumulator) error { return nil } +func (l *Lanz) Gather(_ telegraf.Accumulator) error { + return nil +} + func (l *Lanz) Stop() { for _, client := range l.clients { client.Stop() @@ -130,3 +120,9 @@ func msgToAccumulator(acc telegraf.Accumulator, msg *pb.LanzRecord, deviceURL *u acc.AddFields("lanz_global_buffer_usage_record", vals, tags) } } + +func init() { + inputs.Add("lanz", func() telegraf.Input { + return &Lanz{} + }) +} diff --git a/plugins/inputs/lanz/lanz_test.go b/plugins/inputs/lanz/lanz_test.go index b65eae5e02989..07fc0e21bb4f9 100644 --- a/plugins/inputs/lanz/lanz_test.go +++ b/plugins/inputs/lanz/lanz_test.go @@ -52,14 +52,11 @@ var testProtoBufGlobalBufferUsageRecord = &pb.LanzRecord{ } func TestLanzGeneratesMetrics(t *testing.T) { - var acc testutil.Accumulator - - l := NewLanz() - - l.Servers = append(l.Servers, + l := &Lanz{Servers: []string{ "tcp://switch01.int.example.com:50001", "tcp://switch02.int.example.com:50001", - ) + }} + deviceURL1, err := url.Parse(l.Servers[0]) if err != nil { t.Fail() @@ -69,6 +66,7 @@ func TestLanzGeneratesMetrics(t *testing.T) { t.Fail() } + var acc testutil.Accumulator msgToAccumulator(&acc, testProtoBufCongestionRecord1, deviceURL1) acc.Wait(1) diff --git a/plugins/inputs/leofs/leofs.go b/plugins/inputs/leofs/leofs.go index 116ef2f30b396..3845384a19179 100644 --- a/plugins/inputs/leofs/leofs.go +++ b/plugins/inputs/leofs/leofs.go @@ -20,26 +20,27 @@ import ( //go:embed sample.conf var sampleConfig string -const oid = ".1.3.6.1.4.1.35450" - -// For Manager Master -const defaultEndpoint = "127.0.0.1:4020" +const ( + oid = ".1.3.6.1.4.1.35450" + // For Manager Master + defaultEndpoint = "127.0.0.1:4020" +) -type ServerType int +type serverType int const ( - ServerTypeManagerMaster ServerType = iota - ServerTypeManagerSlave - ServerTypeStorage - ServerTypeGateway + serverTypeManagerMaster serverType = iota + serverTypeManagerSlave + serverTypeStorage + serverTypeGateway ) type LeoFS struct { - Servers []string + Servers []string `toml:"servers"` } -var KeyMapping = map[ServerType][]string{ - ServerTypeManagerMaster: { +var keyMapping = map[serverType][]string{ + serverTypeManagerMaster: { "num_of_processes", "total_memory_usage", "system_memory_usage", @@ -55,7 +56,7 @@ var KeyMapping = map[ServerType][]string{ "used_allocated_memory_5min", "allocated_memory_5min", }, - ServerTypeManagerSlave: { + serverTypeManagerSlave: { "num_of_processes", "total_memory_usage", "system_memory_usage", @@ -71,7 +72,7 @@ var KeyMapping = map[ServerType][]string{ "used_allocated_memory_5min", "allocated_memory_5min", }, - ServerTypeStorage: { + serverTypeStorage: { "num_of_processes", "total_memory_usage", "system_memory_usage", @@ -113,7 +114,7 @@ var KeyMapping = map[ServerType][]string{ "comp_num_of_ongoing_targets", "comp_num_of_out_of_targets", }, - ServerTypeGateway: { + serverTypeGateway: { "num_of_processes", "total_memory_usage", "system_memory_usage", @@ -141,15 +142,15 @@ var KeyMapping = map[ServerType][]string{ }, } -var serverTypeMapping = map[string]ServerType{ - "4020": ServerTypeManagerMaster, - "4021": ServerTypeManagerSlave, - "4010": ServerTypeStorage, - "4011": ServerTypeStorage, - "4012": ServerTypeStorage, - "4013": ServerTypeStorage, - "4000": ServerTypeGateway, - "4001": ServerTypeGateway, +var serverTypeMapping = map[string]serverType{ + "4020": serverTypeManagerMaster, + "4021": serverTypeManagerSlave, + "4010": serverTypeStorage, + "4011": serverTypeStorage, + "4012": serverTypeStorage, + "4013": serverTypeStorage, + "4000": serverTypeGateway, + "4001": serverTypeGateway, } func (*LeoFS) SampleConfig() string { @@ -158,7 +159,7 @@ func (*LeoFS) SampleConfig() string { func (l *LeoFS) Gather(acc telegraf.Accumulator) error { if len(l.Servers) == 0 { - return l.gatherServer(defaultEndpoint, ServerTypeManagerMaster, acc) + return l.gatherServer(defaultEndpoint, serverTypeManagerMaster, acc) } var wg sync.WaitGroup for _, endpoint := range l.Servers { @@ -179,10 +180,10 @@ func (l *LeoFS) Gather(acc telegraf.Accumulator) error { st, ok := serverTypeMapping[port] if !ok { - st = ServerTypeStorage + st = serverTypeStorage } wg.Add(1) - go func(endpoint string, st ServerType) { + go func(endpoint string, st serverType) { defer wg.Done() acc.AddError(l.gatherServer(endpoint, st, acc)) }(endpoint, st) @@ -191,11 +192,7 @@ func (l *LeoFS) Gather(acc telegraf.Accumulator) error { return nil } -func (l *LeoFS) gatherServer( - endpoint string, - serverType ServerType, - acc telegraf.Accumulator, -) error { +func (l *LeoFS) gatherServer(endpoint string, serverType serverType, acc telegraf.Accumulator) error { cmd := exec.Command("snmpwalk", "-v2c", "-cpublic", "-On", endpoint, oid) stdout, err := cmd.StdoutPipe() if err != nil { @@ -221,7 +218,7 @@ func (l *LeoFS) gatherServer( fields := make(map[string]interface{}) for scanner.Scan() { - key := KeyMapping[serverType][i] + key := keyMapping[serverType][i] val, err := retrieveTokenAfterColon(scanner.Text()) if err != nil { return err diff --git a/plugins/inputs/leofs/leofs_test.go b/plugins/inputs/leofs/leofs_test.go index 9cf19e23efd41..fb261a636daf5 100644 --- a/plugins/inputs/leofs/leofs_test.go +++ b/plugins/inputs/leofs/leofs_test.go @@ -6,8 +6,9 @@ import ( "runtime" "testing" - "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/testutil" ) var fakeSNMP4Manager = ` @@ -123,7 +124,7 @@ func main() { } ` -func testMain(t *testing.T, code, endpoint string, serverType ServerType) { +func testMain(t *testing.T, code, endpoint string, serverType serverType) { executable := "snmpwalk" if runtime.GOOS == "windows" { executable = "snmpwalk.exe" @@ -153,7 +154,7 @@ func testMain(t *testing.T, code, endpoint string, serverType ServerType) { err = acc.GatherError(l.Gather) require.NoError(t, err) - floatMetrics := KeyMapping[serverType] + floatMetrics := keyMapping[serverType] for _, metric := range floatMetrics { require.True(t, acc.HasFloatField("leofs", metric), metric) @@ -165,7 +166,7 @@ func TestLeoFSManagerMasterMetricsIntegration(t *testing.T) { t.Skip("Skipping integration test in short mode") } - testMain(t, fakeSNMP4Manager, "localhost:4020", ServerTypeManagerMaster) + testMain(t, fakeSNMP4Manager, "localhost:4020", serverTypeManagerMaster) } func TestLeoFSManagerSlaveMetricsIntegration(t *testing.T) { @@ -173,7 +174,7 @@ func TestLeoFSManagerSlaveMetricsIntegration(t *testing.T) { t.Skip("Skipping integration test in short mode") } - testMain(t, fakeSNMP4Manager, "localhost:4021", ServerTypeManagerSlave) + testMain(t, fakeSNMP4Manager, "localhost:4021", serverTypeManagerSlave) } func TestLeoFSStorageMetricsIntegration(t *testing.T) { @@ -181,7 +182,7 @@ func TestLeoFSStorageMetricsIntegration(t *testing.T) { t.Skip("Skipping integration test in short mode") } - testMain(t, fakeSNMP4Storage, "localhost:4010", ServerTypeStorage) + testMain(t, fakeSNMP4Storage, "localhost:4010", serverTypeStorage) } func TestLeoFSGatewayMetricsIntegration(t *testing.T) { @@ -189,5 +190,5 @@ func TestLeoFSGatewayMetricsIntegration(t *testing.T) { t.Skip("Skipping integration test in short mode") } - testMain(t, fakeSNMP4Gateway, "localhost:4000", ServerTypeGateway) + testMain(t, fakeSNMP4Gateway, "localhost:4000", serverTypeGateway) } diff --git a/plugins/inputs/libvirt/libvirt.go b/plugins/inputs/libvirt/libvirt.go index e5a4b663b43bd..4a32eaf761773 100644 --- a/plugins/inputs/libvirt/libvirt.go +++ b/plugins/inputs/libvirt/libvirt.go @@ -90,6 +90,40 @@ func (l *Libvirt) Init() error { return nil } +func (l *Libvirt) Gather(acc telegraf.Accumulator) error { + var err error + if err := l.utils.ensureConnected(l.LibvirtURI); err != nil { + return err + } + + // Get all available domains + gatheredDomains, err := l.utils.gatherAllDomains() + if handledErr := handleError(err, "error occurred while gathering all domains", l.utils); handledErr != nil { + return handledErr + } else if len(gatheredDomains) == 0 { + l.Log.Debug("Couldn't find any domains on system") + return nil + } + + // Exclude domain. + domains := l.filterDomains(gatheredDomains) + if len(domains) == 0 { + l.Log.Debug("Configured domains are not available on system") + return nil + } + + var vcpuInfos map[string][]vcpuAffinity + if l.vcpuMappingEnabled { + vcpuInfos, err = l.getVcpuMapping(domains) + if handledErr := handleError(err, "error occurred while gathering vcpu mapping", l.utils); handledErr != nil { + return handledErr + } + } + + err = l.gatherMetrics(domains, vcpuInfos, acc) + return handleError(err, "error occurred while gathering metrics", l.utils) +} + func (l *Libvirt) validateLibvirtURI() error { uri := libvirtutils.LibvirtUri{} err := uri.Unmarshal(l.LibvirtURI) @@ -150,43 +184,9 @@ func (l *Libvirt) isThereAnythingToGather() bool { return l.metricNumber > 0 || len(l.AdditionalStatistics) > 0 } -func (l *Libvirt) Gather(acc telegraf.Accumulator) error { - var err error - if err := l.utils.EnsureConnected(l.LibvirtURI); err != nil { - return err - } - - // Get all available domains - gatheredDomains, err := l.utils.GatherAllDomains() - if handledErr := handleError(err, "error occurred while gathering all domains", l.utils); handledErr != nil { - return handledErr - } else if len(gatheredDomains) == 0 { - l.Log.Debug("Couldn't find any domains on system") - return nil - } - - // Exclude domain. - domains := l.filterDomains(gatheredDomains) - if len(domains) == 0 { - l.Log.Debug("Configured domains are not available on system") - return nil - } - - var vcpuInfos map[string][]vcpuAffinity - if l.vcpuMappingEnabled { - vcpuInfos, err = l.getVcpuMapping(domains) - if handledErr := handleError(err, "error occurred while gathering vcpu mapping", l.utils); handledErr != nil { - return handledErr - } - } - - err = l.gatherMetrics(domains, vcpuInfos, acc) - return handleError(err, "error occurred while gathering metrics", l.utils) -} - func handleError(err error, errMessage string, utils utils) error { if err != nil { - if chanErr := utils.Disconnect(); chanErr != nil { + if chanErr := utils.disconnect(); chanErr != nil { return fmt.Errorf("%s: %w; error occurred when disconnecting: %w", errMessage, err, chanErr) } return fmt.Errorf("%s: %w", errMessage, err) @@ -210,7 +210,7 @@ func (l *Libvirt) filterDomains(availableDomains []golibvirt.Domain) []golibvirt } func (l *Libvirt) gatherMetrics(domains []golibvirt.Domain, vcpuInfos map[string][]vcpuAffinity, acc telegraf.Accumulator) error { - stats, err := l.utils.GatherStatsForDomains(domains, l.metricNumber) + stats, err := l.utils.gatherStatsForDomains(domains, l.metricNumber) if err != nil { return err } @@ -220,7 +220,7 @@ func (l *Libvirt) gatherMetrics(domains []golibvirt.Domain, vcpuInfos map[string } func (l *Libvirt) getVcpuMapping(domains []golibvirt.Domain) (map[string][]vcpuAffinity, error) { - pCPUs, err := l.utils.GatherNumberOfPCPUs() + pCPUs, err := l.utils.gatherNumberOfPCPUs() if err != nil { return nil, err } @@ -231,9 +231,9 @@ func (l *Libvirt) getVcpuMapping(domains []golibvirt.Domain) (map[string][]vcpuA for i := range domains { domain := domains[i] - // Executing GatherVcpuMapping can take some time, it is worth to call it in parallel + // Executing gatherVcpuMapping can take some time, it is worth to call it in parallel group.Go(func() error { - vcpuInfo, err := l.utils.GatherVcpuMapping(domain, pCPUs, l.shouldGetCurrentPCPU()) + vcpuInfo, err := l.utils.gatherVcpuMapping(domain, pCPUs, l.shouldGetCurrentPCPU()) if err != nil { return err } diff --git a/plugins/inputs/libvirt/libvirt_test.go b/plugins/inputs/libvirt/libvirt_test.go index 6b0b3f5deb108..d9b5cc30a1f38 100644 --- a/plugins/inputs/libvirt/libvirt_test.go +++ b/plugins/inputs/libvirt/libvirt_test.go @@ -35,10 +35,10 @@ func TestLibvirt_Init(t *testing.T) { }) t.Run("throw error when user provided invalid uri", func(t *testing.T) { - mockLibvirtUtils := MockLibvirtUtils{} + mockUtils := mockLibvirtUtils{} l := Libvirt{ LibvirtURI: "this/is/wrong/uri", - utils: &mockLibvirtUtils, + utils: &mockUtils, Log: testutil.Logger{}, } err := l.Init() @@ -47,10 +47,10 @@ func TestLibvirt_Init(t *testing.T) { }) t.Run("successfully initialize libvirt on correct user input", func(t *testing.T) { - mockLibvirtUtils := MockLibvirtUtils{} + mockUtils := mockLibvirtUtils{} l := Libvirt{ StatisticsGroups: []string{"state", "cpu_total", "vcpu", "interface"}, - utils: &mockLibvirtUtils, + utils: &mockUtils, LibvirtURI: defaultLibvirtURI, Log: testutil.Logger{}, } @@ -62,66 +62,66 @@ func TestLibvirt_Init(t *testing.T) { func TestLibvirt_Gather(t *testing.T) { t.Run("wrong uri throws error", func(t *testing.T) { var acc testutil.Accumulator - mockLibvirtUtils := MockLibvirtUtils{} + mockUtils := mockLibvirtUtils{} l := Libvirt{ LibvirtURI: "this/is/wrong/uri", Log: testutil.Logger{}, - utils: &mockLibvirtUtils, + utils: &mockUtils, } - mockLibvirtUtils.On("EnsureConnected", mock.Anything).Return(errors.New("failed to connect")).Once() + mockUtils.On("ensureConnected", mock.Anything).Return(errors.New("failed to connect")).Once() err := l.Gather(&acc) require.Error(t, err) require.Contains(t, err.Error(), "failed to connect") - mockLibvirtUtils.AssertExpectations(t) + mockUtils.AssertExpectations(t) }) t.Run("error when read error happened in gathering domains", func(t *testing.T) { var acc testutil.Accumulator - mockLibvirtUtils := MockLibvirtUtils{} + mockUtils := mockLibvirtUtils{} l := Libvirt{ - utils: &mockLibvirtUtils, + utils: &mockUtils, Log: testutil.Logger{}, StatisticsGroups: []string{"state"}, } - mockLibvirtUtils.On("EnsureConnected", mock.Anything).Return(nil).Once(). - On("GatherAllDomains", mock.Anything).Return(nil, errors.New("gather domain error")).Once(). - On("Disconnect").Return(nil).Once() + mockUtils.On("ensureConnected", mock.Anything).Return(nil).Once(). + On("gatherAllDomains", mock.Anything).Return(nil, errors.New("gather domain error")).Once(). + On("disconnect").Return(nil).Once() err := l.Gather(&acc) require.Error(t, err) require.Contains(t, err.Error(), "gather domain error") - mockLibvirtUtils.AssertExpectations(t) + mockUtils.AssertExpectations(t) }) t.Run("no error when empty list of domains is returned", func(t *testing.T) { var acc testutil.Accumulator - mockLibvirtUtils := MockLibvirtUtils{} + mockUtils := mockLibvirtUtils{} l := Libvirt{ - utils: &mockLibvirtUtils, + utils: &mockUtils, Log: testutil.Logger{}, StatisticsGroups: []string{"state"}, } - mockLibvirtUtils.On("EnsureConnected", mock.Anything).Return(nil).Once(). - On("GatherAllDomains", mock.Anything).Return(nil, nil).Once() + mockUtils.On("ensureConnected", mock.Anything).Return(nil).Once(). + On("gatherAllDomains", mock.Anything).Return(nil, nil).Once() err := l.Gather(&acc) require.NoError(t, err) - mockLibvirtUtils.AssertExpectations(t) + mockUtils.AssertExpectations(t) }) t.Run("error when gathering metrics by number", func(t *testing.T) { var acc testutil.Accumulator - mockLibvirtUtils := MockLibvirtUtils{} + mockUtils := mockLibvirtUtils{} l := Libvirt{ - utils: &mockLibvirtUtils, + utils: &mockUtils, Log: testutil.Logger{}, StatisticsGroups: []string{"state"}, } - mockLibvirtUtils.On("EnsureConnected", mock.Anything).Return(nil).Once(). - On("GatherAllDomains", mock.Anything).Return(domains, nil).Once(). - On("GatherStatsForDomains", mock.Anything, mock.Anything). + mockUtils.On("ensureConnected", mock.Anything).Return(nil).Once(). + On("gatherAllDomains", mock.Anything).Return(domains, nil).Once(). + On("gatherStatsForDomains", mock.Anything, mock.Anything). Return(nil, errors.New("gathering metric by number error")).Once(). - On("Disconnect").Return(nil).Once() + On("disconnect").Return(nil).Once() err := l.Init() require.NoError(t, err) @@ -129,7 +129,7 @@ func TestLibvirt_Gather(t *testing.T) { err = l.Gather(&acc) require.Error(t, err) require.Contains(t, err.Error(), "gathering metric by number error") - mockLibvirtUtils.AssertExpectations(t) + mockUtils.AssertExpectations(t) }) var successfulTests = []struct { @@ -153,20 +153,20 @@ func TestLibvirt_Gather(t *testing.T) { for _, test := range successfulTests { t.Run(test.testName, func(t *testing.T) { var acc testutil.Accumulator - mockLibvirtUtils := MockLibvirtUtils{} + mockUtils := mockLibvirtUtils{} l := Libvirt{ - utils: &mockLibvirtUtils, + utils: &mockUtils, Log: testutil.Logger{}, StatisticsGroups: []string{"state"}, Domains: test.excludeDomains, AdditionalStatistics: []string{"vcpu_mapping"}, } - mockLibvirtUtils.On("EnsureConnected", mock.Anything).Return(nil).Once(). - On("GatherAllDomains", mock.Anything).Return(test.allDomains, nil).Once(). - On("GatherVcpuMapping", domains[0], mock.Anything, mock.Anything).Return(test.vcpuMapping, nil).Maybe(). - On("GatherVcpuMapping", domains[1], mock.Anything, mock.Anything).Return(test.vcpuMapping, nil).Once(). - On("GatherNumberOfPCPUs").Return(4, nil).Once(). - On("GatherStatsForDomains", mock.Anything, mock.Anything).Return(test.statsForDomains, nil).Once() + mockUtils.On("ensureConnected", mock.Anything).Return(nil).Once(). + On("gatherAllDomains", mock.Anything).Return(test.allDomains, nil).Once(). + On("gatherVcpuMapping", domains[0], mock.Anything, mock.Anything).Return(test.vcpuMapping, nil).Maybe(). + On("gatherVcpuMapping", domains[1], mock.Anything, mock.Anything).Return(test.vcpuMapping, nil).Once(). + On("gatherNumberOfPCPUs").Return(4, nil).Once(). + On("gatherStatsForDomains", mock.Anything, mock.Anything).Return(test.statsForDomains, nil).Once() err := l.Init() require.NoError(t, err) @@ -177,7 +177,7 @@ func TestLibvirt_Gather(t *testing.T) { actual := acc.GetTelegrafMetrics() expected := test.expectedMetrics testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics(), testutil.IgnoreTime()) - mockLibvirtUtils.AssertExpectations(t) + mockUtils.AssertExpectations(t) }) } } @@ -205,23 +205,23 @@ func TestLibvirt_GatherMetrics(t *testing.T) { for _, test := range successfulTests { t.Run(test.testName, func(t *testing.T) { var acc testutil.Accumulator - mockLibvirtUtils := MockLibvirtUtils{} + mockUtils := mockLibvirtUtils{} l := Libvirt{ - utils: &mockLibvirtUtils, + utils: &mockUtils, Log: testutil.Logger{}, Domains: test.excludeDomains, } - mockLibvirtUtils.On("EnsureConnected", mock.Anything).Return(nil).Once(). - On("GatherAllDomains", mock.Anything).Return(test.allDomains, nil).Once(). - On("GatherStatsForDomains", mock.Anything, mock.Anything).Return(test.statsForDomains, nil).Once() + mockUtils.On("ensureConnected", mock.Anything).Return(nil).Once(). + On("gatherAllDomains", mock.Anything).Return(test.allDomains, nil).Once(). + On("gatherStatsForDomains", mock.Anything, mock.Anything).Return(test.statsForDomains, nil).Once() if test.vcpuMapping != nil { l.vcpuMappingEnabled = true l.metricNumber = domainStatsVCPU - mockLibvirtUtils.On("GatherNumberOfPCPUs").Return(4, nil).Once(). - On("GatherVcpuMapping", domains[0], mock.Anything, mock.Anything).Return(test.vcpuMapping, nil).Once(). - On("GatherVcpuMapping", domains[1], mock.Anything, mock.Anything).Return(nil, nil).Once() + mockUtils.On("gatherNumberOfPCPUs").Return(4, nil).Once(). + On("gatherVcpuMapping", domains[0], mock.Anything, mock.Anything).Return(test.vcpuMapping, nil).Once(). + On("gatherVcpuMapping", domains[1], mock.Anything, mock.Anything).Return(nil, nil).Once() } err := l.Gather(&acc) @@ -230,7 +230,7 @@ func TestLibvirt_GatherMetrics(t *testing.T) { actual := acc.GetTelegrafMetrics() expected := test.expectedMetrics testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics(), testutil.IgnoreTime()) - mockLibvirtUtils.AssertExpectations(t) + mockUtils.AssertExpectations(t) }) } } diff --git a/plugins/inputs/libvirt/libvirt_utils.go b/plugins/inputs/libvirt/libvirt_utils.go index 4c16b905c679c..c807583387771 100644 --- a/plugins/inputs/libvirt/libvirt_utils.go +++ b/plugins/inputs/libvirt/libvirt_utils.go @@ -9,12 +9,12 @@ import ( ) type utils interface { - GatherAllDomains() (domains []golibvirt.Domain, err error) - GatherStatsForDomains(domains []golibvirt.Domain, metricNumber uint32) ([]golibvirt.DomainStatsRecord, error) - GatherNumberOfPCPUs() (int, error) - GatherVcpuMapping(domain golibvirt.Domain, pCPUs int, shouldGetCurrentPCPU bool) ([]vcpuAffinity, error) - EnsureConnected(libvirtURI string) error - Disconnect() error + gatherAllDomains() (domains []golibvirt.Domain, err error) + gatherStatsForDomains(domains []golibvirt.Domain, metricNumber uint32) ([]golibvirt.DomainStatsRecord, error) + gatherNumberOfPCPUs() (int, error) + gatherVcpuMapping(domain golibvirt.Domain, pCPUs int, shouldGetCurrentPCPU bool) ([]vcpuAffinity, error) + ensureConnected(libvirtURI string) error + disconnect() error } type utilsImpl struct { @@ -27,8 +27,8 @@ type vcpuAffinity struct { currentPCPUID int32 } -// GatherAllDomains gathers all domains on system -func (l *utilsImpl) GatherAllDomains() (domains []golibvirt.Domain, err error) { +// gatherAllDomains gathers all domains on system +func (l *utilsImpl) gatherAllDomains() (domains []golibvirt.Domain, err error) { allDomainStatesFlag := golibvirt.ConnectListDomainsRunning + golibvirt.ConnectListDomainsPaused + golibvirt.ConnectListDomainsShutoff + golibvirt.ConnectListDomainsOther @@ -36,8 +36,8 @@ func (l *utilsImpl) GatherAllDomains() (domains []golibvirt.Domain, err error) { return domains, err } -// GatherStatsForDomains gathers stats for given domains based on number that was previously calculated -func (l *utilsImpl) GatherStatsForDomains(domains []golibvirt.Domain, metricNumber uint32) ([]golibvirt.DomainStatsRecord, error) { +// gatherStatsForDomains gathers stats for given domains based on number that was previously calculated +func (l *utilsImpl) gatherStatsForDomains(domains []golibvirt.Domain, metricNumber uint32) ([]golibvirt.DomainStatsRecord, error) { if metricNumber == 0 { // do not need to do expensive call if no stats were set to gather return nil, nil @@ -49,7 +49,7 @@ func (l *utilsImpl) GatherStatsForDomains(domains []golibvirt.Domain, metricNumb return l.libvirt.ConnectGetAllDomainStats(domains, metricNumber, uint32(allDomainStatesFlag)) } -func (l *utilsImpl) GatherNumberOfPCPUs() (int, error) { +func (l *utilsImpl) gatherNumberOfPCPUs() (int, error) { //nolint:dogsled //Using only needed values from library function _, _, _, _, nodes, sockets, cores, threads, err := l.libvirt.NodeGetInfo() if err != nil { @@ -59,10 +59,10 @@ func (l *utilsImpl) GatherNumberOfPCPUs() (int, error) { return int(nodes * sockets * cores * threads), nil } -// GatherVcpuMapping is based on official go-libvirt library: +// gatherVcpuMapping is based on official go-libvirt library: // https://github.com/libvirt/libvirt-go-module/blob/268a5d02e00cc9b3d5d7fa6c08d753071e7d14b8/domain.go#L4516 // (this library cannot be used here because of C bindings) -func (l *utilsImpl) GatherVcpuMapping(domain golibvirt.Domain, pCPUs int, shouldGetCurrentPCPU bool) ([]vcpuAffinity, error) { +func (l *utilsImpl) gatherVcpuMapping(domain golibvirt.Domain, pCPUs int, shouldGetCurrentPCPU bool) ([]vcpuAffinity, error) { //nolint:dogsled //Using only needed values from library function _, _, _, vCPUs, _, err := l.libvirt.DomainGetInfo(domain) if err != nil { @@ -114,7 +114,7 @@ func (l *utilsImpl) GatherVcpuMapping(domain golibvirt.Domain, pCPUs int, should return vcpuAffinities, nil } -func (l *utilsImpl) EnsureConnected(libvirtURI string) error { +func (l *utilsImpl) ensureConnected(libvirtURI string) error { if isConnected(l.libvirt) { return nil } @@ -127,7 +127,7 @@ func (l *utilsImpl) EnsureConnected(libvirtURI string) error { return nil } -func (l *utilsImpl) Disconnect() error { +func (l *utilsImpl) disconnect() error { l.libvirt = nil return nil } diff --git a/plugins/inputs/libvirt/libvirt_utils_mock.go b/plugins/inputs/libvirt/libvirt_utils_mock.go index 3ebdf322c1301..158485e024b7e 100644 --- a/plugins/inputs/libvirt/libvirt_utils_mock.go +++ b/plugins/inputs/libvirt/libvirt_utils_mock.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.46.3. DO NOT EDIT. package libvirt @@ -7,15 +7,19 @@ import ( mock "github.com/stretchr/testify/mock" ) -// MockLibvirtUtils is an autogenerated mock type for the utils type -type MockLibvirtUtils struct { +// mockLibvirtUtils is an autogenerated mock type for the utils type +type mockLibvirtUtils struct { mock.Mock } -// Disconnect provides a mock function with given fields: -func (_m *MockLibvirtUtils) Disconnect() error { +// disconnect provides a mock function with given fields: +func (_m *mockLibvirtUtils) disconnect() error { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for disconnect") + } + var r0 error if rf, ok := ret.Get(0).(func() error); ok { r0 = rf() @@ -26,10 +30,14 @@ func (_m *MockLibvirtUtils) Disconnect() error { return r0 } -// EnsureConnected provides a mock function with given fields: libvirtURI -func (_m *MockLibvirtUtils) EnsureConnected(libvirtURI string) error { +// ensureConnected provides a mock function with given fields: libvirtURI +func (_m *mockLibvirtUtils) ensureConnected(libvirtURI string) error { ret := _m.Called(libvirtURI) + if len(ret) == 0 { + panic("no return value specified for ensureConnected") + } + var r0 error if rf, ok := ret.Get(0).(func(string) error); ok { r0 = rf(libvirtURI) @@ -40,11 +48,19 @@ func (_m *MockLibvirtUtils) EnsureConnected(libvirtURI string) error { return r0 } -// GatherAllDomains provides a mock function with given fields: -func (_m *MockLibvirtUtils) GatherAllDomains() ([]go_libvirt.Domain, error) { +// gatherAllDomains provides a mock function with given fields: +func (_m *mockLibvirtUtils) gatherAllDomains() ([]go_libvirt.Domain, error) { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for gatherAllDomains") + } + var r0 []go_libvirt.Domain + var r1 error + if rf, ok := ret.Get(0).(func() ([]go_libvirt.Domain, error)); ok { + return rf() + } if rf, ok := ret.Get(0).(func() []go_libvirt.Domain); ok { r0 = rf() } else { @@ -53,7 +69,6 @@ func (_m *MockLibvirtUtils) GatherAllDomains() ([]go_libvirt.Domain, error) { } } - var r1 error if rf, ok := ret.Get(1).(func() error); ok { r1 = rf() } else { @@ -63,18 +78,25 @@ func (_m *MockLibvirtUtils) GatherAllDomains() ([]go_libvirt.Domain, error) { return r0, r1 } -// GatherNumberOfPCPUs provides a mock function with given fields: -func (_m *MockLibvirtUtils) GatherNumberOfPCPUs() (int, error) { +// gatherNumberOfPCPUs provides a mock function with given fields: +func (_m *mockLibvirtUtils) gatherNumberOfPCPUs() (int, error) { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for gatherNumberOfPCPUs") + } + var r0 int + var r1 error + if rf, ok := ret.Get(0).(func() (int, error)); ok { + return rf() + } if rf, ok := ret.Get(0).(func() int); ok { r0 = rf() } else { r0 = ret.Get(0).(int) } - var r1 error if rf, ok := ret.Get(1).(func() error); ok { r1 = rf() } else { @@ -84,11 +106,19 @@ func (_m *MockLibvirtUtils) GatherNumberOfPCPUs() (int, error) { return r0, r1 } -// GatherStatsForDomains provides a mock function with given fields: domains, metricNumber -func (_m *MockLibvirtUtils) GatherStatsForDomains(domains []go_libvirt.Domain, metricNumber uint32) ([]go_libvirt.DomainStatsRecord, error) { +// gatherStatsForDomains provides a mock function with given fields: domains, metricNumber +func (_m *mockLibvirtUtils) gatherStatsForDomains(domains []go_libvirt.Domain, metricNumber uint32) ([]go_libvirt.DomainStatsRecord, error) { ret := _m.Called(domains, metricNumber) + if len(ret) == 0 { + panic("no return value specified for gatherStatsForDomains") + } + var r0 []go_libvirt.DomainStatsRecord + var r1 error + if rf, ok := ret.Get(0).(func([]go_libvirt.Domain, uint32) ([]go_libvirt.DomainStatsRecord, error)); ok { + return rf(domains, metricNumber) + } if rf, ok := ret.Get(0).(func([]go_libvirt.Domain, uint32) []go_libvirt.DomainStatsRecord); ok { r0 = rf(domains, metricNumber) } else { @@ -97,7 +127,6 @@ func (_m *MockLibvirtUtils) GatherStatsForDomains(domains []go_libvirt.Domain, m } } - var r1 error if rf, ok := ret.Get(1).(func([]go_libvirt.Domain, uint32) error); ok { r1 = rf(domains, metricNumber) } else { @@ -107,11 +136,19 @@ func (_m *MockLibvirtUtils) GatherStatsForDomains(domains []go_libvirt.Domain, m return r0, r1 } -// GatherVcpuMapping provides a mock function with given fields: domain, pCPUs, shouldGetCurrentPCPU -func (_m *MockLibvirtUtils) GatherVcpuMapping(domain go_libvirt.Domain, pCPUs int, shouldGetCurrentPCPU bool) ([]vcpuAffinity, error) { +// gatherVcpuMapping provides a mock function with given fields: domain, pCPUs, shouldGetCurrentPCPU +func (_m *mockLibvirtUtils) gatherVcpuMapping(domain go_libvirt.Domain, pCPUs int, shouldGetCurrentPCPU bool) ([]vcpuAffinity, error) { ret := _m.Called(domain, pCPUs, shouldGetCurrentPCPU) + if len(ret) == 0 { + panic("no return value specified for gatherVcpuMapping") + } + var r0 []vcpuAffinity + var r1 error + if rf, ok := ret.Get(0).(func(go_libvirt.Domain, int, bool) ([]vcpuAffinity, error)); ok { + return rf(domain, pCPUs, shouldGetCurrentPCPU) + } if rf, ok := ret.Get(0).(func(go_libvirt.Domain, int, bool) []vcpuAffinity); ok { r0 = rf(domain, pCPUs, shouldGetCurrentPCPU) } else { @@ -120,7 +157,6 @@ func (_m *MockLibvirtUtils) GatherVcpuMapping(domain go_libvirt.Domain, pCPUs in } } - var r1 error if rf, ok := ret.Get(1).(func(go_libvirt.Domain, int, bool) error); ok { r1 = rf(domain, pCPUs, shouldGetCurrentPCPU) } else { @@ -130,14 +166,13 @@ func (_m *MockLibvirtUtils) GatherVcpuMapping(domain go_libvirt.Domain, pCPUs in return r0, r1 } -type mockConstructorTestingTNewMockLibvirtUtils interface { +// newMockLibvirtUtils creates a new instance of mockLibvirtUtils. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func newMockLibvirtUtils(t interface { mock.TestingT Cleanup(func()) -} - -// NewMockLibvirtUtils creates a new instance of MockLibvirtUtils. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockLibvirtUtils(t mockConstructorTestingTNewMockLibvirtUtils) *MockLibvirtUtils { - mock := &MockLibvirtUtils{} +}) *mockLibvirtUtils { + mock := &mockLibvirtUtils{} mock.Mock.Test(t) t.Cleanup(func() { mock.AssertExpectations(t) }) diff --git a/plugins/inputs/linux_cpu/linux_cpu.go b/plugins/inputs/linux_cpu/linux_cpu.go index df2870545d867..e7839d4e172c7 100644 --- a/plugins/inputs/linux_cpu/linux_cpu.go +++ b/plugins/inputs/linux_cpu/linux_cpu.go @@ -19,6 +19,9 @@ import ( "github.com/influxdata/telegraf/plugins/inputs" ) +//go:embed sample.conf +var sampleConfig string + const ( defaultHostSys = "/sys" cpufreq = "cpufreq" @@ -26,9 +29,9 @@ const ( ) type LinuxCPU struct { - Log telegraf.Logger `toml:"-"` PathSysfs string `toml:"host_sys"` Metrics []string `toml:"metrics"` + Log telegraf.Logger `toml:"-"` cpus []cpu } @@ -44,9 +47,6 @@ type prop struct { optional bool } -//go:embed sample.conf -var sampleConfig string - func (g *LinuxCPU) SampleConfig() string { return sampleConfig } @@ -172,14 +172,6 @@ func (g *LinuxCPU) discoverCpus() ([]cpu, error) { return cpus, nil } -func init() { - inputs.Add("linux_cpu", func() telegraf.Input { - return &LinuxCPU{ - Metrics: []string{"cpufreq"}, - } - }) -} - func validatePath(propPath string) error { f, err := os.Open(propPath) if os.IsNotExist(err) { @@ -211,3 +203,11 @@ func readUintFromFile(propPath string) (uint64, error) { return strconv.ParseUint(string(buffer[:n-1]), 10, 64) } + +func init() { + inputs.Add("linux_cpu", func() telegraf.Input { + return &LinuxCPU{ + Metrics: []string{"cpufreq"}, + } + }) +} diff --git a/plugins/inputs/linux_cpu/linux_cpu_nonlinux.go b/plugins/inputs/linux_cpu/linux_cpu_nonlinux.go index 66d51156fe592..f6a5a09106f0a 100644 --- a/plugins/inputs/linux_cpu/linux_cpu_nonlinux.go +++ b/plugins/inputs/linux_cpu/linux_cpu_nonlinux.go @@ -16,11 +16,13 @@ type LinuxCPU struct { Log telegraf.Logger `toml:"-"` } +func (*LinuxCPU) SampleConfig() string { return sampleConfig } + func (l *LinuxCPU) Init() error { - l.Log.Warn("current platform is not supported") + l.Log.Warn("Current platform is not supported") return nil } -func (*LinuxCPU) SampleConfig() string { return sampleConfig } + func (*LinuxCPU) Gather(_ telegraf.Accumulator) error { return nil } func init() { diff --git a/plugins/inputs/linux_sysctl_fs/linux_sysctl_fs.go b/plugins/inputs/linux_sysctl_fs/linux_sysctl_fs.go index dff49d1c0ebe9..0777dae2c4dcc 100644 --- a/plugins/inputs/linux_sysctl_fs/linux_sysctl_fs.go +++ b/plugins/inputs/linux_sysctl_fs/linux_sysctl_fs.go @@ -10,6 +10,7 @@ import ( "strconv" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -21,6 +22,36 @@ type SysctlFS struct { path string } +func (*SysctlFS) SampleConfig() string { + return sampleConfig +} + +func (sfs *SysctlFS) Gather(acc telegraf.Accumulator) error { + fields := make(map[string]interface{}) + + for _, n := range []string{"aio-nr", "aio-max-nr", "dquot-nr", "dquot-max", "super-nr", "super-max"} { + if err := sfs.gatherOne(n, fields); err != nil { + return err + } + } + + err := sfs.gatherList("inode-state", fields, "inode-nr", "inode-free-nr", "inode-preshrink-nr") + if err != nil { + return err + } + err = sfs.gatherList("dentry-state", fields, "dentry-nr", "dentry-unused-nr", "dentry-age-limit", "dentry-want-pages") + if err != nil { + return err + } + err = sfs.gatherList("file-nr", fields, "file-nr", "", "file-max") + if err != nil { + return err + } + + acc.AddFields("linux_sysctl_fs", fields, nil) + return nil +} + func (sfs *SysctlFS) gatherList(file string, fields map[string]interface{}, fieldNames ...string) error { bs, err := os.ReadFile(sfs.path + "/" + file) if err != nil { @@ -69,48 +100,10 @@ func (sfs *SysctlFS) gatherOne(name string, fields map[string]interface{}) error return nil } -func (*SysctlFS) SampleConfig() string { - return sampleConfig -} - -func (sfs *SysctlFS) Gather(acc telegraf.Accumulator) error { - fields := make(map[string]interface{}) - - for _, n := range []string{"aio-nr", "aio-max-nr", "dquot-nr", "dquot-max", "super-nr", "super-max"} { - if err := sfs.gatherOne(n, fields); err != nil { - return err - } - } - - err := sfs.gatherList("inode-state", fields, "inode-nr", "inode-free-nr", "inode-preshrink-nr") - if err != nil { - return err - } - err = sfs.gatherList("dentry-state", fields, "dentry-nr", "dentry-unused-nr", "dentry-age-limit", "dentry-want-pages") - if err != nil { - return err - } - err = sfs.gatherList("file-nr", fields, "file-nr", "", "file-max") - if err != nil { - return err - } - - acc.AddFields("linux_sysctl_fs", fields, nil) - return nil -} - -func GetHostProc() string { - procPath := "/proc" - if os.Getenv("HOST_PROC") != "" { - procPath = os.Getenv("HOST_PROC") - } - return procPath -} - func init() { inputs.Add("linux_sysctl_fs", func() telegraf.Input { return &SysctlFS{ - path: path.Join(GetHostProc(), "/sys/fs"), + path: path.Join(internal.GetProcPath(), "/sys/fs"), } }) } diff --git a/plugins/inputs/logparser/logparser.go b/plugins/inputs/logparser/logparser.go index cfbdc27984ff0..1923a3bbf5456 100644 --- a/plugins/inputs/logparser/logparser.go +++ b/plugins/inputs/logparser/logparser.go @@ -21,88 +21,59 @@ import ( //go:embed sample.conf var sampleConfig string -const ( - defaultWatchMethod = "inotify" -) - var ( offsets = make(map[string]int64) offsetsMutex = new(sync.Mutex) ) -// LogParser in the primary interface for the plugin -type GrokConfig struct { - MeasurementName string `toml:"measurement"` - Patterns []string - NamedPatterns []string - CustomPatterns string - CustomPatternFiles []string - Timezone string - UniqueTimestamp string -} - -type logEntry struct { - path string - line string -} - -// LogParserPlugin is the primary struct to implement the interface for logparser plugin -type LogParserPlugin struct { - Files []string - FromBeginning bool - WatchMethod string +const ( + defaultWatchMethod = "inotify" +) - Log telegraf.Logger +type LogParser struct { + Files []string `toml:"files"` + FromBeginning bool `toml:"from_beginning"` + WatchMethod string `toml:"watch_method"` + GrokConfig grokConfig `toml:"grok"` + Log telegraf.Logger `toml:"-"` tailers map[string]*tail.Tail offsets map[string]int64 lines chan logEntry done chan struct{} wg sync.WaitGroup - acc telegraf.Accumulator - sync.Mutex + acc telegraf.Accumulator - GrokParser telegraf.Parser - GrokConfig GrokConfig `toml:"grok"` + sync.Mutex + grokParser telegraf.Parser } -func NewLogParser() *LogParserPlugin { - offsetsMutex.Lock() - offsetsCopy := make(map[string]int64, len(offsets)) - for k, v := range offsets { - offsetsCopy[k] = v - } - offsetsMutex.Unlock() +type grokConfig struct { + MeasurementName string `toml:"measurement"` + Patterns []string + NamedPatterns []string + CustomPatterns string + CustomPatternFiles []string + Timezone string + UniqueTimestamp string +} - return &LogParserPlugin{ - WatchMethod: defaultWatchMethod, - offsets: offsetsCopy, - } +type logEntry struct { + path string + line string } -func (*LogParserPlugin) SampleConfig() string { +func (*LogParser) SampleConfig() string { return sampleConfig } -func (l *LogParserPlugin) Init() error { +func (l *LogParser) Init() error { l.Log.Warnf(`The logparser plugin is deprecated; please use the 'tail' input with the 'grok' data_format`) return nil } -// Gather is the primary function to collect the metrics for the plugin -func (l *LogParserPlugin) Gather(_ telegraf.Accumulator) error { - l.Lock() - defer l.Unlock() - - // always start from the beginning of files that appear while we're running - l.tailNewFiles(true) - - return nil -} - -// Start kicks off collection of stats for the plugin -func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { +func (l *LogParser) Start(acc telegraf.Accumulator) error { l.Lock() defer l.Unlock() @@ -130,8 +101,8 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { if err != nil { return err } - l.GrokParser = &parser - models.SetLoggerOnPlugin(l.GrokParser, l.Log) + l.grokParser = &parser + models.SetLoggerOnPlugin(l.grokParser, l.Log) l.wg.Add(1) go l.parser() @@ -148,9 +119,54 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { return nil } +func (l *LogParser) Gather(_ telegraf.Accumulator) error { + l.Lock() + defer l.Unlock() + + // always start from the beginning of files that appear while we're running + l.tailNewFiles(true) + + return nil +} + +func (l *LogParser) Stop() { + l.Lock() + defer l.Unlock() + + for _, t := range l.tailers { + if !l.FromBeginning { + // store offset for resume + offset, err := t.Tell() + if err == nil { + l.offsets[t.Filename] = offset + l.Log.Debugf("Recording offset %d for file: %v", offset, t.Filename) + } else { + l.acc.AddError(fmt.Errorf("error recording offset for file %s", t.Filename)) + } + } + err := t.Stop() + + // message for a stopped tailer + l.Log.Debugf("Tail dropped for file: %v", t.Filename) + + if err != nil { + l.Log.Errorf("Error stopping tail on file %s", t.Filename) + } + } + close(l.done) + l.wg.Wait() + + // persist offsets + offsetsMutex.Lock() + for k, v := range l.offsets { + offsets[k] = v + } + offsetsMutex.Unlock() +} + // check the globs against files on disk, and start tailing any new files. // Assumes l's lock is held! -func (l *LogParserPlugin) tailNewFiles(fromBeginning bool) { +func (l *LogParser) tailNewFiles(fromBeginning bool) { var poll bool if l.WatchMethod == "poll" { poll = true @@ -213,7 +229,7 @@ func (l *LogParserPlugin) tailNewFiles(fromBeginning bool) { // receiver is launched as a goroutine to continuously watch a tailed logfile // for changes and send any log lines down the l.lines channel. -func (l *LogParserPlugin) receiver(tailer *tail.Tail) { +func (l *LogParser) receiver(tailer *tail.Tail) { defer l.wg.Done() var line *tail.Line @@ -242,7 +258,7 @@ func (l *LogParserPlugin) receiver(tailer *tail.Tail) { // parse is launched as a goroutine to watch the l.lines channel. // when a line is available, parse parses it and adds the metric(s) to the // accumulator. -func (l *LogParserPlugin) parser() { +func (l *LogParser) parser() { defer l.wg.Done() var m telegraf.Metric @@ -257,7 +273,7 @@ func (l *LogParserPlugin) parser() { continue } } - m, err = l.GrokParser.ParseLine(entry.line) + m, err = l.grokParser.ParseLine(entry.line) if err == nil { if m != nil { tags := m.Tags() @@ -270,44 +286,22 @@ func (l *LogParserPlugin) parser() { } } -// Stop will end the metrics collection process on file tailers -func (l *LogParserPlugin) Stop() { - l.Lock() - defer l.Unlock() - - for _, t := range l.tailers { - if !l.FromBeginning { - // store offset for resume - offset, err := t.Tell() - if err == nil { - l.offsets[t.Filename] = offset - l.Log.Debugf("Recording offset %d for file: %v", offset, t.Filename) - } else { - l.acc.AddError(fmt.Errorf("error recording offset for file %s", t.Filename)) - } - } - err := t.Stop() - - // message for a stopped tailer - l.Log.Debugf("Tail dropped for file: %v", t.Filename) - - if err != nil { - l.Log.Errorf("Error stopping tail on file %s", t.Filename) - } - } - close(l.done) - l.wg.Wait() - - // persist offsets +func newLogParser() *LogParser { offsetsMutex.Lock() - for k, v := range l.offsets { - offsets[k] = v + offsetsCopy := make(map[string]int64, len(offsets)) + for k, v := range offsets { + offsetsCopy[k] = v } offsetsMutex.Unlock() + + return &LogParser{ + WatchMethod: defaultWatchMethod, + offsets: offsetsCopy, + } } func init() { inputs.Add("logparser", func() telegraf.Input { - return NewLogParser() + return newLogParser() }) } diff --git a/plugins/inputs/logparser/logparser_test.go b/plugins/inputs/logparser/logparser_test.go index 07f891e371af7..7ff9589b5d268 100644 --- a/plugins/inputs/logparser/logparser_test.go +++ b/plugins/inputs/logparser/logparser_test.go @@ -17,7 +17,7 @@ var ( ) func TestStartNoParsers(t *testing.T) { - logparser := &LogParserPlugin{ + logparser := &LogParser{ Log: testutil.Logger{}, FromBeginning: true, Files: []string{filepath.Join(testdataDir, "*.log")}, @@ -28,11 +28,11 @@ func TestStartNoParsers(t *testing.T) { } func TestGrokParseLogFilesNonExistPattern(t *testing.T) { - logparser := &LogParserPlugin{ + logparser := &LogParser{ Log: testutil.Logger{}, FromBeginning: true, Files: []string{filepath.Join(testdataDir, "*.log")}, - GrokConfig: GrokConfig{ + GrokConfig: grokConfig{ Patterns: []string{"%{FOOBAR}"}, CustomPatternFiles: []string{filepath.Join(testdataDir, "test-patterns")}, }, @@ -44,9 +44,9 @@ func TestGrokParseLogFilesNonExistPattern(t *testing.T) { } func TestGrokParseLogFiles(t *testing.T) { - logparser := &LogParserPlugin{ + logparser := &LogParser{ Log: testutil.Logger{}, - GrokConfig: GrokConfig{ + GrokConfig: grokConfig{ MeasurementName: "logparser_grok", Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}", "%{TEST_LOG_C}"}, CustomPatternFiles: []string{filepath.Join(testdataDir, "test-patterns")}, @@ -122,11 +122,11 @@ func TestGrokParseLogFilesAppearLater(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(emptydir) - logparser := &LogParserPlugin{ + logparser := &LogParser{ Log: testutil.Logger{}, FromBeginning: true, Files: []string{filepath.Join(emptydir, "*.log")}, - GrokConfig: GrokConfig{ + GrokConfig: grokConfig{ MeasurementName: "logparser_grok", Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"}, CustomPatternFiles: []string{filepath.Join(testdataDir, "test-patterns")}, @@ -165,11 +165,11 @@ func TestGrokParseLogFilesAppearLater(t *testing.T) { // Test that test_a.log line gets parsed even though we don't have the correct // pattern available for test_b.log func TestGrokParseLogFilesOneBad(t *testing.T) { - logparser := &LogParserPlugin{ + logparser := &LogParser{ Log: testutil.Logger{}, FromBeginning: true, Files: []string{filepath.Join(testdataDir, "test_a.log")}, - GrokConfig: GrokConfig{ + GrokConfig: grokConfig{ MeasurementName: "logparser_grok", Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_BAD}"}, CustomPatternFiles: []string{filepath.Join(testdataDir, "test-patterns")}, @@ -197,9 +197,9 @@ func TestGrokParseLogFilesOneBad(t *testing.T) { } func TestGrokParseLogFiles_TimestampInEpochMilli(t *testing.T) { - logparser := &LogParserPlugin{ + logparser := &LogParser{ Log: testutil.Logger{}, - GrokConfig: GrokConfig{ + GrokConfig: grokConfig{ MeasurementName: "logparser_grok", Patterns: []string{"%{TEST_LOG_C}"}, CustomPatternFiles: []string{filepath.Join(testdataDir, "test-patterns")}, diff --git a/plugins/inputs/logstash/logstash.go b/plugins/inputs/logstash/logstash.go index 77251b32fe798..da65773c46f39 100644 --- a/plugins/inputs/logstash/logstash.go +++ b/plugins/inputs/logstash/logstash.go @@ -122,6 +122,69 @@ func (logstash *Logstash) Init() error { return nil } +func (*Logstash) Start(telegraf.Accumulator) error { + return nil +} + +func (logstash *Logstash) Gather(accumulator telegraf.Accumulator) error { + if logstash.client == nil { + client, err := logstash.createHTTPClient() + + if err != nil { + return err + } + logstash.client = client + } + + if choice.Contains("jvm", logstash.Collect) { + jvmURL, err := url.Parse(logstash.URL + jvmStatsNode) + if err != nil { + return err + } + if err := logstash.gatherJVMStats(jvmURL.String(), accumulator); err != nil { + return err + } + } + + if choice.Contains("process", logstash.Collect) { + processURL, err := url.Parse(logstash.URL + processStatsNode) + if err != nil { + return err + } + if err := logstash.gatherProcessStats(processURL.String(), accumulator); err != nil { + return err + } + } + + if choice.Contains("pipelines", logstash.Collect) { + if logstash.SinglePipeline { + pipelineURL, err := url.Parse(logstash.URL + pipelineStatsNode) + if err != nil { + return err + } + if err := logstash.gatherPipelineStats(pipelineURL.String(), accumulator); err != nil { + return err + } + } else { + pipelinesURL, err := url.Parse(logstash.URL + pipelinesStatsNode) + if err != nil { + return err + } + if err := logstash.gatherPipelinesStats(pipelinesURL.String(), accumulator); err != nil { + return err + } + } + } + + return nil +} + +func (logstash *Logstash) Stop() { + if logstash.client != nil { + logstash.client.CloseIdleConnections() + } +} + // createHTTPClient create a clients to access API func (logstash *Logstash) createHTTPClient() (*http.Client, error) { ctx := context.Background() @@ -193,7 +256,7 @@ func (logstash *Logstash) gatherJVMStats(address string, accumulator telegraf.Ac return nil } -// gatherJVMStats gather the Process metrics and add results to the accumulator +// gatherProcessStats gather the Process metrics and add results to the accumulator func (logstash *Logstash) gatherProcessStats(address string, accumulator telegraf.Accumulator) error { processStats := &processStats{} @@ -352,7 +415,7 @@ func (logstash *Logstash) gatherQueueStats(queue pipelineQueue, tags map[string] return nil } -// gatherJVMStats gather the Pipeline metrics and add results to the accumulator (for Logstash < 6) +// gatherPipelineStats gather the Pipeline metrics and add results to the accumulator (for Logstash < 6) func (logstash *Logstash) gatherPipelineStats(address string, accumulator telegraf.Accumulator) error { pipelineStats := &pipelineStats{} @@ -396,7 +459,7 @@ func (logstash *Logstash) gatherPipelineStats(address string, accumulator telegr return nil } -// gatherJVMStats gather the Pipelines metrics and add results to the accumulator (for Logstash >= 6) +// gatherPipelinesStats gather the Pipelines metrics and add results to the accumulator (for Logstash >= 6) func (logstash *Logstash) gatherPipelinesStats(address string, accumulator telegraf.Accumulator) error { pipelinesStats := &pipelinesStats{} @@ -443,78 +506,6 @@ func (logstash *Logstash) gatherPipelinesStats(address string, accumulator teleg return nil } -func (logstash *Logstash) Start(_ telegraf.Accumulator) error { - return nil -} - -// Gather ask this plugin to start gathering metrics -func (logstash *Logstash) Gather(accumulator telegraf.Accumulator) error { - if logstash.client == nil { - client, err := logstash.createHTTPClient() - - if err != nil { - return err - } - logstash.client = client - } - - if choice.Contains("jvm", logstash.Collect) { - jvmURL, err := url.Parse(logstash.URL + jvmStatsNode) - if err != nil { - return err - } - if err := logstash.gatherJVMStats(jvmURL.String(), accumulator); err != nil { - return err - } - } - - if choice.Contains("process", logstash.Collect) { - processURL, err := url.Parse(logstash.URL + processStatsNode) - if err != nil { - return err - } - if err := logstash.gatherProcessStats(processURL.String(), accumulator); err != nil { - return err - } - } - - if choice.Contains("pipelines", logstash.Collect) { - if logstash.SinglePipeline { - pipelineURL, err := url.Parse(logstash.URL + pipelineStatsNode) - if err != nil { - return err - } - if err := logstash.gatherPipelineStats(pipelineURL.String(), accumulator); err != nil { - return err - } - } else { - pipelinesURL, err := url.Parse(logstash.URL + pipelinesStatsNode) - if err != nil { - return err - } - if err := logstash.gatherPipelinesStats(pipelinesURL.String(), accumulator); err != nil { - return err - } - } - } - - return nil -} - -func (logstash *Logstash) Stop() { - if logstash.client != nil { - logstash.client.CloseIdleConnections() - } -} - -// init registers this plugin instance -func init() { - inputs.Add("logstash", func() telegraf.Input { - return newLogstash() - }) -} - -// newLogstash create an instance of the plugin with default settings func newLogstash() *Logstash { return &Logstash{ URL: "http://127.0.0.1:9600", @@ -525,3 +516,9 @@ func newLogstash() *Logstash { }, } } + +func init() { + inputs.Add("logstash", func() telegraf.Input { + return newLogstash() + }) +} diff --git a/plugins/inputs/lustre2/lustre2.go b/plugins/inputs/lustre2/lustre2.go index 7f8a582be4d5b..ef570f2c0ce59 100644 --- a/plugins/inputs/lustre2/lustre2.go +++ b/plugins/inputs/lustre2/lustre2.go @@ -24,13 +24,8 @@ import ( //go:embed sample.conf var sampleConfig string -type tags struct { - name, brwSection, bucket, job, client string -} - -// Lustre proc files can change between versions, so we want to future-proof -// by letting people choose what to look at. type Lustre2 struct { + // Lustre proc files can change between versions, so we want to future-proof by letting people choose what to look at. MgsProcfiles []string `toml:"mgs_procfiles"` OstProcfiles []string `toml:"ost_procfiles"` MdsProcfiles []string `toml:"mds_procfiles"` @@ -43,137 +38,524 @@ type Lustre2 struct { allFields map[tags]map[string]interface{} } -/* - The wanted fields would be a []string if not for the - -lines that start with read_bytes/write_bytes and contain +type tags struct { + name, brwSection, bucket, job, client string +} - both the byte count and the function call count -*/ -type mapping struct { - inProc string // What to look for at the start of a line in /proc/fs/lustre/* - field uint32 // which field to extract from that line - reportAs string // What measurement name to use +func (*Lustre2) SampleConfig() string { + return sampleConfig } -var wantedBrwstatsFields = []*mapping{ - { - inProc: "pages per bulk r/w", - reportAs: "pages_per_bulk_rw", - }, - { - inProc: "discontiguous pages", - reportAs: "discontiguous_pages", - }, - { - inProc: "disk I/Os in flight", - reportAs: "disk_ios_in_flight", - }, - { - inProc: "I/O time (1/1000s)", - reportAs: "io_time", - }, - { - inProc: "disk I/O size", - reportAs: "disk_io_size", - }, +func (l *Lustre2) Gather(acc telegraf.Accumulator) error { + l.allFields = make(map[tags]map[string]interface{}) + + err := l.getLustreHealth() + if err != nil { + return err + } + + if len(l.MgsProcfiles) == 0 { + l.MgsProcfiles = []string{ + // eviction count + "/sys/fs/lustre/mgs/*/eviction_count", + } + } + + if len(l.OstProcfiles) == 0 { + l.OstProcfiles = []string{ + // read/write bytes are in obdfilter//stats + "/proc/fs/lustre/obdfilter/*/stats", + // cache counters are in osd-ldiskfs//stats + "/proc/fs/lustre/osd-ldiskfs/*/stats", + // per job statistics are in obdfilter//job_stats + "/proc/fs/lustre/obdfilter/*/job_stats", + // bulk read/write statistics for ldiskfs + "/proc/fs/lustre/osd-ldiskfs/*/brw_stats", + // bulk read/write statistics for zfs + "/proc/fs/lustre/osd-zfs/*/brw_stats", + // eviction count + "/sys/fs/lustre/obdfilter/*/eviction_count", + } + } + + if len(l.MdsProcfiles) == 0 { + l.MdsProcfiles = []string{ + // Metadata server stats + "/proc/fs/lustre/mdt/*/md_stats", + // Metadata target job stats + "/proc/fs/lustre/mdt/*/job_stats", + // eviction count + "/sys/fs/lustre/mdt/*/eviction_count", + } + } + + for _, procfile := range l.MgsProcfiles { + if !strings.HasSuffix(procfile, "eviction_count") { + return fmt.Errorf("no handler found for mgs procfile pattern \"%s\"", procfile) + } + err := l.getLustreEvictionCount(procfile) + if err != nil { + return err + } + } + for _, procfile := range l.OstProcfiles { + if strings.HasSuffix(procfile, "brw_stats") { + err := l.getLustreProcBrwStats(procfile, wantedBrwstatsFields) + if err != nil { + return err + } + } else if strings.HasSuffix(procfile, "job_stats") { + err := l.getLustreProcStats(procfile, wantedOstJobstatsFields) + if err != nil { + return err + } + } else if strings.HasSuffix(procfile, "eviction_count") { + err := l.getLustreEvictionCount(procfile) + if err != nil { + return err + } + } else { + err := l.getLustreProcStats(procfile, wantedOstFields) + if err != nil { + return err + } + } + } + for _, procfile := range l.MdsProcfiles { + if strings.HasSuffix(procfile, "brw_stats") { + err := l.getLustreProcBrwStats(procfile, wantedBrwstatsFields) + if err != nil { + return err + } + } else if strings.HasSuffix(procfile, "job_stats") { + err := l.getLustreProcStats(procfile, wantedMdtJobstatsFields) + if err != nil { + return err + } + } else if strings.HasSuffix(procfile, "eviction_count") { + err := l.getLustreEvictionCount(procfile) + if err != nil { + return err + } + } else { + err := l.getLustreProcStats(procfile, wantedMdsFields) + if err != nil { + return err + } + } + } + + for tgs, fields := range l.allFields { + tags := make(map[string]string, 5) + if len(tgs.name) > 0 { + tags["name"] = tgs.name + } + if len(tgs.brwSection) > 0 { + tags["brw_section"] = tgs.brwSection + } + if len(tgs.bucket) > 0 { + tags["bucket"] = tgs.bucket + } + if len(tgs.job) > 0 { + tags["jobid"] = tgs.job + } + if len(tgs.client) > 0 { + tags["client"] = tgs.client + } + acc.AddFields("lustre2", fields, tags) + } + + return nil } -var wantedOstFields = []*mapping{ - { - inProc: "write_bytes", - field: 6, - reportAs: "write_bytes", - }, - { // line starts with 'write_bytes', but value write_calls is in second column - inProc: "write_bytes", - field: 1, - reportAs: "write_calls", - }, - { - inProc: "read_bytes", - field: 6, - reportAs: "read_bytes", - }, - { // line starts with 'read_bytes', but value read_calls is in second column - inProc: "read_bytes", - field: 1, - reportAs: "read_calls", - }, - { - inProc: "cache_hit", - }, - { - inProc: "cache_miss", - }, - { - inProc: "cache_access", - }, +func (l *Lustre2) getLustreHealth() error { + // the linter complains about using an element containing '/' in filepath.Join() + // so we explicitly set the rootdir default to '/' in this function rather than + // starting the second element with a '/'. + rootdir := l.rootdir + if rootdir == "" { + rootdir = "/" + } + + filename := filepath.Join(rootdir, "sys", "fs", "lustre", "health_check") + if _, err := os.Stat(filename); err != nil { + // try falling back to the old procfs location + // it was moved in https://github.com/lustre/lustre-release/commit/5d368bd0b2 + filename = filepath.Join(rootdir, "proc", "fs", "lustre", "health_check") + if _, err = os.Stat(filename); err != nil { + return nil //nolint:nilerr // we don't want to return an error if the file doesn't exist + } + } + contents, err := os.ReadFile(filename) + if err != nil { + return err + } + + value := strings.TrimSpace(string(contents)) + var health uint64 + if value == "healthy" { + health = 1 + } + + t := tags{} + var fields map[string]interface{} + fields, ok := l.allFields[t] + if !ok { + fields = make(map[string]interface{}) + l.allFields[t] = fields + } + + fields["health"] = health + return nil } -var wantedOstJobstatsFields = []*mapping{ - { // The read line has several fields, so we need to differentiate what they are - inProc: "read", - field: 3, - reportAs: "jobstats_read_calls", - }, - { - inProc: "read", - field: 7, - reportAs: "jobstats_read_min_size", - }, - { - inProc: "read", - field: 9, - reportAs: "jobstats_read_max_size", - }, - { - inProc: "read", - field: 11, - reportAs: "jobstats_read_bytes", - }, - { // Different inProc for newer versions - inProc: "read_bytes", - field: 3, - reportAs: "jobstats_read_calls", - }, - { - inProc: "read_bytes", - field: 7, - reportAs: "jobstats_read_min_size", - }, - { - inProc: "read_bytes", - field: 9, - reportAs: "jobstats_read_max_size", - }, - { - inProc: "read_bytes", - field: 11, - reportAs: "jobstats_read_bytes", +func (l *Lustre2) getLustreProcStats(fileglob string, wantedFields []*mapping) error { + files, err := filepath.Glob(filepath.Join(l.rootdir, fileglob)) + if err != nil { + return err + } + + fieldSplitter := regexp.MustCompile(`[ :]+`) + + for _, file := range files { + /* From /proc/fs/lustre/obdfilter//stats and similar + * extract the object store target name, + * and for per-client files under + * /proc/fs/lustre/obdfilter//exports//stats + * and similar the client NID + * Assumption: the target name is fourth to last + * for per-client files and second to last otherwise + * and the client NID is always second to last, + * which is true in Lustre 2.1->2.14 + */ + path := strings.Split(file, "/") + var name, client string + if strings.Contains(file, "/exports/") { + name = path[len(path)-4] + client = path[len(path)-2] + } else { + name = path[len(path)-2] + client = "" + } + + wholeFile, err := os.ReadFile(file) + if err != nil { + return err + } + jobs := strings.Split(string(wholeFile), "- ") + for _, job := range jobs { + lines := strings.Split(job, "\n") + jobid := "" + + // figure out if the data should be tagged with job_id here + parts := strings.Fields(lines[0]) + if strings.TrimSuffix(parts[0], ":") == "job_id" { + jobid = parts[1] + } + + for _, line := range lines { + // skip any empty lines + if len(line) < 1 { + continue + } + + parts := fieldSplitter.Split(line, -1) + if len(parts[0]) == 0 { + parts = parts[1:] + } + + var fields map[string]interface{} + fields, ok := l.allFields[tags{name, "", "", jobid, client}] + if !ok { + fields = make(map[string]interface{}) + l.allFields[tags{name, "", "", jobid, client}] = fields + } + + for _, wanted := range wantedFields { + var data uint64 + if parts[0] == wanted.inProc { + wantedField := wanted.field + // if not set, assume field[1]. Shouldn't be field[0], as + // that's a string + if wantedField == 0 { + wantedField = 1 + } + data, err = strconv.ParseUint(strings.TrimSuffix(parts[wantedField], ","), 10, 64) + if err != nil { + return err + } + reportName := wanted.inProc + if wanted.reportAs != "" { + reportName = wanted.reportAs + } + fields[reportName] = data + } + } + } + } + } + return nil +} + +func (l *Lustre2) getLustreProcBrwStats(fileglob string, wantedFields []*mapping) error { + files, err := filepath.Glob(filepath.Join(l.rootdir, fileglob)) + if err != nil { + return fmt.Errorf("failed to find files matching glob %s: %w", fileglob, err) + } + + for _, file := range files { + // Turn /proc/fs/lustre/obdfilter//stats and similar into just the object store target name + // This assumes that the target name is always second to last, which is true in Lustre 2.1->2.12 + path := strings.Split(file, "/") + if len(path) < 2 { + continue + } + name := path[len(path)-2] + + wholeFile, err := os.ReadFile(file) + if err != nil { + if errors.Is(err, os.ErrPermission) { + l.Log.Debugf("%s", err) + continue + } + return fmt.Errorf("failed to read file %s: %w", file, err) + } + lines := strings.Split(string(wholeFile), "\n") + + var headerName string + for _, line := range lines { + // There are four types of lines in a brw_stats file: + // 1. Header lines - contain the category of metric (e.g. disk I/Os in flight, disk I/O time) + // 2. Bucket lines - follow headers, contain the bucket value (e.g. 4K, 1M) and metric values + // 3. Empty lines - these will simply be filtered out + // 4. snapshot_time line - this will be filtered out, as it "looks" like a bucket line + if len(line) < 1 { + continue + } + parts := strings.Fields(line) + + // This is a header line + // Set report name for use by the buckets that follow + if !strings.Contains(parts[0], ":") { + nameParts := strings.Split(line, " ") + headerName = nameParts[0] + continue + } + + // snapshot_time should be discarded + if strings.Contains(parts[0], "snapshot_time") { + continue + } + + // This is a bucket for a given header + for _, wanted := range wantedFields { + if headerName != wanted.inProc { + continue + } + bucket := strings.TrimSuffix(parts[0], ":") + + // brw_stats columns are static and don't need configurable fields + readIos, err := strconv.ParseUint(parts[1], 10, 64) + if err != nil { + return fmt.Errorf("failed to parse read_ios: %w", err) + } + readPercent, err := strconv.ParseUint(parts[2], 10, 64) + if err != nil { + return fmt.Errorf("failed to parse read_percent: %w", err) + } + writeIos, err := strconv.ParseUint(parts[5], 10, 64) + if err != nil { + return fmt.Errorf("failed to parse write_ios: %w", err) + } + writePercent, err := strconv.ParseUint(parts[6], 10, 64) + if err != nil { + return fmt.Errorf("failed to parse write_percent: %w", err) + } + reportName := headerName + if wanted.reportAs != "" { + reportName = wanted.reportAs + } + + tag := tags{name, reportName, bucket, "", ""} + fields, ok := l.allFields[tag] + if !ok { + fields = make(map[string]interface{}) + l.allFields[tag] = fields + } + + fields["read_ios"] = readIos + fields["read_percent"] = readPercent + fields["write_ios"] = writeIos + fields["write_percent"] = writePercent + } + } + } + return nil +} + +func (l *Lustre2) getLustreEvictionCount(fileglob string) error { + files, err := filepath.Glob(filepath.Join(l.rootdir, fileglob)) + if err != nil { + return fmt.Errorf("failed to find files matching glob %s: %w", fileglob, err) + } + + for _, file := range files { + // Turn /sys/fs/lustre/*//eviction_count into just the object store target name + // This assumes that the target name is always second to last, which is true in Lustre 2.1->2.12 + path := strings.Split(file, "/") + if len(path) < 2 { + continue + } + name := path[len(path)-2] + + contents, err := os.ReadFile(file) + if err != nil { + return fmt.Errorf("failed to read file %s: %w", file, err) + } + + value, err := strconv.ParseUint(strings.TrimSpace(string(contents)), 10, 64) + if err != nil { + return fmt.Errorf("failed to parse file %s: %w", file, err) + } + + tag := tags{name, "", "", "", ""} + fields, ok := l.allFields[tag] + if !ok { + fields = make(map[string]interface{}) + l.allFields[tag] = fields + } + + fields["evictions"] = value + } + return nil +} + +// The wanted fields would be a []string, if not for the lines that start with read_bytes/write_bytes +// and contain both the byte count and the function call count +type mapping struct { + inProc string // What to look for at the start of a line in /proc/fs/lustre/* + field uint32 // which field to extract from that line + reportAs string // What measurement name to use +} + +var wantedBrwstatsFields = []*mapping{ + { + inProc: "pages per bulk r/w", + reportAs: "pages_per_bulk_rw", }, - { // We need to do the same for the write fields - inProc: "write", - field: 3, - reportAs: "jobstats_write_calls", + { + inProc: "discontiguous pages", + reportAs: "discontiguous_pages", }, { - inProc: "write", - field: 7, - reportAs: "jobstats_write_min_size", + inProc: "disk I/Os in flight", + reportAs: "disk_ios_in_flight", }, { - inProc: "write", - field: 9, - reportAs: "jobstats_write_max_size", + inProc: "I/O time (1/1000s)", + reportAs: "io_time", }, { - inProc: "write", - field: 11, - reportAs: "jobstats_write_bytes", + inProc: "disk I/O size", + reportAs: "disk_io_size", }, - { // Different inProc for newer versions - inProc: "write_bytes", +} + +var wantedOstFields = []*mapping{ + { + inProc: "write_bytes", + field: 6, + reportAs: "write_bytes", + }, + { // line starts with 'write_bytes', but value write_calls is in second column + inProc: "write_bytes", + field: 1, + reportAs: "write_calls", + }, + { + inProc: "read_bytes", + field: 6, + reportAs: "read_bytes", + }, + { // line starts with 'read_bytes', but value read_calls is in second column + inProc: "read_bytes", + field: 1, + reportAs: "read_calls", + }, + { + inProc: "cache_hit", + }, + { + inProc: "cache_miss", + }, + { + inProc: "cache_access", + }, +} + +var wantedOstJobstatsFields = []*mapping{ + { // The read line has several fields, so we need to differentiate what they are + inProc: "read", + field: 3, + reportAs: "jobstats_read_calls", + }, + { + inProc: "read", + field: 7, + reportAs: "jobstats_read_min_size", + }, + { + inProc: "read", + field: 9, + reportAs: "jobstats_read_max_size", + }, + { + inProc: "read", + field: 11, + reportAs: "jobstats_read_bytes", + }, + { // Different inProc for newer versions + inProc: "read_bytes", + field: 3, + reportAs: "jobstats_read_calls", + }, + { + inProc: "read_bytes", + field: 7, + reportAs: "jobstats_read_min_size", + }, + { + inProc: "read_bytes", + field: 9, + reportAs: "jobstats_read_max_size", + }, + { + inProc: "read_bytes", + field: 11, + reportAs: "jobstats_read_bytes", + }, + { // We need to do the same for the write fields + inProc: "write", + field: 3, + reportAs: "jobstats_write_calls", + }, + { + inProc: "write", + field: 7, + reportAs: "jobstats_write_min_size", + }, + { + inProc: "write", + field: 9, + reportAs: "jobstats_write_max_size", + }, + { + inProc: "write", + field: 11, + reportAs: "jobstats_write_bytes", + }, + { // Different inProc for newer versions + inProc: "write_bytes", field: 3, reportAs: "jobstats_write_calls", }, @@ -378,395 +760,6 @@ var wantedMdtJobstatsFields = []*mapping{ }, } -func (*Lustre2) SampleConfig() string { - return sampleConfig -} - -func (l *Lustre2) GetLustreHealth() error { - // the linter complains about using an element containing '/' in filepath.Join() - // so we explicitly set the rootdir default to '/' in this function rather than - // starting the second element with a '/'. - rootdir := l.rootdir - if rootdir == "" { - rootdir = "/" - } - - filename := filepath.Join(rootdir, "sys", "fs", "lustre", "health_check") - if _, err := os.Stat(filename); err != nil { - // try falling back to the old procfs location - // it was moved in https://github.com/lustre/lustre-release/commit/5d368bd0b2 - filename = filepath.Join(rootdir, "proc", "fs", "lustre", "health_check") - if _, err = os.Stat(filename); err != nil { - return nil //nolint:nilerr // we don't want to return an error if the file doesn't exist - } - } - contents, err := os.ReadFile(filename) - if err != nil { - return err - } - - value := strings.TrimSpace(string(contents)) - var health uint64 - if value == "healthy" { - health = 1 - } - - t := tags{} - var fields map[string]interface{} - fields, ok := l.allFields[t] - if !ok { - fields = make(map[string]interface{}) - l.allFields[t] = fields - } - - fields["health"] = health - return nil -} - -func (l *Lustre2) GetLustreProcStats(fileglob string, wantedFields []*mapping) error { - files, err := filepath.Glob(filepath.Join(l.rootdir, fileglob)) - if err != nil { - return err - } - - fieldSplitter := regexp.MustCompile(`[ :]+`) - - for _, file := range files { - /* From /proc/fs/lustre/obdfilter//stats and similar - * extract the object store target name, - * and for per-client files under - * /proc/fs/lustre/obdfilter//exports//stats - * and similar the client NID - * Assumption: the target name is fourth to last - * for per-client files and second to last otherwise - * and the client NID is always second to last, - * which is true in Lustre 2.1->2.14 - */ - path := strings.Split(file, "/") - var name, client string - if strings.Contains(file, "/exports/") { - name = path[len(path)-4] - client = path[len(path)-2] - } else { - name = path[len(path)-2] - client = "" - } - - wholeFile, err := os.ReadFile(file) - if err != nil { - return err - } - jobs := strings.Split(string(wholeFile), "- ") - for _, job := range jobs { - lines := strings.Split(job, "\n") - jobid := "" - - // figure out if the data should be tagged with job_id here - parts := strings.Fields(lines[0]) - if strings.TrimSuffix(parts[0], ":") == "job_id" { - jobid = parts[1] - } - - for _, line := range lines { - // skip any empty lines - if len(line) < 1 { - continue - } - - parts := fieldSplitter.Split(line, -1) - if len(parts[0]) == 0 { - parts = parts[1:] - } - - var fields map[string]interface{} - fields, ok := l.allFields[tags{name, "", "", jobid, client}] - if !ok { - fields = make(map[string]interface{}) - l.allFields[tags{name, "", "", jobid, client}] = fields - } - - for _, wanted := range wantedFields { - var data uint64 - if parts[0] == wanted.inProc { - wantedField := wanted.field - // if not set, assume field[1]. Shouldn't be field[0], as - // that's a string - if wantedField == 0 { - wantedField = 1 - } - data, err = strconv.ParseUint(strings.TrimSuffix(parts[wantedField], ","), 10, 64) - if err != nil { - return err - } - reportName := wanted.inProc - if wanted.reportAs != "" { - reportName = wanted.reportAs - } - fields[reportName] = data - } - } - } - } - } - return nil -} - -func (l *Lustre2) getLustreProcBrwStats(fileglob string, wantedFields []*mapping) error { - files, err := filepath.Glob(filepath.Join(l.rootdir, fileglob)) - if err != nil { - return fmt.Errorf("failed to find files matching glob %s: %w", fileglob, err) - } - - for _, file := range files { - // Turn /proc/fs/lustre/obdfilter//stats and similar into just the object store target name - // This assumes that the target name is always second to last, which is true in Lustre 2.1->2.12 - path := strings.Split(file, "/") - if len(path) < 2 { - continue - } - name := path[len(path)-2] - - wholeFile, err := os.ReadFile(file) - if err != nil { - if errors.Is(err, os.ErrPermission) { - l.Log.Debugf("%s", err) - continue - } - return fmt.Errorf("failed to read file %s: %w", file, err) - } - lines := strings.Split(string(wholeFile), "\n") - - var headerName string - for _, line := range lines { - // There are four types of lines in a brw_stats file: - // 1. Header lines - contain the category of metric (e.g. disk I/Os in flight, disk I/O time) - // 2. Bucket lines - follow headers, contain the bucket value (e.g. 4K, 1M) and metric values - // 3. Empty lines - these will simply be filtered out - // 4. snapshot_time line - this will be filtered out, as it "looks" like a bucket line - if len(line) < 1 { - continue - } - parts := strings.Fields(line) - - // This is a header line - // Set report name for use by the buckets that follow - if !strings.Contains(parts[0], ":") { - nameParts := strings.Split(line, " ") - headerName = nameParts[0] - continue - } - - // snapshot_time should be discarded - if strings.Contains(parts[0], "snapshot_time") { - continue - } - - // This is a bucket for a given header - for _, wanted := range wantedFields { - if headerName != wanted.inProc { - continue - } - bucket := strings.TrimSuffix(parts[0], ":") - - // brw_stats columns are static and don't need configurable fields - readIos, err := strconv.ParseUint(parts[1], 10, 64) - if err != nil { - return fmt.Errorf("failed to parse read_ios: %w", err) - } - readPercent, err := strconv.ParseUint(parts[2], 10, 64) - if err != nil { - return fmt.Errorf("failed to parse read_percent: %w", err) - } - writeIos, err := strconv.ParseUint(parts[5], 10, 64) - if err != nil { - return fmt.Errorf("failed to parse write_ios: %w", err) - } - writePercent, err := strconv.ParseUint(parts[6], 10, 64) - if err != nil { - return fmt.Errorf("failed to parse write_percent: %w", err) - } - reportName := headerName - if wanted.reportAs != "" { - reportName = wanted.reportAs - } - - tag := tags{name, reportName, bucket, "", ""} - fields, ok := l.allFields[tag] - if !ok { - fields = make(map[string]interface{}) - l.allFields[tag] = fields - } - - fields["read_ios"] = readIos - fields["read_percent"] = readPercent - fields["write_ios"] = writeIos - fields["write_percent"] = writePercent - } - } - } - return nil -} - -func (l *Lustre2) getLustreEvictionCount(fileglob string) error { - files, err := filepath.Glob(filepath.Join(l.rootdir, fileglob)) - if err != nil { - return fmt.Errorf("failed to find files matching glob %s: %w", fileglob, err) - } - - for _, file := range files { - // Turn /sys/fs/lustre/*//eviction_count into just the object store target name - // This assumes that the target name is always second to last, which is true in Lustre 2.1->2.12 - path := strings.Split(file, "/") - if len(path) < 2 { - continue - } - name := path[len(path)-2] - - contents, err := os.ReadFile(file) - if err != nil { - return fmt.Errorf("failed to read file %s: %w", file, err) - } - - value, err := strconv.ParseUint(strings.TrimSpace(string(contents)), 10, 64) - if err != nil { - return fmt.Errorf("failed to parse file %s: %w", file, err) - } - - tag := tags{name, "", "", "", ""} - fields, ok := l.allFields[tag] - if !ok { - fields = make(map[string]interface{}) - l.allFields[tag] = fields - } - - fields["evictions"] = value - } - return nil -} - -// Gather reads stats from all lustre targets -func (l *Lustre2) Gather(acc telegraf.Accumulator) error { - l.allFields = make(map[tags]map[string]interface{}) - - err := l.GetLustreHealth() - if err != nil { - return err - } - - if len(l.MgsProcfiles) == 0 { - l.MgsProcfiles = []string{ - // eviction count - "/sys/fs/lustre/mgs/*/eviction_count", - } - } - - if len(l.OstProcfiles) == 0 { - l.OstProcfiles = []string{ - // read/write bytes are in obdfilter//stats - "/proc/fs/lustre/obdfilter/*/stats", - // cache counters are in osd-ldiskfs//stats - "/proc/fs/lustre/osd-ldiskfs/*/stats", - // per job statistics are in obdfilter//job_stats - "/proc/fs/lustre/obdfilter/*/job_stats", - // bulk read/write statistics for ldiskfs - "/proc/fs/lustre/osd-ldiskfs/*/brw_stats", - // bulk read/write statistics for zfs - "/proc/fs/lustre/osd-zfs/*/brw_stats", - // eviction count - "/sys/fs/lustre/obdfilter/*/eviction_count", - } - } - - if len(l.MdsProcfiles) == 0 { - l.MdsProcfiles = []string{ - // Metadata server stats - "/proc/fs/lustre/mdt/*/md_stats", - // Metadata target job stats - "/proc/fs/lustre/mdt/*/job_stats", - // eviction count - "/sys/fs/lustre/mdt/*/eviction_count", - } - } - - for _, procfile := range l.MgsProcfiles { - if !strings.HasSuffix(procfile, "eviction_count") { - return fmt.Errorf("no handler found for mgs procfile pattern \"%s\"", procfile) - } - err := l.getLustreEvictionCount(procfile) - if err != nil { - return err - } - } - for _, procfile := range l.OstProcfiles { - if strings.HasSuffix(procfile, "brw_stats") { - err := l.getLustreProcBrwStats(procfile, wantedBrwstatsFields) - if err != nil { - return err - } - } else if strings.HasSuffix(procfile, "job_stats") { - err := l.GetLustreProcStats(procfile, wantedOstJobstatsFields) - if err != nil { - return err - } - } else if strings.HasSuffix(procfile, "eviction_count") { - err := l.getLustreEvictionCount(procfile) - if err != nil { - return err - } - } else { - err := l.GetLustreProcStats(procfile, wantedOstFields) - if err != nil { - return err - } - } - } - for _, procfile := range l.MdsProcfiles { - if strings.HasSuffix(procfile, "brw_stats") { - err := l.getLustreProcBrwStats(procfile, wantedBrwstatsFields) - if err != nil { - return err - } - } else if strings.HasSuffix(procfile, "job_stats") { - err := l.GetLustreProcStats(procfile, wantedMdtJobstatsFields) - if err != nil { - return err - } - } else if strings.HasSuffix(procfile, "eviction_count") { - err := l.getLustreEvictionCount(procfile) - if err != nil { - return err - } - } else { - err := l.GetLustreProcStats(procfile, wantedMdsFields) - if err != nil { - return err - } - } - } - - for tgs, fields := range l.allFields { - tags := make(map[string]string, 5) - if len(tgs.name) > 0 { - tags["name"] = tgs.name - } - if len(tgs.brwSection) > 0 { - tags["brw_section"] = tgs.brwSection - } - if len(tgs.bucket) > 0 { - tags["bucket"] = tgs.bucket - } - if len(tgs.job) > 0 { - tags["jobid"] = tgs.job - } - if len(tgs.client) > 0 { - tags["client"] = tgs.client - } - acc.AddFields("lustre2", fields, tags) - } - - return nil -} - func init() { inputs.Add("lustre2", func() telegraf.Input { return &Lustre2{} diff --git a/plugins/inputs/lustre2/lustre2_notlinux.go b/plugins/inputs/lustre2/lustre2_notlinux.go index a19cd4690da34..1ba7a436c7847 100644 --- a/plugins/inputs/lustre2/lustre2_notlinux.go +++ b/plugins/inputs/lustre2/lustre2_notlinux.go @@ -17,11 +17,13 @@ type Lustre2 struct { Log telegraf.Logger `toml:"-"` } +func (*Lustre2) SampleConfig() string { return sampleConfig } + func (l *Lustre2) Init() error { - l.Log.Warn("current platform is not supported") + l.Log.Warn("Current platform is not supported") return nil } -func (*Lustre2) SampleConfig() string { return sampleConfig } + func (*Lustre2) Gather(_ telegraf.Accumulator) error { return nil } func init() { diff --git a/plugins/inputs/lvm/lvm_test.go b/plugins/inputs/lvm/lvm_test.go index 0b3caf5f7e852..e689eecad674c 100644 --- a/plugins/inputs/lvm/lvm_test.go +++ b/plugins/inputs/lvm/lvm_test.go @@ -6,8 +6,9 @@ import ( "os/exec" "testing" - "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/testutil" ) func TestGather(t *testing.T) { diff --git a/plugins/inputs/mdstat/mdstat.go b/plugins/inputs/mdstat/mdstat.go index 926399636d62b..f1edf3f0e753e 100644 --- a/plugins/inputs/mdstat/mdstat.go +++ b/plugins/inputs/mdstat/mdstat.go @@ -28,17 +28,13 @@ import ( "strings" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" ) //go:embed sample.conf var sampleConfig string -const ( - defaultHostProc = "/proc" - envProc = "HOST_PROC" -) - var ( statusLineRE = regexp.MustCompile(`(\d+) blocks .*\[(\d+)/(\d+)\] \[([U_]+)\]`) recoveryLineBlocksRE = regexp.MustCompile(`\((\d+)/\d+\)`) @@ -274,7 +270,7 @@ func (k *MdstatConf) Gather(acc telegraf.Accumulator) error { func (k *MdstatConf) getProcMdstat() ([]byte, error) { var mdStatFile string if k.FileName == "" { - mdStatFile = proc(envProc, defaultHostProc) + "/mdstat" + mdStatFile = internal.GetProcPath() + "/mdstat" } else { mdStatFile = k.FileName } @@ -295,13 +291,3 @@ func (k *MdstatConf) getProcMdstat() ([]byte, error) { func init() { inputs.Add("mdstat", func() telegraf.Input { return &MdstatConf{} }) } - -// proc can be used to read file paths from env -func proc(env, path string) string { - // try to read full file path - if p := os.Getenv(env); p != "" { - return p - } - // return default path - return path -} diff --git a/plugins/inputs/net/net.go b/plugins/inputs/net/net.go index 0c3c4cdcc023a..03d770c75c210 100644 --- a/plugins/inputs/net/net.go +++ b/plugins/inputs/net/net.go @@ -13,6 +13,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/filter" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs/system" ) @@ -136,10 +137,7 @@ func (n *NetIOStats) Gather(acc telegraf.Accumulator) error { // Get the interface speed from /sys/class/net/*/speed file. returns -1 if unsupported func getInterfaceSpeed(ioName string) int64 { - sysPath := os.Getenv("HOST_SYS") - if sysPath == "" { - sysPath = "/sys" - } + sysPath := internal.GetSysPath() raw, err := os.ReadFile(filepath.Join(sysPath, "class", "net", ioName, "speed")) if err != nil { diff --git a/plugins/inputs/processes/processes_notwindows.go b/plugins/inputs/processes/processes_notwindows.go index 8ff578b9f0326..c574238fd5a23 100644 --- a/plugins/inputs/processes/processes_notwindows.go +++ b/plugins/inputs/processes/processes_notwindows.go @@ -14,8 +14,8 @@ import ( "syscall" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" - "github.com/influxdata/telegraf/plugins/inputs/linux_sysctl_fs" ) type Processes struct { @@ -130,7 +130,7 @@ func (p *Processes) gatherFromPS(fields map[string]interface{}) error { // get process states from /proc/(pid)/stat files func (p *Processes) gatherFromProc(fields map[string]interface{}) error { - filenames, err := filepath.Glob(linux_sysctl_fs.GetHostProc() + "/[0-9]*/stat") + filenames, err := filepath.Glob(internal.GetProcPath() + "/[0-9]*/stat") if err != nil { return err } diff --git a/plugins/inputs/procstat/os_linux.go b/plugins/inputs/procstat/os_linux.go index a146f2c500afa..6c9d906faa276 100644 --- a/plugins/inputs/procstat/os_linux.go +++ b/plugins/inputs/procstat/os_linux.go @@ -17,6 +17,8 @@ import ( "github.com/shirou/gopsutil/v4/process" "github.com/vishvananda/netlink" "golang.org/x/sys/unix" + + "github.com/influxdata/telegraf/internal" ) func processName(p *process.Process) (string, error) { @@ -86,11 +88,7 @@ func findByWindowsServices(_ []string) ([]processGroup, error) { } func collectTotalReadWrite(proc Process) (r, w uint64, err error) { - path := procfs.DefaultMountPoint - if hp := os.Getenv("HOST_PROC"); hp != "" { - path = hp - } - + path := internal.GetProcPath() fs, err := procfs.NewFS(path) if err != nil { return 0, 0, err @@ -163,11 +161,7 @@ func socketTypeName(t uint8) string { } func mapFdToInode(pid int32, fd uint32) (uint32, error) { - root := os.Getenv("HOST_PROC") - if root == "" { - root = "/proc" - } - + root := internal.GetProcPath() fn := fmt.Sprintf("%s/%d/fd/%d", root, pid, fd) link, err := os.Readlink(fn) if err != nil { diff --git a/plugins/inputs/slab/slab.go b/plugins/inputs/slab/slab.go index a2fd19faf02e8..702e9c0e8a2bf 100644 --- a/plugins/inputs/slab/slab.go +++ b/plugins/inputs/slab/slab.go @@ -103,14 +103,6 @@ func (ss *SlabStats) runCmd(cmd string, args []string) ([]byte, error) { return out, nil } -func getHostProc() string { - procPath := "/proc" - if os.Getenv("HOST_PROC") != "" { - procPath = os.Getenv("HOST_PROC") - } - return procPath -} - func normalizeName(name string) string { return strings.ReplaceAll(strings.ToLower(name), "-", "_") + "_size" } @@ -118,7 +110,7 @@ func normalizeName(name string) string { func init() { inputs.Add("slab", func() telegraf.Input { return &SlabStats{ - statFile: path.Join(getHostProc(), "slabinfo"), + statFile: path.Join(internal.GetProcPath(), "slabinfo"), useSudo: true, } }) diff --git a/plugins/inputs/synproxy/synproxy.go b/plugins/inputs/synproxy/synproxy.go index c2b2ef49fa848..fa1d66ee3513a 100644 --- a/plugins/inputs/synproxy/synproxy.go +++ b/plugins/inputs/synproxy/synproxy.go @@ -3,10 +3,10 @@ package synproxy import ( _ "embed" - "os" "path" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -24,18 +24,10 @@ func (*Synproxy) SampleConfig() string { return sampleConfig } -func getHostProc() string { - procPath := "/proc" - if os.Getenv("HOST_PROC") != "" { - procPath = os.Getenv("HOST_PROC") - } - return procPath -} - func init() { inputs.Add("synproxy", func() telegraf.Input { return &Synproxy{ - statFile: path.Join(getHostProc(), "/net/stat/synproxy"), + statFile: path.Join(internal.GetProcPath(), "/net/stat/synproxy"), } }) } diff --git a/plugins/inputs/temp/temp_linux.go b/plugins/inputs/temp/temp_linux.go index ea373fc4849e3..819796ee9b4fd 100644 --- a/plugins/inputs/temp/temp_linux.go +++ b/plugins/inputs/temp/temp_linux.go @@ -11,6 +11,7 @@ import ( "strings" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" ) const scalingFactor = float64(1000.0) @@ -37,10 +38,7 @@ func (t *Temperature) Init() error { func (t *Temperature) Gather(acc telegraf.Accumulator) error { // Get all sensors and honor the HOST_SYS environment variable - path := os.Getenv("HOST_SYS") - if path == "" { - path = "/sys" - } + path := internal.GetSysPath() // Try to use the hwmon interface temperatures, err := t.gatherHwmon(path) diff --git a/plugins/inputs/wireless/wireless_linux.go b/plugins/inputs/wireless/wireless_linux.go index 11996961bf874..54da833d54423 100644 --- a/plugins/inputs/wireless/wireless_linux.go +++ b/plugins/inputs/wireless/wireless_linux.go @@ -10,15 +10,10 @@ import ( "strings" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" ) -// default host proc path -const defaultHostProc = "/proc" - -// env host proc variable name -const envProc = "HOST_PROC" - // length of wireless interface fields const interfaceFieldLength = 10 @@ -41,7 +36,9 @@ type wirelessInterface struct { // Gather collects the wireless information. func (w *Wireless) Gather(acc telegraf.Accumulator) error { // load proc path, get default value if config value and env variable are empty - w.loadPath() + if w.HostProc == "" { + w.HostProc = internal.GetProcPath() + } wirelessPath := path.Join(w.HostProc, "net", "wireless") table, err := os.ReadFile(wirelessPath) @@ -117,24 +114,6 @@ func (w *Wireless) loadWirelessTable(table []byte) ([]*wirelessInterface, error) return wi, nil } -// loadPath can be used to read path firstly from config -// if it is empty then try read from env variable -func (w *Wireless) loadPath() { - if w.HostProc == "" { - w.HostProc = proc(envProc, defaultHostProc) - } -} - -// proc can be used to read file paths from env -func proc(env, defaultPath string) string { - // try to read full file path - if p := os.Getenv(env); p != "" { - return p - } - // return default path - return defaultPath -} - func init() { inputs.Add("wireless", func() telegraf.Input { return &Wireless{}