-
Notifications
You must be signed in to change notification settings - Fork 2.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
API: Support removeUnusedSpecs in ExpireSnapshots #10755
base: main
Are you sure you want to change the base?
Changes from 10 commits
a75c4af
2f3544c
5841aef
1527c81
5b21438
5ee0f61
0cd6e51
a4635e9
c02354e
323d43a
5f08dfa
989beaa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ | |
|
||
import java.io.IOException; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ExecutorService; | ||
|
@@ -256,7 +257,8 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira | |
}); | ||
|
||
Set<String> filesToDelete = | ||
findFilesToDelete(manifestsToScan, manifestsToRevert, validIds, afterExpiration); | ||
findFilesToDelete( | ||
manifestsToScan, manifestsToRevert, validIds, beforeExpiration.specsById()); | ||
|
||
deleteFiles(filesToDelete, "data"); | ||
deleteFiles(manifestsToDelete, "manifest"); | ||
|
@@ -273,7 +275,7 @@ private Set<String> findFilesToDelete( | |
Set<ManifestFile> manifestsToScan, | ||
Set<ManifestFile> manifestsToRevert, | ||
Set<Long> validIds, | ||
TableMetadata current) { | ||
Map<Integer, PartitionSpec> specsById) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 for fixing this. In general we should avoid passing huge metadata objects around to helper methods that don't need everything! |
||
Set<String> filesToDelete = ConcurrentHashMap.newKeySet(); | ||
Tasks.foreach(manifestsToScan) | ||
.retry(3) | ||
|
@@ -285,8 +287,7 @@ private Set<String> findFilesToDelete( | |
.run( | ||
manifest -> { | ||
// the manifest has deletes, scan it to find files to delete | ||
try (ManifestReader<?> reader = | ||
ManifestFiles.open(manifest, fileIO, current.specsById())) { | ||
try (ManifestReader<?> reader = ManifestFiles.open(manifest, fileIO, specsById)) { | ||
for (ManifestEntry<?> entry : reader.entries()) { | ||
// if the snapshot ID of the DELETE entry is no longer valid, the data can be | ||
// deleted | ||
|
@@ -311,8 +312,7 @@ private Set<String> findFilesToDelete( | |
.run( | ||
manifest -> { | ||
// the manifest has deletes, scan it to find files to delete | ||
try (ManifestReader<?> reader = | ||
ManifestFiles.open(manifest, fileIO, current.specsById())) { | ||
try (ManifestReader<?> reader = ManifestFiles.open(manifest, fileIO, specsById)) { | ||
for (ManifestEntry<?> entry : reader.entries()) { | ||
// delete any ADDED file from manifests that were reverted | ||
if (entry.status() == ManifestEntry.Status.ADDED) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,7 @@ | |
import java.util.Set; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.function.Consumer; | ||
import java.util.stream.Collectors; | ||
import org.apache.iceberg.exceptions.CommitFailedException; | ||
import org.apache.iceberg.exceptions.ValidationException; | ||
import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
|
@@ -85,6 +86,7 @@ public void accept(String file) { | |
private ExecutorService planExecutorService = ThreadPools.getWorkerPool(); | ||
private Boolean incrementalCleanup; | ||
private boolean specifiedSnapshotId = false; | ||
private boolean cleanExpiredMetadata = false; | ||
|
||
RemoveSnapshots(TableOperations ops) { | ||
this.ops = ops; | ||
|
@@ -159,6 +161,12 @@ public ExpireSnapshots planWith(ExecutorService executorService) { | |
return this; | ||
} | ||
|
||
@Override | ||
public ExpireSnapshots cleanExpiredMetadata(boolean clean) { | ||
this.cleanExpiredMetadata = clean; | ||
return this; | ||
} | ||
|
||
@Override | ||
public List<Snapshot> apply() { | ||
TableMetadata updated = internalApply(); | ||
|
@@ -209,6 +217,25 @@ private TableMetadata internalApply() { | |
.forEach(idsToRemove::add); | ||
updatedMetaBuilder.removeSnapshots(idsToRemove); | ||
|
||
if (cleanExpiredMetadata) { | ||
// TODO: Support cleaning expired schema as well. | ||
Set<Integer> reachableSpecs = Sets.newConcurrentHashSet(); | ||
reachableSpecs.add(base.defaultSpecId()); | ||
Tasks.foreach(idsToRetain) | ||
.executeWith(planExecutorService) | ||
.run( | ||
snapshot -> | ||
base.snapshot(snapshot).allManifests(ops.io()).stream() | ||
.map(ManifestFile::partitionSpecId) | ||
.forEach(reachableSpecs::add)); | ||
Set<Integer> specsToRemove = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: newline after a loop. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. addressed. |
||
base.specs().stream() | ||
.map(PartitionSpec::specId) | ||
.filter(specId -> !reachableSpecs.contains(specId)) | ||
.collect(Collectors.toSet()); | ||
updatedMetaBuilder.removeSpecs(specsToRemove); | ||
} | ||
|
||
return updatedMetaBuilder.build(); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1108,6 +1108,25 @@ public Builder setDefaultPartitionSpec(int specId) { | |
return this; | ||
} | ||
|
||
Builder removeSpecs(Iterable<Integer> specIds) { | ||
Set<Integer> specIdsToRemove = Sets.newHashSet(); | ||
for (Integer specId : specIds) { | ||
Preconditions.checkArgument( | ||
specId != defaultSpecId, "Cannot remove default partition spec"); | ||
PartitionSpec toBeRemoved = specsById.remove(specId); | ||
if (toBeRemoved != null) { | ||
specIdsToRemove.add(specId); | ||
} | ||
} | ||
advancedxy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
this.specs = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we be validating that all the specs we are trying to remove exist? I think this may be fine, just thinking about commit conflicts There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think so, and L1129 - L1133 indeed ensures the spec to be removed are existed in current table metadata.
If you are talking about concurrent There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In general I'm in favor of succeeding in cases like this. There's no need to fail something if it has an accidental retry after the first attempt succeeded. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This is a very good point, let me reconsider this part. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addressed in 190dde6, removing an already removed unused spec should not cause a failure then. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In that case, is it really necessary to validate that a spec was removed and filter the list? If it is a noop to remove an unknown spec ID, then we can simplify the first half of this method: Preconditions.checkArgument(!specIds.contains(defaultSpecId), "Cannot remove the default partition spec"); There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated, let me know if this is the desired change. |
||
specs.stream() | ||
.filter(s -> !specIdsToRemove.contains(s.specId())) | ||
.collect(Collectors.toList()); | ||
changes.add(new MetadataUpdate.RemovePartitionSpecs(specIdsToRemove)); | ||
return this; | ||
} | ||
|
||
public Builder addPartitionSpec(UnboundPartitionSpec spec) { | ||
addPartitionSpecInternal(spec.bind(schemasById.get(currentSchemaId))); | ||
return this; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -105,6 +105,8 @@ private Builder update(MetadataUpdate update) { | |
update((MetadataUpdate.SetDefaultPartitionSpec) update); | ||
} else if (update instanceof MetadataUpdate.SetDefaultSortOrder) { | ||
update((MetadataUpdate.SetDefaultSortOrder) update); | ||
} else if (update instanceof MetadataUpdate.RemovePartitionSpecs) { | ||
update((MetadataUpdate.RemovePartitionSpecs) update); | ||
} | ||
|
||
return this; | ||
|
@@ -173,6 +175,27 @@ private void update(MetadataUpdate.SetDefaultSortOrder unused) { | |
} | ||
} | ||
|
||
private void update(MetadataUpdate.RemovePartitionSpecs unused) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please add a test to |
||
// require that the default partition spec has not changed | ||
if (!setSpecId) { | ||
if (base != null && !isReplace) { | ||
require(new UpdateRequirement.AssertDefaultSpecID(base.defaultSpecId())); | ||
} | ||
this.setSpecId = true; | ||
} | ||
|
||
// require that no branches have changed, so that old specs won't be written. | ||
if (base != null && !isReplace) { | ||
base.refs() | ||
.forEach( | ||
(name, ref) -> { | ||
if (ref.isBranch() && !name.equals(SnapshotRef.MAIN_BRANCH)) { | ||
require(new UpdateRequirement.AssertRefSnapshotID(name, ref.snapshotId())); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should probably also have a test in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in addition to this, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I updated the |
||
} | ||
}); | ||
} | ||
} | ||
|
||
private List<UpdateRequirement> build() { | ||
return requirements.build(); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bit verbose. I'd recommend being more direct and not making it a full sentence. Something like:
The same could apply to the method description:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, Updated.