diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java index 54bf3c6e44c4..2fa757086dd7 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java @@ -57,7 +57,8 @@ public class BaseRewriteManifests extends SnapshotProducer private final TableOperations ops; private final Map specsById; private final long manifestTargetSizeBytes; - private final boolean snapshotIdInheritanceEnabled; + private final int formatVersion; + private final boolean canInheritSnapshotId; private final Set deletedManifests = Sets.newHashSet(); private final List addedManifests = Lists.newArrayList(); @@ -82,10 +83,12 @@ public class BaseRewriteManifests extends SnapshotProducer this.manifestTargetSizeBytes = ops.current() .propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT); - this.snapshotIdInheritanceEnabled = + this.formatVersion = ops.current().formatVersion(); + boolean snapshotIdInheritanceEnabled = ops.current() .propertyAsBoolean( SNAPSHOT_ID_INHERITANCE_ENABLED, SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT); + this.canInheritSnapshotId = formatVersion > 1 || snapshotIdInheritanceEnabled; } @Override @@ -148,7 +151,7 @@ public RewriteManifests addManifest(ManifestFile manifest) { Preconditions.checkArgument( manifest.sequenceNumber() == -1, "Sequence must be assigned during commit"); - if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) { + if (canInheritSnapshotId && manifest.snapshotId() == null) { addedManifests.add(manifest); } else { // the manifest must be rewritten with this update's snapshot ID @@ -160,11 +163,10 @@ public RewriteManifests addManifest(ManifestFile manifest) { } private ManifestFile copyManifest(ManifestFile manifest) { - TableMetadata current = ops.current(); InputFile toCopy = ops.io().newInputFile(manifest.path()); OutputFile newFile = newManifestOutput(); return ManifestFiles.copyRewriteManifest( - current.formatVersion(), + formatVersion, manifest.partitionSpecId(), toCopy, specsById, diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java index dbda87baf7f5..d7daae8b3ed4 100644 --- a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java +++ b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java @@ -20,6 +20,7 @@ import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED; import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -443,6 +444,14 @@ public void testBasicManifestReplacement() throws IOException { List manifests = snapshot.allManifests(table.io()); Assert.assertEquals(3, manifests.size()); + if (formatVersion == 1) { + assertThat(manifests.get(0).path()).isNotEqualTo(firstNewManifest.path()); + assertThat(manifests.get(1).path()).isNotEqualTo(secondNewManifest.path()); + } else { + assertThat(manifests.get(0).path()).isEqualTo(firstNewManifest.path()); + assertThat(manifests.get(1).path()).isEqualTo(secondNewManifest.path()); + } + validateSummary(snapshot, 1, 1, 2, 0); validateManifestEntries( @@ -499,6 +508,9 @@ public void testBasicManifestReplacementWithSnapshotIdInheritance() throws IOExc List manifests = snapshot.allManifests(table.io()); Assert.assertEquals(3, manifests.size()); + assertThat(manifests.get(0).path()).isEqualTo(firstNewManifest.path()); + assertThat(manifests.get(1).path()).isEqualTo(secondNewManifest.path()); + validateSummary(snapshot, 1, 1, 2, 0); validateManifestEntries( diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 2b525ccc0b69..3916fe0350cf 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -337,7 +337,7 @@ private void replaceManifests( addedManifests.forEach(rewriteManifests::addManifest); commit(rewriteManifests); - if (!snapshotIdInheritanceEnabled) { + if (formatVersion == 1 && !snapshotIdInheritanceEnabled) { // delete new manifests as they were rewritten before the commit deleteFiles(Iterables.transform(addedManifests, ManifestFile::path)); } diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java index 4aafb72acef9..73026ef6ca8a 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java @@ -24,6 +24,7 @@ import static org.apache.iceberg.ValidationHelpers.snapshotIds; import static org.apache.iceberg.ValidationHelpers.validateDataManifest; import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.assertj.core.api.Assumptions.assumeThat; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -75,17 +76,24 @@ public class TestRewriteManifestsAction extends SparkTestBase { optional(2, "c2", Types.StringType.get()), optional(3, "c3", Types.StringType.get())); - @Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}") + @Parameterized.Parameters(name = "formatVersion = {0}, snapshotIdInheritanceEnabled = {1}") public static Object[] parameters() { - return new Object[] {"true", "false"}; + return new Object[][] { + new Object[] {1, "true"}, + new Object[] {1, "false"}, + new Object[] {2, "true"}, + new Object[] {2, "false"} + }; } @Rule public TemporaryFolder temp = new TemporaryFolder(); + private final int formatVersion; private final String snapshotIdInheritanceEnabled; private String tableLocation = null; - public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled) { + public TestRewriteManifestsAction(int formatVersion, String snapshotIdInheritanceEnabled) { + this.formatVersion = formatVersion; this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled; } @@ -99,6 +107,7 @@ public void setupTableLocation() throws Exception { public void testRewriteManifestsEmptyTable() throws IOException { PartitionSpec spec = PartitionSpec.unpartitioned(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -119,6 +128,7 @@ public void testRewriteManifestsEmptyTable() throws IOException { public void testRewriteSmallManifestsNonPartitionedTable() { PartitionSpec spec = PartitionSpec.unpartitioned(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -172,6 +182,7 @@ public void testRewriteSmallManifestsNonPartitionedTable() { public void testRewriteManifestsWithCommitStateUnknownException() { PartitionSpec spec = PartitionSpec.unpartitioned(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -238,6 +249,7 @@ public void testRewriteManifestsWithCommitStateUnknownException() { public void testRewriteSmallManifestsPartitionedTable() { PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -318,6 +330,7 @@ public void testRewriteSmallManifestsPartitionedTable() { public void testRewriteImportedManifests() throws IOException { PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -373,6 +386,7 @@ public void testRewriteImportedManifests() throws IOException { public void testRewriteLargeManifestsPartitionedTable() throws IOException { PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -428,6 +442,7 @@ public void testRewriteLargeManifestsPartitionedTable() throws IOException { public void testRewriteManifestsWithPredicate() throws IOException { PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -495,6 +510,8 @@ public void testRewriteManifestsWithPredicate() throws IOException { @Test public void testRewriteSmallManifestsNonPartitionedV2Table() { + assumeThat(formatVersion).isGreaterThan(1); + PartitionSpec spec = PartitionSpec.unpartitioned(); Map properties = ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"); Table table = TABLES.create(SCHEMA, spec, properties, tableLocation); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 06a5c8c5720f..d5ab93ee6340 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -337,7 +337,7 @@ private void replaceManifests( addedManifests.forEach(rewriteManifests::addManifest); commit(rewriteManifests); - if (!snapshotIdInheritanceEnabled) { + if (formatVersion == 1 && !snapshotIdInheritanceEnabled) { // delete new manifests as they were rewritten before the commit deleteFiles(Iterables.transform(addedManifests, ManifestFile::path)); } diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java index 4aafb72acef9..73026ef6ca8a 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java @@ -24,6 +24,7 @@ import static org.apache.iceberg.ValidationHelpers.snapshotIds; import static org.apache.iceberg.ValidationHelpers.validateDataManifest; import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.assertj.core.api.Assumptions.assumeThat; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -75,17 +76,24 @@ public class TestRewriteManifestsAction extends SparkTestBase { optional(2, "c2", Types.StringType.get()), optional(3, "c3", Types.StringType.get())); - @Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}") + @Parameterized.Parameters(name = "formatVersion = {0}, snapshotIdInheritanceEnabled = {1}") public static Object[] parameters() { - return new Object[] {"true", "false"}; + return new Object[][] { + new Object[] {1, "true"}, + new Object[] {1, "false"}, + new Object[] {2, "true"}, + new Object[] {2, "false"} + }; } @Rule public TemporaryFolder temp = new TemporaryFolder(); + private final int formatVersion; private final String snapshotIdInheritanceEnabled; private String tableLocation = null; - public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled) { + public TestRewriteManifestsAction(int formatVersion, String snapshotIdInheritanceEnabled) { + this.formatVersion = formatVersion; this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled; } @@ -99,6 +107,7 @@ public void setupTableLocation() throws Exception { public void testRewriteManifestsEmptyTable() throws IOException { PartitionSpec spec = PartitionSpec.unpartitioned(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -119,6 +128,7 @@ public void testRewriteManifestsEmptyTable() throws IOException { public void testRewriteSmallManifestsNonPartitionedTable() { PartitionSpec spec = PartitionSpec.unpartitioned(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -172,6 +182,7 @@ public void testRewriteSmallManifestsNonPartitionedTable() { public void testRewriteManifestsWithCommitStateUnknownException() { PartitionSpec spec = PartitionSpec.unpartitioned(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -238,6 +249,7 @@ public void testRewriteManifestsWithCommitStateUnknownException() { public void testRewriteSmallManifestsPartitionedTable() { PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -318,6 +330,7 @@ public void testRewriteSmallManifestsPartitionedTable() { public void testRewriteImportedManifests() throws IOException { PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -373,6 +386,7 @@ public void testRewriteImportedManifests() throws IOException { public void testRewriteLargeManifestsPartitionedTable() throws IOException { PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -428,6 +442,7 @@ public void testRewriteLargeManifestsPartitionedTable() throws IOException { public void testRewriteManifestsWithPredicate() throws IOException { PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -495,6 +510,8 @@ public void testRewriteManifestsWithPredicate() throws IOException { @Test public void testRewriteSmallManifestsNonPartitionedV2Table() { + assumeThat(formatVersion).isGreaterThan(1); + PartitionSpec spec = PartitionSpec.unpartitioned(); Map properties = ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"); Table table = TABLES.create(SCHEMA, spec, properties, tableLocation); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 06a5c8c5720f..d5ab93ee6340 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -337,7 +337,7 @@ private void replaceManifests( addedManifests.forEach(rewriteManifests::addManifest); commit(rewriteManifests); - if (!snapshotIdInheritanceEnabled) { + if (formatVersion == 1 && !snapshotIdInheritanceEnabled) { // delete new manifests as they were rewritten before the commit deleteFiles(Iterables.transform(addedManifests, ManifestFile::path)); } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java index 64dbf42d4c8e..1c9cfcbe5155 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java @@ -24,6 +24,7 @@ import static org.apache.iceberg.ValidationHelpers.snapshotIds; import static org.apache.iceberg.ValidationHelpers.validateDataManifest; import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.assertj.core.api.Assumptions.assumeThat; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -75,17 +76,24 @@ public class TestRewriteManifestsAction extends SparkTestBase { optional(2, "c2", Types.StringType.get()), optional(3, "c3", Types.StringType.get())); - @Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}") + @Parameterized.Parameters(name = "formatVersion = {0}, snapshotIdInheritanceEnabled = {1}") public static Object[] parameters() { - return new Object[] {"true", "false"}; + return new Object[][] { + new Object[] {1, "true"}, + new Object[] {1, "false"}, + new Object[] {2, "true"}, + new Object[] {2, "false"} + }; } @Rule public TemporaryFolder temp = new TemporaryFolder(); + private final int formatVersion; private final String snapshotIdInheritanceEnabled; private String tableLocation = null; - public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled) { + public TestRewriteManifestsAction(int formatVersion, String snapshotIdInheritanceEnabled) { + this.formatVersion = formatVersion; this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled; } @@ -99,6 +107,7 @@ public void setupTableLocation() throws Exception { public void testRewriteManifestsEmptyTable() throws IOException { PartitionSpec spec = PartitionSpec.unpartitioned(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -119,6 +128,7 @@ public void testRewriteManifestsEmptyTable() throws IOException { public void testRewriteSmallManifestsNonPartitionedTable() { PartitionSpec spec = PartitionSpec.unpartitioned(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -172,6 +182,7 @@ public void testRewriteSmallManifestsNonPartitionedTable() { public void testRewriteManifestsWithCommitStateUnknownException() { PartitionSpec spec = PartitionSpec.unpartitioned(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -238,6 +249,7 @@ public void testRewriteManifestsWithCommitStateUnknownException() { public void testRewriteSmallManifestsPartitionedTable() { PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -318,6 +330,7 @@ public void testRewriteSmallManifestsPartitionedTable() { public void testRewriteImportedManifests() throws IOException { PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -373,6 +386,7 @@ public void testRewriteImportedManifests() throws IOException { public void testRewriteLargeManifestsPartitionedTable() throws IOException { PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -428,6 +442,7 @@ public void testRewriteLargeManifestsPartitionedTable() throws IOException { public void testRewriteManifestsWithPredicate() throws IOException { PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -495,6 +510,8 @@ public void testRewriteManifestsWithPredicate() throws IOException { @Test public void testRewriteSmallManifestsNonPartitionedV2Table() { + assumeThat(formatVersion).isGreaterThan(1); + PartitionSpec spec = PartitionSpec.unpartitioned(); Map properties = ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"); Table table = TABLES.create(SCHEMA, spec, properties, tableLocation); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 06a5c8c5720f..d5ab93ee6340 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -337,7 +337,7 @@ private void replaceManifests( addedManifests.forEach(rewriteManifests::addManifest); commit(rewriteManifests); - if (!snapshotIdInheritanceEnabled) { + if (formatVersion == 1 && !snapshotIdInheritanceEnabled) { // delete new manifests as they were rewritten before the commit deleteFiles(Iterables.transform(addedManifests, ManifestFile::path)); } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java index 64dbf42d4c8e..1c9cfcbe5155 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java @@ -24,6 +24,7 @@ import static org.apache.iceberg.ValidationHelpers.snapshotIds; import static org.apache.iceberg.ValidationHelpers.validateDataManifest; import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.assertj.core.api.Assumptions.assumeThat; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -75,17 +76,24 @@ public class TestRewriteManifestsAction extends SparkTestBase { optional(2, "c2", Types.StringType.get()), optional(3, "c3", Types.StringType.get())); - @Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}") + @Parameterized.Parameters(name = "formatVersion = {0}, snapshotIdInheritanceEnabled = {1}") public static Object[] parameters() { - return new Object[] {"true", "false"}; + return new Object[][] { + new Object[] {1, "true"}, + new Object[] {1, "false"}, + new Object[] {2, "true"}, + new Object[] {2, "false"} + }; } @Rule public TemporaryFolder temp = new TemporaryFolder(); + private final int formatVersion; private final String snapshotIdInheritanceEnabled; private String tableLocation = null; - public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled) { + public TestRewriteManifestsAction(int formatVersion, String snapshotIdInheritanceEnabled) { + this.formatVersion = formatVersion; this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled; } @@ -99,6 +107,7 @@ public void setupTableLocation() throws Exception { public void testRewriteManifestsEmptyTable() throws IOException { PartitionSpec spec = PartitionSpec.unpartitioned(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -119,6 +128,7 @@ public void testRewriteManifestsEmptyTable() throws IOException { public void testRewriteSmallManifestsNonPartitionedTable() { PartitionSpec spec = PartitionSpec.unpartitioned(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -172,6 +182,7 @@ public void testRewriteSmallManifestsNonPartitionedTable() { public void testRewriteManifestsWithCommitStateUnknownException() { PartitionSpec spec = PartitionSpec.unpartitioned(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -238,6 +249,7 @@ public void testRewriteManifestsWithCommitStateUnknownException() { public void testRewriteSmallManifestsPartitionedTable() { PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -318,6 +330,7 @@ public void testRewriteSmallManifestsPartitionedTable() { public void testRewriteImportedManifests() throws IOException { PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -373,6 +386,7 @@ public void testRewriteImportedManifests() throws IOException { public void testRewriteLargeManifestsPartitionedTable() throws IOException { PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -428,6 +442,7 @@ public void testRewriteLargeManifestsPartitionedTable() throws IOException { public void testRewriteManifestsWithPredicate() throws IOException { PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build(); Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); Table table = TABLES.create(SCHEMA, spec, options, tableLocation); @@ -495,6 +510,8 @@ public void testRewriteManifestsWithPredicate() throws IOException { @Test public void testRewriteSmallManifestsNonPartitionedV2Table() { + assumeThat(formatVersion).isGreaterThan(1); + PartitionSpec spec = PartitionSpec.unpartitioned(); Map properties = ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"); Table table = TABLES.create(SCHEMA, spec, properties, tableLocation);