Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Refactor Cleanup Task Handler #516

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
public enum AsyncTaskType {
ENTITY_CLEANUP_SCHEDULER(1),
MANIFEST_FILE_CLEANUP(2),
METADATA_FILE_BATCH_CLEANUP(3);
BATCH_FILE_CLEANUP(3);

private final int typeCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@
import org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory;
import org.apache.polaris.service.persistence.cache.EntityCacheFactory;
import org.apache.polaris.service.ratelimiter.RateLimiterFilter;
import org.apache.polaris.service.task.BatchFileCleanupTaskHandler;
import org.apache.polaris.service.task.ManifestFileCleanupTaskHandler;
import org.apache.polaris.service.task.TableCleanupTaskHandler;
import org.apache.polaris.service.task.TaskExecutor;
Expand Down Expand Up @@ -336,6 +337,9 @@ protected void configure() {
taskExecutor.addTaskHandler(
new ManifestFileCleanupTaskHandler(
fileIOSupplier, Executors.newVirtualThreadPerTaskExecutor()));
taskExecutor.addTaskHandler(
new BatchFileCleanupTaskHandler(
fileIOSupplier, Executors.newVirtualThreadPerTaskExecutor()));

bind(taskExecutor).to(TaskExecutor.class);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.task;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.polaris.core.entity.AsyncTaskType;
import org.apache.polaris.core.entity.TaskEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@link BatchFileCleanupTaskHandler} responsible for batch file cleanup by processing multiple
* file deletions in a single task handler. Valid files are deleted asynchronously with retries for
* transient errors, while missing files are logged and skipped.
*/
public class BatchFileCleanupTaskHandler extends FileCleanupTaskHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(BatchFileCleanupTaskHandler.class);

/**
* Creates a handler for batch file cleanup tasks by passing both collaborators to the parent
* handler.
*
* @param fileIOSupplier function applied to a task to obtain the {@link FileIO} for that task
* @param executorService executor handed to the parent handler
*/
public BatchFileCleanupTaskHandler(
Function<TaskEntity, FileIO> fileIOSupplier, ExecutorService executorService) {
super(fileIOSupplier, executorService);
}

/**
* Reports whether this handler is responsible for the given task.
*
* @param task the task to inspect
* @return {@code true} only when the task's type is {@link AsyncTaskType#BATCH_FILE_CLEANUP}
*/
@Override
public boolean canHandleTask(TaskEntity task) {
return task.getTaskType() == AsyncTaskType.BATCH_FILE_CLEANUP;
}

@Override
public boolean handleTask(TaskEntity task) {
BatchFileCleanupTask cleanupTask = task.readData(BatchFileCleanupTask.class);
TableIdentifier tableId = cleanupTask.getTableId();
List<String> batchFiles = cleanupTask.getBatchFiles();
try (FileIO authorizedFileIO = fileIOSupplier.apply(task)) {
List<String> validFiles =
batchFiles.stream().filter(file -> TaskUtils.exists(file, authorizedFileIO)).toList();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and in line 69, exists-checks are performed for each individual file, which is inefficient (slow) and also incurs unnecessary cost for cloud object storage.

I'd prefer an approach that
a) leverages object-storage batch requests
b) does not perform the same operation more than once

if (validFiles.isEmpty()) {
LOGGER
.atWarn()
.addKeyValue("batchFiles", batchFiles.toString())
.addKeyValue("tableId", tableId)
.log("File batch cleanup task scheduled, but the none of the file in batch exists");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.log("File batch cleanup task scheduled, but the none of the file in batch exists");
.log("File batch cleanup task scheduled, but none of the files in batch exists");

return true;
}
if (validFiles.size() < batchFiles.size()) {
List<String> missingFiles =
batchFiles.stream().filter(file -> !TaskUtils.exists(file, authorizedFileIO)).toList();
LOGGER
.atWarn()
.addKeyValue("batchFiles", batchFiles.toString())
.addKeyValue("missingFiles", missingFiles.toString())
.addKeyValue("tableId", tableId)
.log(
"File batch cleanup task scheduled, but {} files in the batch are missing",
missingFiles.size());
}

// Schedule the deletion for each file asynchronously
List<CompletableFuture<Void>> deleteFutures =
validFiles.stream()
.map(file -> super.tryDelete(tableId, authorizedFileIO, null, file, null, 1))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I'd have expected that this class actually leverages batched deletes of the underlying object store, but it seems it does not.

.toList();

try {
// Wait for all delete operations to finish
CompletableFuture<Void> allDeletes =
CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0]));
allDeletes.join();
} catch (Exception e) {
LOGGER.error("Exception detected during batch files deletion", e);
return false;
Copy link
Contributor

@adutra adutra Dec 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imho this is a bit too coarse-grained. The likelihood of having one failed delete will grow bigger with the number of files to delete. Shouldn't we return a more meaningful result, e.g. a statistics object reporting how many files were deleted, and how many failed? Not saying you should change this now though, because returning a boolean is imposed by the TaskHandler API, so we need to look into improving the API itself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That’s a great point! Since this is the first time we’re introducing a batch files handler, it would be helpful to discuss how to refine the TaskHandler API for future scenarios. @collado-mike, do you have any suggestions or thoughts on this?

}

return true;
}
}

/**
* Returns the logger created for {@link BatchFileCleanupTaskHandler}, overriding the parent
* handler's {@code getLogger()}.
*
* @return this class's logger
*/
@Override
public Logger getLogger() {
return LOGGER;
}

public static final class BatchFileCleanupTask {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be a record?

// Identifier of the table the batch of files belongs to.
private TableIdentifier tableId;
// Locations of the files this task should delete.
private List<String> batchFiles;

/**
* Creates a fully populated task payload.
*
* @param tableId identifier of the table the files belong to
* @param batchFiles locations of the files to delete
*/
public BatchFileCleanupTask(TableIdentifier tableId, List<String> batchFiles) {
this.tableId = tableId;
this.batchFiles = batchFiles;
}

// No-arg constructor leaves both fields null; presumably needed so the payload can be
// instantiated when task data is deserialized (see task.readData) — TODO confirm.
public BatchFileCleanupTask() {}

/** Returns the identifier of the table the files belong to. */
public TableIdentifier getTableId() {
return tableId;
}

/** Replaces the identifier of the table the files belong to. */
public void setTableId(TableIdentifier tableId) {
this.tableId = tableId;
}

/** Returns the stored list of file locations; the list is not copied. */
public List<String> getBatchFiles() {
return batchFiles;
}

/** Replaces the list of file locations; the given list is stored as-is. */
public void setBatchFiles(List<String> batchFiles) {
this.batchFiles = batchFiles;
}

/**
 * Compares this payload with another object.
 *
 * @param object the object to compare against
 * @return {@code true} when {@code object} is this instance, or is another {@code
 *     BatchFileCleanupTask} whose table identifier and file list are both equal to this one's
 */
@Override
public boolean equals(Object object) {
  if (this == object) {
    return true;
  }
  if (object instanceof BatchFileCleanupTaskHandler.BatchFileCleanupTask other) {
    return Objects.equals(tableId, other.tableId) && Objects.equals(batchFiles, other.batchFiles);
  }
  return false;
}

/**
* Computes a hash from the same two fields that {@code equals} compares.
*
* @return hash of the table identifier and the file list
*/
@Override
public int hashCode() {
return Objects.hash(tableId, batchFiles);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.task;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.polaris.core.entity.TaskEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@link FileCleanupTaskHandler} responsible for cleaning up files in table tasks. Handles retries
* for file deletions and skips files that are already missing. Subclasses must implement
* task-specific logic.
*/
public class FileCleanupTaskHandler implements TaskHandler {

/** Highest attempt number for which {@code tryDelete} still attempts a deletion. */
public static final int MAX_ATTEMPTS = 3;
/** Delay, in milliseconds, applied before a failed deletion is retried. */
public static final int FILE_DELETION_RETRY_MILLIS = 100;
/** Function that yields the {@link FileIO} for a given task. */
public final Function<TaskEntity, FileIO> fileIOSupplier;
/** Executor on which {@code tryDelete} runs its asynchronous deletions. */
public final ExecutorService executorService;
// Use this class as the logger name; it was previously created with
// ManifestFileCleanupTaskHandler.class, which attributed base-class output to that handler.
private static final Logger LOGGER = LoggerFactory.getLogger(FileCleanupTaskHandler.class);

/**
* Stores the collaborators shared by file cleanup handlers.
*
* @param fileIOSupplier function that yields the {@link FileIO} for a given task
* @param executorService executor used to run asynchronous file deletions
*/
public FileCleanupTaskHandler(
Function<TaskEntity, FileIO> fileIOSupplier, ExecutorService executorService) {
this.fileIOSupplier = fileIOSupplier;
this.executorService = executorService;
}

@Override
public boolean canHandleTask(TaskEntity task) {
throw new UnsupportedOperationException("This method must be implemented by subclasses.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use an abstract class then?

}

/**
* Placeholder implementation: this base class performs no task handling of its own.
*
* @param task the task to handle (unused here)
* @return never returns normally
* @throws UnsupportedOperationException always, unless a subclass overrides this method
*/
@Override
public boolean handleTask(TaskEntity task) {
throw new UnsupportedOperationException("This method must be implemented by subclasses.");
}

/**
* Returns the logger that {@code tryDelete} writes to. Subclasses may override this to supply
* their own logger.
*
* @return the logger used for messages emitted by this handler
*/
public Logger getLogger() {
return LOGGER;
}

/**
 * Asynchronously deletes {@code file}, scheduling another attempt after a short delay whenever
 * an attempt throws.
 *
 * <p>Each attempt runs on {@code executorService}: the file is deleted if it exists, otherwise an
 * info message is logged and the attempt counts as successful. When an attempt fails, the
 * failure is logged and this method is invoked again with {@code attempt + 1}; that re-invocation
 * is dispatched through an executor delayed by {@link #FILE_DELETION_RETRY_MILLIS} milliseconds.
 *
 * @param tableId identifier of the table the file belongs to; used for logging only
 * @param fileIO the {@link FileIO} used for the existence check and the deletion
 * @param baseFile optional originating file, logged as an empty string when {@code null}
 * @param file location of the file to delete
 * @param e the failure from the previous attempt, or {@code null} on the first call
 * @param attempt number of the attempt about to be made
 * @return a future that completes once the file was deleted or found missing, or completes
 *     exceptionally with {@code e} when {@code e} is non-null and {@code attempt} exceeds
 *     {@link #MAX_ATTEMPTS}
 */
public CompletableFuture<Void> tryDelete(
    TableIdentifier tableId,
    FileIO fileIO,
    String baseFile,
    String file,
    Throwable e,
    int attempt) {
  if (e != null) {
    // A previous attempt failed: either give up, or record the failure before retrying.
    if (attempt > MAX_ATTEMPTS) {
      return CompletableFuture.failedFuture(e);
    }
    getLogger()
        .atWarn()
        .addKeyValue("file", file)
        .addKeyValue("attempt", attempt)
        .addKeyValue("error", e.getMessage())
        .log("Error encountered attempting to delete file");
  }

  // It is normal for a file to already be missing, e.g. a data file may be referenced by
  // several manifests. The file can also disappear between the existence check and the delete
  // request; if the delete then fails, the retry repeats the existence check.
  Runnable deleteIfPresent =
      () -> {
        if (!TaskUtils.exists(file, fileIO)) {
          getLogger()
              .atInfo()
              .addKeyValue("file", file)
              .addKeyValue("baseFile", baseFile != null ? baseFile : "")
              .addKeyValue("tableId", tableId)
              .log("table file cleanup task scheduled, but data file doesn't exist");
          return;
        }
        fileIO.deleteFile(file);
      };

  // Logs the failure and hands over to the next attempt.
  Function<Throwable, CompletableFuture<Void>> retryAfterFailure =
      newEx -> {
        getLogger()
            .atWarn()
            .addKeyValue("file", file)
            .addKeyValue("tableIdentifier", tableId)
            .addKeyValue("baseFile", baseFile != null ? baseFile : "")
            .log("Exception caught deleting data file", newEx);
        return tryDelete(tableId, fileIO, baseFile, file, newEx, attempt + 1);
      };

  return CompletableFuture.runAsync(deleteIfPresent, executorService)
      .exceptionallyComposeAsync(
          retryAfterFailure,
          CompletableFuture.delayedExecutor(
              FILE_DELETION_RETRY_MILLIS, TimeUnit.MILLISECONDS, executorService));
}
}
Loading