[FEAT] Include file paths as column from read_parquet/csv/json (#2953)
Addresses: #2808

This PR enables adding the source file path as a column on file reads via the
`file_path_column: str | None` parameter. It works by appending a column of
file paths to the `Table` after the read and pushdowns have been applied.

Accepting the column name as a string makes it easy to guarantee unique field
names: if the user specifies a column name that already exists, an error is
thrown.
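
For illustration, a minimal usage sketch (the glob and column name below are hypothetical):

```python
import daft

# Read Parquet files and carry each row's source path in a "path" column.
df = daft.read_parquet("s3://my-bucket/data/*.parquet", file_path_column="path")

# The appended column is a regular Utf8 column and can be selected like any other.
df.select("path").show()
```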

---------

Co-authored-by: Colin Ho <[email protected]>
Co-authored-by: Colin Ho <[email protected]>
Co-authored-by: Desmond Cheong <[email protected]>
4 people authored Oct 11, 2024
1 parent ab1b772 commit c694c9e
Showing 18 changed files with 493 additions and 52 deletions.
1 change: 1 addition & 0 deletions daft/daft/__init__.pyi
@@ -782,6 +782,7 @@ class ScanOperatorHandle:
storage_config: StorageConfig,
infer_schema: bool,
schema: PySchema | None = None,
file_path_column: str | None = None,
) -> ScanOperatorHandle: ...
@staticmethod
def from_python_scan_operator(operator: ScanOperator) -> ScanOperatorHandle: ...
3 changes: 3 additions & 0 deletions daft/io/_csv.py
@@ -30,6 +30,7 @@ def read_csv(
comment: Optional[str] = None,
allow_variable_columns: bool = False,
io_config: Optional["IOConfig"] = None,
file_path_column: Optional[str] = None,
use_native_downloader: bool = True,
schema_hints: Optional[Dict[str, DataType]] = None,
_buffer_size: Optional[int] = None,
@@ -54,6 +55,7 @@ def read_csv(
comment (str): Character to treat as the start of a comment line, or None to not support comments
allow_variable_columns (bool): Whether to allow for variable number of columns in the CSV, defaults to False. If set to True, Daft will append nulls to rows with less columns than the schema, and ignore extra columns in rows with more columns
io_config (IOConfig): Config to be used with the native downloader
file_path_column: Include the source path(s) as a column with this name. Defaults to None.
use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This
is currently experimental.
@@ -97,5 +99,6 @@ def read_csv(
schema=schema,
file_format_config=file_format_config,
storage_config=storage_config,
file_path_column=file_path_column,
)
return DataFrame(builder)
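
A hedged sketch of using the new parameter with `read_csv` (the glob and column names are hypothetical), counting rows per source file via the appended column:

```python
import daft
from daft import col

# Each row remembers which CSV file it came from.
df = daft.read_csv("data/logs/*.csv", file_path_column="source_file")

# Count rows per input file using the appended Utf8 column.
per_file = df.groupby("source_file").agg(col("source_file").count().alias("num_rows"))
per_file.show()
```
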
3 changes: 3 additions & 0 deletions daft/io/_json.py
@@ -23,6 +23,7 @@ def read_json(
infer_schema: bool = True,
schema: Optional[Dict[str, DataType]] = None,
io_config: Optional["IOConfig"] = None,
file_path_column: Optional[str] = None,
use_native_downloader: bool = True,
schema_hints: Optional[Dict[str, DataType]] = None,
_buffer_size: Optional[int] = None,
@@ -41,6 +42,7 @@ def read_json(
infer_schema (bool): Whether to infer the schema of the JSON, defaults to True.
schema (dict[str, DataType]): A schema that is used as the definitive schema for the JSON if infer_schema is False, otherwise it is used as a schema hint that is applied after the schema is inferred.
io_config (IOConfig): Config to be used with the native downloader
file_path_column: Include the source path(s) as a column with this name. Defaults to None.
use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This
is currently experimental.
@@ -74,5 +76,6 @@ def read_json(
schema=schema,
file_format_config=file_format_config,
storage_config=storage_config,
file_path_column=file_path_column,
)
return DataFrame(builder)
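
And a sketch of the uniqueness guarantee from the description: reusing an existing column name for `file_path_column` is expected to raise an error (the path and field name below are hypothetical, and the concrete exception type is not pinned down here):

```python
import daft

try:
    # Suppose the JSON files already contain a field named "id";
    # reusing that name for the file path column should be rejected.
    daft.read_json("data/events/*.json", file_path_column="id").collect()
except Exception as exc:  # concrete exception type is an implementation detail
    print(f"Duplicate column name rejected: {exc}")
```
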
3 changes: 3 additions & 0 deletions daft/io/_parquet.py
@@ -24,6 +24,7 @@ def read_parquet(
infer_schema: bool = True,
schema: Optional[Dict[str, DataType]] = None,
io_config: Optional["IOConfig"] = None,
file_path_column: Optional[str] = None,
use_native_downloader: bool = True,
coerce_int96_timestamp_unit: Optional[Union[str, TimeUnit]] = None,
schema_hints: Optional[Dict[str, DataType]] = None,
@@ -45,6 +46,7 @@ def read_parquet(
infer_schema (bool): Whether to infer the schema of the Parquet, defaults to True.
schema (dict[str, DataType]): A schema that is used as the definitive schema for the Parquet file if infer_schema is False, otherwise it is used as a schema hint that is applied after the schema is inferred.
io_config (IOConfig): Config to be used with the native downloader
file_path_column: Include the source path(s) as a column with this name. Defaults to None.
use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet.
coerce_int96_timestamp_unit: TimeUnit to coerce Int96 TimeStamps to. e.g.: [ns, us, ms], Defaults to None.
_multithreaded_io: Whether to use multithreading for IO threads. Setting this to False can be helpful in reducing
@@ -93,5 +95,6 @@ def read_parquet(
schema=schema,
file_format_config=file_format_config,
storage_config=storage_config,
file_path_column=file_path_column,
)
return DataFrame(builder)
2 changes: 2 additions & 0 deletions daft/io/common.py
@@ -23,6 +23,7 @@ def get_tabular_files_scan(
schema: dict[str, DataType] | None,
file_format_config: FileFormatConfig,
storage_config: StorageConfig,
file_path_column: str | None = None,
) -> LogicalPlanBuilder:
"""Returns a TabularFilesScan LogicalPlan for a given glob filepath."""
# Glob the path using the Runner
@@ -40,6 +41,7 @@ def get_tabular_files_scan(
storage_config,
infer_schema=infer_schema,
schema=_get_schema_from_dict(schema)._schema if schema is not None else None,
file_path_column=file_path_column,
)

builder = LogicalPlanBuilder.from_tabular_scan(
1 change: 1 addition & 0 deletions src/daft-csv/src/read.rs
@@ -199,6 +199,7 @@ fn tables_concat(mut tables: Vec<Table>) -> DaftResult<Table> {
)
}

#[allow(clippy::too_many_arguments)]
async fn read_csv_single_into_table(
uri: &str,
convert_options: Option<CsvConvertOptions>,
3 changes: 3 additions & 0 deletions src/daft-micropartition/src/micropartition.rs
@@ -644,6 +644,7 @@ impl MicroPartition {
field_id_mapping.clone(),
parquet_metadata,
chunk_size,
scan_task.file_path_column.as_deref(),
)
.context(DaftCoreComputeSnafu)
}
@@ -1121,6 +1122,7 @@ pub fn read_parquet_into_micropartition<T: AsRef<str>>(
field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
parquet_metadata: Option<Vec<Arc<FileMetaData>>>,
chunk_size: Option<usize>,
file_path_column: Option<&str>,
) -> DaftResult<MicroPartition> {
if let Some(so) = start_offset
&& so > 0
@@ -1308,6 +1310,7 @@ pub fn read_parquet_into_micropartition<T: AsRef<str>>(
}),
num_rows,
),
file_path_column.map(|s| s.to_string()),
);

let fill_map = scan_task.partition_spec().map(|pspec| pspec.to_fill_map());
1 change: 1 addition & 0 deletions src/daft-micropartition/src/ops/cast_to_schema.rs
@@ -28,6 +28,7 @@ impl MicroPartition {
schema,
scan_task.storage_config.clone(),
scan_task.pushdowns.clone(),
scan_task.file_path_column.clone(),
))
};
Ok(Self::new_unloaded(
2 changes: 2 additions & 0 deletions src/daft-micropartition/src/python.rs
@@ -619,6 +619,7 @@ impl PyMicroPartition {
None,
None,
None,
None,
)
})?;
Ok(mp.into())
@@ -666,6 +667,7 @@ impl PyMicroPartition {
None,
None,
chunk_size,
None,
)
})?;
Ok(mp.into())
65 changes: 49 additions & 16 deletions src/daft-plan/src/builder.rs
@@ -19,6 +19,7 @@ use daft_scan::{
PhysicalScanInfo, Pushdowns, ScanOperatorRef,
};
use daft_schema::{
dtype::DataType,
field::Field,
schema::{Schema, SchemaRef},
};
@@ -146,7 +147,7 @@ impl LogicalPlanBuilder {
io_config: Option<IOConfig>,
multithreaded_io: bool,
) -> DaftResult<Self> {
use daft_scan::storage_config::PyStorageConfig;
use daft_scan::storage_config::{NativeStorageConfig, PyStorageConfig, StorageConfig};

Python::with_gil(|py| {
let io_config = io_config.unwrap_or_default();
@@ -194,21 +195,45 @@ impl LogicalPlanBuilder {
pushdowns.clone().unwrap_or_default(),
));
// If column selection (projection) pushdown is specified, prune unselected columns from the schema.
let output_schema = if let Some(Pushdowns {
columns: Some(columns),
..
}) = &pushdowns
&& columns.len() < schema.fields.len()
{
let pruned_upstream_schema = schema
.fields
.iter()
.filter(|&(name, _)| columns.contains(name))
.map(|(_, field)| field.clone())
.collect::<Vec<_>>();
Arc::new(Schema::new(pruned_upstream_schema)?)
} else {
schema
// If file path column is specified, add it to the schema.
let output_schema = match (&pushdowns, &scan_operator.0.file_path_column()) {
(
Some(Pushdowns {
columns: Some(columns),
..
}),
file_path_column_opt,
) if columns.len() < schema.fields.len() => {
let pruned_fields = schema
.fields
.iter()
.filter(|(name, _)| columns.contains(name))
.map(|(_, field)| field.clone());

let finalized_fields = match file_path_column_opt {
Some(file_path_column) => pruned_fields
.chain(std::iter::once(Field::new(
(*file_path_column).to_string(),
DataType::Utf8,
)))
.collect::<Vec<_>>(),
None => pruned_fields.collect::<Vec<_>>(),
};
Arc::new(Schema::new(finalized_fields)?)
}
(None, Some(file_path_column)) => {
let schema_with_file_path = schema
.fields
.values()
.cloned()
.chain(std::iter::once(Field::new(
(*file_path_column).to_string(),
DataType::Utf8,
)))
.collect::<Vec<_>>();
Arc::new(Schema::new(schema_with_file_path)?)
}
_ => schema,
};
let logical_plan: LogicalPlan =
logical_ops::Source::new(output_schema, source_info.into()).into();
@@ -585,6 +610,7 @@ pub struct ParquetScanBuilder {
pub io_config: Option<IOConfig>,
pub multithreaded: bool,
pub schema: Option<SchemaRef>,
pub file_path_column: Option<String>,
}

impl ParquetScanBuilder {
@@ -605,6 +631,7 @@ impl ParquetScanBuilder {
multithreaded: true,
schema: None,
io_config: None,
file_path_column: None,
}
}
pub fn infer_schema(mut self, infer_schema: bool) -> Self {
@@ -642,6 +669,11 @@ impl ParquetScanBuilder {
self
}

pub fn file_path_column(mut self, file_path_column: String) -> Self {
self.file_path_column = Some(file_path_column);
self
}

pub fn finish(self) -> DaftResult<LogicalPlanBuilder> {
let cfg = ParquetSourceConfig {
coerce_int96_timestamp_unit: self.coerce_int96_timestamp_unit,
@@ -658,6 +690,7 @@ impl ParquetScanBuilder {
))),
self.infer_schema,
self.schema,
self.file_path_column,
)?);

LogicalPlanBuilder::table_scan(ScanOperatorRef(operator), None)
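
At the Python level, the schema handling above suggests the appended path column composes with column-projection pushdowns like any other field; a rough sketch (paths and column names are hypothetical):

```python
import daft

df = daft.read_parquet("s3://bucket/events/*.parquet", file_path_column="source_file")

# Selecting a subset of columns still lets the scan prune unneeded fields,
# while "source_file" is carried through as a regular Utf8 column.
df = df.select("event_id", "source_file")
df.show()
```
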
5 changes: 5 additions & 0 deletions src/daft-scan/src/anonymous.rs
@@ -42,6 +42,10 @@ impl ScanOperator for AnonymousScanOperator {
&[]
}

fn file_path_column(&self) -> Option<&str> {
None
}

fn can_absorb_filter(&self) -> bool {
false
}
@@ -101,6 +105,7 @@ impl ScanOperator for AnonymousScanOperator {
schema.clone(),
storage_config.clone(),
pushdowns.clone(),
None,
)
.into())
},
(The remaining changed files are not shown here.)
