Iceberg delete files are read multiple times during query processing, causing delays #6527
Conversation
```java
@Override
@SuppressWarnings("CollectionUndefinedEquality")
protected boolean shouldKeep(T posDelete) {
  return dataLocation.contains(FILENAME_ACCESSOR.get(posDelete));
}
```
Why does this remove the optimization above?
restored
```java
 *
 * @return an {@link ExecutorService} that uses delete worker pool
 */
public static ExecutorService getDeleteWorkerPool() {
```
Can you remove the changes that overlap with #6432?
removed; however, #6432 looks abandoned
```java
  roaring64Bitmap.or(((BitmapPositionDeleteIndex) deleteIndex).roaring64Bitmap);
  return this;
}
throw new IllegalArgumentException();
```
This should not throw an exception with no context. Please use Preconditions and produce a helpful error message.
fixed
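A minimal sketch of the suggested fix, assuming Guava's Preconditions as used throughout Iceberg (the method shape and message wording are illustrative, not the PR's final code):

```java
import com.google.common.base.Preconditions;

// Illustrative merge method: fail fast with a descriptive message instead of a
// bare IllegalArgumentException when the other index is not bitmap-backed.
public PositionDeleteIndex merge(PositionDeleteIndex deleteIndex) {
  Preconditions.checkArgument(
      deleteIndex instanceof BitmapPositionDeleteIndex,
      "Cannot merge delete index: expected BitmapPositionDeleteIndex, got %s",
      deleteIndex.getClass().getName());

  roaring64Bitmap.or(((BitmapPositionDeleteIndex) deleteIndex).roaring64Bitmap);
  return this;
}
```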
```java
deleteFiles,
deletes ->
    CloseableIterable.transform(
        locationFilter.filter(deletes),
```
Changing the filter has removed the need for having one in the first place. Instead, I think this should use CloseableIterable.filter directly.
fixed
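A minimal sketch of the suggested shape, assuming dataLocation is the set of data file paths for the task, as in the snippets above (illustrative, not the merged code):

```java
// Filter position deletes for the current data files inline, without a separate
// filter object; CloseableIterable.filter keeps the result closeable.
CloseableIterable<Record> filtered =
    CloseableIterable.filter(
        deletes, delete -> dataLocation.contains(FILENAME_ACCESSOR.get(delete)));
```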
```java
CloseableIterable.transform(
    locationFilter.filter(deletes),
    row ->
        Pair.of(
```
There's no need to convert to Pair only to consume those pairs in the same function. Just use the accessors below.
fixed
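A minimal sketch of the suggestion, assuming the FILENAME_ACCESSOR and POSITION_ACCESSOR seen in the surrounding code (illustrative only):

```java
// Read the path and position through the accessors at the point of use,
// instead of materializing an intermediate Pair per row.
CharSequence path = (CharSequence) FILENAME_ACCESSOR.get(row);
long position = (Long) POSITION_ACCESSOR.get(row);

positionDeleteIndex
    .computeIfAbsent(path, p -> new BitmapPositionDeleteIndex())
    .delete(position);
```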
```java
deletes.forEach(
    entry ->
        positionDeleteIndex
            .computeIfAbsent(entry.first(), f -> new BitmapPositionDeleteIndex())
```
Instead of using computeIfAbsent on every row, this should pre-populate the map using dataLocations, since those are all known.
I don't see any issue here, since nothing extra is done when the key already exists:

```java
default V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
    Objects.requireNonNull(mappingFunction);
    V v;
    if ((v = get(key)) == null) {
        V newValue;
        if ((newValue = mappingFunction.apply(key)) != null) {
            put(key, newValue);
            return newValue;
        }
    }
    return v;
}
```
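For comparison, a minimal sketch of the reviewer's pre-population idea, assuming dataLocations is the known set of data file paths (names are illustrative):

```java
// Pre-populate the index map once, so the per-row loop is a plain get().
Map<CharSequence, PositionDeleteIndex> positionDeleteIndex =
    Maps.newHashMapWithExpectedSize(dataLocations.size());
for (CharSequence location : dataLocations) {
  positionDeleteIndex.put(location, new BitmapPositionDeleteIndex());
}

deletes.forEach(
    entry -> positionDeleteIndex.get(entry.first()).delete(entry.second()));
```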
```diff
@@ -266,13 +291,23 @@ private CloseableIterable<T> createDeleteIterable(
        : Deletes.filterDeleted(records, isDeleted, counter);
  }

  static CloseableIterable<Record> openPosDeletes(FileIO io, DeleteFile file) {
    InputFile input = io.newInputFile(file.path().toString());
```
This should use getInputFile instead of calling io.newInputFile. In Spark and Flink, the input files are already created in a bulk operation.
fixed
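A minimal sketch of the suggested change, assuming getInputFile is the bulk-aware lookup the reviewer refers to (e.g., the abstract lookup on the delete filter); openDeletes stands in for the existing reader logic and is hypothetical:

```java
// Resolve the delete file via the filter's lookup, which Spark and Flink back
// with input files created in a single bulk operation, instead of creating a
// fresh InputFile per delete file.
CloseableIterable<Record> openPosDeletes(DeleteFile file) {
  InputFile input = getInputFile(file.path().toString());
  return openDeletes(input, file); // openDeletes is a hypothetical helper here
}
```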
```java
List<DeleteFile> posDeletes = distinctPosDeletes(fileTasks);
if (posDeletes.isEmpty()) {
  return ImmutableMap.of();
}
```
Please follow style guidelines. Control flow blocks should be separated from the following statement by a line of whitespace.
fixed
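For reference, a minimal illustration of the style rule, using the snippet above (the follow-up statement is hypothetical):

```java
List<DeleteFile> posDeletes = distinctPosDeletes(fileTasks);
if (posDeletes.isEmpty()) {
  return ImmutableMap.of();
}

// A blank line separates the control-flow block from the next statement.
return readPosDeleteIndexes(posDeletes); // hypothetical follow-up call
```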
spark/v3.3/build.gradle (outdated)
```diff
@@ -62,6 +62,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
  implementation("org.scala-lang.modules:scala-collection-compat_${scalaVersion}")

  compileOnly "com.google.errorprone:error_prone_annotations"
  compileOnly "com.github.ben-manes.caffeine:caffeine"
```
These changes look incorrect. Why is this new compile dependency needed when there is no code change?
There is a change in GenericReader that could reuse positional-deletes info between tasks from the same split; see https://github.com/apache/iceberg/pull/6527/files#diff-98d1b57871903c422d33d86cc7781f33b844cef31c58938218d8fcc439b12131R76-R80
```java
    return posIndexMap;
  }

  return null;
```
Looks like a correctness bug. This can't ignore deletes if there are too many.
It's not. There is special handling logic in the PositionalDeletes class:

```java
Optional<PositionDeleteIndex> positionIndex =
    Optional.ofNullable(positionIndexMap).map(cache -> cache.get(filePath));
boolean skipPosDeletes = positionIndexMap != null && !positionIndex.isPresent();
```

I tried returning an empty Optional when we can't fit the delete ids in memory; however, I think that just complicated the code.
@rdblue, thank you for the review! I've addressed most of the comments and provided answers for the others.
@rdblue, gentle reminder. Please take a look once you get a chance.
@aokolnychyi, could you please help with the review? Thanks!
I believe @bryanck also ran into this; he might be interested in reviewing this as well.
ref: https://issues.apache.org/jira/browse/HIVE-26714
The current optimization covers only positional deletes: it creates a PositionDeleteIndex bitmap for every task in a combined TableScan, avoiding multiple reads of the delete files per task.
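A minimal sketch of the bitmap-per-data-file idea described above, assuming a Roaring64Bitmap-backed index keyed by data file path (class and method names are illustrative, not Iceberg's actual API):

```java
import java.util.HashMap;
import java.util.Map;
import org.roaringbitmap.longlong.Roaring64Bitmap;

// One bitmap of deleted row positions per data file: delete files are read once
// up front, then each task probes the in-memory index instead of re-reading them.
class PositionDeleteIndexSketch {
  private final Map<String, Roaring64Bitmap> indexByFile = new HashMap<>();

  // Record that row `pos` of data file `path` is deleted.
  void markDeleted(String path, long pos) {
    indexByFile.computeIfAbsent(path, p -> new Roaring64Bitmap()).addLong(pos);
  }

  // During the scan, check whether a row position was deleted.
  boolean isDeleted(String path, long pos) {
    Roaring64Bitmap bitmap = indexByFile.get(path);
    return bitmap != null && bitmap.contains(pos);
  }
}
```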