From 6d7a87d37bcb9be92dcd46abcf615cc6f5320b0b Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Thu, 26 Sep 2024 15:18:33 -0700 Subject: [PATCH 01/14] include path in read --- daft/daft/__init__.pyi | 1 + daft/io/_csv.py | 3 + daft/io/_json.py | 3 + daft/io/_parquet.py | 5 +- daft/io/common.py | 2 + src/daft-csv/src/read.rs | 51 ++++++--- src/daft-json/src/local.rs | 12 +- src/daft-json/src/read.rs | 55 ++++++--- .../src/sources/scan_task.rs | 20 +++- src/daft-micropartition/src/micropartition.rs | 13 +++ .../src/ops/cast_to_schema.rs | 1 + src/daft-micropartition/src/python.rs | 2 + src/daft-parquet/src/python.rs | 1 + src/daft-parquet/src/read.rs | 20 +++- src/daft-plan/src/builder.rs | 60 +++++++--- src/daft-scan/src/anonymous.rs | 5 + src/daft-scan/src/glob.rs | 12 +- src/daft-scan/src/lib.rs | 52 ++++++++- src/daft-scan/src/python.rs | 8 ++ src/daft-scan/src/scan_task_iters.rs | 1 + tests/dataframe/test_creation.py | 105 ++++++++++++++++++ .../io/test_s3_reads_include_path.py | 74 ++++++++++++ 22 files changed, 446 insertions(+), 60 deletions(-) create mode 100644 tests/integration/io/test_s3_reads_include_path.py diff --git a/daft/daft/__init__.pyi b/daft/daft/__init__.pyi index 9206c9da4d..cad7f43773 100644 --- a/daft/daft/__init__.pyi +++ b/daft/daft/__init__.pyi @@ -782,6 +782,7 @@ class ScanOperatorHandle: storage_config: StorageConfig, infer_schema: bool, schema: PySchema | None = None, + file_path_column: str | None = None, ) -> ScanOperatorHandle: ... @staticmethod def from_python_scan_operator(operator: ScanOperator) -> ScanOperatorHandle: ... diff --git a/daft/io/_csv.py b/daft/io/_csv.py index 8d3ce8b7a6..2ef2227ed8 100644 --- a/daft/io/_csv.py +++ b/daft/io/_csv.py @@ -30,6 +30,7 @@ def read_csv( comment: Optional[str] = None, allow_variable_columns: bool = False, io_config: Optional["IOConfig"] = None, + file_path_column: Optional[str] = None, use_native_downloader: bool = True, schema_hints: Optional[Dict[str, DataType]] = None, _buffer_size: Optional[int] = None, @@ -54,6 +55,7 @@ def read_csv( comment (str): Character to treat as the start of a comment line, or None to not support comments allow_variable_columns (bool): Whether to allow for variable number of columns in the CSV, defaults to False. If set to True, Daft will append nulls to rows with less columns than the schema, and ignore extra columns in rows with more columns io_config (IOConfig): Config to be used with the native downloader + file_path_column: Include the source path(s) as a column with this name. Defaults to None. use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This is currently experimental. @@ -97,5 +99,6 @@ def read_csv( schema=schema, file_format_config=file_format_config, storage_config=storage_config, + file_path_column=file_path_column, ) return DataFrame(builder) diff --git a/daft/io/_json.py b/daft/io/_json.py index f99f16aa2b..5dd3e8b3a5 100644 --- a/daft/io/_json.py +++ b/daft/io/_json.py @@ -23,6 +23,7 @@ def read_json( infer_schema: bool = True, schema: Optional[Dict[str, DataType]] = None, io_config: Optional["IOConfig"] = None, + file_path_column: Optional[str] = None, use_native_downloader: bool = True, schema_hints: Optional[Dict[str, DataType]] = None, _buffer_size: Optional[int] = None, @@ -41,6 +42,7 @@ def read_json( infer_schema (bool): Whether to infer the schema of the JSON, defaults to True. 
schema (dict[str, DataType]): A schema that is used as the definitive schema for the JSON if infer_schema is False, otherwise it is used as a schema hint that is applied after the schema is inferred. io_config (IOConfig): Config to be used with the native downloader + file_path_column: Include the source path(s) as a column with this name. Defaults to None. use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This is currently experimental. @@ -74,5 +76,6 @@ def read_json( schema=schema, file_format_config=file_format_config, storage_config=storage_config, + file_path_column=file_path_column, ) return DataFrame(builder) diff --git a/daft/io/_parquet.py b/daft/io/_parquet.py index c3a7748d3f..77653aafa5 100644 --- a/daft/io/_parquet.py +++ b/daft/io/_parquet.py @@ -24,6 +24,7 @@ def read_parquet( infer_schema: bool = True, schema: Optional[Dict[str, DataType]] = None, io_config: Optional["IOConfig"] = None, + file_path_column: Optional[str] = None, use_native_downloader: bool = True, coerce_int96_timestamp_unit: Optional[Union[str, TimeUnit]] = None, schema_hints: Optional[Dict[str, DataType]] = None, @@ -45,9 +46,10 @@ def read_parquet( infer_schema (bool): Whether to infer the schema of the Parquet, defaults to True. schema (dict[str, DataType]): A schema that is used as the definitive schema for the Parquet file if infer_schema is False, otherwise it is used as a schema hint that is applied after the schema is inferred. io_config (IOConfig): Config to be used with the native downloader + file_path_column: Include the source path(s) as a column with this name. Defaults to None. use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. coerce_int96_timestamp_unit: TimeUnit to coerce Int96 TimeStamps to. e.g.: [ns, us, ms], Defaults to None. - _multithreaded_io: Whether to use multithreading for IO threads. Setting this to False can be helpful in reducing + _multithreaded_io: Include the source path(s) as a column called the amount of system resources (number of connections and thread contention) when running in the Ray runner. Defaults to None, which will let Daft decide based on the runner it is currently using. 
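For reference, a minimal usage sketch of the new file_path_column option documented above; the glob paths and the column name "source_file" below are made-up examples and are not part of the patch itself:

import daft

# Each reader gains the same optional argument; when it is set, every row
# carries the path of the file it was read from as an extra Utf8 column.
df = daft.read_parquet("data/*.parquet", file_path_column="source_file")
df.show()

# The CSV and JSON readers accept the identical argument.
df_csv = daft.read_csv("data/*.csv", file_path_column="source_file")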
@@ -93,5 +95,6 @@ def read_parquet( schema=schema, file_format_config=file_format_config, storage_config=storage_config, + file_path_column=file_path_column, ) return DataFrame(builder) diff --git a/daft/io/common.py b/daft/io/common.py index d4b34291c4..981042004f 100644 --- a/daft/io/common.py +++ b/daft/io/common.py @@ -23,6 +23,7 @@ def get_tabular_files_scan( schema: dict[str, DataType] | None, file_format_config: FileFormatConfig, storage_config: StorageConfig, + file_path_column: str | None = None, ) -> LogicalPlanBuilder: """Returns a TabularFilesScan LogicalPlan for a given glob filepath.""" # Glob the path using the Runner @@ -40,6 +41,7 @@ def get_tabular_files_scan( storage_config, infer_schema=infer_schema, schema=_get_schema_from_dict(schema)._schema if schema is not None else None, + file_path_column=file_path_column, ) builder = LogicalPlanBuilder.from_tabular_scan( diff --git a/src/daft-csv/src/read.rs b/src/daft-csv/src/read.rs index c0332feca8..1c3e4a46fe 100644 --- a/src/daft-csv/src/read.rs +++ b/src/daft-csv/src/read.rs @@ -63,6 +63,7 @@ pub fn read_csv( io_client, io_stats, max_chunks_in_flight, + None, ) .await }) @@ -79,18 +80,28 @@ pub fn read_csv_bulk( multithreaded_io: bool, max_chunks_in_flight: Option, num_parallel_tasks: usize, + file_path_column: Option, ) -> DaftResult> { let runtime_handle = get_runtime(multithreaded_io)?; let tables = runtime_handle.block_on_current_thread(async move { // Launch a read task per URI, throttling the number of concurrent file reads to num_parallel tasks. let task_stream = futures::stream::iter(uris.iter().map(|uri| { - let (uri, convert_options, parse_options, read_options, io_client, io_stats) = ( + let ( + uri, + convert_options, + parse_options, + read_options, + io_client, + io_stats, + file_path_column, + ) = ( uri.to_string(), convert_options.clone(), parse_options.clone(), read_options.clone(), io_client.clone(), io_stats.clone(), + file_path_column.clone(), ); tokio::task::spawn(async move { read_csv_single_into_table( @@ -101,6 +112,7 @@ pub fn read_csv_bulk( io_client, io_stats, max_chunks_in_flight, + file_path_column, ) .await }) @@ -199,6 +211,7 @@ fn tables_concat(mut tables: Vec) -> DaftResult
{ ) } +#[allow(clippy::too_many_arguments)] async fn read_csv_single_into_table( uri: &str, convert_options: Option<CsvConvertOptions>, parse_options: Option<CsvParseOptions>, read_options: Option<CsvReadOptions>, io_client: Arc<IOClient>, io_stats: Option<IOStatsRef>, max_chunks_in_flight: Option<usize>, + file_path_column: Option<String>, ) -> DaftResult<Table>
{ let predicate = convert_options .as_ref() @@ -309,20 +323,31 @@ async fn read_csv_single_into_table( .into_iter() .collect::>>()?; // Handle empty table case. - if collected_tables.is_empty() { - return Table::empty(Some(schema)); - } + let output_table = { + if collected_tables.is_empty() { + return Table::empty(Some(schema)); + } - // // TODO(Clark): Don't concatenate all chunks from a file into a single table, since MicroPartition is natively chunked. - let concated_table = tables_concat(collected_tables)?; - if let Some(limit) = limit - && concated_table.len() > limit - { - // apply head in case that last chunk went over limit - concated_table.head(limit) - } else { - Ok(concated_table) + // // TODO(Clark): Don't concatenate all chunks from a file into a single table, since MicroPartition is natively chunked. + let concated_table = tables_concat(collected_tables)?; + if let Some(limit) = limit + && concated_table.len() > limit + { + // apply head in case that last chunk went over limit + concated_table.head(limit) + } else { + Ok(concated_table) + } + }?; + if let Some(file_path_col_name) = file_path_column { + let file_paths_column = Utf8Array::from_iter( + file_path_col_name.as_str(), + std::iter::repeat(Some(uri.trim_start_matches("file://"))).take(output_table.len()), + ) + .into_series(); + return output_table.union(&Table::from_nonempty_columns(vec![file_paths_column])?); } + Ok(output_table) } async fn stream_csv_single( diff --git a/src/daft-json/src/local.rs b/src/daft-json/src/local.rs index 224c94f24f..9fff94703c 100644 --- a/src/daft-json/src/local.rs +++ b/src/daft-json/src/local.rs @@ -28,6 +28,7 @@ pub fn read_json_local( parse_options: Option, read_options: Option, max_chunks_in_flight: Option, + file_path_column: Option, ) -> DaftResult
{ let uri = uri.trim_start_matches("file://"); let file = std::fs::File::open(uri)?; @@ -43,7 +44,16 @@ pub fn read_json_local( read_options, max_chunks_in_flight, )?; - reader.finish() + let output_table = reader.finish()?; + if let Some(file_path_col_name) = file_path_column { + let file_paths_column = Utf8Array::from_iter( + file_path_col_name.as_str(), + std::iter::repeat(Some(uri.trim_start_matches("file://"))).take(output_table.len()), + ) + .into_series(); + return output_table.union(&Table::from_nonempty_columns(vec![file_paths_column])?); + } + Ok(output_table) } struct JsonReader<'a> { diff --git a/src/daft-json/src/read.rs b/src/daft-json/src/read.rs index 7396e6ca04..1a4a06bc4c 100644 --- a/src/daft-json/src/read.rs +++ b/src/daft-json/src/read.rs @@ -56,6 +56,7 @@ pub fn read_json( io_client, io_stats, max_chunks_in_flight, + None, ) .await }) @@ -72,18 +73,28 @@ pub fn read_json_bulk( multithreaded_io: bool, max_chunks_in_flight: Option, num_parallel_tasks: usize, + file_path_column: Option, ) -> DaftResult> { let runtime_handle = get_runtime(multithreaded_io)?; let tables = runtime_handle.block_on_current_thread(async move { // Launch a read task per URI, throttling the number of concurrent file reads to num_parallel tasks. let task_stream = futures::stream::iter(uris.iter().map(|uri| { - let (uri, convert_options, parse_options, read_options, io_client, io_stats) = ( + let ( + uri, + convert_options, + parse_options, + read_options, + io_client, + io_stats, + file_path_column, + ) = ( uri.to_string(), convert_options.clone(), parse_options.clone(), read_options.clone(), io_client.clone(), io_stats.clone(), + file_path_column.clone(), ); tokio::task::spawn(async move { let table = read_json_single_into_table( @@ -94,6 +105,7 @@ pub fn read_json_bulk( io_client, io_stats, max_chunks_in_flight, + file_path_column, ) .await?; DaftResult::Ok(table) @@ -168,6 +180,7 @@ pub(crate) fn tables_concat(mut tables: Vec
<Table>) -> DaftResult<Table>
{ ) } +#[allow(clippy::too_many_arguments)] async fn read_json_single_into_table( uri: &str, convert_options: Option<JsonConvertOptions>, parse_options: Option<JsonParseOptions>, read_options: Option<JsonReadOptions>, io_client: Arc<IOClient>, io_stats: Option<IOStatsRef>, max_chunks_in_flight: Option<usize>, + file_path_column: Option<String>, ) -> DaftResult<Table>
{ let (source_type, fixed_uri) = parse_url(uri)?; let is_compressed = CompressionCodec::from_uri(uri).is_some(); @@ -186,6 +200,7 @@ async fn read_json_single_into_table( parse_options, read_options, max_chunks_in_flight, + file_path_column, ); } @@ -270,20 +285,32 @@ async fn read_json_single_into_table( .into_iter() .collect::>>()?; // Handle empty table case. - if collected_tables.is_empty() { - let daft_schema = Arc::new(Schema::try_from(&schema)?); - return Table::empty(Some(daft_schema)); - } - // // TODO(Clark): Don't concatenate all chunks from a file into a single table, since MicroPartition is natively chunked. - let concated_table = tables_concat(collected_tables)?; - if let Some(limit) = limit - && concated_table.len() > limit - { - // apply head in case that last chunk went over limit - concated_table.head(limit) - } else { - Ok(concated_table) + let output_table = { + if collected_tables.is_empty() { + let daft_schema = Arc::new(Schema::try_from(&schema)?); + return Table::empty(Some(daft_schema)); + } + + // // TODO(Clark): Don't concatenate all chunks from a file into a single table, since MicroPartition is natively chunked. + let concated_table = tables_concat(collected_tables)?; + if let Some(limit) = limit + && concated_table.len() > limit + { + // apply head in case that last chunk went over limit + concated_table.head(limit) + } else { + Ok(concated_table) + } + }?; + if let Some(file_path_col_name) = file_path_column { + let file_paths_column = Utf8Array::from_iter( + file_path_col_name.as_str(), + std::iter::repeat(Some(uri.trim_start_matches("file://"))).take(output_table.len()), + ) + .into_series(); + return output_table.union(&Table::from_nonempty_columns(vec![file_paths_column])?); } + Ok(output_table) } pub async fn stream_json( diff --git a/src/daft-local-execution/src/sources/scan_task.rs b/src/daft-local-execution/src/sources/scan_task.rs index 7d36ba6a22..d177ffad84 100644 --- a/src/daft-local-execution/src/sources/scan_task.rs +++ b/src/daft-local-execution/src/sources/scan_task.rs @@ -2,12 +2,14 @@ use std::sync::Arc; use common_error::DaftResult; use common_file_formats::{FileFormatConfig, ParquetSourceConfig}; +use daft_core::{prelude::Utf8Array, series::IntoSeries}; use daft_csv::{CsvConvertOptions, CsvParseOptions, CsvReadOptions}; use daft_io::IOStatsRef; use daft_json::{JsonConvertOptions, JsonParseOptions, JsonReadOptions}; use daft_micropartition::MicroPartition; use daft_parquet::read::ParquetSchemaInferenceOptions; use daft_scan::{storage_config::StorageConfig, ChunkSpec, ScanTask}; +use daft_table::Table; use futures::{Stream, StreamExt}; use tokio_stream::wrappers::ReceiverStream; use tracing::instrument; @@ -136,7 +138,7 @@ async fn stream_scan_task( )); } let source = scan_task.sources.first().unwrap(); - let url = source.get_path(); + let url = source.get_path().to_string(); let table_stream = match scan_task.storage_config.as_ref() { StorageConfig::Native(native_storage_config) => { let io_config = Arc::new( @@ -178,7 +180,7 @@ async fn stream_scan_task( .first() .and_then(|s| s.get_parquet_metadata().cloned()); daft_parquet::read::stream_parquet( - url, + url.clone(), file_column_names.as_deref(), None, scan_task.pushdowns.limit, @@ -233,7 +235,7 @@ async fn stream_scan_task( let read_options = CsvReadOptions::new_internal(cfg.buffer_size, cfg.chunk_size); daft_csv::stream_csv( - url.to_string(), + url.clone(), Some(convert_options), Some(parse_options), Some(read_options), @@ -264,7 +266,7 @@ async fn stream_scan_task( 
JsonReadOptions::new_internal(cfg.buffer_size, cfg.chunk_size); daft_json::read::stream_json( - url.to_string(), + url.clone(), Some(convert_options), Some(parse_options), Some(read_options), @@ -298,7 +300,15 @@ async fn stream_scan_task( }; Ok(table_stream.map(move |table| { - let table = table?; + let mut table = table?; + if let Some(file_path_col_name) = &scan_task.file_path_column { + let file_paths_column = Utf8Array::from_iter( + file_path_col_name.as_str(), + std::iter::repeat(Some(url.trim_start_matches("file://"))).take(table.len()), + ) + .into_series(); + table = table.union(&Table::from_nonempty_columns(vec![file_paths_column])?)?; + } let casted_table = table.cast_to_schema_with_fill( scan_task.materialized_schema().as_ref(), scan_task diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index 5b518419d1..28896d97a6 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -181,6 +181,7 @@ fn materialize_scan_task( metadatas, Some(delete_map), *chunk_size, + scan_task.file_path_column.clone(), ) .context(DaftCoreComputeSnafu)? } @@ -235,6 +236,7 @@ fn materialize_scan_task( native_storage_config.multithreaded_io, None, 8, + scan_task.file_path_column.clone(), ) .context(DaftCoreComputeSnafu)? } @@ -265,6 +267,7 @@ fn materialize_scan_task( native_storage_config.multithreaded_io, None, 8, + scan_task.file_path_column.clone(), ) .context(DaftCoreComputeSnafu)? } @@ -653,6 +656,7 @@ impl MicroPartition { field_id_mapping.clone(), parquet_metadata, chunk_size, + scan_task.file_path_column.clone(), ) .context(DaftCoreComputeSnafu) } @@ -862,6 +866,7 @@ pub(crate) fn read_csv_into_micropartition( multithreaded_io, None, 8, + None, ) .context(DaftCoreComputeSnafu)?; @@ -911,6 +916,7 @@ pub(crate) fn read_json_into_micropartition( multithreaded_io, None, 8, + None, ) .context(DaftCoreComputeSnafu)?; @@ -987,6 +993,7 @@ fn _read_delete_files( None, None, None, + None, )?; let mut delete_map: HashMap> = @@ -1032,6 +1039,7 @@ fn _read_parquet_into_loaded_micropartition>( catalog_provided_schema: Option, field_id_mapping: Option>>, chunk_size: Option, + file_path_column: Option, ) -> DaftResult { let delete_map = iceberg_delete_files .map(|files| { @@ -1066,6 +1074,7 @@ fn _read_parquet_into_loaded_micropartition>( None, delete_map, chunk_size, + file_path_column, )?; // Prefer using the `catalog_provided_schema` but fall back onto inferred schema from Parquet files @@ -1116,6 +1125,7 @@ pub(crate) fn read_parquet_into_micropartition>( field_id_mapping: Option>>, parquet_metadata: Option>>, chunk_size: Option, + file_path_column: Option, ) -> DaftResult { if let Some(so) = start_offset && so > 0 @@ -1152,6 +1162,7 @@ pub(crate) fn read_parquet_into_micropartition>( catalog_provided_schema, field_id_mapping, chunk_size, + file_path_column, ); } let runtime_handle = get_runtime(multithreaded_io)?; @@ -1304,6 +1315,7 @@ pub(crate) fn read_parquet_into_micropartition>( }), num_rows, ), + file_path_column, ); let fill_map = scan_task.partition_spec().map(|pspec| pspec.to_fill_map()); @@ -1334,6 +1346,7 @@ pub(crate) fn read_parquet_into_micropartition>( catalog_provided_schema, field_id_mapping, chunk_size, + file_path_column, ) } } diff --git a/src/daft-micropartition/src/ops/cast_to_schema.rs b/src/daft-micropartition/src/ops/cast_to_schema.rs index 1612a83eae..45c55cebbf 100644 --- a/src/daft-micropartition/src/ops/cast_to_schema.rs +++ 
b/src/daft-micropartition/src/ops/cast_to_schema.rs @@ -28,6 +28,7 @@ impl MicroPartition { schema, scan_task.storage_config.clone(), scan_task.pushdowns.clone(), + scan_task.file_path_column.clone(), )) }; Ok(Self::new_unloaded( diff --git a/src/daft-micropartition/src/python.rs b/src/daft-micropartition/src/python.rs index 2060b3de30..1d6fdd40fc 100644 --- a/src/daft-micropartition/src/python.rs +++ b/src/daft-micropartition/src/python.rs @@ -601,6 +601,7 @@ impl PyMicroPartition { None, None, None, + None, ) })?; Ok(mp.into()) @@ -648,6 +649,7 @@ impl PyMicroPartition { None, None, chunk_size, + None, ) })?; Ok(mp.into()) diff --git a/src/daft-parquet/src/python.rs b/src/daft-parquet/src/python.rs index 2d965053c2..d9616b0978 100644 --- a/src/daft-parquet/src/python.rs +++ b/src/daft-parquet/src/python.rs @@ -170,6 +170,7 @@ pub mod pylib { None, None, None, + None, )? .into_iter() .map(|v| v.into()) diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index 3b6c498cf6..40c9164785 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -156,6 +156,7 @@ async fn read_parquet_single( metadata: Option>, delete_rows: Option>, chunk_size: Option, + file_path_column: Option, ) -> DaftResult
{ let field_id_mapping_provided = field_id_mapping.is_some(); let mut columns_to_read = columns.clone(); @@ -356,6 +357,15 @@ async fn read_parquet_single( .into()); } + if let Some(file_path_col_name) = file_path_column { + let file_paths_column = Utf8Array::from_iter( + file_path_col_name.as_str(), + std::iter::repeat(Some(uri.trim_start_matches("file://"))).take(table.len()), + ) + .into_series(); + return table.union(&Table::from_nonempty_columns(vec![file_paths_column])?); + } + Ok(table) } @@ -683,6 +693,7 @@ pub fn read_parquet( metadata, None, None, + None, ) .await }) @@ -751,6 +762,7 @@ pub fn read_parquet_bulk>( metadata: Option>>, delete_map: Option>>, chunk_size: Option, + file_path_column: Option, ) -> DaftResult> { let runtime_handle = daft_io::get_runtime(multithreaded_io)?; @@ -778,6 +790,7 @@ pub fn read_parquet_bulk>( let schema_infer_options = *schema_infer_options; let owned_field_id_mapping = field_id_mapping.clone(); let delete_rows = delete_map.as_ref().and_then(|m| m.get(&uri).cloned()); + let owned_file_path_column = file_path_column.clone(); tokio::task::spawn(async move { read_parquet_single( &uri, @@ -793,6 +806,7 @@ pub fn read_parquet_bulk>( metadata, delete_rows, chunk_size, + owned_file_path_column, ) .await }) @@ -827,7 +841,7 @@ pub fn read_parquet_bulk>( #[allow(clippy::too_many_arguments)] pub async fn stream_parquet( - uri: &str, + uri: String, columns: Option<&[&str]>, start_offset: Option, num_rows: Option, @@ -841,7 +855,7 @@ pub async fn stream_parquet( maintain_order: bool, ) -> DaftResult>> { let stream = stream_parquet_single( - uri.to_string(), + uri, columns, start_offset, num_rows, @@ -1125,7 +1139,7 @@ mod tests { let runtime_handle = daft_io::get_runtime(true)?; runtime_handle.block_on_current_thread(async move { let tables = stream_parquet( - file, + file.to_string(), None, None, None, diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs index 982a3634a9..5291cf8dae 100644 --- a/src/daft-plan/src/builder.rs +++ b/src/daft-plan/src/builder.rs @@ -11,7 +11,11 @@ use common_io_config::IOConfig; use daft_core::join::{JoinStrategy, JoinType}; use daft_dsl::{col, ExprRef}; use daft_scan::{PhysicalScanInfo, Pushdowns, ScanOperatorRef}; -use daft_schema::schema::{Schema, SchemaRef}; +use daft_schema::{ + dtype::DataType, + field::Field, + schema::{Schema, SchemaRef}, +}; #[cfg(feature = "python")] use { crate::sink_info::{CatalogInfo, IcebergCatalogInfo}, @@ -121,21 +125,45 @@ impl LogicalPlanBuilder { pushdowns.clone().unwrap_or_default(), )); // If column selection (projection) pushdown is specified, prune unselected columns from the schema. - let output_schema = if let Some(Pushdowns { - columns: Some(columns), - .. - }) = &pushdowns - && columns.len() < schema.fields.len() - { - let pruned_upstream_schema = schema - .fields - .iter() - .filter(|&(name, _)| columns.contains(name)) - .map(|(_, field)| field.clone()) - .collect::>(); - Arc::new(Schema::new(pruned_upstream_schema)?) - } else { - schema.clone() + // If file path column is specified, add it to the schema. + let output_schema = match (&pushdowns, &scan_operator.0.file_path_column()) { + ( + Some(Pushdowns { + columns: Some(columns), + .. 
+ }), + file_path_column_opt, + ) if columns.len() < schema.fields.len() => { + let pruned_fields = schema + .fields + .iter() + .filter(|(name, _)| columns.contains(name)) + .map(|(_, field)| field.clone()); + + let finalized_fields = match file_path_column_opt { + Some(file_path_column) => pruned_fields + .chain(std::iter::once(Field::new( + file_path_column.to_string(), + DataType::Utf8, + ))) + .collect::>(), + None => pruned_fields.collect::>(), + }; + Arc::new(Schema::new(finalized_fields)?) + } + (None, Some(file_path_column)) => { + let schema_with_file_path = schema + .fields + .values() + .cloned() + .chain(std::iter::once(Field::new( + file_path_column.to_string(), + DataType::Utf8, + ))) + .collect::>(); + Arc::new(Schema::new(schema_with_file_path)?) + } + _ => schema.clone(), }; let logical_plan: LogicalPlan = logical_ops::Source::new(output_schema, source_info.into()).into(); diff --git a/src/daft-scan/src/anonymous.rs b/src/daft-scan/src/anonymous.rs index 956ee1c639..78e32b1b26 100644 --- a/src/daft-scan/src/anonymous.rs +++ b/src/daft-scan/src/anonymous.rs @@ -41,6 +41,10 @@ impl ScanOperator for AnonymousScanOperator { &[] } + fn file_path_column(&self) -> Option<&str> { + None + } + fn can_absorb_filter(&self) -> bool { false } @@ -100,6 +104,7 @@ impl ScanOperator for AnonymousScanOperator { schema.clone(), storage_config.clone(), pushdowns.clone(), + None, ) .into()) }, diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index 376548f7a7..56f5cbb5c5 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -19,6 +19,7 @@ pub struct GlobScanOperator { file_format_config: Arc, schema: SchemaRef, storage_config: Arc, + file_path_column: Option, } /// Wrapper struct that implements a sync Iterator for a BoxStream @@ -130,6 +131,7 @@ impl GlobScanOperator { storage_config: Arc, infer_schema: bool, schema: Option, + file_path_column: Option, ) -> DaftResult { let first_glob_path = match glob_paths.first() { None => Err(DaftError::ValueError( @@ -247,6 +249,7 @@ impl GlobScanOperator { file_format_config, schema, storage_config, + file_path_column, }) } } @@ -260,6 +263,10 @@ impl ScanOperator for GlobScanOperator { &[] } + fn file_path_column(&self) -> Option<&str> { + self.file_path_column.as_deref() + } + fn can_absorb_filter(&self) -> bool { false } @@ -310,7 +317,6 @@ impl ScanOperator for GlobScanOperator { self.glob_paths )); let file_format = self.file_format_config.file_format(); - let files = run_glob_parallel( self.glob_paths.clone(), io_client.clone(), @@ -332,7 +338,7 @@ impl ScanOperator for GlobScanOperator { } else { None }; - + let file_path_column = self.file_path_column.clone(); // Create one ScanTask per file Ok(Box::new(files.enumerate().map(move |(idx, f)| { let FileMetadata { @@ -340,7 +346,6 @@ impl ScanOperator for GlobScanOperator { size: size_bytes, .. 
} = f?; - let row_group = row_groups .as_ref() .and_then(|rgs| rgs.get(idx).cloned()) @@ -361,6 +366,7 @@ impl ScanOperator for GlobScanOperator { schema.clone(), storage_config.clone(), pushdowns.clone(), + file_path_column.clone(), ) .into()) }))) diff --git a/src/daft-scan/src/lib.rs b/src/daft-scan/src/lib.rs index 10cc0c6804..d077b2bacb 100644 --- a/src/daft-scan/src/lib.rs +++ b/src/daft-scan/src/lib.rs @@ -12,6 +12,7 @@ use common_error::{DaftError, DaftResult}; use common_file_formats::FileFormatConfig; use daft_dsl::ExprRef; use daft_schema::{ + dtype::DataType, field::Field, schema::{Schema, SchemaRef}, }; @@ -65,6 +66,13 @@ pub enum Error { ffc2: Arc, }, + #[snafu(display( + "FilePathColumns were different during ScanTask::merge: {:?} vs {:?}", + fpc1, + fpc2 + ))] + DifferingFilePathColumnsInScanTaskMerge { fpc1: String, fpc2: String }, + #[snafu(display( "StorageConfigs were different during ScanTask::merge: {:?} vs {:?}", sc1, @@ -356,6 +364,7 @@ pub struct ScanTask { pub size_bytes_on_disk: Option, pub metadata: Option, pub statistics: Option, + pub file_path_column: Option, } pub type ScanTaskRef = Arc; @@ -366,6 +375,7 @@ impl ScanTask { schema: SchemaRef, storage_config: Arc, pushdowns: Pushdowns, + file_path_column: Option, ) -> Self { assert!(!sources.is_empty()); debug_assert!( @@ -406,6 +416,7 @@ impl ScanTask { size_bytes_on_disk, metadata, statistics, + file_path_column, } } @@ -440,6 +451,12 @@ impl ScanTask { p2: sc2.pushdowns.clone(), }); } + if sc1.file_path_column != sc2.file_path_column { + return Err(Error::DifferingFileFormatConfigsInScanTaskMerge { + ffc1: sc1.file_format_config.clone(), + ffc2: sc2.file_format_config.clone(), + }); + } Ok(Self::new( sc1.sources .clone() @@ -450,19 +467,43 @@ impl ScanTask { sc1.schema.clone(), sc1.storage_config.clone(), sc1.pushdowns.clone(), + sc1.file_path_column.clone(), )) } pub fn materialized_schema(&self) -> SchemaRef { - match &self.pushdowns.columns { - None => self.schema.clone(), - Some(columns) => Arc::new(Schema { + match (&self.pushdowns.columns, &self.file_path_column) { + (None, None) => self.schema.clone(), + (Some(columns), file_path_column_opt) => { + let filtered_fields = self + .schema + .fields + .clone() + .into_iter() + .filter(|(name, _)| columns.contains(name)); + + let fields = match file_path_column_opt { + Some(file_path_column) => filtered_fields + .chain(std::iter::once(( + file_path_column.to_string(), + Field::new(file_path_column.to_string(), DataType::Utf8), + ))) + .collect(), + None => filtered_fields.collect(), + }; + + Arc::new(Schema { fields }) + } + (None, Some(file_path_column)) => Arc::new(Schema { fields: self .schema .fields .clone() .into_iter() - .filter(|(name, _)| columns.contains(name)) + .chain(std::iter::once(( + file_path_column.to_string(), + Field::new(file_path_column.to_string(), DataType::Utf8), + ))) .collect(), }), } @@ -752,6 +793,7 @@ impl Display for PartitionTransform { pub trait ScanOperator: Send + Sync + Debug { fn schema(&self) -> SchemaRef; fn partitioning_keys(&self) -> &[PartitionField]; + fn file_path_column(&self) -> Option<&str>; fn can_absorb_filter(&self) -> bool; fn can_absorb_select(&self) -> bool; @@ -1000,6 +1042,7 @@ mod test { NativeStorageConfig::new_internal(false, None), ))), Pushdowns::default(), + None, ) } @@ -1025,6 +1068,7 @@ mod test { ))), false, Some(Arc::new(Schema::empty())), + None, ) .unwrap(); diff --git a/src/daft-scan/src/python.rs b/src/daft-scan/src/python.rs index fac37ccb48..ec5f771ff6 100644 --- 
a/src/daft-scan/src/python.rs +++ b/src/daft-scan/src/python.rs @@ -107,6 +107,7 @@ pub mod pylib { storage_config: PyStorageConfig, infer_schema: bool, schema: Option, + file_path_column: Option, ) -> PyResult { py.allow_threads(|| { let operator = Arc::new(GlobScanOperator::try_new( @@ -115,6 +116,7 @@ pub mod pylib { storage_config.into(), infer_schema, schema.map(|s| s.schema), + file_path_column, )?); Ok(Self { scan_op: ScanOperatorRef(operator), @@ -211,6 +213,9 @@ pub mod pylib { fn schema(&self) -> daft_schema::schema::SchemaRef { self.schema.clone() } + fn file_path_column(&self) -> Option<&str> { + None + } fn can_absorb_filter(&self) -> bool { self.can_absorb_filter } @@ -348,6 +353,7 @@ pub mod pylib { schema.schema, storage_config.into(), pushdowns.map(|p| p.0.as_ref().clone()).unwrap_or_default(), + None, ); Ok(Some(Self(scan_task.into()))) } @@ -380,6 +386,7 @@ pub mod pylib { schema.schema, storage_config.into(), pushdowns.map(|p| p.0.as_ref().clone()).unwrap_or_default(), + None, ); Ok(Self(scan_task.into())) } @@ -424,6 +431,7 @@ pub mod pylib { PythonStorageConfig { io_config: None }, ))), pushdowns.map(|p| p.0.as_ref().clone()).unwrap_or_default(), + None, ); Ok(Self(scan_task.into())) } diff --git a/src/daft-scan/src/scan_task_iters.rs b/src/daft-scan/src/scan_task_iters.rs index b223ee5732..b7fc1a9dbf 100644 --- a/src/daft-scan/src/scan_task_iters.rs +++ b/src/daft-scan/src/scan_task_iters.rs @@ -291,6 +291,7 @@ pub fn split_by_row_groups( t.schema.clone(), t.storage_config.clone(), t.pushdowns.clone(), + t.file_path_column.clone(), ) .into())); } diff --git a/tests/dataframe/test_creation.py b/tests/dataframe/test_creation.py index c43751f7d3..df2d9405ef 100644 --- a/tests/dataframe/test_creation.py +++ b/tests/dataframe/test_creation.py @@ -410,6 +410,43 @@ def test_create_dataframe_multiple_csvs(valid_data: list[dict[str, float]], use_ assert len(pd_df) == (len(valid_data) * 2) +def test_create_dataframe_csv_with_file_path_column(valid_data: list[dict[str, float]]) -> None: + with create_temp_filename() as fname: + with open(fname, "w") as f: + header = list(valid_data[0].keys()) + writer = csv.writer(f) + writer.writerow(header) + writer.writerows([[item[col] for col in header] for item in valid_data]) + f.flush() + + df = daft.read_csv(fname, file_path_column="file_path") + assert df.column_names == COL_NAMES + ["file_path"] + + pd_df = df.to_pandas() + assert list(pd_df.columns) == COL_NAMES + ["file_path"] + assert len(pd_df) == len(valid_data) + assert all(pd_df["file_path"] == fname) + + +def test_create_dataframe_multiple_csvs_with_file_path_column(valid_data: list[dict[str, float]]) -> None: + with create_temp_filename() as f1name, create_temp_filename() as f2name: + with open(f1name, "w") as f1, open(f2name, "w") as f2: + for f in (f1, f2): + header = list(valid_data[0].keys()) + writer = csv.writer(f) + writer.writerow(header) + writer.writerows([[item[col] for col in header] for item in valid_data]) + f.flush() + + df = daft.read_csv([f1name, f2name], file_path_column="file_path") + assert df.column_names == COL_NAMES + ["file_path"] + + pd_df = df.to_pandas() + assert list(pd_df.columns) == COL_NAMES + ["file_path"] + assert len(pd_df) == (len(valid_data) * 2) + assert pd_df["file_path"].to_list() == [f1name] * len(valid_data) + [f2name] * len(valid_data) + + @pytest.mark.parametrize("use_native_downloader", [True, False]) def test_create_dataframe_csv_generate_headers(valid_data: list[dict[str, float]], use_native_downloader) -> None: with 
create_temp_filename() as fname: @@ -683,6 +720,41 @@ def test_create_dataframe_multiple_jsons(valid_data: list[dict[str, float]], use assert len(pd_df) == (len(valid_data) * 2) +def test_create_dataframe_json_with_file_path_column(valid_data: list[dict[str, float]]) -> None: + with create_temp_filename() as fname: + with open(fname, "w") as f: + for data in valid_data: + f.write(json.dumps(data)) + f.write("\n") + f.flush() + + df = daft.read_json(fname, file_path_column="file_path") + assert df.column_names == COL_NAMES + ["file_path"] + + pd_df = df.to_pandas() + assert list(pd_df.columns) == COL_NAMES + ["file_path"] + assert len(pd_df) == len(valid_data) + assert all(pd_df["file_path"] == fname) + + +def test_create_dataframe_multiple_jsons_with_file_path_column(valid_data: list[dict[str, float]]) -> None: + with create_temp_filename() as f1name, create_temp_filename() as f2name: + with open(f1name, "w") as f1, open(f2name, "w") as f2: + for f in (f1, f2): + for data in valid_data: + f.write(json.dumps(data)) + f.write("\n") + f.flush() + + df = daft.read_json([f1name, f2name], file_path_column="file_path") + assert df.column_names == COL_NAMES + ["file_path"] + + pd_df = df.to_pandas() + assert list(pd_df.columns) == COL_NAMES + ["file_path"] + assert len(pd_df) == (len(valid_data) * 2) + assert pd_df["file_path"].to_list() == [f1name] * len(valid_data) + [f2name] * len(valid_data) + + @pytest.mark.parametrize("use_native_downloader", [True, False]) def test_create_dataframe_json_column_projection(valid_data: list[dict[str, float]], use_native_downloader) -> None: with create_temp_filename() as fname: @@ -905,6 +977,39 @@ def test_create_dataframe_parquet(valid_data: list[dict[str, float]]) -> None: assert len(pd_df) == len(valid_data) +def test_create_dataframe_parquet_with_file_path_column(valid_data: list[dict[str, float]]) -> None: + with create_temp_filename() as fname: + with open(fname, "w") as f: + table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES}) + papq.write_table(table, f.name) + f.flush() + + df = daft.read_parquet(fname, file_path_column="file_path") + assert df.column_names == COL_NAMES + ["file_path"] + + pd_df = df.to_pandas() + assert list(pd_df.columns) == COL_NAMES + ["file_path"] + assert len(pd_df) == len(valid_data) + assert all(pd_df["file_path"] == fname) + + +def test_create_dataframe_multiple_parquets_with_file_path_column(valid_data: list[dict[str, float]]) -> None: + with create_temp_filename() as f1name, create_temp_filename() as f2name: + with open(f1name, "w") as f1, open(f2name, "w") as f2: + for f in (f1, f2): + table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES}) + papq.write_table(table, f.name) + f.flush() + + df = daft.read_parquet([f1name, f2name], file_path_column="file_path") + assert df.column_names == COL_NAMES + ["file_path"] + + pd_df = df.to_pandas() + assert list(pd_df.columns) == COL_NAMES + ["file_path"] + assert len(pd_df) == (len(valid_data) * 2) + assert pd_df["file_path"].to_list() == [f1name] * len(valid_data) + [f2name] * len(valid_data) + + def test_create_dataframe_parquet_with_filter(valid_data: list[dict[str, float]]) -> None: with create_temp_filename() as fname: with open(fname, "w") as f: diff --git a/tests/integration/io/test_s3_reads_include_path.py b/tests/integration/io/test_s3_reads_include_path.py new file mode 100644 index 0000000000..c1a54d24a5 --- /dev/null +++ b/tests/integration/io/test_s3_reads_include_path.py @@ -0,0 +1,74 @@ +import pytest 
+ +import daft +from tests.integration.io.conftest import minio_create_bucket + + +@pytest.mark.integration() +def test_read_parquet_from_s3_with_include_file_path_column(minio_io_config): + bucket_name = "bucket" + data = {"a": [1, 2, 3], "b": ["a", "b", "c"]} + with minio_create_bucket(minio_io_config, bucket_name=bucket_name): + file_paths = ( + daft.from_pydict(data).write_parquet(f"s3://{bucket_name}", io_config=minio_io_config).to_pydict()["path"] + ) + assert len(file_paths) == 1 + file_path = f"s3://{file_paths[0]}" + read_back = daft.read_parquet(file_path, io_config=minio_io_config, file_path_column="path") + assert read_back.to_pydict()["a"] == data["a"] + assert read_back.to_pydict()["b"] == data["b"] + assert read_back.to_pydict()["path"] == [file_path] * 3 + + +@pytest.mark.integration() +def test_read_multi_parquet_from_s3_with_include_file_path_column(minio_io_config): + bucket_name = "bucket" + data = {"a": [1, 2, 3], "b": ["a", "b", "c"]} + with minio_create_bucket(minio_io_config, bucket_name=bucket_name): + file_paths = ( + daft.from_pydict(data) + .into_partitions(3) + .write_parquet(f"s3://{bucket_name}", io_config=minio_io_config) + .to_pydict()["path"] + ) + assert len(file_paths) == 3 + file_paths = [f"s3://{path}" for path in file_paths] + read_back = daft.read_parquet(file_paths, io_config=minio_io_config, file_path_column="path") + assert read_back.to_pydict()["a"] == data["a"] + assert read_back.to_pydict()["b"] == data["b"] + assert read_back.to_pydict()["path"] == file_paths + + +@pytest.mark.integration() +def test_read_csv_from_s3_with_include_file_path_column(minio_io_config): + bucket_name = "bucket" + data = {"a": [1, 2, 3], "b": ["a", "b", "c"]} + with minio_create_bucket(minio_io_config, bucket_name=bucket_name): + file_paths = ( + daft.from_pydict(data).write_csv(f"s3://{bucket_name}", io_config=minio_io_config).to_pydict()["path"] + ) + assert len(file_paths) == 1 + file_path = f"s3://{file_paths[0]}" + read_back = daft.read_csv(file_path, io_config=minio_io_config, file_path_column="path") + assert read_back.to_pydict()["a"] == data["a"] + assert read_back.to_pydict()["b"] == data["b"] + assert read_back.to_pydict()["path"] == [file_path] * 3 + + +@pytest.mark.integration() +def test_read_multi_csv_from_s3_with_include_file_path_column(minio_io_config): + bucket_name = "bucket" + data = {"a": [1, 2, 3], "b": ["a", "b", "c"]} + with minio_create_bucket(minio_io_config, bucket_name=bucket_name): + file_paths = ( + daft.from_pydict(data) + .into_partitions(3) + .write_csv(f"s3://{bucket_name}", io_config=minio_io_config) + .to_pydict()["path"] + ) + assert len(file_paths) == 3 + file_paths = [f"s3://{path}" for path in file_paths] + read_back = daft.read_csv(file_paths, io_config=minio_io_config, file_path_column="path") + assert read_back.to_pydict()["a"] == data["a"] + assert read_back.to_pydict()["b"] == data["b"] + assert read_back.to_pydict()["path"] == file_paths From 3321272b840841b6f88ad1b602a998805dd8d3e7 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Thu, 26 Sep 2024 15:26:44 -0700 Subject: [PATCH 02/14] test if column name exists --- tests/dataframe/test_creation.py | 41 ++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/tests/dataframe/test_creation.py b/tests/dataframe/test_creation.py index df2d9405ef..0cc348a2df 100644 --- a/tests/dataframe/test_creation.py +++ b/tests/dataframe/test_creation.py @@ -447,6 +447,21 @@ def test_create_dataframe_multiple_csvs_with_file_path_column(valid_data: list[d assert 
pd_df["file_path"].to_list() == [f1name] * len(valid_data) + [f2name] * len(valid_data) +def test_create_dataframe_csv_with_file_path_column_duplicate_field_names() -> None: + with create_temp_filename() as fname: + with open(fname, "w") as f: + data = [{"path": 1, "data": "a"}, {"path": 2, "data": "b"}, {"path": 3, "data": "c"}] + header = list(data[0].keys()) + writer = csv.writer(f) + writer.writerow(header) + writer.writerows([[item[col] for col in header] for item in data]) + f.flush() + + with pytest.raises(ValueError): + # The file_path_column name is the same as a column in the table, which is not allowed + daft.read_json(fname, file_path_column="path") + + @pytest.mark.parametrize("use_native_downloader", [True, False]) def test_create_dataframe_csv_generate_headers(valid_data: list[dict[str, float]], use_native_downloader) -> None: with create_temp_filename() as fname: @@ -755,6 +770,20 @@ def test_create_dataframe_multiple_jsons_with_file_path_column(valid_data: list[ assert pd_df["file_path"].to_list() == [f1name] * len(valid_data) + [f2name] * len(valid_data) +def test_create_dataframe_json_with_file_path_column_duplicate_field_names() -> None: + with create_temp_filename() as fname: + with open(fname, "w") as f: + data = {"path": [1, 2, 3], "data": [4, 5, 6]} + for i in range(len(data["path"])): + f.write(json.dumps({k: data[k][i] for k in data})) + f.write("\n") + f.flush() + + with pytest.raises(ValueError): + # The file_path_column name is the same as a column in the table, which is not allowed + daft.read_json(fname, file_path_column="path") + + @pytest.mark.parametrize("use_native_downloader", [True, False]) def test_create_dataframe_json_column_projection(valid_data: list[dict[str, float]], use_native_downloader) -> None: with create_temp_filename() as fname: @@ -1010,6 +1039,18 @@ def test_create_dataframe_multiple_parquets_with_file_path_column(valid_data: li assert pd_df["file_path"].to_list() == [f1name] * len(valid_data) + [f2name] * len(valid_data) +def test_create_dataframe_parquet_with_file_path_column_duplicate_field_names() -> None: + with create_temp_filename() as fname: + with open(fname, "w") as f: + table = pa.Table.from_pydict({"path": [1, 2, 3], "data": [4, 5, 6]}) + papq.write_table(table, f.name) + f.flush() + + with pytest.raises(ValueError): + # The file_path_column name is the same as a column in the table, which is not allowed + daft.read_parquet(fname, file_path_column="path") + + def test_create_dataframe_parquet_with_filter(valid_data: list[dict[str, float]]) -> None: with create_temp_filename() as fname: with open(fname, "w") as f: From 355c05dea87e2caa090f6cd6fb30604fc9e84aef Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Thu, 26 Sep 2024 15:48:17 -0700 Subject: [PATCH 03/14] fix typo and reduce 1 alloc --- daft/io/_parquet.py | 2 +- src/daft-csv/src/read.rs | 13 +++++++------ src/daft-json/src/local.rs | 7 ++++--- src/daft-json/src/read.rs | 13 +++++++------ src/daft-micropartition/src/micropartition.rs | 14 +++++++------- src/daft-parquet/src/read.rs | 13 +++++++------ 6 files changed, 33 insertions(+), 29 deletions(-) diff --git a/daft/io/_parquet.py b/daft/io/_parquet.py index 77653aafa5..3bb8d3730f 100644 --- a/daft/io/_parquet.py +++ b/daft/io/_parquet.py @@ -49,7 +49,7 @@ def read_parquet( file_path_column: Include the source path(s) as a column with this name. Defaults to None. use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. 
coerce_int96_timestamp_unit: TimeUnit to coerce Int96 TimeStamps to. e.g.: [ns, us, ms], Defaults to None. - _multithreaded_io: Include the source path(s) as a column called + _multithreaded_io: Whether to use multithreading for IO threads. Setting this to False can be helpful in reducing the amount of system resources (number of connections and thread contention) when running in the Ray runner. Defaults to None, which will let Daft decide based on the runner it is currently using. diff --git a/src/daft-csv/src/read.rs b/src/daft-csv/src/read.rs index 1c3e4a46fe..60e9a50e36 100644 --- a/src/daft-csv/src/read.rs +++ b/src/daft-csv/src/read.rs @@ -80,7 +80,7 @@ pub fn read_csv_bulk( multithreaded_io: bool, max_chunks_in_flight: Option, num_parallel_tasks: usize, - file_path_column: Option, + file_path_column: Option<&str>, ) -> DaftResult> { let runtime_handle = get_runtime(multithreaded_io)?; let tables = runtime_handle.block_on_current_thread(async move { @@ -101,7 +101,7 @@ pub fn read_csv_bulk( read_options.clone(), io_client.clone(), io_stats.clone(), - file_path_column.clone(), + file_path_column.map(|s| s.to_string()), ); tokio::task::spawn(async move { read_csv_single_into_table( @@ -112,7 +112,7 @@ pub fn read_csv_bulk( io_client, io_stats, max_chunks_in_flight, - file_path_column, + file_path_column.as_deref(), ) .await }) @@ -220,7 +220,7 @@ async fn read_csv_single_into_table( io_client: Arc, io_stats: Option, max_chunks_in_flight: Option, - file_path_column: Option, + file_path_column: Option<&str>, ) -> DaftResult
{ let predicate = convert_options .as_ref() @@ -340,9 +340,10 @@ async fn read_csv_single_into_table( } }?; if let Some(file_path_col_name) = file_path_column { + let trimmed = uri.trim_start_matches("file://"); let file_paths_column = Utf8Array::from_iter( - file_path_col_name.as_str(), - std::iter::repeat(Some(uri.trim_start_matches("file://"))).take(output_table.len()), + file_path_col_name, + std::iter::repeat(Some(trimmed)).take(output_table.len()), ) .into_series(); return output_table.union(&Table::from_nonempty_columns(vec![file_paths_column])?); diff --git a/src/daft-json/src/local.rs b/src/daft-json/src/local.rs index 9fff94703c..61562e764c 100644 --- a/src/daft-json/src/local.rs +++ b/src/daft-json/src/local.rs @@ -28,7 +28,7 @@ pub fn read_json_local( parse_options: Option, read_options: Option, max_chunks_in_flight: Option, - file_path_column: Option, + file_path_column: Option<&str>, ) -> DaftResult
{ let uri = uri.trim_start_matches("file://"); let file = std::fs::File::open(uri)?; @@ -46,9 +46,10 @@ pub fn read_json_local( )?; let output_table = reader.finish()?; if let Some(file_path_col_name) = file_path_column { + let trimmed = uri.trim_start_matches("file://"); let file_paths_column = Utf8Array::from_iter( - file_path_col_name.as_str(), - std::iter::repeat(Some(uri.trim_start_matches("file://"))).take(output_table.len()), + file_path_col_name, + std::iter::repeat(Some(trimmed)).take(output_table.len()), ) .into_series(); return output_table.union(&Table::from_nonempty_columns(vec![file_paths_column])?); diff --git a/src/daft-json/src/read.rs b/src/daft-json/src/read.rs index 1a4a06bc4c..007f530109 100644 --- a/src/daft-json/src/read.rs +++ b/src/daft-json/src/read.rs @@ -73,7 +73,7 @@ pub fn read_json_bulk( multithreaded_io: bool, max_chunks_in_flight: Option, num_parallel_tasks: usize, - file_path_column: Option, + file_path_column: Option<&str>, ) -> DaftResult> { let runtime_handle = get_runtime(multithreaded_io)?; let tables = runtime_handle.block_on_current_thread(async move { @@ -94,7 +94,7 @@ pub fn read_json_bulk( read_options.clone(), io_client.clone(), io_stats.clone(), - file_path_column.clone(), + file_path_column.map(|s| s.to_string()), ); tokio::task::spawn(async move { let table = read_json_single_into_table( @@ -105,7 +105,7 @@ pub fn read_json_bulk( io_client, io_stats, max_chunks_in_flight, - file_path_column, + file_path_column.as_deref(), ) .await?; DaftResult::Ok(table) @@ -189,7 +189,7 @@ async fn read_json_single_into_table( io_client: Arc, io_stats: Option, max_chunks_in_flight: Option, - file_path_column: Option, + file_path_column: Option<&str>, ) -> DaftResult
{ let (source_type, fixed_uri) = parse_url(uri)?; let is_compressed = CompressionCodec::from_uri(uri).is_some(); @@ -303,9 +303,10 @@ async fn read_json_single_into_table( } }?; if let Some(file_path_col_name) = file_path_column { + let trimmed = uri.trim_start_matches("file://"); let file_paths_column = Utf8Array::from_iter( - file_path_col_name.as_str(), - std::iter::repeat(Some(uri.trim_start_matches("file://"))).take(output_table.len()), + file_path_col_name, + std::iter::repeat(Some(trimmed)).take(output_table.len()), ) .into_series(); return output_table.union(&Table::from_nonempty_columns(vec![file_paths_column])?); diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index 28896d97a6..8ab12f7f5a 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -181,7 +181,7 @@ fn materialize_scan_task( metadatas, Some(delete_map), *chunk_size, - scan_task.file_path_column.clone(), + scan_task.file_path_column.as_deref(), ) .context(DaftCoreComputeSnafu)? } @@ -236,7 +236,7 @@ fn materialize_scan_task( native_storage_config.multithreaded_io, None, 8, - scan_task.file_path_column.clone(), + scan_task.file_path_column.as_deref(), ) .context(DaftCoreComputeSnafu)? } @@ -267,7 +267,7 @@ fn materialize_scan_task( native_storage_config.multithreaded_io, None, 8, - scan_task.file_path_column.clone(), + scan_task.file_path_column.as_deref(), ) .context(DaftCoreComputeSnafu)? } @@ -656,7 +656,7 @@ impl MicroPartition { field_id_mapping.clone(), parquet_metadata, chunk_size, - scan_task.file_path_column.clone(), + scan_task.file_path_column.as_deref(), ) .context(DaftCoreComputeSnafu) } @@ -1039,7 +1039,7 @@ fn _read_parquet_into_loaded_micropartition>( catalog_provided_schema: Option, field_id_mapping: Option>>, chunk_size: Option, - file_path_column: Option, + file_path_column: Option<&str>, ) -> DaftResult { let delete_map = iceberg_delete_files .map(|files| { @@ -1125,7 +1125,7 @@ pub(crate) fn read_parquet_into_micropartition>( field_id_mapping: Option>>, parquet_metadata: Option>>, chunk_size: Option, - file_path_column: Option, + file_path_column: Option<&str>, ) -> DaftResult { if let Some(so) = start_offset && so > 0 @@ -1315,7 +1315,7 @@ pub(crate) fn read_parquet_into_micropartition>( }), num_rows, ), - file_path_column, + file_path_column.map(|s| s.to_string()), ); let fill_map = scan_task.partition_spec().map(|pspec| pspec.to_fill_map()); diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index 40c9164785..e2bc923cb5 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -156,7 +156,7 @@ async fn read_parquet_single( metadata: Option>, delete_rows: Option>, chunk_size: Option, - file_path_column: Option, + file_path_column: Option<&str>, ) -> DaftResult
{ let field_id_mapping_provided = field_id_mapping.is_some(); let mut columns_to_read = columns.clone(); @@ -358,9 +358,10 @@ async fn read_parquet_single( } if let Some(file_path_col_name) = file_path_column { + let trimmed = uri.trim_start_matches("file://"); let file_paths_column = Utf8Array::from_iter( - file_path_col_name.as_str(), - std::iter::repeat(Some(uri.trim_start_matches("file://"))).take(table.len()), + file_path_col_name, + std::iter::repeat(Some(trimmed)).take(table.len()), ) .into_series(); return table.union(&Table::from_nonempty_columns(vec![file_paths_column])?); @@ -762,7 +763,7 @@ pub fn read_parquet_bulk>( metadata: Option>>, delete_map: Option>>, chunk_size: Option, - file_path_column: Option, + file_path_column: Option<&str>, ) -> DaftResult> { let runtime_handle = daft_io::get_runtime(multithreaded_io)?; @@ -790,7 +791,7 @@ pub fn read_parquet_bulk>( let schema_infer_options = *schema_infer_options; let owned_field_id_mapping = field_id_mapping.clone(); let delete_rows = delete_map.as_ref().and_then(|m| m.get(&uri).cloned()); - let owned_file_path_column = file_path_column.clone(); + let owned_file_path_column = file_path_column.map(|s| s.to_string()); tokio::task::spawn(async move { read_parquet_single( &uri, @@ -806,7 +807,7 @@ pub fn read_parquet_bulk>( metadata, delete_rows, chunk_size, - owned_file_path_column, + owned_file_path_column.as_deref(), ) .await }) From 945cfc70c6d643af294549be4817d1f1783748e8 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Fri, 27 Sep 2024 11:22:26 -0700 Subject: [PATCH 04/14] oops --- src/daft-local-execution/src/sources/scan_task.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/daft-local-execution/src/sources/scan_task.rs b/src/daft-local-execution/src/sources/scan_task.rs index d2ebbfe9cf..51101ce56d 100644 --- a/src/daft-local-execution/src/sources/scan_task.rs +++ b/src/daft-local-execution/src/sources/scan_task.rs @@ -293,9 +293,17 @@ async fn stream_scan_task( .into_series(); table = table.union(&Table::from_nonempty_columns(vec![file_paths_column])?)?; } + let casted_table = table.cast_to_schema_with_fill( + scan_task.materialized_schema().as_ref(), + scan_task + .partition_spec() + .as_ref() + .map(|pspec| pspec.to_fill_map()) + .as_ref(), + )?; let mp = Arc::new(MicroPartition::new_loaded( scan_task.materialized_schema().clone(), - Arc::new(vec![table]), + Arc::new(vec![casted_table]), scan_task.statistics.clone(), )); Ok(mp) From d133fed79cb80a8f719943dbf36026748ab2f7f3 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Fri, 11 Oct 2024 10:10:42 -0700 Subject: [PATCH 05/14] include in partition spec --- src/daft-csv/src/read.rs | 24 +----------------- src/daft-json/src/local.rs | 13 +--------- src/daft-json/src/read.rs | 25 +------------------ .../src/sources/scan_task.rs | 18 +------------ src/daft-micropartition/src/micropartition.rs | 12 +-------- src/daft-parquet/src/python.rs | 1 - src/daft-parquet/src/read.rs | 15 ----------- src/daft-scan/src/glob.rs | 15 ++++++++++- src/daft-table/src/lib.rs | 2 ++ 9 files changed, 21 insertions(+), 104 deletions(-) diff --git a/src/daft-csv/src/read.rs b/src/daft-csv/src/read.rs index 60e9a50e36..5216d29f39 100644 --- a/src/daft-csv/src/read.rs +++ b/src/daft-csv/src/read.rs @@ -63,7 +63,6 @@ pub fn read_csv( io_client, io_stats, max_chunks_in_flight, - None, ) .await }) @@ -80,28 +79,18 @@ pub fn read_csv_bulk( multithreaded_io: bool, max_chunks_in_flight: Option, num_parallel_tasks: usize, - file_path_column: Option<&str>, ) -> DaftResult> { let 
runtime_handle = get_runtime(multithreaded_io)?; let tables = runtime_handle.block_on_current_thread(async move { // Launch a read task per URI, throttling the number of concurrent file reads to num_parallel tasks. let task_stream = futures::stream::iter(uris.iter().map(|uri| { - let ( - uri, - convert_options, - parse_options, - read_options, - io_client, - io_stats, - file_path_column, - ) = ( + let (uri, convert_options, parse_options, read_options, io_client, io_stats) = ( uri.to_string(), convert_options.clone(), parse_options.clone(), read_options.clone(), io_client.clone(), io_stats.clone(), - file_path_column.map(|s| s.to_string()), ); tokio::task::spawn(async move { read_csv_single_into_table( @@ -112,7 +101,6 @@ pub fn read_csv_bulk( io_client, io_stats, max_chunks_in_flight, - file_path_column.as_deref(), ) .await }) @@ -220,7 +208,6 @@ async fn read_csv_single_into_table( io_client: Arc, io_stats: Option, max_chunks_in_flight: Option, - file_path_column: Option<&str>, ) -> DaftResult
{ let predicate = convert_options .as_ref() @@ -339,15 +326,6 @@ async fn read_csv_single_into_table( Ok(concated_table) } }?; - if let Some(file_path_col_name) = file_path_column { - let trimmed = uri.trim_start_matches("file://"); - let file_paths_column = Utf8Array::from_iter( - file_path_col_name, - std::iter::repeat(Some(trimmed)).take(output_table.len()), - ) - .into_series(); - return output_table.union(&Table::from_nonempty_columns(vec![file_paths_column])?); - } Ok(output_table) } diff --git a/src/daft-json/src/local.rs b/src/daft-json/src/local.rs index 61562e764c..224c94f24f 100644 --- a/src/daft-json/src/local.rs +++ b/src/daft-json/src/local.rs @@ -28,7 +28,6 @@ pub fn read_json_local( parse_options: Option, read_options: Option, max_chunks_in_flight: Option, - file_path_column: Option<&str>, ) -> DaftResult
{ let uri = uri.trim_start_matches("file://"); let file = std::fs::File::open(uri)?; @@ -44,17 +43,7 @@ pub fn read_json_local( read_options, max_chunks_in_flight, )?; - let output_table = reader.finish()?; - if let Some(file_path_col_name) = file_path_column { - let trimmed = uri.trim_start_matches("file://"); - let file_paths_column = Utf8Array::from_iter( - file_path_col_name, - std::iter::repeat(Some(trimmed)).take(output_table.len()), - ) - .into_series(); - return output_table.union(&Table::from_nonempty_columns(vec![file_paths_column])?); - } - Ok(output_table) + reader.finish() } struct JsonReader<'a> { diff --git a/src/daft-json/src/read.rs b/src/daft-json/src/read.rs index 007f530109..de7b153b90 100644 --- a/src/daft-json/src/read.rs +++ b/src/daft-json/src/read.rs @@ -56,7 +56,6 @@ pub fn read_json( io_client, io_stats, max_chunks_in_flight, - None, ) .await }) @@ -73,28 +72,18 @@ pub fn read_json_bulk( multithreaded_io: bool, max_chunks_in_flight: Option, num_parallel_tasks: usize, - file_path_column: Option<&str>, ) -> DaftResult> { let runtime_handle = get_runtime(multithreaded_io)?; let tables = runtime_handle.block_on_current_thread(async move { // Launch a read task per URI, throttling the number of concurrent file reads to num_parallel tasks. let task_stream = futures::stream::iter(uris.iter().map(|uri| { - let ( - uri, - convert_options, - parse_options, - read_options, - io_client, - io_stats, - file_path_column, - ) = ( + let (uri, convert_options, parse_options, read_options, io_client, io_stats) = ( uri.to_string(), convert_options.clone(), parse_options.clone(), read_options.clone(), io_client.clone(), io_stats.clone(), - file_path_column.map(|s| s.to_string()), ); tokio::task::spawn(async move { let table = read_json_single_into_table( @@ -105,7 +94,6 @@ pub fn read_json_bulk( io_client, io_stats, max_chunks_in_flight, - file_path_column.as_deref(), ) .await?; DaftResult::Ok(table) @@ -189,7 +177,6 @@ async fn read_json_single_into_table( io_client: Arc, io_stats: Option, max_chunks_in_flight: Option, - file_path_column: Option<&str>, ) -> DaftResult
{ let (source_type, fixed_uri) = parse_url(uri)?; let is_compressed = CompressionCodec::from_uri(uri).is_some(); @@ -200,7 +187,6 @@ async fn read_json_single_into_table( parse_options, read_options, max_chunks_in_flight, - file_path_column, ); } @@ -302,15 +288,6 @@ async fn read_json_single_into_table( Ok(concated_table) } }?; - if let Some(file_path_col_name) = file_path_column { - let trimmed = uri.trim_start_matches("file://"); - let file_paths_column = Utf8Array::from_iter( - file_path_col_name, - std::iter::repeat(Some(trimmed)).take(output_table.len()), - ) - .into_series(); - return output_table.union(&Table::from_nonempty_columns(vec![file_paths_column])?); - } Ok(output_table) } diff --git a/src/daft-local-execution/src/sources/scan_task.rs b/src/daft-local-execution/src/sources/scan_task.rs index 51101ce56d..5b9f95d96e 100644 --- a/src/daft-local-execution/src/sources/scan_task.rs +++ b/src/daft-local-execution/src/sources/scan_task.rs @@ -2,14 +2,12 @@ use std::sync::Arc; use common_error::DaftResult; use common_file_formats::{FileFormatConfig, ParquetSourceConfig}; -use daft_core::{prelude::Utf8Array, series::IntoSeries}; use daft_csv::{CsvConvertOptions, CsvParseOptions, CsvReadOptions}; use daft_io::IOStatsRef; use daft_json::{JsonConvertOptions, JsonParseOptions, JsonReadOptions}; use daft_micropartition::MicroPartition; use daft_parquet::read::ParquetSchemaInferenceOptions; use daft_scan::{storage_config::StorageConfig, ChunkSpec, ScanTask}; -use daft_table::Table; use futures::{Stream, StreamExt}; use tokio_stream::wrappers::ReceiverStream; use tracing::instrument; @@ -277,22 +275,8 @@ async fn stream_scan_task( } }; - let url = if scan_task.file_path_column.is_some() { - Some(url.to_string()) - } else { - None - }; Ok(table_stream.map(move |table| { - let mut table = table?; - if let Some(file_path_col_name) = scan_task.file_path_column.as_ref() { - let trimmed = url.as_ref().unwrap().trim_start_matches("file://"); - let file_paths_column = Utf8Array::from_iter( - file_path_col_name, - std::iter::repeat(Some(trimmed)).take(table.len()), - ) - .into_series(); - table = table.union(&Table::from_nonempty_columns(vec![file_paths_column])?)?; - } + let table = table?; let casted_table = table.cast_to_schema_with_fill( scan_task.materialized_schema().as_ref(), scan_task diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index 8ab12f7f5a..68224d85cf 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -181,7 +181,6 @@ fn materialize_scan_task( metadatas, Some(delete_map), *chunk_size, - scan_task.file_path_column.as_deref(), ) .context(DaftCoreComputeSnafu)? } @@ -236,7 +235,6 @@ fn materialize_scan_task( native_storage_config.multithreaded_io, None, 8, - scan_task.file_path_column.as_deref(), ) .context(DaftCoreComputeSnafu)? } @@ -267,7 +265,6 @@ fn materialize_scan_task( native_storage_config.multithreaded_io, None, 8, - scan_task.file_path_column.as_deref(), ) .context(DaftCoreComputeSnafu)? } @@ -498,7 +495,7 @@ fn materialize_scan_task( // If there is a partition spec and partition values aren't duplicated in the data, inline the partition values // into the table when casting the schema. 
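A minimal sketch of the mechanism these removals rely on: instead of each reader unioning a path column into its output, the source path rides on the ScanTask's partition spec and is inlined when the table is cast to the materialized schema. The sketch assumes the crate-internal APIs used elsewhere in this patch (PartitionSpec, to_fill_map, cast_to_schema_with_fill, Utf8Array::from_iter) and the module paths that appear in this patch's import changes; it is illustrative only and not compilable outside the daft workspace.

// Sketch only: a one-row partition spec carrying the source path becomes a fill map,
// and cast_to_schema_with_fill materializes the missing column by broadcasting that
// value to every row of the table being cast.
use common_error::DaftResult;
use daft_core::{prelude::Utf8Array, series::IntoSeries};
use daft_schema::schema::Schema;
use daft_stats::PartitionSpec;
use daft_table::Table;

fn inline_file_path(table: &Table, schema: &Schema, path: &str) -> DaftResult<Table> {
    // Same construction as the GlobScanOperator change later in this patch: one Utf8 value per file.
    let keys = Table::from_nonempty_columns(vec![
        Utf8Array::from_iter("file_path", std::iter::once(Some(path))).into_series(),
    ])?;
    let pspec = PartitionSpec { keys };
    // to_fill_map maps the partition column name to a literal of its value; the cast then
    // fills any column that is present in `schema` but absent from `table`.
    let fill_map = pspec.to_fill_map();
    table.cast_to_schema_with_fill(schema, Some(&fill_map))
}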
let fill_map = scan_task.partition_spec().map(|pspec| pspec.to_fill_map()); - + println!("fill_map: {:?}", fill_map); table_values = table_values .iter() .map(|tbl| tbl.cast_to_schema_with_fill(cast_to_schema.as_ref(), fill_map.as_ref())) @@ -866,7 +863,6 @@ pub(crate) fn read_csv_into_micropartition( multithreaded_io, None, 8, - None, ) .context(DaftCoreComputeSnafu)?; @@ -916,7 +912,6 @@ pub(crate) fn read_json_into_micropartition( multithreaded_io, None, 8, - None, ) .context(DaftCoreComputeSnafu)?; @@ -993,7 +988,6 @@ fn _read_delete_files( None, None, None, - None, )?; let mut delete_map: HashMap> = @@ -1039,7 +1033,6 @@ fn _read_parquet_into_loaded_micropartition>( catalog_provided_schema: Option, field_id_mapping: Option>>, chunk_size: Option, - file_path_column: Option<&str>, ) -> DaftResult { let delete_map = iceberg_delete_files .map(|files| { @@ -1074,7 +1067,6 @@ fn _read_parquet_into_loaded_micropartition>( None, delete_map, chunk_size, - file_path_column, )?; // Prefer using the `catalog_provided_schema` but fall back onto inferred schema from Parquet files @@ -1162,7 +1154,6 @@ pub(crate) fn read_parquet_into_micropartition>( catalog_provided_schema, field_id_mapping, chunk_size, - file_path_column, ); } let runtime_handle = get_runtime(multithreaded_io)?; @@ -1346,7 +1337,6 @@ pub(crate) fn read_parquet_into_micropartition>( catalog_provided_schema, field_id_mapping, chunk_size, - file_path_column, ) } } diff --git a/src/daft-parquet/src/python.rs b/src/daft-parquet/src/python.rs index d9616b0978..2d965053c2 100644 --- a/src/daft-parquet/src/python.rs +++ b/src/daft-parquet/src/python.rs @@ -170,7 +170,6 @@ pub mod pylib { None, None, None, - None, )? .into_iter() .map(|v| v.into()) diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index e3eac7f627..3b6c498cf6 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -156,7 +156,6 @@ async fn read_parquet_single( metadata: Option>, delete_rows: Option>, chunk_size: Option, - file_path_column: Option<&str>, ) -> DaftResult
{ let field_id_mapping_provided = field_id_mapping.is_some(); let mut columns_to_read = columns.clone(); @@ -357,16 +356,6 @@ async fn read_parquet_single( .into()); } - if let Some(file_path_col_name) = file_path_column { - let trimmed = uri.trim_start_matches("file://"); - let file_paths_column = Utf8Array::from_iter( - file_path_col_name, - std::iter::repeat(Some(trimmed)).take(table.len()), - ) - .into_series(); - return table.union(&Table::from_nonempty_columns(vec![file_paths_column])?); - } - Ok(table) } @@ -694,7 +683,6 @@ pub fn read_parquet( metadata, None, None, - None, ) .await }) @@ -763,7 +751,6 @@ pub fn read_parquet_bulk>( metadata: Option>>, delete_map: Option>>, chunk_size: Option, - file_path_column: Option<&str>, ) -> DaftResult> { let runtime_handle = daft_io::get_runtime(multithreaded_io)?; @@ -791,7 +778,6 @@ pub fn read_parquet_bulk>( let schema_infer_options = *schema_infer_options; let owned_field_id_mapping = field_id_mapping.clone(); let delete_rows = delete_map.as_ref().and_then(|m| m.get(&uri).cloned()); - let owned_file_path_column = file_path_column.map(|s| s.to_string()); tokio::task::spawn(async move { read_parquet_single( &uri, @@ -807,7 +793,6 @@ pub fn read_parquet_bulk>( metadata, delete_rows, chunk_size, - owned_file_path_column.as_deref(), ) .await }) diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index 56f5cbb5c5..28c1ca1686 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -2,10 +2,13 @@ use std::{sync::Arc, vec}; use common_error::{DaftError, DaftResult}; use common_file_formats::{CsvSourceConfig, FileFormat, FileFormatConfig, ParquetSourceConfig}; +use daft_core::{prelude::Utf8Array, series::IntoSeries}; use daft_csv::CsvParseOptions; use daft_io::{parse_url, FileMetadata, IOClient, IOStatsContext, IOStatsRef, RuntimeRef}; use daft_parquet::read::ParquetSchemaInferenceOptions; use daft_schema::schema::SchemaRef; +use daft_stats::PartitionSpec; +use daft_table::Table; use futures::{stream::BoxStream, StreamExt, TryStreamExt}; use snafu::Snafu; @@ -346,6 +349,16 @@ impl ScanOperator for GlobScanOperator { size: size_bytes, .. 
} = f?; + let partition_spec = if let Some(fp_col) = &file_path_column { + let trimmed = path.trim_start_matches("file://"); + let file_paths_column_series = + Utf8Array::from_iter(fp_col, std::iter::once(Some(trimmed))).into_series(); + Some(PartitionSpec { + keys: Table::from_nonempty_columns(vec![file_paths_column_series])?, + }) + } else { + None + }; let row_group = row_groups .as_ref() .and_then(|rgs| rgs.get(idx).cloned()) @@ -358,7 +371,7 @@ impl ScanOperator for GlobScanOperator { size_bytes, iceberg_delete_files: None, metadata: None, - partition_spec: None, + partition_spec, statistics: None, parquet_metadata: None, }], diff --git a/src/daft-table/src/lib.rs b/src/daft-table/src/lib.rs index 3669fda3f5..ff9f9c0857 100644 --- a/src/daft-table/src/lib.rs +++ b/src/daft-table/src/lib.rs @@ -647,7 +647,9 @@ impl Table { schema: &Schema, fill_map: Option<&HashMap<&str, ExprRef>>, ) -> DaftResult { + println!("schema: {:?}", schema); let current_col_names = HashSet::<_>::from_iter(self.column_names()); + println!("current_col_names: {:?}", current_col_names); let null_lit = null_lit(); let exprs: Vec<_> = schema .fields From 2c59e26211044e1adc7c2d50c498c81f303d1993 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Fri, 11 Oct 2024 10:14:43 -0700 Subject: [PATCH 06/14] cleanup --- src/daft-csv/src/read.rs | 29 ++++++++--------- src/daft-json/src/read.rs | 32 ++++++++----------- src/daft-micropartition/src/micropartition.rs | 1 - 3 files changed, 27 insertions(+), 35 deletions(-) diff --git a/src/daft-csv/src/read.rs b/src/daft-csv/src/read.rs index 5216d29f39..33a1c6cd8c 100644 --- a/src/daft-csv/src/read.rs +++ b/src/daft-csv/src/read.rs @@ -310,23 +310,20 @@ async fn read_csv_single_into_table( .into_iter() .collect::>>()?; // Handle empty table case. - let output_table = { - if collected_tables.is_empty() { - return Table::empty(Some(schema)); - } + if collected_tables.is_empty() { + return Table::empty(Some(schema)); + } - // // TODO(Clark): Don't concatenate all chunks from a file into a single table, since MicroPartition is natively chunked. - let concated_table = tables_concat(collected_tables)?; - if let Some(limit) = limit - && concated_table.len() > limit - { - // apply head in case that last chunk went over limit - concated_table.head(limit) - } else { - Ok(concated_table) - } - }?; - Ok(output_table) + // // TODO(Clark): Don't concatenate all chunks from a file into a single table, since MicroPartition is natively chunked. + let concated_table = tables_concat(collected_tables)?; + if let Some(limit) = limit + && concated_table.len() > limit + { + // apply head in case that last chunk went over limit + concated_table.head(limit) + } else { + Ok(concated_table) + } } async fn stream_csv_single( diff --git a/src/daft-json/src/read.rs b/src/daft-json/src/read.rs index de7b153b90..133c49c31f 100644 --- a/src/daft-json/src/read.rs +++ b/src/daft-json/src/read.rs @@ -271,24 +271,20 @@ async fn read_json_single_into_table( .into_iter() .collect::>>()?; // Handle empty table case. - let output_table = { - if collected_tables.is_empty() { - let daft_schema = Arc::new(Schema::try_from(&schema)?); - return Table::empty(Some(daft_schema)); - } - - // // TODO(Clark): Don't concatenate all chunks from a file into a single table, since MicroPartition is natively chunked. 
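The head(limit) branch kept by the CSV cleanup above, and by the matching JSON cleanup continuing below, handles a real edge case: chunk-based readers decode the final chunk in full, so the concatenated table can end up with more rows than a pushed-down limit. A small self-contained sketch of that trim, using invented row counts rather than values from the patch:

// Sketch: trimming a concatenated table back down to `limit` rows.
// The chunk sizes and limit below are made-up numbers for illustration only.
fn rows_after_trim(rows_per_chunk: &[usize], limit: Option<usize>) -> usize {
    let total: usize = rows_per_chunk.iter().sum();
    match limit {
        // head(limit) keeps only the first `limit` rows when the last chunk overshot.
        Some(limit) if total > limit => limit,
        _ => total,
    }
}

fn main() {
    // Three 512-row chunks against a limit of 1000: reading stops after the chunk that
    // crosses the limit, and the 1536 concatenated rows are trimmed to 1000.
    assert_eq!(rows_after_trim(&[512, 512, 512], Some(1000)), 1000);
    // No limit pushed down: nothing is trimmed.
    assert_eq!(rows_after_trim(&[512, 512], None), 1024);
}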
- let concated_table = tables_concat(collected_tables)?; - if let Some(limit) = limit - && concated_table.len() > limit - { - // apply head in case that last chunk went over limit - concated_table.head(limit) - } else { - Ok(concated_table) - } - }?; - Ok(output_table) + if collected_tables.is_empty() { + let daft_schema = Arc::new(Schema::try_from(&schema)?); + return Table::empty(Some(daft_schema)); + } + // // TODO(Clark): Don't concatenate all chunks from a file into a single table, since MicroPartition is natively chunked. + let concated_table = tables_concat(collected_tables)?; + if let Some(limit) = limit + && concated_table.len() > limit + { + // apply head in case that last chunk went over limit + concated_table.head(limit) + } else { + Ok(concated_table) + } } pub async fn stream_json( diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index 68224d85cf..2d1ae4f0e1 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -495,7 +495,6 @@ fn materialize_scan_task( // If there is a partition spec and partition values aren't duplicated in the data, inline the partition values // into the table when casting the schema. let fill_map = scan_task.partition_spec().map(|pspec| pspec.to_fill_map()); - println!("fill_map: {:?}", fill_map); table_values = table_values .iter() .map(|tbl| tbl.cast_to_schema_with_fill(cast_to_schema.as_ref(), fill_map.as_ref())) From ebe6be798d1b05a403bf1a170aa097bca2c434d0 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Fri, 11 Oct 2024 10:15:33 -0700 Subject: [PATCH 07/14] cleanup --- src/daft-json/src/read.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/daft-json/src/read.rs b/src/daft-json/src/read.rs index 133c49c31f..7396e6ca04 100644 --- a/src/daft-json/src/read.rs +++ b/src/daft-json/src/read.rs @@ -168,7 +168,6 @@ pub(crate) fn tables_concat(mut tables: Vec
<Table>) -> DaftResult<Table>
{ ) } -#[allow(clippy::too_many_arguments)] async fn read_json_single_into_table( uri: &str, convert_options: Option, From 3e222da610654421ccb59347424ecb497599e2bf Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Fri, 11 Oct 2024 10:55:31 -0700 Subject: [PATCH 08/14] add partitioning key --- src/daft-scan/src/glob.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index aec4e7441a..225ae0d203 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -6,7 +6,7 @@ use daft_core::{prelude::Utf8Array, series::IntoSeries}; use daft_csv::CsvParseOptions; use daft_io::{parse_url, FileMetadata, IOClient, IOStatsContext, IOStatsRef, RuntimeRef}; use daft_parquet::read::ParquetSchemaInferenceOptions; -use daft_schema::schema::SchemaRef; +use daft_schema::{dtype::DataType, field::Field, schema::SchemaRef}; use daft_stats::PartitionSpec; use daft_table::Table; use futures::{stream::BoxStream, StreamExt, TryStreamExt}; @@ -23,6 +23,7 @@ pub struct GlobScanOperator { schema: SchemaRef, storage_config: Arc, file_path_column: Option, + partitioning_keys: Vec, } /// Wrapper struct that implements a sync Iterator for a BoxStream @@ -167,6 +168,13 @@ impl GlobScanOperator { } .into()), }?; + let partitioning_keys = if let Some(fp_col) = &file_path_column { + let partition_field = + PartitionField::new(Field::new(fp_col, DataType::Utf8), None, None)?; + vec![partition_field] + } else { + vec![] + }; let schema = match infer_schema { true => { @@ -253,6 +261,7 @@ impl GlobScanOperator { schema, storage_config, file_path_column, + partitioning_keys, }) } } @@ -263,7 +272,7 @@ impl ScanOperator for GlobScanOperator { } fn partitioning_keys(&self) -> &[PartitionField] { - &[] + &self.partitioning_keys } fn file_path_column(&self) -> Option<&str> { From 115d3425b220de177c45f68ade4da3158fff9643 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Fri, 11 Oct 2024 11:26:47 -0700 Subject: [PATCH 09/14] partition pruning --- src/daft-scan/src/glob.rs | 101 +++++++++++++++++++++++--------------- 1 file changed, 62 insertions(+), 39 deletions(-) diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index 225ae0d203..43b8d791be 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -352,45 +352,68 @@ impl ScanOperator for GlobScanOperator { }; let file_path_column = self.file_path_column.clone(); // Create one ScanTask per file - Ok(Box::new(files.enumerate().map(move |(idx, f)| { - let FileMetadata { - filepath: path, - size: size_bytes, - .. 
- } = f?; - let partition_spec = if let Some(fp_col) = &file_path_column { - let trimmed = path.trim_start_matches("file://"); - let file_paths_column_series = - Utf8Array::from_iter(fp_col, std::iter::once(Some(trimmed))).into_series(); - Some(PartitionSpec { - keys: Table::from_nonempty_columns(vec![file_paths_column_series])?, - }) - } else { - None - }; - let row_group = row_groups - .as_ref() - .and_then(|rgs| rgs.get(idx).cloned()) - .flatten(); - let chunk_spec = row_group.map(ChunkSpec::Parquet); - Ok(ScanTask::new( - vec![DataSource::File { - path, - chunk_spec, - size_bytes, - iceberg_delete_files: None, - metadata: None, - partition_spec, - statistics: None, - parquet_metadata: None, - }], - file_format_config.clone(), - schema.clone(), - storage_config.clone(), - pushdowns.clone(), - file_path_column.clone(), - ) - .into()) + Ok(Box::new(files.enumerate().filter_map(move |(idx, f)| { + let scan_task_result = (|| { + let FileMetadata { + filepath: path, + size: size_bytes, + .. + } = f?; + let partition_spec = if let Some(fp_col) = &file_path_column { + let trimmed = path.trim_start_matches("file://"); + let file_paths_column_series = + Utf8Array::from_iter(fp_col, std::iter::once(Some(trimmed))).into_series(); + let file_paths_table = + Table::from_nonempty_columns(vec![file_paths_column_series; 1])?; + + if let Some(ref partition_filters) = pushdowns.partition_filters { + let eval_pred = + file_paths_table.eval_expression_list(&[partition_filters.clone()])?; + assert_eq!(eval_pred.num_columns(), 1); + let series = eval_pred.get_column_by_index(0)?; + assert_eq!(series.data_type(), &daft_core::datatypes::DataType::Boolean); + let boolean = series.bool()?; + assert_eq!(boolean.len(), 1); + let value = boolean.get(0); + match value { + None | Some(false) => return Ok(None), + Some(true) => {} + } + } + Some(PartitionSpec { + keys: file_paths_table, + }) + } else { + None + }; + let row_group = row_groups + .as_ref() + .and_then(|rgs| rgs.get(idx).cloned()) + .flatten(); + let chunk_spec = row_group.map(ChunkSpec::Parquet); + Ok(Some(ScanTask::new( + vec![DataSource::File { + path, + chunk_spec, + size_bytes, + iceberg_delete_files: None, + metadata: None, + partition_spec, + statistics: None, + parquet_metadata: None, + }], + file_format_config.clone(), + schema.clone(), + storage_config.clone(), + pushdowns.clone(), + file_path_column.clone(), + ))) + })(); + match scan_task_result { + Ok(Some(scan_task)) => Some(Ok(scan_task.into())), + Ok(None) => None, + Err(e) => Some(Err(e)), + } }))) } } From d73f6bb9457054969629f155a3991308187f3b75 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Fri, 11 Oct 2024 12:17:35 -0700 Subject: [PATCH 10/14] add pushdown tests --- tests/dataframe/test_creation.py | 54 ++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/tests/dataframe/test_creation.py b/tests/dataframe/test_creation.py index d0f972fa84..9265c405cd 100644 --- a/tests/dataframe/test_creation.py +++ b/tests/dataframe/test_creation.py @@ -441,6 +441,25 @@ def test_create_dataframe_multiple_csvs_with_file_path_column(valid_data: list[d assert pd_df["file_path"].to_list() == [f1name] * len(valid_data) + [f2name] * len(valid_data) +def test_create_dataframe_csv_with_file_path_column_and_pushdowns(valid_data: list[dict[str, float]]) -> None: + with create_temp_filename() as f1name, create_temp_filename() as f2name: + with open(f1name, "w") as f1, open(f2name, "w") as f2: + for f in (f1, f2): + header = list(valid_data[0].keys()) + writer = csv.writer(f) + 
writer.writerow(header) + writer.writerows([[item[col] for col in header] for item in valid_data]) + f.flush() + + df = daft.read_csv([f1name, f2name], file_path_column="file_path").where(daft.col("file_path") == f1name) + assert df.column_names == COL_NAMES + ["file_path"] + + pd_df = df.to_pandas() + assert list(pd_df.columns) == COL_NAMES + ["file_path"] + assert len(pd_df) == len(valid_data) + assert pd_df["file_path"].to_list() == [f1name] * len(valid_data) + + def test_create_dataframe_csv_with_file_path_column_duplicate_field_names() -> None: with create_temp_filename() as fname: with open(fname, "w") as f: @@ -764,6 +783,24 @@ def test_create_dataframe_multiple_jsons_with_file_path_column(valid_data: list[ assert pd_df["file_path"].to_list() == [f1name] * len(valid_data) + [f2name] * len(valid_data) +def test_create_dataframe_json_with_file_path_column_and_pushdowns(valid_data: list[dict[str, float]]) -> None: + with create_temp_filename() as f1name, create_temp_filename() as f2name: + with open(f1name, "w") as f1, open(f2name, "w") as f2: + for f in (f1, f2): + for data in valid_data: + f.write(json.dumps(data)) + f.write("\n") + f.flush() + + df = daft.read_json([f1name, f2name], file_path_column="file_path").where(daft.col("file_path") == f1name) + assert df.column_names == COL_NAMES + ["file_path"] + + pd_df = df.to_pandas() + assert list(pd_df.columns) == COL_NAMES + ["file_path"] + assert len(pd_df) == len(valid_data) + assert pd_df["file_path"].to_list() == [f1name] * len(valid_data) + + def test_create_dataframe_json_with_file_path_column_duplicate_field_names() -> None: with create_temp_filename() as fname: with open(fname, "w") as f: @@ -1033,6 +1070,23 @@ def test_create_dataframe_multiple_parquets_with_file_path_column(valid_data: li assert pd_df["file_path"].to_list() == [f1name] * len(valid_data) + [f2name] * len(valid_data) +def test_create_dataframe_parquet_with_file_path_column_and_pushdowns(valid_data: list[dict[str, float]]) -> None: + with create_temp_filename() as f1name, create_temp_filename() as f2name: + with open(f1name, "w") as f1, open(f2name, "w") as f2: + for f in (f1, f2): + table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES}) + papq.write_table(table, f.name) + f.flush() + + df = daft.read_parquet([f1name, f2name], file_path_column="file_path").where(daft.col("file_path") == f1name) + assert df.column_names == COL_NAMES + ["file_path"] + + pd_df = df.to_pandas() + assert list(pd_df.columns) == COL_NAMES + ["file_path"] + assert len(pd_df) == len(valid_data) + assert pd_df["file_path"].to_list() == [f1name] * len(valid_data) + + def test_create_dataframe_parquet_with_file_path_column_duplicate_field_names() -> None: with create_temp_filename() as fname: with open(fname, "w") as f: From b5995d1858d46c5e42e33a2aa616d5b59bd7f15f Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Fri, 11 Oct 2024 12:37:49 -0700 Subject: [PATCH 11/14] cleanup --- src/daft-micropartition/src/micropartition.rs | 1 + src/daft-scan/src/glob.rs | 3 ++- src/daft-table/src/lib.rs | 2 -- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index 12e30089b4..e8d349e55f 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -489,6 +489,7 @@ fn materialize_scan_task( // If there is a partition spec and partition values aren't duplicated in the data, inline the partition values // into the table 
when casting the schema. let fill_map = scan_task.partition_spec().map(|pspec| pspec.to_fill_map()); + table_values = table_values .iter() .map(|tbl| tbl.cast_to_schema_with_fill(cast_to_schema.as_ref(), fill_map.as_ref())) diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index 43b8d791be..72f6307184 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -171,7 +171,7 @@ impl GlobScanOperator { let partitioning_keys = if let Some(fp_col) = &file_path_column { let partition_field = PartitionField::new(Field::new(fp_col, DataType::Utf8), None, None)?; - vec![partition_field] + vec![partition_field; 1] } else { vec![] }; @@ -329,6 +329,7 @@ impl ScanOperator for GlobScanOperator { self.glob_paths )); let file_format = self.file_format_config.file_format(); + let files = run_glob_parallel( self.glob_paths.clone(), io_client, diff --git a/src/daft-table/src/lib.rs b/src/daft-table/src/lib.rs index 33f3c92fe4..cf96344a53 100644 --- a/src/daft-table/src/lib.rs +++ b/src/daft-table/src/lib.rs @@ -658,9 +658,7 @@ impl Table { schema: &Schema, fill_map: Option<&HashMap<&str, ExprRef>>, ) -> DaftResult { - println!("schema: {:?}", schema); let current_col_names = HashSet::<_>::from_iter(self.column_names()); - println!("current_col_names: {:?}", current_col_names); let null_lit = null_lit(); let exprs: Vec<_> = schema .fields From 6c65e635ae6f0fb378a27579b0f678308cd31df5 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Fri, 11 Oct 2024 14:32:14 -0700 Subject: [PATCH 12/14] Update src/daft-scan/src/lib.rs Co-authored-by: Desmond Cheong --- src/daft-scan/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/daft-scan/src/lib.rs b/src/daft-scan/src/lib.rs index b719ccfcd2..40f8689264 100644 --- a/src/daft-scan/src/lib.rs +++ b/src/daft-scan/src/lib.rs @@ -463,7 +463,7 @@ impl ScanTask { }); } if sc1.file_path_column != sc2.file_path_column { - return Err(Error::DifferingFileFormatConfigsInScanTaskMerge { + return Err(Error::DifferingFilePathColumnsInScanTaskMerge { ffc1: sc1.file_format_config.clone(), ffc2: sc2.file_format_config.clone(), }); From 67ce742c6da537decb915657b6b0b20160114981 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Fri, 11 Oct 2024 14:34:37 -0700 Subject: [PATCH 13/14] read sql parquet --- src/daft-plan/src/builder.rs | 9 ++++++++- src/daft-sql/src/table_provider/read_parquet.rs | 2 ++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs index 9e20e398ae..1a6fbb8503 100644 --- a/src/daft-plan/src/builder.rs +++ b/src/daft-plan/src/builder.rs @@ -610,6 +610,7 @@ pub struct ParquetScanBuilder { pub io_config: Option, pub multithreaded: bool, pub schema: Option, + pub file_path_column: Option, } impl ParquetScanBuilder { @@ -630,6 +631,7 @@ impl ParquetScanBuilder { multithreaded: true, schema: None, io_config: None, + file_path_column: None, } } pub fn infer_schema(mut self, infer_schema: bool) -> Self { @@ -667,6 +669,11 @@ impl ParquetScanBuilder { self } + pub fn file_path_column(mut self, file_path_column: String) -> Self { + self.file_path_column = Some(file_path_column); + self + } + pub fn finish(self) -> DaftResult { let cfg = ParquetSourceConfig { coerce_int96_timestamp_unit: self.coerce_int96_timestamp_unit, @@ -683,7 +690,7 @@ impl ParquetScanBuilder { ))), self.infer_schema, self.schema, - None, + self.file_path_column, )?); LogicalPlanBuilder::table_scan(ScanOperatorRef(operator), None) diff --git a/src/daft-sql/src/table_provider/read_parquet.rs 
b/src/daft-sql/src/table_provider/read_parquet.rs index 36f84507a0..7bdc191536 100644 --- a/src/daft-sql/src/table_provider/read_parquet.rs +++ b/src/daft-sql/src/table_provider/read_parquet.rs @@ -30,6 +30,7 @@ impl TryFrom for ParquetScanBuilder { PlannerError::invalid_argument("coerce_int96_timestamp_unit", "read_parquet") })?; let chunk_size = args.try_get_named("chunk_size")?; + let file_path_column = args.try_get_named("file_path_column")?; let multithreaded = args.try_get_named("multithreaded")?.unwrap_or(true); let field_id_mapping = None; // TODO @@ -47,6 +48,7 @@ impl TryFrom for ParquetScanBuilder { io_config, multithreaded, schema, + file_path_column, }) } } From fd676118cde13c96d9bb904f00dbcc9b6d70e1d5 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Fri, 11 Oct 2024 14:39:26 -0700 Subject: [PATCH 14/14] fix error --- src/daft-scan/src/lib.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/daft-scan/src/lib.rs b/src/daft-scan/src/lib.rs index 40f8689264..7fce95da7f 100644 --- a/src/daft-scan/src/lib.rs +++ b/src/daft-scan/src/lib.rs @@ -71,7 +71,10 @@ pub enum Error { fpc1, fpc2 ))] - DifferingFilePathColumnsInScanTaskMerge { fpc1: String, fpc2: String }, + DifferingFilePathColumnsInScanTaskMerge { + fpc1: Option, + fpc2: Option, + }, #[snafu(display( "StorageConfigs were different during ScanTask::merge: {:?} vs {:?}", @@ -464,8 +467,8 @@ impl ScanTask { } if sc1.file_path_column != sc2.file_path_column { return Err(Error::DifferingFilePathColumnsInScanTaskMerge { - ffc1: sc1.file_format_config.clone(), - ffc2: sc2.file_format_config.clone(), + fpc1: sc1.file_path_column.clone(), + fpc2: sc2.file_path_column.clone(), }); } Ok(Self::new(