Skip to content

Commit

Permalink
Spark, Flink: Migrate DeleteReadTests and its subclasses to JUnit5 (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
chinmay-bhat authored Jan 15, 2024
1 parent 23e17ce commit bc7e56c
Show file tree
Hide file tree
Showing 16 changed files with 424 additions and 397 deletions.
176 changes: 123 additions & 53 deletions data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,28 @@
*/
package org.apache.iceberg.data;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.Assert;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(ParameterizedTestExtension.class)
public class TestGenericReaderDeletes extends DeleteReadTests {

@Override
protected Table createTable(String name, Schema schema, PartitionSpec spec) throws IOException {
File tableDir = temp.newFolder();
Assert.assertTrue(tableDir.delete());
File tableDir = Files.createTempDirectory(temp, "junit").toFile();
assertThat(tableDir.delete()).isTrue();

return TestTables.create(tableDir, name, schema, spec, 2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
Expand All @@ -36,11 +35,6 @@
import org.apache.iceberg.util.StructLikeSet;

public class TestFlinkInputFormatReaderDeletes extends TestFlinkReaderDeletesBase {

public TestFlinkInputFormatReaderDeletes(FileFormat inputFormat) {
super(inputFormat);
}

@Override
protected StructLikeSet rowSet(String tableName, Table testTable, String... columns)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
Expand All @@ -35,40 +35,19 @@
import org.apache.iceberg.hive.TestHiveMetastore;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtendWith;

@RunWith(Parameterized.class)
@ExtendWith(ParameterizedTestExtension.class)
public abstract class TestFlinkReaderDeletesBase extends DeleteReadTests {

@ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

protected static String databaseName = "default";

protected static HiveConf hiveConf = null;
protected static HiveCatalog catalog = null;
private static TestHiveMetastore metastore = null;

protected final FileFormat format;

@Parameterized.Parameters(name = "fileFormat={0}")
public static Object[][] parameters() {
return new Object[][] {
new Object[] {FileFormat.PARQUET},
new Object[] {FileFormat.AVRO},
new Object[] {FileFormat.ORC}
};
}

TestFlinkReaderDeletesBase(FileFormat fileFormat) {
this.format = fileFormat;
}

@BeforeClass
@BeforeAll
public static void startMetastore() {
metastore = new TestHiveMetastore();
metastore.start();
Expand All @@ -79,7 +58,7 @@ public static void startMetastore() {
HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf);
}

@AfterClass
@AfterAll
public static void stopMetastore() throws Exception {
metastore.stop();
catalog = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,47 +23,35 @@
import java.util.Map;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.CloseableIterator;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.extension.RegisterExtension;

public class TestIcebergSourceReaderDeletes extends TestFlinkReaderDeletesBase {

private static final int PARALLELISM = 4;

@ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();

@ClassRule
public static final MiniClusterWithClientResource MINI_CLUSTER =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(PARALLELISM)
.build());

public TestIcebergSourceReaderDeletes(FileFormat inputFormat) {
super(inputFormat);
}
@RegisterExtension
private static final MiniClusterExtension MINI_CLUSTER =
MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();

@Override
protected StructLikeSet rowSet(String tableName, Table testTable, String... columns)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
Expand All @@ -36,11 +35,6 @@
import org.apache.iceberg.util.StructLikeSet;

public class TestFlinkInputFormatReaderDeletes extends TestFlinkReaderDeletesBase {

public TestFlinkInputFormatReaderDeletes(FileFormat inputFormat) {
super(inputFormat);
}

@Override
protected StructLikeSet rowSet(String tableName, Table testTable, String... columns)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
Expand All @@ -35,40 +35,20 @@
import org.apache.iceberg.hive.TestHiveMetastore;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtendWith;

@RunWith(Parameterized.class)
@ExtendWith(ParameterizedTestExtension.class)
public abstract class TestFlinkReaderDeletesBase extends DeleteReadTests {

@ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

protected static String databaseName = "default";

protected static HiveConf hiveConf = null;
protected static HiveCatalog catalog = null;
private static TestHiveMetastore metastore = null;

protected final FileFormat format;

@Parameterized.Parameters(name = "fileFormat={0}")
public static Object[][] parameters() {
return new Object[][] {
new Object[] {FileFormat.PARQUET},
new Object[] {FileFormat.AVRO},
new Object[] {FileFormat.ORC}
};
}

TestFlinkReaderDeletesBase(FileFormat fileFormat) {
this.format = fileFormat;
}

@BeforeClass
@BeforeAll
public static void startMetastore() {
metastore = new TestHiveMetastore();
metastore.start();
Expand All @@ -79,7 +59,7 @@ public static void startMetastore() {
HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf);
}

@AfterClass
@AfterAll
public static void stopMetastore() throws Exception {
metastore.stop();
catalog = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,47 +23,35 @@
import java.util.Map;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.CloseableIterator;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.extension.RegisterExtension;

public class TestIcebergSourceReaderDeletes extends TestFlinkReaderDeletesBase {

private static final int PARALLELISM = 4;

@ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();

@ClassRule
public static final MiniClusterWithClientResource MINI_CLUSTER =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(PARALLELISM)
.build());

public TestIcebergSourceReaderDeletes(FileFormat inputFormat) {
super(inputFormat);
}
@RegisterExtension
private static final MiniClusterExtension MINI_CLUSTER =
MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();

@Override
protected StructLikeSet rowSet(String tableName, Table testTable, String... columns)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
Expand All @@ -37,10 +36,6 @@

public class TestFlinkInputFormatReaderDeletes extends TestFlinkReaderDeletesBase {

public TestFlinkInputFormatReaderDeletes(FileFormat inputFormat) {
super(inputFormat);
}

@Override
protected StructLikeSet rowSet(String tableName, Table testTable, String... columns)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
Expand All @@ -35,40 +35,20 @@
import org.apache.iceberg.hive.TestHiveMetastore;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtendWith;

@RunWith(Parameterized.class)
@ExtendWith(ParameterizedTestExtension.class)
public abstract class TestFlinkReaderDeletesBase extends DeleteReadTests {

@ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

protected static String databaseName = "default";

protected static HiveConf hiveConf = null;
protected static HiveCatalog catalog = null;
private static TestHiveMetastore metastore = null;

protected final FileFormat format;

@Parameterized.Parameters(name = "fileFormat={0}")
public static Object[][] parameters() {
return new Object[][] {
new Object[] {FileFormat.PARQUET},
new Object[] {FileFormat.AVRO},
new Object[] {FileFormat.ORC}
};
}

TestFlinkReaderDeletesBase(FileFormat fileFormat) {
this.format = fileFormat;
}

@BeforeClass
@BeforeAll
public static void startMetastore() {
metastore = new TestHiveMetastore();
metastore.start();
Expand All @@ -79,7 +59,7 @@ public static void startMetastore() {
HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf);
}

@AfterClass
@AfterAll
public static void stopMetastore() throws Exception {
metastore.stop();
catalog = null;
Expand Down
Loading

0 comments on commit bc7e56c

Please sign in to comment.