Skip to content

Commit

Permalink
Spark 3:5 Migrate tests to JUnit5 in source directory (apache#9342)
Browse files Browse the repository at this point in the history
  • Loading branch information
chinmay-bhat authored and lisirrx committed Jan 4, 2024
1 parent 723e4f1 commit 9e9b62d
Show file tree
Hide file tree
Showing 24 changed files with 833 additions and 756 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeWrapper;
import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.api.Test;

public abstract class RecordWrapperTest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,40 @@
*/
package org.apache.iceberg.spark;

import java.nio.file.Path;
import java.util.Map;
import java.util.stream.Stream;
import org.junit.jupiter.params.provider.Arguments;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(ParameterizedTestExtension.class)
public abstract class CatalogTestBase extends TestBaseWithCatalog {

// these parameters are broken out to avoid changes that need to modify lots of test suites
public static Stream<Arguments> parameters() {
return Stream.of(
Arguments.of(
SparkCatalogConfig.HIVE.catalogName(),
SparkCatalogConfig.HIVE.implementation(),
SparkCatalogConfig.HIVE.properties()),
Arguments.of(
SparkCatalogConfig.HADOOP.catalogName(),
SparkCatalogConfig.HADOOP.implementation(),
SparkCatalogConfig.HADOOP.properties()),
Arguments.of(
SparkCatalogConfig.SPARK.catalogName(),
SparkCatalogConfig.SPARK.implementation(),
SparkCatalogConfig.SPARK.properties()));
@Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
public static Object[][] parameters() {
return new Object[][] {
{
SparkCatalogConfig.HIVE.catalogName(),
SparkCatalogConfig.HIVE.implementation(),
SparkCatalogConfig.HIVE.properties()
},
{
SparkCatalogConfig.HADOOP.catalogName(),
SparkCatalogConfig.HADOOP.implementation(),
SparkCatalogConfig.HADOOP.properties()
},
{
SparkCatalogConfig.SPARK.catalogName(),
SparkCatalogConfig.SPARK.implementation(),
SparkCatalogConfig.SPARK.properties()
}
};
}

@TempDir protected Path temp;

public CatalogTestBase(SparkCatalogConfig config) {
super(config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

import static org.apache.iceberg.FileFormat.PARQUET;
import static org.apache.iceberg.Files.localOutput;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -47,14 +49,12 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.data.RandomData;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TestBaseReader {

@Rule public TemporaryFolder temp = new TemporaryFolder();
@TempDir private Path temp;

private Table table;

Expand Down Expand Up @@ -129,44 +129,47 @@ public void testClosureOnDataExhaustion() throws IOException {
int countRecords = 0;
while (reader.next()) {
countRecords += 1;
Assert.assertNotNull("Reader should return non-null value", reader.get());
assertThat(reader.get()).as("Reader should return non-null value").isNotNull();
}

Assert.assertEquals(
"Reader returned incorrect number of records", totalTasks * recordPerTask, countRecords);
assertThat(totalTasks * recordPerTask)
.as("Reader returned incorrect number of records")
.isEqualTo(countRecords);
tasks.forEach(
t ->
Assert.assertTrue(
"All iterators should be closed after read exhausion", reader.isIteratorClosed(t)));
assertThat(reader.isIteratorClosed(t))
.as("All iterators should be closed after read exhausion")
.isTrue());
}

@Test
public void testClosureDuringIteration() throws IOException {
Integer totalTasks = 2;
Integer recordPerTask = 1;
List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
Assert.assertEquals(2, tasks.size());
assertThat(tasks).hasSize(2);
FileScanTask firstTask = tasks.get(0);
FileScanTask secondTask = tasks.get(1);

ClosureTrackingReader reader = new ClosureTrackingReader(table, tasks);

// Total of 2 elements
Assert.assertTrue(reader.next());
Assert.assertFalse(
"First iter should not be closed on its last element", reader.isIteratorClosed(firstTask));

Assert.assertTrue(reader.next());
Assert.assertTrue(
"First iter should be closed after moving to second iter",
reader.isIteratorClosed(firstTask));
Assert.assertFalse(
"Second iter should not be closed on its last element",
reader.isIteratorClosed(secondTask));

Assert.assertFalse(reader.next());
Assert.assertTrue(reader.isIteratorClosed(firstTask));
Assert.assertTrue(reader.isIteratorClosed(secondTask));
assertThat(reader.next()).isTrue();
assertThat(reader.isIteratorClosed(firstTask))
.as("First iter should not be closed on its last element")
.isFalse();

assertThat(reader.next()).isTrue();
assertThat(reader.isIteratorClosed(firstTask))
.as("First iter should be closed after moving to second iter")
.isTrue();
assertThat(reader.isIteratorClosed(secondTask))
.as("Second iter should not be closed on its last element")
.isFalse();

assertThat(reader.next()).isFalse();
assertThat(reader.isIteratorClosed(firstTask)).isTrue();
assertThat(reader.isIteratorClosed(secondTask)).isTrue();
}

@Test
Expand All @@ -181,8 +184,9 @@ public void testClosureWithoutAnyRead() throws IOException {

tasks.forEach(
t ->
Assert.assertFalse(
"Iterator should not be created eagerly for tasks", reader.hasIterator(t)));
assertThat(reader.hasIterator(t))
.as("Iterator should not be created eagerly for tasks")
.isFalse());
}

@Test
Expand All @@ -195,8 +199,8 @@ public void testExplicitClosure() throws IOException {

Integer halfDataSize = (totalTasks * recordPerTask) / 2;
for (int i = 0; i < halfDataSize; i++) {
Assert.assertTrue("Reader should have some element", reader.next());
Assert.assertNotNull("Reader should return non-null value", reader.get());
assertThat(reader.next()).as("Reader should have some element").isTrue();
assertThat(reader.get()).as("Reader should return non-null value").isNotNull();
}

reader.close();
Expand All @@ -206,8 +210,9 @@ public void testExplicitClosure() throws IOException {
tasks.forEach(
t -> {
if (reader.hasIterator(t)) {
Assert.assertTrue(
"Iterator should be closed after read exhausion", reader.isIteratorClosed(t));
assertThat(reader.isIteratorClosed(t))
.as("Iterator should be closed after read exhausion")
.isTrue();
}
});
}
Expand All @@ -222,31 +227,32 @@ public void testIdempotentExplicitClosure() throws IOException {

// Total 100 elements, only 5 iterators have been created
for (int i = 0; i < 45; i++) {
Assert.assertTrue("eader should have some element", reader.next());
Assert.assertNotNull("Reader should return non-null value", reader.get());
assertThat(reader.next()).as("Reader should have some element").isTrue();
assertThat(reader.get()).as("Reader should return non-null value").isNotNull();
}

for (int closeAttempt = 0; closeAttempt < 5; closeAttempt++) {
reader.close();
for (int i = 0; i < 5; i++) {
Assert.assertTrue(
"Iterator should be closed after read exhausion",
reader.isIteratorClosed(tasks.get(i)));
assertThat(reader.isIteratorClosed(tasks.get(i)))
.as("Iterator should be closed after read exhausion")
.isTrue();
}
for (int i = 5; i < 10; i++) {
Assert.assertFalse(
"Iterator should not be created eagerly for tasks", reader.hasIterator(tasks.get(i)));
assertThat(reader.hasIterator(tasks.get(i)))
.as("Iterator should not be created eagerly for tasks")
.isFalse();
}
}
}

private List<FileScanTask> createFileScanTasks(Integer totalTasks, Integer recordPerTask)
throws IOException {
String desc = "make_scan_tasks";
File parent = temp.newFolder(desc);
File parent = temp.resolve(desc).toFile();
File location = new File(parent, "test");
File dataFolder = new File(location, "data");
Assert.assertTrue("mkdirs should succeed", dataFolder.mkdirs());
assertThat(dataFolder.mkdirs()).as("mkdirs should succeed").isTrue();

Schema schema = new Schema(Types.NestedField.required(0, "id", Types.LongType.get()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iceberg.ChangelogOperation;
Expand All @@ -41,17 +44,15 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.spark.TestBase;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TestChangelogReader extends SparkTestBase {
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TestChangelogReader extends TestBase {
private static final Schema SCHEMA =
new Schema(
required(1, "id", Types.IntegerType.get()), optional(2, "data", Types.StringType.get()));
Expand All @@ -64,9 +65,9 @@ public class TestChangelogReader extends SparkTestBase {
private DataFile dataFile1;
private DataFile dataFile2;

@Rule public TemporaryFolder temp = new TemporaryFolder();
@TempDir private Path temp;

@Before
@BeforeEach
public void before() throws IOException {
table = catalog.createTable(TableIdentifier.of("default", "test"), SCHEMA, SPEC);
// create some data
Expand All @@ -85,7 +86,7 @@ public void before() throws IOException {
dataFile2 = writeDataFile(records2);
}

@After
@AfterEach
public void after() {
catalog.dropTable(TableIdentifier.of("default", "test"));
}
Expand Down Expand Up @@ -176,7 +177,7 @@ public void testDataFileRewrite() throws IOException {
reader.close();
}

Assert.assertEquals("Should have no rows", 0, rows.size());
assertThat(rows).as("Should have no rows").hasSize(0);
}

@Test
Expand Down Expand Up @@ -254,6 +255,9 @@ private Object[] toJava(InternalRow row) {
private DataFile writeDataFile(List<Record> records) throws IOException {
// records all use IDs that are in bucket id_bucket=0
return FileHelpers.writeDataFile(
table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), records);
table,
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
TestHelpers.Row.of(0),
records);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
*/
package org.apache.iceberg.spark.source;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.List;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
import org.apache.iceberg.spark.TestBaseWithCatalog;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
Expand All @@ -32,19 +35,17 @@
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.parser.ParseException;
import org.apache.spark.sql.internal.SQLConf;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TestDataFrameWriterV2 extends SparkTestBaseWithCatalog {
@Before
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class TestDataFrameWriterV2 extends TestBaseWithCatalog {
@BeforeEach
public void createTable() {
sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
}

@After
@AfterEach
public void removeTables() {
sql("DROP TABLE IF EXISTS %s", tableName);
}
Expand Down Expand Up @@ -76,7 +77,7 @@ public void testMergeSchemaFailsWithoutWriterOption() throws Exception {

// this has a different error message than the case without accept-any-schema because it uses
// Iceberg checks
Assertions.assertThatThrownBy(() -> threeColDF.writeTo(tableName).append())
assertThatThrownBy(() -> threeColDF.writeTo(tableName).append())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Field new_col not found in source schema");
}
Expand All @@ -102,8 +103,7 @@ public void testMergeSchemaWithoutAcceptAnySchema() throws Exception {
"{ \"id\": 3, \"data\": \"c\", \"new_col\": 12.06 }",
"{ \"id\": 4, \"data\": \"d\", \"new_col\": 14.41 }");

Assertions.assertThatThrownBy(
() -> threeColDF.writeTo(tableName).option("merge-schema", "true").append())
assertThatThrownBy(() -> threeColDF.writeTo(tableName).option("merge-schema", "true").append())
.isInstanceOf(AnalysisException.class)
.hasMessageContaining(
"Cannot write to `testhadoop`.`default`.`table`, the reason is too many data columns");
Expand Down Expand Up @@ -201,12 +201,12 @@ public void testWriteWithCaseSensitiveOption() throws NoSuchTableException, Pars
List<Types.NestedField> fields =
Spark3Util.loadIcebergTable(sparkSession, tableName).schema().asStruct().fields();
// Additional columns should not be created
Assert.assertEquals(2, fields.size());
assertThat(fields).hasSize(2);

// enable spark.sql.caseSensitive
sparkSession.sql(String.format("SET %s=true", SQLConf.CASE_SENSITIVE().key()));
ds.writeTo(tableName).option("merge-schema", "true").option("check-ordering", "false").append();
fields = Spark3Util.loadIcebergTable(sparkSession, tableName).schema().asStruct().fields();
Assert.assertEquals(4, fields.size());
assertThat(fields).hasSize(4);
}
}
Loading

0 comments on commit 9e9b62d

Please sign in to comment.