
Core, Data, Spark 3.5: Support file and partition delete granularity #9384

Merged on Jan 2, 2024 (2 commits)
45 changes: 45 additions & 0 deletions api/src/main/java/org/apache/iceberg/util/CharSequenceUtil.java
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.util;

public class CharSequenceUtil {

private CharSequenceUtil() {}

public static boolean unequalPaths(CharSequence s1, CharSequence s2) {
if (s1 == s2) {
return false;
}

int s1Length = s1.length();
int s2Length = s2.length();

if (s1Length != s2Length) {
return true;
}

for (int index = s1Length - 1; index >= 0; index--) {
if (s1.charAt(index) != s2.charAt(index)) {
return true;
}
}

return false;
}
}
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -19,6 +19,7 @@
package org.apache.iceberg;

import java.util.Set;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;

public class TableProperties {
@@ -334,6 +335,9 @@ private TableProperties() {}
public static final String MAX_REF_AGE_MS = "history.expire.max-ref-age-ms";
public static final long MAX_REF_AGE_MS_DEFAULT = Long.MAX_VALUE;

public static final String DELETE_GRANULARITY = "write.delete.granularity";

Contributor Author:
I am still debating the property name. As it stands today, it will be applicable only to position deletes but I am not sure the name has to reflect it.

Contributor Author (@aokolnychyi, Dec 27, 2023):
Also, we would probably want to always use file granularity for Flink position deletes to solve compaction issues, so this property becomes more of a recommendation.

Any feedback is appreciated.

Contributor:
Maybe just write.position-delete.granularity? I prefer to use a more precise name and limit the scope of its usage.

A while ago I ran into an issue with adjusting the row-group size of Parquet position delete files. I wanted to tune the default row-group size of Parquet position deletes for the tables I manage in order to speed up queries (more details in issue #9149). However, I found that write.delete.parquet.row-group-size-bytes, which controls the row-group size of Parquet position deletes, also controls the row-group size of equality delete files, even though the row-group sizes suited to these two types of delete files are not the same.

Because we also use equality deletes when the data size is small, I cannot simply set a default value of write.delete.parquet.row-group-size-bytes for new tables; I can only adjust it per table according to how each table is used, which is inconvenient.

In fact, I think it is not appropriate for one parameter to control the row-group size of both position delete files and equality delete files, so I created #9177 to add a separate parameter for position delete files that only write the file_path and pos columns.

Back to this: IIUC, if we later add a grouping granularity for equality deletes, then, since position deletes and equality deletes have different characteristics, they will most likely use different granularities. So I think we'd better make the distinction right from the start. What do you think?

Contributor Author:
To be honest, I doubt we will ever support this property for equality deletes.

In general, I do get that we may want to configure position and equality deletes differently. We can explore adding an extra namespace. I am still not sure this use case falls into that bucket.

@rdblue @RussellSpitzer @zhongyujiang, thoughts? Do we want a prefix for this config to make it explicit that it only applies to position deletes? Currently, I only note that in the docs.

Contributor:
This option makes no sense for equality deletes because they aren't targeted at a single file, so I agree that we won't support it for equality. This is also mostly advisory. It is unlikely that we will support it in Flink and will instead always use file-level granularity. Maybe we won't even want to support this in the long term, if we decide that Spark performs better with file granularity at all times.

I guess where I'm at for this is that I would probably not worry much about it -- but also not add it to documentation since we will probably not want people setting it themselves. I think I'd leave it as write.delete.granularity.

Contributor Author:
The idea not to document it for now is reasonable given that it acts like a recommendation and we are not sure we want to support it in the future. Let's keep the name as is then.

Adding a way to configure position and equality deletes separately is another discussion.

Contributor:
Should we add documentation for this table property?

Contributor Author:
Will do, same for the write option.

Contributor Author:
Added.

Contributor Author:
Reverted based on this discussion.

public static final String DELETE_GRANULARITY_DEFAULT = DeleteGranularity.PARTITION.toString();

Contributor:
Nit: I'd use a string literal so that we are forced to continue supporting it, like the other defaults. As written, someone could technically change PARTITION in the code without anything breaking, although it would change the property value.

Contributor:
Ah, I see you override toString so it's probably fine.

Contributor Author:
I started with a string constant but then saw what we did for RowLevelOperationMode and decided to follow that for consistency.


public static final String DELETE_ISOLATION_LEVEL = "write.delete.isolation-level";
public static final String DELETE_ISOLATION_LEVEL_DEFAULT = "serializable";

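For context only (the discussion above intentionally leaves the property undocumented), a minimal sketch of how the new property could be set through the generic table API; the Catalog instance and the "db.events" identifier are placeholders, not part of this change:

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.deletes.DeleteGranularity;

class DeleteGranularityExample {
  // "db.events" is a placeholder identifier; any configured Catalog implementation works.
  static void useFileGranularity(Catalog catalog) {
    Table table = catalog.loadTable(TableIdentifier.parse("db.events"));
    table
        .updateProperties()
        .set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.FILE.toString()) // "file"
        .commit();
  }
}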
71 changes: 71 additions & 0 deletions core/src/main/java/org/apache/iceberg/deletes/DeleteGranularity.java
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.deletes;

import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
* An enum that represents the granularity of deletes.
*
* <p>Under partition granularity, delete writers are directed to group deletes for different data
* files into one delete file. This strategy tends to reduce the total number of delete files in the
* table. However, a scan for a single data file will require reading delete information for
* multiple data files even if those other files are not required for the scan. All irrelevant
* deletes will be discarded by readers but reading this extra information will cause overhead. The
* overhead can potentially be mitigated via delete file caching.
*
* <p>Under file granularity, delete writers always organize deletes by their target data file,
* creating separate delete files for each referenced data file. This strategy ensures the job
* planning does not assign irrelevant deletes to data files and readers only load necessary delete
* information. However, it also increases the total number of delete files in the table and may
* require a more aggressive approach for delete file compaction.
*
* <p>Currently, this configuration is only applicable to position deletes.
*
* <p>Each granularity has its own benefits and drawbacks and should be picked based on a use case.
* Regular delete compaction is still required regardless of which granularity is chosen. It is also
* possible to use one granularity for ingestion and another one for table maintenance.
*/
public enum DeleteGranularity {

Contributor:
What is the current granularity? File? And what is the impact on Flink writers?

Contributor Author:
The current behavior is partition granularity. The new default will match the existing behavior.

There is no immediate impact on Flink writes. Equality deletes can only be written with partition granularity at the moment. That said, we should make Flink write position deletes with file granularity no matter what to solve the concurrent data compaction issue.

Contributor Author:
Flink uses the old writer API right now. We will follow up to change that.

FILE,
PARTITION;

@Override
public String toString() {
switch (this) {
case FILE:
return "file";
case PARTITION:
return "partition";
default:
throw new IllegalArgumentException("Unknown delete granularity: " + this);
}
}

public static DeleteGranularity fromString(String valueAsString) {
Preconditions.checkArgument(valueAsString != null, "Value is null");
if (FILE.toString().equalsIgnoreCase(valueAsString)) {
return FILE;
} else if (PARTITION.toString().equalsIgnoreCase(valueAsString)) {
return PARTITION;
} else {
throw new IllegalArgumentException("Unknown delete granularity: " + valueAsString);
}
}
}
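
To illustrate how the enum and the new constants fit together, a minimal sketch (not part of this diff) of resolving a table's configured granularity with a fallback to the partition default; DeleteGranularityResolver is a hypothetical helper name:

import java.util.Map;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;

class DeleteGranularityResolver {
  // Resolve the configured delete granularity, falling back to the partition default.
  static DeleteGranularity resolve(Table table) {
    Map<String, String> props = table.properties();
    String value =
        props.getOrDefault(
            TableProperties.DELETE_GRANULARITY, TableProperties.DELETE_GRANULARITY_DEFAULT);
    return DeleteGranularity.fromString(value); // accepts "file" or "partition", case-insensitive
  }
}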
113 changes: 113 additions & 0 deletions core/src/main/java/org/apache/iceberg/deletes/FileScopedPositionDeleteWriter.java
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.deletes;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.function.Supplier;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.CharSequenceUtil;

/**
* A position delete writer that produces a separate delete file for each referenced data file.
*
* <p>This writer does not keep track of seen deletes and assumes all incoming records are ordered
* by file and position as required by the spec. If there is no external process to order the
* records, consider using {@link SortingPositionOnlyDeleteWriter} instead.
*/
public class FileScopedPositionDeleteWriter<T>
implements FileWriter<PositionDelete<T>, DeleteWriteResult> {

private final Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers;
private final List<DeleteFile> deleteFiles;
private final CharSequenceSet referencedDataFiles;

private FileWriter<PositionDelete<T>, DeleteWriteResult> currentWriter = null;
private CharSequence currentPath = null;
private boolean closed = false;

public FileScopedPositionDeleteWriter(
Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers) {
this.writers = writers;
this.deleteFiles = Lists.newArrayList();
this.referencedDataFiles = CharSequenceSet.empty();
}

@Override
public void write(PositionDelete<T> positionDelete) {
writer(positionDelete.path()).write(positionDelete);
}

private FileWriter<PositionDelete<T>, DeleteWriteResult> writer(CharSequence path) {
if (currentWriter == null) {
openCurrentWriter(path);
} else if (CharSequenceUtil.unequalPaths(currentPath, path)) {
closeCurrentWriter();
openCurrentWriter(path);
}

return currentWriter;
}

@Override
public long length() {
throw new UnsupportedOperationException(getClass().getName() + " does not implement length");
}

@Override
public DeleteWriteResult result() {
Preconditions.checkState(closed, "Cannot get result from unclosed writer");
return new DeleteWriteResult(deleteFiles, referencedDataFiles);
}

@Override
public void close() throws IOException {
if (!closed) {
closeCurrentWriter();
this.closed = true;
}
}

private void openCurrentWriter(CharSequence path) {
Preconditions.checkState(!closed, "Writer has already been closed");
this.currentWriter = writers.get();
this.currentPath = path;
}

private void closeCurrentWriter() {
if (currentWriter != null) {
try {
currentWriter.close();
DeleteWriteResult result = currentWriter.result();
deleteFiles.addAll(result.deleteFiles());
referencedDataFiles.addAll(result.referencedDataFiles());
this.currentWriter = null;
this.currentPath = null;
} catch (IOException e) {
throw new UncheckedIOException("Failed to close current writer", e);
}
}
}
}
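
A minimal usage sketch under stated assumptions: writerFactory is a hypothetical supplier that returns a fresh underlying position delete writer on every call (in practice wired up through Iceberg's writer factories), and the incoming deletes are already ordered by data file path and position as the Javadoc requires:

import java.io.IOException;
import java.util.function.Supplier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.FileScopedPositionDeleteWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileWriter;

class FileScopedWriterExample {
  // writerFactory must return a fresh underlying delete file writer per call.
  static DeleteWriteResult writeDeletes(
      Supplier<FileWriter<PositionDelete<Record>, DeleteWriteResult>> writerFactory)
      throws IOException {
    FileScopedPositionDeleteWriter<Record> writer =
        new FileScopedPositionDeleteWriter<>(writerFactory);

    // Deletes must already be ordered by data file path and position (see Javadoc above).
    PositionDelete<Record> delete = PositionDelete.create();
    writer.write(delete.set("s3://bucket/data/file-a.parquet", 0L, null));
    writer.write(delete.set("s3://bucket/data/file-a.parquet", 7L, null));
    writer.write(delete.set("s3://bucket/data/file-b.parquet", 3L, null)); // path change rolls to a new delete file

    writer.close();
    return writer.result(); // one delete file per referenced data file
  }
}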
core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
@@ -19,12 +19,17 @@
package org.apache.iceberg.deletes;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.util.CharSequenceMap;
import org.apache.iceberg.util.CharSequenceSet;
import org.roaringbitmap.longlong.PeekableLongIterator;
import org.roaringbitmap.longlong.Roaring64Bitmap;

@@ -41,12 +46,20 @@
public class SortingPositionOnlyDeleteWriter<T>
implements FileWriter<PositionDelete<T>, DeleteWriteResult> {

private final FileWriter<PositionDelete<T>, DeleteWriteResult> writer;
private final Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers;
private final DeleteGranularity granularity;
private final CharSequenceMap<Roaring64Bitmap> positionsByPath;
private DeleteWriteResult result = null;

public SortingPositionOnlyDeleteWriter(FileWriter<PositionDelete<T>, DeleteWriteResult> writer) {
this.writer = writer;
this(() -> writer, DeleteGranularity.PARTITION);
}

public SortingPositionOnlyDeleteWriter(
Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers,
DeleteGranularity granularity) {
this.writers = writers;
this.granularity = granularity;
this.positionsByPath = CharSequenceMap.create();
}

@@ -60,7 +73,7 @@ public void write(PositionDelete<T> positionDelete) {

@Override
public long length() {
return writer.length();
throw new UnsupportedOperationException(getClass().getName() + " does not implement length");
}

@Override
@@ -71,14 +84,44 @@ public DeleteWriteResult result() {
@Override
public void close() throws IOException {
if (result == null) {
this.result = writeDeletes();
switch (granularity) {
case FILE:
this.result = writeFileDeletes();
return;
case PARTITION:
this.result = writePartitionDeletes();
return;
default:
throw new UnsupportedOperationException("Unsupported delete granularity: " + granularity);
}
}
}

// write deletes for all data files together
private DeleteWriteResult writePartitionDeletes() throws IOException {
return writeDeletes(positionsByPath.keySet());
}

// write deletes for different data files into distinct delete files
private DeleteWriteResult writeFileDeletes() throws IOException {
List<DeleteFile> deleteFiles = Lists.newArrayList();
CharSequenceSet referencedDataFiles = CharSequenceSet.empty();

for (CharSequence path : positionsByPath.keySet()) {
DeleteWriteResult writeResult = writeDeletes(ImmutableList.of(path));
deleteFiles.addAll(writeResult.deleteFiles());
referencedDataFiles.addAll(writeResult.referencedDataFiles());
}

return new DeleteWriteResult(deleteFiles, referencedDataFiles);
}

private DeleteWriteResult writeDeletes() throws IOException {
private DeleteWriteResult writeDeletes(Collection<CharSequence> paths) throws IOException {
FileWriter<PositionDelete<T>, DeleteWriteResult> writer = writers.get();

try {
PositionDelete<T> positionDelete = PositionDelete.create();
for (CharSequence path : sortedPaths()) {
for (CharSequence path : sort(paths)) {
// the iterator provides values in ascending sorted order
PeekableLongIterator positions = positionsByPath.get(path).getLongIterator();
while (positions.hasNext()) {
@@ -93,9 +136,13 @@ private DeleteWriteResult writeDeletes() throws IOException {
return writer.result();
}

private List<CharSequence> sortedPaths() {
List<CharSequence> paths = Lists.newArrayList(positionsByPath.keySet());
paths.sort(Comparators.charSequences());
return paths;
private Collection<CharSequence> sort(Collection<CharSequence> paths) {
if (paths.size() <= 1) {
return paths;
} else {
List<CharSequence> sortedPaths = Lists.newArrayList(paths);
sortedPaths.sort(Comparators.charSequences());
return sortedPaths;
}
}
}
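
A parallel sketch for the updated sorting writer with the new granularity parameter, under the same hypothetical writerFactory assumption; unlike the file-scoped writer above, input here does not need to be pre-sorted because positions are buffered in bitmaps and written out in order on close:

import java.io.IOException;
import java.util.function.Supplier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.SortingPositionOnlyDeleteWriter;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileWriter;

class SortingWriterExample {
  // writerFactory is the same hypothetical supplier as in the previous sketch.
  static DeleteWriteResult writeFileScopedDeletes(
      Supplier<FileWriter<PositionDelete<Record>, DeleteWriteResult>> writerFactory)
      throws IOException {
    SortingPositionOnlyDeleteWriter<Record> writer =
        new SortingPositionOnlyDeleteWriter<>(writerFactory, DeleteGranularity.FILE);

    // Unordered input is fine here: positions are buffered per data file and sorted on close.
    PositionDelete<Record> delete = PositionDelete.create();
    writer.write(delete.set("s3://bucket/data/file-b.parquet", 3L, null));
    writer.write(delete.set("s3://bucket/data/file-a.parquet", 7L, null));
    writer.write(delete.set("s3://bucket/data/file-a.parquet", 0L, null));

    writer.close();
    return writer.result(); // FILE granularity: one delete file per referenced data file
  }
}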