Skip to content

Commit

Permalink
move subclasses of TestBase used in older Spark versions to JUnit5
Browse files Browse the repository at this point in the history
  • Loading branch information
chinmay-bhat committed Jan 15, 2024
1 parent cb6297c commit ad730d1
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,46 +21,41 @@
import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
import static org.apache.iceberg.PlanningMode.LOCAL;

import java.util.Arrays;
import java.util.List;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;

@RunWith(Parameterized.class)
@ExtendWith(ParameterizedTestExtension.class)
public abstract class SparkDistributedDataScanTestBase
extends DataTableScanTestBase<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> {

@Parameters(name = "formatVersion = {0}, dataMode = {1}, deleteMode = {2}")
public static Object[] parameters() {
return new Object[][] {
new Object[] {1, LOCAL, LOCAL},
new Object[] {1, LOCAL, DISTRIBUTED},
new Object[] {1, DISTRIBUTED, LOCAL},
new Object[] {1, DISTRIBUTED, DISTRIBUTED},
new Object[] {2, LOCAL, LOCAL},
new Object[] {2, LOCAL, DISTRIBUTED},
new Object[] {2, DISTRIBUTED, LOCAL},
new Object[] {2, DISTRIBUTED, DISTRIBUTED}
};
public static List<Object> parameters() {
  // Cross product of format versions {1, 2} with every data/delete
  // planning-mode combination, in version-major order.
  PlanningMode[] modes = {LOCAL, DISTRIBUTED};
  Object[] rows = new Object[8];
  int next = 0;
  for (int version = 1; version <= 2; version++) {
    for (PlanningMode dataPlanning : modes) {
      for (PlanningMode deletePlanning : modes) {
        rows[next++] = new Object[] {version, dataPlanning, deletePlanning};
      }
    }
  }
  return Arrays.asList(rows);
}

protected static SparkSession spark = null;

private final PlanningMode dataMode;
private final PlanningMode deleteMode;
@Parameter(index = 1)
private PlanningMode dataMode;

public SparkDistributedDataScanTestBase(
int formatVersion, PlanningMode dataPlanningMode, PlanningMode deletePlanningMode) {
super(formatVersion);
this.dataMode = dataPlanningMode;
this.deleteMode = deletePlanningMode;
}
@Parameter(index = 2)
private PlanningMode deleteMode;

@Before
@BeforeEach
public void configurePlanningModes() {
table
.updateProperties()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,42 +21,39 @@
import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
import static org.apache.iceberg.PlanningMode.LOCAL;

import java.util.Arrays;
import java.util.List;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;

@RunWith(Parameterized.class)
@ExtendWith(ParameterizedTestExtension.class)
public class TestSparkDistributedDataScanDeletes
extends DeleteFileIndexTestBase<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> {

@Parameterized.Parameters(name = "dataMode = {0}, deleteMode = {1}")
public static Object[] parameters() {
return new Object[][] {
new Object[] {LOCAL, LOCAL},
new Object[] {LOCAL, DISTRIBUTED},
new Object[] {DISTRIBUTED, LOCAL},
new Object[] {DISTRIBUTED, DISTRIBUTED}
};
@Parameters(name = "formatVersion = {0}, dataMode = {1}, deleteMode = {2}")
public static List<Object> parameters() {
  // Each row supplies {formatVersion, dataMode, deleteMode}.
  // Fix: the final row previously duplicated {2, LOCAL, DISTRIBUTED}, so the
  // DISTRIBUTED/DISTRIBUTED combination was silently never exercised.
  return Arrays.asList(
      new Object[] {2, LOCAL, LOCAL},
      new Object[] {2, LOCAL, DISTRIBUTED},
      new Object[] {2, DISTRIBUTED, LOCAL},
      new Object[] {2, DISTRIBUTED, DISTRIBUTED});
}

private static SparkSession spark = null;

private final PlanningMode dataMode;
private final PlanningMode deleteMode;
@Parameter(index = 1)
private PlanningMode dataMode;

public TestSparkDistributedDataScanDeletes(
PlanningMode dataPlanningMode, PlanningMode deletePlanningMode) {
this.dataMode = dataPlanningMode;
this.deleteMode = deletePlanningMode;
}
@Parameter(index = 2)
private PlanningMode deleteMode;

@Before
@BeforeEach
public void configurePlanningModes() {
table
.updateProperties()
Expand All @@ -65,7 +62,7 @@ public void configurePlanningModes() {
.commit();
}

@BeforeClass
@BeforeAll
public static void startSpark() {
TestSparkDistributedDataScanDeletes.spark =
SparkSession.builder()
Expand All @@ -75,7 +72,7 @@ public static void startSpark() {
.getOrCreate();
}

@AfterClass
@AfterAll
public static void stopSpark() {
SparkSession currentSpark = TestSparkDistributedDataScanDeletes.spark;
TestSparkDistributedDataScanDeletes.spark = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtendWith;

@RunWith(Parameterized.class)
@ExtendWith(ParameterizedTestExtension.class)
public class TestSparkDistributedDataScanFilterFiles
extends FilterFilesTestBase<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> {

Expand All @@ -51,17 +49,13 @@ public static Object[] parameters() {

private static SparkSession spark = null;

private final PlanningMode dataMode;
private final PlanningMode deleteMode;
@Parameter(index = 1)
private PlanningMode dataMode;

public TestSparkDistributedDataScanFilterFiles(
int formatVersion, PlanningMode dataPlanningMode, PlanningMode deletePlanningMode) {
super(formatVersion);
this.dataMode = dataPlanningMode;
this.deleteMode = deletePlanningMode;
}
@Parameter(index = 2)
private PlanningMode deleteMode;

@BeforeClass
@BeforeAll
public static void startSpark() {
TestSparkDistributedDataScanFilterFiles.spark =
SparkSession.builder()
Expand All @@ -71,7 +65,7 @@ public static void startSpark() {
.getOrCreate();
}

@AfterClass
@AfterAll
public static void stopSpark() {
SparkSession currentSpark = TestSparkDistributedDataScanFilterFiles.spark;
TestSparkDistributedDataScanFilterFiles.spark = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,19 @@
package org.apache.iceberg;

import org.apache.spark.sql.SparkSession;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

public class TestSparkDistributedDataScanJavaSerialization
extends SparkDistributedDataScanTestBase {

public TestSparkDistributedDataScanJavaSerialization(
int formatVersion, PlanningMode dataPlanningMode, PlanningMode deletePlanningMode) {
super(formatVersion, dataPlanningMode, deletePlanningMode);
}

@BeforeClass
@BeforeAll
public static void startSpark() {
  // Configure the shared session with Java serialization; the session is
  // published through the base class so inherited tests pick it up.
  SparkSession session = initSpark("org.apache.spark.serializer.JavaSerializer");
  SparkDistributedDataScanTestBase.spark = session;
}

@AfterClass
@AfterAll
public static void stopSpark() {
SparkSession currentSpark = SparkDistributedDataScanTestBase.spark;
SparkDistributedDataScanTestBase.spark = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,19 @@
package org.apache.iceberg;

import org.apache.spark.sql.SparkSession;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

public class TestSparkDistributedDataScanKryoSerialization
extends SparkDistributedDataScanTestBase {

public TestSparkDistributedDataScanKryoSerialization(
int formatVersion, PlanningMode dataPlanningMode, PlanningMode deletePlanningMode) {
super(formatVersion, dataPlanningMode, deletePlanningMode);
}

@BeforeClass
@BeforeAll
public static void startSpark() {
  // Configure the shared session with Kryo serialization; the session is
  // published through the base class so inherited tests pick it up.
  SparkSession session = initSpark("org.apache.spark.serializer.KryoSerializer");
  SparkDistributedDataScanTestBase.spark = session;
}

@AfterClass
@AfterAll
public static void stopSpark() {
SparkSession currentSpark = SparkDistributedDataScanTestBase.spark;
SparkDistributedDataScanTestBase.spark = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,41 +21,38 @@
import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
import static org.apache.iceberg.PlanningMode.LOCAL;

import java.util.Arrays;
import java.util.List;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtendWith;

@RunWith(Parameterized.class)
@ExtendWith(ParameterizedTestExtension.class)
public class TestSparkDistributedDataScanReporting
extends ScanPlanningAndReportingTestBase<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> {

@Parameterized.Parameters(name = "dataMode = {0}, deleteMode = {1}")
public static Object[] parameters() {
return new Object[][] {
new Object[] {LOCAL, LOCAL},
new Object[] {LOCAL, DISTRIBUTED},
new Object[] {DISTRIBUTED, LOCAL},
new Object[] {DISTRIBUTED, DISTRIBUTED}
};
@Parameters(name = "formatVersion = {0}, dataMode = {1}, deleteMode = {2}")
public static List<Object> parameters() {
  // All data/delete planning-mode combinations for format version 2,
  // generated as a cross product rather than listed literally.
  PlanningMode[] modes = {LOCAL, DISTRIBUTED};
  Object[] rows = new Object[4];
  int next = 0;
  for (PlanningMode dataPlanning : modes) {
    for (PlanningMode deletePlanning : modes) {
      rows[next++] = new Object[] {2, dataPlanning, deletePlanning};
    }
  }
  return Arrays.asList(rows);
}

private static SparkSession spark = null;

private final PlanningMode dataMode;
private final PlanningMode deleteMode;
@Parameter(index = 1)
private PlanningMode dataMode;

public TestSparkDistributedDataScanReporting(
PlanningMode dataPlanningMode, PlanningMode deletePlanningMode) {
this.dataMode = dataPlanningMode;
this.deleteMode = deletePlanningMode;
}
@Parameter(index = 2)
private PlanningMode deleteMode;

@BeforeClass
@BeforeAll
public static void startSpark() {
TestSparkDistributedDataScanReporting.spark =
SparkSession.builder()
Expand All @@ -65,7 +62,7 @@ public static void startSpark() {
.getOrCreate();
}

@AfterClass
@AfterAll
public static void stopSpark() {
SparkSession currentSpark = TestSparkDistributedDataScanReporting.spark;
TestSparkDistributedDataScanReporting.spark = null;
Expand Down

0 comments on commit ad730d1

Please sign in to comment.