From 4d9c5c628e879c766f7dddd297200a3aba7eba74 Mon Sep 17 00:00:00 2001
From: Ratandeep Ratti
Date: Thu, 17 Oct 2019 16:31:58 -0700
Subject: [PATCH] Schema Evolution: UpdateSchema api for addition and updates (#4)

---
 .gitignore                                    |   3 +
 .../java/org/apache/iceberg/UpdateSchema.java |  10 +-
 .../java/org/apache/iceberg/SchemaUpdate.java | 106 +++++++++++++
 .../org/apache/iceberg/TestSchemaUpdates.java | 145 ++++++++++++++++++
 4 files changed, 263 insertions(+), 1 deletion(-)
 create mode 100644 core/src/test/java/org/apache/iceberg/TestSchemaUpdates.java

diff --git a/.gitignore b/.gitignore
index b9bfdca32886..027f0a6a0e42 100644
--- a/.gitignore
+++ b/.gitignore
@@ -22,6 +22,9 @@ gradle/wrapper/gradle-wrapper.jar
 # web site build
 site/site
 
+# vscode
+.project
+
 __pycache__/
 *.py[cod]
 .eggs/
diff --git a/api/src/main/java/org/apache/iceberg/UpdateSchema.java b/api/src/main/java/org/apache/iceberg/UpdateSchema.java
index b3727581bd53..a774b8dc1c59 100644
--- a/api/src/main/java/org/apache/iceberg/UpdateSchema.java
+++ b/api/src/main/java/org/apache/iceberg/UpdateSchema.java
@@ -362,7 +362,6 @@ default UpdateSchema updateColumn(String name, Type.PrimitiveType newType, Strin
    */
   UpdateSchema moveAfter(String name, String afterName);
 
-
   /**
    * Applies all field additions and updates from the provided new schema to the existing schema so
    * to create a union schema.
@@ -384,4 +383,13 @@ default UpdateSchema updateColumn(String name, Type.PrimitiveType newType, Strin
    * with other additions, renames, or updates.
    */
   UpdateSchema unionByNameWith(Schema newSchema);
+
+  /**
+   * Applies all additions and updates (type widening, field documentation) from the input
+   * schema to this schema. Fields of the two schemas are matched by field ID.
+   *
+   * @param newSchema input schema from which additions and updates are applied
+   * @return this for method chaining
+   */
+  UpdateSchema updateSchema(Schema newSchema);
 }
diff --git a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
index 66a402bdc526..e163e550b9b7 100644
--- a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg;
 
 import java.util.Collection;
+import java.util.Deque;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -27,6 +28,7 @@
 import org.apache.iceberg.mapping.MappingUtil;
 import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -348,6 +350,110 @@ private void internalMove(String name, Move move) {
     }
   }
 
+  @Override
+  public UpdateSchema updateSchema(Schema newSchema) {
+    TypeUtil.visit(newSchema, new ApplyUpdates(schema, this));
+    return this;
+  }
+
+  private static final Joiner DOT = Joiner.on(".");
+
+  /**
+   * Visits an incoming schema and registers on the given {@link UpdateSchema} the additions, type
+   * changes, and doc changes found relative to the current schema. Fields are matched by field ID.
+   */
+  private static class ApplyUpdates extends TypeUtil.SchemaVisitor<Void> {
+    private final Schema schema;
+    private final UpdateSchema updateSchema;
+    private final Map<String, Integer> indexByName;
+    // names of the fields enclosing the current position; push() keeps the innermost name first
+    private final Deque<String> fieldNames = Lists.newLinkedList();
+
+    private ApplyUpdates(Schema schema, UpdateSchema updateSchema) {
+      this.schema = schema;
+      this.updateSchema = updateSchema;
+      this.indexByName = TypeUtil.indexByName(schema.asStruct());
+    }
+
+    @Override
+    public void beforeField(Types.NestedField field) {
+      fieldNames.push(field.name());
+    }
+
+    @Override
+    public void afterField(Types.NestedField field) {
+      fieldNames.pop();
+    }
+
+    // The list element, map key, and map value hooks below are intentionally empty, so this
+    // visitor adds no path component for them.
+    @Override
+    public void beforeListElement(Types.NestedField elementField) {
+    }
+
+    @Override
+    public void afterListElement(Types.NestedField elementField) {
+    }
+
+    @Override
+    public void beforeMapKey(Types.NestedField keyField) {
+    }
+
+    @Override
+    public void afterMapKey(Types.NestedField keyField) {
+    }
+
+    @Override
+    public void beforeMapValue(Types.NestedField valueField) {
+    }
+
+    @Override
+    public void afterMapValue(Types.NestedField valueField) {
+    }
+
+    @Override
+    public Void struct(Types.StructType struct, List<Void> fieldResults) {
+      // fieldNames is a stack (innermost first): walk it in reverse for an outermost-first path
+      String parents = DOT.join(fieldNames.descendingIterator());
+      for (Types.NestedField newField : struct.fields()) {
+        Types.NestedField field = schema.findField(newField.fieldId());
+        if (field == null) {
+          // found new field
+          if (parents.isEmpty()) {
+            // top level field
+            updateSchema.addColumn(null, newField.name(), newField.type());
+          } else if (indexByName.containsKey(parents)) {
+            // parent struct present
+            updateSchema.addColumn(parents, newField.name(), newField.type());
+          }
+          // else the parent path is not in the current schema: skip here; a new parent is added
+          // with its full type once an existing ancestor or the top level is reached
+        } else {
+          // existing field: apply a primitive type change and/or a documentation change
+          String name = join(parents, field.name());
+          if (field.type().isPrimitiveType() && !field.type().equals(newField.type())) {
+            Preconditions.checkState(newField.type().isPrimitiveType(), "%s is not a primitive type", newField);
+            updateSchema.updateColumn(name, newField.type().asPrimitiveType());
+          }
+          if (newField.doc() != null && !newField.doc().equals(field.doc())) {
+            updateSchema.updateColumnDoc(name, newField.doc());
+          }
+        }
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Joins a parent path and a field name with ".", or returns the name alone for an empty parent.
+   */
+  private static String join(String parent, String name) {
+    if (parent.isEmpty()) {
+      return name;
+    }
+    return DOT.join(parent, name);
+  }
+
   /**
    * Apply the pending changes to the original schema and returns the result.
    * <p>
diff --git a/core/src/test/java/org/apache/iceberg/TestSchemaUpdates.java b/core/src/test/java/org/apache/iceberg/TestSchemaUpdates.java
new file mode 100644
index 000000000000..519e61aafda9
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestSchemaUpdates.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@code SchemaUpdate#updateSchema(Schema)}: each case applies a target schema to a base
+ * schema and expects the applied result to have the same struct as the target.
+ */
+public class TestSchemaUpdates {
+
+  // Type promotion, doc update, and additions of top-level and nested struct columns.
+  @Test
+  public void testUpdates() {
+    Schema base = new Schema(
+        NestedField.required(1, "id", Types.IntegerType.get()),
+        NestedField.optional(2, "partition", Types.DoubleType.get()));
+
+    // first update: promote and document an existing column, add new optional columns
+    Schema promotedWithAdds = new Schema(
+        // type promotion with documentation
+        NestedField.required(1, "id", Types.LongType.get(), "type promoted"),
+        NestedField.optional(2, "partition", Types.DoubleType.get()),
+        // struct is added, as an optional field
+        NestedField.optional(3, "location", StructType.of(
+            NestedField.required(4, "lat", Types.FloatType.get()))),
+        // top level field added as optional
+        NestedField.optional(5, "tz", Types.TimestampType.withoutZone()));
+    assertUpdated(base, 2, promotedWithAdds);
+
+    // nested adds
+    Schema deeplyNested = new Schema(
+        NestedField.required(1, "id", Types.IntegerType.get()),
+        NestedField.optional(2, "partition", Types.DoubleType.get()),
+        // struct is added, as an optional field
+        NestedField.optional(3, "a", StructType.of(
+            NestedField.required(4, "a_b", StructType.of(
+                NestedField.required(5, "a_b_c", StructType.of(
+                    NestedField.required(6, "a_b_c_d", Types.IntegerType.get()))))))));
+    assertUpdated(base, 2, deeplyNested);
+  }
+
+  // Adding a map column and widening a field inside a map value struct.
+  @Test
+  public void testMapUpdates() {
+    Schema idOnly = new Schema(
+        NestedField.required(1, "id", Types.LongType.get()));
+
+    // map addition
+    Schema withMap = new Schema(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "location", Types.MapType.ofRequired(
+            3, 4, Types.LongType.get(), StructType.of(
+                NestedField.required(5, "lat", Types.FloatType.get())))));
+    assertUpdated(idOnly, 1, withMap);
+
+    // complex maps: struct key and struct value; "long" in the value goes from float to double
+    Schema floatValues = new Schema(
+        NestedField.required(1, "locations", Types.MapType.ofOptional(2, 3,
+            StructType.of(
+                NestedField.required(4, "k1", Types.IntegerType.get())),
+            StructType.of(
+                NestedField.required(5, "lat", Types.FloatType.get()),
+                NestedField.required(6, "long", Types.FloatType.get())))));
+
+    Schema doubleLong = new Schema(
+        NestedField.required(1, "locations", Types.MapType.ofOptional(2, 3,
+            StructType.of(
+                NestedField.required(4, "k1", Types.IntegerType.get())),
+            StructType.of(
+                NestedField.required(5, "lat", Types.FloatType.get()),
+                NestedField.required(6, "long", Types.DoubleType.get())))));
+    assertUpdated(floatValues, 6, doubleLong);
+  }
+
+  // Adding a list column and widening fields inside the list element struct.
+  @Test
+  public void testListUpdates() {
+    Schema idOnly = new Schema(
+        NestedField.required(1, "id", Types.LongType.get()));
+
+    // optional list field added
+    Schema withList = new Schema(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "list", Types.ListType.ofRequired(
+            3,
+            StructType.of(
+                NestedField.required(4, "lat", Types.FloatType.get()),
+                NestedField.required(5, "long", Types.FloatType.get())))));
+    assertUpdated(idOnly, 1, withList);
+
+    // element struct fields go from float to double
+    Schema floatElements = new Schema(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "list", Types.ListType.ofRequired(
+            3,
+            StructType.of(
+                NestedField.required(4, "lat", Types.FloatType.get()),
+                NestedField.required(5, "long", Types.FloatType.get())))));
+
+    Schema doubleElements = new Schema(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "list", Types.ListType.ofRequired(
+            3,
+            StructType.of(
+                NestedField.required(4, "lat", Types.DoubleType.get()),
+                NestedField.required(5, "long", Types.DoubleType.get())))));
+    assertUpdated(floatElements, 5, doubleElements);
+  }
+
+  /**
+   * Applies {@code target} to {@code base} through {@code SchemaUpdate#updateSchema(Schema)} and
+   * asserts that the struct of the applied schema equals the struct of {@code target}.
+   *
+   * @param base schema the update starts from
+   * @param lastColumnId second constructor argument of SchemaUpdate (tests pass base's highest ID)
+   * @param target schema whose additions and updates are applied; also the expected result
+   */
+  private static void assertUpdated(Schema base, int lastColumnId, Schema target) {
+    Schema applied = new SchemaUpdate(base, lastColumnId).updateSchema(target).apply();
+    Assert.assertEquals(target.asStruct(), applied.asStruct());
+  }
+}