Skip to content

Commit

Permalink
UpdateSchema api for addition and updates (apache#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdsr authored and shardulm94 committed Feb 25, 2020
1 parent dd946f7 commit dff35dd
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ gradle/wrapper/gradle-wrapper.jar
# web site build
site/site

# vscode
.project

__pycache__/
*.py[cod]
.eggs/
Expand Down
8 changes: 8 additions & 0 deletions api/src/main/java/org/apache/iceberg/UpdateSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -325,4 +325,12 @@ default UpdateSchema updateColumn(String name, Type.PrimitiveType newType, Strin
*/
UpdateSchema deleteColumn(String name);

/**
 * Applies all the additions and updates (type widening and field documentation changes)
 * found in the input schema to this update.
 *
 * @param newSchema input schema from which the additions and updates are taken
 * @return this for method chaining
 */
UpdateSchema updateSchema(Schema newSchema);
}
7 changes: 7 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/TypeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public interface NextID {
}

public static class SchemaVisitor<T> {
private final Deque<String> fieldNames = Lists.newLinkedList();
private final Deque<Integer> fieldIds = Lists.newLinkedList();

public T schema(Schema schema, T structResult) {
Expand All @@ -221,6 +222,10 @@ public T primitive(Type.PrimitiveType primitive) {
return null;
}

/**
 * Exposes the stack of struct field names that enclose the type currently being visited.
 * <p>
 * The struct traversal pushes a field's name before visiting that field's type and pops it afterwards.
 * Because {@link Deque#push(Object)} inserts at the head, iterating the returned deque yields the
 * innermost (most recently pushed) name first. The deque itself is returned, not a copy.
 *
 * @return the live deque of enclosing field names
 */
protected Deque<String> fieldNames() {
  return this.fieldNames;
}

/**
 * Exposes the stack of struct field IDs that enclose the type currently being visited.
 * <p>
 * The struct traversal pushes a field's ID before visiting that field's type and pops it afterwards.
 * Because {@link Deque#push(Object)} inserts at the head, iterating the returned deque yields the
 * innermost (most recently pushed) ID first. The deque itself is returned, not a copy.
 *
 * @return the live deque of enclosing field IDs
 */
protected Deque<Integer> fieldIds() {
  return this.fieldIds;
}
Expand All @@ -237,11 +242,13 @@ public static <T> T visit(Type type, SchemaVisitor<T> visitor) {
List<T> results = Lists.newArrayListWithExpectedSize(struct.fields().size());
for (Types.NestedField field : struct.fields()) {
visitor.fieldIds.push(field.fieldId());
visitor.fieldNames.push(field.name());
T result;
try {
result = visit(field.type(), visitor);
} finally {
visitor.fieldIds.pop();
visitor.fieldNames.pop();
}
results.add(visitor.field(field, result));
}
Expand Down
60 changes: 60 additions & 0 deletions core/src/main/java/org/apache/iceberg/SchemaUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iceberg;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -260,6 +261,65 @@ public UpdateSchema updateColumnDoc(String name, String doc) {
return this;
}

/**
 * Walks {@code newSchema} with an {@code ApplyUpdates} visitor bound to the current schema, so that the
 * additions and updates it finds are recorded on this update.
 *
 * @param newSchema input schema from which the additions and updates are taken
 * @return this for method chaining
 */
@Override
public UpdateSchema updateSchema(Schema newSchema) {
  ApplyUpdates visitor = new ApplyUpdates(schema, this);
  TypeUtil.visit(newSchema, visitor);
  return this;
}

// Joins name components with "." to build dotted column names.
private static final Joiner DOT = Joiner.on(".");

/**
 * Schema visitor that walks a new schema and records on the wrapped {@link UpdateSchema} every column
 * addition, primitive type change and doc change relative to the current schema.
 * <p>
 * Fields of the two schemas are matched by field ID. Nothing is recorded for fields that exist only in
 * the current schema.
 */
private static class ApplyUpdates extends TypeUtil.SchemaVisitor<Void> {
  private final Schema schema;
  private final UpdateSchema updateSchema;
  private final Map<String, Integer> indexByName;

  /**
   * @param schema       the current schema that updates are computed against
   * @param updateSchema the pending update that receives the add/update calls
   */
  private ApplyUpdates(Schema schema, UpdateSchema updateSchema) {
    this.schema = schema;
    this.updateSchema = updateSchema;
    this.indexByName = TypeUtil.indexByName(schema.asStruct());
  }

  /**
   * Compares every field of a struct in the new schema with the current schema and records the
   * differences.
   *
   * @param struct       a struct of the new schema
   * @param fieldResults results of visiting the struct's field types (unused)
   * @return always null
   */
  @Override
  public Void struct(Types.StructType struct, List<Void> fieldResults) {
    // fieldNames() is maintained as a stack: push/pop operate on the head, so plain iteration yields the
    // innermost name first. Walk it from the tail to build the parent path outermost-first ("a.b", not
    // "b.a"). The path is the same for every field of this struct, so compute it once.
    String parents = DOT.join(fieldNames().descendingIterator());

    for (Types.NestedField newField : struct.fields()) {
      Types.NestedField field = schema.findField(newField.fieldId());
      if (field == null) {
        // found new field
        if (parents.isEmpty()) {
          // top level field
          updateSchema.addColumn(null, newField.name(), newField.type());
        } else if (indexByName.containsKey(parents)) {
          // parent struct present
          updateSchema.addColumn(parents, newField.name(), newField.type());
        }
        // else parent struct not present, so we will
        // backtrack until we find a parent or reach top level
      } else {
        // updates
        String name = join(parents, field.name());
        if (field.type().isPrimitiveType() && !field.type().equals(newField.type())) {
          // report the incoming field: it is the one that failed the primitive check
          Preconditions.checkState(newField.type().isPrimitiveType(), "%s is not a primitive type", newField);
          updateSchema.updateColumn(name, newField.type().asPrimitiveType());
        }
        if (newField.doc() != null && !newField.doc().equals(field.doc())) {
          updateSchema.updateColumnDoc(name, newField.doc());
        }
      }
    }
    return null;
  }
}

/**
 * Builds a dotted name from a parent path and a field name.
 *
 * @param parent dotted path of the enclosing fields; empty for a top-level field
 * @param name   name of the field itself
 * @return {@code name} when {@code parent} is empty, otherwise {@code parent} and {@code name} joined by "."
 */
private static String join(String parent, String name) {
  return parent.isEmpty() ? name : DOT.join(parent, name);
}

/**
* Apply the pending changes to the original schema and returns the result.
* <p>
Expand Down
145 changes: 145 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestSchemaUpdates.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link SchemaUpdate#updateSchema(Schema)}: each case applies a target schema onto a base
 * schema and expects the applied result to have the same struct as the target.
 */
public class TestSchemaUpdates {

  /**
   * Applies {@code target} onto {@code base} and asserts that the resulting struct equals the struct of
   * {@code target}.
   *
   * @param base         schema the update starts from
   * @param lastColumnId last column ID handed to the {@link SchemaUpdate} constructor
   * @param target       schema whose additions and updates are applied
   */
  private static void assertUpdateProduces(Schema base, int lastColumnId, Schema target) {
    Schema result = new SchemaUpdate(base, lastColumnId).updateSchema(target).apply();
    Assert.assertEquals(target.asStruct(), result.asStruct());
  }

  @Test
  public void testUpdates() {
    Schema base = new Schema(
        NestedField.required(1, "id", Types.IntegerType.get()),
        NestedField.optional(2, "partition", Types.DoubleType.get()));

    // "id" is promoted to long and documented; an optional struct and an optional top-level column are added
    StructType location = StructType.of(
        NestedField.required(4, "lat", Types.FloatType.get()));
    Schema promotedWithAdditions = new Schema(
        NestedField.required(1, "id", Types.LongType.get(), "type promoted"),
        NestedField.optional(2, "partition", Types.DoubleType.get()),
        NestedField.optional(3, "location", location),
        NestedField.optional(5, "tz", Types.TimestampType.withoutZone()));

    assertUpdateProduces(base, 2, promotedWithAdditions);

    // an optional struct holding several levels of nested structs is added in one go
    StructType innermost = StructType.of(
        NestedField.required(6, "a_b_c_d", Types.IntegerType.get()));
    StructType middle = StructType.of(
        NestedField.required(5, "a_b_c", innermost));
    StructType outermost = StructType.of(
        NestedField.required(4, "a_b", middle));
    Schema deeplyNested = new Schema(
        NestedField.required(1, "id", Types.IntegerType.get()),
        NestedField.optional(2, "partition", Types.DoubleType.get()),
        NestedField.optional(3, "a", outermost));

    assertUpdateProduces(base, 2, deeplyNested);
  }

  @Test
  public void testMapUpdates() {
    Schema idOnly = new Schema(
        NestedField.required(1, "id", Types.LongType.get()));

    // a new optional map column whose values are structs
    Types.MapType locationMap = Types.MapType.ofRequired(
        3, 4,
        Types.LongType.get(),
        StructType.of(NestedField.required(5, "lat", Types.FloatType.get())));
    Schema withMap = new Schema(
        NestedField.required(1, "id", Types.LongType.get()),
        NestedField.optional(2, "location", locationMap));

    assertUpdateProduces(idOnly, 1, withMap);

    // map with struct keys and struct values; the "long" value field goes from float to double
    Types.MapType floatValueMap = Types.MapType.ofOptional(
        2, 3,
        StructType.of(
            NestedField.required(4, "k1", Types.IntegerType.get())),
        StructType.of(
            NestedField.required(5, "lat", Types.FloatType.get()),
            NestedField.required(6, "long", Types.FloatType.get())));
    Schema beforeWidening = new Schema(
        NestedField.required(1, "locations", floatValueMap));

    Types.MapType widenedValueMap = Types.MapType.ofOptional(
        2, 3,
        StructType.of(
            NestedField.required(4, "k1", Types.IntegerType.get())),
        StructType.of(
            NestedField.required(5, "lat", Types.FloatType.get()),
            NestedField.required(6, "long", Types.DoubleType.get())));
    Schema afterWidening = new Schema(
        NestedField.required(1, "locations", widenedValueMap));

    assertUpdateProduces(beforeWidening, 6, afterWidening);
  }

  @Test
  public void testListUpdates() {
    Schema idOnly = new Schema(
        NestedField.required(1, "id", Types.LongType.get()));

    // a new optional list column whose elements are structs
    Types.ListType addedList = Types.ListType.ofRequired(
        3,
        StructType.of(
            NestedField.required(4, "lat", Types.FloatType.get()),
            NestedField.required(5, "long", Types.FloatType.get())));
    Schema withList = new Schema(
        NestedField.required(1, "id", Types.LongType.get()),
        NestedField.optional(2, "list", addedList));

    assertUpdateProduces(idOnly, 1, withList);

    // both fields of the list's element struct go from float to double
    Types.ListType floatElements = Types.ListType.ofRequired(
        3,
        StructType.of(
            NestedField.required(4, "lat", Types.FloatType.get()),
            NestedField.required(5, "long", Types.FloatType.get())));
    Schema beforeWidening = new Schema(
        NestedField.required(1, "id", Types.LongType.get()),
        NestedField.optional(2, "list", floatElements));

    Types.ListType doubleElements = Types.ListType.ofRequired(
        3,
        StructType.of(
            NestedField.required(4, "lat", Types.DoubleType.get()),
            NestedField.required(5, "long", Types.DoubleType.get())));
    Schema afterWidening = new Schema(
        NestedField.required(1, "id", Types.LongType.get()),
        NestedField.optional(2, "list", doubleElements));

    assertUpdateProduces(beforeWidening, 5, afterWidening);
  }
}

0 comments on commit dff35dd

Please sign in to comment.