Core: Remove partition statistics files during purge table (apache#9409)
ajantha-bhat authored and devangjhabakh committed Apr 22, 2024
1 parent 67faac4 commit 85cf56f
Showing 2 changed files with 61 additions and 11 deletions.
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/CatalogUtil.java
@@ -122,6 +122,11 @@ public static void dropTableData(FileIO io, TableMetadata metadata) {
Iterables.transform(metadata.statisticsFiles(), StatisticsFile::path),
"statistics",
true);
deleteFiles(
io,
Iterables.transform(metadata.partitionStatisticsFiles(), PartitionStatisticsFile::path),
"partition statistics",
true);
deleteFile(io, metadata.metadataFileLocation(), "metadata");
}

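For context, a minimal sketch of the caller path this change sits on (the HadoopCatalog, warehouse path, and table name below are illustrative, not taken from the commit): dropping a table with purge enabled eventually calls CatalogUtil.dropTableData, which with this commit also deletes partition statistics files.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class PurgeTableExample {
  public static void main(String[] args) {
    // Hypothetical warehouse location and table identifier.
    Catalog catalog = new HadoopCatalog(new Configuration(), "file:///tmp/warehouse");
    TableIdentifier id = TableIdentifier.of("db", "events");

    // purge = true triggers CatalogUtil.dropTableData, which deletes data files,
    // manifests, manifest lists, metadata files, statistics files and, after this
    // change, partition statistics files as well.
    catalog.dropTable(id, true /* purge */);
  }
}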
@@ -19,6 +19,7 @@
package org.apache.iceberg.hadoop;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
@@ -29,12 +30,15 @@
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinWriter;
@@ -60,17 +64,31 @@ public void dropTableDataDeletesExpectedFiles() throws IOException {
table.io());
table.updateStatistics().setStatistics(statisticsFile.snapshotId(), statisticsFile).commit();

TableMetadata tableMetadata = readMetadataVersion(4);
PartitionStatisticsFile partitionStatisticsFile =
writePartitionStatsFile(
table.currentSnapshot().snapshotId(),
tableLocation + "/metadata/" + UUID.randomUUID() + ".stats",
table.io());
table.updatePartitionStatistics().setPartitionStatistics(partitionStatisticsFile).commit();

TableMetadata tableMetadata = readMetadataVersion(5);
Set<Snapshot> snapshotSet = Sets.newHashSet(table.snapshots());

Set<String> manifestListLocations = manifestListLocations(snapshotSet);
Set<String> manifestLocations = manifestLocations(snapshotSet, table.io());
Set<String> dataLocations = dataLocations(snapshotSet, table.io());
Set<String> metadataLocations = metadataLocations(tableMetadata);
Set<String> statsLocations = statsLocations(tableMetadata);
Set<String> partitionStatsLocations = partitionStatsLocations(tableMetadata);

Assertions.assertThat(manifestListLocations).as("should have 2 manifest lists").hasSize(2);
Assertions.assertThat(metadataLocations).as("should have 4 metadata locations").hasSize(4);
Assertions.assertThat(statsLocations).as("should have 1 stats file").hasSize(1);
Assertions.assertThat(metadataLocations).as("should have 5 metadata locations").hasSize(5);
Assertions.assertThat(statsLocations)
.as("should have 1 stats file")
.containsExactly(statisticsFile.path());
Assertions.assertThat(partitionStatsLocations)
.as("should have 1 partition stats file")
.containsExactly(partitionStatisticsFile.path());

FileIO fileIO = Mockito.mock(FileIO.class);
Mockito.when(fileIO.newInputFile(Mockito.anyString()))
@@ -90,7 +108,8 @@ public void dropTableDataDeletesExpectedFiles() throws IOException {
+ manifestLocations.size()
+ dataLocations.size()
+ metadataLocations.size()
+ statsLocations.size()))
+ statsLocations.size()
+ partitionStatsLocations.size()))
.deleteFile(argumentCaptor.capture());

List<String> deletedPaths = argumentCaptor.getAllValues();
@@ -107,8 +126,11 @@ public void dropTableDataDeletesExpectedFiles() throws IOException {
.as("should contain all created metadata locations")
.containsAll(metadataLocations);
Assertions.assertThat(deletedPaths)
.as("should contain all created statistic")
.as("should contain all created statistics")
.containsAll(statsLocations);
Assertions.assertThat(deletedPaths)
.as("should contain all created partition stats files")
.containsAll(partitionStatsLocations);
}

@Test
@@ -179,25 +201,25 @@ public void shouldNotDropDataFilesIfGcNotEnabled() {
.containsAll(metadataLocations);
}

private Set<String> manifestListLocations(Set<Snapshot> snapshotSet) {
private static Set<String> manifestListLocations(Set<Snapshot> snapshotSet) {
return snapshotSet.stream().map(Snapshot::manifestListLocation).collect(Collectors.toSet());
}

private Set<String> manifestLocations(Set<Snapshot> snapshotSet, FileIO io) {
private static Set<String> manifestLocations(Set<Snapshot> snapshotSet, FileIO io) {
return snapshotSet.stream()
.flatMap(snapshot -> snapshot.allManifests(io).stream())
.map(ManifestFile::path)
.collect(Collectors.toSet());
}

private Set<String> dataLocations(Set<Snapshot> snapshotSet, FileIO io) {
private static Set<String> dataLocations(Set<Snapshot> snapshotSet, FileIO io) {
return snapshotSet.stream()
.flatMap(snapshot -> StreamSupport.stream(snapshot.addedDataFiles(io).spliterator(), false))
.map(dataFile -> dataFile.path().toString())
.collect(Collectors.toSet());
}

private Set<String> metadataLocations(TableMetadata tableMetadata) {
private static Set<String> metadataLocations(TableMetadata tableMetadata) {
Set<String> metadataLocations =
tableMetadata.previousFiles().stream()
.map(TableMetadata.MetadataLogEntry::file)
@@ -206,13 +228,13 @@ private Set<String> metadataLocations(TableMetadata tableMetadata) {
return metadataLocations;
}

private Set<String> statsLocations(TableMetadata tableMetadata) {
private static Set<String> statsLocations(TableMetadata tableMetadata) {
return tableMetadata.statisticsFiles().stream()
.map(StatisticsFile::path)
.collect(Collectors.toSet());
}

private StatisticsFile writeStatsFile(
private static StatisticsFile writeStatsFile(
long snapshotId, long snapshotSequenceNumber, String statsLocation, FileIO fileIO)
throws IOException {
try (PuffinWriter puffinWriter = Puffin.write(fileIO.newOutputFile(statsLocation)).build()) {
@@ -235,4 +257,27 @@ private StatisticsFile writeStatsFile(
.collect(ImmutableList.toImmutableList()));
}
}

private static PartitionStatisticsFile writePartitionStatsFile(
long snapshotId, String statsLocation, FileIO fileIO) {
PositionOutputStream positionOutputStream;
try {
positionOutputStream = fileIO.newOutputFile(statsLocation).create();
positionOutputStream.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}

return ImmutableGenericPartitionStatisticsFile.builder()
.snapshotId(snapshotId)
.fileSizeInBytes(42L)
.path(statsLocation)
.build();
}

private static Set<String> partitionStatsLocations(TableMetadata tableMetadata) {
return tableMetadata.partitionStatisticsFiles().stream()
.map(PartitionStatisticsFile::path)
.collect(Collectors.toSet());
}
}
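A compact usage sketch of the API this test exercises, reusing the imports from the test above (the path is illustrative, and the final check assumes a real FileIO rather than the Mockito mock): register a partition statistics file on the current snapshot, then verify it is gone once the table data has been dropped.

// Register a partition statistics file for the current snapshot (illustrative path).
PartitionStatisticsFile statsFile =
    ImmutableGenericPartitionStatisticsFile.builder()
        .snapshotId(table.currentSnapshot().snapshotId())
        .path(table.location() + "/metadata/" + UUID.randomUUID() + ".stats")
        .fileSizeInBytes(42L)
        .build();
table.updatePartitionStatistics().setPartitionStatistics(statsFile).commit();

// After CatalogUtil.dropTableData(table.io(), metadata), the file should no longer exist.
// InputFile.exists() assumes a real FileIO, not the mock used in the test above.
boolean purged = !table.io().newInputFile(statsFile.path()).exists();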
