From 616542a848941842a9cd9079259f3b0070f90805 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Wed, 1 Nov 2023 15:38:03 +0100 Subject: [PATCH] feat(migrations): Add option migration for inputs.nats_consumer --- migrations/all/inputs.nats_consumer.go | 5 + migrations/inputs_nats_consumer/migration.go | 43 ++++++ .../inputs_nats_consumer/migration_test.go | 135 ++++++++++++++++++ .../deprecated_metric_buffer/expected.conf | 6 + .../deprecated_metric_buffer/telegraf.conf | 19 +++ .../expected.conf | 14 ++ .../telegraf.conf | 27 ++++ 7 files changed, 249 insertions(+) create mode 100644 migrations/all/inputs.nats_consumer.go create mode 100644 migrations/inputs_nats_consumer/migration.go create mode 100644 migrations/inputs_nats_consumer/migration_test.go create mode 100644 migrations/inputs_nats_consumer/testcases/deprecated_metric_buffer/expected.conf create mode 100644 migrations/inputs_nats_consumer/testcases/deprecated_metric_buffer/telegraf.conf create mode 100644 migrations/inputs_nats_consumer/testcases/deprecated_metric_buffer_parser/expected.conf create mode 100644 migrations/inputs_nats_consumer/testcases/deprecated_metric_buffer_parser/telegraf.conf diff --git a/migrations/all/inputs.nats_consumer.go b/migrations/all/inputs.nats_consumer.go new file mode 100644 index 0000000000000..e0ed769fb3014 --- /dev/null +++ b/migrations/all/inputs.nats_consumer.go @@ -0,0 +1,5 @@ +//go:build !custom || (migrations && (inputs || inputs.nats_consumer)) + +package all + +import _ "github.com/influxdata/telegraf/migrations/inputs_nats_consumer" // register migration diff --git a/migrations/inputs_nats_consumer/migration.go b/migrations/inputs_nats_consumer/migration.go new file mode 100644 index 0000000000000..77b1f4d3ed4ac --- /dev/null +++ b/migrations/inputs_nats_consumer/migration.go @@ -0,0 +1,43 @@ +package inputs_nats_consumer + +import ( + "github.com/influxdata/toml" + "github.com/influxdata/toml/ast" + + "github.com/influxdata/telegraf/migrations" +) + +// Migration function +func migrate(tbl *ast.Table) ([]byte, string, error) { + // Decode the old data structure + var plugin map[string]interface{} + if err := toml.UnmarshalTable(tbl, &plugin); err != nil { + return nil, "", err + } + + // Check for deprecated option(s) and migrate them + var applied bool + if _, found := plugin["metric_buffer"]; found { + applied = true + + // Remove the ignored setting + delete(plugin, "metric_buffer") + } + + // No options migrated so we can exit early + if !applied { + return nil, "", migrations.ErrNotApplicable + } + + // Create the corresponding plugin configurations + cfg := migrations.CreateTOMLStruct("inputs", "nats_consumer") + cfg.Add("inputs", "nats_consumer", plugin) + + output, err := toml.Marshal(cfg) + return output, "", err +} + +// Register the migration function for the plugin type +func init() { + migrations.AddPluginOptionMigration("inputs.nats_consumer", migrate) +} diff --git a/migrations/inputs_nats_consumer/migration_test.go b/migrations/inputs_nats_consumer/migration_test.go new file mode 100644 index 0000000000000..66ab428fd8b6b --- /dev/null +++ b/migrations/inputs_nats_consumer/migration_test.go @@ -0,0 +1,135 @@ +package inputs_nats_consumer_test + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/config" + _ "github.com/influxdata/telegraf/migrations/inputs_nats_consumer" // register migration + _ "github.com/influxdata/telegraf/plugins/inputs/nats_consumer" // register plugin + _ "github.com/influxdata/telegraf/plugins/parsers/all" // register parsers +) + +func TestNoMigration(t *testing.T) { + defaultCfg := []byte(` +# Read metrics from NATS subject(s) + [[inputs.nats_consumer]] + ## urls of NATS servers + servers = ["nats://localhost:4222"] + + ## subject(s) to consume + ## If you use jetstream you need to set the subjects + ## in jetstream_subjects + subjects = ["telegraf"] + + ## jetstream subjects + ## jetstream is a streaming technology inside of nats. + ## With jetstream the nats-server persists messages and + ## a consumer can consume historical messages. This is + ## useful when telegraf needs to restart it don't miss a + ## message. You need to configure the nats-server. + ## https://docs.nats.io/nats-concepts/jetstream. + jetstream_subjects = ["js_telegraf"] + + ## name a queue group + queue_group = "telegraf_consumers" + + ## Optional credentials + # username = "" + # password = "" + + ## Optional NATS 2.0 and NATS NGS compatible user credentials + # credentials = "/etc/telegraf/nats.creds" + + ## Use Transport Layer Security + # secure = false + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + ## Sets the limits for pending msgs and bytes for each subscription + ## These shouldn't need to be adjusted except in very high throughput scenarios + # pending_message_limit = 65536 + # pending_bytes_limit = 67108864 + + ## Max undelivered messages + ## This plugin uses tracking metrics, which ensure messages are read to + ## outputs before acknowledging them to the original broker to ensure data + ## is not lost. This option sets the maximum messages to read from the + ## broker that have not been written by an output. + ## + ## This value needs to be picked with awareness of the agent's + ## metric_batch_size value as well. Setting max undelivered messages too high + ## can result in a constant stream of data batches to the output. While + ## setting it too low may never flush the broker's messages. + # max_undelivered_messages = 1000 + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" +`) + + // Migrate and check that nothing changed + output, n, err := config.ApplyMigrations(defaultCfg) + require.NoError(t, err) + require.NotEmpty(t, output) + require.Zero(t, n) + require.Equal(t, string(defaultCfg), string(output)) +} + +func TestCases(t *testing.T) { + // Get all directories in testdata + folders, err := os.ReadDir("testcases") + require.NoError(t, err) + + for _, f := range folders { + // Only handle folders + if !f.IsDir() { + continue + } + + t.Run(f.Name(), func(t *testing.T) { + testcasePath := filepath.Join("testcases", f.Name()) + inputFile := filepath.Join(testcasePath, "telegraf.conf") + expectedFile := filepath.Join(testcasePath, "expected.conf") + + // Read the expected output + expected := config.NewConfig() + require.NoError(t, expected.LoadConfig(expectedFile)) + require.NotEmpty(t, expected.Inputs) + + // Read the input data + input, remote, err := config.LoadConfigFile(inputFile) + require.NoError(t, err) + require.False(t, remote) + require.NotEmpty(t, input) + + // Migrate + output, n, err := config.ApplyMigrations(input) + require.NoError(t, err) + require.NotEmpty(t, output) + require.GreaterOrEqual(t, n, uint64(1)) + actual := config.NewConfig() + require.NoError(t, actual.LoadConfigData(output)) + + // Test the output + require.Len(t, actual.Inputs, len(expected.Inputs)) + actualIDs := make([]string, 0, len(expected.Inputs)) + expectedIDs := make([]string, 0, len(expected.Inputs)) + for i := range actual.Inputs { + actualIDs = append(actualIDs, actual.Inputs[i].ID()) + expectedIDs = append(expectedIDs, expected.Inputs[i].ID()) + } + require.ElementsMatch(t, expectedIDs, actualIDs, string(output)) + }) + } +} diff --git a/migrations/inputs_nats_consumer/testcases/deprecated_metric_buffer/expected.conf b/migrations/inputs_nats_consumer/testcases/deprecated_metric_buffer/expected.conf new file mode 100644 index 0000000000000..13465564cb7fd --- /dev/null +++ b/migrations/inputs_nats_consumer/testcases/deprecated_metric_buffer/expected.conf @@ -0,0 +1,6 @@ +[[inputs.nats_consumer]] +servers = ["nats://localhost:4222"] +subjects = ["telegraf"] +jetstream_subjects = ["js_telegraf"] +queue_group = "telegraf_consumers" +data_format = "influx" diff --git a/migrations/inputs_nats_consumer/testcases/deprecated_metric_buffer/telegraf.conf b/migrations/inputs_nats_consumer/testcases/deprecated_metric_buffer/telegraf.conf new file mode 100644 index 0000000000000..edccdc6dc496f --- /dev/null +++ b/migrations/inputs_nats_consumer/testcases/deprecated_metric_buffer/telegraf.conf @@ -0,0 +1,19 @@ +# Read metrics from NATS subject(s) +[[inputs.nats_consumer]] + ## urls of NATS servers + servers = ["nats://localhost:4222"] + + ## subject(s) to consume + subjects = ["telegraf"] + + ## jetstream subjects + jetstream_subjects = ["js_telegraf"] + + ## name a queue group + queue_group = "telegraf_consumers" + + ## Input data format + data_format = "influx" + + ## Number of metrics to buffer + metric_buffer = 1024 diff --git a/migrations/inputs_nats_consumer/testcases/deprecated_metric_buffer_parser/expected.conf b/migrations/inputs_nats_consumer/testcases/deprecated_metric_buffer_parser/expected.conf new file mode 100644 index 0000000000000..1e001ae76271e --- /dev/null +++ b/migrations/inputs_nats_consumer/testcases/deprecated_metric_buffer_parser/expected.conf @@ -0,0 +1,14 @@ +[[inputs.nats_consumer]] + servers = ["nats://localhost:4222"] + subjects = ["telegraf"] + jetstream_subjects = ["js_telegraf"] + queue_group = "telegraf_consumers" + data_format = "xpath_json" + xpath_native_types = true + + [[inputs.nats_consumer.xpath]] + metric_name = "/name" + timestamp = "/timestamp" + timestamp_format = "unix_ms" + field_selection = "/fields/*" + tag_selection = "/tags/*" \ No newline at end of file diff --git a/migrations/inputs_nats_consumer/testcases/deprecated_metric_buffer_parser/telegraf.conf b/migrations/inputs_nats_consumer/testcases/deprecated_metric_buffer_parser/telegraf.conf new file mode 100644 index 0000000000000..6655049bdc251 --- /dev/null +++ b/migrations/inputs_nats_consumer/testcases/deprecated_metric_buffer_parser/telegraf.conf @@ -0,0 +1,27 @@ +# Read metrics from NATS subject(s) +[[inputs.nats_consumer]] + ## urls of NATS servers + servers = ["nats://localhost:4222"] + + ## subject(s) to consume + subjects = ["telegraf"] + + ## jetstream subjects + jetstream_subjects = ["js_telegraf"] + + ## name a queue group + queue_group = "telegraf_consumers" + + ## Number of metrics to buffer + metric_buffer = 1024 + + data_format = "xpath_json" + xpath_native_types = true + + # Configuration matching the first (ENERGY) message + [[inputs.nats_consumer.xpath]] + metric_name = "/name" + timestamp = "/timestamp" + timestamp_format = "unix_ms" + field_selection = "/fields/*" + tag_selection = "/tags/*" \ No newline at end of file