Async Deletion of Previous Metadata and Statistics Files #312

Merged · merged 37 commits · Dec 6, 2024

Changes from 11 commits

Commits (37)
d0cd456
delete manifest, manifest list, prev files, stats when drop table wit…
danielhumanmod Sep 22, 2024
26e03ac
unit test for drop table
danielhumanmod Sep 22, 2024
0f8e8f4
refine warning code
danielhumanmod Sep 22, 2024
1b525de
code format
danielhumanmod Sep 22, 2024
e8b26d2
refine warning code
danielhumanmod Sep 22, 2024
2ee6dee
remove unused code
danielhumanmod Sep 22, 2024
806f46d
remove unused import
danielhumanmod Sep 22, 2024
4f1d3c9
code format
danielhumanmod Sep 23, 2024
0a77bfa
remove additional manifest and manifest list deletion
danielhumanmod Sep 24, 2024
47dc60a
add stat deletion test
danielhumanmod Sep 24, 2024
9d835b3
code format
danielhumanmod Sep 24, 2024
40c6147
add new AsyncTaskType
danielhumanmod Oct 6, 2024
f354d1c
Schedule prev metadata and stat files deletion in seperated tasks
danielhumanmod Oct 6, 2024
88c6651
Table content cleanup task handler
danielhumanmod Oct 6, 2024
af3efab
Unit test for table clean up
danielhumanmod Oct 6, 2024
278ab7e
code format
danielhumanmod Oct 6, 2024
ed30fb0
register task handler
danielhumanmod Oct 7, 2024
05c3dd9
handler table content files in batch
danielhumanmod Oct 7, 2024
49dbe68
adjust unit test after batch processing
danielhumanmod Oct 7, 2024
8eea50d
add unit test for TableContentCleanupTaskHandler
danielhumanmod Oct 7, 2024
d9804e6
code format
danielhumanmod Oct 7, 2024
54511de
Merge branch 'main' into pr-289
danielhumanmod Oct 17, 2024
56ba4f2
Merge branch 'main' into pr-289
danielhumanmod Oct 26, 2024
e92852e
Merge branch 'main' into pr-289
danielhumanmod Nov 4, 2024
27ea1b3
merge cleanup tasks into one
danielhumanmod Nov 4, 2024
4d1b68b
Merge remote-tracking branch 'origin/pr-289' into pr-289
danielhumanmod Nov 4, 2024
eb533d7
code format
danielhumanmod Nov 4, 2024
47f760f
Merge branch 'main' into pr-289
flyrain Nov 9, 2024
988e530
refactor manifest cleanup handler based on comments
danielhumanmod Nov 14, 2024
4965d5c
refactor table cleanup handler based on comments
danielhumanmod Nov 14, 2024
5f81483
add TODO
danielhumanmod Nov 14, 2024
097189c
Merge branch 'pr-289' of https://github.com/danielhumanmod/polaris in…
danielhumanmod Nov 14, 2024
651ece0
Merge branch 'main' into pr-289
danielhumanmod Nov 14, 2024
16bb5fe
renaming
danielhumanmod Nov 14, 2024
d276ae6
split the task type in cleanup task handler
danielhumanmod Nov 20, 2024
187b47e
error handling
danielhumanmod Nov 20, 2024
ba5c47c
Merge branch 'main' into pr-289
danielhumanmod Nov 23, 2024
@@ -23,6 +23,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.io.FileIO;
@@ -158,6 +159,13 @@ public boolean handleTask(TaskEntity cleanupTask) {
for (PolarisBaseEntity createdTask : createdTasks) {
taskExecutor.addTaskHandlerContext(createdTask.getId(), CallContext.getCurrentContext());
}

+    tableMetadata.previousFiles().stream()
+        .map(TableMetadata.MetadataLogEntry::file)
+        .forEach(fileIO::deleteFile);
+    tableMetadata.statisticsFiles().stream()
+        .map(StatisticsFile::path)
+        .forEach(fileIO::deleteFile);
Contributor

We should either add retries for these or submit them as separate tasks. As is, if one of these files fails to delete, we'll retry the whole task and resubmit a new task for each manifest. If the manifests are already deleted when we retry, we'll get stuck in a retry loop.

Contributor Author — @danielhumanmod, Sep 26, 2024

Thanks for the suggestion, Michael! I could refactor the deletion process by adding a retry strategy, similar to the tryDelete in ManifestFileCleanupTaskHandler.

Personally, I think using a retry mechanism here might be more effective than creating separate tasks. With separate tasks, there's still a risk of issues like task creation failures, which could result in skipping the entire task (which contains multiple files). By using retries within the ManifestFileCleanupTaskHandler, we can manage failure handling at the file level, ensuring that each file is retried independently. This way, if a file deletion fails, we can retry just that file without needing to resubmit or skip the other files. This approach offers more granular control over handling failures.

I'm open to your thoughts on this! Does this seem aligned with what you were suggesting, or do you see potential advantages in the separate-task approach that I might be overlooking?
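As a minimal sketch of the file-level retry idea (the class name, retry limit, and exception handling here are illustrative assumptions, not the actual tryDelete in ManifestFileCleanupTaskHandler):

```java
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.io.FileIO;

// Hypothetical per-file retry helper; MAX_ATTEMPTS and the class name are illustrative.
final class FileDeleteRetry {
  private static final int MAX_ATTEMPTS = 3;

  static boolean tryDelete(FileIO fileIO, String path) {
    for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
      try {
        fileIO.deleteFile(path);
        return true;
      } catch (NotFoundException e) {
        // Already gone (e.g., deleted by a previous attempt of this task):
        // treat as success so the task cannot get stuck in a retry loop.
        return true;
      } catch (RuntimeException e) {
        // Transient failure: retry only this file; the other files are unaffected.
      }
    }
    return false; // retries exhausted for this one file
  }
}
```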

Contributor

Sorry, do you mean adding retry within the TableCleanupTaskHandler.java? If so, that's fine with me. I'd at least use an Executor to attempt the execution in parallel (i.e., so file2 isn't blocked during the delay between retries for file1).

TBH, I'm not very familiar with how Iceberg generates the statistic files. Does there tend to be one per snapshot? one per data file? If the latter, we could be talking about a very large number of files. If that's the case, I think submitting separate tasks with batches of files makes sense.
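A rough sketch of that Executor suggestion, assuming a fixed-size pool and CompletableFuture wiring (both illustrative) and reusing the hypothetical tryDelete helper above, so one file's retry delay never blocks the others:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.iceberg.io.FileIO;

// Hypothetical parallel-deletion helper; the pool size is illustrative.
final class ParallelDeleter {
  static void deleteAll(FileIO fileIO, List<String> paths) {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
      CompletableFuture<?>[] futures =
          paths.stream()
              // Each file is deleted (and retried) independently on the pool.
              .map(p -> CompletableFuture.runAsync(
                      () -> FileDeleteRetry.tryDelete(fileIO, p), executor))
              .toArray(CompletableFuture[]::new);
      CompletableFuture.allOf(futures).join();
    } finally {
      executor.shutdown();
    }
  }
}
```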


Contributor

Is there only one statistics file per snapshot? The spec is not clear:

> A table can contain many statistics files associated with different table snapshots.

Unlike the partition statistics file, which is very clear:

> Each table snapshot may be associated with at most one partition statistics file.


Contributor Author

> They are per snapshot, https://iceberg.apache.org/spec/#table-statistics.
>
> Could be multiple statistics files, as the following code shows:
>
> https://github.com/apache/iceberg/blob/5439cbdb278232779fdd9a392bbf57f007f9bda0/core/src/main/java/org/apache/iceberg/TableMetadata.java#L535-L535

Thank you for providing the context, @flyrain :)

> Sorry, do you mean adding retry within the TableCleanupTaskHandler.java? If so, that's fine with me. I'd at least use an Executor to attempt the execution in parallel (i.e., so file2 isn't blocked during the delay between retries for file1).
>
> TBH, I'm not very familiar with how Iceberg generates the statistic files. Does there tend to be one per snapshot? one per data file? If the latter, we could be talking about a very large number of files. If that's the case, I think submitting separate tasks with batches of files makes sense.

@collado-mike Yes, if we go for retries, the logic will be within TableCleanupTaskHandler.java. But considering the information provided by Yufei, maybe separate tasks would be a more appropriate approach for stats file deletion?
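If the separate-task route wins out, a batching sketch could look like this (BATCH_SIZE and the scheduling callback are assumptions; the real handler would persist Polaris TaskEntity records rather than accept a generic callback):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical batching helper: schedule one cleanup task per fixed-size batch of paths.
final class BatchTaskPlanner {
  private static final int BATCH_SIZE = 10; // illustrative

  static void scheduleInBatches(List<String> files, Consumer<List<String>> scheduleTask) {
    for (int start = 0; start < files.size(); start += BATCH_SIZE) {
      int end = Math.min(start + BATCH_SIZE, files.size());
      // Copy the slice so each task owns an independent, serializable list.
      scheduleTask.accept(new ArrayList<>(files.subList(start, end)));
    }
  }
}
```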

fileIO.deleteFile(tableEntity.getMetadataLocation());

return true;
Contributor Author

The test cases in this file primarily verify these scenarios:

  1. Cleanup of a single metadata file with a single snapshot
  2. Cleanup of a single metadata file across multiple snapshots
  3. Cleanup of multiple metadata files across multiple snapshots (New)

@@ -21,15 +21,30 @@
import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.codec.binary.Base64;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.inmemory.InMemoryFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDefaultDiagServiceImpl;
import org.apache.polaris.core.context.CallContext;
@@ -43,6 +58,7 @@
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.slf4j.LoggerFactory;

@@ -64,7 +80,7 @@ public void testTableCleanup() throws IOException {
new PolarisDefaultDiagServiceImpl());
try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) {
CallContext.setCurrentContext(callCtx);
-      FileIO fileIO = new InMemoryFileIO();
+      FileIO fileIO = createMockFileIO(new InMemoryFileIO());
TableIdentifier tableIdentifier =
TableIdentifier.of(Namespace.of("db1", "schema1"), "table1");
TableCleanupTaskHandler handler =
@@ -76,7 +92,13 @@
TestSnapshot snapshot =
TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1, snapshotId, 99L, manifestFile);
String metadataFile = "v1-49494949.metadata.json";
-      TaskTestUtils.writeTableMetadata(fileIO, metadataFile, snapshot);
Contributor Author
  1. Cleanup of a single metadata file with a single snapshot

+      StatisticsFile statisticsFile =
+          writeStatsFile(
+              snapshot.snapshotId(),
+              snapshot.sequenceNumber(),
+              "/metadata/" + UUID.randomUUID() + ".stats",
+              fileIO);
+      TaskTestUtils.writeTableMetadata(fileIO, metadataFile, List.of(statisticsFile), snapshot);

TaskEntity task =
new TaskEntity.Builder()
@@ -91,6 +113,15 @@
.build();
Assertions.assertThatPredicate(handler::canHandleTask).accepts(task);

PolarisBaseEntity baseEntity = task.readData(PolarisBaseEntity.class);
TableLikeEntity tableEntity = TableLikeEntity.of(baseEntity);
TableMetadata tableMetadata =
TableMetadataParser.read(fileIO, tableEntity.getMetadataLocation());
Set<String> metadataLocations = metadataLocations(tableMetadata);
Set<String> statsLocation = statsLocations(tableMetadata);
assertThat(metadataLocations).hasSize(1);
assertThat(statsLocation).hasSize(1);

CallContext.setCurrentContext(CallContext.of(realmContext, polarisCallContext));
handler.handleTask(task);

@@ -113,6 +144,18 @@
entity ->
entity.readData(
ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)));

ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
Mockito.verify(fileIO, Mockito.times(metadataLocations.size() + statsLocation.size()))
.deleteFile(argumentCaptor.capture());

List<String> deletedPaths = argumentCaptor.getAllValues();
assertThat(deletedPaths)
.as("should contain all created metadata locations")
.containsAll(metadataLocations);
assertThat(deletedPaths)
.as("should contain all created stats locations")
.containsAll(statsLocation);
}
}

@@ -275,7 +318,7 @@ public void testTableCleanupMultipleSnapshots() throws IOException {
new PolarisDefaultDiagServiceImpl());
try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) {
CallContext.setCurrentContext(callCtx);
-      FileIO fileIO = new InMemoryFileIO();
+      FileIO fileIO = createMockFileIO(new InMemoryFileIO());
TableIdentifier tableIdentifier =
TableIdentifier.of(Namespace.of("db1", "schema1"), "table1");
TableCleanupTaskHandler handler =
@@ -303,7 +346,20 @@
manifestFile1,
manifestFile3); // exclude manifest2 from the new snapshot
String metadataFile = "v1-295495059.metadata.json";
-      TaskTestUtils.writeTableMetadata(fileIO, metadataFile, snapshot, snapshot2);
Contributor Author
  1. Cleanup of a single metadata file across multiple snapshots

+      StatisticsFile statisticsFile1 =
+          writeStatsFile(
+              snapshot.snapshotId(),
+              snapshot.sequenceNumber(),
+              "/metadata/" + UUID.randomUUID() + ".stats",
+              fileIO);
+      StatisticsFile statisticsFile2 =
+          writeStatsFile(
+              snapshot2.snapshotId(),
+              snapshot2.sequenceNumber(),
+              "/metadata/" + UUID.randomUUID() + ".stats",
+              fileIO);
+      TaskTestUtils.writeTableMetadata(
+          fileIO, metadataFile, List.of(statisticsFile1, statisticsFile2), snapshot, snapshot2);

TaskEntity task =
new TaskEntity.Builder()
@@ -318,6 +374,16 @@
Assertions.assertThatPredicate(handler::canHandleTask).accepts(task);

CallContext.setCurrentContext(CallContext.of(realmContext, polarisCallContext));

PolarisBaseEntity baseEntity = task.readData(PolarisBaseEntity.class);
TableLikeEntity tableEntity = TableLikeEntity.of(baseEntity);
TableMetadata tableMetadata =
TableMetadataParser.read(fileIO, tableEntity.getMetadataLocation());
Set<String> metadataLocations = metadataLocations(tableMetadata);
Set<String> statsLocations = statsLocations(tableMetadata);
assertThat(metadataLocations).hasSize(1);
assertThat(statsLocations).hasSize(2);

handler.handleTask(task);

assertThat(
@@ -362,6 +428,76 @@
entity ->
entity.readData(
ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)));

ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
Mockito.verify(fileIO, Mockito.times(metadataLocations.size() + statsLocations.size()))
.deleteFile(argumentCaptor.capture());

List<String> deletedPaths = argumentCaptor.getAllValues();
assertThat(deletedPaths)
.as("should contain all created metadata locations")
.containsAll(metadataLocations);
assertThat(deletedPaths)
.as("should contain all created stats locations")
.containsAll(statsLocations);
}
}

private FileIO createMockFileIO(InMemoryFileIO wrapped) {
InMemoryFileIO mockIO = Mockito.mock(InMemoryFileIO.class);

Mockito.when(mockIO.newInputFile(Mockito.anyString()))
.thenAnswer(invocation -> wrapped.newInputFile((String) invocation.getArgument(0)));
Mockito.when(mockIO.newInputFile(Mockito.anyString(), Mockito.anyLong()))
.thenAnswer(
invocation ->
wrapped.newInputFile(invocation.getArgument(0), invocation.getArgument(1)));
Mockito.when(mockIO.newInputFile(Mockito.any(ManifestFile.class)))
.thenAnswer(invocation -> wrapped.newInputFile((ManifestFile) invocation.getArgument(0)));
Mockito.when(mockIO.newInputFile(Mockito.any(DataFile.class)))
.thenAnswer(invocation -> wrapped.newInputFile((DataFile) invocation.getArgument(0)));
Mockito.when(mockIO.newInputFile(Mockito.any(DeleteFile.class)))
.thenAnswer(invocation -> wrapped.newInputFile((DeleteFile) invocation.getArgument(0)));
Mockito.when(mockIO.newOutputFile(Mockito.anyString()))
.thenAnswer(invocation -> wrapped.newOutputFile(invocation.getArgument(0)));

return mockIO;
}

private Set<String> metadataLocations(TableMetadata tableMetadata) {
Set<String> metadataLocations =
tableMetadata.previousFiles().stream()
.map(TableMetadata.MetadataLogEntry::file)
.collect(Collectors.toSet());
metadataLocations.add(tableMetadata.metadataFileLocation());
return metadataLocations;
}

private Set<String> statsLocations(TableMetadata tableMetadata) {
return tableMetadata.statisticsFiles().stream()
.map(StatisticsFile::path)
.collect(Collectors.toSet());
}

private StatisticsFile writeStatsFile(
long snapshotId, long snapshotSequenceNumber, String statsLocation, FileIO fileIO)
throws IOException {
try (PuffinWriter puffinWriter = Puffin.write(fileIO.newOutputFile(statsLocation)).build()) {
puffinWriter.add(
new Blob(
"some-blob-type",
List.of(1),
snapshotId,
snapshotSequenceNumber,
ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
puffinWriter.finish();

return new GenericStatisticsFile(
snapshotId,
statsLocation,
puffinWriter.fileSize(),
puffinWriter.footerSize(),
puffinWriter.writtenBlobsMetadata().stream().map(GenericBlobMetadata::from).toList());
}
}
}
Contributor Author — @danielhumanmod, Nov 5, 2024

The changes in this file primarily attach previous metadata and statistics files when writing table metadata.

@@ -33,6 +33,7 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.avro.Avro;
@@ -64,7 +65,16 @@ static ManifestFile manifestFile(

static void writeTableMetadata(FileIO fileIO, String metadataFile, Snapshot... snapshots)
throws IOException {
-    TableMetadata.Builder tmBuidler =
+    writeTableMetadata(fileIO, metadataFile, null, snapshots);
+  }
+
+  static void writeTableMetadata(
+      FileIO fileIO,
+      String metadataFile,
+      List<StatisticsFile> statisticsFiles,
+      Snapshot... snapshots)
+      throws IOException {
+    TableMetadata.Builder tmBuilder =
TableMetadata.buildFromEmpty()
.setLocation("path/to/table")
.addSchema(
@@ -74,10 +84,15 @@ static void writeTableMetadata(FileIO fileIO, String metadataFile, Snapshot... s
.addSortOrder(SortOrder.unsorted())
.assignUUID(UUID.randomUUID().toString())
.addPartitionSpec(PartitionSpec.unpartitioned());

+    int statisticsFileIndex = 0;
     for (Snapshot snapshot : snapshots) {
-      tmBuidler.addSnapshot(snapshot);
+      tmBuilder.addSnapshot(snapshot);
+      if (statisticsFiles != null) {
+        tmBuilder.setStatistics(snapshot.snapshotId(), statisticsFiles.get(statisticsFileIndex++));
+      }
     }
-    TableMetadata tableMetadata = tmBuidler.build();
+    TableMetadata tableMetadata = tmBuilder.build();
PositionOutputStream out = fileIO.newOutputFile(metadataFile).createOrOverwrite();
out.write(TableMetadataParser.toJson(tableMetadata).getBytes(StandardCharsets.UTF_8));
out.close();