
Iceberg delete files are read multiple times during query processing causing delays #6527

Closed · wants to merge 2 commits

Conversation

@deniskuzZ (Member) commented Jan 5, 2023:

ref https://issues.apache.org/jira/browse/HIVE-26714

The current optimization covers only positional deletes: a PositionDeleteIndex bitmap is built and shared by the tasks in a combined TableScan, so the same delete files are not re-read for each task.

@deniskuzZ force-pushed the master branch 9 times, most recently from 3fb3746 to b576956, on January 12, 2023 08:42
@deniskuzZ changed the title from "DRAFT: Iceberg delete files are read twice during query processing causing delays" to "Iceberg delete files are read twice during query processing causing delays" on Jan 12, 2023
@deniskuzZ force-pushed the master branch 2 times, most recently from 17509c5 to 8962bc9, on January 19, 2023 11:41
@github-actions bot added the build label on Jan 19, 2023
@github-actions bot added the spark label on Jan 25, 2023
@github-actions bot added the MR label on Feb 7, 2023
@deniskuzZ force-pushed the master branch 3 times, most recently from 9c4dc58 to dc7af64, on February 8, 2023 08:26
@deniskuzZ changed the title from "Iceberg delete files are read twice during query processing causing delays" to "Iceberg delete files are read multiple times during query processing causing delays" on Apr 24, 2023
@Override
@SuppressWarnings("CollectionUndefinedEquality")
protected boolean shouldKeep(T posDelete) {
  return dataLocation.contains(FILENAME_ACCESSOR.get(posDelete));
}
Contributor:

Why does this remove the optimization above?

Member Author (@deniskuzZ):

restored

*
* @return an {@link ExecutorService} that uses the delete worker pool
*/
public static ExecutorService getDeleteWorkerPool() {
Contributor:

Can you remove the changes that overlap with #6432?

Member Author (@deniskuzZ), May 22, 2023:

removed; however, #6432 looks abandoned

  roaring64Bitmap.or(((BitmapPositionDeleteIndex) deleteIndex).roaring64Bitmap);
  return this;
}
throw new IllegalArgumentException();
Contributor:

This should not throw an exception with no context. Please use Preconditions and produce a helpful error message.

Member Author (@deniskuzZ):

fixed

deleteFiles,
deletes ->
    CloseableIterable.transform(
        locationFilter.filter(deletes),
Contributor:

Changing the filter has removed the need for having one in the first place. Instead, I think this should use CloseableIterable.filter directly.

Member Author (@deniskuzZ):

fixed

CloseableIterable.transform(
    locationFilter.filter(deletes),
    row ->
        Pair.of(
Contributor:

There's no need to convert to Pair only to consume those pairs in the same function. Just use the accessors below.

Member Author (@deniskuzZ):

fixed

deletes.forEach(
    entry ->
        positionDeleteIndex
            .computeIfAbsent(entry.first(), f -> new BitmapPositionDeleteIndex())
Contributor:

Instead of using computeIfAbsent on every row, this should pre-populate the map using dataLocations, since those are all known.

Member Author (@deniskuzZ):

I don't see any issue here, as nothing extra is done when the key exists; this is the JDK's default Map#computeIfAbsent implementation:

default V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
    Objects.requireNonNull(mappingFunction);
    V v;
    if ((v = get(key)) == null) {
        V newValue;
        if ((newValue = mappingFunction.apply(key)) != null) {
            put(key, newValue);
            return newValue;
        }
    }

    return v;
}

@@ -266,13 +291,23 @@ private CloseableIterable<T> createDeleteIterable(
: Deletes.filterDeleted(records, isDeleted, counter);
}

static CloseableIterable<Record> openPosDeletes(FileIO io, DeleteFile file) {
  InputFile input = io.newInputFile(file.path().toString());
Contributor:

This should use getInputFile instead of calling io.newInputFile. In Spark and Flink, the input files are already created in a bulk operation.

Member Author (@deniskuzZ):

fixed

List<DeleteFile> posDeletes = distinctPosDeletes(fileTasks);
if (posDeletes.isEmpty()) {
  return ImmutableMap.of();
}
Contributor:

Please follow style guidelines. Control flow blocks should be separated from the following statement by a line of whitespace.

Member Author (@deniskuzZ):

fixed

@@ -62,6 +62,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
implementation("org.scala-lang.modules:scala-collection-compat_${scalaVersion}")

compileOnly "com.google.errorprone:error_prone_annotations"
compileOnly "com.github.ben-manes.caffeine:caffeine"
Contributor:

These changes look incorrect. Why is this new compile dependency needed when there is no code change?

Member Author (@deniskuzZ), May 22, 2023:

There is a change in GenericReader that could reuse the positional-delete info between tasks from the same split; see https://github.com/apache/iceberg/pull/6527/files#diff-98d1b57871903c422d33d86cc7781f33b844cef31c58938218d8fcc439b12131R76-R80


return posIndexMap;
}
return null;
Contributor:

Looks like a correctness bug. This can't ignore deletes if there are too many.

Member Author (@deniskuzZ), May 22, 2023:

It's not; there is special handling logic in the PositionalDeletes class:

    Optional<PositionDeleteIndex> positionIndex =
        Optional.ofNullable(positionIndexMap).map(cache -> cache.get(filePath));

    boolean skipPosDeletes = positionIndexMap != null && !positionIndex.isPresent();

I tried returning just an empty Optional when we can't load the delete ids into memory; however, I think that just complicated the code.

@deniskuzZ (Member Author):

@rdblue, thank you for the review! I've addressed most of the comments and provided answers for the others.

@deniskuzZ (Member Author):

@rdblue, gentle reminder. Please take a look once you get a chance.

@deniskuzZ (Member Author):

@aokolnychyi, could you please help with the review? thanks!

@Fokko (Contributor) commented Oct 30, 2023:

I believe @bryanck also ran into this; he might be interested in reviewing this as well.

@deniskuzZ closed this on Aug 19, 2024