Skip to content

Commit

Permalink
Add IT for aggregate mode cases
Browse files Browse the repository at this point in the history
Signed-off-by: Srikanth Govindarajan <[email protected]>
  • Loading branch information
srikanthjg committed Nov 22, 2024
1 parent e3d2d61 commit 422fd64
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 14 deletions.
27 changes: 26 additions & 1 deletion data-prepper-plugins/aws-lambda/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,33 @@ The following command runs the integration tests:
-Dtests.lambda.processor.region="us-east-1" \
-Dtests.lambda.processor.functionName="test-lambda-processor" \
-Dtests.lambda.processor.sts_role_arn="arn:aws:iam::<>:role/lambda-role"
```

Lambda handler used to test:
```
def lambda_handler(event, context):
input_arr = event.get('osi_key', [])
output = []
if len(input_arr) == 1:
input = input_arr[0]
if "returnNone" in input:
return
if "returnString" in input:
return "RandomString"
if "returnObject" in input:
return input_arr[0]
if "returnEmptyArray" in input:
return output
if "returnNull" in input:
return "null"
for input in input_arr:
input["_out_"] = "transformed";
for k,v in input.items():
if type(v) is str:
input[k] = v.upper()
output.append(input)
return output
```


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,21 @@

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import org.mockito.Mock;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
Expand Down Expand Up @@ -50,16 +59,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class LambdaProcessorIT {
Expand Down Expand Up @@ -92,9 +91,12 @@ private LambdaProcessor createObjectUnderTest(LambdaProcessorConfig processorCon

@BeforeEach
public void setup() {
lambdaRegion = System.getProperty("tests.lambda.processor.region");
functionName = System.getProperty("tests.lambda.processor.functionName");
role = System.getProperty("tests.lambda.processor.sts_role_arn");
// lambdaRegion = System.getProperty("tests.lambda.processor.region");
// functionName = System.getProperty("tests.lambda.processor.functionName");
// role = System.getProperty("tests.lambda.processor.sts_role_arn");
lambdaRegion = "us-west-2";
functionName = "lambdaNoReturn";
role = "arn:aws:iam::176893235612:role/osis-lambda-role";
pluginMetrics = mock(PluginMetrics.class);
pluginSetting = mock(PluginSetting.class);
when(pluginSetting.getPipelineName()).thenReturn("pipeline");
Expand Down Expand Up @@ -232,6 +234,39 @@ public void testWithFailureTags() throws Exception {
}
}

@ParameterizedTest
@ValueSource(strings = {"returnNull", "returnEmptyArray", "returnString"})
public void testAggregateMode_WithNullOrEmptyResponse(String input) {
when(lambdaProcessorConfig.getFunctionName()).thenReturn(functionName);
when(invocationType.getAwsLambdaValue()).thenReturn(InvocationType.REQUEST_RESPONSE.getAwsLambdaValue());
when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(false);
when(lambdaProcessorConfig.getTagsOnFailure()).thenReturn(Collections.singletonList("lambda_failure"));
lambdaProcessor = createObjectUnderTest(lambdaProcessorConfig);
List<Record<Event>> records = createRecord(input);

Collection<Record<Event>> results = lambdaProcessor.doExecute(records);

assertThat("Drop events on null or empty response", results.isEmpty());
}

private List<Record<Event>> createRecord(String input) {
List<Record<Event>> records = new ArrayList<>();
Map<String, Object> map = new HashMap<>();
map.put(input, 42);
EventMetadata metadata = DefaultEventMetadata.builder()
.withEventType("event")
.build();
final Event event = JacksonEvent.builder()
.withData(map)
.withEventType("event")
.withEventMetadata(metadata)
.build();
records.add(new Record<>(event));

return records;
}


private void validateResultsForAggregateMode(Collection<Record<Event>> results) {
List<Record<Event>> resultRecords = new ArrayList<>(results);
for (int i = 0; i < resultRecords.size(); i++) {
Expand Down

0 comments on commit 422fd64

Please sign in to comment.