Async Deletion of Previous Metadata and Statistics Files #312

Merged
merged 37 commits into from
Dec 6, 2024
Changes from 23 commits
d0cd456
delete manifest, manifest list, prev files, stats when drop table wit…
danielhumanmod Sep 22, 2024
26e03ac
unit test for drop table
danielhumanmod Sep 22, 2024
0f8e8f4
refine warning code
danielhumanmod Sep 22, 2024
1b525de
code format
danielhumanmod Sep 22, 2024
e8b26d2
refine warning code
danielhumanmod Sep 22, 2024
2ee6dee
remove unused code
danielhumanmod Sep 22, 2024
806f46d
remove unused import
danielhumanmod Sep 22, 2024
4f1d3c9
code format
danielhumanmod Sep 23, 2024
0a77bfa
remove additional manifest and manifest list deletion
danielhumanmod Sep 24, 2024
47dc60a
add stat deletion test
danielhumanmod Sep 24, 2024
9d835b3
code format
danielhumanmod Sep 24, 2024
40c6147
add new AsyncTaskType
danielhumanmod Oct 6, 2024
f354d1c
Schedule prev metadata and stat files deletion in seperated tasks
danielhumanmod Oct 6, 2024
88c6651
Table content cleanup task handler
danielhumanmod Oct 6, 2024
af3efab
Unit test for table clean up
danielhumanmod Oct 6, 2024
278ab7e
code format
danielhumanmod Oct 6, 2024
ed30fb0
register task handler
danielhumanmod Oct 7, 2024
05c3dd9
handler table content files in batch
danielhumanmod Oct 7, 2024
49dbe68
adjust unit test after batch processing
danielhumanmod Oct 7, 2024
8eea50d
add unit test for TableContentCleanupTaskHandler
danielhumanmod Oct 7, 2024
d9804e6
code format
danielhumanmod Oct 7, 2024
54511de
Merge branch 'main' into pr-289
danielhumanmod Oct 17, 2024
56ba4f2
Merge branch 'main' into pr-289
danielhumanmod Oct 26, 2024
e92852e
Merge branch 'main' into pr-289
danielhumanmod Nov 4, 2024
27ea1b3
merge cleanup tasks into one
danielhumanmod Nov 4, 2024
4d1b68b
Merge remote-tracking branch 'origin/pr-289' into pr-289
danielhumanmod Nov 4, 2024
eb533d7
code format
danielhumanmod Nov 4, 2024
47f760f
Merge branch 'main' into pr-289
flyrain Nov 9, 2024
988e530
refactor manifest cleanup handler based on comments
danielhumanmod Nov 14, 2024
4965d5c
refactor table cleanup handler based on comments
danielhumanmod Nov 14, 2024
5f81483
add TODO
danielhumanmod Nov 14, 2024
097189c
Merge branch 'pr-289' of https://github.com/danielhumanmod/polaris in…
danielhumanmod Nov 14, 2024
651ece0
Merge branch 'main' into pr-289
danielhumanmod Nov 14, 2024
16bb5fe
renaming
danielhumanmod Nov 14, 2024
d276ae6
split the task type in cleanup task handler
danielhumanmod Nov 20, 2024
187b47e
error handling
danielhumanmod Nov 20, 2024
ba5c47c
Merge branch 'main' into pr-289
danielhumanmod Nov 23, 2024
@@ -23,7 +23,8 @@

public enum AsyncTaskType {
ENTITY_CLEANUP_SCHEDULER(1),
- FILE_CLEANUP(2);
+ FILE_CLEANUP(2),
+ TABLE_CONTENT_CLEANUP(3);
Contributor

The FILE_CLEANUP type was poorly named. We should make it specific, as only the ManifestFileCleanupTaskHandler deals with it. That said, the actual serialized value is only the integer (see https://github.com/collado-mike/polaris/blob/8cb6b44bf57dc597dab612d109d3eb534aef5715/polaris-core/src/main/java/org/apache/polaris/core/entity/AsyncTaskType.java#L34-L37 ), so we can rename it and maintain backward compatibility.

With that in mind, I think we should rename FILE_CLEANUP -> MANIFEST_FILE_CLEANUP and add two enums for the new file types: METADATA_LOG_ENTRY_CLEANUP and STATISTICS_FILE_CLEANUP. Your task handler can look for instances of both types. I think that gives us flexibility in the future if we need to handle the different file types differently.
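The rename suggested above is safe only because tasks are persisted by integer code, not by constant name. A minimal sketch of that idea follows; the enum name `AsyncTaskTypeSketch`, the `fromTypeCode` helper, and the codes 3 and 4 are assumptions made for illustration and are not taken from the Polaris source.

```java
import java.util.Arrays;

// Hypothetical sketch: constant names follow the review suggestion,
// the codes 3 and 4 are assumed for illustration.
enum AsyncTaskTypeSketch {
  ENTITY_CLEANUP_SCHEDULER(1),
  MANIFEST_FILE_CLEANUP(2), // formerly FILE_CLEANUP; the persisted code 2 is unchanged
  METADATA_LOG_ENTRY_CLEANUP(3),
  STATISTICS_FILE_CLEANUP(4);

  private final int typeCode;

  AsyncTaskTypeSketch(int typeCode) {
    this.typeCode = typeCode;
  }

  int typeCode() {
    return typeCode;
  }

  // Resolve a persisted integer back to a constant; returns null for unknown codes.
  static AsyncTaskTypeSketch fromTypeCode(int code) {
    return Arrays.stream(values())
        .filter(type -> type.typeCode == code)
        .findFirst()
        .orElse(null);
  }
}
```

Because lookups go through the integer, a task stored with code 2 before the rename still resolves to the renamed constant afterwards.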


private final int typeCode;

@@ -109,6 +109,7 @@
import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
import org.apache.polaris.service.task.ManifestFileCleanupTaskHandler;
import org.apache.polaris.service.task.TableCleanupTaskHandler;
import org.apache.polaris.service.task.TableContentCleanupTaskHandler;
import org.apache.polaris.service.task.TaskExecutorImpl;
import org.apache.polaris.service.task.TaskFileIOSupplier;
import org.apache.polaris.service.throttling.StreamReadConstraintsExceptionMapper;
@@ -217,6 +218,9 @@ public void run(PolarisApplicationConfig configuration, Environment environment)
taskExecutor.addTaskHandler(
new ManifestFileCleanupTaskHandler(
fileIOSupplier, Executors.newVirtualThreadPerTaskExecutor()));
taskExecutor.addTaskHandler(
new TableContentCleanupTaskHandler(
fileIOSupplier, Executors.newVirtualThreadPerTaskExecutor()));
Contributor

It's worth considering whether these ought to share the same executor service... I haven't put much thought into it, so... we can just leave a comment for now if there's not a strong argument either way.


LOGGER.info(
"Initializing PolarisCallContextCatalogFactory for metaStoreManagerType {}",
@@ -22,7 +22,9 @@
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.io.FileIO;
@@ -49,6 +51,7 @@ public class TableCleanupTaskHandler implements TaskHandler {
private final TaskExecutor taskExecutor;
private final MetaStoreManagerFactory metaStoreManagerFactory;
private final Function<TaskEntity, FileIO> fileIOSupplier;
private static final int BATCH_SIZE = 10;
Contributor Author

I tentatively set BATCH_SIZE to 10. Please let me know if there is a better option.

Contributor

Let's use the PolarisConfigurationStore to control this value
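One way to read this suggestion is to look the batch size up at runtime and fall back to the current constant when no valid value is set. The sketch below uses a plain `Map` as a stand-in for the configuration store; it does not use the real `PolarisConfigurationStore` API, and the class name and the key `TABLE_CONTENT_CLEANUP_BATCH_SIZE` are hypothetical.

```java
import java.util.Map;

// Hypothetical stand-in for a configuration lookup; not the real PolarisConfigurationStore API.
final class BatchSizeConfigSketch {
  static final String BATCH_SIZE_KEY = "TABLE_CONTENT_CLEANUP_BATCH_SIZE"; // assumed key name
  static final int DEFAULT_BATCH_SIZE = 10; // the value hard-coded in this PR

  private BatchSizeConfigSketch() {}

  // Use the configured value when it is a positive number, otherwise the default.
  static int resolveBatchSize(Map<String, Object> config) {
    Object raw = config.get(BATCH_SIZE_KEY);
    if (raw instanceof Number) {
      int value = ((Number) raw).intValue();
      if (value > 0) {
        return value;
      }
    }
    return DEFAULT_BATCH_SIZE;
  }
}
```

Rejecting non-positive values matters here because the batching loop advances by the batch size and would not terminate with a step of zero.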


public TableCleanupTaskHandler(
TaskExecutor taskExecutor,
@@ -158,11 +161,98 @@ public boolean handleTask(TaskEntity cleanupTask) {
for (PolarisBaseEntity createdTask : createdTasks) {
taskExecutor.addTaskHandlerContext(createdTask.getId(), CallContext.getCurrentContext());
}

// Schedule and dispatch prev metadata and stat files in separate tasks
scheduleTableContentCleanupTask(
tableMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file),
CleanupTableContentFileType.PREV_METADATA,
fileIO,
cleanupTask,
metaStoreManager,
polarisCallContext);
scheduleTableContentCleanupTask(
tableMetadata.statisticsFiles().stream().map(StatisticsFile::path),
CleanupTableContentFileType.STATISTICS,
fileIO,
cleanupTask,
metaStoreManager,
polarisCallContext);
Contributor

We could merge these two method calls. How about collecting the file first, then dispatching them into tasks for deletion? We don't have to distinguish file types while collecting them.
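The collect-then-dispatch idea can be sketched on plain strings: concatenate both file streams first, then cut the combined list into batches. The class and method names below are hypothetical, and the existence filtering and task creation done in the PR are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper illustrating the suggestion; not code from the PR.
final class FileBatchingSketch {
  private FileBatchingSketch() {}

  // Merge both streams without tracking file type, then split into fixed-size batches.
  static List<List<String>> collectAndBatch(
      Stream<String> previousMetadataFiles, Stream<String> statisticsFiles, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<String> allFiles =
        Stream.concat(previousMetadataFiles, statisticsFiles).collect(Collectors.toList());
    List<List<String>> batches = new ArrayList<>();
    for (int i = 0; i < allFiles.size(); i += batchSize) {
      // Copy the sublist so each batch is independent of the backing list.
      batches.add(List.copyOf(allFiles.subList(i, Math.min(i + batchSize, allFiles.size()))));
    }
    return batches;
  }
}
```

With this shape a single batch may contain both metadata and statistics files, which is acceptable as long as the handler deletes every path the same way.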


fileIO.deleteFile(tableEntity.getMetadataLocation());

return true;
}
}
return false;
}

Contributor Author (danielhumanmod, Nov 14, 2024)

Extracted the previous manifest task creation into a new method; no new changes were added for lines 152-201.

private void scheduleTableContentCleanupTask(
Stream<String> fileStream,
CleanupTableContentFileType fileType,
FileIO fileIO,
TaskEntity cleanupTask,
PolarisMetaStoreManager metaStoreManager,
PolarisCallContext polarisCallContext) {
PolarisBaseEntity entity = cleanupTask.readData(PolarisBaseEntity.class);
TableLikeEntity tableEntity = TableLikeEntity.of(entity);

List<String> validFiles = fileStream.filter(file -> TaskUtils.exists(file, fileIO)).toList();

for (int i = 0; i < validFiles.size(); i += BATCH_SIZE) {
List<String> fileBatch = validFiles.subList(i, Math.min(i + BATCH_SIZE, validFiles.size()));
String taskName = cleanupTask.getName() + "_batch" + i + "_" + UUID.randomUUID();
LOGGER
.atDebug()
.addKeyValue("taskName", taskName)
.addKeyValue("tableIdentifier", tableEntity.getTableIdentifier())
.addKeyValue("fileBatch", fileBatch.toString())
.log("Queueing task to delete a batch of " + fileType.getTypeName());

TaskEntity batchTask =
new TaskEntity.Builder()
.setName(taskName)
.setId(metaStoreManager.generateNewEntityId(polarisCallContext).getId())
.setCreateTimestamp(polarisCallContext.getClock().millis())
.withTaskType(AsyncTaskType.TABLE_CONTENT_CLEANUP)
.withData(
new TableContentCleanupTaskHandler.TableContentCleanupTask(
tableEntity.getTableIdentifier(), fileBatch))
.setInternalProperties(cleanupTask.getInternalPropertiesAsMap())
.build();

List<PolarisBaseEntity> createdTasks =
metaStoreManager
.createEntitiesIfNotExist(polarisCallContext, null, List.of(batchTask))
.getEntities();

if (createdTasks != null) {
LOGGER
.atInfo()
.addKeyValue("tableIdentifier", tableEntity.getTableIdentifier())
.addKeyValue("taskCount", createdTasks.size())
.addKeyValue("fileBatch", fileBatch.toString())
.log("Successfully queued task to delete a batch of " + fileType.getTypeName() + "s");

for (PolarisBaseEntity createdTask : createdTasks) {
taskExecutor.addTaskHandlerContext(createdTask.getId(), CallContext.getCurrentContext());
}
}
}
}

private enum CleanupTableContentFileType {
PREV_METADATA("previous metadata file"),
STATISTICS("statistics file"),
;

private final String typeName;

CleanupTableContentFileType(String typeName) {
this.typeName = typeName;
}

public String getTypeName() {
return typeName;
}
}
}
@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.task;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.polaris.core.entity.AsyncTaskType;
import org.apache.polaris.core.entity.TaskEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@link TaskHandler} responsible for deleting previous metadata and statistics files of a table.
*/
public class TableContentCleanupTaskHandler implements TaskHandler {
Contributor

I'd rename this to something like BatchFileCleanupTaskHandler and make the javadoc more generic. We don't really need to know what kind of files these are, as we treat all of them the same (unlike the ManifestFileCleanupTaskHandler, which has to read the manifests).

Contributor

This new class is largely the same as ManifestFileCleanupTaskHandler. Can we try to merge the two?

Contributor

Oof, sorry for missing this comment - I don't think we should have merged these two classes. At most, I think a common base class would be fine, but I'd prefer to avoid overloading the same class and cluttering it with if/else statements
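The common-base-class alternative mentioned here could look roughly like the sketch below: the base class owns the shared retry loop, and each subclass only decides which files a task expands to. All names are hypothetical, the payload is simplified to a comma-separated string, and deletion is synchronous, unlike the `CompletableFuture`-based retry in the PR.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical base class: shared deletion and retry logic lives here.
abstract class FileCleanupHandlerSketch {
  static final int MAX_ATTEMPTS = 3;

  // Subclasses decide which paths a task payload expands to.
  protected abstract List<String> filesToDelete(String taskPayload);

  // Returns true only if every file was deleted within MAX_ATTEMPTS tries.
  final boolean handle(String taskPayload, Consumer<String> deleter) {
    boolean allDeleted = true;
    for (String path : filesToDelete(taskPayload)) {
      allDeleted &= deleteWithRetry(path, deleter);
    }
    return allDeleted;
  }

  private boolean deleteWithRetry(String path, Consumer<String> deleter) {
    for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
      try {
        deleter.accept(path);
        return true;
      } catch (RuntimeException e) {
        // Swallow the failure and retry until MAX_ATTEMPTS is reached.
      }
    }
    return false;
  }
}

// Batch handler: the payload is assumed to be a comma-separated list of paths.
final class BatchFileCleanupSketch extends FileCleanupHandlerSketch {
  @Override
  protected List<String> filesToDelete(String taskPayload) {
    return taskPayload.isEmpty() ? List.of() : List.of(taskPayload.split(","));
  }
}
```

A manifest handler would be a second subclass whose `filesToDelete` reads the manifest first, which keeps the type-specific logic out of the shared class and avoids if/else branches on task type.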

public static final int MAX_ATTEMPTS = 3;
public static final int FILE_DELETION_RETRY_MILLIS = 100;
private static final Logger LOGGER =
LoggerFactory.getLogger(TableContentCleanupTaskHandler.class);
private final Function<TaskEntity, FileIO> fileIOSupplier;
private final ExecutorService executorService;

public TableContentCleanupTaskHandler(
Function<TaskEntity, FileIO> fileIOSupplier, ExecutorService executorService) {
this.fileIOSupplier = fileIOSupplier;
this.executorService = executorService;
}

@Override
public boolean canHandleTask(TaskEntity task) {
return task.getTaskType() == AsyncTaskType.TABLE_CONTENT_CLEANUP;
}

@Override
public boolean handleTask(TaskEntity task) {
TableContentCleanupTask cleanupTask = task.readData(TableContentCleanupTask.class);
List<String> fileBatch = cleanupTask.getFileBatch();
TableIdentifier tableId = cleanupTask.getTableId();
try (FileIO authorizedFileIO = fileIOSupplier.apply(task)) {
List<String> validFiles =
fileBatch.stream().filter(file -> TaskUtils.exists(file, authorizedFileIO)).toList();
if (validFiles.isEmpty()) {
LOGGER
.atWarn()
.addKeyValue("taskName", task.getName())
.addKeyValue("fileBatch", fileBatch.toString())
.addKeyValue("tableId", tableId)
.log("Table content cleanup task scheduled, but none of the files in the batch exist");
return true;
}

// Schedule the deletion for each file asynchronously
List<CompletableFuture<Void>> deleteFutures =
validFiles.stream()
.map(file -> tryDelete(tableId, authorizedFileIO, file, null, 1))
.toList();

// Wait for all delete operations to finish
CompletableFuture<Void> allDeletes =
CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0]));
allDeletes.join();

LOGGER
.atInfo()
.addKeyValue("taskName", task.getName())
.addKeyValue("fileBatch", fileBatch.toString())
.addKeyValue("tableId", tableId)
.log("All the files in task have been deleted");

return true;
} catch (Exception e) {
LOGGER.error("Error during table content cleanup for file batch {}", fileBatch.toString(), e);
return false;
}
}

private CompletableFuture<Void> tryDelete(
TableIdentifier tableId, FileIO fileIO, String filePath, Throwable e, int attempt) {
if (e != null && attempt <= MAX_ATTEMPTS) {
LOGGER
.atWarn()
.addKeyValue("filePath", filePath)
.addKeyValue("attempt", attempt)
.addKeyValue("error", e.getMessage())
.log("Error encountered attempting to delete file");
}

if (attempt > MAX_ATTEMPTS && e != null) {
return CompletableFuture.failedFuture(e);
}

return CompletableFuture.runAsync(
() -> {
if (TaskUtils.exists(filePath, fileIO)) {
fileIO.deleteFile(filePath);
LOGGER
.atInfo()
.addKeyValue("filePath", filePath)
.addKeyValue("tableId", tableId)
.addKeyValue("attempt", attempt)
.log("Successfully deleted file {}", filePath);
} else {
LOGGER
.atInfo()
.addKeyValue("filePath", filePath)
.addKeyValue("tableId", tableId)
.log("File doesn't exist, likely already deleted");
}
},
executorService)
.exceptionallyComposeAsync(
newEx -> {
LOGGER
.atWarn()
.addKeyValue("filePath", filePath)
.addKeyValue("tableId", tableId)
.log("Exception caught deleting table content file", newEx);
return tryDelete(tableId, fileIO, filePath, newEx, attempt + 1);
},
CompletableFuture.delayedExecutor(
FILE_DELETION_RETRY_MILLIS, TimeUnit.MILLISECONDS, executorService));
}

public static final class TableContentCleanupTask {
private TableIdentifier tableId;
private List<String> fileBatch;

public TableContentCleanupTask() {}

public TableContentCleanupTask(TableIdentifier tableId, List<String> fileBatch) {
this.tableId = tableId;
this.fileBatch = fileBatch;
}

public TableIdentifier getTableId() {
return tableId;
}

public void setTableId(TableIdentifier tableId) {
this.tableId = tableId;
}

public List<String> getFileBatch() {
return fileBatch;
}

public void setFileBatch(List<String> fileBatch) {
this.fileBatch = fileBatch;
}

@Override
public boolean equals(Object object) {
if (this == object) {
return true;
}
if (!(object instanceof TableContentCleanupTask other)) {
return false;
}
return Objects.equals(tableId, other.tableId) && Objects.equals(fileBatch, other.fileBatch);
}

@Override
public int hashCode() {
return Objects.hash(tableId, fileBatch);
}
}
}