diff --git a/plugins/inputs/opcua/opcua.go b/plugins/inputs/opcua/opcua.go index dd0109adc5f8a..839b3d99cabe2 100644 --- a/plugins/inputs/opcua/opcua.go +++ b/plugins/inputs/opcua/opcua.go @@ -16,26 +16,24 @@ import ( var sampleConfig string type OpcUA struct { - ReadClientConfig + readClientConfig Log telegraf.Logger `toml:"-"` - client *ReadClient + client *readClient } func (*OpcUA) SampleConfig() string { return sampleConfig } -// Init Initialise all required objects func (o *OpcUA) Init() (err error) { - o.client, err = o.ReadClientConfig.CreateReadClient(o.Log) + o.client, err = o.readClientConfig.createReadClient(o.Log) return err } -// Gather defines what data the plugin will gather. func (o *OpcUA) Gather(acc telegraf.Accumulator) error { // Will (re)connect if the client is disconnected - metrics, err := o.client.CurrentValues() + metrics, err := o.client.currentValues() if err != nil { return err } @@ -51,7 +49,7 @@ func (o *OpcUA) Gather(acc telegraf.Accumulator) error { func init() { inputs.Add("opcua", func() telegraf.Input { return &OpcUA{ - ReadClientConfig: ReadClientConfig{ + readClientConfig: readClientConfig{ InputClientConfig: input.InputClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{ Endpoint: "opc.tcp://localhost:4840", diff --git a/plugins/inputs/opcua/opcua_test.go b/plugins/inputs/opcua/opcua_test.go index ee18778a9141d..2c0bc559128a7 100644 --- a/plugins/inputs/opcua/opcua_test.go +++ b/plugins/inputs/opcua/opcua_test.go @@ -19,19 +19,19 @@ import ( const servicePort = "4840" -type OPCTags struct { - Name string - Namespace string - IdentifierType string - Identifier string - Want interface{} +type opcTags struct { + name string + namespace string + identifierType string + identifier string + want interface{} } -func MapOPCTag(tags OPCTags) (out input.NodeSettings) { - out.FieldName = tags.Name - out.Namespace = tags.Namespace - out.IdentifierType = tags.IdentifierType - out.Identifier = tags.Identifier +func mapOPCTag(tags opcTags) (out input.NodeSettings) { + out.FieldName = tags.name + out.Namespace = tags.namespace + out.IdentifierType = tags.identifierType + out.Identifier = tags.identifier return out } @@ -52,13 +52,13 @@ func TestGetDataBadNodeContainerIntegration(t *testing.T) { require.NoError(t, err, "failed to start container") defer container.Terminate() - testopctags := []OPCTags{ + testopctags := []opcTags{ {"ProductName", "1", "i", "2261", "open62541 OPC UA Server"}, {"ProductUri", "0", "i", "2262", "http://open62541.org"}, {"ManufacturerName", "0", "i", "2263", "open62541"}, } - readConfig := ReadClientConfig{ + readConfig := readClientConfig{ InputClientConfig: input.InputClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{ Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]), @@ -83,14 +83,14 @@ func TestGetDataBadNodeContainerIntegration(t *testing.T) { } for _, tags := range testopctags { - g.Nodes = append(g.Nodes, MapOPCTag(tags)) + g.Nodes = append(g.Nodes, mapOPCTag(tags)) } readConfig.Groups = append(readConfig.Groups, g) logger := &testutil.CaptureLogger{} - readClient, err := readConfig.CreateReadClient(logger) + readClient, err := readConfig.createReadClient(logger) require.NoError(t, err) - err = readClient.Connect() + err = readClient.connect() require.NoError(t, err) } @@ -111,7 +111,7 @@ func TestReadClientIntegration(t *testing.T) { require.NoError(t, err, "failed to start container") defer container.Terminate() - testopctags := []OPCTags{ + testopctags := []opcTags{ {"ProductName", "0", "i", "2261", "open62541 OPC UA Server"}, {"ProductUri", "0", "i", "2262", "http://open62541.org"}, {"ManufacturerName", "0", "i", "2263", "open62541"}, @@ -120,7 +120,7 @@ func TestReadClientIntegration(t *testing.T) { {"DateTime", "1", "i", "51037", "0001-01-01T00:00:00Z"}, } - readConfig := ReadClientConfig{ + readConfig := readClientConfig{ InputClientConfig: input.InputClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{ Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]), @@ -138,17 +138,17 @@ func TestReadClientIntegration(t *testing.T) { } for _, tags := range testopctags { - readConfig.RootNodes = append(readConfig.RootNodes, MapOPCTag(tags)) + readConfig.RootNodes = append(readConfig.RootNodes, mapOPCTag(tags)) } - client, err := readConfig.CreateReadClient(testutil.Logger{}) + client, err := readConfig.createReadClient(testutil.Logger{}) require.NoError(t, err) - err = client.Connect() - require.NoError(t, err, "Connect") + err = client.connect() + require.NoError(t, err) for i, v := range client.LastReceivedData { - require.Equal(t, testopctags[i].Want, v.Value) + require.Equal(t, testopctags[i].want, v.Value) } } @@ -168,7 +168,7 @@ func TestReadClientIntegrationAdditionalFields(t *testing.T) { require.NoError(t, container.Start(), "failed to start container") defer container.Terminate() - testopctags := []OPCTags{ + testopctags := []opcTags{ {"ProductName", "0", "i", "2261", "open62541 OPC UA Server"}, {"ProductUri", "0", "i", "2262", "http://open62541.org"}, {"ManufacturerName", "0", "i", "2263", "open62541"}, @@ -196,17 +196,17 @@ func TestReadClientIntegrationAdditionalFields(t *testing.T) { for i, x := range testopctags { now := time.Now() tags := map[string]string{ - "id": fmt.Sprintf("ns=%s;%s=%s", x.Namespace, x.IdentifierType, x.Identifier), + "id": fmt.Sprintf("ns=%s;%s=%s", x.namespace, x.identifierType, x.identifier), } fields := map[string]interface{}{ - x.Name: x.Want, + x.name: x.want, "Quality": testopcquality[i], "DataType": testopctypes[i], } expectedopcmetrics = append(expectedopcmetrics, metric.New("testing", tags, fields, now)) } - readConfig := ReadClientConfig{ + readConfig := readClientConfig{ InputClientConfig: input.InputClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{ Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]), @@ -225,13 +225,13 @@ func TestReadClientIntegrationAdditionalFields(t *testing.T) { } for _, tags := range testopctags { - readConfig.RootNodes = append(readConfig.RootNodes, MapOPCTag(tags)) + readConfig.RootNodes = append(readConfig.RootNodes, mapOPCTag(tags)) } - client, err := readConfig.CreateReadClient(testutil.Logger{}) + client, err := readConfig.createReadClient(testutil.Logger{}) require.NoError(t, err) - require.NoError(t, client.Connect()) + require.NoError(t, client.connect()) actualopcmetrics := make([]telegraf.Metric, 0, len(client.LastReceivedData)) for i := range client.LastReceivedData { @@ -258,13 +258,13 @@ func TestReadClientIntegrationWithPasswordAuth(t *testing.T) { require.NoError(t, err, "failed to start container") defer container.Terminate() - testopctags := []OPCTags{ + testopctags := []opcTags{ {"ProductName", "0", "i", "2261", "open62541 OPC UA Server"}, {"ProductUri", "0", "i", "2262", "http://open62541.org"}, {"ManufacturerName", "0", "i", "2263", "open62541"}, } - readConfig := ReadClientConfig{ + readConfig := readClientConfig{ InputClientConfig: input.InputClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{ Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]), @@ -284,17 +284,17 @@ func TestReadClientIntegrationWithPasswordAuth(t *testing.T) { } for _, tags := range testopctags { - readConfig.RootNodes = append(readConfig.RootNodes, MapOPCTag(tags)) + readConfig.RootNodes = append(readConfig.RootNodes, mapOPCTag(tags)) } - client, err := readConfig.CreateReadClient(testutil.Logger{}) + client, err := readConfig.createReadClient(testutil.Logger{}) require.NoError(t, err) - err = client.Connect() - require.NoError(t, err, "Connect") + err = client.connect() + require.NoError(t, err) for i, v := range client.LastReceivedData { - require.Equal(t, testopctags[i].Want, v.Value) + require.Equal(t, testopctags[i].want, v.Value) } } @@ -369,17 +369,17 @@ use_unregistered_reads = true o, ok := c.Inputs[0].Input.(*OpcUA) require.True(t, ok) - require.Equal(t, "localhost", o.ReadClientConfig.MetricName) - require.Equal(t, "opc.tcp://localhost:4840", o.ReadClientConfig.Endpoint) - require.Equal(t, config.Duration(10*time.Second), o.ReadClientConfig.ConnectTimeout) - require.Equal(t, config.Duration(5*time.Second), o.ReadClientConfig.RequestTimeout) - require.Equal(t, "auto", o.ReadClientConfig.SecurityPolicy) - require.Equal(t, "auto", o.ReadClientConfig.SecurityMode) - require.Equal(t, "/etc/telegraf/cert.pem", o.ReadClientConfig.Certificate) - require.Equal(t, "/etc/telegraf/key.pem", o.ReadClientConfig.PrivateKey) - require.Equal(t, "Anonymous", o.ReadClientConfig.AuthMethod) - require.True(t, o.ReadClientConfig.Username.Empty()) - require.True(t, o.ReadClientConfig.Password.Empty()) + require.Equal(t, "localhost", o.readClientConfig.MetricName) + require.Equal(t, "opc.tcp://localhost:4840", o.readClientConfig.Endpoint) + require.Equal(t, config.Duration(10*time.Second), o.readClientConfig.ConnectTimeout) + require.Equal(t, config.Duration(5*time.Second), o.readClientConfig.RequestTimeout) + require.Equal(t, "auto", o.readClientConfig.SecurityPolicy) + require.Equal(t, "auto", o.readClientConfig.SecurityMode) + require.Equal(t, "/etc/telegraf/cert.pem", o.readClientConfig.Certificate) + require.Equal(t, "/etc/telegraf/key.pem", o.readClientConfig.PrivateKey) + require.Equal(t, "Anonymous", o.readClientConfig.AuthMethod) + require.True(t, o.readClientConfig.Username.Empty()) + require.True(t, o.readClientConfig.Password.Empty()) require.Equal(t, []input.NodeSettings{ { FieldName: "name", @@ -396,7 +396,7 @@ use_unregistered_reads = true TagsSlice: [][]string{{"tag0", "val0"}, {"tag00", "val00"}}, DefaultTags: map[string]string{"tag6": "val6"}, }, - }, o.ReadClientConfig.RootNodes) + }, o.readClientConfig.RootNodes) require.Equal(t, []input.NodeGroupSettings{ { MetricName: "foo", @@ -424,10 +424,10 @@ use_unregistered_reads = true Identifier: "4001", }}, }, - }, o.ReadClientConfig.Groups) - require.Equal(t, opcua.OpcUAWorkarounds{AdditionalValidStatusCodes: []string{"0xC0"}}, o.ReadClientConfig.Workarounds) - require.Equal(t, ReadClientWorkarounds{UseUnregisteredReads: true}, o.ReadClientConfig.ReadClientWorkarounds) - require.Equal(t, []string{"DataType"}, o.ReadClientConfig.OptionalFields) + }, o.readClientConfig.Groups) + require.Equal(t, opcua.OpcUAWorkarounds{AdditionalValidStatusCodes: []string{"0xC0"}}, o.readClientConfig.Workarounds) + require.Equal(t, readClientWorkarounds{UseUnregisteredReads: true}, o.readClientConfig.ReadClientWorkarounds) + require.Equal(t, []string{"DataType"}, o.readClientConfig.OptionalFields) err = o.Init() require.NoError(t, err) require.Len(t, o.client.NodeMetricMapping, 5, "incorrect number of nodes") diff --git a/plugins/inputs/opcua/read_client.go b/plugins/inputs/opcua/read_client.go index 25d8a3f9576e3..f8e04e02ea568 100644 --- a/plugins/inputs/opcua/read_client.go +++ b/plugins/inputs/opcua/read_client.go @@ -15,33 +15,33 @@ import ( "github.com/influxdata/telegraf/selfstat" ) -type ReadClientWorkarounds struct { +type readClientWorkarounds struct { UseUnregisteredReads bool `toml:"use_unregistered_reads"` } -type ReadClientConfig struct { +type readClientConfig struct { ReadRetryTimeout config.Duration `toml:"read_retry_timeout"` ReadRetries uint64 `toml:"read_retry_count"` - ReadClientWorkarounds ReadClientWorkarounds `toml:"request_workarounds"` + ReadClientWorkarounds readClientWorkarounds `toml:"request_workarounds"` input.InputClientConfig } -// ReadClient Requests the current values from the required nodes when gather is called. -type ReadClient struct { +// readClient Requests the current values from the required nodes when gather is called. +type readClient struct { *input.OpcUAInputClient ReadRetryTimeout time.Duration ReadRetries uint64 ReadSuccess selfstat.Stat ReadError selfstat.Stat - Workarounds ReadClientWorkarounds + Workarounds readClientWorkarounds // internal values reqIDs []*ua.ReadValueID ctx context.Context } -func (rc *ReadClientConfig) CreateReadClient(log telegraf.Logger) (*ReadClient, error) { +func (rc *readClientConfig) createReadClient(log telegraf.Logger) (*readClient, error) { inputClient, err := rc.InputClientConfig.CreateInputClient(log) if err != nil { return nil, err @@ -55,7 +55,7 @@ func (rc *ReadClientConfig) CreateReadClient(log telegraf.Logger) (*ReadClient, rc.ReadRetryTimeout = config.Duration(100 * time.Millisecond) } - return &ReadClient{ + return &readClient{ OpcUAInputClient: inputClient, ReadRetryTimeout: time.Duration(rc.ReadRetryTimeout), ReadRetries: rc.ReadRetries, @@ -65,7 +65,7 @@ func (rc *ReadClientConfig) CreateReadClient(log telegraf.Logger) (*ReadClient, }, nil } -func (o *ReadClient) Connect() error { +func (o *readClient) connect() error { o.ctx = context.Background() if err := o.OpcUAClient.Connect(o.ctx); err != nil { @@ -103,14 +103,14 @@ func (o *ReadClient) Connect() error { return nil } -func (o *ReadClient) ensureConnected() error { +func (o *readClient) ensureConnected() error { if o.State() == opcua.Disconnected || o.State() == opcua.Closed { - return o.Connect() + return o.connect() } return nil } -func (o *ReadClient) CurrentValues() ([]telegraf.Metric, error) { +func (o *readClient) currentValues() ([]telegraf.Metric, error) { if err := o.ensureConnected(); err != nil { return nil, err } @@ -142,7 +142,7 @@ func (o *ReadClient) CurrentValues() ([]telegraf.Metric, error) { return metrics, nil } -func (o *ReadClient) read() error { +func (o *readClient) read() error { req := &ua.ReadRequest{ MaxAge: 2000, TimestampsToReturn: ua.TimestampsToReturnBoth, diff --git a/plugins/inputs/opcua_listener/opcua_listener.go b/plugins/inputs/opcua_listener/opcua_listener.go index 9399f9c971869..6085c90c9f94c 100644 --- a/plugins/inputs/opcua_listener/opcua_listener.go +++ b/plugins/inputs/opcua_listener/opcua_listener.go @@ -15,8 +15,8 @@ import ( ) type OpcUaListener struct { - SubscribeClientConfig - client *SubscribeClient + subscribeClientConfig + client *subscribeClient Log telegraf.Logger `toml:"-"` } @@ -36,20 +36,35 @@ func (o *OpcUaListener) Init() (err error) { default: return fmt.Errorf("unknown setting %q for 'connect_fail_behavior'", o.ConnectFailBehavior) } - o.client, err = o.SubscribeClientConfig.CreateSubscribeClient(o.Log) + o.client, err = o.subscribeClientConfig.createSubscribeClient(o.Log) return err } +func (o *OpcUaListener) Start(acc telegraf.Accumulator) error { + return o.connect(acc) +} + func (o *OpcUaListener) Gather(acc telegraf.Accumulator) error { - if o.client.State() == opcua.Connected || o.SubscribeClientConfig.ConnectFailBehavior == "ignore" { + if o.client.State() == opcua.Connected || o.subscribeClientConfig.ConnectFailBehavior == "ignore" { return nil } return o.connect(acc) } +func (o *OpcUaListener) Stop() { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + select { + case <-o.client.stop(ctx): + o.Log.Infof("Unsubscribed OPC UA successfully") + case <-ctx.Done(): // Timeout context + o.Log.Warn("Timeout while stopping OPC UA subscription") + } + cancel() +} + func (o *OpcUaListener) connect(acc telegraf.Accumulator) error { ctx := context.Background() - ch, err := o.client.StartStreamValues(ctx) + ch, err := o.client.startStreamValues(ctx) if err != nil { return err } @@ -68,26 +83,10 @@ func (o *OpcUaListener) connect(acc telegraf.Accumulator) error { return nil } -func (o *OpcUaListener) Start(acc telegraf.Accumulator) error { - return o.connect(acc) -} - -func (o *OpcUaListener) Stop() { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - select { - case <-o.client.Stop(ctx): - o.Log.Infof("Unsubscribed OPC UA successfully") - case <-ctx.Done(): // Timeout context - o.Log.Warn("Timeout while stopping OPC UA subscription") - } - cancel() -} - -// Add this plugin to telegraf func init() { inputs.Add("opcua_listener", func() telegraf.Input { return &OpcUaListener{ - SubscribeClientConfig: SubscribeClientConfig{ + subscribeClientConfig: subscribeClientConfig{ InputClientConfig: input.InputClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{ Endpoint: "opc.tcp://localhost:4840", diff --git a/plugins/inputs/opcua_listener/opcua_listener_test.go b/plugins/inputs/opcua_listener/opcua_listener_test.go index 86c384b76d709..5484252bbb56d 100644 --- a/plugins/inputs/opcua_listener/opcua_listener_test.go +++ b/plugins/inputs/opcua_listener/opcua_listener_test.go @@ -21,25 +21,25 @@ import ( const servicePort = "4840" -type OPCTags struct { - Name string - Namespace string - IdentifierType string - Identifier string - Want interface{} +type opcTags struct { + name string + namespace string + identifierType string + identifier string + want interface{} } -func MapOPCTag(tags OPCTags) (out input.NodeSettings) { - out.FieldName = tags.Name - out.Namespace = tags.Namespace - out.IdentifierType = tags.IdentifierType - out.Identifier = tags.Identifier +func mapOPCTag(tags opcTags) (out input.NodeSettings) { + out.FieldName = tags.name + out.Namespace = tags.namespace + out.IdentifierType = tags.identifierType + out.Identifier = tags.identifier return out } func TestInitPluginWithBadConnectFailBehaviorValue(t *testing.T) { plugin := OpcUaListener{ - SubscribeClientConfig: SubscribeClientConfig{ + subscribeClientConfig: subscribeClientConfig{ InputClientConfig: input.InputClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{ Endpoint: "opc.tcp://notarealserver:4840", @@ -69,7 +69,7 @@ func TestStartPlugin(t *testing.T) { acc := &testutil.Accumulator{} plugin := OpcUaListener{ - SubscribeClientConfig: SubscribeClientConfig{ + subscribeClientConfig: subscribeClientConfig{ InputClientConfig: input.InputClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{ Endpoint: "opc.tcp://notarealserver:4840", @@ -86,17 +86,17 @@ func TestStartPlugin(t *testing.T) { }, Log: testutil.Logger{}, } - testopctags := []OPCTags{ + testopctags := []opcTags{ {"ProductName", "0", "i", "2261", "open62541 OPC UA Server"}, } for _, tags := range testopctags { - plugin.SubscribeClientConfig.RootNodes = append(plugin.SubscribeClientConfig.RootNodes, MapOPCTag(tags)) + plugin.subscribeClientConfig.RootNodes = append(plugin.subscribeClientConfig.RootNodes, mapOPCTag(tags)) } require.NoError(t, plugin.Init()) err := plugin.Start(acc) require.ErrorContains(t, err, "could not resolve address") - plugin.SubscribeClientConfig.ConnectFailBehavior = "ignore" + plugin.subscribeClientConfig.ConnectFailBehavior = "ignore" require.NoError(t, plugin.Init()) require.NoError(t, plugin.Start(acc)) require.Equal(t, opcua.Disconnected, plugin.client.OpcUAClient.State()) @@ -110,7 +110,7 @@ func TestStartPlugin(t *testing.T) { wait.ForLog("TCP network layer listening on opc.tcp://"), ), } - plugin.SubscribeClientConfig.ConnectFailBehavior = "retry" + plugin.subscribeClientConfig.ConnectFailBehavior = "retry" require.NoError(t, plugin.Init()) require.NoError(t, plugin.Start(acc)) require.Equal(t, opcua.Disconnected, plugin.client.OpcUAClient.State()) @@ -144,7 +144,7 @@ func TestSubscribeClientIntegration(t *testing.T) { require.NoError(t, err, "failed to start container") defer container.Terminate() - testopctags := []OPCTags{ + testopctags := []opcTags{ {"ProductName", "0", "i", "2261", "open62541 OPC UA Server"}, {"ProductUri", "0", "i", "2262", "http://open62541.org"}, {"ManufacturerName", "0", "i", "2263", "open62541"}, @@ -154,12 +154,12 @@ func TestSubscribeClientIntegration(t *testing.T) { } tagsRemaining := make([]string, 0, len(testopctags)) for i, tag := range testopctags { - if tag.Want != nil { - tagsRemaining = append(tagsRemaining, testopctags[i].Name) + if tag.want != nil { + tagsRemaining = append(tagsRemaining, testopctags[i].name) } } - subscribeConfig := SubscribeClientConfig{ + subscribeConfig := subscribeClientConfig{ InputClientConfig: input.InputClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{ Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]), @@ -177,9 +177,9 @@ func TestSubscribeClientIntegration(t *testing.T) { SubscriptionInterval: 0, } for _, tags := range testopctags { - subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, MapOPCTag(tags)) + subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, mapOPCTag(tags)) } - o, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) + o, err := subscribeConfig.createSubscribeClient(testutil.Logger{}) require.NoError(t, err) // give initial setup a couple extra attempts, as on CircleCI this can be @@ -188,12 +188,12 @@ func TestSubscribeClientIntegration(t *testing.T) { return o.SetupOptions() == nil }, 5*time.Second, 10*time.Millisecond) - err = o.Connect() + err = o.connect() require.NoError(t, err, "Connection failed") ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - res, err := o.StartStreamValues(ctx) + res, err := o.startStreamValues(ctx) require.Equal(t, opcua.Connected, o.State()) require.NoError(t, err) @@ -202,16 +202,16 @@ func TestSubscribeClientIntegration(t *testing.T) { case m := <-res: for fieldName, fieldValue := range m.Fields() { for _, tag := range testopctags { - if fieldName != tag.Name { + if fieldName != tag.name { continue } - if tag.Want == nil { - t.Errorf("Tag: %s has value: %v", tag.Name, fieldValue) + if tag.want == nil { + t.Errorf("Tag: %s has value: %v", tag.name, fieldValue) return } - require.Equal(t, tag.Want, fieldValue) + require.Equal(t, tag.want, fieldValue) newRemaining := make([]string, 0, len(tagsRemaining)) for _, remainingTag := range tagsRemaining { @@ -257,7 +257,7 @@ func TestSubscribeClientIntegrationAdditionalFields(t *testing.T) { require.NoError(t, container.Start(), "failed to start container") defer container.Terminate() - testopctags := []OPCTags{ + testopctags := []opcTags{ {"ProductName", "0", "i", "2261", "open62541 OPC UA Server"}, {"ProductUri", "0", "i", "2262", "http://open62541.org"}, {"ManufacturerName", "0", "i", "2263", "open62541"}, @@ -285,10 +285,10 @@ func TestSubscribeClientIntegrationAdditionalFields(t *testing.T) { for i, x := range testopctags { now := time.Now() tags := map[string]string{ - "id": fmt.Sprintf("ns=%s;%s=%s", x.Namespace, x.IdentifierType, x.Identifier), + "id": fmt.Sprintf("ns=%s;%s=%s", x.namespace, x.identifierType, x.identifier), } fields := map[string]interface{}{ - x.Name: x.Want, + x.name: x.want, "Quality": testopcquality[i], "DataType": testopctypes[i], } @@ -297,12 +297,12 @@ func TestSubscribeClientIntegrationAdditionalFields(t *testing.T) { tagsRemaining := make([]string, 0, len(testopctags)) for i, tag := range testopctags { - if tag.Want != nil { - tagsRemaining = append(tagsRemaining, testopctags[i].Name) + if tag.want != nil { + tagsRemaining = append(tagsRemaining, testopctags[i].name) } } - subscribeConfig := SubscribeClientConfig{ + subscribeConfig := subscribeClientConfig{ InputClientConfig: input.InputClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{ Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]), @@ -321,9 +321,9 @@ func TestSubscribeClientIntegrationAdditionalFields(t *testing.T) { SubscriptionInterval: 0, } for _, tags := range testopctags { - subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, MapOPCTag(tags)) + subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, mapOPCTag(tags)) } - o, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) + o, err := subscribeConfig.createSubscribeClient(testutil.Logger{}) require.NoError(t, err) // give initial setup a couple extra attempts, as on CircleCI this can be @@ -332,11 +332,11 @@ func TestSubscribeClientIntegrationAdditionalFields(t *testing.T) { return o.SetupOptions() == nil }, 5*time.Second, 10*time.Millisecond) - require.NoError(t, o.Connect(), "Connection failed") + require.NoError(t, o.connect(), "Connection failed") ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - res, err := o.StartStreamValues(ctx) + res, err := o.startStreamValues(ctx) require.NoError(t, err) for { @@ -344,12 +344,12 @@ func TestSubscribeClientIntegrationAdditionalFields(t *testing.T) { case m := <-res: for fieldName, fieldValue := range m.Fields() { for _, tag := range testopctags { - if fieldName != tag.Name { + if fieldName != tag.name { continue } // nil-value tags should not be sent from server, error if one does - if tag.Want == nil { - t.Errorf("Tag: %s has value: %v", tag.Name, fieldValue) + if tag.want == nil { + t.Errorf("Tag: %s has value: %v", tag.name, fieldValue) return } @@ -434,19 +434,19 @@ additional_valid_status_codes = ["0xC0"] o, ok := c.Inputs[0].Input.(*OpcUaListener) require.True(t, ok) - require.Equal(t, "localhost", o.SubscribeClientConfig.MetricName) - require.Equal(t, "opc.tcp://localhost:4840", o.SubscribeClientConfig.Endpoint) - require.Equal(t, config.Duration(10*time.Second), o.SubscribeClientConfig.ConnectTimeout) - require.Equal(t, config.Duration(5*time.Second), o.SubscribeClientConfig.RequestTimeout) - require.Equal(t, config.Duration(200*time.Millisecond), o.SubscribeClientConfig.SubscriptionInterval) - require.Equal(t, "error", o.SubscribeClientConfig.ConnectFailBehavior) - require.Equal(t, "auto", o.SubscribeClientConfig.SecurityPolicy) - require.Equal(t, "auto", o.SubscribeClientConfig.SecurityMode) - require.Equal(t, "/etc/telegraf/cert.pem", o.SubscribeClientConfig.Certificate) - require.Equal(t, "/etc/telegraf/key.pem", o.SubscribeClientConfig.PrivateKey) - require.Equal(t, "Anonymous", o.SubscribeClientConfig.AuthMethod) - require.True(t, o.SubscribeClientConfig.Username.Empty()) - require.True(t, o.SubscribeClientConfig.Password.Empty()) + require.Equal(t, "localhost", o.subscribeClientConfig.MetricName) + require.Equal(t, "opc.tcp://localhost:4840", o.subscribeClientConfig.Endpoint) + require.Equal(t, config.Duration(10*time.Second), o.subscribeClientConfig.ConnectTimeout) + require.Equal(t, config.Duration(5*time.Second), o.subscribeClientConfig.RequestTimeout) + require.Equal(t, config.Duration(200*time.Millisecond), o.subscribeClientConfig.SubscriptionInterval) + require.Equal(t, "error", o.subscribeClientConfig.ConnectFailBehavior) + require.Equal(t, "auto", o.subscribeClientConfig.SecurityPolicy) + require.Equal(t, "auto", o.subscribeClientConfig.SecurityMode) + require.Equal(t, "/etc/telegraf/cert.pem", o.subscribeClientConfig.Certificate) + require.Equal(t, "/etc/telegraf/key.pem", o.subscribeClientConfig.PrivateKey) + require.Equal(t, "Anonymous", o.subscribeClientConfig.AuthMethod) + require.True(t, o.subscribeClientConfig.Username.Empty()) + require.True(t, o.subscribeClientConfig.Password.Empty()) require.Equal(t, []input.NodeSettings{ { FieldName: "name", @@ -460,7 +460,7 @@ additional_valid_status_codes = ["0xC0"] IdentifierType: "s", Identifier: "two", }, - }, o.SubscribeClientConfig.RootNodes) + }, o.subscribeClientConfig.RootNodes) require.Equal(t, []input.NodeGroupSettings{ { MetricName: "foo", @@ -484,9 +484,9 @@ additional_valid_status_codes = ["0xC0"] TagsSlice: [][]string{{"tag1", "override"}}, }}, }, - }, o.SubscribeClientConfig.Groups) - require.Equal(t, opcua.OpcUAWorkarounds{AdditionalValidStatusCodes: []string{"0xC0"}}, o.SubscribeClientConfig.Workarounds) - require.Equal(t, []string{"DataType"}, o.SubscribeClientConfig.OptionalFields) + }, o.subscribeClientConfig.Groups) + require.Equal(t, opcua.OpcUAWorkarounds{AdditionalValidStatusCodes: []string{"0xC0"}}, o.subscribeClientConfig.Workarounds) + require.Equal(t, []string{"DataType"}, o.subscribeClientConfig.OptionalFields) } func TestSubscribeClientConfigWithMonitoringParams(t *testing.T) { @@ -548,11 +548,11 @@ deadband_value = 100.0 }, }}, }, - }, o.SubscribeClientConfig.Groups) + }, o.subscribeClientConfig.Groups) } func TestSubscribeClientConfigInvalidTrigger(t *testing.T) { - subscribeConfig := SubscribeClientConfig{ + subscribeConfig := subscribeClientConfig{ InputClientConfig: input.InputClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{ Endpoint: "opc.tcp://localhost:4840", @@ -581,12 +581,12 @@ func TestSubscribeClientConfigInvalidTrigger(t *testing.T) { }, }) - _, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) + _, err := subscribeConfig.createSubscribeClient(testutil.Logger{}) require.ErrorContains(t, err, "trigger 'not_valid' not supported, node 'ns=3;i=1'") } func TestSubscribeClientConfigMissingTrigger(t *testing.T) { - subscribeConfig := SubscribeClientConfig{ + subscribeConfig := subscribeClientConfig{ InputClientConfig: input.InputClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{ Endpoint: "opc.tcp://localhost:4840", @@ -615,12 +615,12 @@ func TestSubscribeClientConfigMissingTrigger(t *testing.T) { }, }) - _, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) + _, err := subscribeConfig.createSubscribeClient(testutil.Logger{}) require.ErrorContains(t, err, "trigger '' not supported, node 'ns=3;i=1'") } func TestSubscribeClientConfigInvalidDeadbandType(t *testing.T) { - subscribeConfig := SubscribeClientConfig{ + subscribeConfig := subscribeClientConfig{ InputClientConfig: input.InputClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{ Endpoint: "opc.tcp://localhost:4840", @@ -650,12 +650,12 @@ func TestSubscribeClientConfigInvalidDeadbandType(t *testing.T) { }, }) - _, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) + _, err := subscribeConfig.createSubscribeClient(testutil.Logger{}) require.ErrorContains(t, err, "deadband_type 'not_valid' not supported, node 'ns=3;i=1'") } func TestSubscribeClientConfigMissingDeadbandType(t *testing.T) { - subscribeConfig := SubscribeClientConfig{ + subscribeConfig := subscribeClientConfig{ InputClientConfig: input.InputClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{ Endpoint: "opc.tcp://localhost:4840", @@ -684,12 +684,12 @@ func TestSubscribeClientConfigMissingDeadbandType(t *testing.T) { }, }) - _, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) + _, err := subscribeConfig.createSubscribeClient(testutil.Logger{}) require.ErrorContains(t, err, "deadband_type '' not supported, node 'ns=3;i=1'") } func TestSubscribeClientConfigInvalidDeadbandValue(t *testing.T) { - subscribeConfig := SubscribeClientConfig{ + subscribeConfig := subscribeClientConfig{ InputClientConfig: input.InputClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{ Endpoint: "opc.tcp://localhost:4840", @@ -721,12 +721,12 @@ func TestSubscribeClientConfigInvalidDeadbandValue(t *testing.T) { }, }) - _, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) + _, err := subscribeConfig.createSubscribeClient(testutil.Logger{}) require.ErrorContains(t, err, "negative deadband_value not supported, node 'ns=3;i=1'") } func TestSubscribeClientConfigMissingDeadbandValue(t *testing.T) { - subscribeConfig := SubscribeClientConfig{ + subscribeConfig := subscribeClientConfig{ InputClientConfig: input.InputClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{ Endpoint: "opc.tcp://localhost:4840", @@ -756,12 +756,12 @@ func TestSubscribeClientConfigMissingDeadbandValue(t *testing.T) { }, }) - _, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) + _, err := subscribeConfig.createSubscribeClient(testutil.Logger{}) require.ErrorContains(t, err, "deadband_value was not set, node 'ns=3;i=1'") } func TestSubscribeClientConfigValidMonitoringParams(t *testing.T) { - subscribeConfig := SubscribeClientConfig{ + subscribeConfig := subscribeClientConfig{ InputClientConfig: input.InputClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{ Endpoint: "opc.tcp://localhost:4840", @@ -799,7 +799,7 @@ func TestSubscribeClientConfigValidMonitoringParams(t *testing.T) { }, }) - subClient, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) + subClient, err := subscribeConfig.createSubscribeClient(testutil.Logger{}) require.NoError(t, err) require.Equal(t, &ua.MonitoringParameters{ SamplingInterval: 50, diff --git a/plugins/inputs/opcua_listener/subscribe_client.go b/plugins/inputs/opcua_listener/subscribe_client.go index 320262bafbf60..1f70f006e7b6b 100644 --- a/plugins/inputs/opcua_listener/subscribe_client.go +++ b/plugins/inputs/opcua_listener/subscribe_client.go @@ -16,15 +16,15 @@ import ( "github.com/influxdata/telegraf/plugins/common/opcua/input" ) -type SubscribeClientConfig struct { +type subscribeClientConfig struct { input.InputClientConfig SubscriptionInterval config.Duration `toml:"subscription_interval"` ConnectFailBehavior string `toml:"connect_fail_behavior"` } -type SubscribeClient struct { +type subscribeClient struct { *input.OpcUAInputClient - Config SubscribeClientConfig + Config subscribeClientConfig sub *opcua.Subscription monitoredItemsReqs []*ua.MonitoredItemCreateRequest @@ -81,7 +81,7 @@ func assignConfigValuesToRequest(req *ua.MonitoredItemCreateRequest, monParams * return nil } -func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*SubscribeClient, error) { +func (sc *subscribeClientConfig) createSubscribeClient(log telegraf.Logger) (*subscribeClient, error) { client, err := sc.InputClientConfig.CreateInputClient(log) if err != nil { return nil, err @@ -92,7 +92,7 @@ func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*Su } processingCtx, processingCancel := context.WithCancel(context.Background()) - subClient := &SubscribeClient{ + subClient := &subscribeClient{ OpcUAInputClient: client, Config: *sc, monitoredItemsReqs: make([]*ua.MonitoredItemCreateRequest, len(client.NodeIDs)), @@ -118,7 +118,7 @@ func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*Su return subClient, nil } -func (o *SubscribeClient) Connect() error { +func (o *subscribeClient) connect() error { err := o.OpcUAClient.Connect(o.ctx) if err != nil { return err @@ -137,7 +137,7 @@ func (o *SubscribeClient) Connect() error { return nil } -func (o *SubscribeClient) Stop(ctx context.Context) <-chan struct{} { +func (o *subscribeClient) stop(ctx context.Context) <-chan struct{} { o.Log.Debugf("Stopping OPC subscription...") if o.State() != opcuaclient.Connected { return nil @@ -152,8 +152,8 @@ func (o *SubscribeClient) Stop(ctx context.Context) <-chan struct{} { return closing } -func (o *SubscribeClient) StartStreamValues(ctx context.Context) (<-chan telegraf.Metric, error) { - err := o.Connect() +func (o *subscribeClient) startStreamValues(ctx context.Context) (<-chan telegraf.Metric, error) { + err := o.connect() if err != nil { switch o.Config.ConnectFailBehavior { case "retry": @@ -191,7 +191,7 @@ func (o *SubscribeClient) StartStreamValues(ctx context.Context) (<-chan telegra return o.metrics, nil } -func (o *SubscribeClient) processReceivedNotifications() { +func (o *subscribeClient) processReceivedNotifications() { for { select { case <-o.ctx.Done(): diff --git a/plugins/inputs/openldap/openldap.go b/plugins/inputs/openldap/openldap.go index 81c63d13c4796..c89b623f9c802 100644 --- a/plugins/inputs/openldap/openldap.go +++ b/plugins/inputs/openldap/openldap.go @@ -7,7 +7,7 @@ import ( "strconv" "strings" - ldap "github.com/go-ldap/ldap/v3" + "github.com/go-ldap/ldap/v3" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/common/tls" @@ -17,56 +17,41 @@ import ( //go:embed sample.conf var sampleConfig string +var ( + searchBase = "cn=Monitor" + searchFilter = "(|(objectClass=monitorCounterObject)(objectClass=monitorOperation)(objectClass=monitoredObject))" + searchAttrs = []string{"monitorCounter", "monitorOpInitiated", "monitorOpCompleted", "monitoredInfo"} + attrTranslate = map[string]string{ + "monitorCounter": "", + "monitoredInfo": "", + "monitorOpInitiated": "_initiated", + "monitorOpCompleted": "_completed", + "olmMDBPagesMax": "_mdb_pages_max", + "olmMDBPagesUsed": "_mdb_pages_used", + "olmMDBPagesFree": "_mdb_pages_free", + "olmMDBReadersMax": "_mdb_readers_max", + "olmMDBReadersUsed": "_mdb_readers_used", + "olmMDBEntries": "_mdb_entries", + } +) + type Openldap struct { - Host string - Port int + Host string `toml:"host"` + Port int `toml:"port"` SSL string `toml:"ssl" deprecated:"1.7.0;1.35.0;use 'tls' instead"` TLS string `toml:"tls"` - InsecureSkipVerify bool + InsecureSkipVerify bool `toml:"insecure_skip_verify"` SSLCA string `toml:"ssl_ca" deprecated:"1.7.0;1.35.0;use 'tls_ca' instead"` TLSCA string `toml:"tls_ca"` - BindDn string - BindPassword string - ReverseMetricNames bool -} - -var searchBase = "cn=Monitor" -var searchFilter = "(|(objectClass=monitorCounterObject)(objectClass=monitorOperation)(objectClass=monitoredObject))" -var searchAttrs = []string{"monitorCounter", "monitorOpInitiated", "monitorOpCompleted", "monitoredInfo"} -var attrTranslate = map[string]string{ - "monitorCounter": "", - "monitoredInfo": "", - "monitorOpInitiated": "_initiated", - "monitorOpCompleted": "_completed", - "olmMDBPagesMax": "_mdb_pages_max", - "olmMDBPagesUsed": "_mdb_pages_used", - "olmMDBPagesFree": "_mdb_pages_free", - "olmMDBReadersMax": "_mdb_readers_max", - "olmMDBReadersUsed": "_mdb_readers_used", - "olmMDBEntries": "_mdb_entries", -} - -// return an initialized Openldap -func NewOpenldap() *Openldap { - return &Openldap{ - Host: "localhost", - Port: 389, - SSL: "", - TLS: "", - InsecureSkipVerify: false, - SSLCA: "", - TLSCA: "", - BindDn: "", - BindPassword: "", - ReverseMetricNames: false, - } + BindDn string `toml:"bind_dn"` + BindPassword string `toml:"bind_password"` + ReverseMetricNames bool `toml:"reverse_metric_names"` } func (*Openldap) SampleConfig() string { return sampleConfig } -// gather metrics func (o *Openldap) Gather(acc telegraf.Accumulator) error { if o.TLS == "" { o.TLS = o.SSL @@ -198,6 +183,21 @@ func dnToMetric(dn string, o *Openldap) string { return strings.ReplaceAll(metricName, ",", "") } +func newOpenldap() *Openldap { + return &Openldap{ + Host: "localhost", + Port: 389, + SSL: "", + TLS: "", + InsecureSkipVerify: false, + SSLCA: "", + TLSCA: "", + BindDn: "", + BindPassword: "", + ReverseMetricNames: false, + } +} + func init() { - inputs.Add("openldap", func() telegraf.Input { return NewOpenldap() }) + inputs.Add("openldap", func() telegraf.Input { return newOpenldap() }) } diff --git a/plugins/inputs/openntpd/openntpd.go b/plugins/inputs/openntpd/openntpd.go index 9066c6f5dc268..9daf61673bbc9 100644 --- a/plugins/inputs/openntpd/openntpd.go +++ b/plugins/inputs/openntpd/openntpd.go @@ -20,60 +20,38 @@ import ( //go:embed sample.conf var sampleConfig string -// Mapping of the ntpctl tag key to the index in the command output -var tagI = map[string]int{ - "stratum": 2, -} - -// Mapping of float metrics to their index in the command output -var floatI = map[string]int{ - "offset": 5, - "delay": 6, - "jitter": 7, -} +var ( + defaultBinary = "/usr/sbin/ntpctl" + defaultTimeout = config.Duration(5 * time.Second) -// Mapping of int metrics to their index in the command output -var intI = map[string]int{ - "wt": 0, - "tl": 1, - "next": 3, - "poll": 4, -} - -type runner func(cmdName string, timeout config.Duration, useSudo bool) (*bytes.Buffer, error) + // Mapping of the ntpctl tag key to the index in the command output + tagI = map[string]int{ + "stratum": 2, + } + // Mapping of float metrics to their index in the command output + floatI = map[string]int{ + "offset": 5, + "delay": 6, + "jitter": 7, + } + // Mapping of int metrics to their index in the command output + intI = map[string]int{ + "wt": 0, + "tl": 1, + "next": 3, + "poll": 4, + } +) -// Openntpd is used to store configuration values type Openntpd struct { - Binary string - Timeout config.Duration - UseSudo bool + Binary string `toml:"binary"` + Timeout config.Duration `toml:"timeout"` + UseSudo bool `toml:"use_sudo"` run runner } -var defaultBinary = "/usr/sbin/ntpctl" -var defaultTimeout = config.Duration(5 * time.Second) - -// Shell out to ntpctl and return the output -func openntpdRunner(cmdName string, timeout config.Duration, useSudo bool) (*bytes.Buffer, error) { - cmdArgs := []string{"-s", "peers"} - - cmd := exec.Command(cmdName, cmdArgs...) - - if useSudo { - cmdArgs = append([]string{cmdName}, cmdArgs...) - cmd = exec.Command("sudo", cmdArgs...) - } - - var out bytes.Buffer - cmd.Stdout = &out - err := internal.RunTimeout(cmd, time.Duration(timeout)) - if err != nil { - return &out, fmt.Errorf("error running ntpctl: %w", err) - } - - return &out, nil -} +type runner func(cmdName string, timeout config.Duration, useSudo bool) (*bytes.Buffer, error) func (*Openntpd) SampleConfig() string { return sampleConfig @@ -190,6 +168,27 @@ func (n *Openntpd) Gather(acc telegraf.Accumulator) error { return nil } +// Shell out to ntpctl and return the output +func openntpdRunner(cmdName string, timeout config.Duration, useSudo bool) (*bytes.Buffer, error) { + cmdArgs := []string{"-s", "peers"} + + cmd := exec.Command(cmdName, cmdArgs...) + + if useSudo { + cmdArgs = append([]string{cmdName}, cmdArgs...) + cmd = exec.Command("sudo", cmdArgs...) + } + + var out bytes.Buffer + cmd.Stdout = &out + err := internal.RunTimeout(cmd, time.Duration(timeout)) + if err != nil { + return &out, fmt.Errorf("error running ntpctl: %w", err) + } + + return &out, nil +} + func init() { inputs.Add("openntpd", func() telegraf.Input { return &Openntpd{ diff --git a/plugins/inputs/openntpd/openntpd_test.go b/plugins/inputs/openntpd/openntpd_test.go index df3b7187b094f..0ea15d0aa5703 100644 --- a/plugins/inputs/openntpd/openntpd_test.go +++ b/plugins/inputs/openntpd/openntpd_test.go @@ -10,7 +10,7 @@ import ( "github.com/influxdata/telegraf/testutil" ) -func OpenntpdCTL(output string) func(string, config.Duration, bool) (*bytes.Buffer, error) { +func openntpdCTL(output string) func(string, config.Duration, bool) (*bytes.Buffer, error) { return func(string, config.Duration, bool) (*bytes.Buffer, error) { return bytes.NewBufferString(output), nil } @@ -19,7 +19,7 @@ func OpenntpdCTL(output string) func(string, config.Duration, bool) (*bytes.Buff func TestParseSimpleOutput(t *testing.T) { acc := &testutil.Accumulator{} v := &Openntpd{ - run: OpenntpdCTL(simpleOutput), + run: openntpdCTL(simpleOutput), } err := v.Gather(acc) @@ -50,7 +50,7 @@ func TestParseSimpleOutput(t *testing.T) { func TestParseSimpleOutputwithStatePrefix(t *testing.T) { acc := &testutil.Accumulator{} v := &Openntpd{ - run: OpenntpdCTL(simpleOutputwithStatePrefix), + run: openntpdCTL(simpleOutputwithStatePrefix), } err := v.Gather(acc) @@ -82,7 +82,7 @@ func TestParseSimpleOutputwithStatePrefix(t *testing.T) { func TestParseSimpleOutputInvalidPeer(t *testing.T) { acc := &testutil.Accumulator{} v := &Openntpd{ - run: OpenntpdCTL(simpleOutputInvalidPeer), + run: openntpdCTL(simpleOutputInvalidPeer), } err := v.Gather(acc) @@ -110,7 +110,7 @@ func TestParseSimpleOutputInvalidPeer(t *testing.T) { func TestParseSimpleOutputServersDNSError(t *testing.T) { acc := &testutil.Accumulator{} v := &Openntpd{ - run: OpenntpdCTL(simpleOutputServersDNSError), + run: openntpdCTL(simpleOutputServersDNSError), } err := v.Gather(acc) @@ -152,7 +152,7 @@ func TestParseSimpleOutputServersDNSError(t *testing.T) { func TestParseSimpleOutputServerDNSError(t *testing.T) { acc := &testutil.Accumulator{} v := &Openntpd{ - run: OpenntpdCTL(simpleOutputServerDNSError), + run: openntpdCTL(simpleOutputServerDNSError), } err := v.Gather(acc) @@ -180,7 +180,7 @@ func TestParseSimpleOutputServerDNSError(t *testing.T) { func TestParseFullOutput(t *testing.T) { acc := &testutil.Accumulator{} v := &Openntpd{ - run: OpenntpdCTL(fullOutput), + run: openntpdCTL(fullOutput), } err := v.Gather(acc) diff --git a/plugins/inputs/opensearch_query/aggregation.bucket.go b/plugins/inputs/opensearch_query/aggregation.bucket.go index 87669e5c7ad0f..fcde3cc27320f 100644 --- a/plugins/inputs/opensearch_query/aggregation.bucket.go +++ b/plugins/inputs/opensearch_query/aggregation.bucket.go @@ -5,9 +5,9 @@ import ( "fmt" ) -type BucketAggregationRequest map[string]*aggregationFunction +type bucketAggregationRequest map[string]*aggregationFunction -func (b BucketAggregationRequest) AddAggregation(name, aggType, field string) error { +func (b bucketAggregationRequest) addAggregation(name, aggType, field string) error { switch aggType { case "terms": default: @@ -22,11 +22,11 @@ func (b BucketAggregationRequest) AddAggregation(name, aggType, field string) er return nil } -func (b BucketAggregationRequest) AddNestedAggregation(name string, a AggregationRequest) { +func (b bucketAggregationRequest) addNestedAggregation(name string, a aggregationRequest) { b[name].nested = a } -func (b BucketAggregationRequest) BucketSize(name string, size int) error { +func (b bucketAggregationRequest) bucketSize(name string, size int) error { if size <= 0 { return errors.New("invalid size; must be integer value > 0") } @@ -35,11 +35,11 @@ func (b BucketAggregationRequest) BucketSize(name string, size int) error { return fmt.Errorf("aggregation %q not found", name) } - b[name].Size(size) + b[name].setSize(size) return nil } -func (b BucketAggregationRequest) Missing(name, missing string) { - b[name].Missing(missing) +func (b bucketAggregationRequest) missing(name, missing string) { + b[name].setMissing(missing) } diff --git a/plugins/inputs/opensearch_query/aggregation.go b/plugins/inputs/opensearch_query/aggregation.go index 4dc8f7b070ec3..e4c8f68ad5875 100644 --- a/plugins/inputs/opensearch_query/aggregation.go +++ b/plugins/inputs/opensearch_query/aggregation.go @@ -4,14 +4,8 @@ import ( "encoding/json" ) -type AggregationRequest interface { - AddAggregation(string, string, string) error -} - -type NestedAggregation interface { - Nested(string, AggregationRequest) - Missing(string) - Size(int) +type aggregationRequest interface { + addAggregation(string, string, string) error } type aggregationFunction struct { @@ -20,7 +14,7 @@ type aggregationFunction struct { size int missing string - nested AggregationRequest + nested aggregationRequest } func (a *aggregationFunction) MarshalJSON() ([]byte, error) { @@ -45,11 +39,11 @@ func (a *aggregationFunction) MarshalJSON() ([]byte, error) { return json.Marshal(agg) } -func (a *aggregationFunction) Size(size int) { +func (a *aggregationFunction) setSize(size int) { a.size = size } -func (a *aggregationFunction) Missing(missing string) { +func (a *aggregationFunction) setMissing(missing string) { a.missing = missing } diff --git a/plugins/inputs/opensearch_query/aggregation.metric.go b/plugins/inputs/opensearch_query/aggregation.metric.go index d18296757af0e..084b9e2c3de0f 100644 --- a/plugins/inputs/opensearch_query/aggregation.metric.go +++ b/plugins/inputs/opensearch_query/aggregation.metric.go @@ -2,9 +2,9 @@ package opensearch_query import "fmt" -type MetricAggregationRequest map[string]*aggregationFunction +type metricAggregationRequest map[string]*aggregationFunction -func (m MetricAggregationRequest) AddAggregation(name, aggType, field string) error { +func (m metricAggregationRequest) addAggregation(name, aggType, field string) error { if t := getAggregationFunctionType(aggType); t != "metric" { return fmt.Errorf("aggregation function %q not supported", aggType) } diff --git a/plugins/inputs/opensearch_query/aggregation.response.go b/plugins/inputs/opensearch_query/aggregation.response.go index 54c5f173feb57..122a0cabb0407 100644 --- a/plugins/inputs/opensearch_query/aggregation.response.go +++ b/plugins/inputs/opensearch_query/aggregation.response.go @@ -36,7 +36,7 @@ type bucketData struct { subaggregation aggregation } -func (a *aggregationResponse) GetMetrics(acc telegraf.Accumulator, measurement string) error { +func (a *aggregationResponse) getMetrics(acc telegraf.Accumulator, measurement string) error { // Simple case (no aggregations) if a.Aggregations == nil { tags := make(map[string]string) @@ -47,20 +47,20 @@ func (a *aggregationResponse) GetMetrics(acc telegraf.Accumulator, measurement s return nil } - return a.Aggregations.GetMetrics(acc, measurement, a.Hits.TotalHits.Value, make(map[string]string)) + return a.Aggregations.getMetrics(acc, measurement, a.Hits.TotalHits.Value, make(map[string]string)) } -func (a *aggregation) GetMetrics(acc telegraf.Accumulator, measurement string, docCount int64, tags map[string]string) error { +func (a *aggregation) getMetrics(acc telegraf.Accumulator, measurement string, docCount int64, tags map[string]string) error { var err error fields := make(map[string]interface{}) for name, agg := range *a { - if agg.IsAggregation() { + if agg.isAggregation() { for _, bucket := range agg.buckets { tt := map[string]string{name: bucket.Key} for k, v := range tags { tt[k] = v } - err = bucket.subaggregation.GetMetrics(acc, measurement, bucket.DocumentCount, tt) + err = bucket.subaggregation.getMetrics(acc, measurement, bucket.DocumentCount, tt) if err != nil { return err } @@ -101,7 +101,7 @@ func (a *aggregateValue) UnmarshalJSON(bytes []byte) error { return json.Unmarshal(bytes, &a.metrics) } -func (a *aggregateValue) IsAggregation() bool { +func (a *aggregateValue) isAggregation() bool { return !(a.buckets == nil) } diff --git a/plugins/inputs/opensearch_query/opensearch_query.go b/plugins/inputs/opensearch_query/opensearch_query.go index b9cefce59e025..833bbaab960c1 100644 --- a/plugins/inputs/opensearch_query/opensearch_query.go +++ b/plugins/inputs/opensearch_query/opensearch_query.go @@ -25,7 +25,6 @@ import ( //go:embed sample.conf var sampleConfig string -// OpensearchQuery struct type OpensearchQuery struct { URLs []string `toml:"urls"` Username config.Secret `toml:"username"` @@ -41,7 +40,6 @@ type OpensearchQuery struct { osClient *opensearch.Client } -// osAggregation struct type osAggregation struct { Index string `toml:"index"` MeasurementName string `toml:"measurement_name"` @@ -56,14 +54,13 @@ type osAggregation struct { MissingTagValue string `toml:"missing_tag_value"` mapMetricFields map[string]string - aggregation AggregationRequest + aggregation aggregationRequest } func (*OpensearchQuery) SampleConfig() string { return sampleConfig } -// Init the plugin. func (o *OpensearchQuery) Init() error { if o.URLs == nil { return errors.New("no urls defined") @@ -89,19 +86,21 @@ func (o *OpensearchQuery) Init() error { return nil } -func (o *OpensearchQuery) initAggregation(agg osAggregation, i int) (err error) { - for _, metricField := range agg.MetricFields { - if _, ok := agg.mapMetricFields[metricField]; !ok { - return fmt.Errorf("metric field %q not found on index %q", metricField, agg.Index) - } - } +func (o *OpensearchQuery) Gather(acc telegraf.Accumulator) error { + var wg sync.WaitGroup - err = agg.buildAggregationQuery() - if err != nil { - return fmt.Errorf("error building aggregation: %w", err) + for _, agg := range o.Aggregations { + wg.Add(1) + go func(agg osAggregation) { + defer wg.Done() + err := o.osAggregationQuery(acc, agg) + if err != nil { + acc.AddError(fmt.Errorf("opensearch query aggregation %q: %w ", agg.MeasurementName, err)) + } + }(agg) } - o.Aggregations[i] = agg + wg.Wait() return nil } @@ -136,22 +135,19 @@ func (o *OpensearchQuery) newClient() error { return err } -// Gather writes the results of the queries from OpenSearch to the Accumulator. -func (o *OpensearchQuery) Gather(acc telegraf.Accumulator) error { - var wg sync.WaitGroup +func (o *OpensearchQuery) initAggregation(agg osAggregation, i int) (err error) { + for _, metricField := range agg.MetricFields { + if _, ok := agg.mapMetricFields[metricField]; !ok { + return fmt.Errorf("metric field %q not found on index %q", metricField, agg.Index) + } + } - for _, agg := range o.Aggregations { - wg.Add(1) - go func(agg osAggregation) { - defer wg.Done() - err := o.osAggregationQuery(acc, agg) - if err != nil { - acc.AddError(fmt.Errorf("opensearch query aggregation %q: %w ", agg.MeasurementName, err)) - } - }(agg) + err = agg.buildAggregationQuery() + if err != nil { + return fmt.Errorf("error building aggregation: %w", err) } - wg.Wait() + o.Aggregations[i] = agg return nil } @@ -164,16 +160,7 @@ func (o *OpensearchQuery) osAggregationQuery(acc telegraf.Accumulator, aggregati return err } - return searchResult.GetMetrics(acc, aggregation.MeasurementName) -} - -func init() { - inputs.Add("opensearch_query", func() telegraf.Input { - return &OpensearchQuery{ - Timeout: config.Duration(time.Second * 5), - HealthCheckInterval: config.Duration(time.Second * 10), - } - }) + return searchResult.getMetrics(acc, aggregation.MeasurementName) } func (o *OpensearchQuery) runAggregationQuery(ctx context.Context, aggregation osAggregation) (*aggregationResponse, error) { @@ -184,13 +171,13 @@ func (o *OpensearchQuery) runAggregationQuery(ctx context.Context, aggregation o filterQuery = "*" } - aq := &Query{ + aq := &query{ Size: 0, Aggregations: aggregation.aggregation, Query: nil, } - boolQuery := &BoolQuery{ + boolQuery := &boolQuery{ FilterQueryString: filterQuery, TimestampField: aggregation.DateField, TimeRangeFrom: from, @@ -231,8 +218,8 @@ func (o *OpensearchQuery) runAggregationQuery(ctx context.Context, aggregation o } func (aggregation *osAggregation) buildAggregationQuery() error { - var agg AggregationRequest - agg = &MetricAggregationRequest{} + var agg aggregationRequest + agg = &metricAggregationRequest{} // create one aggregation per metric field found & function defined for numeric fields for k, v := range aggregation.mapMetricFields { @@ -242,7 +229,7 @@ func (aggregation *osAggregation) buildAggregationQuery() error { continue } - err := agg.AddAggregation(strings.ReplaceAll(k, ".", "_")+"_"+aggregation.MetricFunction, aggregation.MetricFunction, k) + err := agg.addAggregation(strings.ReplaceAll(k, ".", "_")+"_"+aggregation.MetricFunction, aggregation.MetricFunction, k) if err != nil { return err } @@ -250,21 +237,21 @@ func (aggregation *osAggregation) buildAggregationQuery() error { // create a terms aggregation per tag for _, term := range aggregation.Tags { - bucket := &BucketAggregationRequest{} + bucket := &bucketAggregationRequest{} name := strings.ReplaceAll(term, ".", "_") - err := bucket.AddAggregation(name, "terms", term) + err := bucket.addAggregation(name, "terms", term) if err != nil { return err } - err = bucket.BucketSize(name, 1000) + err = bucket.bucketSize(name, 1000) if err != nil { return err } if aggregation.IncludeMissingTag && aggregation.MissingTagValue != "" { - bucket.Missing(name, aggregation.MissingTagValue) + bucket.missing(name, aggregation.MissingTagValue) } - bucket.AddNestedAggregation(name, agg) + bucket.addNestedAggregation(name, agg) agg = bucket } @@ -273,3 +260,12 @@ func (aggregation *osAggregation) buildAggregationQuery() error { return nil } + +func init() { + inputs.Add("opensearch_query", func() telegraf.Input { + return &OpensearchQuery{ + Timeout: config.Duration(time.Second * 5), + HealthCheckInterval: config.Duration(time.Second * 10), + } + }) +} diff --git a/plugins/inputs/opensearch_query/opensearch_query_test.go b/plugins/inputs/opensearch_query/opensearch_query_test.go index cc8627a98d4a8..2a1aced1e5418 100644 --- a/plugins/inputs/opensearch_query/opensearch_query_test.go +++ b/plugins/inputs/opensearch_query/opensearch_query_test.go @@ -12,13 +12,14 @@ import ( "time" "github.com/docker/go-connections/nat" + "github.com/opensearch-project/opensearch-go/v2/opensearchutil" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go/wait" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/testutil" - "github.com/opensearch-project/opensearch-go/v2/opensearchutil" - "github.com/stretchr/testify/require" - "github.com/testcontainers/testcontainers-go/wait" ) const ( @@ -674,18 +675,18 @@ func TestOpensearchQueryIntegration(t *testing.T) { } func TestMetricAggregationMarshal(t *testing.T) { - agg := &MetricAggregationRequest{} - err := agg.AddAggregation("sum_taxful_total_price", "sum", "taxful_total_price") + agg := &metricAggregationRequest{} + err := agg.addAggregation("sum_taxful_total_price", "sum", "taxful_total_price") require.NoError(t, err) _, err = json.Marshal(agg) require.NoError(t, err) - bucket := &BucketAggregationRequest{} - err = bucket.AddAggregation("terms_by_currency", "terms", "currency") + bucket := &bucketAggregationRequest{} + err = bucket.addAggregation("terms_by_currency", "terms", "currency") require.NoError(t, err) - bucket.AddNestedAggregation("terms_by_currency", agg) + bucket.addNestedAggregation("terms_by_currency", agg) _, err = json.Marshal(bucket) require.NoError(t, err) } diff --git a/plugins/inputs/opensearch_query/query.go b/plugins/inputs/opensearch_query/query.go index 1a12aa5e3c594..e06f518f6c22b 100644 --- a/plugins/inputs/opensearch_query/query.go +++ b/plugins/inputs/opensearch_query/query.go @@ -5,13 +5,13 @@ import ( "time" ) -type Query struct { +type query struct { Size int `json:"size"` - Aggregations AggregationRequest `json:"aggregations"` + Aggregations aggregationRequest `json:"aggregations"` Query interface{} `json:"query,omitempty"` } -type BoolQuery struct { +type boolQuery struct { FilterQueryString string TimestampField string TimeRangeFrom time.Time @@ -19,7 +19,7 @@ type BoolQuery struct { DateFieldFormat string } -func (b *BoolQuery) MarshalJSON() ([]byte, error) { +func (b *boolQuery) MarshalJSON() ([]byte, error) { // Construct range dateTimeRange := map[string]interface{}{ "from": b.TimeRangeFrom, diff --git a/plugins/inputs/opensmtpd/opensmtpd.go b/plugins/inputs/opensmtpd/opensmtpd.go index 89f0f822d8c42..e3511e9a1f59f 100644 --- a/plugins/inputs/opensmtpd/opensmtpd.go +++ b/plugins/inputs/opensmtpd/opensmtpd.go @@ -21,49 +21,29 @@ import ( //go:embed sample.conf var sampleConfig string -type runner func(cmdName string, timeout config.Duration, useSudo bool) (*bytes.Buffer, error) +var ( + defaultBinary = "/usr/sbin/smtpctl" + defaultTimeout = config.Duration(time.Second) +) -// Opensmtpd is used to store configuration values type Opensmtpd struct { - Binary string - Timeout config.Duration - UseSudo bool + Binary string `toml:"binary"` + Timeout config.Duration `toml:"timeout"` + UseSudo bool `toml:"use_sudo"` run runner } -var defaultBinary = "/usr/sbin/smtpctl" -var defaultTimeout = config.Duration(time.Second) - -// Shell out to opensmtpd_stat and return the output -func opensmtpdRunner(cmdName string, timeout config.Duration, useSudo bool) (*bytes.Buffer, error) { - cmdArgs := []string{"show", "stats"} - - cmd := exec.Command(cmdName, cmdArgs...) - - if useSudo { - cmdArgs = append([]string{cmdName}, cmdArgs...) - cmd = exec.Command("sudo", cmdArgs...) - } - - var out bytes.Buffer - cmd.Stdout = &out - err := internal.RunTimeout(cmd, time.Duration(timeout)) - if err != nil { - return &out, fmt.Errorf("error running smtpctl: %w", err) - } - - return &out, nil -} +type runner func(cmdName string, timeout config.Duration, useSudo bool) (*bytes.Buffer, error) -// Gather collects the configured stats from smtpctl and adds them to the -// Accumulator func (*Opensmtpd) SampleConfig() string { return sampleConfig } -// All the dots in stat name will replaced by underscores. Histogram statistics will not be collected. func (s *Opensmtpd) Gather(acc telegraf.Accumulator) error { + // All the dots in stat name will be replaced by underscores. + // Histogram statistics will not be collected. + // Always exclude uptime.human statistics statExcluded := []string{"uptime.human"} filterExcluded, err := filter.Compile(statExcluded) @@ -108,6 +88,27 @@ func (s *Opensmtpd) Gather(acc telegraf.Accumulator) error { return nil } +// Shell out to opensmtpd_stat and return the output +func opensmtpdRunner(cmdName string, timeout config.Duration, useSudo bool) (*bytes.Buffer, error) { + cmdArgs := []string{"show", "stats"} + + cmd := exec.Command(cmdName, cmdArgs...) + + if useSudo { + cmdArgs = append([]string{cmdName}, cmdArgs...) + cmd = exec.Command("sudo", cmdArgs...) + } + + var out bytes.Buffer + cmd.Stdout = &out + err := internal.RunTimeout(cmd, time.Duration(timeout)) + if err != nil { + return &out, fmt.Errorf("error running smtpctl: %w", err) + } + + return &out, nil +} + func init() { inputs.Add("opensmtpd", func() telegraf.Input { return &Opensmtpd{ diff --git a/plugins/inputs/opensmtpd/opensmtpd_test.go b/plugins/inputs/opensmtpd/opensmtpd_test.go index 599bf500895f9..5d856f68b761c 100644 --- a/plugins/inputs/opensmtpd/opensmtpd_test.go +++ b/plugins/inputs/opensmtpd/opensmtpd_test.go @@ -10,7 +10,7 @@ import ( "github.com/influxdata/telegraf/testutil" ) -func SMTPCTL(output string) func(string, config.Duration, bool) (*bytes.Buffer, error) { +func smtpCTL(output string) func(string, config.Duration, bool) (*bytes.Buffer, error) { return func(string, config.Duration, bool) (*bytes.Buffer, error) { return bytes.NewBufferString(output), nil } @@ -19,7 +19,7 @@ func SMTPCTL(output string) func(string, config.Duration, bool) (*bytes.Buffer, func TestFilterSomeStats(t *testing.T) { acc := &testutil.Accumulator{} v := &Opensmtpd{ - run: SMTPCTL(fullOutput), + run: smtpCTL(fullOutput), } err := v.Gather(acc) diff --git a/plugins/inputs/openstack/openstack.go b/plugins/inputs/openstack/openstack.go index 4031ac2217711..881466a540e89 100644 --- a/plugins/inputs/openstack/openstack.go +++ b/plugins/inputs/openstack/openstack.go @@ -57,7 +57,6 @@ var ( typeStorage = regexp.MustCompile(`_errors$|_read$|_read_req$|_write$|_write_req$`) ) -// OpenStack is the main structure associated with a collection instance. type OpenStack struct { // Configuration variables IdentityEndpoint string `toml:"authentication_endpoint"` @@ -93,19 +92,10 @@ type OpenStack struct { services map[string]bool } -// convertTimeFormat, to convert time format based on HumanReadableTS -func (o *OpenStack) convertTimeFormat(t time.Time) interface{} { - if o.HumanReadableTS { - return t.Format("2006-01-02T15:04:05.999999999Z07:00") - } - return t.UnixNano() -} - func (*OpenStack) SampleConfig() string { return sampleConfig } -// initialize performs any necessary initialization functions func (o *OpenStack) Init() error { if len(o.EnabledServices) == 0 { o.EnabledServices = []string{"services", "projects", "hypervisors", "flavors", "networks", "volumes"} @@ -266,14 +256,6 @@ func (o *OpenStack) Start(telegraf.Accumulator) error { return nil } -func (o *OpenStack) Stop() { - if o.client != nil { - o.client.CloseIdleConnections() - } -} - -// Gather gathers resources from the OpenStack API and accumulates metrics. This -// implements the Input interface. func (o *OpenStack) Gather(acc telegraf.Accumulator) error { ctx := context.Background() callDuration := make(map[string]interface{}, len(o.services)) @@ -344,6 +326,12 @@ func (o *OpenStack) Gather(acc telegraf.Accumulator) error { return nil } +func (o *OpenStack) Stop() { + if o.client != nil { + o.client.CloseIdleConnections() + } +} + func (o *OpenStack) availableServicesFromAuth(provider *gophercloud.ProviderClient) (bool, error) { authResult := provider.GetAuthResult() if authResult == nil { @@ -1067,7 +1055,14 @@ func (o *OpenStack) gatherServerDiagnostics(ctx context.Context, acc telegraf.Ac return nil } -// init registers a callback which creates a new OpenStack input instance. +// convertTimeFormat, to convert time format based on HumanReadableTS +func (o *OpenStack) convertTimeFormat(t time.Time) interface{} { + if o.HumanReadableTS { + return t.Format("2006-01-02T15:04:05.999999999Z07:00") + } + return t.UnixNano() +} + func init() { inputs.Add("openstack", func() telegraf.Input { return &OpenStack{ diff --git a/plugins/inputs/opentelemetry/opentelemetry.go b/plugins/inputs/opentelemetry/opentelemetry.go index 6b6bd5a695877..3323569935728 100644 --- a/plugins/inputs/opentelemetry/opentelemetry.go +++ b/plugins/inputs/opentelemetry/opentelemetry.go @@ -46,10 +46,6 @@ func (*OpenTelemetry) SampleConfig() string { return sampleConfig } -func (*OpenTelemetry) Gather(telegraf.Accumulator) error { - return nil -} - func (o *OpenTelemetry) Init() error { if o.ServiceAddress == "" { o.ServiceAddress = "0.0.0.0:4317" @@ -123,6 +119,10 @@ func (o *OpenTelemetry) Start(acc telegraf.Accumulator) error { return nil } +func (*OpenTelemetry) Gather(telegraf.Accumulator) error { + return nil +} + func (o *OpenTelemetry) Stop() { if o.grpcServer != nil { o.grpcServer.Stop() diff --git a/plugins/inputs/opentelemetry/opentelemetry_test.go b/plugins/inputs/opentelemetry/opentelemetry_test.go index ae6b198f2da5b..751aaf1af3b12 100644 --- a/plugins/inputs/opentelemetry/opentelemetry_test.go +++ b/plugins/inputs/opentelemetry/opentelemetry_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/influxdata/influxdb-observability/otel2influx" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/sdk/metric" @@ -24,7 +25,6 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/encoding/protojson" - "github.com/influxdata/influxdb-observability/otel2influx" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/inputs" diff --git a/plugins/inputs/openweathermap/openweathermap.go b/plugins/inputs/openweathermap/openweathermap.go index 6543b113abcb2..4eb354474d150 100644 --- a/plugins/inputs/openweathermap/openweathermap.go +++ b/plugins/inputs/openweathermap/openweathermap.go @@ -174,7 +174,7 @@ func (n *OpenWeatherMap) gatherWeather(acc telegraf.Accumulator, city string) er return fmt.Errorf("querying %q failed: %w", addr, err) } - var e WeatherEntry + var e weatherEntry if err := json.Unmarshal(buf, &e); err != nil { return fmt.Errorf("parsing JSON response failed: %w", err) } @@ -223,7 +223,7 @@ func (n *OpenWeatherMap) gatherWeatherBatch(acc telegraf.Accumulator, cities str return fmt.Errorf("querying %q failed: %w", addr, err) } - var status Status + var status status if err := json.Unmarshal(buf, &status); err != nil { return fmt.Errorf("parsing JSON response failed: %w", err) } @@ -274,7 +274,7 @@ func (n *OpenWeatherMap) gatherForecast(acc telegraf.Accumulator, city string) e return fmt.Errorf("querying %q failed: %w", addr, err) } - var status Status + var status status if err := json.Unmarshal(buf, &status); err != nil { return fmt.Errorf("parsing JSON response failed: %w", err) } diff --git a/plugins/inputs/openweathermap/types.go b/plugins/inputs/openweathermap/types.go index 4920dd3f7acde..8fc170a472aa1 100644 --- a/plugins/inputs/openweathermap/types.go +++ b/plugins/inputs/openweathermap/types.go @@ -1,6 +1,6 @@ package openweathermap -type WeatherEntry struct { +type weatherEntry struct { Dt int64 `json:"dt"` Clouds struct { All int64 `json:"all"` @@ -43,21 +43,21 @@ type WeatherEntry struct { } `json:"weather"` } -func (e WeatherEntry) snow() float64 { +func (e weatherEntry) snow() float64 { if e.Snow.Snow1 > 0 { return e.Snow.Snow1 } return e.Snow.Snow3 } -func (e WeatherEntry) rain() float64 { +func (e weatherEntry) rain() float64 { if e.Rain.Rain1 > 0 { return e.Rain.Rain1 } return e.Rain.Rain3 } -type Status struct { +type status struct { City struct { Coord struct { Lat float64 `json:"lat"` @@ -67,5 +67,5 @@ type Status struct { ID int64 `json:"id"` Name string `json:"name"` } `json:"city"` - List []WeatherEntry `json:"list"` + List []weatherEntry `json:"list"` }