Schema Evolution: UpdateSchema api for addition and updates (apache#4)
rdsr authored and shardulm94 committed Nov 18, 2020
1 parent d59af23 commit 4d9c5c6
Showing 4 changed files with 252 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .gitignore
@@ -22,6 +22,9 @@ gradle/wrapper/gradle-wrapper.jar
# web site build
site/site

# vscode
.project

__pycache__/
*.py[cod]
.eggs/
10 changes: 9 additions & 1 deletion api/src/main/java/org/apache/iceberg/UpdateSchema.java
@@ -362,7 +362,6 @@ default UpdateSchema updateColumn(String name, Type.PrimitiveType newType, Strin
*/
UpdateSchema moveAfter(String name, String afterName);


/**
* Applies all field additions and updates from the provided new schema to the existing schema so as
* to create a union schema.
@@ -384,4 +383,13 @@ default UpdateSchema updateColumn(String name, Type.PrimitiveType newType, Strin
* with other additions, renames, or updates.
*/
UpdateSchema unionByNameWith(Schema newSchema);

/**
* Applies all field additions and updates (type widening and field documentation changes)
* from the input schema to this schema.
*
* @param newSchema the schema from which additions and updates are applied
* @return this for method chaining
*/
UpdateSchema updateSchema(Schema newSchema);
}
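
For context, a minimal usage sketch, not part of this commit: the new method is exercised through SchemaUpdate exactly as in the tests added below. Field names, ids, and types here are illustrative, and the snippet assumes it sits alongside TestSchemaUpdates in the org.apache.iceberg package with the same imports.

// Usage sketch only; mirrors the pattern of the tests in this commit.
Schema schema = new Schema(
    Types.NestedField.required(1, "id", Types.IntegerType.get()));

Schema newSchema = new Schema(
    // type promotion plus a documentation update
    Types.NestedField.required(1, "id", Types.LongType.get(), "type promoted"),
    // new optional top-level field
    Types.NestedField.optional(2, "data", Types.StringType.get()));

// 1 is the current schema's highest assigned column id
Schema applied = new SchemaUpdate(schema, 1).updateSchema(newSchema).apply();
// applied.asStruct() now equals newSchema.asStruct()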
95 changes: 95 additions & 0 deletions core/src/main/java/org/apache/iceberg/SchemaUpdate.java
@@ -20,13 +20,15 @@
package org.apache.iceberg;

import java.util.Collection;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -348,6 +350,99 @@ private void internalMove(String name, Move move) {
}
}

public UpdateSchema updateSchema(Schema newSchema) {
TypeUtil.visit(newSchema, new ApplyUpdates(schema, this));
return this;
}

private static final Joiner DOT = Joiner.on(".");

private static class ApplyUpdates extends TypeUtil.SchemaVisitor<Void> {
private final Schema schema;
private final UpdateSchema updateSchema;
private final Map<String, Integer> indexByName;
private final Deque<String> fieldNames = Lists.newLinkedList();

private ApplyUpdates(Schema schema, UpdateSchema updateSchema) {
this.schema = schema;
this.updateSchema = updateSchema;
this.indexByName = TypeUtil.indexByName(schema.asStruct());
}

@Override
public void beforeField(Types.NestedField field) {
fieldNames.push(field.name());
}

@Override
public void afterField(Types.NestedField field) {
fieldNames.pop();
}

@Override
public void beforeListElement(Types.NestedField elementField) {
}

@Override
public void afterListElement(Types.NestedField elementField) {
}

@Override
public void beforeMapKey(Types.NestedField keyField) {
}

@Override
public void afterMapKey(Types.NestedField keyField) {
}

@Override
public void beforeMapValue(Types.NestedField valueField) {
}

@Override
public void afterMapValue(Types.NestedField valueField) {
}

@Override
public Void struct(Types.StructType struct, List<Void> fieldResults) {
for (Types.NestedField newField : struct.fields()) {
Types.NestedField field = schema.findField(newField.fieldId());
String parents = DOT.join(fieldNames);
if (field == null) {
String name = join(parents, newField.name());
// found new field
if (parents.isEmpty()) {
// top level field
updateSchema.addColumn(null, name, newField.type());
} else if (indexByName.containsKey(parents)) {
// parent struct present
updateSchema.addColumn(parents, newField.name(), newField.type());
}
// else the parent struct is not present either; the whole subtree is added
// once the visitor returns to the nearest existing ancestor or the top level
} else {
// updates
String name = join(parents, field.name());
if (field.type().isPrimitiveType() && !field.type().equals(newField.type())) {
Preconditions.checkState(newField.type().isPrimitiveType(), "%s is not a primitive type", field);
updateSchema.updateColumn(name, newField.type().asPrimitiveType());
}
if (newField.doc() != null && !newField.doc().equals(field.doc())) {
updateSchema.updateColumnDoc(name, newField.doc());
}
}
}
return null;
}
}

private static String join(String parent, String name) {
if (parent.isEmpty()) {
return name;
}
return DOT.join(parent, name);
}

/**
* Apply the pending changes to the original schema and returns the result.
* <p>
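
A rough illustration, not part of this commit, of how the dotted-path handling above plays out for nested updates: the visitor joins the parent path with the field name (for example "location.long") before calling updateColumn, so a widened nested field is addressed by name. Field names and ids are illustrative, and the snippet assumes the same package and imports as TestSchemaUpdates below.

// Sketch only: "location.long" is widened from float to double; the visitor
// resolves it by its dotted path when calling updateColumn.
Schema current = new Schema(
    Types.NestedField.required(1, "location", Types.StructType.of(
        Types.NestedField.required(2, "lat", Types.FloatType.get()),
        Types.NestedField.required(3, "long", Types.FloatType.get()))));

Schema incoming = new Schema(
    Types.NestedField.required(1, "location", Types.StructType.of(
        Types.NestedField.required(2, "lat", Types.FloatType.get()),
        Types.NestedField.required(3, "long", Types.DoubleType.get()))));

Schema result = new SchemaUpdate(current, 3).updateSchema(incoming).apply();
// result.asStruct() now equals incoming.asStruct()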
145 changes: 145 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestSchemaUpdates.java
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;
import org.junit.Assert;
import org.junit.Test;

public class TestSchemaUpdates {

@Test
public void testUpdates() {
Schema schema = new Schema(
NestedField.required(1, "id", Types.IntegerType.get()),
NestedField.optional(2, "partition", Types.DoubleType.get()));

Schema newSchema = new Schema(
// type promotion with documentation
NestedField.required(1, "id", Types.LongType.get(), "type promoted"),
NestedField.optional(2, "partition", Types.DoubleType.get()),
// struct is added, as an optional field
NestedField.optional(3, "location", StructType.of(
NestedField.required(4, "lat", Types.FloatType.get()))),
// top level field added as optional
NestedField.optional(5, "tz", Types.TimestampType.withoutZone()));

Schema applied = new SchemaUpdate(schema, 2).updateSchema(newSchema).apply();
Assert.assertEquals(newSchema.asStruct(), applied.asStruct());

// nested adds
newSchema = new Schema(
NestedField.required(1, "id", Types.IntegerType.get()),
NestedField.optional(2, "partition", Types.DoubleType.get()),
// struct is added, as an optional field
NestedField.optional(3, "a", StructType.of(
NestedField.required(4, "a_b", StructType.of(
NestedField.required(5, "a_b_c", StructType.of(
NestedField.required(6, "a_b_c_d", Types.IntegerType.get()))))))));

applied = new SchemaUpdate(schema, 2).updateSchema(newSchema).apply();
Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
}

@Test
public void testMapUpdates() {
Schema schema = new Schema(
NestedField.required(1, "id", Types.LongType.get()));

Schema newSchema = new Schema(
// map addition
NestedField.required(1, "id", Types.LongType.get()),
NestedField.optional(2, "location", Types.MapType.ofRequired(
3, 4, Types.LongType.get(), StructType.of(
NestedField.required(5, "lat", Types.FloatType.get())))));

Schema applied = new SchemaUpdate(schema, 1).updateSchema(newSchema).apply();

Assert.assertEquals(newSchema.asStruct(), applied.asStruct());

// complex maps
schema = new Schema(
NestedField.required(1, "locations", Types.MapType.ofOptional(2, 3,
StructType.of(
NestedField.required(4, "k1", Types.IntegerType.get())
),
StructType.of(
NestedField.required(5, "lat", Types.FloatType.get()),
NestedField.required(6, "long", Types.FloatType.get())
)
)));

newSchema = new Schema(
NestedField.required(1, "locations", Types.MapType.ofOptional(2, 3,
StructType.of(
NestedField.required(4, "k1", Types.IntegerType.get())
),
StructType.of(
NestedField.required(5, "lat", Types.FloatType.get()),
NestedField.required(6, "long", Types.DoubleType.get())
)
)));


applied = new SchemaUpdate(schema, 6).updateSchema(newSchema).apply();
Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
}

@Test
public void testListUpdates() {
Schema schema = new Schema(
NestedField.required(1, "id", Types.LongType.get()));

Schema newSchema = new Schema(
// optional list field added
NestedField.required(1, "id", Types.LongType.get()),
NestedField.optional(2, "list", Types.ListType.ofRequired(
3,
StructType.of(
NestedField.required(4, "lat", Types.FloatType.get()),
NestedField.required(5, "long", Types.FloatType.get())))));

Schema applied = new SchemaUpdate(schema, 1).updateSchema(newSchema).apply();
Assert.assertEquals(newSchema.asStruct(), applied.asStruct());


schema = new Schema(
NestedField.required(1, "id", Types.LongType.get()),
NestedField.optional(2, "list", Types.ListType.ofRequired(
3,
StructType.of(
NestedField.required(4, "lat", Types.FloatType.get()),
NestedField.required(5, "long", Types.FloatType.get())))));

newSchema = new Schema(
NestedField.required(1, "id", Types.LongType.get()),
NestedField.optional(2, "list", Types.ListType.ofRequired(
3,
StructType.of(
NestedField.required(4, "lat", Types.DoubleType.get()),
NestedField.required(5, "long", Types.DoubleType.get())))));

applied = new SchemaUpdate(schema, 5).updateSchema(newSchema).apply();
Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
}
}
