[FEAT] Iceberg MOR for streaming parquet #2975
Merged

Changes from 6 of 7 commits:

45b0383 working iceberg mor
a5ddcd6 clean up
f1d2b15 Merge branch main into colin/swordfish-iceberg-mor
33d6d31 process delete files at the start
252a330 Merge branch main into colin/swordfish-iceberg-mor
2f3cdb4 comment
6208564 collect into hashset
@@ -468,6 +468,10 @@ jobs:
       matrix:
         python-version: ['3.8'] # can't use 3.7 due to requiring anon mode for adlfs
         daft-runner: [py, ray]
+        enable-native-executor: [0, 1]
+        exclude:
+        - daft-runner: ray
+          enable-native-executor: 1
     steps:
     - uses: actions/checkout@v4
       with:
@@ -513,6 +517,7 @@ jobs:
           pytest tests/integration/iceberg -m 'integration' --durations=50
         env:
           DAFT_RUNNER: ${{ matrix.daft-runner }}
+          DAFT_ENABLE_NATIVE_EXECUTOR: ${{ matrix.enable-native-executor }}
     - name: Send Slack notification on failure
       uses: slackapi/[email protected]
       if: ${{ failure() && (github.ref == 'refs/heads/main') }}
@@ -1,12 +1,13 @@
-use std::sync::Arc;
+use std::{collections::HashMap, sync::Arc};

 use common_error::DaftResult;
 use common_file_formats::{FileFormatConfig, ParquetSourceConfig};
+use daft_core::prelude::{AsArrow, Int64Array, Utf8Array};
 use daft_csv::{CsvConvertOptions, CsvParseOptions, CsvReadOptions};
 use daft_io::IOStatsRef;
 use daft_json::{JsonConvertOptions, JsonParseOptions, JsonReadOptions};
 use daft_micropartition::MicroPartition;
-use daft_parquet::read::ParquetSchemaInferenceOptions;
+use daft_parquet::read::{read_parquet_bulk_async, ParquetSchemaInferenceOptions};
 use daft_scan::{storage_config::StorageConfig, ChunkSpec, ScanTask};
 use futures::{Stream, StreamExt};
 use snafu::ResultExt;

@@ -16,7 +17,7 @@
 use crate::{
     channel::{create_channel, Sender},
     sources::source::{Source, SourceStream},
-    ExecutionRuntimeHandle,
+    ExecutionRuntimeHandle, JoinSnafu, TaskSet, NUM_CPUS,
 };

 pub struct ScanTaskSource {
@@ -38,9 +39,11 @@
         sender: Sender<Arc<MicroPartition>>,
         maintain_order: bool,
         io_stats: IOStatsRef,
+        delete_map: Option<Arc<HashMap<String, Vec<i64>>>>,
     ) -> DaftResult<()> {
         let schema = scan_task.materialized_schema();
-        let mut stream = stream_scan_task(scan_task, Some(io_stats), maintain_order).await?;
+        let mut stream =
+            stream_scan_task(scan_task, Some(io_stats), delete_map, maintain_order).await?;
         let mut has_data = false;
         while let Some(partition) = stream.next().await {
             let _ = sender.send(partition?).await;
@@ -77,17 +80,28 @@
                 vec![receiver],
             )
         };
-        for (scan_task, sender) in self.scan_tasks.iter().zip(senders) {
-            runtime_handle.spawn(
-                Self::process_scan_task_stream(
-                    scan_task.clone(),
-                    sender,
-                    maintain_order,
-                    io_stats.clone(),
-                ),
-                self.name(),
-            );
-        }
+        let scan_tasks = self.scan_tasks.clone();
+        runtime_handle.spawn(
+            async move {
+                let mut task_set = TaskSet::new();
+                let delete_map = get_delete_map(&scan_tasks).await?.map(Arc::new);
+                for (scan_task, sender) in scan_tasks.into_iter().zip(senders) {
+                    task_set.spawn(Self::process_scan_task_stream(
+                        scan_task,
+                        sender,
+                        maintain_order,
+                        io_stats.clone(),
+                        delete_map.clone(),
+                    ));
+                }
+                while let Some(result) = task_set.join_next().await {
+                    result.context(JoinSnafu)??;
+                }
+                Ok(())
+            },
+            self.name(),
+        );
+
         let stream = futures::stream::iter(receivers.into_iter().map(ReceiverStream::new));
         Ok(Box::pin(stream.flatten()))
     }
@@ -97,9 +111,80 @@
     }
 }

+// Read all iceberg delete files and return a map of file paths to delete positions
+async fn get_delete_map(
+    scan_tasks: &[Arc<ScanTask>],
+) -> DaftResult<Option<HashMap<String, Vec<i64>>>> {
+    let delete_files = scan_tasks
+        .iter()
+        .flat_map(|st| {
+            st.sources
+                .iter()
+                .filter_map(|source| source.get_iceberg_delete_files())
+                .flatten()
+                .cloned()
+        })
+        .collect::<Vec<_>>();
Review comment: I think there may be delete files that are shared between scan tasks. Perhaps you should collect into a HashSet instead (see the sketch below).
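A minimal sketch of that suggestion, reusing the names from the diff above (`scan_tasks`, `get_iceberg_delete_files`): collecting into a HashSet deduplicates delete files referenced by multiple scan tasks, so each is read only once. The follow-up commit 6208564 ("collect into hashset") adopts this.

```rust
use std::collections::HashSet;

// Sketch only: same traversal as in the diff, but collecting into a HashSet
// so that a delete file shared between scan tasks appears a single time.
let delete_files = scan_tasks
    .iter()
    .flat_map(|st| {
        st.sources
            .iter()
            .filter_map(|source| source.get_iceberg_delete_files())
            .flatten()
            .cloned()
    })
    .collect::<HashSet<_>>();
```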
+
+    if delete_files.is_empty() {
+        return Ok(None);
+    }
+
+    let (runtime, io_client) = scan_tasks
+        .first()
+        .unwrap() // Safe to unwrap because we checked that the list is not empty
+        .storage_config
+        .get_io_client_and_runtime()?;
+    let scan_tasks = scan_tasks.to_vec();
+    runtime.block_on_io_pool(async move {
+        let mut delete_map = scan_tasks
+            .iter()
+            .flat_map(|st| st.sources.iter().map(|s| s.get_path().to_string()))
+            .map(|path| (path, vec![]))
+            .collect::<std::collections::HashMap<_, _>>();
+        let columns_to_read = Some(vec!["file_path".to_string(), "pos".to_string()]);
+        let result = read_parquet_bulk_async(
+            delete_files,
+            columns_to_read,
+            None,
+            None,
+            None,
+            None,
+            io_client,
+            None,
+            *NUM_CPUS,
+            ParquetSchemaInferenceOptions::new(None),
+            None,
+            None,
+            None,
+            None,
+        )
+        .await?;
+
+        for table_result in result {
+            let table = table_result?;
+            // values in the file_path column are guaranteed by the iceberg spec to match the full URI of the corresponding data file
+            // https://iceberg.apache.org/spec/#position-delete-files
+            let file_paths = table.get_column("file_path")?.downcast::<Utf8Array>()?;
+            let positions = table.get_column("pos")?.downcast::<Int64Array>()?;
+
+            for (file, pos) in file_paths
+                .as_arrow()
+                .values_iter()
+                .zip(positions.as_arrow().values_iter())
+            {
+                if delete_map.contains_key(file) {
+                    delete_map.get_mut(file).unwrap().push(*pos);
+                }
+            }
+        }
+        Ok(Some(delete_map))
+    })?
+}
+
 async fn stream_scan_task(
     scan_task: Arc<ScanTask>,
     io_stats: Option<IOStatsRef>,
+    delete_map: Option<Arc<HashMap<String, Vec<i64>>>>,
     maintain_order: bool,
 ) -> DaftResult<impl Stream<Item = DaftResult<Arc<MicroPartition>>> + Send> {
     let pushdown_columns = scan_task.pushdowns.columns.as_ref().map(|v| {
@@ -159,12 +244,7 @@
         let inference_options =
             ParquetSchemaInferenceOptions::new(Some(*coerce_int96_timestamp_unit));

-        if source.get_iceberg_delete_files().is_some() {
-            return Err(common_error::DaftError::TypeError(
-                "Streaming reads not supported for Iceberg delete files".to_string(),
-            ));
-        }
-
+        let delete_rows = delete_map.as_ref().and_then(|m| m.get(url).cloned());
         let row_groups = if let Some(ChunkSpec::Parquet(row_groups)) = source.get_chunk_spec() {
             Some(row_groups.clone())
         } else {
@@ -177,7 +257,6 @@
                 daft_parquet::read::stream_parquet(
                     url,
                     file_column_names.as_deref(),
-                    None,
                     scan_task.pushdowns.limit,
                     row_groups,
                     scan_task.pushdowns.filters.clone(),
@@ -187,6 +266,7 @@
                     field_id_mapping.clone(),
                     metadata,
                     maintain_order,
+                    delete_rows,
                 )
                 .await?
             }
Review comment: Why wrap tokio::task::JoinSet here?

Reply: I want to keep the operators agnostic of the implementation details, to reduce complexity.
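For illustration, a minimal sketch of what such a wrapper might look like (an assumption, not the actual TaskSet in this codebase): operators depend only on the TaskSet surface, so the underlying runtime type can change without touching them.

```rust
use tokio::task::{JoinError, JoinSet};

// Hypothetical thin wrapper over tokio::task::JoinSet; the real TaskSet may
// differ. Operators call spawn/join_next on TaskSet and never name the
// tokio type directly.
pub struct TaskSet<T> {
    inner: JoinSet<T>,
}

impl<T: Send + 'static> TaskSet<T> {
    pub fn new() -> Self {
        Self {
            inner: JoinSet::new(),
        }
    }

    // Spawn a future onto the runtime behind the abstraction.
    pub fn spawn<F>(&mut self, future: F)
    where
        F: std::future::Future<Output = T> + Send + 'static,
    {
        self.inner.spawn(future);
    }

    // Wait for the next task to complete; None once the set is drained.
    pub async fn join_next(&mut self) -> Option<Result<T, JoinError>> {
        self.inner.join_next().await
    }
}
```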