Core, Data, Spark 3.5: Support file and partition delete granularity (#…
aokolnychyi authored Jan 2, 2024
1 parent ad42314 commit e7999a1
Showing 18 changed files with 846 additions and 35 deletions.
45 changes: 45 additions & 0 deletions api/src/main/java/org/apache/iceberg/util/CharSequenceUtil.java
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.util;

public class CharSequenceUtil {

  private CharSequenceUtil() {}

  public static boolean unequalPaths(CharSequence s1, CharSequence s2) {
    if (s1 == s2) {
      return false;
    }

    int s1Length = s1.length();
    int s2Length = s2.length();

    if (s1Length != s2Length) {
      return true;
    }

    for (int index = s1Length - 1; index >= 0; index--) {
      if (s1.charAt(index) != s2.charAt(index)) {
        return true;
      }
    }

    return false;
  }
}
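CharSequence implementations do not, in general, define equals() across types, so a String and a StringBuilder holding the same path compare as unequal; the helper above compares character by character instead. A minimal usage sketch (the path below is made up for illustration):

CharSequence a = "s3://bucket/db/tbl/data/file-a.parquet";
CharSequence b = new StringBuilder("s3://bucket/db/tbl/data/file-a.parquet");

boolean equalsSaysDifferent = a.equals(b);                  // false, despite identical characters
boolean pathsDiffer = CharSequenceUtil.unequalPaths(a, b);  // false: the paths match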
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -19,6 +19,7 @@
package org.apache.iceberg;

import java.util.Set;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;

public class TableProperties {
@@ -334,6 +335,9 @@ private TableProperties() {}
public static final String MAX_REF_AGE_MS = "history.expire.max-ref-age-ms";
public static final long MAX_REF_AGE_MS_DEFAULT = Long.MAX_VALUE;

public static final String DELETE_GRANULARITY = "write.delete.granularity";
public static final String DELETE_GRANULARITY_DEFAULT = DeleteGranularity.PARTITION.toString();

public static final String DELETE_ISOLATION_LEVEL = "write.delete.isolation-level";
public static final String DELETE_ISOLATION_LEVEL_DEFAULT = "serializable";

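The new write.delete.granularity property behaves like any other write property. A minimal sketch of switching a table to file granularity through the Java API, assuming `table` was loaded from a catalog elsewhere; the same key can also be set through engine DDL such as Spark's ALTER TABLE ... SET TBLPROPERTIES:

table
    .updateProperties()
    .set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.FILE.toString())
    .commit();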
71 changes: 71 additions & 0 deletions core/src/main/java/org/apache/iceberg/deletes/DeleteGranularity.java
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.deletes;

import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
* An enum that represents the granularity of deletes.
*
* <p>Under partition granularity, delete writers are directed to group deletes for different data
* files into one delete file. This strategy tends to reduce the total number of delete files in the
* table. However, a scan for a single data file will require reading delete information for
* multiple data files even if those other files are not required for the scan. All irrelevant
* deletes will be discarded by readers but reading this extra information will cause overhead. The
* overhead can potentially be mitigated via delete file caching.
*
* <p>Under file granularity, delete writers always organize deletes by their target data file,
* creating separate delete files for each referenced data file. This strategy ensures the job
* planning does not assign irrelevant deletes to data files and readers only load necessary delete
* information. However, it also increases the total number of delete files in the table and may
* require a more aggressive approach for delete file compaction.
*
* <p>Currently, this configuration is only applicable to position deletes.
*
* <p>Each granularity has its own benefits and drawbacks and should be picked based on a use case.
* Regular delete compaction is still required regardless of which granularity is chosen. It is also
* possible to use one granularity for ingestion and another one for table maintenance.
*/
public enum DeleteGranularity {
  FILE,
  PARTITION;

  @Override
  public String toString() {
    switch (this) {
      case FILE:
        return "file";
      case PARTITION:
        return "partition";
      default:
        throw new IllegalArgumentException("Unknown delete granularity: " + this);
    }
  }

  public static DeleteGranularity fromString(String valueAsString) {
    Preconditions.checkArgument(valueAsString != null, "Value is null");
    if (FILE.toString().equalsIgnoreCase(valueAsString)) {
      return FILE;
    } else if (PARTITION.toString().equalsIgnoreCase(valueAsString)) {
      return PARTITION;
    } else {
      throw new IllegalArgumentException("Unknown delete granularity: " + valueAsString);
    }
  }
}
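A hedged sketch of how a writer-side component might resolve the configured granularity, assuming `props` is the table's property map (java.util.Map<String, String>, e.g. from table.properties()):

String value =
    props.getOrDefault(
        TableProperties.DELETE_GRANULARITY, TableProperties.DELETE_GRANULARITY_DEFAULT);
DeleteGranularity granularity = DeleteGranularity.fromString(value); // accepts "file" or "partition", case-insensitively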
113 changes: 113 additions & 0 deletions core/src/main/java/org/apache/iceberg/deletes/FileScopedPositionDeleteWriter.java
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.deletes;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.function.Supplier;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.CharSequenceUtil;

/**
* A position delete writer that produces a separate delete file for each referenced data file.
*
* <p>This writer does not keep track of seen deletes and assumes all incoming records are ordered
* by file and position as required by the spec. If there is no external process to order the
* records, consider using {@link SortingPositionOnlyDeleteWriter} instead.
*/
public class FileScopedPositionDeleteWriter<T>
    implements FileWriter<PositionDelete<T>, DeleteWriteResult> {

  private final Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers;
  private final List<DeleteFile> deleteFiles;
  private final CharSequenceSet referencedDataFiles;

  private FileWriter<PositionDelete<T>, DeleteWriteResult> currentWriter = null;
  private CharSequence currentPath = null;
  private boolean closed = false;

  public FileScopedPositionDeleteWriter(
      Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers) {
    this.writers = writers;
    this.deleteFiles = Lists.newArrayList();
    this.referencedDataFiles = CharSequenceSet.empty();
  }

  @Override
  public void write(PositionDelete<T> positionDelete) {
    writer(positionDelete.path()).write(positionDelete);
  }

  private FileWriter<PositionDelete<T>, DeleteWriteResult> writer(CharSequence path) {
    if (currentWriter == null) {
      openCurrentWriter(path);
    } else if (CharSequenceUtil.unequalPaths(currentPath, path)) {
      closeCurrentWriter();
      openCurrentWriter(path);
    }

    return currentWriter;
  }

  @Override
  public long length() {
    throw new UnsupportedOperationException(getClass().getName() + " does not implement length");
  }

  @Override
  public DeleteWriteResult result() {
    Preconditions.checkState(closed, "Cannot get result from unclosed writer");
    return new DeleteWriteResult(deleteFiles, referencedDataFiles);
  }

  @Override
  public void close() throws IOException {
    if (!closed) {
      closeCurrentWriter();
      this.closed = true;
    }
  }

  private void openCurrentWriter(CharSequence path) {
    Preconditions.checkState(!closed, "Writer has already been closed");
    this.currentWriter = writers.get();
    this.currentPath = path;
  }

  private void closeCurrentWriter() {
    if (currentWriter != null) {
      try {
        currentWriter.close();
        DeleteWriteResult result = currentWriter.result();
        deleteFiles.addAll(result.deleteFiles());
        referencedDataFiles.addAll(result.referencedDataFiles());
        this.currentWriter = null;
        this.currentPath = null;
      } catch (IOException e) {
        throw new UncheckedIOException("Failed to close current writer", e);
      }
    }
  }
}
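A rough usage sketch: newDeleteFileWriter() is an assumed caller-supplied factory (not part of this commit) that opens a fresh underlying position-delete file writer on each call, Record is an arbitrary row type, and the incoming deletes are already ordered by path and position as the Javadoc requires:

FileScopedPositionDeleteWriter<Record> writer =
    new FileScopedPositionDeleteWriter<>(this::newDeleteFileWriter);

PositionDelete<Record> delete = PositionDelete.create();
writer.write(delete.set("data/file-a.parquet", 0L, null)); // opens a delete file for file-a
writer.write(delete.set("data/file-a.parquet", 7L, null)); // same data file, same delete file
writer.write(delete.set("data/file-b.parquet", 3L, null)); // new data file, rolls to a new delete file
writer.close();

DeleteWriteResult result = writer.result(); // one delete file per referenced data file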
core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
@@ -19,12 +19,17 @@
package org.apache.iceberg.deletes;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.util.CharSequenceMap;
import org.apache.iceberg.util.CharSequenceSet;
import org.roaringbitmap.longlong.PeekableLongIterator;
import org.roaringbitmap.longlong.Roaring64Bitmap;

@@ -41,12 +46,20 @@
public class SortingPositionOnlyDeleteWriter<T>
implements FileWriter<PositionDelete<T>, DeleteWriteResult> {

private final FileWriter<PositionDelete<T>, DeleteWriteResult> writer;
private final Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers;
private final DeleteGranularity granularity;
private final CharSequenceMap<Roaring64Bitmap> positionsByPath;
private DeleteWriteResult result = null;

public SortingPositionOnlyDeleteWriter(FileWriter<PositionDelete<T>, DeleteWriteResult> writer) {
this.writer = writer;
this(() -> writer, DeleteGranularity.PARTITION);
}

public SortingPositionOnlyDeleteWriter(
Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers,
DeleteGranularity granularity) {
this.writers = writers;
this.granularity = granularity;
this.positionsByPath = CharSequenceMap.create();
}

@@ -60,7 +73,7 @@ public void write(PositionDelete<T> positionDelete) {

@Override
public long length() {
return writer.length();
throw new UnsupportedOperationException(getClass().getName() + " does not implement length");
}

@Override
@@ -71,14 +84,44 @@ public DeleteWriteResult result() {
@Override
public void close() throws IOException {
if (result == null) {
this.result = writeDeletes();
switch (granularity) {
case FILE:
this.result = writeFileDeletes();
return;
case PARTITION:
this.result = writePartitionDeletes();
return;
default:
throw new UnsupportedOperationException("Unsupported delete granularity: " + granularity);
}
}
}

// write deletes for all data files together
private DeleteWriteResult writePartitionDeletes() throws IOException {
return writeDeletes(positionsByPath.keySet());
}

// write deletes for different data files into distinct delete files
private DeleteWriteResult writeFileDeletes() throws IOException {
List<DeleteFile> deleteFiles = Lists.newArrayList();
CharSequenceSet referencedDataFiles = CharSequenceSet.empty();

for (CharSequence path : positionsByPath.keySet()) {
DeleteWriteResult writeResult = writeDeletes(ImmutableList.of(path));
deleteFiles.addAll(writeResult.deleteFiles());
referencedDataFiles.addAll(writeResult.referencedDataFiles());
}

return new DeleteWriteResult(deleteFiles, referencedDataFiles);
}

private DeleteWriteResult writeDeletes() throws IOException {
private DeleteWriteResult writeDeletes(Collection<CharSequence> paths) throws IOException {
FileWriter<PositionDelete<T>, DeleteWriteResult> writer = writers.get();

try {
PositionDelete<T> positionDelete = PositionDelete.create();
for (CharSequence path : sortedPaths()) {
for (CharSequence path : sort(paths)) {
// the iterator provides values in ascending sorted order
PeekableLongIterator positions = positionsByPath.get(path).getLongIterator();
while (positions.hasNext()) {
@@ -93,9 +136,13 @@ private DeleteWriteResult writeDeletes() throws IOException {
return writer.result();
}

private List<CharSequence> sortedPaths() {
List<CharSequence> paths = Lists.newArrayList(positionsByPath.keySet());
paths.sort(Comparators.charSequences());
return paths;
private Collection<CharSequence> sort(Collection<CharSequence> paths) {
if (paths.size() <= 1) {
return paths;
} else {
List<CharSequence> sortedPaths = Lists.newArrayList(paths);
sortedPaths.sort(Comparators.charSequences());
return sortedPaths;
}
}
}
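For comparison, a hedged sketch of the sorting writer with the new file granularity, again assuming a newDeleteFileWriter() factory and a Record row type: positions are buffered per data file in bitmaps, so deletes may arrive in any order, and close() emits either one delete file in total (PARTITION) or one per referenced data file (FILE):

SortingPositionOnlyDeleteWriter<Record> writer =
    new SortingPositionOnlyDeleteWriter<>(this::newDeleteFileWriter, DeleteGranularity.FILE);

PositionDelete<Record> delete = PositionDelete.create();
writer.write(delete.set("data/file-b.parquet", 11L, null)); // out-of-order input is fine here
writer.write(delete.set("data/file-a.parquet", 2L, null));
writer.close(); // FILE granularity: a separate delete file per referenced data file

DeleteWriteResult result = writer.result();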
