
Commit

Adds a new integration test to the S3 sink which can test different scenarios. It currently tests against ndjson, since this codec generally works. (#3179)

Signed-off-by: David Venable <[email protected]>
dlvenable authored Aug 16, 2023
1 parent b0e5006 commit b7661e6
Showing 9 changed files with 433 additions and 2 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -196,7 +196,7 @@ subprojects {

configure(subprojects.findAll {it.name != 'data-prepper-api'}) {
    dependencies {
        implementation platform('software.amazon.awssdk:bom:2.17.264')
        implementation platform('software.amazon.awssdk:bom:2.20.67')
        implementation 'jakarta.validation:jakarta.validation-api:3.0.2'
    }
}
6 changes: 5 additions & 1 deletion data-prepper-plugins/s3-sink/build.gradle
@@ -27,6 +27,8 @@ dependencies {
    testImplementation project(':data-prepper-plugins:parse-json-processor')
    testImplementation project(':data-prepper-plugins:csv-processor')
    testImplementation project(':data-prepper-plugins:avro-codecs')
    testImplementation testLibs.slf4j.simple
    testImplementation 'software.amazon.awssdk:s3-transfer-manager'
}

test {
@@ -56,10 +58,12 @@ task integrationTest(type: Test) {
    useJUnitPlatform()

    classpath = sourceSets.integrationTest.runtimeClasspath

    systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties'
    systemProperty 'tests.s3sink.bucket', System.getProperty('tests.s3sink.bucket')
    systemProperty 'tests.s3sink.region', System.getProperty('tests.s3sink.region')

    filter {
        includeTestsMatching '*IT'
    }
}
}
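The tests.s3sink.bucket and tests.s3sink.region system properties are forwarded from the Gradle invocation into the test JVM, so the integration test can be pointed at a real bucket from the command line. A plausible (unverified) invocation would be something like ./gradlew :data-prepper-plugins:s3-sink:integrationTest -Dtests.s3sink.bucket=<bucket-name> -Dtests.s3sink.region=<region>; the exact task path depends on the project layout.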
19 changes: 19 additions & 0 deletions CompressionScenario.java (new file)
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.s3;

import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;

import java.io.IOException;
import java.io.InputStream;

/**
* A scenario for whole-file compression.
*/
public interface CompressionScenario {
    CompressionOption getCompressionOption();
    InputStream decompressingInputStream(final InputStream inputStream) throws IOException;
}
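Each implementation supplies the compression option to configure on the sink plus a matching way to wrap the downloaded object's stream, so validation always runs over decompressed content. The GZIP and pass-through implementations follow below, and a sketch of how a CompressionScenario combines with an OutputScenario appears after the OutputScenario interface at the end of this diff.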
24 changes: 24 additions & 0 deletions GZipCompressionScenario.java (new file)
@@ -0,0 +1,24 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.s3;

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;

import java.io.IOException;
import java.io.InputStream;

public class GZipCompressionScenario implements CompressionScenario {
    @Override
    public CompressionOption getCompressionOption() {
        return CompressionOption.GZIP;
    }

    @Override
    public InputStream decompressingInputStream(final InputStream inputStream) throws IOException {
        return new GzipCompressorInputStream(inputStream);
    }
}
52 changes: 52 additions & 0 deletions NdjsonOutputScenario.java (new file)
@@ -0,0 +1,52 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.s3;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.plugins.codec.json.NdjsonOutputCodec;
import org.opensearch.dataprepper.plugins.codec.json.NdjsonOutputConfig;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Scanner;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

public class NdjsonOutputScenario implements OutputScenario {

    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Override
    public OutputCodec getCodec() {
        return new NdjsonOutputCodec(new NdjsonOutputConfig());
    }

    @Override
    public void validate(final List<Map<String, Object>> allEventData, final File actualContentFile) throws IOException {
        final FileInputStream fileInputStream = new FileInputStream(actualContentFile);

        final Scanner scanner = new Scanner(fileInputStream);

        int i = 0;
        while (scanner.hasNext()) {
            final Map<String, Object> expectedData = allEventData.get(i);

            final String actualJsonString = scanner.next();

            final Map<String, Object> actualData = OBJECT_MAPPER.readValue(actualJsonString, Map.class);

            assertThat(actualData, equalTo(expectedData));
            i++;
        }

        assertThat(i, equalTo(allEventData.size()));
    }
}
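This validation tokenizes the downloaded file with Scanner.next(), which splits on whitespace; it therefore appears to assume that each NDJSON record is written as a single compact token on its own line, for example a record like {"status":200,"message":"hello"} (hypothetical sample data). Each token is parsed back into a map and compared, in order, against the expected events, and the final assertion confirms the record count matches.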
23 changes: 23 additions & 0 deletions NoneCompressionScenario.java (new file)
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.s3;

import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;

import java.io.IOException;
import java.io.InputStream;

public class NoneCompressionScenario implements CompressionScenario {
    @Override
    public CompressionOption getCompressionOption() {
        return CompressionOption.NONE;
    }

    @Override
    public InputStream decompressingInputStream(final InputStream inputStream) throws IOException {
        return inputStream;
    }
}
34 changes: 34 additions & 0 deletions OutputScenario.java (new file)
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.s3;

import org.opensearch.dataprepper.model.codec.OutputCodec;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
* Represents a scenario for the output format.
*/
public interface OutputScenario {
    /**
     * Gets the codec this scenario uses.
     *
     * @return The {@link OutputCodec}
     */
    OutputCodec getCodec();

    /**
     * Validates the actual output content against all the expected events.
     *
     * @param allEventData The collection of all the expected event maps.
     * @param actualContentFile The actual file which has been downloaded and decompressed as part of the test
     * @throws IOException if an I/O error occurs while reading the actual content file
     */
    void validate(List<Map<String, Object>> allEventData, File actualContentFile) throws IOException;
}
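Together, OutputScenario and CompressionScenario let the integration test be parameterized over output format and compression. The following is only a rough sketch of how the two interfaces could be combined during verification, assuming the same java.io/java.util imports as the classes above; the method name, the temp-file handling, and the downloaded-object parameter are illustrative and not part of this commit.

// Hypothetical verification step (illustrative only): the S3 sink has already
// written events using outputScenario.getCodec() and
// compressionScenario.getCompressionOption(); the test then downloads the
// object, undoes the compression, and lets the output scenario validate it.
static void validateDownloadedObject(final OutputScenario outputScenario,
                                     final CompressionScenario compressionScenario,
                                     final List<Map<String, Object>> allEventData,
                                     final File downloadedObjectFile) throws IOException {
    final File plainContentFile = File.createTempFile("s3-sink-it", ".plain");
    try (final InputStream objectStream = new java.io.FileInputStream(downloadedObjectFile);
         final InputStream decompressedStream = compressionScenario.decompressingInputStream(objectStream)) {
        // Copy the decompressed bytes into a plain file for the OutputScenario to read.
        java.nio.file.Files.copy(decompressedStream, plainContentFile.toPath(),
                java.nio.file.StandardCopyOption.REPLACE_EXISTING);
    }
    outputScenario.validate(allEventData, plainContentFile);
}

With this shape, NdjsonOutputScenario paired with either GZipCompressionScenario or NoneCompressionScenario exercises the combinations that this commit's scenario classes describe.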