From 1c64e4c0068f817dee632caa3f665cb70f48d801 Mon Sep 17 00:00:00 2001 From: Tom Tanaka <43331405+tomtongue@users.noreply.github.com> Date: Mon, 1 Jul 2024 22:24:02 +0900 Subject: [PATCH] Flink: Migrate HadoopCatalog related tests (#10358) --- .../iceberg/flink/HadoopTableExtension.java | 59 +++++++ .../flink/sink/TestFlinkIcebergSink.java | 148 ++++++++---------- .../sink/TestFlinkIcebergSinkBranch.java | 66 ++++---- .../flink/sink/TestFlinkIcebergSinkV2.java | 111 +++++-------- .../sink/TestFlinkIcebergSinkV2Base.java | 42 ++++- .../sink/TestFlinkIcebergSinkV2Branch.java | 69 ++++---- .../flink/source/TestFlinkSourceSql.java | 4 +- .../flink/source/TestIcebergSourceSql.java | 4 +- .../iceberg/flink/source/TestSqlBase.java | 46 +++--- .../flink/source/reader/ReaderUtil.java | 22 +++ .../TestColumnStatsWatermarkExtractor.java | 90 ++++++----- 11 files changed, 356 insertions(+), 305 deletions(-) create mode 100644 flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/HadoopTableExtension.java diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/HadoopTableExtension.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/HadoopTableExtension.java new file mode 100644 index 000000000000..dc6ef400a4a9 --- /dev/null +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/HadoopTableExtension.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink; + +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.junit.jupiter.api.extension.ExtensionContext; + +public class HadoopTableExtension extends HadoopCatalogExtension { + private final Schema schema; + private final PartitionSpec partitionSpec; + + private Table table; + + public HadoopTableExtension(String database, String tableName, Schema schema) { + this(database, tableName, schema, null); + } + + public HadoopTableExtension( + String database, String tableName, Schema schema, PartitionSpec partitionSpec) { + super(database, tableName); + this.schema = schema; + this.partitionSpec = partitionSpec; + } + + @Override + public void beforeEach(ExtensionContext context) throws Exception { + super.beforeEach(context); + if (partitionSpec == null) { + this.table = catalog.createTable(TableIdentifier.of(database, tableName), schema); + } else { + this.table = + catalog.createTable(TableIdentifier.of(database, tableName), schema, partitionSpec); + } + tableLoader.open(); + } + + public Table table() { + return table; + } +} diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java index 9ff79419b020..527525e9f167 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.flink.sink; +import static org.apache.iceberg.flink.TestFixtures.DATABASE; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.IOException; @@ -29,17 +31,21 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; -import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.types.Row; import org.apache.iceberg.DistributionMode; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.FlinkWriteOptions; -import org.apache.iceberg.flink.HadoopCatalogResource; +import org.apache.iceberg.flink.HadoopCatalogExtension; import org.apache.iceberg.flink.MiniClusterResource; +import org.apache.iceberg.flink.MiniFlinkClusterExtension; import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.TestFixtures; @@ -47,59 +53,52 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.junit.Assert; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase { +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; - @ClassRule - public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = - MiniClusterResource.createWithClassloaderCheckDisabled(); +@ExtendWith(ParameterizedTestExtension.class) +public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase { - @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + @RegisterExtension + public static MiniClusterExtension miniClusterResource = + MiniFlinkClusterExtension.createWithClassloaderCheckDisabled(); - @Rule - public final HadoopCatalogResource catalogResource = - new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE); + @RegisterExtension + private static final HadoopCatalogExtension catalogResource = + new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE); private TableLoader tableLoader; - private final FileFormat format; - private final int parallelism; - private final boolean partitioned; + @Parameter(index = 0) + private FileFormat format; + + @Parameter(index = 1) + private int parallelism; - @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}") + @Parameter(index = 2) + private boolean partitioned; + + @Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}") public static Object[][] parameters() { return new Object[][] { - {"avro", 1, true}, - {"avro", 1, false}, - {"avro", 2, true}, - {"avro", 2, false}, - {"orc", 1, true}, - {"orc", 1, false}, - {"orc", 2, true}, - {"orc", 2, false}, - {"parquet", 1, true}, - {"parquet", 1, false}, - {"parquet", 2, true}, - {"parquet", 2, false} + {FileFormat.AVRO, 1, true}, + {FileFormat.AVRO, 1, false}, + {FileFormat.AVRO, 2, true}, + {FileFormat.AVRO, 2, false}, + {FileFormat.ORC, 1, true}, + {FileFormat.ORC, 1, false}, + {FileFormat.ORC, 2, true}, + {FileFormat.ORC, 2, false}, + {FileFormat.PARQUET, 1, true}, + {FileFormat.PARQUET, 1, false}, + {FileFormat.PARQUET, 2, true}, + {FileFormat.PARQUET, 2, false} }; } - public TestFlinkIcebergSink(String format, int parallelism, boolean partitioned) { - this.format = FileFormat.fromString(format); - this.parallelism = parallelism; - this.partitioned = partitioned; - } - - @Before + @BeforeEach public void before() throws IOException { table = catalogResource @@ -122,7 +121,7 @@ public void before() throws IOException { tableLoader = catalogResource.tableLoader(); } - @Test + @TestTemplate public void testWriteRowData() throws Exception { List rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo")); DataStream dataStream = @@ -165,17 +164,17 @@ private int partitionFiles(String partition) throws IOException { return SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", partition)).size(); } - @Test + @TestTemplate public void testWriteRow() throws Exception { testWriteRow(null, DistributionMode.NONE); } - @Test + @TestTemplate public void testWriteRowWithTableSchema() throws Exception { testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE); } - @Test + @TestTemplate public void testJobNoneDistributeMode() throws Exception { table .updateProperties() @@ -187,12 +186,12 @@ public void testJobNoneDistributeMode() throws Exception { if (parallelism > 1) { if (partitioned) { int files = partitionFiles("aaa") + partitionFiles("bbb") + partitionFiles("ccc"); - Assert.assertTrue("Should have more than 3 files in iceberg table.", files > 3); + assertThat(files).isGreaterThan(3); } } } - @Test + @TestTemplate public void testJobHashDistributionMode() { table .updateProperties() @@ -204,7 +203,7 @@ public void testJobHashDistributionMode() { .hasMessage("Flink does not support 'range' write distribution mode now."); } - @Test + @TestTemplate public void testJobNullDistributionMode() throws Exception { table .updateProperties() @@ -214,42 +213,33 @@ public void testJobNullDistributionMode() throws Exception { testWriteRow(null, null); if (partitioned) { - Assert.assertEquals( - "There should be only 1 data file in partition 'aaa'", 1, partitionFiles("aaa")); - Assert.assertEquals( - "There should be only 1 data file in partition 'bbb'", 1, partitionFiles("bbb")); - Assert.assertEquals( - "There should be only 1 data file in partition 'ccc'", 1, partitionFiles("ccc")); + assertThat(partitionFiles("aaa")).isEqualTo(1); + assertThat(partitionFiles("bbb")).isEqualTo(1); + assertThat(partitionFiles("ccc")).isEqualTo(1); } } - @Test + @TestTemplate public void testPartitionWriteMode() throws Exception { testWriteRow(null, DistributionMode.HASH); if (partitioned) { - Assert.assertEquals( - "There should be only 1 data file in partition 'aaa'", 1, partitionFiles("aaa")); - Assert.assertEquals( - "There should be only 1 data file in partition 'bbb'", 1, partitionFiles("bbb")); - Assert.assertEquals( - "There should be only 1 data file in partition 'ccc'", 1, partitionFiles("ccc")); + assertThat(partitionFiles("aaa")).isEqualTo(1); + assertThat(partitionFiles("bbb")).isEqualTo(1); + assertThat(partitionFiles("ccc")).isEqualTo(1); } } - @Test + @TestTemplate public void testShuffleByPartitionWithSchema() throws Exception { testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.HASH); if (partitioned) { - Assert.assertEquals( - "There should be only 1 data file in partition 'aaa'", 1, partitionFiles("aaa")); - Assert.assertEquals( - "There should be only 1 data file in partition 'bbb'", 1, partitionFiles("bbb")); - Assert.assertEquals( - "There should be only 1 data file in partition 'ccc'", 1, partitionFiles("ccc")); + assertThat(partitionFiles("aaa")).isEqualTo(1); + assertThat(partitionFiles("bbb")).isEqualTo(1); + assertThat(partitionFiles("ccc")).isEqualTo(1); } } - @Test + @TestTemplate public void testTwoSinksInDisjointedDAG() throws Exception { Map props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()); @@ -323,16 +313,14 @@ public void testTwoSinksInDisjointedDAG() throws Exception { SimpleDataUtil.assertTableRows(rightTable, convertToRowData(rightRows)); leftTable.refresh(); - Assert.assertNull(leftTable.currentSnapshot().summary().get("flink.test")); - Assert.assertNull(leftTable.currentSnapshot().summary().get("direction")); + assertThat(leftTable.currentSnapshot().summary()).doesNotContainKeys("flink.test", "direction"); rightTable.refresh(); - Assert.assertEquals( - TestFlinkIcebergSink.class.getName(), - rightTable.currentSnapshot().summary().get("flink.test")); - Assert.assertEquals("rightTable", rightTable.currentSnapshot().summary().get("direction")); + assertThat(rightTable.currentSnapshot().summary()) + .containsEntry("flink.test", TestFlinkIcebergSink.class.getName()) + .containsEntry("direction", "rightTable"); } - @Test + @TestTemplate public void testOverrideWriteConfigWithUnknownDistributionMode() { Map newProps = Maps.newHashMap(); newProps.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), "UNRECOGNIZED"); @@ -352,7 +340,7 @@ public void testOverrideWriteConfigWithUnknownDistributionMode() { .hasMessage("Invalid distribution mode: UNRECOGNIZED"); } - @Test + @TestTemplate public void testOverrideWriteConfigWithUnknownFileFormat() { Map newProps = Maps.newHashMap(); newProps.put(FlinkWriteOptions.WRITE_FORMAT.key(), "UNRECOGNIZED"); @@ -372,7 +360,7 @@ public void testOverrideWriteConfigWithUnknownFileFormat() { .hasMessage("Invalid file format: UNRECOGNIZED"); } - @Test + @TestTemplate public void testWriteRowWithTableRefreshInterval() throws Exception { List rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo")); DataStream dataStream = diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java index 16b4542b00d3..547b4937c5bd 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java @@ -18,60 +18,60 @@ */ package org.apache.iceberg.flink.sink; +import static org.apache.iceberg.flink.TestFixtures.DATABASE; +import static org.assertj.core.api.Assertions.assertThat; + import java.io.IOException; import java.util.List; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.types.Row; import org.apache.iceberg.DistributionMode; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.TableProperties; -import org.apache.iceberg.flink.HadoopCatalogResource; +import org.apache.iceberg.flink.HadoopCatalogExtension; import org.apache.iceberg.flink.MiniClusterResource; import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.junit.Assert; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase { +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; - @ClassRule - public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = - MiniClusterResource.createWithClassloaderCheckDisabled(); +@ExtendWith(ParameterizedTestExtension.class) +public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase { + @RegisterExtension + public static final HadoopCatalogExtension catalogResource = + new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE); - @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + @Parameter(index = 0) + private String formatVersion; - @Rule - public final HadoopCatalogResource catalogResource = - new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE); + @Parameter(index = 1) + private String branch; - private final String branch; private TableLoader tableLoader; - @Parameterized.Parameters(name = "formatVersion = {0}, branch = {1}") - public static Object[] parameters() { - return new Object[] {"main", "testBranch"}; - } - - public TestFlinkIcebergSinkBranch(String branch) { - this.branch = branch; + @Parameters(name = "formatVersion = {0}, branch = {1}") + public static Object[][] parameters() { + return new Object[][] { + {"1", "main"}, + {"1", "testBranch"}, + {"2", "main"}, + {"2", "testBranch"} + }; } - @Before + @BeforeEach public void before() throws IOException { table = catalogResource @@ -84,7 +84,7 @@ public void before() throws IOException { TableProperties.DEFAULT_FILE_FORMAT, FileFormat.AVRO.name(), TableProperties.FORMAT_VERSION, - "1")); + formatVersion)); env = StreamExecutionEnvironment.getExecutionEnvironment( @@ -94,7 +94,7 @@ public void before() throws IOException { tableLoader = catalogResource.tableLoader(); } - @Test + @TestTemplate public void testWriteRowWithTableSchema() throws Exception { testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE); verifyOtherBranchUnmodified(); @@ -129,9 +129,9 @@ private void verifyOtherBranchUnmodified() { String otherBranch = branch.equals(SnapshotRef.MAIN_BRANCH) ? "test-branch" : SnapshotRef.MAIN_BRANCH; if (otherBranch.equals(SnapshotRef.MAIN_BRANCH)) { - Assert.assertNull(table.currentSnapshot()); + assertThat(table.currentSnapshot()).isNull(); } - Assert.assertTrue(table.snapshot(otherBranch) == null); + assertThat(table.snapshot(otherBranch)).isNull(); } } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java index 7712481d33d9..9cbb9f091e15 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java @@ -18,85 +18,53 @@ */ package org.apache.iceberg.flink.sink; +import static org.apache.iceberg.flink.TestFixtures.DATABASE; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assumptions.assumeThat; import java.util.List; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.types.Row; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.TableProperties; import org.apache.iceberg.data.Record; -import org.apache.iceberg.flink.HadoopCatalogResource; +import org.apache.iceberg.flink.HadoopCatalogExtension; import org.apache.iceberg.flink.MiniClusterResource; +import org.apache.iceberg.flink.MiniFlinkClusterExtension; import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.flink.source.BoundedTestSource; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; -import org.junit.Assert; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.rules.Timeout; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; + +@ExtendWith(ParameterizedTestExtension.class) +@Timeout(value = 60) public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base { + @RegisterExtension + public static MiniClusterExtension miniClusterResource = + MiniFlinkClusterExtension.createWithClassloaderCheckDisabled(); - @ClassRule - public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = - MiniClusterResource.createWithClassloaderCheckDisabled(); - - @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); - - @Rule - public final HadoopCatalogResource catalogResource = - new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE); - - @Rule public final Timeout globalTimeout = Timeout.seconds(60); - - @Parameterized.Parameters( - name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, WriteDistributionMode ={3}") - public static Object[][] parameters() { - return new Object[][] { - new Object[] {"avro", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE}, - new Object[] {"avro", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE}, - new Object[] {"avro", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE}, - new Object[] {"avro", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE}, - new Object[] {"orc", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_HASH}, - new Object[] {"orc", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_HASH}, - new Object[] {"orc", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_HASH}, - new Object[] {"orc", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_HASH}, - new Object[] {"parquet", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE}, - new Object[] {"parquet", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE}, - new Object[] {"parquet", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE}, - new Object[] {"parquet", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE} - }; - } - - public TestFlinkIcebergSinkV2( - String format, int parallelism, boolean partitioned, String writeDistributionMode) { - this.format = FileFormat.fromString(format); - this.parallelism = parallelism; - this.partitioned = partitioned; - this.writeDistributionMode = writeDistributionMode; - } + @RegisterExtension + private static final HadoopCatalogExtension catalogResource = + new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE); - @Before + @BeforeEach public void setupTable() { table = catalogResource @@ -129,7 +97,7 @@ public void setupTable() { tableLoader = catalogResource.tableLoader(); } - @Test + @TestTemplate public void testCheckAndGetEqualityFieldIds() { table .updateSchema() @@ -144,28 +112,25 @@ public void testCheckAndGetEqualityFieldIds() { FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA).table(table); // Use schema identifier field IDs as equality field id list by default - Assert.assertEquals( - table.schema().identifierFieldIds(), - Sets.newHashSet(builder.checkAndGetEqualityFieldIds())); + assertThat(builder.checkAndGetEqualityFieldIds()) + .containsExactlyInAnyOrderElementsOf(table.schema().identifierFieldIds()); // Use user-provided equality field column as equality field id list builder.equalityFieldColumns(Lists.newArrayList("id")); - Assert.assertEquals( - Sets.newHashSet(table.schema().findField("id").fieldId()), - Sets.newHashSet(builder.checkAndGetEqualityFieldIds())); + assertThat(builder.checkAndGetEqualityFieldIds()) + .containsExactlyInAnyOrder(table.schema().findField("id").fieldId()); builder.equalityFieldColumns(Lists.newArrayList("type")); - Assert.assertEquals( - Sets.newHashSet(table.schema().findField("type").fieldId()), - Sets.newHashSet(builder.checkAndGetEqualityFieldIds())); + assertThat(builder.checkAndGetEqualityFieldIds()) + .containsExactlyInAnyOrder(table.schema().findField("type").fieldId()); } - @Test + @TestTemplate public void testChangeLogOnIdKey() throws Exception { testChangeLogOnIdKey(SnapshotRef.MAIN_BRANCH); } - @Test + @TestTemplate public void testUpsertOnlyDeletesOnDataKey() throws Exception { List> elementsPerCheckpoint = ImmutableList.of( @@ -184,22 +149,22 @@ public void testUpsertOnlyDeletesOnDataKey() throws Exception { SnapshotRef.MAIN_BRANCH); } - @Test + @TestTemplate public void testChangeLogOnDataKey() throws Exception { testChangeLogOnDataKey(SnapshotRef.MAIN_BRANCH); } - @Test + @TestTemplate public void testChangeLogOnIdDataKey() throws Exception { testChangeLogOnIdDataKey(SnapshotRef.MAIN_BRANCH); } - @Test + @TestTemplate public void testChangeLogOnSameKey() throws Exception { testChangeLogOnSameKey(SnapshotRef.MAIN_BRANCH); } - @Test + @TestTemplate public void testUpsertModeCheck() throws Exception { DataStream dataStream = env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO); @@ -227,22 +192,22 @@ public void testUpsertModeCheck() throws Exception { "Equality field columns shouldn't be empty when configuring to use UPSERT data stream."); } - @Test + @TestTemplate public void testUpsertOnIdKey() throws Exception { testUpsertOnIdKey(SnapshotRef.MAIN_BRANCH); } - @Test + @TestTemplate public void testUpsertOnDataKey() throws Exception { testUpsertOnDataKey(SnapshotRef.MAIN_BRANCH); } - @Test + @TestTemplate public void testUpsertOnIdDataKey() throws Exception { testUpsertOnIdDataKey(SnapshotRef.MAIN_BRANCH); } - @Test + @TestTemplate public void testDeleteStats() throws Exception { assumeThat(format).isNotEqualTo(FileFormat.AVRO); diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java index 9cdf7743c485..fc33c2fea5e6 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.flink.sink; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.IOException; @@ -31,6 +32,8 @@ import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.Parameters; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; @@ -44,7 +47,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.StructLikeSet; -import org.junit.Assert; public class TestFlinkIcebergSinkV2Base { @@ -55,14 +57,40 @@ public class TestFlinkIcebergSinkV2Base { protected static final int ROW_ID_POS = 0; protected static final int ROW_DATA_POS = 1; - protected int parallelism = 1; protected TableLoader tableLoader; protected Table table; protected StreamExecutionEnvironment env; + + @Parameter(index = 0) protected FileFormat format; + + @Parameter(index = 1) + protected int parallelism = 1; + + @Parameter(index = 2) protected boolean partitioned; + + @Parameter(index = 3) protected String writeDistributionMode; + @Parameters(name = "FileFormat={0}, Parallelism={1}, Partitioned={2}, WriteDistributionMode={3}") + public static Object[][] parameters() { + return new Object[][] { + new Object[] {FileFormat.AVRO, 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE}, + new Object[] {FileFormat.AVRO, 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE}, + new Object[] {FileFormat.AVRO, 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE}, + new Object[] {FileFormat.AVRO, 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE}, + new Object[] {FileFormat.ORC, 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_HASH}, + new Object[] {FileFormat.ORC, 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_HASH}, + new Object[] {FileFormat.ORC, 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_HASH}, + new Object[] {FileFormat.ORC, 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_HASH}, + new Object[] {FileFormat.PARQUET, 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE}, + new Object[] {FileFormat.PARQUET, 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE}, + new Object[] {FileFormat.PARQUET, 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE}, + new Object[] {FileFormat.PARQUET, 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE} + }; + } + protected static final Map ROW_KIND_MAP = ImmutableMap.of( "+I", RowKind.INSERT, @@ -319,16 +347,14 @@ protected void testChangeLogs( table.refresh(); List snapshots = findValidSnapshots(); int expectedSnapshotNum = expectedRecordsPerCheckpoint.size(); - Assert.assertEquals( - "Should have the expected snapshot number", expectedSnapshotNum, snapshots.size()); + assertThat(snapshots).hasSize(expectedSnapshotNum); for (int i = 0; i < expectedSnapshotNum; i++) { long snapshotId = snapshots.get(i).snapshotId(); List expectedRecords = expectedRecordsPerCheckpoint.get(i); - Assert.assertEquals( - "Should have the expected records for the checkpoint#" + i, - expectedRowSet(expectedRecords.toArray(new Record[0])), - actualRowSet(snapshotId, "*")); + assertThat(actualRowSet(snapshotId, "*")) + .as("Should have the expected records for the checkpoint#" + i) + .isEqualTo(expectedRowSet(expectedRecords.toArray(new Record[0]))); } } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java index fed333848279..1c5c97b58d2d 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java @@ -18,52 +18,43 @@ */ package org.apache.iceberg.flink.sink; +import static org.apache.iceberg.flink.TestFixtures.DATABASE; +import static org.assertj.core.api.Assertions.assertThat; + import java.io.IOException; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.TableProperties; -import org.apache.iceberg.flink.HadoopCatalogResource; +import org.apache.iceberg.flink.HadoopCatalogExtension; import org.apache.iceberg.flink.MiniClusterResource; import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.junit.Assert; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class TestFlinkIcebergSinkV2Branch extends TestFlinkIcebergSinkV2Base { - - @ClassRule - public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = - MiniClusterResource.createWithClassloaderCheckDisabled(); - - @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; - @Rule - public final HadoopCatalogResource catalogResource = - new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE); - - private final String branch; +@ExtendWith(ParameterizedTestExtension.class) +public class TestFlinkIcebergSinkV2Branch extends TestFlinkIcebergSinkV2Base { + @RegisterExtension + private static final HadoopCatalogExtension catalogResource = + new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE); - @Parameterized.Parameters(name = "branch = {0}") - public static Object[] parameters() { - return new Object[] {"main", "testBranch"}; - } + @Parameter(index = 0) + private String branch; - public TestFlinkIcebergSinkV2Branch(String branch) { - this.branch = branch; + @Parameters(name = "branch = {0}") + public static Object[][] parameters() { + return new Object[][] {new Object[] {"main"}, new Object[] {"testBranch"}}; } - @Before + @BeforeEach public void before() throws IOException { table = catalogResource @@ -86,37 +77,37 @@ public void before() throws IOException { tableLoader = catalogResource.tableLoader(); } - @Test + @TestTemplate public void testChangeLogOnIdKey() throws Exception { testChangeLogOnIdKey(branch); verifyOtherBranchUnmodified(); } - @Test + @TestTemplate public void testChangeLogOnDataKey() throws Exception { testChangeLogOnDataKey(branch); verifyOtherBranchUnmodified(); } - @Test + @TestTemplate public void testChangeLogOnIdDataKey() throws Exception { testChangeLogOnIdDataKey(branch); verifyOtherBranchUnmodified(); } - @Test + @TestTemplate public void testUpsertOnIdKey() throws Exception { testUpsertOnIdKey(branch); verifyOtherBranchUnmodified(); } - @Test + @TestTemplate public void testUpsertOnDataKey() throws Exception { testUpsertOnDataKey(branch); verifyOtherBranchUnmodified(); } - @Test + @TestTemplate public void testUpsertOnIdDataKey() throws Exception { testUpsertOnIdDataKey(branch); verifyOtherBranchUnmodified(); @@ -126,9 +117,9 @@ private void verifyOtherBranchUnmodified() { String otherBranch = branch.equals(SnapshotRef.MAIN_BRANCH) ? "test-branch" : SnapshotRef.MAIN_BRANCH; if (otherBranch.equals(SnapshotRef.MAIN_BRANCH)) { - Assert.assertNull(table.currentSnapshot()); + assertThat(table.currentSnapshot()).isNull(); } - Assert.assertTrue(table.snapshot(otherBranch) == null); + assertThat(table.snapshot(otherBranch)).isNull(); } } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceSql.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceSql.java index affd90c347dd..6857e0a7a366 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceSql.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceSql.java @@ -33,7 +33,7 @@ import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.junit.Test; +import org.junit.jupiter.api.Test; /** Use the FlinkSource */ public class TestFlinkSourceSql extends TestSqlBase { @@ -61,7 +61,7 @@ public void testInferParallelismWithGlobalSetting() throws IOException { .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, null); GenericAppenderHelper helper = - new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER); + new GenericAppenderHelper(table, FileFormat.PARQUET, temporaryFolder); List expectedRecords = Lists.newArrayList(); long maxFileLen = 0; for (int i = 0; i < 5; i++) { diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java index 4250460d278d..645af7cfa339 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java @@ -40,7 +40,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; -import org.junit.Test; +import org.junit.jupiter.api.Test; /** Use the IcebergSource (FLIP-27) */ public class TestIcebergSourceSql extends TestSqlBase { @@ -78,7 +78,7 @@ private List generateExpectedRecords(boolean ascending) throws Exception long baseTime = 1702382109000L; GenericAppenderHelper helper = - new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER); + new GenericAppenderHelper(table, FileFormat.PARQUET, temporaryFolder); Record file1Record1 = generateRecord(Instant.ofEpochMilli(baseTime), baseTime + (1000 * 60 * 60 * 24 * 30L)); diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java index dda46033143e..94962e02bb05 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java @@ -18,13 +18,17 @@ */ package org.apache.iceberg.flink.source; +import static org.apache.iceberg.flink.TestFixtures.DATABASE; +import static org.assertj.core.api.Assertions.assertThat; + import java.io.IOException; +import java.nio.file.Path; import java.util.List; import java.util.Map; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.types.Row; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; @@ -34,30 +38,28 @@ import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkConfigOptions; -import org.apache.iceberg.flink.HadoopCatalogResource; -import org.apache.iceberg.flink.MiniClusterResource; +import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.flink.MiniFlinkClusterExtension; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.junit.Assert; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; /** Test other more advanced usage of SQL. They don't need to run for every file format. */ public abstract class TestSqlBase { - @ClassRule - public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = - MiniClusterResource.createWithClassloaderCheckDisabled(); + @RegisterExtension + public static MiniClusterExtension miniClusterResource = + MiniFlinkClusterExtension.createWithClassloaderCheckDisabled(); - @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + @RegisterExtension + public static final HadoopCatalogExtension catalogResource = + new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE); - @Rule - public final HadoopCatalogResource catalogResource = - new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE); + @TempDir protected Path temporaryFolder; private volatile TableEnvironment tEnv; @@ -73,7 +75,7 @@ protected TableEnvironment getTableEnv() { return tEnv; } - @Before + @BeforeEach public abstract void before() throws IOException; @Test @@ -90,7 +92,7 @@ public void testResiduals() throws Exception { writeRecords.get(1).set(2, "2020-03-20"); GenericAppenderHelper helper = - new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER); + new GenericAppenderHelper(table, FileFormat.PARQUET, temporaryFolder); List expectedRecords = Lists.newArrayList(); expectedRecords.add(writeRecords.get(0)); @@ -120,7 +122,7 @@ public void testExposeLocality() throws Exception { expectedRecords.forEach(expectedRecord -> expectedRecord.set(2, "2020-03-20")); GenericAppenderHelper helper = - new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER); + new GenericAppenderHelper(table, FileFormat.PARQUET, temporaryFolder); DataFile dataFile = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0), expectedRecords); helper.appendToTable(dataFile); @@ -140,9 +142,9 @@ public void testExposeLocality() throws Exception { // When running with CI or local, `localityEnabled` will be false even if this configuration is // enabled - Assert.assertFalse( - "Expose split locality info should be false.", - SourceUtil.isLocalityEnabled(table, tableConf, true)); + assertThat(SourceUtil.isLocalityEnabled(table, tableConf, true)) + .as("Expose split locality info should be false.") + .isFalse(); results = run(Maps.newHashMap(), "where dt='2020-03-20'", "*"); org.apache.iceberg.flink.TestHelpers.assertRecords( diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java index f06d9b83bcd4..e3e341ca2c76 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; +import java.nio.file.Path; import java.util.Collections; import java.util.List; import org.apache.flink.table.data.RowData; @@ -106,6 +107,7 @@ public static List> createRecordBatchList( return Lists.partition(records, batchCount); } + // Only for JUnit4 tests. Keep this method for test migration from JUnit4 to JUnit5 public static CombinedScanTask createCombinedScanTask( List> recordBatchList, TemporaryFolder temporaryFolder, @@ -122,4 +124,24 @@ public static CombinedScanTask createCombinedScanTask( return new BaseCombinedScanTask(fileTasks); } + + public static CombinedScanTask createCombinedScanTask( + List> recordBatchList, + Path temporaryFolder, + FileFormat fileFormat, + GenericAppenderFactory appenderFactory) + throws IOException { + List fileTasks = Lists.newArrayListWithCapacity(recordBatchList.size()); + for (List recordBatch : recordBatchList) { + FileScanTask fileTask = + ReaderUtil.createFileTask( + recordBatch, + File.createTempFile("junit", null, temporaryFolder.toFile()), + fileFormat, + appenderFactory); + fileTasks.add(fileTask); + } + + return new BaseCombinedScanTask(fileTasks); + } } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java index 604bc09619e0..7033fd30e84f 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java @@ -18,10 +18,14 @@ */ package org.apache.iceberg.flink.source.reader; +import static org.apache.iceberg.flink.TestFixtures.DATABASE; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; import java.io.IOException; +import java.nio.file.Path; import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; @@ -30,27 +34,26 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; -import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.HadoopTableExtension; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.flink.source.split.IcebergSourceSplit; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; + +@ExtendWith(ParameterizedTestExtension.class) public class TestColumnStatsWatermarkExtractor { public static final Schema SCHEMA = new Schema( @@ -68,15 +71,16 @@ public class TestColumnStatsWatermarkExtractor { private static final List> MIN_VALUES = ImmutableList.of(Maps.newHashMapWithExpectedSize(3), Maps.newHashMapWithExpectedSize(3)); - @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + @TempDir protected Path temporaryFolder; - @Rule - public final HadoopTableResource sourceTableResource = - new HadoopTableResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, SCHEMA); + @RegisterExtension + private static final HadoopTableExtension sourceTableResource = + new HadoopTableExtension(DATABASE, TestFixtures.TABLE, SCHEMA); - private final String columnName; + @Parameter(index = 0) + private String columnName; - @BeforeClass + @BeforeAll public static void updateMinValue() { for (int i = 0; i < TEST_RECORDS.size(); ++i) { for (Record r : TEST_RECORDS.get(i)) { @@ -94,7 +98,7 @@ public static void updateMinValue() { } } - @Parameterized.Parameters(name = "{0}") + @Parameters(name = "columnName = {0}") public static Collection data() { return ImmutableList.of( new Object[] {"timestamp_column"}, @@ -102,62 +106,56 @@ public static Collection data() { new Object[] {"long_column"}); } - public TestColumnStatsWatermarkExtractor(String columnName) { - this.columnName = columnName; - } - - @Test + @TestTemplate public void testSingle() throws IOException { ColumnStatsWatermarkExtractor extractor = new ColumnStatsWatermarkExtractor(SCHEMA, columnName, TimeUnit.MILLISECONDS); - Assert.assertEquals( - MIN_VALUES.get(0).get(columnName).longValue(), extractor.extractWatermark(split(0))); + assertThat(extractor.extractWatermark(split(0))) + .isEqualTo(MIN_VALUES.get(0).get(columnName).longValue()); } - @Test + @TestTemplate public void testTimeUnit() throws IOException { - Assume.assumeTrue("Run only for long column", columnName.equals("long_column")); + assumeThat(columnName).isEqualTo("long_column"); ColumnStatsWatermarkExtractor extractor = new ColumnStatsWatermarkExtractor(SCHEMA, columnName, TimeUnit.MICROSECONDS); - Assert.assertEquals( - MIN_VALUES.get(0).get(columnName).longValue() / 1000L, - extractor.extractWatermark(split(0))); + assertThat(extractor.extractWatermark(split(0))) + .isEqualTo(MIN_VALUES.get(0).get(columnName).longValue() / 1000L); } - @Test + @TestTemplate public void testMultipleFiles() throws IOException { - Assume.assumeTrue("Run only for the timestamp column", columnName.equals("timestamp_column")); + assumeThat(columnName).isEqualTo("timestamp_column"); IcebergSourceSplit combinedSplit = IcebergSourceSplit.fromCombinedScanTask( ReaderUtil.createCombinedScanTask( - TEST_RECORDS, TEMPORARY_FOLDER, FileFormat.PARQUET, APPENDER_FACTORY)); + TEST_RECORDS, temporaryFolder, FileFormat.PARQUET, APPENDER_FACTORY)); ColumnStatsWatermarkExtractor extractor = new ColumnStatsWatermarkExtractor(SCHEMA, columnName, null); - Assert.assertEquals( - MIN_VALUES.get(0).get(columnName).longValue(), extractor.extractWatermark(split(0))); - Assert.assertEquals( - MIN_VALUES.get(1).get(columnName).longValue(), extractor.extractWatermark(split(1))); - Assert.assertEquals( - Math.min(MIN_VALUES.get(0).get(columnName), MIN_VALUES.get(1).get(columnName)), - extractor.extractWatermark(combinedSplit)); + assertThat(extractor.extractWatermark(split(0))) + .isEqualTo(MIN_VALUES.get(0).get(columnName).longValue()); + assertThat(extractor.extractWatermark(split(1))) + .isEqualTo(MIN_VALUES.get(1).get(columnName).longValue()); + assertThat(extractor.extractWatermark(combinedSplit)) + .isEqualTo(Math.min(MIN_VALUES.get(0).get(columnName), MIN_VALUES.get(1).get(columnName))); } - @Test + @TestTemplate public void testWrongColumn() { - Assume.assumeTrue("Run only for string column", columnName.equals("string_column")); + assumeThat(columnName).isEqualTo("string_column"); assertThatThrownBy(() -> new ColumnStatsWatermarkExtractor(SCHEMA, columnName, null)) .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining( "Found STRING, expected a LONG or TIMESTAMP column for watermark generation."); } - @Test + @TestTemplate public void testEmptyStatistics() throws IOException { - Assume.assumeTrue("Run only for timestamp column", columnName.equals("timestamp_column")); + assumeThat(columnName).isEqualTo("timestamp_column"); // Create an extractor for a column we do not have statistics ColumnStatsWatermarkExtractor extractor = @@ -171,7 +169,7 @@ private IcebergSourceSplit split(int id) throws IOException { return IcebergSourceSplit.fromCombinedScanTask( ReaderUtil.createCombinedScanTask( ImmutableList.of(TEST_RECORDS.get(id)), - TEMPORARY_FOLDER, + temporaryFolder, FileFormat.PARQUET, APPENDER_FACTORY)); }