Skip to content

Commit

Permalink
Spark 3.5: Use DataFile constants in SparkDataFile (#8936)
Browse files (browse the repository at this point in the history)
  • Loading branch information
aokolnychyi authored Nov 2, 2023
1 parent 94de985 commit 4a3d266
Showing 1 changed file with 21 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,16 @@ public SparkDataFile(Types.StructType type, StructType sparkType) {

public SparkDataFile(
Types.StructType type, Types.StructType projectedType, StructType sparkType) {
- this.lowerBoundsType = type.fieldType("lower_bounds");
- this.upperBoundsType = type.fieldType("upper_bounds");
- this.keyMetadataType = type.fieldType("key_metadata");
+ this.lowerBoundsType = type.fieldType(DataFile.LOWER_BOUNDS.name());
+ this.upperBoundsType = type.fieldType(DataFile.UPPER_BOUNDS.name());
+ this.keyMetadataType = type.fieldType(DataFile.KEY_METADATA.name());

- Types.StructType partitionType = type.fieldType("partition").asStructType();
+ Types.StructType partitionType = type.fieldType(DataFile.PARTITION_NAME).asStructType();
this.wrappedPartition = new SparkStructLike(partitionType);

if (projectedType != null) {
- Types.StructType projectedPartitionType = projectedType.fieldType("partition").asStructType();
+ Types.StructType projectedPartitionType =
+ projectedType.fieldType(DataFile.PARTITION_NAME).asStructType();
this.partitionProjection =
StructProjection.create(partitionType, projectedPartitionType).wrap(wrappedPartition);
} else {
Expand All @@ -84,20 +85,20 @@ public SparkDataFile(
positions.put(fieldName, fieldPosition(fieldName, sparkType));
});

- filePathPosition = positions.get("file_path");
- fileFormatPosition = positions.get("file_format");
- partitionPosition = positions.get("partition");
- recordCountPosition = positions.get("record_count");
- fileSizeInBytesPosition = positions.get("file_size_in_bytes");
- columnSizesPosition = positions.get("column_sizes");
- valueCountsPosition = positions.get("value_counts");
- nullValueCountsPosition = positions.get("null_value_counts");
- nanValueCountsPosition = positions.get("nan_value_counts");
- lowerBoundsPosition = positions.get("lower_bounds");
- upperBoundsPosition = positions.get("upper_bounds");
- keyMetadataPosition = positions.get("key_metadata");
- splitOffsetsPosition = positions.get("split_offsets");
- sortOrderIdPosition = positions.get("sort_order_id");
+ this.filePathPosition = positions.get(DataFile.FILE_PATH.name());
+ this.fileFormatPosition = positions.get(DataFile.FILE_FORMAT.name());
+ this.partitionPosition = positions.get(DataFile.PARTITION_NAME);
+ this.recordCountPosition = positions.get(DataFile.RECORD_COUNT.name());
+ this.fileSizeInBytesPosition = positions.get(DataFile.FILE_SIZE.name());
+ this.columnSizesPosition = positions.get(DataFile.COLUMN_SIZES.name());
+ this.valueCountsPosition = positions.get(DataFile.VALUE_COUNTS.name());
+ this.nullValueCountsPosition = positions.get(DataFile.NULL_VALUE_COUNTS.name());
+ this.nanValueCountsPosition = positions.get(DataFile.NAN_VALUE_COUNTS.name());
+ this.lowerBoundsPosition = positions.get(DataFile.LOWER_BOUNDS.name());
+ this.upperBoundsPosition = positions.get(DataFile.UPPER_BOUNDS.name());
+ this.keyMetadataPosition = positions.get(DataFile.KEY_METADATA.name());
+ this.splitOffsetsPosition = positions.get(DataFile.SPLIT_OFFSETS.name());
+ this.sortOrderIdPosition = positions.get(DataFile.SORT_ORDER_ID.name());
}

public SparkDataFile wrap(Row row) {
Expand Down Expand Up @@ -211,7 +212,7 @@ private int fieldPosition(String name, StructType sparkType) {
return sparkType.fieldIndex(name);
} catch (IllegalArgumentException e) {
// the partition field is absent for unpartitioned tables
- if (name.equals("partition") && wrappedPartition.size() == 0) {
+ if (name.equals(DataFile.PARTITION_NAME) && wrappedPartition.size() == 0) {
return -1;
}
throw e;
Expand Down

0 comments on commit 4a3d266

Please sign in to comment.