Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API: Support removeUnusedSpecs in ExpireSnapshots #10755

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
13 changes: 13 additions & 0 deletions api/src/main/java/org/apache/iceberg/ExpireSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,17 @@ public interface ExpireSnapshots extends PendingUpdate<List<Snapshot>> {
* @return this for method chaining
*/
ExpireSnapshots cleanExpiredFiles(boolean clean);

/**
* Allows expiration of unreachable table layout metadata, such as partition specs as part of the
* operation.
*
* @param clean setting this to true will remove table layout metadata that is no longer reachable
* by any snapshot
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit verbose. I'd recommend being more direct and not making it a full sentence. Something like:

remove unused partition specs, schemas, or other metadata when true

The same could apply to the method description:

Enable cleaning up unused partition specs, schemas, or other metadata.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, Updated.

* @return this for method chaining
*/
default ExpireSnapshots cleanExpiredMetadata(boolean clean) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement cleanExpiredMeta");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -256,7 +257,8 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
});

Set<String> filesToDelete =
findFilesToDelete(manifestsToScan, manifestsToRevert, validIds, afterExpiration);
findFilesToDelete(
manifestsToScan, manifestsToRevert, validIds, beforeExpiration.specsById());

deleteFiles(filesToDelete, "data");
deleteFiles(manifestsToDelete, "manifest");
Expand All @@ -273,7 +275,7 @@ private Set<String> findFilesToDelete(
Set<ManifestFile> manifestsToScan,
Set<ManifestFile> manifestsToRevert,
Set<Long> validIds,
TableMetadata current) {
Map<Integer, PartitionSpec> specsById) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for fixing this. In general we should avoid passing huge metadata objects around to helper methods that don't need everything!

Set<String> filesToDelete = ConcurrentHashMap.newKeySet();
Tasks.foreach(manifestsToScan)
.retry(3)
Expand All @@ -285,8 +287,7 @@ private Set<String> findFilesToDelete(
.run(
manifest -> {
// the manifest has deletes, scan it to find files to delete
try (ManifestReader<?> reader =
ManifestFiles.open(manifest, fileIO, current.specsById())) {
try (ManifestReader<?> reader = ManifestFiles.open(manifest, fileIO, specsById)) {
for (ManifestEntry<?> entry : reader.entries()) {
// if the snapshot ID of the DELETE entry is no longer valid, the data can be
// deleted
Expand All @@ -311,8 +312,7 @@ private Set<String> findFilesToDelete(
.run(
manifest -> {
// the manifest has deletes, scan it to find files to delete
try (ManifestReader<?> reader =
ManifestFiles.open(manifest, fileIO, current.specsById())) {
try (ManifestReader<?> reader = ManifestFiles.open(manifest, fileIO, specsById)) {
for (ManifestEntry<?> entry : reader.entries()) {
// delete any ADDED file from manifests that were reverted
if (entry.status() == ManifestEntry.Status.ADDED) {
Expand Down
17 changes: 17 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,23 @@ public void applyTo(TableMetadata.Builder metadataBuilder) {
}
}

class RemovePartitionSpecs implements MetadataUpdate {
private final Set<Integer> specIds;

public RemovePartitionSpecs(Set<Integer> specIds) {
this.specIds = specIds;
}

public Set<Integer> specIds() {
return specIds;
}

@Override
public void applyTo(TableMetadata.Builder metadataBuilder) {
metadataBuilder.removeSpecs(specIds);
}
}

class AddSortOrder implements MetadataUpdate {
private final UnboundSortOrder sortOrder;

Expand Down
19 changes: 19 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ private MetadataUpdateParser() {}
static final String SET_CURRENT_VIEW_VERSION = "set-current-view-version";
static final String SET_PARTITION_STATISTICS = "set-partition-statistics";
static final String REMOVE_PARTITION_STATISTICS = "remove-partition-statistics";
static final String REMOVE_PARTITION_SPECS = "remove-partition-specs";

// AssignUUID
private static final String UUID = "uuid";
Expand Down Expand Up @@ -126,6 +127,9 @@ private MetadataUpdateParser() {}
// SetCurrentViewVersion
private static final String VIEW_VERSION_ID = "view-version-id";

// RemovePartitionSpecs
private static final String SPEC_IDS = "spec-ids";

private static final Map<Class<? extends MetadataUpdate>, String> ACTIONS =
ImmutableMap.<Class<? extends MetadataUpdate>, String>builder()
.put(MetadataUpdate.AssignUUID.class, ASSIGN_UUID)
Expand All @@ -149,6 +153,7 @@ private MetadataUpdateParser() {}
.put(MetadataUpdate.SetLocation.class, SET_LOCATION)
.put(MetadataUpdate.AddViewVersion.class, ADD_VIEW_VERSION)
.put(MetadataUpdate.SetCurrentViewVersion.class, SET_CURRENT_VIEW_VERSION)
.put(MetadataUpdate.RemovePartitionSpecs.class, REMOVE_PARTITION_SPECS)
.buildOrThrow();

public static String toJson(MetadataUpdate metadataUpdate) {
Expand Down Expand Up @@ -241,6 +246,9 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator
writeSetCurrentViewVersionId(
(MetadataUpdate.SetCurrentViewVersion) metadataUpdate, generator);
break;
case REMOVE_PARTITION_SPECS:
writeRemovePartitionSpecs((MetadataUpdate.RemovePartitionSpecs) metadataUpdate, generator);
advancedxy marked this conversation as resolved.
Show resolved Hide resolved
break;
default:
throw new IllegalArgumentException(
String.format(
Expand Down Expand Up @@ -312,6 +320,8 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) {
return readAddViewVersion(jsonNode);
case SET_CURRENT_VIEW_VERSION:
return readCurrentViewVersionId(jsonNode);
case REMOVE_PARTITION_SPECS:
return readRemovePartitionSpecs(jsonNode);
default:
throw new UnsupportedOperationException(
String.format("Cannot convert metadata update action to json: %s", action));
Expand Down Expand Up @@ -447,6 +457,11 @@ private static void writeSetCurrentViewVersionId(
gen.writeNumberField(VIEW_VERSION_ID, metadataUpdate.versionId());
}

private static void writeRemovePartitionSpecs(
MetadataUpdate.RemovePartitionSpecs metadataUpdate, JsonGenerator gen) throws IOException {
JsonUtil.writeIntegerArray(SPEC_IDS, metadataUpdate.specIds(), gen);
}

private static MetadataUpdate readAssignUUID(JsonNode node) {
String uuid = JsonUtil.getString(UUID, node);
return new MetadataUpdate.AssignUUID(uuid);
Expand Down Expand Up @@ -596,4 +611,8 @@ private static MetadataUpdate readAddViewVersion(JsonNode node) {
private static MetadataUpdate readCurrentViewVersionId(JsonNode node) {
return new MetadataUpdate.SetCurrentViewVersion(JsonUtil.getInt(VIEW_VERSION_ID, node));
}

private static MetadataUpdate readRemovePartitionSpecs(JsonNode node) {
return new MetadataUpdate.RemovePartitionSpecs(JsonUtil.getIntegerSet(SPEC_IDS, node));
}
}
27 changes: 27 additions & 0 deletions core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -85,6 +86,7 @@ public void accept(String file) {
private ExecutorService planExecutorService = ThreadPools.getWorkerPool();
private Boolean incrementalCleanup;
private boolean specifiedSnapshotId = false;
private boolean cleanExpiredMetadata = false;

RemoveSnapshots(TableOperations ops) {
this.ops = ops;
Expand Down Expand Up @@ -159,6 +161,12 @@ public ExpireSnapshots planWith(ExecutorService executorService) {
return this;
}

@Override
public ExpireSnapshots cleanExpiredMetadata(boolean clean) {
this.cleanExpiredMetadata = clean;
return this;
}

@Override
public List<Snapshot> apply() {
TableMetadata updated = internalApply();
Expand Down Expand Up @@ -209,6 +217,25 @@ private TableMetadata internalApply() {
.forEach(idsToRemove::add);
updatedMetaBuilder.removeSnapshots(idsToRemove);

if (cleanExpiredMetadata) {
// TODO: Support cleaning expired schema as well.
Set<Integer> reachableSpecs = Sets.newConcurrentHashSet();
reachableSpecs.add(base.defaultSpecId());
Tasks.foreach(idsToRetain)
.executeWith(planExecutorService)
.run(
snapshot ->
base.snapshot(snapshot).allManifests(ops.io()).stream()
.map(ManifestFile::partitionSpecId)
.forEach(reachableSpecs::add));
Set<Integer> specsToRemove =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: newline after a loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed.

base.specs().stream()
.map(PartitionSpec::specId)
.filter(specId -> !reachableSpecs.contains(specId))
.collect(Collectors.toSet());
updatedMetaBuilder.removeSpecs(specsToRemove);
}

return updatedMetaBuilder.build();
}

Expand Down
19 changes: 19 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -1108,6 +1108,25 @@ public Builder setDefaultPartitionSpec(int specId) {
return this;
}

Builder removeSpecs(Iterable<Integer> specIds) {
Set<Integer> specIdsToRemove = Sets.newHashSet();
for (Integer specId : specIds) {
Preconditions.checkArgument(
specId != defaultSpecId, "Cannot remove default partition spec");
PartitionSpec toBeRemoved = specsById.remove(specId);
if (toBeRemoved != null) {
specIdsToRemove.add(specId);
}
}
advancedxy marked this conversation as resolved.
Show resolved Hide resolved

this.specs =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be validating that all the specs we are trying to remove exist? I think this may be fine, just thinking about commit conflicts

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be validating that all the specs we are trying to remove exist?

I think so, and L1129 - L1133 indeed ensures the spec to be removed are existed in current table metadata.

I think this may be fine, just thinking about commit conflicts

If you are talking about concurrent RemovedUnusedSpecs, I think it's fine and reasonable to have only one attempt succeeded once at time. For other concurrent commit, there's no conflict and retrying should do the trick.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general I'm in favor of succeeding in cases like this. There's no need to fail something if it has an accidental retry after the first attempt succeeded.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no need to fail something if it has an accidental retry after the first attempt succeeded.

This is a very good point, let me reconsider this part.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 190dde6, removing an already removed unused spec should not cause a failure then.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case, is it really necessary to validate that a spec was removed and filter the list? If it is a noop to remove an unknown spec ID, then we can simplify the first half of this method:

    Preconditions.checkArgument(!specIds.contains(defaultSpecId), "Cannot remove the default partition spec");

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated, let me know if this is the desired change.

specs.stream()
.filter(s -> !specIdsToRemove.contains(s.specId()))
.collect(Collectors.toList());
changes.add(new MetadataUpdate.RemovePartitionSpecs(specIdsToRemove));
return this;
}

public Builder addPartitionSpec(UnboundPartitionSpec spec) {
addPartitionSpecInternal(spec.bind(schemasById.get(currentSchemaId)));
return this;
Expand Down
23 changes: 23 additions & 0 deletions core/src/main/java/org/apache/iceberg/UpdateRequirements.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ private Builder update(MetadataUpdate update) {
update((MetadataUpdate.SetDefaultPartitionSpec) update);
} else if (update instanceof MetadataUpdate.SetDefaultSortOrder) {
update((MetadataUpdate.SetDefaultSortOrder) update);
} else if (update instanceof MetadataUpdate.RemovePartitionSpecs) {
update((MetadataUpdate.RemovePartitionSpecs) update);
}

return this;
Expand Down Expand Up @@ -173,6 +175,27 @@ private void update(MetadataUpdate.SetDefaultSortOrder unused) {
}
}

private void update(MetadataUpdate.RemovePartitionSpecs unused) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a test to TestUpdateRequirements

// require that the default partition spec has not changed
if (!setSpecId) {
if (base != null && !isReplace) {
require(new UpdateRequirement.AssertDefaultSpecID(base.defaultSpecId()));
}
this.setSpecId = true;
}

// require that no branches have changed, so that old specs won't be written.
if (base != null && !isReplace) {
base.refs()
.forEach(
(name, ref) -> {
if (ref.isBranch() && !name.equals(SnapshotRef.MAIN_BRANCH)) {
require(new UpdateRequirement.AssertRefSnapshotID(name, ref.snapshotId()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should probably also have a test in TestUpdateRequirements that actually changes the branch and eventually fails

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in addition to this, CatalogTests currently only tests the happy path, but not the exceptions that are either thrown by AssertDefaultSpecID or AssertRefSnapshotID. We need to test both cases in CatalogTests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the TestUpdateRequirements. However, it's hard to test the failure cases in CatalogTests: before committing, the internalApply also refreshes the base table, so we cannot create a concurrent update to base table easily, which has to be after the refresh and before commit. Do you have any suggestions?

}
});
}
}

private List<UpdateRequirement> build() {
return requirements.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,17 @@ public void testRemovePartitionStatistics() {
.isEqualTo(json);
}

@Test
public void testRemovePartitionSpec() {
String action = MetadataUpdateParser.REMOVE_PARTITION_SPECS;
String json = "{\"action\":\"remove-partition-specs\",\"spec-ids\":[1,2,3]}";
MetadataUpdate expected = new MetadataUpdate.RemovePartitionSpecs(ImmutableSet.of(1, 2, 3));
assertEquals(action, expected, MetadataUpdateParser.fromJson(json));
assertThat(MetadataUpdateParser.toJson(expected))
.as("Remove partition specs should convert to the correct JSON value")
.isEqualTo(json);
}

public void assertEquals(
String action, MetadataUpdate expectedUpdate, MetadataUpdate actualUpdate) {
switch (action) {
Expand Down Expand Up @@ -1030,6 +1041,11 @@ public void assertEquals(
(MetadataUpdate.SetCurrentViewVersion) expectedUpdate,
(MetadataUpdate.SetCurrentViewVersion) actualUpdate);
break;
case MetadataUpdateParser.REMOVE_PARTITION_SPECS:
assertEqualsRemovePartitionSpecs(
(MetadataUpdate.RemovePartitionSpecs) expectedUpdate,
(MetadataUpdate.RemovePartitionSpecs) actualUpdate);
break;
default:
fail("Unrecognized metadata update action: " + action);
}
Expand Down Expand Up @@ -1251,6 +1267,11 @@ private static void assertEqualsSetCurrentViewVersion(
assertThat(actual.versionId()).isEqualTo(expected.versionId());
}

private static void assertEqualsRemovePartitionSpecs(
MetadataUpdate.RemovePartitionSpecs expected, MetadataUpdate.RemovePartitionSpecs actual) {
assertThat(actual.specIds()).containsExactlyInAnyOrderElementsOf(expected.specIds());
}

private String createManifestListWithManifestFiles(long snapshotId, Long parentSnapshotId)
throws IOException {
File manifestList = File.createTempFile("manifests", null, temp.toFile());
Expand Down
Loading