From 4f6c5fc6403de9a4843f54b58fcd371e1e206873 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 22 Dec 2023 14:02:39 -0800 Subject: [PATCH] Core: Add ApplyNameMapping for Avro (#9347) Co-authored-by: Fokko Driesprong --- .palantir/revapi.yml | 4 + .../apache/iceberg/avro/ApplyNameMapping.java | 190 ++++++++++++++++++ .../apache/iceberg/avro/AvroSchemaUtil.java | 52 ++++- .../iceberg/avro/ProjectionDatumReader.java | 3 +- .../apache/iceberg/mapping/NameMapping.java | 9 + 5 files changed, 254 insertions(+), 4 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/avro/ApplyNameMapping.java diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 226714261460..990c9ba31afd 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -874,6 +874,10 @@ acceptedBreaks: justification: "Static utility class - should not have public constructor" "1.4.0": org.apache.iceberg:iceberg-core: + - code: "java.class.defaultSerializationChanged" + old: "class org.apache.iceberg.mapping.NameMapping" + new: "class org.apache.iceberg.mapping.NameMapping" + justification: "Serialization across versions is not guaranteed" - code: "java.class.removed" old: "class org.apache.iceberg.actions.BinPackStrategy" justification: "Removing deprecated code" diff --git a/core/src/main/java/org/apache/iceberg/avro/ApplyNameMapping.java b/core/src/main/java/org/apache/iceberg/avro/ApplyNameMapping.java new file mode 100644 index 000000000000..ce619c47fabe --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/ApplyNameMapping.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.avro; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.avro.LogicalType; +import org.apache.avro.Schema; +import org.apache.iceberg.mapping.MappedField; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** + * An Avro Schema visitor to apply a name mapping to add Iceberg field IDs. + * + *
<p>
Methods return null when a schema has no ID and cannot be projected. + */ +public class ApplyNameMapping extends AvroSchemaVisitor { + private final NameMapping nameMapping; + + public ApplyNameMapping(NameMapping nameMapping) { + this.nameMapping = nameMapping; + } + + @Override + public Schema record(Schema record, List names, List fields) { + List originalFields = record.getFields(); + + List newFields = Lists.newArrayList(); + boolean hasChange = false; + for (int i = 0; i < originalFields.size(); i += 1) { + Schema newSchema = fields.get(i); + Schema.Field field = originalFields.get(i); + Integer fieldId = AvroSchemaUtil.getFieldId(field, nameMapping, fieldNames()); + // always copy because fields can't be reused + if (newSchema != null && fieldId != null) { + newFields.add(copyField(field, newSchema, fieldId)); + hasChange = true; + } else { + newFields.add(copyField(field, field.schema(), null)); + } + } + + if (!hasChange) { + return record; + } + + return copyRecord(record, newFields); + } + + @Override + public Schema union(Schema union, List options) { + if (options.equals(union.getTypes())) { + return union; + } + + List validOptions = + options.stream().filter(Objects::nonNull).collect(Collectors.toList()); + + return copyProps(union, Schema.createUnion(validOptions)); + } + + @Override + public Schema array(Schema array, Schema element) { + if (array.getLogicalType() instanceof LogicalMap + || (isKeyValueMapping(fieldNames()) + && AvroSchemaUtil.isKeyValueSchema(array.getElementType()))) { + return copyProps(array, Schema.createArray(element)); + } + + Integer elementId = AvroSchemaUtil.elementId(array); + if (elementId != null) { + if (array.getElementType().equals(element)) { + return array; + } + + return copyProps(array, createArray(element, elementId)); + } + + MappedField mapping = nameMapping.find(fieldNames(), "element"); + if (mapping != null) { + return copyProps(array, createArray(element, mapping.id())); + } + + return array; + } + + 
private boolean isKeyValueMapping(Iterable names) { + return nameMapping.find(names, "key") != null && nameMapping.find(names, "value") != null; + } + + @Override + public Schema map(Schema map, Schema value) { + Integer keyId = AvroSchemaUtil.keyId(map); + Integer valueId = AvroSchemaUtil.valueId(map); + if (keyId != null && valueId != null) { + if (map.getValueType().equals(value)) { + return map; + } + + return copyProps(map, createMap(value, keyId, valueId)); + } + + MappedField keyMapping = nameMapping.find(fieldNames(), "key"); + MappedField valueMapping = nameMapping.find(fieldNames(), "value"); + if (keyMapping != null && valueMapping != null) { + return copyProps(map, createMap(value, keyMapping.id(), valueMapping.id())); + } + + return map; + } + + @Override + public Schema primitive(Schema primitive) { + return primitive; + } + + private Schema copyRecord(Schema record, List newFields) { + Schema copy = + Schema.createRecord( + record.getName(), record.getDoc(), record.getNamespace(), record.isError(), newFields); + + copyProps(record, copy); + + return copy; + } + + static Schema.Field copyField(Schema.Field field, Schema newSchema, Integer fieldId) { + Schema.Field copy = + new Schema.Field(field.name(), newSchema, field.doc(), field.defaultVal(), field.order()); + + for (Map.Entry prop : field.getObjectProps().entrySet()) { + copy.addProp(prop.getKey(), prop.getValue()); + } + + if (fieldId != null) { + copy.addProp(AvroSchemaUtil.FIELD_ID_PROP, fieldId); + } + + for (String alias : field.aliases()) { + copy.addAlias(alias); + } + + return copy; + } + + private Schema createMap(Schema value, int keyId, int valueId) { + Schema result = Schema.createMap(value); + result.addProp(AvroSchemaUtil.KEY_ID_PROP, keyId); + result.addProp(AvroSchemaUtil.VALUE_ID_PROP, valueId); + return result; + } + + private Schema createArray(Schema element, int elementId) { + Schema result = Schema.createArray(element); + result.addProp(AvroSchemaUtil.ELEMENT_ID_PROP, 
elementId); + return result; + } + + private Schema copyProps(Schema from, Schema copy) { + for (Map.Entry prop : from.getObjectProps().entrySet()) { + copy.addProp(prop.getKey(), prop.getValue()); + } + + LogicalType logicalType = from.getLogicalType(); + if (logicalType != null) { + logicalType.addToSchema(copy); + } + + return copy; + } +} diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java index 46c17722f8f7..2d7c5e4a88a3 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java @@ -116,6 +116,15 @@ public static Map convertTypes(Types.StructType type, String name) return ImmutableMap.copyOf(converter.getConversionMap()); } + public static Schema pruneColumns(Schema schema, Set selectedIds) { + return new PruneColumns(selectedIds, null).rootSchema(schema); + } + + /** + * @deprecated will be removed in 2.0.0; use applyNameMapping and pruneColumns(Schema, Set) + * instead. 
+ */ + @Deprecated public static Schema pruneColumns( Schema schema, Set selectedIds, NameMapping nameMapping) { return new PruneColumns(selectedIds, nameMapping).rootSchema(schema); @@ -126,6 +135,14 @@ public static Schema buildAvroProjection( return AvroCustomOrderSchemaVisitor.visit(schema, new BuildAvroProjection(expected, renames)); } + public static Schema applyNameMapping(Schema fileSchema, NameMapping nameMapping) { + if (nameMapping != null) { + return AvroSchemaVisitor.visit(fileSchema, new ApplyNameMapping(nameMapping)); + } + + return fileSchema; + } + public static boolean isTimestamptz(Schema schema) { LogicalType logicalType = schema.getLogicalType(); if (logicalType instanceof LogicalTypes.TimestampMillis @@ -290,6 +307,15 @@ public static int getKeyId(Schema schema) { return getId(schema, KEY_ID_PROP); } + static Integer keyId(Schema mapSchema) { + Object idObj = mapSchema.getObjectProp(KEY_ID_PROP); + if (idObj != null) { + return toInt(idObj); + } + + return null; + } + static Integer getKeyId( Schema schema, NameMapping nameMapping, Iterable parentFieldNames) { Preconditions.checkArgument( @@ -305,6 +331,15 @@ public static int getValueId(Schema schema) { return getId(schema, VALUE_ID_PROP); } + static Integer valueId(Schema mapSchema) { + Object idObj = mapSchema.getObjectProp(VALUE_ID_PROP); + if (idObj != null) { + return toInt(idObj); + } + + return null; + } + static Integer getValueId( Schema schema, NameMapping nameMapping, Iterable parentFieldNames) { Preconditions.checkArgument( @@ -320,6 +355,15 @@ public static int getElementId(Schema schema) { return getId(schema, ELEMENT_ID_PROP); } + static Integer elementId(Schema arraySchema) { + Object idObj = arraySchema.getObjectProp(ELEMENT_ID_PROP); + if (idObj != null) { + return toInt(idObj); + } + + return null; + } + static Integer getElementId( Schema schema, NameMapping nameMapping, Iterable parentFieldNames) { Preconditions.checkArgument( @@ -335,15 +379,17 @@ public static int 
getFieldId(Schema.Field field) { return id; } + static Integer fieldId(Schema.Field field) { + return getFieldId(field, null, null); + } + static Integer getFieldId( Schema.Field field, NameMapping nameMapping, Iterable parentFieldNames) { Object id = field.getObjectProp(FIELD_ID_PROP); if (id != null) { return toInt(id); } else if (nameMapping != null) { - List names = Lists.newArrayList(parentFieldNames); - names.add(field.name()); - MappedField mappedField = nameMapping.find(names); + MappedField mappedField = nameMapping.find(parentFieldNames, field.name()); if (mappedField != null) { return mappedField.id(); } diff --git a/core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java b/core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java index 3b04fe30db65..4a94e5175e4c 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java +++ b/core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java @@ -64,7 +64,8 @@ public void setSchema(Schema newFileSchema) { nameMapping = MappingUtil.create(expectedSchema); } Set projectedIds = TypeUtil.getProjectedIds(expectedSchema); - Schema prunedSchema = AvroSchemaUtil.pruneColumns(newFileSchema, projectedIds, nameMapping); + Schema schemaWithIds = AvroSchemaUtil.applyNameMapping(newFileSchema, nameMapping); + Schema prunedSchema = AvroSchemaUtil.pruneColumns(schemaWithIds, projectedIds); this.readSchema = AvroSchemaUtil.buildAvroProjection(prunedSchema, expectedSchema, renames); this.wrapped = newDatumReader(); } diff --git a/core/src/main/java/org/apache/iceberg/mapping/NameMapping.java b/core/src/main/java/org/apache/iceberg/mapping/NameMapping.java index aa5f5f3cdaca..642a77a4f2ea 100644 --- a/core/src/main/java/org/apache/iceberg/mapping/NameMapping.java +++ b/core/src/main/java/org/apache/iceberg/mapping/NameMapping.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; /** Represents a mapping from external schema names to Iceberg type IDs. */ public class NameMapping implements Serializable { @@ -62,6 +63,14 @@ public MappedField find(List names) { return lazyFieldsByName().get(DOT.join(names)); } + public MappedField find(Iterable names) { + return lazyFieldsByName().get(DOT.join(names)); + } + + public MappedField find(Iterable names, String name) { + return lazyFieldsByName().get(DOT.join(Iterables.concat(names, ImmutableList.of(name)))); + } + public MappedFields asMappedFields() { return mapping; }