Async Deletion of Previous Metadata and Statistics Files #312
The test cases in this file primarily verify these scenarios.
@@ -21,15 +21,30 @@
 import static org.assertj.core.api.Assertions.assertThat;

 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
 import org.apache.commons.codec.binary.Base64;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.inmemory.InMemoryFileIO;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
 import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.PolarisDefaultDiagServiceImpl;
 import org.apache.polaris.core.context.CallContext;
@@ -43,6 +58,7 @@
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.slf4j.LoggerFactory;
@@ -64,7 +80,7 @@ public void testTableCleanup() throws IOException {
             new PolarisDefaultDiagServiceImpl());
     try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) {
       CallContext.setCurrentContext(callCtx);
-      FileIO fileIO = new InMemoryFileIO();
+      FileIO fileIO = createMockFileIO(new InMemoryFileIO());
       TableIdentifier tableIdentifier =
           TableIdentifier.of(Namespace.of("db1", "schema1"), "table1");
       TableCleanupTaskHandler handler =
@@ -76,7 +92,13 @@ public void testTableCleanup() throws IOException {
       TestSnapshot snapshot =
           TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1, snapshotId, 99L, manifestFile);
       String metadataFile = "v1-49494949.metadata.json";
-      TaskTestUtils.writeTableMetadata(fileIO, metadataFile, snapshot);
+      StatisticsFile statisticsFile =
+          writeStatsFile(
+              snapshot.snapshotId(),
+              snapshot.sequenceNumber(),
+              "/metadata/" + UUID.randomUUID() + ".stats",
+              fileIO);
+      TaskTestUtils.writeTableMetadata(fileIO, metadataFile, List.of(statisticsFile), snapshot);

       TaskEntity task =
           new TaskEntity.Builder()
@@ -91,6 +113,15 @@ public void testTableCleanup() throws IOException {
               .build();
       Assertions.assertThatPredicate(handler::canHandleTask).accepts(task);

+      PolarisBaseEntity baseEntity = task.readData(PolarisBaseEntity.class);
+      TableLikeEntity tableEntity = TableLikeEntity.of(baseEntity);
+      TableMetadata tableMetadata =
+          TableMetadataParser.read(fileIO, tableEntity.getMetadataLocation());
+      Set<String> metadataLocations = metadataLocations(tableMetadata);
+      Set<String> statsLocation = statsLocations(tableMetadata);
+      assertThat(metadataLocations).hasSize(1);
+      assertThat(statsLocation).hasSize(1);
+
       CallContext.setCurrentContext(CallContext.of(realmContext, polarisCallContext));
       handler.handleTask(task);
@@ -113,6 +144,18 @@ public void testTableCleanup() throws IOException {
                   entity ->
                       entity.readData(
                           ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)));

+      ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
+      Mockito.verify(fileIO, Mockito.times(metadataLocations.size() + statsLocation.size()))
+          .deleteFile(argumentCaptor.capture());
+
+      List<String> deletedPaths = argumentCaptor.getAllValues();
+      assertThat(deletedPaths)
+          .as("should contain all created metadata locations")
+          .containsAll(metadataLocations);
+      assertThat(deletedPaths)
+          .as("should contain all created stats locations")
+          .containsAll(statsLocation);
     }
   }
@@ -275,7 +318,7 @@ public void testTableCleanupMultipleSnapshots() throws IOException {
             new PolarisDefaultDiagServiceImpl());
     try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) {
       CallContext.setCurrentContext(callCtx);
-      FileIO fileIO = new InMemoryFileIO();
+      FileIO fileIO = createMockFileIO(new InMemoryFileIO());
       TableIdentifier tableIdentifier =
           TableIdentifier.of(Namespace.of("db1", "schema1"), "table1");
       TableCleanupTaskHandler handler =
@@ -303,7 +346,20 @@ public void testTableCleanupMultipleSnapshots() throws IOException {
               manifestFile1,
               manifestFile3); // exclude manifest2 from the new snapshot
       String metadataFile = "v1-295495059.metadata.json";
-      TaskTestUtils.writeTableMetadata(fileIO, metadataFile, snapshot, snapshot2);
+      StatisticsFile statisticsFile1 =
+          writeStatsFile(
+              snapshot.snapshotId(),
+              snapshot.sequenceNumber(),
+              "/metadata/" + UUID.randomUUID() + ".stats",
+              fileIO);
+      StatisticsFile statisticsFile2 =
+          writeStatsFile(
+              snapshot2.snapshotId(),
+              snapshot2.sequenceNumber(),
+              "/metadata/" + UUID.randomUUID() + ".stats",
+              fileIO);
+      TaskTestUtils.writeTableMetadata(
+          fileIO, metadataFile, List.of(statisticsFile1, statisticsFile2), snapshot, snapshot2);

       TaskEntity task =
           new TaskEntity.Builder()
@@ -318,6 +374,16 @@ public void testTableCleanupMultipleSnapshots() throws IOException {
       Assertions.assertThatPredicate(handler::canHandleTask).accepts(task);

       CallContext.setCurrentContext(CallContext.of(realmContext, polarisCallContext));
+
+      PolarisBaseEntity baseEntity = task.readData(PolarisBaseEntity.class);
+      TableLikeEntity tableEntity = TableLikeEntity.of(baseEntity);
+      TableMetadata tableMetadata =
+          TableMetadataParser.read(fileIO, tableEntity.getMetadataLocation());
+      Set<String> metadataLocations = metadataLocations(tableMetadata);
+      Set<String> statsLocations = statsLocations(tableMetadata);
+      assertThat(metadataLocations).hasSize(1);
+      assertThat(statsLocations).hasSize(2);
+
       handler.handleTask(task);

       assertThat(
@@ -362,6 +428,76 @@ public void testTableCleanupMultipleSnapshots() throws IOException {
                   entity ->
                       entity.readData(
                           ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)));

+      ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
+      Mockito.verify(fileIO, Mockito.times(metadataLocations.size() + statsLocations.size()))
+          .deleteFile(argumentCaptor.capture());
+
+      List<String> deletedPaths = argumentCaptor.getAllValues();
+      assertThat(deletedPaths)
+          .as("should contain all created metadata locations")
+          .containsAll(metadataLocations);
+      assertThat(deletedPaths)
+          .as("should contain all created stats locations")
+          .containsAll(statsLocations);
     }
   }

+  private FileIO createMockFileIO(InMemoryFileIO wrapped) {
+    InMemoryFileIO mockIO = Mockito.mock(InMemoryFileIO.class);
+
+    Mockito.when(mockIO.newInputFile(Mockito.anyString()))
+        .thenAnswer(invocation -> wrapped.newInputFile((String) invocation.getArgument(0)));
+    Mockito.when(mockIO.newInputFile(Mockito.anyString(), Mockito.anyLong()))
+        .thenAnswer(
+            invocation ->
+                wrapped.newInputFile(invocation.getArgument(0), invocation.getArgument(1)));
+    Mockito.when(mockIO.newInputFile(Mockito.any(ManifestFile.class)))
+        .thenAnswer(invocation -> wrapped.newInputFile((ManifestFile) invocation.getArgument(0)));
+    Mockito.when(mockIO.newInputFile(Mockito.any(DataFile.class)))
+        .thenAnswer(invocation -> wrapped.newInputFile((DataFile) invocation.getArgument(0)));
+    Mockito.when(mockIO.newInputFile(Mockito.any(DeleteFile.class)))
+        .thenAnswer(invocation -> wrapped.newInputFile((DeleteFile) invocation.getArgument(0)));
+    Mockito.when(mockIO.newOutputFile(Mockito.anyString()))
+        .thenAnswer(invocation -> wrapped.newOutputFile(invocation.getArgument(0)));
+
+    return mockIO;
+  }
+
+  private Set<String> metadataLocations(TableMetadata tableMetadata) {
+    Set<String> metadataLocations =
+        tableMetadata.previousFiles().stream()
+            .map(TableMetadata.MetadataLogEntry::file)
+            .collect(Collectors.toSet());
+    metadataLocations.add(tableMetadata.metadataFileLocation());
+    return metadataLocations;
+  }
+
+  private Set<String> statsLocations(TableMetadata tableMetadata) {
+    return tableMetadata.statisticsFiles().stream()
+        .map(StatisticsFile::path)
+        .collect(Collectors.toSet());
+  }
+
+  private StatisticsFile writeStatsFile(
+      long snapshotId, long snapshotSequenceNumber, String statsLocation, FileIO fileIO)
+      throws IOException {
+    try (PuffinWriter puffinWriter = Puffin.write(fileIO.newOutputFile(statsLocation)).build()) {
+      puffinWriter.add(
+          new Blob(
+              "some-blob-type",
+              List.of(1),
+              snapshotId,
+              snapshotSequenceNumber,
+              ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
+      puffinWriter.finish();
+
+      return new GenericStatisticsFile(
+          snapshotId,
+          statsLocation,
+          puffinWriter.fileSize(),
+          puffinWriter.footerSize(),
+          puffinWriter.writtenBlobsMetadata().stream().map(GenericBlobMetadata::from).toList());
+    }
+  }
 }
The changes in this file primarily attach the previous metadata and statistics files when writing table metadata.
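For readers unfamiliar with that helper, here is a hypothetical sketch of attaching statistics files while writing table metadata via public Iceberg APIs. The class name, schema, and table location are illustrative; the real `TaskTestUtils.writeTableMetadata` also registers the snapshots, and builder method signatures can differ across Iceberg versions.

```java
// Illustrative sketch only, not the PR's actual TaskTestUtils code.
import java.util.List;
import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.types.Types;

final class MetadataWithStats {
  static void write(FileIO fileIO, String metadataFile, List<StatisticsFile> statsFiles) {
    // Minimal one-column schema on an unpartitioned table, just for the sketch.
    Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
    TableMetadata.Builder builder =
        TableMetadata.buildFrom(
            TableMetadata.newTableMetadata(
                schema, PartitionSpec.unpartitioned(), "/tmp/table", Map.of()));
    for (StatisticsFile stats : statsFiles) {
      // Associate each statistics file with the snapshot it was computed for.
      builder.setStatistics(stats.snapshotId(), stats);
    }
    TableMetadataParser.write(builder.build(), fileIO.newOutputFile(metadataFile));
  }
}
```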
We should either add retries for these or submit them as separate tasks. As is, if one of these files fails to delete, we'll retry the whole task and resubmit a new task for each manifest. If the manifests are already deleted when we retry, we'll get stuck in a retry loop.
Thanks for the suggestion, Michael! I could refactor the deletion process by adding a retry strategy, similar to the `tryDelete` in `ManifestFileCleanupTaskHandler`.

Personally, I think using a retry mechanism here might be more effective than creating separate tasks. With separate tasks, there's still a risk of issues like task creation failures, which could result in skipping an entire task (and the multiple files it contains). By retrying within `ManifestFileCleanupTaskHandler`, we can handle failures at the file level, so each file is retried independently. If one file's deletion fails, we can retry just that file without needing to resubmit or skip the other files. This approach offers more granular control over failure handling.

I'm open to your thoughts on this! Does this seem aligned with what you were suggesting, or do you see advantages in the separate-task approach that I might be overlooking?
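For concreteness, a minimal sketch of that per-file retry idea, in the spirit of (but not copied from) `tryDelete`; the class name, retry count, and lack of backoff are assumptions:

```java
// Illustrative only: a bounded per-file retry around FileIO#deleteFile.
import org.apache.iceberg.io.FileIO;

final class FileDeleteRetry {
  private static final int MAX_ATTEMPTS = 3; // assumed policy, not from the PR

  static boolean tryDelete(FileIO fileIO, String path) {
    for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
      try {
        fileIO.deleteFile(path);
        return true; // this file is done; failures elsewhere don't affect it
      } catch (RuntimeException e) {
        if (attempt == MAX_ATTEMPTS) {
          return false; // give up; caller decides whether to resubmit just this file
        }
        // otherwise fall through and retry this one file
      }
    }
    return false; // unreachable, but required by the compiler
  }
}
```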
Sorry, do you mean adding retries within `TableCleanupTaskHandler.java`? If so, that's fine with me. I'd at least use an Executor to attempt the deletions in parallel (i.e., so file2 isn't blocked during the delay between retries for file1).

TBH, I'm not very familiar with how Iceberg generates the statistics files. Does there tend to be one per snapshot? One per data file? If the latter, we could be talking about a very large number of files, and in that case I think submitting separate tasks with batches of files makes sense.
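A minimal sketch of the Executor idea, reusing the hypothetical `FileDeleteRetry` helper from above; the pool size and fire-then-wait structure are assumptions:

```java
// Illustrative only: delete files in parallel so one file's retry delay
// doesn't block the others.
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.iceberg.io.FileIO;

final class ParallelDeletes {
  static void deleteAll(FileIO fileIO, List<String> paths) {
    ExecutorService executor = Executors.newFixedThreadPool(4); // pool size is arbitrary
    try {
      CompletableFuture<?>[] deletions =
          paths.stream()
              .map(
                  path ->
                      CompletableFuture.runAsync(
                          () -> FileDeleteRetry.tryDelete(fileIO, path), executor))
              .toArray(CompletableFuture[]::new);
      CompletableFuture.allOf(deletions).join(); // wait for every file's attempts
    } finally {
      executor.shutdown();
    }
  }
}
```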
They are per snapshot: https://iceberg.apache.org/spec/#table-statistics.
Is there only one statistics file per snapshot? The spec is not clear on that, unlike for the partition statistics file, where it is very clear.
There could be multiple statistics files, as the following code shows:
https://github.com/apache/iceberg/blob/5439cbdb278232779fdd9a392bbf57f007f9bda0/core/src/main/java/org/apache/iceberg/TableMetadata.java#L535-L535
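As an illustration of that point, `TableMetadata` exposes a flat `List<StatisticsFile>`, so grouping by snapshot ID can in principle yield more than one file per snapshot (the grouping helper below is mine, not from the linked code):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.TableMetadata;

final class StatsInspection {
  // Group the table's statistics files by the snapshot they belong to; the
  // List values show that multiple files per snapshot are representable.
  static Map<Long, List<StatisticsFile>> statsBySnapshot(TableMetadata metadata) {
    return metadata.statisticsFiles().stream()
        .collect(Collectors.groupingBy(StatisticsFile::snapshotId));
  }
}
```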
Thank you for providing the context, @flyrain :)

@collado-mike Yes, if we go for retries, the logic will be within `TableCleanupTaskHandler.java`. But considering the information provided by Yufei, maybe separate tasks would be a more appropriate approach for stats file deletion?