diff --git a/migrations/all/inputs_udp_listener.go b/migrations/all/inputs_udp_listener.go new file mode 100644 index 0000000000000..12d03e09e6514 --- /dev/null +++ b/migrations/all/inputs_udp_listener.go @@ -0,0 +1,5 @@ +//go:build !custom || (migrations && (inputs || inputs.udp_listener)) + +package all + +import _ "github.com/influxdata/telegraf/migrations/inputs_udp_listener" // register migration diff --git a/migrations/inputs_udp_listener/README.md b/migrations/inputs_udp_listener/README.md new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/migrations/inputs_udp_listener/migration.go b/migrations/inputs_udp_listener/migration.go new file mode 100644 index 0000000000000..ce74e3aaaafd1 --- /dev/null +++ b/migrations/inputs_udp_listener/migration.go @@ -0,0 +1,73 @@ +package inputs_udp_listener + +import ( + "fmt" + + "github.com/influxdata/toml" + "github.com/influxdata/toml/ast" + + "github.com/influxdata/telegraf/migrations" +) + +const allowPendingMessagesMsg = ` + Replacement 'inputs.socket_listener' does not allow to configure + 'allowed_pending_messages' and thus the setting will be dropped. +` + +const udpPacketSizeMsg = ` + The deprecated 'udp_buffer_size' setting will be dropped. +` + +// Define "old" data structure +type udpListener map[string]interface{} + +// Migration function +func migrate(tbl *ast.Table) ([]byte, string, error) { + // Decode the old data structure + var old udpListener + if err := toml.UnmarshalTable(tbl, &old); err != nil { + return nil, "", err + } + + // Copy the setting except the special plugin ones to preserve + // all parser settings of the existing (deprecated) config. + var msg string + plugin := make(map[string]interface{}, len(old)) + for k, v := range old { + switch k { + case "service_address": + addr, ok := v.(string) + if !ok { + return nil, "", fmt.Errorf("service_address is not a string but %T", v) + } + plugin["service_address"] = "udp://" + addr + case "allowed_pending_messages": + msg += allowPendingMessagesMsg + case "udp_packet_size": + msg += udpPacketSizeMsg + case "udp_buffer_size": + plugin["read_buffer_size"] = v + default: + plugin[k] = v + } + } + + // Create the corresponding metric configurations + cfg := migrations.CreateTOMLStruct("inputs", "socket_listener") + cfg.Add("inputs", "socket_listener", plugin) + + // Marshal the new configuration + buf, err := toml.Marshal(cfg) + if err != nil { + return nil, "", err + } + buf = append(buf, []byte("\n")...) + + // Create the new content to output + return buf, msg, nil +} + +// Register the migration function for the plugin type +func init() { + migrations.AddPluginMigration("inputs.udp_listener", migrate) +} diff --git a/migrations/inputs_udp_listener/migration_test.go b/migrations/inputs_udp_listener/migration_test.go new file mode 100644 index 0000000000000..368b8ff17ed37 --- /dev/null +++ b/migrations/inputs_udp_listener/migration_test.go @@ -0,0 +1,62 @@ +package inputs_udp_listener_test + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/config" + _ "github.com/influxdata/telegraf/migrations/inputs_udp_listener" // register migration + _ "github.com/influxdata/telegraf/plugins/inputs/socket_listener" // register plugin + _ "github.com/influxdata/telegraf/plugins/parsers/all" // register parsers +) + +func TestCases(t *testing.T) { + // Get all directories in testdata + folders, err := os.ReadDir("testcases") + require.NoError(t, err) + + for _, f := range folders { + // Only handle folders + if !f.IsDir() { + continue + } + + t.Run(f.Name(), func(t *testing.T) { + testcasePath := filepath.Join("testcases", f.Name()) + inputFile := filepath.Join(testcasePath, "telegraf.conf") + expectedFile := filepath.Join(testcasePath, "expected.conf") + + // Read the expected output + expected := config.NewConfig() + require.NoError(t, expected.LoadConfig(expectedFile)) + require.NotEmpty(t, expected.Inputs) + + // Read the input data + input, remote, err := config.LoadConfigFile(inputFile) + require.NoError(t, err) + require.False(t, remote) + require.NotEmpty(t, input) + + // Migrate + output, n, err := config.ApplyMigrations(input) + require.NoError(t, err) + require.NotEmpty(t, output) + require.GreaterOrEqual(t, n, uint64(1)) + actual := config.NewConfig() + require.NoError(t, actual.LoadConfigData(output)) + + // Test the output + require.Len(t, actual.Inputs, len(expected.Inputs)) + actualIDs := make([]string, 0, len(expected.Inputs)) + expectedIDs := make([]string, 0, len(expected.Inputs)) + for i := range actual.Inputs { + actualIDs = append(actualIDs, actual.Inputs[i].ID()) + expectedIDs = append(expectedIDs, expected.Inputs[i].ID()) + } + require.ElementsMatch(t, expectedIDs, actualIDs) + }) + } +} diff --git a/migrations/inputs_udp_listener/testcases/all_deprecated_messages/expected.conf b/migrations/inputs_udp_listener/testcases/all_deprecated_messages/expected.conf new file mode 100644 index 0000000000000..cc37b52b2d880 --- /dev/null +++ b/migrations/inputs_udp_listener/testcases/all_deprecated_messages/expected.conf @@ -0,0 +1,3 @@ +[[inputs.socket_listener]] +service_address = "udp://127.0.0.1:8000" +data_format = "influx" \ No newline at end of file diff --git a/migrations/inputs_udp_listener/testcases/all_deprecated_messages/telegraf.conf b/migrations/inputs_udp_listener/testcases/all_deprecated_messages/telegraf.conf new file mode 100644 index 0000000000000..dd7710e754cfc --- /dev/null +++ b/migrations/inputs_udp_listener/testcases/all_deprecated_messages/telegraf.conf @@ -0,0 +1,5 @@ +[[inputs.udp_listener]] + service_address = "127.0.0.1:8000" + allowed_pending_messages = 1000 + udp_packet_size = 1024 + data_format = "influx" \ No newline at end of file diff --git a/migrations/inputs_udp_listener/testcases/allow_pending_messages/expected.conf b/migrations/inputs_udp_listener/testcases/allow_pending_messages/expected.conf new file mode 100644 index 0000000000000..cc37b52b2d880 --- /dev/null +++ b/migrations/inputs_udp_listener/testcases/allow_pending_messages/expected.conf @@ -0,0 +1,3 @@ +[[inputs.socket_listener]] +service_address = "udp://127.0.0.1:8000" +data_format = "influx" \ No newline at end of file diff --git a/migrations/inputs_udp_listener/testcases/allow_pending_messages/telegraf.conf b/migrations/inputs_udp_listener/testcases/allow_pending_messages/telegraf.conf new file mode 100644 index 0000000000000..c4185d16f8357 --- /dev/null +++ b/migrations/inputs_udp_listener/testcases/allow_pending_messages/telegraf.conf @@ -0,0 +1,5 @@ +[[inputs.udp_listener]] + service_address = "127.0.0.1:8000" + allowed_pending_messages = 1000 + + data_format = "influx" \ No newline at end of file diff --git a/migrations/inputs_udp_listener/testcases/parser/expected.conf b/migrations/inputs_udp_listener/testcases/parser/expected.conf new file mode 100644 index 0000000000000..74ba526fbbd8f --- /dev/null +++ b/migrations/inputs_udp_listener/testcases/parser/expected.conf @@ -0,0 +1,10 @@ +[[inputs.socket_listener]] +service_address = "udp://127.0.0.1:8000" +data_format = "xpath_json" +xpath_native_types = true +[[inputs.socket_listener.xpath]] +metric_name = "/name" +timestamp = "/timestamp" +timestamp_format = "unix_ms" +field_selection = "/fields/*" +tag_selection = "/tags/*" \ No newline at end of file diff --git a/migrations/inputs_udp_listener/testcases/parser/telegraf.conf b/migrations/inputs_udp_listener/testcases/parser/telegraf.conf new file mode 100644 index 0000000000000..dedb77fc3baf3 --- /dev/null +++ b/migrations/inputs_udp_listener/testcases/parser/telegraf.conf @@ -0,0 +1,13 @@ +[[inputs.udp_listener]] + service_address = "127.0.0.1:8000" + + data_format = "xpath_json" + xpath_native_types = true + + # Configuration matching the first (ENERGY) message + [[inputs.udp_listener.xpath]] + metric_name = "/name" + timestamp = "/timestamp" + timestamp_format = "unix_ms" + field_selection = "/fields/*" + tag_selection = "/tags/*" \ No newline at end of file diff --git a/migrations/inputs_udp_listener/testcases/simple/expected.conf b/migrations/inputs_udp_listener/testcases/simple/expected.conf new file mode 100644 index 0000000000000..cc37b52b2d880 --- /dev/null +++ b/migrations/inputs_udp_listener/testcases/simple/expected.conf @@ -0,0 +1,3 @@ +[[inputs.socket_listener]] +service_address = "udp://127.0.0.1:8000" +data_format = "influx" \ No newline at end of file diff --git a/migrations/inputs_udp_listener/testcases/simple/telegraf.conf b/migrations/inputs_udp_listener/testcases/simple/telegraf.conf new file mode 100644 index 0000000000000..7737c09b6299e --- /dev/null +++ b/migrations/inputs_udp_listener/testcases/simple/telegraf.conf @@ -0,0 +1,4 @@ +[[inputs.udp_listener]] + service_address = "127.0.0.1:8000" + + data_format = "influx" \ No newline at end of file