From b7e3e21bbb53814c3f7418ba55de0c1d5eea57dd Mon Sep 17 00:00:00 2001 From: Ajantha Bhat Date: Fri, 5 Jan 2024 02:57:57 +0530 Subject: [PATCH] Core: Remove partition statistics files during purge table (#9409) --- .../java/org/apache/iceberg/CatalogUtil.java | 5 ++ .../hadoop/TestCatalogUtilDropTable.java | 67 ++++++++++++++++--- 2 files changed, 61 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/CatalogUtil.java b/core/src/main/java/org/apache/iceberg/CatalogUtil.java index 2d16b19fc90d..f9af07c1a443 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogUtil.java +++ b/core/src/main/java/org/apache/iceberg/CatalogUtil.java @@ -122,6 +122,11 @@ public static void dropTableData(FileIO io, TableMetadata metadata) { Iterables.transform(metadata.statisticsFiles(), StatisticsFile::path), "statistics", true); + deleteFiles( + io, + Iterables.transform(metadata.partitionStatisticsFiles(), PartitionStatisticsFile::path), + "partition statistics", + true); deleteFile(io, metadata.metadataFileLocation(), "metadata"); } diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java b/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java index b8511bef5281..9cb44b1341cb 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java @@ -19,6 +19,7 @@ package org.apache.iceberg.hadoop; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.List; @@ -29,12 +30,15 @@ import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.GenericBlobMetadata; import org.apache.iceberg.GenericStatisticsFile; +import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.PartitionStatisticsFile; import org.apache.iceberg.Snapshot; import org.apache.iceberg.StatisticsFile; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableProperties; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.PositionOutputStream; import org.apache.iceberg.puffin.Blob; import org.apache.iceberg.puffin.Puffin; import org.apache.iceberg.puffin.PuffinWriter; @@ -60,7 +64,14 @@ public void dropTableDataDeletesExpectedFiles() throws IOException { table.io()); table.updateStatistics().setStatistics(statisticsFile.snapshotId(), statisticsFile).commit(); - TableMetadata tableMetadata = readMetadataVersion(4); + PartitionStatisticsFile partitionStatisticsFile = + writePartitionStatsFile( + table.currentSnapshot().snapshotId(), + tableLocation + "/metadata/" + UUID.randomUUID() + ".stats", + table.io()); + table.updatePartitionStatistics().setPartitionStatistics(partitionStatisticsFile).commit(); + + TableMetadata tableMetadata = readMetadataVersion(5); Set snapshotSet = Sets.newHashSet(table.snapshots()); Set manifestListLocations = manifestListLocations(snapshotSet); @@ -68,9 +79,16 @@ public void dropTableDataDeletesExpectedFiles() throws IOException { Set dataLocations = dataLocations(snapshotSet, table.io()); Set metadataLocations = metadataLocations(tableMetadata); Set statsLocations = statsLocations(tableMetadata); + Set partitionStatsLocations = partitionStatsLocations(tableMetadata); + Assertions.assertThat(manifestListLocations).as("should have 2 manifest lists").hasSize(2); - Assertions.assertThat(metadataLocations).as("should have 4 metadata locations").hasSize(4); - Assertions.assertThat(statsLocations).as("should have 1 stats file").hasSize(1); + Assertions.assertThat(metadataLocations).as("should have 5 metadata locations").hasSize(5); + Assertions.assertThat(statsLocations) + .as("should have 1 stats file") + .containsExactly(statisticsFile.path()); + Assertions.assertThat(partitionStatsLocations) + .as("should have 1 partition stats file") + .containsExactly(partitionStatisticsFile.path()); FileIO fileIO = Mockito.mock(FileIO.class); Mockito.when(fileIO.newInputFile(Mockito.anyString())) @@ -90,7 +108,8 @@ public void dropTableDataDeletesExpectedFiles() throws IOException { + manifestLocations.size() + dataLocations.size() + metadataLocations.size() - + statsLocations.size())) + + statsLocations.size() + + partitionStatsLocations.size())) .deleteFile(argumentCaptor.capture()); List deletedPaths = argumentCaptor.getAllValues(); @@ -107,8 +126,11 @@ public void dropTableDataDeletesExpectedFiles() throws IOException { .as("should contain all created metadata locations") .containsAll(metadataLocations); Assertions.assertThat(deletedPaths) - .as("should contain all created statistic") + .as("should contain all created statistics") .containsAll(statsLocations); + Assertions.assertThat(deletedPaths) + .as("should contain all created partition stats files") + .containsAll(partitionStatsLocations); } @Test @@ -179,25 +201,25 @@ public void shouldNotDropDataFilesIfGcNotEnabled() { .containsAll(metadataLocations); } - private Set manifestListLocations(Set snapshotSet) { + private static Set manifestListLocations(Set snapshotSet) { return snapshotSet.stream().map(Snapshot::manifestListLocation).collect(Collectors.toSet()); } - private Set manifestLocations(Set snapshotSet, FileIO io) { + private static Set manifestLocations(Set snapshotSet, FileIO io) { return snapshotSet.stream() .flatMap(snapshot -> snapshot.allManifests(io).stream()) .map(ManifestFile::path) .collect(Collectors.toSet()); } - private Set dataLocations(Set snapshotSet, FileIO io) { + private static Set dataLocations(Set snapshotSet, FileIO io) { return snapshotSet.stream() .flatMap(snapshot -> StreamSupport.stream(snapshot.addedDataFiles(io).spliterator(), false)) .map(dataFile -> dataFile.path().toString()) .collect(Collectors.toSet()); } - private Set metadataLocations(TableMetadata tableMetadata) { + private static Set metadataLocations(TableMetadata tableMetadata) { Set metadataLocations = tableMetadata.previousFiles().stream() .map(TableMetadata.MetadataLogEntry::file) @@ -206,13 +228,13 @@ private Set metadataLocations(TableMetadata tableMetadata) { return metadataLocations; } - private Set statsLocations(TableMetadata tableMetadata) { + private static Set statsLocations(TableMetadata tableMetadata) { return tableMetadata.statisticsFiles().stream() .map(StatisticsFile::path) .collect(Collectors.toSet()); } - private StatisticsFile writeStatsFile( + private static StatisticsFile writeStatsFile( long snapshotId, long snapshotSequenceNumber, String statsLocation, FileIO fileIO) throws IOException { try (PuffinWriter puffinWriter = Puffin.write(fileIO.newOutputFile(statsLocation)).build()) { @@ -235,4 +257,27 @@ private StatisticsFile writeStatsFile( .collect(ImmutableList.toImmutableList())); } } + + private static PartitionStatisticsFile writePartitionStatsFile( + long snapshotId, String statsLocation, FileIO fileIO) { + PositionOutputStream positionOutputStream; + try { + positionOutputStream = fileIO.newOutputFile(statsLocation).create(); + positionOutputStream.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + return ImmutableGenericPartitionStatisticsFile.builder() + .snapshotId(snapshotId) + .fileSizeInBytes(42L) + .path(statsLocation) + .build(); + } + + private static Set partitionStatsLocations(TableMetadata tableMetadata) { + return tableMetadata.partitionStatisticsFiles().stream() + .map(PartitionStatisticsFile::path) + .collect(Collectors.toSet()); + } }