Skip to content

Commit

Permalink
Exception flow clean up and Test cases clean up (#5204)
Browse files Browse the repository at this point in the history
* Addressed review comments.

Signed-off-by: Kondaka <[email protected]>

* logic clean up and additional Test cases. Tests shouldn't be flaky anymore

Signed-off-by: Santhosh Gandhe <[email protected]>

* additional exception handling
metrics renaming

Signed-off-by: Santhosh Gandhe <[email protected]>

* better assertions

Signed-off-by: Santhosh Gandhe <[email protected]>

* aligned else blocks to fall into one catch

Signed-off-by: Santhosh Gandhe <[email protected]>

* removing concurrency

Signed-off-by: Santhosh Gandhe <[email protected]>

* making the mock object local

Signed-off-by: Santhosh Gandhe <[email protected]>

---------

Signed-off-by: Kondaka <[email protected]>
Signed-off-by: Santhosh Gandhe <[email protected]>
Co-authored-by: Kondaka <[email protected]>
  • Loading branch information
san81 and kkondaka authored Nov 20, 2024
1 parent 72fa423 commit f3bcd6e
Show file tree
Hide file tree
Showing 17 changed files with 946 additions and 961 deletions.
5 changes: 1 addition & 4 deletions data-prepper-plugins/aws-lambda/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,7 @@ task integrationTest(type: Test) {
classpath = sourceSets.integrationTest.runtimeClasspath

systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties'

//Enable Multi-thread in tests
systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'


systemProperty 'tests.lambda.sink.region', System.getProperty('tests.lambda.sink.region')
systemProperty 'tests.lambda.sink.functionName', System.getProperty('tests.lambda.sink.functionName')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,65 +3,68 @@
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.lambda.processor;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.DefaultEventMetadata;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.event.DefaultEventMetadata;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.plugins.codec.json.JsonInputCodec;
import org.opensearch.dataprepper.plugins.codec.json.JsonInputCodecConfig;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer;
import org.opensearch.dataprepper.plugins.lambda.common.config.InvocationType;
import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.InvocationType;
import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.plugins.codec.json.JsonInputCodec;
import org.opensearch.dataprepper.plugins.codec.json.JsonInputCodecConfig;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.ArgumentMatchers.any;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.lenient;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;
import java.lang.reflect.Field;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class LambdaProcessorIT {
@Mock
InvocationType invocationType;
private AwsCredentialsProvider awsCredentialsProvider;
private LambdaProcessor lambdaProcessor;
private LambdaProcessorConfig lambdaProcessorConfig;
Expand All @@ -73,78 +76,45 @@ public class LambdaProcessorIT {
@Mock
private PluginFactory pluginFactory;
@Mock
private PluginMetrics pluginMetrics;
@Mock
private PluginSetting pluginSetting;
@Mock
private ExpressionEvaluator expressionEvaluator;
@Mock
private Counter numberOfRecordsSuccessCounter;
private Counter testCounter;
@Mock
private Counter numberOfRecordsFailedCounter;
@Mock
private Counter numberOfRequestsSuccessCounter;
@Mock
private Counter numberOfRequestsFailedCounter;
@Mock
private Counter sinkSuccessCounter;
@Mock
private Timer lambdaLatencyMetric;
@Mock
private DistributionSummary requestPayloadMetric;
@Mock
private DistributionSummary responsePayloadMetric;
@Mock
InvocationType invocationType;
private Timer testTimer;

private LambdaProcessor createObjectUnderTest(LambdaProcessorConfig processorConfig) {
return new LambdaProcessor(pluginFactory, pluginSetting, processorConfig, awsCredentialsSupplier, expressionEvaluator);
}

private void setPrivateField(Object targetObject, String fieldName, Object value) throws Exception {
Field field = targetObject.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
field.set(targetObject, value);
}

private void setPrivateFields(final LambdaProcessor lambdaProcessor) throws Exception {
setPrivateField(lambdaProcessor, "numberOfRecordsSuccessCounter", numberOfRecordsSuccessCounter);
setPrivateField(lambdaProcessor, "numberOfRecordsFailedCounter", numberOfRecordsFailedCounter);
setPrivateField(lambdaProcessor, "numberOfRequestsSuccessCounter", numberOfRequestsSuccessCounter);
setPrivateField(lambdaProcessor, "numberOfRequestsFailedCounter", numberOfRequestsFailedCounter);
setPrivateField(lambdaProcessor, "lambdaLatencyMetric", lambdaLatencyMetric);
setPrivateField(lambdaProcessor, "requestPayloadMetric", requestPayloadMetric);
setPrivateField(lambdaProcessor, "responsePayloadMetric", responsePayloadMetric);
}

@BeforeEach
public void setup() {
lambdaRegion = System.getProperty("tests.lambda.processor.region");
functionName = System.getProperty("tests.lambda.processor.functionName");
role = System.getProperty("tests.lambda.processor.sts_role_arn");
pluginMetrics = mock(PluginMetrics.class);
pluginSetting = mock(PluginSetting.class);
when(pluginSetting.getPipelineName()).thenReturn("pipeline");
when(pluginSetting.getName()).thenReturn("name");
numberOfRecordsSuccessCounter = mock(Counter.class);
numberOfRecordsFailedCounter = mock(Counter.class);
numberOfRequestsSuccessCounter = mock(Counter.class);
numberOfRequestsFailedCounter = mock(Counter.class);
lambdaLatencyMetric = mock(Timer.class);
requestPayloadMetric = mock(DistributionSummary.class);
responsePayloadMetric = mock(DistributionSummary.class);
try {
lenient().doAnswer(args -> {
return null;
}).when(numberOfRecordsSuccessCounter).increment(any(Double.class));
} catch (Exception e){}
testCounter = mock(Counter.class);
try {
lenient().doAnswer(args -> {
return null;
}).when(numberOfRecordsFailedCounter).increment();
} catch (Exception e){}
}).when(testCounter).increment(any(Double.class));
} catch (Exception e) {
}
try {
lenient().doAnswer(args -> {
return null;
}).when(lambdaLatencyMetric).record(any(Long.class), any(TimeUnit.class));
} catch (Exception e){}

}).when(testTimer).record(any(Long.class), any(TimeUnit.class));
} catch (Exception e) {
}
when(pluginMetrics.counter(any())).thenReturn(testCounter);
testTimer = mock(Timer.class);
when(pluginMetrics.timer(any())).thenReturn(testTimer);
lambdaProcessorConfig = mock(LambdaProcessorConfig.class);
expressionEvaluator = mock(ExpressionEvaluator.class);
awsCredentialsProvider = DefaultCredentialsProvider.create();
Expand Down Expand Up @@ -201,7 +171,7 @@ public void testRequestResponseWithMatchingEventsAggregateMode(int numRecords) {
List<Record<Event>> records = createRecords(numRecords);
Collection<Record<Event>> results = lambdaProcessor.doExecute(records);
assertThat(results.size(), equalTo(numRecords));
validateResultsForAggregateMode(results );
validateResultsForAggregateMode(results);
}

@ParameterizedTest
Expand Down Expand Up @@ -242,7 +212,7 @@ public void testDifferentInvocationTypes(String invocationType) throws Exception
validateStrictModeResults(results);
} else {
// For "Event" invocation type
assertThat(results.size(), equalTo(0));
assertThat(results.size(), equalTo(10));
}
}

Expand Down Expand Up @@ -287,12 +257,12 @@ private void validateStrictModeResults(Collection<Record<Event>> results) {
for (int i = 0; i < resultRecords.size(); i++) {
Map<String, Object> eventData = resultRecords.get(i).getData().toMap();
Map<String, Object> attr = resultRecords.get(i).getData().getMetadata().getAttributes();
int id = (Integer)eventData.get("id");
assertThat(eventData.get("key"+id), equalTo(id));
String stringValue = "value"+id;
assertThat(eventData.get("keys"+id), equalTo(stringValue.toUpperCase()));
assertThat(attr.get("attr"+id), equalTo(id));
assertThat(attr.get("attrs"+id), equalTo("attrvalue"+id));
int id = (Integer) eventData.get("id");
assertThat(eventData.get("key" + id), equalTo(id));
String stringValue = "value" + id;
assertThat(eventData.get("keys" + id), equalTo(stringValue.toUpperCase()));
assertThat(attr.get("attr" + id), equalTo(id));
assertThat(attr.get("attrs" + id), equalTo("attrvalue" + id));
}
}

Expand All @@ -301,11 +271,11 @@ private List<Record<Event>> createRecords(int numRecords) {
for (int i = 0; i < numRecords; i++) {
Map<String, Object> map = new HashMap<>();
map.put("id", i);
map.put("key"+i, i);
map.put("keys"+i, "value"+i);
map.put("key" + i, i);
map.put("keys" + i, "value" + i);
Map<String, Object> attrs = new HashMap<>();
attrs.put("attr"+i, i);
attrs.put("attrs"+i, "attrvalue"+i);
attrs.put("attr" + i, i);
attrs.put("attrs" + i, "attrvalue" + i);
EventMetadata metadata = DefaultEventMetadata.builder()
.withEventType("event")
.withAttributes(attrs)
Expand Down
Loading

0 comments on commit f3bcd6e

Please sign in to comment.