diff --git a/plugins/inputs/radius/radius.go b/plugins/inputs/radius/radius.go index efb71a1df9f2e..396ba8adad1ee 100644 --- a/plugins/inputs/radius/radius.go +++ b/plugins/inputs/radius/radius.go @@ -18,6 +18,9 @@ import ( "github.com/influxdata/telegraf/plugins/inputs" ) +//go:embed sample.conf +var sampleConfig string + type Radius struct { Servers []string `toml:"servers"` Username config.Secret `toml:"username"` @@ -29,9 +32,6 @@ type Radius struct { client radius.Client } -//go:embed sample.conf -var sampleConfig string - func (*Radius) SampleConfig() string { return sampleConfig } diff --git a/plugins/inputs/raindrops/raindrops.go b/plugins/inputs/raindrops/raindrops.go index de2c5a82bc458..dad3752b628d9 100644 --- a/plugins/inputs/raindrops/raindrops.go +++ b/plugins/inputs/raindrops/raindrops.go @@ -21,7 +21,7 @@ import ( var sampleConfig string type Raindrops struct { - Urls []string + Urls []string `toml:"urls"` httpClient *http.Client } diff --git a/plugins/inputs/ras/ras.go b/plugins/inputs/ras/ras.go index 1cf8dda86bfb0..8e3537686ff3a 100644 --- a/plugins/inputs/ras/ras.go +++ b/plugins/inputs/ras/ras.go @@ -22,28 +22,6 @@ import ( //go:embed sample.conf var sampleConfig string -// Ras plugin gathers and counts errors provided by RASDaemon -type Ras struct { - DBPath string `toml:"db_path"` - - Log telegraf.Logger `toml:"-"` - - db *sql.DB - latestTimestamp time.Time - cpuSocketCounters map[int]metricCounters - serverCounters metricCounters -} - -type machineCheckError struct { - ID int - Timestamp string - SocketID int - ErrorMsg string - MciStatusMsg string -} - -type metricCounters map[string]int64 - const ( mceQuery = ` SELECT @@ -72,6 +50,27 @@ const ( unclassifiedMCEBase = "unclassified_mce_errors" ) +type Ras struct { + DBPath string `toml:"db_path"` + + Log telegraf.Logger `toml:"-"` + + db *sql.DB + latestTimestamp time.Time + cpuSocketCounters map[int]metricCounters + serverCounters metricCounters +} + +type machineCheckError struct { + id int + timestamp string + socketID int + errorMsg string + mciStatusMsg string +} + +type metricCounters map[string]int64 + func (*Ras) SampleConfig() string { return sampleConfig } @@ -91,16 +90,6 @@ func (r *Ras) Start(telegraf.Accumulator) error { return nil } -// Stop closes any existing DB connection -func (r *Ras) Stop() { - if r.db != nil { - err := r.db.Close() - if err != nil { - r.Log.Errorf("Error appeared during closing DB (%s): %v", r.DBPath, err) - } - } -} - // Gather reads the stats provided by RASDaemon and writes it to the Accumulator. func (r *Ras) Gather(acc telegraf.Accumulator) error { rows, err := r.db.Query(mceQuery, r.latestTimestamp) @@ -114,7 +103,7 @@ func (r *Ras) Gather(acc telegraf.Accumulator) error { if err != nil { return err } - tsErr := r.updateLatestTimestamp(mcError.Timestamp) + tsErr := r.updateLatestTimestamp(mcError.timestamp) if tsErr != nil { return err } @@ -127,6 +116,16 @@ func (r *Ras) Gather(acc telegraf.Accumulator) error { return nil } +// Stop closes any existing DB connection +func (r *Ras) Stop() { + if r.db != nil { + err := r.db.Close() + if err != nil { + r.Log.Errorf("Error appeared during closing DB (%s): %v", r.DBPath, err) + } + } +} + func (r *Ras) updateLatestTimestamp(timestamp string) error { ts, err := parseDate(timestamp) if err != nil { @@ -140,11 +139,11 @@ func (r *Ras) updateLatestTimestamp(timestamp string) error { } func (r *Ras) updateCounters(mcError *machineCheckError) { - if strings.Contains(mcError.ErrorMsg, "No Error") { + if strings.Contains(mcError.errorMsg, "No Error") { return } - r.initializeCPUMetricDataIfRequired(mcError.SocketID) + r.initializeCPUMetricDataIfRequired(mcError.socketID) r.updateSocketCounters(mcError) r.updateServerCounters(mcError) } @@ -170,11 +169,11 @@ func newMetricCounters() *metricCounters { } func (r *Ras) updateServerCounters(mcError *machineCheckError) { - if strings.Contains(mcError.ErrorMsg, "CACHE Level-2") && strings.Contains(mcError.ErrorMsg, "Error") { + if strings.Contains(mcError.errorMsg, "CACHE Level-2") && strings.Contains(mcError.errorMsg, "Error") { r.serverCounters[levelTwoCache]++ } - if strings.Contains(mcError.ErrorMsg, "UPI:") { + if strings.Contains(mcError.errorMsg, "UPI:") { r.serverCounters[upi]++ } } @@ -210,71 +209,71 @@ func (r *Ras) updateSocketCounters(mcError *machineCheckError) { r.updateMemoryCounters(mcError) r.updateProcessorBaseCounters(mcError) - if strings.Contains(mcError.ErrorMsg, "Instruction TLB") && strings.Contains(mcError.ErrorMsg, "Error") { - r.cpuSocketCounters[mcError.SocketID][instructionTLB]++ + if strings.Contains(mcError.errorMsg, "Instruction TLB") && strings.Contains(mcError.errorMsg, "Error") { + r.cpuSocketCounters[mcError.socketID][instructionTLB]++ } - if strings.Contains(mcError.ErrorMsg, "BUS") && strings.Contains(mcError.ErrorMsg, "Error") { - r.cpuSocketCounters[mcError.SocketID][processorBus]++ + if strings.Contains(mcError.errorMsg, "BUS") && strings.Contains(mcError.errorMsg, "Error") { + r.cpuSocketCounters[mcError.socketID][processorBus]++ } - if (strings.Contains(mcError.ErrorMsg, "CACHE Level-0") || - strings.Contains(mcError.ErrorMsg, "CACHE Level-1")) && - strings.Contains(mcError.ErrorMsg, "Error") { - r.cpuSocketCounters[mcError.SocketID][instructionCache]++ + if (strings.Contains(mcError.errorMsg, "CACHE Level-0") || + strings.Contains(mcError.errorMsg, "CACHE Level-1")) && + strings.Contains(mcError.errorMsg, "Error") { + r.cpuSocketCounters[mcError.socketID][instructionCache]++ } } func (r *Ras) updateProcessorBaseCounters(mcError *machineCheckError) { - if strings.Contains(mcError.ErrorMsg, "Internal Timer error") { - r.cpuSocketCounters[mcError.SocketID][internalTimer]++ - r.cpuSocketCounters[mcError.SocketID][processorBase]++ + if strings.Contains(mcError.errorMsg, "Internal Timer error") { + r.cpuSocketCounters[mcError.socketID][internalTimer]++ + r.cpuSocketCounters[mcError.socketID][processorBase]++ } - if strings.Contains(mcError.ErrorMsg, "SMM Handler Code Access Violation") { - r.cpuSocketCounters[mcError.SocketID][smmHandlerCode]++ - r.cpuSocketCounters[mcError.SocketID][processorBase]++ + if strings.Contains(mcError.errorMsg, "SMM Handler Code Access Violation") { + r.cpuSocketCounters[mcError.socketID][smmHandlerCode]++ + r.cpuSocketCounters[mcError.socketID][processorBase]++ } - if strings.Contains(mcError.ErrorMsg, "Internal parity error") { - r.cpuSocketCounters[mcError.SocketID][internalParity]++ - r.cpuSocketCounters[mcError.SocketID][processorBase]++ + if strings.Contains(mcError.errorMsg, "Internal parity error") { + r.cpuSocketCounters[mcError.socketID][internalParity]++ + r.cpuSocketCounters[mcError.socketID][processorBase]++ } - if strings.Contains(mcError.ErrorMsg, "FRC error") { - r.cpuSocketCounters[mcError.SocketID][frc]++ - r.cpuSocketCounters[mcError.SocketID][processorBase]++ + if strings.Contains(mcError.errorMsg, "FRC error") { + r.cpuSocketCounters[mcError.socketID][frc]++ + r.cpuSocketCounters[mcError.socketID][processorBase]++ } - if strings.Contains(mcError.ErrorMsg, "External error") { - r.cpuSocketCounters[mcError.SocketID][externalMCEBase]++ - r.cpuSocketCounters[mcError.SocketID][processorBase]++ + if strings.Contains(mcError.errorMsg, "External error") { + r.cpuSocketCounters[mcError.socketID][externalMCEBase]++ + r.cpuSocketCounters[mcError.socketID][processorBase]++ } - if strings.Contains(mcError.ErrorMsg, "Microcode ROM parity error") { - r.cpuSocketCounters[mcError.SocketID][microcodeROMParity]++ - r.cpuSocketCounters[mcError.SocketID][processorBase]++ + if strings.Contains(mcError.errorMsg, "Microcode ROM parity error") { + r.cpuSocketCounters[mcError.socketID][microcodeROMParity]++ + r.cpuSocketCounters[mcError.socketID][processorBase]++ } - if strings.Contains(mcError.ErrorMsg, "Unclassified") || strings.Contains(mcError.ErrorMsg, "Internal unclassified") { - r.cpuSocketCounters[mcError.SocketID][unclassifiedMCEBase]++ - r.cpuSocketCounters[mcError.SocketID][processorBase]++ + if strings.Contains(mcError.errorMsg, "Unclassified") || strings.Contains(mcError.errorMsg, "Internal unclassified") { + r.cpuSocketCounters[mcError.socketID][unclassifiedMCEBase]++ + r.cpuSocketCounters[mcError.socketID][processorBase]++ } } func (r *Ras) updateMemoryCounters(mcError *machineCheckError) { - if strings.Contains(mcError.ErrorMsg, "Memory read error") { - if strings.Contains(mcError.MciStatusMsg, "Corrected_error") { - r.cpuSocketCounters[mcError.SocketID][memoryReadCorrected]++ + if strings.Contains(mcError.errorMsg, "Memory read error") { + if strings.Contains(mcError.mciStatusMsg, "Corrected_error") { + r.cpuSocketCounters[mcError.socketID][memoryReadCorrected]++ } else { - r.cpuSocketCounters[mcError.SocketID][memoryReadUncorrected]++ + r.cpuSocketCounters[mcError.socketID][memoryReadUncorrected]++ } } - if strings.Contains(mcError.ErrorMsg, "Memory write error") { - if strings.Contains(mcError.MciStatusMsg, "Corrected_error") { - r.cpuSocketCounters[mcError.SocketID][memoryWriteCorrected]++ + if strings.Contains(mcError.errorMsg, "Memory write error") { + if strings.Contains(mcError.mciStatusMsg, "Corrected_error") { + r.cpuSocketCounters[mcError.socketID][memoryWriteCorrected]++ } else { - r.cpuSocketCounters[mcError.SocketID][memoryWriteUncorrected]++ + r.cpuSocketCounters[mcError.socketID][memoryWriteUncorrected]++ } } } @@ -305,7 +304,7 @@ func addServerMetrics(acc telegraf.Accumulator, counters map[string]int64) { func fetchMachineCheckError(rows *sql.Rows) (*machineCheckError, error) { mcError := &machineCheckError{} - err := rows.Scan(&mcError.ID, &mcError.Timestamp, &mcError.ErrorMsg, &mcError.MciStatusMsg, &mcError.SocketID) + err := rows.Scan(&mcError.id, &mcError.timestamp, &mcError.errorMsg, &mcError.mciStatusMsg, &mcError.socketID) if err != nil { return nil, err diff --git a/plugins/inputs/ras/ras_notlinux.go b/plugins/inputs/ras/ras_notlinux.go index 4c3dec4113829..a73de06ca9faf 100644 --- a/plugins/inputs/ras/ras_notlinux.go +++ b/plugins/inputs/ras/ras_notlinux.go @@ -17,14 +17,18 @@ type Ras struct { Log telegraf.Logger `toml:"-"` } +func (*Ras) SampleConfig() string { return sampleConfig } + func (r *Ras) Init() error { - r.Log.Warn("current platform is not supported") + r.Log.Warn("Current platform is not supported") return nil } -func (*Ras) SampleConfig() string { return sampleConfig } -func (*Ras) Gather(_ telegraf.Accumulator) error { return nil } -func (*Ras) Start(_ telegraf.Accumulator) error { return nil } -func (*Ras) Stop() {} + +func (*Ras) Start(telegraf.Accumulator) error { return nil } + +func (*Ras) Gather(telegraf.Accumulator) error { return nil } + +func (*Ras) Stop() {} func init() { inputs.Add("ras", func() telegraf.Input { diff --git a/plugins/inputs/ras/ras_test.go b/plugins/inputs/ras/ras_test.go index 98c24e733e86e..2c6fcc5882c3f 100644 --- a/plugins/inputs/ras/ras_test.go +++ b/plugins/inputs/ras/ras_test.go @@ -38,26 +38,26 @@ func TestUpdateLatestTimestamp(t *testing.T) { ts := "2020-08-01 15:13:27 +0200" testData = append(testData, []machineCheckError{ { - Timestamp: "2019-05-20 08:25:55 +0200", - SocketID: 0, - ErrorMsg: "", - MciStatusMsg: "", + timestamp: "2019-05-20 08:25:55 +0200", + socketID: 0, + errorMsg: "", + mciStatusMsg: "", }, { - Timestamp: "2018-02-21 12:27:22 +0200", - SocketID: 0, - ErrorMsg: "", - MciStatusMsg: "", + timestamp: "2018-02-21 12:27:22 +0200", + socketID: 0, + errorMsg: "", + mciStatusMsg: "", }, { - Timestamp: ts, - SocketID: 0, - ErrorMsg: "", - MciStatusMsg: "", + timestamp: ts, + socketID: 0, + errorMsg: "", + mciStatusMsg: "", }, }...) for _, mce := range testData { - err := ras.updateLatestTimestamp(mce.Timestamp) + err := ras.updateLatestTimestamp(mce.timestamp) require.NoError(t, err) } require.Equal(t, ts, ras.latestTimestamp.Format(dateLayout)) @@ -69,28 +69,28 @@ func TestMultipleSockets(t *testing.T) { overflow := "Error_overflow Corrected_error" testData = []machineCheckError{ { - Timestamp: "2019-05-20 08:25:55 +0200", - SocketID: 0, - ErrorMsg: cacheL2, - MciStatusMsg: overflow, + timestamp: "2019-05-20 08:25:55 +0200", + socketID: 0, + errorMsg: cacheL2, + mciStatusMsg: overflow, }, { - Timestamp: "2018-02-21 12:27:22 +0200", - SocketID: 1, - ErrorMsg: cacheL2, - MciStatusMsg: overflow, + timestamp: "2018-02-21 12:27:22 +0200", + socketID: 1, + errorMsg: cacheL2, + mciStatusMsg: overflow, }, { - Timestamp: "2020-03-21 14:17:28 +0200", - SocketID: 2, - ErrorMsg: cacheL2, - MciStatusMsg: overflow, + timestamp: "2020-03-21 14:17:28 +0200", + socketID: 2, + errorMsg: cacheL2, + mciStatusMsg: overflow, }, { - Timestamp: "2020-03-21 17:24:18 +0200", - SocketID: 3, - ErrorMsg: cacheL2, - MciStatusMsg: overflow, + timestamp: "2020-03-21 17:24:18 +0200", + socketID: 3, + errorMsg: cacheL2, + mciStatusMsg: overflow, }, } for i := range testData { @@ -150,105 +150,105 @@ func newRas() *Ras { var testData = []machineCheckError{ { - Timestamp: "2020-05-20 07:34:53 +0200", - SocketID: 0, - ErrorMsg: "MEMORY CONTROLLER RD_CHANNEL0_ERR Transaction: Memory read error", - MciStatusMsg: "Error_overflow Corrected_error", + timestamp: "2020-05-20 07:34:53 +0200", + socketID: 0, + errorMsg: "MEMORY CONTROLLER RD_CHANNEL0_ERR Transaction: Memory read error", + mciStatusMsg: "Error_overflow Corrected_error", }, { - Timestamp: "2020-05-20 07:35:11 +0200", - SocketID: 0, - ErrorMsg: "MEMORY CONTROLLER RD_CHANNEL0_ERR Transaction: Memory read error", - MciStatusMsg: "Uncorrected_error", + timestamp: "2020-05-20 07:35:11 +0200", + socketID: 0, + errorMsg: "MEMORY CONTROLLER RD_CHANNEL0_ERR Transaction: Memory read error", + mciStatusMsg: "Uncorrected_error", }, { - Timestamp: "2020-05-20 07:37:50 +0200", - SocketID: 0, - ErrorMsg: "MEMORY CONTROLLER RD_CHANNEL2_ERR Transaction: Memory write error", - MciStatusMsg: "Uncorrected_error", + timestamp: "2020-05-20 07:37:50 +0200", + socketID: 0, + errorMsg: "MEMORY CONTROLLER RD_CHANNEL2_ERR Transaction: Memory write error", + mciStatusMsg: "Uncorrected_error", }, { - Timestamp: "2020-05-20 08:14:51 +0200", - SocketID: 0, - ErrorMsg: "MEMORY CONTROLLER WR_CHANNEL2_ERR Transaction: Memory write error", - MciStatusMsg: "Error_overflow Corrected_error", + timestamp: "2020-05-20 08:14:51 +0200", + socketID: 0, + errorMsg: "MEMORY CONTROLLER WR_CHANNEL2_ERR Transaction: Memory write error", + mciStatusMsg: "Error_overflow Corrected_error", }, { - Timestamp: "2020-05-20 08:15:31 +0200", - SocketID: 0, - ErrorMsg: "corrected filtering (some unreported errors in same region) Instruction CACHE Level-0 Read Error", - MciStatusMsg: "Error_overflow Corrected_error", + timestamp: "2020-05-20 08:15:31 +0200", + socketID: 0, + errorMsg: "corrected filtering (some unreported errors in same region) Instruction CACHE Level-0 Read Error", + mciStatusMsg: "Error_overflow Corrected_error", }, { - Timestamp: "2020-05-20 08:16:32 +0200", - SocketID: 0, - ErrorMsg: "Instruction TLB Level-0 Error", - MciStatusMsg: "Error_overflow Corrected_error", + timestamp: "2020-05-20 08:16:32 +0200", + socketID: 0, + errorMsg: "Instruction TLB Level-0 Error", + mciStatusMsg: "Error_overflow Corrected_error", }, { - Timestamp: "2020-05-20 08:16:56 +0200", - SocketID: 0, - ErrorMsg: "No Error", - MciStatusMsg: "Error_overflow Corrected_error", + timestamp: "2020-05-20 08:16:56 +0200", + socketID: 0, + errorMsg: "No Error", + mciStatusMsg: "Error_overflow Corrected_error", }, { - Timestamp: "2020-05-20 08:17:24 +0200", - SocketID: 0, - ErrorMsg: "Unclassified", - MciStatusMsg: "Error_overflow Corrected_error", + timestamp: "2020-05-20 08:17:24 +0200", + socketID: 0, + errorMsg: "Unclassified", + mciStatusMsg: "Error_overflow Corrected_error", }, { - Timestamp: "2020-05-20 08:17:41 +0200", - SocketID: 0, - ErrorMsg: "Microcode ROM parity error", - MciStatusMsg: "Error_overflow Corrected_error", + timestamp: "2020-05-20 08:17:41 +0200", + socketID: 0, + errorMsg: "Microcode ROM parity error", + mciStatusMsg: "Error_overflow Corrected_error", }, { - Timestamp: "2020-05-20 08:17:48 +0200", - SocketID: 0, - ErrorMsg: "FRC error", - MciStatusMsg: "Error_overflow Corrected_error", + timestamp: "2020-05-20 08:17:48 +0200", + socketID: 0, + errorMsg: "FRC error", + mciStatusMsg: "Error_overflow Corrected_error", }, { - Timestamp: "2020-05-20 08:18:18 +0200", - SocketID: 0, - ErrorMsg: "Internal parity error", - MciStatusMsg: "Error_overflow Corrected_error", + timestamp: "2020-05-20 08:18:18 +0200", + socketID: 0, + errorMsg: "Internal parity error", + mciStatusMsg: "Error_overflow Corrected_error", }, { - Timestamp: "2020-05-20 08:18:34 +0200", - SocketID: 0, - ErrorMsg: "SMM Handler Code Access Violation", - MciStatusMsg: "Error_overflow Corrected_error", + timestamp: "2020-05-20 08:18:34 +0200", + socketID: 0, + errorMsg: "SMM Handler Code Access Violation", + mciStatusMsg: "Error_overflow Corrected_error", }, { - Timestamp: "2020-05-20 08:18:54 +0200", - SocketID: 0, - ErrorMsg: "Internal Timer error", - MciStatusMsg: "Error_overflow Corrected_error", + timestamp: "2020-05-20 08:18:54 +0200", + socketID: 0, + errorMsg: "Internal Timer error", + mciStatusMsg: "Error_overflow Corrected_error", }, { - Timestamp: "2020-05-20 08:21:23 +0200", - SocketID: 0, - ErrorMsg: "BUS Level-3 Generic Generic IO Request-did-not-timeout Error", - MciStatusMsg: "Error_overflow Corrected_error", + timestamp: "2020-05-20 08:21:23 +0200", + socketID: 0, + errorMsg: "BUS Level-3 Generic Generic IO Request-did-not-timeout Error", + mciStatusMsg: "Error_overflow Corrected_error", }, { - Timestamp: "2020-05-20 08:23:23 +0200", - SocketID: 0, - ErrorMsg: "External error", - MciStatusMsg: "Error_overflow Corrected_error", + timestamp: "2020-05-20 08:23:23 +0200", + socketID: 0, + errorMsg: "External error", + mciStatusMsg: "Error_overflow Corrected_error", }, { - Timestamp: "2020-05-20 08:25:31 +0200", - SocketID: 0, - ErrorMsg: "UPI: COR LL Rx detected CRC error - successful LLR without Phy Reinit", - MciStatusMsg: "Error_overflow Corrected_error", + timestamp: "2020-05-20 08:25:31 +0200", + socketID: 0, + errorMsg: "UPI: COR LL Rx detected CRC error - successful LLR without Phy Reinit", + mciStatusMsg: "Error_overflow Corrected_error", }, { - Timestamp: "2020-05-20 08:25:55 +0200", - SocketID: 0, - ErrorMsg: "Instruction CACHE Level-2 Generic Error", - MciStatusMsg: "Error_overflow Corrected_error", + timestamp: "2020-05-20 08:25:55 +0200", + socketID: 0, + errorMsg: "Instruction CACHE Level-2 Generic Error", + mciStatusMsg: "Error_overflow Corrected_error", }, } diff --git a/plugins/inputs/ravendb/ravendb.go b/plugins/inputs/ravendb/ravendb.go index b540dfb2aa3aa..f19fa5ec7e90e 100644 --- a/plugins/inputs/ravendb/ravendb.go +++ b/plugins/inputs/ravendb/ravendb.go @@ -21,14 +21,11 @@ import ( //go:embed sample.conf var sampleConfig string -// defaultURL will set a default value that corresponds to the default value -// used by RavenDB -const defaultURL = "http://localhost:8080" - -const defaultTimeout = 5 +const ( + defaultURL = "http://localhost:8080" + defaultTimeout = 5 +) -// RavenDB defines the configuration necessary for gathering metrics, -// see the sample config for further details type RavenDB struct { URL string `toml:"url"` Name string `toml:"name"` @@ -55,6 +52,30 @@ func (*RavenDB) SampleConfig() string { return sampleConfig } +func (r *RavenDB) Init() error { + if r.URL == "" { + r.URL = defaultURL + } + + r.requestURLServer = r.URL + "/admin/monitoring/v1/server" + r.requestURLDatabases = r.URL + "/admin/monitoring/v1/databases" + prepareDBNamesURLPart(r.DbStatsDbs) + r.requestURLIndexes = r.URL + "/admin/monitoring/v1/indexes" + prepareDBNamesURLPart(r.IndexStatsDbs) + r.requestURLCollection = r.URL + "/admin/monitoring/v1/collections" + prepareDBNamesURLPart(r.IndexStatsDbs) + + err := choice.CheckSlice(r.StatsInclude, []string{"server", "databases", "indexes", "collections"}) + if err != nil { + return err + } + + err = r.ensureClient() + if nil != err { + r.Log.Errorf("Error with Client %s", err) + return err + } + + return nil +} + func (r *RavenDB) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup @@ -363,30 +384,6 @@ func prepareDBNamesURLPart(dbNames []string) string { return result } -func (r *RavenDB) Init() error { - if r.URL == "" { - r.URL = defaultURL - } - - r.requestURLServer = r.URL + "/admin/monitoring/v1/server" - r.requestURLDatabases = r.URL + "/admin/monitoring/v1/databases" + prepareDBNamesURLPart(r.DbStatsDbs) - r.requestURLIndexes = r.URL + "/admin/monitoring/v1/indexes" + prepareDBNamesURLPart(r.IndexStatsDbs) - r.requestURLCollection = r.URL + "/admin/monitoring/v1/collections" + prepareDBNamesURLPart(r.IndexStatsDbs) - - err := choice.CheckSlice(r.StatsInclude, []string{"server", "databases", "indexes", "collections"}) - if err != nil { - return err - } - - err = r.ensureClient() - if nil != err { - r.Log.Errorf("Error with Client %s", err) - return err - } - - return nil -} - func init() { inputs.Add("ravendb", func() telegraf.Input { return &RavenDB{ diff --git a/plugins/inputs/redfish/redfish.go b/plugins/inputs/redfish/redfish.go index 5db51bfcafe26..7afa853b9bd7a 100644 --- a/plugins/inputs/redfish/redfish.go +++ b/plugins/inputs/redfish/redfish.go @@ -24,6 +24,12 @@ import ( //go:embed sample.conf var sampleConfig string +const ( + // tag sets used for including redfish OData link parent data + tagSetChassisLocation = "chassis.location" + tagSetChassis = "chassis" +) + type Redfish struct { Address string `toml:"address"` Username config.Secret `toml:"username"` @@ -40,12 +46,6 @@ type Redfish struct { baseURL *url.URL } -const ( - // tag sets used for including redfish OData link parent data - tagSetChassisLocation = "chassis.location" - tagSetChassis = "chassis" -) - type system struct { Hostname string `json:"hostname"` Links struct { @@ -215,6 +215,41 @@ func (r *Redfish) Init() error { return nil } +func (r *Redfish) Gather(acc telegraf.Accumulator) error { + address, _, err := net.SplitHostPort(r.baseURL.Host) + if err != nil { + address = r.baseURL.Host + } + + system, err := r.getComputerSystem(r.ComputerSystemID) + if err != nil { + return err + } + + for _, link := range system.Links.Chassis { + chassis, err := r.getChassis(link.Ref) + if err != nil { + return err + } + + for _, metric := range r.IncludeMetrics { + var err error + switch metric { + case "thermal": + err = r.gatherThermal(acc, address, system, chassis) + case "power": + err = r.gatherPower(acc, address, system, chassis) + default: + return fmt.Errorf("unknown metric requested: %s", metric) + } + if err != nil { + return err + } + } + } + return nil +} + func (r *Redfish) getData(address string, payload interface{}) error { req, err := http.NewRequest("GET", address, nil) if err != nil { @@ -323,41 +358,6 @@ func setChassisTags(chassis *chassis, tags map[string]string) { tags["chassis_health"] = chassis.Status.Health } -func (r *Redfish) Gather(acc telegraf.Accumulator) error { - address, _, err := net.SplitHostPort(r.baseURL.Host) - if err != nil { - address = r.baseURL.Host - } - - system, err := r.getComputerSystem(r.ComputerSystemID) - if err != nil { - return err - } - - for _, link := range system.Links.Chassis { - chassis, err := r.getChassis(link.Ref) - if err != nil { - return err - } - - for _, metric := range r.IncludeMetrics { - var err error - switch metric { - case "thermal": - err = r.gatherThermal(acc, address, system, chassis) - case "power": - err = r.gatherPower(acc, address, system, chassis) - default: - return fmt.Errorf("unknown metric requested: %s", metric) - } - if err != nil { - return err - } - } - } - return nil -} - func (r *Redfish) gatherThermal(acc telegraf.Accumulator, address string, system *system, chassis *chassis) error { thermal, err := r.getThermal(chassis.Thermal.Ref) if err != nil { diff --git a/plugins/inputs/redis/redis.go b/plugins/inputs/redis/redis.go index e0a19f3f18760..8687cc409fde9 100644 --- a/plugins/inputs/redis/redis.go +++ b/plugins/inputs/redis/redis.go @@ -25,14 +25,17 @@ import ( //go:embed sample.conf var sampleConfig string -type RedisCommand struct { - Command []interface{} `toml:"command"` - Field string `toml:"field"` - Type string `toml:"type"` -} +var ( + replicationSlaveMetricPrefix = regexp.MustCompile(`^slave\d+`) + tracking = map[string]string{ + "uptime_in_seconds": "uptime", + "connected_clients": "clients", + "role": "replication_role", + } +) type Redis struct { - Commands []*RedisCommand `toml:"commands"` + Commands []*redisCommand `toml:"commands"` Servers []string `toml:"servers"` Username string `toml:"username"` Password string `toml:"password"` @@ -41,24 +44,23 @@ type Redis struct { Log telegraf.Logger `toml:"-"` - clients []Client + clients []client connected bool } -type Client interface { - Do(returnType string, args ...interface{}) (interface{}, error) - Info() *redis.StringCmd - BaseTags() map[string]string - Close() error +type redisCommand struct { + Command []interface{} `toml:"command"` + Field string `toml:"field"` + Type string `toml:"type"` } -type RedisClient struct { +type redisClient struct { client *redis.Client tags map[string]string } -// RedisFieldTypes defines the types expected for each of the fields redis reports on -type RedisFieldTypes struct { +// redisFieldTypes defines the types expected for each of the fields redis reports on +type redisFieldTypes struct { ActiveDefragHits int64 `json:"active_defrag_hits"` ActiveDefragKeyHits int64 `json:"active_defrag_key_hits"` ActiveDefragKeyMisses int64 `json:"active_defrag_key_misses"` @@ -168,57 +170,62 @@ type RedisFieldTypes struct { UsedMemoryStartup int64 `json:"used_memory_startup"` } -func (r *RedisClient) Do(returnType string, args ...interface{}) (interface{}, error) { - rawVal := r.client.Do(context.Background(), args...) - - switch returnType { - case "integer": - return rawVal.Int64() - case "string": - return rawVal.Text() - case "float": - return rawVal.Float64() - default: - return rawVal.Text() - } +type client interface { + do(returnType string, args ...interface{}) (interface{}, error) + info() *redis.StringCmd + baseTags() map[string]string + close() error } -func (r *RedisClient) Info() *redis.StringCmd { - return r.client.Info(context.Background(), "ALL") +func (*Redis) SampleConfig() string { + return sampleConfig } -func (r *RedisClient) BaseTags() map[string]string { - tags := make(map[string]string) - for k, v := range r.tags { - tags[k] = v +func (r *Redis) Init() error { + for _, command := range r.Commands { + if command.Type != "string" && command.Type != "integer" && command.Type != "float" { + return fmt.Errorf(`unknown result type: expected one of "string", "integer", "float"; got %q`, command.Type) + } } - return tags + + return nil } -func (r *RedisClient) Close() error { - return r.client.Close() +func (*Redis) Start(telegraf.Accumulator) error { + return nil } -var replicationSlaveMetricPrefix = regexp.MustCompile(`^slave\d+`) +func (r *Redis) Gather(acc telegraf.Accumulator) error { + if !r.connected { + err := r.connect() + if err != nil { + return err + } + } -var Tracking = map[string]string{ - "uptime_in_seconds": "uptime", - "connected_clients": "clients", - "role": "replication_role", -} + var wg sync.WaitGroup -func (*Redis) SampleConfig() string { - return sampleConfig + for _, cl := range r.clients { + wg.Add(1) + go func(client client) { + defer wg.Done() + acc.AddError(gatherServer(client, acc)) + acc.AddError(r.gatherCommandValues(client, acc)) + }(cl) + } + + wg.Wait() + return nil } -func (r *Redis) Init() error { - for _, command := range r.Commands { - if command.Type != "string" && command.Type != "integer" && command.Type != "float" { - return fmt.Errorf(`unknown result type: expected one of "string", "integer", "float"; got %q`, command.Type) +// Stop close the client through ServiceInput interface Start/Stop methods impl. +func (r *Redis) Stop() { + for _, c := range r.clients { + err := c.close() + if err != nil { + r.Log.Errorf("error closing client: %v", err) } } - - return nil } func (r *Redis) connect() error { @@ -230,7 +237,7 @@ func (r *Redis) connect() error { r.Servers = []string{"tcp://localhost:6379"} } - r.clients = make([]Client, 0, len(r.Servers)) + r.clients = make([]client, 0, len(r.Servers)) for _, serv := range r.Servers { if !strings.HasPrefix(serv, "tcp://") && !strings.HasPrefix(serv, "unix://") { r.Log.Warn("Server URL found without scheme; please update your configuration file") @@ -289,7 +296,7 @@ func (r *Redis) connect() error { tags["port"] = u.Port() } - r.clients = append(r.clients, &RedisClient{ + r.clients = append(r.clients, &redisClient{ client: client, tags: tags, }) @@ -299,35 +306,10 @@ func (r *Redis) connect() error { return nil } -// Reads stats from all configured servers accumulates stats. -// Returns one of the errors encountered while gather stats (if any). -func (r *Redis) Gather(acc telegraf.Accumulator) error { - if !r.connected { - err := r.connect() - if err != nil { - return err - } - } - - var wg sync.WaitGroup - - for _, client := range r.clients { - wg.Add(1) - go func(client Client) { - defer wg.Done() - acc.AddError(gatherServer(client, acc)) - acc.AddError(r.gatherCommandValues(client, acc)) - }(client) - } - - wg.Wait() - return nil -} - -func (r *Redis) gatherCommandValues(client Client, acc telegraf.Accumulator) error { +func (r *Redis) gatherCommandValues(client client, acc telegraf.Accumulator) error { fields := make(map[string]interface{}) for _, command := range r.Commands { - val, err := client.Do(command.Type, command.Command...) + val, err := client.do(command.Type, command.Command...) if err != nil { if strings.Contains(err.Error(), "unexpected type=") { return fmt.Errorf("could not get command result: %w", err) @@ -339,27 +321,53 @@ func (r *Redis) gatherCommandValues(client Client, acc telegraf.Accumulator) err fields[command.Field] = val } - acc.AddFields("redis_commands", fields, client.BaseTags()) + acc.AddFields("redis_commands", fields, client.baseTags()) return nil } -func gatherServer(client Client, acc telegraf.Accumulator) error { - info, err := client.Info().Result() +func (r *redisClient) do(returnType string, args ...interface{}) (interface{}, error) { + rawVal := r.client.Do(context.Background(), args...) + + switch returnType { + case "integer": + return rawVal.Int64() + case "string": + return rawVal.Text() + case "float": + return rawVal.Float64() + default: + return rawVal.Text() + } +} + +func (r *redisClient) info() *redis.StringCmd { + return r.client.Info(context.Background(), "ALL") +} + +func (r *redisClient) baseTags() map[string]string { + tags := make(map[string]string) + for k, v := range r.tags { + tags[k] = v + } + return tags +} + +func (r *redisClient) close() error { + return r.client.Close() +} + +func gatherServer(client client, acc telegraf.Accumulator) error { + info, err := client.info().Result() if err != nil { return err } rdr := strings.NewReader(info) - return gatherInfoOutput(rdr, acc, client.BaseTags()) + return gatherInfoOutput(rdr, acc, client.baseTags()) } -// gatherInfoOutput gathers -func gatherInfoOutput( - rdr io.Reader, - acc telegraf.Accumulator, - tags map[string]string, -) error { +func gatherInfoOutput(rdr io.Reader, acc telegraf.Accumulator, tags map[string]string) error { var section string var keyspaceHits, keyspaceMisses int64 @@ -403,7 +411,7 @@ func gatherInfoOutput( continue } - metric, ok := Tracking[name] + metric, ok := tracking[name] if !ok { if section == "Keyspace" { kline := strings.TrimSpace(parts[1]) @@ -412,12 +420,12 @@ func gatherInfoOutput( } if section == "Commandstats" { kline := strings.TrimSpace(parts[1]) - gatherCommandstateLine(name, kline, acc, tags) + gatherCommandStateLine(name, kline, acc, tags) continue } if section == "Latencystats" { kline := strings.TrimSpace(parts[1]) - gatherLatencystatsLine(name, kline, acc, tags) + gatherLatencyStatsLine(name, kline, acc, tags) continue } if section == "Replication" && replicationSlaveMetricPrefix.MatchString(name) { @@ -427,7 +435,7 @@ func gatherInfoOutput( } if section == "Errorstats" { kline := strings.TrimSpace(parts[1]) - gatherErrorstatsLine(name, kline, acc, tags) + gatherErrorStatsLine(name, kline, acc, tags) continue } @@ -475,7 +483,7 @@ func gatherInfoOutput( } fields["keyspace_hitrate"] = keyspaceHitrate - o := RedisFieldTypes{} + o := redisFieldTypes{} setStructFieldsFromObject(fields, &o) setExistingFieldsFromStruct(fields, &o) @@ -516,7 +524,7 @@ func gatherKeyspaceLine(name, line string, acc telegraf.Accumulator, globalTags // cmdstat_publish:calls=33791,usec=208789,usec_per_call=6.18 // // Tag: command=publish; Fields: calls=33791i,usec=208789i,usec_per_call=6.18 -func gatherCommandstateLine(name, line string, acc telegraf.Accumulator, globalTags map[string]string) { +func gatherCommandStateLine(name, line string, acc telegraf.Accumulator, globalTags map[string]string) { if !strings.HasPrefix(name, "cmdstat") { return } @@ -558,7 +566,7 @@ func gatherCommandstateLine(name, line string, acc telegraf.Accumulator, globalT // latency_percentiles_usec_zadd:p50=9.023,p99=28.031,p99.9=43.007 // // Tag: command=zadd; Fields: p50=9.023,p99=28.031,p99.9=43.007 -func gatherLatencystatsLine(name, line string, acc telegraf.Accumulator, globalTags map[string]string) { +func gatherLatencyStatsLine(name, line string, acc telegraf.Accumulator, globalTags map[string]string) { if !strings.HasPrefix(name, "latency_percentiles_usec") { return } @@ -633,7 +641,7 @@ func gatherReplicationLine(name, line string, acc telegraf.Accumulator, globalTa // // errorstat_ERR:count=37 // errorstat_MOVED:count=3626 -func gatherErrorstatsLine(name, line string, acc telegraf.Accumulator, globalTags map[string]string) { +func gatherErrorStatsLine(name, line string, acc telegraf.Accumulator, globalTags map[string]string) { tags := make(map[string]string, len(globalTags)+1) for k, v := range globalTags { tags[k] = v @@ -654,13 +662,7 @@ func gatherErrorstatsLine(name, line string, acc telegraf.Accumulator, globalTag acc.AddFields("redis_errorstat", fields, tags) } -func init() { - inputs.Add("redis", func() telegraf.Input { - return &Redis{} - }) -} - -func setExistingFieldsFromStruct(fields map[string]interface{}, o *RedisFieldTypes) { +func setExistingFieldsFromStruct(fields map[string]interface{}, o *redisFieldTypes) { val := reflect.ValueOf(o).Elem() typ := val.Type() @@ -678,7 +680,7 @@ func setExistingFieldsFromStruct(fields map[string]interface{}, o *RedisFieldTyp } } -func setStructFieldsFromObject(fields map[string]interface{}, o *RedisFieldTypes) { +func setStructFieldsFromObject(fields map[string]interface{}, o *redisFieldTypes) { val := reflect.ValueOf(o).Elem() typ := val.Type() @@ -774,16 +776,8 @@ func coerceType(value interface{}, typ reflect.Type) reflect.Value { return reflect.ValueOf(value) } -func (*Redis) Start(telegraf.Accumulator) error { - return nil -} - -// Stop close the client through ServiceInput interface Start/Stop methods impl. -func (r *Redis) Stop() { - for _, c := range r.clients { - err := c.Close() - if err != nil { - r.Log.Errorf("error closing client: %v", err) - } - } +func init() { + inputs.Add("redis", func() telegraf.Input { + return &Redis{} + }) } diff --git a/plugins/inputs/redis/redis_test.go b/plugins/inputs/redis/redis_test.go index f8f0d5b540f4d..b103c34a4c286 100644 --- a/plugins/inputs/redis/redis_test.go +++ b/plugins/inputs/redis/redis_test.go @@ -2,6 +2,7 @@ package redis import ( "bufio" + "fmt" "strings" "testing" @@ -17,19 +18,19 @@ import ( type testClient struct{} -func (*testClient) BaseTags() map[string]string { +func (*testClient) baseTags() map[string]string { return map[string]string{"host": "redis.net"} } -func (*testClient) Info() *redis.StringCmd { +func (*testClient) info() *redis.StringCmd { return nil } -func (*testClient) Do(string, ...interface{}) (interface{}, error) { +func (*testClient) do(string, ...interface{}) (interface{}, error) { return 2, nil } -func (*testClient) Close() error { +func (*testClient) close() error { return nil } @@ -67,15 +68,15 @@ func TestRedis_Commands(t *testing.T) { tc := &testClient{} - rc := &RedisCommand{ + rc := &redisCommand{ Command: []interface{}{"llen", "test-list"}, Field: redisListKey, Type: "integer", } r := &Redis{ - Commands: []*RedisCommand{rc}, - clients: []Client{tc}, + Commands: []*redisCommand{rc}, + clients: []client{tc}, } err := r.gatherCommandValues(tc, &acc) @@ -389,17 +390,17 @@ func TestRedis_GatherErrorstatsLine(t *testing.T) { var acc testutil.Accumulator globalTags := map[string]string{} - gatherErrorstatsLine("FOO", "BAR", &acc, globalTags) + gatherErrorStatsLine("FOO", "BAR", &acc, globalTags) require.Len(t, acc.Errors, 1) require.Equal(t, "invalid line for \"FOO\": BAR", acc.Errors[0].Error()) acc = testutil.Accumulator{} - gatherErrorstatsLine("FOO", "BAR=a", &acc, globalTags) + gatherErrorStatsLine("FOO", "BAR=a", &acc, globalTags) require.Len(t, acc.Errors, 1) require.Equal(t, "parsing value in line \"BAR=a\" failed: strconv.ParseInt: parsing \"a\": invalid syntax", acc.Errors[0].Error()) acc = testutil.Accumulator{} - gatherErrorstatsLine("FOO", "BAR=77", &acc, globalTags) + gatherErrorStatsLine("FOO", "BAR=77", &acc, globalTags) require.Empty(t, acc.Errors) } diff --git a/plugins/inputs/redis_sentinel/redis_sentinel.go b/plugins/inputs/redis_sentinel/redis_sentinel.go index cc6a64f7ca0a3..f9ec064079194 100644 --- a/plugins/inputs/redis_sentinel/redis_sentinel.go +++ b/plugins/inputs/redis_sentinel/redis_sentinel.go @@ -22,29 +22,25 @@ import ( //go:embed sample.conf var sampleConfig string +const ( + measurementMasters = "redis_sentinel_masters" + measurementSentinel = "redis_sentinel" + measurementSentinels = "redis_sentinel_sentinels" + measurementReplicas = "redis_sentinel_replicas" +) + type RedisSentinel struct { Servers []string `toml:"servers"` tls.ClientConfig - clients []*RedisSentinelClient + clients []*redisSentinelClient } -type RedisSentinelClient struct { +type redisSentinelClient struct { sentinel *redis.SentinelClient tags map[string]string } -const measurementMasters = "redis_sentinel_masters" -const measurementSentinel = "redis_sentinel" -const measurementSentinels = "redis_sentinel_sentinels" -const measurementReplicas = "redis_sentinel_replicas" - -func init() { - inputs.Add("redis_sentinel", func() telegraf.Input { - return &RedisSentinel{} - }) -} - func (*RedisSentinel) SampleConfig() string { return sampleConfig } @@ -59,7 +55,7 @@ func (r *RedisSentinel) Init() error { return err } - r.clients = make([]*RedisSentinelClient, 0, len(r.Servers)) + r.clients = make([]*redisSentinelClient, 0, len(r.Servers)) for _, serv := range r.Servers { u, err := url.Parse(serv) if err != nil { @@ -101,7 +97,7 @@ func (r *RedisSentinel) Init() error { }, ) - r.clients = append(r.clients, &RedisSentinelClient{ + r.clients = append(r.clients, &redisSentinelClient{ sentinel: sentinel, tags: tags, }) @@ -110,6 +106,32 @@ func (r *RedisSentinel) Init() error { return nil } +func (r *RedisSentinel) Gather(acc telegraf.Accumulator) error { + var wg sync.WaitGroup + + for _, client := range r.clients { + wg.Add(1) + + go func(acc telegraf.Accumulator, client *redisSentinelClient) { + defer wg.Done() + + masters, err := client.gatherMasterStats(acc) + acc.AddError(err) + + for _, master := range masters { + acc.AddError(client.gatherReplicaStats(acc, master)) + acc.AddError(client.gatherSentinelStats(acc, master)) + } + + acc.AddError(client.gatherInfoStats(acc)) + }(acc, client) + } + + wg.Wait() + + return nil +} + // Redis list format has string key/values adjacent, so convert to a map for easier use func toMap(vals []interface{}) map[string]string { m := make(map[string]string) @@ -170,35 +192,7 @@ func prepareFieldValues(fields map[string]string, typeMap map[string]configField return preparedFields, nil } -// Reads stats from all configured servers accumulates stats. -// Returns one of the errors encountered while gather stats (if any). -func (r *RedisSentinel) Gather(acc telegraf.Accumulator) error { - var wg sync.WaitGroup - - for _, client := range r.clients { - wg.Add(1) - - go func(acc telegraf.Accumulator, client *RedisSentinelClient) { - defer wg.Done() - - masters, err := client.gatherMasterStats(acc) - acc.AddError(err) - - for _, master := range masters { - acc.AddError(client.gatherReplicaStats(acc, master)) - acc.AddError(client.gatherSentinelStats(acc, master)) - } - - acc.AddError(client.gatherInfoStats(acc)) - }(acc, client) - } - - wg.Wait() - - return nil -} - -func (client *RedisSentinelClient) gatherInfoStats(acc telegraf.Accumulator) error { +func (client *redisSentinelClient) gatherInfoStats(acc telegraf.Accumulator) error { infoCmd := redis.NewStringCmd("info", "all") if err := client.sentinel.Process(infoCmd); err != nil { return err @@ -220,7 +214,7 @@ func (client *RedisSentinelClient) gatherInfoStats(acc telegraf.Accumulator) err return nil } -func (client *RedisSentinelClient) gatherMasterStats(acc telegraf.Accumulator) ([]string, error) { +func (client *redisSentinelClient) gatherMasterStats(acc telegraf.Accumulator) ([]string, error) { mastersCmd := redis.NewSliceCmd("sentinel", "masters") if err := client.sentinel.Process(mastersCmd); err != nil { return nil, err @@ -262,7 +256,7 @@ func (client *RedisSentinelClient) gatherMasterStats(acc telegraf.Accumulator) ( return masterNames, nil } -func (client *RedisSentinelClient) gatherReplicaStats(acc telegraf.Accumulator, masterName string) error { +func (client *redisSentinelClient) gatherReplicaStats(acc telegraf.Accumulator, masterName string) error { replicasCmd := redis.NewSliceCmd("sentinel", "replicas", masterName) if err := client.sentinel.Process(replicasCmd); err != nil { return err @@ -294,7 +288,7 @@ func (client *RedisSentinelClient) gatherReplicaStats(acc telegraf.Accumulator, return nil } -func (client *RedisSentinelClient) gatherSentinelStats(acc telegraf.Accumulator, masterName string) error { +func (client *redisSentinelClient) gatherSentinelStats(acc telegraf.Accumulator, masterName string) error { sentinelsCmd := redis.NewSliceCmd("sentinel", "sentinels", masterName) if err := client.sentinel.Process(sentinelsCmd); err != nil { return err @@ -384,10 +378,7 @@ func convertSentinelReplicaOutput( // convertSentinelInfoOutput parses `INFO` command output // Largely copied from the Redis input plugin's gatherInfoOutput() -func convertSentinelInfoOutput( - globalTags map[string]string, - rdr io.Reader, -) (map[string]string, map[string]interface{}, error) { +func convertSentinelInfoOutput(globalTags map[string]string, rdr io.Reader) (map[string]string, map[string]interface{}, error) { scanner := bufio.NewScanner(rdr) rawFields := make(map[string]string) @@ -437,3 +428,9 @@ func convertSentinelInfoOutput( return tags, fields, nil } + +func init() { + inputs.Add("redis_sentinel", func() telegraf.Input { + return &RedisSentinel{} + }) +} diff --git a/plugins/inputs/rethinkdb/rethinkdb.go b/plugins/inputs/rethinkdb/rethinkdb.go index 2daf19312b4a2..ee1a2bfdf0101 100644 --- a/plugins/inputs/rethinkdb/rethinkdb.go +++ b/plugins/inputs/rethinkdb/rethinkdb.go @@ -16,18 +16,16 @@ import ( //go:embed sample.conf var sampleConfig string +var localhost = &server{url: &url.URL{Host: "127.0.0.1:28015"}} + type RethinkDB struct { - Servers []string + Servers []string `toml:"servers"` } -var localhost = &Server{URL: &url.URL{Host: "127.0.0.1:28015"}} - func (*RethinkDB) SampleConfig() string { return sampleConfig } -// Reads stats from all configured servers accumulates stats. -// Returns one of the errors encountered while gather stats (if any). func (r *RethinkDB) Gather(acc telegraf.Accumulator) error { if len(r.Servers) == 0 { return gatherServer(localhost, acc) @@ -47,7 +45,7 @@ func (r *RethinkDB) Gather(acc telegraf.Accumulator) error { wg.Add(1) go func() { defer wg.Done() - acc.AddError(gatherServer(&Server{URL: u}, acc)) + acc.AddError(gatherServer(&server{url: u}, acc)) }() } @@ -56,23 +54,23 @@ func (r *RethinkDB) Gather(acc telegraf.Accumulator) error { return nil } -func gatherServer(server *Server, acc telegraf.Accumulator) error { +func gatherServer(server *server, acc telegraf.Accumulator) error { var err error connectOpts := gorethink.ConnectOpts{ - Address: server.URL.Host, + Address: server.url.Host, DiscoverHosts: false, } - if server.URL.User != nil { - pwd, set := server.URL.User.Password() + if server.url.User != nil { + pwd, set := server.url.User.Password() if set && pwd != "" { connectOpts.AuthKey = pwd connectOpts.HandshakeVersion = gorethink.HandshakeV0_4 } } - if server.URL.Scheme == "rethinkdb2" && server.URL.User != nil { - pwd, set := server.URL.User.Password() + if server.url.Scheme == "rethinkdb2" && server.url.User != nil { + pwd, set := server.url.User.Password() if set && pwd != "" { - connectOpts.Username = server.URL.User.Username() + connectOpts.Username = server.url.User.Username() connectOpts.Password = pwd connectOpts.HandshakeVersion = gorethink.HandshakeV1_0 } diff --git a/plugins/inputs/rethinkdb/rethinkdb_data.go b/plugins/inputs/rethinkdb/rethinkdb_data.go index dbd6e3143b3f4..a8715e41870c4 100644 --- a/plugins/inputs/rethinkdb/rethinkdb_data.go +++ b/plugins/inputs/rethinkdb/rethinkdb_data.go @@ -86,11 +86,7 @@ var engineStats = map[string]string{ "total_writes": "TotalWrites", } -func (e *engine) AddEngineStats( - keys []string, - acc telegraf.Accumulator, - tags map[string]string, -) { +func (e *engine) addEngineStats(keys []string, acc telegraf.Accumulator, tags map[string]string) { engine := reflect.ValueOf(e).Elem() fields := make(map[string]interface{}) for _, key := range keys { @@ -99,7 +95,7 @@ func (e *engine) AddEngineStats( acc.AddFields("rethinkdb_engine", fields, tags) } -func (s *storage) AddStats(acc telegraf.Accumulator, tags map[string]string) { +func (s *storage) addStats(acc telegraf.Accumulator, tags map[string]string) { fields := map[string]interface{}{ "cache_bytes_in_use": s.Cache.BytesInUse, "disk_read_bytes_per_sec": s.Disk.ReadBytesPerSec, diff --git a/plugins/inputs/rethinkdb/rethinkdb_data_test.go b/plugins/inputs/rethinkdb/rethinkdb_data_test.go index bdee0f39950e1..11be99f9b8009 100644 --- a/plugins/inputs/rethinkdb/rethinkdb_data_test.go +++ b/plugins/inputs/rethinkdb/rethinkdb_data_test.go @@ -34,7 +34,7 @@ func TestAddEngineStats(t *testing.T) { "written_docs_per_sec", "total_writes", } - engine.AddEngineStats(keys, &acc, tags) + engine.addEngineStats(keys, &acc, tags) for _, metric := range keys { require.True(t, acc.HasInt64Field("rethinkdb_engine", metric)) @@ -65,7 +65,7 @@ func TestAddEngineStatsPartial(t *testing.T) { "total_reads", "total_writes", } - engine.AddEngineStats(keys, &acc, tags) + engine.addEngineStats(keys, &acc, tags) for _, metric := range missingKeys { require.False(t, acc.HasInt64Field("rethinkdb", metric)) @@ -105,7 +105,7 @@ func TestAddStorageStats(t *testing.T) { "disk_usage_preallocated_bytes", } - storage.AddStats(&acc, tags) + storage.addStats(&acc, tags) for _, metric := range keys { require.True(t, acc.HasInt64Field("rethinkdb", metric)) diff --git a/plugins/inputs/rethinkdb/rethinkdb_server.go b/plugins/inputs/rethinkdb/rethinkdb_server.go index 257a43f7e7559..f8085def38bf2 100644 --- a/plugins/inputs/rethinkdb/rethinkdb_server.go +++ b/plugins/inputs/rethinkdb/rethinkdb_server.go @@ -14,13 +14,13 @@ import ( "github.com/influxdata/telegraf" ) -type Server struct { - URL *url.URL +type server struct { + url *url.URL session *gorethink.Session serverStatus serverStatus } -func (s *Server) gatherData(acc telegraf.Accumulator) error { +func (s *server) gatherData(acc telegraf.Accumulator) error { if err := s.getServerStatus(); err != nil { return fmt.Errorf("failed to get server_status: %w", err) } @@ -44,7 +44,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator) error { return nil } -func (s *Server) validateVersion() error { +func (s *server) validateVersion() error { if s.serverStatus.Process.Version == "" { return errors.New("could not determine the RethinkDB server version: process.version key missing") } @@ -62,7 +62,7 @@ func (s *Server) validateVersion() error { return nil } -func (s *Server) getServerStatus() error { +func (s *server) getServerStatus() error { cursor, err := gorethink.DB("rethinkdb").Table("server_status").Run(s.session) if err != nil { return err @@ -77,9 +77,9 @@ func (s *Server) getServerStatus() error { if err != nil { return errors.New("could not parse server_status results") } - host, port, err := net.SplitHostPort(s.URL.Host) + host, port, err := net.SplitHostPort(s.url.Host) if err != nil { - return fmt.Errorf("unable to determine provided hostname from %s", s.URL.Host) + return fmt.Errorf("unable to determine provided hostname from %s", s.url.Host) } driverPort, err := strconv.Atoi(port) if err != nil { @@ -94,17 +94,17 @@ func (s *Server) getServerStatus() error { } } - return fmt.Errorf("unable to determine host id from server_status with %s", s.URL.Host) + return fmt.Errorf("unable to determine host id from server_status with %s", s.url.Host) } -func (s *Server) getDefaultTags() map[string]string { +func (s *server) getDefaultTags() map[string]string { tags := make(map[string]string) - tags["rethinkdb_host"] = s.URL.Host + tags["rethinkdb_host"] = s.url.Host tags["rethinkdb_hostname"] = s.serverStatus.Network.Hostname return tags } -var ClusterTracking = []string{ +var clusterTracking = []string{ "active_clients", "clients", "queries_per_sec", @@ -112,7 +112,7 @@ var ClusterTracking = []string{ "written_docs_per_sec", } -func (s *Server) addClusterStats(acc telegraf.Accumulator) error { +func (s *server) addClusterStats(acc telegraf.Accumulator) error { cursor, err := gorethink.DB("rethinkdb").Table("stats").Get([]string{"cluster"}).Run(s.session) if err != nil { return fmt.Errorf("cluster stats query error: %w", err) @@ -125,11 +125,11 @@ func (s *Server) addClusterStats(acc telegraf.Accumulator) error { tags := s.getDefaultTags() tags["type"] = "cluster" - clusterStats.Engine.AddEngineStats(ClusterTracking, acc, tags) + clusterStats.Engine.addEngineStats(clusterTracking, acc, tags) return nil } -var MemberTracking = []string{ +var memberTracking = []string{ "active_clients", "clients", "queries_per_sec", @@ -140,7 +140,7 @@ var MemberTracking = []string{ "total_writes", } -func (s *Server) addMemberStats(acc telegraf.Accumulator) error { +func (s *server) addMemberStats(acc telegraf.Accumulator) error { cursor, err := gorethink.DB("rethinkdb").Table("stats").Get([]string{"server", s.serverStatus.ID}).Run(s.session) if err != nil { return fmt.Errorf("member stats query error: %w", err) @@ -153,18 +153,18 @@ func (s *Server) addMemberStats(acc telegraf.Accumulator) error { tags := s.getDefaultTags() tags["type"] = "member" - memberStats.Engine.AddEngineStats(MemberTracking, acc, tags) + memberStats.Engine.addEngineStats(memberTracking, acc, tags) return nil } -var TableTracking = []string{ +var tableTracking = []string{ "read_docs_per_sec", "total_reads", "written_docs_per_sec", "total_writes", } -func (s *Server) addTablesStats(acc telegraf.Accumulator) error { +func (s *server) addTablesStats(acc telegraf.Accumulator) error { tablesCursor, err := gorethink.DB("rethinkdb").Table("table_status").Run(s.session) if err != nil { return fmt.Errorf("table stats query error: %w", err) @@ -185,7 +185,7 @@ func (s *Server) addTablesStats(acc telegraf.Accumulator) error { return nil } -func (s *Server) addTableStats(acc telegraf.Accumulator, table tableStatus) error { +func (s *server) addTableStats(acc telegraf.Accumulator, table tableStatus) error { cursor, err := gorethink.DB("rethinkdb").Table("stats"). Get([]string{"table_server", table.ID, s.serverStatus.ID}). Run(s.session) @@ -202,8 +202,8 @@ func (s *Server) addTableStats(acc telegraf.Accumulator, table tableStatus) erro tags := s.getDefaultTags() tags["type"] = "data" tags["ns"] = fmt.Sprintf("%s.%s", table.DB, table.Name) - ts.Engine.AddEngineStats(TableTracking, acc, tags) - ts.Storage.AddStats(acc, tags) + ts.Engine.addEngineStats(tableTracking, acc, tags) + ts.Storage.addStats(acc, tags) return nil } diff --git a/plugins/inputs/rethinkdb/rethinkdb_server_test.go b/plugins/inputs/rethinkdb/rethinkdb_server_test.go index 6eeabe78aeb6a..f21d1dc6cf98f 100644 --- a/plugins/inputs/rethinkdb/rethinkdb_server_test.go +++ b/plugins/inputs/rethinkdb/rethinkdb_server_test.go @@ -37,7 +37,7 @@ func TestAddClusterStats(t *testing.T) { err := server.addClusterStats(&acc) require.NoError(t, err) - for _, metric := range ClusterTracking { + for _, metric := range clusterTracking { require.True(t, acc.HasIntValue(metric)) } } @@ -48,7 +48,7 @@ func TestAddMemberStats(t *testing.T) { err := server.addMemberStats(&acc) require.NoError(t, err) - for _, metric := range MemberTracking { + for _, metric := range memberTracking { require.True(t, acc.HasIntValue(metric)) } } @@ -59,7 +59,7 @@ func TestAddTableStats(t *testing.T) { err := server.addTableStats(&acc) require.NoError(t, err) - for _, metric := range TableTracking { + for _, metric := range tableTracking { require.True(t, acc.HasIntValue(metric)) } diff --git a/plugins/inputs/rethinkdb/rethinkdb_test.go b/plugins/inputs/rethinkdb/rethinkdb_test.go index 56ecb9edc7f3c..03d1221ea62a2 100644 --- a/plugins/inputs/rethinkdb/rethinkdb_test.go +++ b/plugins/inputs/rethinkdb/rethinkdb_test.go @@ -14,7 +14,7 @@ import ( ) var connect_url, authKey, username, password string -var server *Server +var server *server func init() { connect_url = os.Getenv("RETHINKDB_URL") @@ -28,18 +28,18 @@ func init() { func testSetup(m *testing.M) { var err error - server = &Server{URL: &url.URL{Host: connect_url}} + server = &server{url: &url.URL{Host: connect_url}} if authKey { server.session, _ = gorethink.Connect(gorethink.ConnectOpts{ - Address: server.URL.Host, + Address: server.url.Host, AuthKey: authKey, HandshakeVersion: gorethink.HandshakeV0_4, DiscoverHosts: false, }) } else { server.session, _ = gorethink.Connect(gorethink.ConnectOpts{ - Address: server.URL.Host, + Address: server.url.Host, Username: username, Password: password, HandshakeVersion: gorethink.HandshakeV1_0, diff --git a/plugins/inputs/riak/riak.go b/plugins/inputs/riak/riak.go index eeb9fee2cc141..47a053c891404 100644 --- a/plugins/inputs/riak/riak.go +++ b/plugins/inputs/riak/riak.go @@ -16,25 +16,14 @@ import ( //go:embed sample.conf var sampleConfig string -// Type Riak gathers statistics from one or more Riak instances type Riak struct { // Servers is a slice of servers as http addresses (ex. http://127.0.0.1:8098) - Servers []string + Servers []string `toml:"servers"` client *http.Client } -// NewRiak return a new instance of Riak with a default http client -func NewRiak() *Riak { - tr := &http.Transport{ResponseHeaderTimeout: 3 * time.Second} - client := &http.Client{ - Transport: tr, - Timeout: 4 * time.Second, - } - return &Riak{client: client} -} - -// Type riakStats represents the data that is received from Riak +// Type riakStats represents the data received from Riak type riakStats struct { CPUAvg1 int64 `json:"cpu_avg1"` CPUAvg15 int64 `json:"cpu_avg15"` @@ -88,7 +77,6 @@ func (*Riak) SampleConfig() string { return sampleConfig } -// Reads stats from all configured servers. func (r *Riak) Gather(acc telegraf.Accumulator) error { // Default to a single server at localhost (default port) if none specified if len(r.Servers) == 0 { @@ -103,7 +91,6 @@ func (r *Riak) Gather(acc telegraf.Accumulator) error { return nil } -// Gathers stats from a single server, adding them to the accumulator func (r *Riak) gatherServer(s string, acc telegraf.Accumulator) error { // Parse the given URL to extract the server tag u, err := url.Parse(s) @@ -190,8 +177,18 @@ func (r *Riak) gatherServer(s string, acc telegraf.Accumulator) error { return nil } +// newRiak return a new instance of Riak with a default http client +func newRiak() *Riak { + tr := &http.Transport{ResponseHeaderTimeout: 3 * time.Second} + client := &http.Client{ + Transport: tr, + Timeout: 4 * time.Second, + } + return &Riak{client: client} +} + func init() { inputs.Add("riak", func() telegraf.Input { - return NewRiak() + return newRiak() }) } diff --git a/plugins/inputs/riak/riak_test.go b/plugins/inputs/riak/riak_test.go index 0575cd0cd18be..3e11a07975bbd 100644 --- a/plugins/inputs/riak/riak_test.go +++ b/plugins/inputs/riak/riak_test.go @@ -29,7 +29,7 @@ func TestRiak(t *testing.T) { require.NoError(t, err) // Create a new Riak instance with our given test server - riak := NewRiak() + riak := newRiak() riak.Servers = []string{ts.URL} // Create a test accumulator diff --git a/plugins/inputs/riemann_listener/riemann_listener.go b/plugins/inputs/riemann_listener/riemann_listener.go index e269f1bea8417..35ca57158ed10 100644 --- a/plugins/inputs/riemann_listener/riemann_listener.go +++ b/plugins/inputs/riemann_listener/riemann_listener.go @@ -39,12 +39,12 @@ type RiemannSocketListener struct { SocketMode string `toml:"socket_mode"` common_tls.ServerConfig - wg sync.WaitGroup - Log telegraf.Logger `toml:"-"` + wg sync.WaitGroup telegraf.Accumulator } + type setReadBufferer interface { SetReadBuffer(sizeInBytes int) error } @@ -59,6 +59,73 @@ type riemannListener struct { connectionsMtx sync.Mutex } +func (*RiemannSocketListener) SampleConfig() string { + return sampleConfig +} + +func (rsl *RiemannSocketListener) Start(acc telegraf.Accumulator) error { + ctx, cancelFunc := context.WithCancel(context.Background()) + go rsl.processOsSignals(cancelFunc) + rsl.Accumulator = acc + if rsl.ServiceAddress == "" { + rsl.Log.Warnf("Using default service_address tcp://:5555") + rsl.ServiceAddress = "tcp://:5555" + } + spl := strings.SplitN(rsl.ServiceAddress, "://", 2) + if len(spl) != 2 { + return fmt.Errorf("invalid service address: %s", rsl.ServiceAddress) + } + + protocol := spl[0] + addr := spl[1] + + switch protocol { + case "tcp", "tcp4", "tcp6": + tlsCfg, err := rsl.ServerConfig.TLSConfig() + if err != nil { + return err + } + + var l net.Listener + if tlsCfg == nil { + l, err = net.Listen(protocol, addr) + } else { + l, err = tls.Listen(protocol, addr, tlsCfg) + } + if err != nil { + return err + } + + rsl.Log.Infof("Listening on %s://%s", protocol, l.Addr()) + + rsl := &riemannListener{ + Listener: l, + RiemannSocketListener: rsl, + sockType: spl[0], + } + + rsl.wg = sync.WaitGroup{} + rsl.wg.Add(1) + go func() { + defer rsl.wg.Done() + rsl.listen(ctx) + }() + default: + return fmt.Errorf("unknown protocol %q in %q", protocol, rsl.ServiceAddress) + } + + return nil +} + +func (*RiemannSocketListener) Gather(telegraf.Accumulator) error { + return nil +} + +func (rsl *RiemannSocketListener) Stop() { + rsl.wg.Done() + rsl.wg.Wait() +} + func (rsl *riemannListener) listen(ctx context.Context) { rsl.connections = make(map[string]net.Conn) @@ -125,6 +192,8 @@ func (rsl *riemannListener) closeAllConnections() { rsl.connectionsMtx.Unlock() } +// Utilities + func (rsl *riemannListener) setKeepAlive(c net.Conn) error { if rsl.KeepAlivePeriod == nil { return nil @@ -148,8 +217,6 @@ func (rsl *riemannListener) removeConnection(c net.Conn) { rsl.connectionsMtx.Unlock() } -// Utilities - /* readMessages will read Riemann messages in binary format from the TCP connection. byte Array p size will depend on the size @@ -271,68 +338,6 @@ func (rsl *riemannListener) riemannReturnErrorResponse(conn net.Conn, errorMessa } } -func (*RiemannSocketListener) SampleConfig() string { - return sampleConfig -} - -func (*RiemannSocketListener) Gather(telegraf.Accumulator) error { - return nil -} - -func (rsl *RiemannSocketListener) Start(acc telegraf.Accumulator) error { - ctx, cancelFunc := context.WithCancel(context.Background()) - go rsl.processOsSignals(cancelFunc) - rsl.Accumulator = acc - if rsl.ServiceAddress == "" { - rsl.Log.Warnf("Using default service_address tcp://:5555") - rsl.ServiceAddress = "tcp://:5555" - } - spl := strings.SplitN(rsl.ServiceAddress, "://", 2) - if len(spl) != 2 { - return fmt.Errorf("invalid service address: %s", rsl.ServiceAddress) - } - - protocol := spl[0] - addr := spl[1] - - switch protocol { - case "tcp", "tcp4", "tcp6": - tlsCfg, err := rsl.ServerConfig.TLSConfig() - if err != nil { - return err - } - - var l net.Listener - if tlsCfg == nil { - l, err = net.Listen(protocol, addr) - } else { - l, err = tls.Listen(protocol, addr, tlsCfg) - } - if err != nil { - return err - } - - rsl.Log.Infof("Listening on %s://%s", protocol, l.Addr()) - - rsl := &riemannListener{ - Listener: l, - RiemannSocketListener: rsl, - sockType: spl[0], - } - - rsl.wg = sync.WaitGroup{} - rsl.wg.Add(1) - go func() { - defer rsl.wg.Done() - rsl.listen(ctx) - }() - default: - return fmt.Errorf("unknown protocol %q in %q", protocol, rsl.ServiceAddress) - } - - return nil -} - // Handle cancellations from the process func (rsl *RiemannSocketListener) processOsSignals(cancelFunc context.CancelFunc) { signalChan := make(chan os.Signal, 1) @@ -347,11 +352,6 @@ func (rsl *RiemannSocketListener) processOsSignals(cancelFunc context.CancelFunc } } -func (rsl *RiemannSocketListener) Stop() { - rsl.wg.Done() - rsl.wg.Wait() -} - func newRiemannSocketListener() *RiemannSocketListener { return &RiemannSocketListener{} }