diff --git a/daft/daft.pyi b/daft/daft.pyi index ef1188c490..2ce85f1d63 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -718,6 +718,7 @@ class ScanTask: storage_config: StorageConfig, num_rows: int | None, size_bytes: int | None, + iceberg_delete_files: list[str] | None, pushdowns: Pushdowns | None, partition_values: PyTable | None, stats: PyTable | None, diff --git a/daft/delta_lake/delta_lake_scan.py b/daft/delta_lake/delta_lake_scan.py index 27cfef2562..0a52b52d30 100644 --- a/daft/delta_lake/delta_lake_scan.py +++ b/daft/delta_lake/delta_lake_scan.py @@ -187,6 +187,7 @@ def to_scan_tasks(self, pushdowns: Pushdowns) -> Iterator[ScanTask]: num_rows=record_count, storage_config=self._storage_config, size_bytes=size_bytes, + iceberg_delete_files=None, pushdowns=pushdowns, partition_values=partition_values, stats=stats._table if stats is not None else None, diff --git a/daft/hudi/hudi_scan.py b/daft/hudi/hudi_scan.py index 8c1c98f298..02821df1ae 100644 --- a/daft/hudi/hudi_scan.py +++ b/daft/hudi/hudi_scan.py @@ -118,6 +118,7 @@ def to_scan_tasks(self, pushdowns: Pushdowns) -> Iterator[ScanTask]: storage_config=self._storage_config, num_rows=record_count, size_bytes=size_bytes, + iceberg_delete_files=None, pushdowns=pushdowns, partition_values=partition_values, stats=stats, diff --git a/daft/iceberg/iceberg_scan.py b/daft/iceberg/iceberg_scan.py index 73adb840c3..58241ca217 100644 --- a/daft/iceberg/iceberg_scan.py +++ b/daft/iceberg/iceberg_scan.py @@ -167,8 +167,7 @@ def to_scan_tasks(self, pushdowns: Pushdowns) -> Iterator[ScanTask]: # TODO: Support ORC and AVRO when we can read it raise NotImplementedError(f"{file_format} for iceberg not implemented!") - if len(task.delete_files) > 0: - raise NotImplementedError("Iceberg Merge-on-Read currently not supported, please make an issue!") + iceberg_delete_files = [f.file_path for f in task.delete_files] # TODO: Thread in Statistics to each ScanTask: P2 pspec = self._iceberg_record_to_partition_spec(self._table.specs()[file.spec_id], file.partition) @@ -179,6 +178,7 @@ def to_scan_tasks(self, pushdowns: Pushdowns) -> Iterator[ScanTask]: num_rows=record_count, storage_config=self._storage_config, size_bytes=file.file_size_in_bytes, + iceberg_delete_files=iceberg_delete_files, pushdowns=pushdowns, partition_values=pspec._table if pspec is not None else None, stats=None, diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index 8d6061162a..3c84acac4e 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -1,11 +1,11 @@ -use std::collections::{BTreeMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::Display; use std::sync::Arc; use std::{ops::Deref, sync::Mutex}; use arrow2::io::parquet::read::schema::infer_schema_with_options; use common_error::DaftResult; -use daft_core::datatypes::Field; +use daft_core::datatypes::{Field, Int64Array, Utf8Array}; use daft_core::schema::{Schema, SchemaRef}; use daft_core::DataType; @@ -17,7 +17,7 @@ use daft_parquet::read::{ }; use daft_scan::file_format::{CsvSourceConfig, FileFormatConfig, ParquetSourceConfig}; use daft_scan::storage_config::{NativeStorageConfig, StorageConfig}; -use daft_scan::{ChunkSpec, DataFileSource, Pushdowns, ScanTask}; +use daft_scan::{ChunkSpec, DataSource, Pushdowns, ScanTask}; use daft_table::Table; use parquet2::metadata::FileMetaData; @@ -134,8 +134,34 @@ fn materialize_scan_task( }) => { let inference_options = 
                     ParquetSchemaInferenceOptions::new(Some(*coerce_int96_timestamp_unit));
+
+                // TODO: This is a hardcoded magic value but should be configurable
+                let num_parallel_tasks = 8;
+                let urls = urls.collect::<Vec<_>>();
+                // Create vec of all unique delete files in the scan task
+                let iceberg_delete_files = scan_task
+                    .sources
+                    .iter()
+                    .flat_map(|s| s.get_iceberg_delete_files())
+                    .flatten()
+                    .map(String::as_str)
+                    .collect::<HashSet<_>>()
+                    .into_iter()
+                    .collect::<Vec<_>>();
+
+                let delete_map = _read_delete_files(
+                    iceberg_delete_files.as_slice(),
+                    urls.as_slice(),
+                    io_client.clone(),
+                    io_stats.clone(),
+                    num_parallel_tasks,
+                    runtime_handle.clone(),
+                    &inference_options,
+                )
+                .context(DaftCoreComputeSnafu)?;
+
                 let row_groups = parquet_sources_to_row_groups(scan_task.sources.as_slice());
                 let metadatas = scan_task
                     .sources
@@ -151,11 +177,12 @@ fn materialize_scan_task(
                     scan_task.pushdowns.filters.clone(),
                     io_client.clone(),
                     io_stats,
-                    8,
+                    num_parallel_tasks,
                     runtime_handle,
                     &inference_options,
                     field_id_mapping.clone(),
                     metadatas,
+                    Some(delete_map),
                 )
                 .context(DaftCoreComputeSnafu)?
             }
@@ -366,7 +393,7 @@ fn materialize_scan_task(
             let table_iterators = scan_task.sources.iter().map(|source| {
                 // Call Python function to create an Iterator (Grabs the GIL and then releases it)
                 match source {
-                    DataFileSource::PythonFactoryFunction {
+                    DataSource::PythonFactoryFunction {
                         module,
                         func_name,
                         func_args,
@@ -593,11 +620,19 @@ impl MicroPartition {
                 let row_groups = parquet_sources_to_row_groups(scan_task.sources.as_slice());
 
+                let mut iceberg_delete_files: HashSet<&str> = HashSet::new();
+                for source in scan_task.sources.iter() {
+                    if let Some(delete_files) = source.get_iceberg_delete_files() {
+                        iceberg_delete_files.extend(delete_files.iter().map(String::as_str));
+                    }
+                }
+
                 read_parquet_into_micropartition(
                     uris.as_slice(),
                     columns.as_deref(),
                     None,
                     scan_task.pushdowns.limit,
+                    Some(iceberg_delete_files.into_iter().collect()),
                     row_groups,
                     scan_task.pushdowns.filters.clone(),
                     scan_task.partition_spec(),
@@ -777,7 +812,7 @@ fn prune_fields_from_schema(
     }
 }
 
-fn parquet_sources_to_row_groups(sources: &[DataFileSource]) -> Option<Vec<Option<Vec<i64>>>> {
+fn parquet_sources_to_row_groups(sources: &[DataSource]) -> Option<Vec<Option<Vec<i64>>>> {
     let row_groups = sources
         .iter()
         .map(|s| {
@@ -918,6 +953,59 @@ fn _get_file_column_names<'a>(
     }
 }
 
+fn _read_delete_files(
+    delete_files: &[&str],
+    uris: &[&str],
+    io_client: Arc<IOClient>,
+    io_stats: Option<IOStatsRef>,
+    num_parallel_tasks: usize,
+    runtime_handle: Arc<tokio::runtime::Runtime>,
+    schema_infer_options: &ParquetSchemaInferenceOptions,
+) -> DaftResult<HashMap<String, Vec<i64>>> {
+    let columns: Option<&[&str]> = Some(&["file_path", "pos"]);
+
+    let tables = read_parquet_bulk(
+        delete_files,
+        columns,
+        None,
+        None,
+        None,
+        None,
+        io_client,
+        io_stats,
+        num_parallel_tasks,
+        runtime_handle,
+        schema_infer_options,
+        None,
+        None,
+        None,
+    )?;
+
+    let mut delete_map: HashMap<String, Vec<i64>> =
+        uris.iter().map(|uri| (uri.to_string(), vec![])).collect();
+
+    for table in tables.iter() {
+        // values in the file_path column are guaranteed by the iceberg spec to match the full URI of the corresponding data file
+        // https://iceberg.apache.org/spec/#position-delete-files
+        let file_paths = table.get_column("file_path")?.downcast::<Utf8Array>()?;
+        let positions = table.get_column("pos")?.downcast::<Int64Array>()?;
+
+        for i in 0..table.len() {
+            let file = file_paths.get(i);
+            let pos = positions.get(i);
+
+            if let Some(file) = file
+                && let Some(pos) = pos
+                && delete_map.contains_key(file)
+            {
+                delete_map.get_mut(file).unwrap().push(pos);
+            }
+        }
+    }
+
+    Ok(delete_map)
+}
+
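(For reference: the map built by `_read_delete_files` follows the Iceberg position-delete contract, where each delete row names a data file URI and a 0-based row ordinal to drop. Below is a minimal, std-only sketch of how such a map translates into kept rows; the helper name `apply_position_deletes` is hypothetical and is not how the patch applies deletes, since the actual masking uses an arrow2 Bitmap inside `read_parquet_single` further down in this diff.)

use std::collections::HashMap;

/// Hypothetical helper: keep only the rows of one data file whose 0-based index
/// is not listed in that file's position-delete entries.
fn apply_position_deletes<T: Clone>(
    rows: &[T],
    file_uri: &str,
    delete_map: &HashMap<String, Vec<i64>>,
) -> Vec<T> {
    let deleted = delete_map.get(file_uri);
    rows.iter()
        .enumerate()
        .filter(|(i, _)| deleted.map_or(true, |d| !d.contains(&(*i as i64))))
        .map(|(_, row)| row.clone())
        .collect()
}

fn main() {
    // Positions 1 and 3 of this data file are marked deleted.
    let mut delete_map = HashMap::new();
    delete_map.insert("s3://bucket/data.parquet".to_string(), vec![1, 3]);

    let rows = vec!["a", "b", "c", "d", "e"];
    let kept = apply_position_deletes(&rows, "s3://bucket/data.parquet", &delete_map);
    assert_eq!(kept, vec!["a", "c", "e"]);
}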
 #[allow(clippy::too_many_arguments)]
 fn _read_parquet_into_loaded_micropartition(
     io_client: Arc<IOClient>,
@@ -926,6 +1014,7 @@
     columns: Option<&[&str]>,
     start_offset: Option<usize>,
     num_rows: Option<usize>,
+    iceberg_delete_files: Option<Vec<&str>>,
     row_groups: Option<Vec<Option<Vec<i64>>>>,
     predicate: Option<ExprRef>,
     partition_spec: Option<&PartitionSpec>,
@@ -935,6 +1024,20 @@
     catalog_provided_schema: Option<SchemaRef>,
     field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
 ) -> DaftResult<MicroPartition> {
+    let delete_map = iceberg_delete_files
+        .map(|files| {
+            _read_delete_files(
+                files.as_slice(),
+                uris,
+                io_client.clone(),
+                io_stats.clone(),
+                num_parallel_tasks,
+                runtime_handle.clone(),
+                schema_infer_options,
+            )
+        })
+        .transpose()?;
+
     let file_column_names = _get_file_column_names(columns, partition_spec);
     let all_tables = read_parquet_bulk(
         uris,
@@ -950,6 +1053,7 @@
         schema_infer_options,
         field_id_mapping,
         None,
+        delete_map,
     )?;
 
     // Prefer using the `catalog_provided_schema` but fall back onto inferred schema from Parquet files
@@ -987,6 +1091,7 @@ pub(crate) fn read_parquet_into_micropartition(
     columns: Option<&[&str]>,
     start_offset: Option<usize>,
     num_rows: Option<usize>,
+    iceberg_delete_files: Option<Vec<&str>>,
     row_groups: Option<Vec<Option<Vec<i64>>>>,
     predicate: Option<ExprRef>,
     partition_spec: Option<&PartitionSpec>,
@@ -1011,9 +1116,13 @@ pub(crate) fn read_parquet_into_micropartition(
     let runtime_handle = daft_io::get_runtime(multithreaded_io)?;
     let io_client = daft_io::get_io_client(multithreaded_io, io_config.clone())?;
 
-    // If we have a predicate then we no longer have an accurate accounting of required metadata
+    // If we have a predicate or deletion files then we no longer have an accurate accounting of required metadata
     // on the MicroPartition (e.g. its length). Hence we need to perform an eager read.
- if predicate.is_some() { + if iceberg_delete_files + .as_ref() + .map_or(false, |files| !files.is_empty()) + || predicate.is_some() + { return _read_parquet_into_loaded_micropartition( io_client, runtime_handle, @@ -1021,6 +1130,7 @@ pub(crate) fn read_parquet_into_micropartition( columns, start_offset, num_rows, + iceberg_delete_files, row_groups, predicate, partition_spec, @@ -1142,10 +1252,11 @@ pub(crate) fn read_parquet_into_micropartition( .clone() .unwrap_or_else(|| std::iter::repeat(None).take(uris.len()).collect()), ) - .map(|((url, metadata), rgs)| DataFileSource::AnonymousDataFile { + .map(|((url, metadata), rgs)| DataSource::File { path: url, chunk_spec: rgs.map(ChunkSpec::Parquet), size_bytes: Some(size_bytes), + iceberg_delete_files: None, metadata: None, partition_spec: partition_spec.cloned(), statistics: None, @@ -1194,6 +1305,7 @@ pub(crate) fn read_parquet_into_micropartition( columns, start_offset, num_rows, + iceberg_delete_files, row_groups, predicate, partition_spec, diff --git a/src/daft-micropartition/src/python.rs b/src/daft-micropartition/src/python.rs index 95519991fd..77d7e7ed7a 100644 --- a/src/daft-micropartition/src/python.rs +++ b/src/daft-micropartition/src/python.rs @@ -593,6 +593,7 @@ impl PyMicroPartition { columns.as_deref(), start_offset, num_rows, + None, row_groups.map(|rg| vec![Some(rg)]), predicate.map(|e| e.expr), None, @@ -637,6 +638,7 @@ impl PyMicroPartition { columns.as_deref(), start_offset, num_rows, + None, row_groups, predicate.map(|e| e.expr), None, diff --git a/src/daft-parquet/src/lib.rs b/src/daft-parquet/src/lib.rs index 5675ab368d..cb3949c1c7 100644 --- a/src/daft-parquet/src/lib.rs +++ b/src/daft-parquet/src/lib.rs @@ -121,14 +121,14 @@ pub enum Error { }, #[snafu(display( - "Parquet file: {} metadata listed {} rows but only read: {} ", + "Parquet file: {} expected {} rows but only read: {} ", path, - metadata_num_rows, + expected_rows, read_rows ))] ParquetNumRowMismatch { path: String, - metadata_num_rows: usize, + expected_rows: usize, read_rows: usize, }, @@ -150,6 +150,18 @@ pub enum Error { read_columns: usize, }, + #[snafu(display( + "Parquet file: {} attempted to delete row at position {} but only read {} rows", + path, + row, + read_rows + ))] + ParquetDeleteRowOutOfIndex { + path: String, + row: usize, + read_rows: usize, + }, + #[snafu(display( "Parquet file: {} unable to convert row group metadata to stats\nDetails:\n{source}", path, diff --git a/src/daft-parquet/src/python.rs b/src/daft-parquet/src/python.rs index 1b26401b8a..10855129dc 100644 --- a/src/daft-parquet/src/python.rs +++ b/src/daft-parquet/src/python.rs @@ -171,6 +171,7 @@ pub mod pylib { &schema_infer_options, None, None, + None, )? 
     .into_iter()
     .map(|v| v.into())
diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs
index 2fa6922618..a095971924 100644
--- a/src/daft-parquet/src/read.rs
+++ b/src/daft-parquet/src/read.rs
@@ -1,10 +1,14 @@
-use std::{collections::BTreeMap, sync::Arc, time::Duration};
+use std::{
+    collections::{BTreeMap, HashMap},
+    sync::Arc,
+    time::Duration,
+};
 
-use arrow2::io::parquet::read::schema::infer_schema_with_options;
+use arrow2::{bitmap::Bitmap, io::parquet::read::schema::infer_schema_with_options};
 use common_error::DaftResult;
 use daft_core::{
-    datatypes::{Field, Int32Array, TimeUnit, UInt64Array, Utf8Array},
+    datatypes::{BooleanArray, Field, Int32Array, TimeUnit, UInt64Array, Utf8Array},
     schema::Schema,
     DataType, IntoSeries, Series,
 };
@@ -57,6 +61,45 @@ impl From
     }
 }
 
+/// Returns the new number of rows to read after taking into account rows that need to be deleted after reading
+fn limit_with_delete_rows(
+    delete_rows: &[i64],
+    start_offset: Option<usize>,
+    num_rows_to_read: Option<usize>,
+) -> Option<usize> {
+    if let Some(mut n) = num_rows_to_read {
+        let mut delete_rows_sorted = if let Some(start_offset) = start_offset {
+            delete_rows
+                .iter()
+                .filter_map(|r| {
+                    let shifted_row = *r - start_offset as i64;
+                    if shifted_row >= 0 {
+                        Some(shifted_row as usize)
+                    } else {
+                        None
+                    }
+                })
+                .collect::<Vec<_>>()
+        } else {
+            delete_rows.iter().map(|r| *r as usize).collect::<Vec<_>>()
+        };
+        delete_rows_sorted.sort();
+        delete_rows_sorted.dedup();
+
+        for r in delete_rows_sorted {
+            if r < n {
+                n += 1;
+            } else {
+                break;
+            }
+        }
+
+        Some(n)
+    } else {
+        num_rows_to_read
+    }
+}
+
 #[allow(clippy::too_many_arguments)]
 async fn read_parquet_single(
     uri: &str,
@@ -70,18 +113,18 @@ async fn read_parquet_single(
     schema_infer_options: ParquetSchemaInferenceOptions,
     field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
     metadata: Option<Arc<FileMetaData>>,
+    delete_rows: Option<Vec<i64>>,
 ) -> DaftResult<Table> {
     let field_id_mapping_provided = field_id_mapping.is_some();
-    let original_columns = columns;
-    let original_num_rows = num_rows;
-    let mut num_rows = num_rows;
-    let mut columns = columns.map(|s| s.iter().map(|s| s.to_string()).collect_vec());
-    let requested_columns = columns.as_ref().map(|v| v.len());
+    let columns_to_return = columns;
+    let num_rows_to_return = num_rows;
+    let mut num_rows_to_read = num_rows;
+    let mut columns_to_read = columns.map(|s| s.iter().map(|s| s.to_string()).collect_vec());
+    let requested_columns = columns_to_read.as_ref().map(|v| v.len());
     if let Some(ref pred) = predicate {
-        if num_rows.is_some() {
-            num_rows = None;
-        }
-        if let Some(req_columns) = columns.as_mut() {
+        num_rows_to_read = None;
+
+        if let Some(req_columns) = columns_to_read.as_mut() {
             let needed_columns = get_required_columns(pred);
             for c in needed_columns {
                 if !req_columns.contains(&c) {
@@ -90,14 +133,21 @@ async fn read_parquet_single(
             }
         }
     }
+
+    // Increase the number of rows_to_read to account for deleted rows
+    // in order to have the correct number of rows in the end
+    if let Some(delete_rows) = &delete_rows {
+        num_rows_to_read = limit_with_delete_rows(delete_rows, start_offset, num_rows_to_read);
+    }
+
     let (source_type, fixed_uri) = parse_url(uri)?;
     let (metadata, mut table) = if matches!(source_type, SourceType::File) {
         crate::stream_reader::local_parquet_read_async(
             fixed_uri.as_ref(),
-            columns,
+            columns_to_read,
             start_offset,
-            num_rows,
+            num_rows_to_read,
             row_groups.clone(),
             predicate.clone(),
             schema_infer_options,
@@ -114,16 +164,16 @@ async fn read_parquet_single(
         .await?;
 
         let builder = builder.set_infer_schema_options(schema_infer_options);
-        let builder = if let Some(columns) = columns.as_ref() {
+        let builder = if let Some(columns) = columns_to_read.as_ref() {
             builder.prune_columns(columns.as_slice())?
         } else {
             builder
         };
 
-        if row_groups.is_some() && (num_rows.is_some() || start_offset.is_some()) {
+        if row_groups.is_some() && (num_rows_to_read.is_some() || start_offset.is_some()) {
             return Err(common_error::DaftError::ValueError("Both `row_groups` and `num_rows` or `start_offset` is set at the same time. We only support setting one set or the other.".to_string()));
         }
 
-        let builder = builder.limit(start_offset, num_rows)?;
+        let builder = builder.limit(start_offset, num_rows_to_read)?;
         let metadata = builder.metadata().clone();
 
         let builder = if let Some(ref row_groups) = row_groups {
@@ -155,47 +205,92 @@ async fn read_parquet_single(
     let metadata_num_rows = metadata.num_rows;
     let metadata_num_columns = metadata.schema().fields().len();
 
+    let num_deleted_rows = if let Some(delete_rows) = delete_rows
+        && !delete_rows.is_empty()
+    {
+        assert!(
+            row_groups.is_none(),
+            "Row group splitting is not supported with Iceberg deletion files."
+        );
+
+        let mut selection_mask = Bitmap::new_trued(table.len()).make_mut();
+
+        let start_offset = start_offset.unwrap_or(0);
+
+        for row in delete_rows.into_iter().map(|r| r as usize) {
+            if row >= start_offset && num_rows_to_read.map_or(true, |n| row < start_offset + n) {
+                let table_row = row - start_offset;
+
+                if table_row < table.len() {
+                    unsafe {
+                        selection_mask.set_unchecked(table_row, false);
+                    }
+                } else {
+                    return Err(super::Error::ParquetDeleteRowOutOfIndex {
+                        path: uri.into(),
+                        row: table_row,
+                        read_rows: table.len(),
+                    }
+                    .into());
+                }
+            }
+        }
+
+        let num_deleted_rows = selection_mask.unset_bits();
+
+        let selection_mask: BooleanArray = ("selection_mask", Bitmap::from(selection_mask)).into();
+
+        table = table.mask_filter(&selection_mask.into_series())?;
+
+        num_deleted_rows
+    } else {
+        0
+    };
+
     if let Some(predicate) = predicate {
+        // If a predicate exists, we need to apply it before a limit and also keep all of the columns that it needs until it is applied
         // TODO ideally pipeline this with IO and before concatenating, rather than after
         table = table.filter(&[predicate])?;
-        if let Some(oc) = original_columns {
+        if let Some(oc) = columns_to_return {
             table = table.get_columns(oc)?;
         }
-        if let Some(nr) = original_num_rows {
+        if let Some(nr) = num_rows_to_return {
             table = table.head(nr)?;
         }
     } else if let Some(row_groups) = row_groups {
-        let expected_rows: usize = row_groups
+        let expected_rows = row_groups
             .iter()
             .map(|i| rows_per_row_groups.get(*i as usize).unwrap())
-            .sum();
+            .sum::<usize>()
+            - num_deleted_rows;
         if expected_rows != table.len() {
             return Err(super::Error::ParquetNumRowMismatch {
                 path: uri.into(),
-                metadata_num_rows: expected_rows,
+                expected_rows,
                 read_rows: table.len(),
             }
             .into());
         }
     } else {
-        match (start_offset, num_rows) {
-            (None, None) if metadata_num_rows != table.len() => {
+        let expected_rows = metadata_num_rows - num_deleted_rows;
+        match (start_offset, num_rows_to_read) {
+            (None, None) if expected_rows != table.len() => {
                 Err(super::Error::ParquetNumRowMismatch {
                     path: uri.into(),
-                    metadata_num_rows,
+                    expected_rows,
                     read_rows: table.len(),
                 })
             }
-            (Some(s), None) if metadata_num_rows.saturating_sub(s) != table.len() => {
+            (Some(s), None) if expected_rows.saturating_sub(s) != table.len() => {
                 Err(super::Error::ParquetNumRowMismatch {
                     path: uri.into(),
-                    metadata_num_rows: metadata_num_rows.saturating_sub(s),
+                    expected_rows: expected_rows.saturating_sub(s),
                     read_rows: table.len(),
                 })
             }
             (_, Some(n)) if n < table.len() => Err(super::Error::ParquetNumRowMismatch {
                 path: uri.into(),
-                metadata_num_rows: n.min(metadata_num_rows),
+                expected_rows: n.min(expected_rows),
                 read_rows: table.len(),
             }),
             _ => Ok(()),
@@ -321,7 +416,7 @@ async fn read_parquet_single_into_arrow(
         if expected_rows != table_len {
             return Err(super::Error::ParquetNumRowMismatch {
                 path: uri.into(),
-                metadata_num_rows: expected_rows,
+                expected_rows,
                 read_rows: table_len,
             }
             .into());
@@ -331,20 +426,20 @@ async fn read_parquet_single_into_arrow(
             (None, None) if metadata_num_rows != table_len => {
                 Err(super::Error::ParquetNumRowMismatch {
                     path: uri.into(),
-                    metadata_num_rows,
+                    expected_rows: metadata_num_rows,
                     read_rows: table_len,
                 })
             }
             (Some(s), None) if metadata_num_rows.saturating_sub(s) != table_len => {
                 Err(super::Error::ParquetNumRowMismatch {
                     path: uri.into(),
-                    metadata_num_rows: metadata_num_rows.saturating_sub(s),
+                    expected_rows: metadata_num_rows.saturating_sub(s),
                     read_rows: table_len,
                 })
             }
             (_, Some(n)) if n < table_len => Err(super::Error::ParquetNumRowMismatch {
                 path: uri.into(),
-                metadata_num_rows: n.min(metadata_num_rows),
+                expected_rows: n.min(metadata_num_rows),
                 read_rows: table_len,
             }),
             _ => Ok(()),
@@ -400,6 +495,7 @@ pub fn read_parquet(
             schema_infer_options,
             None,
             metadata,
+            None,
         )
         .await
     })
@@ -463,6 +559,7 @@ pub fn read_parquet_bulk(
     schema_infer_options: &ParquetSchemaInferenceOptions,
     field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
    metadata: Option<Vec<Arc<FileMetaData>>>,
+    delete_map: Option<HashMap<String, Vec<i64>>>,
 ) -> DaftResult<Vec<Table>> {
     let _rt_guard = runtime_handle.enter();
     let owned_columns = columns.map(|s| s.iter().map(|v| String::from(*v)).collect::<Vec<_>>());
@@ -488,6 +585,7 @@ pub fn read_parquet_bulk(
                 let io_stats = io_stats.clone();
                 let schema_infer_options = *schema_infer_options;
                 let owned_field_id_mapping = field_id_mapping.clone();
+                let delete_rows = delete_map.as_ref().and_then(|m| m.get(&uri).cloned());
                 tokio::task::spawn(async move {
                     let columns = owned_columns
                         .as_ref()
@@ -506,6 +604,7 @@ pub fn read_parquet_bulk(
                             schema_infer_options,
                            owned_field_id_mapping,
                             metadata,
+                            delete_rows,
                         )
                         .await?,
                     ))
diff --git a/src/daft-scan/src/anonymous.rs b/src/daft-scan/src/anonymous.rs
index 799ecff8d9..1b9ece871b 100644
--- a/src/daft-scan/src/anonymous.rs
+++ b/src/daft-scan/src/anonymous.rs
@@ -6,7 +6,7 @@ use daft_core::schema::SchemaRef;
 use crate::{
     file_format::{FileFormatConfig, ParquetSourceConfig},
     storage_config::StorageConfig,
-    ChunkSpec, DataFileSource, PartitionField, Pushdowns, ScanOperator, ScanTask, ScanTaskRef,
+    ChunkSpec, DataSource, PartitionField, Pushdowns, ScanOperator, ScanTask, ScanTaskRef,
 };
 
 #[derive(Debug)]
 pub struct AnonymousScanOperator {
@@ -86,10 +86,11 @@ impl ScanOperator for AnonymousScanOperator {
                 move |(f, rg)| {
                     let chunk_spec = rg.map(ChunkSpec::Parquet);
                     Ok(ScanTask::new(
-                        vec![DataFileSource::AnonymousDataFile {
+                        vec![DataSource::File {
                             path: f.to_string(),
                             chunk_spec,
                             size_bytes: None,
+                            iceberg_delete_files: None,
                             metadata: None,
                             partition_spec: None,
                             statistics: None,
diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs
index c38659f62d..a4c05dfb9b 100644
--- a/src/daft-scan/src/glob.rs
+++ b/src/daft-scan/src/glob.rs
@@ -11,7 +11,7 @@ use snafu::Snafu;
 use crate::{
     file_format::{CsvSourceConfig, FileFormatConfig, ParquetSourceConfig},
     storage_config::StorageConfig,
-    ChunkSpec, DataFileSource, PartitionField, Pushdowns, ScanOperator, ScanTask, ScanTaskRef,
+    ChunkSpec, DataSource, PartitionField, Pushdowns, ScanOperator, ScanTask, ScanTaskRef,
 };
 
 #[derive(Debug)]
 pub struct GlobScanOperator {
@@ -355,10 +355,11 @@ impl ScanOperator for GlobScanOperator {
                         .flatten();
                     let chunk_spec = row_group.map(ChunkSpec::Parquet);
                     Ok(ScanTask::new(
-                        vec![DataFileSource::AnonymousDataFile {
+                        vec![DataSource::File {
                             path: path.to_string(),
                             chunk_spec,
                             size_bytes,
+                            iceberg_delete_files: None,
                             metadata: None,
                             partition_spec: None,
                             statistics: None,
diff --git a/src/daft-scan/src/lib.rs b/src/daft-scan/src/lib.rs
index 8c4d0d0c3c..52ed4de322 100644
--- a/src/daft-scan/src/lib.rs
+++ b/src/daft-scan/src/lib.rs
@@ -117,30 +117,21 @@ impl ChunkSpec {
 }
 
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
-pub enum DataFileSource {
-    AnonymousDataFile {
+pub enum DataSource {
+    File {
         path: String,
         chunk_spec: Option<ChunkSpec>,
         size_bytes: Option<u64>,
+        iceberg_delete_files: Option<Vec<String>>,
         metadata: Option<TableMetadata>,
         partition_spec: Option<PartitionSpec>,
         statistics: Option<TableStatistics>,
         parquet_metadata: Option<Arc<FileMetaData>>,
     },
-    CatalogDataFile {
+    Database {
         path: String,
-        chunk_spec: Option<ChunkSpec>,
-        size_bytes: Option<u64>,
-        metadata: TableMetadata,
-        partition_spec: PartitionSpec,
-        statistics: Option<TableStatistics>,
-    },
-    DatabaseDataSource {
-        path: String,
-        chunk_spec: Option<ChunkSpec>,
         size_bytes: Option<u64>,
         metadata: Option<TableMetadata>,
-        partition_spec: Option<PartitionSpec>,
         statistics: Option<TableStatistics>,
     },
     #[cfg(feature = "python")]
@@ -155,12 +146,10 @@ pub enum DataFileSource {
     },
 }
 
-impl DataFileSource {
+impl DataSource {
     pub fn get_path(&self) -> &str {
         match self {
-            Self::AnonymousDataFile { path, .. }
-            | Self::CatalogDataFile { path, .. }
-            | Self::DatabaseDataSource { path, .. } => path,
+            Self::File { path, .. } | Self::Database { path, .. } => path,
             #[cfg(feature = "python")]
             Self::PythonFactoryFunction { module, .. } => module,
         }
@@ -168,7 +157,7 @@ impl DataFileSource {
 
     pub fn get_parquet_metadata(&self) -> Option<&Arc<FileMetaData>> {
         match self {
-            Self::AnonymousDataFile {
+            Self::File {
                 parquet_metadata, ..
             } => parquet_metadata.as_ref(),
             _ => None,
@@ -177,9 +166,8 @@ impl DataFileSource {
 
     pub fn get_chunk_spec(&self) -> Option<&ChunkSpec> {
         match self {
-            Self::AnonymousDataFile { chunk_spec, .. }
-            | Self::CatalogDataFile { chunk_spec, .. }
-            | Self::DatabaseDataSource { chunk_spec, .. } => chunk_spec.as_ref(),
+            Self::File { chunk_spec, .. } => chunk_spec.as_ref(),
+            Self::Database { .. } => None,
             #[cfg(feature = "python")]
             Self::PythonFactoryFunction { .. } => None,
         }
@@ -187,9 +175,7 @@ impl DataFileSource {
 
     pub fn get_size_bytes(&self) -> Option<u64> {
         match self {
-            Self::AnonymousDataFile { size_bytes, .. }
-            | Self::CatalogDataFile { size_bytes, .. }
-            | Self::DatabaseDataSource { size_bytes, .. } => *size_bytes,
+            Self::File { size_bytes, .. } | Self::Database { size_bytes, .. } => *size_bytes,
             #[cfg(feature = "python")]
             Self::PythonFactoryFunction { size_bytes, .. } => *size_bytes,
         }
@@ -197,9 +183,7 @@ impl DataFileSource {
 
     pub fn get_metadata(&self) -> Option<&TableMetadata> {
         match self {
-            Self::AnonymousDataFile { metadata, .. }
-            | Self::DatabaseDataSource { metadata, .. } => metadata.as_ref(),
-            Self::CatalogDataFile { metadata, .. } => Some(metadata),
+            Self::File { metadata, .. } | Self::Database { metadata, .. } => metadata.as_ref(),
             #[cfg(feature = "python")]
             Self::PythonFactoryFunction { metadata, .. } => metadata.as_ref(),
         }
@@ -207,9 +191,9 @@ impl DataFileSource {
 
     pub fn get_statistics(&self) -> Option<&TableStatistics> {
         match self {
-            Self::AnonymousDataFile { statistics, .. }
-            | Self::CatalogDataFile { statistics, .. }
-            | Self::DatabaseDataSource { statistics, .. } => statistics.as_ref(),
+            Self::File { statistics, .. } | Self::Database { statistics, .. } => {
+                statistics.as_ref()
+            }
             #[cfg(feature = "python")]
             Self::PythonFactoryFunction { statistics, .. } => statistics.as_ref(),
         }
@@ -217,33 +201,35 @@ impl DataFileSource {
 
     pub fn get_partition_spec(&self) -> Option<&PartitionSpec> {
         match self {
-            Self::AnonymousDataFile { partition_spec, .. }
-            | Self::DatabaseDataSource { partition_spec, .. } => partition_spec.as_ref(),
-            Self::CatalogDataFile { partition_spec, .. } => Some(partition_spec),
+            Self::File { partition_spec, .. } => partition_spec.as_ref(),
+            Self::Database { .. } => None,
             #[cfg(feature = "python")]
             Self::PythonFactoryFunction { partition_spec, .. } => partition_spec.as_ref(),
         }
     }
 
+    pub fn get_iceberg_delete_files(&self) -> Option<&Vec<String>> {
+        match self {
+            Self::File {
+                iceberg_delete_files,
+                ..
+            } => iceberg_delete_files.as_ref(),
+            _ => None,
+        }
+    }
+
     pub fn multiline_display(&self) -> Vec<String> {
         let mut res = vec![];
         match self {
-            Self::AnonymousDataFile {
+            Self::File {
                 path,
                 chunk_spec,
                 size_bytes,
+                iceberg_delete_files,
                 metadata,
                 partition_spec,
                 statistics,
                 parquet_metadata: _,
-            }
-            | Self::DatabaseDataSource {
-                path,
-                chunk_spec,
-                size_bytes,
-                metadata,
-                partition_spec,
-                statistics,
             } => {
                 res.push(format!("Path = {}", path));
                 if let Some(chunk_spec) = chunk_spec {
@@ -255,6 +241,9 @@ impl DataFileSource {
                 if let Some(size_bytes) = size_bytes {
                     res.push(format!("Size bytes = {}", size_bytes));
                 }
+                if let Some(iceberg_delete_files) = iceberg_delete_files {
+                    res.push(format!("Iceberg delete files = {:?}", iceberg_delete_files));
+                }
                 if let Some(metadata) = metadata {
                     res.push(format!(
                         "Metadata = {}",
@@ -271,32 +260,22 @@ impl DataFileSource {
                     res.push(format!("Statistics = {}", statistics));
                 }
             }
-            Self::CatalogDataFile {
+            Self::Database {
                 path,
-                chunk_spec,
                 size_bytes,
                 metadata,
-                partition_spec,
                 statistics,
             } => {
                 res.push(format!("Path = {}", path));
-                if let Some(chunk_spec) = chunk_spec {
-                    res.push(format!(
-                        "Chunk spec = {{ {} }}",
-                        chunk_spec.multiline_display().join(", ")
-                    ));
-                }
                 if let Some(size_bytes) = size_bytes {
                     res.push(format!("Size bytes = {}", size_bytes));
                 }
-                res.push(format!(
-                    "Metadata = {}",
-                    metadata.multiline_display().join(", ")
-                ));
-                res.push(format!(
-                    "Partition spec = {}",
-                    partition_spec.multiline_display().join(", ")
-                ));
+                if let Some(metadata) = metadata {
+                    res.push(format!(
+                        "Metadata = {}",
+                        metadata.multiline_display().join(", ")
+                    ));
+                }
                 if let Some(statistics) = statistics {
                     res.push(format!("Statistics = {}", statistics));
                 }
             }
@@ -338,11 +317,11 @@
 
 #[derive(Debug, PartialEq, Serialize, Deserialize)]
 pub struct ScanTask {
-    pub sources: Vec<DataFileSource>,
+    pub sources: Vec<DataSource>,
 
-    /// Schema to use when reading the DataFileSources.
+    /// Schema to use when reading the DataSources.
     ///
-    /// Schema to use when reading the DataFileSources. This should always be passed in by the
+    /// Schema to use when reading the DataSources. This should always be passed in by the
     /// ScanOperator implementation and should not have had any "pruning" applied.
     ///
     /// Note that this is different than the schema of the data after pushdowns have been applied,
@@ -360,7 +339,7 @@ pub type ScanTaskRef = Arc<ScanTask>;
 
 impl ScanTask {
     pub fn new(
-        sources: Vec<DataFileSource>,
+        sources: Vec<DataSource>,
         file_format_config: Arc<FileFormatConfig>,
         schema: SchemaRef,
         storage_config: Arc<StorageConfig>,
diff --git a/src/daft-scan/src/python.rs b/src/daft-scan/src/python.rs
index daa4afc9f3..9cb834078c 100644
--- a/src/daft-scan/src/python.rs
+++ b/src/daft-scan/src/python.rs
@@ -66,7 +66,7 @@ pub mod pylib {
     use crate::anonymous::AnonymousScanOperator;
     use crate::file_format::FileFormatConfig;
     use crate::storage_config::PythonStorageConfig;
-    use crate::DataFileSource;
+    use crate::DataSource;
     use crate::PartitionField;
     use crate::Pushdowns;
     use crate::ScanOperator;
@@ -316,6 +316,7 @@
         storage_config: PyStorageConfig,
         num_rows: Option<i64>,
         size_bytes: Option<u64>,
+        iceberg_delete_files: Option<Vec<String>>,
         pushdowns: Option<PyPushdowns>,
         partition_values: Option<PyTable>,
         stats: Option<PyTable>,
@@ -348,26 +349,17 @@
             .map(|s| TableStatistics::from_stats_table(&s.table))
             .transpose()?;
 
-            let table_metadata = num_rows.map(|n| TableMetadata { length: n as usize });
-
-            let data_source = match table_metadata {
-                None => DataFileSource::AnonymousDataFile {
-                    path: file,
-                    chunk_spec: None,
-                    size_bytes,
-                    metadata: None,
-                    partition_spec: Some(pspec),
-                    statistics,
-                    parquet_metadata: None,
-                },
-                Some(tm) => DataFileSource::CatalogDataFile {
-                    path: file,
-                    chunk_spec: None,
-                    size_bytes,
-                    metadata: tm,
-                    partition_spec: pspec,
-                    statistics,
-                },
+            let metadata = num_rows.map(|n| TableMetadata { length: n as usize });
+
+            let data_source = DataSource::File {
+                path: file,
+                chunk_spec: None,
+                size_bytes,
+                iceberg_delete_files,
+                metadata,
+                partition_spec: Some(pspec),
+                statistics,
+                parquet_metadata: None,
             };
 
             let scan_task = ScanTask::new(
@@ -395,12 +387,10 @@
             let statistics = stats
                 .map(|s| TableStatistics::from_stats_table(&s.table))
                 .transpose()?;
-            let data_source = DataFileSource::DatabaseDataSource {
+            let data_source = DataSource::Database {
                 path: url,
-                chunk_spec: None,
                 size_bytes,
                 metadata: num_rows.map(|n| TableMetadata { length: n as usize }),
-                partition_spec: None,
                 statistics,
             };
 
@@ -430,7 +420,7 @@
             let statistics = stats
                 .map(|s| TableStatistics::from_stats_table(&s.table))
                 .transpose()?;
-            let data_source = DataFileSource::PythonFactoryFunction {
+            let data_source = DataSource::PythonFactoryFunction {
                 module,
                 func_name,
                 func_args: PythonTablesFactoryArgs::new(
diff --git a/src/daft-scan/src/scan_task_iters.rs b/src/daft-scan/src/scan_task_iters.rs
index 3d14ea38dd..a10c13a535 100644
--- a/src/daft-scan/src/scan_task_iters.rs
+++ b/src/daft-scan/src/scan_task_iters.rs
@@ -8,7 +8,7 @@ use daft_parquet::read::read_parquet_metadata;
 use crate::{
     file_format::{FileFormatConfig, ParquetSourceConfig},
     storage_config::StorageConfig,
-    ChunkSpec, DataFileSource, ScanTask, ScanTaskRef,
+    ChunkSpec, DataSource, ScanTask, ScanTaskRef,
 };
 
 type BoxScanTaskIter<'a> = Box<dyn Iterator<Item = DaftResult<ScanTaskRef>> + 'a>;
@@ -147,6 +147,7 @@ pub fn split_by_row_groups(
        - use native storage config
        - have no specified chunk spec or number of rows
        - have size past split threshold
+       - no iceberg delete files
     */
     if let (
         FileFormatConfig::Parquet(ParquetSourceConfig {
@@ -160,11 +161,14 @@ pub fn split_by_row_groups(
                 t.file_format_config.as_ref(),
                 t.storage_config.as_ref(),
                 &t.sources[..],
-                t.sources.first().map(DataFileSource::get_chunk_spec),
+                t.sources.first().map(DataSource::get_chunk_spec),
                 t.pushdowns.limit,
             ) && source
                 .get_size_bytes()
.map_or(true, |s| s > max_size_bytes as u64) + && source + .get_iceberg_delete_files() + .map_or(true, |f| f.is_empty()) { let (io_runtime, io_client) = t.storage_config.get_io_client_and_runtime()?; @@ -196,41 +200,24 @@ pub fn split_by_row_groups( if curr_size_bytes >= min_size_bytes || i == file.row_groups.len() - 1 { let mut new_source = source.clone(); - match &mut new_source { - DataFileSource::AnonymousDataFile { - chunk_spec, - size_bytes, - .. - } - | DataFileSource::CatalogDataFile { - chunk_spec, - size_bytes, - .. - } - | DataFileSource::DatabaseDataSource { - chunk_spec, - size_bytes, - .. - } => { - *chunk_spec = Some(ChunkSpec::Parquet(curr_row_groups)); - *size_bytes = Some(curr_size_bytes as u64); - } - #[cfg(feature = "python")] - DataFileSource::PythonFactoryFunction { .. } => unreachable!("DataFileSource::PythonFactoryFunction should never return Parquet formats"), - }; - match &mut new_source { - DataFileSource::AnonymousDataFile { - metadata: Some(metadata), - .. - } - | DataFileSource::CatalogDataFile { metadata, .. } - | DataFileSource::DatabaseDataSource { - metadata: Some(metadata), - .. - } => { - metadata.length = curr_num_rows; - } - _ => (), + if let DataSource::File { + chunk_spec, + size_bytes, + .. + } = &mut new_source + { + *chunk_spec = Some(ChunkSpec::Parquet(curr_row_groups)); + *size_bytes = Some(curr_size_bytes as u64); + } else { + unreachable!("Parquet file format should only be used with DataSource::File"); + } + + if let DataSource::File { + metadata: Some(metadata), + .. + } = &mut new_source + { + metadata.length = curr_num_rows; } // Reset accumulators diff --git a/tests/integration/iceberg/test_table_load.py b/tests/integration/iceberg/test_table_load.py index 1fdc67b792..2252f446d6 100644 --- a/tests/integration/iceberg/test_table_load.py +++ b/tests/integration/iceberg/test_table_load.py @@ -6,6 +6,7 @@ pyiceberg = pytest.importorskip("pyiceberg") +import pyarrow.compute as pc from pyiceberg.io.pyarrow import schema_to_pyarrow import daft @@ -34,8 +35,8 @@ def test_daft_iceberg_table_open(local_iceberg_tables): "test_partitioned_by_months", "test_partitioned_by_truncate", "test_partitioned_by_years", - # "test_positional_mor_deletes", # Need Merge on Read - # "test_positional_mor_double_deletes", # Need Merge on Read + "test_positional_mor_deletes", + "test_positional_mor_double_deletes", # "test_table_sanitized_character", # Bug in scan().to_arrow().to_arrow() "test_table_version", # we have bugs when loading no files "test_uuid_and_fixed_unpartitioned", @@ -163,3 +164,35 @@ def test_daft_iceberg_table_read_table_snapshot(local_iceberg_catalog): daft_pandas = daft.read_iceberg(tab, snapshot_id=snapshot.snapshot_id).to_pandas() iceberg_pandas = tab.scan(snapshot_id=snapshot.snapshot_id).to_pandas() assert_df_equals(daft_pandas, iceberg_pandas, sort_key=[]) + + +@pytest.mark.integration() +@pytest.mark.parametrize("table_name", ["test_positional_mor_deletes", "test_positional_mor_double_deletes"]) +def test_daft_iceberg_table_mor_limit_collect_correct(table_name, local_iceberg_catalog): + tab = local_iceberg_catalog.load_table(f"default.{table_name}") + df = daft.read_iceberg(tab) + df = df.limit(10) + df.collect() + daft_pandas = df.to_pandas() + + iceberg_arrow = tab.scan().to_arrow() + iceberg_arrow = iceberg_arrow.slice(length=10) + iceberg_pandas = iceberg_arrow.to_pandas() + + assert_df_equals(daft_pandas, iceberg_pandas, sort_key=[]) + + +@pytest.mark.integration() +@pytest.mark.parametrize("table_name", 
["test_positional_mor_deletes", "test_positional_mor_double_deletes"]) +def test_daft_iceberg_table_mor_predicate_collect_correct(table_name, local_iceberg_catalog): + tab = local_iceberg_catalog.load_table(f"default.{table_name}") + df = daft.read_iceberg(tab) + df = df.where(df["number"] > 5) + df.collect() + daft_pandas = df.to_pandas() + + iceberg_arrow = tab.scan().to_arrow() + iceberg_arrow = iceberg_arrow.filter(pc.field("number") > 5) + iceberg_pandas = iceberg_arrow.to_pandas() + + assert_df_equals(daft_pandas, iceberg_pandas, sort_key=[])
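A note on the limit handling exercised by test_daft_iceberg_table_mor_limit_collect_correct above: when a LIMIT is pushed down, `limit_with_delete_rows` in src/daft-parquet/src/read.rs grows the physical row count so that enough rows survive the delete mask. The following is a self-contained sketch of that adjustment only; it ignores the start_offset shifting that the real function also performs, and the name `physical_rows_needed` is made up for illustration.

/// Sketch of the limit adjustment: walk the sorted, deduplicated delete positions
/// and add one physical row for every deleted position that falls inside the
/// (continuously growing) read window, so that `limit` rows survive the deletes.
fn physical_rows_needed(limit: usize, delete_positions: &[i64]) -> usize {
    let mut sorted: Vec<usize> = delete_positions.iter().map(|p| *p as usize).collect();
    sorted.sort();
    sorted.dedup();

    let mut n = limit;
    for pos in sorted {
        if pos < n {
            // This deleted row lands inside the rows we planned to read,
            // so one more physical row is needed to still return `limit` rows.
            n += 1;
        } else {
            break;
        }
    }
    n
}

fn main() {
    // With a limit of 10 and deletes at positions 2 and 5, 12 physical rows are read.
    assert_eq!(physical_rows_needed(10, &[5, 2]), 12);
    // Deletes past the read window do not change the count.
    assert_eq!(physical_rows_needed(10, &[100]), 10);
}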