diff --git a/crates/polars-arrow/src/io/ipc/read/file.rs b/crates/polars-arrow/src/io/ipc/read/file.rs
index d06a59b3ff9d..775760adebec 100644
--- a/crates/polars-arrow/src/io/ipc/read/file.rs
+++ b/crates/polars-arrow/src/io/ipc/read/file.rs
@@ -41,15 +41,24 @@ pub struct FileMetadata {
 /// Read the row count by summing the lengths of the record batches
 pub fn get_row_count<R: Read + Seek>(reader: &mut R) -> PolarsResult<i64> {
-    let mut message_scratch: Vec<u8> = Default::default();
     let (_, footer_len) = read_footer_len(reader)?;
     let footer = read_footer(reader, footer_len)?;
     let (_, blocks) = deserialize_footer_blocks(&footer)?;
+    get_row_count_from_blocks(reader, &blocks)
+}
+
+/// Read the row count by summing the lengths of the record batches in `blocks`
+pub fn get_row_count_from_blocks<R: Read + Seek>(
+    reader: &mut R,
+    blocks: &[arrow_format::ipc::Block],
+) -> PolarsResult<i64> {
+    let mut message_scratch: Vec<u8> = Default::default();
+
     blocks
-        .into_iter()
+        .iter()
         .map(|block| {
-            let message = get_message_from_block(reader, &block, &mut message_scratch)?;
+            let message = get_message_from_block(reader, block, &mut message_scratch)?;
             let record_batch = get_record_batch(message)?;
             record_batch.length().map_err(|e| e.into())
         })
diff --git a/crates/polars-arrow/src/io/ipc/read/mod.rs b/crates/polars-arrow/src/io/ipc/read/mod.rs
index f4430db7dea2..006288a71859 100644
--- a/crates/polars-arrow/src/io/ipc/read/mod.rs
+++ b/crates/polars-arrow/src/io/ipc/read/mod.rs
@@ -22,8 +22,8 @@ pub(crate) use common::first_dict_field;
 pub use common::{prepare_projection, ProjectionInfo};
 pub use error::OutOfSpecKind;
 pub use file::{
-    deserialize_footer, get_row_count, read_batch, read_file_dictionaries, read_file_metadata,
-    FileMetadata,
+    deserialize_footer, get_row_count, get_row_count_from_blocks, read_batch,
+    read_file_dictionaries, read_file_metadata, FileMetadata,
 };
 use polars_utils::aliases::PlHashMap;
 pub use reader::FileReader;
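Note: the new `get_row_count_from_blocks` entry point lets callers that already hold a parsed footer skip the footer re-read that `get_row_count` performs. A minimal sketch of the intended call pattern, assuming the `blocks` field of `FileMetadata` is reachable from the caller (as it is for the executors later in this diff):

```rust
use std::io::Cursor;

use polars_arrow::io::ipc::read::{get_row_count_from_blocks, read_file_metadata};
use polars_error::PolarsResult;

/// Count rows of an in-memory IPC file while parsing the footer only once.
/// Sketch only: `metadata.blocks` being accessible here is an assumption.
fn row_count_with_cached_footer(ipc_bytes: &[u8]) -> PolarsResult<i64> {
    let mut reader = Cursor::new(ipc_bytes);
    // Parse the footer once; the cached `blocks` can then serve any number
    // of row-count queries without touching the footer again.
    let metadata = read_file_metadata(&mut reader)?;
    get_row_count_from_blocks(&mut reader, &metadata.blocks)
}
```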
+#[cfg(feature = "json")] +use crate::executors::JsonExec; #[cfg(feature = "parquet")] use crate::executors::ParquetExec; use crate::prelude::*; -pub trait IOFileMetadata: Send + Sync { - fn as_any(&self) -> &dyn std::any::Any; - fn num_rows(&self) -> PolarsResult; - fn schema(&self) -> PolarsResult; -} - -pub(super) struct BasicFileMetadata { - pub schema: Schema, - pub num_rows: IdxSize, -} - -impl IOFileMetadata for BasicFileMetadata { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn num_rows(&self) -> PolarsResult { - Ok(self.num_rows) - } - - fn schema(&self) -> PolarsResult { - Ok(self.schema.clone()) - } -} - +/// An [`Executor`] that scans over some IO. pub trait ScanExec { + /// Read the source. fn read( &mut self, with_columns: Option>, slice: Option<(usize, usize)>, predicate: Option>, row_index: Option, - metadata: Option>, - schema: Schema, ) -> PolarsResult; - fn metadata(&mut self) -> PolarsResult>; + /// Get the full schema for the source behind this [`Executor`]. + /// + /// Note that this might be called several times so attempts should be made to cache the result. + fn schema(&mut self) -> PolarsResult<&SchemaRef>; + /// Get the number of rows for the source behind this [`Executor`]. + /// + /// Note that this might be called several times so attempts should be made to cache the result. + fn num_unfiltered_rows(&mut self) -> PolarsResult; } -fn source_to_scan_exec( +fn source_to_exec( source: ScanSourceRef, scan_type: &FileScan, file_info: &FileInfo, file_options: &FileScanOptions, - metadata: Option<&dyn IOFileMetadata>, + allow_missing_columns: bool, + file_index: usize, ) -> PolarsResult> { let source = match source { ScanSourceRef::Path(path) => ScanSources::Paths([path.to_path_buf()].into()), @@ -70,76 +56,100 @@ fn source_to_scan_exec( }, }; + let is_first_file = file_index == 0; + + let mut file_info = file_info.clone(); + + if allow_missing_columns && !is_first_file { + file_info.reader_schema.take(); + } + Ok(match scan_type { #[cfg(feature = "parquet")] FileScan::Parquet { options, cloud_options, - .. - } => Box::new(ParquetExec::new( - source, - file_info.clone(), - None, - None, - options.clone(), - cloud_options.clone(), - file_options.clone(), - metadata.map(|md| { - md.as_any() - .downcast_ref::>() - .unwrap() - .clone() - }), - )) as _, - #[cfg(feature = "csv")] - FileScan::Csv { options, .. 
-        FileScan::Csv { options, .. } => Box::new(CsvExec {
-            sources: source,
-            file_info: file_info.clone(),
-            options: options.clone(),
-            file_options: file_options.clone(),
-            predicate: None,
-        }),
-        _ => todo!(),
-    })
-}
+            metadata,
+        } => {
+            let metadata = metadata.as_ref().take_if(|_| is_first_file);
 
-pub struct Source {
-    scan_exec: Box<dyn ScanExec>,
-    metadata: OnceCell<Box<dyn IOFileMetadata>>,
-}
+            let mut options = options.clone();
 
-impl Source {
-    fn new(
-        source: ScanSourceRef,
-        scan_type: &FileScan,
-        file_info: &FileInfo,
-        file_options: &FileScanOptions,
-        metadata: Option<&dyn IOFileMetadata>,
-    ) -> PolarsResult<Self> {
-        let scan_exec = source_to_scan_exec(source, scan_type, file_info, file_options, metadata)?;
-        Ok(Self {
-            scan_exec,
-            metadata: OnceCell::new(),
-        })
-    }
+            if allow_missing_columns && !is_first_file {
+                options.schema.take();
+            }
 
-    fn get_metadata(&mut self) -> PolarsResult<&dyn IOFileMetadata> {
-        match self.metadata.get() {
-            None => {
-                let metadata = self.scan_exec.metadata()?;
-                Ok(self.metadata.get_or_init(|| metadata).as_ref())
-            },
-            Some(metadata) => Ok(metadata.as_ref()),
-        }
-    }
+            Box::new(ParquetExec::new(
+                source,
+                file_info,
+                None,
+                None,
+                options,
+                cloud_options.clone(),
+                file_options.clone(),
+                metadata.cloned(),
+            ))
+        },
+        #[cfg(feature = "csv")]
+        FileScan::Csv { options, .. } => {
+            let mut options = options.clone();
+            let file_options = file_options.clone();
 
-    fn num_unfiltered_rows(&mut self) -> PolarsResult<IdxSize> {
-        self.get_metadata()?.num_rows()
-    }
+            if allow_missing_columns && !is_first_file {
+                options.schema.take();
+            }
 
-    fn schema(&mut self) -> PolarsResult<Schema> {
-        self.get_metadata()?.schema()
-    }
+            Box::new(CsvExec {
+                sources: source,
+                file_info,
+                options,
+                file_options,
+                predicate: None,
+            })
+        },
+        #[cfg(feature = "ipc")]
+        FileScan::Ipc {
+            options,
+            cloud_options,
+            metadata,
+        } => {
+            let metadata = metadata.as_ref().take_if(|_| is_first_file);
+
+            let options = options.clone();
+            let file_options = file_options.clone();
+            let cloud_options = cloud_options.clone();
+
+            Box::new(IpcExec {
+                sources: source,
+                file_info,
+                options,
+                file_options,
+                predicate: None,
+                hive_parts: None,
+                cloud_options,
+                metadata: metadata.cloned(),
+            })
+        },
+        #[cfg(feature = "json")]
+        FileScan::NDJson {
+            options,
+            cloud_options,
+            ..
+        } => {
+            let options = options.clone();
+            let file_options = file_options.clone();
+            _ = cloud_options; // @TODO: Use these?
+
+            Box::new(JsonExec::new(
+                source,
+                options,
+                file_options,
+                file_info,
+                None,
+            ))
+        },
+        FileScan::Anonymous { .. } => unreachable!(),
+    })
 }
 
 /// Scan over multiple sources and combine their results.
@@ -150,8 +160,6 @@ pub struct MultiScanExec {
     predicate: Option<Arc<dyn PhysicalExpr>>,
     file_options: FileScanOptions,
     scan_type: FileScan,
-
-    first_file_metadata: Option<Box<dyn IOFileMetadata>>,
 }
 
 impl MultiScanExec {
@@ -161,14 +169,8 @@ impl MultiScanExec {
         hive_parts: Option<Arc<Vec<HivePartitions>>>,
         predicate: Option<Arc<dyn PhysicalExpr>>,
         file_options: FileScanOptions,
-        mut scan_type: FileScan,
+        scan_type: FileScan,
     ) -> Self {
-        let first_file_metadata = match &mut scan_type {
-            #[cfg(feature = "parquet")]
-            FileScan::Parquet { metadata, .. } => metadata.take().map(|md| Box::new(md) as _),
-            _ => None,
-        };
-
         Self {
             sources,
             file_info,
@@ -176,7 +178,6 @@ impl MultiScanExec {
             predicate,
             file_options,
             scan_type,
-            first_file_metadata,
         }
     }
 
@@ -191,12 +192,13 @@ impl MultiScanExec {
         for i in (0..self.sources.len()).rev() {
             let source = self.sources.get(i).unwrap();
-            let mut exec_source = Source::new(
+            let mut exec_source = source_to_exec(
                 source,
                 &self.scan_type,
                 &self.file_info,
                 &self.file_options,
-                self.first_file_metadata.as_deref().filter(|_| i == 0),
+                self.file_options.allow_missing_columns,
+                i,
             )?;
 
             let num_rows = exec_source.num_unfiltered_rows()? as usize;
@@ -260,10 +262,6 @@ impl MultiScanExec {
         let mut row_index = self.file_options.row_index.take();
         let slice = self.file_options.slice.take();
 
-        let current_schema = self.file_info.schema.clone();
-        let output_schema = current_schema.clone();
-        let mut missing_columns = Vec::new();
-
         let mut first_slice_file = None;
         let mut slice = match slice {
             None => None,
@@ -278,6 +276,14 @@ impl MultiScanExec {
             }),
         };
 
+        let final_per_source_schema = &self.file_info.schema;
+        let file_output_schema = if let Some(file_with_columns) = file_with_columns.as_ref() {
+            Arc::new(final_per_source_schema.try_project(file_with_columns.as_ref())?)
+        } else {
+            final_per_source_schema.clone()
+        };
+        let mut missing_columns = Vec::new();
+
         let verbose = config::verbose();
         let mut dfs = Vec::with_capacity(self.sources.len());
 
@@ -296,12 +302,13 @@ impl MultiScanExec {
                 break;
             }
 
-            let mut exec_source = Source::new(
+            let mut exec_source = source_to_exec(
                 source,
                 &self.scan_type,
                 &self.file_info,
                 &self.file_options,
-                self.first_file_metadata.as_deref().filter(|_| i == 0),
+                allow_missing_columns,
+                i,
             )?;
 
             if verbose {
@@ -311,36 +318,20 @@ impl MultiScanExec {
                 );
             }
 
-            // @TODO: There are cases where we can ignore reading. E.g. no row index + empty with columns + no predicate
-            let mut schema = exec_source.schema()?;
-            let mut extra_columns = Vec::new();
-
-            if let Some(file_with_columns) = &file_with_columns {
-                if allow_missing_columns {
-                    schema = schema.try_project(
-                        file_with_columns
-                            .iter()
-                            .filter(|c| schema.contains(c.as_str())),
-                    )?;
-                } else {
-                    schema = schema.try_project(file_with_columns.iter())?;
-                }
-            }
-
-            if allow_missing_columns {
-                missing_columns.clear();
-                extra_columns.clear();
-
-                current_schema.as_ref().field_compare(
-                    &schema,
-                    &mut missing_columns,
-                    &mut extra_columns,
-                );
+            let mut do_skip_file = false;
+            if let Some(slice) = &slice {
+                let allow_slice_skip = match first_slice_file {
+                    None => slice.0 as IdxSize >= exec_source.num_unfiltered_rows()?,
+                    Some(f) => i < f,
+                };
 
-                if !extra_columns.is_empty() {
-                    // @TODO: Better error
-                    polars_bail!(InvalidOperation: "More schema in file after first");
+                if allow_slice_skip && verbose {
+                    eprintln!(
+                        "Slice allows skipping of '{}'",
+                        source.to_include_path_name()
+                    );
                 }
+                do_skip_file |= allow_slice_skip;
             }
 
             // Insert the hive partition values into the predicate. This allows the predicate
@@ -360,9 +351,10 @@ impl MultiScanExec {
                         .into_static();
                     const_columns.insert(column.clone(), value);
                 }
-                for (_, (missing_column, _)) in &missing_columns {
-                    const_columns.insert((*missing_column).clone(), AnyValue::Null);
-                }
+                // @TODO: It would be nice to get this somehow.
+                // for (_, (missing_column, _)) in &missing_columns {
+                //     const_columns.insert((*missing_column).clone(), AnyValue::Null);
+                // }
 
                 file_predicate = predicate.replace_elementwise_const_columns(&const_columns);
 
@@ -389,22 +381,6 @@ impl MultiScanExec {
                 }
             }
 
-            let mut do_skip_file = false;
-            if let Some(slice) = &slice {
-                let allow_slice_skip = match first_slice_file {
-                    None => slice.0 as IdxSize >= exec_source.num_unfiltered_rows()?,
-                    Some(f) => i < f,
-                };
-
-                if allow_slice_skip && verbose {
-                    eprintln!(
-                        "Slice allows skipping of '{}'",
-                        source.to_include_path_name()
-                    );
-                }
-                do_skip_file |= allow_slice_skip;
-            }
-
             let stats_evaluator = file_predicate.as_ref().and_then(|p| p.as_stats_evaluator());
             let stats_evaluator = stats_evaluator.filter(|_| use_statistics);
 
@@ -438,32 +414,67 @@ impl MultiScanExec {
                 continue;
             }
 
-            let with_columns = if allow_missing_columns {
-                file_with_columns
-                    .as_ref()
-                    .map(|_| schema.iter_names().cloned().collect())
-            } else {
-                file_with_columns.clone()
-            };
-
-            // Read the DataFrame and needed metadata.
-            let num_unfiltered_rows = exec_source.num_unfiltered_rows()?;
-            let mut df = exec_source.scan_exec.read(
-                with_columns,
+            // @TODO: There are cases where we can ignore reading. E.g. no row index + empty with columns + no predicate
+            let mut current_source_with_columns = Cow::Borrowed(&file_with_columns);
+
+            // If we allow missing columns, we need to determine the set of missing columns and
+            // possibly update the with_columns to reflect that.
+            if allow_missing_columns {
+                let current_source_schema = exec_source.schema()?;
+
+                missing_columns.clear();
+
+                let mut extra_columns = Vec::new();
+                final_per_source_schema.as_ref().field_compare(
+                    current_source_schema.as_ref(),
+                    &mut missing_columns,
+                    &mut extra_columns,
+                );
+
+                if !extra_columns.is_empty() {
+                    let source_name = match source {
+                        ScanSourceRef::Path(path) => path.to_string_lossy().into_owned(),
+                        ScanSourceRef::File(_) => format!("file descriptor #{}", i + 1),
+                        ScanSourceRef::Buffer(_) => format!("in-memory buffer #{}", i + 1),
+                    };
+                    let columns = extra_columns
+                        .iter()
+                        .map(|(_, (name, _))| format!("'{}'", name))
+                        .collect::<Vec<_>>()
+                        .join(", ");
+                    polars_bail!(
+                        SchemaMismatch:
+                        "'{source_name}' contains column(s) {columns}, which are not present in the first scanned file"
+                    );
+                }
+
+                // Update `with_columns` to not include any columns not present in the file.
+                current_source_with_columns =
+                    Cow::Owned(current_source_with_columns.as_deref().map(|with_columns| {
+                        with_columns
+                            .iter()
+                            .filter(|c| current_source_schema.contains(c))
+                            .cloned()
+                            .collect()
+                    }));
+            }
+
+            // Read the DataFrame.
+            let mut df = exec_source.read(
+                current_source_with_columns.into_owned(),
                 slice,
                 file_predicate,
                 row_index.clone(),
-                exec_source.metadata.take(),
-                schema,
             )?;
 
             // Update the row_index to the proper offset.
             if let Some(row_index) = row_index.as_mut() {
-                row_index.offset += num_unfiltered_rows;
+                row_index.offset += exec_source.num_unfiltered_rows()?;
             }
             // Update the slice.
             if let Some(slice) = slice.as_mut() {
                 if first_slice_file.is_none_or(|f| i >= f) {
+                    let num_unfiltered_rows = exec_source.num_unfiltered_rows()?;
                     slice.1 = slice
                         .1
                        .saturating_sub(num_unfiltered_rows as usize - slice.0);
@@ -503,7 +514,7 @@ impl MultiScanExec {
             }
 
             // Project to ensure that all DataFrames have the proper order.
-            df = df.select(output_schema.iter_names().cloned())?;
+            df = df.select(file_output_schema.iter_names().cloned())?;
 
             dfs.push(df);
         }
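Note: the slice handling in the loop above is easiest to see in isolation. A standalone sketch of the same bookkeeping, simplified to the arithmetic only (no negative offsets, row counts already known; the helper and its name are hypothetical):

```rust
/// Given per-file row counts and a global slice `(offset, len)`, compute the
/// per-file slices the multi-scan loop would read.
fn per_file_slices(
    row_counts: &[usize],
    mut offset: usize,
    mut len: usize,
) -> Vec<Option<(usize, usize)>> {
    row_counts
        .iter()
        .map(|&n| {
            // Skip whole files that end before the slice starts.
            if offset >= n {
                offset -= n;
                return None;
            }
            let take = len.min(n - offset);
            let out = (take > 0).then_some((offset, take));
            len -= take;
            offset = 0;
            out
        })
        .collect()
}

#[test]
fn slice_spans_files() {
    // Files of 10, 5 and 20 rows; a slice of offset 13, length 6 skips the
    // first file, takes the last 2 rows of file 2 and 4 rows of file 3.
    assert_eq!(
        per_file_slices(&[10, 5, 20], 13, 6),
        vec![None, Some((3, 2)), Some((0, 4))]
    );
}
```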
diff --git a/crates/polars-mem-engine/src/executors/scan/csv.rs b/crates/polars-mem-engine/src/executors/scan/csv.rs
index 6caf40019c8d..4dc700878592 100644
--- a/crates/polars-mem-engine/src/executors/scan/csv.rs
+++ b/crates/polars-mem-engine/src/executors/scan/csv.rs
@@ -216,39 +216,76 @@ impl ScanExec for CsvExec {
         slice: Option<(usize, usize)>,
         predicate: Option<Arc<dyn PhysicalExpr>>,
         row_index: Option<RowIndex>,
-        metadata: Option<Box<dyn IOFileMetadata>>,
-        schema: Schema,
     ) -> PolarsResult<DataFrame> {
         self.file_options.with_columns = with_columns;
         self.file_options.slice = slice.map(|(o, l)| (o as i64, l));
         self.predicate = predicate;
         self.file_options.row_index = row_index;
 
-        let schema = Arc::new(schema);
-        self.file_info.reader_schema = Some(arrow::Either::Right(schema.clone()));
-        self.file_info.schema = schema.clone();
-
-        self.options.schema.take();
-        // self.options.schema_overwrite.take();
+        if self.file_info.reader_schema.is_none() {
+            self.schema()?;
+        }
+        self.read_impl()
+    }
 
-        // Use the metadata somehow
-        _ = metadata;
+    fn schema(&mut self) -> PolarsResult<&SchemaRef> {
+        let mut schema = self.file_info.reader_schema.take();
+        if schema.is_none() {
+            let force_async = config::force_async();
+            let run_async = (self.sources.is_paths() && force_async) || self.sources.is_cloud_url();
+
+            let source = self.sources.at(0);
+            let owned = &mut vec![];
+
+            let memslice = source.to_memslice_async_assume_latest(run_async)?;
+
+            // @TODO!: Cache the decompression
+            let bytes = maybe_decompress_bytes(&memslice, owned)?;
+
+            schema = Some(arrow::Either::Right(Arc::new(
+                infer_file_schema(
+                    &get_reader_bytes(&mut std::io::Cursor::new(bytes))?,
+                    self.options.parse_options.as_ref(),
+                    self.options.infer_schema_length,
+                    self.options.has_header,
+                    self.options.schema_overwrite.as_deref(),
+                    self.options.skip_rows,
+                    self.options.skip_lines,
+                    self.options.skip_rows_after_header,
+                    self.options.raise_if_empty,
+                    &mut self.options.n_threads,
+                )?
+                .0,
+            )));
+        }
 
-        self.read_impl()
+        Ok(self
+            .file_info
+            .reader_schema
+            .insert(schema.unwrap())
+            .as_ref()
+            .unwrap_right())
     }
 
-    fn metadata(&mut self) -> PolarsResult<Box<dyn IOFileMetadata>> {
+    fn num_unfiltered_rows(&mut self) -> PolarsResult<IdxSize> {
+        let (lb, ub) = self.file_info.row_estimation;
+        if lb.is_some_and(|lb| lb == ub) {
+            return Ok(ub as IdxSize);
+        }
+
         let force_async = config::force_async();
         let run_async = (self.sources.is_paths() && force_async) || self.sources.is_cloud_url();
 
         let source = self.sources.at(0);
         let owned = &mut vec![];
 
+        // @TODO!: Cache the decompression
         let memslice = source.to_memslice_async_assume_latest(run_async)?;
 
         let popt = self.options.parse_options.as_ref();
         let bytes = maybe_decompress_bytes(&memslice, owned)?;
+
         let num_rows = count_rows_from_slice(
             bytes,
             popt.separator,
             popt.quote_char,
             popt.comment_prefix.as_ref(),
             popt.eol_char,
             self.options.has_header,
-        )? as IdxSize;
-        let schema = infer_file_schema(
-            &get_reader_bytes(&mut std::io::Cursor::new(bytes))?,
-            self.options.parse_options.as_ref(),
-            self.options.infer_schema_length,
-            self.options.has_header,
-            self.options.schema_overwrite.as_deref(),
-            self.options.skip_rows,
-            self.options.skip_lines,
-            self.options.skip_rows_after_header,
-            self.options.raise_if_empty,
-            &mut self.options.n_threads,
-        )?
-        .0;
-
-        Ok(Box::new(BasicFileMetadata { schema, num_rows }) as _)
+        )?;
+
+        self.file_info.row_estimation = (Some(num_rows), num_rows);
+        Ok(num_rows as IdxSize)
     }
 }
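Note: all `num_unfiltered_rows` implementations in this diff share one cache shape: `FileInfo::row_estimation` is a `(known, estimate)` pair, trusted only when both agree. (The CSV condition above was inverted relative to the IPC/NDJSON siblings and has been aligned with them.) The contract, reduced to a standalone model with hypothetical names:

```rust
/// Reduced model of the `row_estimation` caching used by the `ScanExec`
/// implementations: `(known, estimate)`, exact only when `known == Some(estimate)`.
struct RowCountCache {
    row_estimation: (Option<usize>, usize),
}

impl RowCountCache {
    fn num_unfiltered_rows(&mut self, count_exact: impl FnOnce() -> usize) -> usize {
        let (known, estimate) = self.row_estimation;
        // Cache hit: an exact count was stored earlier.
        if known.is_some_and(|k| k == estimate) {
            return estimate;
        }
        // Cache miss: run the (possibly expensive) exact count once and store
        // it in both slots so later calls take the fast path.
        let exact = count_exact();
        self.row_estimation = (Some(exact), exact);
        exact
    }
}
```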
diff --git a/crates/polars-mem-engine/src/executors/scan/ipc.rs b/crates/polars-mem-engine/src/executors/scan/ipc.rs
index c35e1d11441a..9c305949f060 100644
--- a/crates/polars-mem-engine/src/executors/scan/ipc.rs
+++ b/crates/polars-mem-engine/src/executors/scan/ipc.rs
@@ -20,6 +20,7 @@ pub struct IpcExec {
     pub(crate) file_options: FileScanOptions,
     pub(crate) hive_parts: Option<Arc<Vec<HivePartitions>>>,
     pub(crate) cloud_options: Option<CloudOptions>,
+    pub(crate) metadata: Option<Arc<FileMetadata>>,
 }
 
 impl IpcExec {
@@ -192,6 +193,74 @@ impl IpcExec {
     }
 }
 
+impl ScanExec for IpcExec {
+    fn read(
+        &mut self,
+        with_columns: Option<Arc<[PlSmallStr]>>,
+        slice: Option<(usize, usize)>,
+        predicate: Option<Arc<dyn PhysicalExpr>>,
+        row_index: Option<RowIndex>,
+    ) -> PolarsResult<DataFrame> {
+        self.file_options.with_columns = with_columns;
+        self.file_options.slice = slice.map(|(s, l)| (s as i64, l));
+        self.predicate = predicate;
+        self.file_options.row_index = row_index;
+
+        if self.file_info.reader_schema.is_none() {
+            self.schema()?;
+        }
+        self.read()
+    }
+
+    fn schema(&mut self) -> PolarsResult<&SchemaRef> {
+        if self.file_info.reader_schema.is_some() {
+            return Ok(&self.file_info.schema);
+        }
+
+        let arrow_schema = match &self.metadata {
+            None => {
+                // @TODO!: Cache the memslice here.
+                let memslice = self
+                    .sources
+                    .at(0)
+                    .to_memslice_async_assume_latest(self.sources.is_cloud_url())?;
+                IpcReader::new(std::io::Cursor::new(memslice)).schema()?
+            },
+            Some(md) => md.schema.clone(),
+        };
+        self.file_info.schema =
+            Arc::new(Schema::from_iter(arrow_schema.iter().map(
+                |(name, field)| (name.clone(), DataType::from_arrow_field(field)),
+            )));
+        self.file_info.reader_schema = Some(arrow::Either::Left(arrow_schema));
+
+        Ok(&self.file_info.schema)
+    }
+
+    fn num_unfiltered_rows(&mut self) -> PolarsResult<IdxSize> {
+        let (lb, ub) = self.file_info.row_estimation;
+        if lb.is_some_and(|lb| lb == ub) {
+            return Ok(ub as IdxSize);
+        }
+
+        // @TODO!: Cache the memslice here.
+        let memslice = self
+            .sources
+            .at(0)
+            .to_memslice_async_assume_latest(self.sources.is_cloud_url())?;
+        let mut reader = std::io::Cursor::new(memslice);
+
+        let num_unfiltered_rows = match &self.metadata {
+            None => arrow::io::ipc::read::get_row_count(&mut reader)?,
+            Some(md) => arrow::io::ipc::read::get_row_count_from_blocks(&mut reader, &md.blocks)?,
+        } as usize;
+
+        self.file_info.row_estimation = (Some(num_unfiltered_rows), num_unfiltered_rows);
+
+        Ok(num_unfiltered_rows as IdxSize)
+    }
+}
+
 impl Executor for IpcExec {
     fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
         let profile_name = if state.has_node_timer() {
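Note: the IPC `schema` above and the Parquet one further down materialize `FileInfo::schema` from an Arrow schema with the same two lines. As a free function, under the assumption that `ArrowSchema` and `DataType::from_arrow_field` are the polars-arrow / polars-core items used in the diff:

```rust
use std::sync::Arc;

use polars_arrow::datatypes::ArrowSchema;
use polars_core::prelude::{DataType, Schema};
use polars_core::schema::SchemaRef;

/// Convert an Arrow schema into a polars schema the way `IpcExec::schema`
/// and `ParquetExec::schema` do. Sketch only; paths are assumptions.
fn polars_schema_from_arrow(arrow_schema: &ArrowSchema) -> SchemaRef {
    Arc::new(Schema::from_iter(arrow_schema.iter().map(
        |(name, field)| (name.clone(), DataType::from_arrow_field(field)),
    )))
}
```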
diff --git a/crates/polars-mem-engine/src/executors/scan/ndjson.rs b/crates/polars-mem-engine/src/executors/scan/ndjson.rs
index 37731bc7f61e..7cbcb89810a6 100644
--- a/crates/polars-mem-engine/src/executors/scan/ndjson.rs
+++ b/crates/polars-mem-engine/src/executors/scan/ndjson.rs
@@ -8,7 +8,7 @@ use super::*;
 pub struct JsonExec {
     sources: ScanSources,
     options: NDJsonReadOptions,
-    file_scan_options: FileScanOptions,
+    file_options: FileScanOptions,
     file_info: FileInfo,
     predicate: Option<Arc<dyn PhysicalExpr>>,
 }
@@ -17,20 +17,20 @@ impl JsonExec {
     pub fn new(
         sources: ScanSources,
         options: NDJsonReadOptions,
-        file_scan_options: FileScanOptions,
+        file_options: FileScanOptions,
         file_info: FileInfo,
         predicate: Option<Arc<dyn PhysicalExpr>>,
     ) -> Self {
         Self {
             sources,
             options,
-            file_scan_options,
+            file_options,
             file_info,
             predicate,
         }
     }
 
-    fn read(&mut self) -> PolarsResult<DataFrame> {
+    fn read_impl(&mut self) -> PolarsResult<DataFrame> {
         let schema = self
             .file_info
             .reader_schema
@@ -47,7 +47,7 @@ impl JsonExec {
             eprintln!("ASYNC READING FORCED");
         }
 
-        let mut n_rows = self.file_scan_options.slice.map(|x| {
+        let mut n_rows = self.file_options.slice.map(|x| {
             assert_eq!(x.0, 0);
             x.1
         });
@@ -55,12 +55,12 @@ impl JsonExec {
         // Avoid panicking
         if n_rows == Some(0) {
             let mut df = DataFrame::empty_with_schema(schema);
-            if let Some(col) = &self.file_scan_options.include_file_paths {
+            if let Some(col) = &self.file_options.include_file_paths {
                 unsafe {
                     df.with_column_unchecked(Column::new_empty(col.clone(), &DataType::String))
                 };
             }
-            if let Some(row_index) = &self.file_scan_options.row_index {
+            if let Some(row_index) = &self.file_options.row_index {
                 df.with_row_index_mut(row_index.name.clone(), Some(row_index.offset));
             }
             return Ok(df);
@@ -74,7 +74,7 @@ impl JsonExec {
                     return None;
                 }
 
-                let row_index = self.file_scan_options.row_index.as_mut();
+                let row_index = self.file_options.row_index.as_mut();
 
                 let memslice = match source.to_memslice_async_assume_latest(run_async) {
                     Ok(memslice) => memslice,
@@ -90,11 +90,11 @@ impl JsonExec {
 
                 let df = reader
                     .with_schema(schema.clone())
-                    .with_rechunk(self.file_scan_options.rechunk)
+                    .with_rechunk(self.file_options.rechunk)
                     .with_chunk_size(Some(self.options.chunk_size))
                    .with_row_index(row_index)
                     .with_predicate(self.predicate.clone().map(phys_expr_to_io_expr))
-                    .with_projection(self.file_scan_options.with_columns.clone())
+                    .with_projection(self.file_options.with_columns.clone())
                     .low_memory(self.options.low_memory)
                     .with_n_rows(n_rows)
                     .with_ignore_errors(self.options.ignore_errors)
@@ -109,7 +109,7 @@ impl JsonExec {
                     *n_rows -= df.height();
                 }
 
-                if let Some(col) = &self.file_scan_options.include_file_paths {
+                if let Some(col) = &self.file_options.include_file_paths {
                     let name = source.to_include_path_name();
                     unsafe {
                         df.with_column_unchecked(Column::new_scalar(
@@ -128,6 +128,73 @@ impl JsonExec {
     }
 }
 
+impl ScanExec for JsonExec {
+    fn read(
+        &mut self,
+        with_columns: Option<Arc<[PlSmallStr]>>,
+        slice: Option<(usize, usize)>,
+        predicate: Option<Arc<dyn PhysicalExpr>>,
+        row_index: Option<RowIndex>,
+    ) -> PolarsResult<DataFrame> {
+        self.file_options.with_columns = with_columns;
+        self.file_options.slice = slice.map(|(s, l)| (s as i64, l));
+        self.predicate = predicate;
+        self.file_options.row_index = row_index;
+
+        if self.file_info.reader_schema.is_none() {
+            self.schema()?;
+        }
+        self.read_impl()
+    }
+
+    fn schema(&mut self) -> PolarsResult<&SchemaRef> {
+        if self.file_info.reader_schema.is_some() {
+            return Ok(&self.file_info.schema);
+        }
+
+        let memslice = self
+            .sources
+            .at(0)
+            .to_memslice_async_assume_latest(self.sources.is_cloud_url())?;
+
+        let owned = &mut vec![];
+        let bytes = maybe_decompress_bytes(&memslice[..], owned)?;
+
+        let schema = polars_io::ndjson::infer_schema(
+            &mut std::io::Cursor::new(bytes),
+            self.options.infer_schema_length,
+        )?;
+
+        let schema = Arc::new(schema);
+        self.file_info.schema = schema.clone();
+        self.file_info.reader_schema = Some(arrow::Either::Right(schema.clone()));
+
+        Ok(&self.file_info.schema)
+    }
+
+    fn num_unfiltered_rows(&mut self) -> PolarsResult<IdxSize> {
+        let (lb, ub) = self.file_info.row_estimation;
+        if lb.is_some_and(|lb| lb == ub) {
+            return Ok(ub as IdxSize);
+        }
+
+        let memslice = self
+            .sources
+            .at(0)
+            .to_memslice_async_assume_latest(self.sources.is_cloud_url())?;
+
+        let owned = &mut vec![];
+        let bytes = maybe_decompress_bytes(&memslice[..], owned)?;
+
+        let reader = polars_io::ndjson::core::JsonLineReader::new(std::io::Cursor::new(bytes));
+        let num_unfiltered_rows = reader.count()?;
+
+        self.file_info.row_estimation = (Some(num_unfiltered_rows), num_unfiltered_rows);
+
+        Ok(num_unfiltered_rows as IdxSize)
+    }
+}
+
 impl Executor for JsonExec {
     fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
         let profile_name = if state.has_node_timer() {
@@ -138,6 +205,6 @@ impl Executor for JsonExec {
             Cow::Borrowed("")
         };
 
-        state.record(|| self.read(), profile_name)
+        state.record(|| self.read_impl(), profile_name)
     }
 }
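Note: `JsonExec::num_unfiltered_rows` is the same decompress-then-count pattern as the CSV path. Isolated, assuming the `polars_io` module paths used in the diff:

```rust
use std::io::Cursor;

use polars_core::prelude::PolarsResult;
use polars_io::ndjson::core::JsonLineReader;
use polars_io::utils::compression::maybe_decompress_bytes;
use polars_io::SerReader;

/// Count NDJSON rows from raw (possibly compressed) bytes, mirroring
/// `JsonExec::num_unfiltered_rows`. Module paths are taken from the diff;
/// treat them as assumptions outside this PR.
fn count_ndjson_rows(raw: &[u8]) -> PolarsResult<usize> {
    let mut owned = vec![];
    // Decompresses transparently when needed; plain input is borrowed as-is.
    let bytes = maybe_decompress_bytes(raw, &mut owned)?;
    JsonLineReader::new(Cursor::new(bytes)).count()
}
```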
diff --git a/crates/polars-mem-engine/src/executors/scan/parquet.rs b/crates/polars-mem-engine/src/executors/scan/parquet.rs
index 25595ff5d998..5a6efdaa608f 100644
--- a/crates/polars-mem-engine/src/executors/scan/parquet.rs
+++ b/crates/polars-mem-engine/src/executors/scan/parquet.rs
@@ -18,7 +18,7 @@ pub struct ParquetExec {
     hive_parts: Option<Arc<Vec<HivePartitions>>>,
     predicate: Option<Arc<dyn PhysicalExpr>>,
 
-    options: ParquetOptions,
+    pub(crate) options: ParquetOptions,
     #[allow(dead_code)]
     cloud_options: Option<CloudOptions>,
     file_options: FileScanOptions,
@@ -477,7 +477,7 @@ impl ParquetExec {
         Ok(result)
     }
 
-    fn read_with_num_unfiltered_rows(&mut self) -> PolarsResult<(IdxSize, DataFrame)> {
+    fn read_impl(&mut self) -> PolarsResult<DataFrame> {
         // FIXME: The row index implementation is incorrect when a predicate is
         // applied. This code mitigates that by applying the predicate after the
         // collection of the entire dataframe if a row index is requested. This is
@@ -506,64 +506,52 @@ impl ParquetExec {
 
         let mut out = accumulate_dataframes_vertical(out)?;
 
-        let num_unfiltered_rows = out.height() as IdxSize;
+        let num_unfiltered_rows = out.height();
+        self.file_info.row_estimation = (Some(num_unfiltered_rows), num_unfiltered_rows);
 
         polars_io::predicates::apply_predicate(&mut out, post_predicate.as_deref(), true)?;
 
         if self.file_options.rechunk {
             out.as_single_chunk_par();
         }
 
-        Ok((num_unfiltered_rows, out))
+        Ok(out)
     }
 
-    fn metadata_sync(&mut self) -> PolarsResult<Box<dyn IOFileMetadata>> {
-        Ok(Box::new(match &self.metadata {
-            None => {
-                let memslice = self.sources.get(0).unwrap().to_memslice()?;
-                ParquetReader::new(std::io::Cursor::new(memslice))
-                    .get_metadata()?
-                    .clone()
-            },
-            Some(md) => md.clone(),
-        }) as _)
+    fn metadata_sync(&mut self) -> PolarsResult<&FileMetadataRef> {
+        let memslice = self.sources.get(0).unwrap().to_memslice()?;
+        Ok(self.metadata.insert(
+            ParquetReader::new(std::io::Cursor::new(memslice))
+                .get_metadata()?
+                .clone(),
+        ))
     }
 
     #[cfg(feature = "cloud")]
-    async fn metadata_async(&mut self) -> PolarsResult<Box<dyn IOFileMetadata>> {
+    async fn metadata_async(&mut self) -> PolarsResult<&FileMetadataRef> {
         let ScanSourceRef::Path(path) = self.sources.get(0).unwrap() else {
             unreachable!();
         };
 
-        Ok(Box::new(match &self.metadata {
-            None => {
-                let mut reader = ParquetAsyncReader::from_uri(
-                    path.to_str().unwrap(),
-                    self.cloud_options.as_ref(),
-                    None,
-                )
+        let mut reader =
+            ParquetAsyncReader::from_uri(path.to_str().unwrap(), self.cloud_options.as_ref(), None)
                 .await?;
-                reader.get_metadata().await?.clone()
-            },
-            Some(md) => md.clone(),
-        }) as _)
+        Ok(self.metadata.insert(reader.get_metadata().await?.clone()))
     }
-}
 
-impl IOFileMetadata for Arc<FileMetadata> {
-    fn as_any(&self) -> &dyn std::any::Any {
-        self
-    }
+    fn metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
+        let metadata = self.metadata.take();
+        if let Some(md) = metadata {
+            return Ok(self.metadata.insert(md));
+        }
 
-    fn num_rows(&self) -> PolarsResult<IdxSize> {
-        Ok(self.num_rows as IdxSize)
-    }
+        #[cfg(feature = "cloud")]
+        if self.sources.is_cloud_url() {
+            return polars_io::pl_async::get_runtime()
+                .block_on_potential_spawn(self.metadata_async());
+        }
 
-    fn schema(&self) -> PolarsResult<Schema> {
-        let arrow_schema = polars_io::parquet::read::infer_schema(self)?;
-        Ok(Schema::from_iter(arrow_schema.iter().map(
-            |(name, field)| (name.clone(), DataType::from_arrow_field(field)),
-        )))
+        self.metadata_sync()
     }
 }
 
@@ -574,39 +562,37 @@ impl ScanExec for ParquetExec {
         slice: Option<(usize, usize)>,
         predicate: Option<Arc<dyn PhysicalExpr>>,
         row_index: Option<RowIndex>,
-        metadata: Option<Box<dyn IOFileMetadata>>,
-        schema: Schema,
     ) -> PolarsResult<DataFrame> {
         self.file_options.with_columns = with_columns;
         self.file_options.slice = slice.map(|(o, l)| (o as i64, l));
         self.predicate = predicate;
         self.file_options.row_index = row_index;
 
-        self.file_info.reader_schema = Some(arrow::Either::Left(Arc::new(
-            schema.to_arrow(CompatLevel::newest()),
-        )));
-        self.file_info.schema = Arc::new(schema);
-        if let Some(metadata) = metadata {
-            self.metadata = Some(
-                metadata
-                    .as_any()
-                    .downcast_ref::<FileMetadataRef>()
-                    .unwrap()
-                    .clone(),
-            );
+        if self.file_info.reader_schema.is_none() {
+            self.schema()?;
         }
-
-        self.read_with_num_unfiltered_rows().map(|(_, df)| df)
+        self.read_impl()
     }
 
-    fn metadata(&mut self) -> PolarsResult<Box<dyn IOFileMetadata>> {
-        #[cfg(feature = "cloud")]
-        if self.sources.is_cloud_url() {
-            return polars_io::pl_async::get_runtime()
-                .block_on_potential_spawn(self.metadata_async());
+    fn schema(&mut self) -> PolarsResult<&SchemaRef> {
+        if self.file_info.reader_schema.is_some() {
+            return Ok(&self.file_info.schema);
         }
 
-        self.metadata_sync()
+        let md = self.metadata()?;
+        let arrow_schema = polars_io::parquet::read::infer_schema(md)?;
+        self.file_info.schema =
+            Arc::new(Schema::from_iter(arrow_schema.iter().map(
+                |(name, field)| (name.clone(), DataType::from_arrow_field(field)),
+            )));
+        self.file_info.reader_schema = Some(arrow::Either::Left(Arc::new(arrow_schema)));
+
+        Ok(&self.file_info.schema)
+    }
+
+    fn num_unfiltered_rows(&mut self) -> PolarsResult<IdxSize> {
+        let md = self.metadata()?;
+        Ok(md.num_rows as IdxSize)
     }
 }
 
@@ -623,9 +609,6 @@ impl Executor for ParquetExec {
             Cow::Borrowed("")
         };
 
-        state.record(
-            || self.read_with_num_unfiltered_rows().map(|(_, df)| df),
-            profile_name,
-        )
+        state.record(|| self.read_impl(), profile_name)
     }
 }
diff --git a/crates/polars-mem-engine/src/planner/lp.rs b/crates/polars-mem-engine/src/planner/lp.rs
index 8850ab1173d4..3158651aaa15 100644
--- a/crates/polars-mem-engine/src/planner/lp.rs
+++ b/crates/polars-mem-engine/src/planner/lp.rs
@@ -210,30 +210,29 @@ fn create_physical_plan_impl(
                 })
                 .map_or(Ok(None), |v| v.map(Some))?;
 
+            if sources.len() > 1
+                && std::env::var("POLARS_NEW_MULTIFILE").as_deref() == Ok("1")
+                && !matches!(scan_type, FileScan::Anonymous { .. })
+            {
+                return Ok(Box::new(executors::MultiScanExec::new(
+                    sources,
+                    file_info,
+                    hive_parts,
+                    predicate,
+                    file_options,
+                    scan_type,
+                )));
+            }
+
             match scan_type.clone() {
                 #[cfg(feature = "csv")]
-                FileScan::Csv { options, .. } => {
-                    if sources.len() > 1
-                        && std::env::var("POLARS_NEW_MULTIFILE").as_deref() == Ok("1")
-                    {
-                        Ok(Box::new(executors::MultiScanExec::new(
-                            sources,
-                            file_info,
-                            hive_parts,
-                            predicate,
-                            file_options,
-                            scan_type,
-                        )))
-                    } else {
-                        Ok(Box::new(executors::CsvExec {
-                            sources,
-                            file_info,
-                            options,
-                            predicate,
-                            file_options,
-                        }))
-                    }
-                },
+                FileScan::Csv { options, .. } => Ok(Box::new(executors::CsvExec {
+                    sources,
+                    file_info,
+                    options,
+                    predicate,
+                    file_options,
+                })),
                 #[cfg(feature = "ipc")]
                 FileScan::Ipc {
                     options,
@@ -247,37 +246,23 @@ fn create_physical_plan_impl(
                     file_options,
                     hive_parts,
                     cloud_options,
+                    metadata,
                 })),
                 #[cfg(feature = "parquet")]
                 FileScan::Parquet {
                     options,
                     cloud_options,
                     metadata,
-                } => {
-                    if sources.len() > 1
-                        && std::env::var("POLARS_NEW_MULTIFILE").as_deref() == Ok("1")
-                    {
-                        Ok(Box::new(executors::MultiScanExec::new(
-                            sources,
-                            file_info,
-                            hive_parts,
-                            predicate,
-                            file_options,
-                            scan_type,
-                        )))
-                    } else {
-                        Ok(Box::new(executors::ParquetExec::new(
-                            sources,
-                            file_info,
-                            hive_parts,
-                            predicate,
-                            options,
-                            cloud_options,
-                            file_options,
-                            metadata,
-                        )))
-                    }
-                },
+                } => Ok(Box::new(executors::ParquetExec::new(
+                    sources,
+                    file_info,
+                    hive_parts,
+                    predicate,
+                    options,
+                    cloud_options,
+                    file_options,
+                    metadata,
+                ))),
                 #[cfg(feature = "json")]
                FileScan::NDJson { options, .. } => Ok(Box::new(executors::JsonExec::new(
                     sources,
diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
index c6a9b2e32275..a2e8003dc902 100644
--- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
+++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
@@ -217,7 +217,7 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult<Node> {
                     let (file_info, md) =
                         scans::ipc_file_info(&sources, &file_options, cloud_options.as_ref())
                             .map_err(|e| e.context(failed_here!(ipc scan)))?;
-                    *metadata = Some(md);
+                    *metadata = Some(Arc::new(md));
                     file_info
                 },
                 #[cfg(feature = "csv")]
diff --git a/crates/polars-plan/src/plans/file_scan.rs b/crates/polars-plan/src/plans/file_scan.rs
index d4b9e2b14666..9f1ec4dd906d 100644
--- a/crates/polars-plan/src/plans/file_scan.rs
+++ b/crates/polars-plan/src/plans/file_scan.rs
@@ -36,7 +36,7 @@ pub enum FileScan {
         options: IpcScanOptions,
         cloud_options: Option<CloudOptions>,
         #[cfg_attr(feature = "serde", serde(skip))]
-        metadata: Option<arrow::io::ipc::read::FileMetadata>,
+        metadata: Option<Arc<arrow::io::ipc::read::FileMetadata>>,
     },
     #[cfg_attr(feature = "serde", serde(skip))]
     Anonymous {
diff --git a/crates/polars-plan/src/plans/functions/count.rs b/crates/polars-plan/src/plans/functions/count.rs
index de2ac244ef29..14f117e5dfaf 100644
--- a/crates/polars-plan/src/plans/functions/count.rs
+++ b/crates/polars-plan/src/plans/functions/count.rs
@@ -66,7 +66,7 @@ pub fn count_rows(
             sources,
             #[cfg(feature = "cloud")]
             cloud_options.as_ref(),
-            metadata.as_ref(),
+            metadata.as_deref(),
         ),
         #[cfg(feature = "json")]
         FileScan::NDJson {
diff --git a/crates/polars-stream/src/nodes/io_sources/ipc.rs b/crates/polars-stream/src/nodes/io_sources/ipc.rs
index 1490957adbe1..9a00736a5bbe 100644
--- a/crates/polars-stream/src/nodes/io_sources/ipc.rs
+++ b/crates/polars-stream/src/nodes/io_sources/ipc.rs
@@ -59,7 +59,7 @@ pub struct IpcSourceNodeConfig {
     rechunk: bool,
     include_file_paths: Option<PlSmallStr>,
 
-    first_metadata: FileMetadata,
+    first_metadata: Arc<FileMetadata>,
 }
 
 pub struct IpcSourceNodeState {
@@ -91,7 +91,7 @@ impl IpcSourceNode {
         options: IpcScanOptions,
         _cloud_options: Option<CloudOptions>,
         file_options: FileScanOptions,
-        mut first_metadata: Option<FileMetadata>,
+        mut first_metadata: Option<Arc<FileMetadata>>,
     ) -> PolarsResult<Self> {
         // These should have all been removed during lower_ir
         assert!(predicate.is_none());
@@ -117,7 +117,7 @@ impl IpcSourceNode {
             None => {
                 let source = sources.iter().next().unwrap();
                 let source = source.to_memslice()?;
-                read_file_metadata(&mut std::io::Cursor::new(&*source))?
+                Arc::new(read_file_metadata(&mut std::io::Cursor::new(&*source))?)
             },
         };
 
@@ -412,13 +412,15 @@ impl ComputeNode for IpcSourceNode {
                     let metadata = if state.source_idx == 0 {
                         config.first_metadata.clone()
                     } else {
-                        read_file_metadata(&mut std::io::Cursor::new(memslice.as_ref()))?
+                        Arc::new(read_file_metadata(&mut std::io::Cursor::new(
+                            memslice.as_ref(),
+                        ))?)
                     };
 
                     state.source.insert(Source {
                         file_path,
                         memslice: Arc::new(memslice),
-                        metadata: Arc::new(metadata),
+                        metadata,
                         block_offset: 0,
                     })
                 },
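Note: the new executor is strictly opt-in via the `POLARS_NEW_MULTIFILE` environment variable checked in `lp.rs` above. A sketch of exercising it end to end through the public `polars` facade (the glob path is hypothetical, and treating the flag as a stable switch is an assumption; it is an escape hatch in this PR):

```rust
use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // Route multi-source scans through MultiScanExec. The flag is read at
    // physical-plan time, so set it before collecting.
    std::env::set_var("POLARS_NEW_MULTIFILE", "1");

    // More than one source is required: a glob matching several files takes
    // the new code path; a single file keeps the existing executors.
    let df = LazyFrame::scan_parquet("data/*.parquet", ScanArgsParquet::default())?
        .select([col("a")])
        .collect()?;

    println!("{df}");
    Ok(())
}
```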