-
Notifications
You must be signed in to change notification settings - Fork 175
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FEAT] Iceberg MOR for streaming parquet #2975
Conversation
CodSpeed Performance ReportMerging #2975 will not alter performanceComparing Summary
|
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2975 +/- ##
==========================================
- Coverage 78.49% 78.46% -0.04%
==========================================
Files 610 610
Lines 71865 71992 +127
==========================================
+ Hits 56413 56485 +72
- Misses 15452 15507 +55
|
@@ -363,7 +363,6 @@ async fn read_parquet_single( | |||
async fn stream_parquet_single( | |||
uri: String, | |||
columns: Option<&[&str]>, | |||
start_offset: Option<usize>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
start_offset is never used for swordfish, which does not rely on unloaded micropartitions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great! Clean refactor. I do have a few questions though:
- It looks like this implementation reads each delete file once per relevant scan task. This means if a file was split into multiple scan tasks, or if a delete file has info about multiple files, it would be read more than once. If doing so would degrade the actual streaming parquet read, don't worry about it, but is it possible to cut down the redundant reads?
- Are there any tests we can re-enable now that this is implemented?
for i in 0..table.len() { | ||
let file = file_paths.get(i); | ||
let pos = positions.get(i); | ||
if let Some(file) = file | ||
&& let Some(pos) = pos | ||
&& file == url | ||
{ | ||
delete_rows.push(pos); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Small nit: you can convert file_paths
and positions
into iterators and zip together, might be cleaner than this for loop since you don't use i
for anything other than indexing.
Swordfish doesn't do any splitting or merging of scan tasks, see: https://github.com/Eventual-Inc/Daft/blob/main/src/daft-physical-plan/src/translate.rs#L8-L30. Reason being that swordfish is parallelized based on morsels instead of partitions, and so there's no need to balance out load across the scan tasks. I think it's possible to read all the delete files across scan tasks prior to executing the scan task, will give it a try. Regarding tests, yes, the whole iceberg integration test suite is enabled now. https://github.com/Eventual-Inc/Daft/pull/2975/files#diff-ee49282f461b4c8ad179f79dd5bcdf93124561074c64a771366caf93e99b9320R471 I just add the native executor flag to the github workflow |
Ok, managed to do the delete file reading prior to scan task execution! |
pub(crate) type TaskSet<T> = tokio::task::JoinSet<T>; | ||
pub(crate) fn create_task_set<T>() -> TaskSet<T> { | ||
tokio::task::JoinSet::new() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why wrap tokio::task::JoinSet
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanna keep the operators agnostic from the implementation details, to reduce complexity.
.flatten() | ||
.cloned() | ||
}) | ||
.collect::<Vec<_>>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there may be delete files that are shared between scan tasks. Perhaps you should collect into a HashSet instead
This PR implements iceberg MOR for streaming parquet reads.
Notes:
read_parquet_bulk
function to read them. However the native executor is already running in a Tokio runtime, so it needs to runread_parquet_bulk
in the IO pool. This PR refactors theread_parquet_bulk
method such that it can be ran in this manner.