Spark 3.5: Migrate remaining tests in source directory to JUnit5 (#9380)
chinmay-bhat authored Jan 13, 2024
1 parent 850cd5c commit a3d87e2
Showing 19 changed files with 1,183 additions and 1,018 deletions.
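
Of the 19 changed files, the diff reproduced below is TestCompressionSettings.java; it shows the migration pattern applied throughout: JUnit 4's @RunWith(Parameterized.class) with a parameter-passing constructor becomes Iceberg's ParameterizedTestExtension with injected @Parameter fields, and @Test becomes @TestTemplate. A minimal sketch of the resulting shape, using an illustrative class and parameter (the names here are not from the commit):

import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(ParameterizedTestExtension.class)
public class ExampleFormatTest {

  // Injected once per parameter row; replaces the JUnit4 constructor assignment.
  @Parameter(index = 0)
  private String format;

  @Parameters(name = "format = {0}")
  public static Object[][] parameters() {
    return new Object[][] {{"parquet"}, {"orc"}, {"avro"}};
  }

  // @TestTemplate replaces @Test: the method runs once per row from parameters().
  @TestTemplate
  public void testFormatIsSet() {
    Assertions.assertThat(format).isIn("parquet", "orc", "avro");
  }
}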
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.spark.source;
 
+import static org.apache.iceberg.FileFormat.AVRO;
+import static org.apache.iceberg.FileFormat.ORC;
 import static org.apache.iceberg.FileFormat.PARQUET;
 import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
 import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
@@ -34,6 +36,7 @@
 import static org.apache.iceberg.spark.SparkSQLProperties.COMPRESSION_LEVEL;
 import static org.apache.iceberg.spark.SparkSQLProperties.COMPRESSION_STRATEGY;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.List;
 import java.util.Map;
@@ -50,6 +53,9 @@
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.SizeBasedFileRewriter;
@@ -58,8 +64,8 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.CatalogTestBase;
 import org.apache.iceberg.spark.SparkCatalogConfig;
-import org.apache.iceberg.spark.SparkCatalogTestBase;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.orc.OrcFile;
@@ -69,66 +75,90 @@
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.assertj.core.api.Assertions;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestCompressionSettings extends SparkCatalogTestBase {
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestCompressionSettings extends CatalogTestBase {
 
   private static final Configuration CONF = new Configuration();
   private static final String tableName = "testWriteData";
 
   private static SparkSession spark = null;
 
-  private final FileFormat format;
-  private final ImmutableMap<String, String> properties;
+  @Parameter(index = 3)
+  private FileFormat format;
+
+  @Parameter(index = 4)
+  private Map<String, String> properties;
 
-  @Rule public TemporaryFolder temp = new TemporaryFolder();
+  @TempDir private java.nio.file.Path temp;
 
-  @Parameterized.Parameters(name = "format = {0}, properties = {1}")
+  @Parameters(
+      name =
+          "catalogName = {0}, implementation = {1}, config = {2}, format = {3}, properties = {4}")
   public static Object[][] parameters() {
     return new Object[][] {
-      {"parquet", ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "1")},
-      {"parquet", ImmutableMap.of(COMPRESSION_CODEC, "gzip")},
-      {"orc", ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "speed")},
-      {"orc", ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "compression")},
-      {"avro", ImmutableMap.of(COMPRESSION_CODEC, "snappy", COMPRESSION_LEVEL, "3")}
+      {
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties(),
+        PARQUET,
+        ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "1")
+      },
+      {
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties(),
+        PARQUET,
+        ImmutableMap.of(COMPRESSION_CODEC, "gzip")
+      },
+      {
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties(),
+        ORC,
+        ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "speed")
+      },
+      {
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties(),
+        ORC,
+        ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "compression")
+      },
+      {
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties(),
+        AVRO,
+        ImmutableMap.of(COMPRESSION_CODEC, "snappy", COMPRESSION_LEVEL, "3")
+      }
     };
   }
 
-  @BeforeClass
+  @BeforeAll
   public static void startSpark() {
     TestCompressionSettings.spark = SparkSession.builder().master("local[2]").getOrCreate();
   }
 
-  @Parameterized.AfterParam
-  public static void clearSourceCache() {
+  @AfterEach
+  public void afterEach() {
    spark.sql(String.format("DROP TABLE IF EXISTS %s", tableName));
   }
 
-  @AfterClass
+  @AfterAll
   public static void stopSpark() {
     SparkSession currentSpark = TestCompressionSettings.spark;
     TestCompressionSettings.spark = null;
     currentSpark.stop();
   }
 
-  public TestCompressionSettings(String format, ImmutableMap properties) {
-    super(
-        SparkCatalogConfig.SPARK.catalogName(),
-        SparkCatalogConfig.SPARK.implementation(),
-        SparkCatalogConfig.SPARK.properties());
-    this.format = FileFormat.fromString(format);
-    this.properties = properties;
-  }
-
-  @Test
+  @TestTemplate
   public void testWriteDataWithDifferentSetting() throws Exception {
     sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
     Map<String, String> tableProperties = Maps.newHashMap();
@@ -170,7 +200,7 @@ public void testWriteDataWithDifferentSetting() throws Exception {
     try (ManifestReader<DataFile> reader = ManifestFiles.read(manifestFiles.get(0), table.io())) {
       DataFile file = reader.iterator().next();
       InputFile inputFile = table.io().newInputFile(file.path().toString());
-      Assertions.assertThat(getCompressionType(inputFile))
+      assertThat(getCompressionType(inputFile))
           .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC));
     }
 
@@ -184,7 +214,7 @@ public void testWriteDataWithDifferentSetting() throws Exception {
         ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0), table.io(), specMap)) {
       DeleteFile file = reader.iterator().next();
       InputFile inputFile = table.io().newInputFile(file.path().toString());
-      Assertions.assertThat(getCompressionType(inputFile))
+      assertThat(getCompressionType(inputFile))
           .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC));
     }
 
@@ -198,7 +228,7 @@ public void testWriteDataWithDifferentSetting() throws Exception {
         ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0), table.io(), specMap)) {
       DeleteFile file = reader.iterator().next();
       InputFile inputFile = table.io().newInputFile(file.path().toString());
-      Assertions.assertThat(getCompressionType(inputFile))
+      assertThat(getCompressionType(inputFile))
           .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC));
     }
   }
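
Note the @Parameter(index = 3) and @Parameter(index = 4) fields above: per the @Parameters name string, indices 0, 1, and 2 (catalogName, implementation, config) belong to the inherited CatalogTestBase parameters, which is why every row in parameters() now carries the SparkCatalogConfig.SPARK triple before the format and properties values. A sketch of how the inherited fields presumably line up; the field names are assumptions, not verified against CatalogTestBase:

import java.util.Map;
import org.apache.iceberg.Parameter;

// Assumed shape of the base-class parameter fields, inferred from the
// @Parameters name string "catalogName = {0}, implementation = {1}, config = {2}, ...".
public abstract class CatalogTestBaseSketch {

  @Parameter(index = 0)
  protected String catalogName;

  @Parameter(index = 1)
  protected String implementation;

  @Parameter(index = 2)
  protected Map<String, String> catalogConfig;
}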