Flink 1.16: Create JUnit5 version of TestFlinkScan (apache#9482)
nastra authored and geruh committed Jan 25, 2024
1 parent fec36f6 commit fc808fa
Showing 7 changed files with 278 additions and 300 deletions.
360 changes: 181 additions & 179 deletions flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java

Large diffs are not rendered by default.

flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.flink.source;
 
 import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -41,16 +42,11 @@
 import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
-import org.junit.Assume;
-import org.junit.Test;
+import org.junit.jupiter.api.TestTemplate;
 
 /** Test {@link FlinkInputFormat}. */
 public class TestFlinkInputFormat extends TestFlinkSource {
 
-  public TestFlinkInputFormat(String fileFormat) {
-    super(fileFormat);
-  }
-
   @Override
   protected List<Row> run(
       FlinkSource.Builder formatBuilder,
@@ -61,7 +57,7 @@ protected List<Row> run(
     return runFormat(formatBuilder.tableLoader(tableLoader()).buildFormat());
   }
 
-  @Test
+  @TestTemplate
   public void testNestedProjection() throws Exception {
     Schema schema =
         new Schema(
@@ -75,10 +71,11 @@ public void testNestedProjection() throws Exception {
                 Types.NestedField.required(5, "f3", Types.LongType.get()))),
            required(6, "id", Types.LongType.get()));
 
-    Table table = catalogResource.catalog().createTable(TableIdentifier.of("default", "t"), schema);
+    Table table =
+        catalogExtension.catalog().createTable(TableIdentifier.of("default", "t"), schema);
 
     List<Record> writeRecords = RandomGenericData.generate(schema, 2, 0L);
-    new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(writeRecords);
+    new GenericAppenderHelper(table, fileFormat, temporaryDirectory).appendToTable(writeRecords);
 
     // Schema: [data, nested[f1, f2, f3], id]
     // Projection: [nested.f2, data]
@@ -106,7 +103,7 @@ TestHelpers.assertRows(result, expected);
     TestHelpers.assertRows(result, expected);
   }
 
-  @Test
+  @TestTemplate
   public void testBasicProjection() throws IOException {
     Schema writeSchema =
         new Schema(
@@ -115,10 +112,10 @@ Types.NestedField.optional(2, "time", Types.TimestampType.withZone()));
            Types.NestedField.optional(2, "time", Types.TimestampType.withZone()));
 
     Table table =
-        catalogResource.catalog().createTable(TableIdentifier.of("default", "t"), writeSchema);
+        catalogExtension.catalog().createTable(TableIdentifier.of("default", "t"), writeSchema);
 
     List<Record> writeRecords = RandomGenericData.generate(writeSchema, 2, 0L);
-    new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(writeRecords);
+    new GenericAppenderHelper(table, fileFormat, temporaryDirectory).appendToTable(writeRecords);
 
     TableSchema projectedSchema =
         TableSchema.builder()
@@ -140,9 +137,9 @@ TestHelpers.assertRows(result, expected);
     TestHelpers.assertRows(result, expected);
  }
 
-  @Test
+  @TestTemplate
   public void testReadPartitionColumn() throws Exception {
-    Assume.assumeTrue("Temporary skip ORC", FileFormat.ORC != fileFormat);
+    assumeThat(fileFormat).as("Temporary skip ORC").isNotEqualTo(FileFormat.ORC);
 
     Schema nestedSchema =
         new Schema(
@@ -157,9 +154,10 @@ PartitionSpec.builderFor(nestedSchema).identity("struct.innerName").build();
        PartitionSpec.builderFor(nestedSchema).identity("struct.innerName").build();
 
     Table table =
-        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, nestedSchema, spec);
+        catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, nestedSchema, spec);
     List<Record> records = RandomGenericData.generate(nestedSchema, 10, 0L);
-    GenericAppenderHelper appender = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+    GenericAppenderHelper appender =
+        new GenericAppenderHelper(table, fileFormat, temporaryDirectory);
     for (Record record : records) {
       org.apache.iceberg.TestHelpers.Row partition =
           org.apache.iceberg.TestHelpers.Row.of(record.get(1, Record.class).get(1));
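For readers following the JUnit 4 to JUnit 5 migration this commit applies (@Test to @TestTemplate, Assume to AssertJ's assumeThat, the catalogResource rule to a catalogExtension, and TEMPORARY_FOLDER to an injected temporaryDirectory), the sketch below illustrates what the corresponding JUnit 5 wiring typically looks like. It is not code from this commit: ParameterizedTestExtension, @Parameters, and @Parameter are assumed stand-ins for helpers in Iceberg's test tree, and the field and method names are illustrative; @TestTemplate, @TempDir, and assumeThat are standard JUnit Jupiter / AssertJ APIs.

// Minimal sketch, not part of this commit. ParameterizedTestExtension, @Parameters,
// and @Parameter are assumed Iceberg test helpers; everything else is standard
// JUnit Jupiter and AssertJ.
import static org.assertj.core.api.Assumptions.assumeThat;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

// Assumed helper extension; replaces JUnit 4's @RunWith(Parameterized.class).
@ExtendWith(ParameterizedTestExtension.class)
public class JUnit5MigrationSketchTest {

  // Assumed helper annotation; replaces JUnit 4's Parameterized.Parameters.
  @Parameters(name = "fileFormat = {0}")
  protected static List<Object> parameters() {
    // Cast keeps the inferred type at List<Object> for the assumed extension contract.
    return Arrays.asList((Object) FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC);
  }

  // Injected per parameter set by the assumed extension; replaces the JUnit 4
  // constructor argument that the removed TestFlinkInputFormat(String) passed up.
  @Parameter protected FileFormat fileFormat;

  // Replaces the JUnit 4 @Rule TemporaryFolder (TEMPORARY_FOLDER in the old code).
  @TempDir protected Path temporaryDirectory;

  // @TestTemplate replaces @Test so the method runs once per parameter set.
  @TestTemplate
  public void testRunsPerFileFormat() {
    // AssertJ assumption replaces JUnit 4's Assume.assumeTrue("Temporary skip ORC", ...).
    assumeThat(fileFormat).as("Temporary skip ORC").isNotEqualTo(FileFormat.ORC);
    // A real test would write data in the given fileFormat and read it back here.
  }
}

In the same spirit, the shared catalog fixture that the new code reaches through catalogExtension would be registered on the test base class with JUnit 5's @RegisterExtension rather than a JUnit 4 @ClassRule; the exact extension type is not shown in this diff.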
