[FEAT] Iceberg MOR for streaming parquet #2975

Merged: 7 commits, Oct 14, 2024

Conversation

colin-ho
Contributor

@colin-ho colin-ho commented Sep 30, 2024

This PR implements iceberg MOR for streaming parquet reads.

Notes:

  • Since reading Iceberg delete files is a bulk operation, the native executor uses the read_parquet_bulk function to read them. However, the native executor is already running in a Tokio runtime, so it needs to run read_parquet_bulk in the IO pool. This PR refactors the read_parquet_bulk method so that it can be run in this manner.
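
For readers unfamiliar with merge-on-read, the following is a minimal sketch of applying Iceberg positional deletes to a file that arrives in chunks. It is not the code in this PR: `apply_positional_deletes`, the `Vec<Vec<T>>` stand-in for streamed batches, and the `delete_positions` slice are all assumptions made for illustration. The point it shows is that delete positions are file-global row indices, so a streaming reader has to carry a running row offset across chunks.

```rust
use std::collections::HashSet;

/// Drops rows whose file-global position appears in `delete_positions`.
/// `chunks` stands in for the batches a streaming reader would yield, in file order.
pub fn apply_positional_deletes<T>(chunks: Vec<Vec<T>>, delete_positions: &[i64]) -> Vec<Vec<T>> {
    let deleted: HashSet<i64> = delete_positions.iter().copied().collect();
    // Number of rows seen in earlier chunks (hypothetical bookkeeping for this sketch).
    let mut row_offset: i64 = 0;
    chunks
        .into_iter()
        .map(|chunk| {
            let start = row_offset;
            row_offset += chunk.len() as i64;
            chunk
                .into_iter()
                .enumerate()
                // Translate the chunk-local index into a file-global position.
                .filter(|(i, _)| !deleted.contains(&(start + *i as i64)))
                .map(|(_, row)| row)
                .collect::<Vec<T>>()
        })
        .collect()
}
```

A real reader would operate on columnar tables rather than `Vec<T>`, but the offset bookkeeping would presumably look similar.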

@github-actions github-actions bot added the enhancement label Sep 30, 2024

codspeed-hq bot commented Sep 30, 2024

CodSpeed Performance Report

Merging #2975 will not alter performance

Comparing colin/swordfish-iceberg-mor (6208564) with main (c694c9e)

Summary

✅ 17 untouched benchmarks


codecov bot commented Sep 30, 2024

Codecov Report

Attention: Patch coverage is 69.54315% with 60 lines in your changes missing coverage. Please review.

Project coverage is 78.46%. Comparing base (c694c9e) to head (6208564).
Report is 1 commits behind head on main.

Files with missing lines                             Patch %   Lines
src/daft-local-execution/src/sources/scan_task.rs    44.30%    44 Missing ⚠️
src/daft-parquet/src/stream_reader.rs                36.36%    14 Missing ⚠️
src/daft-parquet/src/read.rs                         97.75%    2 Missing ⚠️
Additional details and impacted files

@@            Coverage Diff             @@
##             main    #2975      +/-   ##
==========================================
- Coverage   78.49%   78.46%   -0.04%     
==========================================
  Files         610      610              
  Lines       71865    71992     +127     
==========================================
+ Hits        56413    56485      +72     
- Misses      15452    15507      +55     
Files with missing lines                             Coverage Δ
src/daft-local-execution/src/lib.rs                  90.47% <100.00%> (+0.73%) ⬆️
src/daft-parquet/src/file.rs                         74.01% <100.00%> (+0.10%) ⬆️
src/daft-parquet/src/read.rs                         75.34% <97.75%> (+0.68%) ⬆️
src/daft-parquet/src/stream_reader.rs                88.27% <36.36%> (-2.35%) ⬇️
src/daft-local-execution/src/sources/scan_task.rs    71.94% <44.30%> (-10.01%) ⬇️

... and 1 file with indirect coverage changes

@@ -363,7 +363,6 @@ async fn read_parquet_single(
async fn stream_parquet_single(
uri: String,
columns: Option<&[&str]>,
start_offset: Option<usize>,
Contributor Author

start_offset is never used for swordfish, which does not rely on unloaded micropartitions.

@colin-ho colin-ho marked this pull request as ready for review October 8, 2024 23:37
Member

@kevinzwang kevinzwang left a comment

Looks great! Clean refactor. I do have a few questions though:

  • It looks like this implementation reads each delete file once per relevant scan task. This means if a file was split into multiple scan tasks, or if a delete file has info about multiple files, it would be read more than once. If doing so would degrade the actual streaming parquet read, don't worry about it, but is it possible to cut down the redundant reads?
  • Are there any tests we can re-enable now that this is implemented?

Comment on lines 197 to 206
for i in 0..table.len() {
    let file = file_paths.get(i);
    let pos = positions.get(i);
    if let Some(file) = file
        && let Some(pos) = pos
        && file == url
    {
        delete_rows.push(pos);
    }
}
Member

Small nit: you can convert file_paths and positions into iterators and zip together, might be cleaner than this for loop since you don't use i for anything other than indexing.
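
One way the suggested rewrite might look is sketched below. It is a standalone approximation, not the code that was merged: the slices of `Option` values stand in for the nullable `file_paths` and `positions` arrays, and the function name `matching_delete_positions` is hypothetical.

```rust
/// Collects the delete positions whose file path matches `url`.
/// `Option` models nullable entries, mirroring the `get(i)` calls in the loop above.
pub fn matching_delete_positions(
    file_paths: &[Option<&str>],
    positions: &[Option<i64>],
    url: &str,
) -> Vec<i64> {
    file_paths
        .iter()
        // Pair each path with its position; no index variable is needed.
        .zip(positions.iter())
        .filter_map(|(file, pos)| match (file, pos) {
            // Keep the position only when both values are present and the path matches.
            (Some(file), Some(pos)) if *file == url => Some(*pos),
            _ => None,
        })
        .collect()
}
```

Note that `zip` stops at the shorter of the two inputs, which matches the index loop only when both arrays have the same length.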

@colin-ho
Contributor Author

Looks great! Clean refactor. I do have a few questions though:

  • It looks like this implementation reads each delete file once per relevant scan task. This means if a file was split into multiple scan tasks, or if a delete file has info about multiple files, it would be read more than once. If doing so would degrade the actual streaming parquet read, don't worry about it, but is it possible to cut down the redundant reads?
  • Are there any tests we can re-enable now that this is implemented?

Swordfish doesn't do any splitting or merging of scan tasks, see: https://github.com/Eventual-Inc/Daft/blob/main/src/daft-physical-plan/src/translate.rs#L8-L30. The reason is that Swordfish parallelizes over morsels instead of partitions, so there's no need to balance load across the scan tasks.

I think it's possible to read all the delete files across scan tasks before executing any scan task; I'll give it a try.

Regarding tests, yes, the whole Iceberg integration test suite is enabled now: https://github.com/Eventual-Inc/Daft/pull/2975/files#diff-ee49282f461b4c8ad179f79dd5bcdf93124561074c64a771366caf93e99b9320R471. I just added the native executor flag to the GitHub workflow.

@colin-ho
Contributor Author

Ok, managed to do the delete file reading prior to scan task execution!
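
As a rough illustration of the idea (not the implementation in this PR), reading delete files once up front can be thought of as building a lookup from data file path to delete positions, which each scan task then consults. The `DeleteRecord` struct and `build_delete_map` function below are hypothetical names used only for this sketch.

```rust
use std::collections::HashMap;

/// One row of a positional delete file: the data file it targets and the row index to drop.
pub struct DeleteRecord {
    pub file_path: String,
    pub pos: i64,
}

/// Groups delete records by data file, so each scan task can do a map lookup
/// instead of re-reading the delete files.
pub fn build_delete_map(records: Vec<DeleteRecord>) -> HashMap<String, Vec<i64>> {
    let mut map: HashMap<String, Vec<i64>> = HashMap::new();
    for rec in records {
        map.entry(rec.file_path).or_default().push(rec.pos);
    }
    // Sort and dedup so the result is deterministic even if delete files overlap.
    for positions in map.values_mut() {
        positions.sort_unstable();
        positions.dedup();
    }
    map
}
```

With a map like this, a scan task for a given data file would only need `map.get(path)` to find the rows it should skip.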

@colin-ho colin-ho requested a review from kevinzwang October 12, 2024 01:32
Comment on lines +17 to +20
pub(crate) type TaskSet<T> = tokio::task::JoinSet<T>;
pub(crate) fn create_task_set<T>() -> TaskSet<T> {
    tokio::task::JoinSet::new()
}
Member

Why wrap tokio::task::JoinSet here?

Contributor Author

I want to keep the operators agnostic of the implementation details, to reduce complexity.

.flatten()
.cloned()
})
.collect::<Vec<_>>();
Member

I think there may be delete files that are shared between scan tasks. Perhaps you should collect into a HashSet instead
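
A minimal sketch of that suggestion follows, assuming each scan task exposes an optional list of delete-file paths. The `Option<Vec<String>>` shape and the name `unique_delete_files` are assumptions for illustration, not Daft's actual types.

```rust
use std::collections::HashSet;

/// Returns the distinct delete-file paths referenced by any scan task.
/// Each `Option<Vec<String>>` stands in for one task's optional delete-file list.
pub fn unique_delete_files(per_task: &[Option<Vec<String>>]) -> HashSet<String> {
    per_task
        .iter()
        .flatten() // skips tasks with no delete files; yields &Vec<String>
        .flatten() // yields each &String
        .cloned()
        .collect() // a HashSet drops paths shared between tasks
}
```

Collecting into a `HashSet` rather than a `Vec` means a delete file shared by several scan tasks is read only once, at the cost of losing the original ordering.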

@colin-ho colin-ho merged commit fe10f8c into main Oct 14, 2024
41 of 42 checks passed
@colin-ho colin-ho deleted the colin/swordfish-iceberg-mor branch October 14, 2024 19:51
Labels
enhancement New feature or request
2 participants