From 89a2456d5c22e0012fb8fea2b8806b66d5efe8b8 Mon Sep 17 00:00:00 2001 From: Pawel Zak Date: Fri, 11 Oct 2024 13:07:03 +0200 Subject: [PATCH] chore: Fix linter findings for `revive:exported` in `plugins/inputs/[c]*` --- plugins/inputs/ceph/ceph.go | 94 +++++----- plugins/inputs/chrony/chrony.go | 100 +++++----- .../cisco_telemetry_mdt.go | 149 ++++++++------- .../cisco_telemetry_mdt_test.go | 12 +- plugins/inputs/clickhouse/clickhouse.go | 39 ++-- plugins/inputs/cloud_pubsub/cloud_pubsub.go | 76 ++++---- .../inputs/cloud_pubsub/cloud_pubsub_test.go | 4 +- .../inputs/cloud_pubsub/subscription_stub.go | 21 +-- .../cloud_pubsub_push/cloud_pubsub_push.go | 34 ++-- plugins/inputs/cloudwatch/cloudwatch.go | 73 ++++---- plugins/inputs/cloudwatch/cloudwatch_test.go | 4 +- .../cloudwatch_metric_streams.go | 129 ++++++------- .../cloudwatch_metric_streams_test.go | 8 +- plugins/inputs/conntrack/conntrack.go | 55 +++--- .../inputs/conntrack/conntrack_notlinux.go | 4 +- plugins/inputs/consul/consul.go | 48 ++--- plugins/inputs/consul/consul_test.go | 8 +- plugins/inputs/consul_agent/consul_agent.go | 28 ++- plugins/inputs/consul_agent/consul_structs.go | 16 +- plugins/inputs/couchbase/couchbase.go | 68 +++---- plugins/inputs/couchbase/couchbase_data.go | 2 +- plugins/inputs/couchbase/couchbase_test.go | 2 +- plugins/inputs/couchdb/couchdb.go | 20 +- plugins/inputs/cpu/cpu.go | 38 ++-- plugins/inputs/cpu/cpu_test.go | 8 +- .../inputs/ctrlx_datalayer/ctrlx_datalayer.go | 177 +++++++++--------- 26 files changed, 599 insertions(+), 618 deletions(-) diff --git a/plugins/inputs/ceph/ceph.go b/plugins/inputs/ceph/ceph.go index e87c777171dff..528768dc23bfb 100644 --- a/plugins/inputs/ceph/ceph.go +++ b/plugins/inputs/ceph/ceph.go @@ -18,19 +18,6 @@ import ( //go:embed sample.conf var sampleConfig string -const ( - measurement = "ceph" - typeMon = "monitor" - typeOsd = "osd" - typeMds = "mds" - typeRgw = "rgw" - osdPrefix = "ceph-osd" - monPrefix = "ceph-mon" - mdsPrefix = "ceph-mds" - rgwPrefix = "ceph-client" - sockSuffix = "asok" -) - type Ceph struct { CephBinary string `toml:"ceph_binary"` OsdPrefix string `toml:"osd_prefix"` @@ -48,6 +35,19 @@ type Ceph struct { schemaMaps map[socket]perfSchemaMap } +const ( + measurement = "ceph" + typeMon = "monitor" + typeOsd = "osd" + typeMds = "mds" + typeRgw = "rgw" + osdPrefix = "ceph-osd" + monPrefix = "ceph-mon" + mdsPrefix = "ceph-mds" + rgwPrefix = "ceph-client" + sockSuffix = "asok" +) + func (*Ceph) SampleConfig() string { return sampleConfig } @@ -149,24 +149,6 @@ func (c *Ceph) gatherClusterStats(acc telegraf.Accumulator) error { return nil } -func init() { - inputs.Add(measurement, func() telegraf.Input { - return &Ceph{ - CephBinary: "/usr/bin/ceph", - OsdPrefix: osdPrefix, - MonPrefix: monPrefix, - MdsPrefix: mdsPrefix, - RgwPrefix: rgwPrefix, - SocketDir: "/var/run/ceph", - SocketSuffix: sockSuffix, - CephUser: "client.admin", - CephConfig: "/etc/ceph/ceph.conf", - GatherAdminSocketStats: true, - GatherClusterStats: false, - } - }) -} - // Run ceph perf schema on the passed socket. The output is a JSON string // mapping collection names to a map of counter names to information. // @@ -428,8 +410,8 @@ func (c *Ceph) execute(command string) (string, error) { return output, nil } -// CephStatus is used to unmarshal "ceph -s" output -type CephStatus struct { +// status is used to unmarshal "ceph -s" output +type status struct { FSMap struct { NumIn float64 `json:"in"` NumMax float64 `json:"max"` @@ -492,12 +474,12 @@ type CephStatus struct { // decodeStatus decodes the output of 'ceph -s' func decodeStatus(acc telegraf.Accumulator, input string) error { - data := &CephStatus{} + data := &status{} if err := json.Unmarshal([]byte(input), data); err != nil { return fmt.Errorf("failed to parse json: %q: %w", input, err) } - decoders := []func(telegraf.Accumulator, *CephStatus) error{ + decoders := []func(telegraf.Accumulator, *status) error{ decodeStatusFsmap, decodeStatusHealth, decodeStatusMonmap, @@ -516,7 +498,7 @@ func decodeStatus(acc telegraf.Accumulator, input string) error { } // decodeStatusFsmap decodes the FS map portion of the output of 'ceph -s' -func decodeStatusFsmap(acc telegraf.Accumulator, data *CephStatus) error { +func decodeStatusFsmap(acc telegraf.Accumulator, data *status) error { fields := map[string]interface{}{ "in": data.FSMap.NumIn, "max": data.FSMap.NumMax, @@ -528,7 +510,7 @@ func decodeStatusFsmap(acc telegraf.Accumulator, data *CephStatus) error { } // decodeStatusHealth decodes the health portion of the output of 'ceph status' -func decodeStatusHealth(acc telegraf.Accumulator, data *CephStatus) error { +func decodeStatusHealth(acc telegraf.Accumulator, data *status) error { statusCodes := map[string]float64{ "HEALTH_ERR": 0, "HEALTH_WARN": 1, @@ -544,7 +526,7 @@ func decodeStatusHealth(acc telegraf.Accumulator, data *CephStatus) error { } // decodeStatusMonmap decodes the Mon map portion of the output of 'ceph -s' -func decodeStatusMonmap(acc telegraf.Accumulator, data *CephStatus) error { +func decodeStatusMonmap(acc telegraf.Accumulator, data *status) error { fields := map[string]interface{}{ "num_mons": data.MonMap.NumMons, } @@ -553,7 +535,7 @@ func decodeStatusMonmap(acc telegraf.Accumulator, data *CephStatus) error { } // decodeStatusOsdmap decodes the OSD map portion of the output of 'ceph -s' -func decodeStatusOsdmap(acc telegraf.Accumulator, data *CephStatus) error { +func decodeStatusOsdmap(acc telegraf.Accumulator, data *status) error { fields := map[string]interface{}{ "epoch": data.OSDMap.Epoch, "num_in_osds": data.OSDMap.NumInOSDs, @@ -578,7 +560,7 @@ func decodeStatusOsdmap(acc telegraf.Accumulator, data *CephStatus) error { } // decodeStatusPgmap decodes the PG map portion of the output of 'ceph -s' -func decodeStatusPgmap(acc telegraf.Accumulator, data *CephStatus) error { +func decodeStatusPgmap(acc telegraf.Accumulator, data *status) error { fields := map[string]interface{}{ "bytes_avail": data.PGMap.BytesAvail, "bytes_total": data.PGMap.BytesTotal, @@ -609,7 +591,7 @@ func decodeStatusPgmap(acc telegraf.Accumulator, data *CephStatus) error { } // decodeStatusPgmapState decodes the PG map state portion of the output of 'ceph -s' -func decodeStatusPgmapState(acc telegraf.Accumulator, data *CephStatus) error { +func decodeStatusPgmapState(acc telegraf.Accumulator, data *status) error { for _, pgState := range data.PGMap.PGsByState { tags := map[string]string{ "state": pgState.StateName, @@ -622,8 +604,8 @@ func decodeStatusPgmapState(acc telegraf.Accumulator, data *CephStatus) error { return nil } -// CephDF is used to unmarshal 'ceph df' output -type CephDf struct { +// df is used to unmarshal 'ceph df' output +type df struct { Stats struct { NumOSDs float64 `json:"num_osds"` NumPerPoolOmapOSDs float64 `json:"num_per_pool_omap_osds"` @@ -653,7 +635,7 @@ type CephDf struct { // decodeDf decodes the output of 'ceph df' func decodeDf(acc telegraf.Accumulator, input string) error { - data := &CephDf{} + data := &df{} if err := json.Unmarshal([]byte(input), data); err != nil { return fmt.Errorf("failed to parse json: %q: %w", input, err) } @@ -705,8 +687,8 @@ func decodeDf(acc telegraf.Accumulator, input string) error { return nil } -// CephOSDPoolStats is used to unmarshal 'ceph osd pool stats' output -type CephOSDPoolStats []struct { +// osdPoolStats is used to unmarshal 'ceph osd pool stats' output +type osdPoolStats []struct { PoolName string `json:"pool_name"` ClientIORate struct { OpPerSec float64 `json:"op_per_sec"` // This field is no longer reported in ceph 10 and later @@ -732,7 +714,7 @@ type CephOSDPoolStats []struct { // decodeOsdPoolStats decodes the output of 'ceph osd pool stats' func decodeOsdPoolStats(acc telegraf.Accumulator, input string) error { - data := CephOSDPoolStats{} + data := osdPoolStats{} if err := json.Unmarshal([]byte(input), &data); err != nil { return fmt.Errorf("failed to parse json: %q: %w", input, err) } @@ -763,3 +745,21 @@ func decodeOsdPoolStats(acc telegraf.Accumulator, input string) error { return nil } + +func init() { + inputs.Add(measurement, func() telegraf.Input { + return &Ceph{ + CephBinary: "/usr/bin/ceph", + OsdPrefix: osdPrefix, + MonPrefix: monPrefix, + MdsPrefix: mdsPrefix, + RgwPrefix: rgwPrefix, + SocketDir: "/var/run/ceph", + SocketSuffix: sockSuffix, + CephUser: "client.admin", + CephConfig: "/etc/ceph/ceph.conf", + GatherAdminSocketStats: true, + GatherClusterStats: false, + } + }) +} diff --git a/plugins/inputs/chrony/chrony.go b/plugins/inputs/chrony/chrony.go index d7a596820a14c..6a76aa6e57b4d 100644 --- a/plugins/inputs/chrony/chrony.go +++ b/plugins/inputs/chrony/chrony.go @@ -46,43 +46,6 @@ func (*Chrony) SampleConfig() string { return sampleConfig } -// dialUnix opens an unixgram connection with chrony -func (c *Chrony) dialUnix(address string) (*net.UnixConn, error) { - dir := path.Dir(address) - c.local = path.Join(dir, fmt.Sprintf("chrony-telegraf-%s.sock", uuid.New().String())) - conn, err := net.DialUnix("unixgram", - &net.UnixAddr{Name: c.local, Net: "unixgram"}, - &net.UnixAddr{Name: address, Net: "unixgram"}, - ) - - if err != nil { - return nil, err - } - - filemode, err := strconv.ParseUint(c.SocketPerms, 8, 32) - if err != nil { - return nil, fmt.Errorf("parsing file mode %q failed: %w", c.SocketPerms, err) - } - - if err := os.Chmod(c.local, os.FileMode(filemode)); err != nil { - return nil, fmt.Errorf("changing file mode of %q failed: %w", c.local, err) - } - - group, err := user.LookupGroup(c.SocketGroup) - if err != nil { - return nil, fmt.Errorf("looking up group %q failed: %w", c.SocketGroup, err) - } - gid, err := strconv.Atoi(group.Gid) - if err != nil { - return nil, fmt.Errorf("parsing group ID %q failed: %w", group.Gid, err) - } - if err := os.Chown(c.local, os.Getuid(), gid); err != nil { - return nil, fmt.Errorf("changing group of %q failed: %w", c.local, err) - } - - return conn, nil -} - func (c *Chrony) Init() error { // Use the configured server, if none set, we try to guess it in Start() if c.Server != "" { @@ -182,19 +145,6 @@ func (c *Chrony) Start(_ telegraf.Accumulator) error { return nil } -func (c *Chrony) Stop() { - if c.conn != nil { - if err := c.conn.Close(); err != nil && !errors.Is(err, net.ErrClosed) && !errors.Is(err, syscall.EPIPE) { - c.Log.Errorf("Closing connection to %q failed: %v", c.Server, err) - } - } - if c.local != "" { - if err := os.Remove(c.local); err != nil { - c.Log.Errorf("Removing temporary socket %q failed: %v", c.local, err) - } - } -} - func (c *Chrony) Gather(acc telegraf.Accumulator) error { for _, m := range c.Metrics { switch m { @@ -216,6 +166,56 @@ func (c *Chrony) Gather(acc telegraf.Accumulator) error { return nil } +func (c *Chrony) Stop() { + if c.conn != nil { + if err := c.conn.Close(); err != nil && !errors.Is(err, net.ErrClosed) && !errors.Is(err, syscall.EPIPE) { + c.Log.Errorf("Closing connection to %q failed: %v", c.Server, err) + } + } + if c.local != "" { + if err := os.Remove(c.local); err != nil { + c.Log.Errorf("Removing temporary socket %q failed: %v", c.local, err) + } + } +} + +// dialUnix opens an unixgram connection with chrony +func (c *Chrony) dialUnix(address string) (*net.UnixConn, error) { + dir := path.Dir(address) + c.local = path.Join(dir, fmt.Sprintf("chrony-telegraf-%s.sock", uuid.New().String())) + conn, err := net.DialUnix("unixgram", + &net.UnixAddr{Name: c.local, Net: "unixgram"}, + &net.UnixAddr{Name: address, Net: "unixgram"}, + ) + + if err != nil { + return nil, err + } + + filemode, err := strconv.ParseUint(c.SocketPerms, 8, 32) + if err != nil { + return nil, fmt.Errorf("parsing file mode %q failed: %w", c.SocketPerms, err) + } + + if err := os.Chmod(c.local, os.FileMode(filemode)); err != nil { + return nil, fmt.Errorf("changing file mode of %q failed: %w", c.local, err) + } + + group, err := user.LookupGroup(c.SocketGroup) + if err != nil { + return nil, fmt.Errorf("looking up group %q failed: %w", c.SocketGroup, err) + } + gid, err := strconv.Atoi(group.Gid) + if err != nil { + return nil, fmt.Errorf("parsing group ID %q failed: %w", group.Gid, err) + } + if err := os.Chown(c.local, os.Getuid(), gid); err != nil { + return nil, fmt.Errorf("changing group of %q failed: %w", c.local, err) + } + + return conn, nil +} + func (c *Chrony) gatherActivity(acc telegraf.Accumulator) error { req := fbchrony.NewActivityPacket() r, err := c.client.Communicate(req) diff --git a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go index 89e3ca76d6b5a..a87762b5004c1 100644 --- a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go +++ b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go @@ -38,16 +38,11 @@ var sampleConfig string const ( // Maximum telemetry payload size (in bytes) to accept for GRPC dialout transport tcpMaxMsgLen uint32 = 1024 * 1024 -) - -// default minimum time between successive pings -// this value is specified in the GRPC docs via GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS -const defaultKeepaliveMinTime = config.Duration(time.Second * 300) -type GRPCEnforcementPolicy struct { - PermitKeepaliveWithoutCalls bool `toml:"permit_keepalive_without_calls"` - KeepaliveMinTime config.Duration `toml:"keepalive_minimum_time"` -} + // default minimum time between successive pings + // this value is specified in the GRPC docs via GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS + defaultKeepaliveMinTime = config.Duration(time.Second * 300) +) // CiscoTelemetryMDT plugin for IOS XR, IOS XE and NXOS platforms type CiscoTelemetryMDT struct { @@ -58,7 +53,7 @@ type CiscoTelemetryMDT struct { Aliases map[string]string `toml:"aliases"` Dmes map[string]string `toml:"dmes"` EmbeddedTags []string `toml:"embedded_tags"` - EnforcementPolicy GRPCEnforcementPolicy `toml:"grpc_enforcement_policy"` + EnforcementPolicy grpcEnforcementPolicy `toml:"grpc_enforcement_policy"` IncludeDeleteField bool `toml:"include_delete_field"` SourceFieldName string `toml:"source_field_name"` @@ -86,7 +81,12 @@ type CiscoTelemetryMDT struct { mdtdialout.UnimplementedGRPCMdtDialoutServer } -type NxPayloadXfromStructure struct { +type grpcEnforcementPolicy struct { + PermitKeepaliveWithoutCalls bool `toml:"permit_keepalive_without_calls"` + KeepaliveMinTime config.Duration `toml:"keepalive_minimum_time"` +} + +type nxPayloadXfromStructure struct { Name string `json:"Name"` Prop []struct { Key string `json:"Key"` @@ -142,7 +142,7 @@ func (c *CiscoTelemetryMDT) Start(acc telegraf.Accumulator) error { continue } - var jsStruct NxPayloadXfromStructure + var jsStruct nxPayloadXfromStructure err := json.Unmarshal([]byte(dmeKey), &jsStruct) if err != nil { continue @@ -218,7 +218,67 @@ func (c *CiscoTelemetryMDT) Start(acc telegraf.Accumulator) error { return nil } -// AcceptTCPDialoutClients defines the TCP dialout server main routine +func (c *CiscoTelemetryMDT) Gather(_ telegraf.Accumulator) error { + return nil +} + +// Stop listener and cleanup +func (c *CiscoTelemetryMDT) Stop() { + if c.grpcServer != nil { + // Stop server and terminate all running dialout routines + c.grpcServer.Stop() + } + if c.listener != nil { + c.listener.Close() + } + c.wg.Wait() +} + +// MdtDialout RPC server method for grpc-dialout transport +func (c *CiscoTelemetryMDT) MdtDialout(stream mdtdialout.GRPCMdtDialout_MdtDialoutServer) error { + peerInCtx, peerOK := peer.FromContext(stream.Context()) + if peerOK { + c.Log.Debugf("Accepted Cisco MDT GRPC dialout connection from %s", peerInCtx.Addr) + } + + var chunkBuffer bytes.Buffer + + for { + packet, err := stream.Recv() + if err != nil { + if !errors.Is(err, io.EOF) { + c.acc.AddError(fmt.Errorf("receive error during GRPC dialout: %w", err)) + } + break + } + + if len(packet.Data) == 0 && len(packet.Errors) != 0 { + c.acc.AddError(fmt.Errorf("error during GRPC dialout: %s", packet.Errors)) + break + } + + // Reassemble chunked telemetry data received from NX-OS + if packet.TotalSize == 0 { + c.handleTelemetry(packet.Data) + } else if int(packet.TotalSize) <= c.MaxMsgSize { + chunkBuffer.Write(packet.Data) + if chunkBuffer.Len() >= int(packet.TotalSize) { + c.handleTelemetry(chunkBuffer.Bytes()) + chunkBuffer.Reset() + } + } else { + c.acc.AddError(fmt.Errorf("dropped too large packet: %dB > %dB", packet.TotalSize, c.MaxMsgSize)) + } + } + + if peerOK { + c.Log.Debugf("Closed Cisco MDT GRPC dialout connection from %s", peerInCtx.Addr) + } + + return nil +} + +// acceptTCPClients defines the TCP dialout server main routine func (c *CiscoTelemetryMDT) acceptTCPClients() { // Keep track of all active connections, so we can close them if necessary var mutex sync.Mutex @@ -311,50 +371,6 @@ func (c *CiscoTelemetryMDT) handleTCPClient(conn net.Conn) error { } } -// MdtDialout RPC server method for grpc-dialout transport -func (c *CiscoTelemetryMDT) MdtDialout(stream mdtdialout.GRPCMdtDialout_MdtDialoutServer) error { - peerInCtx, peerOK := peer.FromContext(stream.Context()) - if peerOK { - c.Log.Debugf("Accepted Cisco MDT GRPC dialout connection from %s", peerInCtx.Addr) - } - - var chunkBuffer bytes.Buffer - - for { - packet, err := stream.Recv() - if err != nil { - if !errors.Is(err, io.EOF) { - c.acc.AddError(fmt.Errorf("receive error during GRPC dialout: %w", err)) - } - break - } - - if len(packet.Data) == 0 && len(packet.Errors) != 0 { - c.acc.AddError(fmt.Errorf("error during GRPC dialout: %s", packet.Errors)) - break - } - - // Reassemble chunked telemetry data received from NX-OS - if packet.TotalSize == 0 { - c.handleTelemetry(packet.Data) - } else if int(packet.TotalSize) <= c.MaxMsgSize { - chunkBuffer.Write(packet.Data) - if chunkBuffer.Len() >= int(packet.TotalSize) { - c.handleTelemetry(chunkBuffer.Bytes()) - chunkBuffer.Reset() - } - } else { - c.acc.AddError(fmt.Errorf("dropped too large packet: %dB > %dB", packet.TotalSize, c.MaxMsgSize)) - } - } - - if peerOK { - c.Log.Debugf("Closed Cisco MDT GRPC dialout connection from %s", peerInCtx.Addr) - } - - return nil -} - // Handle telemetry packet from any transport, decode and add as measurement func (c *CiscoTelemetryMDT) handleTelemetry(data []byte) { msg := &telemetry.Telemetry{} @@ -782,27 +798,10 @@ func (c *CiscoTelemetryMDT) parseContentField( delete(tags, prefix) } -func (c *CiscoTelemetryMDT) Address() net.Addr { +func (c *CiscoTelemetryMDT) address() net.Addr { return c.listener.Addr() } -// Stop listener and cleanup -func (c *CiscoTelemetryMDT) Stop() { - if c.grpcServer != nil { - // Stop server and terminate all running dialout routines - c.grpcServer.Stop() - } - if c.listener != nil { - c.listener.Close() - } - c.wg.Wait() -} - -// Gather plugin measurements (unused) -func (c *CiscoTelemetryMDT) Gather(_ telegraf.Accumulator) error { - return nil -} - func init() { inputs.Add("cisco_telemetry_mdt", func() telegraf.Input { return &CiscoTelemetryMDT{ diff --git a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go index d9102c1522f3c..5a172f2e8f420 100644 --- a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go +++ b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go @@ -979,7 +979,7 @@ func TestTCPDialoutOverflow(t *testing.T) { MsgLen uint32 }{MsgLen: uint32(1000000000)} - addr := c.Address() + addr := c.address() conn, err := net.Dial(addr.Network(), addr.String()) require.NoError(t, err) require.NoError(t, binary.Write(conn, binary.BigEndian, hdr)) @@ -1104,7 +1104,7 @@ func TestTCPDialoutMultiple(t *testing.T) { MsgLen uint32 }{} - addr := c.Address() + addr := c.address() conn, err := net.Dial(addr.Network(), addr.String()) require.NoError(t, err) @@ -1186,7 +1186,7 @@ func TestGRPCDialoutError(t *testing.T) { err := c.Start(acc) require.NoError(t, err) - addr := c.Address() + addr := c.address() conn, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) client := mdtdialout.NewGRPCMdtDialoutClient(conn) @@ -1220,7 +1220,7 @@ func TestGRPCDialoutMultiple(t *testing.T) { require.NoError(t, err) tel := mockTelemetryMessage() - addr := c.Address() + addr := c.address() conn, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) require.True(t, conn.WaitForStateChange(context.Background(), connectivity.Connecting)) @@ -1297,7 +1297,7 @@ func TestGRPCDialoutKeepalive(t *testing.T) { Log: testutil.Logger{}, Transport: "grpc", ServiceAddress: "127.0.0.1:0", - EnforcementPolicy: GRPCEnforcementPolicy{ + EnforcementPolicy: grpcEnforcementPolicy{ PermitKeepaliveWithoutCalls: true, KeepaliveMinTime: 0, }, @@ -1306,7 +1306,7 @@ func TestGRPCDialoutKeepalive(t *testing.T) { err := c.Start(acc) require.NoError(t, err) - addr := c.Address() + addr := c.address() conn, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) client := mdtdialout.NewGRPCMdtDialoutClient(conn) diff --git a/plugins/inputs/clickhouse/clickhouse.go b/plugins/inputs/clickhouse/clickhouse.go index e1b16912037ac..73a8e39c623bb 100644 --- a/plugins/inputs/clickhouse/clickhouse.go +++ b/plugins/inputs/clickhouse/clickhouse.go @@ -26,26 +26,6 @@ var sampleConfig string var defaultTimeout = 5 * time.Second -type connect struct { - Cluster string `json:"cluster"` - ShardNum int `json:"shard_num"` - Hostname string `json:"host_name"` - url *url.URL -} - -func init() { - inputs.Add("clickhouse", func() telegraf.Input { - return &ClickHouse{ - AutoDiscovery: true, - ClientConfig: tls.ClientConfig{ - InsecureSkipVerify: false, - }, - Timeout: config.Duration(defaultTimeout), - } - }) -} - -// ClickHouse Telegraf Input Plugin type ClickHouse struct { Username string `toml:"username"` Password string `toml:"password"` @@ -60,6 +40,13 @@ type ClickHouse struct { tls.ClientConfig } +type connect struct { + Cluster string `json:"cluster"` + ShardNum int `json:"shard_num"` + Hostname string `json:"host_name"` + url *url.URL +} + func (*ClickHouse) SampleConfig() string { return sampleConfig } @@ -638,3 +625,15 @@ var commonMetricsIsFloat = map[string]bool{ } var _ telegraf.ServiceInput = &ClickHouse{} + +func init() { + inputs.Add("clickhouse", func() telegraf.Input { + return &ClickHouse{ + AutoDiscovery: true, + ClientConfig: tls.ClientConfig{ + InsecureSkipVerify: false, + }, + Timeout: config.Duration(defaultTimeout), + } + }) +} diff --git a/plugins/inputs/cloud_pubsub/cloud_pubsub.go b/plugins/inputs/cloud_pubsub/cloud_pubsub.go index 3792431304c89..0f686b3e7f0df 100644 --- a/plugins/inputs/cloud_pubsub/cloud_pubsub.go +++ b/plugins/inputs/cloud_pubsub/cloud_pubsub.go @@ -25,11 +25,10 @@ var sampleConfig string var once sync.Once -type empty struct{} -type semaphore chan empty - -const defaultMaxUndeliveredMessages = 1000 -const defaultRetryDelaySeconds = 5 +const ( + defaultMaxUndeliveredMessages = 1000 + defaultRetryDelaySeconds = 5 +) type PubSub struct { sync.Mutex @@ -70,12 +69,41 @@ type PubSub struct { decoderMutex sync.Mutex } +type ( + empty struct{} + semaphore chan empty +) + func (*PubSub) SampleConfig() string { return sampleConfig } -// Gather does nothing for this service input. -func (ps *PubSub) Gather(_ telegraf.Accumulator) error { +func (ps *PubSub) Init() error { + if ps.Subscription == "" { + return errors.New(`"subscription" is required`) + } + + if ps.Project == "" { + return errors.New(`"project" is required`) + } + + switch ps.ContentEncoding { + case "", "identity": + ps.ContentEncoding = "identity" + case "gzip": + var err error + var options []internal.DecodingOption + if ps.MaxDecompressionSize > 0 { + options = append(options, internal.WithMaxDecompressionSize(int64(ps.MaxDecompressionSize))) + } + ps.decoder, err = internal.NewContentDecoder(ps.ContentEncoding, options...) + if err != nil { + return err + } + default: + return fmt.Errorf("invalid value %q for content_encoding", ps.ContentEncoding) + } + return nil } @@ -123,6 +151,11 @@ func (ps *PubSub) Start(ac telegraf.Accumulator) error { return nil } +// Gather does nothing for this service input. +func (ps *PubSub) Gather(_ telegraf.Accumulator) error { + return nil +} + // Stop ensures the PubSub subscriptions receivers are stopped by // canceling the context and waits for goroutines to finish. func (ps *PubSub) Stop() { @@ -315,35 +348,6 @@ func (ps *PubSub) getGCPSubscription(subID string) (subscription, error) { return &gcpSubscription{s}, nil } -func (ps *PubSub) Init() error { - if ps.Subscription == "" { - return errors.New(`"subscription" is required`) - } - - if ps.Project == "" { - return errors.New(`"project" is required`) - } - - switch ps.ContentEncoding { - case "", "identity": - ps.ContentEncoding = "identity" - case "gzip": - var err error - var options []internal.DecodingOption - if ps.MaxDecompressionSize > 0 { - options = append(options, internal.WithMaxDecompressionSize(int64(ps.MaxDecompressionSize))) - } - ps.decoder, err = internal.NewContentDecoder(ps.ContentEncoding, options...) - if err != nil { - return err - } - default: - return fmt.Errorf("invalid value %q for content_encoding", ps.ContentEncoding) - } - - return nil -} - func init() { inputs.Add("cloud_pubsub", func() telegraf.Input { ps := &PubSub{ diff --git a/plugins/inputs/cloud_pubsub/cloud_pubsub_test.go b/plugins/inputs/cloud_pubsub/cloud_pubsub_test.go index 32c81b530dc3b..4362f87f7680a 100644 --- a/plugins/inputs/cloud_pubsub/cloud_pubsub_test.go +++ b/plugins/inputs/cloud_pubsub/cloud_pubsub_test.go @@ -200,7 +200,7 @@ func TestRunInvalidMessages(t *testing.T) { acc.WaitError(1) // Make sure we acknowledged message so we don't receive it again. - testTracker.WaitForAck(1) + testTracker.waitForAck(1) require.Equal(t, 0, acc.NFields()) } @@ -249,7 +249,7 @@ func TestRunOverlongMessages(t *testing.T) { acc.WaitError(1) // Make sure we acknowledged message so we don't receive it again. - testTracker.WaitForAck(1) + testTracker.waitForAck(1) require.Equal(t, 0, acc.NFields()) } diff --git a/plugins/inputs/cloud_pubsub/subscription_stub.go b/plugins/inputs/cloud_pubsub/subscription_stub.go index cd5abdfa38b28..91ca7467d347f 100644 --- a/plugins/inputs/cloud_pubsub/subscription_stub.go +++ b/plugins/inputs/cloud_pubsub/subscription_stub.go @@ -51,11 +51,11 @@ type testMsg struct { } func (tm *testMsg) Ack() { - tm.tracker.Ack() + tm.tracker.ack() } func (tm *testMsg) Nack() { - tm.tracker.Nack() + tm.tracker.nack() } func (tm *testMsg) ID() string { @@ -82,7 +82,7 @@ type testTracker struct { numNacks int } -func (t *testTracker) WaitForAck(num int) { +func (t *testTracker) waitForAck(num int) { t.Lock() if t.Cond == nil { t.Cond = sync.NewCond(&t.Mutex) @@ -93,25 +93,14 @@ func (t *testTracker) WaitForAck(num int) { t.Unlock() } -func (t *testTracker) WaitForNack(num int) { - t.Lock() - if t.Cond == nil { - t.Cond = sync.NewCond(&t.Mutex) - } - for t.numNacks < num { - t.Wait() - } - t.Unlock() -} - -func (t *testTracker) Ack() { +func (t *testTracker) ack() { t.Lock() defer t.Unlock() t.numAcks++ } -func (t *testTracker) Nack() { +func (t *testTracker) nack() { t.Lock() defer t.Unlock() diff --git a/plugins/inputs/cloud_pubsub_push/cloud_pubsub_push.go b/plugins/inputs/cloud_pubsub_push/cloud_pubsub_push.go index 3634f9f834e95..e745ee57eeb2c 100644 --- a/plugins/inputs/cloud_pubsub_push/cloud_pubsub_push.go +++ b/plugins/inputs/cloud_pubsub_push/cloud_pubsub_push.go @@ -26,9 +26,11 @@ var once sync.Once // defaultMaxBodySize is the default maximum request body size, in bytes. // if the request body is over this size, we will return an HTTP 413 error. -// 500 MB -const defaultMaxBodySize = 500 * 1024 * 1024 -const defaultMaxUndeliveredMessages = 1000 +const ( + // 500 MB + defaultMaxBodySize = 500 * 1024 * 1024 + defaultMaxUndeliveredMessages = 1000 +) type PubSubPush struct { ServiceAddress string @@ -56,15 +58,15 @@ type PubSubPush struct { sem chan struct{} } -// Message defines the structure of a Google Pub/Sub message. -type Message struct { +// message defines the structure of a Google Pub/Sub message. +type message struct { Atts map[string]string `json:"attributes"` Data string `json:"data"` // Data is base64 encoded data } -// Payload is the received Google Pub/Sub data. (https://cloud.google.com/pubsub/docs/push) -type Payload struct { - Msg Message `json:"message"` +// payload is the received Google Pub/Sub data. (https://cloud.google.com/pubsub/docs/push) +type payload struct { + Msg message `json:"message"` Subscription string `json:"subscription"` } @@ -72,10 +74,6 @@ func (*PubSubPush) SampleConfig() string { return sampleConfig } -func (p *PubSubPush) Gather(_ telegraf.Accumulator) error { - return nil -} - func (p *PubSubPush) SetParser(parser telegraf.Parser) { p.Parser = parser } @@ -135,6 +133,10 @@ func (p *PubSubPush) Start(acc telegraf.Accumulator) error { return nil } +func (p *PubSubPush) Gather(_ telegraf.Accumulator) error { + return nil +} + // Stop cleans up all resources func (p *PubSubPush) Stop() { p.cancel() @@ -144,9 +146,9 @@ func (p *PubSubPush) Stop() { func (p *PubSubPush) ServeHTTP(res http.ResponseWriter, req *http.Request) { if req.URL.Path == p.Path { - p.AuthenticateIfSet(p.serveWrite, res, req) + p.authenticateIfSet(p.serveWrite, res, req) } else { - p.AuthenticateIfSet(http.NotFound, res, req) + p.authenticateIfSet(http.NotFound, res, req) } } @@ -180,7 +182,7 @@ func (p *PubSubPush) serveWrite(res http.ResponseWriter, req *http.Request) { return } - var payload Payload + var payload payload if err = json.Unmarshal(bytes, &payload); err != nil { p.Log.Errorf("Error decoding payload %s", err.Error()) res.WriteHeader(http.StatusBadRequest) @@ -262,7 +264,7 @@ func (p *PubSubPush) receiveDelivered() { } } -func (p *PubSubPush) AuthenticateIfSet(handler http.HandlerFunc, res http.ResponseWriter, req *http.Request) { +func (p *PubSubPush) authenticateIfSet(handler http.HandlerFunc, res http.ResponseWriter, req *http.Request) { if p.Token != "" { if subtle.ConstantTimeCompare([]byte(req.FormValue("token")), []byte(p.Token)) != 1 { http.Error(res, "Unauthorized.", http.StatusUnauthorized) diff --git a/plugins/inputs/cloudwatch/cloudwatch.go b/plugins/inputs/cloudwatch/cloudwatch.go index d84999c068da2..c40cc41a4823c 100644 --- a/plugins/inputs/cloudwatch/cloudwatch.go +++ b/plugins/inputs/cloudwatch/cloudwatch.go @@ -39,18 +39,18 @@ type CloudWatch struct { proxy.HTTPProxy - Period config.Duration `toml:"period"` - Delay config.Duration `toml:"delay"` - Namespace string `toml:"namespace" deprecated:"1.25.0;1.35.0;use 'namespaces' instead"` - Namespaces []string `toml:"namespaces"` - Metrics []*Metric `toml:"metrics"` - CacheTTL config.Duration `toml:"cache_ttl"` - RateLimit int `toml:"ratelimit"` - RecentlyActive string `toml:"recently_active"` - BatchSize int `toml:"batch_size"` - IncludeLinkedAccounts bool `toml:"include_linked_accounts"` - MetricFormat string `toml:"metric_format"` - Log telegraf.Logger `toml:"-"` + Period config.Duration `toml:"period"` + Delay config.Duration `toml:"delay"` + Namespace string `toml:"namespace" deprecated:"1.25.0;1.35.0;use 'namespaces' instead"` + Namespaces []string `toml:"namespaces"` + Metrics []*cloudwatchMetric `toml:"metrics"` + CacheTTL config.Duration `toml:"cache_ttl"` + RateLimit int `toml:"ratelimit"` + RecentlyActive string `toml:"recently_active"` + BatchSize int `toml:"batch_size"` + IncludeLinkedAccounts bool `toml:"include_linked_accounts"` + MetricFormat string `toml:"metric_format"` + Log telegraf.Logger `toml:"-"` client cloudwatchClient statFilter filter.Filter @@ -62,16 +62,16 @@ type CloudWatch struct { common_aws.CredentialConfig } -// Metric defines a simplified Cloudwatch metric. -type Metric struct { +// cloudwatchMetric defines a simplified Cloudwatch metric. +type cloudwatchMetric struct { StatisticExclude *[]string `toml:"statistic_exclude"` StatisticInclude *[]string `toml:"statistic_include"` MetricNames []string `toml:"names"` - Dimensions []*Dimension `toml:"dimensions"` + Dimensions []*dimension `toml:"dimensions"` } -// Dimension defines a simplified Cloudwatch dimension (provides metric filtering). -type Dimension struct { +// dimension defines a simplified Cloudwatch dimension (provides metric filtering). +type dimension struct { Name string `toml:"name"` Value string `toml:"value"` valueMatcher filter.Filter @@ -121,8 +121,6 @@ func (c *CloudWatch) Init() error { return nil } -// Gather takes in an accumulator and adds the metrics that the Input -// gathers. This is called every "interval". func (c *CloudWatch) Gather(acc telegraf.Accumulator) error { filteredMetrics, err := getFilteredMetrics(c) if err != nil { @@ -212,7 +210,7 @@ func (c *CloudWatch) initializeCloudWatch() error { } }) - // Initialize regex matchers for each Dimension value. + // Initialize regex matchers for each dimension value. for _, m := range c.Metrics { for _, dimension := range m.Dimensions { matcher, err := filter.NewIncludeExcludeFilter([]string{dimension.Value}, nil) @@ -494,22 +492,6 @@ func (c *CloudWatch) aggregateMetrics(acc telegraf.Accumulator, metricDataResult } } -func init() { - inputs.Add("cloudwatch", func() telegraf.Input { - return New() - }) -} - -// New instance of the cloudwatch plugin -func New() *CloudWatch { - return &CloudWatch{ - CacheTTL: config.Duration(time.Hour), - RateLimit: 25, - Timeout: config.Duration(time.Second * 5), - BatchSize: 500, - } -} - func sanitizeMeasurement(namespace string) string { namespace = strings.ReplaceAll(namespace, "/", "_") namespace = snakeCase(namespace) @@ -545,7 +527,7 @@ func (f *metricCache) isValid() bool { return f.metrics != nil && time.Since(f.built) < f.ttl } -func hasWildcard(dimensions []*Dimension) bool { +func hasWildcard(dimensions []*dimension) bool { for _, d := range dimensions { if d.Value == "" || strings.ContainsAny(d.Value, "*?[") { return true @@ -554,7 +536,7 @@ func hasWildcard(dimensions []*Dimension) bool { return false } -func isSelected(name string, cloudwatchMetric types.Metric, dimensions []*Dimension) bool { +func isSelected(name string, cloudwatchMetric types.Metric, dimensions []*dimension) bool { if name != *cloudwatchMetric.MetricName { return false } @@ -576,3 +558,18 @@ func isSelected(name string, cloudwatchMetric types.Metric, dimensions []*Dimens } return true } + +func newCloudWatch() *CloudWatch { + return &CloudWatch{ + CacheTTL: config.Duration(time.Hour), + RateLimit: 25, + Timeout: config.Duration(time.Second * 5), + BatchSize: 500, + } +} + +func init() { + inputs.Add("cloudwatch", func() telegraf.Input { + return newCloudWatch() + }) +} diff --git a/plugins/inputs/cloudwatch/cloudwatch_test.go b/plugins/inputs/cloudwatch/cloudwatch_test.go index cfcfad655df9b..283f1e9232b15 100644 --- a/plugins/inputs/cloudwatch/cloudwatch_test.go +++ b/plugins/inputs/cloudwatch/cloudwatch_test.go @@ -379,10 +379,10 @@ func TestSelectMetrics(t *testing.T) { Period: internalDuration, RateLimit: 200, BatchSize: 500, - Metrics: []*Metric{ + Metrics: []*cloudwatchMetric{ { MetricNames: []string{"Latency", "RequestCount"}, - Dimensions: []*Dimension{ + Dimensions: []*dimension{ { Name: "LoadBalancerName", Value: "lb*", diff --git a/plugins/inputs/cloudwatch_metric_streams/cloudwatch_metric_streams.go b/plugins/inputs/cloudwatch_metric_streams/cloudwatch_metric_streams.go index ef20ffa5706bf..a453932859139 100644 --- a/plugins/inputs/cloudwatch_metric_streams/cloudwatch_metric_streams.go +++ b/plugins/inputs/cloudwatch_metric_streams/cloudwatch_metric_streams.go @@ -55,7 +55,7 @@ type CloudWatchMetricStreams struct { acc telegraf.Accumulator } -type Request struct { +type request struct { RequestID string `json:"requestId"` Timestamp int64 `json:"timestamp"` Records []struct { @@ -63,7 +63,7 @@ type Request struct { } `json:"records"` } -type Data struct { +type data struct { MetricStreamName string `json:"metric_stream_name"` AccountID string `json:"account_id"` Region string `json:"region"` @@ -75,7 +75,7 @@ type Data struct { Unit string `json:"unit"` } -type Response struct { +type response struct { RequestID string `json:"requestId"` Timestamp int64 `json:"timestamp"` } @@ -89,29 +89,28 @@ func (*CloudWatchMetricStreams) SampleConfig() string { return sampleConfig } -func (a *age) Record(t time.Duration) { - if t > a.max { - a.max = t +func (cms *CloudWatchMetricStreams) Init() error { + tags := map[string]string{ + "address": cms.ServiceAddress, } + cms.requestsReceived = selfstat.Register("cloudwatch_metric_streams", "requests_received", tags) + cms.writesServed = selfstat.Register("cloudwatch_metric_streams", "writes_served", tags) + cms.requestTime = selfstat.Register("cloudwatch_metric_streams", "request_time", tags) + cms.ageMax = selfstat.Register("cloudwatch_metric_streams", "age_max", tags) + cms.ageMin = selfstat.Register("cloudwatch_metric_streams", "age_min", tags) - if t < a.min { - a.min = t + if cms.MaxBodySize == 0 { + cms.MaxBodySize = config.Size(defaultMaxBodySize) } -} - -func (a *age) SubmitMax(stat selfstat.Stat) { - stat.Incr(a.max.Nanoseconds()) -} -func (a *age) SubmitMin(stat selfstat.Stat) { - stat.Incr(a.min.Nanoseconds()) -} + if cms.ReadTimeout < config.Duration(time.Second) { + cms.ReadTimeout = config.Duration(time.Second * 10) + } -func (cms *CloudWatchMetricStreams) Description() string { - return "HTTP listener & parser for AWS Metric Streams" -} + if cms.WriteTimeout < config.Duration(time.Second) { + cms.WriteTimeout = config.Duration(time.Second * 10) + } -func (cms *CloudWatchMetricStreams) Gather(_ telegraf.Accumulator) error { return nil } @@ -150,13 +149,15 @@ func (cms *CloudWatchMetricStreams) Start(acc telegraf.Accumulator) error { return nil } -func (cms *CloudWatchMetricStreams) createHTTPServer() *http.Server { - return &http.Server{ - Addr: cms.ServiceAddress, - Handler: cms, - ReadTimeout: time.Duration(cms.ReadTimeout), - WriteTimeout: time.Duration(cms.WriteTimeout), +func (cms *CloudWatchMetricStreams) Gather(_ telegraf.Accumulator) error { + return nil +} + +func (cms *CloudWatchMetricStreams) Stop() { + if cms.listener != nil { + cms.listener.Close() } + cms.wg.Wait() } func (cms *CloudWatchMetricStreams) ServeHTTP(res http.ResponseWriter, req *http.Request) { @@ -173,6 +174,33 @@ func (cms *CloudWatchMetricStreams) ServeHTTP(res http.ResponseWriter, req *http cms.authenticateIfSet(handler, res, req) } +func (a *age) record(t time.Duration) { + if t > a.max { + a.max = t + } + + if t < a.min { + a.min = t + } +} + +func (a *age) submitMax(stat selfstat.Stat) { + stat.Incr(a.max.Nanoseconds()) +} + +func (a *age) submitMin(stat selfstat.Stat) { + stat.Incr(a.min.Nanoseconds()) +} + +func (cms *CloudWatchMetricStreams) createHTTPServer() *http.Server { + return &http.Server{ + Addr: cms.ServiceAddress, + Handler: cms, + ReadTimeout: time.Duration(cms.ReadTimeout), + WriteTimeout: time.Duration(cms.WriteTimeout), + } +} + func (cms *CloudWatchMetricStreams) recordRequestTime(start time.Time) { elapsed := time.Since(start) cms.requestTime.Incr(elapsed.Nanoseconds()) @@ -224,7 +252,7 @@ func (cms *CloudWatchMetricStreams) serveWrite(res http.ResponseWriter, req *htt } // Decode the request - var r Request + var r request err := json.NewDecoder(body).Decode(&r) if err != nil { cms.Log.Errorf("unable to decode metric-streams request: %v", err) @@ -235,10 +263,10 @@ func (cms *CloudWatchMetricStreams) serveWrite(res http.ResponseWriter, req *htt } agesInRequest := &age{max: 0, min: math.MaxInt32} - defer agesInRequest.SubmitMax(cms.ageMax) - defer agesInRequest.SubmitMin(cms.ageMin) + defer agesInRequest.submitMax(cms.ageMax) + defer agesInRequest.submitMin(cms.ageMin) - // For each record, decode the base64 data and store it in a Data struct + // For each record, decode the base64 data and store it in a data struct // Metrics from Metric Streams are Base64 encoded JSON // https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html for _, record := range r.Records { @@ -261,7 +289,7 @@ func (cms *CloudWatchMetricStreams) serveWrite(res http.ResponseWriter, req *htt } for _, js := range list { - var d Data + var d data err = json.Unmarshal([]byte(js), &d) if err != nil { cms.Log.Errorf("unable to unmarshal metric-streams data: %v", err) @@ -271,13 +299,13 @@ func (cms *CloudWatchMetricStreams) serveWrite(res http.ResponseWriter, req *htt return } cms.composeMetrics(d) - agesInRequest.Record(time.Since(time.Unix(d.Timestamp/1000, 0))) + agesInRequest.record(time.Since(time.Unix(d.Timestamp/1000, 0))) } } // Compose the response to AWS using the request's requestId // https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html#responseformat - response := Response{ + response := response{ RequestID: r.RequestID, Timestamp: time.Now().UnixNano() / 1000000, } @@ -300,7 +328,7 @@ func (cms *CloudWatchMetricStreams) serveWrite(res http.ResponseWriter, req *htt } } -func (cms *CloudWatchMetricStreams) composeMetrics(data Data) { +func (cms *CloudWatchMetricStreams) composeMetrics(data data) { fields := make(map[string]interface{}) tags := make(map[string]string) timestamp := time.Unix(data.Timestamp/1000, 0) @@ -386,39 +414,6 @@ func (cms *CloudWatchMetricStreams) authenticateIfSet(handler http.HandlerFunc, } } -// Stop cleans up all resources -func (cms *CloudWatchMetricStreams) Stop() { - if cms.listener != nil { - cms.listener.Close() - } - cms.wg.Wait() -} - -func (cms *CloudWatchMetricStreams) Init() error { - tags := map[string]string{ - "address": cms.ServiceAddress, - } - cms.requestsReceived = selfstat.Register("cloudwatch_metric_streams", "requests_received", tags) - cms.writesServed = selfstat.Register("cloudwatch_metric_streams", "writes_served", tags) - cms.requestTime = selfstat.Register("cloudwatch_metric_streams", "request_time", tags) - cms.ageMax = selfstat.Register("cloudwatch_metric_streams", "age_max", tags) - cms.ageMin = selfstat.Register("cloudwatch_metric_streams", "age_min", tags) - - if cms.MaxBodySize == 0 { - cms.MaxBodySize = config.Size(defaultMaxBodySize) - } - - if cms.ReadTimeout < config.Duration(time.Second) { - cms.ReadTimeout = config.Duration(time.Second * 10) - } - - if cms.WriteTimeout < config.Duration(time.Second) { - cms.WriteTimeout = config.Duration(time.Second * 10) - } - - return nil -} - func init() { inputs.Add("cloudwatch_metric_streams", func() telegraf.Input { return &CloudWatchMetricStreams{ diff --git a/plugins/inputs/cloudwatch_metric_streams/cloudwatch_metric_streams_test.go b/plugins/inputs/cloudwatch_metric_streams/cloudwatch_metric_streams_test.go index 3379c34c128c5..5b98692a55564 100644 --- a/plugins/inputs/cloudwatch_metric_streams/cloudwatch_metric_streams_test.go +++ b/plugins/inputs/cloudwatch_metric_streams/cloudwatch_metric_streams_test.go @@ -304,8 +304,8 @@ func TestComposeMetrics(t *testing.T) { require.NoError(t, metricStream.Start(acc)) defer metricStream.Stop() - // compose a Data object for writing - data := Data{ + // compose a data object for writing + data := data{ MetricStreamName: "cloudwatch-metric-stream", AccountID: "546734499701", Region: "us-west-2", @@ -335,8 +335,8 @@ func TestComposeAPICompatibleMetrics(t *testing.T) { require.NoError(t, metricStream.Start(acc)) defer metricStream.Stop() - // compose a Data object for writing - data := Data{ + // compose a data object for writing + data := data{ MetricStreamName: "cloudwatch-metric-stream", AccountID: "546734499701", Region: "us-west-2", diff --git a/plugins/inputs/conntrack/conntrack.go b/plugins/inputs/conntrack/conntrack.go index d8e29aa10e3f3..ee0bd93d1bccd 100644 --- a/plugins/inputs/conntrack/conntrack.go +++ b/plugins/inputs/conntrack/conntrack.go @@ -21,38 +21,29 @@ import ( //go:embed sample.conf var sampleConfig string -type Conntrack struct { - ps system.PS - Path string - Dirs []string - Files []string - Collect []string -} +var ( + dfltDirs = []string{ + "/proc/sys/net/ipv4/netfilter", + "/proc/sys/net/netfilter", + } + + dfltFiles = []string{ + "ip_conntrack_count", + "ip_conntrack_max", + "nf_conntrack_count", + "nf_conntrack_max", + } +) const ( inputName = "conntrack" ) -var dfltDirs = []string{ - "/proc/sys/net/ipv4/netfilter", - "/proc/sys/net/netfilter", -} - -var dfltFiles = []string{ - "ip_conntrack_count", - "ip_conntrack_max", - "nf_conntrack_count", - "nf_conntrack_max", -} - -func (c *Conntrack) setDefaults() { - if len(c.Dirs) == 0 { - c.Dirs = dfltDirs - } - - if len(c.Files) == 0 { - c.Files = dfltFiles - } +type Conntrack struct { + Collect []string `toml:"collect"` + Dirs []string `toml:"dirs"` + Files []string `toml:"files"` + ps system.PS } func (*Conntrack) SampleConfig() string { @@ -154,6 +145,16 @@ func (c *Conntrack) Gather(acc telegraf.Accumulator) error { return nil } +func (c *Conntrack) setDefaults() { + if len(c.Dirs) == 0 { + c.Dirs = dfltDirs + } + + if len(c.Files) == 0 { + c.Files = dfltFiles + } +} + func init() { inputs.Add(inputName, func() telegraf.Input { return &Conntrack{ diff --git a/plugins/inputs/conntrack/conntrack_notlinux.go b/plugins/inputs/conntrack/conntrack_notlinux.go index 474cbf487b86d..0906a56e83817 100644 --- a/plugins/inputs/conntrack/conntrack_notlinux.go +++ b/plugins/inputs/conntrack/conntrack_notlinux.go @@ -17,11 +17,13 @@ type Conntrack struct { Log telegraf.Logger `toml:"-"` } +func (*Conntrack) SampleConfig() string { return sampleConfig } + func (c *Conntrack) Init() error { c.Log.Warn("current platform is not supported") return nil } -func (*Conntrack) SampleConfig() string { return sampleConfig } + func (*Conntrack) Gather(_ telegraf.Accumulator) error { return nil } func init() { diff --git a/plugins/inputs/consul/consul.go b/plugins/inputs/consul/consul.go index e79823fc7dbd8..45b55ce90b2ed 100644 --- a/plugins/inputs/consul/consul.go +++ b/plugins/inputs/consul/consul.go @@ -18,19 +18,19 @@ import ( var sampleConfig string type Consul struct { - Address string - Scheme string - Token string - Username string - Password string - Datacentre string `toml:"datacentre" deprecated:"1.10.0;1.35.0;use 'datacenter' instead"` - Datacenter string - tls.ClientConfig - TagDelimiter string - MetricVersion int + Address string `toml:"address"` + Scheme string `toml:"scheme"` + Token string `toml:"token"` + Username string `toml:"username"` + Password string `toml:"password"` + Datacentre string `toml:"datacentre" deprecated:"1.10.0;1.35.0;use 'datacenter' instead"` + Datacenter string `toml:"datacenter"` + TagDelimiter string `toml:"tag_delimiter"` + MetricVersion int `toml:"metric_version"` Log telegraf.Logger + tls.ClientConfig - // client used to connect to Consul agnet + // client used to connect to Consul agent client *api.Client } @@ -91,7 +91,19 @@ func (c *Consul) Init() error { return err } -func (c *Consul) GatherHealthCheck(acc telegraf.Accumulator, checks []*api.HealthCheck) { +func (c *Consul) Gather(acc telegraf.Accumulator) error { + checks, _, err := c.client.Health().State("any", nil) + + if err != nil { + return err + } + + c.gatherHealthCheck(acc, checks) + + return nil +} + +func (c *Consul) gatherHealthCheck(acc telegraf.Accumulator, checks []*api.HealthCheck) { for _, check := range checks { record := make(map[string]interface{}) tags := make(map[string]string) @@ -132,18 +144,6 @@ func (c *Consul) GatherHealthCheck(acc telegraf.Accumulator, checks []*api.Healt } } -func (c *Consul) Gather(acc telegraf.Accumulator) error { - checks, _, err := c.client.Health().State("any", nil) - - if err != nil { - return err - } - - c.GatherHealthCheck(acc, checks) - - return nil -} - func init() { inputs.Add("consul", func() telegraf.Input { return &Consul{} diff --git a/plugins/inputs/consul/consul_test.go b/plugins/inputs/consul/consul_test.go index f7301b5fb37fb..0a0eced15c0fd 100644 --- a/plugins/inputs/consul/consul_test.go +++ b/plugins/inputs/consul/consul_test.go @@ -43,7 +43,7 @@ func TestGatherHealthCheck(t *testing.T) { var acc testutil.Accumulator consul := &Consul{} - consul.GatherHealthCheck(&acc, sampleChecks) + consul.gatherHealthCheck(&acc, sampleChecks) acc.AssertContainsTaggedFields(t, "consul_health_checks", expectedFields, expectedTags) } @@ -72,7 +72,7 @@ func TestGatherHealthCheckWithDelimitedTags(t *testing.T) { consul := &Consul{ TagDelimiter: ":", } - consul.GatherHealthCheck(&acc, sampleChecks) + consul.gatherHealthCheck(&acc, sampleChecks) acc.AssertContainsTaggedFields(t, "consul_health_checks", expectedFields, expectedTags) } @@ -101,7 +101,7 @@ func TestGatherHealthCheckV2(t *testing.T) { consul := &Consul{ MetricVersion: 2, } - consul.GatherHealthCheck(&acc, sampleChecks) + consul.gatherHealthCheck(&acc, sampleChecks) acc.AssertContainsTaggedFields(t, "consul_health_checks", expectedFields, expectedTags) } @@ -131,7 +131,7 @@ func TestGatherHealthCheckWithDelimitedTagsV2(t *testing.T) { MetricVersion: 2, TagDelimiter: ":", } - consul.GatherHealthCheck(&acc, sampleChecks) + consul.gatherHealthCheck(&acc, sampleChecks) acc.AssertContainsTaggedFields(t, "consul_health_checks", expectedFields, expectedTags) } diff --git a/plugins/inputs/consul_agent/consul_agent.go b/plugins/inputs/consul_agent/consul_agent.go index b0c20f45a12f7..89f0cd5e6b707 100644 --- a/plugins/inputs/consul_agent/consul_agent.go +++ b/plugins/inputs/consul_agent/consul_agent.go @@ -21,7 +21,8 @@ import ( //go:embed sample.conf var sampleConfig string -// consul_agent configuration object +const timeLayout = "2006-01-02 15:04:05 -0700 MST" + type ConsulAgent struct { URL string `toml:"url"` @@ -35,16 +36,6 @@ type ConsulAgent struct { roundTripper http.RoundTripper } -const timeLayout = "2006-01-02 15:04:05 -0700 MST" - -func init() { - inputs.Add("consul_agent", func() telegraf.Input { - return &ConsulAgent{ - ResponseTimeout: config.Duration(5 * time.Second), - } - }) -} - func (*ConsulAgent) SampleConfig() string { return sampleConfig } @@ -80,7 +71,6 @@ func (n *ConsulAgent) Init() error { return nil } -// Gather, collects metrics from Consul endpoint func (n *ConsulAgent) Gather(acc telegraf.Accumulator) error { summaryMetrics, err := n.loadJSON(n.URL + "/v1/agent/metrics") if err != nil { @@ -90,7 +80,7 @@ func (n *ConsulAgent) Gather(acc telegraf.Accumulator) error { return buildConsulAgent(acc, summaryMetrics) } -func (n *ConsulAgent) loadJSON(url string) (*AgentInfo, error) { +func (n *ConsulAgent) loadJSON(url string) (*agentInfo, error) { req, err := http.NewRequest("GET", url, nil) if err != nil { return nil, err @@ -109,7 +99,7 @@ func (n *ConsulAgent) loadJSON(url string) (*AgentInfo, error) { return nil, fmt.Errorf("%s returned HTTP status %s", url, resp.Status) } - var metrics AgentInfo + var metrics agentInfo err = json.NewDecoder(resp.Body).Decode(&metrics) if err != nil { return nil, fmt.Errorf("error parsing json response: %w", err) @@ -119,7 +109,7 @@ func (n *ConsulAgent) loadJSON(url string) (*AgentInfo, error) { } // buildConsulAgent, it builds all the metrics and adds them to the accumulator) -func buildConsulAgent(acc telegraf.Accumulator, agentInfo *AgentInfo) error { +func buildConsulAgent(acc telegraf.Accumulator, agentInfo *agentInfo) error { t, err := internal.ParseTimestamp(timeLayout, agentInfo.Timestamp, nil) if err != nil { return fmt.Errorf("error parsing time: %w", err) @@ -175,3 +165,11 @@ func buildConsulAgent(acc telegraf.Accumulator, agentInfo *AgentInfo) error { return nil } + +func init() { + inputs.Add("consul_agent", func() telegraf.Input { + return &ConsulAgent{ + ResponseTimeout: config.Duration(5 * time.Second), + } + }) +} diff --git a/plugins/inputs/consul_agent/consul_structs.go b/plugins/inputs/consul_agent/consul_structs.go index c17509189ed46..b4aa602bbc227 100644 --- a/plugins/inputs/consul_agent/consul_structs.go +++ b/plugins/inputs/consul_agent/consul_structs.go @@ -1,25 +1,25 @@ package consul_agent -type AgentInfo struct { +type agentInfo struct { Timestamp string - Gauges []GaugeValue - Points []PointValue - Counters []SampledValue - Samples []SampledValue + Gauges []gaugeValue + Points []pointValue + Counters []sampledValue + Samples []sampledValue } -type GaugeValue struct { +type gaugeValue struct { Name string Value float32 Labels map[string]string } -type PointValue struct { +type pointValue struct { Name string Points []float32 } -type SampledValue struct { +type sampledValue struct { Name string Count int Sum float64 diff --git a/plugins/inputs/couchbase/couchbase.go b/plugins/inputs/couchbase/couchbase.go index fc2ed00c48a20..f8e1489eb3e18 100644 --- a/plugins/inputs/couchbase/couchbase.go +++ b/plugins/inputs/couchbase/couchbase.go @@ -22,6 +22,8 @@ import ( //go:embed sample.conf var sampleConfig string +var regexpURI = regexp.MustCompile(`(\S+://)?(\S+\:\S+@)`) + type Couchbase struct { Servers []string `toml:"servers"` BucketStatsIncluded []string `toml:"bucket_stats_included"` @@ -42,13 +44,40 @@ type autoFailover struct { Timeout int `json:"timeout"` } -var regexpURI = regexp.MustCompile(`(\S+://)?(\S+\:\S+@)`) - func (*Couchbase) SampleConfig() string { return sampleConfig } -// Reads stats from all configured clusters. Accumulates stats. +func (cb *Couchbase) Init() error { + f, err := filter.NewIncludeExcludeFilter(cb.BucketStatsIncluded, []string{}) + if err != nil { + return err + } + + cb.bucketInclude = f + + tlsConfig, err := cb.TLSConfig() + if err != nil { + return err + } + + cb.client = &http.Client{ + Timeout: 10 * time.Second, + Transport: &http.Transport{ + MaxIdleConnsPerHost: couchbase.MaxIdleConnsPerHost, + TLSClientConfig: tlsConfig, + }, + } + + couchbase.SetSkipVerify(cb.ClientConfig.InsecureSkipVerify) + couchbase.SetCertFile(cb.ClientConfig.TLSCert) + couchbase.SetKeyFile(cb.ClientConfig.TLSKey) + couchbase.SetRootFile(cb.ClientConfig.TLSCA) + + return nil +} + +// Gather reads stats from all configured clusters. Accumulates stats. // Returns one of the errors encountered while gathering stats (if any). func (cb *Couchbase) Gather(acc telegraf.Accumulator) error { if len(cb.Servers) == 0 { @@ -181,7 +210,7 @@ func (cb *Couchbase) basicBucketStats(basicStats map[string]interface{}) map[str } func (cb *Couchbase) gatherDetailedBucketStats(server, bucket, nodeHostname string, fields map[string]interface{}) error { - extendedBucketStats := &BucketStats{} + extendedBucketStats := &bucketStats{} err := cb.queryDetailedBucketStats(server, bucket, nodeHostname, extendedBucketStats) if err != nil { return err @@ -421,7 +450,7 @@ func (cb *Couchbase) addBucketFieldChecked(fields map[string]interface{}, fieldK cb.addBucketField(fields, fieldKey, values[len(values)-1]) } -func (cb *Couchbase) queryDetailedBucketStats(server, bucket, nodeHostname string, bucketStats *BucketStats) error { +func (cb *Couchbase) queryDetailedBucketStats(server, bucket, nodeHostname string, bucketStats *bucketStats) error { url := server + "/pools/default/buckets/" + bucket if nodeHostname != "" { url += "/nodes/" + nodeHostname @@ -444,35 +473,6 @@ func (cb *Couchbase) queryDetailedBucketStats(server, bucket, nodeHostname strin return json.NewDecoder(r.Body).Decode(bucketStats) } -func (cb *Couchbase) Init() error { - f, err := filter.NewIncludeExcludeFilter(cb.BucketStatsIncluded, []string{}) - if err != nil { - return err - } - - cb.bucketInclude = f - - tlsConfig, err := cb.TLSConfig() - if err != nil { - return err - } - - cb.client = &http.Client{ - Timeout: 10 * time.Second, - Transport: &http.Transport{ - MaxIdleConnsPerHost: couchbase.MaxIdleConnsPerHost, - TLSClientConfig: tlsConfig, - }, - } - - couchbase.SetSkipVerify(cb.ClientConfig.InsecureSkipVerify) - couchbase.SetCertFile(cb.ClientConfig.TLSCert) - couchbase.SetKeyFile(cb.ClientConfig.TLSKey) - couchbase.SetRootFile(cb.ClientConfig.TLSCA) - - return nil -} - func init() { inputs.Add("couchbase", func() telegraf.Input { return &Couchbase{ diff --git a/plugins/inputs/couchbase/couchbase_data.go b/plugins/inputs/couchbase/couchbase_data.go index 2b1227f5c8cdc..04f4db4b5da86 100644 --- a/plugins/inputs/couchbase/couchbase_data.go +++ b/plugins/inputs/couchbase/couchbase_data.go @@ -1,6 +1,6 @@ package couchbase -type BucketStats struct { +type bucketStats struct { Op struct { Samples struct { CouchTotalDiskSize []float64 `json:"couch_total_disk_size"` diff --git a/plugins/inputs/couchbase/couchbase_test.go b/plugins/inputs/couchbase/couchbase_test.go index a9a75792aade0..9621843a90028 100644 --- a/plugins/inputs/couchbase/couchbase_test.go +++ b/plugins/inputs/couchbase/couchbase_test.go @@ -132,7 +132,7 @@ func TestGatherDetailedBucketMetrics(t *testing.T) { err = cb.Init() require.NoError(t, err) var acc testutil.Accumulator - bucketStats := &BucketStats{} + bucketStats := &bucketStats{} if err := json.Unmarshal(test.response, bucketStats); err != nil { t.Fatal("parse bucketResponse", err) } diff --git a/plugins/inputs/couchdb/couchdb.go b/plugins/inputs/couchdb/couchdb.go index f035183f4e24d..c51d9f30d6da7 100644 --- a/plugins/inputs/couchdb/couchdb.go +++ b/plugins/inputs/couchdb/couchdb.go @@ -16,6 +16,14 @@ import ( //go:embed sample.conf var sampleConfig string +type CouchDB struct { + Hosts []string `toml:"hosts"` + BasicUsername string `toml:"basic_username"` + BasicPassword string `toml:"basic_password"` + + client *http.Client +} + type ( metaData struct { Current *float64 `json:"current"` @@ -77,20 +85,12 @@ type ( ClientsRequestingChanges metaData `json:"clients_requesting_changes"` } - Stats struct { + stats struct { Couchdb couchdb `json:"couchdb"` HttpdRequestMethods httpdRequestMethods `json:"httpd_request_methods"` HttpdStatusCodes httpdStatusCodes `json:"httpd_status_codes"` Httpd httpd `json:"httpd"` } - - CouchDB struct { - Hosts []string `toml:"hosts"` - BasicUsername string `toml:"basic_username"` - BasicPassword string `toml:"basic_password"` - - client *http.Client - } ) func (*CouchDB) SampleConfig() string { @@ -143,7 +143,7 @@ func (c *CouchDB) fetchAndInsertData(accumulator telegraf.Accumulator, host stri return fmt.Errorf("failed to get stats from couchdb: HTTP responded %d", response.StatusCode) } - stats := Stats{} + stats := stats{} decoder := json.NewDecoder(response.Body) if err := decoder.Decode(&stats); err != nil { return fmt.Errorf("failed to decode stats from couchdb: HTTP body %q", response.Body) diff --git a/plugins/inputs/cpu/cpu.go b/plugins/inputs/cpu/cpu.go index 1dbf51bdcd9e0..b1704e0023bb9 100644 --- a/plugins/inputs/cpu/cpu.go +++ b/plugins/inputs/cpu/cpu.go @@ -37,6 +37,25 @@ func (*CPUStats) SampleConfig() string { return sampleConfig } +func (c *CPUStats) Init() error { + if c.CoreTags { + cpuInfo, err := cpu.Info() + if err == nil { + c.coreID = cpuInfo[0].CoreID != "" + c.physicalID = cpuInfo[0].PhysicalID != "" + + c.cpuInfo = make(map[string]cpu.InfoStat) + for _, ci := range cpuInfo { + c.cpuInfo[fmt.Sprintf("cpu%d", ci.CPU)] = ci + } + } else { + c.Log.Warnf("Failed to gather info about CPUs: %s", err) + } + } + + return nil +} + func (c *CPUStats) Gather(acc telegraf.Accumulator) error { times, err := c.ps.CPUTimes(c.PerCPU, c.TotalCPU) if err != nil { @@ -127,25 +146,6 @@ func (c *CPUStats) Gather(acc telegraf.Accumulator) error { return err } -func (c *CPUStats) Init() error { - if c.CoreTags { - cpuInfo, err := cpu.Info() - if err == nil { - c.coreID = cpuInfo[0].CoreID != "" - c.physicalID = cpuInfo[0].PhysicalID != "" - - c.cpuInfo = make(map[string]cpu.InfoStat) - for _, ci := range cpuInfo { - c.cpuInfo[fmt.Sprintf("cpu%d", ci.CPU)] = ci - } - } else { - c.Log.Warnf("Failed to gather info about CPUs: %s", err) - } - } - - return nil -} - func totalCPUTime(t cpu.TimesStat) float64 { total := t.User + t.System + t.Nice + t.Iowait + t.Irq + t.Softirq + t.Steal + t.Idle return total diff --git a/plugins/inputs/cpu/cpu_test.go b/plugins/inputs/cpu/cpu_test.go index 73090b55830c1..75f596356402f 100644 --- a/plugins/inputs/cpu/cpu_test.go +++ b/plugins/inputs/cpu/cpu_test.go @@ -11,7 +11,7 @@ import ( "github.com/influxdata/telegraf/testutil" ) -func NewCPUStats(ps system.PS) *CPUStats { +func newCPUStats(ps system.PS) *CPUStats { return &CPUStats{ ps: ps, CollectCPUTime: true, @@ -54,7 +54,7 @@ func TestCPUStats(t *testing.T) { mps.On("CPUTimes").Return([]cpu.TimesStat{cts}, nil) - cs := NewCPUStats(&mps) + cs := newCPUStats(&mps) err := cs.Gather(&acc) require.NoError(t, err) @@ -159,7 +159,7 @@ func TestCPUCountIncrease(t *testing.T) { var acc testutil.Accumulator var err error - cs := NewCPUStats(&mps) + cs := newCPUStats(&mps) mps.On("CPUTimes").Return( []cpu.TimesStat{ @@ -216,7 +216,7 @@ func TestCPUTimesDecrease(t *testing.T) { mps.On("CPUTimes").Return([]cpu.TimesStat{cts}, nil) - cs := NewCPUStats(&mps) + cs := newCPUStats(&mps) err := cs.Gather(&acc) require.NoError(t, err) diff --git a/plugins/inputs/ctrlx_datalayer/ctrlx_datalayer.go b/plugins/inputs/ctrlx_datalayer/ctrlx_datalayer.go index 42c4628f96185..c6c4791597e39 100644 --- a/plugins/inputs/ctrlx_datalayer/ctrlx_datalayer.go +++ b/plugins/inputs/ctrlx_datalayer/ctrlx_datalayer.go @@ -58,6 +58,92 @@ type CtrlXDataLayer struct { common_http.HTTPClientConfig } +func (*CtrlXDataLayer) SampleConfig() string { + return sampleConfig +} + +func (c *CtrlXDataLayer) Init() error { + // Check all configured subscriptions for valid settings + for i := range c.Subscription { + sub := &c.Subscription[i] + sub.applyDefaultSettings() + if !choice.Contains(sub.QueueBehaviour, queueBehaviours) { + c.Log.Infof("The right queue behaviour values are %v", queueBehaviours) + return fmt.Errorf("subscription %d: setting 'queue_behaviour' %q is invalid", i, sub.QueueBehaviour) + } + if !choice.Contains(sub.ValueChange, valueChanges) { + c.Log.Infof("The right value change values are %v", valueChanges) + return fmt.Errorf("subscription %d: setting 'value_change' %q is invalid", i, sub.ValueChange) + } + if len(sub.Nodes) == 0 { + c.Log.Warn("A configured subscription has no nodes configured") + } + sub.index = i + } + + // Generate valid communication url based on configured server address + u := url.URL{ + Scheme: "https", + Host: c.Server, + } + c.url = u.String() + if _, err := url.Parse(c.url); err != nil { + return errors.New("invalid server address") + } + + return nil +} + +func (c *CtrlXDataLayer) Start(acc telegraf.Accumulator) error { + var ctx context.Context + ctx, c.cancel = context.WithCancel(context.Background()) + + var err error + c.connection, err = c.HTTPClientConfig.CreateClient(ctx, c.Log) + if err != nil { + return fmt.Errorf("failed to create http client: %w", err) + } + + username, err := c.Username.Get() + if err != nil { + return fmt.Errorf("getting username failed: %w", err) + } + + password, err := c.Password.Get() + if err != nil { + username.Destroy() + return fmt.Errorf("getting password failed: %w", err) + } + + c.tokenManager = token.TokenManager{ + Url: c.url, + Username: username.String(), + Password: password.String(), + Connection: c.connection, + } + username.Destroy() + password.Destroy() + + c.acc = acc + + c.gatherLoop(ctx) + + return nil +} + +func (c *CtrlXDataLayer) Gather(_ telegraf.Accumulator) error { + // Metrics are sent to the accumulator asynchronously in worker thread. So nothing to do here. + return nil +} + +func (c *CtrlXDataLayer) Stop() { + c.cancel() + c.wg.Wait() + if c.connection != nil { + c.connection.CloseIdleConnections() + } +} + // convertTimestamp2UnixTime converts the given Data Layer timestamp of the payload to UnixTime. func convertTimestamp2UnixTime(t int64) time.Time { // 1 sec=1000 millisec=1000000 microsec=1000000000 nanosec. @@ -238,77 +324,6 @@ func (c *CtrlXDataLayer) createMetric(em *sseEventData, sub *subscription) (tele return nil, fmt.Errorf("unsupported value type: %s", em.Type) } -// Init is for setup, and validating config -func (c *CtrlXDataLayer) Init() error { - // Check all configured subscriptions for valid settings - for i := range c.Subscription { - sub := &c.Subscription[i] - sub.applyDefaultSettings() - if !choice.Contains(sub.QueueBehaviour, queueBehaviours) { - c.Log.Infof("The right queue behaviour values are %v", queueBehaviours) - return fmt.Errorf("subscription %d: setting 'queue_behaviour' %q is invalid", i, sub.QueueBehaviour) - } - if !choice.Contains(sub.ValueChange, valueChanges) { - c.Log.Infof("The right value change values are %v", valueChanges) - return fmt.Errorf("subscription %d: setting 'value_change' %q is invalid", i, sub.ValueChange) - } - if len(sub.Nodes) == 0 { - c.Log.Warn("A configured subscription has no nodes configured") - } - sub.index = i - } - - // Generate valid communication url based on configured server address - u := url.URL{ - Scheme: "https", - Host: c.Server, - } - c.url = u.String() - if _, err := url.Parse(c.url); err != nil { - return errors.New("invalid server address") - } - - return nil -} - -// Start input as service, retain the accumulator, establish the connection. -func (c *CtrlXDataLayer) Start(acc telegraf.Accumulator) error { - var ctx context.Context - ctx, c.cancel = context.WithCancel(context.Background()) - - var err error - c.connection, err = c.HTTPClientConfig.CreateClient(ctx, c.Log) - if err != nil { - return fmt.Errorf("failed to create http client: %w", err) - } - - username, err := c.Username.Get() - if err != nil { - return fmt.Errorf("getting username failed: %w", err) - } - - password, err := c.Password.Get() - if err != nil { - username.Destroy() - return fmt.Errorf("getting password failed: %w", err) - } - - c.tokenManager = token.TokenManager{ - Url: c.url, - Username: username.String(), - Password: password.String(), - Connection: c.connection, - } - username.Destroy() - password.Destroy() - - c.acc = acc - - c.gatherLoop(ctx) - - return nil -} - // gatherLoop creates sse subscriptions on the Data Layer and requests the sse data // the connection will be restablished if the sse subscription is broken. func (c *CtrlXDataLayer) gatherLoop(ctx context.Context) { @@ -349,26 +364,6 @@ func (c *CtrlXDataLayer) gatherLoop(ctx context.Context) { } } -// Stop input as service. -func (c *CtrlXDataLayer) Stop() { - c.cancel() - c.wg.Wait() - if c.connection != nil { - c.connection.CloseIdleConnections() - } -} - -// Gather is called by telegraf to collect the metrics. -func (c *CtrlXDataLayer) Gather(_ telegraf.Accumulator) error { - // Metrics are sent to the accumulator asynchronously in worker thread. So nothing to do here. - return nil -} - -// SampleConfig returns the auto-inserted sample configuration to the telegraf. -func (*CtrlXDataLayer) SampleConfig() string { - return sampleConfig -} - // init registers the plugin in telegraf. func init() { inputs.Add("ctrlx_datalayer", func() telegraf.Input {