Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Support rename table in flink catalog. #77

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,25 @@ CompletableFuture<Void> createTable(
*/
CompletableFuture<Void> deleteTable(TablePath tablePath, boolean ignoreIfNotExists);

/**
* Rename the table with the given table path asynchronously.
*
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
*
* <ul>
* <li>{@link TableNotExistException} if the source table does not exist and {@code
* ignoreIfNotExists} is false.
* <li>{@link TableAlreadyExistException} if the target table already exists.
* </ul>
*
* @param fromTablePath The source table path of the table.
* @param toTablePath The target table path of the table.
* @param ignoreIfNotExists Flag to specify behavior when a table with the given name does not
* exist: if set to false, throw a TableNotExistException, if set to true, do nothing.
*/
CompletableFuture<Void> renameTable(
TablePath fromTablePath, TablePath toTablePath, boolean ignoreIfNotExists);

/**
* Get whether table exists asynchronously.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import com.alibaba.fluss.rpc.messages.ListTablesRequest;
import com.alibaba.fluss.rpc.messages.ListTablesResponse;
import com.alibaba.fluss.rpc.messages.PbListOffsetsRespForBucket;
import com.alibaba.fluss.rpc.messages.PbTablePath;
import com.alibaba.fluss.rpc.messages.RenameTableRequest;
import com.alibaba.fluss.rpc.messages.TableExistsRequest;
import com.alibaba.fluss.rpc.messages.TableExistsResponse;
import com.alibaba.fluss.rpc.protocol.ApiError;
Expand Down Expand Up @@ -189,6 +191,21 @@ public CompletableFuture<Void> deleteTable(TablePath tablePath, boolean ignoreIf
return gateway.dropTable(request).thenApply(r -> null);
}

@Override
public CompletableFuture<Void> renameTable(
TablePath fromTablePath, TablePath toTablePath, boolean ignoreIfNotExists) {
RenameTableRequest request = new RenameTableRequest();
request.setIgnoreIfNotExists(ignoreIfNotExists)
.setFromTablePath(
new PbTablePath()
.setDatabaseName(fromTablePath.getDatabaseName())
.setTableName(fromTablePath.getTableName()))
.setToTablePath()
.setDatabaseName(toTablePath.getDatabaseName())
.setTableName(toTablePath.getTableName());
return gateway.renameTable(request).thenApply(r -> null);
}

@Override
public CompletableFuture<Boolean> tableExists(TablePath tablePath) {
TableExistsRequest request = new TableExistsRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,27 @@ public void dropTable(ObjectPath objectPath, boolean ignoreIfNotExists)
}

@Override
public void renameTable(ObjectPath objectPath, String s, boolean b)
public void renameTable(ObjectPath objectPath, String newTableName, boolean ignoreIfNotExists)
throws TableNotExistException, TableAlreadyExistException, CatalogException {
throw new UnsupportedOperationException();
TablePath fromTablePath = toTablePath(objectPath);
ObjectPath toObjectPath = new ObjectPath(objectPath.getDatabaseName(), newTableName);
TablePath toTablePath = toTablePath(toObjectPath);
try {
admin.renameTable(fromTablePath, toTablePath, ignoreIfNotExists).get();
} catch (Exception e) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to consider lake tables ?

Throwable t = ExceptionUtils.stripExecutionException(e);
if (CatalogExceptionUtil.isTableNotExist(t)) {
throw new TableNotExistException(getName(), objectPath);
} else if (CatalogExceptionUtil.isTableAlreadyExist(t)) {
throw new TableAlreadyExistException(getName(), toObjectPath);
} else {
throw new CatalogException(
String.format(
"Failed to rename table %s to %s in %s",
objectPath, toObjectPath, getName()),
t);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,29 @@ void testDatabase() throws Exception {
.hasMessage("Database %s does not exist in Catalog %s.", "unknown", CATALOG_NAME);
}

@Test
void testRenameTable() throws Exception {
Map<String, String> options = new HashMap<>();
CatalogTable table = this.newCatalogTable(options);
catalog.createTable(this.tableInDefaultDb, table, false);
assertThat(catalog.tableExists(this.tableInDefaultDb)).isTrue();
// rename table name from 't1' to 't2'.
catalog.renameTable(this.tableInDefaultDb, "t2", false);
ObjectPath t2InDefaultDb = new ObjectPath(DEFAULT_DB, "t2");
assertThat(catalog.tableExists(t2InDefaultDb)).isTrue();
assertThat(catalog.tableExists(this.tableInDefaultDb)).isFalse();

// rename an existing table from 't2' to 't3', should throw exception.
ObjectPath t3InDefaultDb = new ObjectPath(DEFAULT_DB, "t3");
catalog.createTable(t3InDefaultDb, table, false);
assertThatThrownBy(() -> catalog.renameTable(t2InDefaultDb, "t3", false))
.isInstanceOf(TableAlreadyExistException.class)
.hasMessage(
String.format(
"Table (or view) %s already exists in Catalog %s.",
t3InDefaultDb, CATALOG_NAME));
}

private void createAndCheckAndDropTable(
final ResolvedSchema schema, ObjectPath tablePath, Map<String, String> options)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.alibaba.fluss.rpc.messages.DropDatabaseResponse;
import com.alibaba.fluss.rpc.messages.DropTableRequest;
import com.alibaba.fluss.rpc.messages.DropTableResponse;
import com.alibaba.fluss.rpc.messages.RenameTableRequest;
import com.alibaba.fluss.rpc.messages.RenameTableResponse;
import com.alibaba.fluss.rpc.protocol.ApiKeys;
import com.alibaba.fluss.rpc.protocol.RPC;

Expand Down Expand Up @@ -63,5 +65,13 @@ public interface AdminGateway extends AdminReadOnlyGateway {
@RPC(api = ApiKeys.DROP_TABLE)
CompletableFuture<DropTableResponse> dropTable(DropTableRequest request);

// todo: rename table & alter table
/**
* Rename a table.
*
* @param request Rename table request
*/
@RPC(api = ApiKeys.RENAME_TABLE)
CompletableFuture<RenameTableResponse> renameTable(RenameTableRequest request);

// todo: alter table
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public enum ApiKeys {
NOTIFY_LAKE_TABLE_OFFSET(1031, 0, 0, PRIVATE),
DESCRIBE_LAKE_STORAGE(1032, 0, 0, PUBLIC),
GET_LAKE_TABLE_SNAPSHOT(1033, 0, 0, PUBLIC),
LIMIT_SCAN(1034, 0, 0, PUBLIC);
LIMIT_SCAN(1034, 0, 0, PUBLIC),
RENAME_TABLE(1035, 0, 0, PUBLIC);

private static final Map<Integer, ApiKeys> ID_TO_TYPE =
Arrays.stream(ApiKeys.values())
Expand Down
10 changes: 10 additions & 0 deletions fluss-rpc/src/main/proto/FlussApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,16 @@ message DropTableRequest {
message DropTableResponse {
}

// rename table request and response
message RenameTableRequest {
required PbTablePath from_table_path = 1;
required PbTablePath to_table_path = 2;
required bool ignore_if_not_exists = 3;
}

message RenameTableResponse {
}

// table exists request and response
message TableExistsRequest {
required PbTablePath table_path = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import com.alibaba.fluss.rpc.messages.DropDatabaseResponse;
import com.alibaba.fluss.rpc.messages.DropTableRequest;
import com.alibaba.fluss.rpc.messages.DropTableResponse;
import com.alibaba.fluss.rpc.messages.RenameTableRequest;
import com.alibaba.fluss.rpc.messages.RenameTableResponse;
import com.alibaba.fluss.server.RpcServiceBase;
import com.alibaba.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
import com.alibaba.fluss.server.coordinator.event.CommitKvSnapshotEvent;
Expand Down Expand Up @@ -208,6 +210,16 @@ public CompletableFuture<DropTableResponse> dropTable(DropTableRequest request)
return CompletableFuture.completedFuture(response);
}

@Override
public CompletableFuture<RenameTableResponse> renameTable(RenameTableRequest request) {
RenameTableResponse response = new RenameTableResponse();
metadataManager.renameTable(
toTablePath(request.getFromTablePath()),
toTablePath(request.getToTablePath()),
request.isIgnoreIfNotExists());
return CompletableFuture.completedFuture(response);
}

public CompletableFuture<AdjustIsrResponse> adjustIsr(AdjustIsrRequest request) {
CompletableFuture<AdjustIsrResponse> response = new CompletableFuture<>();
eventManagerSupplier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,25 @@ public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
uncheck(() -> zookeeperClient.deleteTable(tablePath), "Fail to drop table: " + tablePath);
}

public void renameTable(
TablePath fromTablePath, TablePath toTablePath, boolean ignoreIfNotExists)
throws TableNotExistException, TableAlreadyExistException {
if (!tableExists(fromTablePath)) {
if (ignoreIfNotExists) {
return;
}
throw new TableNotExistException("Table " + fromTablePath + " does not exist.");
}

if (tableExists(toTablePath)) {
throw new TableAlreadyExistException("Table " + toTablePath + " already exists.");
}

uncheck(
() -> zookeeperClient.renameTable(fromTablePath, toTablePath),
"Fail to rename table. from: " + fromTablePath + ", to: " + toTablePath);
}

public void completeDeleteTable(long tableId) {
// final step for delete a table.
// delete bucket assignments node, which will also delete the bucket state node,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,16 @@ public void deleteTable(TablePath tablePath) throws Exception {
LOG.info("Deleted table {}.", tablePath);
}

public void renameTable(TablePath fromTablePath, TablePath toTablePath) throws Exception {
byte[] tableBytes = zkClient.getData().forPath(TableZNode.path(fromTablePath));
zkClient.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(TableZNode.path(toTablePath), tableBytes);
deleteTable(fromTablePath);
LOG.info("Renamed table {} to {}.", fromTablePath, toTablePath);
}

public boolean tableExist(TablePath tablePath) throws Exception {
String path = TableZNode.path(tablePath);
Stat stat = zkClient.checkExists().forPath(path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
import com.alibaba.fluss.rpc.messages.ListTablesResponse;
import com.alibaba.fluss.rpc.messages.MetadataRequest;
import com.alibaba.fluss.rpc.messages.MetadataResponse;
import com.alibaba.fluss.rpc.messages.RenameTableRequest;
import com.alibaba.fluss.rpc.messages.RenameTableResponse;
import com.alibaba.fluss.rpc.messages.TableExistsRequest;
import com.alibaba.fluss.rpc.messages.TableExistsResponse;
import com.alibaba.fluss.rpc.messages.UpdateMetadataRequest;
Expand Down Expand Up @@ -122,6 +124,11 @@ public CompletableFuture<DropTableResponse> dropTable(DropTableRequest request)
throw new UnsupportedOperationException();
}

@Override
public CompletableFuture<RenameTableResponse> renameTable(RenameTableRequest request) {
throw new UnsupportedOperationException();
}

@Override
public CompletableFuture<DescribeLakeStorageResponse> describeLakeStorage(
DescribeLakeStorageRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,17 @@ void testTable() throws Exception {
assertThat(optionalTable.isPresent()).isTrue();
assertThat(optionalTable.get()).isEqualTo(tableReg);

// delete table.
zookeeperClient.deleteTable(tablePath);
// rename table.
TablePath toTablePath = TablePath.of("db", "tb_2");
assertThat(zookeeperClient.getTable(toTablePath)).isEmpty();

zookeeperClient.renameTable(tablePath, toTablePath);
assertThat(zookeeperClient.getTable(tablePath)).isEmpty();
assertThat(zookeeperClient.getTable(toTablePath)).isNotEmpty();

// delete table.
zookeeperClient.deleteTable(toTablePath);
assertThat(zookeeperClient.getTable(toTablePath)).isEmpty();
}

@Test
Expand Down