Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] FlinkCatalog supports list partition infos. #87

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.alibaba.fluss.connector.flink.utils.CatalogExceptionUtil;
import com.alibaba.fluss.connector.flink.utils.FlinkConversions;
import com.alibaba.fluss.exception.FlussRuntimeException;
import com.alibaba.fluss.metadata.PartitionInfo;
import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.metadata.TablePath;
Expand Down Expand Up @@ -67,6 +68,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static com.alibaba.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
import static org.apache.flink.util.Preconditions.checkArgument;
Expand Down Expand Up @@ -359,15 +361,53 @@ public void alterTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable,
@Override
public List<CatalogPartitionSpec> listPartitions(ObjectPath objectPath)
throws TableNotExistException, TableNotPartitionedException, CatalogException {
// TODO: use admin.listPartitionInfos()
return Collections.emptyList();

String tableName = objectPath.getObjectName();
TablePath tablePath;
try {
if (tableName.contains(LAKE_TABLE_SPLITTER)) {
tablePath =
TablePath.of(
LinMingQiang marked this conversation as resolved.
Show resolved Hide resolved
objectPath.getDatabaseName(),
tableName.split("\\" + LAKE_TABLE_SPLITTER)[0]);
} else {
tablePath = toTablePath(objectPath);
}
TableInfo tableInfo = admin.getTable(tablePath).get();
TableDescriptor tableDescriptor = tableInfo.getTableDescriptor();
if (!tableDescriptor.isPartitioned()) {
throw new TableNotPartitionedException(getName(), objectPath);
}
String partitionField = tableDescriptor.getPartitionKeys().get(0);
LinMingQiang marked this conversation as resolved.
Show resolved Hide resolved
List<PartitionInfo> partitionInfoList = admin.listPartitionInfos(tablePath).get();
return partitionInfoList.stream()
.map(
par ->
new CatalogPartitionSpec(
Collections.singletonMap(
partitionField, par.getPartitionName())))
.collect(Collectors.toList());

} catch (Exception e) {
Throwable t = ExceptionUtils.stripExecutionException(e);
if (CatalogExceptionUtil.isTableNotExist(t)) {
throw new TableNotExistException(getName(), objectPath);
} else {
throw new CatalogException(
String.format(
"Failed to list table [%s]'s partition infos in %s",
objectPath, getName()),
t);
}
}
}

@Override
public List<CatalogPartitionSpec> listPartitions(
ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec)
throws TableNotExistException, TableNotPartitionedException,
PartitionSpecInvalidException, CatalogException {
// TODO Supports listing infos of specified partitions.
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.exception.InvalidTableException;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.server.testutils.FlussClusterExtension;

import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
Expand All @@ -47,6 +49,7 @@
import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.BUCKET_KEY;
import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.BUCKET_NUMBER;
import static com.alibaba.fluss.connector.flink.source.testutils.FlinkTestBase.waitUntilPartitions;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand Down Expand Up @@ -187,6 +190,7 @@ void testCreatePartitionedTable() throws Exception {
"create table test_partitioned_table (a int, b string) partitioned by (b) "
+ "with ('table.auto-partition.enabled' = 'true',"
+ " 'table.auto-partition.time-unit' = 'day')");
tEnv.executeSql("show partitions test_partitioned_table$lake").print();
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("a", DataTypes.INT()).column("b", DataTypes.STRING());
Schema expectedSchema = schemaBuilder.build();
Expand All @@ -197,6 +201,40 @@ void testCreatePartitionedTable() throws Exception {
assertThat(table.getPartitionKeys()).isEqualTo(Collections.singletonList("b"));
}

@Test
void testListPartitionInfos() throws Exception {
tEnv.executeSql(
"create table test_partitioned_table (a int, b string) partitioned by (b) "
+ "with ('table.auto-partition.enabled' = 'true',"
+ " 'table.auto-partition.time-unit' = 'day',"
+ " 'table.auto-partition.num-precreate' = '1')");

CatalogTable table =
(CatalogTable)
catalog.getTable(new ObjectPath(DEFAULT_DB, "test_partitioned_table"));
assertThat(table.getPartitionKeys()).isEqualTo(Collections.singletonList("b"));

String today = DateFormatUtils.format(System.currentTimeMillis(), "yyyyMMdd");
// Wait until ${today} partition is created.
waitUntilPartitions(
FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(),
TablePath.of(DEFAULT_DB, "test_partitioned_table"),
1);

List<Row> partitionInfos =
CollectionUtil.iteratorToList(
tEnv.executeSql("show partitions test_partitioned_table").collect());

assertThat(partitionInfos).containsExactlyInAnyOrder(Row.of("b=" + today));

// show the lake table partitions info.
List<Row> lakeTablePartitionInfos =
CollectionUtil.iteratorToList(
tEnv.executeSql("show partitions test_partitioned_table$lake").collect());

assertThat(lakeTablePartitionInfos).containsExactlyInAnyOrder(Row.of("b=" + today));
}

@Test
void testTableWithExpression() throws Exception {
// create a table with watermark and computed column
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@

import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.server.testutils.FlussClusterExtension;

import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
Expand All @@ -37,6 +40,7 @@
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
import org.apache.flink.table.factories.FactoryUtil;
Expand All @@ -54,6 +58,7 @@

import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.BUCKET_KEY;
import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.BUCKET_NUMBER;
import static com.alibaba.fluss.connector.flink.source.testutils.FlinkTestBase.waitUntilPartitions;
import static com.alibaba.fluss.connector.flink.utils.CatalogTableTestUtils.addOptions;
import static com.alibaba.fluss.connector.flink.utils.CatalogTableTestUtils.checkEqualsIgnoreSchema;
import static com.alibaba.fluss.connector.flink.utils.CatalogTableTestUtils.checkEqualsRespectSchema;
Expand Down Expand Up @@ -361,6 +366,64 @@ void testDatabase() throws Exception {
.hasMessage("Database %s does not exist in Catalog %s.", "unknown", CATALOG_NAME);
}

@Test
public void testListPartitionInfos() throws Exception {
Map<String, String> options = new HashMap<>();
assertThatThrownBy(() -> catalog.getTable(tableInDefaultDb))
.isInstanceOf(TableNotExistException.class)
.hasMessage(
String.format(
"Table (or view) %s does not exist in Catalog %s.",
tableInDefaultDb, CATALOG_NAME));

// test create partition table
options.put(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED.key(), "true");
options.put(ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT.key(), "day");
options.put(ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.key(), "1");
ResolvedSchema resolvedSchema = this.createSchema();
CatalogTable table =
new ResolvedCatalogTable(
CatalogTable.of(
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
"test comment",
Collections.singletonList("first"),
options),
resolvedSchema);
catalog.createTable(this.tableInDefaultDb, table, false);
assertThat(catalog.tableExists(this.tableInDefaultDb)).isTrue();

String today = DateFormatUtils.format(System.currentTimeMillis(), "yyyyMMdd");
// Wait until ${today} partition is created.
waitUntilPartitions(
FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(),
TablePath.of(
this.tableInDefaultDb.getDatabaseName(),
this.tableInDefaultDb.getObjectName()),
1);

List<CatalogPartitionSpec> result = catalog.listPartitions(this.tableInDefaultDb);
assertThat(result)
.containsExactlyInAnyOrder(
new CatalogPartitionSpec(Collections.singletonMap("first", today)));

// Query the partition infos of the lake table.
List<CatalogPartitionSpec> lakeTablePartitions =
catalog.listPartitions(new ObjectPath(DEFAULT_DB, "t1$lake"));
assertThat(lakeTablePartitions)
.containsExactlyInAnyOrder(
new CatalogPartitionSpec(Collections.singletonMap("first", today)));
}

@Test
public void testNotPartitionedTable() throws Exception {
CatalogTable table = this.newCatalogTable(new HashMap<>());
catalog.createTable(this.tableInDefaultDb, table, false);
assertThatThrownBy(() -> catalog.listPartitions(this.tableInDefaultDb))
.hasCauseInstanceOf(TableNotPartitionedException.class)
.hasRootCauseMessage(
"Table default.t1 in catalog test-catalog is not partitioned.");
}

private void createAndCheckAndDropTable(
final ResolvedSchema schema, ObjectPath tablePath, Map<String, String> options)
throws Exception {
Expand Down