Use SupportsPrefixOperations for Remove OrphanFile Procedure on Spark 3.5
Ismail Simsek committed Jan 4, 2025
1 parent 42d3885 commit 6267e48
Showing 2 changed files with 49 additions and 31 deletions.
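
The change in a nutshell: when the table's FileIO implements SupportsPrefixOperations, the action now gathers candidate orphan files with a single flat listPrefix call instead of a recursive directory walk, and keeps the old listing as a fallback. Below is a minimal, self-contained sketch of that dispatch, using only the Iceberg APIs visible in the diff that follows; the class and method names of the sketch itself are illustrative, not from the commit.

// Minimal sketch (not from the commit) of the listing strategy introduced here:
// prefer a flat prefix listing when the table's FileIO supports it.
import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.SupportsPrefixOperations;

public class PrefixListingSketch {

  // Collects locations of files under the table location older than the cutoff timestamp.
  static List<String> listOldFiles(Table table, long olderThanTimestamp) {
    List<String> matchingFiles = new ArrayList<>();
    if (table.io() instanceof SupportsPrefixOperations) {
      // One flat listing returns every file under the prefix, so no recursive
      // directory walk is needed (a big win on object stores such as S3).
      SupportsPrefixOperations io = (SupportsPrefixOperations) table.io();
      for (FileInfo fileInfo : io.listPrefix(table.location())) {
        if (fileInfo.createdAtMillis() < olderThanTimestamp) {
          matchingFiles.add(fileInfo.location());
        }
      }
    }
    // When the FileIO does not support prefix listing, the action falls back to the
    // recursive Hadoop FileSystem listing (listWithoutPrefix); omitted in this sketch.
    return matchingFiles;
  }
}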
@@ -293,28 +293,37 @@ private Dataset<FileURI> validFileIdentDS() {

   private Dataset<FileURI> actualFileIdentDS() {
     StringToFileURI toFileURI = new StringToFileURI(equalSchemes, equalAuthorities);
+    Dataset<String> dataList;
     if (compareToFileList == null) {
-      return toFileURI.apply(listedFileDS());
+      dataList =
+          table.io() instanceof SupportsPrefixOperations ? listWithPrefix() : listWithoutPrefix();
     } else {
-      return toFileURI.apply(filteredCompareToFileList());
+      dataList = filteredCompareToFileList();
     }
+
+    return toFileURI.apply(dataList);
   }
 
-  private Dataset<String> listWithPrefix() {
+  @VisibleForTesting
+  Dataset<String> listWithPrefix() {
     List<String> matchingFiles = Lists.newArrayList();
+    PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs());
+
     Iterator<org.apache.iceberg.io.FileInfo> iterator =
         ((SupportsPrefixOperations) table.io()).listPrefix(location).iterator();
     while (iterator.hasNext()) {
       org.apache.iceberg.io.FileInfo fileInfo = iterator.next();
-      if (fileInfo.createdAtMillis() < olderThanTimestamp) {
+      if (fileInfo.createdAtMillis() < olderThanTimestamp
+          && pathFilter.accept(new Path(fileInfo.location()))) {
         matchingFiles.add(fileInfo.location());
       }
     }
     JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);
     return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING());
   }
 
-  private Dataset<String> listWithoutPrefix() {
+  @VisibleForTesting
+  Dataset<String> listWithoutPrefix() {
     List<String> subDirs = Lists.newArrayList();
     List<String> matchingFiles = Lists.newArrayList();
 
@@ -349,14 +358,6 @@ private Dataset<String> listWithoutPrefix() {
     return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING());
   }
 
-  private Dataset<String> listedFileDS() {
-    if (table.io() instanceof SupportsPrefixOperations) {
-      return listWithPrefix();
-    } else {
-      return listWithoutPrefix();
-    }
-  }
-
   private static void listDirRecursively(
       String dir,
       Predicate<FileStatus> predicate,
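One detail worth calling out from the hunk above: the new listWithPrefix() applies a PathFilter per returned file. A recursive listing can prune hidden directories (names starting with "_" or ".") while it walks, but a flat prefix listing returns every file, including files under hidden directories, so the filtering has to happen per location. The action builds its PartitionAwareHiddenPathFilter from the table's partition specs; the snippet below is only a rough, hypothetical illustration using a plain Hadoop PathFilter and an explicit ancestor check, not the commit's filter.

// Hypothetical illustration (not the commit's filter): with a flat listing, each
// location must be checked for hidden path segments, because hidden directories
// are no longer skipped by a directory walk.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class HiddenPathCheckSketch {

  // Accepts a file only if neither the file nor any ancestor directory is rejected.
  static boolean acceptWithAncestors(PathFilter filter, Path path) {
    for (Path current = path; current != null; current = current.getParent()) {
      if (!filter.accept(current)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // Simplified hidden-path rule; the action's PartitionAwareHiddenPathFilter is also
    // aware of partition field names from the table's specs, so hidden-looking
    // partition directories are not dropped.
    PathFilter hiddenFilter =
        path -> !path.getName().startsWith("_") && !path.getName().startsWith(".");

    Path visible = new Path("s3://bucket/table/data/category=books/file-a.parquet");
    Path hidden = new Path("s3://bucket/table/data/_temporary/file-b.parquet");

    System.out.println(acceptWithAncestors(hiddenFilter, visible)); // true
    System.out.println(acceptWithAncestors(hiddenFilter, hidden)); // false
  }
}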
@@ -575,9 +575,12 @@ public void testHiddenPartitionPathsWithPartitionEvolution() {
     waitUntilAfter(System.currentTimeMillis());
 
     SparkActions actions = SparkActions.get();
+    DeleteOrphanFilesSparkAction action =
+        actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis());
+    // test list methods by directly instantiating the action
+    assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());
 
-    DeleteOrphanFiles.Result result =
-        actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
+    DeleteOrphanFiles.Result result = action.execute();
 
     assertThat(result.orphanFileLocations()).as("Should delete 2 files").hasSize(2);
   }
@@ -610,9 +613,12 @@ public void testHiddenPathsStartingWithPartitionNamesAreIgnored() throws IOExcep
     waitUntilAfter(System.currentTimeMillis());
 
     SparkActions actions = SparkActions.get();
+    DeleteOrphanFilesSparkAction action =
+        actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis());
+    // test list methods by directly instantiating the action
+    assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());
 
-    DeleteOrphanFiles.Result result =
-        actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
+    DeleteOrphanFiles.Result result = action.execute();
 
     assertThat(result.orphanFileLocations()).as("Should delete 0 files").isEmpty();
     assertThat(fs.exists(pathToFileInHiddenFolder)).isTrue();
@@ -675,12 +681,10 @@ public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException {
     waitUntilAfter(System.currentTimeMillis());
 
     SparkActions actions = SparkActions.get();
-    DeleteOrphanFiles.Result result =
-        actions
-            .deleteOrphanFiles(table)
-            .olderThan(System.currentTimeMillis())
-            .deleteWith(s -> {})
-            .execute();
+    DeleteOrphanFilesSparkAction action =
+        actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).deleteWith(s -> {});
+    assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());
+    DeleteOrphanFiles.Result result = action.execute();
     assertThat(result.orphanFileLocations())
         .as("Action should find 1 file")
         .isEqualTo(invalidFiles);
@@ -713,8 +717,11 @@ public void testRemoveOrphanFilesWithHadoopCatalog() throws InterruptedException

     table.refresh();
 
-    DeleteOrphanFiles.Result result =
-        SparkActions.get().deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
+    DeleteOrphanFilesSparkAction action =
+        SparkActions.get().deleteOrphanFiles(table).olderThan(System.currentTimeMillis());
+    assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());
+
+    DeleteOrphanFiles.Result result = action.execute();
 
     assertThat(result.orphanFileLocations()).as("Should delete only 1 file").hasSize(1);
 
@@ -854,12 +861,14 @@ public void testCompareToFileList() throws IOException {
.as("Invalid file should be present")
.isTrue();

DeleteOrphanFiles.Result result3 =
DeleteOrphanFilesSparkAction action3 =
actions
.deleteOrphanFiles(table)
.compareToFileList(compareToFileList)
.olderThan(System.currentTimeMillis())
.execute();
.olderThan(System.currentTimeMillis());
assertThatDatasetsAreEqualIgnoringOrder(action3.listWithPrefix(), action3.listWithoutPrefix());

DeleteOrphanFiles.Result result3 = action3.execute();
assertThat(result3.orphanFileLocations())
.as("Action should delete 1 file")
.isEqualTo(invalidFilePaths);
@@ -885,12 +894,14 @@ public void testCompareToFileList() throws IOException {
.withColumnRenamed("filePath", "file_path")
.withColumnRenamed("lastModified", "last_modified");

DeleteOrphanFiles.Result result4 =
DeleteOrphanFilesSparkAction action4 =
actions
.deleteOrphanFiles(table)
.compareToFileList(compareToFileListWithOutsideLocation)
.deleteWith(s -> {})
.execute();
.deleteWith(s -> {});
assertThatDatasetsAreEqualIgnoringOrder(action4.listWithPrefix(), action4.listWithoutPrefix());

DeleteOrphanFiles.Result result4 = action4.execute();
assertThat(result4.orphanFileLocations()).as("Action should find nothing").isEmpty();
}

@@ -1100,4 +1111,10 @@ private void executeTest(
         spark, toFileUri.apply(actualFileDS), toFileUri.apply(validFileDS), mode);
     assertThat(orphanFiles).isEqualTo(expectedOrphanFiles);
   }
+
+  private void assertThatDatasetsAreEqualIgnoringOrder(
+      Dataset<String> actual, Dataset<String> expected) {
+    assertThat(actual.collectAsList())
+        .as("same as")
+        .containsExactlyInAnyOrderElementsOf(expected.collectAsList());
+  }
 }
