Skip to content

Commit

Permalink
Parquet: Move to ValueReader generation to a visitor (#9063)
Browse files Browse the repository at this point in the history
* Parquet: Move to the visitor

* Add one more edge case

* Thanks Amogh!
  • Loading branch information
Fokko authored Jan 8, 2024
1 parent be155d7 commit 13e1965
Showing 1 changed file with 116 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
Expand All @@ -43,6 +44,7 @@
import org.apache.iceberg.types.Types;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
Expand Down Expand Up @@ -108,6 +110,113 @@ public ParquetValueReader<?> struct(
}
}

private class LogicalTypeAnnotationParquetValueReaderVisitor
implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueReader<?>> {

private final ColumnDescriptor desc;
private final org.apache.iceberg.types.Type.PrimitiveType expected;
private final PrimitiveType primitive;

LogicalTypeAnnotationParquetValueReaderVisitor(
ColumnDescriptor desc,
org.apache.iceberg.types.Type.PrimitiveType expected,
PrimitiveType primitive) {
this.desc = desc;
this.expected = expected;
this.primitive = primitive;
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
return Optional.of(new ParquetValueReaders.StringReader(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
return Optional.of(new ParquetValueReaders.StringReader(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(DecimalLogicalTypeAnnotation decimalLogicalType) {
switch (primitive.getPrimitiveTypeName()) {
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
return Optional.of(
new ParquetValueReaders.BinaryAsDecimalReader(desc, decimalLogicalType.getScale()));
case INT64:
return Optional.of(
new ParquetValueReaders.LongAsDecimalReader(desc, decimalLogicalType.getScale()));
case INT32:
return Optional.of(
new ParquetValueReaders.IntegerAsDecimalReader(desc, decimalLogicalType.getScale()));
default:
throw new UnsupportedOperationException(
"Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
}
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
return Optional.of(new DateReader(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
return Optional.of(new TimeReader(desc));
} else if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
return Optional.of(new TimeMillisReader(desc));
}

return Optional.empty();
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
Types.TimestampType tsMicrosType = (Types.TimestampType) expected;
return tsMicrosType.shouldAdjustToUTC()
? Optional.of(new TimestamptzReader(desc))
: Optional.of(new TimestampReader(desc));
} else if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
Types.TimestampType tsMillisType = (Types.TimestampType) expected;
return tsMillisType.shouldAdjustToUTC()
? Optional.of(new TimestamptzMillisReader(desc))
: Optional.of(new TimestampMillisReader(desc));
}

return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType);
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
if (intLogicalType.getBitWidth() == 64) {
return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
}
return (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG)
? Optional.of(new ParquetValueReaders.IntAsLongReader(desc))
: Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
return Optional.of(new ParquetValueReaders.StringReader(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
return Optional.of(new ParquetValueReaders.BytesReader(desc));
}
}

private class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
private final MessageType type;
private final Map<Integer, ?> idToConstant;
Expand Down Expand Up @@ -240,62 +349,13 @@ public ParquetValueReader<?> primitive(
ColumnDescriptor desc = type.getColumnDescription(currentPath());

if (primitive.getOriginalType() != null) {
switch (primitive.getOriginalType()) {
case ENUM:
case JSON:
case UTF8:
return new ParquetValueReaders.StringReader(desc);
case INT_8:
case INT_16:
case INT_32:
if (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) {
return new ParquetValueReaders.IntAsLongReader(desc);
} else {
return new ParquetValueReaders.UnboxedReader<>(desc);
}
case INT_64:
return new ParquetValueReaders.UnboxedReader<>(desc);
case DATE:
return new DateReader(desc);
case TIMESTAMP_MICROS:
Types.TimestampType tsMicrosType = (Types.TimestampType) expected;
if (tsMicrosType.shouldAdjustToUTC()) {
return new TimestamptzReader(desc);
} else {
return new TimestampReader(desc);
}
case TIMESTAMP_MILLIS:
Types.TimestampType tsMillisType = (Types.TimestampType) expected;
if (tsMillisType.shouldAdjustToUTC()) {
return new TimestamptzMillisReader(desc);
} else {
return new TimestampMillisReader(desc);
}
case TIME_MICROS:
return new TimeReader(desc);
case TIME_MILLIS:
return new TimeMillisReader(desc);
case DECIMAL:
DecimalLogicalTypeAnnotation decimal =
(DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation();
switch (primitive.getPrimitiveTypeName()) {
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
return new ParquetValueReaders.BinaryAsDecimalReader(desc, decimal.getScale());
case INT64:
return new ParquetValueReaders.LongAsDecimalReader(desc, decimal.getScale());
case INT32:
return new ParquetValueReaders.IntegerAsDecimalReader(desc, decimal.getScale());
default:
throw new UnsupportedOperationException(
"Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
}
case BSON:
return new ParquetValueReaders.BytesReader(desc);
default:
throw new UnsupportedOperationException(
"Unsupported logical type: " + primitive.getOriginalType());
}
return primitive
.getLogicalTypeAnnotation()
.accept(new LogicalTypeAnnotationParquetValueReaderVisitor(desc, expected, primitive))
.orElseThrow(
() ->
new UnsupportedOperationException(
"Unsupported logical type: " + primitive.getLogicalTypeAnnotation()));
}

switch (primitive.getPrimitiveTypeName()) {
Expand Down

0 comments on commit 13e1965

Please sign in to comment.