Core, Data, Spark 3.5: Support file and partition delete granularity #9384
CharSequenceUtil.java (new file)
@@ -0,0 +1,45 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.util;

public class CharSequenceUtil {

  private CharSequenceUtil() {}

  public static boolean unequalPaths(CharSequence s1, CharSequence s2) {
    if (s1 == s2) {
      return false;
    }

    int s1Length = s1.length();
    int s2Length = s2.length();

    if (s1Length != s2Length) {
      return true;
    }

    for (int index = s1Length - 1; index >= 0; index--) {
      if (s1.charAt(index) != s2.charAt(index)) {
        return true;
      }
    }

    return false;
  }

}
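A brief usage sketch (not part of the PR): the helper compares two paths that may arrive as different CharSequence implementations, without converting them to String first. The paths below are made up for illustration.

CharSequence previous = "s3://bucket/data/file-a.parquet";
CharSequence current = new StringBuilder("s3://bucket/data/file-a.parquet");

CharSequenceUtil.unequalPaths(previous, current);                           // false: same characters
CharSequenceUtil.unequalPaths(previous, "s3://bucket/data/file-b.parquet"); // true: paths differ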
TableProperties.java
@@ -19,6 +19,7 @@
package org.apache.iceberg;

import java.util.Set;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;

public class TableProperties {

@@ -334,6 +335,9 @@ private TableProperties() {}
  public static final String MAX_REF_AGE_MS = "history.expire.max-ref-age-ms";
  public static final long MAX_REF_AGE_MS_DEFAULT = Long.MAX_VALUE;

  public static final String DELETE_GRANULARITY = "write.delete.granularity";
  public static final String DELETE_GRANULARITY_DEFAULT = DeleteGranularity.PARTITION.toString();

  public static final String DELETE_ISOLATION_LEVEL = "write.delete.isolation-level";
  public static final String DELETE_ISOLATION_LEVEL_DEFAULT = "serializable";

Review thread on DELETE_GRANULARITY:
- Should we add a document for this table property?
- Will do, same for the write option.
- Added.
- Reverted based on this discussion.

Review thread on DELETE_GRANULARITY_DEFAULT:
- Nit: I'd use a string so that we are forced to continue supporting it, like the other defaults. This would technically allow someone to change […]
- Ah, I see you override […]
- I started with a string constant but then saw what we did for […]
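For context, a minimal sketch of how the new property could be toggled through Iceberg's standard UpdateProperties API, using only the constants added above. The catalog and table names are placeholders, and the later discussion notes the property is intentionally left undocumented, so this is illustrative rather than recommended usage.

// Sketch only: switch an existing table from the default partition granularity to file granularity.
Table table = catalog.loadTable(TableIdentifier.of("db", "tbl")); // any previously obtained Table works
table
    .updateProperties()
    .set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.FILE.toString())
    .commit();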
DeleteGranularity.java (new file)
@@ -0,0 +1,71 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.deletes;

import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
 * An enum that represents the granularity of deletes.
 *
 * <p>Under partition granularity, delete writers are directed to group deletes for different data
 * files into one delete file. This strategy tends to reduce the total number of delete files in the
 * table. However, a scan for a single data file will require reading delete information for
 * multiple data files even if those other files are not required for the scan. All irrelevant
 * deletes will be discarded by readers but reading this extra information will cause overhead. The
 * overhead can potentially be mitigated via delete file caching.
 *
 * <p>Under file granularity, delete writers always organize deletes by their target data file,
 * creating separate delete files for each referenced data file. This strategy ensures the job
 * planning does not assign irrelevant deletes to data files and readers only load necessary delete
 * information. However, it also increases the total number of delete files in the table and may
 * require a more aggressive approach for delete file compaction.
 *
 * <p>Currently, this configuration is only applicable to position deletes.
 *
 * <p>Each granularity has its own benefits and drawbacks and should be picked based on a use case.
 * Regular delete compaction is still required regardless of which granularity is chosen. It is also
 * possible to use one granularity for ingestion and another one for table maintenance.
 */
public enum DeleteGranularity {
  FILE,
  PARTITION;

  @Override
  public String toString() {
    switch (this) {
      case FILE:
        return "file";
      case PARTITION:
        return "partition";
      default:
        throw new IllegalArgumentException("Unknown delete granularity: " + this);
    }
  }

  public static DeleteGranularity fromString(String valueAsString) {
    Preconditions.checkArgument(valueAsString != null, "Value is null");
    if (FILE.toString().equalsIgnoreCase(valueAsString)) {
      return FILE;
    } else if (PARTITION.toString().equalsIgnoreCase(valueAsString)) {
      return PARTITION;
    } else {
      throw new IllegalArgumentException("Unknown delete granularity: " + valueAsString);
    }
  }
}

Review thread on the enum declaration:
- What is the granularity currently? File? What is the impact to Flink writers?
- The current behavior is partition granularity. The new default will match the existing behavior. There is no immediate impact on Flink writes. Equality deletes can only be written with partition granularity at the moment. That said, we should make Flink write position deletes with file granularity no matter what to solve the concurrent data compaction issue.
- Flink uses the old writer API right now. We will follow up to change that.
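A small sketch (not part of the PR) of the string round-trip the enum supports, matching the toString/fromString pair above; this is the form in which the table property value is stored and parsed.

DeleteGranularity granularity = DeleteGranularity.fromString("file"); // DeleteGranularity.FILE
String stored = DeleteGranularity.PARTITION.toString();               // "partition"

// Parsing is case-insensitive; anything else fails fast:
// DeleteGranularity.fromString("table") -> IllegalArgumentException: Unknown delete granularity: table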
FileScopedPositionDeleteWriter.java (new file)
@@ -0,0 +1,113 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.deletes;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.function.Supplier;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.CharSequenceUtil;

/**
 * A position delete writer that produces a separate delete file for each referenced data file.
 *
 * <p>This writer does not keep track of seen deletes and assumes all incoming records are ordered
 * by file and position as required by the spec. If there is no external process to order the
 * records, consider using {@link SortingPositionOnlyDeleteWriter} instead.
 */
public class FileScopedPositionDeleteWriter<T>
    implements FileWriter<PositionDelete<T>, DeleteWriteResult> {

  private final Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers;
  private final List<DeleteFile> deleteFiles;
  private final CharSequenceSet referencedDataFiles;

  private FileWriter<PositionDelete<T>, DeleteWriteResult> currentWriter = null;
  private CharSequence currentPath = null;
  private boolean closed = false;

  public FileScopedPositionDeleteWriter(
      Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers) {
    this.writers = writers;
    this.deleteFiles = Lists.newArrayList();
    this.referencedDataFiles = CharSequenceSet.empty();
  }

  @Override
  public void write(PositionDelete<T> positionDelete) {
    writer(positionDelete.path()).write(positionDelete);
  }

  private FileWriter<PositionDelete<T>, DeleteWriteResult> writer(CharSequence path) {
    if (currentWriter == null) {
      openCurrentWriter(path);
    } else if (CharSequenceUtil.unequalPaths(currentPath, path)) {
      closeCurrentWriter();
      openCurrentWriter(path);
    }

    return currentWriter;
  }

  @Override
  public long length() {
    throw new UnsupportedOperationException(getClass().getName() + " does not implement length");
  }

  @Override
  public DeleteWriteResult result() {
    Preconditions.checkState(closed, "Cannot get result from unclosed writer");
    return new DeleteWriteResult(deleteFiles, referencedDataFiles);
  }

  @Override
  public void close() throws IOException {
    if (!closed) {
      closeCurrentWriter();
      this.closed = true;
    }
  }

  private void openCurrentWriter(CharSequence path) {
    Preconditions.checkState(!closed, "Writer has already been closed");
    this.currentWriter = writers.get();
    this.currentPath = path;
  }

  private void closeCurrentWriter() {
    if (currentWriter != null) {
      try {
        currentWriter.close();
        DeleteWriteResult result = currentWriter.result();
        deleteFiles.addAll(result.deleteFiles());
        referencedDataFiles.addAll(result.referencedDataFiles());
        this.currentWriter = null;
        this.currentPath = null;
      } catch (IOException e) {
        throw new UncheckedIOException("Failed to close current writer", e);
      }
    }
  }
}
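A hypothetical usage sketch (not from the PR) of the writer above. Here newPositionDeleteWriter() stands in for whatever factory supplies the per-file delete writers, for example one built on Iceberg's file writer factories; it is not an API defined in this change, and the paths are made up.

// Sketch, assuming Record is org.apache.iceberg.data.Record and newPositionDeleteWriter()
// returns a FileWriter<PositionDelete<Record>, DeleteWriteResult>.
void writeFileScopedDeletes() throws IOException {
  FileScopedPositionDeleteWriter<Record> writer =
      new FileScopedPositionDeleteWriter<>(this::newPositionDeleteWriter);

  PositionDelete<Record> delete = PositionDelete.create();

  // Incoming deletes must already be ordered by file and then position.
  writer.write(delete.set("s3://bucket/data/file-a.parquet", 0L, null));
  writer.write(delete.set("s3://bucket/data/file-a.parquet", 7L, null));
  // Switching to a new data file closes the current delete file and opens a new one.
  writer.write(delete.set("s3://bucket/data/file-b.parquet", 3L, null));

  writer.close();
  DeleteWriteResult result = writer.result(); // one delete file per referenced data file
}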
Review discussion on the property name:

I am still debating the property name. As it stands today, it will be applicable only to position deletes, but I am not sure the name has to reflect that.

Also, we would probably want to always use file granularity for Flink position deletes to solve compaction issues. This property then becomes more like a recommendation. Any feedback is appreciated.

Maybe just write.position-delete.granularity? I prefer to use a more precise name and limit the scope of its usage. A while ago I encountered an issue about adjusting the row-group size of Parquet position delete files. I wanted to adjust the default row-group size of Parquet pos deletes for the tables I manage to speed up queries (more details are in issue #9149); however, I found that the parameter write.delete.parquet.row-group-size-bytes, which controls the row-group size of Parquet pos deletes, also controls the row-group size of equality delete files. But obviously the row-group sizes applicable to these two types of delete files are not the same. Because we also use equality deletes when the data size is small, I cannot directly set a default value of write.delete.parquet.row-group-size-bytes for new tables. I can only adjust write.delete.parquet.row-group-size-bytes according to the specific use of each table, which is inconvenient. In fact, I think it is not appropriate to use one parameter to control the row-group size of both position delete files and equality delete files, so I created #9177 to add a separate parameter for the position delete file that only writes the file_path and pos columns. Back to this: IIUC, if we later add a grouping granularity for equality deletes, then since position deletes and equality deletes have different characteristics, they will most likely apply different grouping granularities. So I think we'd better make the distinction right from the start. What do you think?

To be honest, I doubt we will ever support this property for equality deletes. In general, I do get that we may want to configure position and equality deletes differently. We can explore adding an extra namespace. I am still not sure this use case falls into that bucket. @rdblue @RussellSpitzer @zhongyujiang, thoughts? Do we want a prefix for this config to make it explicit that it only applies to position deletes? Currently, I only note that in the docs.

This option makes no sense for equality deletes because they aren't targeted at a single file, so I agree that we won't support it for equality. This is also mostly advisory. It is unlikely that we will support it in Flink; we will instead always use file-level granularity. Maybe we won't even want to support this in the long term, if we decide that Spark performs better with file granularity at all times. I guess where I'm at for this is that I would probably not worry much about it -- but also not add it to documentation since we will probably not want people setting it themselves. I think I'd leave it as write.delete.granularity.

The idea not to document it for now is reasonable, given that it acts like a recommendation and we are not sure we want to support it in the future. Let's keep the name as is then. Adding a way to configure position and equality deletes separately is another discussion.