diff --git a/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java b/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java index f6524a1d4fba..15a141eb8c2c 100644 --- a/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java +++ b/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java @@ -118,4 +118,15 @@ public interface ExpireSnapshots extends PendingUpdate> { * @return this for method chaining */ ExpireSnapshots cleanExpiredFiles(boolean clean); + + /** + * Enable cleaning up unused metadata, such as partition specs, schemas, etc. + * + * @param clean remove unused partition specs, schemas, or other metadata when true + * @return this for method chaining + */ + default ExpireSnapshots cleanExpiredMetadata(boolean clean) { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement cleanExpiredMetadata"); + } } diff --git a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java index 60ad46e8e864..faa7e21a542d 100644 --- a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java +++ b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -256,7 +257,8 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira }); Set filesToDelete = - findFilesToDelete(manifestsToScan, manifestsToRevert, validIds, afterExpiration); + findFilesToDelete( + manifestsToScan, manifestsToRevert, validIds, beforeExpiration.specsById()); deleteFiles(filesToDelete, "data"); deleteFiles(manifestsToDelete, "manifest"); @@ -273,7 +275,7 @@ private Set findFilesToDelete( Set manifestsToScan, Set manifestsToRevert, Set validIds, - TableMetadata current) { + Map specsById) { Set filesToDelete = ConcurrentHashMap.newKeySet(); Tasks.foreach(manifestsToScan) .retry(3) @@ -285,8 +287,7 @@ private Set findFilesToDelete( .run( manifest -> { // the manifest has deletes, scan it to find files to delete - try (ManifestReader reader = - ManifestFiles.open(manifest, fileIO, current.specsById())) { + try (ManifestReader reader = ManifestFiles.open(manifest, fileIO, specsById)) { for (ManifestEntry entry : reader.entries()) { // if the snapshot ID of the DELETE entry is no longer valid, the data can be // deleted @@ -311,8 +312,7 @@ private Set findFilesToDelete( .run( manifest -> { // the manifest has deletes, scan it to find files to delete - try (ManifestReader reader = - ManifestFiles.open(manifest, fileIO, current.specsById())) { + try (ManifestReader reader = ManifestFiles.open(manifest, fileIO, specsById)) { for (ManifestEntry entry : reader.entries()) { // delete any ADDED file from manifests that were reverted if (entry.status() == ManifestEntry.Status.ADDED) { diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java index 49fb1fe01c44..d697df6a4fc6 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java @@ -165,6 +165,23 @@ public void applyTo(TableMetadata.Builder metadataBuilder) { } } + class RemovePartitionSpecs implements MetadataUpdate { + private final Set specIds; + + public RemovePartitionSpecs(Set specIds) { + this.specIds = specIds; + } + + public Set specIds() { + return specIds; + } + + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + metadataBuilder.removeSpecs(specIds); + } + } + class AddSortOrder implements MetadataUpdate { private final UnboundSortOrder sortOrder; diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java index 8cdfd3c72b6e..08d4b3398f10 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java @@ -59,6 +59,7 @@ private MetadataUpdateParser() {} static final String SET_CURRENT_VIEW_VERSION = "set-current-view-version"; static final String SET_PARTITION_STATISTICS = "set-partition-statistics"; static final String REMOVE_PARTITION_STATISTICS = "remove-partition-statistics"; + static final String REMOVE_PARTITION_SPECS = "remove-partition-specs"; // AssignUUID private static final String UUID = "uuid"; @@ -126,6 +127,9 @@ private MetadataUpdateParser() {} // SetCurrentViewVersion private static final String VIEW_VERSION_ID = "view-version-id"; + // RemovePartitionSpecs + private static final String SPEC_IDS = "spec-ids"; + private static final Map, String> ACTIONS = ImmutableMap., String>builder() .put(MetadataUpdate.AssignUUID.class, ASSIGN_UUID) @@ -149,6 +153,7 @@ private MetadataUpdateParser() {} .put(MetadataUpdate.SetLocation.class, SET_LOCATION) .put(MetadataUpdate.AddViewVersion.class, ADD_VIEW_VERSION) .put(MetadataUpdate.SetCurrentViewVersion.class, SET_CURRENT_VIEW_VERSION) + .put(MetadataUpdate.RemovePartitionSpecs.class, REMOVE_PARTITION_SPECS) .buildOrThrow(); public static String toJson(MetadataUpdate metadataUpdate) { @@ -241,6 +246,9 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator writeSetCurrentViewVersionId( (MetadataUpdate.SetCurrentViewVersion) metadataUpdate, generator); break; + case REMOVE_PARTITION_SPECS: + writeRemovePartitionSpecs((MetadataUpdate.RemovePartitionSpecs) metadataUpdate, generator); + break; default: throw new IllegalArgumentException( String.format( @@ -312,6 +320,8 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) { return readAddViewVersion(jsonNode); case SET_CURRENT_VIEW_VERSION: return readCurrentViewVersionId(jsonNode); + case REMOVE_PARTITION_SPECS: + return readRemovePartitionSpecs(jsonNode); default: throw new UnsupportedOperationException( String.format("Cannot convert metadata update action to json: %s", action)); @@ -447,6 +457,11 @@ private static void writeSetCurrentViewVersionId( gen.writeNumberField(VIEW_VERSION_ID, metadataUpdate.versionId()); } + private static void writeRemovePartitionSpecs( + MetadataUpdate.RemovePartitionSpecs metadataUpdate, JsonGenerator gen) throws IOException { + JsonUtil.writeIntegerArray(SPEC_IDS, metadataUpdate.specIds(), gen); + } + private static MetadataUpdate readAssignUUID(JsonNode node) { String uuid = JsonUtil.getString(UUID, node); return new MetadataUpdate.AssignUUID(uuid); @@ -596,4 +611,8 @@ private static MetadataUpdate readAddViewVersion(JsonNode node) { private static MetadataUpdate readCurrentViewVersionId(JsonNode node) { return new MetadataUpdate.SetCurrentViewVersion(JsonUtil.getInt(VIEW_VERSION_ID, node)); } + + private static MetadataUpdate readRemovePartitionSpecs(JsonNode node) { + return new MetadataUpdate.RemovePartitionSpecs(JsonUtil.getIntegerSet(SPEC_IDS, node)); + } } diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java index 7558ea7d8a3e..0cc89433413d 100644 --- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java @@ -41,6 +41,7 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -85,6 +86,7 @@ public void accept(String file) { private ExecutorService planExecutorService = ThreadPools.getWorkerPool(); private Boolean incrementalCleanup; private boolean specifiedSnapshotId = false; + private boolean cleanExpiredMetadata = false; RemoveSnapshots(TableOperations ops) { this.ops = ops; @@ -159,6 +161,12 @@ public ExpireSnapshots planWith(ExecutorService executorService) { return this; } + @Override + public ExpireSnapshots cleanExpiredMetadata(boolean clean) { + this.cleanExpiredMetadata = clean; + return this; + } + @Override public List apply() { TableMetadata updated = internalApply(); @@ -209,6 +217,26 @@ private TableMetadata internalApply() { .forEach(idsToRemove::add); updatedMetaBuilder.removeSnapshots(idsToRemove); + if (cleanExpiredMetadata) { + // TODO: Support cleaning expired schema as well. + Set reachableSpecs = Sets.newConcurrentHashSet(); + reachableSpecs.add(base.defaultSpecId()); + Tasks.foreach(idsToRetain) + .executeWith(planExecutorService) + .run( + snapshot -> + base.snapshot(snapshot).allManifests(ops.io()).stream() + .map(ManifestFile::partitionSpecId) + .forEach(reachableSpecs::add)); + + Set specsToRemove = + base.specs().stream() + .map(PartitionSpec::specId) + .filter(specId -> !reachableSpecs.contains(specId)) + .collect(Collectors.toSet()); + updatedMetaBuilder.removeSpecs(specsToRemove); + } + return updatedMetaBuilder.build(); } diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index d20dd59d2b97..c72a599d18b2 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -1108,6 +1108,19 @@ public Builder setDefaultPartitionSpec(int specId) { return this; } + Builder removeSpecs(Iterable specIds) { + Set specIdsToRemove = Sets.newHashSet(specIds); + Preconditions.checkArgument( + !specIdsToRemove.contains(defaultSpecId), "Cannot remove the default partition spec"); + + this.specs = + specs.stream() + .filter(s -> !specIdsToRemove.contains(s.specId())) + .collect(Collectors.toList()); + changes.add(new MetadataUpdate.RemovePartitionSpecs(specIdsToRemove)); + return this; + } + public Builder addPartitionSpec(UnboundPartitionSpec spec) { addPartitionSpecInternal(spec.bind(schemasById.get(currentSchemaId))); return this; diff --git a/core/src/main/java/org/apache/iceberg/UpdateRequirements.java b/core/src/main/java/org/apache/iceberg/UpdateRequirements.java index d92c1a3742fe..95369d51934d 100644 --- a/core/src/main/java/org/apache/iceberg/UpdateRequirements.java +++ b/core/src/main/java/org/apache/iceberg/UpdateRequirements.java @@ -105,6 +105,8 @@ private Builder update(MetadataUpdate update) { update((MetadataUpdate.SetDefaultPartitionSpec) update); } else if (update instanceof MetadataUpdate.SetDefaultSortOrder) { update((MetadataUpdate.SetDefaultSortOrder) update); + } else if (update instanceof MetadataUpdate.RemovePartitionSpecs) { + update((MetadataUpdate.RemovePartitionSpecs) update); } return this; @@ -173,6 +175,27 @@ private void update(MetadataUpdate.SetDefaultSortOrder unused) { } } + private void update(MetadataUpdate.RemovePartitionSpecs unused) { + // require that the default partition spec has not changed + if (!setSpecId) { + if (base != null && !isReplace) { + require(new UpdateRequirement.AssertDefaultSpecID(base.defaultSpecId())); + } + this.setSpecId = true; + } + + // require that no branches have changed, so that old specs won't be written. + if (base != null && !isReplace) { + base.refs() + .forEach( + (name, ref) -> { + if (ref.isBranch() && !name.equals(SnapshotRef.MAIN_BRANCH)) { + require(new UpdateRequirement.AssertRefSnapshotID(name, ref.snapshotId())); + } + }); + } + } + private List build() { return requirements.build(); } diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java index bfed6ebebe2c..8af0e5e7e479 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java @@ -926,6 +926,17 @@ public void testRemovePartitionStatistics() { .isEqualTo(json); } + @Test + public void testRemovePartitionSpec() { + String action = MetadataUpdateParser.REMOVE_PARTITION_SPECS; + String json = "{\"action\":\"remove-partition-specs\",\"spec-ids\":[1,2,3]}"; + MetadataUpdate expected = new MetadataUpdate.RemovePartitionSpecs(ImmutableSet.of(1, 2, 3)); + assertEquals(action, expected, MetadataUpdateParser.fromJson(json)); + assertThat(MetadataUpdateParser.toJson(expected)) + .as("Remove partition specs should convert to the correct JSON value") + .isEqualTo(json); + } + public void assertEquals( String action, MetadataUpdate expectedUpdate, MetadataUpdate actualUpdate) { switch (action) { @@ -1030,6 +1041,11 @@ public void assertEquals( (MetadataUpdate.SetCurrentViewVersion) expectedUpdate, (MetadataUpdate.SetCurrentViewVersion) actualUpdate); break; + case MetadataUpdateParser.REMOVE_PARTITION_SPECS: + assertEqualsRemovePartitionSpecs( + (MetadataUpdate.RemovePartitionSpecs) expectedUpdate, + (MetadataUpdate.RemovePartitionSpecs) actualUpdate); + break; default: fail("Unrecognized metadata update action: " + action); } @@ -1251,6 +1267,11 @@ private static void assertEqualsSetCurrentViewVersion( assertThat(actual.versionId()).isEqualTo(expected.versionId()); } + private static void assertEqualsRemovePartitionSpecs( + MetadataUpdate.RemovePartitionSpecs expected, MetadataUpdate.RemovePartitionSpecs actual) { + assertThat(actual.specIds()).containsExactlyInAnyOrderElementsOf(expected.specIds()); + } + private String createManifestListWithManifestFiles(long snapshotId, Long parentSnapshotId) throws IOException { File manifestList = File.createTempFile("manifests", null, temp.toFile()); diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java index f95fe6191e43..2148b4b3c801 100644 --- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java +++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java @@ -37,6 +37,7 @@ import java.util.stream.Collectors; import org.apache.iceberg.ManifestEntry.Status; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.PositionOutputStream; import org.apache.iceberg.puffin.Blob; @@ -1620,6 +1621,90 @@ public void testRetainFilesOnRetainedBranches() { assertThat(deletedFiles).isEqualTo(expectedDeletes); } + @TestTemplate + public void testRemoveSpecDuringExpiration() { + DataFile file = + DataFiles.builder(table.spec()) + .withPath("/path/to/data-0.parquet") + .withPartitionPath("data_bucket=0") + .withFileSizeInBytes(10) + .withRecordCount(100) + .build(); + table.newAppend().appendFile(file).commit(); + Snapshot append = table.currentSnapshot(); + String appendManifest = + Iterables.getOnlyElement( + table.currentSnapshot().allManifests(table.io()).stream() + .map(ManifestFile::path) + .collect(Collectors.toList())); + table.newDelete().deleteFile(file).commit(); + Snapshot delete = table.currentSnapshot(); + String deleteManifest = + Iterables.getOnlyElement( + table.currentSnapshot().allManifests(table.io()).stream() + .map(ManifestFile::path) + .collect(Collectors.toList())); + + table.updateSpec().addField("id_bucket", Expressions.bucket("id", 16)).commit(); + PartitionSpec idAndDataBucketSpec = table.spec(); + DataFile bucketFile = + DataFiles.builder(table.spec()) + .withPath("/path/to/data-0-id-0.parquet") + .withFileSizeInBytes(10) + .withRecordCount(100) + .withPartitionPath("data_bucket=0/id_bucket=0") + .build(); + table.newAppend().appendFile(bucketFile).commit(); + + Set deletedFiles = Sets.newHashSet(); + // Expiring snapshots should remove the data_bucket partition + removeSnapshots(table) + .expireOlderThan(System.currentTimeMillis()) + .cleanExpiredMetadata(true) + .deleteWith(deletedFiles::add) + .commit(); + + assertThat(deletedFiles) + .containsExactlyInAnyOrder( + appendManifest, + deleteManifest, + file.location(), + append.manifestListLocation(), + delete.manifestListLocation()); + assertThat(table.specs().keySet()) + .as("Only id_bucket + data_bucket transform should exist") + .containsExactly(idAndDataBucketSpec.specId()); + } + + @TestTemplate + public void testRemoveSpecsDoesntRemoveDefaultSpec() throws IOException { + // The default spec for table is bucketed on data, but write using unpartitioned + PartitionSpec dataBucketSpec = table.spec(); + DataFile file = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath("/path/to/data-0.parquet") + .withFileSizeInBytes(10) + .withRecordCount(100) + .build(); + + table.newAppend().appendFile(file).commit(); + Snapshot append = table.currentSnapshot(); + table.newDelete().deleteFile(file).commit(); + + Set deletedFiles = Sets.newHashSet(); + // Expiring snapshots should remove only the unpartitioned spec + removeSnapshots(table) + .expireOlderThan(System.currentTimeMillis()) + .cleanExpiredMetadata(true) + .deleteWith(deletedFiles::add) + .commit(); + + assertThat(deletedFiles).containsExactlyInAnyOrder(append.manifestListLocation()); + assertThat(table.specs().keySet()) + .as("Only data_bucket transform should exist") + .containsExactly(dataBucketSpec.specId()); + } + private Set manifestPaths(Snapshot snapshot, FileIO io) { return snapshot.allManifests(io).stream().map(ManifestFile::path).collect(Collectors.toSet()); } diff --git a/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java b/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java index 1a6c289ea241..de4d266318db 100644 --- a/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java +++ b/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java @@ -424,6 +424,121 @@ public void setDefaultPartitionSpecFailure() { .hasMessage("Requirement failed: default partition spec changed: expected id 0 != 1"); } + @Test + public void removePartitionSpec() { + int defaultSpecId = 3; + when(metadata.defaultSpecId()).thenReturn(defaultSpecId); + + List requirements = + UpdateRequirements.forUpdateTable( + metadata, + ImmutableList.of(new MetadataUpdate.RemovePartitionSpecs(Sets.newHashSet(1, 2)))); + requirements.forEach(req -> req.validate(metadata)); + + assertThat(requirements) + .hasSize(2) + .hasOnlyElementsOfTypes( + UpdateRequirement.AssertTableUUID.class, UpdateRequirement.AssertDefaultSpecID.class); + + assertTableUUID(requirements); + + assertThat(requirements) + .element(1) + .asInstanceOf(InstanceOfAssertFactories.type(UpdateRequirement.AssertDefaultSpecID.class)) + .extracting(UpdateRequirement.AssertDefaultSpecID::specId) + .isEqualTo(defaultSpecId); + } + + @Test + public void testRemovePartitionSpecsWithBranch() { + int defaultSpecId = 3; + long snapshotId = 42L; + when(metadata.defaultSpecId()).thenReturn(defaultSpecId); + + String branch = "branch"; + SnapshotRef snapshotRef = mock(SnapshotRef.class); + when(snapshotRef.snapshotId()).thenReturn(snapshotId); + when(snapshotRef.isBranch()).thenReturn(true); + when(metadata.refs()).thenReturn(ImmutableMap.of(branch, snapshotRef)); + when(metadata.ref(branch)).thenReturn(snapshotRef); + + List requirements = + UpdateRequirements.forUpdateTable( + metadata, + ImmutableList.of(new MetadataUpdate.RemovePartitionSpecs(Sets.newHashSet(1, 2)))); + requirements.forEach(req -> req.validate(metadata)); + + assertThat(requirements) + .hasSize(3) + .hasOnlyElementsOfTypes( + UpdateRequirement.AssertTableUUID.class, + UpdateRequirement.AssertDefaultSpecID.class, + UpdateRequirement.AssertRefSnapshotID.class); + + assertTableUUID(requirements); + + assertThat(requirements) + .element(1) + .asInstanceOf(InstanceOfAssertFactories.type(UpdateRequirement.AssertDefaultSpecID.class)) + .extracting(UpdateRequirement.AssertDefaultSpecID::specId) + .isEqualTo(defaultSpecId); + + assertThat(requirements) + .element(2) + .asInstanceOf(InstanceOfAssertFactories.type(UpdateRequirement.AssertRefSnapshotID.class)) + .extracting(UpdateRequirement.AssertRefSnapshotID::snapshotId) + .isEqualTo(snapshotId); + } + + @Test + public void testRemovePartitionSpecsFailure() { + int defaultSpecId = 3; + when(metadata.defaultSpecId()).thenReturn(defaultSpecId); + when(updated.defaultSpecId()).thenReturn(defaultSpecId + 1); + + List requirements = + UpdateRequirements.forUpdateTable( + metadata, + ImmutableList.of(new MetadataUpdate.RemovePartitionSpecs(Sets.newHashSet(1, 2)))); + + assertThatThrownBy(() -> requirements.forEach(req -> req.validate(updated))) + .isInstanceOf(CommitFailedException.class) + .hasMessage( + "Requirement failed: default partition spec changed: expected id %s != %s", + defaultSpecId, defaultSpecId + 1); + } + + @Test + public void testRemovePartitionSpecsWithBranchFailure() { + int defaultSpecId = 3; + long snapshotId = 42L; + when(metadata.defaultSpecId()).thenReturn(defaultSpecId); + when(updated.defaultSpecId()).thenReturn(defaultSpecId); + + String branch = "test"; + SnapshotRef snapshotRef = mock(SnapshotRef.class); + when(snapshotRef.snapshotId()).thenReturn(snapshotId); + when(snapshotRef.isBranch()).thenReturn(true); + when(metadata.refs()).thenReturn(ImmutableMap.of(branch, snapshotRef)); + when(metadata.ref(branch)).thenReturn(snapshotRef); + + SnapshotRef updatedRef = mock(SnapshotRef.class); + when(updatedRef.snapshotId()).thenReturn(snapshotId + 1); + when(updatedRef.isBranch()).thenReturn(true); + when(updated.ref(branch)).thenReturn(updatedRef); + + List requirements = + UpdateRequirements.forUpdateTable( + metadata, + ImmutableList.of(new MetadataUpdate.RemovePartitionSpecs(Sets.newHashSet(1, 2)))); + + assertThatThrownBy(() -> requirements.forEach(req -> req.validate(updated))) + .isInstanceOf(CommitFailedException.class) + .hasMessage( + "Requirement failed: branch %s has changed: expected id %s != %s", + branch, snapshotId, snapshotId + 1); + } + @Test public void addSortOrder() { List requirements = diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java index 5402a13d7d4b..b6dd131c1fb3 100644 --- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java @@ -1284,6 +1284,62 @@ public void testUpdateTableSpecThenRevert() { assertThat(table.spec()).as("Loaded table should have expected spec").isEqualTo(TABLE_SPEC); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testRemoveUnusedSpec(boolean withBranch) { + String branch = "test"; + C catalog = catalog(); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(NS); + } + + Table table = + catalog + .buildTable(TABLE, SCHEMA) + .withPartitionSpec(SPEC) + .withProperty(TableProperties.GC_ENABLED, "true") + .create(); + PartitionSpec spec = table.spec(); + // added a file to trigger snapshot expiration + table.newFastAppend().appendFile(FILE_A).commit(); + if (withBranch) { + table.manageSnapshots().createBranch(branch).commit(); + } + table.updateSpec().addField(Expressions.bucket("data", 16)).commit(); + table.updateSpec().removeField(Expressions.bucket("data", 16)).commit(); + table.updateSpec().addField("data").commit(); + assertThat(table.specs()).as("Should have 3 total specs").hasSize(3); + PartitionSpec current = table.spec(); + table.expireSnapshots().cleanExpiredMetadata(true).commit(); + + Table loaded = catalog.loadTable(TABLE); + assertThat(loaded.specs().values()).containsExactlyInAnyOrder(spec, current); + + // add a data file with current spec and remove the old data file + table.newDelete().deleteFile(FILE_A).commit(); + DataFile anotherFile = + DataFiles.builder(current) + .withPath("/path/to/data-b.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("id_bucket=0/data=123") // easy way to set partition data for now + .withRecordCount(2) // needs at least one record or else metrics will filter it out + .build(); + table.newAppend().appendFile(anotherFile).commit(); + table + .expireSnapshots() + .cleanExpiredFiles(false) + .expireOlderThan(table.currentSnapshot().timestampMillis()) + .cleanExpiredMetadata(true) + .commit(); + loaded = catalog.loadTable(TABLE); + if (withBranch) { + assertThat(loaded.specs().values()).containsExactlyInAnyOrder(spec, current); + } else { + assertThat(loaded.specs().values()).containsExactlyInAnyOrder(current); + } + } + @Test public void testUpdateTableSortOrder() { C catalog = catalog();