From bc7e56c2e5e9c5a3b97e599ab518c77d65014db9 Mon Sep 17 00:00:00 2001 From: Chinmay Bhat <12948588+chinmay-bhat@users.noreply.github.com> Date: Mon, 15 Jan 2024 19:21:03 +0530 Subject: [PATCH] Spark, Flink: Migrate DeleteReadTests and its subclasses to JUnit5 (#9382) --- .../apache/iceberg/data/DeleteReadTests.java | 176 ++++++++++++------ .../data/TestGenericReaderDeletes.java | 11 +- .../TestFlinkInputFormatReaderDeletes.java | 6 - .../source/TestFlinkReaderDeletesBase.java | 35 +--- .../TestIcebergSourceReaderDeletes.java | 24 +-- .../TestFlinkInputFormatReaderDeletes.java | 6 - .../source/TestFlinkReaderDeletesBase.java | 34 +--- .../TestIcebergSourceReaderDeletes.java | 24 +-- .../TestFlinkInputFormatReaderDeletes.java | 5 - .../source/TestFlinkReaderDeletesBase.java | 34 +--- .../TestIcebergSourceReaderDeletes.java | 24 +-- .../org/apache/iceberg/mr/TestHelper.java | 27 ++- .../mr/TestInputFormatReaderDeletes.java | 32 ++-- .../spark/source/TestSparkReaderDeletes.java | 123 ++++++------ .../spark/source/TestSparkReaderDeletes.java | 130 +++++++------ .../spark/source/TestSparkReaderDeletes.java | 130 +++++++------ 16 files changed, 424 insertions(+), 397 deletions(-) diff --git a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java index 8d8c5e531984..9d16da124062 100644 --- a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java +++ b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java @@ -18,13 +18,20 @@ */ package org.apache.iceberg.data; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; import java.io.IOException; +import java.nio.file.Path; import java.time.LocalDate; import java.util.List; import java.util.Set; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -38,12 +45,10 @@ import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.StructLikeSet; import org.apache.iceberg.util.StructProjection; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.io.TempDir; public abstract class DeleteReadTests { // Schema passed to create tables @@ -65,7 +70,7 @@ public abstract class DeleteReadTests { public static final PartitionSpec DATE_SPEC = PartitionSpec.builderFor(DATE_SCHEMA).day("dt").build(); - @Rule public TemporaryFolder temp = new TemporaryFolder(); + @TempDir protected Path temp; protected String tableName = null; protected String dateTableName = null; @@ -75,7 +80,18 @@ public abstract class DeleteReadTests { private List dateRecords = null; protected DataFile dataFile = null; - @Before + @Parameter protected FileFormat format; + + @Parameters(name = "fileFormat = {0}") + public static Object[][] parameters() { + return new Object[][] { + new Object[] {FileFormat.PARQUET}, + new Object[] {FileFormat.AVRO}, + new Object[] {FileFormat.ORC} + }; + } + + @BeforeEach public void writeTestDataFile() throws IOException { this.tableName = "test"; this.table = createTable(tableName, SCHEMA, SPEC); @@ 
-92,12 +108,16 @@ public void writeTestDataFile() throws IOException { records.add(record.copy("id", 122, "data", "g")); this.dataFile = - FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), Row.of(0), records); + FileHelpers.writeDataFile( + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + Row.of(0), + records); table.newAppend().appendFile(dataFile).commit(); } - @After + @AfterEach public void cleanup() throws IOException { dropTable("test"); dropTable("test2"); @@ -121,31 +141,31 @@ private void initDateTable() throws IOException { DataFile dataFile1 = FileHelpers.writeDataFile( dateTable, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-01"))), dateRecords.subList(0, 1)); DataFile dataFile2 = FileHelpers.writeDataFile( dateTable, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-02"))), dateRecords.subList(1, 2)); DataFile dataFile3 = FileHelpers.writeDataFile( dateTable, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-03"))), dateRecords.subList(2, 3)); DataFile dataFile4 = FileHelpers.writeDataFile( dateTable, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-04"))), dateRecords.subList(3, 4)); DataFile dataFile5 = FileHelpers.writeDataFile( dateTable, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-05"))), dateRecords.subList(4, 5)); @@ -186,12 +206,13 @@ protected long deleteCount() { protected void checkDeleteCount(long expectedDeletes) { if (countDeletes()) { long actualDeletes = deleteCount(); - Assert.assertEquals( - "Table should contain expected number of deletes", expectedDeletes, actualDeletes); + assertThat(actualDeletes) + .as("Table should contain expected number of deletes") + .isEqualTo(expectedDeletes); } } - @Test + @TestTemplate public void testEqualityDeletes() throws IOException { Schema deleteRowSchema = table.schema().select("data"); Record dataDelete = GenericRecord.create(deleteRowSchema); @@ -204,18 +225,22 @@ public void testEqualityDeletes() throws IOException { DeleteFile eqDeletes = FileHelpers.writeDeleteFile( - table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, deleteRowSchema); + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + Row.of(0), + dataDeletes, + deleteRowSchema); table.newRowDelta().addDeletes(eqDeletes).commit(); StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122); StructLikeSet actual = rowSet(tableName, table, "*"); - Assert.assertEquals("Table should contain expected rows", expected, actual); + assertThat(actual).as("Table should contain expected rows").isEqualTo(expected); checkDeleteCount(3L); } - @Test + @TestTemplate public void testEqualityDateDeletes() throws IOException { initDateTable(); @@ -230,21 +255,21 @@ public void testEqualityDateDeletes() throws IOException { DeleteFile eqDeletes1 = FileHelpers.writeDeleteFile( dateTable, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), 
Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-01"))), dataDeletes.subList(0, 1), deleteRowSchema); DeleteFile eqDeletes2 = FileHelpers.writeDeleteFile( dateTable, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-02"))), dataDeletes.subList(1, 2), deleteRowSchema); DeleteFile eqDeletes3 = FileHelpers.writeDeleteFile( dateTable, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-03"))), dataDeletes.subList(2, 3), deleteRowSchema); @@ -260,11 +285,11 @@ public void testEqualityDateDeletes() throws IOException { StructLikeSet actual = rowSet(dateTableName, dateTable, "*"); - Assert.assertEquals("Table should contain expected rows", expected, actual); + assertThat(actual).as("Table should contain expected rows").isEqualTo(expected); checkDeleteCount(3L); } - @Test + @TestTemplate public void testEqualityDeletesWithRequiredEqColumn() throws IOException { Schema deleteRowSchema = table.schema().select("data"); Record dataDelete = GenericRecord.create(deleteRowSchema); @@ -277,7 +302,11 @@ public void testEqualityDeletesWithRequiredEqColumn() throws IOException { DeleteFile eqDeletes = FileHelpers.writeDeleteFile( - table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, deleteRowSchema); + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + Row.of(0), + dataDeletes, + deleteRowSchema); table.newRowDelta().addDeletes(eqDeletes).commit(); @@ -285,25 +314,30 @@ public void testEqualityDeletesWithRequiredEqColumn() throws IOException { StructLikeSet actual = rowSet(tableName, table, "id"); if (expectPruned()) { - Assert.assertEquals("Table should contain expected rows", expected, actual); + assertThat(actual).as("Table should contain expected rows").isEqualTo(expected); } else { // data is added by the reader to apply the eq deletes, use StructProjection to remove it from // comparison - Assert.assertEquals( - "Table should contain expected rows", expected, selectColumns(actual, "id")); + assertThat(selectColumns(actual, "id")) + .as("Table should contain expected rows") + .isEqualTo(expected); } checkDeleteCount(3L); } - @Test + @TestTemplate public void testEqualityDeletesSpanningMultipleDataFiles() throws IOException { // Add another DataFile with common values GenericRecord record = GenericRecord.create(table.schema()); records.add(record.copy("id", 144, "data", "a")); this.dataFile = - FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), Row.of(0), records); + FileHelpers.writeDataFile( + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + Row.of(0), + records); // At this point, the table has two data files, with 7 and 8 rows respectively, of which all but // one are in duplicate. @@ -320,7 +354,11 @@ public void testEqualityDeletesSpanningMultipleDataFiles() throws IOException { DeleteFile eqDeletes = FileHelpers.writeDeleteFile( - table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, deleteRowSchema); + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + Row.of(0), + dataDeletes, + deleteRowSchema); // At this point, 3 rows in the first data file and 4 rows in the second data file are deleted. 
table.newRowDelta().addDeletes(eqDeletes).commit(); @@ -328,11 +366,11 @@ public void testEqualityDeletesSpanningMultipleDataFiles() throws IOException { StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122, 144); StructLikeSet actual = rowSet(tableName, table, "*"); - Assert.assertEquals("Table should contain expected rows", expected, actual); + assertThat(actual).as("Table should contain expected rows").isEqualTo(expected); checkDeleteCount(7L); } - @Test + @TestTemplate public void testPositionDeletes() throws IOException { List> deletes = Lists.newArrayList( @@ -342,7 +380,11 @@ public void testPositionDeletes() throws IOException { ); Pair posDeletes = - FileHelpers.writeDeleteFile(table, Files.localOutput(temp.newFile()), Row.of(0), deletes); + FileHelpers.writeDeleteFile( + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + Row.of(0), + deletes); table .newRowDelta() @@ -353,11 +395,11 @@ public void testPositionDeletes() throws IOException { StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122); StructLikeSet actual = rowSet(tableName, table, "*"); - Assert.assertEquals("Table should contain expected rows", expected, actual); + assertThat(actual).as("Table should contain expected rows").isEqualTo(expected); checkDeleteCount(3L); } - @Test + @TestTemplate public void testMultiplePosDeleteFiles() throws IOException { List> deletes = Lists.newArrayList( @@ -366,7 +408,11 @@ public void testMultiplePosDeleteFiles() throws IOException { ); Pair posDeletes = - FileHelpers.writeDeleteFile(table, Files.localOutput(temp.newFile()), Row.of(0), deletes); + FileHelpers.writeDeleteFile( + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + Row.of(0), + deletes); table .newRowDelta() @@ -380,7 +426,11 @@ public void testMultiplePosDeleteFiles() throws IOException { ); posDeletes = - FileHelpers.writeDeleteFile(table, Files.localOutput(temp.newFile()), Row.of(0), deletes); + FileHelpers.writeDeleteFile( + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + Row.of(0), + deletes); table .newRowDelta() @@ -391,11 +441,11 @@ public void testMultiplePosDeleteFiles() throws IOException { StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122); StructLikeSet actual = rowSet(tableName, table, "*"); - Assert.assertEquals("Table should contain expected rows", expected, actual); + assertThat(actual).as("Table should contain expected rows").isEqualTo(expected); checkDeleteCount(3L); } - @Test + @TestTemplate public void testMixedPositionAndEqualityDeletes() throws IOException { Schema dataSchema = table.schema().select("data"); Record dataDelete = GenericRecord.create(dataSchema); @@ -408,7 +458,11 @@ public void testMixedPositionAndEqualityDeletes() throws IOException { DeleteFile eqDeletes = FileHelpers.writeDeleteFile( - table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, dataSchema); + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + Row.of(0), + dataDeletes, + dataSchema); List> deletes = Lists.newArrayList( @@ -417,7 +471,11 @@ public void testMixedPositionAndEqualityDeletes() throws IOException { ); Pair posDeletes = - FileHelpers.writeDeleteFile(table, Files.localOutput(temp.newFile()), Row.of(0), deletes); + FileHelpers.writeDeleteFile( + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + Row.of(0), + deletes); table .newRowDelta() @@ -429,11 +487,11 @@ public void 
testMixedPositionAndEqualityDeletes() throws IOException { StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 121, 122); StructLikeSet actual = rowSet(tableName, table, "*"); - Assert.assertEquals("Table should contain expected rows", expected, actual); + assertThat(actual).as("Table should contain expected rows").isEqualTo(expected); checkDeleteCount(4L); } - @Test + @TestTemplate public void testMultipleEqualityDeleteSchemas() throws IOException { Schema dataSchema = table.schema().select("data"); Record dataDelete = GenericRecord.create(dataSchema); @@ -446,7 +504,11 @@ public void testMultipleEqualityDeleteSchemas() throws IOException { DeleteFile dataEqDeletes = FileHelpers.writeDeleteFile( - table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, dataSchema); + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + Row.of(0), + dataDeletes, + dataSchema); Schema idSchema = table.schema().select("id"); Record idDelete = GenericRecord.create(idSchema); @@ -458,18 +520,22 @@ public void testMultipleEqualityDeleteSchemas() throws IOException { DeleteFile idEqDeletes = FileHelpers.writeDeleteFile( - table, Files.localOutput(temp.newFile()), Row.of(0), idDeletes, idSchema); + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + Row.of(0), + idDeletes, + idSchema); table.newRowDelta().addDeletes(dataEqDeletes).addDeletes(idEqDeletes).commit(); StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 121, 122); StructLikeSet actual = rowSet(tableName, table, "*"); - Assert.assertEquals("Table should contain expected rows", expected, actual); + assertThat(actual).as("Table should contain expected rows").isEqualTo(expected); checkDeleteCount(4L); } - @Test + @TestTemplate public void testEqualityDeleteByNull() throws IOException { // data is required in the test table; make it optional for this test table.updateSchema().makeColumnOptional("data").commit(); @@ -479,7 +545,7 @@ public void testEqualityDeleteByNull() throws IOException { DataFile dataFileWithNull = FileHelpers.writeDataFile( table, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), Row.of(0), Lists.newArrayList(record.copy("id", 131, "data", null))); @@ -495,14 +561,18 @@ public void testEqualityDeleteByNull() throws IOException { DeleteFile eqDeletes = FileHelpers.writeDeleteFile( - table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, dataSchema); + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + Row.of(0), + dataDeletes, + dataSchema); table.newRowDelta().addDeletes(eqDeletes).commit(); StructLikeSet expected = rowSetWithoutIds(table, records, 131); StructLikeSet actual = rowSet(tableName, table, "*"); - Assert.assertEquals("Table should contain expected rows", expected, actual); + assertThat(actual).as("Table should contain expected rows").isEqualTo(expected); checkDeleteCount(1L); } diff --git a/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java b/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java index 7a17c142eea2..fa44be06ee8f 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java +++ b/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java @@ -18,8 +18,12 @@ */ package org.apache.iceberg.data; +import static org.assertj.core.api.Assertions.assertThat; + import java.io.File; import java.io.IOException; +import java.nio.file.Files; +import 
org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -27,14 +31,15 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.util.StructLikeSet; -import org.junit.Assert; +import org.junit.jupiter.api.extension.ExtendWith; +@ExtendWith(ParameterizedTestExtension.class) public class TestGenericReaderDeletes extends DeleteReadTests { @Override protected Table createTable(String name, Schema schema, PartitionSpec spec) throws IOException { - File tableDir = temp.newFolder(); - Assert.assertTrue(tableDir.delete()); + File tableDir = Files.createTempDirectory(temp, "junit").toFile(); + assertThat(tableDir.delete()).isTrue(); return TestTables.create(tableDir, name, schema, spec, 2); } diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java index b2f914e51299..226da5813ad8 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java @@ -23,7 +23,6 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; @@ -36,11 +35,6 @@ import org.apache.iceberg.util.StructLikeSet; public class TestFlinkInputFormatReaderDeletes extends TestFlinkReaderDeletesBase { - - public TestFlinkInputFormatReaderDeletes(FileFormat inputFormat) { - super(inputFormat); - } - @Override protected StructLikeSet rowSet(String tableName, Table testTable, String... 
columns) throws IOException { diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkReaderDeletesBase.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkReaderDeletesBase.java index 987d79fed3c3..2f0af1c3ba0c 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkReaderDeletesBase.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkReaderDeletesBase.java @@ -22,7 +22,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.iceberg.BaseTable; import org.apache.iceberg.CatalogUtil; -import org.apache.iceberg.FileFormat; +import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -35,40 +35,19 @@ import org.apache.iceberg.hive.TestHiveMetastore; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.extension.ExtendWith; -@RunWith(Parameterized.class) +@ExtendWith(ParameterizedTestExtension.class) public abstract class TestFlinkReaderDeletesBase extends DeleteReadTests { - - @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); - protected static String databaseName = "default"; protected static HiveConf hiveConf = null; protected static HiveCatalog catalog = null; private static TestHiveMetastore metastore = null; - protected final FileFormat format; - - @Parameterized.Parameters(name = "fileFormat={0}") - public static Object[][] parameters() { - return new Object[][] { - new Object[] {FileFormat.PARQUET}, - new Object[] {FileFormat.AVRO}, - new Object[] {FileFormat.ORC} - }; - } - - TestFlinkReaderDeletesBase(FileFormat fileFormat) { - this.format = fileFormat; - } - - @BeforeClass + @BeforeAll public static void startMetastore() { metastore = new TestHiveMetastore(); metastore.start(); @@ -79,7 +58,7 @@ public static void startMetastore() { HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf); } - @AfterClass + @AfterAll public static void stopMetastore() throws Exception { metastore.stop(); catalog = null; diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceReaderDeletes.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceReaderDeletes.java index 2974f4bc94a2..df148c212ebd 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceReaderDeletes.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceReaderDeletes.java @@ -23,47 +23,35 @@ import java.util.Map; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.test.util.MiniClusterWithClientResource; +import 
org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.util.CloseableIterator; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.MiniFlinkClusterExtension; import org.apache.iceberg.flink.RowDataWrapper; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.StructLikeSet; -import org.junit.ClassRule; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.extension.RegisterExtension; public class TestIcebergSourceReaderDeletes extends TestFlinkReaderDeletesBase { private static final int PARALLELISM = 4; - @ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder(); - - @ClassRule - public static final MiniClusterWithClientResource MINI_CLUSTER = - new MiniClusterWithClientResource( - new MiniClusterResourceConfiguration.Builder() - .setNumberTaskManagers(1) - .setNumberSlotsPerTaskManager(PARALLELISM) - .build()); - - public TestIcebergSourceReaderDeletes(FileFormat inputFormat) { - super(inputFormat); - } + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER = + MiniFlinkClusterExtension.createWithClassloaderCheckDisabled(); @Override protected StructLikeSet rowSet(String tableName, Table testTable, String... columns) diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java index b2f914e51299..226da5813ad8 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java @@ -23,7 +23,6 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; @@ -36,11 +35,6 @@ import org.apache.iceberg.util.StructLikeSet; public class TestFlinkInputFormatReaderDeletes extends TestFlinkReaderDeletesBase { - - public TestFlinkInputFormatReaderDeletes(FileFormat inputFormat) { - super(inputFormat); - } - @Override protected StructLikeSet rowSet(String tableName, Table testTable, String... 
columns) throws IOException { diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkReaderDeletesBase.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkReaderDeletesBase.java index 987d79fed3c3..0b5a8011ad3f 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkReaderDeletesBase.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkReaderDeletesBase.java @@ -22,7 +22,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.iceberg.BaseTable; import org.apache.iceberg.CatalogUtil; -import org.apache.iceberg.FileFormat; +import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -35,40 +35,20 @@ import org.apache.iceberg.hive.TestHiveMetastore; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.extension.ExtendWith; -@RunWith(Parameterized.class) +@ExtendWith(ParameterizedTestExtension.class) public abstract class TestFlinkReaderDeletesBase extends DeleteReadTests { - @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); - protected static String databaseName = "default"; protected static HiveConf hiveConf = null; protected static HiveCatalog catalog = null; private static TestHiveMetastore metastore = null; - protected final FileFormat format; - - @Parameterized.Parameters(name = "fileFormat={0}") - public static Object[][] parameters() { - return new Object[][] { - new Object[] {FileFormat.PARQUET}, - new Object[] {FileFormat.AVRO}, - new Object[] {FileFormat.ORC} - }; - } - - TestFlinkReaderDeletesBase(FileFormat fileFormat) { - this.format = fileFormat; - } - - @BeforeClass + @BeforeAll public static void startMetastore() { metastore = new TestHiveMetastore(); metastore.start(); @@ -79,7 +59,7 @@ public static void startMetastore() { HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf); } - @AfterClass + @AfterAll public static void stopMetastore() throws Exception { metastore.stop(); catalog = null; diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceReaderDeletes.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceReaderDeletes.java index 2974f4bc94a2..df148c212ebd 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceReaderDeletes.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceReaderDeletes.java @@ -23,47 +23,35 @@ import java.util.Map; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.test.util.MiniClusterWithClientResource; +import 
org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.util.CloseableIterator; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.MiniFlinkClusterExtension; import org.apache.iceberg.flink.RowDataWrapper; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.StructLikeSet; -import org.junit.ClassRule; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.extension.RegisterExtension; public class TestIcebergSourceReaderDeletes extends TestFlinkReaderDeletesBase { private static final int PARALLELISM = 4; - @ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder(); - - @ClassRule - public static final MiniClusterWithClientResource MINI_CLUSTER = - new MiniClusterWithClientResource( - new MiniClusterResourceConfiguration.Builder() - .setNumberTaskManagers(1) - .setNumberSlotsPerTaskManager(PARALLELISM) - .build()); - - public TestIcebergSourceReaderDeletes(FileFormat inputFormat) { - super(inputFormat); - } + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER = + MiniFlinkClusterExtension.createWithClassloaderCheckDisabled(); @Override protected StructLikeSet rowSet(String tableName, Table testTable, String... columns) diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java index b2f914e51299..1b4fc863631f 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java @@ -23,7 +23,6 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; @@ -37,10 +36,6 @@ public class TestFlinkInputFormatReaderDeletes extends TestFlinkReaderDeletesBase { - public TestFlinkInputFormatReaderDeletes(FileFormat inputFormat) { - super(inputFormat); - } - @Override protected StructLikeSet rowSet(String tableName, Table testTable, String... 
columns) throws IOException { diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkReaderDeletesBase.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkReaderDeletesBase.java index 987d79fed3c3..0b5a8011ad3f 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkReaderDeletesBase.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkReaderDeletesBase.java @@ -22,7 +22,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.iceberg.BaseTable; import org.apache.iceberg.CatalogUtil; -import org.apache.iceberg.FileFormat; +import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -35,40 +35,20 @@ import org.apache.iceberg.hive.TestHiveMetastore; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.extension.ExtendWith; -@RunWith(Parameterized.class) +@ExtendWith(ParameterizedTestExtension.class) public abstract class TestFlinkReaderDeletesBase extends DeleteReadTests { - @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); - protected static String databaseName = "default"; protected static HiveConf hiveConf = null; protected static HiveCatalog catalog = null; private static TestHiveMetastore metastore = null; - protected final FileFormat format; - - @Parameterized.Parameters(name = "fileFormat={0}") - public static Object[][] parameters() { - return new Object[][] { - new Object[] {FileFormat.PARQUET}, - new Object[] {FileFormat.AVRO}, - new Object[] {FileFormat.ORC} - }; - } - - TestFlinkReaderDeletesBase(FileFormat fileFormat) { - this.format = fileFormat; - } - - @BeforeClass + @BeforeAll public static void startMetastore() { metastore = new TestHiveMetastore(); metastore.start(); @@ -79,7 +59,7 @@ public static void startMetastore() { HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf); } - @AfterClass + @AfterAll public static void stopMetastore() throws Exception { metastore.stop(); catalog = null; diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceReaderDeletes.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceReaderDeletes.java index 2974f4bc94a2..df148c212ebd 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceReaderDeletes.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceReaderDeletes.java @@ -23,47 +23,35 @@ import java.util.Map; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.test.util.MiniClusterWithClientResource; +import 
org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.util.CloseableIterator; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.MiniFlinkClusterExtension; import org.apache.iceberg.flink.RowDataWrapper; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.StructLikeSet; -import org.junit.ClassRule; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.extension.RegisterExtension; public class TestIcebergSourceReaderDeletes extends TestFlinkReaderDeletesBase { private static final int PARALLELISM = 4; - @ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder(); - - @ClassRule - public static final MiniClusterWithClientResource MINI_CLUSTER = - new MiniClusterWithClientResource( - new MiniClusterResourceConfiguration.Builder() - .setNumberTaskManagers(1) - .setNumberSlotsPerTaskManager(PARALLELISM) - .build()); - - public TestIcebergSourceReaderDeletes(FileFormat inputFormat) { - super(inputFormat); - } + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER = + MiniFlinkClusterExtension.createWithClassloaderCheckDisabled(); @Override protected StructLikeSet rowSet(String tableName, Table testTable, String... columns) diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java b/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java index 8d877d4a3173..72475f70d718 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java +++ b/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java @@ -19,6 +19,7 @@ package org.apache.iceberg.mr; import java.io.IOException; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -49,9 +50,11 @@ public class TestHelper { private final PartitionSpec spec; private final FileFormat fileFormat; private final TemporaryFolder tmp; + private final Path temp; private Table table; + @Deprecated public TestHelper( Configuration conf, Tables tables, @@ -66,9 +69,28 @@ public TestHelper( this.schema = schema; this.spec = spec; this.fileFormat = fileFormat; + this.temp = null; this.tmp = tmp; } + public TestHelper( + Configuration conf, + Tables tables, + String tableIdentifier, + Schema schema, + PartitionSpec spec, + FileFormat fileFormat, + Path temp) { + this.conf = conf; + this.tables = tables; + this.tableIdentifier = tableIdentifier; + this.schema = schema; + this.spec = spec; + this.fileFormat = fileFormat; + this.temp = temp; + this.tmp = null; + } + public void setTable(Table table) { this.table = table; conf.set(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(table.schema())); @@ -122,7 +144,10 @@ public DataFile writeFile(StructLike partition, List records) throws IOE } private GenericAppenderHelper appender() { - return new GenericAppenderHelper(table, fileFormat, tmp, conf); + if (null != tmp) { + return new GenericAppenderHelper(table, fileFormat, tmp, conf); + } + return new GenericAppenderHelper(table, fileFormat, temp, conf); } public static 
class RecordsBuilder { diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java b/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java index 73b01da519a3..a5f108969249 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java +++ b/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.mr; +import static org.assertj.core.api.Assertions.assertThat; + import java.io.File; import java.io.IOException; import java.util.List; @@ -26,6 +28,9 @@ import org.apache.iceberg.BaseTable; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -35,22 +40,22 @@ import org.apache.iceberg.data.InternalRecordWrapper; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.util.StructLikeSet; -import org.junit.Assert; -import org.junit.Before; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; -@RunWith(Parameterized.class) +@ExtendWith(ParameterizedTestExtension.class) public class TestInputFormatReaderDeletes extends DeleteReadTests { private final Configuration conf = new Configuration(); private final HadoopTables tables = new HadoopTables(conf); private TestHelper helper; // parametrized variables - private final String inputFormat; - private final FileFormat fileFormat; + @Parameter private String inputFormat; + + @Parameter(index = 1) + private FileFormat fileFormat; - @Parameterized.Parameters(name = "inputFormat = {0}, fileFormat={1}") + @Parameters(name = "inputFormat = {0}, fileFormat = {1}") public static Object[][] parameters() { return new Object[][] { {"IcebergInputFormat", FileFormat.PARQUET}, @@ -62,24 +67,19 @@ public static Object[][] parameters() { }; } - @Before + @BeforeEach @Override public void writeTestDataFile() throws IOException { conf.set(CatalogUtil.ICEBERG_CATALOG_TYPE, Catalogs.LOCATION); super.writeTestDataFile(); } - public TestInputFormatReaderDeletes(String inputFormat, FileFormat fileFormat) { - this.inputFormat = inputFormat; - this.fileFormat = fileFormat; - } - @Override protected Table createTable(String name, Schema schema, PartitionSpec spec) throws IOException { Table table; - File location = temp.newFolder(inputFormat, fileFormat.name()); - Assert.assertTrue(location.delete()); + File location = temp.resolve(inputFormat).resolve(fileFormat.name()).toFile(); + assertThat(location.mkdirs()).isTrue(); helper = new TestHelper(conf, tables, location.toString(), schema, spec, fileFormat, temp); table = helper.createTable(); diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index cadcbad6aa76..f8f8c09a9b9b 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -21,6 +21,8 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; import static org.apache.iceberg.spark.source.SparkSQLExecutionHelper.lastExecutedMetricValue; import 
static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; import java.io.File; import java.io.IOException; @@ -37,6 +39,9 @@ import org.apache.iceberg.DeleteFile; import org.apache.iceberg.Files; import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -80,30 +85,25 @@ import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.StructType; import org.jetbrains.annotations.NotNull; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ParameterizedTestExtension.class) public class TestSparkReaderDeletes extends DeleteReadTests { private static TestHiveMetastore metastore = null; protected static SparkSession spark = null; protected static HiveCatalog catalog = null; - private final String format; - private final boolean vectorized; - public TestSparkReaderDeletes(String format, boolean vectorized) { - this.format = format; - this.vectorized = vectorized; - } + @Parameter private String format; + + @Parameter(index = 1) + private boolean vectorized; - @Parameterized.Parameters(name = "format = {0}, vectorized = {1}") + @Parameters(name = "format = {0}, vectorized = {1}") public static Object[][] parameters() { return new Object[][] { new Object[] {"parquet", false}, @@ -113,7 +113,7 @@ public static Object[][] parameters() { }; } - @BeforeClass + @BeforeAll public static void startMetastoreAndSpark() { metastore = new TestHiveMetastore(); metastore.start(); @@ -140,7 +140,7 @@ public static void startMetastoreAndSpark() { } } - @AfterClass + @AfterAll public static void stopMetastoreAndSpark() throws Exception { catalog = null; metastore.stop(); @@ -149,7 +149,7 @@ public static void stopMetastoreAndSpark() throws Exception { spark = null; } - @After + @AfterEach @Override public void cleanup() throws IOException { super.cleanup(); @@ -219,7 +219,7 @@ public StructLikeSet rowSet(String name, Types.StructType projection, String... 
return set; } - @Test + @TestTemplate public void testEqualityDeleteWithFilter() throws IOException { String tableName = table.name().substring(table.name().lastIndexOf(".") + 1); Schema deleteRowSchema = table.schema().select("data"); @@ -234,7 +234,7 @@ public void testEqualityDeleteWithFilter() throws IOException { DeleteFile eqDeletes = FileHelpers.writeDeleteFile( table, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), dataDeletes, deleteRowSchema); @@ -258,10 +258,10 @@ public void testEqualityDeleteWithFilter() throws IOException { actual.add(rowWrapper.wrap(row)); }); - Assert.assertEquals("Table should contain no rows", 0, actual.size()); + assertThat(actual).as("Table should contain no rows").hasSize(0); } - @Test + @TestTemplate public void testReadEqualityDeleteRows() throws IOException { Schema deleteSchema1 = table.schema().select("data"); Record dataDelete = GenericRecord.create(deleteSchema1); @@ -282,7 +282,7 @@ public void testReadEqualityDeleteRows() throws IOException { DeleteFile eqDelete1 = FileHelpers.writeDeleteFile( table, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), dataDeletes, deleteSchema1); @@ -290,7 +290,7 @@ public void testReadEqualityDeleteRows() throws IOException { DeleteFile eqDelete2 = FileHelpers.writeDeleteFile( table, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), idDeletes, deleteSchema2); @@ -320,11 +320,11 @@ public void testReadEqualityDeleteRows() throws IOException { } } - Assert.assertEquals("should include 4 deleted row", 4, actualRowSet.size()); - Assert.assertEquals("deleted row should be matched", expectedRowSet, actualRowSet); + assertThat(actualRowSet).as("should include 4 deleted row").hasSize(4); + assertThat(actualRowSet).as("deleted row should be matched").isEqualTo(expectedRowSet); } - @Test + @TestTemplate public void testPosDeletesAllRowsInBatch() throws IOException { // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the first batch are all // deleted. @@ -338,7 +338,10 @@ public void testPosDeletesAllRowsInBatch() throws IOException { Pair posDeletes = FileHelpers.writeDeleteFile( - table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes); + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + deletes); table .newRowDelta() @@ -349,11 +352,11 @@ public void testPosDeletesAllRowsInBatch() throws IOException { StructLikeSet expected = rowSetWithoutIds(table, records, 29, 43, 61, 89); StructLikeSet actual = rowSet(tableName, table, "*"); - Assert.assertEquals("Table should contain expected rows", expected, actual); + assertThat(actual).as("Table should contain expected rows").isEqualTo(expected); checkDeleteCount(4L); } - @Test + @TestTemplate public void testPosDeletesWithDeletedColumn() throws IOException { // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the first batch are all // deleted. 
@@ -367,7 +370,10 @@ public void testPosDeletesWithDeletedColumn() throws IOException { Pair posDeletes = FileHelpers.writeDeleteFile( - table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes); + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + deletes); table .newRowDelta() @@ -379,11 +385,11 @@ public void testPosDeletesWithDeletedColumn() throws IOException { StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); - Assert.assertEquals("Table should contain expected row", expected, actual); + assertThat(actual).as("Table should contain expected row").isEqualTo(expected); checkDeleteCount(4L); } - @Test + @TestTemplate public void testEqualityDeleteWithDeletedColumn() throws IOException { String tableName = table.name().substring(table.name().lastIndexOf(".") + 1); Schema deleteRowSchema = table.schema().select("data"); @@ -398,7 +404,7 @@ public void testEqualityDeleteWithDeletedColumn() throws IOException { DeleteFile eqDeletes = FileHelpers.writeDeleteFile( table, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), dataDeletes, deleteRowSchema); @@ -409,11 +415,11 @@ public void testEqualityDeleteWithDeletedColumn() throws IOException { StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); - Assert.assertEquals("Table should contain expected row", expected, actual); + assertThat(actual).as("Table should contain expected row").isEqualTo(expected); checkDeleteCount(3L); } - @Test + @TestTemplate public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException { Schema dataSchema = table.schema().select("data"); Record dataDelete = GenericRecord.create(dataSchema); @@ -427,7 +433,7 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException { DeleteFile eqDeletes = FileHelpers.writeDeleteFile( table, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), dataDeletes, dataSchema); @@ -440,7 +446,10 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException { Pair posDeletes = FileHelpers.writeDeleteFile( - table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes); + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + deletes); table .newRowDelta() @@ -453,11 +462,11 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException { StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); - Assert.assertEquals("Table should contain expected row", expected, actual); + assertThat(actual).as("Table should contain expected row").isEqualTo(expected); checkDeleteCount(4L); } - @Test + @TestTemplate public void testFilterOnDeletedMetadataColumn() throws IOException { List> deletes = Lists.newArrayList( @@ -469,7 +478,10 @@ public void testFilterOnDeletedMetadataColumn() throws IOException { Pair posDeletes = FileHelpers.writeDeleteFile( - table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes); + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + deletes); table .newRowDelta() @@ -497,7 +509,7 @@ public void testFilterOnDeletedMetadataColumn() throws IOException { actual.add(rowWrapper.wrap(row)); }); - Assert.assertEquals("Table should contain 
expected row", expected, actual); + assertThat(actual).as("Table should contain expected row").isEqualTo(expected); StructLikeSet expectedDeleted = expectedRowSetWithDeletesOnly(29, 43, 61, 89); @@ -518,21 +530,21 @@ public void testFilterOnDeletedMetadataColumn() throws IOException { actualDeleted.add(rowWrapper.wrap(row)); }); - Assert.assertEquals("Table should contain expected row", expectedDeleted, actualDeleted); + assertThat(actualDeleted).as("Table should contain expected row").isEqualTo(expectedDeleted); } - @Test + @TestTemplate public void testIsDeletedColumnWithoutDeleteFile() { StructLikeSet expected = expectedRowSet(); StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); - Assert.assertEquals("Table should contain expected row", expected, actual); + assertThat(actual).as("Table should contain expected row").isEqualTo(expected); checkDeleteCount(0L); } - @Test + @TestTemplate public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOException { - Assume.assumeTrue(format.equals("parquet")); + assumeThat(format).isEqualTo("parquet"); String tblName = "test3"; Table tbl = createTable(tblName, SCHEMA, PartitionSpec.unpartitioned()); @@ -540,8 +552,8 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio List fileSplits = Lists.newArrayList(); StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA); Configuration conf = new Configuration(); - File testFile = temp.newFile(); - Assert.assertTrue("Delete should succeed", testFile.delete()); + File testFile = File.createTempFile("junit", null, temp.toFile()); + assertThat(testFile.delete()).as("Delete should succeed").isTrue(); Path testFilePath = new Path(testFile.getAbsolutePath()); // Write a Parquet file with more than one row group @@ -549,8 +561,8 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio new ParquetFileWriter(conf, ParquetSchemaUtil.convert(SCHEMA, "test3Schema"), testFilePath); parquetFileWriter.start(); for (int i = 0; i < 2; i += 1) { - File split = temp.newFile(); - Assert.assertTrue("Delete should succeed", split.delete()); + File split = File.createTempFile("junit", null, temp.toFile()); + assertThat(split.delete()).as("Delete should succeed").isTrue(); Path splitPath = new Path(split.getAbsolutePath()); fileSplits.add(splitPath); try (FileAppender writer = @@ -590,13 +602,14 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio Pair.of(dataFile.path(), 107L), Pair.of(dataFile.path(), 109L)); Pair posDeletes = - FileHelpers.writeDeleteFile(table, Files.localOutput(temp.newFile()), deletes); + FileHelpers.writeDeleteFile( + table, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), deletes); tbl.newRowDelta() .addDeletes(posDeletes.first()) .validateDataFilesExist(posDeletes.second()) .commit(); - Assert.assertEquals(193, rowSet(tblName, tbl, "*").size()); + assertThat(rowSet(tblName, tbl, "*")).hasSize(193); } private static final Schema PROJECTION_SCHEMA = diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index d5ea85b335ab..07f16e9de927 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -21,6 +21,8 @@ import static 
org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; import static org.apache.iceberg.spark.source.SparkSQLExecutionHelper.lastExecutedMetricValue; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; import java.io.File; import java.io.IOException; @@ -37,6 +39,9 @@ import org.apache.iceberg.DeleteFile; import org.apache.iceberg.Files; import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PlanningMode; import org.apache.iceberg.Schema; @@ -81,32 +86,28 @@ import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.StructType; import org.jetbrains.annotations.NotNull; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ParameterizedTestExtension.class) public class TestSparkReaderDeletes extends DeleteReadTests { private static TestHiveMetastore metastore = null; protected static SparkSession spark = null; protected static HiveCatalog catalog = null; - private final String format; - private final boolean vectorized; - private final PlanningMode planningMode; - - public TestSparkReaderDeletes(String format, boolean vectorized, PlanningMode planningMode) { - this.format = format; - this.vectorized = vectorized; - this.planningMode = planningMode; - } - @Parameterized.Parameters(name = "format = {0}, vectorized = {1}, planningMode = {2}") + @Parameter private String format; + + @Parameter(index = 1) + private boolean vectorized; + + @Parameter(index = 2) + private PlanningMode planningMode; + + @Parameters(name = "format = {0}, vectorized = {1}, planningMode = {2}") public static Object[][] parameters() { return new Object[][] { new Object[] {"parquet", false, PlanningMode.DISTRIBUTED}, @@ -116,7 +117,7 @@ public static Object[][] parameters() { }; } - @BeforeClass + @BeforeAll public static void startMetastoreAndSpark() { metastore = new TestHiveMetastore(); metastore.start(); @@ -143,7 +144,7 @@ public static void startMetastoreAndSpark() { } } - @AfterClass + @AfterAll public static void stopMetastoreAndSpark() throws Exception { catalog = null; metastore.stop(); @@ -152,7 +153,7 @@ public static void stopMetastoreAndSpark() throws Exception { spark = null; } - @After + @AfterEach @Override public void cleanup() throws IOException { super.cleanup(); @@ -227,7 +228,7 @@ public StructLikeSet rowSet(String name, Types.StructType projection, String... 
return set; } - @Test + @TestTemplate public void testEqualityDeleteWithFilter() throws IOException { String tableName = table.name().substring(table.name().lastIndexOf(".") + 1); Schema deleteRowSchema = table.schema().select("data"); @@ -242,7 +243,7 @@ public void testEqualityDeleteWithFilter() throws IOException { DeleteFile eqDeletes = FileHelpers.writeDeleteFile( table, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), dataDeletes, deleteRowSchema); @@ -266,10 +267,10 @@ public void testEqualityDeleteWithFilter() throws IOException { actual.add(rowWrapper.wrap(row)); }); - Assert.assertEquals("Table should contain no rows", 0, actual.size()); + assertThat(actual).as("Table should contain no rows").hasSize(0); } - @Test + @TestTemplate public void testReadEqualityDeleteRows() throws IOException { Schema deleteSchema1 = table.schema().select("data"); Record dataDelete = GenericRecord.create(deleteSchema1); @@ -290,7 +291,7 @@ public void testReadEqualityDeleteRows() throws IOException { DeleteFile eqDelete1 = FileHelpers.writeDeleteFile( table, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), dataDeletes, deleteSchema1); @@ -298,7 +299,7 @@ public void testReadEqualityDeleteRows() throws IOException { DeleteFile eqDelete2 = FileHelpers.writeDeleteFile( table, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), idDeletes, deleteSchema2); @@ -328,11 +329,11 @@ public void testReadEqualityDeleteRows() throws IOException { } } - Assert.assertEquals("should include 4 deleted row", 4, actualRowSet.size()); - Assert.assertEquals("deleted row should be matched", expectedRowSet, actualRowSet); + assertThat(actualRowSet).as("should include 4 deleted row").hasSize(4); + assertThat(actualRowSet).as("deleted row should be matched").isEqualTo(expectedRowSet); } - @Test + @TestTemplate public void testPosDeletesAllRowsInBatch() throws IOException { // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the first batch are all // deleted. @@ -346,7 +347,10 @@ public void testPosDeletesAllRowsInBatch() throws IOException { Pair posDeletes = FileHelpers.writeDeleteFile( - table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes); + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + deletes); table .newRowDelta() @@ -357,11 +361,11 @@ public void testPosDeletesAllRowsInBatch() throws IOException { StructLikeSet expected = rowSetWithoutIds(table, records, 29, 43, 61, 89); StructLikeSet actual = rowSet(tableName, table, "*"); - Assert.assertEquals("Table should contain expected rows", expected, actual); + assertThat(actual).as("Table should contain expected rows").isEqualTo(expected); checkDeleteCount(4L); } - @Test + @TestTemplate public void testPosDeletesWithDeletedColumn() throws IOException { // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the first batch are all // deleted. 
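The parameterization hunks in each TestSparkReaderDeletes copy apply the same pattern: the JUnit 4 Parameterized runner and constructor injection are replaced by Iceberg's ParameterizedTestExtension, field-injected @Parameter values, and @TestTemplate methods. A minimal sketch of that pattern, assuming only the annotations this patch already imports; the class and method names below are illustrative and not part of the patch:

import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PlanningMode;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

// Illustrative class name; the shape mirrors the TestSparkReaderDeletes changes in this patch.
@ExtendWith(ParameterizedTestExtension.class)
public class ExampleParameterizedTest {

  // Field injection replaces the JUnit 4 constructor; index 0 is the default.
  @Parameter private String format;

  @Parameter(index = 1)
  private boolean vectorized;

  @Parameter(index = 2)
  private PlanningMode planningMode;

  // The static parameters() method keeps its shape; the extension consumes it.
  @Parameters(name = "format = {0}, vectorized = {1}, planningMode = {2}")
  public static Object[][] parameters() {
    return new Object[][] {
      new Object[] {"parquet", false, PlanningMode.DISTRIBUTED},
      new Object[] {"parquet", true, PlanningMode.LOCAL}
    };
  }

  // @TestTemplate replaces @Test so the method runs once per parameter row.
  @TestTemplate
  public void exampleCase() {
    // body would use format, vectorized, and planningMode exactly as before
  }
}

The @Test to @TestTemplate replacement seen throughout the rest of each file is what ties the individual test methods to these parameter rows.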
@@ -375,7 +379,10 @@ public void testPosDeletesWithDeletedColumn() throws IOException { Pair posDeletes = FileHelpers.writeDeleteFile( - table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes); + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + deletes); table .newRowDelta() @@ -387,11 +394,11 @@ public void testPosDeletesWithDeletedColumn() throws IOException { StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); - Assert.assertEquals("Table should contain expected row", expected, actual); + assertThat(actual).as("Table should contain expected row").isEqualTo(expected); checkDeleteCount(4L); } - @Test + @TestTemplate public void testEqualityDeleteWithDeletedColumn() throws IOException { String tableName = table.name().substring(table.name().lastIndexOf(".") + 1); Schema deleteRowSchema = table.schema().select("data"); @@ -406,7 +413,7 @@ public void testEqualityDeleteWithDeletedColumn() throws IOException { DeleteFile eqDeletes = FileHelpers.writeDeleteFile( table, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), dataDeletes, deleteRowSchema); @@ -417,11 +424,11 @@ public void testEqualityDeleteWithDeletedColumn() throws IOException { StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); - Assert.assertEquals("Table should contain expected row", expected, actual); + assertThat(actual).as("Table should contain expected row").isEqualTo(expected); checkDeleteCount(3L); } - @Test + @TestTemplate public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException { Schema dataSchema = table.schema().select("data"); Record dataDelete = GenericRecord.create(dataSchema); @@ -435,7 +442,7 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException { DeleteFile eqDeletes = FileHelpers.writeDeleteFile( table, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), dataDeletes, dataSchema); @@ -448,7 +455,10 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException { Pair posDeletes = FileHelpers.writeDeleteFile( - table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes); + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + deletes); table .newRowDelta() @@ -461,11 +471,11 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException { StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); - Assert.assertEquals("Table should contain expected row", expected, actual); + assertThat(actual).as("Table should contain expected row").isEqualTo(expected); checkDeleteCount(4L); } - @Test + @TestTemplate public void testFilterOnDeletedMetadataColumn() throws IOException { List> deletes = Lists.newArrayList( @@ -477,7 +487,10 @@ public void testFilterOnDeletedMetadataColumn() throws IOException { Pair posDeletes = FileHelpers.writeDeleteFile( - table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes); + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + deletes); table .newRowDelta() @@ -505,7 +518,7 @@ public void testFilterOnDeletedMetadataColumn() throws IOException { actual.add(rowWrapper.wrap(row)); }); - Assert.assertEquals("Table should contain 
expected row", expected, actual); + assertThat(actual).as("Table should contain expected row").isEqualTo(expected); StructLikeSet expectedDeleted = expectedRowSetWithDeletesOnly(29, 43, 61, 89); @@ -526,21 +539,21 @@ public void testFilterOnDeletedMetadataColumn() throws IOException { actualDeleted.add(rowWrapper.wrap(row)); }); - Assert.assertEquals("Table should contain expected row", expectedDeleted, actualDeleted); + assertThat(actualDeleted).as("Table should contain expected row").isEqualTo(expectedDeleted); } - @Test + @TestTemplate public void testIsDeletedColumnWithoutDeleteFile() { StructLikeSet expected = expectedRowSet(); StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); - Assert.assertEquals("Table should contain expected row", expected, actual); + assertThat(actual).as("Table should contain expected row").isEqualTo(expected); checkDeleteCount(0L); } - @Test + @TestTemplate public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOException { - Assume.assumeTrue(format.equals("parquet")); + assumeThat(format).isEqualTo("parquet"); String tblName = "test3"; Table tbl = createTable(tblName, SCHEMA, PartitionSpec.unpartitioned()); @@ -548,8 +561,8 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio List fileSplits = Lists.newArrayList(); StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA); Configuration conf = new Configuration(); - File testFile = temp.newFile(); - Assert.assertTrue("Delete should succeed", testFile.delete()); + File testFile = File.createTempFile("junit", null, temp.toFile()); + assertThat(testFile.delete()).as("Delete should succeed").isTrue(); Path testFilePath = new Path(testFile.getAbsolutePath()); // Write a Parquet file with more than one row group @@ -557,8 +570,8 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio new ParquetFileWriter(conf, ParquetSchemaUtil.convert(SCHEMA, "test3Schema"), testFilePath); parquetFileWriter.start(); for (int i = 0; i < 2; i += 1) { - File split = temp.newFile(); - Assert.assertTrue("Delete should succeed", split.delete()); + File split = File.createTempFile("junit", null, temp.toFile()); + assertThat(split.delete()).as("Delete should succeed").isTrue(); Path splitPath = new Path(split.getAbsolutePath()); fileSplits.add(splitPath); try (FileAppender writer = @@ -598,13 +611,14 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio Pair.of(dataFile.path(), 107L), Pair.of(dataFile.path(), 109L)); Pair posDeletes = - FileHelpers.writeDeleteFile(table, Files.localOutput(temp.newFile()), deletes); + FileHelpers.writeDeleteFile( + table, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), deletes); tbl.newRowDelta() .addDeletes(posDeletes.first()) .validateDataFilesExist(posDeletes.second()) .commit(); - Assert.assertEquals(193, rowSet(tblName, tbl, "*").size()); + assertThat(rowSet(tblName, tbl, "*")).hasSize(193); } private static final Schema PROJECTION_SCHEMA = diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index d5ea85b335ab..07f16e9de927 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -21,6 +21,8 @@ import static 
org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; import static org.apache.iceberg.spark.source.SparkSQLExecutionHelper.lastExecutedMetricValue; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; import java.io.File; import java.io.IOException; @@ -37,6 +39,9 @@ import org.apache.iceberg.DeleteFile; import org.apache.iceberg.Files; import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PlanningMode; import org.apache.iceberg.Schema; @@ -81,32 +86,28 @@ import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.StructType; import org.jetbrains.annotations.NotNull; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ParameterizedTestExtension.class) public class TestSparkReaderDeletes extends DeleteReadTests { private static TestHiveMetastore metastore = null; protected static SparkSession spark = null; protected static HiveCatalog catalog = null; - private final String format; - private final boolean vectorized; - private final PlanningMode planningMode; - - public TestSparkReaderDeletes(String format, boolean vectorized, PlanningMode planningMode) { - this.format = format; - this.vectorized = vectorized; - this.planningMode = planningMode; - } - @Parameterized.Parameters(name = "format = {0}, vectorized = {1}, planningMode = {2}") + @Parameter private String format; + + @Parameter(index = 1) + private boolean vectorized; + + @Parameter(index = 2) + private PlanningMode planningMode; + + @Parameters(name = "format = {0}, vectorized = {1}, planningMode = {2}") public static Object[][] parameters() { return new Object[][] { new Object[] {"parquet", false, PlanningMode.DISTRIBUTED}, @@ -116,7 +117,7 @@ public static Object[][] parameters() { }; } - @BeforeClass + @BeforeAll public static void startMetastoreAndSpark() { metastore = new TestHiveMetastore(); metastore.start(); @@ -143,7 +144,7 @@ public static void startMetastoreAndSpark() { } } - @AfterClass + @AfterAll public static void stopMetastoreAndSpark() throws Exception { catalog = null; metastore.stop(); @@ -152,7 +153,7 @@ public static void stopMetastoreAndSpark() throws Exception { spark = null; } - @After + @AfterEach @Override public void cleanup() throws IOException { super.cleanup(); @@ -227,7 +228,7 @@ public StructLikeSet rowSet(String name, Types.StructType projection, String... 
return set; } - @Test + @TestTemplate public void testEqualityDeleteWithFilter() throws IOException { String tableName = table.name().substring(table.name().lastIndexOf(".") + 1); Schema deleteRowSchema = table.schema().select("data"); @@ -242,7 +243,7 @@ public void testEqualityDeleteWithFilter() throws IOException { DeleteFile eqDeletes = FileHelpers.writeDeleteFile( table, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), dataDeletes, deleteRowSchema); @@ -266,10 +267,10 @@ public void testEqualityDeleteWithFilter() throws IOException { actual.add(rowWrapper.wrap(row)); }); - Assert.assertEquals("Table should contain no rows", 0, actual.size()); + assertThat(actual).as("Table should contain no rows").hasSize(0); } - @Test + @TestTemplate public void testReadEqualityDeleteRows() throws IOException { Schema deleteSchema1 = table.schema().select("data"); Record dataDelete = GenericRecord.create(deleteSchema1); @@ -290,7 +291,7 @@ public void testReadEqualityDeleteRows() throws IOException { DeleteFile eqDelete1 = FileHelpers.writeDeleteFile( table, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), dataDeletes, deleteSchema1); @@ -298,7 +299,7 @@ public void testReadEqualityDeleteRows() throws IOException { DeleteFile eqDelete2 = FileHelpers.writeDeleteFile( table, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), idDeletes, deleteSchema2); @@ -328,11 +329,11 @@ public void testReadEqualityDeleteRows() throws IOException { } } - Assert.assertEquals("should include 4 deleted row", 4, actualRowSet.size()); - Assert.assertEquals("deleted row should be matched", expectedRowSet, actualRowSet); + assertThat(actualRowSet).as("should include 4 deleted row").hasSize(4); + assertThat(actualRowSet).as("deleted row should be matched").isEqualTo(expectedRowSet); } - @Test + @TestTemplate public void testPosDeletesAllRowsInBatch() throws IOException { // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the first batch are all // deleted. @@ -346,7 +347,10 @@ public void testPosDeletesAllRowsInBatch() throws IOException { Pair posDeletes = FileHelpers.writeDeleteFile( - table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes); + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + deletes); table .newRowDelta() @@ -357,11 +361,11 @@ public void testPosDeletesAllRowsInBatch() throws IOException { StructLikeSet expected = rowSetWithoutIds(table, records, 29, 43, 61, 89); StructLikeSet actual = rowSet(tableName, table, "*"); - Assert.assertEquals("Table should contain expected rows", expected, actual); + assertThat(actual).as("Table should contain expected rows").isEqualTo(expected); checkDeleteCount(4L); } - @Test + @TestTemplate public void testPosDeletesWithDeletedColumn() throws IOException { // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the first batch are all // deleted. 
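The temp-file and assertion hunks repeat another fixed mapping: the TemporaryFolder rule gives way to a JUnit 5 @TempDir Path, temp.newFile() becomes File.createTempFile("junit", null, temp.toFile()), Assert calls become AssertJ assertThat with the message moved into as(), and Assume.assumeTrue becomes assumeThat. A small sketch of those replacements, with an illustrative class and test name; plain @Test is used here only because the sketch is not parameterized, whereas the migrated classes use @TestTemplate:

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

// Illustrative class name; the calls mirror the replacements made throughout this patch.
public class ExampleTempDirAndAssertJTest {

  // JUnit 5 injects a fresh temporary directory; replaces @Rule TemporaryFolder temp.
  @TempDir protected Path temp;

  @Test
  public void exampleTempFileUsage() throws IOException {
    String format = "parquet"; // stands in for the injected @Parameter value

    // Was Assume.assumeTrue(format.equals("parquet")); skips the test when it does not hold.
    assumeThat(format).isEqualTo("parquet");

    // Was temp.newFile(); with @TempDir there is no rule, so create the file directly.
    File testFile = File.createTempFile("junit", null, temp.toFile());

    // Was Assert.assertTrue("Delete should succeed", testFile.delete()).
    assertThat(testFile.delete()).as("Delete should succeed").isTrue();
  }
}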
@@ -375,7 +379,10 @@ public void testPosDeletesWithDeletedColumn() throws IOException { Pair posDeletes = FileHelpers.writeDeleteFile( - table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes); + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + deletes); table .newRowDelta() @@ -387,11 +394,11 @@ public void testPosDeletesWithDeletedColumn() throws IOException { StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); - Assert.assertEquals("Table should contain expected row", expected, actual); + assertThat(actual).as("Table should contain expected row").isEqualTo(expected); checkDeleteCount(4L); } - @Test + @TestTemplate public void testEqualityDeleteWithDeletedColumn() throws IOException { String tableName = table.name().substring(table.name().lastIndexOf(".") + 1); Schema deleteRowSchema = table.schema().select("data"); @@ -406,7 +413,7 @@ public void testEqualityDeleteWithDeletedColumn() throws IOException { DeleteFile eqDeletes = FileHelpers.writeDeleteFile( table, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), dataDeletes, deleteRowSchema); @@ -417,11 +424,11 @@ public void testEqualityDeleteWithDeletedColumn() throws IOException { StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); - Assert.assertEquals("Table should contain expected row", expected, actual); + assertThat(actual).as("Table should contain expected row").isEqualTo(expected); checkDeleteCount(3L); } - @Test + @TestTemplate public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException { Schema dataSchema = table.schema().select("data"); Record dataDelete = GenericRecord.create(dataSchema); @@ -435,7 +442,7 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException { DeleteFile eqDeletes = FileHelpers.writeDeleteFile( table, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), dataDeletes, dataSchema); @@ -448,7 +455,10 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException { Pair posDeletes = FileHelpers.writeDeleteFile( - table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes); + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + deletes); table .newRowDelta() @@ -461,11 +471,11 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException { StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); - Assert.assertEquals("Table should contain expected row", expected, actual); + assertThat(actual).as("Table should contain expected row").isEqualTo(expected); checkDeleteCount(4L); } - @Test + @TestTemplate public void testFilterOnDeletedMetadataColumn() throws IOException { List> deletes = Lists.newArrayList( @@ -477,7 +487,10 @@ public void testFilterOnDeletedMetadataColumn() throws IOException { Pair posDeletes = FileHelpers.writeDeleteFile( - table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes); + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + deletes); table .newRowDelta() @@ -505,7 +518,7 @@ public void testFilterOnDeletedMetadataColumn() throws IOException { actual.add(rowWrapper.wrap(row)); }); - Assert.assertEquals("Table should contain 
expected row", expected, actual); + assertThat(actual).as("Table should contain expected row").isEqualTo(expected); StructLikeSet expectedDeleted = expectedRowSetWithDeletesOnly(29, 43, 61, 89); @@ -526,21 +539,21 @@ public void testFilterOnDeletedMetadataColumn() throws IOException { actualDeleted.add(rowWrapper.wrap(row)); }); - Assert.assertEquals("Table should contain expected row", expectedDeleted, actualDeleted); + assertThat(actualDeleted).as("Table should contain expected row").isEqualTo(expectedDeleted); } - @Test + @TestTemplate public void testIsDeletedColumnWithoutDeleteFile() { StructLikeSet expected = expectedRowSet(); StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); - Assert.assertEquals("Table should contain expected row", expected, actual); + assertThat(actual).as("Table should contain expected row").isEqualTo(expected); checkDeleteCount(0L); } - @Test + @TestTemplate public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOException { - Assume.assumeTrue(format.equals("parquet")); + assumeThat(format).isEqualTo("parquet"); String tblName = "test3"; Table tbl = createTable(tblName, SCHEMA, PartitionSpec.unpartitioned()); @@ -548,8 +561,8 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio List fileSplits = Lists.newArrayList(); StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA); Configuration conf = new Configuration(); - File testFile = temp.newFile(); - Assert.assertTrue("Delete should succeed", testFile.delete()); + File testFile = File.createTempFile("junit", null, temp.toFile()); + assertThat(testFile.delete()).as("Delete should succeed").isTrue(); Path testFilePath = new Path(testFile.getAbsolutePath()); // Write a Parquet file with more than one row group @@ -557,8 +570,8 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio new ParquetFileWriter(conf, ParquetSchemaUtil.convert(SCHEMA, "test3Schema"), testFilePath); parquetFileWriter.start(); for (int i = 0; i < 2; i += 1) { - File split = temp.newFile(); - Assert.assertTrue("Delete should succeed", split.delete()); + File split = File.createTempFile("junit", null, temp.toFile()); + assertThat(split.delete()).as("Delete should succeed").isTrue(); Path splitPath = new Path(split.getAbsolutePath()); fileSplits.add(splitPath); try (FileAppender writer = @@ -598,13 +611,14 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio Pair.of(dataFile.path(), 107L), Pair.of(dataFile.path(), 109L)); Pair posDeletes = - FileHelpers.writeDeleteFile(table, Files.localOutput(temp.newFile()), deletes); + FileHelpers.writeDeleteFile( + table, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), deletes); tbl.newRowDelta() .addDeletes(posDeletes.first()) .validateDataFilesExist(posDeletes.second()) .commit(); - Assert.assertEquals(193, rowSet(tblName, tbl, "*").size()); + assertThat(rowSet(tblName, tbl, "*")).hasSize(193); } private static final Schema PROJECTION_SCHEMA =