From d3e75e03d146c9d74991db1a28b23fb6649ba308 Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Tue, 12 Dec 2023 14:03:03 +0530 Subject: [PATCH] Spark-3.5: Ensure that partition stats files are considered for GC procedures --- .../org/apache/iceberg/ReachableFileUtil.java | 41 +++++++++- .../spark/extensions/ProcedureUtil.java | 54 +++++++++++++ .../TestExpireSnapshotsProcedure.java | 76 ++++++++++++++----- .../TestRemoveOrphanFilesProcedure.java | 70 +++++++++++++++++ .../spark/actions/BaseSparkAction.java | 12 +-- 5 files changed, 221 insertions(+), 32 deletions(-) create mode 100644 spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java index bd23a221ab22..bc5f48a8d3f8 100644 --- a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java +++ b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java @@ -133,13 +133,13 @@ public static List manifestListLocations(Table table, Set snapshot } /** - * Returns locations of statistics files in a table. + * Returns locations of all statistics files in a table. * * @param table table for which statistics files needs to be listed * @return the location of statistics files */ public static List statisticsFilesLocations(Table table) { - return statisticsFilesLocations(table, statisticsFile -> true); + return statisticsFilesLocationsForSnapshots(table, null); } /** @@ -148,7 +148,9 @@ public static List statisticsFilesLocations(Table table) { * @param table table for which statistics files needs to be listed * @param predicate predicate for filtering the statistics files * @return the location of statistics files + * @deprecated use the {@code statisticsFilesLocationsForSnapshots(table, snapshotIds)} instead. */ + @Deprecated public static List statisticsFilesLocations( Table table, Predicate predicate) { return table.statisticsFiles().stream() @@ -156,4 +158,39 @@ public static List statisticsFilesLocations( .map(StatisticsFile::path) .collect(Collectors.toList()); } + + /** + * Returns locations of all statistics files for a table matching the given snapshot IDs. + * + * @param table table for which statistics files needs to be listed + * @param snapshotIds ids of snapshots for which statistics files will be returned. If null, + * statistics files for all the snapshots will be returned. + * @return the location of statistics files + */ + public static List statisticsFilesLocationsForSnapshots( + Table table, Set snapshotIds) { + List statsFileLocations = Lists.newArrayList(); + Predicate statsFilePredicate; + Predicate partitionStatsFilePredicate; + if (snapshotIds == null) { + statsFilePredicate = statisticsFile -> true; + partitionStatsFilePredicate = partitionStatisticsFile -> true; + } else { + statsFilePredicate = statisticsFile -> snapshotIds.contains(statisticsFile.snapshotId()); + partitionStatsFilePredicate = + partitionStatisticsFile -> snapshotIds.contains(partitionStatisticsFile.snapshotId()); + } + + table.statisticsFiles().stream() + .filter(statsFilePredicate) + .map(StatisticsFile::path) + .forEach(statsFileLocations::add); + + table.partitionStatisticsFiles().stream() + .filter(partitionStatsFilePredicate) + .map(PartitionStatisticsFile::path) + .forEach(statsFileLocations::add); + + return statsFileLocations; + } } diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java new file mode 100644 index 000000000000..de4acd74a7ed --- /dev/null +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.extensions; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.UUID; +import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile; +import org.apache.iceberg.PartitionStatisticsFile; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.PositionOutputStream; + +public class ProcedureUtil { + + private ProcedureUtil() {} + + static PartitionStatisticsFile writePartitionStatsFile( + long snapshotId, String statsLocation, FileIO fileIO) { + PositionOutputStream positionOutputStream; + try { + positionOutputStream = fileIO.newOutputFile(statsLocation).create(); + positionOutputStream.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + return ImmutableGenericPartitionStatisticsFile.builder() + .snapshotId(snapshotId) + .fileSizeInBytes(42L) + .path(statsLocation) + .build(); + } + + static String statsFileLocation(String tableLocation) { + String statsFileName = "stats-file-" + UUID.randomUUID(); + return tableLocation.replaceFirst("file:", "") + "/metadata/" + statsFileName; + } +} diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java index c4b93c6d6a44..7dacce5487d6 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java @@ -28,13 +28,12 @@ import java.time.Instant; import java.util.List; import java.util.Map; -import java.util.UUID; -import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.iceberg.GenericBlobMetadata; import org.apache.iceberg.GenericStatisticsFile; +import org.apache.iceberg.PartitionStatisticsFile; import org.apache.iceberg.Snapshot; import org.apache.iceberg.StatisticsFile; import org.apache.iceberg.Table; @@ -445,7 +444,7 @@ public void testExpireSnapshotsWithStatisticFiles() throws Exception { sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName); sql("INSERT INTO TABLE %s VALUES (10, 'abc')", tableName); Table table = validationCatalog.loadTable(tableIdent); - String statsFileLocation1 = statsFileLocation(table.location()); + String statsFileLocation1 = ProcedureUtil.statsFileLocation(table.location()); StatisticsFile statisticsFile1 = writeStatsFile( table.currentSnapshot().snapshotId(), @@ -456,7 +455,7 @@ public void testExpireSnapshotsWithStatisticFiles() throws Exception { sql("INSERT INTO %s SELECT 20, 'def'", tableName); table.refresh(); - String statsFileLocation2 = statsFileLocation(table.location()); + String statsFileLocation2 = ProcedureUtil.statsFileLocation(table.location()); StatisticsFile statisticsFile2 = writeStatsFile( table.currentSnapshot().snapshotId(), @@ -475,18 +474,9 @@ public void testExpireSnapshotsWithStatisticFiles() throws Exception { Assertions.assertThat(output.get(0)[5]).as("should be 1 deleted statistics file").isEqualTo(1L); table.refresh(); - List statsWithSnapshotId1 = - table.statisticsFiles().stream() - .filter(statisticsFile -> statisticsFile.snapshotId() == statisticsFile1.snapshotId()) - .collect(Collectors.toList()); - Assertions.assertThat(statsWithSnapshotId1) - .as( - "Statistics file entry in TableMetadata should be deleted for the snapshot %s", - statisticsFile1.snapshotId()) - .isEmpty(); Assertions.assertThat(table.statisticsFiles()) .as( - "Statistics file entry in TableMetadata should be present for the snapshot %s", + "Statistics file entry in TableMetadata should be present only for the snapshot %s", statisticsFile2.snapshotId()) .extracting(StatisticsFile::snapshotId) .containsExactly(statisticsFile2.snapshotId()); @@ -500,7 +490,58 @@ public void testExpireSnapshotsWithStatisticFiles() throws Exception { .exists(); } - private StatisticsFile writeStatsFile( + @Test + public void testExpireSnapshotsWithPartitionStatisticFiles() { + sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName); + sql("INSERT INTO TABLE %s VALUES (10, 'abc')", tableName); + Table table = validationCatalog.loadTable(tableIdent); + String partitionStatsFileLocation1 = ProcedureUtil.statsFileLocation(table.location()); + PartitionStatisticsFile partitionStatisticsFile1 = + ProcedureUtil.writePartitionStatsFile( + table.currentSnapshot().snapshotId(), partitionStatsFileLocation1, table.io()); + table.updatePartitionStatistics().setPartitionStatistics(partitionStatisticsFile1).commit(); + + sql("INSERT INTO %s SELECT 20, 'def'", tableName); + table.refresh(); + String partitionStatsFileLocation2 = ProcedureUtil.statsFileLocation(table.location()); + PartitionStatisticsFile partitionStatisticsFile2 = + ProcedureUtil.writePartitionStatsFile( + table.currentSnapshot().snapshotId(), partitionStatsFileLocation2, table.io()); + table.updatePartitionStatistics().setPartitionStatistics(partitionStatisticsFile2).commit(); + + waitUntilAfter(table.currentSnapshot().timestampMillis()); + + Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis())); + List output = + sql( + "CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s')", + catalogName, currentTimestamp, tableIdent); + Assertions.assertThat(output.get(0)[5]) + .as("should be 1 deleted partition statistics file") + .isEqualTo(1L); + + table.refresh(); + Assertions.assertThat(table.partitionStatisticsFiles()) + .as( + "partition statistics file entry in TableMetadata should be present only for the snapshot %s", + partitionStatisticsFile2.snapshotId()) + .extracting(PartitionStatisticsFile::snapshotId) + .containsExactly(partitionStatisticsFile2.snapshotId()); + + Assertions.assertThat(new File(partitionStatsFileLocation1)) + .as( + "partition statistics file should not exist for snapshot %s", + partitionStatisticsFile1.snapshotId()) + .doesNotExist(); + + Assertions.assertThat(new File(partitionStatsFileLocation2)) + .as( + "partition statistics file should exist for snapshot %s", + partitionStatisticsFile2.snapshotId()) + .exists(); + } + + private static StatisticsFile writeStatsFile( long snapshotId, long snapshotSequenceNumber, String statsLocation, FileIO fileIO) throws IOException { try (PuffinWriter puffinWriter = Puffin.write(fileIO.newOutputFile(statsLocation)).build()) { @@ -523,9 +564,4 @@ private StatisticsFile writeStatsFile( .collect(ImmutableList.toImmutableList())); } } - - private String statsFileLocation(String tableLocation) { - String statsFileName = "stats-file-" + UUID.randomUUID(); - return tableLocation.replaceFirst("file:", "") + "/metadata/" + statsFileName; - } } diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java index b29981775076..40adf30c37e4 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java @@ -41,6 +41,7 @@ import org.apache.iceberg.GenericStatisticsFile; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionStatisticsFile; import org.apache.iceberg.ReachableFileUtil; import org.apache.iceberg.StatisticsFile; import org.apache.iceberg.Table; @@ -530,6 +531,75 @@ public void testRemoveOrphanFilesWithStatisticFiles() throws Exception { Assertions.assertThat(statsLocation.exists()).as("stats file should be deleted").isFalse(); } + @Test + public void testRemoveOrphanFilesWithPartitionStatisticFiles() throws Exception { + sql( + "CREATE TABLE %s USING iceberg " + + "TBLPROPERTIES('format-version'='2') " + + "AS SELECT 10 int, 'abc' data", + tableName); + Table table = Spark3Util.loadIcebergTable(spark, tableName); + + String partitionStatsLocation = ProcedureUtil.statsFileLocation(table.location()); + PartitionStatisticsFile partitionStatisticsFile = + ProcedureUtil.writePartitionStatsFile( + table.currentSnapshot().snapshotId(), partitionStatsLocation, table.io()); + + commitPartitionStatsTxn(table, partitionStatisticsFile); + + // wait to ensure files are old enough + waitUntilAfter(System.currentTimeMillis()); + Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis())); + + List output = + sql( + "CALL %s.system.remove_orphan_files(" + + "table => '%s'," + + "older_than => TIMESTAMP '%s')", + catalogName, tableIdent, currentTimestamp); + Assertions.assertThat(output).as("Should be no orphan files").isEmpty(); + + Assertions.assertThat(new File(partitionStatsLocation)) + .as("partition stats file should exist") + .exists(); + + removePartitionStatsTxn(table, partitionStatisticsFile); + + output = + sql( + "CALL %s.system.remove_orphan_files(" + + "table => '%s'," + + "older_than => TIMESTAMP '%s')", + catalogName, tableIdent, currentTimestamp); + Assertions.assertThat(output).as("Should be orphan files").hasSize(1); + Assertions.assertThat(Iterables.getOnlyElement(output)) + .as("Deleted files") + .containsExactly("file:" + partitionStatsLocation); + Assertions.assertThat(new File(partitionStatsLocation)) + .as("partition stats file should be deleted") + .doesNotExist(); + } + + private static void removePartitionStatsTxn( + Table table, PartitionStatisticsFile partitionStatisticsFile) { + Transaction transaction = table.newTransaction(); + transaction + .updatePartitionStatistics() + .removePartitionStatistics(partitionStatisticsFile.snapshotId()) + .commit(); + transaction.commitTransaction(); + } + + private static void commitPartitionStatsTxn( + Table table, PartitionStatisticsFile partitionStatisticsFile) { + Transaction transaction = table.newTransaction(); + transaction + .updatePartitionStatistics() + .setPartitionStatistics(partitionStatisticsFile) + .commit(); + transaction.commitTransaction(); + } + @Test public void testRemoveOrphanFilesProcedureWithPrefixMode() throws NoSuchTableException, ParseException, IOException { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java index d0e71a707db9..53ce7418f3ec 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java @@ -31,7 +31,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; -import java.util.function.Predicate; import java.util.function.Supplier; import org.apache.iceberg.AllManifestsTable; import org.apache.iceberg.BaseTable; @@ -44,7 +43,6 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.ReachableFileUtil; import org.apache.iceberg.StaticTableOperations; -import org.apache.iceberg.StatisticsFile; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.exceptions.NotFoundException; @@ -196,14 +194,8 @@ protected Dataset manifestListDS(Table table, Set snapshotIds) { } protected Dataset statisticsFileDS(Table table, Set snapshotIds) { - Predicate predicate; - if (snapshotIds == null) { - predicate = statisticsFile -> true; - } else { - predicate = statisticsFile -> snapshotIds.contains(statisticsFile.snapshotId()); - } - - List statisticsFiles = ReachableFileUtil.statisticsFilesLocations(table, predicate); + List statisticsFiles = + ReachableFileUtil.statisticsFilesLocationsForSnapshots(table, snapshotIds); return toFileInfoDS(statisticsFiles, STATISTICS_FILES); }