Skip to content

Commit

Permalink
Update
Browse files: browse the repository at this point in the history
  • Loading branch information
TQJADE committed Dec 19, 2024
1 parent a53a587 commit 4774774
Showing 1 changed file with 22 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.iceberg.util.StructLikeMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestTemplate;
Expand Down Expand Up @@ -193,7 +194,7 @@ public void testUnpartitioned() throws Exception {
.option(SizeBasedFileRewriter.REWRITE_ALL, "true")
.execute();
List<DeleteFile> newDeleteFiles = deleteFiles(table);
assertThat(newDeleteFiles).as("New delete file(s)").hasSize(1);
assertThat(newDeleteFiles).as("New delete files").hasSize(1);
assertLocallySorted(newDeleteFiles);
assertNotContains(deleteFiles, newDeleteFiles);
checkResult(result, deleteFiles, newDeleteFiles, 1);
Expand Down Expand Up @@ -262,7 +263,8 @@ public void testRewriteFilter() throws Exception {
Expression filter =
Expressions.and(
Expressions.greaterThan("c3", "0"), // should have no effect
Expressions.or(Expressions.equal("c1", 1), Expressions.equal("c1", 2)));
// "C1" should work because Spark defaults case sensitivity to false.
Expressions.or(Expressions.equal("C1", 1), Expressions.equal("C1", 2)));

Result result =
SparkActions.get(spark)
Expand All @@ -273,7 +275,7 @@ public void testRewriteFilter() throws Exception {
.execute();

List<DeleteFile> newDeleteFiles = except(deleteFiles(table), deleteFiles);
assertThat(newDeleteFiles).as("Delete file(s)").hasSize(2);
assertThat(newDeleteFiles).as("Delete files").hasSize(2);

List<DeleteFile> expectedRewrittenFiles =
filterFiles(table, deleteFiles, ImmutableList.of(1), ImmutableList.of(2));
Expand Down Expand Up @@ -324,7 +326,7 @@ public void testRewriteToSmallerTarget() throws Exception {
.option(SizeBasedFileRewriter.TARGET_FILE_SIZE_BYTES, String.valueOf(avgSize / 2))
.execute();
List<DeleteFile> newDeleteFiles = deleteFiles(table);
assertThat(newDeleteFiles).as("New delete file(s)").hasSize(8);
assertThat(newDeleteFiles).as("New delete files").hasSize(8);
assertNotContains(deleteFiles, newDeleteFiles);
assertLocallySorted(newDeleteFiles);
checkResult(result, deleteFiles, newDeleteFiles, 4);
Expand Down Expand Up @@ -378,7 +380,7 @@ public void testRemoveDanglingDeletes() throws Exception {
List<Object[]> actualRecords = records(table);
List<Object[]> actualDeletes = deleteRecords(table);
assertEquals("Rows must match", expectedRecords, actualRecords);
assertThat(actualDeletes).as("no new position deletes").isEmpty();
assertThat(actualDeletes).as("No new position deletes").isEmpty();
}

@TestTemplate
Expand Down Expand Up @@ -411,7 +413,7 @@ public void testSomePartitionsDanglingDeletes() throws Exception {
.option(SizeBasedFileRewriter.REWRITE_ALL, "true")
.execute();
List<DeleteFile> newDeleteFiles = deleteFiles(table);
assertThat(newDeleteFiles).as("New delete file(s)").hasSize(2);
assertThat(newDeleteFiles).as("New delete files").hasSize(2);
assertNotContains(deleteFiles, newDeleteFiles);
assertLocallySorted(newDeleteFiles);
checkResult(result, deleteFiles, newDeleteFiles, 4);
Expand Down Expand Up @@ -467,7 +469,7 @@ public void testRewriteFilterRemoveDangling() throws Exception {
.execute();

List<DeleteFile> newDeleteFiles = except(deleteFiles(table), deleteFiles);
assertThat(newDeleteFiles).as("New delete files").hasSize(0);
assertThat(newDeleteFiles).as("New delete files").isEmpty();

List<DeleteFile> expectedRewrittenFiles =
filterFiles(table, deleteFiles, ImmutableList.of(0), ImmutableList.of(1));
Expand Down Expand Up @@ -522,7 +524,7 @@ public void testPartitionEvolutionAdd() throws Exception {
Stream.concat(unpartitionedDeleteFiles.stream(), partitionedDeleteFiles.stream())
.collect(Collectors.toList());
List<DeleteFile> newDeleteFiles = deleteFiles(table);
assertThat(newDeleteFiles).as("New delete file(s)").hasSize(3);
assertThat(newDeleteFiles).as("New delete files").hasSize(3);
assertNotContains(rewrittenDeleteFiles, newDeleteFiles);
assertLocallySorted(newDeleteFiles);
checkResult(result, rewrittenDeleteFiles, newDeleteFiles, 3);
Expand Down Expand Up @@ -569,7 +571,7 @@ public void testPartitionEvolutionRemove() throws Exception {
.option(SizeBasedFileRewriter.REWRITE_ALL, "true")
.execute();
List<DeleteFile> newDeleteFiles = deleteFiles(table);
assertThat(newDeleteFiles).as("New delete file(s)").hasSize(3);
assertThat(newDeleteFiles).as("New delete files").hasSize(3);
assertNotContains(expectedRewritten, newDeleteFiles);
assertLocallySorted(newDeleteFiles);
checkResult(result, expectedRewritten, newDeleteFiles, 3);
Expand Down Expand Up @@ -620,7 +622,7 @@ public void testSchemaEvolution() throws Exception {
Stream.concat(deleteFiles.stream(), newSchemaDeleteFiles.stream())
.collect(Collectors.toList());
List<DeleteFile> newDeleteFiles = deleteFiles(table);
assertThat(newDeleteFiles).as("New delete file(s)").hasSize(4);
assertThat(newDeleteFiles).as("New delete files").hasSize(4);
assertNotContains(rewrittenDeleteFiles, newDeleteFiles);
assertLocallySorted(newDeleteFiles);
checkResult(result, rewrittenDeleteFiles, newDeleteFiles, 4);
Expand Down Expand Up @@ -946,7 +948,7 @@ private void assertNotContains(List<DeleteFile> original, List<DeleteFile> rewri
Set<String> rewrittenPaths =
rewritten.stream().map(ContentFile::location).collect(Collectors.toSet());
rewrittenPaths.retainAll(originalPaths);
assertThat(rewrittenPaths).hasSize(0);
assertThat(rewrittenPaths).isEmpty();
}

private void assertLocallySorted(List<DeleteFile> deleteFiles) {
Expand Down Expand Up @@ -1052,48 +1054,48 @@ private void checkResult(
List<DeleteFile> newDeletes,
int expectedGroups) {
assertThat(rewrittenDeletes.size())
.as("Expected rewritten delete file count does not match")
.as("Rewritten delete file count does not match")
.isEqualTo(result.rewrittenDeleteFilesCount());

assertThat(newDeletes.size())
.as("Expected new delete file count does not match")
.as("New delete file count does not match")
.isEqualTo(result.addedDeleteFilesCount());

assertThat(size(rewrittenDeletes))
.as("Expected rewritten delete byte count does not match")
.as("Rewritten delete byte count does not match")
.isEqualTo(result.rewrittenBytesCount());

assertThat(size(newDeletes))
.as("Expected new delete byte count does not match")
.as("New delete byte count does not match")
.isEqualTo(result.addedBytesCount());

assertThat(expectedGroups)
.as("Expected rewrite group count does not match")
.as("Rewrite group count does not match")
.isEqualTo(result.rewriteResults().size());

assertThat(rewrittenDeletes.size())
.as("Expected rewritten delete file count in all groups to match")
.as("Rewritten delete file count in all groups to match")
.isEqualTo(
result.rewriteResults().stream()
.mapToInt(FileGroupRewriteResult::rewrittenDeleteFilesCount)
.sum());

assertThat(newDeletes.size())
.as("Expected added delete file count in all groups to match")
.as("Added delete file count in all groups to match")
.isEqualTo(
result.rewriteResults().stream()
.mapToInt(FileGroupRewriteResult::addedDeleteFilesCount)
.sum());

assertThat(size(rewrittenDeletes))
.as("Expected rewritten delete bytes in all groups to match")
.as("Rewritten delete bytes in all groups to match")
.isEqualTo(
result.rewriteResults().stream()
.mapToLong(FileGroupRewriteResult::rewrittenBytesCount)
.sum());

assertThat(size(newDeletes))
.as("Expected added delete bytes in all groups to match")
.as("Added delete bytes in all groups to match")
.isEqualTo(
result.rewriteResults().stream()
.mapToLong(FileGroupRewriteResult::addedBytesCount)
Expand Down

0 comments on commit 4774774

Please sign in to comment.