From ae1d4301fa0fb067e0f424fc97ff8669a3b194fb Mon Sep 17 00:00:00 2001
From: Eduard Tudenhoefner
Date: Sat, 30 Mar 2024 09:07:45 +0100
Subject: [PATCH] Spark: Clarify schema behavior when working with branches
 (#10055)
---
 docs/docs/branching.md                        | 91 +++++++++++++++++++
 docs/docs/spark-queries.md                    | 23 ++++-
 docs/docs/spark-writes.md                     |  4 +
 .../apache/iceberg/spark/sql/TestSelect.java  | 59 ++++++++++++
 4 files changed, 176 insertions(+), 1 deletion(-)

diff --git a/docs/docs/branching.md b/docs/docs/branching.md
index e944e4eb3bc2..3379264d8a5f 100644
--- a/docs/docs/branching.md
+++ b/docs/docs/branching.md
@@ -113,3 +113,94 @@ Creating, querying and writing to branches and tags are supported in the Iceberg
- [Spark Branch Writes](spark-writes.md#writing-to-branches)
- [Flink Reads](flink-queries.md#reading-branches-and-tags-with-SQL)
- [Flink Branch Writes](flink-writes.md#branch-writes)

## Schema selection with branches and tags

It is important to understand that the schema tracked for a table is valid across all branches.
When writing to a branch, the table's schema is used, as that is the schema writes are validated against.
Querying a tag, on the other hand, uses the snapshot's schema, which is the schema the snapshot referenced when it was created.

The examples below show which schema is used when working with branches.

Create a table and insert some data:

```sql
CREATE TABLE db.table (id bigint, data string, col float);
INSERT INTO db.table VALUES (1, 'a', 1.0), (2, 'b', 2.0), (3, 'c', 3.0);
SELECT * FROM db.table;
1 a 1.0
2 b 2.0
3 c 3.0
```

Create a branch `test_branch` that points to the current snapshot and read data from the branch:

```sql
ALTER TABLE db.table CREATE BRANCH test_branch;

SELECT * FROM db.table.branch_test_branch;
1 a 1.0
2 b 2.0
3 c 3.0
```

Modify the table's schema by dropping the `col` column and adding a new column named `new_col`:

```sql
ALTER TABLE db.table DROP COLUMN col;

ALTER TABLE db.table ADD COLUMN new_col date;

INSERT INTO db.table VALUES (4, 'd', date('2024-04-04')), (5, 'e', date('2024-05-05'));

SELECT * FROM db.table;
1 a NULL
2 b NULL
3 c NULL
4 d 2024-04-04
5 e 2024-05-05
```

Querying the head of the branch using either of the statements below returns data using the **table's schema**:

```sql
SELECT * FROM db.table.branch_test_branch;
1 a NULL
2 b NULL
3 c NULL

SELECT * FROM db.table VERSION AS OF 'test_branch';
1 a NULL
2 b NULL
3 c NULL
```

Performing a time travel query using the snapshot ID uses the **snapshot's schema**:

```sql
SELECT * FROM db.table.refs;
test_branch BRANCH 8109744798576441359 NULL NULL NULL
main BRANCH 6910357365743665710 NULL NULL NULL

SELECT * FROM db.table VERSION AS OF 8109744798576441359;
1 a 1.0
2 b 2.0
3 c 3.0
```

When writing to the branch, the **table's schema** is used for validation:

```sql
INSERT INTO db.table.branch_test_branch VALUES (6, 'f', date('2024-06-06')), (7, 'g', date('2024-07-07'));

SELECT * FROM db.table.branch_test_branch;
6 f 2024-06-06
7 g 2024-07-07
1 a NULL
2 b NULL
3 c NULL
```

diff --git a/docs/docs/spark-queries.md b/docs/docs/spark-queries.md
index 536c136d7e55..b606d849a692 100644
--- a/docs/docs/spark-queries.md
+++ b/docs/docs/spark-queries.md
@@ -82,7 +82,7 @@ The `VERSION AS OF` clause can contain a long snapshot ID or a string branch or tag name.
Note that if a branch or tag name is also a valid snapshot ID, the snapshot with that ID is selected for time travel. If this is not desired, rename the tag or branch with a well-defined prefix such as 'snapshot-1'.
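
For instance, here is a hedged sketch of the precedence rule above; it assumes a tag named '1' exists alongside an unrelated snapshot whose ID is 1 (neither appears in the examples elsewhere in this doc):

```sql
-- hypothetical: a tag named '1' references some other snapshot, but a snapshot
-- with ID 1 also exists; the string '1' resolves to snapshot ID 1, not to the tag
SELECT * FROM prod.db.table VERSION AS OF '1';

-- renaming the tag with a prefix removes the ambiguity
SELECT * FROM prod.db.table VERSION AS OF 'snapshot-1';
```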
```sql
-- time travel to October 26, 1986 at 01:21:00
SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00';

@@ -124,6 +124,27 @@ SELECT * FROM prod.db.table.`tag_historical-snapshot`;

Note that a branch or tag identifier may not be used in combination with `VERSION AS OF`.

#### Schema selection in time travel queries

The time travel queries described in the previous section use either the snapshot's schema or the table's schema, depending on how the snapshot is referenced:

```sql
-- time travel to October 26, 1986 at 01:21:00 -> uses the snapshot's schema
SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00';

-- time travel to snapshot with id 10963874102873L -> uses the snapshot's schema
SELECT * FROM prod.db.table VERSION AS OF 10963874102873;

-- time travel to the head of audit-branch -> uses the table's schema
SELECT * FROM prod.db.table VERSION AS OF 'audit-branch';
SELECT * FROM prod.db.table.`branch_audit-branch`;

-- time travel to the snapshot referenced by the tag historical-snapshot -> uses the snapshot's schema
SELECT * FROM prod.db.table VERSION AS OF 'historical-snapshot';
SELECT * FROM prod.db.table.`tag_historical-snapshot`;
```

#### DataFrame

To select a specific table snapshot or the snapshot at some time in the DataFrame API, Iceberg supports four Spark read options:

diff --git a/docs/docs/spark-writes.md b/docs/docs/spark-writes.md
index 626dee6c96e6..96fcc5f7ce32 100644
--- a/docs/docs/spark-writes.md
+++ b/docs/docs/spark-writes.md
@@ -201,6 +201,10 @@ Note WAP branch and branch identifier cannot both be specified.
Also, the branch must exist before performing the write. The operation does **not** create the branch if it does not exist. For more information on branches, please refer to [branches](branching.md).

!!! info
    Note that when writing to a branch, the table's current schema is used for validation.

```sql
-- INSERT (1, 'a') (2, 'b') into the audit branch.
INSERT INTO prod.db.table.branch_audit VALUES (1, 'a'), (2, 'b');
```
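
To make the validation note above concrete, here is a hedged sketch that builds on the `db.table`/`test_branch` example from branching.md (the `new_col date` column and branch name are carried over from that example; the failure case is an assumption about Spark rejecting the incompatible write at analysis time):

```sql
-- the branch still contains rows written with the old schema, but writes are
-- validated against the table's current schema (id bigint, data string, new_col date)

-- fails: a float value cannot be written to the `new_col date` column
INSERT INTO db.table.branch_test_branch VALUES (8, 'h', 8.0);

-- succeeds: the row matches the table's current schema
INSERT INTO db.table.branch_test_branch VALUES (8, 'h', date('2024-08-08'));
```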
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
index 80d7d8787e27..4c99a38d29fc 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assumptions.assumeThat;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -348,6 +349,64 @@ public void testBranchReference() {
    assertEquals("Snapshot at specific branch reference name", expected, fromDF);
  }

  @TestTemplate
  public void readAndWriteWithBranchAfterSchemaChange() {
    Table table = validationCatalog.loadTable(tableIdent);
    String branchName = "test_branch";
    table.manageSnapshots().createBranch(branchName, table.currentSnapshot().snapshotId()).commit();

    List<Object[]> expected =
        Arrays.asList(row(1L, "a", 1.0f), row(2L, "b", 2.0f), row(3L, "c", Float.NaN));
    assertThat(sql("SELECT * FROM %s", tableName)).containsExactlyElementsOf(expected);

    // change the schema on the table and add more data
    sql("ALTER TABLE %s DROP COLUMN float", tableName);
    sql("ALTER TABLE %s ADD COLUMN new_col date", tableName);
    sql(
        "INSERT INTO %s VALUES (4, 'd', date('2024-04-04')), (5, 'e', date('2024-05-05'))",
        tableName);

    // a time-travel query using the snapshot id should return the snapshot's schema
    long branchSnapshotId = table.refs().get(branchName).snapshotId();
    assertThat(sql("SELECT * FROM %s VERSION AS OF %s", tableName, branchSnapshotId))
        .containsExactlyElementsOf(expected);

    // querying the head of the branch should return the table's schema
    assertThat(sql("SELECT * FROM %s VERSION AS OF '%s'", tableName, branchName))
        .containsExactly(row(1L, "a", null), row(2L, "b", null), row(3L, "c", null));

    if (!"spark_catalog".equals(catalogName)) {
      // querying the head of the branch using 'branch_' should return the table's schema
      assertThat(sql("SELECT * FROM %s.branch_%s", tableName, branchName))
          .containsExactly(row(1L, "a", null), row(2L, "b", null), row(3L, "c", null));
    }

    // writing to a branch uses the table's schema
    sql(
        "INSERT INTO %s.branch_%s VALUES (6L, 'f', cast('2023-06-06' as date)), (7L, 'g', cast('2023-07-07' as date))",
        tableName, branchName);

    // querying the head of the branch returns the table's schema
    assertThat(sql("SELECT * FROM %s VERSION AS OF '%s'", tableName, branchName))
        .containsExactlyInAnyOrder(
            row(1L, "a", null),
            row(2L, "b", null),
            row(3L, "c", null),
            row(6L, "f", java.sql.Date.valueOf("2023-06-06")),
            row(7L, "g", java.sql.Date.valueOf("2023-07-07")));

    // reading with the DataFrameReader 'branch' option should return the table's schema
    Dataset<Row> df =
        spark.read().format("iceberg").option(SparkReadOptions.BRANCH, branchName).load(tableName);
    assertThat(rowsToJava(df.collectAsList()))
        .containsExactlyInAnyOrder(
            row(1L, "a", null),
            row(2L, "b", null),
            row(3L, "c", null),
            row(6L, "f", java.sql.Date.valueOf("2023-06-06")),
            row(7L, "g", java.sql.Date.valueOf("2023-07-07")));
  }

  @TestTemplate
  public void testUnknownReferenceAsOf() {
    assertThatThrownBy(() -> sql("SELECT * FROM %s VERSION AS OF 'test_unknown'", tableName))