-
Notifications
You must be signed in to change notification settings - Fork 5.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(migrations): Add option migration for inputs.nats_consumer
- Loading branch information
Showing
7 changed files
with
249 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
//go:build !custom || (migrations && (inputs || inputs.nats_consumer)) | ||
|
||
package all | ||
|
||
import _ "github.com/influxdata/telegraf/migrations/inputs_nats_consumer" // register migration |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
package inputs_nats_consumer | ||
|
||
import ( | ||
"github.com/influxdata/toml" | ||
"github.com/influxdata/toml/ast" | ||
|
||
"github.com/influxdata/telegraf/migrations" | ||
) | ||
|
||
// Migration function | ||
func migrate(tbl *ast.Table) ([]byte, string, error) { | ||
// Decode the old data structure | ||
var plugin map[string]interface{} | ||
if err := toml.UnmarshalTable(tbl, &plugin); err != nil { | ||
return nil, "", err | ||
} | ||
|
||
// Check for deprecated option(s) and migrate them | ||
var applied bool | ||
if _, found := plugin["metric_buffer"]; found { | ||
applied = true | ||
|
||
// Remove the ignored setting | ||
delete(plugin, "metric_buffer") | ||
} | ||
|
||
// No options migrated so we can exit early | ||
if !applied { | ||
return nil, "", migrations.ErrNotApplicable | ||
} | ||
|
||
// Create the corresponding plugin configurations | ||
cfg := migrations.CreateTOMLStruct("inputs", "nats_consumer") | ||
cfg.Add("inputs", "nats_consumer", plugin) | ||
|
||
output, err := toml.Marshal(cfg) | ||
return output, "", err | ||
} | ||
|
||
// Register the migration function for the plugin type | ||
func init() { | ||
migrations.AddPluginOptionMigration("inputs.nats_consumer", migrate) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
package inputs_nats_consumer_test | ||
|
||
import ( | ||
"os" | ||
"path/filepath" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/influxdata/telegraf/config" | ||
_ "github.com/influxdata/telegraf/migrations/inputs_nats_consumer" // register migration | ||
_ "github.com/influxdata/telegraf/plugins/inputs/nats_consumer" // register plugin | ||
_ "github.com/influxdata/telegraf/plugins/parsers/all" // register parsers | ||
) | ||
|
||
func TestNoMigration(t *testing.T) { | ||
defaultCfg := []byte(` | ||
# Read metrics from NATS subject(s) | ||
[[inputs.nats_consumer]] | ||
## urls of NATS servers | ||
servers = ["nats://localhost:4222"] | ||
## subject(s) to consume | ||
## If you use jetstream you need to set the subjects | ||
## in jetstream_subjects | ||
subjects = ["telegraf"] | ||
## jetstream subjects | ||
## jetstream is a streaming technology inside of nats. | ||
## With jetstream the nats-server persists messages and | ||
## a consumer can consume historical messages. This is | ||
## useful when telegraf needs to restart it don't miss a | ||
## message. You need to configure the nats-server. | ||
## https://docs.nats.io/nats-concepts/jetstream. | ||
jetstream_subjects = ["js_telegraf"] | ||
## name a queue group | ||
queue_group = "telegraf_consumers" | ||
## Optional credentials | ||
# username = "" | ||
# password = "" | ||
## Optional NATS 2.0 and NATS NGS compatible user credentials | ||
# credentials = "/etc/telegraf/nats.creds" | ||
## Use Transport Layer Security | ||
# secure = false | ||
## Optional TLS Config | ||
# tls_ca = "/etc/telegraf/ca.pem" | ||
# tls_cert = "/etc/telegraf/cert.pem" | ||
# tls_key = "/etc/telegraf/key.pem" | ||
## Use TLS but skip chain & host verification | ||
# insecure_skip_verify = false | ||
## Sets the limits for pending msgs and bytes for each subscription | ||
## These shouldn't need to be adjusted except in very high throughput scenarios | ||
# pending_message_limit = 65536 | ||
# pending_bytes_limit = 67108864 | ||
## Max undelivered messages | ||
## This plugin uses tracking metrics, which ensure messages are read to | ||
## outputs before acknowledging them to the original broker to ensure data | ||
## is not lost. This option sets the maximum messages to read from the | ||
## broker that have not been written by an output. | ||
## | ||
## This value needs to be picked with awareness of the agent's | ||
## metric_batch_size value as well. Setting max undelivered messages too high | ||
## can result in a constant stream of data batches to the output. While | ||
## setting it too low may never flush the broker's messages. | ||
# max_undelivered_messages = 1000 | ||
## Data format to consume. | ||
## Each data format has its own unique set of configuration options, read | ||
## more about them here: | ||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md | ||
data_format = "influx" | ||
`) | ||
|
||
// Migrate and check that nothing changed | ||
output, n, err := config.ApplyMigrations(defaultCfg) | ||
require.NoError(t, err) | ||
require.NotEmpty(t, output) | ||
require.Zero(t, n) | ||
require.Equal(t, string(defaultCfg), string(output)) | ||
} | ||
|
||
func TestCases(t *testing.T) { | ||
// Get all directories in testdata | ||
folders, err := os.ReadDir("testcases") | ||
require.NoError(t, err) | ||
|
||
for _, f := range folders { | ||
// Only handle folders | ||
if !f.IsDir() { | ||
continue | ||
} | ||
|
||
t.Run(f.Name(), func(t *testing.T) { | ||
testcasePath := filepath.Join("testcases", f.Name()) | ||
inputFile := filepath.Join(testcasePath, "telegraf.conf") | ||
expectedFile := filepath.Join(testcasePath, "expected.conf") | ||
|
||
// Read the expected output | ||
expected := config.NewConfig() | ||
require.NoError(t, expected.LoadConfig(expectedFile)) | ||
require.NotEmpty(t, expected.Inputs) | ||
|
||
// Read the input data | ||
input, remote, err := config.LoadConfigFile(inputFile) | ||
require.NoError(t, err) | ||
require.False(t, remote) | ||
require.NotEmpty(t, input) | ||
|
||
// Migrate | ||
output, n, err := config.ApplyMigrations(input) | ||
require.NoError(t, err) | ||
require.NotEmpty(t, output) | ||
require.GreaterOrEqual(t, n, uint64(1)) | ||
actual := config.NewConfig() | ||
require.NoError(t, actual.LoadConfigData(output)) | ||
|
||
// Test the output | ||
require.Len(t, actual.Inputs, len(expected.Inputs)) | ||
actualIDs := make([]string, 0, len(expected.Inputs)) | ||
expectedIDs := make([]string, 0, len(expected.Inputs)) | ||
for i := range actual.Inputs { | ||
actualIDs = append(actualIDs, actual.Inputs[i].ID()) | ||
expectedIDs = append(expectedIDs, expected.Inputs[i].ID()) | ||
} | ||
require.ElementsMatch(t, expectedIDs, actualIDs, string(output)) | ||
}) | ||
} | ||
} |
6 changes: 6 additions & 0 deletions
6
migrations/inputs_nats_consumer/testcases/deprecated_metric_buffer/expected.conf
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
[[inputs.nats_consumer]] | ||
servers = ["nats://localhost:4222"] | ||
subjects = ["telegraf"] | ||
jetstream_subjects = ["js_telegraf"] | ||
queue_group = "telegraf_consumers" | ||
data_format = "influx" |
19 changes: 19 additions & 0 deletions
19
migrations/inputs_nats_consumer/testcases/deprecated_metric_buffer/telegraf.conf
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
# Read metrics from NATS subject(s) | ||
[[inputs.nats_consumer]] | ||
## urls of NATS servers | ||
servers = ["nats://localhost:4222"] | ||
|
||
## subject(s) to consume | ||
subjects = ["telegraf"] | ||
|
||
## jetstream subjects | ||
jetstream_subjects = ["js_telegraf"] | ||
|
||
## name a queue group | ||
queue_group = "telegraf_consumers" | ||
|
||
## Input data format | ||
data_format = "influx" | ||
|
||
## Number of metrics to buffer | ||
metric_buffer = 1024 |
14 changes: 14 additions & 0 deletions
14
migrations/inputs_nats_consumer/testcases/deprecated_metric_buffer_parser/expected.conf
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
[[inputs.nats_consumer]] | ||
servers = ["nats://localhost:4222"] | ||
subjects = ["telegraf"] | ||
jetstream_subjects = ["js_telegraf"] | ||
queue_group = "telegraf_consumers" | ||
data_format = "xpath_json" | ||
xpath_native_types = true | ||
|
||
[[inputs.nats_consumer.xpath]] | ||
metric_name = "/name" | ||
timestamp = "/timestamp" | ||
timestamp_format = "unix_ms" | ||
field_selection = "/fields/*" | ||
tag_selection = "/tags/*" |
27 changes: 27 additions & 0 deletions
27
migrations/inputs_nats_consumer/testcases/deprecated_metric_buffer_parser/telegraf.conf
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
# Read metrics from NATS subject(s) | ||
[[inputs.nats_consumer]] | ||
## urls of NATS servers | ||
servers = ["nats://localhost:4222"] | ||
|
||
## subject(s) to consume | ||
subjects = ["telegraf"] | ||
|
||
## jetstream subjects | ||
jetstream_subjects = ["js_telegraf"] | ||
|
||
## name a queue group | ||
queue_group = "telegraf_consumers" | ||
|
||
## Number of metrics to buffer | ||
metric_buffer = 1024 | ||
|
||
data_format = "xpath_json" | ||
xpath_native_types = true | ||
|
||
# Configuration matching the first (ENERGY) message | ||
[[inputs.nats_consumer.xpath]] | ||
metric_name = "/name" | ||
timestamp = "/timestamp" | ||
timestamp_format = "unix_ms" | ||
field_selection = "/fields/*" | ||
tag_selection = "/tags/*" |