diff --git a/plugins/inputs/activemq/activemq.go b/plugins/inputs/activemq/activemq.go index 1d08b457172f5..ffb259d945699 100644 --- a/plugins/inputs/activemq/activemq.go +++ b/plugins/inputs/activemq/activemq.go @@ -22,6 +22,7 @@ import ( //go:embed sample.conf var sampleConfig string +// ActiveMQ Input Plugin type ActiveMQ struct { Server string `toml:"server" deprecated:"1.11.0;use 'url' instead"` Port int `toml:"port" deprecated:"1.11.0;use 'url' instead"` @@ -87,22 +88,6 @@ type stats struct { DequeueCounter int `xml:"dequeueCounter,attr"` } -func (a *ActiveMQ) createHTTPClient() (*http.Client, error) { - tlsCfg, err := a.ClientConfig.TLSConfig() - if err != nil { - return nil, err - } - - client := &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: tlsCfg, - }, - Timeout: time.Duration(a.ResponseTimeout), - } - - return client, nil -} - func (*ActiveMQ) SampleConfig() string { return sampleConfig } @@ -138,7 +123,61 @@ func (a *ActiveMQ) Init() error { return nil } -func (a *ActiveMQ) GetMetrics(u string) ([]byte, error) { +func (a *ActiveMQ) Gather(acc telegraf.Accumulator) error { + dataQueues, err := a.getMetrics(a.queuesURL()) + if err != nil { + return err + } + queues := queues{} + err = xml.Unmarshal(dataQueues, &queues) + if err != nil { + return fmt.Errorf("queues XML unmarshal error: %w", err) + } + + dataTopics, err := a.getMetrics(a.topicsURL()) + if err != nil { + return err + } + topics := topics{} + err = xml.Unmarshal(dataTopics, &topics) + if err != nil { + return fmt.Errorf("topics XML unmarshal error: %w", err) + } + + dataSubscribers, err := a.getMetrics(a.subscribersURL()) + if err != nil { + return err + } + subscribers := subscribers{} + err = xml.Unmarshal(dataSubscribers, &subscribers) + if err != nil { + return fmt.Errorf("subscribers XML unmarshal error: %w", err) + } + + a.gatherQueuesMetrics(acc, queues) + a.gatherTopicsMetrics(acc, topics) + a.gatherSubscribersMetrics(acc, subscribers) + + return nil +} + +func (a *ActiveMQ) createHTTPClient() (*http.Client, error) { + tlsCfg, err := a.ClientConfig.TLSConfig() + if err != nil { + return nil, err + } + + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + }, + Timeout: time.Duration(a.ResponseTimeout), + } + + return client, nil +} + +func (a *ActiveMQ) getMetrics(u string) ([]byte, error) { req, err := http.NewRequest("GET", u, nil) if err != nil { return nil, err @@ -161,7 +200,7 @@ func (a *ActiveMQ) GetMetrics(u string) ([]byte, error) { return io.ReadAll(resp.Body) } -func (a *ActiveMQ) GatherQueuesMetrics(acc telegraf.Accumulator, queues queues) { +func (a *ActiveMQ) gatherQueuesMetrics(acc telegraf.Accumulator, queues queues) { for _, queue := range queues.QueueItems { records := make(map[string]interface{}) tags := make(map[string]string) @@ -179,7 +218,7 @@ func (a *ActiveMQ) GatherQueuesMetrics(acc telegraf.Accumulator, queues queues) } } -func (a *ActiveMQ) GatherTopicsMetrics(acc telegraf.Accumulator, topics topics) { +func (a *ActiveMQ) gatherTopicsMetrics(acc telegraf.Accumulator, topics topics) { for _, topic := range topics.TopicItems { records := make(map[string]interface{}) tags := make(map[string]string) @@ -197,7 +236,7 @@ func (a *ActiveMQ) GatherTopicsMetrics(acc telegraf.Accumulator, topics topics) } } -func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscribers subscribers) { +func (a *ActiveMQ) gatherSubscribersMetrics(acc telegraf.Accumulator, subscribers subscribers) { for _, subscriber := range subscribers.SubscriberItems { records := make(map[string]interface{}) tags := make(map[string]string) @@ -221,55 +260,17 @@ func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscriber } } -func (a *ActiveMQ) Gather(acc telegraf.Accumulator) error { - dataQueues, err := a.GetMetrics(a.QueuesURL()) - if err != nil { - return err - } - queues := queues{} - err = xml.Unmarshal(dataQueues, &queues) - if err != nil { - return fmt.Errorf("queues XML unmarshal error: %w", err) - } - - dataTopics, err := a.GetMetrics(a.TopicsURL()) - if err != nil { - return err - } - topics := topics{} - err = xml.Unmarshal(dataTopics, &topics) - if err != nil { - return fmt.Errorf("topics XML unmarshal error: %w", err) - } - - dataSubscribers, err := a.GetMetrics(a.SubscribersURL()) - if err != nil { - return err - } - subscribers := subscribers{} - err = xml.Unmarshal(dataSubscribers, &subscribers) - if err != nil { - return fmt.Errorf("subscribers XML unmarshal error: %w", err) - } - - a.GatherQueuesMetrics(acc, queues) - a.GatherTopicsMetrics(acc, topics) - a.GatherSubscribersMetrics(acc, subscribers) - - return nil -} - -func (a *ActiveMQ) QueuesURL() string { +func (a *ActiveMQ) queuesURL() string { ref := url.URL{Path: path.Join("/", a.Webadmin, "/xml/queues.jsp")} return a.baseURL.ResolveReference(&ref).String() } -func (a *ActiveMQ) TopicsURL() string { +func (a *ActiveMQ) topicsURL() string { ref := url.URL{Path: path.Join("/", a.Webadmin, "/xml/topics.jsp")} return a.baseURL.ResolveReference(&ref).String() } -func (a *ActiveMQ) SubscribersURL() string { +func (a *ActiveMQ) subscribersURL() string { ref := url.URL{Path: path.Join("/", a.Webadmin, "/xml/subscribers.jsp")} return a.baseURL.ResolveReference(&ref).String() } diff --git a/plugins/inputs/activemq/activemq_test.go b/plugins/inputs/activemq/activemq_test.go index 8bf78de37d828..cfece824c54a6 100644 --- a/plugins/inputs/activemq/activemq_test.go +++ b/plugins/inputs/activemq/activemq_test.go @@ -52,7 +52,7 @@ func TestGatherQueuesMetrics(t *testing.T) { require.NoError(t, plugin.Init()) var acc testutil.Accumulator - plugin.GatherQueuesMetrics(&acc, queues) + plugin.gatherQueuesMetrics(&acc, queues) acc.AssertContainsTaggedFields(t, "activemq_queues", records, tags) } @@ -98,7 +98,7 @@ func TestGatherTopicsMetrics(t *testing.T) { require.NoError(t, plugin.Init()) var acc testutil.Accumulator - plugin.GatherTopicsMetrics(&acc, topics) + plugin.gatherTopicsMetrics(&acc, topics) acc.AssertContainsTaggedFields(t, "activemq_topics", records, tags) } @@ -137,7 +137,7 @@ func TestGatherSubscribersMetrics(t *testing.T) { require.NoError(t, plugin.Init()) var acc testutil.Accumulator - plugin.GatherSubscribersMetrics(&acc, subscribers) + plugin.gatherSubscribersMetrics(&acc, subscribers) acc.AssertContainsTaggedFields(t, "activemq_subscribers", records, tags) } diff --git a/plugins/inputs/aerospike/aerospike.go b/plugins/inputs/aerospike/aerospike.go index 3dc552062fada..dd7a302ae66f2 100644 --- a/plugins/inputs/aerospike/aerospike.go +++ b/plugins/inputs/aerospike/aerospike.go @@ -21,6 +21,7 @@ import ( //go:embed sample.conf var sampleConfig string +// Aerospike Input Plugin type Aerospike struct { Servers []string `toml:"servers"` diff --git a/plugins/inputs/aliyuncms/aliyuncms.go b/plugins/inputs/aliyuncms/aliyuncms.go index be2b3d112dfe4..78c6c4f20cce4 100644 --- a/plugins/inputs/aliyuncms/aliyuncms.go +++ b/plugins/inputs/aliyuncms/aliyuncms.go @@ -43,7 +43,7 @@ type ( Period config.Duration `toml:"period"` Delay config.Duration `toml:"delay"` Project string `toml:"project"` - Metrics []*Metric `toml:"metrics"` + Metrics []*metric `toml:"metrics"` RateLimit int `toml:"ratelimit"` Log telegraf.Logger `toml:"-"` @@ -57,8 +57,8 @@ type ( measurement string } - // Metric describes what metrics to get - Metric struct { + // metric describes what metrics to get + metric struct { ObjectsFilter string `toml:"objects_filter"` MetricNames []string `toml:"names"` Dimensions string `toml:"dimensions"` // String representation of JSON dimensions @@ -74,11 +74,6 @@ type ( } - // Dimension describe how to get metrics - Dimension struct { - Value string `toml:"value"` - } - aliyuncmsClient interface { DescribeMetricList(request *cms.DescribeMetricListRequest) (response *cms.DescribeMetricListResponse, err error) } @@ -113,7 +108,6 @@ func (*AliyunCMS) SampleConfig() string { return sampleConfig } -// Init perform checks of plugin inputs and initialize internals func (s *AliyunCMS) Init() error { if s.Project == "" { return errors.New("project is not set") @@ -216,7 +210,6 @@ func (s *AliyunCMS) Start(telegraf.Accumulator) error { return nil } -// Gather implements telegraf.Inputs interface func (s *AliyunCMS) Gather(acc telegraf.Accumulator) error { s.updateWindow(time.Now()) @@ -225,16 +218,16 @@ func (s *AliyunCMS) Gather(acc telegraf.Accumulator) error { defer lmtr.Stop() var wg sync.WaitGroup - for _, metric := range s.Metrics { + for _, m := range s.Metrics { // Prepare internal structure with data from discovery - s.prepareTagsAndDimensions(metric) - wg.Add(len(metric.MetricNames)) - for _, metricName := range metric.MetricNames { + s.prepareTagsAndDimensions(m) + wg.Add(len(m.MetricNames)) + for _, metricName := range m.MetricNames { <-lmtr.C - go func(metricName string, metric *Metric) { + go func(metricName string, m *metric) { defer wg.Done() - acc.AddError(s.gatherMetric(acc, metricName, metric)) - }(metricName, metric) + acc.AddError(s.gatherMetric(acc, metricName, m)) + }(metricName, m) } wg.Wait() } @@ -269,7 +262,7 @@ func (s *AliyunCMS) updateWindow(relativeTo time.Time) { } // Gather given metric and emit error -func (s *AliyunCMS) gatherMetric(acc telegraf.Accumulator, metricName string, metric *Metric) error { +func (s *AliyunCMS) gatherMetric(acc telegraf.Accumulator, metricName string, metric *metric) error { for _, region := range s.Regions { req := cms.CreateDescribeMetricListRequest() req.Period = strconv.FormatInt(int64(time.Duration(s.Period).Seconds()), 10) @@ -372,7 +365,7 @@ func parseTag(tagSpec string, data interface{}) (tagKey, tagValue string, err er return tagKey, tagValue, nil } -func (s *AliyunCMS) prepareTagsAndDimensions(metric *Metric) { +func (s *AliyunCMS) prepareTagsAndDimensions(metric *metric) { var ( newData bool defaultTags = []string{"RegionId:RegionId"} diff --git a/plugins/inputs/aliyuncms/aliyuncms_test.go b/plugins/inputs/aliyuncms/aliyuncms_test.go index 5b0e48f770b3c..ebd7c8bfb23e2 100644 --- a/plugins/inputs/aliyuncms/aliyuncms_test.go +++ b/plugins/inputs/aliyuncms/aliyuncms_test.go @@ -240,7 +240,7 @@ func TestPluginMetricsInitialize(t *testing.T) { expectedErrorString string regions []string discoveryRegions []string - metrics []*Metric + metrics []*metric }{ { name: "Valid project", @@ -248,7 +248,7 @@ func TestPluginMetricsInitialize(t *testing.T) { regions: []string{"cn-shanghai"}, accessKeyID: "dummy", accessKeySecret: "dummy", - metrics: []*Metric{ + metrics: []*metric{ { MetricNames: []string{}, Dimensions: `{"instanceId": "i-abcdefgh123456"}`, @@ -261,7 +261,7 @@ func TestPluginMetricsInitialize(t *testing.T) { regions: []string{"cn-shanghai"}, accessKeyID: "dummy", accessKeySecret: "dummy", - metrics: []*Metric{ + metrics: []*metric{ { MetricNames: []string{}, Dimensions: `[{"instanceId": "p-example"},{"instanceId": "q-example"}]`, @@ -275,7 +275,7 @@ func TestPluginMetricsInitialize(t *testing.T) { accessKeyID: "dummy", accessKeySecret: "dummy", expectedErrorString: `cannot parse dimensions (neither obj, nor array) "[": unexpected end of JSON input`, - metrics: []*Metric{ + metrics: []*metric{ { MetricNames: []string{}, Dimensions: `[`, @@ -343,7 +343,7 @@ func TestGatherMetric(t *testing.T) { Regions: []string{"cn-shanghai"}, } - metric := &Metric{ + metric := &metric{ MetricNames: []string{}, Dimensions: `"instanceId": "i-abcdefgh123456"`, } @@ -374,7 +374,7 @@ func TestGatherMetric(t *testing.T) { } func TestGather(t *testing.T) { - metric := &Metric{ + m := &metric{ MetricNames: []string{}, Dimensions: `{"instanceId": "i-abcdefgh123456"}`, } @@ -382,7 +382,7 @@ func TestGather(t *testing.T) { AccessKeyID: "my_access_key_id", AccessKeySecret: "my_access_key_secret", Project: "acs_slb_dashboard", - Metrics: []*Metric{metric}, + Metrics: []*metric{m}, RateLimit: 200, measurement: formatMeasurement("acs_slb_dashboard"), Regions: []string{"cn-shanghai"}, diff --git a/plugins/inputs/aliyuncms/discovery.go b/plugins/inputs/aliyuncms/discovery.go index 9a72106477ca0..574570e14636f 100644 --- a/plugins/inputs/aliyuncms/discovery.go +++ b/plugins/inputs/aliyuncms/discovery.go @@ -395,8 +395,7 @@ func (dt *discoveryTool) getDiscoveryDataAcrossRegions(lmtr chan bool) (map[stri return resultData, nil } -// start the discovery pooling -// In case smth. new found it will be reported back through `DataChan` +// start the discovery pooling; in case something new is found, it will be reported back through `dataChan` func (dt *discoveryTool) start() { var ( err error @@ -443,8 +442,7 @@ func (dt *discoveryTool) start() { }() } -// stop the discovery loop, making sure -// all data is read from 'dataChan' +// stop the discovery loop, making sure all data is read from 'dataChan' func (dt *discoveryTool) stop() { close(dt.done) diff --git a/plugins/inputs/amd_rocm_smi/amd_rocm_smi.go b/plugins/inputs/amd_rocm_smi/amd_rocm_smi.go index 922fc56c066c2..82d5d229ff2ab 100644 --- a/plugins/inputs/amd_rocm_smi/amd_rocm_smi.go +++ b/plugins/inputs/amd_rocm_smi/amd_rocm_smi.go @@ -22,24 +22,93 @@ var sampleConfig string const measurement = "amd_rocm_smi" +// ROCmSMI Input Plugin type ROCmSMI struct { BinPath string `toml:"bin_path"` Timeout config.Duration `toml:"timeout"` Log telegraf.Logger `toml:"-"` } -func (*ROCmSMI) SampleConfig() string { - return sampleConfig +type gpu struct { + DeviceID string `json:"Device ID"` + GpuID string `json:"GPU ID"` + GpuUniqueID string `json:"Unique ID"` + GpuVBIOSVersion string `json:"VBIOS version"` + GpuTemperatureSensorEdge string `json:"Temperature (Sensor edge) (C)"` + GpuTemperatureSensorJunction string `json:"Temperature (Sensor junction) (C)"` + GpuTemperatureSensorMemory string `json:"Temperature (Sensor memory) (C)"` + GpuDcefClkClockSpeed string `json:"dcefclk clock speed:"` + GpuDcefClkClockLevel string `json:"dcefclk clock level:"` + GpuFclkClockSpeed string `json:"fclk clock speed:"` + GpuFclkClockLevel string `json:"fclk clock level:"` + GpuMclkClockSpeed string `json:"mclk clock speed:"` + GpuMclkClockLevel string `json:"mclk clock level:"` + GpuSclkClockSpeed string `json:"sclk clock speed:"` + GpuSclkClockLevel string `json:"sclk clock level:"` + GpuSocclkClockSpeed string `json:"socclk clock speed:"` + GpuSocclkClockLevel string `json:"socclk clock level:"` + GpuPcieClock string `json:"pcie clock level"` + GpuFanSpeedLevel string `json:"Fan speed (level)"` + GpuFanSpeedPercentage string `json:"Fan speed (%)"` + GpuFanRPM string `json:"Fan RPM"` + GpuPerformanceLevel string `json:"Performance Level"` + GpuOverdrive string `json:"GPU OverDrive value (%)"` + GpuMaxPower string `json:"Max Graphics Package Power (W)"` + GpuAveragePower string `json:"Average Graphics Package Power (W)"` + GpuUsePercentage string `json:"GPU use (%)"` + GpuMemoryAllocatedPercentage string `json:"GPU Memory Allocated (VRAM%)"` + GpuMemoryUsePercentage string `json:"GPU memory use (%)"` + GpuMemoryVendor string `json:"GPU memory vendor"` + GpuPCIeReplay string `json:"PCIe Replay Count"` + GpuSerialNumber string `json:"Serial Number"` + GpuVoltagemV string `json:"Voltage (mV)"` + GpuPCIBus string `json:"PCI Bus"` + GpuASDDirmware string `json:"ASD firmware version"` + GpuCEFirmware string `json:"CE firmware version"` + GpuDMCUFirmware string `json:"DMCU firmware version"` + GpuMCFirmware string `json:"MC firmware version"` + GpuMEFirmware string `json:"ME firmware version"` + GpuMECFirmware string `json:"MEC firmware version"` + GpuMEC2Firmware string `json:"MEC2 firmware version"` + GpuPFPFirmware string `json:"PFP firmware version"` + GpuRLCFirmware string `json:"RLC firmware version"` + GpuRLCSRLC string `json:"RLC SRLC firmware version"` + GpuRLCSRLG string `json:"RLC SRLG firmware version"` + GpuRLCSRLS string `json:"RLC SRLS firmware version"` + GpuSDMAFirmware string `json:"SDMA firmware version"` + GpuSDMA2Firmware string `json:"SDMA2 firmware version"` + GpuSMCFirmware string `json:"SMC firmware version"` + GpuSOSFirmware string `json:"SOS firmware version"` + GpuTARAS string `json:"TA RAS firmware version"` + GpuTAXGMI string `json:"TA XGMI firmware version"` + GpuUVDFirmware string `json:"UVD firmware version"` + GpuVCEFirmware string `json:"VCE firmware version"` + GpuVCNFirmware string `json:"VCN firmware version"` + GpuCardSeries string `json:"Card series"` + GpuCardModel string `json:"Card model"` + GpuCardVendor string `json:"Card vendor"` + GpuCardSKU string `json:"Card SKU"` + GpuNUMANode string `json:"(Topology) Numa Node"` + GpuNUMAAffinity string `json:"(Topology) Numa Affinity"` + GpuVisVRAMTotalMemory string `json:"VIS_VRAM Total Memory (B)"` + GpuVisVRAMTotalUsedMemory string `json:"VIS_VRAM Total Used Memory (B)"` + GpuVRAMTotalMemory string `json:"VRAM Total Memory (B)"` + GpuVRAMTotalUsedMemory string `json:"VRAM Total Used Memory (B)"` + GpuGTTTotalMemory string `json:"GTT Total Memory (B)"` + GpuGTTTotalUsedMemory string `json:"GTT Total Used Memory (B)"` } -// Gather implements the telegraf interface -func (rsmi *ROCmSMI) Gather(acc telegraf.Accumulator) error { - data, err := rsmi.pollROCmSMI() - if err != nil { - return fmt.Errorf("failed to execute command in pollROCmSMI: %w", err) - } +type sysInfo struct { + DriverVersion string `json:"Driver version"` +} - return gatherROCmSMI(data, acc) +type metric struct { + tags map[string]string + fields map[string]interface{} +} + +func (*ROCmSMI) SampleConfig() string { + return sampleConfig } func (rsmi *ROCmSMI) Start(telegraf.Accumulator) error { @@ -54,17 +123,17 @@ func (rsmi *ROCmSMI) Start(telegraf.Accumulator) error { return nil } -func (rsmi *ROCmSMI) Stop() {} +func (rsmi *ROCmSMI) Gather(acc telegraf.Accumulator) error { + data, err := rsmi.pollROCmSMI() + if err != nil { + return fmt.Errorf("failed to execute command in pollROCmSMI: %w", err) + } -func init() { - inputs.Add("amd_rocm_smi", func() telegraf.Input { - return &ROCmSMI{ - BinPath: "/opt/rocm/bin/rocm-smi", - Timeout: config.Duration(5 * time.Second), - } - }) + return gatherROCmSMI(data, acc) } +func (rsmi *ROCmSMI) Stop() {} + func (rsmi *ROCmSMI) pollROCmSMI() ([]byte, error) { // Construct and execute metrics query, there currently exist (ROCm v4.3.x) a "-a" option // that does not provide all the information, so each needed parameter is set manually @@ -111,34 +180,7 @@ func (rsmi *ROCmSMI) pollROCmSMI() ([]byte, error) { return internal.StdOutputTimeout(cmd, time.Duration(rsmi.Timeout)) } -func gatherROCmSMI(ret []byte, acc telegraf.Accumulator) error { - var gpus map[string]GPU - var sys map[string]sysInfo - - err1 := json.Unmarshal(ret, &gpus) - if err1 != nil { - return err1 - } - - err2 := json.Unmarshal(ret, &sys) - if err2 != nil { - return err2 - } - - metrics := genTagsFields(gpus, sys) - for _, metric := range metrics { - acc.AddFields(measurement, metric.fields, metric.tags) - } - - return nil -} - -type metric struct { - tags map[string]string - fields map[string]interface{} -} - -func genTagsFields(gpus map[string]GPU, system map[string]sysInfo) []metric { +func genTagsFields(gpus map[string]gpu, system map[string]sysInfo) []metric { metrics := []metric{} for cardID := range gpus { if strings.Contains(cardID, "card") { @@ -189,6 +231,28 @@ func genTagsFields(gpus map[string]GPU, system map[string]sysInfo) []metric { return metrics } +func gatherROCmSMI(ret []byte, acc telegraf.Accumulator) error { + var gpus map[string]gpu + var sys map[string]sysInfo + + err1 := json.Unmarshal(ret, &gpus) + if err1 != nil { + return err1 + } + + err2 := json.Unmarshal(ret, &sys) + if err2 != nil { + return err2 + } + + metrics := genTagsFields(gpus, sys) + for _, metric := range metrics { + acc.AddFields(measurement, metric.fields, metric.tags) + } + + return nil +} + func setTagIfUsed(m map[string]string, k, v string) { if v != "" { m[k] = v @@ -232,75 +296,11 @@ func setIfUsed(t string, m map[string]interface{}, k, v string) { } } -type sysInfo struct { - DriverVersion string `json:"Driver version"` -} - -type GPU struct { - DeviceID string `json:"Device ID"` - GpuID string `json:"GPU ID"` - GpuUniqueID string `json:"Unique ID"` - GpuVBIOSVersion string `json:"VBIOS version"` - GpuTemperatureSensorEdge string `json:"Temperature (Sensor edge) (C)"` - GpuTemperatureSensorJunction string `json:"Temperature (Sensor junction) (C)"` - GpuTemperatureSensorMemory string `json:"Temperature (Sensor memory) (C)"` - GpuDcefClkClockSpeed string `json:"dcefclk clock speed:"` - GpuDcefClkClockLevel string `json:"dcefclk clock level:"` - GpuFclkClockSpeed string `json:"fclk clock speed:"` - GpuFclkClockLevel string `json:"fclk clock level:"` - GpuMclkClockSpeed string `json:"mclk clock speed:"` - GpuMclkClockLevel string `json:"mclk clock level:"` - GpuSclkClockSpeed string `json:"sclk clock speed:"` - GpuSclkClockLevel string `json:"sclk clock level:"` - GpuSocclkClockSpeed string `json:"socclk clock speed:"` - GpuSocclkClockLevel string `json:"socclk clock level:"` - GpuPcieClock string `json:"pcie clock level"` - GpuFanSpeedLevel string `json:"Fan speed (level)"` - GpuFanSpeedPercentage string `json:"Fan speed (%)"` - GpuFanRPM string `json:"Fan RPM"` - GpuPerformanceLevel string `json:"Performance Level"` - GpuOverdrive string `json:"GPU OverDrive value (%)"` - GpuMaxPower string `json:"Max Graphics Package Power (W)"` - GpuAveragePower string `json:"Average Graphics Package Power (W)"` - GpuUsePercentage string `json:"GPU use (%)"` - GpuMemoryAllocatedPercentage string `json:"GPU Memory Allocated (VRAM%)"` - GpuMemoryUsePercentage string `json:"GPU memory use (%)"` - GpuMemoryVendor string `json:"GPU memory vendor"` - GpuPCIeReplay string `json:"PCIe Replay Count"` - GpuSerialNumber string `json:"Serial Number"` - GpuVoltagemV string `json:"Voltage (mV)"` - GpuPCIBus string `json:"PCI Bus"` - GpuASDDirmware string `json:"ASD firmware version"` - GpuCEFirmware string `json:"CE firmware version"` - GpuDMCUFirmware string `json:"DMCU firmware version"` - GpuMCFirmware string `json:"MC firmware version"` - GpuMEFirmware string `json:"ME firmware version"` - GpuMECFirmware string `json:"MEC firmware version"` - GpuMEC2Firmware string `json:"MEC2 firmware version"` - GpuPFPFirmware string `json:"PFP firmware version"` - GpuRLCFirmware string `json:"RLC firmware version"` - GpuRLCSRLC string `json:"RLC SRLC firmware version"` - GpuRLCSRLG string `json:"RLC SRLG firmware version"` - GpuRLCSRLS string `json:"RLC SRLS firmware version"` - GpuSDMAFirmware string `json:"SDMA firmware version"` - GpuSDMA2Firmware string `json:"SDMA2 firmware version"` - GpuSMCFirmware string `json:"SMC firmware version"` - GpuSOSFirmware string `json:"SOS firmware version"` - GpuTARAS string `json:"TA RAS firmware version"` - GpuTAXGMI string `json:"TA XGMI firmware version"` - GpuUVDFirmware string `json:"UVD firmware version"` - GpuVCEFirmware string `json:"VCE firmware version"` - GpuVCNFirmware string `json:"VCN firmware version"` - GpuCardSeries string `json:"Card series"` - GpuCardModel string `json:"Card model"` - GpuCardVendor string `json:"Card vendor"` - GpuCardSKU string `json:"Card SKU"` - GpuNUMANode string `json:"(Topology) Numa Node"` - GpuNUMAAffinity string `json:"(Topology) Numa Affinity"` - GpuVisVRAMTotalMemory string `json:"VIS_VRAM Total Memory (B)"` - GpuVisVRAMTotalUsedMemory string `json:"VIS_VRAM Total Used Memory (B)"` - GpuVRAMTotalMemory string `json:"VRAM Total Memory (B)"` - GpuVRAMTotalUsedMemory string `json:"VRAM Total Used Memory (B)"` - GpuGTTTotalMemory string `json:"GTT Total Memory (B)"` - GpuGTTTotalUsedMemory string `json:"GTT Total Used Memory (B)"` +func init() { + inputs.Add("amd_rocm_smi", func() telegraf.Input { + return &ROCmSMI{ + BinPath: "/opt/rocm/bin/rocm-smi", + Timeout: config.Duration(5 * time.Second), + } + }) } diff --git a/plugins/inputs/amqp_consumer/amqp_consumer.go b/plugins/inputs/amqp_consumer/amqp_consumer.go index f076261a3bb41..ba531f48214e0 100644 --- a/plugins/inputs/amqp_consumer/amqp_consumer.go +++ b/plugins/inputs/amqp_consumer/amqp_consumer.go @@ -26,6 +26,8 @@ var sampleConfig string var once sync.Once type empty struct{} +type externalAuth struct{} + type semaphore chan empty // AMQPConsumer is the top level struct for this plugin @@ -62,11 +64,12 @@ type AMQPConsumer struct { decoder internal.ContentDecoder } -type externalAuth struct{} - +// Mechanism returns "EXTERNAL" func (a *externalAuth) Mechanism() string { return "EXTERNAL" } + +// Response returns "\000" func (a *externalAuth) Response() string { return "\000" } @@ -115,51 +118,6 @@ func (a *AMQPConsumer) SetParser(parser telegraf.Parser) { a.parser = parser } -// All gathering is done in the Start function -func (a *AMQPConsumer) Gather(_ telegraf.Accumulator) error { - return nil -} - -func (a *AMQPConsumer) createConfig() (*amqp.Config, error) { - // make new tls config - tlsCfg, err := a.ClientConfig.TLSConfig() - if err != nil { - return nil, err - } - - var auth []amqp.Authentication - - if strings.EqualFold(a.AuthMethod, "EXTERNAL") { - auth = []amqp.Authentication{&externalAuth{}} - } else if !a.Username.Empty() || !a.Password.Empty() { - username, err := a.Username.Get() - if err != nil { - return nil, fmt.Errorf("getting username failed: %w", err) - } - defer username.Destroy() - - password, err := a.Password.Get() - if err != nil { - return nil, fmt.Errorf("getting password failed: %w", err) - } - defer password.Destroy() - - auth = []amqp.Authentication{ - &amqp.PlainAuth{ - Username: username.String(), - Password: password.String(), - }, - } - } - amqpConfig := amqp.Config{ - TLSClientConfig: tlsCfg, - SASL: auth, // if nil, it will be PLAIN - Dial: amqp.DefaultDial(time.Duration(a.Timeout)), - } - return &amqpConfig, nil -} - -// Start satisfies the telegraf.ServiceInput interface func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error { amqpConf, err := a.createConfig() if err != nil { @@ -219,6 +177,63 @@ func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error { return nil } +func (a *AMQPConsumer) Gather(_ telegraf.Accumulator) error { + return nil +} + +func (a *AMQPConsumer) Stop() { + // We did not connect successfully so there is nothing to do here. + if a.conn == nil || a.conn.IsClosed() { + return + } + a.cancel() + a.wg.Wait() + err := a.conn.Close() + if err != nil && !errors.Is(err, amqp.ErrClosed) { + a.Log.Errorf("Error closing AMQP connection: %s", err) + return + } +} + +func (a *AMQPConsumer) createConfig() (*amqp.Config, error) { + // make new tls config + tlsCfg, err := a.ClientConfig.TLSConfig() + if err != nil { + return nil, err + } + + var auth []amqp.Authentication + + if strings.EqualFold(a.AuthMethod, "EXTERNAL") { + auth = []amqp.Authentication{&externalAuth{}} + } else if !a.Username.Empty() || !a.Password.Empty() { + username, err := a.Username.Get() + if err != nil { + return nil, fmt.Errorf("getting username failed: %w", err) + } + defer username.Destroy() + + password, err := a.Password.Get() + if err != nil { + return nil, fmt.Errorf("getting password failed: %w", err) + } + defer password.Destroy() + + auth = []amqp.Authentication{ + &amqp.PlainAuth{ + Username: username.String(), + Password: password.String(), + }, + } + } + amqpConfig := amqp.Config{ + TLSClientConfig: tlsCfg, + SASL: auth, // if nil, it will be PLAIN + Dial: amqp.DefaultDial(time.Duration(a.Timeout)), + } + return &amqpConfig, nil +} + func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, error) { brokers := a.Brokers @@ -479,20 +494,6 @@ func (a *AMQPConsumer) onDelivery(track telegraf.DeliveryInfo) bool { return true } -func (a *AMQPConsumer) Stop() { - // We did not connect successfully so there is nothing to do here. - if a.conn == nil || a.conn.IsClosed() { - return - } - a.cancel() - a.wg.Wait() - err := a.conn.Close() - if err != nil && !errors.Is(err, amqp.ErrClosed) { - a.Log.Errorf("Error closing AMQP connection: %s", err) - return - } -} - func init() { inputs.Add("amqp_consumer", func() telegraf.Input { return &AMQPConsumer{Timeout: config.Duration(30 * time.Second)} diff --git a/plugins/inputs/apache/apache.go b/plugins/inputs/apache/apache.go index 6263404faa549..5a18c32c5840d 100644 --- a/plugins/inputs/apache/apache.go +++ b/plugins/inputs/apache/apache.go @@ -22,6 +22,7 @@ import ( //go:embed sample.conf var sampleConfig string +// Apache Input Plugin type Apache struct { Urls []string Username string diff --git a/plugins/inputs/apcupsd/apcupsd.go b/plugins/inputs/apcupsd/apcupsd.go index 299b973627204..7dbbaefeedc36 100644 --- a/plugins/inputs/apcupsd/apcupsd.go +++ b/plugins/inputs/apcupsd/apcupsd.go @@ -23,6 +23,7 @@ const defaultAddress = "tcp://127.0.0.1:3551" var defaultTimeout = config.Duration(5 * time.Second) +// ApcUpsd Input Plugin type ApcUpsd struct { Servers []string Timeout config.Duration diff --git a/plugins/inputs/aurora/aurora.go b/plugins/inputs/aurora/aurora.go index a76ca835df4ed..bf0c7d4700bdd 100644 --- a/plugins/inputs/aurora/aurora.go +++ b/plugins/inputs/aurora/aurora.go @@ -21,19 +21,19 @@ import ( //go:embed sample.conf var sampleConfig string -type RoleType int +type roleType int const ( - Unknown RoleType = iota - Leader - Follower + unknown roleType = iota + leader + follower ) -func (r RoleType) String() string { +func (r roleType) String() string { switch r { - case Leader: + case leader: return "leader" - case Follower: + case follower: return "follower" default: return "unknown" @@ -45,8 +45,9 @@ var ( defaultRoles = []string{"leader", "follower"} ) -type Vars map[string]interface{} +type vars map[string]interface{} +// Aurora Input Plugin type Aurora struct { Schedulers []string `toml:"schedulers"` Roles []string `toml:"roles"` @@ -136,7 +137,7 @@ func (a *Aurora) initialize() error { return nil } -func (a *Aurora) roleEnabled(role RoleType) bool { +func (a *Aurora) roleEnabled(role roleType) bool { if len(a.Roles) == 0 { return true } @@ -149,12 +150,12 @@ func (a *Aurora) roleEnabled(role RoleType) bool { return false } -func (a *Aurora) gatherRole(ctx context.Context, origin *url.URL) (RoleType, error) { +func (a *Aurora) gatherRole(ctx context.Context, origin *url.URL) (roleType, error) { loc := *origin loc.Path = "leaderhealth" req, err := http.NewRequest("GET", loc.String(), nil) if err != nil { - return Unknown, err + return unknown, err } if a.Username != "" || a.Password != "" { @@ -164,26 +165,26 @@ func (a *Aurora) gatherRole(ctx context.Context, origin *url.URL) (RoleType, err resp, err := a.client.Do(req.WithContext(ctx)) if err != nil { - return Unknown, err + return unknown, err } if err := resp.Body.Close(); err != nil { - return Unknown, fmt.Errorf("closing body failed: %w", err) + return unknown, fmt.Errorf("closing body failed: %w", err) } switch resp.StatusCode { case http.StatusOK: - return Leader, nil + return leader, nil case http.StatusBadGateway: fallthrough case http.StatusServiceUnavailable: - return Follower, nil + return follower, nil default: - return Unknown, fmt.Errorf("%v", resp.Status) + return unknown, fmt.Errorf("%v", resp.Status) } } func (a *Aurora) gatherScheduler( - ctx context.Context, origin *url.URL, role RoleType, acc telegraf.Accumulator, + ctx context.Context, origin *url.URL, role roleType, acc telegraf.Accumulator, ) error { loc := *origin loc.Path = "vars.json" @@ -207,7 +208,7 @@ func (a *Aurora) gatherScheduler( return fmt.Errorf("%v", resp.Status) } - var vars Vars + var vars vars decoder := json.NewDecoder(resp.Body) decoder.UseNumber() err = decoder.Decode(&vars) diff --git a/plugins/inputs/aurora/aurora_test.go b/plugins/inputs/aurora/aurora_test.go index 6eeaa93f4809e..b43949f25f897 100644 --- a/plugins/inputs/aurora/aurora_test.go +++ b/plugins/inputs/aurora/aurora_test.go @@ -12,8 +12,8 @@ import ( ) type ( - TestHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request) - CheckFunc func(t *testing.T, err error, acc *testutil.Accumulator) + testHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request) + checkFunc func(t *testing.T, err error, acc *testutil.Accumulator) ) func TestAurora(t *testing.T) { @@ -28,9 +28,9 @@ func TestAurora(t *testing.T) { plugin *Aurora schedulers []string roles []string - leaderhealth TestHandlerFunc - varsjson TestHandlerFunc - check CheckFunc + leaderhealth testHandlerFunc + varsjson testHandlerFunc + check checkFunc }{ { name: "minimal", diff --git a/plugins/inputs/azure_monitor/azure_monitor.go b/plugins/inputs/azure_monitor/azure_monitor.go index 495b67ec7d637..b4f3fbd32b51d 100644 --- a/plugins/inputs/azure_monitor/azure_monitor.go +++ b/plugins/inputs/azure_monitor/azure_monitor.go @@ -14,15 +14,16 @@ import ( receiver "github.com/logzio/azure-monitor-metrics-receiver" ) +// AzureMonitor Input Plugin type AzureMonitor struct { SubscriptionID string `toml:"subscription_id"` ClientID string `toml:"client_id"` ClientSecret string `toml:"client_secret"` TenantID string `toml:"tenant_id"` CloudOption string `toml:"cloud_option,omitempty"` - ResourceTargets []*ResourceTarget `toml:"resource_target"` - ResourceGroupTargets []*ResourceGroupTarget `toml:"resource_group_target"` - SubscriptionTargets []*Resource `toml:"subscription_target"` + ResourceTargets []*resourceTarget `toml:"resource_target"` + ResourceGroupTargets []*resourceGroupTarget `toml:"resource_group_target"` + SubscriptionTargets []*resource `toml:"subscription_target"` Log telegraf.Logger `toml:"-"` receiver *receiver.AzureMonitorMetricsReceiver @@ -30,18 +31,18 @@ type AzureMonitor struct { azureClients *receiver.AzureClients } -type ResourceTarget struct { +type resourceTarget struct { ResourceID string `toml:"resource_id"` Metrics []string `toml:"metrics"` Aggregations []string `toml:"aggregations"` } -type ResourceGroupTarget struct { +type resourceGroupTarget struct { ResourceGroup string `toml:"resource_group"` - Resources []*Resource `toml:"resource"` + Resources []*resource `toml:"resource"` } -type Resource struct { +type resource struct { ResourceType string `toml:"resource_type"` Metrics []string `toml:"metrics"` Aggregations []string `toml:"aggregations"` @@ -61,7 +62,6 @@ func (am *AzureMonitor) SampleConfig() string { return sampleConfig } -// Init is for setup, and validating config. func (am *AzureMonitor) Init() error { var clientOptions azcore.ClientOptions switch am.CloudOption { diff --git a/plugins/inputs/azure_storage_queue/azure_storage_queue.go b/plugins/inputs/azure_storage_queue/azure_storage_queue.go index d65a8fe7b329e..73a3df4d9dfba 100644 --- a/plugins/inputs/azure_storage_queue/azure_storage_queue.go +++ b/plugins/inputs/azure_storage_queue/azure_storage_queue.go @@ -18,6 +18,7 @@ import ( //go:embed sample.conf var sampleConfig string +// AzureStorageQueue Input Plugin type AzureStorageQueue struct { StorageAccountName string `toml:"account_name"` StorageAccountKey string `toml:"account_key"` @@ -42,45 +43,8 @@ func (a *AzureStorageQueue) Init() error { return nil } -func (a *AzureStorageQueue) GetServiceURL() (azqueue.ServiceURL, error) { - if a.serviceURL == nil { - _url, err := url.Parse("https://" + a.StorageAccountName + ".queue.core.windows.net") - if err != nil { - return azqueue.ServiceURL{}, err - } - - credential, err := azqueue.NewSharedKeyCredential(a.StorageAccountName, a.StorageAccountKey) - if err != nil { - return azqueue.ServiceURL{}, err - } - - pipeline := azqueue.NewPipeline(credential, azqueue.PipelineOptions{}) - - serviceURL := azqueue.NewServiceURL(*_url, pipeline) - a.serviceURL = &serviceURL - } - return *a.serviceURL, nil -} - -func (a *AzureStorageQueue) GatherQueueMetrics( - acc telegraf.Accumulator, - queueItem azqueue.QueueItem, - properties *azqueue.QueueGetPropertiesResponse, - peekedMessage *azqueue.PeekedMessage, -) { - fields := make(map[string]interface{}) - tags := make(map[string]string) - tags["queue"] = strings.TrimSpace(queueItem.Name) - tags["account"] = a.StorageAccountName - fields["size"] = properties.ApproximateMessagesCount() - if peekedMessage != nil { - fields["oldest_message_age_ns"] = time.Now().UnixNano() - peekedMessage.InsertionTime.UnixNano() - } - acc.AddFields("azure_storage_queues", fields, tags) -} - func (a *AzureStorageQueue) Gather(acc telegraf.Accumulator) error { - serviceURL, err := a.GetServiceURL() + serviceURL, err := a.getServiceURL() if err != nil { return err } @@ -117,12 +81,49 @@ func (a *AzureStorageQueue) Gather(acc telegraf.Accumulator) error { } } - a.GatherQueueMetrics(acc, queueItem, properties, peekedMessage) + a.gatherQueueMetrics(acc, queueItem, properties, peekedMessage) } } return nil } +func (a *AzureStorageQueue) getServiceURL() (azqueue.ServiceURL, error) { + if a.serviceURL == nil { + _url, err := url.Parse("https://" + a.StorageAccountName + ".queue.core.windows.net") + if err != nil { + return azqueue.ServiceURL{}, err + } + + credential, err := azqueue.NewSharedKeyCredential(a.StorageAccountName, a.StorageAccountKey) + if err != nil { + return azqueue.ServiceURL{}, err + } + + pipeline := azqueue.NewPipeline(credential, azqueue.PipelineOptions{}) + + serviceURL := azqueue.NewServiceURL(*_url, pipeline) + a.serviceURL = &serviceURL + } + return *a.serviceURL, nil +} + +func (a *AzureStorageQueue) gatherQueueMetrics( + acc telegraf.Accumulator, + queueItem azqueue.QueueItem, + properties *azqueue.QueueGetPropertiesResponse, + peekedMessage *azqueue.PeekedMessage, +) { + fields := make(map[string]interface{}) + tags := make(map[string]string) + tags["queue"] = strings.TrimSpace(queueItem.Name) + tags["account"] = a.StorageAccountName + fields["size"] = properties.ApproximateMessagesCount() + if peekedMessage != nil { + fields["oldest_message_age_ns"] = time.Now().UnixNano() - peekedMessage.InsertionTime.UnixNano() + } + acc.AddFields("azure_storage_queues", fields, tags) +} + func init() { inputs.Add("azure_storage_queue", func() telegraf.Input { return &AzureStorageQueue{PeekOldestMessageAge: true} diff --git a/plugins/inputs/bcache/bcache.go b/plugins/inputs/bcache/bcache.go index 2157fad55a8a1..c8762205a3cee 100644 --- a/plugins/inputs/bcache/bcache.go +++ b/plugins/inputs/bcache/bcache.go @@ -19,9 +19,46 @@ import ( //go:embed sample.conf var sampleConfig string +// Bcache Input Plugin type Bcache struct { - BcachePath string - BcacheDevs []string + BcachePath string `toml:"bcachePath"` + BcacheDevs []string `toml:"bcacheDevs"` +} + +func (*Bcache) SampleConfig() string { + return sampleConfig +} + +func (b *Bcache) Gather(acc telegraf.Accumulator) error { + bcacheDevsChecked := make(map[string]bool) + var restrictDevs bool + if len(b.BcacheDevs) != 0 { + restrictDevs = true + for _, bcacheDev := range b.BcacheDevs { + bcacheDevsChecked[bcacheDev] = true + } + } + + bcachePath := b.BcachePath + if len(bcachePath) == 0 { + bcachePath = "/sys/fs/bcache" + } + bdevs, err := filepath.Glob(bcachePath + "/*/bdev*") + if len(bdevs) < 1 || err != nil { + return errors.New("can't find any bcache device") + } + for _, bdev := range bdevs { + if restrictDevs { + bcacheDev := getTags(bdev)["bcache_dev"] + if !bcacheDevsChecked[bcacheDev] { + continue + } + } + if err := b.gatherBcache(bdev, acc); err != nil { + return fmt.Errorf("gathering bcache failed: %w", err) + } + } + return nil } func getTags(bdev string) map[string]string { @@ -102,42 +139,6 @@ func (b *Bcache) gatherBcache(bdev string, acc telegraf.Accumulator) error { return nil } -func (*Bcache) SampleConfig() string { - return sampleConfig -} - -func (b *Bcache) Gather(acc telegraf.Accumulator) error { - bcacheDevsChecked := make(map[string]bool) - var restrictDevs bool - if len(b.BcacheDevs) != 0 { - restrictDevs = true - for _, bcacheDev := range b.BcacheDevs { - bcacheDevsChecked[bcacheDev] = true - } - } - - bcachePath := b.BcachePath - if len(bcachePath) == 0 { - bcachePath = "/sys/fs/bcache" - } - bdevs, err := filepath.Glob(bcachePath + "/*/bdev*") - if len(bdevs) < 1 || err != nil { - return errors.New("can't find any bcache device") - } - for _, bdev := range bdevs { - if restrictDevs { - bcacheDev := getTags(bdev)["bcache_dev"] - if !bcacheDevsChecked[bcacheDev] { - continue - } - } - if err := b.gatherBcache(bdev, acc); err != nil { - return fmt.Errorf("gathering bcache failed: %w", err) - } - } - return nil -} - func init() { inputs.Add("bcache", func() telegraf.Input { return &Bcache{} diff --git a/plugins/inputs/beanstalkd/beanstalkd.go b/plugins/inputs/beanstalkd/beanstalkd.go index 2b3935d8fbeb6..ffb2c1939a545 100644 --- a/plugins/inputs/beanstalkd/beanstalkd.go +++ b/plugins/inputs/beanstalkd/beanstalkd.go @@ -17,6 +17,7 @@ import ( //go:embed sample.conf var sampleConfig string +// Beanstalkd Input Plugin type Beanstalkd struct { Server string `toml:"server"` Tubes []string `toml:"tubes"` diff --git a/plugins/inputs/beat/beat.go b/plugins/inputs/beat/beat.go index 5992bf9ec9ce8..bed51273bf28e 100644 --- a/plugins/inputs/beat/beat.go +++ b/plugins/inputs/beat/beat.go @@ -20,24 +20,12 @@ import ( //go:embed sample.conf var sampleConfig string -const suffixInfo = "/" -const suffixStats = "/stats" - -type Info struct { - Beat string `json:"beat"` - Hostname string `json:"hostname"` - Name string `json:"name"` - UUID string `json:"uuid"` - Version string `json:"version"` -} - -type Stats struct { - Beat map[string]interface{} `json:"beat"` - FileBeat interface{} `json:"filebeat"` - Libbeat interface{} `json:"libbeat"` - System interface{} `json:"system"` -} +const ( + suffixInfo = "/" + suffixStats = "/stats" +) +// Beat Input Plugin type Beat struct { URL string `toml:"url"` @@ -54,14 +42,19 @@ type Beat struct { client *http.Client } -func NewBeat() *Beat { - return &Beat{ - URL: "http://127.0.0.1:5066", - Includes: []string{"beat", "libbeat", "filebeat"}, - Method: "GET", - Headers: make(map[string]string), - Timeout: config.Duration(time.Second * 5), - } +type info struct { + Beat string `json:"beat"` + Hostname string `json:"hostname"` + Name string `json:"name"` + UUID string `json:"uuid"` + Version string `json:"version"` +} + +type stats struct { + Beat map[string]interface{} `json:"beat"` + FileBeat interface{} `json:"filebeat"` + LibBeat interface{} `json:"libbeat"` + System interface{} `json:"system"` } func (*Beat) SampleConfig() string { @@ -86,53 +79,9 @@ func (beat *Beat) Init() error { return nil } -// createHTTPClient create a clients to access API -func (beat *Beat) createHTTPClient() (*http.Client, error) { - tlsConfig, err := beat.ClientConfig.TLSConfig() - if err != nil { - return nil, err - } - - client := &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: tlsConfig, - }, - Timeout: time.Duration(beat.Timeout), - } - - return client, nil -} - -// gatherJSONData query the data source and parse the response JSON -func (beat *Beat) gatherJSONData(address string, value interface{}) error { - request, err := http.NewRequest(beat.Method, address, nil) - if err != nil { - return err - } - - if beat.Username != "" { - request.SetBasicAuth(beat.Username, beat.Password) - } - for k, v := range beat.Headers { - request.Header.Add(k, v) - } - if beat.HostHeader != "" { - request.Host = beat.HostHeader - } - - response, err := beat.client.Do(request) - if err != nil { - return err - } - - defer response.Body.Close() - - return json.NewDecoder(response.Body).Decode(value) -} - func (beat *Beat) Gather(accumulator telegraf.Accumulator) error { - beatStats := &Stats{} - beatInfo := &Info{} + beatStats := &stats{} + beatInfo := &info{} infoURL, err := url.Parse(beat.URL + suffixInfo) if err != nil { @@ -175,7 +124,7 @@ func (beat *Beat) Gather(accumulator telegraf.Accumulator) error { stats = beatStats.System metric = "beat_system" case "libbeat": - stats = beatStats.Libbeat + stats = beatStats.LibBeat metric = "beat_libbeat" default: return fmt.Errorf("unknown stats-type %q", name) @@ -191,8 +140,62 @@ func (beat *Beat) Gather(accumulator telegraf.Accumulator) error { return nil } +// createHTTPClient create a clients to access API +func (beat *Beat) createHTTPClient() (*http.Client, error) { + tlsConfig, err := beat.ClientConfig.TLSConfig() + if err != nil { + return nil, err + } + + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsConfig, + }, + Timeout: time.Duration(beat.Timeout), + } + + return client, nil +} + +// gatherJSONData query the data source and parse the response JSON +func (beat *Beat) gatherJSONData(address string, value interface{}) error { + request, err := http.NewRequest(beat.Method, address, nil) + if err != nil { + return err + } + + if beat.Username != "" { + request.SetBasicAuth(beat.Username, beat.Password) + } + for k, v := range beat.Headers { + request.Header.Add(k, v) + } + if beat.HostHeader != "" { + request.Host = beat.HostHeader + } + + response, err := beat.client.Do(request) + if err != nil { + return err + } + + defer response.Body.Close() + + return json.NewDecoder(response.Body).Decode(value) +} + +func newBeat() *Beat { + return &Beat{ + URL: "http://127.0.0.1:5066", + Includes: []string{"beat", "libbeat", "filebeat"}, + Method: "GET", + Headers: make(map[string]string), + Timeout: config.Duration(time.Second * 5), + } +} + func init() { inputs.Add("beat", func() telegraf.Input { - return NewBeat() + return newBeat() }) } diff --git a/plugins/inputs/beat/beat_test.go b/plugins/inputs/beat/beat_test.go index fae8818f49f3d..c9b9d6071a626 100644 --- a/plugins/inputs/beat/beat_test.go +++ b/plugins/inputs/beat/beat_test.go @@ -16,7 +16,7 @@ import ( func Test_BeatStats(t *testing.T) { var beat6StatsAccumulator testutil.Accumulator - var beatTest = NewBeat() + var beatTest = newBeat() // System stats are disabled by default beatTest.Includes = []string{"beat", "libbeat", "system", "filebeat"} require.NoError(t, beatTest.Init()) @@ -160,7 +160,7 @@ func Test_BeatStats(t *testing.T) { func Test_BeatRequest(t *testing.T) { var beat6StatsAccumulator testutil.Accumulator - beatTest := NewBeat() + beatTest := newBeat() // System stats are disabled by default beatTest.Includes = []string{"beat", "libbeat", "system", "filebeat"} require.NoError(t, beatTest.Init()) diff --git a/plugins/inputs/bind/bind.go b/plugins/inputs/bind/bind.go index c78d3a5cbeb3d..e07a8aeb2b54d 100644 --- a/plugins/inputs/bind/bind.go +++ b/plugins/inputs/bind/bind.go @@ -17,6 +17,7 @@ import ( //go:embed sample.conf var sampleConfig string +// Bind 9 Nameserver Statistics Input Plugin type Bind struct { Urls []string GatherMemoryContexts bool diff --git a/plugins/inputs/bond/bond.go b/plugins/inputs/bond/bond.go index 929d469510fe3..4635da70d338b 100644 --- a/plugins/inputs/bond/bond.go +++ b/plugins/inputs/bond/bond.go @@ -17,14 +17,14 @@ import ( //go:embed sample.conf var sampleConfig string -// default host proc path -const defaultHostProc = "/proc" -const defaultHostSys = "/sys" - -// env host proc variable name -const envProc = "HOST_PROC" -const envSys = "HOST_SYS" +const ( + defaultHostProc = "/proc" + defaultHostSys = "/sys" + envProc = "HOST_PROC" + envSys = "HOST_SYS" +) +// Bond Input Plugin type Bond struct { HostProc string `toml:"host_proc"` HostSys string `toml:"host_sys"` diff --git a/plugins/inputs/burrow/burrow.go b/plugins/inputs/burrow/burrow.go index c58f2f24b9264..51ae4892bac0f 100644 --- a/plugins/inputs/burrow/burrow.go +++ b/plugins/inputs/burrow/burrow.go @@ -31,7 +31,8 @@ const ( ) type ( - burrow struct { + // Burrow Kafka Consumer Lag Checking Input Plugin + Burrow struct { tls.ClientConfig Servers []string @@ -91,17 +92,11 @@ type ( } ) -func init() { - inputs.Add("burrow", func() telegraf.Input { - return &burrow{} - }) -} - -func (*burrow) SampleConfig() string { +func (*Burrow) SampleConfig() string { return sampleConfig } -func (b *burrow) Gather(acc telegraf.Accumulator) error { +func (b *Burrow) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup if len(b.Servers) == 0 { @@ -141,7 +136,7 @@ func (b *burrow) Gather(acc telegraf.Accumulator) error { return nil } -func (b *burrow) setDefaults() { +func (b *Burrow) setDefaults() { if b.APIPrefix == "" { b.APIPrefix = defaultBurrowPrefix } @@ -153,7 +148,7 @@ func (b *burrow) setDefaults() { } } -func (b *burrow) compileGlobs() error { +func (b *Burrow) compileGlobs() error { var err error // compile glob patterns @@ -172,7 +167,7 @@ func (b *burrow) compileGlobs() error { return nil } -func (b *burrow) createClient() (*http.Client, error) { +func (b *Burrow) createClient() (*http.Client, error) { tlsCfg, err := b.ClientConfig.TLSConfig() if err != nil { return nil, err @@ -199,7 +194,7 @@ func (b *burrow) createClient() (*http.Client, error) { return client, nil } -func (b *burrow) getResponse(u *url.URL) (*apiResponse, error) { +func (b *Burrow) getResponse(u *url.URL) (*apiResponse, error) { req, err := http.NewRequest(http.MethodGet, u.String(), nil) if err != nil { return nil, err @@ -224,7 +219,7 @@ func (b *burrow) getResponse(u *url.URL) (*apiResponse, error) { return ares, dec.Decode(ares) } -func (b *burrow) gatherServer(src *url.URL, acc telegraf.Accumulator) error { +func (b *Burrow) gatherServer(src *url.URL, acc telegraf.Accumulator) error { var wg sync.WaitGroup r, err := b.getResponse(src) @@ -263,7 +258,7 @@ func (b *burrow) gatherServer(src *url.URL, acc telegraf.Accumulator) error { return nil } -func (b *burrow) gatherTopics(guard chan struct{}, src *url.URL, cluster string, acc telegraf.Accumulator) { +func (b *Burrow) gatherTopics(guard chan struct{}, src *url.URL, cluster string, acc telegraf.Accumulator) { var wg sync.WaitGroup r, err := b.getResponse(src) @@ -302,7 +297,7 @@ func (b *burrow) gatherTopics(guard chan struct{}, src *url.URL, cluster string, wg.Wait() } -func (b *burrow) genTopicMetrics(r *apiResponse, cluster, topic string, acc telegraf.Accumulator) { +func (b *Burrow) genTopicMetrics(r *apiResponse, cluster, topic string, acc telegraf.Accumulator) { for i, offset := range r.Offsets { tags := map[string]string{ "cluster": cluster, @@ -320,7 +315,7 @@ func (b *burrow) genTopicMetrics(r *apiResponse, cluster, topic string, acc tele } } -func (b *burrow) gatherGroups(guard chan struct{}, src *url.URL, cluster string, acc telegraf.Accumulator) { +func (b *Burrow) gatherGroups(guard chan struct{}, src *url.URL, cluster string, acc telegraf.Accumulator) { var wg sync.WaitGroup r, err := b.getResponse(src) @@ -360,7 +355,7 @@ func (b *burrow) gatherGroups(guard chan struct{}, src *url.URL, cluster string, wg.Wait() } -func (b *burrow) genGroupStatusMetrics(r *apiResponse, cluster, group string, acc telegraf.Accumulator) { +func (b *Burrow) genGroupStatusMetrics(r *apiResponse, cluster, group string, acc telegraf.Accumulator) { partitionCount := r.Status.PartitionCount if partitionCount == 0 { partitionCount = len(r.Status.Partitions) @@ -399,7 +394,7 @@ func (b *burrow) genGroupStatusMetrics(r *apiResponse, cluster, group string, ac ) } -func (b *burrow) genGroupLagMetrics(r *apiResponse, cluster, group string, acc telegraf.Accumulator) { +func (b *Burrow) genGroupLagMetrics(r *apiResponse, cluster, group string, acc telegraf.Accumulator) { for _, partition := range r.Status.Partitions { if !b.filterTopics.Match(partition.Topic) { continue @@ -455,3 +450,9 @@ func mapStatusToCode(src string) int { return 0 } } + +func init() { + inputs.Add("burrow", func() telegraf.Input { + return &Burrow{} + }) +} diff --git a/plugins/inputs/burrow/burrow_test.go b/plugins/inputs/burrow/burrow_test.go index 358af7447a465..aefa78202518c 100644 --- a/plugins/inputs/burrow/burrow_test.go +++ b/plugins/inputs/burrow/burrow_test.go @@ -73,7 +73,7 @@ func TestBurrowTopic(t *testing.T) { s := getHTTPServer() defer s.Close() - plugin := &burrow{Servers: []string{s.URL}} + plugin := &Burrow{Servers: []string{s.URL}} acc := &testutil.Accumulator{} require.NoError(t, plugin.Gather(acc)) @@ -102,7 +102,7 @@ func TestBurrowPartition(t *testing.T) { s := getHTTPServer() defer s.Close() - plugin := &burrow{ + plugin := &Burrow{ Servers: []string{s.URL}, } acc := &testutil.Accumulator{} @@ -150,7 +150,7 @@ func TestBurrowGroup(t *testing.T) { s := getHTTPServer() defer s.Close() - plugin := &burrow{ + plugin := &Burrow{ Servers: []string{s.URL}, } acc := &testutil.Accumulator{} @@ -188,7 +188,7 @@ func TestMultipleServers(t *testing.T) { s2 := getHTTPServer() defer s2.Close() - plugin := &burrow{ + plugin := &Burrow{ Servers: []string{s1.URL, s2.URL}, } acc := &testutil.Accumulator{} @@ -203,7 +203,7 @@ func TestMultipleRuns(t *testing.T) { s := getHTTPServer() defer s.Close() - plugin := &burrow{ + plugin := &Burrow{ Servers: []string{s.URL}, } for i := 0; i < 4; i++ { @@ -220,7 +220,7 @@ func TestBasicAuthConfig(t *testing.T) { s := getHTTPServerBasicAuth() defer s.Close() - plugin := &burrow{ + plugin := &Burrow{ Servers: []string{s.URL}, Username: "test", Password: "test", @@ -238,7 +238,7 @@ func TestFilterClusters(t *testing.T) { s := getHTTPServer() defer s.Close() - plugin := &burrow{ + plugin := &Burrow{ Servers: []string{s.URL}, ClustersInclude: []string{"wrongname*"}, // clustername1 -> no match } @@ -256,7 +256,7 @@ func TestFilterGroups(t *testing.T) { s := getHTTPServer() defer s.Close() - plugin := &burrow{ + plugin := &Burrow{ Servers: []string{s.URL}, GroupsInclude: []string{"group?"}, // group1 -> match TopicsExclude: []string{"*"}, // exclude all @@ -274,7 +274,7 @@ func TestFilterTopics(t *testing.T) { s := getHTTPServer() defer s.Close() - plugin := &burrow{ + plugin := &Burrow{ Servers: []string{s.URL}, TopicsInclude: []string{"topic?"}, // topicA -> match GroupsExclude: []string{"*"}, // exclude all