Skip to content

Commit

Permalink
[core] Throw more clear exception for rescale bucket -1 (apache#3589)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored Aug 21, 2024
1 parent a3eed25 commit 24d8a3b
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public TableSchema commitChanges(List<SchemaChange> changes)
new Catalog.TableNotExistException(
identifierFromPath(
tableRoot.toString(), true, branch)));
Map<String, String> oldOptions = new HashMap<>(oldTableSchema.options());
Map<String, String> newOptions = new HashMap<>(oldTableSchema.options());
List<DataField> newFields = new ArrayList<>(oldTableSchema.fields());
AtomicInteger highestFieldId = new AtomicInteger(oldTableSchema.highestFieldId());
Expand All @@ -203,13 +204,17 @@ public TableSchema commitChanges(List<SchemaChange> changes)
if (change instanceof SetOption) {
SetOption setOption = (SetOption) change;
if (hasSnapshots) {
checkAlterTableOption(setOption.key());
checkAlterTableOption(
setOption.key(),
oldOptions.get(setOption.key()),
setOption.value(),
false);
}
newOptions.put(setOption.key(), setOption.value());
} else if (change instanceof RemoveOption) {
RemoveOption removeOption = (RemoveOption) change;
if (hasSnapshots) {
checkAlterTableOption(removeOption.key());
checkResetTableOption(removeOption.key());
}
newOptions.remove(removeOption.key());
} else if (change instanceof UpdateComment) {
Expand Down Expand Up @@ -578,11 +583,42 @@ public void deleteSchema(long schemaId) {
fileIO.deleteQuietly(toSchemaPath(schemaId));
}

public static void checkAlterTableOption(String key) {
public static void checkAlterTableOption(
String key, @Nullable String oldValue, String newValue, boolean fromDynamicOptions) {
if (CoreOptions.IMMUTABLE_OPTIONS.contains(key)) {
throw new UnsupportedOperationException(
String.format("Change '%s' is not supported yet.", key));
}

if (CoreOptions.BUCKET.key().equals(key)) {
int oldBucket =
oldValue == null
? CoreOptions.BUCKET.defaultValue()
: Integer.parseInt(oldValue);
int newBucket = Integer.parseInt(newValue);

if (fromDynamicOptions) {
throw new UnsupportedOperationException(
"Cannot change bucket number through dynamic options. You might need to rescale bucket.");
}
if (oldBucket == -1) {
throw new UnsupportedOperationException("Cannot change bucket when it is -1.");
}
if (newBucket == -1) {
throw new UnsupportedOperationException("Cannot change bucket to -1.");
}
}
}

public static void checkResetTableOption(String key) {
if (CoreOptions.IMMUTABLE_OPTIONS.contains(key)) {
throw new UnsupportedOperationException(
String.format("Change '%s' is not supported yet.", key));
}

if (CoreOptions.BUCKET.key().equals(key)) {
throw new UnsupportedOperationException(String.format("Cannot reset %s.", key));
}
}

public static void checkAlterTablePath(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,13 @@ public FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions)
}

private void checkImmutability(Map<String, String> dynamicOptions) {
Map<String, String> options = tableSchema.options();
Map<String, String> oldOptions = tableSchema.options();
// check option is not immutable
dynamicOptions.forEach(
(k, v) -> {
if (!Objects.equals(v, options.get(k))) {
SchemaManager.checkAlterTableOption(k);
(k, newValue) -> {
String oldValue = oldOptions.get(k);
if (!Objects.equals(oldValue, newValue)) {
SchemaManager.checkAlterTableOption(k, oldValue, newValue, true);

if (CoreOptions.BUCKET.key().equals(k)) {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -241,10 +242,10 @@ static Table buildPaimonTable(DynamicTableFactory.Context context) {

Map<String, String> dynamicOptions = getDynamicTableConfigOptions(context);
dynamicOptions.forEach(
(key, value) -> {
if (origin.getOptions().get(key) == null
|| !origin.getOptions().get(key).equals(value)) {
SchemaManager.checkAlterTableOption(key);
(key, newValue) -> {
String oldValue = origin.getOptions().get(key);
if (!Objects.equals(oldValue, newValue)) {
SchemaManager.checkAlterTableOption(key, oldValue, newValue, true);
}
});
Map<String, String> newOptions = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,33 @@ public void testAlterTableMetadataComment() {
+ ")")
.doesNotContain("schema");
// change name from non-physical column to physical column is not allowed
assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY name VARCHAR COMMENT 'header3'"));
assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY name VARCHAR COMMENT 'header3'"))
.satisfies(
anyCauseMatches(
UnsupportedOperationException.class,
"Change is not supported: class org.apache.flink.table.catalog.TableChange$ModifyColumn"));
}

@Test
public void testAlterBucket() {
sql("CREATE TABLE T1 (a INT PRIMARY KEY NOT ENFORCED, b STRING) WITH ('bucket' = '-1')");
sql("INSERT INTO T1 VALUES (1, '1')");
assertThatThrownBy(() -> sql("ALTER TABLE T1 RESET ('bucket')"))
.satisfies(
anyCauseMatches(
UnsupportedOperationException.class, "Cannot reset bucket."));
assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('bucket' = '1')"))
.satisfies(
anyCauseMatches(
UnsupportedOperationException.class,
"Cannot change bucket when it is -1."));

sql("CREATE TABLE T2 (a INT PRIMARY KEY NOT ENFORCED, b STRING) WITH ('bucket' = '1')");
sql("INSERT INTO T2 VALUES (1, '1')");
assertThatThrownBy(() -> sql("ALTER TABLE T2 SET ('bucket' = '-1')"))
.satisfies(
anyCauseMatches(
UnsupportedOperationException.class,
"Cannot change bucket to -1."));
}
}

0 comments on commit 24d8a3b

Please sign in to comment.