Skip to content

Commit

Permalink
Core: Add ApplyNameMapping for Avro (#9347)
Browse files Browse the repository at this point in the history
Co-authored-by: Fokko Driesprong <[email protected]>
  • Loading branch information
rdblue and Fokko authored Dec 22, 2023
1 parent 58d3ad3 commit 4f6c5fc
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 4 deletions.
4 changes: 4 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,10 @@ acceptedBreaks:
justification: "Static utility class - should not have public constructor"
"1.4.0":
org.apache.iceberg:iceberg-core:
- code: "java.class.defaultSerializationChanged"
old: "class org.apache.iceberg.mapping.NameMapping"
new: "class org.apache.iceberg.mapping.NameMapping"
justification: "Serialization across versions is not guaranteed"
- code: "java.class.removed"
old: "class org.apache.iceberg.actions.BinPackStrategy"
justification: "Removing deprecated code"
Expand Down
190 changes: 190 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/ApplyNameMapping.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.avro;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.iceberg.mapping.MappedField;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

/**
* An Avro Schema visitor to apply a name mapping to add Iceberg field IDs.
*
* <p>Methods return null when a schema has no ID and cannot be projected.
*/
public class ApplyNameMapping extends AvroSchemaVisitor<Schema> {
private final NameMapping nameMapping;

public ApplyNameMapping(NameMapping nameMapping) {
this.nameMapping = nameMapping;
}

@Override
public Schema record(Schema record, List<String> names, List<Schema> fields) {
List<Schema.Field> originalFields = record.getFields();

List<Schema.Field> newFields = Lists.newArrayList();
boolean hasChange = false;
for (int i = 0; i < originalFields.size(); i += 1) {
Schema newSchema = fields.get(i);
Schema.Field field = originalFields.get(i);
Integer fieldId = AvroSchemaUtil.getFieldId(field, nameMapping, fieldNames());
// always copy because fields can't be reused
if (newSchema != null && fieldId != null) {
newFields.add(copyField(field, newSchema, fieldId));
hasChange = true;
} else {
newFields.add(copyField(field, field.schema(), null));
}
}

if (!hasChange) {
return record;
}

return copyRecord(record, newFields);
}

@Override
public Schema union(Schema union, List<Schema> options) {
if (options.equals(union.getTypes())) {
return union;
}

List<Schema> validOptions =
options.stream().filter(Objects::nonNull).collect(Collectors.toList());

return copyProps(union, Schema.createUnion(validOptions));
}

@Override
public Schema array(Schema array, Schema element) {
if (array.getLogicalType() instanceof LogicalMap
|| (isKeyValueMapping(fieldNames())
&& AvroSchemaUtil.isKeyValueSchema(array.getElementType()))) {
return copyProps(array, Schema.createArray(element));
}

Integer elementId = AvroSchemaUtil.elementId(array);
if (elementId != null) {
if (array.getElementType().equals(element)) {
return array;
}

return copyProps(array, createArray(element, elementId));
}

MappedField mapping = nameMapping.find(fieldNames(), "element");
if (mapping != null) {
return copyProps(array, createArray(element, mapping.id()));
}

return array;
}

private boolean isKeyValueMapping(Iterable<String> names) {
return nameMapping.find(names, "key") != null && nameMapping.find(names, "value") != null;
}

@Override
public Schema map(Schema map, Schema value) {
Integer keyId = AvroSchemaUtil.keyId(map);
Integer valueId = AvroSchemaUtil.valueId(map);
if (keyId != null && valueId != null) {
if (map.getValueType().equals(value)) {
return map;
}

return copyProps(map, createMap(value, keyId, valueId));
}

MappedField keyMapping = nameMapping.find(fieldNames(), "key");
MappedField valueMapping = nameMapping.find(fieldNames(), "value");
if (keyMapping != null && valueMapping != null) {
return copyProps(map, createMap(value, keyMapping.id(), valueMapping.id()));
}

return map;
}

@Override
public Schema primitive(Schema primitive) {
return primitive;
}

private Schema copyRecord(Schema record, List<Schema.Field> newFields) {
Schema copy =
Schema.createRecord(
record.getName(), record.getDoc(), record.getNamespace(), record.isError(), newFields);

copyProps(record, copy);

return copy;
}

static Schema.Field copyField(Schema.Field field, Schema newSchema, Integer fieldId) {
Schema.Field copy =
new Schema.Field(field.name(), newSchema, field.doc(), field.defaultVal(), field.order());

for (Map.Entry<String, Object> prop : field.getObjectProps().entrySet()) {
copy.addProp(prop.getKey(), prop.getValue());
}

if (fieldId != null) {
copy.addProp(AvroSchemaUtil.FIELD_ID_PROP, fieldId);
}

for (String alias : field.aliases()) {
copy.addAlias(alias);
}

return copy;
}

private Schema createMap(Schema value, int keyId, int valueId) {
Schema result = Schema.createMap(value);
result.addProp(AvroSchemaUtil.KEY_ID_PROP, keyId);
result.addProp(AvroSchemaUtil.VALUE_ID_PROP, valueId);
return result;
}

private Schema createArray(Schema element, int elementId) {
Schema result = Schema.createArray(element);
result.addProp(AvroSchemaUtil.ELEMENT_ID_PROP, elementId);
return result;
}

private Schema copyProps(Schema from, Schema copy) {
for (Map.Entry<String, Object> prop : from.getObjectProps().entrySet()) {
copy.addProp(prop.getKey(), prop.getValue());
}

LogicalType logicalType = from.getLogicalType();
if (logicalType != null) {
logicalType.addToSchema(copy);
}

return copy;
}
}
52 changes: 49 additions & 3 deletions core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ public static Map<Type, Schema> convertTypes(Types.StructType type, String name)
return ImmutableMap.copyOf(converter.getConversionMap());
}

public static Schema pruneColumns(Schema schema, Set<Integer> selectedIds) {
return new PruneColumns(selectedIds, null).rootSchema(schema);
}

/**
* @deprecated will be removed in 2.0.0; use applyNameMapping and pruneColumns(Schema, Set)
* instead.
*/
@Deprecated
public static Schema pruneColumns(
Schema schema, Set<Integer> selectedIds, NameMapping nameMapping) {
return new PruneColumns(selectedIds, nameMapping).rootSchema(schema);
Expand All @@ -126,6 +135,14 @@ public static Schema buildAvroProjection(
return AvroCustomOrderSchemaVisitor.visit(schema, new BuildAvroProjection(expected, renames));
}

public static Schema applyNameMapping(Schema fileSchema, NameMapping nameMapping) {
if (nameMapping != null) {
return AvroSchemaVisitor.visit(fileSchema, new ApplyNameMapping(nameMapping));
}

return fileSchema;
}

public static boolean isTimestamptz(Schema schema) {
LogicalType logicalType = schema.getLogicalType();
if (logicalType instanceof LogicalTypes.TimestampMillis
Expand Down Expand Up @@ -290,6 +307,15 @@ public static int getKeyId(Schema schema) {
return getId(schema, KEY_ID_PROP);
}

static Integer keyId(Schema mapSchema) {
Object idObj = mapSchema.getObjectProp(KEY_ID_PROP);
if (idObj != null) {
return toInt(idObj);
}

return null;
}

static Integer getKeyId(
Schema schema, NameMapping nameMapping, Iterable<String> parentFieldNames) {
Preconditions.checkArgument(
Expand All @@ -305,6 +331,15 @@ public static int getValueId(Schema schema) {
return getId(schema, VALUE_ID_PROP);
}

static Integer valueId(Schema mapSchema) {
Object idObj = mapSchema.getObjectProp(VALUE_ID_PROP);
if (idObj != null) {
return toInt(idObj);
}

return null;
}

static Integer getValueId(
Schema schema, NameMapping nameMapping, Iterable<String> parentFieldNames) {
Preconditions.checkArgument(
Expand All @@ -320,6 +355,15 @@ public static int getElementId(Schema schema) {
return getId(schema, ELEMENT_ID_PROP);
}

static Integer elementId(Schema arraySchema) {
Object idObj = arraySchema.getObjectProp(ELEMENT_ID_PROP);
if (idObj != null) {
return toInt(idObj);
}

return null;
}

static Integer getElementId(
Schema schema, NameMapping nameMapping, Iterable<String> parentFieldNames) {
Preconditions.checkArgument(
Expand All @@ -335,15 +379,17 @@ public static int getFieldId(Schema.Field field) {
return id;
}

static Integer fieldId(Schema.Field field) {
return getFieldId(field, null, null);
}

static Integer getFieldId(
Schema.Field field, NameMapping nameMapping, Iterable<String> parentFieldNames) {
Object id = field.getObjectProp(FIELD_ID_PROP);
if (id != null) {
return toInt(id);
} else if (nameMapping != null) {
List<String> names = Lists.newArrayList(parentFieldNames);
names.add(field.name());
MappedField mappedField = nameMapping.find(names);
MappedField mappedField = nameMapping.find(parentFieldNames, field.name());
if (mappedField != null) {
return mappedField.id();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public void setSchema(Schema newFileSchema) {
nameMapping = MappingUtil.create(expectedSchema);
}
Set<Integer> projectedIds = TypeUtil.getProjectedIds(expectedSchema);
Schema prunedSchema = AvroSchemaUtil.pruneColumns(newFileSchema, projectedIds, nameMapping);
Schema schemaWithIds = AvroSchemaUtil.applyNameMapping(newFileSchema, nameMapping);
Schema prunedSchema = AvroSchemaUtil.pruneColumns(schemaWithIds, projectedIds);
this.readSchema = AvroSchemaUtil.buildAvroProjection(prunedSchema, expectedSchema, renames);
this.wrapped = newDatumReader();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;

/** Represents a mapping from external schema names to Iceberg type IDs. */
public class NameMapping implements Serializable {
Expand Down Expand Up @@ -62,6 +63,14 @@ public MappedField find(List<String> names) {
return lazyFieldsByName().get(DOT.join(names));
}

public MappedField find(Iterable<String> names) {
return lazyFieldsByName().get(DOT.join(names));
}

public MappedField find(Iterable<String> names, String name) {
return lazyFieldsByName().get(DOT.join(Iterables.concat(names, ImmutableList.of(name))));
}

public MappedFields asMappedFields() {
return mapping;
}
Expand Down

0 comments on commit 4f6c5fc

Please sign in to comment.