diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java index 95d29e2f..8b7af920 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java @@ -25,6 +25,7 @@ import com.alibaba.fluss.connector.flink.utils.CatalogExceptionUtil; import com.alibaba.fluss.connector.flink.utils.FlinkConversions; import com.alibaba.fluss.exception.FlussRuntimeException; +import com.alibaba.fluss.metadata.PartitionInfo; import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TableInfo; import com.alibaba.fluss.metadata.TablePath; @@ -67,6 +68,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import static com.alibaba.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS; import static org.apache.flink.util.Preconditions.checkArgument; @@ -359,8 +361,45 @@ public void alterTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, @Override public List listPartitions(ObjectPath objectPath) throws TableNotExistException, TableNotPartitionedException, CatalogException { - // TODO: use admin.listPartitionInfos() - return Collections.emptyList(); + + String tableName = objectPath.getObjectName(); + TablePath tablePath; + try { + if (tableName.contains(LAKE_TABLE_SPLITTER)) { + tablePath = + TablePath.of( + objectPath.getDatabaseName(), + tableName.split("\\" + LAKE_TABLE_SPLITTER)[0]); + } else { + tablePath = toTablePath(objectPath); + } + TableInfo tableInfo = admin.getTable(tablePath).get(); + TableDescriptor tableDescriptor = tableInfo.getTableDescriptor(); + if (!tableDescriptor.isPartitioned()) { + throw new TableNotPartitionedException(getName(), objectPath); + } + String partitionField = tableDescriptor.getPartitionKeys().get(0); + List partitionInfoList = admin.listPartitionInfos(tablePath).get(); + return partitionInfoList.stream() + .map( + par -> + new CatalogPartitionSpec( + Collections.singletonMap( + partitionField, par.getPartitionName()))) + .collect(Collectors.toList()); + + } catch (Exception e) { + Throwable t = ExceptionUtils.stripExecutionException(e); + if (CatalogExceptionUtil.isTableNotExist(t)) { + throw new TableNotExistException(getName(), objectPath); + } else { + throw new CatalogException( + String.format( + "Failed to list table [%s]'s partition infos in %s", + objectPath, getName()), + t); + } + } } @Override @@ -368,6 +407,7 @@ public List listPartitions( ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException { + // TODO Supports listing infos of specified partitions. throw new UnsupportedOperationException(); } diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogITCase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogITCase.java index 16fbe0bb..20c26564 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogITCase.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogITCase.java @@ -19,8 +19,10 @@ import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.exception.InvalidTableException; +import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.server.testutils.FlussClusterExtension; +import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Schema; @@ -47,6 +49,7 @@ import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS; import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.BUCKET_KEY; import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.BUCKET_NUMBER; +import static com.alibaba.fluss.connector.flink.source.testutils.FlinkTestBase.waitUntilPartitions; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -187,6 +190,7 @@ void testCreatePartitionedTable() throws Exception { "create table test_partitioned_table (a int, b string) partitioned by (b) " + "with ('table.auto-partition.enabled' = 'true'," + " 'table.auto-partition.time-unit' = 'day')"); + tEnv.executeSql("show partitions test_partitioned_table$lake").print(); Schema.Builder schemaBuilder = Schema.newBuilder(); schemaBuilder.column("a", DataTypes.INT()).column("b", DataTypes.STRING()); Schema expectedSchema = schemaBuilder.build(); @@ -197,6 +201,40 @@ void testCreatePartitionedTable() throws Exception { assertThat(table.getPartitionKeys()).isEqualTo(Collections.singletonList("b")); } + @Test + void testListPartitionInfos() throws Exception { + tEnv.executeSql( + "create table test_partitioned_table (a int, b string) partitioned by (b) " + + "with ('table.auto-partition.enabled' = 'true'," + + " 'table.auto-partition.time-unit' = 'day'," + + " 'table.auto-partition.num-precreate' = '1')"); + + CatalogTable table = + (CatalogTable) + catalog.getTable(new ObjectPath(DEFAULT_DB, "test_partitioned_table")); + assertThat(table.getPartitionKeys()).isEqualTo(Collections.singletonList("b")); + + String today = DateFormatUtils.format(System.currentTimeMillis(), "yyyyMMdd"); + // Wait until ${today} partition is created. + waitUntilPartitions( + FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), + TablePath.of(DEFAULT_DB, "test_partitioned_table"), + 1); + + List partitionInfos = + CollectionUtil.iteratorToList( + tEnv.executeSql("show partitions test_partitioned_table").collect()); + + assertThat(partitionInfos).containsExactlyInAnyOrder(Row.of("b=" + today)); + + // show the lake table partitions info. + List lakeTablePartitionInfos = + CollectionUtil.iteratorToList( + tEnv.executeSql("show partitions test_partitioned_table$lake").collect()); + + assertThat(lakeTablePartitionInfos).containsExactlyInAnyOrder(Row.of("b=" + today)); + } + @Test void testTableWithExpression() throws Exception { // create a table with watermark and computed column diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogTest.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogTest.java index a77ad8ca..f0768621 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogTest.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogTest.java @@ -18,13 +18,16 @@ import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.server.testutils.FlussClusterExtension; +import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogPartitionSpec; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ObjectPath; @@ -37,6 +40,7 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.expressions.utils.ResolvedExpressionMock; import org.apache.flink.table.factories.FactoryUtil; @@ -54,6 +58,7 @@ import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.BUCKET_KEY; import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.BUCKET_NUMBER; +import static com.alibaba.fluss.connector.flink.source.testutils.FlinkTestBase.waitUntilPartitions; import static com.alibaba.fluss.connector.flink.utils.CatalogTableTestUtils.addOptions; import static com.alibaba.fluss.connector.flink.utils.CatalogTableTestUtils.checkEqualsIgnoreSchema; import static com.alibaba.fluss.connector.flink.utils.CatalogTableTestUtils.checkEqualsRespectSchema; @@ -361,6 +366,64 @@ void testDatabase() throws Exception { .hasMessage("Database %s does not exist in Catalog %s.", "unknown", CATALOG_NAME); } + @Test + public void testListPartitionInfos() throws Exception { + Map options = new HashMap<>(); + assertThatThrownBy(() -> catalog.getTable(tableInDefaultDb)) + .isInstanceOf(TableNotExistException.class) + .hasMessage( + String.format( + "Table (or view) %s does not exist in Catalog %s.", + tableInDefaultDb, CATALOG_NAME)); + + // test create partition table + options.put(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED.key(), "true"); + options.put(ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT.key(), "day"); + options.put(ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.key(), "1"); + ResolvedSchema resolvedSchema = this.createSchema(); + CatalogTable table = + new ResolvedCatalogTable( + CatalogTable.of( + Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(), + "test comment", + Collections.singletonList("first"), + options), + resolvedSchema); + catalog.createTable(this.tableInDefaultDb, table, false); + assertThat(catalog.tableExists(this.tableInDefaultDb)).isTrue(); + + String today = DateFormatUtils.format(System.currentTimeMillis(), "yyyyMMdd"); + // Wait until ${today} partition is created. + waitUntilPartitions( + FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), + TablePath.of( + this.tableInDefaultDb.getDatabaseName(), + this.tableInDefaultDb.getObjectName()), + 1); + + List result = catalog.listPartitions(this.tableInDefaultDb); + assertThat(result) + .containsExactlyInAnyOrder( + new CatalogPartitionSpec(Collections.singletonMap("first", today))); + + // Query the partition infos of the lake table. + List lakeTablePartitions = + catalog.listPartitions(new ObjectPath(DEFAULT_DB, "t1$lake")); + assertThat(lakeTablePartitions) + .containsExactlyInAnyOrder( + new CatalogPartitionSpec(Collections.singletonMap("first", today))); + } + + @Test + public void testNotPartitionedTable() throws Exception { + CatalogTable table = this.newCatalogTable(new HashMap<>()); + catalog.createTable(this.tableInDefaultDb, table, false); + assertThatThrownBy(() -> catalog.listPartitions(this.tableInDefaultDb)) + .hasCauseInstanceOf(TableNotPartitionedException.class) + .hasRootCauseMessage( + "Table default.t1 in catalog test-catalog is not partitioned."); + } + private void createAndCheckAndDropTable( final ResolvedSchema schema, ObjectPath tablePath, Map options) throws Exception {