diff --git a/data-prepper-plugins/key-value-processor/README.md b/data-prepper-plugins/key-value-processor/README.md index 02c01a7c4c..f787284fbb 100644 --- a/data-prepper-plugins/key-value-processor/README.md +++ b/data-prepper-plugins/key-value-processor/README.md @@ -31,7 +31,7 @@ When run, the processor will parse the message into the following output: ## Configuration * `source` - The field in the message that will be parsed. * Default: `message` -* `destination` - The field the parsed source will be output to. This will overwrite any preexisting data for that key. +* `destination` - The field the parsed source will be output to. This will overwrite any preexisting data for that key. If `destination` is set to `null`, the parsed fields will be written to the root of the event. * Default: `parsed_message` * `field_delimiter_regex` - A regex specifying the delimiter between key/value pairs. Special regex characters such as `[` and `]` must be escaped using `\\`. * There is no default. @@ -98,6 +98,8 @@ When run, the processor will parse the message into the following output: * While `recursive` is `true`, `remove_brackets` cannot also be `true`. * While `recursive` is `true`, `skip_duplicate_values` will always be `true`. * While `recursive` is `true`, `whitespace` will always be `"strict"`. +* `overwrite_if_destination_exists` - Specify whether to overwrite existing fields if there are key conflicts when writing parsed fields to the event. + * Default: `true` ## Developer Guide This plugin is compatible with Java 14. See diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java index fb11a3386d..9a347ad9fd 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java @@ -248,12 +248,32 @@ public Collection> doExecute(final Collection> recor final Map processedMap = executeConfigs(outputMap); - recordEvent.put(keyValueProcessorConfig.getDestination(), processedMap); + if (Objects.isNull(keyValueProcessorConfig.getDestination())) { + writeToRoot(recordEvent, processedMap); + } else { + if (keyValueProcessorConfig.getOverwriteIfDestinationExists() || + !recordEvent.containsKey(keyValueProcessorConfig.getDestination())) { + recordEvent.put(keyValueProcessorConfig.getDestination(), processedMap); + } + } } return records; } + @Override + public void prepareForShutdown() { + } + + @Override + public boolean isReadyForShutdown() { + return true; + } + + @Override + public void shutdown() { + } + private ObjectNode recurse(final String input, final ObjectMapper mapper) { Stack bracketStack = new Stack(); Map bracketMap = initBracketMap(); @@ -495,16 +515,11 @@ private void addKeyValueToMap(final Map parsedMap, final String } } - @Override - public void prepareForShutdown() { - } - - @Override - public boolean isReadyForShutdown() { - return true; - } - - @Override - public void shutdown() { + private void writeToRoot(final Event event, final Map parsedJson) { + for (Map.Entry entry : parsedJson.entrySet()) { + if (keyValueProcessorConfig.getOverwriteIfDestinationExists() || !event.containsKey(entry.getKey())) { + event.put(entry.getKey(), entry.getValue()); + } + } } } diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java index 9688a38441..fb7e9abdad 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java @@ -34,7 +34,6 @@ public class KeyValueProcessorConfig { @NotEmpty private String source = DEFAULT_SOURCE; - @NotEmpty private String destination = DEFAULT_DESTINATION; @JsonProperty("field_delimiter_regex") @@ -96,6 +95,9 @@ public class KeyValueProcessorConfig { @NotNull private boolean recursive = DEFAULT_RECURSIVE; + @JsonProperty("overwrite_if_destination_exists") + private boolean overwriteIfDestinationExists = true; + public String getSource() { return source; } @@ -167,4 +169,8 @@ public boolean getRemoveBrackets() { public boolean getRecursive() { return recursive; } + + public boolean getOverwriteIfDestinationExists() { + return overwriteIfDestinationExists; + } } diff --git a/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java b/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java index 38da5beebf..66d9c6f7ed 100644 --- a/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java +++ b/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java @@ -72,6 +72,7 @@ void setup() { lenient().when(mockConfig.getSkipDuplicateValues()).thenReturn(defaultConfig.getSkipDuplicateValues()); lenient().when(mockConfig.getRemoveBrackets()).thenReturn(defaultConfig.getRemoveBrackets()); lenient().when(mockConfig.getRecursive()).thenReturn(defaultConfig.getRecursive()); + lenient().when(mockConfig.getOverwriteIfDestinationExists()).thenReturn(defaultConfig.getOverwriteIfDestinationExists()); keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); } @@ -97,6 +98,76 @@ void testMultipleKvToObjectKeyValueProcessor() { assertThatKeyEquals(parsed_message, "key2", "value2"); } + @Test + void testWriteToRoot() { + when(mockConfig.getDestination()).thenReturn(null); + final Record record = getMessage("key1=value1&key2=value2"); + final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); + + final Event event = editedRecords.get(0).getData(); + assertThat(event.containsKey("parsed_message"), is(false)); + + assertThat(event.containsKey("key1"), is(true)); + assertThat(event.containsKey("key2"), is(true)); + assertThat(event.get("key1", Object.class), is("value1")); + assertThat(event.get("key2", Object.class), is("value2")); + } + + @Test + void testWriteToRootWithOverwrite() { + when(mockConfig.getDestination()).thenReturn(null); + final Record record = getMessage("key1=value1&key2=value2"); + record.getData().put("key1", "value to be overwritten"); + final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); + + final Event event = editedRecords.get(0).getData(); + + assertThat(event.containsKey("key1"), is(true)); + assertThat(event.containsKey("key2"), is(true)); + assertThat(event.get("key1", Object.class), is("value1")); + assertThat(event.get("key2", Object.class), is("value2")); + } + + @Test + void testWriteToDestinationWithOverwrite() { + final Record record = getMessage("key1=value1&key2=value2"); + record.getData().put("parsed_message", "value to be overwritten"); + final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); + final LinkedHashMap parsed_message = getLinkedHashMap(editedRecords); + + assertThat(parsed_message.size(), equalTo(2)); + assertThatKeyEquals(parsed_message, "key1", "value1"); + assertThatKeyEquals(parsed_message, "key2", "value2"); + } + + @Test + void testWriteToRootWithOverwriteDisabled() { + when(mockConfig.getDestination()).thenReturn(null); + when(mockConfig.getOverwriteIfDestinationExists()).thenReturn(false); + final Record record = getMessage("key1=value1&key2=value2"); + record.getData().put("key1", "value will not be overwritten"); + final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); + + final Event event = editedRecords.get(0).getData(); + + assertThat(event.containsKey("key1"), is(true)); + assertThat(event.containsKey("key2"), is(true)); + assertThat(event.get("key1", Object.class), is("value will not be overwritten")); + assertThat(event.get("key2", Object.class), is("value2")); + } + + @Test + void testWriteToDestinationWithOverwriteDisabled() { + when(mockConfig.getOverwriteIfDestinationExists()).thenReturn(false); + final Record record = getMessage("key1=value1&key2=value2"); + record.getData().put("parsed_message", "value will not be overwritten"); + final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); + final Event event = editedRecords.get(0).getData(); + + assertThat(event.containsKey("parsed_message"), is(true)); + assertThat(event.get("parsed_message", Object.class), is("value will not be overwritten")); + } + @Test void testSingleRegexFieldDelimiterKvToObjectKeyValueProcessor() { when(mockConfig.getFieldDelimiterRegex()).thenReturn(":_*:");