-
Notifications
You must be signed in to change notification settings - Fork 156
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Enhancement] Refactor Cleanup Task Handler #516
base: main
Are you sure you want to change the base?
Changes from 3 commits
6cb99c1
12e55cd
b898b40
12eeabb
a1e38b7
c029ed1
5797758
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,145 @@ | ||||||
/* | ||||||
* Licensed to the Apache Software Foundation (ASF) under one | ||||||
* or more contributor license agreements. See the NOTICE file | ||||||
* distributed with this work for additional information | ||||||
* regarding copyright ownership. The ASF licenses this file | ||||||
* to you under the Apache License, Version 2.0 (the | ||||||
* "License"); you may not use this file except in compliance | ||||||
* with the License. You may obtain a copy of the License at | ||||||
* | ||||||
* http://www.apache.org/licenses/LICENSE-2.0 | ||||||
* | ||||||
* Unless required by applicable law or agreed to in writing, | ||||||
* software distributed under the License is distributed on an | ||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||||||
* KIND, either express or implied. See the License for the | ||||||
* specific language governing permissions and limitations | ||||||
* under the License. | ||||||
*/ | ||||||
package org.apache.polaris.service.task; | ||||||
|
||||||
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.iceberg.catalog.TableIdentifier; | ||||||
import org.apache.iceberg.io.FileIO; | ||||||
import org.apache.polaris.core.entity.AsyncTaskType; | ||||||
import org.apache.polaris.core.entity.TaskEntity; | ||||||
import org.slf4j.Logger; | ||||||
import org.slf4j.LoggerFactory; | ||||||
|
||||||
/** | ||||||
* {@link BatchFileCleanupTaskHandler} responsible for batch file cleanup by processing multiple | ||||||
* file deletions in a single task handler. Valid files are deleted asynchronously with retries for | ||||||
* transient errors, while missing files are logged and skipped. | ||||||
*/ | ||||||
public class BatchFileCleanupTaskHandler extends FileCleanupTaskHandler { | ||||||
|
||||||
private static final Logger LOGGER = LoggerFactory.getLogger(BatchFileCleanupTaskHandler.class); | ||||||
|
||||||
/**
 * Creates a batch cleanup handler; both arguments are passed unchanged to the superclass
 * constructor.
 *
 * @param fileIOSupplier function that yields the {@link FileIO} to use for a given task
 * @param executorService executor handed to the superclass
 */
public BatchFileCleanupTaskHandler( | ||||||
Function<TaskEntity, FileIO> fileIOSupplier, ExecutorService executorService) { | ||||||
super(fileIOSupplier, executorService); | ||||||
} | ||||||
|
||||||
@Override | ||||||
public boolean canHandleTask(TaskEntity task) { | ||||||
return task.getTaskType() == AsyncTaskType.BATCH_FILE_CLEANUP; | ||||||
} | ||||||
|
||||||
@Override | ||||||
public boolean handleTask(TaskEntity task) { | ||||||
BatchFileCleanupTask cleanupTask = task.readData(BatchFileCleanupTask.class); | ||||||
TableIdentifier tableId = cleanupTask.getTableId(); | ||||||
List<String> batchFiles = cleanupTask.getBatchFiles(); | ||||||
try (FileIO authorizedFileIO = fileIOSupplier.apply(task)) { | ||||||
List<String> validFiles = | ||||||
batchFiles.stream().filter(file -> TaskUtils.exists(file, authorizedFileIO)).toList(); | ||||||
if (validFiles.isEmpty()) { | ||||||
LOGGER | ||||||
.atWarn() | ||||||
.addKeyValue("batchFiles", batchFiles.toString()) | ||||||
.addKeyValue("tableId", tableId) | ||||||
.log("File batch cleanup task scheduled, but the none of the file in batch exists"); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
return true; | ||||||
} | ||||||
if (validFiles.size() < batchFiles.size()) { | ||||||
List<String> missingFiles = | ||||||
batchFiles.stream().filter(file -> !TaskUtils.exists(file, authorizedFileIO)).toList(); | ||||||
LOGGER | ||||||
.atWarn() | ||||||
.addKeyValue("batchFiles", batchFiles.toString()) | ||||||
.addKeyValue("missingFiles", missingFiles.toString()) | ||||||
.addKeyValue("tableId", tableId) | ||||||
.log( | ||||||
"File batch cleanup task scheduled, but {} files in the batch are missing", | ||||||
missingFiles.size()); | ||||||
} | ||||||
|
||||||
// Schedule the deletion for each file asynchronously | ||||||
List<CompletableFuture<Void>> deleteFutures = | ||||||
validFiles.stream() | ||||||
.map(file -> super.tryDelete(tableId, authorizedFileIO, null, file, null, 1)) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm, I'd have expected that this class actually leverages batched deletes of the underlying object store, but seems it does not. |
||||||
.toList(); | ||||||
|
||||||
try { | ||||||
// Wait for all delete operations to finish | ||||||
CompletableFuture<Void> allDeletes = | ||||||
CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])); | ||||||
allDeletes.join(); | ||||||
} catch (Exception e) { | ||||||
LOGGER.error("Exception detected during batch files deletion", e); | ||||||
return false; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Imho this a bit too coarse-grained. The likelihood of having one failed delete will grow bigger with the number of files to delete. Shouldn't we return a more meaningful result, e.g. a statistics object reporting how many files were deleted, and how many failed? Not saying you should change this now though, because returning a boolean is imposed by There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That’s a great point! Since this is the first time we’re introducing a batch files handler, it would be helpful to discuss how to refine the TaskHandler API for future scenarios. @collado-mike, do you have any suggestions or thoughts on this? |
||||||
} | ||||||
|
||||||
return true; | ||||||
} | ||||||
} | ||||||
|
||||||
/**
 * Returns this class's own logger, overriding the superclass accessor.
 *
 * @return the logger created for {@link BatchFileCleanupTaskHandler}
 */
@Override | ||||||
public Logger getLogger() { | ||||||
return LOGGER; | ||||||
} | ||||||
|
||||||
public static final class BatchFileCleanupTask { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this be a |
||||||
private TableIdentifier tableId; | ||||||
private List<String> batchFiles; | ||||||
|
||||||
public BatchFileCleanupTask(TableIdentifier tableId, List<String> batchFiles) { | ||||||
this.tableId = tableId; | ||||||
this.batchFiles = batchFiles; | ||||||
} | ||||||
|
||||||
public BatchFileCleanupTask() {} | ||||||
|
||||||
public TableIdentifier getTableId() { | ||||||
return tableId; | ||||||
} | ||||||
|
||||||
public void setTableId(TableIdentifier tableId) { | ||||||
this.tableId = tableId; | ||||||
} | ||||||
|
||||||
public List<String> getBatchFiles() { | ||||||
return batchFiles; | ||||||
} | ||||||
|
||||||
public void setBatchFiles(List<String> batchFiles) { | ||||||
this.batchFiles = batchFiles; | ||||||
} | ||||||
|
||||||
@Override | ||||||
public boolean equals(Object object) { | ||||||
if (this == object) return true; | ||||||
if (!(object instanceof BatchFileCleanupTaskHandler.BatchFileCleanupTask that)) return false; | ||||||
return Objects.equals(tableId, that.tableId) && Objects.equals(batchFiles, that.batchFiles); | ||||||
} | ||||||
|
||||||
@Override | ||||||
public int hashCode() { | ||||||
return Objects.hash(tableId, batchFiles); | ||||||
} | ||||||
} | ||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.polaris.service.task; | ||
|
||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.function.Function; | ||
import org.apache.iceberg.catalog.TableIdentifier; | ||
import org.apache.iceberg.io.FileIO; | ||
import org.apache.polaris.core.entity.TaskEntity; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
* {@link FileCleanupTaskHandler} responsible for cleaning up files in table tasks. Handles retries | ||
* for file deletions and skips files that are already missing. Subclasses must implement | ||
* task-specific logic. | ||
*/ | ||
public class FileCleanupTaskHandler implements TaskHandler { | ||
|
||
public static final int MAX_ATTEMPTS = 3; | ||
public static final int FILE_DELETION_RETRY_MILLIS = 100; | ||
public final Function<TaskEntity, FileIO> fileIOSupplier; | ||
public final ExecutorService executorService; | ||
private static final Logger LOGGER = | ||
LoggerFactory.getLogger(ManifestFileCleanupTaskHandler.class); | ||
|
||
/**
 * Stores the two collaborators in the corresponding public fields.
 *
 * @param fileIOSupplier function that yields the {@link FileIO} to use for a given task
 * @param executorService executor used by {@code tryDelete} to run deletions
 */
public FileCleanupTaskHandler( | ||
Function<TaskEntity, FileIO> fileIOSupplier, ExecutorService executorService) { | ||
this.fileIOSupplier = fileIOSupplier; | ||
this.executorService = executorService; | ||
} | ||
|
||
/**
 * Placeholder implementation that always throws; subclasses are expected to override it.
 *
 * @param task the task to inspect (unused here)
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override | ||
public boolean canHandleTask(TaskEntity task) { | ||
throw new UnsupportedOperationException("This method must be implemented by subclasses."); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not use an abstract class then? |
||
} | ||
|
||
/**
 * Placeholder implementation that always throws; subclasses are expected to override it.
 *
 * @param task the task to process (unused here)
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override | ||
public boolean handleTask(TaskEntity task) { | ||
throw new UnsupportedOperationException("This method must be implemented by subclasses."); | ||
} | ||
|
||
/**
 * Returns the logger that {@code tryDelete} writes to. Subclasses can override this to have
 * those log entries emitted through their own logger.
 *
 * @return this class's static logger
 */
public Logger getLogger() { | ||
return LOGGER; | ||
} | ||
|
||
public CompletableFuture<Void> tryDelete( | ||
TableIdentifier tableId, | ||
FileIO fileIO, | ||
String baseFile, | ||
String file, | ||
Throwable e, | ||
int attempt) { | ||
if (e != null && attempt <= MAX_ATTEMPTS) { | ||
getLogger() | ||
.atWarn() | ||
.addKeyValue("file", file) | ||
.addKeyValue("attempt", attempt) | ||
.addKeyValue("error", e.getMessage()) | ||
.log("Error encountered attempting to delete file"); | ||
} | ||
if (attempt > MAX_ATTEMPTS && e != null) { | ||
return CompletableFuture.failedFuture(e); | ||
} | ||
return CompletableFuture.runAsync( | ||
() -> { | ||
// totally normal for a file to already be missing, e.g. a data file | ||
// may be in multiple manifests. There's a possibility we check the | ||
// file's existence, but then it is deleted before we have a chance to | ||
// send the delete request. In such a case, we <i>should</i> retry | ||
// and find | ||
if (TaskUtils.exists(file, fileIO)) { | ||
fileIO.deleteFile(file); | ||
} else { | ||
getLogger() | ||
.atInfo() | ||
.addKeyValue("file", file) | ||
.addKeyValue("baseFile", baseFile != null ? baseFile : "") | ||
.addKeyValue("tableId", tableId) | ||
.log("table file cleanup task scheduled, but data file doesn't exist"); | ||
} | ||
}, | ||
executorService) | ||
.exceptionallyComposeAsync( | ||
newEx -> { | ||
getLogger() | ||
.atWarn() | ||
.addKeyValue("file", file) | ||
.addKeyValue("tableIdentifier", tableId) | ||
.addKeyValue("baseFile", baseFile != null ? baseFile : "") | ||
.log("Exception caught deleting data file", newEx); | ||
return tryDelete(tableId, fileIO, baseFile, file, newEx, attempt + 1); | ||
}, | ||
CompletableFuture.delayedExecutor( | ||
FILE_DELETION_RETRY_MILLIS, TimeUnit.MILLISECONDS, executorService)); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here and in line 69, exists-checks are performed for each individual file, which is inefficient (slow) and also incurs unnecessary cost on cloud object stores.
I'd prefer an approach that
a) leverages object-storage batch requests
b) does not perform the same operation more than once