diff --git a/data-prepper-plugins/aws-lambda/README.md b/data-prepper-plugins/aws-lambda/README.md index 099b390702..3df838491a 100644 --- a/data-prepper-plugins/aws-lambda/README.md +++ b/data-prepper-plugins/aws-lambda/README.md @@ -49,8 +49,33 @@ The following command runs the integration tests: -Dtests.lambda.processor.region="us-east-1" \ -Dtests.lambda.processor.functionName="test-lambda-processor" \ -Dtests.lambda.processor.sts_role_arn="arn:aws:iam::<>:role/lambda-role" +``` - +Lambda handler used to test: +``` +def lambda_handler(event, context): + input_arr = event.get('osi_key', []) + output = [] + if len(input_arr) == 1: + input = input_arr[0] + if "returnNone" in input: + return + if "returnString" in input: + return "RandomString" + if "returnObject" in input: + return input_arr[0] + if "returnEmptyArray" in input: + return output + if "returnNull" in input: + return "null" + for input in input_arr: + input["_out_"] = "transformed"; + for k,v in input.items(): + if type(v) is str: + input[k] = v.upper() + output.append(input) + + return output ``` diff --git a/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorIT.java b/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorIT.java index 5ea7115bbf..a9df9c4518 100644 --- a/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorIT.java +++ b/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorIT.java @@ -6,12 +6,21 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Timer; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import org.mockito.Mock; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; @@ -50,16 +59,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.lenient; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) public class LambdaProcessorIT { @@ -92,9 +91,12 @@ private LambdaProcessor createObjectUnderTest(LambdaProcessorConfig processorCon @BeforeEach public void setup() { - lambdaRegion = System.getProperty("tests.lambda.processor.region"); - functionName = System.getProperty("tests.lambda.processor.functionName"); - role = System.getProperty("tests.lambda.processor.sts_role_arn"); +// lambdaRegion = System.getProperty("tests.lambda.processor.region"); +// functionName = System.getProperty("tests.lambda.processor.functionName"); +// role = System.getProperty("tests.lambda.processor.sts_role_arn"); + lambdaRegion = "us-west-2"; + functionName = "lambdaNoReturn"; + role = "arn:aws:iam::176893235612:role/osis-lambda-role"; pluginMetrics = mock(PluginMetrics.class); pluginSetting = mock(PluginSetting.class); when(pluginSetting.getPipelineName()).thenReturn("pipeline"); @@ -232,6 +234,39 @@ public void testWithFailureTags() throws Exception { } } + @ParameterizedTest + @ValueSource(strings = {"returnNull", "returnEmptyArray", "returnString"}) + public void testAggregateMode_WithNullOrEmptyResponse(String input) { + when(lambdaProcessorConfig.getFunctionName()).thenReturn(functionName); + when(invocationType.getAwsLambdaValue()).thenReturn(InvocationType.REQUEST_RESPONSE.getAwsLambdaValue()); + when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(false); + when(lambdaProcessorConfig.getTagsOnFailure()).thenReturn(Collections.singletonList("lambda_failure")); + lambdaProcessor = createObjectUnderTest(lambdaProcessorConfig); + List> records = createRecord(input); + + Collection> results = lambdaProcessor.doExecute(records); + + assertThat("Drop events on null or empty response", results.isEmpty()); + } + + private List> createRecord(String input) { + List> records = new ArrayList<>(); + Map map = new HashMap<>(); + map.put(input, 42); + EventMetadata metadata = DefaultEventMetadata.builder() + .withEventType("event") + .build(); + final Event event = JacksonEvent.builder() + .withData(map) + .withEventType("event") + .withEventMetadata(metadata) + .build(); + records.add(new Record<>(event)); + + return records; + } + + private void validateResultsForAggregateMode(Collection> results) { List> resultRecords = new ArrayList<>(results); for (int i = 0; i < resultRecords.size(); i++) {