Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
wmoustafa committed Aug 8, 2023
1 parent 0629c8a commit 7ccf868
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 14 deletions.
9 changes: 9 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,15 @@ acceptedBreaks:
- code: "java.field.removedWithConstant"
old: "field org.apache.iceberg.TableProperties.HMS_TABLE_OWNER"
justification: "Removing deprecations for 1.3.0"
- code: "java.method.numberOfParametersChanged"
old: "method void org.apache.iceberg.avro.ValueReaders.StructReader<S>::<init>(java.util.List<org.apache.iceberg.avro.ValueReader<?>>,\
\ org.apache.iceberg.types.Types.StructType, java.util.Map<java.lang.Integer,\
\ ?>)"
new: "method void org.apache.iceberg.avro.ValueReaders.StructReader<S>::<init>(java.util.List<org.apache.iceberg.avro.ValueReader<?>>,\
\ org.apache.iceberg.types.Types.StructType, java.util.Map<java.lang.Integer,\
\ ?>, java.util.Map<java.lang.Integer, ?>)"
justification: "Added idToDefault parameter to accommodate passing the default\
\ map from the specific reader"
- code: "java.method.parameterTypeChanged"
old: "parameter void org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService::offer(===org.apache.iceberg.actions.RewriteFileGroup===)"
new: "parameter void org.apache.iceberg.actions.BaseCommitService<T>::offer(===T===)\
Expand Down
35 changes: 24 additions & 11 deletions core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.avro.util.Utf8;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
Expand Down Expand Up @@ -602,13 +601,16 @@ protected StructReader(List<ValueReader<?>> readers, Schema schema) {
}

protected StructReader(
List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
List<ValueReader<?>> readers,
Types.StructType struct,
Map<Integer, ?> idToConstant,
Map<Integer, ?> idToDefault) {
this.readers = readers.toArray(new ValueReader[0]);

List<Types.NestedField> fields = struct.fields();
List<Integer> constantPositionsList = Lists.newArrayListWithCapacity(fields.size());
List<Object> constantValuesList = Lists.newArrayListWithCapacity(fields.size());
List<Integer> defaultValuesPositionList = Lists.newArrayListWithCapacity(fields.size());
List<Integer> defaultPositionList = Lists.newArrayListWithCapacity(fields.size());
List<Object> defaultValuesList = Lists.newArrayListWithCapacity(fields.size());
for (int pos = 0; pos < fields.size(); pos += 1) {
Types.NestedField field = fields.get(pos);
Expand All @@ -622,19 +624,28 @@ protected StructReader(
constantPositionsList.add(pos);
constantValuesList.add(false);
} else if (field.initialDefault() != null) {
// Add a constant value for fields that have a default value.
// In the {@link #read()} method, this will be leveraged only if there is no corresponding
// reader.
defaultValuesPositionList.add(pos);
defaultValuesList.add(
IdentityPartitionConverters.convertConstant(field.type(), field.initialDefault()));
if (idToDefault.containsKey(field.fieldId())) {
// Add a constant value for fields that have a default value.
// In the {@link #read()} method, this will be leveraged only if there is no
// corresponding
// reader.
defaultPositionList.add(pos);
defaultValuesList.add(idToDefault.get(field.fieldId()));
} else {
// Throw an exception if the map does not contain a default value for that field.
throw new UnsupportedOperationException(
"Default value not found for field: "
+ field.name()
+ " (field ID: "
+ field.fieldId()
+ ").");
}
}
}

this.constantPositions = constantPositionsList.stream().mapToInt(Integer::intValue).toArray();
this.constantValues = constantValuesList.toArray();
this.defaultPositions =
defaultValuesPositionList.stream().mapToInt(Integer::intValue).toArray();
this.defaultPositions = defaultPositionList.stream().mapToInt(Integer::intValue).toArray();
this.defaultValues = defaultValuesList.toArray();
}

Expand Down Expand Up @@ -680,13 +691,15 @@ public S read(Decoder decoder, Object reuse) throws IOException {
set(struct, field.pos(), readers[field.pos()].read(decoder, reusedValue));
existingFieldPositionsSet.add(field.pos());
}

// Set default values
for (int i = 0; i < defaultPositions.length; i += 1) {
// Set default values only if the field does not exist in the data.
if (!existingFieldPositionsSet.contains(defaultPositions[i])) {
set(struct, defaultPositions[i], defaultValues[i]);
}
}

} else {
for (int i = 0; i < readers.length; i += 1) {
Object reusedValue = get(struct, i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.DateTimeUtil;

Expand Down Expand Up @@ -106,7 +109,7 @@ private static class GenericRecordReader extends ValueReaders.StructReader<Recor

private GenericRecordReader(
List<ValueReader<?>> readers, StructType struct, Map<Integer, ?> idToConstant) {
super(readers, struct, idToConstant);
super(readers, struct, idToConstant, idToDefault(struct));
this.structType = struct;
}

Expand All @@ -128,5 +131,17 @@ protected Object get(Record struct, int pos) {
protected void set(Record struct, int pos, Object value) {
struct.set(pos, value);
}

private static Map<Integer, ?> idToDefault(StructType struct) {
Map<Integer, Object> result = Maps.newHashMap();
for (Types.NestedField field : struct.fields()) {
if (field.initialDefault() != null) {
result.put(
field.fieldId(),
IdentityPartitionConverters.convertConstant(field.type(), field.initialDefault()));
}
}
return result;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.table.data.TimestampData;
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
Expand Down Expand Up @@ -287,7 +288,8 @@ private static class StructReader extends ValueReaders.StructReader<RowData> {

private StructReader(
List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
super(readers, struct, idToConstant);
// TODO: Support passing default value map.
super(readers, struct, idToConstant, ImmutableMap.of());
this.numFields = readers.size();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.avro.util.Utf8;
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.UUIDUtil;
Expand Down Expand Up @@ -254,7 +255,8 @@ static class StructReader extends ValueReaders.StructReader<InternalRow> {

protected StructReader(
List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
super(readers, struct, idToConstant);
// TODO: Support passing default value map
super(readers, struct, idToConstant, ImmutableMap.of());
this.numFields = readers.size();
}

Expand Down

0 comments on commit 7ccf868

Please sign in to comment.