Skip to content

Commit

Permalink
Merge branch 'influxdata:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
izekr authored Dec 19, 2024
2 parents 0e6e3ad + d829a5b commit 944ff9f
Show file tree
Hide file tree
Showing 185 changed files with 1,951 additions and 1,896 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/linter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
# Run Linter against code base #
################################
- name: Lint Code Base
uses: super-linter/[email protected].0
uses: super-linter/[email protected].1
env:
VALIDATE_ALL_CODEBASE: false
DEFAULT_BRANCH: master
Expand Down
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,9 @@ linters-settings:
- blank-import
- bool-compare
- compares
- contains
- empty
- encoded-compare
- error-is-as
- error-nil
- expected-actual
Expand All @@ -345,6 +347,7 @@ linters-settings:
- len
- negative-positive
- nil-compare
- regexp
- require-error
- suite-broken-parallel
- suite-dont-use-pkg
Expand Down
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
<!-- markdownlint-disable MD024 -->
# Changelog

## Unreleased

### Important Changes

- The default value of `skip_processors_after_aggregators` will change to `true`
with Telegraf `v1.40.0`, i.e. processors will no longer be run a second time
after aggregators! If you need the current default behavior, please explicitly
set the option to `false`! To silence the warning and use the future default
behavior, please explicitly set the option to `true`.

## v1.33.0 [2024-12-09]

### New Plugins
Expand Down
44 changes: 37 additions & 7 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"
"time"

"github.com/fatih/color"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
Expand Down Expand Up @@ -106,6 +107,15 @@ func (a *Agent) Run(ctx context.Context) error {
time.Duration(a.Config.Agent.Interval), a.Config.Agent.Quiet,
a.Config.Agent.Hostname, time.Duration(a.Config.Agent.FlushInterval))

// Set the default for processor skipping
if a.Config.Agent.SkipProcessorsAfterAggregators == nil {
msg := `The default value of 'skip_processors_after_aggregators' will change to 'true' with Telegraf v1.40.0! `
msg += `If you need the current default behavior, please explicitly set the option to 'false'!`
log.Print("W! [agent] ", color.YellowString(msg))
skipProcessorsAfterAggregators := false
a.Config.Agent.SkipProcessorsAfterAggregators = &skipProcessorsAfterAggregators
}

log.Printf("D! [agent] Initializing plugins")
if err := a.InitPlugins(); err != nil {
return err
Expand Down Expand Up @@ -136,7 +146,7 @@ func (a *Agent) Run(ctx context.Context) error {
var au *aggregatorUnit
if len(a.Config.Aggregators) != 0 {
aggC := next
if len(a.Config.AggProcessors) != 0 && !a.Config.Agent.SkipProcessorsAfterAggregators {
if len(a.Config.AggProcessors) != 0 && !*a.Config.Agent.SkipProcessorsAfterAggregators {
aggC, apu, err = a.startProcessors(next, a.Config.AggProcessors)
if err != nil {
return err
Expand Down Expand Up @@ -231,10 +241,12 @@ func (a *Agent) InitPlugins() error {
return fmt.Errorf("could not initialize aggregator %s: %w", aggregator.LogName(), err)
}
}
for _, processor := range a.Config.AggProcessors {
err := processor.Init()
if err != nil {
return fmt.Errorf("could not initialize processor %s: %w", processor.LogName(), err)
if !*a.Config.Agent.SkipProcessorsAfterAggregators {
for _, processor := range a.Config.AggProcessors {
err := processor.Init()
if err != nil {
return fmt.Errorf("could not initialize processor %s: %w", processor.LogName(), err)
}
}
}
for _, output := range a.Config.Outputs {
Expand Down Expand Up @@ -998,6 +1010,15 @@ func (a *Agent) Test(ctx context.Context, wait time.Duration) error {
// outputC. After gathering pauses for the wait duration to allow service
// inputs to run.
func (a *Agent) runTest(ctx context.Context, wait time.Duration, outputC chan<- telegraf.Metric) error {
// Set the default for processor skipping
if a.Config.Agent.SkipProcessorsAfterAggregators == nil {
msg := `The default value of 'skip_processors_after_aggregators' will change to 'true' with Telegraf v1.40.0! `
msg += `If you need the current default behavior, please explicitly set the option to 'false'!`
log.Print("W! [agent] ", color.YellowString(msg))
skipProcessorsAfterAggregators := false
a.Config.Agent.SkipProcessorsAfterAggregators = &skipProcessorsAfterAggregators
}

log.Printf("D! [agent] Initializing plugins")
if err := a.InitPlugins(); err != nil {
return err
Expand All @@ -1011,7 +1032,7 @@ func (a *Agent) runTest(ctx context.Context, wait time.Duration, outputC chan<-
var au *aggregatorUnit
if len(a.Config.Aggregators) != 0 {
procC := next
if len(a.Config.AggProcessors) != 0 && !a.Config.Agent.SkipProcessorsAfterAggregators {
if len(a.Config.AggProcessors) != 0 && !*a.Config.Agent.SkipProcessorsAfterAggregators {
var err error
procC, apu, err = a.startProcessors(next, a.Config.AggProcessors)
if err != nil {
Expand Down Expand Up @@ -1094,6 +1115,15 @@ func (a *Agent) Once(ctx context.Context, wait time.Duration) error {
// outputC. After gathering pauses for the wait duration to allow service
// inputs to run.
func (a *Agent) runOnce(ctx context.Context, wait time.Duration) error {
// Set the default for processor skipping
if a.Config.Agent.SkipProcessorsAfterAggregators == nil {
msg := `The default value of 'skip_processors_after_aggregators' will change to 'true' with Telegraf v1.40.0! `
msg += `If you need the current default behavior, please explicitly set the option to 'false'!`
log.Print("W! [agent] ", color.YellowString(msg))
skipProcessorsAfterAggregators := false
a.Config.Agent.SkipProcessorsAfterAggregators = &skipProcessorsAfterAggregators
}

log.Printf("D! [agent] Initializing plugins")
if err := a.InitPlugins(); err != nil {
return err
Expand All @@ -1111,7 +1141,7 @@ func (a *Agent) runOnce(ctx context.Context, wait time.Duration) error {
var au *aggregatorUnit
if len(a.Config.Aggregators) != 0 {
procC := next
if len(a.Config.AggProcessors) != 0 && !a.Config.Agent.SkipProcessorsAfterAggregators {
if len(a.Config.AggProcessors) != 0 && !*a.Config.Agent.SkipProcessorsAfterAggregators {
procC, apu, err = a.startProcessors(next, a.Config.AggProcessors)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ type AgentConfig struct {
// Flag to skip running processors after aggregators
// By default, processors are run a second time after aggregators. Changing
// this setting to true will skip the second run of processors.
SkipProcessorsAfterAggregators bool `toml:"skip_processors_after_aggregators"`
SkipProcessorsAfterAggregators *bool `toml:"skip_processors_after_aggregators"`

// Number of attempts to obtain a remote configuration via a URL during
// startup. Set to -1 for unlimited attempts.
Expand Down
13 changes: 13 additions & 0 deletions docs/specs/tsd-006-startup-error-behavior.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,19 @@ must *not* fail on startup errors and should continue running. On startup error,
Telegraf must ignore the plugin as-if it was not configured at all, i.e. the
plugin must be completely removed from processing.

### `probe` behavior

When using the `probe` setting for the `startup_error_behavior` option, Telegraf
must *not* fail on startup errors and should continue running. On startup error,
Telegraf must ignore the plugin as-if it was not configured at all, i.e. the
plugin must be completely removed from processing, similar to the `ignore`
behavior. Additionally, Telegraf must probe the plugin (as defined in
[TSD-009][tsd_009]) after startup, if it implements the `ProbePlugin` interface.
If probing is available *and* returns an error, Telegraf must *ignore* the
plugin as-if it was not configured at all.

[tsd_009]: /docs/specs/tsd-009-probe-on-startup.md

## Plugin Requirements

Plugins participating in handling startup errors must implement the `Start()`
Expand Down
68 changes: 68 additions & 0 deletions docs/specs/tsd-009-probe-on-startup.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Probing plugins after startup

## Objective

Allow Telegraf to probe plugins during startup to enable enhanced plugin error
detection, such as checking the availability of hardware or services.

## Keywords

inputs, outputs, startup, probe, error, ignore, behavior

## Overview

When plugins are first instantiated, Telegraf will call the plugin's `Start()`
method (for inputs) or `Connect()` (for outputs) which will initialize its
configuration based off of config options and the running environment. It is
sometimes the case that while the initialization step succeeds, the upstream
service on which the plugin relies is not actually running, or cannot be
reached due to incorrect configuration or environmental
problems. In situations like this, Telegraf does not detect that the plugin's
upstream service is not functioning properly, and thus it will continually call
the plugin during each `Gather()` iteration. This often has the effect of
polluting journald and system logs with voluminous error messages, which creates
issues for system administrators who rely on such logs to identify other
unrelated system problems.

More background discussion on this option, including other possible avenues, can
be viewed [here](https://github.com/influxdata/telegraf/issues/16028).

## Probing

Probing is an action whereby the plugin should ensure that the plugin will be
fully functional on a best-effort basis. This may comprise communicating with
its external service, trying to access required devices, entities or executables
etc. to ensure that the plugin will not produce errors, e.g. during data
collection or data output. Probing must *not* produce, process or output any metrics.

Plugins that support probing must implement the `ProbePlugin` interface. Such
plugins must behave in the following manner:

1. Return an error if the external dependencies (hardware, services,
executables, etc.) of the plugin are not available.
2. Return an error if information cannot be gathered (in the case of inputs) or
sent (in the case of outputs) due to unrecoverable issues. For example, invalid
authentication, missing permissions, or non-existent endpoints.
3. Otherwise, return `nil` indicating the plugin will be fully functional.

## Plugin Requirements

Plugins that allow probing must implement the `ProbePlugin` interface. The
exact implementation depends on the plugin's functionality and requirements,
but generally it should take the same actions as it would during normal operation,
e.g. calling `Gather()` or `Write()`, and check whether errors occur. If probing fails,
it must be safe to call the plugin's `Close()` method.

Input plugins must *not* produce metrics, and output plugins must *not* send any
metrics to the service. Plugins must *not* influence the later data processing or
collection by modifying the internal state of the plugin or the external state of the
service or hardware. For example, file-offsets or other service states must be
reset to not lose data during the first gather or write cycle.

Plugins must return `nil` upon successful probing or an error otherwise.

## Related Issues

- [#16028](https://github.com/influxdata/telegraf/issues/16028)
- [#15916](https://github.com/influxdata/telegraf/pull/15916)
- [#16001](https://github.com/influxdata/telegraf/pull/16001)
30 changes: 15 additions & 15 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0
github.com/Azure/azure-storage-queue-go v0.0.0-20230531184854-c06a8eff66fe
github.com/Azure/go-autorest/autorest v0.11.29
github.com/Azure/go-autorest/autorest/adal v0.9.23
github.com/Azure/go-autorest/autorest/adal v0.9.24
github.com/Azure/go-autorest/autorest/azure/auth v0.5.13
github.com/BurntSushi/toml v1.4.0
github.com/ClickHouse/clickhouse-go v1.5.4
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/IBM/nzgo/v12 v12.0.9-0.20231115043259-49c27f2dfe48
github.com/IBM/nzgo/v12 v12.0.9
github.com/IBM/sarama v1.43.3
github.com/Masterminds/semver/v3 v3.3.0
github.com/Masterminds/sprig v2.22.0+incompatible
Expand Down Expand Up @@ -80,7 +80,7 @@ require (
github.com/eclipse/paho.golang v0.21.0
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/facebook/time v0.0.0-20240626113945-18207c5d8ddc
github.com/fatih/color v1.17.0
github.com/fatih/color v1.18.0
github.com/go-ldap/ldap/v3 v3.4.8
github.com/go-logfmt/logfmt v0.6.0
github.com/go-ole/go-ole v1.3.0
Expand All @@ -89,7 +89,6 @@ require (
github.com/go-sql-driver/mysql v1.8.1
github.com/go-stomp/stomp v2.1.4+incompatible
github.com/gobwas/glob v0.2.3
github.com/godbus/dbus/v5 v5.1.0
github.com/gofrs/uuid/v5 v5.3.0
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/golang/geo v0.0.0-20190916061304-5b978397cfec
Expand Down Expand Up @@ -165,7 +164,7 @@ require (
github.com/prometheus-community/pro-bing v0.4.1
github.com/prometheus/client_golang v1.20.5
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.60.0
github.com/prometheus/common v0.61.0
github.com/prometheus/procfs v0.15.1
github.com/prometheus/prometheus v0.54.1
github.com/rabbitmq/amqp091-go v1.10.0
Expand All @@ -187,7 +186,7 @@ require (
github.com/snowflakedb/gosnowflake v1.11.2
github.com/srebhan/cborquery v1.0.1
github.com/srebhan/protobufquery v1.0.1
github.com/stretchr/testify v1.9.0
github.com/stretchr/testify v1.10.0
github.com/tbrandon/mbserver v0.0.0-20170611213546-993e1772cc62
github.com/testcontainers/testcontainers-go v0.34.0
github.com/testcontainers/testcontainers-go/modules/kafka v0.34.0
Expand All @@ -196,7 +195,7 @@ require (
github.com/tidwall/wal v1.1.7
github.com/tinylib/msgp v1.2.0
github.com/urfave/cli/v2 v2.27.2
github.com/vapourismo/knx-go v0.0.0-20240217175130-922a0d50c241
github.com/vapourismo/knx-go v0.0.0-20240915133544-a6ab43471c11
github.com/vishvananda/netlink v1.3.0
github.com/vishvananda/netns v0.0.5
github.com/vjeantet/grok v1.0.1
Expand All @@ -212,20 +211,20 @@ require (
go.opentelemetry.io/proto/otlp v1.3.1
go.starlark.net v0.0.0-20240925182052-1207426daebd
go.step.sm/crypto v0.54.0
golang.org/x/crypto v0.29.0
golang.org/x/crypto v0.31.0
golang.org/x/mod v0.21.0
golang.org/x/net v0.31.0
golang.org/x/oauth2 v0.23.0
golang.org/x/sync v0.9.0
golang.org/x/sys v0.27.0
golang.org/x/term v0.26.0
golang.org/x/text v0.20.0
golang.org/x/net v0.32.0
golang.org/x/oauth2 v0.24.0
golang.org/x/sync v0.10.0
golang.org/x/sys v0.28.0
golang.org/x/term v0.27.0
golang.org/x/text v0.21.0
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20211230205640-daad0b7ba671
gonum.org/v1/gonum v0.15.1
google.golang.org/api v0.203.0
google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9
google.golang.org/grpc v1.68.0
google.golang.org/protobuf v1.35.1
google.golang.org/protobuf v1.35.2
gopkg.in/gorethink/gorethink.v3 v3.0.5
gopkg.in/olivere/elastic.v5 v5.0.86
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7
Expand Down Expand Up @@ -348,6 +347,7 @@ require (
github.com/goburrow/serial v0.1.1-0.20211022031912-bfb69110f8dd // indirect
github.com/goccy/go-json v0.10.3 // indirect
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gofrs/uuid v4.4.0+incompatible // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.5.1 // indirect
Expand Down
Loading

0 comments on commit 944ff9f

Please sign in to comment.