Spark: Clarify schema behavior when working with branches (apache#10055)

nastra authored and zachdisc committed Dec 12, 2024
1 parent 133f3da commit ae1d430

Showing 4 changed files with 176 additions and 1 deletion.
91 changes: 91 additions & 0 deletions docs/docs/branching.md
@@ -113,3 +113,94 @@ Creating, querying and writing to branches and tags are supported in the Iceberg
- [Spark Branch Writes](spark-writes.md#writing-to-branches)
- [Flink Reads](flink-queries.md#reading-branches-and-tags-with-SQL)
- [Flink Branch Writes](flink-writes.md#branch-writes)


## Schema selection with branches and tags

It is important to understand that a table's schema is shared across all branches.
When writing to a branch, the table's current schema is used, as that is the schema the incoming data is validated against.
Querying a tag, on the other hand, uses the snapshot's schema: the schema that the snapshot referenced at the time it was created.

The examples below show which schema is used when working with branches.

Create a table and insert some data:

```sql
CREATE TABLE db.table (id bigint, data string, col float);
INSERT INTO db.table VALUES (1, 'a', 1.0), (2, 'b', 2.0), (3, 'c', 3.0);
SELECT * FROM db.table;
1 a 1.0
2 b 2.0
3 c 3.0
```

Create a branch `test_branch` that points to the current snapshot and read data from the branch:

```sql
ALTER TABLE db.table CREATE BRANCH test_branch;

SELECT * FROM db.table.branch_test_branch;
1 a 1.0
2 b 2.0
3 c 3.0
```

Modify the table's schema by dropping the `col` column and adding a new column named `new_col`:

```sql
ALTER TABLE db.table DROP COLUMN col;

ALTER TABLE db.table ADD COLUMN new_col date;

INSERT INTO db.table VALUES (4, 'd', date('2024-04-04')), (5, 'e', date('2024-05-05'));

SELECT * FROM db.table;
1 a NULL
2 b NULL
3 c NULL
4 d 2024-04-04
5 e 2024-05-05
```

Querying the head of the branch using either of the statements below returns data using the **table's schema**:

```sql
SELECT * FROM db.table.branch_test_branch;
1 a NULL
2 b NULL
3 c NULL

SELECT * FROM db.table VERSION AS OF 'test_branch';
1 a NULL
2 b NULL
3 c NULL
```

Performing a time travel query using the snapshot id uses the **snapshot's schema**:

```sql
SELECT * FROM db.table.refs;
test_branch BRANCH 8109744798576441359 NULL NULL NULL
main BRANCH 6910357365743665710 NULL NULL NULL

SELECT * FROM db.table VERSION AS OF 8109744798576441359;
1 a 1.0
2 b 2.0
3 c 3.0
```

When writing to the branch, the **table's schema** is used for validation:

```sql
INSERT INTO db.table.branch_test_branch VALUES (6, 'f', date('2024-06-06')), (7, 'g', date('2024-07-07'));

SELECT * FROM db.table.branch_test_branch;
6 f 2024-06-06
7 g 2024-07-07
1 a NULL
2 b NULL
3 c NULL
```
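
Since tags always use the snapshot's schema, querying a tag created from the pre-change snapshot should return the original columns, matching the snapshot-id query above. A sketch, assuming a hypothetical tag named `test_tag`:

```sql
ALTER TABLE db.table CREATE TAG test_tag AS OF VERSION 8109744798576441359;

SELECT * FROM db.table.tag_test_tag;
1 a 1.0
2 b 2.0
3 c 3.0
```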
23 changes: 22 additions & 1 deletion docs/docs/spark-queries.md
@@ -82,7 +82,7 @@ The `VERSION AS OF` clause can contain a long snapshot ID or a string branch or
If this is not desired, rename the tag or branch with a well-defined prefix such as 'snapshot-1'.


```sql
-- time travel to October 26, 1986 at 01:21:00
SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00';

@@ -124,6 +124,27 @@ SELECT * FROM prod.db.table.`tag_historical-snapshot`;

Note that a branch or tag identifier may not be used in combination with `VERSION AS OF`.
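
Combining the two in one query results in an error. A hypothetical example, reusing the `historical-snapshot` tag from above:

```sql
-- invalid: a branch/tag identifier cannot be combined with VERSION AS OF
SELECT * FROM prod.db.table.`tag_historical-snapshot` VERSION AS OF 'historical-snapshot';
```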


#### Schema selection in time travel queries

The different time travel queries mentioned in the previous section can use either the snapshot's schema or the table's schema:

```sql
-- time travel to October 26, 1986 at 01:21:00 -> uses the snapshot's schema
SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00';

-- time travel to snapshot with id 10963874102873L -> uses the snapshot's schema
SELECT * FROM prod.db.table VERSION AS OF 10963874102873;

-- time travel to the head of audit-branch -> uses the table's schema
SELECT * FROM prod.db.table VERSION AS OF 'audit-branch';
SELECT * FROM prod.db.table.`branch_audit-branch`;

-- time travel to the snapshot referenced by the tag historical-snapshot -> uses the snapshot's schema
SELECT * FROM prod.db.table VERSION AS OF 'historical-snapshot';
SELECT * FROM prod.db.table.`tag_historical-snapshot`;
```

#### DataFrame

To select a specific table snapshot or the snapshot at some time in the DataFrame API, Iceberg supports four Spark read options:
4 changes: 4 additions & 0 deletions docs/docs/spark-writes.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ Note WAP branch and branch identifier cannot both be specified.
Also, the branch must exist before performing the write.
The operation does **not** create the branch if it does not exist.
For more information on branches please refer to [branches](branching.md).

!!! info
    When writing to a branch, the table's current schema is used for validation.

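
Since the branch must already exist, it can be created up front with Iceberg's branching DDL. A sketch, assuming the `audit-branch` name used in the write example:

```sql
-- create the branch before writing to it
ALTER TABLE prod.db.table CREATE BRANCH `audit-branch`;
```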

```sql
-- INSERT (1,' a') (2, 'b') into the audit branch.
@@ -23,6 +23,7 @@
import static org.assertj.core.api.Assumptions.assumeThat;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -348,6 +349,64 @@ public void testBranchReference() {
assertEquals("Snapshot at specific branch reference name", expected, fromDF);
}

@TestTemplate
public void readAndWriteWithBranchAfterSchemaChange() {
Table table = validationCatalog.loadTable(tableIdent);
String branchName = "test_branch";
table.manageSnapshots().createBranch(branchName, table.currentSnapshot().snapshotId()).commit();

List<Object[]> expected =
Arrays.asList(row(1L, "a", 1.0f), row(2L, "b", 2.0f), row(3L, "c", Float.NaN));
assertThat(sql("SELECT * FROM %s", tableName)).containsExactlyElementsOf(expected);

// change schema on the table and add more data
sql("ALTER TABLE %s DROP COLUMN float", tableName);
sql("ALTER TABLE %s ADD COLUMN new_col date", tableName);
sql(
"INSERT INTO %s VALUES (4, 'd', date('2024-04-04')), (5, 'e', date('2024-05-05'))",
tableName);

// time-travel query using snapshot id should return the snapshot's schema
long branchSnapshotId = table.refs().get(branchName).snapshotId();
assertThat(sql("SELECT * FROM %s VERSION AS OF %s", tableName, branchSnapshotId))
.containsExactlyElementsOf(expected);

// querying the head of the branch should return the table's schema
assertThat(sql("SELECT * FROM %s VERSION AS OF '%s'", tableName, branchName))
.containsExactly(row(1L, "a", null), row(2L, "b", null), row(3L, "c", null));

if (!"spark_catalog".equals(catalogName)) {
// querying the head of the branch using 'branch_' should return the table's schema
assertThat(sql("SELECT * FROM %s.branch_%s", tableName, branchName))
.containsExactly(row(1L, "a", null), row(2L, "b", null), row(3L, "c", null));
}

// writing to a branch uses the table's schema
sql(
"INSERT INTO %s.branch_%s VALUES (6L, 'f', cast('2023-06-06' as date)), (7L, 'g', cast('2023-07-07' as date))",
tableName, branchName);

// querying the head of the branch returns the table's schema
assertThat(sql("SELECT * FROM %s VERSION AS OF '%s'", tableName, branchName))
.containsExactlyInAnyOrder(
row(1L, "a", null),
row(2L, "b", null),
row(3L, "c", null),
row(6L, "f", java.sql.Date.valueOf("2023-06-06")),
row(7L, "g", java.sql.Date.valueOf("2023-07-07")));

// using DataFrameReader with the 'branch' option should return the table's schema
Dataset<Row> df =
spark.read().format("iceberg").option(SparkReadOptions.BRANCH, branchName).load(tableName);
assertThat(rowsToJava(df.collectAsList()))
.containsExactlyInAnyOrder(
row(1L, "a", null),
row(2L, "b", null),
row(3L, "c", null),
row(6L, "f", java.sql.Date.valueOf("2023-06-06")),
row(7L, "g", java.sql.Date.valueOf("2023-07-07")));
}

@TestTemplate
public void testUnknownReferenceAsOf() {
assertThatThrownBy(() -> sql("SELECT * FROM %s VERSION AS OF 'test_unknown'", tableName))