Add default value implementation for GenericAvroReader
wmoustafa committed Jan 17, 2024
1 parent 0b08ddd commit 49a3b12
Showing 5 changed files with 120 additions and 38 deletions.
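
At a glance, the change teaches the generic Avro read path to fall back to a column's initialDefault() when a projected field is missing from the data file. The sketch below condenses the new testDefaultValueApplied test from this commit into a standalone snippet; the class name, main method, temp-file handling, and the concrete default of 100 are illustrative only, and the 6-argument Types.NestedField.optional(id, name, type, doc, initialDefault, writeDefault) overload is the default-value API exercised by this PR's tests rather than a long-standing public API.

package org.apache.iceberg.avro;

import java.io.File;
import java.util.List;
import org.apache.avro.generic.GenericData.Record;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;

public class DefaultValueReadSketch {
  public static void main(String[] args) throws Exception {
    // Writer schema carries only col1; the reader schema adds col2 with an initial default of 100.
    Schema writerSchema =
        new Schema(Types.NestedField.required(999, "col1", Types.IntegerType.get()));
    Schema readerSchema =
        new Schema(
            Types.NestedField.required(999, "col1", Types.IntegerType.get()),
            Types.NestedField.optional(
                1000, "col2", Types.IntegerType.get(), null, 100, 100));

    File testFile = File.createTempFile("default-value-sketch", ".avro");
    testFile.delete();

    // Write a single record that only has col1.
    try (FileAppender<Record> writer =
        Avro.write(Files.localOutput(testFile))
            .schema(writerSchema)
            .createWriterFunc(GenericAvroWriter::create)
            .named("test")
            .build()) {
      Record record = new Record(AvroSchemaUtil.convert(writerSchema.asStruct()));
      record.put(0, 1);
      writer.add(record);
    }

    // Read with the wider schema; the missing col2 is materialized from its initial default.
    try (AvroIterable<Record> reader =
        Avro.read(Files.localInput(testFile))
            .project(readerSchema)
            .createResolvingReader(GenericAvroReader::create)
            .build()) {
      List<Record> rows = Lists.newArrayList(reader);
      System.out.println(rows.get(0)); // col2 should come back as 100
    }
  }
}
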
5 changes: 5 additions & 0 deletions .palantir/revapi.yml
@@ -873,6 +873,11 @@ acceptedBreaks:
new: "method void org.apache.iceberg.encryption.Ciphers::<init>()"
justification: "Static utility class - should not have public constructor"
"1.4.0":
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
old: "class org.apache.iceberg.types.Types.NestedField"
new: "class org.apache.iceberg.types.Types.NestedField"
justification: "Add default value APIs."
org.apache.iceberg:iceberg-core:
- code: "java.class.defaultSerializationChanged"
old: "class org.apache.iceberg.mapping.NameMapping"
22 changes: 8 additions & 14 deletions core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
@@ -47,30 +47,22 @@ public class GenericAvroReader<T>
private Schema fileSchema = null;
private ValueReader<T> reader = null;

public static <D> GenericAvroReader<D> create(org.apache.iceberg.Schema schema) {
return new GenericAvroReader<>(schema);
public static <D> GenericAvroReader<D> create(org.apache.iceberg.Schema expectedSchema) {
return new GenericAvroReader<>(expectedSchema);
}

public static <D> GenericAvroReader<D> create(Schema schema) {
return new GenericAvroReader<>(schema);
public static <D> GenericAvroReader<D> create(Schema readSchema) {
return new GenericAvroReader<>(readSchema);
}

public static <D> GenericAvroReader<D> create(org.apache.iceberg.Schema avroSchema, Schema icebrgSchema) {
return new GenericAvroReader<>(avroSchema, icebrgSchema);
}

GenericAvroReader(org.apache.iceberg.Schema readSchema) {
this.expectedType = readSchema.asStruct();
GenericAvroReader(org.apache.iceberg.Schema expectedSchema) {
this.expectedType = expectedSchema.asStruct();
}

GenericAvroReader(Schema readSchema) {
this.expectedType = AvroSchemaUtil.convert(readSchema).asStructType();
}

GenericAvroReader(org.apache.iceberg.Schema icebergReadSchema, Schema avroReadSchema) {
this.expectedType = AvroSchemaUtil.convert(avroReadSchema).asStructType();
}

@SuppressWarnings("unchecked")
private void initReader() {
this.reader =
@@ -148,6 +140,8 @@ public ValueReader<?> record(Type partner, Schema record, List<ValueReader<?>> f
Types.NestedField field = expected.field(fieldId);
if (constant != null) {
readPlan.add(Pair.of(pos, ValueReaders.constant(constant)));
} else if (field.initialDefault() != null) {
readPlan.add(Pair.of(pos, ValueReaders.constant(field.initialDefault(), field.type())));
} else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) {
readPlan.add(Pair.of(pos, ValueReaders.constant(false)));
} else if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
53 changes: 52 additions & 1 deletion core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -42,8 +42,10 @@
import org.apache.avro.util.Utf8;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.UUIDUtil;
@@ -59,6 +61,10 @@ public static <T> ValueReader<T> constant(T value) {
return new ConstantReader<>(value);
}

public static <T> ValueReader<T> constant(T value, Type type) {
return new ConstantReader<>(value, type);
}

public static <T> ValueReader<T> replaceWithConstant(ValueReader<?> reader, T value) {
return new ReplaceWithConstantReader<>(reader, value);
}
@@ -200,14 +206,59 @@ public void skip(Decoder decoder) throws IOException {

private static class ConstantReader<T> implements ValueReader<T> {
private final T constant;
private final Type type;

private ConstantReader(T constant) {
this.constant = constant;
this.type = null;
}

private ConstantReader(T constant, Type type) {
this.constant = constant;
this.type = type;
}

@Override
public T read(Decoder decoder, Object reuse) throws IOException {
return constant;
// TODO: This cast implies that conversion should happen somewhere else. Keeping it here for
// illustration purposes.
return (T) toGenericRecord(type, constant);
}

// TODO: This method does not really belong here. Keeping it here for illustration purposes.
static Object toGenericRecord(Type type, Object data) {
// Recursively convert data to GenericRecord if type is a StructType.
// TODO: Rewrite this as a visitor.
if (type instanceof Types.StructType) {
Types.StructType structType = (Types.StructType) type;
GenericData.Record genericRecord = new GenericData.Record(AvroSchemaUtil.convert(type));
int index = 0;
for (Types.NestedField field : structType.fields()) {
genericRecord.put(
field.name(), toGenericRecord(field.type(), ((GenericRecord) data).get(index)));
index++;
}
return genericRecord;
} else if (type instanceof Types.MapType) {
Types.MapType mapType = (Types.MapType) type;
Map<Object, Object> genericMap =
Maps.newHashMapWithExpectedSize(((Map<Object, Object>) data).size());
for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) data).entrySet()) {
genericMap.put(
toGenericRecord(mapType.keyType(), entry.getKey()),
toGenericRecord(mapType.valueType(), entry.getValue()));
}
return genericMap;
} else if (type instanceof Types.ListType) {
Types.ListType listType = (Types.ListType) type;
List<Object> genericList = Lists.newArrayListWithExpectedSize(((List<Object>) data).size());
for (Object element : (List<Object>) data) {
genericList.add(toGenericRecord(listType.elementType(), element));
}
return genericList;
} else {
return data;
}
}

@Override
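
The TODOs in ConstantReader note that the conversion probably belongs somewhere other than read() and that the instanceof chain could be rewritten as a visitor. One possible shape for that follow-up is sketched below; it is not part of the commit, the visitor interface and class names are hypothetical, and the conversion logic simply mirrors toGenericRecord above.

package org.apache.iceberg.avro;

import java.util.List;
import java.util.Map;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

class DefaultValueConversions {

  // Minimal visitor over an Iceberg type with a partner value carried alongside.
  interface TypeWithValueVisitor<R> {
    R struct(Types.StructType struct, GenericRecord value);
    R list(Types.ListType list, List<?> value);
    R map(Types.MapType map, Map<?, ?> value);
    R primitive(Type primitive, Object value);
  }

  // Dispatch mirrors the instanceof chain in ConstantReader.toGenericRecord.
  static <R> R visit(Type type, Object value, TypeWithValueVisitor<R> visitor) {
    if (type instanceof Types.StructType) {
      return visitor.struct((Types.StructType) type, (GenericRecord) value);
    } else if (type instanceof Types.ListType) {
      return visitor.list((Types.ListType) type, (List<?>) value);
    } else if (type instanceof Types.MapType) {
      return visitor.map((Types.MapType) type, (Map<?, ?>) value);
    } else {
      return visitor.primitive(type, value);
    }
  }

  // Converts an Iceberg generic default value into Avro's generic representation, so the
  // conversion could run once when the read plan is built instead of inside every read().
  static class ToAvroGeneric implements TypeWithValueVisitor<Object> {
    @Override
    public Object struct(Types.StructType struct, GenericRecord value) {
      GenericData.Record record = new GenericData.Record(AvroSchemaUtil.convert(struct));
      int index = 0;
      for (Types.NestedField field : struct.fields()) {
        record.put(field.name(), visit(field.type(), value.get(index), this));
        index += 1;
      }
      return record;
    }

    @Override
    public Object list(Types.ListType list, List<?> value) {
      List<Object> result = Lists.newArrayListWithExpectedSize(value.size());
      for (Object element : value) {
        result.add(visit(list.elementType(), element, this));
      }
      return result;
    }

    @Override
    public Object map(Types.MapType map, Map<?, ?> value) {
      Map<Object, Object> result = Maps.newHashMapWithExpectedSize(value.size());
      for (Map.Entry<?, ?> entry : value.entrySet()) {
        result.put(
            visit(map.keyType(), entry.getKey(), this),
            visit(map.valueType(), entry.getValue(), this));
      }
      return result;
    }

    @Override
    public Object primitive(Type primitive, Object value) {
      return value; // primitive defaults pass through unchanged
    }
  }
}

With a helper along these lines, GenericAvroReader could call something like visit(field.type(), field.initialDefault(), new ToAvroGeneric()) once at planning time and wrap the result in the plain ValueReaders.constant(value), which would also let the test below drop its private copy of toGenericRecord.
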
@@ -29,7 +29,6 @@
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.DateTimeUtil;
@@ -16,26 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.data.avro;
package org.apache.iceberg.avro;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.avro.generic.GenericData.Record;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SingleValueParser;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroIterable;
import org.apache.iceberg.avro.GenericAvroReader;
import org.apache.iceberg.data.DataTestHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
@@ -128,34 +125,34 @@ public void testDefaultValueApplied() throws IOException {
required(999, "col1", Types.IntegerType.get()),
Types.NestedField.optional(1000, "col2", type, null, defaultValue, defaultValue));

Record expectedRecord = GenericRecord.create(readerSchema);
expectedRecord.set(0, 1);
expectedRecord.set(1, IdentityPartitionConverters.convertConstant(type, defaultValue));
Record expectedRecord = new Record(AvroSchemaUtil.convert(readerSchema.asStruct()));
expectedRecord.put(0, 1);
expectedRecord.put(1, toGenericRecord(type, defaultValue));

File testFile = temp.newFile();
Assert.assertTrue("Delete should succeed", testFile.delete());

try (FileAppender<Record> writer =
Avro.write(Files.localOutput(testFile))
.schema(writerSchema)
.createWriterFunc(DataWriter::create)
.createWriterFunc(GenericAvroWriter::create)
.named("test")
.build()) {
Record record = GenericRecord.create(writerSchema);
record.set(0, 1);
Record record = new Record(AvroSchemaUtil.convert(writerSchema.asStruct()));
record.put(0, 1);
writer.add(record);
}

List<Record> rows;
try (AvroIterable<Record> reader =
Avro.read(Files.localInput(testFile))
.project(readerSchema)
.createReaderFunc((expectedSchema, readSchema) -> GenericAvroReader.create(expectedSchema, readSchema))
.createResolvingReader(GenericAvroReader::create)
.build()) {
rows = Lists.newArrayList(reader);
}

DataTestHelpers.assertEquals(readerSchema.asStruct(), expectedRecord, rows.get(0));
AvroTestHelpers.assertEquals(readerSchema.asStruct(), expectedRecord, rows.get(0));
}
}

@@ -172,33 +169,69 @@ public void testDefaultValueNotApplied() throws IOException {
Types.NestedField.optional(1000, "col2", type, null, defaultValue, defaultValue));

// Create a record with null value for the column with default value
Record record = GenericRecord.create(readerSchema);
record.set(0, 1);
record.set(1, null);
Record expectedRecord = new Record(AvroSchemaUtil.convert(readerSchema.asStruct()));
expectedRecord.put(0, 1);
expectedRecord.put(1, null);

File testFile = temp.newFile();
Assert.assertTrue("Delete should succeed", testFile.delete());

try (FileAppender<Record> writer =
Avro.write(Files.localOutput(testFile))
.schema(readerSchema)
.createWriterFunc(DataWriter::create)
.createWriterFunc(GenericAvroWriter::create)
.named("test")
.build()) {
writer.add(record);
writer.add(expectedRecord);
}

List<Record> rows;
try (AvroIterable<Record> reader =
Avro.read(Files.localInput(testFile))
.project(readerSchema)
.createReaderFunc(DataReader::create)
.createReaderFunc(GenericAvroReader::create)
.build()) {
rows = Lists.newArrayList(reader);
}

// Existence of default value should not affect the read result
DataTestHelpers.assertEquals(readerSchema.asStruct(), record, rows.get(0));
AvroTestHelpers.assertEquals(readerSchema.asStruct(), expectedRecord, rows.get(0));
}
}

// TODO: Merge the conversion mechanism with the end state of
// ValueReaders.ConstantReader.toGenericRecord().
private Object toGenericRecord(Type type, Object data) {
// Recursively convert data to GenericRecord if type is a StructType.
if (type instanceof Types.StructType) {
Types.StructType structType = (Types.StructType) type;
Record genericRecord = new Record(AvroSchemaUtil.convert(type));
int index = 0;
for (Types.NestedField field : structType.fields()) {
genericRecord.put(
field.name(), toGenericRecord(field.type(), ((GenericRecord) data).get(index)));
index++;
}
return genericRecord;
} else if (type instanceof Types.MapType) {
Types.MapType mapType = (Types.MapType) type;
Map<Object, Object> genericMap =
Maps.newHashMapWithExpectedSize(((Map<Object, Object>) data).size());
for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) data).entrySet()) {
genericMap.put(
toGenericRecord(mapType.keyType(), entry.getKey()),
toGenericRecord(mapType.valueType(), entry.getValue()));
}
return genericMap;
} else if (type instanceof Types.ListType) {
Types.ListType listType = (Types.ListType) type;
List<Object> genericList = Lists.newArrayListWithExpectedSize(((List<Object>) data).size());
for (Object element : (List<Object>) data) {
genericList.add(toGenericRecord(listType.elementType(), element));
}
return genericList;
} else {
return data;
}
}
}
