Skip to content

Commit

Permalink
Merge branch 'influxdata:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
asaharn authored Jan 6, 2025
2 parents cca87a0 + 0c7c424 commit 28fb084
Show file tree
Hide file tree
Showing 41 changed files with 296 additions and 141 deletions.
3 changes: 2 additions & 1 deletion plugins/inputs/ipset/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## You can avoid using sudo or root, by setting appropriate privileges for
## the telegraf.service systemd service.
use_sudo = false
## Add number of entries and number of individual IPs (resolve CIDR syntax) for each ipset
count_per_ip_entries = false
## The default timeout of 1s for ipset execution can be overridden here:
# timeout = "1s"

```

## Metrics
Expand Down
12 changes: 11 additions & 1 deletion plugins/inputs/ipset/ipset.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ type Ipset struct {
IncludeUnmatchedSets bool `toml:"include_unmatched_sets"`
UseSudo bool `toml:"use_sudo"`
Timeout config.Duration `toml:"timeout"`
CountPerIPEntries bool

lister setLister
lister setLister
entriesParser ipsetEntries
}

type setLister func(Timeout config.Duration, UseSudo bool) (*bytes.Buffer, error)
Expand All @@ -56,6 +58,11 @@ func (i *Ipset) Gather(acc telegraf.Accumulator) error {
scanner := bufio.NewScanner(out)
for scanner.Scan() {
line := scanner.Text()

if i.CountPerIPEntries {
acc.AddError(i.entriesParser.addLine(line, acc))
}

// Ignore sets created without the "counters" option
nocomment := strings.Split(line, "\"")[0]
if !(strings.Contains(nocomment, "packets") &&
Expand Down Expand Up @@ -101,6 +108,9 @@ func (i *Ipset) Gather(acc telegraf.Accumulator) error {
acc.AddCounter(measurement, fields, tags)
}
}

i.entriesParser.commit(acc)

return nil
}

Expand Down
86 changes: 86 additions & 0 deletions plugins/inputs/ipset/ipset_entries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//go:generate ../../../tools/readme_config_includer/generator
package ipset

import (
"errors"
"fmt"
"net"
"strings"

"github.com/influxdata/telegraf"
)

type ipsetEntries struct {
initialized bool
setName string
entries int
ips int
}

func getCountInCidr(cidr string) (int, error) {
_, ipNet, err := net.ParseCIDR(cidr)
if err != nil {
// check if single IP
if net.ParseIP(cidr) == nil {
return 0, errors.New("invalid IP address format. Not CIDR format and not a single IP address")
}
return 1, nil // Single IP has only one address
}

ones, bits := ipNet.Mask.Size()
if ones == 0 && bits == 0 {
return 0, errors.New("invalid CIDR range")
}
numIps := 1 << (bits - ones)

// exclude network and broadcast addresses if IPv4 and range > /31
if bits == 32 && numIps > 2 {
numIps -= 2
}

return numIps, nil
}

func (counter *ipsetEntries) addLine(line string, acc telegraf.Accumulator) error {
data := strings.Fields(line)
if len(data) < 3 {
return fmt.Errorf("error parsing line (expected at least 3 fields): %s", line)
}

switch data[0] {
case "create":
counter.commit(acc)
counter.initialized = true
counter.setName = data[1]
counter.entries = 0
counter.ips = 0
case "add":
counter.entries++
count, err := getCountInCidr(data[2])
if err != nil {
return err
}
counter.ips += count
}
return nil
}

func (counter *ipsetEntries) commit(acc telegraf.Accumulator) {
if !counter.initialized {
return
}

fields := map[string]interface{}{
"entries": counter.entries,
"ips": counter.ips,
}

tags := map[string]string{
"set": counter.setName,
}

acc.AddGauge(measurement, fields, tags)

// reset counter and prepare for next usage
counter.initialized = false
}
96 changes: 96 additions & 0 deletions plugins/inputs/ipset/ipset_entries_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package ipset

import (
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)

func TestIpsetEntries(t *testing.T) {
var acc testutil.Accumulator

lines := []string{
"create mylist hash:net family inet hashsize 16384 maxelem 131072 timeout 300 bucketsize 12 initval 0x4effa9ad",
"add mylist 89.101.238.143 timeout 161558",
"add mylist 122.224.15.166 timeout 186758",
"add mylist 47.128.40.145 timeout 431559",
}

entries := ipsetEntries{}
for _, line := range lines {
require.NoError(t, entries.addLine(line, &acc))
}
entries.commit(&acc)

expected := []telegraf.Metric{
testutil.MustMetric(
"ipset",
map[string]string{
"set": "mylist",
},
map[string]interface{}{
"entries": 3,
"ips": 3,
},
time.Unix(0, 0),
telegraf.Gauge,
),
}

testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
}

func TestIpsetEntriesCidr(t *testing.T) {
var acc testutil.Accumulator

lines := []string{
"create mylist0 hash:net family inet hashsize 16384 maxelem 131072 timeout 300 bucketsize 12 initval 0x4effa9ad",
"add mylist0 89.101.238.143 timeout 161558",
"add mylist0 122.224.5.0/24 timeout 186758",
"add mylist0 47.128.40.145 timeout 431559",

"create mylist1 hash:net family inet hashsize 16384 maxelem 131072 timeout 300 bucketsize 12 initval 0x4effa9ad",
"add mylist1 90.101.238.143 timeout 161558",
"add mylist1 44.128.40.145 timeout 431559",
"add mylist1 122.224.5.0/8 timeout 186758",
"add mylist1 45.128.40.145 timeout 431560",
}

entries := ipsetEntries{}
for _, line := range lines {
require.NoError(t, entries.addLine(line, &acc))
}
entries.commit(&acc)

expected := []telegraf.Metric{
testutil.MustMetric(
"ipset",
map[string]string{
"set": "mylist0",
},
map[string]interface{}{
"entries": 3,
"ips": 256,
},
time.Now().Add(time.Millisecond*0),
telegraf.Gauge,
),
testutil.MustMetric(
"ipset",
map[string]string{
"set": "mylist1",
},
map[string]interface{}{
"entries": 4,
"ips": 16777217,
},
time.Unix(0, 0),
telegraf.Gauge,
),
}

testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
}
3 changes: 2 additions & 1 deletion plugins/inputs/ipset/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
## You can avoid using sudo or root, by setting appropriate privileges for
## the telegraf.service systemd service.
use_sudo = false
## Add number of entries and number of individual IPs (resolve CIDR syntax) for each ipset
count_per_ip_entries = false
## The default timeout of 1s for ipset execution can be overridden here:
# timeout = "1s"

12 changes: 6 additions & 6 deletions plugins/inputs/sflow/packetdecoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,9 +372,9 @@ func (d *packetDecoder) decodeIPv4Header(r io.Reader) (h ipV4Header, err error)
}
switch h.Protocol {
case ipProtocolTCP:
h.ProtocolHeader, err = d.decodeTCPHeader(r)
h.ProtocolHeader, err = decodeTCPHeader(r)
case ipProtocolUDP:
h.ProtocolHeader, err = d.decodeUDPHeader(r)
h.ProtocolHeader, err = decodeUDPHeader(r)
default:
d.debug("Unknown IP protocol: ", h.Protocol)
}
Expand Down Expand Up @@ -412,9 +412,9 @@ func (d *packetDecoder) decodeIPv6Header(r io.Reader) (h ipV6Header, err error)
}
switch h.NextHeaderProto {
case ipProtocolTCP:
h.ProtocolHeader, err = d.decodeTCPHeader(r)
h.ProtocolHeader, err = decodeTCPHeader(r)
case ipProtocolUDP:
h.ProtocolHeader, err = d.decodeUDPHeader(r)
h.ProtocolHeader, err = decodeUDPHeader(r)
default:
// not handled
d.debug("Unknown IP protocol: ", h.NextHeaderProto)
Expand All @@ -423,7 +423,7 @@ func (d *packetDecoder) decodeIPv6Header(r io.Reader) (h ipV6Header, err error)
}

// https://en.wikipedia.org/wiki/Transmission_Control_Protocol#TCP_segment_structure
func (d *packetDecoder) decodeTCPHeader(r io.Reader) (h tcpHeader, err error) {
func decodeTCPHeader(r io.Reader) (h tcpHeader, err error) {
if err := read(r, &h.SourcePort, "SourcePort"); err != nil {
return h, err
}
Expand Down Expand Up @@ -461,7 +461,7 @@ func (d *packetDecoder) decodeTCPHeader(r io.Reader) (h tcpHeader, err error) {
return h, err
}

func (d *packetDecoder) decodeUDPHeader(r io.Reader) (h udpHeader, err error) {
func decodeUDPHeader(r io.Reader) (h udpHeader, err error) {
if err := read(r, &h.SourcePort, "SourcePort"); err != nil {
return h, err
}
Expand Down
7 changes: 2 additions & 5 deletions plugins/inputs/sflow/packetdecoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ func TestUDPHeader(t *testing.T) {
0x00, 0x00, // checksum
})

dc := newDecoder()
actual, err := dc.decodeUDPHeader(octets)
actual, err := decodeUDPHeader(octets)
require.NoError(t, err)

expected := udpHeader{
Expand All @@ -36,11 +35,9 @@ func BenchmarkUDPHeader(b *testing.B) {
0x00, 0x00, // checksum
})

dc := newDecoder()

b.ResetTimer()
for n := 0; n < b.N; n++ {
_, err := dc.decodeUDPHeader(octets)
_, err := decodeUDPHeader(octets)
require.NoError(b, err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/sflow/sflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (s *SFlow) Start(acc telegraf.Accumulator) error {
}

// Gather is a NOOP for sFlow as it receives, asynchronously, sFlow network packets
func (s *SFlow) Gather(_ telegraf.Accumulator) error {
func (*SFlow) Gather(telegraf.Accumulator) error {
return nil
}

Expand Down
4 changes: 0 additions & 4 deletions plugins/inputs/slab/slab.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ func (*SlabStats) SampleConfig() string {
return sampleConfig
}

func (ss *SlabStats) Init() error {
return nil
}

func (ss *SlabStats) Gather(acc telegraf.Accumulator) error {
fields, err := ss.getSlabStats()
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions plugins/inputs/slurm/slurm.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (s *Slurm) Init() error {
return nil
}

func (s *Slurm) parseTres(tres string) map[string]interface{} {
func parseTres(tres string) map[string]interface{} {
tresKVs := strings.Split(tres, ",")
parsedValues := make(map[string]interface{}, len(tresKVs))

Expand Down Expand Up @@ -258,7 +258,7 @@ func (s *Slurm) gatherJobsMetrics(acc telegraf.Accumulator, jobs []goslurm.V0038
records["time_limit"] = *int64Ptr
}
if strPtr, ok := jobs[i].GetTresReqStrOk(); ok {
for k, v := range s.parseTres(*strPtr) {
for k, v := range parseTres(*strPtr) {
records["tres_"+k] = v
}
}
Expand Down Expand Up @@ -302,12 +302,12 @@ func (s *Slurm) gatherNodesMetrics(acc telegraf.Accumulator, nodes []goslurm.V00
records["alloc_memory"] = *int64Ptr
}
if strPtr, ok := node.GetTresOk(); ok {
for k, v := range s.parseTres(*strPtr) {
for k, v := range parseTres(*strPtr) {
records["tres_"+k] = v
}
}
if strPtr, ok := node.GetTresUsedOk(); ok {
for k, v := range s.parseTres(*strPtr) {
for k, v := range parseTres(*strPtr) {
records["tres_used_"+k] = v
}
}
Expand Down Expand Up @@ -348,7 +348,7 @@ func (s *Slurm) gatherPartitionsMetrics(acc telegraf.Accumulator, partitions []g
records["nodes"] = *strPtr
}
if strPtr, ok := partition.GetTresOk(); ok {
for k, v := range s.parseTres(*strPtr) {
for k, v := range parseTres(*strPtr) {
records["tres_"+k] = v
}
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/snmp/snmp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (tsc *testSNMPConnection) Walk(oid string, wf gosnmp.WalkFunc) error {
}
return nil
}
func (tsc *testSNMPConnection) Reconnect() error {
func (*testSNMPConnection) Reconnect() error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/snmp_trap/gosmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
type gosmiTranslator struct {
}

func (t *gosmiTranslator) lookup(oid string) (snmp.MibEntry, error) {
func (*gosmiTranslator) lookup(oid string) (snmp.MibEntry, error) {
return snmp.TrapLookup(oid)
}

Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/snmp_trap/snmp_trap.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (*SnmpTrap) SampleConfig() string {
return sampleConfig
}

func (s *SnmpTrap) Gather(_ telegraf.Accumulator) error {
func (*SnmpTrap) Gather(telegraf.Accumulator) error {
return nil
}

Expand Down
Loading

0 comments on commit 28fb084

Please sign in to comment.