From fe10f8c8e13f07c3a82beff9c61d1c8d0eaa994e Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Mon, 14 Oct 2024 12:51:12 -0700
Subject: [PATCH] [FEAT] Iceberg MOR for streaming parquet (#2975)

This PR implements Iceberg merge-on-read (MOR) for streaming parquet reads.

Notes:
- Since reading Iceberg delete files is a bulk operation, the native executor
  uses the `read_parquet_bulk` function to read them. However, the native
  executor is already running inside a Tokio runtime, so it needs to run
  `read_parquet_bulk` on the IO pool. This PR refactors `read_parquet_bulk`
  so that it can be run in this manner.

---------

Co-authored-by: Colin Ho
Co-authored-by: Colin Ho
---
 .github/workflows/python-package.yml | 5 +
 src/daft-local-execution/src/lib.rs | 9 +-
 .../src/sources/scan_task.rs | 127 +++++++++++---
 src/daft-parquet/src/file.rs | 3 +
 src/daft-parquet/src/read.rs | 159 +++++++++++-------
 src/daft-parquet/src/stream_reader.rs | 34 +++-
 6 files changed, 243 insertions(+), 94 deletions(-)

diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 620b2037ec..3affeecc4c 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -468,6 +468,10 @@ jobs:
     matrix:
       python-version: ['3.8'] # can't use 3.7 due to requiring anon mode for adlfs
       daft-runner: [py, ray]
+      enable-native-executor: [0, 1]
+      exclude:
+      - daft-runner: ray
+        enable-native-executor: 1
     steps:
     - uses: actions/checkout@v4
       with:
@@ -513,6 +517,7 @@
         pytest tests/integration/iceberg -m 'integration' --durations=50
       env:
         DAFT_RUNNER: ${{ matrix.daft-runner }}
+        DAFT_ENABLE_NATIVE_EXECUTOR: ${{ matrix.enable-native-executor }}
     - name: Send Slack notification on failure
       uses: slackapi/slack-github-action@v1.26.0
       if: ${{ failure() && (github.ref == 'refs/heads/main') }}
diff --git a/src/daft-local-execution/src/lib.rs b/src/daft-local-execution/src/lib.rs
index b7809b4126..c316434a63 100644
--- a/src/daft-local-execution/src/lib.rs
+++ b/src/daft-local-execution/src/lib.rs
@@ -14,8 +14,13 @@ lazy_static!
{ pub static ref NUM_CPUS: usize = std::thread::available_parallelism().unwrap().get(); } +pub(crate) type TaskSet = tokio::task::JoinSet; +pub(crate) fn create_task_set() -> TaskSet { + tokio::task::JoinSet::new() +} + pub struct ExecutionRuntimeHandle { - worker_set: tokio::task::JoinSet>, + worker_set: TaskSet>, default_morsel_size: usize, } @@ -23,7 +28,7 @@ impl ExecutionRuntimeHandle { #[must_use] pub fn new(default_morsel_size: usize) -> Self { Self { - worker_set: tokio::task::JoinSet::new(), + worker_set: create_task_set(), default_morsel_size, } } diff --git a/src/daft-local-execution/src/sources/scan_task.rs b/src/daft-local-execution/src/sources/scan_task.rs index bf77e6dc8d..d1dae8bd39 100644 --- a/src/daft-local-execution/src/sources/scan_task.rs +++ b/src/daft-local-execution/src/sources/scan_task.rs @@ -1,12 +1,16 @@ -use std::sync::Arc; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use common_error::DaftResult; use common_file_formats::{FileFormatConfig, ParquetSourceConfig}; +use daft_core::prelude::{AsArrow, Int64Array, Utf8Array}; use daft_csv::{CsvConvertOptions, CsvParseOptions, CsvReadOptions}; use daft_io::IOStatsRef; use daft_json::{JsonConvertOptions, JsonParseOptions, JsonReadOptions}; use daft_micropartition::MicroPartition; -use daft_parquet::read::ParquetSchemaInferenceOptions; +use daft_parquet::read::{read_parquet_bulk_async, ParquetSchemaInferenceOptions}; use daft_scan::{storage_config::StorageConfig, ChunkSpec, ScanTask}; use futures::{Stream, StreamExt}; use snafu::ResultExt; @@ -16,7 +20,7 @@ use tracing::instrument; use crate::{ channel::{create_channel, Sender}, sources::source::{Source, SourceStream}, - ExecutionRuntimeHandle, + ExecutionRuntimeHandle, JoinSnafu, TaskSet, NUM_CPUS, }; pub struct ScanTaskSource { @@ -38,9 +42,11 @@ impl ScanTaskSource { sender: Sender>, maintain_order: bool, io_stats: IOStatsRef, + delete_map: Option>>>, ) -> DaftResult<()> { let schema = scan_task.materialized_schema(); - let mut stream = stream_scan_task(scan_task, Some(io_stats), maintain_order).await?; + let mut stream = + stream_scan_task(scan_task, Some(io_stats), delete_map, maintain_order).await?; let mut has_data = false; while let Some(partition) = stream.next().await { let _ = sender.send(partition?).await; @@ -77,17 +83,28 @@ impl Source for ScanTaskSource { vec![receiver], ) }; - for (scan_task, sender) in self.scan_tasks.iter().zip(senders) { - runtime_handle.spawn( - Self::process_scan_task_stream( - scan_task.clone(), - sender, - maintain_order, - io_stats.clone(), - ), - self.name(), - ); - } + let scan_tasks = self.scan_tasks.clone(); + runtime_handle.spawn( + async move { + let mut task_set = TaskSet::new(); + let delete_map = get_delete_map(&scan_tasks).await?.map(Arc::new); + for (scan_task, sender) in scan_tasks.into_iter().zip(senders) { + task_set.spawn(Self::process_scan_task_stream( + scan_task, + sender, + maintain_order, + io_stats.clone(), + delete_map.clone(), + )); + } + while let Some(result) = task_set.join_next().await { + result.context(JoinSnafu)??; + } + Ok(()) + }, + self.name(), + ); + let stream = futures::stream::iter(receivers.into_iter().map(ReceiverStream::new)); Ok(Box::pin(stream.flatten())) } @@ -97,9 +114,80 @@ impl Source for ScanTaskSource { } } +// Read all iceberg delete files and return a map of file paths to delete positions +async fn get_delete_map( + scan_tasks: &[Arc], +) -> DaftResult>>> { + let delete_files = scan_tasks + .iter() + .flat_map(|st| { + st.sources + .iter() + 
.filter_map(|source| source.get_iceberg_delete_files()) + .flatten() + .cloned() + }) + .collect::>(); + if delete_files.is_empty() { + return Ok(None); + } + + let (runtime, io_client) = scan_tasks + .first() + .unwrap() // Safe to unwrap because we checked that the list is not empty + .storage_config + .get_io_client_and_runtime()?; + let scan_tasks = scan_tasks.to_vec(); + runtime.block_on_io_pool(async move { + let mut delete_map = scan_tasks + .iter() + .flat_map(|st| st.sources.iter().map(|s| s.get_path().to_string())) + .map(|path| (path, vec![])) + .collect::>(); + let columns_to_read = Some(vec!["file_path".to_string(), "pos".to_string()]); + let result = read_parquet_bulk_async( + delete_files.into_iter().collect(), + columns_to_read, + None, + None, + None, + None, + io_client, + None, + *NUM_CPUS, + ParquetSchemaInferenceOptions::new(None), + None, + None, + None, + None, + ) + .await?; + + for table_result in result { + let table = table_result?; + // values in the file_path column are guaranteed by the iceberg spec to match the full URI of the corresponding data file + // https://iceberg.apache.org/spec/#position-delete-files + let file_paths = table.get_column("file_path")?.downcast::()?; + let positions = table.get_column("pos")?.downcast::()?; + + for (file, pos) in file_paths + .as_arrow() + .values_iter() + .zip(positions.as_arrow().values_iter()) + { + if delete_map.contains_key(file) { + delete_map.get_mut(file).unwrap().push(*pos); + } + } + } + Ok(Some(delete_map)) + })? +} + async fn stream_scan_task( scan_task: Arc, io_stats: Option, + delete_map: Option>>>, maintain_order: bool, ) -> DaftResult>> + Send> { let pushdown_columns = scan_task.pushdowns.columns.as_ref().map(|v| { @@ -159,12 +247,7 @@ async fn stream_scan_task( let inference_options = ParquetSchemaInferenceOptions::new(Some(*coerce_int96_timestamp_unit)); - if source.get_iceberg_delete_files().is_some() { - return Err(common_error::DaftError::TypeError( - "Streaming reads not supported for Iceberg delete files".to_string(), - )); - } - + let delete_rows = delete_map.as_ref().and_then(|m| m.get(url).cloned()); let row_groups = if let Some(ChunkSpec::Parquet(row_groups)) = source.get_chunk_spec() { Some(row_groups.clone()) } else { @@ -177,7 +260,6 @@ async fn stream_scan_task( daft_parquet::read::stream_parquet( url, file_column_names.as_deref(), - None, scan_task.pushdowns.limit, row_groups, scan_task.pushdowns.filters.clone(), @@ -187,6 +269,7 @@ async fn stream_scan_task( field_id_mapping.clone(), metadata, maintain_order, + delete_rows, ) .await? 
} diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index 8c582dd24e..84dfa5a83e 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -398,6 +398,7 @@ impl ParquetFileReader { predicate: Option, original_columns: Option>, original_num_rows: Option, + delete_rows: Option>, ) -> DaftResult>> { let daft_schema = Arc::new(daft_core::prelude::Schema::try_from( self.arrow_schema.as_ref(), @@ -426,6 +427,7 @@ impl ParquetFileReader { let ranges = ranges.clone(); let predicate = predicate.clone(); let original_columns = original_columns.clone(); + let delete_rows = delete_rows.clone(); let row_range = *row_range; tokio::task::spawn(async move { @@ -515,6 +517,7 @@ impl ParquetFileReader { predicate, original_columns, original_num_rows, + delete_rows, ); if table_iter.is_none() { let table = diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index 647ad5f7bd..5897f0f2be 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -364,7 +364,6 @@ async fn read_parquet_single( async fn stream_parquet_single( uri: String, columns: Option<&[&str]>, - start_offset: Option, num_rows: Option, row_groups: Option>, predicate: Option, @@ -373,6 +372,7 @@ async fn stream_parquet_single( schema_infer_options: ParquetSchemaInferenceOptions, field_id_mapping: Option>>, metadata: Option>, + delete_rows: Option>, maintain_order: bool, ) -> DaftResult> + Send> { let field_id_mapping_provided = field_id_mapping.is_some(); @@ -394,6 +394,12 @@ async fn stream_parquet_single( } } + // Increase the number of rows_to_read to account for deleted rows + // in order to have the correct number of rows in the end + if let Some(delete_rows) = &delete_rows { + num_rows_to_read = limit_with_delete_rows(delete_rows, None, num_rows_to_read); + } + let (source_type, fixed_uri) = parse_url(uri.as_str())?; let (metadata, table_stream) = if matches!(source_type, SourceType::File) { @@ -401,9 +407,9 @@ async fn stream_parquet_single( fixed_uri.as_ref(), columns_to_return, columns_to_read, - start_offset, num_rows_to_return, num_rows_to_read, + delete_rows, row_groups.clone(), predicate.clone(), schema_infer_options, @@ -427,10 +433,10 @@ async fn stream_parquet_single( builder }; - if row_groups.is_some() && (num_rows_to_read.is_some() || start_offset.is_some()) { - return Err(common_error::DaftError::ValueError("Both `row_groups` and `num_rows` or `start_offset` is set at the same time. We only support setting one set or the other.".to_string())); + if row_groups.is_some() && num_rows_to_read.is_some() { + return Err(common_error::DaftError::ValueError("Both `row_groups` and `num_rows` is set at the same time. 
We only support setting one set or the other.".to_string())); } - let builder = builder.limit(start_offset, num_rows_to_read)?; + let builder = builder.limit(None, num_rows_to_read)?; let metadata = builder.metadata().clone(); let builder = if let Some(ref row_groups) = row_groups { @@ -456,6 +462,7 @@ async fn stream_parquet_single( predicate.clone(), columns_to_return, num_rows_to_return, + delete_rows, ) .await?, )) @@ -765,72 +772,95 @@ pub fn read_parquet_bulk>( ))); } } - let tables = runtime_handle - .block_on_current_thread(async move { - let task_stream = futures::stream::iter(uris.iter().enumerate().map(|(i, uri)| { - let uri = (*uri).to_string(); - let owned_columns = columns.clone(); - let owned_row_group = row_groups.as_ref().and_then(|rgs| rgs[i].clone()); - let owned_predicate = predicate.clone(); - let metadata = metadata.as_ref().map(|mds| mds[i].clone()); - let io_client = io_client.clone(); - let io_stats = io_stats.clone(); - let schema_infer_options = *schema_infer_options; - let owned_field_id_mapping = field_id_mapping.clone(); - let delete_rows = delete_map.as_ref().and_then(|m| m.get(&uri).cloned()); - tokio::task::spawn(async move { - read_parquet_single( - &uri, - owned_columns, - start_offset, - num_rows, - owned_row_group, - owned_predicate, - io_client, - io_stats, - schema_infer_options, - owned_field_id_mapping, - metadata, - delete_rows, - chunk_size, - ) - .await - }) - })); - let mut remaining_rows = num_rows.map(|x| x as i64); - task_stream - // Limit the number of file reads we have in flight at any given time. - .buffered(num_parallel_tasks) - // Terminate the stream if we have already reached the row limit. With the upstream buffering, we will still read up to - // num_parallel_tasks redundant files. - .try_take_while(|result| { - match (result, remaining_rows) { - // Limit has been met, early-terminate. - (_, Some(rows_left)) if rows_left <= 0 => futures::future::ready(Ok(false)), - // Limit has not yet been met, update remaining limit slack and continue. - (Ok(table), Some(rows_left)) => { - remaining_rows = Some(rows_left - table.len() as i64); - futures::future::ready(Ok(true)) - } - // (1) No limit, never early-terminate. - // (2) Encountered error, propagate error to try_collect to allow it to short-circuit. 
- (_, None) | (Err(_), _) => futures::future::ready(Ok(true)), - } - }) - .try_collect::>() - .await + let tables = runtime_handle.block_on_current_thread(read_parquet_bulk_async( + uris.iter().map(|s| (*s).to_string()).collect(), + columns, + start_offset, + num_rows, + row_groups, + predicate, + io_client, + io_stats, + num_parallel_tasks, + *schema_infer_options, + field_id_mapping, + metadata, + delete_map, + chunk_size, + ))?; + tables.into_iter().collect::>>() +} + +#[allow(clippy::too_many_arguments)] +pub async fn read_parquet_bulk_async( + uris: Vec, + columns: Option>, + start_offset: Option, + num_rows: Option, + row_groups: Option>>>, + predicate: Option, + io_client: Arc, + io_stats: Option, + num_parallel_tasks: usize, + schema_infer_options: ParquetSchemaInferenceOptions, + field_id_mapping: Option>>, + metadata: Option>>, + delete_map: Option>>, + chunk_size: Option, +) -> DaftResult>> { + let task_stream = futures::stream::iter(uris.into_iter().enumerate().map(|(i, uri)| { + let owned_columns = columns.clone(); + let owned_row_group = row_groups.as_ref().and_then(|rgs| rgs[i].clone()); + let owned_predicate = predicate.clone(); + let metadata = metadata.as_ref().map(|mds| mds[i].clone()); + + let io_client = io_client.clone(); + let io_stats = io_stats.clone(); + let owned_field_id_mapping = field_id_mapping.clone(); + let delete_rows = delete_map.as_ref().and_then(|m| m.get(&uri).cloned()); + + tokio::task::spawn(async move { + read_parquet_single( + &uri, + owned_columns, + start_offset, + num_rows, + owned_row_group, + owned_predicate, + io_client, + io_stats, + schema_infer_options, + owned_field_id_mapping, + metadata, + delete_rows, + chunk_size, + ) + .await + }) + })); + + let mut remaining_rows = num_rows.map(|x| x as i64); + let tables = task_stream + .buffered(num_parallel_tasks) + .try_take_while(|result| match (result, remaining_rows) { + (_, Some(rows_left)) if rows_left <= 0 => futures::future::ready(Ok(false)), + (Ok(table), Some(rows_left)) => { + remaining_rows = Some(rows_left - table.len() as i64); + futures::future::ready(Ok(true)) + } + (_, None) | (Err(_), _) => futures::future::ready(Ok(true)), }) + .try_collect::>() + .await .context(JoinSnafu { path: "UNKNOWN" })?; - - tables.into_iter().collect::>>() + Ok(tables) } #[allow(clippy::too_many_arguments)] pub async fn stream_parquet( uri: &str, columns: Option<&[&str]>, - start_offset: Option, num_rows: Option, row_groups: Option>, predicate: Option, @@ -840,11 +870,11 @@ pub async fn stream_parquet( field_id_mapping: Option>>, metadata: Option>, maintain_order: bool, + delete_rows: Option>, ) -> DaftResult>> { let stream = stream_parquet_single( uri.to_string(), columns, - start_offset, num_rows, row_groups, predicate, @@ -853,6 +883,7 @@ pub async fn stream_parquet( *schema_infer_options, field_id_mapping, metadata, + delete_rows, maintain_order, ) .await?; @@ -1131,13 +1162,13 @@ mod tests { None, None, None, - None, io_client, None, &Default::default(), None, None, false, + None, ) .await? 
.collect::>() diff --git a/src/daft-parquet/src/stream_reader.rs b/src/daft-parquet/src/stream_reader.rs index 58a3662cf0..4755386fe9 100644 --- a/src/daft-parquet/src/stream_reader.rs +++ b/src/daft-parquet/src/stream_reader.rs @@ -5,7 +5,7 @@ use std::{ sync::Arc, }; -use arrow2::io::parquet::read; +use arrow2::{bitmap::Bitmap, io::parquet::read}; use common_error::DaftResult; use daft_core::{prelude::*, utils::arrow::cast_array_for_daft_if_needed}; use daft_dsl::ExprRef; @@ -48,6 +48,7 @@ fn prune_fields_from_schema( } } +#[allow(clippy::too_many_arguments)] pub fn arrow_column_iters_to_table_iter( arr_iters: ArrowChunkIters, row_range_start: usize, @@ -56,6 +57,7 @@ pub fn arrow_column_iters_to_table_iter( predicate: Option, original_columns: Option>, original_num_rows: Option, + delete_rows: Option>, ) -> Option>> { if arr_iters.is_empty() { return None; @@ -75,6 +77,7 @@ pub fn arrow_column_iters_to_table_iter( } let par_lock_step_iter = ParallelLockStepIter { iters: arr_iters }; + let mut curr_delete_row_idx = 0; // Keep track of the current index in the row group so we can throw away arrays that are not needed // and slice arrays that are partially needed. let mut index_so_far = 0; @@ -109,7 +112,6 @@ pub fn arrow_column_iters_to_table_iter( if all_series.iter().any(|s| s.len() != len) { return Err(super::Error::ParquetColumnsDontHaveEqualRows { path: uri.clone() }.into()); } - index_so_far += len; let mut table = Table::new_with_size( Schema::new(all_series.iter().map(|s| s.field().clone()).collect())?, @@ -117,6 +119,27 @@ pub fn arrow_column_iters_to_table_iter( len, ) .with_context(|_| super::UnableToCreateTableFromChunkSnafu { path: uri.clone() })?; + + // Apply delete rows if needed + if let Some(delete_rows) = &delete_rows + && !delete_rows.is_empty() + { + let mut selection_mask = Bitmap::new_trued(table.len()).make_mut(); + while curr_delete_row_idx < delete_rows.len() + && delete_rows[curr_delete_row_idx] < index_so_far as i64 + len as i64 + { + let table_row = delete_rows[curr_delete_row_idx] as usize - index_so_far; + unsafe { + selection_mask.set_unchecked(table_row, false); + } + curr_delete_row_idx += 1; + } + let selection_mask: BooleanArray = + ("selection_mask", Bitmap::from(selection_mask)).into(); + table = table.mask_filter(&selection_mask.into_series())?; + } + index_so_far += len; + // Apply pushdowns if needed if let Some(predicate) = &predicate { table = table.filter(&[predicate.clone()])?; @@ -191,7 +214,6 @@ impl Drop for CountingReader { pub fn local_parquet_read_into_column_iters( uri: &str, columns: Option<&[String]>, - start_offset: Option, num_rows: Option, row_groups: Option<&[i64]>, predicate: Option, @@ -255,7 +277,7 @@ pub fn local_parquet_read_into_column_iters( let row_ranges = build_row_ranges( num_rows, - start_offset.unwrap_or(0), + 0, row_groups, predicate, &daft_schema, @@ -499,9 +521,9 @@ pub fn local_parquet_stream( uri: &str, original_columns: Option>, columns: Option>, - start_offset: Option, original_num_rows: Option, num_rows: Option, + delete_rows: Option>, row_groups: Option>, predicate: Option, schema_infer_options: ParquetSchemaInferenceOptions, @@ -516,7 +538,6 @@ pub fn local_parquet_stream( let (metadata, schema_ref, row_ranges, column_iters) = local_parquet_read_into_column_iters( uri, columns.as_deref(), - start_offset, num_rows, row_groups.as_deref(), predicate.clone(), @@ -557,6 +578,7 @@ pub fn local_parquet_stream( predicate.clone(), original_columns.clone(), original_num_rows, + delete_rows.clone(), ); // Even if there 
are no columns to read, we still need to create an empty table with the correct number of rows
// This is because the columns may be present in other files. See https://github.com/Eventual-Inc/Daft/pull/2514
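
The position-delete handling added to `arrow_column_iters_to_table_iter` above keeps a cursor into the sorted delete positions for a file and masks out any rows of the current chunk that fall in the deleted set, before the chunk offset is advanced. The following standalone sketch illustrates that bookkeeping with a plain `Vec<bool>` mask; the function name, the mask type, and the `main` driver are illustrative only, while the patch itself uses an arrow2 `Bitmap` and `Table::mask_filter`.

// Illustrative sketch (not part of the patch): per-chunk masking of Iceberg
// position deletes. `delete_rows` holds sorted, file-wide row positions; the
// cursor persists across chunks so each position is consumed exactly once.
fn chunk_keep_mask(
    chunk_start: usize,   // file-wide index of the chunk's first row
    chunk_len: usize,     // number of rows in this chunk
    delete_rows: &[i64],  // sorted positions read from position-delete files
    cursor: &mut usize,   // advances monotonically across chunks
) -> Vec<bool> {
    let mut keep = vec![true; chunk_len];
    while *cursor < delete_rows.len()
        && (delete_rows[*cursor] as usize) < chunk_start + chunk_len
    {
        keep[delete_rows[*cursor] as usize - chunk_start] = false;
        *cursor += 1;
    }
    keep
}

fn main() {
    // Rows 2 and 5 of a file are deleted; the file is read as two chunks of 4 rows.
    let deletes = [2i64, 5];
    let mut cursor = 0;
    assert_eq!(chunk_keep_mask(0, 4, &deletes, &mut cursor), [true, true, false, true]);
    assert_eq!(chunk_keep_mask(4, 4, &deletes, &mut cursor), [true, false, true, true]);
}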
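
As the commit message notes, the native executor is already running inside a Tokio runtime, so the bulk read of delete files is handed off to the IO pool rather than awaited directly on an executor worker. The sketch below shows the general shape of that pattern with two vanilla Tokio runtimes, assuming tokio with the rt-multi-thread feature; `bulk_read`, the URI, and both runtime names are placeholders rather than Daft APIs (the diff itself uses `get_io_client_and_runtime`, `block_on_io_pool`, and `read_parquet_bulk_async`).

use tokio::runtime::Runtime;

// Placeholder standing in for an async bulk read like `read_parquet_bulk_async`.
async fn bulk_read(uris: Vec<String>) -> Vec<String> {
    uris
}

fn main() {
    // Dedicated runtime playing the role of the IO pool.
    let io_pool = Runtime::new().unwrap();
    let io_handle = io_pool.handle().clone();

    // The executor side already drives its own runtime, so instead of blocking
    // one of its workers it spawns the bulk read onto the IO runtime and awaits
    // the resulting JoinHandle.
    let executor = Runtime::new().unwrap();
    let results = executor.block_on(async move {
        let delete_files = vec!["s3://bucket/pos-delete-00001.parquet".to_string()];
        io_handle.spawn(bulk_read(delete_files)).await.unwrap()
    });
    println!("read {} delete file(s)", results.len());
}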