From 70b7aa534b2c79ccd7b6c0e0fd1be980772bb20a Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Mon, 22 Jan 2024 14:56:43 -0800 Subject: [PATCH] API, Core, Spark: Change behavior of fastForward/replace to create the from branch if it does not exist (#9196) --- .../org/apache/iceberg/ManageSnapshots.java | 6 ++- .../UpdateSnapshotReferencesOperation.java | 6 ++- .../apache/iceberg/TestSnapshotManager.java | 35 +++++++----- .../TestFastForwardBranchProcedure.java | 54 +++++++++++-------- .../FastForwardBranchProcedure.java | 17 +++--- 5 files changed, 68 insertions(+), 50 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/ManageSnapshots.java b/api/src/main/java/org/apache/iceberg/ManageSnapshots.java index 986bbb6f5809..12cd5021fa3e 100644 --- a/api/src/main/java/org/apache/iceberg/ManageSnapshots.java +++ b/api/src/main/java/org/apache/iceberg/ManageSnapshots.java @@ -164,7 +164,8 @@ default ManageSnapshots createBranch(String name) { /** * Replaces the {@code from} branch to point to the {@code to} snapshot. The {@code to} will - * remain unchanged, and {@code from} branch will retain its retention properties. + * remain unchanged, and {@code from} branch will retain its retention properties. If the {@code + * from} branch does not exist, it will be created with default retention properties. * * @param from Branch to replace * @param to The branch {@code from} should be replaced with @@ -175,7 +176,8 @@ default ManageSnapshots createBranch(String name) { /** * Performs a fast-forward of {@code from} up to the {@code to} snapshot if {@code from} is an * ancestor of {@code to}. The {@code to} will remain unchanged, and {@code from} will retain its - * retention properties. + * retention properties. If the {@code from} branch does not exist, it will be created with + * default retention properties. * * @param from Branch to fast-forward * @param to Ref for the {@code from} branch to be fast forwarded to diff --git a/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java b/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java index 2c3c6c1f7e10..9d15bf0ee207 100644 --- a/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java +++ b/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java @@ -120,9 +120,11 @@ private UpdateSnapshotReferencesOperation replaceBranch( Preconditions.checkNotNull(to, "Destination ref cannot be null"); SnapshotRef branchToUpdate = updatedRefs.get(from); SnapshotRef toRef = updatedRefs.get(to); - Preconditions.checkArgument( - branchToUpdate != null, "Branch to update does not exist: %s", from); Preconditions.checkArgument(toRef != null, "Ref does not exist: %s", to); + if (branchToUpdate == null) { + return createBranch(from, toRef.snapshotId()); + } + Preconditions.checkArgument(branchToUpdate.isBranch(), "Ref %s is a tag not a branch", from); // Nothing to replace diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java b/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java index d561d697d3e9..fd22ae24d0e4 100644 --- a/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java +++ b/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java @@ -408,14 +408,6 @@ public void testReplaceBranch() { table.ops().refresh().ref("branch1").snapshotId(), secondSnapshot.snapshotId()); } - @Test - public void testReplaceBranchNonExistingBranchToUpdateFails() { - Assertions.assertThatThrownBy( - () -> table.manageSnapshots().replaceBranch("non-existing", "other-branch").commit()) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Branch to update does not exist: non-existing"); - } - @Test public void testReplaceBranchNonExistingToBranchFails() { table.newAppend().appendFile(FILE_A).commit(); @@ -428,12 +420,27 @@ public void testReplaceBranchNonExistingToBranchFails() { } @Test - public void testFastForwardBranchNonExistingFromBranchFails() { - Assertions.assertThatThrownBy( - () -> - table.manageSnapshots().fastForwardBranch("non-existing", "other-branch").commit()) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Branch to update does not exist: non-existing"); + public void testFastForwardBranchNonExistingFromBranchCreatesTheBranch() { + table.newAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + table.manageSnapshots().createBranch("branch1", snapshotId).commit(); + table.manageSnapshots().fastForwardBranch("new-branch", "branch1").commit(); + + Assertions.assertThat(table.ops().current().ref("new-branch").isBranch()).isTrue(); + Assertions.assertThat(table.ops().current().ref("new-branch").snapshotId()) + .isEqualTo(snapshotId); + } + + @Test + public void testReplaceBranchNonExistingFromBranchCreatesTheBranch() { + table.newAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + table.manageSnapshots().createBranch("branch1", snapshotId).commit(); + table.manageSnapshots().replaceBranch("new-branch", "branch1").commit(); + + Assertions.assertThat(table.ops().current().ref("new-branch").isBranch()).isTrue(); + Assertions.assertThat(table.ops().current().ref("new-branch").snapshotId()) + .isEqualTo(snapshotId); } @Test diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java index 99bc86248519..0c99c3e07f6b 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java @@ -190,21 +190,8 @@ public void testInvalidFastForwardBranchCases() { } @Test - public void testFastForwardNonExistingBranchCases() { + public void testFastForwardNonExistingToRefFails() { sql("CREATE TABLE %s (id int NOT NULL, data string) USING iceberg", tableName); - - Table table = validationCatalog.loadTable(tableIdent); - table.refresh(); - - assertThatThrownBy( - () -> - sql( - "CALL %s.system.fast_forward(table => '%s', branch => '%s', to => '%s')", - catalogName, tableIdent, "non_existing_branch", SnapshotRef.MAIN_BRANCH)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Branch to fast-forward does not exist: non_existing_branch"); - - sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName); assertThatThrownBy( () -> sql( @@ -237,14 +224,37 @@ public void testFastForwardNonMain() { sql("INSERT INTO TABLE %s VALUES (3, 'c')", tableNameWithBranch2); table.refresh(); Snapshot branch2Snapshot = table.snapshot(branch2); + assertThat( + sql( + "CALL %s.system.fast_forward('%s', '%s', '%s')", + catalogName, tableIdent, branch1, branch2)) + .containsExactly(row(branch1, branch1Snapshot.snapshotId(), branch2Snapshot.snapshotId())); + } - List output = - sql( - "CALL %s.system.fast_forward('%s', '%s', '%s')", - catalogName, tableIdent, branch1, branch2); - List outputRow = Arrays.stream(output.get(0)).collect(Collectors.toList()); - assertThat(outputRow.get(0)).isEqualTo(branch1); - assertThat(outputRow.get(1)).isEqualTo(branch1Snapshot.snapshotId()); - assertThat(outputRow.get(2)).isEqualTo(branch2Snapshot.snapshotId()); + @Test + public void testFastForwardNonExistingFromMainCreatesBranch() { + sql("CREATE TABLE %s (id int NOT NULL, data string) USING iceberg", tableName); + String branch1 = "branch1"; + sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branch1); + String branchIdentifier = String.format("%s.branch_%s", tableName, branch1); + sql("INSERT INTO TABLE %s VALUES (1, 'a')", branchIdentifier); + sql("INSERT INTO TABLE %s VALUES (2, 'b')", branchIdentifier); + Table table = validationCatalog.loadTable(tableIdent); + table.refresh(); + Snapshot branch1Snapshot = table.snapshot(branch1); + + assertThat( + sql( + "CALL %s.system.fast_forward('%s', '%s', '%s')", + catalogName, tableIdent, SnapshotRef.MAIN_BRANCH, branch1)) + .containsExactly(row(SnapshotRef.MAIN_BRANCH, null, branch1Snapshot.snapshotId())); + + // Ensure the same behavior for non-main branches + String branch2 = "branch2"; + assertThat( + sql( + "CALL %s.system.fast_forward('%s', '%s', '%s')", + catalogName, tableIdent, branch2, branch1)) + .containsExactly(row(branch2, null, branch1Snapshot.snapshotId())); } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java index 83908f284b91..11ea5d44c9f8 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java @@ -18,8 +18,6 @@ */ package org.apache.iceberg.spark.procedures; -import org.apache.iceberg.Snapshot; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; @@ -73,19 +71,18 @@ public StructType outputType() { @Override public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); - String source = args.getString(1); - String target = args.getString(2); + String from = args.getString(1); + String to = args.getString(2); return modifyIcebergTable( tableIdent, table -> { - Snapshot currentSnapshot = table.snapshot(source); - Preconditions.checkArgument( - currentSnapshot != null, "Branch to fast-forward does not exist: %s", source); - table.manageSnapshots().fastForwardBranch(source, target).commit(); - long latest = table.snapshot(source).snapshotId(); + Long snapshotBefore = + table.snapshot(from) != null ? table.snapshot(from).snapshotId() : null; + table.manageSnapshots().fastForwardBranch(from, to).commit(); + long snapshotAfter = table.snapshot(from).snapshotId(); InternalRow outputRow = - newInternalRow(UTF8String.fromString(source), currentSnapshot.snapshotId(), latest); + newInternalRow(UTF8String.fromString(from), snapshotBefore, snapshotAfter); return new InternalRow[] {outputRow}; }); }