
API: Support removeUnusedSpecs in ExpireSnapshots #10755

Open · wants to merge 12 commits into base: main
12 changes: 12 additions & 0 deletions api/src/main/java/org/apache/iceberg/ExpireSnapshots.java
@@ -118,4 +118,16 @@ public interface ExpireSnapshots extends PendingUpdate<List<Snapshot>> {
* @return this for method chaining
*/
ExpireSnapshots cleanExpiredFiles(boolean clean);

/**
* Allows removal of unreachable partition specs as part of the expiration operation.
*
* @param removeUnusedSpecs setting this to true will remove partition specs that are no longer
* reachable by any snapshot
* @return this for method chaining
*/
default ExpireSnapshots removeUnusedSpecs(boolean removeUnusedSpecs) {
Contributor:
Should we make this API a more generic removeUnusedTableMetadata? This goes back to the previous discussion: removing schemas and removing partition specs require the same level of work from the implementation, so imo there's not much value in separating them and forcing a user to chain multiple methods. Generally, if users want to remove unused specs, they probably also want to remove unused schemas.

Contributor:

Also, while I think we generally try and avoid boolean arguments in APIs, this may be one case where it makes sense. Down the line, if we want to make this behavior the default and have a path for users to disable cleanup of specs/schemas, they can.


Member:

I'm not sure we want to even allow the option to not do this. Is there a benefit to leaving a spec or schema in place if it is no longer in use?

That said, I would be fine with just having a "cleanMetadata(boolean cleanMetadata: True)"

Contributor @amogh-jahagirdar (Nov 5, 2024):

@RussellSpitzer I was thinking about not exposing the API at all and doing the cleanup by default in the implementation since it's true that there's no real benefit to keeping the spec/schema in place.

The rationale for having the API is more REST + compatibility related.

A while ago, we added the ability to send remove-spec updates to the server. If we change the snapshot expiration implementation to always remove metadata, servers may not be able to handle the new message yet, since it was only recently added as a possible update type; a service could then unnecessarily fail the entire expiration commit because it considers spec removal unsupported, even though it could have removed the snapshots.

In the current model, a client would opt in knowing that the service supports the spec removal.
There may be a different way to handle this, though, so that we can keep it all implicit in the procedure.

Contributor Author:

> That said, I would be fine with just having a "cleanMetadata(boolean cleanMetadata: True)"

I think this is a good candidate. Or, to be more specific and consistent with cleanExpiredFiles, we could call it cleanExpiredMeta(boolean clean). WDYT? @RussellSpitzer @amogh-jahagirdar

> The rationale for having the API is more REST + compatibility related.

This is well thought out. I'm in favor of exposing this as an API. As for the boolean parameter, I think it is consistent with cleanExpiredFiles and makes the method easier to call in a fluent way when whether to expire files and metadata is determined by an external caller.

Contributor Author:

Updated. PTAL again @RussellSpitzer @amogh-jahagirdar

Contributor:

Thanks @advancedxy! I'm in favor of the client-side API option for now, just had a comment on its name.

I think there's an important discussion to be had on how REST servers can indicate the specific supported update types to clients; this could be done either through the config endpoint or through the existing endpoint discovery mechanism, where each update type is attached as a query fragment to the endpoint and clients just use that.

My opinion is that it's not worth blocking this on a REST protocol discussion, since we can always come back later, deprecate this client-side option, and have the cleanup run as part of the default procedure implementation.

throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement removeUnusedSpecs");
}
}
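For orientation, here is a minimal usage sketch of the proposed API; the table handle and the seven-day retention window are hypothetical, not part of this diff:

    // Hypothetical usage: expire snapshots older than 7 days and opt in to
    // removing partition specs no longer reachable from any retained snapshot.
    long sevenDaysAgoMillis = System.currentTimeMillis() - 7 * 24 * 60 * 60 * 1000L;
    table
        .expireSnapshots()
        .expireOlderThan(sevenDaysAgoMillis)
        .removeUnusedSpecs(true)
        .commit();

The boolean argument keeps the call fluent, matching the existing cleanExpiredFiles(boolean) style discussed above.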
@@ -20,6 +20,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -256,7 +257,8 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpiration
});

Set<String> filesToDelete =
-findFilesToDelete(manifestsToScan, manifestsToRevert, validIds, afterExpiration);
+findFilesToDelete(
+    manifestsToScan, manifestsToRevert, validIds, beforeExpiration.specsById());

deleteFiles(filesToDelete, "data");
deleteFiles(manifestsToDelete, "manifest");
@@ -273,7 +275,7 @@ private Set<String> findFilesToDelete(
Set<ManifestFile> manifestsToScan,
Set<ManifestFile> manifestsToRevert,
Set<Long> validIds,
-TableMetadata current) {
+Map<Integer, PartitionSpec> specsById) {
Contributor:

+1 for fixing this. In general we should avoid passing huge metadata objects around to helper methods that don't need everything!

Set<String> filesToDelete = ConcurrentHashMap.newKeySet();
Tasks.foreach(manifestsToScan)
.retry(3)
@@ -285,8 +287,7 @@ private Set<String> findFilesToDelete(
.run(
manifest -> {
// the manifest has deletes, scan it to find files to delete
-try (ManifestReader<?> reader =
-    ManifestFiles.open(manifest, fileIO, current.specsById())) {
+try (ManifestReader<?> reader = ManifestFiles.open(manifest, fileIO, specsById)) {
for (ManifestEntry<?> entry : reader.entries()) {
// if the snapshot ID of the DELETE entry is no longer valid, the data can be
// deleted
@@ -311,8 +312,7 @@ private Set<String> findFilesToDelete(
.run(
manifest -> {
// the manifest has deletes, scan it to find files to delete
-try (ManifestReader<?> reader =
-    ManifestFiles.open(manifest, fileIO, current.specsById())) {
+try (ManifestReader<?> reader = ManifestFiles.open(manifest, fileIO, specsById)) {
for (ManifestEntry<?> entry : reader.entries()) {
// delete any ADDED file from manifests that were reverted
if (entry.status() == ManifestEntry.Status.ADDED) {
17 changes: 17 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdate.java
@@ -165,6 +165,23 @@ public void applyTo(TableMetadata.Builder metadataBuilder) {
}
}

class RemovePartitionSpecs implements MetadataUpdate {
private final Set<Integer> specIds;

public RemovePartitionSpecs(Set<Integer> specIds) {
this.specIds = specIds;
}

public Set<Integer> specIds() {
return specIds;
}

@Override
public void applyTo(TableMetadata.Builder metadataBuilder) {
metadataBuilder.removeUnusedSpecs(specIds);
}
}
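As a hedged illustration (not part of this diff): a catalog applying the new update type to a metadata builder, assuming an existing base TableMetadata and an unused spec id 2. TableMetadata.buildFrom and MetadataUpdate.applyTo are the existing entry points:

    MetadataUpdate update = new MetadataUpdate.RemovePartitionSpecs(Sets.newHashSet(2));
    TableMetadata.Builder builder = TableMetadata.buildFrom(base);
    update.applyTo(builder); // delegates to builder.removeUnusedSpecs(specIds)
    TableMetadata updated = builder.build();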

class AddSortOrder implements MetadataUpdate {
private final UnboundSortOrder sortOrder;

20 changes: 20 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java
@@ -59,6 +59,7 @@ private MetadataUpdateParser() {}
static final String SET_CURRENT_VIEW_VERSION = "set-current-view-version";
static final String SET_PARTITION_STATISTICS = "set-partition-statistics";
static final String REMOVE_PARTITION_STATISTICS = "remove-partition-statistics";
static final String REMOVE_PARTITION_SPECS = "remove-partition-specs";

// AssignUUID
private static final String UUID = "uuid";
@@ -126,6 +127,9 @@ private MetadataUpdateParser() {}
// SetCurrentViewVersion
private static final String VIEW_VERSION_ID = "view-version-id";

// RemovePartitionSpecs
private static final String PARTITION_SPEC_IDS = "partition-spec-ids";
Contributor:

please add tests for this to TestMetadataUpdateParser

Contributor:

also this is called spec-ids in the OpenAPI definition:


private static final Map<Class<? extends MetadataUpdate>, String> ACTIONS =
ImmutableMap.<Class<? extends MetadataUpdate>, String>builder()
.put(MetadataUpdate.AssignUUID.class, ASSIGN_UUID)
@@ -149,6 +153,7 @@ private MetadataUpdateParser() {}
.put(MetadataUpdate.SetLocation.class, SET_LOCATION)
.put(MetadataUpdate.AddViewVersion.class, ADD_VIEW_VERSION)
.put(MetadataUpdate.SetCurrentViewVersion.class, SET_CURRENT_VIEW_VERSION)
.put(MetadataUpdate.RemovePartitionSpecs.class, REMOVE_PARTITION_SPECS)
.buildOrThrow();

public static String toJson(MetadataUpdate metadataUpdate) {
@@ -241,6 +246,9 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator
writeSetCurrentViewVersionId(
(MetadataUpdate.SetCurrentViewVersion) metadataUpdate, generator);
break;
case REMOVE_PARTITION_SPECS:
writeRemovePartitionSpecs((MetadataUpdate.RemovePartitionSpecs) metadataUpdate, generator);
break;
default:
throw new IllegalArgumentException(
String.format(
@@ -312,6 +320,8 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) {
return readAddViewVersion(jsonNode);
case SET_CURRENT_VIEW_VERSION:
return readCurrentViewVersionId(jsonNode);
case REMOVE_PARTITION_SPECS:
return readRemoveUnusedSpecs(jsonNode);
default:
throw new UnsupportedOperationException(
String.format("Cannot convert metadata update action to json: %s", action));
@@ -447,6 +457,11 @@ private static void writeSetCurrentViewVersionId(
gen.writeNumberField(VIEW_VERSION_ID, metadataUpdate.versionId());
}

private static void writeRemovePartitionSpecs(
MetadataUpdate.RemovePartitionSpecs metadataUpdate, JsonGenerator gen) throws IOException {
JsonUtil.writeIntegerArray(PARTITION_SPEC_IDS, metadataUpdate.specIds(), gen);
}

private static MetadataUpdate readAssignUUID(JsonNode node) {
String uuid = JsonUtil.getString(UUID, node);
return new MetadataUpdate.AssignUUID(uuid);
@@ -596,4 +611,9 @@ private static MetadataUpdate readAddViewVersion(JsonNode node) {
private static MetadataUpdate readCurrentViewVersionId(JsonNode node) {
return new MetadataUpdate.SetCurrentViewVersion(JsonUtil.getInt(VIEW_VERSION_ID, node));
}

private static MetadataUpdate readRemoveUnusedSpecs(JsonNode node) {
return new MetadataUpdate.RemovePartitionSpecs(
JsonUtil.getIntegerSet(PARTITION_SPEC_IDS, node));
}
}
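A hedged round-trip sketch through this parser; the serialized field name follows the PARTITION_SPEC_IDS constant above, and note the unresolved review question about spec-ids vs partition-spec-ids:

    MetadataUpdate.RemovePartitionSpecs update =
        new MetadataUpdate.RemovePartitionSpecs(Sets.newHashSet(1, 2));
    String json = MetadataUpdateParser.toJson(update);
    // With the constants above, this serializes roughly as:
    // {"action":"remove-partition-specs","partition-spec-ids":[1,2]}
    MetadataUpdate roundTripped = MetadataUpdateParser.fromJson(json);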
26 changes: 26 additions & 0 deletions core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
@@ -41,6 +41,7 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -85,6 +86,7 @@ public void accept(String file) {
private ExecutorService planExecutorService = ThreadPools.getWorkerPool();
private Boolean incrementalCleanup;
private boolean specifiedSnapshotId = false;
private boolean removeUnusedSpecs = false;

RemoveSnapshots(TableOperations ops) {
this.ops = ops;
@@ -159,6 +161,12 @@ public ExpireSnapshots planWith(ExecutorService executorService) {
return this;
}

@Override
public ExpireSnapshots removeUnusedSpecs(boolean remove) {
this.removeUnusedSpecs = remove;
return this;
}

@Override
public List<Snapshot> apply() {
TableMetadata updated = internalApply();
@@ -209,6 +217,24 @@ private TableMetadata internalApply() {
.forEach(idsToRemove::add);
updatedMetaBuilder.removeSnapshots(idsToRemove);

if (removeUnusedSpecs) {
Set<Integer> reachableSpecs = Sets.newConcurrentHashSet();
reachableSpecs.add(base.defaultSpecId());
Tasks.foreach(idsToRetain)
.executeWith(planExecutorService)
.run(
snapshot ->
base.snapshot(snapshot).allManifests(ops.io()).stream()
.map(ManifestFile::partitionSpecId)
.forEach(reachableSpecs::add));
Set<Integer> specsToRemove =
base.specs().stream()
.map(PartitionSpec::specId)
.filter(specId -> !reachableSpecs.contains(specId))
.collect(Collectors.toSet());
updatedMetaBuilder.removeUnusedSpecs(specsToRemove);
}

return updatedMetaBuilder.build();
}

Contributor:
Nit: newline after a loop.

Contributor Author:
Addressed.

18 changes: 18 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -1108,6 +1108,24 @@ public Builder setDefaultPartitionSpec(int specId) {
return this;
}

Builder removeUnusedSpecs(Iterable<Integer> specIds) {
Set<Integer> specIdsToRemove = Sets.newHashSet();
for (Integer specId : specIds) {
Preconditions.checkArgument(
specId != defaultSpecId, "Cannot remove default partition spec");
PartitionSpec toBeRemoved = specsById.remove(specId);
if (toBeRemoved != null) {
specIdsToRemove.add(specId);
}
}
this.specs =
specs.stream()
.filter(s -> !specIdsToRemove.contains(s.specId()))
.collect(Collectors.toList());
changes.add(new MetadataUpdate.RemovePartitionSpecs(specIdsToRemove));
return this;
}

Member:
Should we be validating that all the specs we are trying to remove exist? I think this may be fine, just thinking about commit conflicts.

Contributor Author:
> Should we be validating that all the specs we are trying to remove exist?

I think so, and L1129-L1133 indeed ensure that the specs to be removed exist in the current table metadata.

> I think this may be fine, just thinking about commit conflicts

If you are talking about concurrent RemoveUnusedSpecs calls, I think it's fine and reasonable for only one attempt to succeed at a time. For other concurrent commits, there's no conflict and retrying should do the trick.

Contributor:
In general I'm in favor of succeeding in cases like this. There's no need to fail something if it has an accidental retry after the first attempt succeeded.

Contributor Author:
> There's no need to fail something if it has an accidental retry after the first attempt succeeded.

This is a very good point, let me reconsider this part.

Contributor Author:
Addressed in 190dde6; removing an already removed unused spec should not cause a failure.

Contributor:
In that case, is it really necessary to validate that a spec was removed and filter the list? If it is a no-op to remove an unknown spec ID, then we can simplify the first half of this method:

    Preconditions.checkArgument(!specIds.contains(defaultSpecId), "Cannot remove the default partition spec");

Contributor Author:
Updated, let me know if this is the desired change.
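For reference, a minimal sketch of the simplified shape the last suggestion points toward; this is illustrative only and assumes the surrounding Builder fields (specs, specsById, defaultSpecId, changes) from this file:

    Builder removeUnusedSpecs(Iterable<Integer> specIds) {
      Set<Integer> specIdsToRemove = Sets.newHashSet(specIds);
      Preconditions.checkArgument(
          !specIdsToRemove.contains(defaultSpecId), "Cannot remove the default partition spec");
      // Removing an unknown spec id is a no-op, so an accidental retry of an
      // already-committed removal succeeds instead of failing the commit.
      this.specs =
          specs.stream()
              .filter(s -> !specIdsToRemove.contains(s.specId()))
              .collect(Collectors.toList());
      specIdsToRemove.forEach(specsById::remove);
      changes.add(new MetadataUpdate.RemovePartitionSpecs(specIdsToRemove));
      return this;
    }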

public Builder addPartitionSpec(UnboundPartitionSpec spec) {
addPartitionSpecInternal(spec.bind(schemasById.get(currentSchemaId)));
return this;
22 changes: 22 additions & 0 deletions core/src/main/java/org/apache/iceberg/UpdateRequirements.java
@@ -105,6 +105,8 @@ private Builder update(MetadataUpdate update) {
update((MetadataUpdate.SetDefaultPartitionSpec) update);
} else if (update instanceof MetadataUpdate.SetDefaultSortOrder) {
update((MetadataUpdate.SetDefaultSortOrder) update);
} else if (update instanceof MetadataUpdate.RemovePartitionSpecs) {
update((MetadataUpdate.RemovePartitionSpecs) update);
}

return this;
@@ -173,6 +175,26 @@ private void update(MetadataUpdate.SetDefaultSortOrder unused) {
}
}

private void update(MetadataUpdate.RemovePartitionSpecs unused) {

Contributor:
please add a test to TestUpdateRequirements

// require that the default partition spec has not changed
if (!setSpecId) {
if (base != null && !isReplace) {
require(new UpdateRequirement.AssertDefaultSpecID(base.defaultSpecId()));
}
this.setSpecId = true;
}
// require that no branches have changed, so that old specs won't be written.
if (base != null && !isReplace) {
base.refs()
.forEach(
(name, ref) -> {
if (ref.isBranch() && !name.equals(SnapshotRef.MAIN_BRANCH)) {
require(new UpdateRequirement.AssertRefSnapshotID(name, ref.snapshotId()));
}
});
}
}

Contributor:
we should probably also have a test in TestUpdateRequirements that actually changes the branch and eventually fails

Contributor:
in addition to this, CatalogTests currently only tests the happy path, but not the exceptions thrown by either AssertDefaultSpecID or AssertRefSnapshotID. We need to test both cases in CatalogTests.

Contributor Author:
I updated TestUpdateRequirements. However, it's hard to test the failure cases in CatalogTests: before committing, internalApply also refreshes the base table, so we cannot easily create a concurrent update to the base table, which would have to happen after the refresh and before the commit. Do you have any suggestions?
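A hedged example of what this emits; the table state here (default spec id 0, one non-main branch "audit" at snapshot 42) is hypothetical, and forUpdateTable is the existing entry point:

    List<UpdateRequirement> requirements =
        UpdateRequirements.forUpdateTable(
            base, ImmutableList.of(new MetadataUpdate.RemovePartitionSpecs(Sets.newHashSet(1))));
    // Per the code above, the result should include AssertDefaultSpecID(0) and
    // AssertRefSnapshotID("audit", 42); the commit is rejected if either the
    // default spec or the branch tip changed concurrently.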

private List<UpdateRequirement> build() {
return requirements.build();
}
85 changes: 85 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
@@ -37,6 +37,7 @@
import java.util.stream.Collectors;
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.puffin.Blob;
@@ -1620,6 +1621,90 @@ public void testRetainFilesOnRetainedBranches() {
assertThat(deletedFiles).isEqualTo(expectedDeletes);
}

@TestTemplate
public void testRemoveSpecDuringExpiration() {
DataFile file =
DataFiles.builder(table.spec())
.withPath("/path/to/data-0.parquet")
.withPartitionPath("data_bucket=0")
.withFileSizeInBytes(10)
.withRecordCount(100)
.build();
table.newAppend().appendFile(file).commit();
Snapshot append = table.currentSnapshot();
String appendManifest =
Iterables.getOnlyElement(
table.currentSnapshot().allManifests(table.io()).stream()
.map(ManifestFile::path)
.collect(Collectors.toList()));
table.newDelete().deleteFile(file).commit();
Snapshot delete = table.currentSnapshot();
String deleteManifest =
Iterables.getOnlyElement(
table.currentSnapshot().allManifests(table.io()).stream()
.map(ManifestFile::path)
.collect(Collectors.toList()));

table.updateSpec().addField("id_bucket", Expressions.bucket("id", 16)).commit();
PartitionSpec idAndDataBucketSpec = table.spec();
DataFile bucketFile =
DataFiles.builder(table.spec())
.withPath("/path/to/data-0-id-0.parquet")
.withFileSizeInBytes(10)
.withRecordCount(100)
.withPartitionPath("data_bucket=0/id_bucket=0")
.build();
table.newAppend().appendFile(bucketFile).commit();

Set<String> deletedFiles = Sets.newHashSet();
// Expiring snapshots should remove the data_bucket partition
removeSnapshots(table)
.expireOlderThan(System.currentTimeMillis())
.removeUnusedSpecs(true)
.deleteWith(deletedFiles::add)
.commit();

assertThat(deletedFiles)
.containsExactlyInAnyOrder(
appendManifest,
deleteManifest,
file.location(),
append.manifestListLocation(),
delete.manifestListLocation());
assertThat(Iterables.getOnlyElement(table.specs().keySet()))
.as("Only id_bucket + data_bucket transform should exist")
.isEqualTo(idAndDataBucketSpec.specId());
}

Contributor:
Suggested change:

    - assertThat(Iterables.getOnlyElement(table.specs().keySet()))
    + assertThat(table.specs().keySet()).containsExactly(idAndDataBucketSpec.specId())

no need to use Iterables.getOnlyElement

@TestTemplate
public void testRemoveSpecsDoesntRemoveDefaultSpec() throws IOException {
// The default spec for the table is bucketed on data, but write using the unpartitioned spec
PartitionSpec dataBucketSpec = table.spec();
DataFile file =
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath("/path/to/data-0.parquet")
.withFileSizeInBytes(10)
.withRecordCount(100)
.build();

table.newAppend().appendFile(file).commit();
Snapshot append = table.currentSnapshot();
table.newDelete().deleteFile(file).commit();

Set<String> deletedFiles = Sets.newHashSet();
// Expiring snapshots should remove only the unpartitioned spec
removeSnapshots(table)
.expireOlderThan(System.currentTimeMillis())
.removeUnusedSpecs(true)
.deleteWith(deletedFiles::add)
.commit();

assertThat(deletedFiles).containsExactlyInAnyOrder(append.manifestListLocation());
assertThat(Iterables.getOnlyElement(table.specs().keySet()))
.as("Only data_bucket transform should exist")
.isEqualTo(dataBucketSpec.specId());
}

Contributor:
same as above

private Set<String> manifestPaths(Snapshot snapshot, FileIO io) {
return snapshot.allManifests(io).stream().map(ManifestFile::path).collect(Collectors.toSet());
}