Skip to content

Commit

Permalink
API, Core, Spark: Change behavior of fastForward/replace to create the from branch if it does not exist (#9196)
Browse files Browse the repository at this point in the history
  • Loading branch information
amogh-jahagirdar authored Jan 22, 2024
1 parent 0f509d2 commit 70b7aa5
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 50 deletions.
6 changes: 4 additions & 2 deletions api/src/main/java/org/apache/iceberg/ManageSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ default ManageSnapshots createBranch(String name) {

/**
* Replaces the {@code from} branch to point to the {@code to} snapshot. The {@code to} will
* remain unchanged, and {@code from} branch will retain its retention properties.
* remain unchanged, and {@code from} branch will retain its retention properties. If the {@code
* from} branch does not exist, it will be created with default retention properties.
*
* @param from Branch to replace
* @param to The branch {@code from} should be replaced with
Expand All @@ -175,7 +176,8 @@ default ManageSnapshots createBranch(String name) {
/**
* Performs a fast-forward of {@code from} up to the {@code to} snapshot if {@code from} is an
* ancestor of {@code to}. The {@code to} will remain unchanged, and {@code from} will retain its
* retention properties.
* retention properties. If the {@code from} branch does not exist, it will be created with
* default retention properties.
*
* @param from Branch to fast-forward
* @param to Ref for the {@code from} branch to be fast forwarded to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,11 @@ private UpdateSnapshotReferencesOperation replaceBranch(
Preconditions.checkNotNull(to, "Destination ref cannot be null");
SnapshotRef branchToUpdate = updatedRefs.get(from);
SnapshotRef toRef = updatedRefs.get(to);
Preconditions.checkArgument(
branchToUpdate != null, "Branch to update does not exist: %s", from);
Preconditions.checkArgument(toRef != null, "Ref does not exist: %s", to);
if (branchToUpdate == null) {
return createBranch(from, toRef.snapshotId());
}

Preconditions.checkArgument(branchToUpdate.isBranch(), "Ref %s is a tag not a branch", from);

// Nothing to replace
Expand Down
35 changes: 21 additions & 14 deletions core/src/test/java/org/apache/iceberg/TestSnapshotManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -408,14 +408,6 @@ public void testReplaceBranch() {
table.ops().refresh().ref("branch1").snapshotId(), secondSnapshot.snapshotId());
}

/**
 * Checks that committing a {@code replaceBranch} whose branch to update ("non-existing") is
 * unknown is rejected with an {@link IllegalArgumentException}.
 *
 * <p>The assertion pins the exact error message, so any change to the wording of the
 * precondition makes this test fail.
 */
@Test
public void testReplaceBranchNonExistingBranchToUpdateFails() {
// This test body creates neither "non-existing" nor "other-branch" before the call.
Assertions.assertThatThrownBy(
() -> table.manageSnapshots().replaceBranch("non-existing", "other-branch").commit())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Branch to update does not exist: non-existing");
}

@Test
public void testReplaceBranchNonExistingToBranchFails() {
table.newAppend().appendFile(FILE_A).commit();
Expand All @@ -428,12 +420,27 @@ public void testReplaceBranchNonExistingToBranchFails() {
}

@Test
public void testFastForwardBranchNonExistingFromBranchFails() {
Assertions.assertThatThrownBy(
() ->
table.manageSnapshots().fastForwardBranch("non-existing", "other-branch").commit())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Branch to update does not exist: non-existing");
public void testFastForwardBranchNonExistingFromBranchCreatesTheBranch() {
  String existingBranch = "branch1";
  String missingBranch = "new-branch";

  // Produce a single snapshot and pin it with the existing branch.
  table.newAppend().appendFile(FILE_A).commit();
  long appendedSnapshotId = table.currentSnapshot().snapshotId();
  table.manageSnapshots().createBranch(existingBranch, appendedSnapshotId).commit();

  // Fast-forward a branch that this test never created explicitly.
  table.manageSnapshots().fastForwardBranch(missingBranch, existingBranch).commit();

  // The ref must now exist as a branch and point at the snapshot of the existing branch.
  Assertions.assertThat(table.ops().current().ref(missingBranch).isBranch()).isTrue();
  Assertions.assertThat(table.ops().current().ref(missingBranch).snapshotId())
      .isEqualTo(appendedSnapshotId);
}

@Test
public void testReplaceBranchNonExistingFromBranchCreatesTheBranch() {
  String existingBranch = "branch1";
  String missingBranch = "new-branch";

  // Produce a single snapshot and pin it with the existing branch.
  table.newAppend().appendFile(FILE_A).commit();
  long appendedSnapshotId = table.currentSnapshot().snapshotId();
  table.manageSnapshots().createBranch(existingBranch, appendedSnapshotId).commit();

  // Replace a branch that this test never created explicitly.
  table.manageSnapshots().replaceBranch(missingBranch, existingBranch).commit();

  // The ref must now exist as a branch and point at the snapshot of the existing branch.
  Assertions.assertThat(table.ops().current().ref(missingBranch).isBranch()).isTrue();
  Assertions.assertThat(table.ops().current().ref(missingBranch).snapshotId())
      .isEqualTo(appendedSnapshotId);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,21 +190,8 @@ public void testInvalidFastForwardBranchCases() {
}

@Test
public void testFastForwardNonExistingBranchCases() {
public void testFastForwardNonExistingToRefFails() {
sql("CREATE TABLE %s (id int NOT NULL, data string) USING iceberg", tableName);

Table table = validationCatalog.loadTable(tableIdent);
table.refresh();

assertThatThrownBy(
() ->
sql(
"CALL %s.system.fast_forward(table => '%s', branch => '%s', to => '%s')",
catalogName, tableIdent, "non_existing_branch", SnapshotRef.MAIN_BRANCH))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Branch to fast-forward does not exist: non_existing_branch");

sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
assertThatThrownBy(
() ->
sql(
Expand Down Expand Up @@ -237,14 +224,37 @@ public void testFastForwardNonMain() {
sql("INSERT INTO TABLE %s VALUES (3, 'c')", tableNameWithBranch2);
table.refresh();
Snapshot branch2Snapshot = table.snapshot(branch2);
assertThat(
sql(
"CALL %s.system.fast_forward('%s', '%s', '%s')",
catalogName, tableIdent, branch1, branch2))
.containsExactly(row(branch1, branch1Snapshot.snapshotId(), branch2Snapshot.snapshotId()));
}

List<Object[]> output =
sql(
"CALL %s.system.fast_forward('%s', '%s', '%s')",
catalogName, tableIdent, branch1, branch2);
List<Object> outputRow = Arrays.stream(output.get(0)).collect(Collectors.toList());
assertThat(outputRow.get(0)).isEqualTo(branch1);
assertThat(outputRow.get(1)).isEqualTo(branch1Snapshot.snapshotId());
assertThat(outputRow.get(2)).isEqualTo(branch2Snapshot.snapshotId());
@Test
public void testFastForwardNonExistingFromMainCreatesBranch() {
  String sourceBranch = "branch1";
  String otherMissingBranch = "branch2";

  // Create the table and a branch, then write two rows through the branch identifier only.
  sql("CREATE TABLE %s (id int NOT NULL, data string) USING iceberg", tableName);
  sql("ALTER TABLE %s CREATE BRANCH %s", tableName, sourceBranch);
  String sourceBranchTable = String.format("%s.branch_%s", tableName, sourceBranch);
  sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceBranchTable);
  sql("INSERT INTO TABLE %s VALUES (2, 'b')", sourceBranchTable);

  // Capture the snapshot the source branch points at; both calls below must report it.
  Table icebergTable = validationCatalog.loadTable(tableIdent);
  icebergTable.refresh();
  long expectedSnapshotId = icebergTable.snapshot(sourceBranch).snapshotId();

  // Fast-forwarding main is expected to report a null "before" snapshot.
  assertThat(
          sql(
              "CALL %s.system.fast_forward('%s', '%s', '%s')",
              catalogName, tableIdent, SnapshotRef.MAIN_BRANCH, sourceBranch))
      .containsExactly(row(SnapshotRef.MAIN_BRANCH, null, expectedSnapshotId));

  // Ensure the same behavior for non-main branches
  assertThat(
          sql(
              "CALL %s.system.fast_forward('%s', '%s', '%s')",
              catalogName, tableIdent, otherMissingBranch, sourceBranch))
      .containsExactly(row(otherMissingBranch, null, expectedSnapshotId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.iceberg.spark.procedures;

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
Expand Down Expand Up @@ -73,19 +71,18 @@ public StructType outputType() {
@Override
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String source = args.getString(1);
String target = args.getString(2);
String from = args.getString(1);
String to = args.getString(2);

return modifyIcebergTable(
tableIdent,
table -> {
Snapshot currentSnapshot = table.snapshot(source);
Preconditions.checkArgument(
currentSnapshot != null, "Branch to fast-forward does not exist: %s", source);
table.manageSnapshots().fastForwardBranch(source, target).commit();
long latest = table.snapshot(source).snapshotId();
Long snapshotBefore =
table.snapshot(from) != null ? table.snapshot(from).snapshotId() : null;
table.manageSnapshots().fastForwardBranch(from, to).commit();
long snapshotAfter = table.snapshot(from).snapshotId();
InternalRow outputRow =
newInternalRow(UTF8String.fromString(source), currentSnapshot.snapshotId(), latest);
newInternalRow(UTF8String.fromString(from), snapshotBefore, snapshotAfter);
return new InternalRow[] {outputRow};
});
}
Expand Down

0 comments on commit 70b7aa5

Please sign in to comment.