diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java b/fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java index dcfa86a8..1ed261db 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java @@ -182,6 +182,25 @@ CompletableFuture createTable( */ CompletableFuture deleteTable(TablePath tablePath, boolean ignoreIfNotExists); + /** + * Rename the table with the given table path asynchronously. + * + *

The following exceptions can be anticipated when calling {@code get()} on returned future. + * + *

+ * + * @param fromTablePath The source table path of the table. + * @param toTablePath The target table path of the table. + * @param ignoreIfNotExists Flag to specify behavior when a table with the given name does not + * exist: if set to false, throw a TableNotExistException, if set to true, do nothing. + */ + CompletableFuture renameTable( + TablePath fromTablePath, TablePath toTablePath, boolean ignoreIfNotExists); + /** * Get whether table exists asynchronously. * diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java b/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java index 948596c3..5b56ddf8 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java @@ -53,6 +53,8 @@ import com.alibaba.fluss.rpc.messages.ListTablesRequest; import com.alibaba.fluss.rpc.messages.ListTablesResponse; import com.alibaba.fluss.rpc.messages.PbListOffsetsRespForBucket; +import com.alibaba.fluss.rpc.messages.PbTablePath; +import com.alibaba.fluss.rpc.messages.RenameTableRequest; import com.alibaba.fluss.rpc.messages.TableExistsRequest; import com.alibaba.fluss.rpc.messages.TableExistsResponse; import com.alibaba.fluss.rpc.protocol.ApiError; @@ -189,6 +191,21 @@ public CompletableFuture deleteTable(TablePath tablePath, boolean ignoreIf return gateway.dropTable(request).thenApply(r -> null); } + @Override + public CompletableFuture renameTable( + TablePath fromTablePath, TablePath toTablePath, boolean ignoreIfNotExists) { + RenameTableRequest request = new RenameTableRequest(); + request.setIgnoreIfNotExists(ignoreIfNotExists) + .setFromTablePath( + new PbTablePath() + .setDatabaseName(fromTablePath.getDatabaseName()) + .setTableName(fromTablePath.getTableName())) + .setToTablePath() + .setDatabaseName(toTablePath.getDatabaseName()) + .setTableName(toTablePath.getTableName()); + return gateway.renameTable(request).thenApply(r -> null); + } + @Override public CompletableFuture tableExists(TablePath tablePath) { TableExistsRequest request = new TableExistsRequest(); diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java index 95d29e2f..caf9d039 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java @@ -317,9 +317,27 @@ public void dropTable(ObjectPath objectPath, boolean ignoreIfNotExists) } @Override - public void renameTable(ObjectPath objectPath, String s, boolean b) + public void renameTable(ObjectPath objectPath, String newTableName, boolean ignoreIfNotExists) throws TableNotExistException, TableAlreadyExistException, CatalogException { - throw new UnsupportedOperationException(); + TablePath fromTablePath = toTablePath(objectPath); + ObjectPath toObjectPath = new ObjectPath(objectPath.getDatabaseName(), newTableName); + TablePath toTablePath = toTablePath(toObjectPath); + try { + admin.renameTable(fromTablePath, toTablePath, ignoreIfNotExists).get(); + } catch (Exception e) { + Throwable t = ExceptionUtils.stripExecutionException(e); + if (CatalogExceptionUtil.isTableNotExist(t)) { + throw new TableNotExistException(getName(), objectPath); + } else if (CatalogExceptionUtil.isTableAlreadyExist(t)) { + throw new TableAlreadyExistException(getName(), toObjectPath); + } else { + throw new CatalogException( + String.format( + "Failed to rename table %s to %s in %s", + objectPath, toObjectPath, getName()), + t); + } + } } @Override diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogTest.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogTest.java index a77ad8ca..b358dc76 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogTest.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogTest.java @@ -361,6 +361,29 @@ void testDatabase() throws Exception { .hasMessage("Database %s does not exist in Catalog %s.", "unknown", CATALOG_NAME); } + @Test + void testRenameTable() throws Exception { + Map options = new HashMap<>(); + CatalogTable table = this.newCatalogTable(options); + catalog.createTable(this.tableInDefaultDb, table, false); + assertThat(catalog.tableExists(this.tableInDefaultDb)).isTrue(); + // rename table name from 't1' to 't2'. + catalog.renameTable(this.tableInDefaultDb, "t2", false); + ObjectPath t2InDefaultDb = new ObjectPath(DEFAULT_DB, "t2"); + assertThat(catalog.tableExists(t2InDefaultDb)).isTrue(); + assertThat(catalog.tableExists(this.tableInDefaultDb)).isFalse(); + + // rename an existing table from 't2' to 't3', should throw exception. + ObjectPath t3InDefaultDb = new ObjectPath(DEFAULT_DB, "t3"); + catalog.createTable(t3InDefaultDb, table, false); + assertThatThrownBy(() -> catalog.renameTable(t2InDefaultDb, "t3", false)) + .isInstanceOf(TableAlreadyExistException.class) + .hasMessage( + String.format( + "Table (or view) %s already exists in Catalog %s.", + t3InDefaultDb, CATALOG_NAME)); + } + private void createAndCheckAndDropTable( final ResolvedSchema schema, ObjectPath tablePath, Map options) throws Exception { diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminGateway.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminGateway.java index 3ab96912..bfbd6e02 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminGateway.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminGateway.java @@ -24,6 +24,8 @@ import com.alibaba.fluss.rpc.messages.DropDatabaseResponse; import com.alibaba.fluss.rpc.messages.DropTableRequest; import com.alibaba.fluss.rpc.messages.DropTableResponse; +import com.alibaba.fluss.rpc.messages.RenameTableRequest; +import com.alibaba.fluss.rpc.messages.RenameTableResponse; import com.alibaba.fluss.rpc.protocol.ApiKeys; import com.alibaba.fluss.rpc.protocol.RPC; @@ -63,5 +65,13 @@ public interface AdminGateway extends AdminReadOnlyGateway { @RPC(api = ApiKeys.DROP_TABLE) CompletableFuture dropTable(DropTableRequest request); - // todo: rename table & alter table + /** + * Rename a table. + * + * @param request Rename table request + */ + @RPC(api = ApiKeys.RENAME_TABLE) + CompletableFuture renameTable(RenameTableRequest request); + + // todo: alter table } diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java index 949917b2..7f3dbf97 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java @@ -61,7 +61,8 @@ public enum ApiKeys { NOTIFY_LAKE_TABLE_OFFSET(1031, 0, 0, PRIVATE), DESCRIBE_LAKE_STORAGE(1032, 0, 0, PUBLIC), GET_LAKE_TABLE_SNAPSHOT(1033, 0, 0, PUBLIC), - LIMIT_SCAN(1034, 0, 0, PUBLIC); + LIMIT_SCAN(1034, 0, 0, PUBLIC), + RENAME_TABLE(1035, 0, 0, PUBLIC); private static final Map ID_TO_TYPE = Arrays.stream(ApiKeys.values()) diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index 4c68064a..0077ee8c 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -117,6 +117,16 @@ message DropTableRequest { message DropTableResponse { } +// rename table request and response +message RenameTableRequest { + required PbTablePath from_table_path = 1; + required PbTablePath to_table_path = 2; + required bool ignore_if_not_exists = 3; +} + +message RenameTableResponse { +} + // table exists request and response message TableExistsRequest { required PbTablePath table_path = 1; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java index 0271b5c2..4b9b7e5b 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java @@ -42,6 +42,8 @@ import com.alibaba.fluss.rpc.messages.DropDatabaseResponse; import com.alibaba.fluss.rpc.messages.DropTableRequest; import com.alibaba.fluss.rpc.messages.DropTableResponse; +import com.alibaba.fluss.rpc.messages.RenameTableRequest; +import com.alibaba.fluss.rpc.messages.RenameTableResponse; import com.alibaba.fluss.server.RpcServiceBase; import com.alibaba.fluss.server.coordinator.event.AdjustIsrReceivedEvent; import com.alibaba.fluss.server.coordinator.event.CommitKvSnapshotEvent; @@ -208,6 +210,16 @@ public CompletableFuture dropTable(DropTableRequest request) return CompletableFuture.completedFuture(response); } + @Override + public CompletableFuture renameTable(RenameTableRequest request) { + RenameTableResponse response = new RenameTableResponse(); + metadataManager.renameTable( + toTablePath(request.getFromTablePath()), + toTablePath(request.getToTablePath()), + request.isIgnoreIfNotExists()); + return CompletableFuture.completedFuture(response); + } + public CompletableFuture adjustIsr(AdjustIsrRequest request) { CompletableFuture response = new CompletableFuture<>(); eventManagerSupplier diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetaDataManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetaDataManager.java index bea0b3a3..a1706726 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetaDataManager.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetaDataManager.java @@ -137,6 +137,25 @@ public void dropTable(TablePath tablePath, boolean ignoreIfNotExists) uncheck(() -> zookeeperClient.deleteTable(tablePath), "Fail to drop table: " + tablePath); } + public void renameTable( + TablePath fromTablePath, TablePath toTablePath, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException { + if (!tableExists(fromTablePath)) { + if (ignoreIfNotExists) { + return; + } + throw new TableNotExistException("Table " + fromTablePath + " does not exist."); + } + + if (tableExists(toTablePath)) { + throw new TableAlreadyExistException("Table " + toTablePath + " already exists."); + } + + uncheck( + () -> zookeeperClient.renameTable(fromTablePath, toTablePath), + "Fail to rename table. from: " + fromTablePath + ", to: " + toTablePath); + } + public void completeDeleteTable(long tableId) { // final step for delete a table. // delete bucket assignments node, which will also delete the bucket state node, diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java index 255b9d12..ba40088c 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java @@ -353,6 +353,16 @@ public void deleteTable(TablePath tablePath) throws Exception { LOG.info("Deleted table {}.", tablePath); } + public void renameTable(TablePath fromTablePath, TablePath toTablePath) throws Exception { + byte[] tableBytes = zkClient.getData().forPath(TableZNode.path(fromTablePath)); + zkClient.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(TableZNode.path(toTablePath), tableBytes); + deleteTable(fromTablePath); + LOG.info("Renamed table {} to {}.", fromTablePath, toTablePath); + } + public boolean tableExist(TablePath tablePath) throws Exception { String path = TableZNode.path(tablePath); Stat stat = zkClient.checkExists().forPath(path); diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java index a8473dcc..35739ba3 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java @@ -60,6 +60,8 @@ import com.alibaba.fluss.rpc.messages.ListTablesResponse; import com.alibaba.fluss.rpc.messages.MetadataRequest; import com.alibaba.fluss.rpc.messages.MetadataResponse; +import com.alibaba.fluss.rpc.messages.RenameTableRequest; +import com.alibaba.fluss.rpc.messages.RenameTableResponse; import com.alibaba.fluss.rpc.messages.TableExistsRequest; import com.alibaba.fluss.rpc.messages.TableExistsResponse; import com.alibaba.fluss.rpc.messages.UpdateMetadataRequest; @@ -122,6 +124,11 @@ public CompletableFuture dropTable(DropTableRequest request) throw new UnsupportedOperationException(); } + @Override + public CompletableFuture renameTable(RenameTableRequest request) { + throw new UnsupportedOperationException(); + } + @Override public CompletableFuture describeLakeStorage( DescribeLakeStorageRequest request) { diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/zk/ZooKeeperClientTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/zk/ZooKeeperClientTest.java index cd88d83b..fc7cb793 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/zk/ZooKeeperClientTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/zk/ZooKeeperClientTest.java @@ -191,9 +191,17 @@ void testTable() throws Exception { assertThat(optionalTable.isPresent()).isTrue(); assertThat(optionalTable.get()).isEqualTo(tableReg); - // delete table. - zookeeperClient.deleteTable(tablePath); + // rename table. + TablePath toTablePath = TablePath.of("db", "tb_2"); + assertThat(zookeeperClient.getTable(toTablePath)).isEmpty(); + + zookeeperClient.renameTable(tablePath, toTablePath); assertThat(zookeeperClient.getTable(tablePath)).isEmpty(); + assertThat(zookeeperClient.getTable(toTablePath)).isNotEmpty(); + + // delete table. + zookeeperClient.deleteTable(toTablePath); + assertThat(zookeeperClient.getTable(toTablePath)).isEmpty(); } @Test