diff --git a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java index 85c2269ee526..372fc5367f08 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java +++ b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java @@ -139,6 +139,8 @@ protected void validate(TableMetadata base, Snapshot parent) { if (validateNewDeleteFiles) { validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter, parent); } + + validateAddedDVs(base, startingSnapshotId, conflictDetectionFilter, parent); } } } diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 50885dbb06c7..6198ad00f680 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -48,11 +48,13 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.util.ContentFileUtil; import org.apache.iceberg.util.DataFileSet; import org.apache.iceberg.util.DeleteFileSet; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PartitionSet; import org.apache.iceberg.util.SnapshotUtil; +import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,6 +72,9 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { // delete files can be added in "overwrite" or "delete" operations private static final Set VALIDATE_ADDED_DELETE_FILES_OPERATIONS = ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.DELETE); + // DVs can be added in "overwrite", "delete", and "replace" operations + private static final Set VALIDATE_ADDED_DVS_OPERATIONS = + ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.DELETE, DataOperations.REPLACE); private final 
String tableName; private final TableOperations ops; @@ -83,6 +88,7 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { private final Map newDataFilesBySpec = Maps.newHashMap(); private Long newDataFilesDataSequenceNumber; private final Map newDeleteFilesBySpec = Maps.newHashMap(); + private final Set newDVRefs = Sets.newHashSet(); private final List appendManifests = Lists.newArrayList(); private final List rewrittenAppendManifests = Lists.newArrayList(); private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder(); @@ -245,13 +251,13 @@ private PartitionSpec spec(int specId) { /** Add a delete file to the new snapshot. */ protected void add(DeleteFile file) { - Preconditions.checkNotNull(file, "Invalid delete file: null"); + validateNewDeleteFile(file); add(new PendingDeleteFile(file)); } /** Add a delete file to the new snapshot. */ protected void add(DeleteFile file, long dataSequenceNumber) { - Preconditions.checkNotNull(file, "Invalid delete file: null"); + validateNewDeleteFile(file); add(new PendingDeleteFile(file, dataSequenceNumber)); } @@ -268,9 +274,39 @@ private void add(PendingDeleteFile file) { if (deleteFiles.add(file)) { addedFilesSummary.addedFile(spec, file); hasNewDeleteFiles = true; + if (ContentFileUtil.isDV(file)) { + newDVRefs.add(file.referencedDataFile()); + } + } + } + + protected void validateNewDeleteFile(DeleteFile file) { + Preconditions.checkNotNull(file, "Invalid delete file: null"); + switch (formatVersion()) { + case 1: + throw new IllegalArgumentException("Deletes are supported in V2 and above"); + case 2: + Preconditions.checkArgument( + file.content() == FileContent.EQUALITY_DELETES || !ContentFileUtil.isDV(file), + "Must not use DVs for position deletes in V2: %s", + ContentFileUtil.dvDesc(file)); + break; + case 3: + Preconditions.checkArgument( + file.content() == FileContent.EQUALITY_DELETES || ContentFileUtil.isDV(file), + "Must use DVs for position deletes in V%s: %s", + 
formatVersion(), + file.location()); + break; + default: + throw new IllegalArgumentException("Unsupported format version: " + formatVersion()); } } + private int formatVersion() { + return ops.current().formatVersion(); + } + /** Add all files in a manifest to the new snapshot. */ protected void add(ManifestFile manifest) { Preconditions.checkArgument( @@ -769,6 +805,58 @@ protected void validateDataFilesExist( } } + // validates there are no concurrently added DVs for referenced data files + protected void validateAddedDVs( + TableMetadata base, + Long startingSnapshotId, + Expression conflictDetectionFilter, + Snapshot parent) { + // skip if there is no current table state or this operation doesn't add new DVs + if (parent == null || newDVRefs.isEmpty()) { + return; + } + + Pair, Set> history = + validationHistory( + base, + startingSnapshotId, + VALIDATE_ADDED_DVS_OPERATIONS, + ManifestContent.DELETES, + parent); + List newDeleteManifests = history.first(); + Set newSnapshotIds = history.second(); + + Tasks.foreach(newDeleteManifests) + .stopOnFailure() + .throwFailureWhenFinished() + .executeWith(workerPool()) + .run(manifest -> validateAddedDVs(manifest, conflictDetectionFilter, newSnapshotIds)); + } + + private void validateAddedDVs( + ManifestFile manifest, Expression conflictDetectionFilter, Set newSnapshotIds) { + try (CloseableIterable> entries = + ManifestFiles.readDeleteManifest(manifest, ops.io(), ops.current().specsById()) + .filterRows(conflictDetectionFilter) + .caseSensitive(caseSensitive) + .liveEntries()) { + + for (ManifestEntry entry : entries) { + DeleteFile file = entry.file(); + if (newSnapshotIds.contains(entry.snapshotId()) && ContentFileUtil.isDV(file)) { + ValidationException.check( + !newDVRefs.contains(file.referencedDataFile()), + "Found concurrently added DV for %s: %s", + file.referencedDataFile(), + ContentFileUtil.dvDesc(file)); + } + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + // returns newly 
added manifests and snapshot IDs between the starting and parent snapshots private Pair, Set> validationHistory( TableMetadata base, Long startingSnapshotId, diff --git a/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java b/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java index 6ef28191e78e..481422457b73 100644 --- a/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java +++ b/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java @@ -53,15 +53,6 @@ public static List parameters() { return Arrays.asList(2, 3); } - static final DeleteFile FILE_A_POS_1 = - FileMetadata.deleteFileBuilder(SPEC) - .ofPositionDeletes() - .withPath("/path/to/data-a-pos-deletes.parquet") - .withFileSizeInBytes(10) - .withPartition(FILE_A.partition()) - .withRecordCount(1) - .build(); - static final DeleteFile FILE_A_EQ_1 = FileMetadata.deleteFileBuilder(SPEC) .ofEqualityDeletes() @@ -311,7 +302,7 @@ public void testUnpartitionedTableScan() throws IOException { public void testPartitionedTableWithPartitionPosDeletes() { table.newAppend().appendFile(FILE_A).commit(); - table.newRowDelta().addDeletes(FILE_A_POS_1).commit(); + table.newRowDelta().addDeletes(fileADeletes()).commit(); List tasks = Lists.newArrayList(newScan(table).planFiles().iterator()); assertThat(tasks).as("Should have one task").hasSize(1); @@ -323,7 +314,7 @@ public void testPartitionedTableWithPartitionPosDeletes() { assertThat(task.deletes()).as("Should have one associated delete file").hasSize(1); assertThat(task.deletes().get(0).path()) .as("Should have only pos delete file") - .isEqualTo(FILE_A_POS_1.path()); + .isEqualTo(fileADeletes().path()); } @TestTemplate @@ -349,7 +340,7 @@ public void testPartitionedTableWithPartitionEqDeletes() { public void testPartitionedTableWithUnrelatedPartitionDeletes() { table.newAppend().appendFile(FILE_B).commit(); - table.newRowDelta().addDeletes(FILE_A_POS_1).addDeletes(FILE_A_EQ_1).commit(); + 
table.newRowDelta().addDeletes(fileADeletes()).addDeletes(FILE_A_EQ_1).commit(); List tasks = Lists.newArrayList(newScan(table).planFiles().iterator()); assertThat(tasks).as("Should have one task").hasSize(1); @@ -363,7 +354,9 @@ public void testPartitionedTableWithUnrelatedPartitionDeletes() { @TestTemplate public void testPartitionedTableWithOlderPartitionDeletes() { - table.newRowDelta().addDeletes(FILE_A_POS_1).addDeletes(FILE_A_EQ_1).commit(); + assumeThat(formatVersion).as("DVs are not filtered using sequence numbers").isEqualTo(2); + + table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A_EQ_1).commit(); table.newAppend().appendFile(FILE_A).commit(); @@ -379,6 +372,8 @@ public void testPartitionedTableWithOlderPartitionDeletes() { @TestTemplate public void testPartitionedTableScanWithGlobalDeletes() { + assumeThat(formatVersion).as("Requires V2 position deletes").isEqualTo(2); + table.newAppend().appendFile(FILE_A).commit(); TableMetadata base = table.ops().current(); @@ -407,6 +402,8 @@ public void testPartitionedTableScanWithGlobalDeletes() { @TestTemplate public void testPartitionedTableScanWithGlobalAndPartitionDeletes() { + assumeThat(formatVersion).as("Requires V2 position deletes").isEqualTo(2); + table.newAppend().appendFile(FILE_A).commit(); table.newRowDelta().addDeletes(FILE_A_EQ_1).commit(); @@ -437,7 +434,7 @@ public void testPartitionedTableScanWithGlobalAndPartitionDeletes() { @TestTemplate public void testPartitionedTableSequenceNumbers() { - table.newRowDelta().addRows(FILE_A).addDeletes(FILE_A_EQ_1).addDeletes(FILE_A_POS_1).commit(); + table.newRowDelta().addRows(FILE_A).addDeletes(FILE_A_EQ_1).addDeletes(fileADeletes()).commit(); List tasks = Lists.newArrayList(newScan(table).planFiles().iterator()); assertThat(tasks).as("Should have one task").hasSize(1); @@ -449,7 +446,7 @@ public void testPartitionedTableSequenceNumbers() { assertThat(task.deletes()).as("Should have one associated delete file").hasSize(1); 
assertThat(task.deletes().get(0).path()) .as("Should have only pos delete file") - .isEqualTo(FILE_A_POS_1.path()); + .isEqualTo(fileADeletes().path()); } @TestTemplate @@ -501,7 +498,7 @@ public void testPartitionedTableWithExistingDeleteFile() { table.newRowDelta().addDeletes(FILE_A_EQ_1).commit(); - table.newRowDelta().addDeletes(FILE_A_POS_1).commit(); + table.newRowDelta().addDeletes(fileADeletes()).commit(); table .updateProperties() @@ -557,7 +554,7 @@ public void testPartitionedTableWithExistingDeleteFile() { assertThat(task.deletes()).as("Should have two associated delete files").hasSize(2); assertThat(Sets.newHashSet(Iterables.transform(task.deletes(), ContentFile::path))) .as("Should have expected delete files") - .isEqualTo(Sets.newHashSet(FILE_A_EQ_1.path(), FILE_A_POS_1.path())); + .isEqualTo(Sets.newHashSet(FILE_A_EQ_1.path(), fileADeletes().path())); } @TestTemplate diff --git a/core/src/test/java/org/apache/iceberg/ScanPlanningAndReportingTestBase.java b/core/src/test/java/org/apache/iceberg/ScanPlanningAndReportingTestBase.java index 13e96869b454..80551f0a2247 100644 --- a/core/src/test/java/org/apache/iceberg/ScanPlanningAndReportingTestBase.java +++ b/core/src/test/java/org/apache/iceberg/ScanPlanningAndReportingTestBase.java @@ -186,7 +186,7 @@ public void scanningWithDeletes() throws IOException { reporter); table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_C).commit(); - table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_B_DELETES).commit(); + table.newRowDelta().addDeletes(fileADeletes()).addDeletes(fileBDeletes()).commit(); ScanT tableScan = newScan(table); try (CloseableIterable fileScanTasks = tableScan.planFiles()) { @@ -208,12 +208,19 @@ public void scanningWithDeletes() throws IOException { assertThat(result.totalDataManifests().value()).isEqualTo(1); assertThat(result.totalDeleteManifests().value()).isEqualTo(1); assertThat(result.totalFileSizeInBytes().value()).isEqualTo(30L); - 
assertThat(result.totalDeleteFileSizeInBytes().value()).isEqualTo(20L); + assertThat(result.totalDeleteFileSizeInBytes().value()) + .isEqualTo(contentSize(fileADeletes(), fileBDeletes())); assertThat(result.skippedDataFiles().value()).isEqualTo(0); assertThat(result.skippedDeleteFiles().value()).isEqualTo(0); assertThat(result.indexedDeleteFiles().value()).isEqualTo(2); assertThat(result.equalityDeleteFiles().value()).isEqualTo(0); - assertThat(result.positionalDeleteFiles().value()).isEqualTo(2); + if (formatVersion == 2) { + assertThat(result.positionalDeleteFiles().value()).isEqualTo(2); + assertThat(result.dvs().value()).isEqualTo(0); + } else { + assertThat(result.positionalDeleteFiles().value()).isEqualTo(0); + assertThat(result.dvs().value()).isEqualTo(2); + } } @TestTemplate @@ -264,8 +271,8 @@ public void scanningWithSkippedDeleteFiles() throws IOException { tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(), formatVersion, reporter); table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_D).commit(); table.newOverwrite().deleteFile(FILE_A).addFile(FILE_A2).commit(); - table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_D2_DELETES).commit(); - table.newRowDelta().addDeletes(FILE_B_DELETES).addDeletes(FILE_C2_DELETES).commit(); + table.newRowDelta().addDeletes(fileADeletes()).addDeletes(FILE_D2_DELETES).commit(); + table.newRowDelta().addDeletes(fileBDeletes()).addDeletes(FILE_C2_DELETES).commit(); ScanT tableScan = newScan(table); List fileTasks = Lists.newArrayList(); @@ -308,7 +315,7 @@ public void scanningWithEqualityAndPositionalDeleteFiles() throws IOException { tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(), formatVersion, reporter); table.newAppend().appendFile(FILE_A).commit(); // FILE_A_DELETES = positionalDelete / FILE_A2_DELETES = equalityDelete - table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit(); + 
table.newRowDelta().addDeletes(fileADeletes()).addDeletes(FILE_A2_DELETES).commit(); ScanT tableScan = newScan(table); try (CloseableIterable fileScanTasks = @@ -321,7 +328,13 @@ public void scanningWithEqualityAndPositionalDeleteFiles() throws IOException { ScanMetricsResult result = scanReport.scanMetrics(); assertThat(result.indexedDeleteFiles().value()).isEqualTo(2); assertThat(result.equalityDeleteFiles().value()).isEqualTo(1); - assertThat(result.positionalDeleteFiles().value()).isEqualTo(1); + if (formatVersion == 2) { + assertThat(result.positionalDeleteFiles().value()).isEqualTo(1); + assertThat(result.dvs().value()).isEqualTo(0); + } else { + assertThat(result.positionalDeleteFiles().value()).isEqualTo(0); + assertThat(result.dvs().value()).isEqualTo(1); + } } static class TestMetricsReporter implements MetricsReporter { diff --git a/core/src/test/java/org/apache/iceberg/TestBase.java b/core/src/test/java/org/apache/iceberg/TestBase.java index 9813d02910a6..46a1518e877f 100644 --- a/core/src/test/java/org/apache/iceberg/TestBase.java +++ b/core/src/test/java/org/apache/iceberg/TestBase.java @@ -45,6 +45,7 @@ import org.apache.iceberg.relocated.com.google.common.io.Files; import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ScanTaskUtil; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtendWith; @@ -85,6 +86,17 @@ public class TestBase { .withPartitionPath("data_bucket=0") // easy way to set partition data for now .withRecordCount(1) .build(); + static final DeleteFile FILE_A_DV = + FileMetadata.deleteFileBuilder(SPEC) + .ofPositionDeletes() + .withPath("/path/to/data-a-deletes.puffin") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0") + .withRecordCount(1) + .withReferencedDataFile(FILE_A.location()) + .withContentOffset(4) + .withContentSizeInBytes(6) + .build(); // Equality delete files. 
static final DeleteFile FILE_A2_DELETES = FileMetadata.deleteFileBuilder(SPEC) @@ -110,6 +122,17 @@ public class TestBase { .withPartitionPath("data_bucket=1") // easy way to set partition data for now .withRecordCount(1) .build(); + static final DeleteFile FILE_B_DV = + FileMetadata.deleteFileBuilder(SPEC) + .ofPositionDeletes() + .withPath("/path/to/data-b-deletes.puffin") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=1") + .withRecordCount(1) + .withReferencedDataFile(FILE_B.location()) + .withContentOffset(4) + .withContentSizeInBytes(6) + .build(); static final DataFile FILE_C = DataFiles.builder(SPEC) .withPath("/path/to/data-c.parquet") @@ -643,6 +666,22 @@ protected DataFile newDataFile(String partitionPath) { .build(); } + protected DeleteFile fileADeletes() { + return formatVersion >= 3 ? FILE_A_DV : FILE_A_DELETES; + } + + protected DeleteFile fileBDeletes() { + return formatVersion >= 3 ? FILE_B_DV : FILE_B_DELETES; + } + + protected DeleteFile newDeletes(DataFile dataFile) { + if (formatVersion >= 3) { + return FileGenerationUtil.generateDV(table, dataFile); + } else { + return FileGenerationUtil.generatePositionDeleteFile(table, dataFile); + } + } + protected DeleteFile newDeleteFile(int specId, String partitionPath) { PartitionSpec spec = table.specs().get(specId); return FileMetadata.deleteFileBuilder(spec) @@ -764,6 +803,14 @@ static Iterator files(ManifestFile manifest) { return ManifestFiles.read(manifest, FILE_IO).iterator(); } + static long recordCount(ContentFile... files) { + return Arrays.stream(files).mapToLong(ContentFile::recordCount).sum(); + } + + static long contentSize(ContentFile... files) { + return ScanTaskUtil.contentSizeInBytes(Arrays.asList(files)); + } + /** Used for assertions that only apply if the table version is v2. 
*/ protected static class TableAssertions { private boolean enabled; diff --git a/core/src/test/java/org/apache/iceberg/TestBatchScans.java b/core/src/test/java/org/apache/iceberg/TestBatchScans.java index 1597f44f6338..72cd00e0573d 100644 --- a/core/src/test/java/org/apache/iceberg/TestBatchScans.java +++ b/core/src/test/java/org/apache/iceberg/TestBatchScans.java @@ -42,7 +42,7 @@ public void testDataTableScan() { table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); if (formatVersion > 1) { - table.newRowDelta().addDeletes(FILE_A_DELETES).commit(); + table.newRowDelta().addDeletes(fileADeletes()).commit(); } BatchScan scan = table.newBatchScan(); diff --git a/core/src/test/java/org/apache/iceberg/TestCommitReporting.java b/core/src/test/java/org/apache/iceberg/TestCommitReporting.java index 41b301668722..d333af98d623 100644 --- a/core/src/test/java/org/apache/iceberg/TestCommitReporting.java +++ b/core/src/test/java/org/apache/iceberg/TestCommitReporting.java @@ -95,11 +95,13 @@ public void addAndDeleteDeleteFiles() { // 2 positional + 1 equality table .newRowDelta() - .addDeletes(FILE_A_DELETES) - .addDeletes(FILE_B_DELETES) + .addDeletes(fileADeletes()) + .addDeletes(fileBDeletes()) .addDeletes(FILE_C2_DELETES) .commit(); + long totalDeleteContentSize = contentSize(fileADeletes(), fileBDeletes(), FILE_C2_DELETES); + CommitReport report = reporter.lastCommitReport(); assertThat(report).isNotNull(); assertThat(report.operation()).isEqualTo("delete"); @@ -110,7 +112,13 @@ public void addAndDeleteDeleteFiles() { CommitMetricsResult metrics = report.commitMetrics(); assertThat(metrics.addedDeleteFiles().value()).isEqualTo(3L); assertThat(metrics.totalDeleteFiles().value()).isEqualTo(3L); - assertThat(metrics.addedPositionalDeleteFiles().value()).isEqualTo(2L); + if (formatVersion == 2) { + assertThat(metrics.addedPositionalDeleteFiles().value()).isEqualTo(2L); + assertThat(metrics.addedDVs()).isNull(); + } else { + 
assertThat(metrics.addedPositionalDeleteFiles()).isNull(); + assertThat(metrics.addedDVs().value()).isEqualTo(2L); + } assertThat(metrics.addedEqualityDeleteFiles().value()).isEqualTo(1L); assertThat(metrics.addedPositionalDeletes().value()).isEqualTo(2L); @@ -119,15 +127,15 @@ public void addAndDeleteDeleteFiles() { assertThat(metrics.addedEqualityDeletes().value()).isEqualTo(1L); assertThat(metrics.totalEqualityDeletes().value()).isEqualTo(1L); - assertThat(metrics.addedFilesSizeInBytes().value()).isEqualTo(30L); - assertThat(metrics.totalFilesSizeInBytes().value()).isEqualTo(30L); + assertThat(metrics.addedFilesSizeInBytes().value()).isEqualTo(totalDeleteContentSize); + assertThat(metrics.totalFilesSizeInBytes().value()).isEqualTo(totalDeleteContentSize); // now remove those 2 positional + 1 equality delete files table .newRewrite() .rewriteFiles( ImmutableSet.of(), - ImmutableSet.of(FILE_A_DELETES, FILE_B_DELETES, FILE_C2_DELETES), + ImmutableSet.of(fileADeletes(), fileBDeletes(), FILE_C2_DELETES), ImmutableSet.of(), ImmutableSet.of()) .commit(); @@ -142,7 +150,13 @@ public void addAndDeleteDeleteFiles() { metrics = report.commitMetrics(); assertThat(metrics.removedDeleteFiles().value()).isEqualTo(3L); assertThat(metrics.totalDeleteFiles().value()).isEqualTo(0L); - assertThat(metrics.removedPositionalDeleteFiles().value()).isEqualTo(2L); + if (formatVersion == 2) { + assertThat(metrics.removedPositionalDeleteFiles().value()).isEqualTo(2L); + assertThat(metrics.removedDVs()).isNull(); + } else { + assertThat(metrics.removedPositionalDeleteFiles()).isNull(); + assertThat(metrics.removedDVs().value()).isEqualTo(2L); + } assertThat(metrics.removedEqualityDeleteFiles().value()).isEqualTo(1L); assertThat(metrics.removedPositionalDeletes().value()).isEqualTo(2L); @@ -151,7 +165,7 @@ public void addAndDeleteDeleteFiles() { assertThat(metrics.removedEqualityDeletes().value()).isEqualTo(1L); assertThat(metrics.totalEqualityDeletes().value()).isEqualTo(0L); - 
assertThat(metrics.removedFilesSizeInBytes().value()).isEqualTo(30L); + assertThat(metrics.removedFilesSizeInBytes().value()).isEqualTo(totalDeleteContentSize); assertThat(metrics.totalFilesSizeInBytes().value()).isEqualTo(0L); } diff --git a/core/src/test/java/org/apache/iceberg/TestEntriesMetadataTable.java b/core/src/test/java/org/apache/iceberg/TestEntriesMetadataTable.java index 9bce4e60a4f3..e061567e72a8 100644 --- a/core/src/test/java/org/apache/iceberg/TestEntriesMetadataTable.java +++ b/core/src/test/java/org/apache/iceberg/TestEntriesMetadataTable.java @@ -131,7 +131,7 @@ public void testEntriesTableWithDeleteManifests() { assumeThat(formatVersion).as("Only V2 Tables Support Deletes").isGreaterThanOrEqualTo(2); table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); - table.newRowDelta().addDeletes(FILE_A_DELETES).commit(); + table.newRowDelta().addDeletes(fileADeletes()).commit(); Table entriesTable = new ManifestEntriesTable(table); TableScan scan = entriesTable.newScan(); diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java index 30fdae01cd94..f811dac02043 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java @@ -58,14 +58,14 @@ private void preparePartitionedTable(boolean transactional) { if (transactional) { table .newRowDelta() - .addDeletes(FILE_A_DELETES) - .addDeletes(FILE_B_DELETES) + .addDeletes(fileADeletes()) + .addDeletes(fileBDeletes()) .addDeletes(FILE_C2_DELETES) .addDeletes(FILE_D2_DELETES) .commit(); } else { - table.newRowDelta().addDeletes(FILE_A_DELETES).commit(); - table.newRowDelta().addDeletes(FILE_B_DELETES).commit(); + table.newRowDelta().addDeletes(fileADeletes()).commit(); + table.newRowDelta().addDeletes(fileBDeletes()).commit(); table.newRowDelta().addDeletes(FILE_C2_DELETES).commit(); 
table.newRowDelta().addDeletes(FILE_D2_DELETES).commit(); } @@ -721,7 +721,7 @@ public void testDeleteFilesTableSelection() throws IOException { assumeThat(formatVersion).as("Position deletes are not supported by V1 Tables").isNotEqualTo(1); table.newFastAppend().appendFile(FILE_A).commit(); - table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit(); + table.newRowDelta().addDeletes(fileADeletes()).addDeletes(FILE_A2_DELETES).commit(); Table deleteFilesTable = new DeleteFilesTable(table); @@ -1409,10 +1409,10 @@ public void testPositionDeletesWithFilter() { .containsEntry(MetadataColumns.SPEC_ID.fieldId(), 0); assertThat(posDeleteTask.file().path()) .as("Expected correct delete file on task") - .isEqualTo(FILE_B_DELETES.path()); + .isEqualTo(fileBDeletes().path()); assertThat((Map) constantsMap(posDeleteTask, partitionType)) .as("Expected correct delete file on constant column") - .containsEntry(MetadataColumns.FILE_PATH.fieldId(), FILE_B_DELETES.path().toString()); + .containsEntry(MetadataColumns.FILE_PATH.fieldId(), fileBDeletes().path().toString()); } @TestTemplate @@ -1479,17 +1479,16 @@ private void testPositionDeletesBaseTableFilter(boolean transactional) { .containsEntry(MetadataColumns.SPEC_ID.fieldId(), 0); assertThat(posDeleteTask.file().path()) .as("Expected correct delete file on task") - .isEqualTo(FILE_A_DELETES.path()); + .isEqualTo(fileADeletes().path()); assertThat((Map) constantsMap(posDeleteTask, partitionType)) .as("Expected correct delete file on constant column") - .containsEntry(MetadataColumns.FILE_PATH.fieldId(), FILE_A_DELETES.path().toString()); + .containsEntry(MetadataColumns.FILE_PATH.fieldId(), fileADeletes().path().toString()); } @TestTemplate public void testPositionDeletesWithBaseTableFilterNot() { - assumeThat(formatVersion) - .as("Position deletes are not supported by V1 Tables") - .isNotEqualTo(1); // use identity rather than bucket partition spec, + assumeThat(formatVersion).as("Position deletes are 
not supported by V1 Tables").isEqualTo(2); + // use identity rather than bucket partition spec, // as bucket.project does not support projecting notEq table.updateSpec().removeField("data_bucket").addField("id").commit(); PartitionSpec spec = table.spec(); @@ -1619,20 +1618,8 @@ public void testPositionDeletesUnpartitioned() { .build(); table.newAppend().appendFile(dataFile1).appendFile(dataFile2).commit(); - DeleteFile delete1 = - FileMetadata.deleteFileBuilder(table.spec()) - .ofPositionDeletes() - .withPath("/path/to/delete1.parquet") - .withFileSizeInBytes(10) - .withRecordCount(1) - .build(); - DeleteFile delete2 = - FileMetadata.deleteFileBuilder(table.spec()) - .ofPositionDeletes() - .withPath("/path/to/delete2.parquet") - .withFileSizeInBytes(10) - .withRecordCount(1) - .build(); + DeleteFile delete1 = newDeletes(dataFile1); + DeleteFile delete2 = newDeletes(dataFile2); table.newRowDelta().addDeletes(delete1).addDeletes(delete2).commit(); PositionDeletesTable positionDeletesTable = new PositionDeletesTable(table); @@ -1655,16 +1642,16 @@ public void testPositionDeletesUnpartitioned() { .isEqualTo(1); assertThat(scanTasks).hasSize(2); - scanTasks.sort(Comparator.comparing(f -> f.file().path().toString())); - assertThat(scanTasks.get(0).file().path().toString()).isEqualTo("/path/to/delete1.parquet"); - assertThat(scanTasks.get(1).file().path().toString()).isEqualTo("/path/to/delete2.parquet"); + scanTasks.sort(Comparator.comparing(f -> f.file().pos())); + assertThat(scanTasks.get(0).file().location()).isEqualTo(delete1.location()); + assertThat(scanTasks.get(1).file().location()).isEqualTo(delete2.location()); Types.StructType partitionType = Partitioning.partitionType(table); assertThat((Map) constantsMap(scanTasks.get(0), partitionType)) - .containsEntry(MetadataColumns.FILE_PATH.fieldId(), "/path/to/delete1.parquet"); + .containsEntry(MetadataColumns.FILE_PATH.fieldId(), delete1.location()); assertThat((Map) constantsMap(scanTasks.get(1), partitionType)) - 
.containsEntry(MetadataColumns.FILE_PATH.fieldId(), "/path/to/delete2.parquet"); + .containsEntry(MetadataColumns.FILE_PATH.fieldId(), delete2.location()); assertThat((Map) constantsMap(scanTasks.get(0), partitionType)) .containsEntry(MetadataColumns.SPEC_ID.fieldId(), 1); assertThat((Map) constantsMap(scanTasks.get(1), partitionType)) @@ -1712,20 +1699,8 @@ public void testPositionDeletesManyColumns() { .build(); table.newAppend().appendFile(dataFile1).appendFile(dataFile2).commit(); - DeleteFile delete1 = - FileMetadata.deleteFileBuilder(table.spec()) - .ofPositionDeletes() - .withPath("/path/to/delete1.parquet") - .withFileSizeInBytes(10) - .withRecordCount(1) - .build(); - DeleteFile delete2 = - FileMetadata.deleteFileBuilder(table.spec()) - .ofPositionDeletes() - .withPath("/path/to/delete2.parquet") - .withFileSizeInBytes(10) - .withRecordCount(1) - .build(); + DeleteFile delete1 = newDeletes(dataFile1); + DeleteFile delete2 = newDeletes(dataFile2); table.newRowDelta().addDeletes(delete1).addDeletes(delete2).commit(); PositionDeletesTable positionDeletesTable = new PositionDeletesTable(table); @@ -1745,9 +1720,9 @@ public void testPositionDeletesManyColumns() { return (PositionDeletesScanTask) task; })); assertThat(scanTasks).hasSize(2); - scanTasks.sort(Comparator.comparing(f -> f.file().path().toString())); - assertThat(scanTasks.get(0).file().path().toString()).isEqualTo("/path/to/delete1.parquet"); - assertThat(scanTasks.get(1).file().path().toString()).isEqualTo("/path/to/delete2.parquet"); + scanTasks.sort(Comparator.comparing(f -> f.file().pos())); + assertThat(scanTasks.get(0).file().location()).isEqualTo(delete1.location()); + assertThat(scanTasks.get(1).file().location()).isEqualTo(delete2.location()); } @TestTemplate diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java index 2de38541777b..84860d34bb31 100644 --- 
a/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java @@ -159,7 +159,7 @@ public void testPartitionsTableScanWithAddPartitionOnNestedField() { @TestTemplate public void testPositionDeletesPartitionSpecRemoval() { - assumeThat(formatVersion).as("Position deletes are not supported by V1 Tables").isNotEqualTo(1); + assumeThat(formatVersion).as("Position deletes are not supported by V1 Tables").isEqualTo(2); table.updateSpec().removeField("id").commit(); DeleteFile deleteFile = newDeleteFile(table.ops().current().spec().specId(), "nested.id=1"); diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java index 124cc2f28dd5..5b108e9ee565 100644 --- a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java +++ b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java @@ -55,6 +55,7 @@ protected static List parameters() { @TestTemplate public void testEmptyTable() { + assumeThat(formatVersion).isGreaterThanOrEqualTo(2); assertThat(listManifestFiles()).isEmpty(); TableMetadata base = readMetadata(); @@ -87,6 +88,7 @@ public void testEmptyTable() { @TestTemplate public void testAddOnly() { + assumeThat(formatVersion).isGreaterThanOrEqualTo(2); assertThat(listManifestFiles()).isEmpty(); assertThatThrownBy( @@ -130,6 +132,7 @@ public void testAddOnly() { @TestTemplate public void testDeleteOnly() { + assumeThat(formatVersion).isGreaterThanOrEqualTo(2); assertThat(listManifestFiles()).isEmpty(); assertThatThrownBy( diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java index f1d23de32a42..72bb85c0446e 100644 --- a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java +++ b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java @@ -1096,7 +1096,7 @@ public void 
testRewriteDataManifestsPreservesDeletes() { long appendSnapshotSeq = appendSnapshot.sequenceNumber(); // commit delete files - table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit(); + table.newRowDelta().addDeletes(fileADeletes()).addDeletes(FILE_A2_DELETES).commit(); // save the delete snapshot info Snapshot deleteSnapshot = table.currentSnapshot(); @@ -1139,7 +1139,7 @@ public void testRewriteDataManifestsPreservesDeletes() { dataSeqs(deleteSnapshotSeq, deleteSnapshotSeq), fileSeqs(deleteSnapshotSeq, deleteSnapshotSeq), ids(deleteSnapshotId, deleteSnapshotId), - files(FILE_A_DELETES, FILE_A2_DELETES), + files(fileADeletes(), FILE_A2_DELETES), statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED)); } @@ -1158,7 +1158,7 @@ public void testReplaceDeleteManifestsOnly() throws IOException { long appendSnapshotSeq = appendSnapshot.sequenceNumber(); // commit delete files - table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit(); + table.newRowDelta().addDeletes(fileADeletes()).addDeletes(FILE_A2_DELETES).commit(); // save the delete snapshot info Snapshot deleteSnapshot = table.currentSnapshot(); @@ -1179,7 +1179,7 @@ public void testReplaceDeleteManifestsOnly() throws IOException { deleteSnapshotId, deleteSnapshotSeq, deleteSnapshotSeq, - FILE_A_DELETES)); + fileADeletes())); ManifestFile newDeleteManifest2 = writeManifest( "delete-manifest-file-2.avro", @@ -1218,7 +1218,7 @@ public void testReplaceDeleteManifestsOnly() throws IOException { dataSeqs(deleteSnapshotSeq), fileSeqs(deleteSnapshotSeq), ids(deleteSnapshotId), - files(FILE_A_DELETES), + files(fileADeletes()), statuses(ManifestEntry.Status.EXISTING)); validateDeleteManifest( deleteManifests.get(1), @@ -1244,7 +1244,7 @@ public void testReplaceDataAndDeleteManifests() throws IOException { long appendSnapshotSeq = appendSnapshot.sequenceNumber(); // commit delete files - 
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit(); + table.newRowDelta().addDeletes(fileADeletes()).addDeletes(FILE_A2_DELETES).commit(); // save the delete snapshot info Snapshot deleteSnapshot = table.currentSnapshot(); @@ -1287,7 +1287,7 @@ public void testReplaceDataAndDeleteManifests() throws IOException { deleteSnapshotId, deleteSnapshotSeq, deleteSnapshotSeq, - FILE_A_DELETES)); + fileADeletes())); ManifestFile newDeleteManifest2 = writeManifest( "delete-manifest-file-2.avro", @@ -1337,7 +1337,7 @@ public void testReplaceDataAndDeleteManifests() throws IOException { dataSeqs(deleteSnapshotSeq), fileSeqs(deleteSnapshotSeq), ids(deleteSnapshotId), - files(FILE_A_DELETES), + files(fileADeletes()), statuses(ManifestEntry.Status.EXISTING)); validateDeleteManifest( deleteManifests.get(1), @@ -1361,7 +1361,7 @@ public void testDeleteManifestReplacementConcurrentAppend() throws IOException { long appendSnapshotSeq = appendSnapshot.sequenceNumber(); // commit delete files - table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit(); + table.newRowDelta().addDeletes(fileADeletes()).addDeletes(FILE_A2_DELETES).commit(); // save the delete snapshot info Snapshot deleteSnapshot = table.currentSnapshot(); @@ -1379,7 +1379,7 @@ public void testDeleteManifestReplacementConcurrentAppend() throws IOException { deleteSnapshotId, deleteSnapshotSeq, deleteSnapshotSeq, - FILE_A_DELETES)); + fileADeletes())); ManifestFile newDeleteManifest2 = writeManifest( "delete-manifest-file-2.avro", @@ -1440,7 +1440,7 @@ public void testDeleteManifestReplacementConcurrentAppend() throws IOException { dataSeqs(deleteSnapshotSeq), fileSeqs(deleteSnapshotSeq), ids(deleteSnapshotId), - files(FILE_A_DELETES), + files(fileADeletes()), statuses(ManifestEntry.Status.EXISTING)); validateDeleteManifest( deleteManifests.get(1), @@ -1464,7 +1464,7 @@ public void testDeleteManifestReplacementConcurrentDeleteFileRemoval() throws IO long 
appendSnapshotSeq = appendSnapshot.sequenceNumber(); // commit the first set of delete files - table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit(); + table.newRowDelta().addDeletes(fileADeletes()).addDeletes(FILE_A2_DELETES).commit(); // save the first delete snapshot info Snapshot deleteSnapshot1 = table.currentSnapshot(); @@ -1472,7 +1472,7 @@ public void testDeleteManifestReplacementConcurrentDeleteFileRemoval() throws IO long deleteSnapshotSeq1 = deleteSnapshot1.sequenceNumber(); // commit the second set of delete files - table.newRowDelta().addDeletes(FILE_B_DELETES).addDeletes(FILE_C2_DELETES).commit(); + table.newRowDelta().addDeletes(fileBDeletes()).addDeletes(FILE_C2_DELETES).commit(); // save the second delete snapshot info Snapshot deleteSnapshot2 = table.currentSnapshot(); @@ -1489,7 +1489,7 @@ public void testDeleteManifestReplacementConcurrentDeleteFileRemoval() throws IO deleteSnapshotId1, deleteSnapshotSeq1, deleteSnapshotSeq1, - FILE_A_DELETES)); + fileADeletes())); ManifestFile newDeleteManifest2 = writeManifest( "delete-manifest-file-2.avro", @@ -1507,7 +1507,7 @@ public void testDeleteManifestReplacementConcurrentDeleteFileRemoval() throws IO rewriteManifests.addManifest(newDeleteManifest2); // commit the third set of delete files concurrently - table.newRewrite().deleteFile(FILE_B_DELETES).commit(); + table.newRewrite().deleteFile(fileBDeletes()).commit(); Snapshot concurrentSnapshot = table.currentSnapshot(); long concurrentSnapshotId = concurrentSnapshot.snapshotId(); @@ -1541,7 +1541,7 @@ public void testDeleteManifestReplacementConcurrentDeleteFileRemoval() throws IO dataSeqs(deleteSnapshotSeq1), fileSeqs(deleteSnapshotSeq1), ids(deleteSnapshotId1), - files(FILE_A_DELETES), + files(fileADeletes()), statuses(ManifestEntry.Status.EXISTING)); validateDeleteManifest( deleteManifests.get(1), @@ -1555,7 +1555,7 @@ public void testDeleteManifestReplacementConcurrentDeleteFileRemoval() throws IO 
dataSeqs(deleteSnapshotSeq2, deleteSnapshotSeq2), fileSeqs(deleteSnapshotSeq2, deleteSnapshotSeq2), ids(concurrentSnapshotId, deleteSnapshotId2), - files(FILE_B_DELETES, FILE_C2_DELETES), + files(fileBDeletes(), FILE_C2_DELETES), statuses(ManifestEntry.Status.DELETED, ManifestEntry.Status.EXISTING)); } @@ -1567,7 +1567,7 @@ public void testDeleteManifestReplacementConflictingDeleteFileRemoval() throws I table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_C).commit(); // commit delete files - table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit(); + table.newRowDelta().addDeletes(fileADeletes()).addDeletes(FILE_A2_DELETES).commit(); // save the delete snapshot info Snapshot deleteSnapshot = table.currentSnapshot(); @@ -1584,7 +1584,7 @@ public void testDeleteManifestReplacementConflictingDeleteFileRemoval() throws I deleteSnapshotId, deleteSnapshotSeq, deleteSnapshotSeq, - FILE_A_DELETES)); + fileADeletes())); ManifestFile newDeleteManifest2 = writeManifest( "delete-manifest-file-2.avro", @@ -1602,7 +1602,7 @@ public void testDeleteManifestReplacementConflictingDeleteFileRemoval() throws I rewriteManifests.addManifest(newDeleteManifest2); // modify the original delete manifest concurrently - table.newRewrite().deleteFile(FILE_A_DELETES).commit(); + table.newRewrite().deleteFile(fileADeletes()).commit(); // the rewrite must fail as the original delete manifest was replaced concurrently assertThatThrownBy(rewriteManifests::commit) @@ -1621,7 +1621,7 @@ public void testDeleteManifestReplacementFailure() throws IOException { table.newFastAppend().appendFile(FILE_A).commit(); // commit the first delete file - table.newRowDelta().addDeletes(FILE_A_DELETES).commit(); + table.newRowDelta().addDeletes(fileADeletes()).commit(); // save the first delete snapshot info Snapshot deleteSnapshot1 = table.currentSnapshot(); @@ -1648,7 +1648,7 @@ public void testDeleteManifestReplacementFailure() throws IOException { 
deleteSnapshotId1, deleteSnapshotSeq1, deleteSnapshotSeq1, - FILE_A_DELETES), + fileADeletes()), manifestEntry( ManifestEntry.Status.EXISTING, deleteSnapshotId2, diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java index 1d67e48a2ce2..b41be0c7a636 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java +++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java @@ -29,7 +29,9 @@ import static org.apache.iceberg.util.SnapshotUtil.latestSnapshot; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; +import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -38,8 +40,11 @@ import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -52,13 +57,17 @@ public class TestRowDelta extends V2TableTestBase { @Parameters(name = "formatVersion = {0}, branch = {1}") protected static List parameters() { - return Arrays.asList(new Object[] {2, "main"}, new Object[] {2, "testBranch"}); + return Arrays.asList( + new Object[] {2, "main"}, + new Object[] {2, "testBranch"}, + new Object[] {3, "main"}, + new Object[] {3, "testBranch"}); } @TestTemplate public void addOnlyDeleteFilesProducesDeleteOperation() { SnapshotUpdate 
rowDelta = - table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_B_DELETES); + table.newRowDelta().addDeletes(fileADeletes()).addDeletes(fileBDeletes()); commit(table, rowDelta, branch); Snapshot snap = latestSnapshot(table, branch); @@ -70,7 +79,7 @@ public void addOnlyDeleteFilesProducesDeleteOperation() { @TestTemplate public void testAddDeleteFile() { SnapshotUpdate rowDelta = - table.newRowDelta().addRows(FILE_A).addDeletes(FILE_A_DELETES).addDeletes(FILE_B_DELETES); + table.newRowDelta().addRows(FILE_A).addDeletes(fileADeletes()).addDeletes(fileBDeletes()); commit(table, rowDelta, branch); Snapshot snap = latestSnapshot(table, branch); @@ -95,7 +104,7 @@ public void testAddDeleteFile() { dataSeqs(1L, 1L), fileSeqs(1L, 1L), ids(snap.snapshotId(), snap.snapshotId()), - files(FILE_A_DELETES, FILE_B_DELETES), + files(fileADeletes(), fileBDeletes()), statuses(Status.ADDED, Status.ADDED)); } @@ -126,7 +135,7 @@ public void testValidateDataFilesExistDefaults() { table, table .newRowDelta() - .addDeletes(FILE_A_DELETES) + .addDeletes(fileADeletes()) .validateFromSnapshot(validateFromSnapshotId) .validateDataFilesExist(ImmutableList.of(FILE_A.path())), branch)) @@ -143,7 +152,7 @@ public void testValidateDataFilesExistDefaults() { table, table .newRowDelta() - .addDeletes(FILE_B_DELETES) + .addDeletes(fileBDeletes()) .validateDataFilesExist(ImmutableList.of(FILE_B.path())) .validateFromSnapshot(validateFromSnapshotId), branch); @@ -155,7 +164,7 @@ public void testValidateDataFilesExistDefaults() { dataSeqs(4L), fileSeqs(4L), ids(latestSnapshot(table, branch).snapshotId()), - files(FILE_B_DELETES), + files(fileBDeletes()), statuses(Status.ADDED)); } @@ -177,7 +186,7 @@ public void testValidateDataFilesExistOverwrite() { table, table .newRowDelta() - .addDeletes(FILE_A_DELETES) + .addDeletes(fileADeletes()) .validateFromSnapshot(validateFromSnapshotId) .validateDataFilesExist(ImmutableList.of(FILE_A.path())), branch)) @@ -209,7 +218,7 @@ public void 
testValidateDataFilesExistReplacePartitions() { table, table .newRowDelta() - .addDeletes(FILE_A_DELETES) + .addDeletes(fileADeletes()) .validateFromSnapshot(validateFromSnapshotId) .validateDataFilesExist(ImmutableList.of(FILE_A.path())), branch)) @@ -242,7 +251,7 @@ public void testValidateDataFilesExistFromSnapshot() { table, table .newRowDelta() - .addDeletes(FILE_A_DELETES) + .addDeletes(fileADeletes()) .validateFromSnapshot(validateFromSnapshotId) .validateDataFilesExist(ImmutableList.of(FILE_A.path())), branch); @@ -276,7 +285,7 @@ public void testValidateDataFilesExistFromSnapshot() { dataSeqs(3L), fileSeqs(3L), ids(snap.snapshotId()), - files(FILE_A_DELETES), + files(fileADeletes()), statuses(Status.ADDED)); } @@ -301,7 +310,7 @@ public void testValidateDataFilesExistRewrite() { table, table .newRowDelta() - .addDeletes(FILE_A_DELETES) + .addDeletes(fileADeletes()) .validateFromSnapshot(validateFromSnapshotId) .validateDataFilesExist(ImmutableList.of(FILE_A.path())), branch)) @@ -333,7 +342,7 @@ public void testValidateDataFilesExistValidateDeletes() { table, table .newRowDelta() - .addDeletes(FILE_A_DELETES) + .addDeletes(fileADeletes()) .validateDeletedFiles() .validateFromSnapshot(validateFromSnapshotId) .validateDataFilesExist(ImmutableList.of(FILE_A.path())), @@ -366,7 +375,7 @@ public void testValidateNoConflicts() { table, table .newRowDelta() - .addDeletes(FILE_A_DELETES) + .addDeletes(fileADeletes()) .validateFromSnapshot(validateFromSnapshotId) .conflictDetectionFilter( Expressions.equal("data", "u")) // bucket16("u") -> 0 @@ -399,7 +408,7 @@ public void testValidateNoConflictsFromSnapshot() { table, table .newRowDelta() - .addDeletes(FILE_A_DELETES) + .addDeletes(fileADeletes()) .validateDeletedFiles() .validateFromSnapshot(validateFromSnapshotId) .validateDataFilesExist(ImmutableList.of(FILE_A.path())) @@ -436,7 +445,7 @@ public void testValidateNoConflictsFromSnapshot() { dataSeqs(3L), fileSeqs(3L), ids(snap.snapshotId()), - 
files(FILE_A_DELETES), + files(fileADeletes()), statuses(Status.ADDED)); } @@ -444,7 +453,7 @@ public void testValidateNoConflictsFromSnapshot() { public void testOverwriteWithDeleteFile() { commit( table, - table.newRowDelta().addRows(FILE_A).addDeletes(FILE_A_DELETES).addDeletes(FILE_B_DELETES), + table.newRowDelta().addRows(FILE_A).addDeletes(fileADeletes()).addDeletes(fileBDeletes()), branch); long deltaSnapshotId = latestSnapshot(table, branch).snapshotId(); @@ -479,7 +488,7 @@ public void testOverwriteWithDeleteFile() { dataSeqs(1L, 1L), fileSeqs(1L, 1L), ids(snap.snapshotId(), deltaSnapshotId), - files(FILE_A_DELETES, FILE_B_DELETES), + files(fileADeletes(), fileBDeletes()), statuses(Status.DELETED, Status.EXISTING)); } @@ -487,7 +496,7 @@ public void testOverwriteWithDeleteFile() { public void testReplacePartitionsWithDeleteFile() { commit( table, - table.newRowDelta().addRows(FILE_A).addDeletes(FILE_A_DELETES).addDeletes(FILE_B_DELETES), + table.newRowDelta().addRows(FILE_A).addDeletes(fileADeletes()).addDeletes(fileBDeletes()), branch); long deltaSnapshotId = latestSnapshot(table, branch).snapshotId(); @@ -526,7 +535,7 @@ public void testReplacePartitionsWithDeleteFile() { dataSeqs(1L, 1L), fileSeqs(1L, 1L), ids(snap.snapshotId(), deltaSnapshotId), - files(FILE_A_DELETES, FILE_B_DELETES), + files(fileADeletes(), fileBDeletes()), statuses(Status.DELETED, Status.EXISTING)); } @@ -534,7 +543,7 @@ public void testReplacePartitionsWithDeleteFile() { public void testDeleteByExpressionWithDeleteFile() { commit( table, - table.newRowDelta().addRows(FILE_A).addDeletes(FILE_A_DELETES).addDeletes(FILE_B_DELETES), + table.newRowDelta().addRows(FILE_A).addDeletes(fileADeletes()).addDeletes(fileBDeletes()), branch); long deltaSnapshotId = latestSnapshot(table, branch).snapshotId(); @@ -564,13 +573,13 @@ public void testDeleteByExpressionWithDeleteFile() { dataSeqs(1L, 1L), fileSeqs(1L, 1L), ids(snap.snapshotId(), snap.snapshotId()), - files(FILE_A_DELETES, 
FILE_B_DELETES), + files(fileADeletes(), fileBDeletes()), statuses(Status.DELETED, Status.DELETED)); } @TestTemplate public void testDeleteDataFileWithDeleteFile() { - commit(table, table.newRowDelta().addRows(FILE_A).addDeletes(FILE_A_DELETES), branch); + commit(table, table.newRowDelta().addRows(FILE_A).addDeletes(fileADeletes()), branch); long deltaSnapshotId = latestSnapshot(table, branch).snapshotId(); assertThat(latestSnapshot(table, branch).sequenceNumber()).isEqualTo(1); @@ -598,7 +607,7 @@ public void testDeleteDataFileWithDeleteFile() { dataSeqs(1L), fileSeqs(1L), ids(deltaSnapshotId), - files(FILE_A_DELETES), + files(fileADeletes()), statuses(Status.ADDED)); // the manifest that removed FILE_A will be dropped next commit, causing the min sequence number @@ -619,13 +628,13 @@ public void testDeleteDataFileWithDeleteFile() { dataSeqs(1L), fileSeqs(1L), ids(nextSnap.snapshotId()), - files(FILE_A_DELETES), + files(fileADeletes()), statuses(Status.DELETED)); } @TestTemplate public void testFastAppendDoesNotRemoveStaleDeleteFiles() { - commit(table, table.newRowDelta().addRows(FILE_A).addDeletes(FILE_A_DELETES), branch); + commit(table, table.newRowDelta().addRows(FILE_A).addDeletes(fileADeletes()), branch); long deltaSnapshotId = latestSnapshot(table, branch).snapshotId(); assertThat(latestSnapshot(table, branch).sequenceNumber()).isEqualTo(1); @@ -653,7 +662,7 @@ public void testFastAppendDoesNotRemoveStaleDeleteFiles() { dataSeqs(1L), fileSeqs(1L), ids(deltaSnapshotId), - files(FILE_A_DELETES), + files(fileADeletes()), statuses(Status.ADDED)); // the manifest that removed FILE_A will be dropped next merging commit, but FastAppend will not @@ -689,7 +698,7 @@ public void testFastAppendDoesNotRemoveStaleDeleteFiles() { dataSeqs(1L), fileSeqs(1L), ids(deltaSnapshotId), - files(FILE_A_DELETES), + files(fileADeletes()), statuses(Status.ADDED)); } @@ -728,14 +737,7 @@ public void testValidateDataFilesExistWithConflictDetectionFilter() { Snapshot baseSnapshot = 
latestSnapshot(table, branch); // add a delete file for partition A - DeleteFile deleteFile = - FileMetadata.deleteFileBuilder(table.spec()) - .ofPositionDeletes() - .withPath("/path/to/data-a-deletes.parquet") - .withFileSizeInBytes(10) - .withPartitionPath("data=a") - .withRecordCount(1) - .build(); + DeleteFile deleteFile = newDeletes(dataFile1); Expression conflictDetectionFilter = Expressions.equal("data", "a"); RowDelta rowDelta = @@ -789,14 +791,7 @@ public void testValidateDataFilesDoNotExistWithConflictDetectionFilter() { Snapshot baseSnapshot = latestSnapshot(table, branch); // add a delete file for partition A - DeleteFile deleteFile = - FileMetadata.deleteFileBuilder(table.spec()) - .ofPositionDeletes() - .withPath("/path/to/data-a-deletes.parquet") - .withFileSizeInBytes(10) - .withPartitionPath("data=a") - .withRecordCount(1) - .build(); + DeleteFile deleteFile = newDeletes(dataFile1); Expression conflictDetectionFilter = Expressions.equal("data", "a"); RowDelta rowDelta = @@ -847,9 +842,9 @@ public void testAddDeleteFilesMultipleSpecs() { // commit a row delta with 1 data file and 3 delete files where delete files have different // specs DataFile dataFile = newDataFile("data=xyz"); - DeleteFile firstDeleteFile = newDeleteFile(firstSnapshotDataFile.specId(), "data_bucket=0"); - DeleteFile secondDeleteFile = newDeleteFile(secondSnapshotDataFile.specId(), ""); - DeleteFile thirdDeleteFile = newDeleteFile(thirdSnapshotDataFile.specId(), "data=abc"); + DeleteFile firstDeleteFile = newDeletes(firstSnapshotDataFile); + DeleteFile secondDeleteFile = newDeletes(secondSnapshotDataFile); + DeleteFile thirdDeleteFile = newDeletes(thirdSnapshotDataFile); commit( table, @@ -867,6 +862,7 @@ public void testAddDeleteFilesMultipleSpecs() { assertThat(snapshot.operation()).isEqualTo(DataOperations.OVERWRITE); Map summary = snapshot.summary(); + long posDeletesCount = recordCount(firstDeleteFile, secondDeleteFile, thirdDeleteFile); assertThat(summary) 
.containsEntry(CHANGED_PARTITION_COUNT_PROP, "4") @@ -874,8 +870,8 @@ public void testAddDeleteFilesMultipleSpecs() { .containsEntry(TOTAL_DATA_FILES_PROP, "4") .containsEntry(ADDED_DELETE_FILES_PROP, "3") .containsEntry(TOTAL_DELETE_FILES_PROP, "3") - .containsEntry(ADDED_POS_DELETES_PROP, "3") - .containsEntry(TOTAL_POS_DELETES_PROP, "3") + .containsEntry(ADDED_POS_DELETES_PROP, String.valueOf(posDeletesCount)) + .containsEntry(TOTAL_POS_DELETES_PROP, String.valueOf(posDeletesCount)) .hasEntrySatisfying( CHANGED_PARTITION_PREFIX + "data_bucket=0", v -> assertThat(v).contains(ADDED_DELETE_FILES_PROP + "=1")) @@ -953,8 +949,8 @@ public void testManifestMergingMultipleSpecs() { commit(table, table.newAppend().appendFile(secondSnapshotDataFile), branch); // commit two delete files to two specs in a single operation - DeleteFile firstDeleteFile = newDeleteFile(firstSnapshotDataFile.specId(), "data_bucket=0"); - DeleteFile secondDeleteFile = newDeleteFile(secondSnapshotDataFile.specId(), ""); + DeleteFile firstDeleteFile = newDeletes(firstSnapshotDataFile); + DeleteFile secondDeleteFile = newDeletes(secondSnapshotDataFile); commit( table, @@ -968,12 +964,18 @@ public void testManifestMergingMultipleSpecs() { assertThat(thirdSnapshot.deleteManifests(table.io())).hasSize(2); // commit two more delete files to the same specs to trigger merging - DeleteFile thirdDeleteFile = newDeleteFile(firstSnapshotDataFile.specId(), "data_bucket=0"); - DeleteFile fourthDeleteFile = newDeleteFile(secondSnapshotDataFile.specId(), ""); + DeleteFile thirdDeleteFile = newDeletes(firstSnapshotDataFile); + DeleteFile fourthDeleteFile = newDeletes(secondSnapshotDataFile); commit( table, - table.newRowDelta().addDeletes(thirdDeleteFile).addDeletes(fourthDeleteFile), + table + .newRowDelta() + .removeDeletes(firstDeleteFile) + .addDeletes(thirdDeleteFile) + .removeDeletes(secondDeleteFile) + .addDeletes(fourthDeleteFile) + .validateFromSnapshot(thirdSnapshot.snapshotId()), branch); Snapshot 
fourthSnapshot = latestSnapshot(table, branch); @@ -988,9 +990,9 @@ public void testManifestMergingMultipleSpecs() { firstDeleteManifest, dataSeqs(4L, 3L), fileSeqs(4L, 3L), - ids(fourthSnapshot.snapshotId(), thirdSnapshot.snapshotId()), + ids(fourthSnapshot.snapshotId(), fourthSnapshot.snapshotId()), files(thirdDeleteFile, firstDeleteFile), - statuses(Status.ADDED, Status.EXISTING)); + statuses(Status.ADDED, Status.DELETED)); ManifestFile secondDeleteManifest = fourthSnapshot.deleteManifests(table.io()).get(0); assertThat(secondDeleteManifest.partitionSpecId()).isEqualTo(secondSnapshotDataFile.specId()); @@ -998,9 +1000,9 @@ public void testManifestMergingMultipleSpecs() { secondDeleteManifest, dataSeqs(4L, 3L), fileSeqs(4L, 3L), - ids(fourthSnapshot.snapshotId(), thirdSnapshot.snapshotId()), + ids(fourthSnapshot.snapshotId(), fourthSnapshot.snapshotId()), files(fourthDeleteFile, secondDeleteFile), - statuses(Status.ADDED, Status.EXISTING)); + statuses(Status.ADDED, Status.DELETED)); } @TestTemplate @@ -1019,8 +1021,8 @@ public void testAbortMultipleSpecs() { commit(table, table.newAppend().appendFile(secondSnapshotDataFile), branch); // prepare two delete files that belong to different specs - DeleteFile firstDeleteFile = newDeleteFile(firstSnapshotDataFile.specId(), "data_bucket=0"); - DeleteFile secondDeleteFile = newDeleteFile(secondSnapshotDataFile.specId(), ""); + DeleteFile firstDeleteFile = newDeletes(firstSnapshotDataFile); + DeleteFile secondDeleteFile = newDeletes(secondSnapshotDataFile); // capture all deletes Set deletedFiles = Sets.newHashSet(); @@ -1062,7 +1064,7 @@ public void testConcurrentConflictingRowDelta() { .newRowDelta() .toBranch(branch) .addRows(FILE_B) - .addDeletes(FILE_A_DELETES) + .addDeletes(fileADeletes()) .validateFromSnapshot(firstSnapshot.snapshotId()) .conflictDetectionFilter(conflictDetectionFilter) .validateNoConflictingDataFiles() @@ -1071,7 +1073,7 @@ public void testConcurrentConflictingRowDelta() { table .newRowDelta() 
.toBranch(branch) - .addDeletes(FILE_A_DELETES) + .addDeletes(fileADeletes()) .validateFromSnapshot(firstSnapshot.snapshotId()) .conflictDetectionFilter(conflictDetectionFilter) .validateNoConflictingDataFiles() @@ -1094,7 +1096,7 @@ public void testConcurrentConflictingRowDeltaWithoutAppendValidation() { RowDelta rowDelta = table .newRowDelta() - .addDeletes(FILE_A_DELETES) + .addDeletes(fileADeletes()) .validateFromSnapshot(firstSnapshot.snapshotId()) .conflictDetectionFilter(conflictDetectionFilter) .validateNoConflictingDeleteFiles(); @@ -1102,7 +1104,7 @@ public void testConcurrentConflictingRowDeltaWithoutAppendValidation() { table .newRowDelta() .toBranch(branch) - .addDeletes(FILE_A_DELETES) + .addDeletes(fileADeletes()) .validateFromSnapshot(firstSnapshot.snapshotId()) .conflictDetectionFilter(conflictDetectionFilter) .validateNoConflictingDataFiles() @@ -1149,14 +1151,7 @@ public void testConcurrentNonConflictingRowDelta() { Expression conflictDetectionFilter = Expressions.equal("data", "a"); // add a delete file for partition A - DeleteFile deleteFile1 = - FileMetadata.deleteFileBuilder(table.spec()) - .ofPositionDeletes() - .withPath("/path/to/data-a-deletes.parquet") - .withFileSizeInBytes(10) - .withPartitionPath("data=a") - .withRecordCount(1) - .build(); + DeleteFile deleteFile1 = newDeletes(dataFile1); // mock a DELETE operation with serializable isolation RowDelta rowDelta = @@ -1170,14 +1165,7 @@ public void testConcurrentNonConflictingRowDelta() { .validateNoConflictingDeleteFiles(); // add a delete file for partition B - DeleteFile deleteFile2 = - FileMetadata.deleteFileBuilder(table.spec()) - .ofPositionDeletes() - .withPath("/path/to/data-b-deletes.parquet") - .withFileSizeInBytes(10) - .withPartitionPath("data=b") - .withRecordCount(1) - .build(); + DeleteFile deleteFile2 = newDeletes(dataFile2); table .newRowDelta() @@ -1320,8 +1308,8 @@ public void testConcurrentConflictingRowDeltaAndRewriteFilesWithSequenceNumber() Snapshot baseSnapshot = 
latestSnapshot(table, branch); - // add an position delete file - DeleteFile deleteFile1 = newDeleteFile(table.spec().specId(), "data=a"); + // add position deletes + DeleteFile deleteFile1 = newDeletes(dataFile1); // mock a DELETE operation with serializable isolation RowDelta rowDelta = @@ -1357,7 +1345,7 @@ public void testRowDeltaCaseSensitivity() { Snapshot firstSnapshot = latestSnapshot(table, branch); - commit(table, table.newRowDelta().addDeletes(FILE_A_DELETES), branch); + commit(table, table.newRowDelta().addDeletes(fileADeletes()), branch); Expression conflictDetectionFilter = Expressions.equal(Expressions.bucket("dAtA", 16), 0); @@ -1413,12 +1401,12 @@ public void testRowDeltaCaseSensitivity() { @TestTemplate public void testRewrittenDeleteFiles() { DataFile dataFile = newDataFile("data_bucket=0"); - DeleteFile deleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0"); + DeleteFile deleteFile = newDeletes(dataFile); RowDelta baseRowDelta = table.newRowDelta().addRows(dataFile).addDeletes(deleteFile); Snapshot baseSnapshot = commit(table, baseRowDelta, branch); assertThat(baseSnapshot.operation()).isEqualTo(DataOperations.OVERWRITE); - DeleteFile newDeleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0"); + DeleteFile newDeleteFile = newDeletes(dataFile); RowDelta rowDelta = table .newRowDelta() @@ -1458,14 +1446,16 @@ public void testRewrittenDeleteFiles() { @TestTemplate public void testConcurrentDeletesRewriteSameDeleteFile() { + assumeThat(formatVersion).isEqualTo(2); + DataFile dataFile = newDataFile("data_bucket=0"); - DeleteFile deleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0"); + DeleteFile deleteFile = newDeletes(dataFile); RowDelta baseRowDelta = table.newRowDelta().addRows(dataFile).addDeletes(deleteFile); Snapshot baseSnapshot = commit(table, baseRowDelta, branch); assertThat(baseSnapshot.operation()).isEqualTo(DataOperations.OVERWRITE); // commit the first DELETE operation that replaces `deleteFile` - DeleteFile 
newDeleteFile1 = newDeleteFile(dataFile.specId(), "data_bucket=0"); + DeleteFile newDeleteFile1 = newDeletes(dataFile); RowDelta delete1 = table .newRowDelta() @@ -1478,7 +1468,7 @@ public void testConcurrentDeletesRewriteSameDeleteFile() { assertThat(snapshot1.sequenceNumber()).isEqualTo(2L); // commit the second DELETE operation that replaces `deleteFile` - DeleteFile newDeleteFile2 = newDeleteFile(dataFile.specId(), "data_bucket=0"); + DeleteFile newDeleteFile2 = newDeletes(dataFile); RowDelta delete2 = table .newRowDelta() @@ -1522,13 +1512,13 @@ public void testConcurrentDeletesRewriteSameDeleteFile() { @TestTemplate public void testConcurrentMergeRewriteSameDeleteFile() { DataFile dataFile = newDataFile("data_bucket=0"); - DeleteFile deleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0"); + DeleteFile deleteFile = newDeletes(dataFile); RowDelta baseRowDelta = table.newRowDelta().addRows(dataFile).addDeletes(deleteFile); Snapshot baseSnapshot = commit(table, baseRowDelta, branch); assertThat(baseSnapshot.operation()).isEqualTo(DataOperations.OVERWRITE); // commit a DELETE operation that replaces `deleteFile` - DeleteFile newDeleteFile1 = newDeleteFile(dataFile.specId(), "data_bucket=0"); + DeleteFile newDeleteFile1 = newDeletes(dataFile); RowDelta delete = table .newRowDelta() @@ -1540,7 +1530,7 @@ public void testConcurrentMergeRewriteSameDeleteFile() { // attempt to commit a MERGE operation that replaces `deleteFile` DataFile newDataFile2 = newDataFile("data_bucket=0"); - DeleteFile newDeleteFile2 = newDeleteFile(dataFile.specId(), "data_bucket=0"); + DeleteFile newDeleteFile2 = newDeletes(dataFile); RowDelta merge = table .newRowDelta() @@ -1556,4 +1546,102 @@ public void testConcurrentMergeRewriteSameDeleteFile() { .isInstanceOf(ValidationException.class) .hasMessageStartingWith("Found new conflicting delete files that can apply"); } + + @TestTemplate + public void testConcurrentDVsForSameDataFile() { + 
assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + + DataFile dataFile = newDataFile("data_bucket=0"); + commit(table, table.newRowDelta().addRows(dataFile), branch); + + DeleteFile deleteFile1 = newDeletes(dataFile); + RowDelta rowDelta1 = table.newRowDelta().addDeletes(deleteFile1); + + DeleteFile deleteFile2 = newDeletes(dataFile); + RowDelta rowDelta2 = table.newRowDelta().addDeletes(deleteFile2); + + commit(table, rowDelta1, branch); + + assertThatThrownBy(() -> commit(table, rowDelta2, branch)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("Found concurrently added DV for %s", dataFile.location()); + } + + @TestTemplate + public void testManifestMergingAfterUpgradeToV3() { + assumeThat(formatVersion).isEqualTo(2); + + // enable manifest merging + table + .updateProperties() + .set(TableProperties.MANIFEST_MERGE_ENABLED, "true") + .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "2") + .commit(); + + // add a data file + DataFile dataFile = newDataFile("data_bucket=0"); + commit(table, table.newAppend().appendFile(dataFile), branch); + + // commit a delete operation using a positional delete file + DeleteFile deleteFile = newDeleteFileWithRef(dataFile); + assertThat(deleteFile.format()).isEqualTo(FileFormat.PARQUET); + RowDelta rowDelta1 = table.newRowDelta().addDeletes(deleteFile); + Snapshot deleteFileSnapshot = commit(table, rowDelta1, branch); + + // upgrade the table + table.updateProperties().set(TableProperties.FORMAT_VERSION, "3").commit(); + + // commit a DV + DeleteFile dv = newDV(dataFile); + assertThat(dv.format()).isEqualTo(FileFormat.PUFFIN); + RowDelta rowDelta2 = table.newRowDelta().addDeletes(dv); + Snapshot dvSnapshot = commit(table, rowDelta2, branch); + + // both must be part of the table and merged into one manifest + ManifestFile deleteManifest = Iterables.getOnlyElement(dvSnapshot.deleteManifests(table.io())); + validateDeleteManifest( + deleteManifest, + dataSeqs(3L, 2L), + fileSeqs(3L, 2L), + 
ids(dvSnapshot.snapshotId(), deleteFileSnapshot.snapshotId()), + files(dv, deleteFile), + statuses(Status.ADDED, Status.EXISTING)); + + // only the DV must be assigned during planning + List tasks = planFiles(); + FileScanTask task = Iterables.getOnlyElement(tasks).asFileScanTask(); + assertThat(task.deletes()).hasSize(1); + DeleteFile taskDV = Iterables.getOnlyElement(task.deletes()); + assertThat(taskDV.location()).isEqualTo(dv.location()); + assertThat(taskDV.referencedDataFile()).isEqualTo(dv.referencedDataFile()); + assertThat(taskDV.contentOffset()).isEqualTo(dv.contentOffset()); + assertThat(taskDV.contentSizeInBytes()).isEqualTo(dv.contentSizeInBytes()); + } + + @TestTemplate + public void testInabilityToAddPositionDeleteFilesInTablesWithDVs() { + assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + DeleteFile deleteFile = newDeleteFile(table.spec().specId(), "data_bucket=0"); + assertThatThrownBy(() -> table.newRowDelta().addDeletes(deleteFile)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Must use DVs for position deletes in V%s", formatVersion); + } + + @TestTemplate + public void testInabilityToAddDVToV2Tables() { + assumeThat(formatVersion).isEqualTo(2); + DataFile dataFile = newDataFile("data_bucket=0"); + DeleteFile dv = newDV(dataFile); + assertThatThrownBy(() -> table.newRowDelta().addDeletes(dv)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Must not use DVs for position deletes in V2"); + } + + private List planFiles() { + try (CloseableIterable tasks = table.newBatchScan().useRef(branch).planFiles()) { + return Lists.newArrayList(tasks); + } catch (IOException e) { + throw new RuntimeException(e); + } + } } diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshot.java b/core/src/test/java/org/apache/iceberg/TestSnapshot.java index 8a30036f3242..bbe5e8f6cdd8 100644 --- a/core/src/test/java/org/apache/iceberg/TestSnapshot.java +++ 
b/core/src/test/java/org/apache/iceberg/TestSnapshot.java @@ -123,7 +123,7 @@ public void testCachedDeleteFiles() { int specId = table.spec().specId(); DataFile secondSnapshotDataFile = newDataFile("data_bucket=8/data_trunc_2=aa"); - DeleteFile secondSnapshotDeleteFile = newDeleteFile(specId, "data_bucket=8/data_trunc_2=aa"); + DeleteFile secondSnapshotDeleteFile = newDeletes(secondSnapshotDataFile); table .newRowDelta() @@ -131,7 +131,7 @@ public void testCachedDeleteFiles() { .addDeletes(secondSnapshotDeleteFile) .commit(); - DeleteFile thirdSnapshotDeleteFile = newDeleteFile(specId, "data_bucket=8/data_trunc_2=aa"); + DeleteFile thirdSnapshotDeleteFile = newDeletes(secondSnapshotDataFile); ImmutableSet replacedDeleteFiles = ImmutableSet.of(secondSnapshotDeleteFile); ImmutableSet newDeleteFiles = ImmutableSet.of(thirdSnapshotDeleteFile); @@ -248,11 +248,9 @@ public void testSequenceNumbersInAddedDeleteFiles() { table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); - int specId = table.spec().specId(); - - runAddedDeleteFileSequenceNumberTest(newDeleteFile(specId, "data_bucket=8"), 2); + runAddedDeleteFileSequenceNumberTest(newDeletes(FILE_A), 2); - runAddedDeleteFileSequenceNumberTest(newDeleteFile(specId, "data_bucket=28"), 3); + runAddedDeleteFileSequenceNumberTest(newDeletes(FILE_B), 3); } private void runAddedDeleteFileSequenceNumberTest( diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java b/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java index b0b9d003e35b..9c67e766a993 100644 --- a/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java +++ b/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java @@ -78,7 +78,7 @@ public void testFileSizeSummary() { @TestTemplate public void testFileSizeSummaryWithDeletes() { - assumeThat(formatVersion).isGreaterThan(1); + assumeThat(formatVersion).isEqualTo(2); table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit(); @@ -260,7 
+260,7 @@ public void rowDeltaWithDuplicates() { @TestTemplate public void rowDeltaWithDeletesAndDuplicates() { - assumeThat(formatVersion).isGreaterThan(1); + assumeThat(formatVersion).isEqualTo(2); assertThat(listManifestFiles()).isEmpty(); table @@ -325,7 +325,7 @@ public void rewriteWithDuplicateFiles() { @TestTemplate public void rewriteWithDeletesAndDuplicates() { - assumeThat(formatVersion).isGreaterThan(1); + assumeThat(formatVersion).isEqualTo(2); assertThat(listManifestFiles()).isEmpty(); table.newRowDelta().addRows(FILE_A2).addDeletes(FILE_A_DELETES).commit(); diff --git a/data/src/test/java/org/apache/iceberg/io/TestDVWriters.java b/data/src/test/java/org/apache/iceberg/io/TestDVWriters.java index 23e0090ca49f..4e50ee57db41 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestDVWriters.java +++ b/data/src/test/java/org/apache/iceberg/io/TestDVWriters.java @@ -34,6 +34,7 @@ import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.RowDelta; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.data.BaseDeleteLoader; @@ -295,9 +296,13 @@ public void testApplyPartitionScopedPositionDeletes() throws IOException { } private void commit(DeleteWriteResult result) { + Snapshot startSnapshot = table.currentSnapshot(); RowDelta rowDelta = table.newRowDelta(); result.rewrittenDeleteFiles().forEach(rowDelta::removeDeletes); result.deleteFiles().forEach(rowDelta::addDeletes); + if (startSnapshot != null) { + rowDelta.validateFromSnapshot(startSnapshot.snapshotId()); + } rowDelta.commit(); } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java index 9361c63176e0..659507e4c5e3 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java @@ -42,7 +42,11 @@ public static List parameters() { new Object[] {2, LOCAL, LOCAL}, new Object[] {2, LOCAL, DISTRIBUTED}, new Object[] {2, DISTRIBUTED, LOCAL}, - new Object[] {2, LOCAL, DISTRIBUTED}); + new Object[] {2, DISTRIBUTED, DISTRIBUTED}, + new Object[] {3, LOCAL, LOCAL}, + new Object[] {3, LOCAL, DISTRIBUTED}, + new Object[] {3, DISTRIBUTED, LOCAL}, + new Object[] {3, DISTRIBUTED, DISTRIBUTED}); } private static SparkSession spark = null; diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java index acd4688440d1..2665d7ba8d3b 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java @@ -41,7 +41,11 @@ public static List parameters() { new Object[] {2, LOCAL, LOCAL}, new Object[] {2, LOCAL, DISTRIBUTED}, new Object[] {2, DISTRIBUTED, LOCAL}, - new Object[] {2, DISTRIBUTED, DISTRIBUTED}); + new Object[] {2, DISTRIBUTED, DISTRIBUTED}, + new Object[] {3, LOCAL, LOCAL}, + new Object[] {3, LOCAL, DISTRIBUTED}, + new Object[] {3, DISTRIBUTED, LOCAL}, + new Object[] {3, DISTRIBUTED, DISTRIBUTED}); } private static SparkSession spark = null; diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java index 79e48f47f241..11d61e599eba 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java @@ -719,7 +719,7 @@ public void testRewriteLargeManifestsEvolvedUnpartitionedV1Table() throws IOExce 
@TestTemplate public void testRewriteSmallDeleteManifestsNonPartitionedTable() throws IOException { - assumeThat(formatVersion).isGreaterThan(1); + assumeThat(formatVersion).isEqualTo(2); PartitionSpec spec = PartitionSpec.unpartitioned(); Map options = Maps.newHashMap(); @@ -792,7 +792,7 @@ public void testRewriteSmallDeleteManifestsNonPartitionedTable() throws IOExcept @TestTemplate public void testRewriteSmallDeleteManifestsPartitionedTable() throws IOException { - assumeThat(formatVersion).isGreaterThan(1); + assumeThat(formatVersion).isEqualTo(2); PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build(); Map options = Maps.newHashMap(); @@ -895,7 +895,7 @@ public void testRewriteSmallDeleteManifestsPartitionedTable() throws IOException @TestTemplate public void testRewriteLargeDeleteManifestsPartitionedTable() throws IOException { - assumeThat(formatVersion).isGreaterThan(1); + assumeThat(formatVersion).isEqualTo(2); PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build(); Map options = Maps.newHashMap(); @@ -956,6 +956,62 @@ public void testRewriteLargeDeleteManifestsPartitionedTable() throws IOException assertThat(deleteManifests).hasSizeGreaterThanOrEqualTo(2); } + @TestTemplate + public void testRewriteManifestsAfterUpgradeToV3() throws IOException { + assumeThat(formatVersion).isEqualTo(2); + + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build(); + Map<String, String> options = ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"); + Table table = TABLES.create(SCHEMA, spec, options, tableLocation); + + DataFile dataFile1 = newDataFile(table, "c1=1"); + DeleteFile deleteFile1 = newDeletes(table, dataFile1); + table.newRowDelta().addRows(dataFile1).addDeletes(deleteFile1).commit(); + + DataFile dataFile2 = newDataFile(table, "c1=1"); + DeleteFile deleteFile2 = newDeletes(table, dataFile2); + table.newRowDelta().addRows(dataFile2).addDeletes(deleteFile2).commit(); + + // upgrade the table to enable DVs + 
table.updateProperties().set(TableProperties.FORMAT_VERSION, "3").commit(); + + DataFile dataFile3 = newDataFile(table, "c1=1"); + DeleteFile dv3 = newDV(table, dataFile3); + table.newRowDelta().addRows(dataFile3).addDeletes(dv3).commit(); + + SparkActions actions = SparkActions.get(); + + RewriteManifests.Result result = + actions + .rewriteManifests(table) + .rewriteIf(manifest -> true) + .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) + .execute(); + + assertThat(result.rewrittenManifests()).as("Action should rewrite 6 manifests").hasSize(6); + assertThat(result.addedManifests()).as("Action should add 2 manifests").hasSize(2); + assertManifestsLocation(result.addedManifests()); + + table.refresh(); + + try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) { + for (FileScanTask fileTask : tasks) { + DataFile dataFile = fileTask.file(); + DeleteFile deleteFile = Iterables.getOnlyElement(fileTask.deletes()); + if (dataFile.location().equals(dataFile1.location())) { + assertThat(deleteFile.referencedDataFile()).isEqualTo(deleteFile1.referencedDataFile()); + assertEqual(deleteFile, deleteFile1); + } else if (dataFile.location().equals(dataFile2.location())) { + assertThat(deleteFile.referencedDataFile()).isEqualTo(deleteFile2.referencedDataFile()); + assertEqual(deleteFile, deleteFile2); + } else { + assertThat(deleteFile.referencedDataFile()).isEqualTo(dv3.referencedDataFile()); + assertEqual(deleteFile, dv3); + } + } + } + } + private List actualRecords() { return spark .read()