diff --git a/.gitignore b/.gitignore index 511baa50d869..755e4669104b 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,9 @@ gradle/wrapper/gradle-wrapper.jar # web site build site/site +# vscode +.project + __pycache__/ *.py[cod] .eggs/ diff --git a/api/src/main/java/org/apache/iceberg/UpdateSchema.java b/api/src/main/java/org/apache/iceberg/UpdateSchema.java index 503be926fd42..492c0d4190cf 100644 --- a/api/src/main/java/org/apache/iceberg/UpdateSchema.java +++ b/api/src/main/java/org/apache/iceberg/UpdateSchema.java @@ -325,4 +325,12 @@ default UpdateSchema updateColumn(String name, Type.PrimitiveType newType, Strin */ UpdateSchema deleteColumn(String name); + /** + * Applies all the additions and updates [type widening, field documentation] + * from the input schema + * + * @param newSchema - Input schema from which updates are applied + * @return this for method chaining + */ + UpdateSchema updateSchema(Schema newSchema); } diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java index 58e56593b66f..2533df8b066d 100644 --- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java +++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java @@ -195,6 +195,7 @@ public interface NextID { } public static class SchemaVisitor { + private final Deque fieldNames = Lists.newLinkedList(); private final Deque fieldIds = Lists.newLinkedList(); public T schema(Schema schema, T structResult) { @@ -221,6 +222,10 @@ public T primitive(Type.PrimitiveType primitive) { return null; } + protected Deque fieldNames() { + return fieldNames; + } + protected Deque fieldIds() { return fieldIds; } @@ -237,11 +242,13 @@ public static T visit(Type type, SchemaVisitor visitor) { List results = Lists.newArrayListWithExpectedSize(struct.fields().size()); for (Types.NestedField field : struct.fields()) { visitor.fieldIds.push(field.fieldId()); + visitor.fieldNames.push(field.name()); T result; try { result = visit(field.type(), visitor); } finally { visitor.fieldIds.pop(); + visitor.fieldNames.pop(); } results.add(visitor.field(field, result)); } diff --git a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java index a1ef08cfd135..1dd1aa81feeb 100644 --- a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java +++ b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java @@ -19,6 +19,7 @@ package org.apache.iceberg; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -260,6 +261,65 @@ public UpdateSchema updateColumnDoc(String name, String doc) { return this; } + @Override + public UpdateSchema updateSchema(Schema newSchema) { + TypeUtil.visit(newSchema, new ApplyUpdates(schema, this)); + return this; + } + + private static final Joiner DOT = Joiner.on("."); + + private static class ApplyUpdates extends TypeUtil.SchemaVisitor { + private final Schema schema; + private final UpdateSchema updateSchema; + private final Map indexByName; + + private ApplyUpdates(Schema schema, UpdateSchema updateSchema) { + this.schema = schema; + this.updateSchema = updateSchema; + this.indexByName = TypeUtil.indexByName(schema.asStruct()); + } + + @Override + public Void struct(Types.StructType struct, List fieldResults) { + for (Types.NestedField newField : struct.fields()) { + Types.NestedField field = schema.findField(newField.fieldId()); + String parents = DOT.join(fieldNames()); + if (field == null) { + String name = join(parents, newField.name()); + // found new field + if (parents.isEmpty()) { + // top level field + updateSchema.addColumn(null, name, newField.type()); + } else if (indexByName.containsKey(parents)) { + // parent struct present + updateSchema.addColumn(parents, newField.name(), newField.type()); + } + // else parent struct not present, so we will + // backtrack until we find a parent or reach top level + } else { + // updates + String name = join(parents, field.name()); + if (field.type().isPrimitiveType() && !field.type().equals(newField.type())) { + Preconditions.checkState(newField.type().isPrimitiveType(), "%s is not a primitive type", field); + updateSchema.updateColumn(name, newField.type().asPrimitiveType()); + } + if (newField.doc() != null && !newField.doc().equals(field.doc())) { + updateSchema.updateColumnDoc(name, newField.doc()); + } + } + } + return null; + } + } + + private static String join(String parent, String name) { + if (parent.isEmpty()) { + return name; + } + return DOT.join(parent, name); + } + /** * Apply the pending changes to the original schema and returns the result. *

diff --git a/core/src/test/java/org/apache/iceberg/TestSchemaUpdates.java b/core/src/test/java/org/apache/iceberg/TestSchemaUpdates.java new file mode 100644 index 000000000000..519e61aafda9 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestSchemaUpdates.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StructType; +import org.junit.Assert; +import org.junit.Test; + +public class TestSchemaUpdates { + + @Test + public void testUpdates() { + Schema schema = new Schema( + NestedField.required(1, "id", Types.IntegerType.get()), + NestedField.optional(2, "partition", Types.DoubleType.get())); + + Schema newSchema = new Schema( + // type promotion with documentation + NestedField.required(1, "id", Types.LongType.get(), "type promoted"), + NestedField.optional(2, "partition", Types.DoubleType.get()), + // struct is added, as an optional field + NestedField.optional(3, "location", StructType.of( + NestedField.required(4, "lat", Types.FloatType.get()))), + // top level field added as optional + NestedField.optional(5, "tz", Types.TimestampType.withoutZone())); + + Schema applied = new SchemaUpdate(schema, 2).updateSchema(newSchema).apply(); + Assert.assertEquals(newSchema.asStruct(), applied.asStruct()); + + // nested adds + newSchema = new Schema( + NestedField.required(1, "id", Types.IntegerType.get()), + NestedField.optional(2, "partition", Types.DoubleType.get()), + // struct is added, as an optional field + NestedField.optional(3, "a", StructType.of( + NestedField.required(4, "a_b", StructType.of( + NestedField.required(5, "a_b_c", StructType.of( + NestedField.required(6, "a_b_c_d", Types.IntegerType.get())))))))); + + applied = new SchemaUpdate(schema, 2).updateSchema(newSchema).apply(); + Assert.assertEquals(newSchema.asStruct(), applied.asStruct()); + } + + @Test + public void testMapUpdates() { + Schema schema = new Schema( + NestedField.required(1, "id", Types.LongType.get())); + + Schema newSchema = new Schema( + // map addition + NestedField.required(1, "id", Types.LongType.get()), + NestedField.optional(2, "location", Types.MapType.ofRequired( + 3, 4, Types.LongType.get(), StructType.of( + NestedField.required(5, "lat", Types.FloatType.get()))))); + + Schema applied = new SchemaUpdate(schema, 1).updateSchema(newSchema).apply(); + + Assert.assertEquals(newSchema.asStruct(), applied.asStruct()); + + // complex maps + schema = new Schema( + NestedField.required(1, "locations", Types.MapType.ofOptional(2, 3, + StructType.of( + NestedField.required(4, "k1", Types.IntegerType.get()) + ), + StructType.of( + NestedField.required(5, "lat", Types.FloatType.get()), + NestedField.required(6, "long", Types.FloatType.get()) + ) + ))); + + newSchema = new Schema( + NestedField.required(1, "locations", Types.MapType.ofOptional(2, 3, + StructType.of( + NestedField.required(4, "k1", Types.IntegerType.get()) + ), + StructType.of( + NestedField.required(5, "lat", Types.FloatType.get()), + NestedField.required(6, "long", Types.DoubleType.get()) + ) + ))); + + + applied = new SchemaUpdate(schema, 6).updateSchema(newSchema).apply(); + Assert.assertEquals(newSchema.asStruct(), applied.asStruct()); + } + + @Test + public void testListUpdates() { + Schema schema = new Schema( + NestedField.required(1, "id", Types.LongType.get())); + + Schema newSchema = new Schema( + // optional list field added + NestedField.required(1, "id", Types.LongType.get()), + NestedField.optional(2, "list", Types.ListType.ofRequired( + 3, + StructType.of( + NestedField.required(4, "lat", Types.FloatType.get()), + NestedField.required(5, "long", Types.FloatType.get()))))); + + Schema applied = new SchemaUpdate(schema, 1).updateSchema(newSchema).apply(); + Assert.assertEquals(newSchema.asStruct(), applied.asStruct()); + + + schema = new Schema( + NestedField.required(1, "id", Types.LongType.get()), + NestedField.optional(2, "list", Types.ListType.ofRequired( + 3, + StructType.of( + NestedField.required(4, "lat", Types.FloatType.get()), + NestedField.required(5, "long", Types.FloatType.get()))))); + + newSchema = new Schema( + NestedField.required(1, "id", Types.LongType.get()), + NestedField.optional(2, "list", Types.ListType.ofRequired( + 3, + StructType.of( + NestedField.required(4, "lat", Types.DoubleType.get()), + NestedField.required(5, "long", Types.DoubleType.get()))))); + + applied = new SchemaUpdate(schema, 5).updateSchema(newSchema).apply(); + Assert.assertEquals(newSchema.asStruct(), applied.asStruct()); + } +} +