API: Support removeUnusedSpecs in ExpireSnapshots #10755

Open · wants to merge 12 commits into base: main
12 changes: 12 additions & 0 deletions api/src/main/java/org/apache/iceberg/ExpireSnapshots.java
@@ -118,4 +118,16 @@ public interface ExpireSnapshots extends PendingUpdate<List<Snapshot>> {
* @return this for method chaining
*/
ExpireSnapshots cleanExpiredFiles(boolean clean);

/**
 * Allows expiration of unreachable metadata, such as partition specs, as part of the operation.
 *
 * @param clean setting this to true will remove metadata (such as partition specs) that is no
 *     longer reachable by any snapshot
 * @return this for method chaining
Contributor

  /**
   * Allows expiration of unreachable table layout metadata, such as partition specs, as part of the operation.
   *
   * @param clean setting this to true will remove table layout metadata that is no
   *     longer reachable by any snapshot
   * @return this for method chaining

Contributor @amogh-jahagirdar, Dec 2, 2024

Basically, I don't think we need to mention "such as partition spec" twice; mentioning it once at the top level and clarifying that we're talking about layout metadata like the spec should be sufficient, imo.

Contributor Author

Good point, fixed.

*/
default ExpireSnapshots cleanExpiredMetadata(boolean clean) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement cleanExpiredMetadata");
}
}
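
For orientation, a minimal usage sketch of the new flag (not part of this PR): it assumes a Table loaded from some catalog and an arbitrary 7-day retention threshold, so treat the names and numbers as placeholders.

    // Sketch only: expire old snapshots and also drop partition specs that are no
    // longer reachable from any retained snapshot.
    import java.util.concurrent.TimeUnit;
    import org.apache.iceberg.Table;

    public class ExpireWithMetadataCleanup {
      public static void expire(Table table) {
        table
            .expireSnapshots()
            .expireOlderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7))
            .cleanExpiredMetadata(true) // the new flag introduced in this PR
            .commit();
      }
    }
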
@@ -20,6 +20,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -256,7 +257,8 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpiration
});

Set<String> filesToDelete =
findFilesToDelete(manifestsToScan, manifestsToRevert, validIds, afterExpiration);
findFilesToDelete(
manifestsToScan, manifestsToRevert, validIds, beforeExpiration.specsById());

deleteFiles(filesToDelete, "data");
deleteFiles(manifestsToDelete, "manifest");
@@ -273,7 +275,7 @@ private Set<String> findFilesToDelete(
Set<ManifestFile> manifestsToScan,
Set<ManifestFile> manifestsToRevert,
Set<Long> validIds,
TableMetadata current) {
Map<Integer, PartitionSpec> specsById) {
Contributor

+1 for fixing this. In general we should avoid passing huge metadata objects around to helper methods that don't need everything!

Set<String> filesToDelete = ConcurrentHashMap.newKeySet();
Tasks.foreach(manifestsToScan)
.retry(3)
@@ -285,8 +287,7 @@ private Set<String> findFilesToDelete(
.run(
manifest -> {
// the manifest has deletes, scan it to find files to delete
try (ManifestReader<?> reader =
ManifestFiles.open(manifest, fileIO, current.specsById())) {
try (ManifestReader<?> reader = ManifestFiles.open(manifest, fileIO, specsById)) {
for (ManifestEntry<?> entry : reader.entries()) {
// if the snapshot ID of the DELETE entry is no longer valid, the data can be
// deleted
@@ -311,8 +312,7 @@ private Set<String> findFilesToDelete(
.run(
manifest -> {
// the manifest has deletes, scan it to find files to delete
try (ManifestReader<?> reader =
ManifestFiles.open(manifest, fileIO, current.specsById())) {
try (ManifestReader<?> reader = ManifestFiles.open(manifest, fileIO, specsById)) {
for (ManifestEntry<?> entry : reader.entries()) {
// delete any ADDED file from manifests that were reverted
if (entry.status() == ManifestEntry.Status.ADDED) {
17 changes: 17 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdate.java
@@ -165,6 +165,23 @@ public void applyTo(TableMetadata.Builder metadataBuilder) {
}
}

class RemovePartitionSpecs implements MetadataUpdate {
private final Set<Integer> specIds;

public RemovePartitionSpecs(Set<Integer> specIds) {
this.specIds = specIds;
}

public Set<Integer> specIds() {
return specIds;
}

@Override
public void applyTo(TableMetadata.Builder metadataBuilder) {
metadataBuilder.removeSpecIds(specIds);
}
}

class AddSortOrder implements MetadataUpdate {
private final UnboundSortOrder sortOrder;

19 changes: 19 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java
@@ -59,6 +59,7 @@ private MetadataUpdateParser() {}
static final String SET_CURRENT_VIEW_VERSION = "set-current-view-version";
static final String SET_PARTITION_STATISTICS = "set-partition-statistics";
static final String REMOVE_PARTITION_STATISTICS = "remove-partition-statistics";
static final String REMOVE_PARTITION_SPECS = "remove-partition-specs";

// AssignUUID
private static final String UUID = "uuid";
@@ -126,6 +127,9 @@ private MetadataUpdateParser() {}
// SetCurrentViewVersion
private static final String VIEW_VERSION_ID = "view-version-id";

// RemovePartitionSpecs
private static final String SPEC_IDS = "spec-ids";

private static final Map<Class<? extends MetadataUpdate>, String> ACTIONS =
ImmutableMap.<Class<? extends MetadataUpdate>, String>builder()
.put(MetadataUpdate.AssignUUID.class, ASSIGN_UUID)
@@ -149,6 +153,7 @@ private MetadataUpdateParser() {}
.put(MetadataUpdate.SetLocation.class, SET_LOCATION)
.put(MetadataUpdate.AddViewVersion.class, ADD_VIEW_VERSION)
.put(MetadataUpdate.SetCurrentViewVersion.class, SET_CURRENT_VIEW_VERSION)
.put(MetadataUpdate.RemovePartitionSpecs.class, REMOVE_PARTITION_SPECS)
.buildOrThrow();

public static String toJson(MetadataUpdate metadataUpdate) {
@@ -241,6 +246,9 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator
writeSetCurrentViewVersionId(
(MetadataUpdate.SetCurrentViewVersion) metadataUpdate, generator);
break;
case REMOVE_PARTITION_SPECS:
writeRemovePartitionSpecs((MetadataUpdate.RemovePartitionSpecs) metadataUpdate, generator);
break;
default:
throw new IllegalArgumentException(
String.format(
@@ -312,6 +320,8 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) {
return readAddViewVersion(jsonNode);
case SET_CURRENT_VIEW_VERSION:
return readCurrentViewVersionId(jsonNode);
case REMOVE_PARTITION_SPECS:
return readRemovePartitionSpecs(jsonNode);
default:
throw new UnsupportedOperationException(
String.format("Cannot convert metadata update action to json: %s", action));
@@ -447,6 +457,11 @@ private static void writeSetCurrentViewVersionId(
gen.writeNumberField(VIEW_VERSION_ID, metadataUpdate.versionId());
}

private static void writeRemovePartitionSpecs(
MetadataUpdate.RemovePartitionSpecs metadataUpdate, JsonGenerator gen) throws IOException {
JsonUtil.writeIntegerArray(SPEC_IDS, metadataUpdate.specIds(), gen);
}

private static MetadataUpdate readAssignUUID(JsonNode node) {
String uuid = JsonUtil.getString(UUID, node);
return new MetadataUpdate.AssignUUID(uuid);
@@ -596,4 +611,8 @@ private static MetadataUpdate readAddViewVersion(JsonNode node) {
private static MetadataUpdate readCurrentViewVersionId(JsonNode node) {
return new MetadataUpdate.SetCurrentViewVersion(JsonUtil.getInt(VIEW_VERSION_ID, node));
}

private static MetadataUpdate readRemovePartitionSpecs(JsonNode node) {
return new MetadataUpdate.RemovePartitionSpecs(JsonUtil.getIntegerSet(SPEC_IDS, node));
}
}
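
For quick reference, the wire format handled by the new writer/reader pair is the one asserted by the parser test further down in this diff. A hedged round-trip sketch, assuming ImmutableSet is Iceberg's relocated Guava class:

    MetadataUpdate update = new MetadataUpdate.RemovePartitionSpecs(ImmutableSet.of(1, 2, 3));
    String json = MetadataUpdateParser.toJson(update);
    // json is {"action":"remove-partition-specs","spec-ids":[1,2,3]}
    MetadataUpdate roundTripped = MetadataUpdateParser.fromJson(json);
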
27 changes: 27 additions & 0 deletions core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
@@ -41,6 +41,7 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -85,6 +86,7 @@ public void accept(String file) {
private ExecutorService planExecutorService = ThreadPools.getWorkerPool();
private Boolean incrementalCleanup;
private boolean specifiedSnapshotId = false;
private boolean cleanExpiredMetadata = false;

RemoveSnapshots(TableOperations ops) {
this.ops = ops;
@@ -159,6 +161,12 @@ public ExpireSnapshots planWith(ExecutorService executorService) {
return this;
}

@Override
public ExpireSnapshots cleanExpiredMetadata(boolean clean) {
this.cleanExpiredMetadata = clean;
return this;
}

@Override
public List<Snapshot> apply() {
TableMetadata updated = internalApply();
@@ -209,6 +217,25 @@ private TableMetadata internalApply() {
.forEach(idsToRemove::add);
updatedMetaBuilder.removeSnapshots(idsToRemove);

if (cleanExpiredMetadata) {
// TODO: Support cleaning expired schemas as well.
Set<Integer> reachableSpecs = Sets.newConcurrentHashSet();
reachableSpecs.add(base.defaultSpecId());
Tasks.foreach(idsToRetain)
.executeWith(planExecutorService)
.run(
snapshot ->
base.snapshot(snapshot).allManifests(ops.io()).stream()
.map(ManifestFile::partitionSpecId)
.forEach(reachableSpecs::add));
Set<Integer> specsToRemove =
Contributor

Nit: newline after a loop.

Contributor Author

Addressed.

base.specs().stream()
.map(PartitionSpec::specId)
.filter(specId -> !reachableSpecs.contains(specId))
.collect(Collectors.toSet());
updatedMetaBuilder.removeSpecIds(specsToRemove);
}

return updatedMetaBuilder.build();
}

19 changes: 19 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -1108,6 +1108,25 @@ public Builder setDefaultPartitionSpec(int specId) {
return this;
}

Builder removeSpecIds(Iterable<Integer> specIds) {
Contributor @amogh-jahagirdar, Dec 2, 2024

I think I'd prefer this be called removeSpecs. If we look at the other APIs on the TableMetadata builder, like setDefaultPartitionSpec(int specId), it aligns with that naming; I don't think we need the "Ids" suffix. In case we ever need a removeSpecs that takes actual PartitionSpec instances, we'd just overload at that time, just like setDefaultPartitionSpec.

Contributor

I'm not super opinionated here, though; I remember the early discussion we had where I also mentioned removeSpecIds, but this is just something I noticed when taking a look just now.

Contributor Author

Updated too. I do think removeSpecs is more consistent with other APIs in the TableMetadata builder.

Set<Integer> specIdsToRemove = Sets.newHashSet();
for (Integer specId : specIds) {
Preconditions.checkArgument(
specId != defaultSpecId, "Cannot remove default partition spec");
PartitionSpec toBeRemoved = specsById.remove(specId);
if (toBeRemoved != null) {
specIdsToRemove.add(specId);
}
}

this.specs =
Member

Should we be validating that all the specs we are trying to remove exist? I think this may be fine, just thinking about commit conflicts

Contributor Author

> Should we be validating that all the specs we are trying to remove exist?

I think so, and L1129 - L1133 indeed ensure that the specs to be removed exist in the current table metadata.

> I think this may be fine, just thinking about commit conflicts

If you are talking about concurrent RemoveUnusedSpecs operations, I think it's fine and reasonable for only one attempt to succeed at a time. For other concurrent commits, there's no conflict and retrying should do the trick.

Contributor

In general I'm in favor of succeeding in cases like this. There's no need to fail something if it has an accidental retry after the first attempt succeeded.

Contributor Author

> There's no need to fail something if it has an accidental retry after the first attempt succeeded.

This is a very good point; let me reconsider this part.

Contributor Author

Addressed in 190dde6; removing an already-removed unused spec no longer causes a failure.

Contributor

In that case, is it really necessary to validate that a spec was removed and filter the list? If it is a noop to remove an unknown spec ID, then we can simplify the first half of this method:

    Preconditions.checkArgument(!specIds.contains(defaultSpecId), "Cannot remove the default partition spec");

Contributor Author

Updated; let me know if this is the desired change.

specs.stream()
.filter(s -> !specIdsToRemove.contains(s.specId()))
.collect(Collectors.toList());
changes.add(new MetadataUpdate.RemovePartitionSpecs(specIdsToRemove));
return this;
}
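
To summarize where the thread above lands, here is a sketch of the simplified builder method under the assumptions discussed (removing an unknown spec ID is a no-op, and the method is renamed to removeSpecs). The merged code may differ in details.

    // Sketch only, based on the review suggestions above; not the final merged code.
    Builder removeSpecs(Iterable<Integer> specIds) {
      Set<Integer> specIdsToRemove = Sets.newHashSet(specIds);
      Preconditions.checkArgument(
          !specIdsToRemove.contains(defaultSpecId), "Cannot remove the default partition spec");

      this.specs =
          specs.stream()
              .filter(s -> !specIdsToRemove.contains(s.specId()))
              .collect(Collectors.toList());
      specIdsToRemove.forEach(specsById::remove);
      changes.add(new MetadataUpdate.RemovePartitionSpecs(specIdsToRemove));
      return this;
    }
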

public Builder addPartitionSpec(UnboundPartitionSpec spec) {
addPartitionSpecInternal(spec.bind(schemasById.get(currentSchemaId)));
return this;
23 changes: 23 additions & 0 deletions core/src/main/java/org/apache/iceberg/UpdateRequirements.java
@@ -105,6 +105,8 @@ private Builder update(MetadataUpdate update) {
update((MetadataUpdate.SetDefaultPartitionSpec) update);
} else if (update instanceof MetadataUpdate.SetDefaultSortOrder) {
update((MetadataUpdate.SetDefaultSortOrder) update);
} else if (update instanceof MetadataUpdate.RemovePartitionSpecs) {
update((MetadataUpdate.RemovePartitionSpecs) update);
}

return this;
@@ -173,6 +175,27 @@ private void update(MetadataUpdate.SetDefaultSortOrder unused) {
}
}

private void update(MetadataUpdate.RemovePartitionSpecs unused) {
Contributor

Please add a test to TestUpdateRequirements.

// require that the default partition spec has not changed
if (!setSpecId) {
if (base != null && !isReplace) {
require(new UpdateRequirement.AssertDefaultSpecID(base.defaultSpecId()));
}
this.setSpecId = true;
}

// require that no branches have changed, so that old specs won't be written.
if (base != null && !isReplace) {
base.refs()
.forEach(
(name, ref) -> {
if (ref.isBranch() && !name.equals(SnapshotRef.MAIN_BRANCH)) {
require(new UpdateRequirement.AssertRefSnapshotID(name, ref.snapshotId()));
Contributor

We should probably also have a test in TestUpdateRequirements that actually changes the branch and eventually fails.

Contributor

In addition to this, CatalogTests currently only tests the happy path, not the exceptions thrown by either AssertDefaultSpecID or AssertRefSnapshotID. We need to test both cases in CatalogTests.

Contributor Author

I updated TestUpdateRequirements. However, it's hard to test the failure cases in CatalogTests: before committing, internalApply also refreshes the base table, so we cannot easily create a concurrent update to the base table, which would have to happen after the refresh and before the commit. Do you have any suggestions?

}
});
}
}

private List<UpdateRequirement> build() {
return requirements.build();
}
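
For the tests requested above, a TestUpdateRequirements-style check could look roughly like the sketch below. It assumes the existing UpdateRequirements.forUpdateTable entry point, a non-replace base TableMetadata named base, and AssertJ assertions; treat it as illustrative only.

    List<UpdateRequirement> requirements =
        UpdateRequirements.forUpdateTable(
            base,
            ImmutableList.of(new MetadataUpdate.RemovePartitionSpecs(ImmutableSet.of(2, 3))));

    // Expect an assertion on the current default spec id, plus one per non-main branch,
    // so that concurrent commits cannot leave a retained snapshot pointing at a removed spec.
    assertThat(requirements)
        .hasAtLeastOneElementOfType(UpdateRequirement.AssertDefaultSpecID.class);
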
@@ -926,6 +926,17 @@ public void testRemovePartitionStatistics() {
.isEqualTo(json);
}

@Test
public void testRemovePartitionSpec() {
String action = MetadataUpdateParser.REMOVE_PARTITION_SPECS;
String json = "{\"action\":\"remove-partition-specs\",\"spec-ids\":[1,2,3]}";
MetadataUpdate expected = new MetadataUpdate.RemovePartitionSpecs(ImmutableSet.of(1, 2, 3));
assertEquals(action, expected, MetadataUpdateParser.fromJson(json));
assertThat(MetadataUpdateParser.toJson(expected))
.as("Remove partition specs should convert to the correct JSON value")
.isEqualTo(json);
}

public void assertEquals(
String action, MetadataUpdate expectedUpdate, MetadataUpdate actualUpdate) {
switch (action) {
@@ -1030,6 +1041,11 @@ public void assertEquals(
(MetadataUpdate.SetCurrentViewVersion) expectedUpdate,
(MetadataUpdate.SetCurrentViewVersion) actualUpdate);
break;
case MetadataUpdateParser.REMOVE_PARTITION_SPECS:
assertEqualsRemovePartitionSpecs(
(MetadataUpdate.RemovePartitionSpecs) expectedUpdate,
(MetadataUpdate.RemovePartitionSpecs) actualUpdate);
break;
default:
fail("Unrecognized metadata update action: " + action);
}
@@ -1251,6 +1267,11 @@ private static void assertEqualsSetCurrentViewVersion(
assertThat(actual.versionId()).isEqualTo(expected.versionId());
}

private static void assertEqualsRemovePartitionSpecs(
MetadataUpdate.RemovePartitionSpecs expected, MetadataUpdate.RemovePartitionSpecs actual) {
assertThat(actual.specIds()).containsExactlyInAnyOrderElementsOf(expected.specIds());
}

private String createManifestListWithManifestFiles(long snapshotId, Long parentSnapshotId)
throws IOException {
File manifestList = File.createTempFile("manifests", null, temp.toFile());