diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TableSourceTestBase.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TableSourceTestBase.java new file mode 100644 index 000000000000..32c81d9465a4 --- /dev/null +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TableSourceTestBase.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.IOException; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.events.Listeners; +import org.apache.iceberg.events.ScanEvent; +import org.apache.iceberg.flink.FlinkConfigOptions; +import org.apache.iceberg.flink.TestBase; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ParameterizedTestExtension.class) +public abstract class TableSourceTestBase extends TestBase { + @Parameters(name = "useFlip27Source = {0}") + protected static Object[][] parameters() { + return new Object[][] { + {false}, {true}, + }; + } + + @Parameter(index = 0) + protected boolean useFlip27Source; + + protected static final String CATALOG_NAME = "test_catalog"; + protected static final String DATABASE_NAME = "test_db"; + protected static final String TABLE_NAME = "test_table"; + protected final FileFormat format = FileFormat.AVRO; + protected int scanEventCount = 0; + protected ScanEvent lastScanEvent = null; + + @Override + protected TableEnvironment getTableEnv() { + super.getTableEnv().getConfig().getConfiguration().set(CoreOptions.DEFAULT_PARALLELISM, 1); + super.getTableEnv() + .getConfig() + .getConfiguration() + .setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(), useFlip27Source); + return super.getTableEnv(); + } + + @BeforeEach + public void before() throws IOException { + // register a scan event listener to validate pushdown + Listeners.register( + event -> { + scanEventCount += 1; + lastScanEvent = event; + }, + ScanEvent.class); + + File warehouseFile = File.createTempFile("junit", null, temporaryDirectory.toFile()); + assertThat(warehouseFile.delete()).isTrue(); + String warehouse = String.format("file:%s", warehouseFile); + + sql( + "CREATE CATALOG %s WITH ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')", + CATALOG_NAME, warehouse); + sql("USE CATALOG %s", CATALOG_NAME); + sql("CREATE DATABASE %s", DATABASE_NAME); + sql("USE %s", DATABASE_NAME); + sql( + "CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE) WITH ('write.format.default'='%s')", + TABLE_NAME, format.name()); + sql( + "INSERT INTO %s VALUES (1,'iceberg',10),(2,'b',20),(3,CAST(NULL AS VARCHAR),30)", + TABLE_NAME); + + this.scanEventCount = 0; + this.lastScanEvent = null; + } + + @AfterEach + public void clean() { + sql("DROP TABLE IF EXISTS %s.%s", DATABASE_NAME, TABLE_NAME); + dropCatalog(CATALOG_NAME, true); + } +} diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java index bc7194e38088..8131bd7ab0d3 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java @@ -20,16 +20,17 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; import java.util.List; import org.apache.flink.types.Row; import org.apache.iceberg.flink.FlinkReadOptions; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; -public class TestFlinkSourceConfig extends TestFlinkTableSource { +public class TestFlinkSourceConfig extends TableSourceTestBase { private static final String TABLE = "test_table"; - @Test + @TestTemplate public void testFlinkSessionConfig() { getTableEnv().getConfig().set(FlinkReadOptions.STREAMING_OPTION, true); assertThatThrownBy(() -> sql("SELECT * FROM %s /*+ OPTIONS('as-of-timestamp'='1')*/", TABLE)) @@ -37,7 +38,7 @@ public void testFlinkSessionConfig() { .hasMessage("Cannot set as-of-timestamp option for streaming reader"); } - @Test + @TestTemplate public void testFlinkHintConfig() { List result = sql( @@ -46,8 +47,11 @@ public void testFlinkHintConfig() { assertThat(result).hasSize(3); } - @Test + @TestTemplate public void testReadOptionHierarchy() { + // TODO: FLIP-27 source doesn't implement limit pushdown yet + assumeThat(useFlip27Source).isFalse(); + getTableEnv().getConfig().set(FlinkReadOptions.LIMIT_OPTION, 1L); List result = sql("SELECT * FROM %s", TABLE); assertThat(result).hasSize(1); diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java index 01bab6d063fd..18528c789114 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java @@ -21,78 +21,16 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import java.io.File; -import java.io.IOException; import java.util.List; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.table.api.SqlParserException; -import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.types.Row; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.events.Listeners; -import org.apache.iceberg.events.ScanEvent; import org.apache.iceberg.expressions.Expressions; -import org.apache.iceberg.flink.TestBase; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -public class TestFlinkTableSource extends TestBase { - - private static final String CATALOG_NAME = "test_catalog"; - private static final String DATABASE_NAME = "test_db"; - private static final String TABLE_NAME = "test_table"; - private final FileFormat format = FileFormat.AVRO; - - private int scanEventCount = 0; - private ScanEvent lastScanEvent = null; - - @Override - protected TableEnvironment getTableEnv() { - super.getTableEnv().getConfig().getConfiguration().set(CoreOptions.DEFAULT_PARALLELISM, 1); - return super.getTableEnv(); - } - - @BeforeEach - public void before() throws IOException { - // register a scan event listener to validate pushdown - Listeners.register( - event -> { - scanEventCount += 1; - lastScanEvent = event; - }, - ScanEvent.class); - - File warehouseFile = File.createTempFile("junit", null, temporaryDirectory.toFile()); - assertThat(warehouseFile.delete()).isTrue(); - String warehouse = String.format("file:%s", warehouseFile); - - sql( - "CREATE CATALOG %s WITH ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')", - CATALOG_NAME, warehouse); - sql("USE CATALOG %s", CATALOG_NAME); - sql("CREATE DATABASE %s", DATABASE_NAME); - sql("USE %s", DATABASE_NAME); - sql( - "CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE) WITH ('write.format.default'='%s')", - TABLE_NAME, format.name()); - sql( - "INSERT INTO %s VALUES (1,'iceberg',10),(2,'b',20),(3,CAST(NULL AS VARCHAR),30)", - TABLE_NAME); - - this.scanEventCount = 0; - this.lastScanEvent = null; - } - - @AfterEach - public void clean() { - sql("DROP TABLE IF EXISTS %s.%s", DATABASE_NAME, TABLE_NAME); - sql("DROP DATABASE IF EXISTS %s", DATABASE_NAME); - dropCatalog(CATALOG_NAME, true); - } - - @Test +import org.junit.jupiter.api.TestTemplate; + +public class TestFlinkTableSource extends TableSourceTestBase { + + @TestTemplate public void testLimitPushDown() { assertThatThrownBy(() -> sql("SELECT * FROM %s LIMIT -1", TABLE_NAME)) @@ -121,7 +59,7 @@ public void testLimitPushDown() { assertThat(mixedResult).hasSize(1).first().isEqualTo(Row.of(1, "iceberg", 10.0)); } - @Test + @TestTemplate public void testNoFilterPushDown() { String sql = String.format("SELECT * FROM %s ", TABLE_NAME); List result = sql(sql); @@ -133,7 +71,7 @@ public void testNoFilterPushDown() { .isEqualTo(Expressions.alwaysTrue()); } - @Test + @TestTemplate public void testFilterPushDownEqual() { String sqlLiteralRight = String.format("SELECT * FROM %s WHERE id = 1 ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") == 1"; @@ -147,7 +85,7 @@ public void testFilterPushDownEqual() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownEqualNull() { String sqlEqualNull = String.format("SELECT * FROM %s WHERE data = NULL ", TABLE_NAME); @@ -156,7 +94,7 @@ public void testFilterPushDownEqualNull() { assertThat(lastScanEvent).as("Should not push down a filter").isNull(); } - @Test + @TestTemplate public void testFilterPushDownEqualLiteralOnLeft() { String sqlLiteralLeft = String.format("SELECT * FROM %s WHERE 1 = id ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") == 1"; @@ -170,7 +108,7 @@ public void testFilterPushDownEqualLiteralOnLeft() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownNoEqual() { String sqlNE = String.format("SELECT * FROM %s WHERE id <> 1 ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") != 1"; @@ -187,7 +125,7 @@ public void testFilterPushDownNoEqual() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownNoEqualNull() { String sqlNotEqualNull = String.format("SELECT * FROM %s WHERE data <> NULL ", TABLE_NAME); @@ -196,7 +134,7 @@ public void testFilterPushDownNoEqualNull() { assertThat(lastScanEvent).as("Should not push down a filter").isNull(); } - @Test + @TestTemplate public void testFilterPushDownAnd() { String sqlAnd = String.format("SELECT * FROM %s WHERE id = 1 AND data = 'iceberg' ", TABLE_NAME); @@ -211,7 +149,7 @@ public void testFilterPushDownAnd() { .isEqualTo(expected); } - @Test + @TestTemplate public void testFilterPushDownOr() { String sqlOr = String.format("SELECT * FROM %s WHERE id = 1 OR data = 'b' ", TABLE_NAME); String expectedFilter = "(ref(name=\"id\") == 1 or ref(name=\"data\") == \"b\")"; @@ -229,7 +167,7 @@ public void testFilterPushDownOr() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownGreaterThan() { String sqlGT = String.format("SELECT * FROM %s WHERE id > 1 ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") > 1"; @@ -247,7 +185,7 @@ public void testFilterPushDownGreaterThan() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownGreaterThanNull() { String sqlGT = String.format("SELECT * FROM %s WHERE data > null ", TABLE_NAME); @@ -256,7 +194,7 @@ public void testFilterPushDownGreaterThanNull() { assertThat(lastScanEvent).as("Should not push down a filter").isNull(); } - @Test + @TestTemplate public void testFilterPushDownGreaterThanLiteralOnLeft() { String sqlGT = String.format("SELECT * FROM %s WHERE 3 > id ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") < 3"; @@ -274,7 +212,7 @@ public void testFilterPushDownGreaterThanLiteralOnLeft() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownGreaterThanEqual() { String sqlGTE = String.format("SELECT * FROM %s WHERE id >= 2 ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") >= 2"; @@ -292,7 +230,7 @@ public void testFilterPushDownGreaterThanEqual() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownGreaterThanEqualNull() { String sqlGTE = String.format("SELECT * FROM %s WHERE data >= null ", TABLE_NAME); @@ -301,7 +239,7 @@ public void testFilterPushDownGreaterThanEqualNull() { assertThat(lastScanEvent).as("Should not push down a filter").isNull(); } - @Test + @TestTemplate public void testFilterPushDownGreaterThanEqualLiteralOnLeft() { String sqlGTE = String.format("SELECT * FROM %s WHERE 2 >= id ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") <= 2"; @@ -319,7 +257,7 @@ public void testFilterPushDownGreaterThanEqualLiteralOnLeft() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownLessThan() { String sqlLT = String.format("SELECT * FROM %s WHERE id < 2 ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") < 2"; @@ -334,7 +272,7 @@ public void testFilterPushDownLessThan() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownLessThanNull() { String sqlLT = String.format("SELECT * FROM %s WHERE data < null ", TABLE_NAME); @@ -343,7 +281,7 @@ public void testFilterPushDownLessThanNull() { assertThat(lastScanEvent).as("Should not push down a filter").isNull(); } - @Test + @TestTemplate public void testFilterPushDownLessThanLiteralOnLeft() { String sqlLT = String.format("SELECT * FROM %s WHERE 2 < id ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") > 2"; @@ -358,7 +296,7 @@ public void testFilterPushDownLessThanLiteralOnLeft() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownLessThanEqual() { String sqlLTE = String.format("SELECT * FROM %s WHERE id <= 1 ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") <= 1"; @@ -373,7 +311,7 @@ public void testFilterPushDownLessThanEqual() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownLessThanEqualNull() { String sqlLTE = String.format("SELECT * FROM %s WHERE data <= null ", TABLE_NAME); @@ -382,7 +320,7 @@ public void testFilterPushDownLessThanEqualNull() { assertThat(lastScanEvent).as("Should not push down a filter").isNull(); } - @Test + @TestTemplate public void testFilterPushDownLessThanEqualLiteralOnLeft() { String sqlLTE = String.format("SELECT * FROM %s WHERE 3 <= id ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") >= 3"; @@ -397,7 +335,7 @@ public void testFilterPushDownLessThanEqualLiteralOnLeft() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownIn() { String sqlIN = String.format("SELECT * FROM %s WHERE id IN (1,2) ", TABLE_NAME); String expectedFilter = "(ref(name=\"id\") == 1 or ref(name=\"id\") == 2)"; @@ -413,7 +351,7 @@ public void testFilterPushDownIn() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownInNull() { String sqlInNull = String.format("SELECT * FROM %s WHERE data IN ('iceberg',NULL) ", TABLE_NAME); @@ -430,7 +368,7 @@ public void testFilterPushDownInNull() { .isEqualTo(expectedScan); } - @Test + @TestTemplate public void testFilterPushDownNotIn() { String sqlNotIn = String.format("SELECT * FROM %s WHERE id NOT IN (3,2) ", TABLE_NAME); @@ -444,7 +382,7 @@ public void testFilterPushDownNotIn() { .isEqualTo(expectedScan); } - @Test + @TestTemplate public void testFilterPushDownNotInNull() { String sqlNotInNull = String.format("SELECT * FROM %s WHERE id NOT IN (1,2,NULL) ", TABLE_NAME); List resultGT = sql(sqlNotInNull); @@ -455,7 +393,7 @@ public void testFilterPushDownNotInNull() { .isNull(); } - @Test + @TestTemplate public void testFilterPushDownIsNotNull() { String sqlNotNull = String.format("SELECT * FROM %s WHERE data IS NOT NULL", TABLE_NAME); String expectedFilter = "not_null(ref(name=\"data\"))"; @@ -473,7 +411,7 @@ public void testFilterPushDownIsNotNull() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownIsNull() { String sqlNull = String.format("SELECT * FROM %s WHERE data IS NULL", TABLE_NAME); String expectedFilter = "is_null(ref(name=\"data\"))"; @@ -488,7 +426,7 @@ public void testFilterPushDownIsNull() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownNot() { String sqlNot = String.format("SELECT * FROM %s WHERE NOT (id = 1 OR id = 2 ) ", TABLE_NAME); @@ -503,7 +441,7 @@ public void testFilterPushDownNot() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownBetween() { String sqlBetween = String.format("SELECT * FROM %s WHERE id BETWEEN 1 AND 2 ", TABLE_NAME); @@ -522,7 +460,7 @@ public void testFilterPushDownBetween() { .isEqualTo(expected); } - @Test + @TestTemplate public void testFilterPushDownNotBetween() { String sqlNotBetween = String.format("SELECT * FROM %s WHERE id NOT BETWEEN 2 AND 3 ", TABLE_NAME); @@ -538,7 +476,7 @@ public void testFilterPushDownNotBetween() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownLike() { String expectedFilter = "ref(name=\"data\") startsWith \"\"ice\"\""; @@ -565,7 +503,7 @@ public void testFilterPushDownLike() { .isEqualTo(expectedScan); } - @Test + @TestTemplate public void testFilterNotPushDownLike() { Row expectRecord = Row.of(1, "iceberg", 10.0); String sqlNoPushDown = "SELECT * FROM " + TABLE_NAME + " WHERE data LIKE '%%i' "; @@ -604,7 +542,7 @@ public void testFilterNotPushDownLike() { .isEqualTo(Expressions.alwaysTrue()); } - @Test + @TestTemplate public void testFilterPushDown2Literal() { String sql2Literal = String.format("SELECT * FROM %s WHERE 1 > 0 ", TABLE_NAME); List result = sql(sql2Literal); @@ -616,7 +554,7 @@ public void testFilterPushDown2Literal() { .isEqualTo(Expressions.alwaysTrue()); } - @Test + @TestTemplate public void testSqlParseNaN() { // todo add some test case to test NaN } diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TableSourceTestBase.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TableSourceTestBase.java new file mode 100644 index 000000000000..32c81d9465a4 --- /dev/null +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TableSourceTestBase.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.IOException; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.events.Listeners; +import org.apache.iceberg.events.ScanEvent; +import org.apache.iceberg.flink.FlinkConfigOptions; +import org.apache.iceberg.flink.TestBase; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ParameterizedTestExtension.class) +public abstract class TableSourceTestBase extends TestBase { + @Parameters(name = "useFlip27Source = {0}") + protected static Object[][] parameters() { + return new Object[][] { + {false}, {true}, + }; + } + + @Parameter(index = 0) + protected boolean useFlip27Source; + + protected static final String CATALOG_NAME = "test_catalog"; + protected static final String DATABASE_NAME = "test_db"; + protected static final String TABLE_NAME = "test_table"; + protected final FileFormat format = FileFormat.AVRO; + protected int scanEventCount = 0; + protected ScanEvent lastScanEvent = null; + + @Override + protected TableEnvironment getTableEnv() { + super.getTableEnv().getConfig().getConfiguration().set(CoreOptions.DEFAULT_PARALLELISM, 1); + super.getTableEnv() + .getConfig() + .getConfiguration() + .setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(), useFlip27Source); + return super.getTableEnv(); + } + + @BeforeEach + public void before() throws IOException { + // register a scan event listener to validate pushdown + Listeners.register( + event -> { + scanEventCount += 1; + lastScanEvent = event; + }, + ScanEvent.class); + + File warehouseFile = File.createTempFile("junit", null, temporaryDirectory.toFile()); + assertThat(warehouseFile.delete()).isTrue(); + String warehouse = String.format("file:%s", warehouseFile); + + sql( + "CREATE CATALOG %s WITH ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')", + CATALOG_NAME, warehouse); + sql("USE CATALOG %s", CATALOG_NAME); + sql("CREATE DATABASE %s", DATABASE_NAME); + sql("USE %s", DATABASE_NAME); + sql( + "CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE) WITH ('write.format.default'='%s')", + TABLE_NAME, format.name()); + sql( + "INSERT INTO %s VALUES (1,'iceberg',10),(2,'b',20),(3,CAST(NULL AS VARCHAR),30)", + TABLE_NAME); + + this.scanEventCount = 0; + this.lastScanEvent = null; + } + + @AfterEach + public void clean() { + sql("DROP TABLE IF EXISTS %s.%s", DATABASE_NAME, TABLE_NAME); + dropCatalog(CATALOG_NAME, true); + } +} diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java index bc7194e38088..8131bd7ab0d3 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java @@ -20,16 +20,17 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; import java.util.List; import org.apache.flink.types.Row; import org.apache.iceberg.flink.FlinkReadOptions; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; -public class TestFlinkSourceConfig extends TestFlinkTableSource { +public class TestFlinkSourceConfig extends TableSourceTestBase { private static final String TABLE = "test_table"; - @Test + @TestTemplate public void testFlinkSessionConfig() { getTableEnv().getConfig().set(FlinkReadOptions.STREAMING_OPTION, true); assertThatThrownBy(() -> sql("SELECT * FROM %s /*+ OPTIONS('as-of-timestamp'='1')*/", TABLE)) @@ -37,7 +38,7 @@ public void testFlinkSessionConfig() { .hasMessage("Cannot set as-of-timestamp option for streaming reader"); } - @Test + @TestTemplate public void testFlinkHintConfig() { List result = sql( @@ -46,8 +47,11 @@ public void testFlinkHintConfig() { assertThat(result).hasSize(3); } - @Test + @TestTemplate public void testReadOptionHierarchy() { + // TODO: FLIP-27 source doesn't implement limit pushdown yet + assumeThat(useFlip27Source).isFalse(); + getTableEnv().getConfig().set(FlinkReadOptions.LIMIT_OPTION, 1L); List result = sql("SELECT * FROM %s", TABLE); assertThat(result).hasSize(1); diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java index 01bab6d063fd..18528c789114 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java @@ -21,78 +21,16 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import java.io.File; -import java.io.IOException; import java.util.List; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.table.api.SqlParserException; -import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.types.Row; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.events.Listeners; -import org.apache.iceberg.events.ScanEvent; import org.apache.iceberg.expressions.Expressions; -import org.apache.iceberg.flink.TestBase; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -public class TestFlinkTableSource extends TestBase { - - private static final String CATALOG_NAME = "test_catalog"; - private static final String DATABASE_NAME = "test_db"; - private static final String TABLE_NAME = "test_table"; - private final FileFormat format = FileFormat.AVRO; - - private int scanEventCount = 0; - private ScanEvent lastScanEvent = null; - - @Override - protected TableEnvironment getTableEnv() { - super.getTableEnv().getConfig().getConfiguration().set(CoreOptions.DEFAULT_PARALLELISM, 1); - return super.getTableEnv(); - } - - @BeforeEach - public void before() throws IOException { - // register a scan event listener to validate pushdown - Listeners.register( - event -> { - scanEventCount += 1; - lastScanEvent = event; - }, - ScanEvent.class); - - File warehouseFile = File.createTempFile("junit", null, temporaryDirectory.toFile()); - assertThat(warehouseFile.delete()).isTrue(); - String warehouse = String.format("file:%s", warehouseFile); - - sql( - "CREATE CATALOG %s WITH ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')", - CATALOG_NAME, warehouse); - sql("USE CATALOG %s", CATALOG_NAME); - sql("CREATE DATABASE %s", DATABASE_NAME); - sql("USE %s", DATABASE_NAME); - sql( - "CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE) WITH ('write.format.default'='%s')", - TABLE_NAME, format.name()); - sql( - "INSERT INTO %s VALUES (1,'iceberg',10),(2,'b',20),(3,CAST(NULL AS VARCHAR),30)", - TABLE_NAME); - - this.scanEventCount = 0; - this.lastScanEvent = null; - } - - @AfterEach - public void clean() { - sql("DROP TABLE IF EXISTS %s.%s", DATABASE_NAME, TABLE_NAME); - sql("DROP DATABASE IF EXISTS %s", DATABASE_NAME); - dropCatalog(CATALOG_NAME, true); - } - - @Test +import org.junit.jupiter.api.TestTemplate; + +public class TestFlinkTableSource extends TableSourceTestBase { + + @TestTemplate public void testLimitPushDown() { assertThatThrownBy(() -> sql("SELECT * FROM %s LIMIT -1", TABLE_NAME)) @@ -121,7 +59,7 @@ public void testLimitPushDown() { assertThat(mixedResult).hasSize(1).first().isEqualTo(Row.of(1, "iceberg", 10.0)); } - @Test + @TestTemplate public void testNoFilterPushDown() { String sql = String.format("SELECT * FROM %s ", TABLE_NAME); List result = sql(sql); @@ -133,7 +71,7 @@ public void testNoFilterPushDown() { .isEqualTo(Expressions.alwaysTrue()); } - @Test + @TestTemplate public void testFilterPushDownEqual() { String sqlLiteralRight = String.format("SELECT * FROM %s WHERE id = 1 ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") == 1"; @@ -147,7 +85,7 @@ public void testFilterPushDownEqual() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownEqualNull() { String sqlEqualNull = String.format("SELECT * FROM %s WHERE data = NULL ", TABLE_NAME); @@ -156,7 +94,7 @@ public void testFilterPushDownEqualNull() { assertThat(lastScanEvent).as("Should not push down a filter").isNull(); } - @Test + @TestTemplate public void testFilterPushDownEqualLiteralOnLeft() { String sqlLiteralLeft = String.format("SELECT * FROM %s WHERE 1 = id ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") == 1"; @@ -170,7 +108,7 @@ public void testFilterPushDownEqualLiteralOnLeft() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownNoEqual() { String sqlNE = String.format("SELECT * FROM %s WHERE id <> 1 ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") != 1"; @@ -187,7 +125,7 @@ public void testFilterPushDownNoEqual() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownNoEqualNull() { String sqlNotEqualNull = String.format("SELECT * FROM %s WHERE data <> NULL ", TABLE_NAME); @@ -196,7 +134,7 @@ public void testFilterPushDownNoEqualNull() { assertThat(lastScanEvent).as("Should not push down a filter").isNull(); } - @Test + @TestTemplate public void testFilterPushDownAnd() { String sqlAnd = String.format("SELECT * FROM %s WHERE id = 1 AND data = 'iceberg' ", TABLE_NAME); @@ -211,7 +149,7 @@ public void testFilterPushDownAnd() { .isEqualTo(expected); } - @Test + @TestTemplate public void testFilterPushDownOr() { String sqlOr = String.format("SELECT * FROM %s WHERE id = 1 OR data = 'b' ", TABLE_NAME); String expectedFilter = "(ref(name=\"id\") == 1 or ref(name=\"data\") == \"b\")"; @@ -229,7 +167,7 @@ public void testFilterPushDownOr() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownGreaterThan() { String sqlGT = String.format("SELECT * FROM %s WHERE id > 1 ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") > 1"; @@ -247,7 +185,7 @@ public void testFilterPushDownGreaterThan() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownGreaterThanNull() { String sqlGT = String.format("SELECT * FROM %s WHERE data > null ", TABLE_NAME); @@ -256,7 +194,7 @@ public void testFilterPushDownGreaterThanNull() { assertThat(lastScanEvent).as("Should not push down a filter").isNull(); } - @Test + @TestTemplate public void testFilterPushDownGreaterThanLiteralOnLeft() { String sqlGT = String.format("SELECT * FROM %s WHERE 3 > id ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") < 3"; @@ -274,7 +212,7 @@ public void testFilterPushDownGreaterThanLiteralOnLeft() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownGreaterThanEqual() { String sqlGTE = String.format("SELECT * FROM %s WHERE id >= 2 ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") >= 2"; @@ -292,7 +230,7 @@ public void testFilterPushDownGreaterThanEqual() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownGreaterThanEqualNull() { String sqlGTE = String.format("SELECT * FROM %s WHERE data >= null ", TABLE_NAME); @@ -301,7 +239,7 @@ public void testFilterPushDownGreaterThanEqualNull() { assertThat(lastScanEvent).as("Should not push down a filter").isNull(); } - @Test + @TestTemplate public void testFilterPushDownGreaterThanEqualLiteralOnLeft() { String sqlGTE = String.format("SELECT * FROM %s WHERE 2 >= id ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") <= 2"; @@ -319,7 +257,7 @@ public void testFilterPushDownGreaterThanEqualLiteralOnLeft() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownLessThan() { String sqlLT = String.format("SELECT * FROM %s WHERE id < 2 ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") < 2"; @@ -334,7 +272,7 @@ public void testFilterPushDownLessThan() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownLessThanNull() { String sqlLT = String.format("SELECT * FROM %s WHERE data < null ", TABLE_NAME); @@ -343,7 +281,7 @@ public void testFilterPushDownLessThanNull() { assertThat(lastScanEvent).as("Should not push down a filter").isNull(); } - @Test + @TestTemplate public void testFilterPushDownLessThanLiteralOnLeft() { String sqlLT = String.format("SELECT * FROM %s WHERE 2 < id ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") > 2"; @@ -358,7 +296,7 @@ public void testFilterPushDownLessThanLiteralOnLeft() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownLessThanEqual() { String sqlLTE = String.format("SELECT * FROM %s WHERE id <= 1 ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") <= 1"; @@ -373,7 +311,7 @@ public void testFilterPushDownLessThanEqual() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownLessThanEqualNull() { String sqlLTE = String.format("SELECT * FROM %s WHERE data <= null ", TABLE_NAME); @@ -382,7 +320,7 @@ public void testFilterPushDownLessThanEqualNull() { assertThat(lastScanEvent).as("Should not push down a filter").isNull(); } - @Test + @TestTemplate public void testFilterPushDownLessThanEqualLiteralOnLeft() { String sqlLTE = String.format("SELECT * FROM %s WHERE 3 <= id ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") >= 3"; @@ -397,7 +335,7 @@ public void testFilterPushDownLessThanEqualLiteralOnLeft() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownIn() { String sqlIN = String.format("SELECT * FROM %s WHERE id IN (1,2) ", TABLE_NAME); String expectedFilter = "(ref(name=\"id\") == 1 or ref(name=\"id\") == 2)"; @@ -413,7 +351,7 @@ public void testFilterPushDownIn() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownInNull() { String sqlInNull = String.format("SELECT * FROM %s WHERE data IN ('iceberg',NULL) ", TABLE_NAME); @@ -430,7 +368,7 @@ public void testFilterPushDownInNull() { .isEqualTo(expectedScan); } - @Test + @TestTemplate public void testFilterPushDownNotIn() { String sqlNotIn = String.format("SELECT * FROM %s WHERE id NOT IN (3,2) ", TABLE_NAME); @@ -444,7 +382,7 @@ public void testFilterPushDownNotIn() { .isEqualTo(expectedScan); } - @Test + @TestTemplate public void testFilterPushDownNotInNull() { String sqlNotInNull = String.format("SELECT * FROM %s WHERE id NOT IN (1,2,NULL) ", TABLE_NAME); List resultGT = sql(sqlNotInNull); @@ -455,7 +393,7 @@ public void testFilterPushDownNotInNull() { .isNull(); } - @Test + @TestTemplate public void testFilterPushDownIsNotNull() { String sqlNotNull = String.format("SELECT * FROM %s WHERE data IS NOT NULL", TABLE_NAME); String expectedFilter = "not_null(ref(name=\"data\"))"; @@ -473,7 +411,7 @@ public void testFilterPushDownIsNotNull() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownIsNull() { String sqlNull = String.format("SELECT * FROM %s WHERE data IS NULL", TABLE_NAME); String expectedFilter = "is_null(ref(name=\"data\"))"; @@ -488,7 +426,7 @@ public void testFilterPushDownIsNull() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownNot() { String sqlNot = String.format("SELECT * FROM %s WHERE NOT (id = 1 OR id = 2 ) ", TABLE_NAME); @@ -503,7 +441,7 @@ public void testFilterPushDownNot() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownBetween() { String sqlBetween = String.format("SELECT * FROM %s WHERE id BETWEEN 1 AND 2 ", TABLE_NAME); @@ -522,7 +460,7 @@ public void testFilterPushDownBetween() { .isEqualTo(expected); } - @Test + @TestTemplate public void testFilterPushDownNotBetween() { String sqlNotBetween = String.format("SELECT * FROM %s WHERE id NOT BETWEEN 2 AND 3 ", TABLE_NAME); @@ -538,7 +476,7 @@ public void testFilterPushDownNotBetween() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownLike() { String expectedFilter = "ref(name=\"data\") startsWith \"\"ice\"\""; @@ -565,7 +503,7 @@ public void testFilterPushDownLike() { .isEqualTo(expectedScan); } - @Test + @TestTemplate public void testFilterNotPushDownLike() { Row expectRecord = Row.of(1, "iceberg", 10.0); String sqlNoPushDown = "SELECT * FROM " + TABLE_NAME + " WHERE data LIKE '%%i' "; @@ -604,7 +542,7 @@ public void testFilterNotPushDownLike() { .isEqualTo(Expressions.alwaysTrue()); } - @Test + @TestTemplate public void testFilterPushDown2Literal() { String sql2Literal = String.format("SELECT * FROM %s WHERE 1 > 0 ", TABLE_NAME); List result = sql(sql2Literal); @@ -616,7 +554,7 @@ public void testFilterPushDown2Literal() { .isEqualTo(Expressions.alwaysTrue()); } - @Test + @TestTemplate public void testSqlParseNaN() { // todo add some test case to test NaN } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TableSourceTestBase.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TableSourceTestBase.java new file mode 100644 index 000000000000..f89d63ac73e3 --- /dev/null +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TableSourceTestBase.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.IOException; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.events.Listeners; +import org.apache.iceberg.events.ScanEvent; +import org.apache.iceberg.flink.FlinkConfigOptions; +import org.apache.iceberg.flink.TestBase; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ParameterizedTestExtension.class) +public abstract class TableSourceTestBase extends TestBase { + @Parameters(name = "useFlip27Source = {0}") + protected static Object[][] parameters() { + return new Object[][] { + {false}, {true}, + }; + } + + @Parameter(index = 0) + protected boolean useFlip27Source; + + protected static final String CATALOG_NAME = "test_catalog"; + protected static final String DATABASE_NAME = "test_db"; + protected static final String TABLE_NAME = "test_table"; + protected final FileFormat format = FileFormat.AVRO; + protected int scanEventCount = 0; + protected ScanEvent lastScanEvent = null; + + @Override + protected TableEnvironment getTableEnv() { + super.getTableEnv().getConfig().getConfiguration().set(CoreOptions.DEFAULT_PARALLELISM, 1); + super.getTableEnv() + .getConfig() + .getConfiguration() + .setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(), useFlip27Source); + return super.getTableEnv(); + } + + @BeforeEach + public void before() throws IOException { + // register a scan event listener to validate pushdown + Listeners.register( + event -> { + scanEventCount += 1; + lastScanEvent = event; + }, + ScanEvent.class); + + File warehouseFile = File.createTempFile("junit", null, temporaryDirectory.toFile()); + assertThat(warehouseFile.delete()).isTrue(); + String warehouse = String.format("file:%s", warehouseFile); + + sql( + "CREATE CATALOG %s WITH ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')", + CATALOG_NAME, warehouse); + sql("USE CATALOG %s", CATALOG_NAME); + sql("CREATE DATABASE %s", DATABASE_NAME); + sql("USE %s", DATABASE_NAME); + sql( + "CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE) WITH ('write.format.default'='%s')", + TABLE_NAME, format.name()); + sql( + "INSERT INTO %s VALUES (1,'iceberg',10),(2,'b',20),(3,CAST(NULL AS VARCHAR),30)", + TABLE_NAME); + + this.scanEventCount = 0; + this.lastScanEvent = null; + } + + @AfterEach + public void clean() { + sql("DROP TABLE IF EXISTS %s.%s", DATABASE_NAME, TABLE_NAME); + dropDatabase(DATABASE_NAME, true); + dropCatalog(CATALOG_NAME, true); + } +} diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java index bc7194e38088..8131bd7ab0d3 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java @@ -20,16 +20,17 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; import java.util.List; import org.apache.flink.types.Row; import org.apache.iceberg.flink.FlinkReadOptions; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; -public class TestFlinkSourceConfig extends TestFlinkTableSource { +public class TestFlinkSourceConfig extends TableSourceTestBase { private static final String TABLE = "test_table"; - @Test + @TestTemplate public void testFlinkSessionConfig() { getTableEnv().getConfig().set(FlinkReadOptions.STREAMING_OPTION, true); assertThatThrownBy(() -> sql("SELECT * FROM %s /*+ OPTIONS('as-of-timestamp'='1')*/", TABLE)) @@ -37,7 +38,7 @@ public void testFlinkSessionConfig() { .hasMessage("Cannot set as-of-timestamp option for streaming reader"); } - @Test + @TestTemplate public void testFlinkHintConfig() { List result = sql( @@ -46,8 +47,11 @@ public void testFlinkHintConfig() { assertThat(result).hasSize(3); } - @Test + @TestTemplate public void testReadOptionHierarchy() { + // TODO: FLIP-27 source doesn't implement limit pushdown yet + assumeThat(useFlip27Source).isFalse(); + getTableEnv().getConfig().set(FlinkReadOptions.LIMIT_OPTION, 1L); List result = sql("SELECT * FROM %s", TABLE); assertThat(result).hasSize(1); diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java index facbdcaaa533..18528c789114 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java @@ -21,78 +21,16 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import java.io.File; -import java.io.IOException; import java.util.List; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.table.api.SqlParserException; -import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.types.Row; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.events.Listeners; -import org.apache.iceberg.events.ScanEvent; import org.apache.iceberg.expressions.Expressions; -import org.apache.iceberg.flink.TestBase; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -public class TestFlinkTableSource extends TestBase { - - private static final String CATALOG_NAME = "test_catalog"; - private static final String DATABASE_NAME = "test_db"; - private static final String TABLE_NAME = "test_table"; - private final FileFormat format = FileFormat.AVRO; - - private int scanEventCount = 0; - private ScanEvent lastScanEvent = null; - - @Override - protected TableEnvironment getTableEnv() { - super.getTableEnv().getConfig().getConfiguration().set(CoreOptions.DEFAULT_PARALLELISM, 1); - return super.getTableEnv(); - } - - @BeforeEach - public void before() throws IOException { - // register a scan event listener to validate pushdown - Listeners.register( - event -> { - scanEventCount += 1; - lastScanEvent = event; - }, - ScanEvent.class); - - File warehouseFile = File.createTempFile("junit", null, temporaryDirectory.toFile()); - assertThat(warehouseFile.delete()).isTrue(); - String warehouse = String.format("file:%s", warehouseFile); - - sql( - "CREATE CATALOG %s WITH ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')", - CATALOG_NAME, warehouse); - sql("USE CATALOG %s", CATALOG_NAME); - sql("CREATE DATABASE %s", DATABASE_NAME); - sql("USE %s", DATABASE_NAME); - sql( - "CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE) WITH ('write.format.default'='%s')", - TABLE_NAME, format.name()); - sql( - "INSERT INTO %s VALUES (1,'iceberg',10),(2,'b',20),(3,CAST(NULL AS VARCHAR),30)", - TABLE_NAME); - - this.scanEventCount = 0; - this.lastScanEvent = null; - } - - @AfterEach - public void clean() { - sql("DROP TABLE IF EXISTS %s.%s", DATABASE_NAME, TABLE_NAME); - dropDatabase(DATABASE_NAME, true); - dropCatalog(CATALOG_NAME, true); - } - - @Test +import org.junit.jupiter.api.TestTemplate; + +public class TestFlinkTableSource extends TableSourceTestBase { + + @TestTemplate public void testLimitPushDown() { assertThatThrownBy(() -> sql("SELECT * FROM %s LIMIT -1", TABLE_NAME)) @@ -121,7 +59,7 @@ public void testLimitPushDown() { assertThat(mixedResult).hasSize(1).first().isEqualTo(Row.of(1, "iceberg", 10.0)); } - @Test + @TestTemplate public void testNoFilterPushDown() { String sql = String.format("SELECT * FROM %s ", TABLE_NAME); List result = sql(sql); @@ -133,7 +71,7 @@ public void testNoFilterPushDown() { .isEqualTo(Expressions.alwaysTrue()); } - @Test + @TestTemplate public void testFilterPushDownEqual() { String sqlLiteralRight = String.format("SELECT * FROM %s WHERE id = 1 ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") == 1"; @@ -147,7 +85,7 @@ public void testFilterPushDownEqual() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownEqualNull() { String sqlEqualNull = String.format("SELECT * FROM %s WHERE data = NULL ", TABLE_NAME); @@ -156,7 +94,7 @@ public void testFilterPushDownEqualNull() { assertThat(lastScanEvent).as("Should not push down a filter").isNull(); } - @Test + @TestTemplate public void testFilterPushDownEqualLiteralOnLeft() { String sqlLiteralLeft = String.format("SELECT * FROM %s WHERE 1 = id ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") == 1"; @@ -170,7 +108,7 @@ public void testFilterPushDownEqualLiteralOnLeft() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownNoEqual() { String sqlNE = String.format("SELECT * FROM %s WHERE id <> 1 ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") != 1"; @@ -187,7 +125,7 @@ public void testFilterPushDownNoEqual() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownNoEqualNull() { String sqlNotEqualNull = String.format("SELECT * FROM %s WHERE data <> NULL ", TABLE_NAME); @@ -196,7 +134,7 @@ public void testFilterPushDownNoEqualNull() { assertThat(lastScanEvent).as("Should not push down a filter").isNull(); } - @Test + @TestTemplate public void testFilterPushDownAnd() { String sqlAnd = String.format("SELECT * FROM %s WHERE id = 1 AND data = 'iceberg' ", TABLE_NAME); @@ -211,7 +149,7 @@ public void testFilterPushDownAnd() { .isEqualTo(expected); } - @Test + @TestTemplate public void testFilterPushDownOr() { String sqlOr = String.format("SELECT * FROM %s WHERE id = 1 OR data = 'b' ", TABLE_NAME); String expectedFilter = "(ref(name=\"id\") == 1 or ref(name=\"data\") == \"b\")"; @@ -229,7 +167,7 @@ public void testFilterPushDownOr() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownGreaterThan() { String sqlGT = String.format("SELECT * FROM %s WHERE id > 1 ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") > 1"; @@ -247,7 +185,7 @@ public void testFilterPushDownGreaterThan() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownGreaterThanNull() { String sqlGT = String.format("SELECT * FROM %s WHERE data > null ", TABLE_NAME); @@ -256,7 +194,7 @@ public void testFilterPushDownGreaterThanNull() { assertThat(lastScanEvent).as("Should not push down a filter").isNull(); } - @Test + @TestTemplate public void testFilterPushDownGreaterThanLiteralOnLeft() { String sqlGT = String.format("SELECT * FROM %s WHERE 3 > id ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") < 3"; @@ -274,7 +212,7 @@ public void testFilterPushDownGreaterThanLiteralOnLeft() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownGreaterThanEqual() { String sqlGTE = String.format("SELECT * FROM %s WHERE id >= 2 ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") >= 2"; @@ -292,7 +230,7 @@ public void testFilterPushDownGreaterThanEqual() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownGreaterThanEqualNull() { String sqlGTE = String.format("SELECT * FROM %s WHERE data >= null ", TABLE_NAME); @@ -301,7 +239,7 @@ public void testFilterPushDownGreaterThanEqualNull() { assertThat(lastScanEvent).as("Should not push down a filter").isNull(); } - @Test + @TestTemplate public void testFilterPushDownGreaterThanEqualLiteralOnLeft() { String sqlGTE = String.format("SELECT * FROM %s WHERE 2 >= id ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") <= 2"; @@ -319,7 +257,7 @@ public void testFilterPushDownGreaterThanEqualLiteralOnLeft() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownLessThan() { String sqlLT = String.format("SELECT * FROM %s WHERE id < 2 ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") < 2"; @@ -334,7 +272,7 @@ public void testFilterPushDownLessThan() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownLessThanNull() { String sqlLT = String.format("SELECT * FROM %s WHERE data < null ", TABLE_NAME); @@ -343,7 +281,7 @@ public void testFilterPushDownLessThanNull() { assertThat(lastScanEvent).as("Should not push down a filter").isNull(); } - @Test + @TestTemplate public void testFilterPushDownLessThanLiteralOnLeft() { String sqlLT = String.format("SELECT * FROM %s WHERE 2 < id ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") > 2"; @@ -358,7 +296,7 @@ public void testFilterPushDownLessThanLiteralOnLeft() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownLessThanEqual() { String sqlLTE = String.format("SELECT * FROM %s WHERE id <= 1 ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") <= 1"; @@ -373,7 +311,7 @@ public void testFilterPushDownLessThanEqual() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownLessThanEqualNull() { String sqlLTE = String.format("SELECT * FROM %s WHERE data <= null ", TABLE_NAME); @@ -382,7 +320,7 @@ public void testFilterPushDownLessThanEqualNull() { assertThat(lastScanEvent).as("Should not push down a filter").isNull(); } - @Test + @TestTemplate public void testFilterPushDownLessThanEqualLiteralOnLeft() { String sqlLTE = String.format("SELECT * FROM %s WHERE 3 <= id ", TABLE_NAME); String expectedFilter = "ref(name=\"id\") >= 3"; @@ -397,7 +335,7 @@ public void testFilterPushDownLessThanEqualLiteralOnLeft() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownIn() { String sqlIN = String.format("SELECT * FROM %s WHERE id IN (1,2) ", TABLE_NAME); String expectedFilter = "(ref(name=\"id\") == 1 or ref(name=\"id\") == 2)"; @@ -413,7 +351,7 @@ public void testFilterPushDownIn() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownInNull() { String sqlInNull = String.format("SELECT * FROM %s WHERE data IN ('iceberg',NULL) ", TABLE_NAME); @@ -430,7 +368,7 @@ public void testFilterPushDownInNull() { .isEqualTo(expectedScan); } - @Test + @TestTemplate public void testFilterPushDownNotIn() { String sqlNotIn = String.format("SELECT * FROM %s WHERE id NOT IN (3,2) ", TABLE_NAME); @@ -444,7 +382,7 @@ public void testFilterPushDownNotIn() { .isEqualTo(expectedScan); } - @Test + @TestTemplate public void testFilterPushDownNotInNull() { String sqlNotInNull = String.format("SELECT * FROM %s WHERE id NOT IN (1,2,NULL) ", TABLE_NAME); List resultGT = sql(sqlNotInNull); @@ -455,7 +393,7 @@ public void testFilterPushDownNotInNull() { .isNull(); } - @Test + @TestTemplate public void testFilterPushDownIsNotNull() { String sqlNotNull = String.format("SELECT * FROM %s WHERE data IS NOT NULL", TABLE_NAME); String expectedFilter = "not_null(ref(name=\"data\"))"; @@ -473,7 +411,7 @@ public void testFilterPushDownIsNotNull() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownIsNull() { String sqlNull = String.format("SELECT * FROM %s WHERE data IS NULL", TABLE_NAME); String expectedFilter = "is_null(ref(name=\"data\"))"; @@ -488,7 +426,7 @@ public void testFilterPushDownIsNull() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownNot() { String sqlNot = String.format("SELECT * FROM %s WHERE NOT (id = 1 OR id = 2 ) ", TABLE_NAME); @@ -503,7 +441,7 @@ public void testFilterPushDownNot() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownBetween() { String sqlBetween = String.format("SELECT * FROM %s WHERE id BETWEEN 1 AND 2 ", TABLE_NAME); @@ -522,7 +460,7 @@ public void testFilterPushDownBetween() { .isEqualTo(expected); } - @Test + @TestTemplate public void testFilterPushDownNotBetween() { String sqlNotBetween = String.format("SELECT * FROM %s WHERE id NOT BETWEEN 2 AND 3 ", TABLE_NAME); @@ -538,7 +476,7 @@ public void testFilterPushDownNotBetween() { .isEqualTo(expectedFilter); } - @Test + @TestTemplate public void testFilterPushDownLike() { String expectedFilter = "ref(name=\"data\") startsWith \"\"ice\"\""; @@ -565,7 +503,7 @@ public void testFilterPushDownLike() { .isEqualTo(expectedScan); } - @Test + @TestTemplate public void testFilterNotPushDownLike() { Row expectRecord = Row.of(1, "iceberg", 10.0); String sqlNoPushDown = "SELECT * FROM " + TABLE_NAME + " WHERE data LIKE '%%i' "; @@ -604,7 +542,7 @@ public void testFilterNotPushDownLike() { .isEqualTo(Expressions.alwaysTrue()); } - @Test + @TestTemplate public void testFilterPushDown2Literal() { String sql2Literal = String.format("SELECT * FROM %s WHERE 1 > 0 ", TABLE_NAME); List result = sql(sql2Literal); @@ -616,7 +554,7 @@ public void testFilterPushDown2Literal() { .isEqualTo(Expressions.alwaysTrue()); } - @Test + @TestTemplate public void testSqlParseNaN() { // todo add some test case to test NaN }