Flink v1.16: Switch Flink v1.16 tests to Junit5 (Part 1) #9565

Closed · wants to merge 15 commits

4 changes: 4 additions & 0 deletions flink/v1.18/build.gradle
@@ -22,6 +22,10 @@ String scalaVersion = System.getProperty("scalaVersion") != null ? System.getPro

project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {

test {
Contributor comment: this is already set in L127, so it's not needed here

useJUnitPlatform()
}

dependencies {
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
api project(':iceberg-api')
@@ -20,6 +20,7 @@

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -29,6 +30,7 @@
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Path;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
@@ -48,11 +50,8 @@
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TestManifestFileSerialization {

@@ -104,7 +103,7 @@ public class TestManifestFileSerialization {

private static final FileIO FILE_IO = new HadoopFileIO(new Configuration());

@Rule public TemporaryFolder temp = new TemporaryFolder();
@TempDir private Path temp;

@Test
public void testKryoSerialization() throws IOException {
@@ -145,15 +144,15 @@ public void testJavaSerialization() throws Exception {
new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
for (int i = 0; i < 3; i += 1) {
Object obj = in.readObject();
Assertions.assertThat(obj).as("Should be a ManifestFile").isInstanceOf(ManifestFile.class);
assertThat(obj).as("Should be a ManifestFile").isInstanceOf(ManifestFile.class);
TestHelpers.assertEquals(manifest, (ManifestFile) obj);
}
}
}

private ManifestFile writeManifest(DataFile... files) throws IOException {
File manifestFile = temp.newFile("input.m0.avro");
Assert.assertTrue(manifestFile.delete());
File manifestFile = temp.resolve("input.m0.avro").toFile();
assertThat(manifestFile).doesNotExist();
OutputFile outputFile = FILE_IO.newOutputFile(manifestFile.getCanonicalPath());

ManifestWriter<DataFile> writer = ManifestFiles.write(SPEC, outputFile);
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.flink;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Iterator;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.RecordWrapperTest;
@@ -28,8 +30,6 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.data.RandomRowData;
import org.apache.iceberg.util.StructLikeWrapper;
import org.assertj.core.api.Assertions;
import org.junit.Assert;

public class TestRowDataWrapper extends RecordWrapperTest {

@@ -49,12 +49,12 @@ public void testTime() {
return;
}

Assertions.assertThat(actual).isNotNull();
Assertions.assertThat(expected).isNotNull();
assertThat(actual).isNotNull();
assertThat(expected).isNotNull();

int expectedMilliseconds = (int) ((long) expected / 1000_000);
int actualMilliseconds = (int) ((long) actual / 1000_000);
Assert.assertEquals(message, expectedMilliseconds, actualMilliseconds);
assertThat(expectedMilliseconds).as(message).isEqualTo(actualMilliseconds);
Contributor comment: the actual/expected is the wrong way around. It should be assertThat(actualMilliseconds).isEqualTo(expectedMilliseconds)

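A minimal sketch of the corrected ordering (hypothetical values, not code from this PR): with AssertJ the value under test goes into assertThat() and the expectation into isEqualTo(), so a failing check reports expected and actual the right way around.

import static org.assertj.core.api.Assertions.assertThat;

public class ArgumentOrderSketch {
  public static void main(String[] args) {
    // Hypothetical values, only to illustrate the failure message.
    int expectedMilliseconds = 42;
    int actualMilliseconds = 43;

    // Actual value in assertThat(), expected value in isEqualTo();
    // a failure then reads roughly "expected: 42 but was: 43" instead of the reverse.
    assertThat(actualMilliseconds)
        .as("time in milliseconds")
        .isEqualTo(expectedMilliseconds);
  }
}
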
}
});
}
@@ -75,8 +75,8 @@ protected void generateAndValidate(Schema schema, RecordWrapperTest.AssertMethod
StructLikeWrapper actualWrapper = StructLikeWrapper.forType(schema.asStruct());
StructLikeWrapper expectedWrapper = StructLikeWrapper.forType(schema.asStruct());
for (int i = 0; i < numRecords; i++) {
Assert.assertTrue("Should have more records", actual.hasNext());
Assert.assertTrue("Should have more RowData", expected.hasNext());
assertThat(actual.hasNext()).as("Should have more records").isTrue();
Contributor comment:

Suggested change (replace the first line with the second):
assertThat(actual.hasNext()).as("Should have more records").isTrue();
assertThat(actual).as("Should have more records").hasNext();

The same applies to the other places that use a similar pattern. We want to avoid using isTrue/isFalse as much as possible, because we want the rich context from AssertJ whenever an assertion fails.

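A small sketch of the difference, using a hypothetical empty Iterator rather than the one from the test, to show why the iterator-specific assertion fails with richer context:

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Collections;
import java.util.Iterator;

public class IteratorAssertSketch {
  public static void main(String[] args) {
    // Hypothetical empty iterator so the assertion fails and prints its message.
    Iterator<String> actual = Collections.<String>emptyList().iterator();

    // Boolean style: a failure only says the value should have been true.
    // assertThat(actual.hasNext()).as("Should have more records").isTrue();

    // Iterator style: a failure describes the iterator itself (no more elements),
    // which is the richer AssertJ context the reviewer is asking for.
    assertThat(actual).as("Should have more records").hasNext();
  }
}
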
assertThat(expected.hasNext()).as("Should have more RowData").isTrue();

StructLike recordStructLike = recordWrapper.wrap(actual.next());
StructLike rowDataStructLike = rowDataWrapper.wrap(expected.next());
@@ -87,7 +87,7 @@ protected void generateAndValidate(Schema schema, RecordWrapperTest.AssertMethod
expectedWrapper.set(rowDataStructLike));
}

Assert.assertFalse("Shouldn't have more record", actual.hasNext());
Assert.assertFalse("Shouldn't have more RowData", expected.hasNext());
assertThat(actual.hasNext()).as("Shouldn't have more record").isFalse();
assertThat(expected.hasNext()).as("Shouldn't have more RowData").isFalse();
}
}
@@ -21,9 +21,11 @@
import static org.apache.iceberg.flink.TestHelpers.roundTripKryoSerialize;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.MetadataTableType;
@@ -39,11 +41,9 @@
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TestTableSerialization {
private static final HadoopTables TABLES = new HadoopTables();
@@ -60,15 +60,15 @@ public class TestTableSerialization {

private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA).asc("id").build();

@Rule public TemporaryFolder temp = new TemporaryFolder();
@TempDir private Path temp;
private Table table;

@Before
@BeforeEach
public void initTable() throws IOException {
Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");

File tableLocation = temp.newFolder();
Assert.assertTrue(tableLocation.delete());
File tableLocation = temp.resolve("table").toFile();
assertThat(tableLocation).doesNotExist();

this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString());
}
@@ -22,7 +22,7 @@
import org.apache.iceberg.flink.DataGenerator;
import org.apache.iceberg.flink.DataGenerators;
import org.apache.iceberg.flink.TestHelpers;
import org.junit.Test;
import org.junit.jupiter.api.Test;

public class TestStructRowData {
Contributor comment: why not also include the other files from the data package in this PR?


@@ -18,10 +18,11 @@
*/
package org.apache.iceberg.flink.sink;
Contributor comment: I would have expected to include other test files from the sink package here as well, but just picking 2 files seems quite random to me. I'd suggest focusing on one or more packages per PR (depending on how big the changes are) instead of randomly picking files across the codebase.


import static org.junit.jupiter.api.Assertions.assertEquals;
Contributor comment: wrong import. We should be using AssertJ assertions instead of the JUnit ones


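A hedged sketch of that change with stand-in values (not the RowData objects from this test): the JUnit Jupiter import above would be replaced by the AssertJ one and the call rewritten in the assertThat style.

import static org.assertj.core.api.Assertions.assertThat;

public class AssertJEqualsSketch {
  public static void main(String[] args) {
    // Hypothetical stand-ins for the rows produced by the data generator and the mapper.
    Object expected = "row(1, a)";
    Object actual = "row(1, a)";

    // AssertJ equivalent of assertEquals(expected, actual):
    // the actual value goes into assertThat(), the expected value into isEqualTo().
    assertThat(actual).isEqualTo(expected);
  }
}
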
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.AvroGenericRecordConverterBase;
import org.apache.iceberg.flink.DataGenerator;
import org.junit.Assert;

public class TestAvroGenericRecordToRowDataMapper extends AvroGenericRecordConverterBase {
@Override
@@ -32,6 +33,6 @@ protected void testConverter(DataGenerator dataGenerator) throws Exception {
AvroGenericRecordToRowDataMapper.forAvroSchema(dataGenerator.avroSchema());
RowData expected = dataGenerator.generateFlinkRowData();
RowData actual = mapper.map(dataGenerator.generateAvroGenericRecord());
Assert.assertEquals(expected, actual);
assertEquals(expected, actual);
}
}
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.flink.sink;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.table.data.GenericRowData;
@@ -35,8 +37,7 @@
import org.apache.iceberg.flink.data.RandomRowData;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.api.Test;

public class TestRowDataPartitionKey {
private static final Schema SCHEMA =
@@ -91,10 +92,10 @@ public void testNullPartitionValue() {
for (RowData row : rows) {
PartitionKey partitionKey = new PartitionKey(spec, schema);
partitionKey.partition(rowWrapper.wrap(row));
Assert.assertEquals(partitionKey.size(), 1);
assertThat(partitionKey.size()).isEqualTo(1);

String expectedStr = row.isNullAt(1) ? null : row.getString(1).toString();
Assert.assertEquals(expectedStr, partitionKey.get(0, String.class));
assertThat(expectedStr).isEqualTo(partitionKey.get(0, String.class));
Contributor comment: the ordering of actual/expected needs to be fixed here

}
}

@@ -116,15 +117,15 @@ public void testPartitionWithOneNestedField() {

PartitionKey partitionKey1 = new PartitionKey(spec1, NESTED_SCHEMA);
partitionKey1.partition(rowWrapper.wrap(row));
Assert.assertEquals(partitionKey1.size(), 1);
assertThat(partitionKey1.size()).isEqualTo(1);

Assert.assertEquals(record.get(0), partitionKey1.get(0, String.class));
assertThat(record.get(0)).isEqualTo(partitionKey1.get(0, String.class));

PartitionKey partitionKey2 = new PartitionKey(spec2, NESTED_SCHEMA);
partitionKey2.partition(rowWrapper.wrap(row));
Assert.assertEquals(partitionKey2.size(), 1);
assertThat(partitionKey2.size()).isEqualTo(1);

Assert.assertEquals(record.get(1), partitionKey2.get(0, Integer.class));
assertThat(record.get(1)).isEqualTo(partitionKey2.get(0, Integer.class));
}
}

@@ -154,16 +155,16 @@ public void testPartitionMultipleNestedField() {
Record record = (Record) records.get(i).get(0);

pk1.partition(rowWrapper.wrap(row));
Assert.assertEquals(2, pk1.size());
assertThat(pk1.size()).isEqualTo(2);

Assert.assertEquals(record.get(1), pk1.get(0, Integer.class));
Assert.assertEquals(record.get(0), pk1.get(1, String.class));
assertThat(record.get(1)).isEqualTo(pk1.get(0, Integer.class));
assertThat(record.get(0)).isEqualTo(pk1.get(1, String.class));

pk2.partition(rowWrapper.wrap(row));
Assert.assertEquals(2, pk2.size());
assertThat(pk2.size()).isEqualTo(2);

Assert.assertEquals(record.get(0), pk2.get(0, String.class));
Assert.assertEquals(record.get(1), pk2.get(1, Integer.class));
assertThat(record.get(0)).isEqualTo(pk2.get(0, String.class));
assertThat(record.get(1)).isEqualTo(pk2.get(1, Integer.class));
}
}

Expand All @@ -190,19 +191,19 @@ public void testPartitionValueTypes() {
pk.partition(rowWrapper.wrap(row));
expectedPK.partition(recordWrapper.wrap(record));

Assert.assertEquals(
"Partition with column " + column + " should have one field.", 1, pk.size());
assertThat(pk.size())
.as("Partition with column " + column + " should have one field.")
.isEqualTo(1);

if (column.equals("timeType")) {
Assert.assertEquals(
"Partition with column " + column + " should have the expected values",
expectedPK.get(0, Long.class) / 1000,
pk.get(0, Long.class) / 1000);
assertThat(expectedPK.get(0, Long.class) / 1000)
.as("Partition with column " + column + " should have the expected values")
.isEqualTo(pk.get(0, Long.class) / 1000);

} else {
Assert.assertEquals(
"Partition with column " + column + " should have the expected values",
expectedPK.get(0, javaClasses[0]),
pk.get(0, javaClasses[0]));
assertThat(expectedPK.get(0, javaClasses[0]))
.as("Partition with column " + column + " should have the expected values")
.isEqualTo(pk.get(0, javaClasses[0]));
}
}
}
@@ -232,19 +233,18 @@ public void testNestedPartitionValues() {
pk.partition(rowWrapper.wrap(rows.get(j)));
expectedPK.partition(recordWrapper.wrap(records.get(j)));

Assert.assertEquals(
"Partition with nested column " + column + " should have one field.", 1, pk.size());
assertThat(pk.size())
.as("Partition with nested column " + column + " should have one field.")
.isEqualTo(1);

if (column.equals("nested.timeType")) {
Assert.assertEquals(
"Partition with nested column " + column + " should have the expected values.",
expectedPK.get(0, Long.class) / 1000,
pk.get(0, Long.class) / 1000);
assertThat(expectedPK.get(0, Long.class) / 1000)
.as("Partition with nested column " + column + " should have the expected values.")
.isEqualTo(pk.get(0, Long.class) / 1000);
} else {
Assert.assertEquals(
"Partition with nested column " + column + " should have the expected values.",
expectedPK.get(0, javaClasses[0]),
pk.get(0, javaClasses[0]));
assertThat(expectedPK.get(0, javaClasses[0]))
.as("Partition with nested column " + column + " should have the expected values.")
.isEqualTo(pk.get(0, javaClasses[0]));
}
}
}
@@ -25,7 +25,7 @@
import org.apache.iceberg.SortKey;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.types.Types;
import org.junit.Test;
import org.junit.jupiter.api.Test;

public class TestAggregatedStatistics {
private final Schema schema =
@@ -25,8 +25,8 @@
import org.apache.iceberg.SortKey;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.types.Types;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class TestAggregatedStatisticsTracker {
private static final int NUM_SUBTASKS = 2;
@@ -48,7 +48,7 @@ public TestAggregatedStatisticsTracker() {
keyB.set(0, "b");
}

@Before
@BeforeEach
public void before() throws Exception {
aggregatedStatisticsTracker =
new AggregatedStatisticsTracker<>("testOperator", statisticsSerializer, NUM_SUBTASKS);
@@ -33,8 +33,8 @@
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class TestDataStatisticsCoordinator {
private static final String OPERATOR_NAME = "TestCoordinator";
@@ -52,7 +52,7 @@ public class TestDataStatisticsCoordinator {
private DataStatisticsCoordinator<MapDataStatistics, Map<SortKey, Long>>
dataStatisticsCoordinator;

@Before
@BeforeEach
public void before() throws Exception {
receivingTasks = EventReceivingTasks.createForRunningTasks();
dataStatisticsCoordinator =