Skip to content

Commit

Permalink
Address new comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat committed Dec 15, 2023
1 parent 3124544 commit 5bd45e1
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@
*/
package org.apache.iceberg;

import java.io.Serializable;

/**
* Represents a partition statistics file that can be used to read table data more efficiently.
*
* <p>Statistics are informational. A reader can choose to ignore statistics information. Statistics
* support is not required to read the table correctly.
*/
public interface PartitionStatisticsFile extends Serializable {
public interface PartitionStatisticsFile {
/** ID of the Iceberg table's snapshot the partition statistics file is associated with. */
long snapshotId();

Expand Down
20 changes: 10 additions & 10 deletions core/src/main/java/org/apache/iceberg/SetPartitionStatistics.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,31 @@
package org.apache.iceberg;

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

public class SetPartitionStatistics implements UpdatePartitionStatistics {
private final TableOperations ops;
private final Set<PartitionStatisticsFile> partitionStatisticsToSet = Sets.newHashSet();
private final Set<Long> partitionStatisticsToRemove = Sets.newHashSet();
private final Map<Long, PartitionStatisticsFile> statsToSet = Maps.newHashMap();
private final Set<Long> statsToRemove = Sets.newHashSet();

/**
 * Creates a partition statistics update that keeps a reference to the given table operations.
 *
 * @param ops table operations stored on this instance
 */
public SetPartitionStatistics(TableOperations ops) {
this.ops = ops;
}

@Override
public UpdatePartitionStatistics setPartitionStatistics(
PartitionStatisticsFile partitionStatisticsFile) {
Preconditions.checkArgument(
null != partitionStatisticsFile, "partition statistics file must not be null");
partitionStatisticsToSet.add(partitionStatisticsFile);
public UpdatePartitionStatistics setPartitionStatistics(PartitionStatisticsFile file) {
Preconditions.checkArgument(null != file, "partition statistics file must not be null");
statsToSet.put(file.snapshotId(), file);
return this;
}

@Override
public UpdatePartitionStatistics removePartitionStatistics(long snapshotId) {
partitionStatisticsToRemove.add(snapshotId);
statsToRemove.add(snapshotId);
return this;
}

Expand All @@ -61,8 +61,8 @@ public void commit() {

private TableMetadata internalApply(TableMetadata base) {
TableMetadata.Builder builder = TableMetadata.buildFrom(base);
partitionStatisticsToSet.forEach(builder::setPartitionStatistics);
partitionStatisticsToRemove.forEach(builder::removePartitionStatistics);
statsToSet.values().forEach(builder::setPartitionStatistics);
statsToRemove.forEach(builder::removePartitionStatistics);
return builder.build();
}
}
35 changes: 15 additions & 20 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -955,25 +955,14 @@ private Builder(TableMetadata base) {
this.previousFileLocation = base.metadataFileLocation;
this.previousFiles = base.previousFiles;
this.refs = Maps.newHashMap(base.refs);
this.statisticsFiles = statsFileBySnapshotID(base);
this.partitionStatisticsFiles = partitionStatsFileBySnapshotID(base);
this.statisticsFiles = indexStatistics(base.statisticsFiles);
this.partitionStatisticsFiles = indexPartitionStatistics(base.partitionStatisticsFiles);
this.snapshotsById = Maps.newHashMap(base.snapshotsById);
this.schemasById = Maps.newHashMap(base.schemasById);
this.specsById = Maps.newHashMap(base.specsById);
this.sortOrdersById = Maps.newHashMap(base.sortOrdersById);
}

/**
 * Groups the statistics files of {@code base} by the ID of the snapshot each file reports.
 *
 * @param base table metadata whose statistics files are indexed
 * @return a map from snapshot ID to the statistics files reporting that ID
 */
private static Map<Long, List<StatisticsFile>> statsFileBySnapshotID(TableMetadata base) {
  List<StatisticsFile> files = base.statisticsFiles;
  // Single-argument groupingBy is shorthand for groupingBy(classifier, toList()).
  return files.stream()
      .collect(Collectors.groupingBy(StatisticsFile::snapshotId, Collectors.toList()));
}

/**
 * Groups the partition statistics files of {@code base} by the ID of the snapshot each file
 * reports.
 *
 * @param base table metadata whose partition statistics files are indexed
 * @return a map from snapshot ID to the partition statistics files reporting that ID
 */
private static Map<Long, List<PartitionStatisticsFile>> partitionStatsFileBySnapshotID(
    TableMetadata base) {
  List<PartitionStatisticsFile> files = base.partitionStatisticsFiles;
  // Single-argument groupingBy is shorthand for groupingBy(classifier, toList()).
  return files.stream()
      .collect(Collectors.groupingBy(PartitionStatisticsFile::snapshotId, Collectors.toList()));
}

public Builder withMetadataLocation(String newMetadataLocation) {
this.metadataLocation = newMetadataLocation;
return this;
Expand Down Expand Up @@ -1281,7 +1270,6 @@ public Builder setStatistics(long snapshotId, StatisticsFile statisticsFile) {
}

public Builder removeStatistics(long snapshotId) {
Preconditions.checkNotNull(snapshotId, "snapshotId is null");
if (statisticsFiles.remove(snapshotId) == null) {
return this;
}
Expand All @@ -1307,16 +1295,14 @@ public Builder suppressHistoricalSnapshots() {
return this;
}

public Builder setPartitionStatistics(PartitionStatisticsFile partitionStatisticsFile) {
Preconditions.checkNotNull(partitionStatisticsFile, "partition statistics file is null");
partitionStatisticsFiles.put(
partitionStatisticsFile.snapshotId(), ImmutableList.of(partitionStatisticsFile));
changes.add(new MetadataUpdate.SetPartitionStatistics(partitionStatisticsFile));
public Builder setPartitionStatistics(PartitionStatisticsFile file) {
Preconditions.checkNotNull(file, "partition statistics file is null");
partitionStatisticsFiles.put(file.snapshotId(), ImmutableList.of(file));
changes.add(new MetadataUpdate.SetPartitionStatistics(file));
return this;
}

public Builder removePartitionStatistics(long snapshotId) {
Preconditions.checkNotNull(snapshotId, "snapshotId is null");
if (partitionStatisticsFiles.remove(snapshotId) == null) {
return this;
}
Expand Down Expand Up @@ -1776,6 +1762,15 @@ private static List<HistoryEntry> updateSnapshotLog(
return newSnapshotLog;
}

/**
 * Indexes statistics files by the snapshot ID each file reports.
 *
 * @param files statistics files to index
 * @return a map from snapshot ID to the statistics files reporting that ID
 */
private static Map<Long, List<StatisticsFile>> indexStatistics(List<StatisticsFile> files) {
  // Single-argument groupingBy is shorthand for groupingBy(classifier, toList()).
  return files.stream()
      .collect(Collectors.groupingBy(StatisticsFile::snapshotId, Collectors.toList()));
}

/**
 * Indexes partition statistics files by the snapshot ID each file reports.
 *
 * @param files partition statistics files to index
 * @return a map from snapshot ID to the partition statistics files reporting that ID
 */
private static Map<Long, List<PartitionStatisticsFile>> indexPartitionStatistics(
    List<PartitionStatisticsFile> files) {
  // Single-argument groupingBy is shorthand for groupingBy(classifier, toList()).
  return files.stream()
      .collect(Collectors.groupingBy(PartitionStatisticsFile::snapshotId, Collectors.toList()));
}

private boolean isAddedSnapshot(long snapshotId) {
return changes(MetadataUpdate.AddSnapshot.class)
.anyMatch(add -> add.snapshot().snapshotId() == snapshotId);
Expand Down
19 changes: 8 additions & 11 deletions core/src/main/java/org/apache/iceberg/TableMetadataParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ public static TableMetadata fromJson(String metadataLocation, JsonNode node) {

List<PartitionStatisticsFile> partitionStatisticsFiles;
if (node.has(PARTITION_STATISTICS)) {
partitionStatisticsFiles = partitionStatisticsFilesFromJson(node.get(PARTITION_STATISTICS));
partitionStatisticsFiles = partitionStatsFilesFromJson(node.get(PARTITION_STATISTICS));
} else {
partitionStatisticsFiles = ImmutableList.of();
}
Expand Down Expand Up @@ -577,20 +577,17 @@ private static List<StatisticsFile> statisticsFilesFromJson(JsonNode statisticsF
return statisticsFilesBuilder.build();
}

private static List<PartitionStatisticsFile> partitionStatisticsFilesFromJson(
JsonNode partitionStatisticsFilesList) {
private static List<PartitionStatisticsFile> partitionStatsFilesFromJson(JsonNode filesList) {
Preconditions.checkArgument(
partitionStatisticsFilesList.isArray(),
filesList.isArray(),
"Cannot parse partition statistics files from non-array: %s",
partitionStatisticsFilesList);
filesList);

ImmutableList.Builder<PartitionStatisticsFile> partitionStatisticsFileBuilder =
ImmutableList.builder();
for (JsonNode partitionStatisticsFile : partitionStatisticsFilesList) {
partitionStatisticsFileBuilder.add(
PartitionStatisticsFileParser.fromJson(partitionStatisticsFile));
ImmutableList.Builder<PartitionStatisticsFile> statsFileBuilder = ImmutableList.builder();
for (JsonNode partitionStatsFile : filesList) {
statsFileBuilder.add(PartitionStatisticsFileParser.fromJson(partitionStatsFile));
}

return partitionStatisticsFileBuilder.build();
return statsFileBuilder.build();
}
}

0 comments on commit 5bd45e1

Please sign in to comment.