From a75c4afa0909910a1a96bb9661d2e40d50d91231 Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Thu, 4 Nov 2021 04:23:12 +0800 Subject: [PATCH 01/11] API: Add function for removing unused specs from metadata.json Previously there was no way to remove partition specs from a table once they were added. To fix this we add an api which searches through all reachable manifest files and records their specsIds. Any specIds which do not find are marked for removal which is done through a serializable commit. --- .../apache/iceberg/MetadataMaintenance.java | 31 +++++ .../org/apache/iceberg/RemoveUnusedSpecs.java | 29 ++++ .../main/java/org/apache/iceberg/Table.java | 11 ++ .../iceberg/BaseMetadataMaintenance.java | 32 +++++ .../apache/iceberg/BaseRemoveUnusedSpecs.java | 113 ++++++++++++++++ .../java/org/apache/iceberg/BaseTable.java | 5 + .../org/apache/iceberg/BaseTransaction.java | 6 + .../org/apache/iceberg/MetadataUpdate.java | 12 ++ .../org/apache/iceberg/SerializableTable.java | 5 + .../org/apache/iceberg/TableMetadata.java | 40 ++++++ .../apache/iceberg/TestRemoveUnusedSpecs.java | 127 ++++++++++++++++++ .../apache/iceberg/catalog/CatalogTests.java | 30 +++++ 12 files changed, 441 insertions(+) create mode 100644 api/src/main/java/org/apache/iceberg/MetadataMaintenance.java create mode 100644 api/src/main/java/org/apache/iceberg/RemoveUnusedSpecs.java create mode 100644 core/src/main/java/org/apache/iceberg/BaseMetadataMaintenance.java create mode 100644 core/src/main/java/org/apache/iceberg/BaseRemoveUnusedSpecs.java create mode 100644 core/src/test/java/org/apache/iceberg/TestRemoveUnusedSpecs.java diff --git a/api/src/main/java/org/apache/iceberg/MetadataMaintenance.java b/api/src/main/java/org/apache/iceberg/MetadataMaintenance.java new file mode 100644 index 000000000000..1ae13c31911d --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/MetadataMaintenance.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +/** APIs for table metadata maintenance, such as removing unused partition specs. */ +public interface MetadataMaintenance { + /** + * Remove any partition specs from the Metadata that are no longer used in any data files. + * + *

Always preserves the current default spec even if it has not yet been used. + * + * @return a new {@link RemoveUnusedSpecs} + */ + RemoveUnusedSpecs removeUnusedSpecs(); +} diff --git a/api/src/main/java/org/apache/iceberg/RemoveUnusedSpecs.java b/api/src/main/java/org/apache/iceberg/RemoveUnusedSpecs.java new file mode 100644 index 000000000000..7c33780d9231 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/RemoveUnusedSpecs.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.List; + +/** + * API for removing partition specs from the metadata which are not the default spec and no longer + * refer to any datafiles in the table. + * + *

{@link #apply()} returns the specs that will remain if committed on the current metadata + */ +public interface RemoveUnusedSpecs extends PendingUpdate> {} diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java index 97ea9ba76526..60a35dc5e1f3 100644 --- a/api/src/main/java/org/apache/iceberg/Table.java +++ b/api/src/main/java/org/apache/iceberg/Table.java @@ -211,6 +211,17 @@ default IncrementalChangelogScan newIncrementalChangelogScan() { */ AppendFiles newAppend(); + /** + * Create a new {@link MetadataMaintenance maintenance API} to perform metadata maintenance + * operations. + * + * @return a new {@link MetadataMaintenance} + */ + default MetadataMaintenance maintenance() { + throw new UnsupportedOperationException( + "Maintenance operations are not supported by " + getClass().getName()); + } + /** * Create a new {@link AppendFiles append API} to add files to this table and commit. * diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataMaintenance.java b/core/src/main/java/org/apache/iceberg/BaseMetadataMaintenance.java new file mode 100644 index 000000000000..d8954914fb16 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/BaseMetadataMaintenance.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +class BaseMetadataMaintenance implements MetadataMaintenance { + private final TableOperations ops; + + BaseMetadataMaintenance(TableOperations ops) { + this.ops = ops; + } + + @Override + public RemoveUnusedSpecs removeUnusedSpecs() { + return new BaseRemoveUnusedSpecs(ops); + } +} diff --git a/core/src/main/java/org/apache/iceberg/BaseRemoveUnusedSpecs.java b/core/src/main/java/org/apache/iceberg/BaseRemoveUnusedSpecs.java new file mode 100644 index 000000000000..96291734f551 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/BaseRemoveUnusedSpecs.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS; +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.ParallelIterable; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; + +/** + * Implementation of RemoveUnusedSpecs API to remove unused partition specs. + * + *

When committing, these changes will be applied to the latest table metadata. Commit conflicts + * will be resolved by recalculating which specs are no longer in use again in the latest metadata + * and retrying. + */ +class BaseRemoveUnusedSpecs implements RemoveUnusedSpecs { + private final TableOperations ops; + + private TableMetadata base; + + BaseRemoveUnusedSpecs(TableOperations ops) { + this.ops = ops; + this.base = ops.current(); + } + + @Override + public List apply() { + TableMetadata newMetadata = internalApply(); + return newMetadata.specs(); + } + + @Override + public void commit() { + this.base = ops.refresh(); + Tasks.foreach(ops) + .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) + .exponentialBackoff( + base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), + base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), + base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), + 2.0 /* exponential */) + .onlyRetryOn(CommitFailedException.class) + .run( + taskOps -> { + TableMetadata newMetadata = internalApply(); + taskOps.commit(base, newMetadata); + }); + } + + private Iterable reachableManifests() { + Iterable snapshots = base.snapshots(); + Iterable> manifestIterables = + Iterables.transform(snapshots, snapshot -> snapshot.allManifests(ops.io())); + try (CloseableIterable iterable = + new ParallelIterable<>(manifestIterables, ThreadPools.getWorkerPool())) { + return Sets.newHashSet(iterable); + } catch (IOException e) { + throw new UncheckedIOException("Failed to close parallel iterable", e); + } + } + + private TableMetadata internalApply() { + this.base = ops.refresh(); + List specs = base.specs(); + int currentSpecId = base.defaultSpecId(); + + Set specsInUse = + Sets.newHashSet(Iterables.transform(reachableManifests(), ManifestFile::partitionSpecId)); + + specsInUse.add(currentSpecId); + + List specsToRemove = + specs.stream() + .filter(spec -> !specsInUse.contains(spec.specId())) + .collect(Collectors.toList()); + + return TableMetadata.buildFrom(base).removeUnusedSpecs(specsToRemove).build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java index a44adc4e9035..64d4358e804a 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseTable.java @@ -170,6 +170,11 @@ public ReplaceSortOrder replaceSortOrder() { return new BaseReplaceSortOrder(ops); } + @Override + public MetadataMaintenance maintenance() { + return new BaseMetadataMaintenance(ops); + } + @Override public UpdateLocation updateLocation() { return new SetLocation(ops); diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index eb8dbd2538e6..f693ea093658 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -687,6 +687,12 @@ public UpdateProperties updateProperties() { return BaseTransaction.this.updateProperties(); } + @Override + public MetadataMaintenance maintenance() { + throw new UnsupportedOperationException( + "Cannot perform metadata maintenance as part of a transaction"); + } + @Override public ReplaceSortOrder replaceSortOrder() { return BaseTransaction.this.replaceSortOrder(); diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java index 49fb1fe01c44..52680ad35fb2 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java @@ -165,6 +165,18 @@ public void applyTo(TableMetadata.Builder metadataBuilder) { } } + class RemoveUnusedSpecs implements MetadataUpdate { + private final Set specIds; + + public RemoveUnusedSpecs(Set specIds) { + this.specIds = specIds; + } + + public Set specIds() { + return specIds; + } + } + class AddSortOrder implements MetadataUpdate { private final UnboundSortOrder sortOrder; diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java index 082e50b840dc..109e1d2f05b5 100644 --- a/core/src/main/java/org/apache/iceberg/SerializableTable.java +++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java @@ -343,6 +343,11 @@ public AppendFiles newAppend() { throw new UnsupportedOperationException(errorMsg("newAppend")); } + @Override + public MetadataMaintenance maintenance() { + throw new UnsupportedOperationException(errorMsg("metadataMaintenance")); + } + @Override public RewriteFiles newRewrite() { throw new UnsupportedOperationException(errorMsg("newRewrite")); diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index d20dd59d2b97..84002b56b47d 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -1108,6 +1108,46 @@ public Builder setDefaultPartitionSpec(int specId) { return this; } + Builder removeUnusedSpecs(Iterable specsToRemove) { + Set specIdsToRemove = Sets.newHashSet(); + for (PartitionSpec spec : specsToRemove) { + Preconditions.checkArgument( + spec.specId() != defaultSpecId, "Cannot remove default partition spec"); + PartitionSpec toBeRemoved = specsById.remove(spec.specId()); + Preconditions.checkArgument( + toBeRemoved == null || toBeRemoved.equals(spec), + "Cannot remove an unknown spec, spec id: %s", + spec.specId()); + if (toBeRemoved != null) { + specIdsToRemove.add(spec.specId()); + } + } + this.specs = + specs.stream() + .filter(s -> !specIdsToRemove.contains(s.specId())) + .collect(Collectors.toList()); + changes.add(new MetadataUpdate.RemoveUnusedSpecs(specIdsToRemove)); + return this; + } + + Builder removeUnusedSpecsById(Iterable specIds) { + Set specIdsToRemove = Sets.newHashSet(); + for (Integer specId : specIds) { + Preconditions.checkArgument( + specId != defaultSpecId, "Cannot remove default partition spec"); + PartitionSpec toBeRemoved = specsById.remove(specId); + if (toBeRemoved != null) { + specIdsToRemove.add(specId); + } + } + this.specs = + specs.stream() + .filter(s -> !specIdsToRemove.contains(s.specId())) + .collect(Collectors.toList()); + changes.add(new MetadataUpdate.RemoveUnusedSpecs(specIdsToRemove)); + return this; + } + public Builder addPartitionSpec(UnboundPartitionSpec spec) { addPartitionSpecInternal(spec.bind(schemasById.get(currentSchemaId))); return this; diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveUnusedSpecs.java b/core/src/test/java/org/apache/iceberg/TestRemoveUnusedSpecs.java new file mode 100644 index 000000000000..121c606fbeba --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestRemoveUnusedSpecs.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.TestTemplate; + +public class TestRemoveUnusedSpecs extends TestBase { + + @TestTemplate + public void testRemoveAllButCurrent() { + table + .updateSchema() + .addColumn("ts", Types.TimestampType.withoutZone()) + .addColumn("category", Types.StringType.get()) + .commit(); + table.updateSpec().addField("id").commit(); + table.updateSpec().addField("ts").commit(); + table.updateSpec().addField("category").commit(); + table.updateSpec().addField("data").commit(); + assertThat(table.specs().size()).as("Added specs should be present").isEqualTo(5); + + PartitionSpec currentSpec = table.spec(); + table.maintenance().removeUnusedSpecs().commit(); + + assertThat(table.specs().size()).as("All but current spec should be removed").isEqualTo(1); + assertThat(table.spec()).as("Current spec shall not change").isEqualTo(currentSpec); + } + + @TestTemplate + public void testDontRemoveInUseSpecs() { + table + .updateSchema() + .addColumn("ts", Types.LongType.get()) + .addColumn("category", Types.StringType.get()) + .commit(); + + table.updateSpec().addField("id").commit(); // 1 + table.newAppend().appendFile(newDataFile("data_bucket=0/id=5")).commit(); + + table.updateSpec().addField("ts").commit(); // 2 + + table.updateSpec().addField("category").commit(); // 3 + if (formatVersion == 1) { + table.newAppend().appendFile(newDataFile("data_bucket=0/id=5/ts=100/category=fo")).commit(); + } else { + table + .newRowDelta() + .addDeletes(newDeleteFile(table.spec().specId(), "data_bucket=0/id=5/ts=100/category=fo")) + .commit(); + } + + table.updateSpec().addField("data").commit(); // 4 + assertThat(table.specs()).size().as("Added specs should be present").isEqualTo(5); + + PartitionSpec currentSpec = table.spec(); + table.maintenance().removeUnusedSpecs().commit(); + assertThat(table.specs().keySet()).as("Unused specs are removed").containsExactly(1, 3, 4); + assertThat(table.spec()).as("Current spec shall not change").isEqualTo(currentSpec); + } + + @TestTemplate + public void testRemoveUnpartitionedSpec() { + // clean it first to reset to unpartitioned + cleanupTables(); + this.table = create(SCHEMA, PartitionSpec.unpartitioned()); + DataFile file = + DataFiles.builder(table.spec()) + .withPath("/path/to/data-0.parquet") + .withFileSizeInBytes(10) + .withRecordCount(100) + .build(); + table.newAppend().appendFile(file).commit(); + + table.updateSpec().addField("data_bucket", Expressions.bucket("data", 16)).commit(); + + // removeUnusedPartitionSpec shall not remove the unpartitioned spec + table.maintenance().removeUnusedSpecs().commit(); + assertThat(table.specs().keySet()).as("unpartitioned spec is still used").containsExactly(0, 1); + + table.newDelete().deleteFile(file).commit(); + DataFile bucketFile = + DataFiles.builder(table.spec()) + .withPath("/path/to/data-0.parquet") + .withFileSizeInBytes(10) + .withRecordCount(100) + .withPartitionPath("data_bucket=0") + .build(); + table.newAppend().appendFile(bucketFile).commit(); + table.expireSnapshots().expireOlderThan(System.currentTimeMillis()).commit(); + + table.maintenance().removeUnusedSpecs().commit(); + assertThat(table.specs().keySet()) + .as("unpartitioned spec should be removed") + .containsExactly(1); + + table.updateSpec().removeField("data_bucket").commit(); + assertThat(table.spec().isUnpartitioned()).as("Should equal to unpartitioned").isTrue(); + + int unpartitionedFieldsSize = formatVersion == 1 ? 1 : 0; + assertThat(table.spec().fields().size()) + .as("Should have one void transform for v1 and empty for v2") + .isEqualTo(unpartitionedFieldsSize); + assertThat(table.spec().specId()) + .as("unpartitioned is evolved to use a new SpecId") + .isEqualTo(2); + } +} diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java index 5402a13d7d4b..bd51052560f2 100644 --- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java @@ -40,6 +40,7 @@ import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.ReachableFileUtil; +import org.apache.iceberg.RemoveUnusedSpecs; import org.apache.iceberg.ReplaceSortOrder; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; @@ -64,6 +65,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.CharSequenceSet; import org.junit.jupiter.api.Assumptions; @@ -1153,6 +1155,34 @@ public void testUpdateTableSpec() { .isEqualTo(expected.fields()); } + @Test + public void testRemoveUnusedSpec() { + C catalog = catalog(); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(NS); + } + + Table table = catalog.buildTable(TABLE, SCHEMA).create(); + table.updateSpec().addField("shard", Expressions.bucket("id", 16)).commit(); + table.updateSpec().addField("data").commit(); + int currentSpecId = table.spec().specId(); + + RemoveUnusedSpecs remove = table.maintenance().removeUnusedSpecs(); + List specsToRetain = remove.apply(); + assertThat(specsToRetain).hasSize(1).map(PartitionSpec::specId).contains(currentSpecId); + + if (catalog instanceof RESTCatalog) { + // RESTCatalog does not support maintenance operations yet + // TODO: Remove this once REST spec is updated to support remove unused spec + assertThatThrownBy(remove::commit); + } else { + remove.commit(); + Table loaded = catalog.loadTable(TABLE); + assertThat(loaded.specs().values()).hasSameElementsAs(specsToRetain); + } + } + @Test public void testUpdateTableSpecServerSideRetry() { Assumptions.assumeTrue( From 2f3544c215048ff6e52984f0f074f1fe461d5e1d Mon Sep 17 00:00:00 2001 From: advancedxy <807537+advancedxy@users.noreply.github.com> Date: Mon, 28 Oct 2024 16:20:02 +0800 Subject: [PATCH 02/11] feat: Support RemovePartitionSpec in REST catalog --- .../org/apache/iceberg/MetadataUpdate.java | 9 ++++++-- .../apache/iceberg/MetadataUpdateParser.java | 20 +++++++++++++++++ .../org/apache/iceberg/TableMetadata.java | 4 ++-- .../apache/iceberg/UpdateRequirements.java | 22 +++++++++++++++++++ ...ecs.java => TestRemovePartitionSpecs.java} | 2 +- .../apache/iceberg/catalog/CatalogTests.java | 13 +++-------- 6 files changed, 55 insertions(+), 15 deletions(-) rename core/src/test/java/org/apache/iceberg/{TestRemoveUnusedSpecs.java => TestRemovePartitionSpecs.java} (98%) diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java index 52680ad35fb2..676c6796f508 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java @@ -165,16 +165,21 @@ public void applyTo(TableMetadata.Builder metadataBuilder) { } } - class RemoveUnusedSpecs implements MetadataUpdate { + class RemovePartitionSpecs implements MetadataUpdate { private final Set specIds; - public RemoveUnusedSpecs(Set specIds) { + public RemovePartitionSpecs(Set specIds) { this.specIds = specIds; } public Set specIds() { return specIds; } + + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + metadataBuilder.removeUnusedSpecsById(specIds); + } } class AddSortOrder implements MetadataUpdate { diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java index 8cdfd3c72b6e..d6f55d215910 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java @@ -59,6 +59,7 @@ private MetadataUpdateParser() {} static final String SET_CURRENT_VIEW_VERSION = "set-current-view-version"; static final String SET_PARTITION_STATISTICS = "set-partition-statistics"; static final String REMOVE_PARTITION_STATISTICS = "remove-partition-statistics"; + static final String REMOVE_PARTITION_SPECS = "remove-partition-specs"; // AssignUUID private static final String UUID = "uuid"; @@ -126,6 +127,9 @@ private MetadataUpdateParser() {} // SetCurrentViewVersion private static final String VIEW_VERSION_ID = "view-version-id"; + // RemovePartitionSpecs + private static final String PARTITION_SPEC_IDS = "partition-spec-ids"; + private static final Map, String> ACTIONS = ImmutableMap., String>builder() .put(MetadataUpdate.AssignUUID.class, ASSIGN_UUID) @@ -149,6 +153,7 @@ private MetadataUpdateParser() {} .put(MetadataUpdate.SetLocation.class, SET_LOCATION) .put(MetadataUpdate.AddViewVersion.class, ADD_VIEW_VERSION) .put(MetadataUpdate.SetCurrentViewVersion.class, SET_CURRENT_VIEW_VERSION) + .put(MetadataUpdate.RemovePartitionSpecs.class, REMOVE_PARTITION_SPECS) .buildOrThrow(); public static String toJson(MetadataUpdate metadataUpdate) { @@ -241,6 +246,9 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator writeSetCurrentViewVersionId( (MetadataUpdate.SetCurrentViewVersion) metadataUpdate, generator); break; + case REMOVE_PARTITION_SPECS: + writeRemovePartitionSpecs((MetadataUpdate.RemovePartitionSpecs) metadataUpdate, generator); + break; default: throw new IllegalArgumentException( String.format( @@ -312,6 +320,8 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) { return readAddViewVersion(jsonNode); case SET_CURRENT_VIEW_VERSION: return readCurrentViewVersionId(jsonNode); + case REMOVE_PARTITION_SPECS: + return readRemoveUnusedSpecs(jsonNode); default: throw new UnsupportedOperationException( String.format("Cannot convert metadata update action to json: %s", action)); @@ -447,6 +457,11 @@ private static void writeSetCurrentViewVersionId( gen.writeNumberField(VIEW_VERSION_ID, metadataUpdate.versionId()); } + private static void writeRemovePartitionSpecs( + MetadataUpdate.RemovePartitionSpecs metadataUpdate, JsonGenerator gen) throws IOException { + JsonUtil.writeIntegerArray(PARTITION_SPEC_IDS, metadataUpdate.specIds(), gen); + } + private static MetadataUpdate readAssignUUID(JsonNode node) { String uuid = JsonUtil.getString(UUID, node); return new MetadataUpdate.AssignUUID(uuid); @@ -596,4 +611,9 @@ private static MetadataUpdate readAddViewVersion(JsonNode node) { private static MetadataUpdate readCurrentViewVersionId(JsonNode node) { return new MetadataUpdate.SetCurrentViewVersion(JsonUtil.getInt(VIEW_VERSION_ID, node)); } + + private static MetadataUpdate readRemoveUnusedSpecs(JsonNode node) { + return new MetadataUpdate.RemovePartitionSpecs( + JsonUtil.getIntegerSet(PARTITION_SPEC_IDS, node)); + } } diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 84002b56b47d..f1ec69ca27e4 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -1126,7 +1126,7 @@ Builder removeUnusedSpecs(Iterable specsToRemove) { specs.stream() .filter(s -> !specIdsToRemove.contains(s.specId())) .collect(Collectors.toList()); - changes.add(new MetadataUpdate.RemoveUnusedSpecs(specIdsToRemove)); + changes.add(new MetadataUpdate.RemovePartitionSpecs(specIdsToRemove)); return this; } @@ -1144,7 +1144,7 @@ Builder removeUnusedSpecsById(Iterable specIds) { specs.stream() .filter(s -> !specIdsToRemove.contains(s.specId())) .collect(Collectors.toList()); - changes.add(new MetadataUpdate.RemoveUnusedSpecs(specIdsToRemove)); + changes.add(new MetadataUpdate.RemovePartitionSpecs(specIdsToRemove)); return this; } diff --git a/core/src/main/java/org/apache/iceberg/UpdateRequirements.java b/core/src/main/java/org/apache/iceberg/UpdateRequirements.java index d92c1a3742fe..4a44a53641d3 100644 --- a/core/src/main/java/org/apache/iceberg/UpdateRequirements.java +++ b/core/src/main/java/org/apache/iceberg/UpdateRequirements.java @@ -105,6 +105,8 @@ private Builder update(MetadataUpdate update) { update((MetadataUpdate.SetDefaultPartitionSpec) update); } else if (update instanceof MetadataUpdate.SetDefaultSortOrder) { update((MetadataUpdate.SetDefaultSortOrder) update); + } else if (update instanceof MetadataUpdate.RemovePartitionSpecs) { + update((MetadataUpdate.RemovePartitionSpecs) update); } return this; @@ -173,6 +175,26 @@ private void update(MetadataUpdate.SetDefaultSortOrder unused) { } } + private void update(MetadataUpdate.RemovePartitionSpecs unused) { + // require that the default partition spec has not changed + if (!setSpecId) { + if (base != null && !isReplace) { + require(new UpdateRequirement.AssertDefaultSpecID(base.defaultSpecId())); + } + this.setSpecId = true; + } + // require that all the branch has not changed, so that old specs won't be written. + if (base != null && !isReplace) { + base.refs() + .forEach( + (name, ref) -> { + if (ref.isBranch() && !name.equals(SnapshotRef.MAIN_BRANCH)) { + require(new UpdateRequirement.AssertRefSnapshotID(name, ref.snapshotId())); + } + }); + } + } + private List build() { return requirements.build(); } diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveUnusedSpecs.java b/core/src/test/java/org/apache/iceberg/TestRemovePartitionSpecs.java similarity index 98% rename from core/src/test/java/org/apache/iceberg/TestRemoveUnusedSpecs.java rename to core/src/test/java/org/apache/iceberg/TestRemovePartitionSpecs.java index 121c606fbeba..fd67f38fb4ac 100644 --- a/core/src/test/java/org/apache/iceberg/TestRemoveUnusedSpecs.java +++ b/core/src/test/java/org/apache/iceberg/TestRemovePartitionSpecs.java @@ -24,7 +24,7 @@ import org.apache.iceberg.types.Types; import org.junit.jupiter.api.TestTemplate; -public class TestRemoveUnusedSpecs extends TestBase { +public class TestRemovePartitionSpecs extends TestBase { @TestTemplate public void testRemoveAllButCurrent() { diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java index bd51052560f2..a4b928f27cfc 100644 --- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java @@ -65,7 +65,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.collect.Streams; -import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.CharSequenceSet; import org.junit.jupiter.api.Assumptions; @@ -1172,15 +1171,9 @@ public void testRemoveUnusedSpec() { List specsToRetain = remove.apply(); assertThat(specsToRetain).hasSize(1).map(PartitionSpec::specId).contains(currentSpecId); - if (catalog instanceof RESTCatalog) { - // RESTCatalog does not support maintenance operations yet - // TODO: Remove this once REST spec is updated to support remove unused spec - assertThatThrownBy(remove::commit); - } else { - remove.commit(); - Table loaded = catalog.loadTable(TABLE); - assertThat(loaded.specs().values()).hasSameElementsAs(specsToRetain); - } + remove.commit(); + Table loaded = catalog.loadTable(TABLE); + assertThat(loaded.specs().values()).hasSameElementsAs(specsToRetain); } @Test From 5841aefb670849a2a0b3c54937be9a0c6191d8a4 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Tue, 1 Oct 2024 13:02:52 -0600 Subject: [PATCH 03/11] Remove specs as part of expiration --- .../org/apache/iceberg/ExpireSnapshots.java | 10 ++ .../apache/iceberg/MetadataMaintenance.java | 31 ----- .../org/apache/iceberg/RemoveUnusedSpecs.java | 29 ---- .../main/java/org/apache/iceberg/Table.java | 11 -- .../iceberg/BaseMetadataMaintenance.java | 32 ----- .../apache/iceberg/BaseRemoveUnusedSpecs.java | 113 ---------------- .../java/org/apache/iceberg/BaseTable.java | 5 - .../org/apache/iceberg/BaseTransaction.java | 6 - .../apache/iceberg/FileCleanupStrategy.java | 1 + .../iceberg/IncrementalFileCleanup.java | 12 +- .../org/apache/iceberg/MetadataUpdate.java | 2 +- .../org/apache/iceberg/RemoveSnapshots.java | 26 ++++ .../org/apache/iceberg/SerializableTable.java | 5 - .../org/apache/iceberg/TableMetadata.java | 24 +--- .../iceberg/TestRemovePartitionSpecs.java | 127 ------------------ .../apache/iceberg/TestRemoveSnapshots.java | 85 ++++++++++++ .../apache/iceberg/catalog/CatalogTests.java | 23 ---- 17 files changed, 130 insertions(+), 412 deletions(-) delete mode 100644 api/src/main/java/org/apache/iceberg/MetadataMaintenance.java delete mode 100644 api/src/main/java/org/apache/iceberg/RemoveUnusedSpecs.java delete mode 100644 core/src/main/java/org/apache/iceberg/BaseMetadataMaintenance.java delete mode 100644 core/src/main/java/org/apache/iceberg/BaseRemoveUnusedSpecs.java delete mode 100644 core/src/test/java/org/apache/iceberg/TestRemovePartitionSpecs.java diff --git a/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java b/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java index f6524a1d4fba..3bbd7cdde0fc 100644 --- a/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java +++ b/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java @@ -118,4 +118,14 @@ public interface ExpireSnapshots extends PendingUpdate> { * @return this for method chaining */ ExpireSnapshots cleanExpiredFiles(boolean clean); + + /** + * Allows removal of unreachable partition specs as part of the expiration operation + * + * @return this for method chaining + */ + default ExpireSnapshots removeUnusedSpecs() { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement removeUnusedSpecs"); + } } diff --git a/api/src/main/java/org/apache/iceberg/MetadataMaintenance.java b/api/src/main/java/org/apache/iceberg/MetadataMaintenance.java deleted file mode 100644 index 1ae13c31911d..000000000000 --- a/api/src/main/java/org/apache/iceberg/MetadataMaintenance.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg; - -/** APIs for table metadata maintenance, such as removing unused partition specs. */ -public interface MetadataMaintenance { - /** - * Remove any partition specs from the Metadata that are no longer used in any data files. - * - *

Always preserves the current default spec even if it has not yet been used. - * - * @return a new {@link RemoveUnusedSpecs} - */ - RemoveUnusedSpecs removeUnusedSpecs(); -} diff --git a/api/src/main/java/org/apache/iceberg/RemoveUnusedSpecs.java b/api/src/main/java/org/apache/iceberg/RemoveUnusedSpecs.java deleted file mode 100644 index 7c33780d9231..000000000000 --- a/api/src/main/java/org/apache/iceberg/RemoveUnusedSpecs.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg; - -import java.util.List; - -/** - * API for removing partition specs from the metadata which are not the default spec and no longer - * refer to any datafiles in the table. - * - *

{@link #apply()} returns the specs that will remain if committed on the current metadata - */ -public interface RemoveUnusedSpecs extends PendingUpdate> {} diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java index 60a35dc5e1f3..97ea9ba76526 100644 --- a/api/src/main/java/org/apache/iceberg/Table.java +++ b/api/src/main/java/org/apache/iceberg/Table.java @@ -211,17 +211,6 @@ default IncrementalChangelogScan newIncrementalChangelogScan() { */ AppendFiles newAppend(); - /** - * Create a new {@link MetadataMaintenance maintenance API} to perform metadata maintenance - * operations. - * - * @return a new {@link MetadataMaintenance} - */ - default MetadataMaintenance maintenance() { - throw new UnsupportedOperationException( - "Maintenance operations are not supported by " + getClass().getName()); - } - /** * Create a new {@link AppendFiles append API} to add files to this table and commit. * diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataMaintenance.java b/core/src/main/java/org/apache/iceberg/BaseMetadataMaintenance.java deleted file mode 100644 index d8954914fb16..000000000000 --- a/core/src/main/java/org/apache/iceberg/BaseMetadataMaintenance.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg; - -class BaseMetadataMaintenance implements MetadataMaintenance { - private final TableOperations ops; - - BaseMetadataMaintenance(TableOperations ops) { - this.ops = ops; - } - - @Override - public RemoveUnusedSpecs removeUnusedSpecs() { - return new BaseRemoveUnusedSpecs(ops); - } -} diff --git a/core/src/main/java/org/apache/iceberg/BaseRemoveUnusedSpecs.java b/core/src/main/java/org/apache/iceberg/BaseRemoveUnusedSpecs.java deleted file mode 100644 index 96291734f551..000000000000 --- a/core/src/main/java/org/apache/iceberg/BaseRemoveUnusedSpecs.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg; - -import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS; -import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT; -import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS; -import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT; -import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; -import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT; -import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; -import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; -import org.apache.iceberg.exceptions.CommitFailedException; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.util.ParallelIterable; -import org.apache.iceberg.util.Tasks; -import org.apache.iceberg.util.ThreadPools; - -/** - * Implementation of RemoveUnusedSpecs API to remove unused partition specs. - * - *

When committing, these changes will be applied to the latest table metadata. Commit conflicts - * will be resolved by recalculating which specs are no longer in use again in the latest metadata - * and retrying. - */ -class BaseRemoveUnusedSpecs implements RemoveUnusedSpecs { - private final TableOperations ops; - - private TableMetadata base; - - BaseRemoveUnusedSpecs(TableOperations ops) { - this.ops = ops; - this.base = ops.current(); - } - - @Override - public List apply() { - TableMetadata newMetadata = internalApply(); - return newMetadata.specs(); - } - - @Override - public void commit() { - this.base = ops.refresh(); - Tasks.foreach(ops) - .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) - .exponentialBackoff( - base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), - base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), - base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), - 2.0 /* exponential */) - .onlyRetryOn(CommitFailedException.class) - .run( - taskOps -> { - TableMetadata newMetadata = internalApply(); - taskOps.commit(base, newMetadata); - }); - } - - private Iterable reachableManifests() { - Iterable snapshots = base.snapshots(); - Iterable> manifestIterables = - Iterables.transform(snapshots, snapshot -> snapshot.allManifests(ops.io())); - try (CloseableIterable iterable = - new ParallelIterable<>(manifestIterables, ThreadPools.getWorkerPool())) { - return Sets.newHashSet(iterable); - } catch (IOException e) { - throw new UncheckedIOException("Failed to close parallel iterable", e); - } - } - - private TableMetadata internalApply() { - this.base = ops.refresh(); - List specs = base.specs(); - int currentSpecId = base.defaultSpecId(); - - Set specsInUse = - Sets.newHashSet(Iterables.transform(reachableManifests(), ManifestFile::partitionSpecId)); - - specsInUse.add(currentSpecId); - - List specsToRemove = - specs.stream() - .filter(spec -> !specsInUse.contains(spec.specId())) - .collect(Collectors.toList()); - - return TableMetadata.buildFrom(base).removeUnusedSpecs(specsToRemove).build(); - } -} diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java index 64d4358e804a..a44adc4e9035 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseTable.java @@ -170,11 +170,6 @@ public ReplaceSortOrder replaceSortOrder() { return new BaseReplaceSortOrder(ops); } - @Override - public MetadataMaintenance maintenance() { - return new BaseMetadataMaintenance(ops); - } - @Override public UpdateLocation updateLocation() { return new SetLocation(ops); diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index f693ea093658..eb8dbd2538e6 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -687,12 +687,6 @@ public UpdateProperties updateProperties() { return BaseTransaction.this.updateProperties(); } - @Override - public MetadataMaintenance maintenance() { - throw new UnsupportedOperationException( - "Cannot perform metadata maintenance as part of a transaction"); - } - @Override public ReplaceSortOrder replaceSortOrder() { return BaseTransaction.this.replaceSortOrder(); diff --git a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java index dae99c572c78..988aa05ffb68 100644 --- a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java +++ b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java @@ -35,6 +35,7 @@ abstract class FileCleanupStrategy { private static final Logger LOG = LoggerFactory.getLogger(FileCleanupStrategy.class); protected final FileIO fileIO; + protected final Set referencedPartitionSpecs = Sets.newConcurrentHashSet(); protected final ExecutorService planExecutorService; private final Consumer deleteFunc; private final ExecutorService deleteExecutorService; diff --git a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java index 60ad46e8e864..faa7e21a542d 100644 --- a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java +++ b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -256,7 +257,8 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira }); Set filesToDelete = - findFilesToDelete(manifestsToScan, manifestsToRevert, validIds, afterExpiration); + findFilesToDelete( + manifestsToScan, manifestsToRevert, validIds, beforeExpiration.specsById()); deleteFiles(filesToDelete, "data"); deleteFiles(manifestsToDelete, "manifest"); @@ -273,7 +275,7 @@ private Set findFilesToDelete( Set manifestsToScan, Set manifestsToRevert, Set validIds, - TableMetadata current) { + Map specsById) { Set filesToDelete = ConcurrentHashMap.newKeySet(); Tasks.foreach(manifestsToScan) .retry(3) @@ -285,8 +287,7 @@ private Set findFilesToDelete( .run( manifest -> { // the manifest has deletes, scan it to find files to delete - try (ManifestReader reader = - ManifestFiles.open(manifest, fileIO, current.specsById())) { + try (ManifestReader reader = ManifestFiles.open(manifest, fileIO, specsById)) { for (ManifestEntry entry : reader.entries()) { // if the snapshot ID of the DELETE entry is no longer valid, the data can be // deleted @@ -311,8 +312,7 @@ private Set findFilesToDelete( .run( manifest -> { // the manifest has deletes, scan it to find files to delete - try (ManifestReader reader = - ManifestFiles.open(manifest, fileIO, current.specsById())) { + try (ManifestReader reader = ManifestFiles.open(manifest, fileIO, specsById)) { for (ManifestEntry entry : reader.entries()) { // delete any ADDED file from manifests that were reverted if (entry.status() == ManifestEntry.Status.ADDED) { diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java index 676c6796f508..84eca7608cb2 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java @@ -178,7 +178,7 @@ public Set specIds() { @Override public void applyTo(TableMetadata.Builder metadataBuilder) { - metadataBuilder.removeUnusedSpecsById(specIds); + metadataBuilder.removeUnusedSpecs(specIds); } } diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java index 7558ea7d8a3e..b7f13a01558b 100644 --- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java @@ -41,6 +41,7 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -85,6 +86,7 @@ public void accept(String file) { private ExecutorService planExecutorService = ThreadPools.getWorkerPool(); private Boolean incrementalCleanup; private boolean specifiedSnapshotId = false; + private boolean removeUnusedSpecs = false; RemoveSnapshots(TableOperations ops) { this.ops = ops; @@ -159,6 +161,12 @@ public ExpireSnapshots planWith(ExecutorService executorService) { return this; } + @Override + public ExpireSnapshots removeUnusedSpecs() { + this.removeUnusedSpecs = true; + return this; + } + @Override public List apply() { TableMetadata updated = internalApply(); @@ -209,6 +217,24 @@ private TableMetadata internalApply() { .forEach(idsToRemove::add); updatedMetaBuilder.removeSnapshots(idsToRemove); + if (removeUnusedSpecs) { + Set reachableSpecs = Sets.newConcurrentHashSet(); + reachableSpecs.add(base.defaultSpecId()); + Tasks.foreach(idsToRetain) + .executeWith(planExecutorService) + .run( + snapshot -> + base.snapshot(snapshot).allManifests(ops.io()).stream() + .map(ManifestFile::partitionSpecId) + .forEach(reachableSpecs::add)); + Set specsToRemove = + base.specs().stream() + .map(PartitionSpec::specId) + .filter(specId -> !reachableSpecs.contains(specId)) + .collect(Collectors.toSet()); + updatedMetaBuilder.removeUnusedSpecs(specsToRemove); + } + return updatedMetaBuilder.build(); } diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java index 109e1d2f05b5..082e50b840dc 100644 --- a/core/src/main/java/org/apache/iceberg/SerializableTable.java +++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java @@ -343,11 +343,6 @@ public AppendFiles newAppend() { throw new UnsupportedOperationException(errorMsg("newAppend")); } - @Override - public MetadataMaintenance maintenance() { - throw new UnsupportedOperationException(errorMsg("metadataMaintenance")); - } - @Override public RewriteFiles newRewrite() { throw new UnsupportedOperationException(errorMsg("newRewrite")); diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index f1ec69ca27e4..fc7ed897392a 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -1108,29 +1108,7 @@ public Builder setDefaultPartitionSpec(int specId) { return this; } - Builder removeUnusedSpecs(Iterable specsToRemove) { - Set specIdsToRemove = Sets.newHashSet(); - for (PartitionSpec spec : specsToRemove) { - Preconditions.checkArgument( - spec.specId() != defaultSpecId, "Cannot remove default partition spec"); - PartitionSpec toBeRemoved = specsById.remove(spec.specId()); - Preconditions.checkArgument( - toBeRemoved == null || toBeRemoved.equals(spec), - "Cannot remove an unknown spec, spec id: %s", - spec.specId()); - if (toBeRemoved != null) { - specIdsToRemove.add(spec.specId()); - } - } - this.specs = - specs.stream() - .filter(s -> !specIdsToRemove.contains(s.specId())) - .collect(Collectors.toList()); - changes.add(new MetadataUpdate.RemovePartitionSpecs(specIdsToRemove)); - return this; - } - - Builder removeUnusedSpecsById(Iterable specIds) { + Builder removeUnusedSpecs(Iterable specIds) { Set specIdsToRemove = Sets.newHashSet(); for (Integer specId : specIds) { Preconditions.checkArgument( diff --git a/core/src/test/java/org/apache/iceberg/TestRemovePartitionSpecs.java b/core/src/test/java/org/apache/iceberg/TestRemovePartitionSpecs.java deleted file mode 100644 index fd67f38fb4ac..000000000000 --- a/core/src/test/java/org/apache/iceberg/TestRemovePartitionSpecs.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg; - -import static org.assertj.core.api.Assertions.assertThat; - -import org.apache.iceberg.expressions.Expressions; -import org.apache.iceberg.types.Types; -import org.junit.jupiter.api.TestTemplate; - -public class TestRemovePartitionSpecs extends TestBase { - - @TestTemplate - public void testRemoveAllButCurrent() { - table - .updateSchema() - .addColumn("ts", Types.TimestampType.withoutZone()) - .addColumn("category", Types.StringType.get()) - .commit(); - table.updateSpec().addField("id").commit(); - table.updateSpec().addField("ts").commit(); - table.updateSpec().addField("category").commit(); - table.updateSpec().addField("data").commit(); - assertThat(table.specs().size()).as("Added specs should be present").isEqualTo(5); - - PartitionSpec currentSpec = table.spec(); - table.maintenance().removeUnusedSpecs().commit(); - - assertThat(table.specs().size()).as("All but current spec should be removed").isEqualTo(1); - assertThat(table.spec()).as("Current spec shall not change").isEqualTo(currentSpec); - } - - @TestTemplate - public void testDontRemoveInUseSpecs() { - table - .updateSchema() - .addColumn("ts", Types.LongType.get()) - .addColumn("category", Types.StringType.get()) - .commit(); - - table.updateSpec().addField("id").commit(); // 1 - table.newAppend().appendFile(newDataFile("data_bucket=0/id=5")).commit(); - - table.updateSpec().addField("ts").commit(); // 2 - - table.updateSpec().addField("category").commit(); // 3 - if (formatVersion == 1) { - table.newAppend().appendFile(newDataFile("data_bucket=0/id=5/ts=100/category=fo")).commit(); - } else { - table - .newRowDelta() - .addDeletes(newDeleteFile(table.spec().specId(), "data_bucket=0/id=5/ts=100/category=fo")) - .commit(); - } - - table.updateSpec().addField("data").commit(); // 4 - assertThat(table.specs()).size().as("Added specs should be present").isEqualTo(5); - - PartitionSpec currentSpec = table.spec(); - table.maintenance().removeUnusedSpecs().commit(); - assertThat(table.specs().keySet()).as("Unused specs are removed").containsExactly(1, 3, 4); - assertThat(table.spec()).as("Current spec shall not change").isEqualTo(currentSpec); - } - - @TestTemplate - public void testRemoveUnpartitionedSpec() { - // clean it first to reset to unpartitioned - cleanupTables(); - this.table = create(SCHEMA, PartitionSpec.unpartitioned()); - DataFile file = - DataFiles.builder(table.spec()) - .withPath("/path/to/data-0.parquet") - .withFileSizeInBytes(10) - .withRecordCount(100) - .build(); - table.newAppend().appendFile(file).commit(); - - table.updateSpec().addField("data_bucket", Expressions.bucket("data", 16)).commit(); - - // removeUnusedPartitionSpec shall not remove the unpartitioned spec - table.maintenance().removeUnusedSpecs().commit(); - assertThat(table.specs().keySet()).as("unpartitioned spec is still used").containsExactly(0, 1); - - table.newDelete().deleteFile(file).commit(); - DataFile bucketFile = - DataFiles.builder(table.spec()) - .withPath("/path/to/data-0.parquet") - .withFileSizeInBytes(10) - .withRecordCount(100) - .withPartitionPath("data_bucket=0") - .build(); - table.newAppend().appendFile(bucketFile).commit(); - table.expireSnapshots().expireOlderThan(System.currentTimeMillis()).commit(); - - table.maintenance().removeUnusedSpecs().commit(); - assertThat(table.specs().keySet()) - .as("unpartitioned spec should be removed") - .containsExactly(1); - - table.updateSpec().removeField("data_bucket").commit(); - assertThat(table.spec().isUnpartitioned()).as("Should equal to unpartitioned").isTrue(); - - int unpartitionedFieldsSize = formatVersion == 1 ? 1 : 0; - assertThat(table.spec().fields().size()) - .as("Should have one void transform for v1 and empty for v2") - .isEqualTo(unpartitionedFieldsSize); - assertThat(table.spec().specId()) - .as("unpartitioned is evolved to use a new SpecId") - .isEqualTo(2); - } -} diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java index f95fe6191e43..ed16236d1cbd 100644 --- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java +++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java @@ -37,6 +37,7 @@ import java.util.stream.Collectors; import org.apache.iceberg.ManifestEntry.Status; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.PositionOutputStream; import org.apache.iceberg.puffin.Blob; @@ -1620,6 +1621,90 @@ public void testRetainFilesOnRetainedBranches() { assertThat(deletedFiles).isEqualTo(expectedDeletes); } + @TestTemplate + public void testRemoveSpecDuringExpiration() { + DataFile file = + DataFiles.builder(table.spec()) + .withPath("/path/to/data-0.parquet") + .withPartitionPath("data_bucket=0") + .withFileSizeInBytes(10) + .withRecordCount(100) + .build(); + table.newAppend().appendFile(file).commit(); + Snapshot append = table.currentSnapshot(); + String appendManifest = + Iterables.getOnlyElement( + table.currentSnapshot().allManifests(table.io()).stream() + .map(ManifestFile::path) + .collect(Collectors.toList())); + table.newDelete().deleteFile(file).commit(); + Snapshot delete = table.currentSnapshot(); + String deleteManifest = + Iterables.getOnlyElement( + table.currentSnapshot().allManifests(table.io()).stream() + .map(ManifestFile::path) + .collect(Collectors.toList())); + + table.updateSpec().addField("id_bucket", Expressions.bucket("id", 16)).commit(); + PartitionSpec idAndDataBucketSpec = table.spec(); + DataFile bucketFile = + DataFiles.builder(table.spec()) + .withPath("/path/to/data-0-id-0.parquet") + .withFileSizeInBytes(10) + .withRecordCount(100) + .withPartitionPath("data_bucket=0/id_bucket=0") + .build(); + table.newAppend().appendFile(bucketFile).commit(); + + Set deletedFiles = Sets.newHashSet(); + // Expiring snapshots should remove the data_bucket partition + removeSnapshots(table) + .expireOlderThan(System.currentTimeMillis()) + .removeUnusedSpecs() + .deleteWith(deletedFiles::add) + .commit(); + + assertThat(deletedFiles) + .containsExactlyInAnyOrder( + appendManifest, + deleteManifest, + file.location(), + append.manifestListLocation(), + delete.manifestListLocation()); + assertThat(Iterables.getOnlyElement(table.specs().keySet())) + .as("Only id_bucket + data_bucket transform should exist") + .isEqualTo(idAndDataBucketSpec.specId()); + } + + @TestTemplate + public void testRemoveSpecsDoesntRemoveDefaultSpec() throws IOException { + // The default spec for table is bucketed on data, but write using unpartitioned + PartitionSpec dataBucketSpec = table.spec(); + DataFile file = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath("/path/to/data-0.parquet") + .withFileSizeInBytes(10) + .withRecordCount(100) + .build(); + + table.newAppend().appendFile(file).commit(); + Snapshot append = table.currentSnapshot(); + table.newDelete().deleteFile(file).commit(); + + Set deletedFiles = Sets.newHashSet(); + // Expiring snapshots should remove only the unpartitioned spec + removeSnapshots(table) + .expireOlderThan(System.currentTimeMillis()) + .removeUnusedSpecs() + .deleteWith(deletedFiles::add) + .commit(); + + assertThat(deletedFiles).containsExactlyInAnyOrder(append.manifestListLocation()); + assertThat(Iterables.getOnlyElement(table.specs().keySet())) + .as("Only data_bucket transform should exist") + .isEqualTo(dataBucketSpec.specId()); + } + private Set manifestPaths(Snapshot snapshot, FileIO io) { return snapshot.allManifests(io).stream().map(ManifestFile::path).collect(Collectors.toSet()); } diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java index a4b928f27cfc..5402a13d7d4b 100644 --- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java @@ -40,7 +40,6 @@ import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.ReachableFileUtil; -import org.apache.iceberg.RemoveUnusedSpecs; import org.apache.iceberg.ReplaceSortOrder; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; @@ -1154,28 +1153,6 @@ public void testUpdateTableSpec() { .isEqualTo(expected.fields()); } - @Test - public void testRemoveUnusedSpec() { - C catalog = catalog(); - - if (requiresNamespaceCreate()) { - catalog.createNamespace(NS); - } - - Table table = catalog.buildTable(TABLE, SCHEMA).create(); - table.updateSpec().addField("shard", Expressions.bucket("id", 16)).commit(); - table.updateSpec().addField("data").commit(); - int currentSpecId = table.spec().specId(); - - RemoveUnusedSpecs remove = table.maintenance().removeUnusedSpecs(); - List specsToRetain = remove.apply(); - assertThat(specsToRetain).hasSize(1).map(PartitionSpec::specId).contains(currentSpecId); - - remove.commit(); - Table loaded = catalog.loadTable(TABLE); - assertThat(loaded.specs().values()).hasSameElementsAs(specsToRetain); - } - @Test public void testUpdateTableSpecServerSideRetry() { Assumptions.assumeTrue( From 5b214382c270aad4c4e3ce560bacf5023c79b5af Mon Sep 17 00:00:00 2001 From: advancedxy <807537+advancedxy@users.noreply.github.com> Date: Sun, 3 Nov 2024 00:16:08 +0800 Subject: [PATCH 04/11] refine --- .../org/apache/iceberg/ExpireSnapshots.java | 4 ++- .../apache/iceberg/FileCleanupStrategy.java | 1 - .../org/apache/iceberg/RemoveSnapshots.java | 4 +-- .../apache/iceberg/TestRemoveSnapshots.java | 4 +-- .../apache/iceberg/catalog/CatalogTests.java | 29 +++++++++++++++++++ 5 files changed, 36 insertions(+), 6 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java b/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java index 3bbd7cdde0fc..95622394d87d 100644 --- a/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java +++ b/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java @@ -122,9 +122,11 @@ public interface ExpireSnapshots extends PendingUpdate> { /** * Allows removal of unreachable partition specs as part of the expiration operation * + * @param removeUnusedSpecs setting this to true will remove partition specs that are no longer + * reachable by any snapshot * @return this for method chaining */ - default ExpireSnapshots removeUnusedSpecs() { + default ExpireSnapshots removeUnusedSpecs(boolean removeUnusedSpecs) { throw new UnsupportedOperationException( this.getClass().getName() + " doesn't implement removeUnusedSpecs"); } diff --git a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java index 988aa05ffb68..dae99c572c78 100644 --- a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java +++ b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java @@ -35,7 +35,6 @@ abstract class FileCleanupStrategy { private static final Logger LOG = LoggerFactory.getLogger(FileCleanupStrategy.class); protected final FileIO fileIO; - protected final Set referencedPartitionSpecs = Sets.newConcurrentHashSet(); protected final ExecutorService planExecutorService; private final Consumer deleteFunc; private final ExecutorService deleteExecutorService; diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java index b7f13a01558b..8d1a3fc04c7d 100644 --- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java @@ -162,8 +162,8 @@ public ExpireSnapshots planWith(ExecutorService executorService) { } @Override - public ExpireSnapshots removeUnusedSpecs() { - this.removeUnusedSpecs = true; + public ExpireSnapshots removeUnusedSpecs(boolean remove) { + this.removeUnusedSpecs = remove; return this; } diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java index ed16236d1cbd..baeceb34db46 100644 --- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java +++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java @@ -1660,7 +1660,7 @@ public void testRemoveSpecDuringExpiration() { // Expiring snapshots should remove the data_bucket partition removeSnapshots(table) .expireOlderThan(System.currentTimeMillis()) - .removeUnusedSpecs() + .removeUnusedSpecs(true) .deleteWith(deletedFiles::add) .commit(); @@ -1695,7 +1695,7 @@ public void testRemoveSpecsDoesntRemoveDefaultSpec() throws IOException { // Expiring snapshots should remove only the unpartitioned spec removeSnapshots(table) .expireOlderThan(System.currentTimeMillis()) - .removeUnusedSpecs() + .removeUnusedSpecs(true) .deleteWith(deletedFiles::add) .commit(); diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java index 5402a13d7d4b..d9437170bbd1 100644 --- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java @@ -1284,6 +1284,35 @@ public void testUpdateTableSpecThenRevert() { assertThat(table.spec()).as("Loaded table should have expected spec").isEqualTo(TABLE_SPEC); } + @Test + public void testRemoveUnusedSpec() { + C catalog = catalog(); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(NS); + } + + Table table = + catalog + .buildTable(TABLE, SCHEMA) + .withPartitionSpec(SPEC) + .withProperty(TableProperties.GC_ENABLED, "true") + .create(); + PartitionSpec spec = table.spec(); + // added some file to trigger expire snapshot + table.newFastAppend().appendFile(FILE_A).commit(); + table.updateSpec().addField(Expressions.bucket("data", 16)).commit(); + table.updateSpec().removeField(Expressions.bucket("data", 16)).commit(); + table.updateSpec().addField("data").commit(); + assertThat(table.specs().size()).as("Should have 3 total specs").isEqualTo(3); + PartitionSpec current = table.spec(); + table.expireSnapshots().removeUnusedSpecs(true).commit(); + + Table loaded = catalog.loadTable(TABLE); + assertThat(loaded.specs().values()) + .hasSameElementsAs(Lists.asList(spec, current, new PartitionSpec[0])); + } + @Test public void testUpdateTableSortOrder() { C catalog = catalog(); From 5ee0f610a4b65bdb633f3a9396924b9a9b637aba Mon Sep 17 00:00:00 2001 From: advancedxy <807537+advancedxy@users.noreply.github.com> Date: Tue, 12 Nov 2024 11:24:53 +0800 Subject: [PATCH 05/11] address comments --- .../main/java/org/apache/iceberg/ExpireSnapshots.java | 10 +++++----- .../main/java/org/apache/iceberg/RemoveSnapshots.java | 9 +++++---- .../java/org/apache/iceberg/TestRemoveSnapshots.java | 4 ++-- .../java/org/apache/iceberg/catalog/CatalogTests.java | 2 +- 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java b/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java index 95622394d87d..31b572c799c8 100644 --- a/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java +++ b/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java @@ -120,14 +120,14 @@ public interface ExpireSnapshots extends PendingUpdate> { ExpireSnapshots cleanExpiredFiles(boolean clean); /** - * Allows removal of unreachable partition specs as part of the expiration operation + * Allows expiration of unreachable metadata, such as partition specs as part of the operation. * - * @param removeUnusedSpecs setting this to true will remove partition specs that are no longer - * reachable by any snapshot + * @param clean setting this to true will remove metadata(such as partition spec) that are no + * longer reachable by any snapshot * @return this for method chaining */ - default ExpireSnapshots removeUnusedSpecs(boolean removeUnusedSpecs) { + default ExpireSnapshots cleanExpiredMeta(boolean clean) { throw new UnsupportedOperationException( - this.getClass().getName() + " doesn't implement removeUnusedSpecs"); + this.getClass().getName() + " doesn't implement cleanExpiredMeta"); } } diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java index 8d1a3fc04c7d..a22a0687db9c 100644 --- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java @@ -86,7 +86,7 @@ public void accept(String file) { private ExecutorService planExecutorService = ThreadPools.getWorkerPool(); private Boolean incrementalCleanup; private boolean specifiedSnapshotId = false; - private boolean removeUnusedSpecs = false; + private boolean cleanExpiredMeta = false; RemoveSnapshots(TableOperations ops) { this.ops = ops; @@ -162,8 +162,8 @@ public ExpireSnapshots planWith(ExecutorService executorService) { } @Override - public ExpireSnapshots removeUnusedSpecs(boolean remove) { - this.removeUnusedSpecs = remove; + public ExpireSnapshots cleanExpiredMeta(boolean clean) { + this.cleanExpiredMeta = clean; return this; } @@ -217,7 +217,8 @@ private TableMetadata internalApply() { .forEach(idsToRemove::add); updatedMetaBuilder.removeSnapshots(idsToRemove); - if (removeUnusedSpecs) { + if (cleanExpiredMeta) { + // TODO: Support cleaning expired schema as well. Set reachableSpecs = Sets.newConcurrentHashSet(); reachableSpecs.add(base.defaultSpecId()); Tasks.foreach(idsToRetain) diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java index baeceb34db46..eebf30a4428d 100644 --- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java +++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java @@ -1660,7 +1660,7 @@ public void testRemoveSpecDuringExpiration() { // Expiring snapshots should remove the data_bucket partition removeSnapshots(table) .expireOlderThan(System.currentTimeMillis()) - .removeUnusedSpecs(true) + .cleanExpiredMeta(true) .deleteWith(deletedFiles::add) .commit(); @@ -1695,7 +1695,7 @@ public void testRemoveSpecsDoesntRemoveDefaultSpec() throws IOException { // Expiring snapshots should remove only the unpartitioned spec removeSnapshots(table) .expireOlderThan(System.currentTimeMillis()) - .removeUnusedSpecs(true) + .cleanExpiredMeta(true) .deleteWith(deletedFiles::add) .commit(); diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java index d9437170bbd1..820b9da515a4 100644 --- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java @@ -1306,7 +1306,7 @@ public void testRemoveUnusedSpec() { table.updateSpec().addField("data").commit(); assertThat(table.specs().size()).as("Should have 3 total specs").isEqualTo(3); PartitionSpec current = table.spec(); - table.expireSnapshots().removeUnusedSpecs(true).commit(); + table.expireSnapshots().cleanExpiredMeta(true).commit(); Table loaded = catalog.loadTable(TABLE); assertThat(loaded.specs().values()) From 0cd6e51947b7ce1d69a281831d0b2cec8f1ba775 Mon Sep 17 00:00:00 2001 From: advancedxy <807537+advancedxy@users.noreply.github.com> Date: Mon, 18 Nov 2024 20:34:45 +0800 Subject: [PATCH 06/11] address comments --- .../apache/iceberg/MetadataUpdateParser.java | 11 ++- .../iceberg/TestMetadataUpdateParser.java | 21 +++++ .../apache/iceberg/TestRemoveSnapshots.java | 8 +- .../iceberg/TestUpdateRequirements.java | 83 +++++++++++++++++++ .../apache/iceberg/catalog/CatalogTests.java | 5 +- 5 files changed, 115 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java index d6f55d215910..08d4b3398f10 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java @@ -128,7 +128,7 @@ private MetadataUpdateParser() {} private static final String VIEW_VERSION_ID = "view-version-id"; // RemovePartitionSpecs - private static final String PARTITION_SPEC_IDS = "partition-spec-ids"; + private static final String SPEC_IDS = "spec-ids"; private static final Map, String> ACTIONS = ImmutableMap., String>builder() @@ -321,7 +321,7 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) { case SET_CURRENT_VIEW_VERSION: return readCurrentViewVersionId(jsonNode); case REMOVE_PARTITION_SPECS: - return readRemoveUnusedSpecs(jsonNode); + return readRemovePartitionSpecs(jsonNode); default: throw new UnsupportedOperationException( String.format("Cannot convert metadata update action to json: %s", action)); @@ -459,7 +459,7 @@ private static void writeSetCurrentViewVersionId( private static void writeRemovePartitionSpecs( MetadataUpdate.RemovePartitionSpecs metadataUpdate, JsonGenerator gen) throws IOException { - JsonUtil.writeIntegerArray(PARTITION_SPEC_IDS, metadataUpdate.specIds(), gen); + JsonUtil.writeIntegerArray(SPEC_IDS, metadataUpdate.specIds(), gen); } private static MetadataUpdate readAssignUUID(JsonNode node) { @@ -612,8 +612,7 @@ private static MetadataUpdate readCurrentViewVersionId(JsonNode node) { return new MetadataUpdate.SetCurrentViewVersion(JsonUtil.getInt(VIEW_VERSION_ID, node)); } - private static MetadataUpdate readRemoveUnusedSpecs(JsonNode node) { - return new MetadataUpdate.RemovePartitionSpecs( - JsonUtil.getIntegerSet(PARTITION_SPEC_IDS, node)); + private static MetadataUpdate readRemovePartitionSpecs(JsonNode node) { + return new MetadataUpdate.RemovePartitionSpecs(JsonUtil.getIntegerSet(SPEC_IDS, node)); } } diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java index bfed6ebebe2c..8af0e5e7e479 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java @@ -926,6 +926,17 @@ public void testRemovePartitionStatistics() { .isEqualTo(json); } + @Test + public void testRemovePartitionSpec() { + String action = MetadataUpdateParser.REMOVE_PARTITION_SPECS; + String json = "{\"action\":\"remove-partition-specs\",\"spec-ids\":[1,2,3]}"; + MetadataUpdate expected = new MetadataUpdate.RemovePartitionSpecs(ImmutableSet.of(1, 2, 3)); + assertEquals(action, expected, MetadataUpdateParser.fromJson(json)); + assertThat(MetadataUpdateParser.toJson(expected)) + .as("Remove partition specs should convert to the correct JSON value") + .isEqualTo(json); + } + public void assertEquals( String action, MetadataUpdate expectedUpdate, MetadataUpdate actualUpdate) { switch (action) { @@ -1030,6 +1041,11 @@ public void assertEquals( (MetadataUpdate.SetCurrentViewVersion) expectedUpdate, (MetadataUpdate.SetCurrentViewVersion) actualUpdate); break; + case MetadataUpdateParser.REMOVE_PARTITION_SPECS: + assertEqualsRemovePartitionSpecs( + (MetadataUpdate.RemovePartitionSpecs) expectedUpdate, + (MetadataUpdate.RemovePartitionSpecs) actualUpdate); + break; default: fail("Unrecognized metadata update action: " + action); } @@ -1251,6 +1267,11 @@ private static void assertEqualsSetCurrentViewVersion( assertThat(actual.versionId()).isEqualTo(expected.versionId()); } + private static void assertEqualsRemovePartitionSpecs( + MetadataUpdate.RemovePartitionSpecs expected, MetadataUpdate.RemovePartitionSpecs actual) { + assertThat(actual.specIds()).containsExactlyInAnyOrderElementsOf(expected.specIds()); + } + private String createManifestListWithManifestFiles(long snapshotId, Long parentSnapshotId) throws IOException { File manifestList = File.createTempFile("manifests", null, temp.toFile()); diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java index eebf30a4428d..7c81fcba8491 100644 --- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java +++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java @@ -1671,9 +1671,9 @@ public void testRemoveSpecDuringExpiration() { file.location(), append.manifestListLocation(), delete.manifestListLocation()); - assertThat(Iterables.getOnlyElement(table.specs().keySet())) + assertThat(table.specs().keySet()) .as("Only id_bucket + data_bucket transform should exist") - .isEqualTo(idAndDataBucketSpec.specId()); + .containsExactly(idAndDataBucketSpec.specId()); } @TestTemplate @@ -1700,9 +1700,9 @@ public void testRemoveSpecsDoesntRemoveDefaultSpec() throws IOException { .commit(); assertThat(deletedFiles).containsExactlyInAnyOrder(append.manifestListLocation()); - assertThat(Iterables.getOnlyElement(table.specs().keySet())) + assertThat(table.specs().keySet()) .as("Only data_bucket transform should exist") - .isEqualTo(dataBucketSpec.specId()); + .containsExactly(dataBucketSpec.specId()); } private Set manifestPaths(Snapshot snapshot, FileIO io) { diff --git a/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java b/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java index 1a6c289ea241..f9a3918e3dab 100644 --- a/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java +++ b/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java @@ -424,6 +424,89 @@ public void setDefaultPartitionSpecFailure() { .hasMessage("Requirement failed: default partition spec changed: expected id 0 != 1"); } + @Test + public void testRemovePartitionSpec() { + int defaultSpecId = 3; + when(metadata.defaultSpecId()).thenReturn(defaultSpecId); + // empty refs + when(metadata.refs()).thenReturn(ImmutableMap.of()); + + List requirements = + UpdateRequirements.forUpdateTable( + metadata, + ImmutableList.of(new MetadataUpdate.RemovePartitionSpecs(Sets.newHashSet(1, 2)))); + + assertThat(requirements) + .hasSize(2) + .hasOnlyElementsOfTypes( + UpdateRequirement.AssertTableUUID.class, UpdateRequirement.AssertDefaultSpecID.class); + + assertTableUUID(requirements); + + assertThat(requirements) + .element(1) + .asInstanceOf(InstanceOfAssertFactories.type(UpdateRequirement.AssertDefaultSpecID.class)) + .extracting(UpdateRequirement.AssertDefaultSpecID::specId) + .isEqualTo(defaultSpecId); + } + + @Test + public void testRemovePartitionSpecsWithRefs() { + int defaultSpecId = 3; + long snapshotId = 42L; + when(metadata.defaultSpecId()).thenReturn(defaultSpecId); + + String branch = "branch"; + SnapshotRef snapshotRef = mock(SnapshotRef.class); + when(snapshotRef.snapshotId()).thenReturn(snapshotId); + when(snapshotRef.isBranch()).thenReturn(true); + when(metadata.refs()).thenReturn(ImmutableMap.of(branch, snapshotRef)); + + List requirements = + UpdateRequirements.forUpdateTable( + metadata, + ImmutableList.of(new MetadataUpdate.RemovePartitionSpecs(Sets.newHashSet(1, 2)))); + + assertThat(requirements) + .hasSize(3) + .hasOnlyElementsOfTypes( + UpdateRequirement.AssertTableUUID.class, + UpdateRequirement.AssertDefaultSpecID.class, + UpdateRequirement.AssertRefSnapshotID.class); + + assertTableUUID(requirements); + + assertThat(requirements) + .element(1) + .asInstanceOf(InstanceOfAssertFactories.type(UpdateRequirement.AssertDefaultSpecID.class)) + .extracting(UpdateRequirement.AssertDefaultSpecID::specId) + .isEqualTo(defaultSpecId); + + assertThat(requirements) + .element(2) + .asInstanceOf(InstanceOfAssertFactories.type(UpdateRequirement.AssertRefSnapshotID.class)) + .extracting(UpdateRequirement.AssertRefSnapshotID::snapshotId) + .isEqualTo(snapshotId); + } + + @Test + public void testRemovePartitionSpecsFailure() { + int defaultSpecId = 3; + when(metadata.defaultSpecId()).thenReturn(defaultSpecId); + when(updated.defaultSpecId()).thenReturn(defaultSpecId + 1); + + List requirements = + UpdateRequirements.forUpdateTable( + metadata, + ImmutableList.of(new MetadataUpdate.RemovePartitionSpecs(Sets.newHashSet(1, 2)))); + + assertThatThrownBy(() -> requirements.forEach(req -> req.validate(updated))) + .isInstanceOf(CommitFailedException.class) + .hasMessage( + "Requirement failed: default partition spec changed: expected id %s != %s", + defaultSpecId, defaultSpecId + 1); + } + @Test public void addSortOrder() { List requirements = diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java index 820b9da515a4..903718070866 100644 --- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java @@ -1304,13 +1304,12 @@ public void testRemoveUnusedSpec() { table.updateSpec().addField(Expressions.bucket("data", 16)).commit(); table.updateSpec().removeField(Expressions.bucket("data", 16)).commit(); table.updateSpec().addField("data").commit(); - assertThat(table.specs().size()).as("Should have 3 total specs").isEqualTo(3); + assertThat(table.specs()).as("Should have 3 total specs").hasSize(3); PartitionSpec current = table.spec(); table.expireSnapshots().cleanExpiredMeta(true).commit(); Table loaded = catalog.loadTable(TABLE); - assertThat(loaded.specs().values()) - .hasSameElementsAs(Lists.asList(spec, current, new PartitionSpec[0])); + assertThat(loaded.specs().values()).containsExactly(spec, current); } @Test From a4635e930a1781136e1e4e52b23f59c7369ad741 Mon Sep 17 00:00:00 2001 From: advancedxy <807537+advancedxy@users.noreply.github.com> Date: Sun, 24 Nov 2024 23:29:27 +0800 Subject: [PATCH 07/11] address comments --- .../org/apache/iceberg/ExpireSnapshots.java | 6 +-- .../org/apache/iceberg/MetadataUpdate.java | 2 +- .../org/apache/iceberg/RemoveSnapshots.java | 10 ++--- .../org/apache/iceberg/TableMetadata.java | 3 +- .../apache/iceberg/UpdateRequirements.java | 3 +- .../apache/iceberg/TestRemoveSnapshots.java | 4 +- .../iceberg/TestUpdateRequirements.java | 7 ++-- .../apache/iceberg/catalog/CatalogTests.java | 38 ++++++++++++++++--- 8 files changed, 52 insertions(+), 21 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java b/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java index 31b572c799c8..08f284a5dfc9 100644 --- a/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java +++ b/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java @@ -120,13 +120,13 @@ public interface ExpireSnapshots extends PendingUpdate> { ExpireSnapshots cleanExpiredFiles(boolean clean); /** - * Allows expiration of unreachable metadata, such as partition specs as part of the operation. + * Allows expiration of unreachable metadata, such as partition spec as part of the operation. * - * @param clean setting this to true will remove metadata(such as partition spec) that are no + * @param clean setting this to true will remove metadata(such as partition spec) that is no * longer reachable by any snapshot * @return this for method chaining */ - default ExpireSnapshots cleanExpiredMeta(boolean clean) { + default ExpireSnapshots cleanExpiredMetadata(boolean clean) { throw new UnsupportedOperationException( this.getClass().getName() + " doesn't implement cleanExpiredMeta"); } diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java index 84eca7608cb2..548e661c62b4 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java @@ -178,7 +178,7 @@ public Set specIds() { @Override public void applyTo(TableMetadata.Builder metadataBuilder) { - metadataBuilder.removeUnusedSpecs(specIds); + metadataBuilder.removeSpecIds(specIds); } } diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java index a22a0687db9c..994bde8f589e 100644 --- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java @@ -86,7 +86,7 @@ public void accept(String file) { private ExecutorService planExecutorService = ThreadPools.getWorkerPool(); private Boolean incrementalCleanup; private boolean specifiedSnapshotId = false; - private boolean cleanExpiredMeta = false; + private boolean cleanExpiredMetadata = false; RemoveSnapshots(TableOperations ops) { this.ops = ops; @@ -162,8 +162,8 @@ public ExpireSnapshots planWith(ExecutorService executorService) { } @Override - public ExpireSnapshots cleanExpiredMeta(boolean clean) { - this.cleanExpiredMeta = clean; + public ExpireSnapshots cleanExpiredMetadata(boolean clean) { + this.cleanExpiredMetadata = clean; return this; } @@ -217,7 +217,7 @@ private TableMetadata internalApply() { .forEach(idsToRemove::add); updatedMetaBuilder.removeSnapshots(idsToRemove); - if (cleanExpiredMeta) { + if (cleanExpiredMetadata) { // TODO: Support cleaning expired schema as well. Set reachableSpecs = Sets.newConcurrentHashSet(); reachableSpecs.add(base.defaultSpecId()); @@ -233,7 +233,7 @@ private TableMetadata internalApply() { .map(PartitionSpec::specId) .filter(specId -> !reachableSpecs.contains(specId)) .collect(Collectors.toSet()); - updatedMetaBuilder.removeUnusedSpecs(specsToRemove); + updatedMetaBuilder.removeSpecIds(specsToRemove); } return updatedMetaBuilder.build(); diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index fc7ed897392a..5bda5a7519a2 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -1108,7 +1108,7 @@ public Builder setDefaultPartitionSpec(int specId) { return this; } - Builder removeUnusedSpecs(Iterable specIds) { + Builder removeSpecIds(Iterable specIds) { Set specIdsToRemove = Sets.newHashSet(); for (Integer specId : specIds) { Preconditions.checkArgument( @@ -1118,6 +1118,7 @@ Builder removeUnusedSpecs(Iterable specIds) { specIdsToRemove.add(specId); } } + this.specs = specs.stream() .filter(s -> !specIdsToRemove.contains(s.specId())) diff --git a/core/src/main/java/org/apache/iceberg/UpdateRequirements.java b/core/src/main/java/org/apache/iceberg/UpdateRequirements.java index 4a44a53641d3..95369d51934d 100644 --- a/core/src/main/java/org/apache/iceberg/UpdateRequirements.java +++ b/core/src/main/java/org/apache/iceberg/UpdateRequirements.java @@ -183,7 +183,8 @@ private void update(MetadataUpdate.RemovePartitionSpecs unused) { } this.setSpecId = true; } - // require that all the branch has not changed, so that old specs won't be written. + + // require that no branches have changed, so that old specs won't be written. if (base != null && !isReplace) { base.refs() .forEach( diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java index 7c81fcba8491..2148b4b3c801 100644 --- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java +++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java @@ -1660,7 +1660,7 @@ public void testRemoveSpecDuringExpiration() { // Expiring snapshots should remove the data_bucket partition removeSnapshots(table) .expireOlderThan(System.currentTimeMillis()) - .cleanExpiredMeta(true) + .cleanExpiredMetadata(true) .deleteWith(deletedFiles::add) .commit(); @@ -1695,7 +1695,7 @@ public void testRemoveSpecsDoesntRemoveDefaultSpec() throws IOException { // Expiring snapshots should remove only the unpartitioned spec removeSnapshots(table) .expireOlderThan(System.currentTimeMillis()) - .cleanExpiredMeta(true) + .cleanExpiredMetadata(true) .deleteWith(deletedFiles::add) .commit(); diff --git a/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java b/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java index f9a3918e3dab..dd8b4da0c2d8 100644 --- a/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java +++ b/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java @@ -425,16 +425,15 @@ public void setDefaultPartitionSpecFailure() { } @Test - public void testRemovePartitionSpec() { + public void removePartitionSpec() { int defaultSpecId = 3; when(metadata.defaultSpecId()).thenReturn(defaultSpecId); - // empty refs - when(metadata.refs()).thenReturn(ImmutableMap.of()); List requirements = UpdateRequirements.forUpdateTable( metadata, ImmutableList.of(new MetadataUpdate.RemovePartitionSpecs(Sets.newHashSet(1, 2)))); + requirements.forEach(req -> req.validate(metadata)); assertThat(requirements) .hasSize(2) @@ -461,11 +460,13 @@ public void testRemovePartitionSpecsWithRefs() { when(snapshotRef.snapshotId()).thenReturn(snapshotId); when(snapshotRef.isBranch()).thenReturn(true); when(metadata.refs()).thenReturn(ImmutableMap.of(branch, snapshotRef)); + when(metadata.ref(branch)).thenReturn(snapshotRef); List requirements = UpdateRequirements.forUpdateTable( metadata, ImmutableList.of(new MetadataUpdate.RemovePartitionSpecs(Sets.newHashSet(1, 2)))); + requirements.forEach(req -> req.validate(metadata)); assertThat(requirements) .hasSize(3) diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java index 903718070866..b6dd131c1fb3 100644 --- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java @@ -1284,8 +1284,10 @@ public void testUpdateTableSpecThenRevert() { assertThat(table.spec()).as("Loaded table should have expected spec").isEqualTo(TABLE_SPEC); } - @Test - public void testRemoveUnusedSpec() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testRemoveUnusedSpec(boolean withBranch) { + String branch = "test"; C catalog = catalog(); if (requiresNamespaceCreate()) { @@ -1299,17 +1301,43 @@ public void testRemoveUnusedSpec() { .withProperty(TableProperties.GC_ENABLED, "true") .create(); PartitionSpec spec = table.spec(); - // added some file to trigger expire snapshot + // added a file to trigger snapshot expiration table.newFastAppend().appendFile(FILE_A).commit(); + if (withBranch) { + table.manageSnapshots().createBranch(branch).commit(); + } table.updateSpec().addField(Expressions.bucket("data", 16)).commit(); table.updateSpec().removeField(Expressions.bucket("data", 16)).commit(); table.updateSpec().addField("data").commit(); assertThat(table.specs()).as("Should have 3 total specs").hasSize(3); PartitionSpec current = table.spec(); - table.expireSnapshots().cleanExpiredMeta(true).commit(); + table.expireSnapshots().cleanExpiredMetadata(true).commit(); Table loaded = catalog.loadTable(TABLE); - assertThat(loaded.specs().values()).containsExactly(spec, current); + assertThat(loaded.specs().values()).containsExactlyInAnyOrder(spec, current); + + // add a data file with current spec and remove the old data file + table.newDelete().deleteFile(FILE_A).commit(); + DataFile anotherFile = + DataFiles.builder(current) + .withPath("/path/to/data-b.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("id_bucket=0/data=123") // easy way to set partition data for now + .withRecordCount(2) // needs at least one record or else metrics will filter it out + .build(); + table.newAppend().appendFile(anotherFile).commit(); + table + .expireSnapshots() + .cleanExpiredFiles(false) + .expireOlderThan(table.currentSnapshot().timestampMillis()) + .cleanExpiredMetadata(true) + .commit(); + loaded = catalog.loadTable(TABLE); + if (withBranch) { + assertThat(loaded.specs().values()).containsExactlyInAnyOrder(spec, current); + } else { + assertThat(loaded.specs().values()).containsExactlyInAnyOrder(current); + } } @Test From c02354ecdf51a64139efb3734ca5c21256e28d22 Mon Sep 17 00:00:00 2001 From: advancedxy <807537+advancedxy@users.noreply.github.com> Date: Thu, 28 Nov 2024 00:36:16 +0800 Subject: [PATCH 08/11] address comments --- .../iceberg/TestUpdateRequirements.java | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java b/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java index dd8b4da0c2d8..de4d266318db 100644 --- a/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java +++ b/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java @@ -450,7 +450,7 @@ public void removePartitionSpec() { } @Test - public void testRemovePartitionSpecsWithRefs() { + public void testRemovePartitionSpecsWithBranch() { int defaultSpecId = 3; long snapshotId = 42L; when(metadata.defaultSpecId()).thenReturn(defaultSpecId); @@ -508,6 +508,37 @@ public void testRemovePartitionSpecsFailure() { defaultSpecId, defaultSpecId + 1); } + @Test + public void testRemovePartitionSpecsWithBranchFailure() { + int defaultSpecId = 3; + long snapshotId = 42L; + when(metadata.defaultSpecId()).thenReturn(defaultSpecId); + when(updated.defaultSpecId()).thenReturn(defaultSpecId); + + String branch = "test"; + SnapshotRef snapshotRef = mock(SnapshotRef.class); + when(snapshotRef.snapshotId()).thenReturn(snapshotId); + when(snapshotRef.isBranch()).thenReturn(true); + when(metadata.refs()).thenReturn(ImmutableMap.of(branch, snapshotRef)); + when(metadata.ref(branch)).thenReturn(snapshotRef); + + SnapshotRef updatedRef = mock(SnapshotRef.class); + when(updatedRef.snapshotId()).thenReturn(snapshotId + 1); + when(updatedRef.isBranch()).thenReturn(true); + when(updated.ref(branch)).thenReturn(updatedRef); + + List requirements = + UpdateRequirements.forUpdateTable( + metadata, + ImmutableList.of(new MetadataUpdate.RemovePartitionSpecs(Sets.newHashSet(1, 2)))); + + assertThatThrownBy(() -> requirements.forEach(req -> req.validate(updated))) + .isInstanceOf(CommitFailedException.class) + .hasMessage( + "Requirement failed: branch %s has changed: expected id %s != %s", + branch, snapshotId, snapshotId + 1); + } + @Test public void addSortOrder() { List requirements = From 323d43ae1f354440fe976743ff9bd717d513f8b6 Mon Sep 17 00:00:00 2001 From: advancedxy <807537+advancedxy@users.noreply.github.com> Date: Wed, 4 Dec 2024 10:34:46 +0800 Subject: [PATCH 09/11] Address comments --- api/src/main/java/org/apache/iceberg/ExpireSnapshots.java | 7 ++++--- core/src/main/java/org/apache/iceberg/MetadataUpdate.java | 2 +- core/src/main/java/org/apache/iceberg/RemoveSnapshots.java | 2 +- core/src/main/java/org/apache/iceberg/TableMetadata.java | 2 +- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java b/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java index 08f284a5dfc9..8efa424d5ce3 100644 --- a/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java +++ b/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java @@ -120,10 +120,11 @@ public interface ExpireSnapshots extends PendingUpdate> { ExpireSnapshots cleanExpiredFiles(boolean clean); /** - * Allows expiration of unreachable metadata, such as partition spec as part of the operation. + * Allows expiration of unreachable table layout metadata, such as partition specs as part of the + * operation. * - * @param clean setting this to true will remove metadata(such as partition spec) that is no - * longer reachable by any snapshot + * @param clean setting this to true will remove table layout metadata that is no longer reachable + * by any snapshot * @return this for method chaining */ default ExpireSnapshots cleanExpiredMetadata(boolean clean) { diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java index 548e661c62b4..d697df6a4fc6 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java @@ -178,7 +178,7 @@ public Set specIds() { @Override public void applyTo(TableMetadata.Builder metadataBuilder) { - metadataBuilder.removeSpecIds(specIds); + metadataBuilder.removeSpecs(specIds); } } diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java index 994bde8f589e..c437cedad1f3 100644 --- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java @@ -233,7 +233,7 @@ private TableMetadata internalApply() { .map(PartitionSpec::specId) .filter(specId -> !reachableSpecs.contains(specId)) .collect(Collectors.toSet()); - updatedMetaBuilder.removeSpecIds(specsToRemove); + updatedMetaBuilder.removeSpecs(specsToRemove); } return updatedMetaBuilder.build(); diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 5bda5a7519a2..e4ab61451458 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -1108,7 +1108,7 @@ public Builder setDefaultPartitionSpec(int specId) { return this; } - Builder removeSpecIds(Iterable specIds) { + Builder removeSpecs(Iterable specIds) { Set specIdsToRemove = Sets.newHashSet(); for (Integer specId : specIds) { Preconditions.checkArgument( From 5f08dfa0205364282afd0c006ada59d01da1af8d Mon Sep 17 00:00:00 2001 From: advancedxy <807537+advancedxy@users.noreply.github.com> Date: Wed, 18 Dec 2024 22:40:25 +0800 Subject: [PATCH 10/11] address comments --- api/src/main/java/org/apache/iceberg/ExpireSnapshots.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java b/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java index 8efa424d5ce3..15a141eb8c2c 100644 --- a/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java +++ b/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java @@ -120,15 +120,13 @@ public interface ExpireSnapshots extends PendingUpdate> { ExpireSnapshots cleanExpiredFiles(boolean clean); /** - * Allows expiration of unreachable table layout metadata, such as partition specs as part of the - * operation. + * Enable cleaning up unused metadata, such as partition specs, schemas, etc. * - * @param clean setting this to true will remove table layout metadata that is no longer reachable - * by any snapshot + * @param clean remove unused partition specs, schemas, or other metadata when true * @return this for method chaining */ default ExpireSnapshots cleanExpiredMetadata(boolean clean) { throw new UnsupportedOperationException( - this.getClass().getName() + " doesn't implement cleanExpiredMeta"); + this.getClass().getName() + " doesn't implement cleanExpiredMetadata"); } } From 989beaa907359837b75530e814df1fb65d8b48ef Mon Sep 17 00:00:00 2001 From: advancedxy <807537+advancedxy@users.noreply.github.com> Date: Fri, 20 Dec 2024 15:19:18 +0800 Subject: [PATCH 11/11] Address comments --- .../java/org/apache/iceberg/RemoveSnapshots.java | 1 + .../main/java/org/apache/iceberg/TableMetadata.java | 12 +++--------- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java index c437cedad1f3..0cc89433413d 100644 --- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java @@ -228,6 +228,7 @@ private TableMetadata internalApply() { base.snapshot(snapshot).allManifests(ops.io()).stream() .map(ManifestFile::partitionSpecId) .forEach(reachableSpecs::add)); + Set specsToRemove = base.specs().stream() .map(PartitionSpec::specId) diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index e4ab61451458..c72a599d18b2 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -1109,15 +1109,9 @@ public Builder setDefaultPartitionSpec(int specId) { } Builder removeSpecs(Iterable specIds) { - Set specIdsToRemove = Sets.newHashSet(); - for (Integer specId : specIds) { - Preconditions.checkArgument( - specId != defaultSpecId, "Cannot remove default partition spec"); - PartitionSpec toBeRemoved = specsById.remove(specId); - if (toBeRemoved != null) { - specIdsToRemove.add(specId); - } - } + Set specIdsToRemove = Sets.newHashSet(specIds); + Preconditions.checkArgument( + !specIdsToRemove.contains(defaultSpecId), "Cannot remove the default partition spec"); this.specs = specs.stream()