[AMORO-2870] Rewrite the task which contains different eq-delete fields (#3175)

* rewrite multiple delete ids

* update comments

* replace shade guava with google

* replace `computeIfAbsent` with `putIfAbsent` to avoid projecting the schema every time (see the sketch below)
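
On the last bullet, a minimal, hedged sketch of the caching trade-off, assuming a tableSchema variable and Iceberg's TypeUtil.select rather than any Amoro helper: Map.putIfAbsent evaluates its value argument eagerly, so the projection would run on every call, while Map.computeIfAbsent defers the mapping function until the key is missing, which is the pattern the committed code below relies on.

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.TypeUtil;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class SchemaCacheSketch {
  private final Map<Set<Integer>, Schema> deleteSchemaByDeleteIds = new HashMap<>();

  // Eager: TypeUtil.select(...) is evaluated on every call, even for cached keys.
  Schema putIfAbsentVariant(Schema tableSchema, Set<Integer> eqFieldIds) {
    deleteSchemaByDeleteIds.putIfAbsent(eqFieldIds, TypeUtil.select(tableSchema, eqFieldIds));
    return deleteSchemaByDeleteIds.get(eqFieldIds);
  }

  // Lazy: the projection runs only the first time a given field-id set is seen.
  Schema computeIfAbsentVariant(Schema tableSchema, Set<Integer> eqFieldIds) {
    return deleteSchemaByDeleteIds.computeIfAbsent(
        eqFieldIds, ids -> TypeUtil.select(tableSchema, ids));
  }
}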
XBaith authored Oct 18, 2024
1 parent d97597f commit 6982a32
Showing 3 changed files with 350 additions and 71 deletions.
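
At a high level, the patch stops rejecting rewrite inputs whose equality-delete files carry different equality field ids: each id set is tracked separately, and the union of all ids drives the required read schema. A toy, self-contained illustration of that union (the ids are invented):

import java.util.HashSet;
import java.util.Set;

class DeleteIdUnionSketch {
  public static void main(String[] args) {
    Set<Integer> deleteIds = new HashSet<>();
    deleteIds.addAll(Set.of(1, 2)); // equality field ids of the first eq-delete file
    deleteIds.addAll(Set.of(1));    // a second file with a different id set
    // Prints [1, 2]: the union is what selects the required read schema in the diff below.
    System.out.println(deleteIds);
  }
}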
CloseablePredicate.java
@@ -20,17 +20,19 @@

import java.io.Closeable;
import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.List;
import java.util.function.Predicate;

public class CloseablePredicate<T> implements Predicate<T>, Closeable {

private final Predicate<T> predicate;

- private final Closeable closeable;
+ private final List<Closeable> closeable;

- public CloseablePredicate(Predicate<T> predicate, Closeable closeable) {
+ public CloseablePredicate(Predicate<T> predicate, Closeable... closeable) {
this.predicate = predicate;
- this.closeable = closeable;
+ this.closeable = Arrays.asList(closeable);
}

@Override
@@ -40,6 +42,8 @@ public boolean test(T t) {

@Override
public void close() throws IOException {
- closeable.close();
+ for (Closeable closeable : closeable) {
+   closeable.close();
+ }
}
}
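
A hedged usage sketch of the new varargs constructor shown above (the resources and the calling class are invented, and CloseablePredicate is assumed to be importable): the wrapper still behaves like an ordinary Predicate, and close() now releases every wrapped resource, which lets the caller hand over one Closeable per equality-field-id group.

import java.io.Closeable;
import java.io.IOException;
import java.util.function.Predicate;

class CloseablePredicateUsageSketch {
  static void example(Predicate<String> isDeleted,
                      Closeable structLikeMapA,
                      Closeable structLikeMapB) throws IOException {
    // One predicate, any number of backing resources to release later.
    CloseablePredicate<String> predicate =
        new CloseablePredicate<>(isDeleted, structLikeMapA, structLikeMapB);
    try {
      boolean deleted = predicate.test("some-key");
      System.out.println(deleted);
    } finally {
      predicate.close(); // closes structLikeMapA and structLikeMapB in order
    }
  }
}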
CombinedDeleteFilter.java
@@ -23,9 +23,12 @@
import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
- import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableSet;
import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+ import org.apache.amoro.shade.guava32.com.google.common.collect.Multimap;
+ import org.apache.amoro.shade.guava32.com.google.common.collect.Multimaps;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import org.apache.amoro.shade.guava32.com.google.common.hash.BloomFilter;
import org.apache.amoro.utils.ContentFiles;
import org.apache.amoro.utils.map.StructLikeBaseMap;
@@ -51,10 +54,12 @@
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.Filter;
+ import org.apache.iceberg.util.StructProjection;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+ import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
@@ -90,17 +95,23 @@ public abstract class CombinedDeleteFilter<T extends StructLike> {

private final RewriteFilesInput input;
private final List<DeleteFile> posDeletes;
- private final List<DeleteFile> eqDeletes;
+ // There may be multiple sets of equality delete fields within a rewrite input
+ private final Multimap<Set<Integer>, DeleteFile> eqDeleteFilesByDeleteIds =
+     Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);

private Map<String, Roaring64Bitmap> positionMap;

private final Set<String> positionPathSets;

- private Set<Integer> deleteIds = new HashSet<>();
+ // The delete ids are the union of all equality field ids
+ private final Set<Integer> deleteIds = new HashSet<>();

private CloseablePredicate<StructForDelete<T>> eqPredicate;

- private final Schema deleteSchema;
+ // Includes all identifier fields of the equality delete files
+ private final Schema requiredSchema;

+ private final Map<Set<Integer>, Schema> deleteSchemaByDeleteIds = new HashMap<>();

private StructLikeCollections structLikeCollections = StructLikeCollections.DEFAULT;
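
A minimal standalone sketch of the grouping idea behind the new eqDeleteFilesByDeleteIds field above (plain Guava is used instead of Amoro's shaded guava32 package, and FakeDeleteFile is a made-up stand-in for Iceberg's DeleteFile): delete files are keyed by their equality-field-id set, so files that disagree on ids simply land in different groups instead of failing validation as before.

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

class EqDeleteGroupingSketch {
  // Stand-in for org.apache.iceberg.DeleteFile; only the equality field ids matter here.
  static class FakeDeleteFile {
    final Set<Integer> equalityFieldIds;

    FakeDeleteFile(Set<Integer> equalityFieldIds) {
      this.equalityFieldIds = equalityFieldIds;
    }
  }

  static Multimap<Set<Integer>, FakeDeleteFile> groupByFieldIds(List<FakeDeleteFile> eqDeletes) {
    Multimap<Set<Integer>, FakeDeleteFile> byIds = ArrayListMultimap.create();
    for (FakeDeleteFile file : eqDeletes) {
      // Files that share an equality-field-id set land in the same bucket,
      // e.g. {1, 2} and {1} form two separate buckets.
      byIds.put(new HashSet<>(file.equalityFieldIds), file);
    }
    return byIds;
  }
}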

@@ -115,34 +126,19 @@ protected CombinedDeleteFilter(
this.dataRecordCnt =
Arrays.stream(rewriteFilesInput.dataFiles()).mapToLong(ContentFile::recordCount).sum();
ImmutableList.Builder<DeleteFile> posDeleteBuilder = ImmutableList.builder();
- ImmutableList.Builder<DeleteFile> eqDeleteBuilder = ImmutableList.builder();
if (rewriteFilesInput.deleteFiles() != null) {
- String firstDeleteFilePath = null;
for (ContentFile<?> delete : rewriteFilesInput.deleteFiles()) {
switch (delete.content()) {
case POSITION_DELETES:
posDeleteBuilder.add(ContentFiles.asDeleteFile(delete));
break;
case EQUALITY_DELETES:
- if (deleteIds.isEmpty()) {
-   deleteIds = ImmutableSet.copyOf(ContentFiles.asDeleteFile(delete).equalityFieldIds());
-   firstDeleteFilePath = delete.path().toString();
- } else {
-   Set<Integer> currentDeleteIds =
-       ImmutableSet.copyOf(ContentFiles.asDeleteFile(delete).equalityFieldIds());
-   if (!deleteIds.equals(currentDeleteIds)) {
-     throw new IllegalArgumentException(
-         String.format(
-             "Equality delete files have different delete fields, first equality field ids:[%s],"
-                 + " current equality field ids:[%s], first delete file path:[%s], "
-                 + " current delete file path: [%s].",
-             deleteIds,
-             currentDeleteIds,
-             firstDeleteFilePath,
-             delete.path().toString()));
-   }
- }
- eqDeleteBuilder.add(ContentFiles.asDeleteFile(delete));
+ DeleteFile deleteFile = ContentFiles.asDeleteFile(delete);
+ Set<Integer> eqFieldIds = Sets.newHashSet(delete.equalityFieldIds());
+ deleteIds.addAll(eqFieldIds);
+ eqDeleteFilesByDeleteIds.put(eqFieldIds, deleteFile);
+ deleteSchemaByDeleteIds.computeIfAbsent(
+     eqFieldIds, ids -> TypeUtil.select(tableSchema, ids));
break;
default:
throw new UnsupportedOperationException(
@@ -155,8 +151,7 @@ protected CombinedDeleteFilter(
.map(s -> s.path().toString())
.collect(Collectors.toSet());
this.posDeletes = posDeleteBuilder.build();
- this.eqDeletes = eqDeleteBuilder.build();
- this.deleteSchema = TypeUtil.select(tableSchema, deleteIds);
+ this.requiredSchema = TypeUtil.select(tableSchema, deleteIds);

if (structLikeCollections != null) {
this.structLikeCollections = structLikeCollections;
@@ -186,12 +181,20 @@ public boolean isFilterEqDelete()

protected abstract AuthenticatedFileIO getFileIO();

/**
* Get all delete ids of the equality delete files.
*
* <p>For example, if there are two equality field id sets, one is [1, 2] and the other is [1],
* the delete ids will be [1, 2].
*
* @return delete ids
*/
public Set<Integer> deleteIds() {
return deleteIds;
}

public boolean hasPosition() {
- return posDeletes != null && posDeletes.size() > 0;
+ return posDeletes != null && !posDeletes.isEmpty();
}

public void close() {
@@ -232,35 +235,85 @@ private Predicate<StructForDelete<T>> applyEqDeletes() {
return eqPredicate;
}

- if (eqDeletes.isEmpty()) {
+ if (eqDeleteFilesByDeleteIds.isEmpty()) {
return record -> false;
}

- InternalRecordWrapper internalRecordWrapper =
-     new InternalRecordWrapper(deleteSchema.asStruct());
+ List<Predicate<StructForDelete<T>>> isInDeleteSets = Lists.newArrayList();
+ List<Closeable> structMapCloseable = Lists.newArrayList();
+ BloomFilter<StructLike> bloomFilter = initializeBloomFilter();
+ for (Map.Entry<Set<Integer>, Schema> deleteSchemaEntry : deleteSchemaByDeleteIds.entrySet()) {
+   Predicate<StructForDelete<T>> predicate =
+       applyEqDeletesForSchema(deleteSchemaEntry, bloomFilter, structMapCloseable);
+   isInDeleteSets.add(predicate);
+ }

- BloomFilter<StructLike> bloomFilter = null;
- if (filterEqDelete) {
-   LOG.debug(
-       "Enable bloom-filter to filter eq-delete, (rewrite + rewrite pos) data count is {}",
-       dataRecordCnt);
-   // one million data is about 1.71M memory usage
-   bloomFilter = BloomFilter.create(StructLikeFunnel.INSTANCE, dataRecordCnt, 0.001);
-   try (CloseableIterable<Record> deletes =
-       CloseableIterable.concat(
-           CloseableIterable.transform(
-               CloseableIterable.withNoopClose(
-                   Arrays.stream(input.dataFiles()).collect(Collectors.toList())),
-               s -> openFile(s, deleteSchema)))) {
-     for (Record record : deletes) {
-       StructLike identifier = internalRecordWrapper.copyFor(record);
-       bloomFilter.put(identifier);
-     }
-   } catch (IOException e) {
-     throw new RuntimeException(e);
+ Predicate<StructForDelete<T>> isInDelete =
+     isInDeleteSets.stream().reduce(Predicate::or).orElse(record -> false);
+ this.eqPredicate =
+     new CloseablePredicate<>(isInDelete, structMapCloseable.toArray(new Closeable[0]));
+ return isInDelete;
}

private BloomFilter<StructLike> initializeBloomFilter() {
if (!filterEqDelete) {
return null;
}

LOG.debug(
"Enable bloom-filter to filter eq-delete, (rewrite + rewrite pos) data count is {}",
dataRecordCnt);
// one million records use about 1.71 MB of memory
BloomFilter<StructLike> bloomFilter =
BloomFilter.create(StructLikeFunnel.INSTANCE, dataRecordCnt, 0.001);

Map<Set<Integer>, InternalRecordWrapper> recordWrappers = Maps.newHashMap();
for (Map.Entry<Set<Integer>, Schema> deleteSchemaEntry : deleteSchemaByDeleteIds.entrySet()) {
Set<Integer> ids = deleteSchemaEntry.getKey();
Schema deleteSchema = deleteSchemaEntry.getValue();

InternalRecordWrapper internalRecordWrapper =
new InternalRecordWrapper(deleteSchema.asStruct());
recordWrappers.put(ids, internalRecordWrapper);
}

try (CloseableIterable<Record> deletes = readRecords()) {
for (Record record : deletes) {
recordWrappers.forEach(
(ids, internalRecordWrapper) -> {
Schema deleteSchema = deleteSchemaByDeleteIds.get(ids);
StructProjection projection =
StructProjection.create(requiredSchema, deleteSchema).wrap(record);
StructLike deletePK = internalRecordWrapper.copyFor(projection);
bloomFilter.put(deletePK);
});
}
} catch (IOException e) {
throw new RuntimeException(e);
}

return bloomFilter;
}

private CloseableIterable<Record> readRecords() {
return CloseableIterable.concat(
CloseableIterable.transform(
CloseableIterable.withNoopClose(
Arrays.stream(input.dataFiles()).collect(Collectors.toList())),
s -> openFile(s, requiredSchema)));
}

private Predicate<StructForDelete<T>> applyEqDeletesForSchema(
Map.Entry<Set<Integer>, Schema> deleteSchemaEntry,
BloomFilter<StructLike> bloomFilter,
List<Closeable> structMapCloseable) {
Set<Integer> ids = deleteSchemaEntry.getKey();
Schema deleteSchema = deleteSchemaEntry.getValue();
Iterable<DeleteFile> eqDeletes = eqDeleteFilesByDeleteIds.get(ids);

InternalRecordWrapper internalRecordWrapper =
new InternalRecordWrapper(deleteSchema.asStruct());

CloseableIterable<RecordWithLsn> deleteRecords =
CloseableIterable.transform(
CloseableIterable.concat(
@@ -294,23 +347,20 @@ private Predicate<StructForDelete<T>> applyEqDeletes() {
} catch (IOException e) {
throw new RuntimeException(e);
}
+ structMapCloseable.add(structLikeMap);

+ return structForDelete -> {
+   StructProjection deleteProjection =
+       StructProjection.create(requiredSchema, deleteSchema).wrap(structForDelete.getPk());
+   StructLike dataPk = internalRecordWrapper.copyFor(deleteProjection);
+   Long dataLSN = structForDelete.getLsn();
+   Long deleteLsn = structLikeMap.get(dataPk);
+   if (deleteLsn == null) {
+     return false;
+   }

- Predicate<StructForDelete<T>> isInDeleteSet =
-     structForDelete -> {
-       StructLike dataPk = internalRecordWrapper.copyFor(structForDelete.getPk());
-       Long dataLSN = structForDelete.getLsn();
-       Long deleteLsn = structLikeMap.get(dataPk);
-       if (deleteLsn == null) {
-         return false;
-       }

-       return deleteLsn.compareTo(dataLSN) > 0;
-     };

- CloseablePredicate<StructForDelete<T>> closeablePredicate =
-     new CloseablePredicate<>(isInDeleteSet, structLikeMap);
- this.eqPredicate = closeablePredicate;
- return isInDeleteSet;
+   return deleteLsn.compareTo(dataLSN) > 0;
+ };
}

private CloseableIterable<StructForDelete<T>> applyEqDeletes(
@@ -322,7 +372,7 @@ private CloseableIterable<StructForDelete<T>> applyEqDeletes(
private CloseableIterable<StructForDelete<T>> eqDeletesBase(
CloseableIterable<StructForDelete<T>> records, Predicate<StructForDelete<T>> predicate) {
// Predicate to test whether a row should be visible to user after applying equality deletions.
- if (eqDeletes.isEmpty()) {
+ if (eqDeleteFilesByDeleteIds.isEmpty()) {
return records;
}
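
To wrap up the CombinedDeleteFilter changes, a small self-contained sketch of the predicate composition used in applyEqDeletes above (the String rows and group labels are invented): one predicate is built per equality-field-id group and the results are OR'd, so a row is dropped when any group deletes it, and an empty group map degenerates to record -> false.

import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class PredicateCompositionSketch {
  // Mirrors the reduce in applyEqDeletes: a row is treated as deleted when ANY
  // per-field-id-set predicate matches it; no groups means nothing is deleted.
  static <T> Predicate<T> combine(List<Predicate<T>> isInDeleteSets) {
    return isInDeleteSets.stream().reduce(Predicate::or).orElse(record -> false);
  }

  public static void main(String[] args) {
    Predicate<String> deletedById = "row-1"::equals;        // e.g. group keyed by {1}
    Predicate<String> deletedByIdAndName = "row-2"::equals; // e.g. group keyed by {1, 2}
    Predicate<String> isDeleted = combine(Arrays.asList(deletedById, deletedByIdAndName));
    System.out.println(isDeleted.test("row-1")); // true
    System.out.println(isDeleted.test("row-3")); // false
  }
}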

