From 96b73d60f6850095804b9e49b136af752819b1c1 Mon Sep 17 00:00:00 2001
From: Jay Chia <17691182+jaychia@users.noreply.github.com>
Date: Wed, 8 Nov 2023 09:20:13 -0800
Subject: [PATCH] [CHORE] Convert GlobScanOperator to stream glob results and take a list of glob paths (#1577)

1. Converts `GlobScanOperator` to use iterators and tokio channels, providing end-to-end streaming of glob results
2. Allows `GlobScanOperator` to be created from a list of glob paths, chaining the results into a single return stream

---------

Co-authored-by: Jay Chia
---
 Cargo.lock                      |   1 +
 daft/daft.pyi                   |   2 +-
 daft/io/common.py               |  21 +---
 src/daft-io/src/azure_blob.rs   |   2 +-
 src/daft-io/src/google_cloud.rs |   2 +-
 src/daft-io/src/http.rs         |   2 +-
 src/daft-io/src/lib.rs          |  14 ++-
 src/daft-io/src/local.rs        |   2 +-
 src/daft-io/src/object_io.rs    |   2 +-
 src/daft-io/src/s3_like.rs      |   2 +-
 src/daft-scan/Cargo.toml        |   1 +
 src/daft-scan/src/glob.rs       | 173 +++++++++++++++++++++++---------
 src/daft-scan/src/python.rs     |   5 +-
 13 files changed, 148 insertions(+), 81 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 95ee5698e5..dde1567414 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1296,6 +1296,7 @@ dependencies = [
  "daft-parquet",
  "daft-stats",
  "daft-table",
+ "futures",
  "pyo3",
  "pyo3-log",
  "serde",
diff --git a/daft/daft.pyi b/daft/daft.pyi
index 3f15e13c31..1e4255373b 100644
--- a/daft/daft.pyi
+++ b/daft/daft.pyi
@@ -403,7 +403,7 @@ class ScanOperatorHandle:
     ) -> ScanOperatorHandle: ...
     @staticmethod
    def glob_scan(
-        glob_path: str,
+        glob_path: list[str],
         file_format_config: FileFormatConfig,
         storage_config: StorageConfig,
         schema: PySchema | None = None,
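Note on the stub change above: with `glob_scan` now accepting `list[str]`, a list of patterns flows straight through to a single scan operator. A minimal sketch of what constructing the operator over several patterns might look like on the Rust side; the module paths, configs, and bucket names here are illustrative assumptions, not part of the patch:

    // Hypothetical caller inside the daft-scan crate; module paths assumed.
    use std::sync::Arc;
    use common_error::DaftResult;
    use crate::{file_format::FileFormatConfig, storage_config::StorageConfig, GlobScanOperator};

    fn build(
        file_format_config: Arc<FileFormatConfig>,
        storage_config: Arc<StorageConfig>,
    ) -> DaftResult<GlobScanOperator> {
        GlobScanOperator::try_new(
            // Several glob patterns are muxed into one operator.
            &["s3://bucket/a/**/*.parquet", "s3://bucket/b/**/*.parquet"],
            file_format_config,
            storage_config,
            None, // no schema hint: infer from the first matched file
        )
    }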
diff --git a/daft/io/common.py b/daft/io/common.py
index 68bf279b91..f6121fa4cf 100644
--- a/daft/io/common.py
+++ b/daft/io/common.py
@@ -55,28 +55,15 @@ def _get_tabular_files_scan(
     scan_op: ScanOperatorHandle

     if isinstance(path, list):
-        # Eagerly globs each path and fallback to AnonymousScanOperator.
-        # NOTE: We could instead have GlobScanOperator take a list of paths and mux the glob output streams
-        runner_io = get_context().runner().runner_io()
-        file_infos = runner_io.glob_paths_details(path, file_format_config=file_format_config, io_config=io_config)
-
-        # TODO: Should we move this into the AnonymousScanOperator itself?
-        # Infer schema if no hints provided
-        inferred_or_provided_schema = (
-            schema_hint
-            if schema_hint is not None
-            else runner_io.get_schema_from_first_filepath(file_infos, file_format_config, storage_config)
-        )
-
-        scan_op = ScanOperatorHandle.anonymous_scan(
-            file_infos.file_paths,
-            inferred_or_provided_schema._schema,
+        scan_op = ScanOperatorHandle.glob_scan(
+            path,
             file_format_config,
             storage_config,
+            schema=schema_hint._schema if schema_hint is not None else None,
         )
     elif isinstance(path, str):
         scan_op = ScanOperatorHandle.glob_scan(
-            path,
+            [path],
             file_format_config,
             storage_config,
             schema=schema_hint._schema if schema_hint is not None else None,
diff --git a/src/daft-io/src/azure_blob.rs b/src/daft-io/src/azure_blob.rs
index 9b0130316c..24f1547c57 100644
--- a/src/daft-io/src/azure_blob.rs
+++ b/src/daft-io/src/azure_blob.rs
@@ -475,7 +475,7 @@ impl ObjectSource for AzureBlobSource {
         page_size: Option<i32>,
         limit: Option<usize>,
         io_stats: Option<IOStatsRef>,
-    ) -> super::Result<BoxStream<super::Result<FileMetadata>>> {
+    ) -> super::Result<BoxStream<'static, super::Result<FileMetadata>>> {
         use crate::object_store_glob::glob;

         // Ensure fanout_limit is not None to prevent runaway concurrency
diff --git a/src/daft-io/src/google_cloud.rs b/src/daft-io/src/google_cloud.rs
index dfb20a44de..2abc62c9c7 100644
--- a/src/daft-io/src/google_cloud.rs
+++ b/src/daft-io/src/google_cloud.rs
@@ -409,7 +409,7 @@ impl ObjectSource for GCSSource {
         page_size: Option<i32>,
         limit: Option<usize>,
         io_stats: Option<IOStatsRef>,
-    ) -> super::Result<BoxStream<super::Result<FileMetadata>>> {
+    ) -> super::Result<BoxStream<'static, super::Result<FileMetadata>>> {
         use crate::object_store_glob::glob;

         // Ensure fanout_limit is not None to prevent runaway concurrency
diff --git a/src/daft-io/src/http.rs b/src/daft-io/src/http.rs
index 01b143990a..b31f46060b 100644
--- a/src/daft-io/src/http.rs
+++ b/src/daft-io/src/http.rs
@@ -252,7 +252,7 @@ impl ObjectSource for HttpSource {
         _page_size: Option<i32>,
         limit: Option<usize>,
         io_stats: Option<IOStatsRef>,
-    ) -> super::Result<BoxStream<super::Result<FileMetadata>>> {
+    ) -> super::Result<BoxStream<'static, super::Result<FileMetadata>>> {
         use crate::object_store_glob::glob;

         // Ensure fanout_limit is None because HTTP ObjectSource does not support prefix listing
diff --git a/src/daft-io/src/lib.rs b/src/daft-io/src/lib.rs
index 323dc96192..cf97444422 100644
--- a/src/daft-io/src/lib.rs
+++ b/src/daft-io/src/lib.rs
@@ -26,7 +26,7 @@ use tokio::runtime::RuntimeFlavor;

 use std::{borrow::Cow, collections::HashMap, hash::Hash, ops::Range, sync::Arc};

-use futures::{StreamExt, TryStreamExt};
+use futures::{stream::BoxStream, StreamExt, TryStreamExt};

 use snafu::Snafu;
 use url::ParseError;
@@ -168,18 +168,16 @@ impl IOClient {

     pub async fn glob(
         &self,
-        input: &str,
+        input: String,
         fanout_limit: Option<usize>,
         page_size: Option<i32>,
         limit: Option<usize>,
         io_stats: Option<Arc<IOStatsContext>>,
-    ) -> Result<Vec<FileMetadata>> {
-        let (scheme, _) = parse_url(input)?;
+    ) -> Result<BoxStream<'static, Result<FileMetadata>>> {
+        let (scheme, _) = parse_url(input.as_str())?;
         let source = self.get_source(&scheme).await?;
-        let files: Vec<FileMetadata> = source
-            .glob(input, fanout_limit, page_size, limit, io_stats)
-            .await?
-            .try_collect()
+        let files = source
+            .glob(input.as_str(), fanout_limit, page_size, limit, io_stats)
             .await?;
         Ok(files)
     }
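With `IOClient::glob` now returning a `BoxStream` instead of a collected `Vec`, callers drain results incrementally. A rough sketch of consuming the new API from synchronous code; it assumes a tokio `Runtime` and an `IOClient` built elsewhere, and the glob pattern is illustrative:

    // Drains the streaming glob to completion; early exit would simply
    // drop the stream after the desired number of items.
    use futures::StreamExt;

    fn list_all(
        rt: &tokio::runtime::Runtime,
        client: &daft_io::IOClient,
    ) -> Result<Vec<String>, daft_io::Error> {
        rt.block_on(async {
            let mut stream = client
                .glob("s3://bucket/**/*.parquet".to_string(), None, None, None, None)
                .await?;
            let mut out = vec![];
            while let Some(meta) = stream.next().await {
                // Each item is itself a Result, so per-file errors surface here.
                out.push(meta?.filepath);
            }
            Ok(out)
        })
    }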
diff --git a/src/daft-io/src/local.rs b/src/daft-io/src/local.rs
index 13e1dd3845..1f7afdc26d 100644
--- a/src/daft-io/src/local.rs
+++ b/src/daft-io/src/local.rs
@@ -149,7 +149,7 @@ impl ObjectSource for LocalSource {
         _page_size: Option<i32>,
         limit: Option<usize>,
         io_stats: Option<IOStatsRef>,
-    ) -> super::Result<BoxStream<super::Result<FileMetadata>>> {
+    ) -> super::Result<BoxStream<'static, super::Result<FileMetadata>>> {
         use crate::object_store_glob::glob;

         // Ensure fanout_limit is None because Local ObjectSource does not support prefix listing
diff --git a/src/daft-io/src/object_io.rs b/src/daft-io/src/object_io.rs
index 592d33f410..e724102e1e 100644
--- a/src/daft-io/src/object_io.rs
+++ b/src/daft-io/src/object_io.rs
@@ -118,7 +118,7 @@ pub(crate) trait ObjectSource: Sync + Send {
         page_size: Option<i32>,
         limit: Option<usize>,
         io_stats: Option<IOStatsRef>,
-    ) -> super::Result<BoxStream<super::Result<FileMetadata>>>;
+    ) -> super::Result<BoxStream<'static, super::Result<FileMetadata>>>;

     async fn ls(
         &self,
diff --git a/src/daft-io/src/s3_like.rs b/src/daft-io/src/s3_like.rs
index 73b7e8db79..20f2351a60 100644
--- a/src/daft-io/src/s3_like.rs
+++ b/src/daft-io/src/s3_like.rs
@@ -783,7 +783,7 @@ impl ObjectSource for S3LikeSource {
         page_size: Option<i32>,
         limit: Option<usize>,
         io_stats: Option<IOStatsRef>,
-    ) -> super::Result<BoxStream<super::Result<FileMetadata>>> {
+    ) -> super::Result<BoxStream<'static, super::Result<FileMetadata>>> {
         use crate::object_store_glob::glob;

         // Ensure fanout_limit is not None to prevent runaway concurrency
diff --git a/src/daft-scan/Cargo.toml b/src/daft-scan/Cargo.toml
index 9573f3c098..91d6b8b6a4 100644
--- a/src/daft-scan/Cargo.toml
+++ b/src/daft-scan/Cargo.toml
@@ -9,6 +9,7 @@ daft-io = {path = "../daft-io", default-features = false}
 daft-parquet = {path = "../daft-parquet", default-features = false}
 daft-stats = {path = "../daft-stats", default-features = false}
 daft-table = {path = "../daft-table", default-features = false}
+futures = {workspace = true}
 pyo3 = {workspace = true, optional = true}
 pyo3-log = {workspace = true}
 serde = {workspace = true}
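The new `futures` dependency supports the `BoxStreamIterator` wrapper introduced in the glob.rs changes below, which bridges the async stream back into a synchronous `Iterator` by blocking on a runtime handle one item at a time. A standalone sketch of the same bridging pattern, assuming only `tokio` (with the `rt-multi-thread` feature) and `futures`; the names are illustrative, not the patch's own:

    // Simplified stand-in for the BoxStreamIterator in the diff below.
    use futures::stream::{self, BoxStream, StreamExt};

    struct BlockingIter<'a, T> {
        stream: BoxStream<'a, T>,
        handle: tokio::runtime::Handle,
    }

    impl<'a, T> Iterator for BlockingIter<'a, T> {
        type Item = T;
        fn next(&mut self) -> Option<T> {
            // Drive the async stream one item at a time from sync code.
            self.handle.block_on(self.stream.next())
        }
    }

    fn main() {
        let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap();
        let iter = BlockingIter {
            stream: stream::iter(vec![1, 2, 3]).boxed(),
            handle: rt.handle().clone(),
        };
        assert_eq!(iter.collect::<Vec<_>>(), vec![1, 2, 3]);
    }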
diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs
index 4cf580399d..d6fadae4e0 100644
--- a/src/daft-scan/src/glob.rs
+++ b/src/daft-scan/src/glob.rs
@@ -1,11 +1,13 @@
 use std::{fmt::Display, sync::Arc};

-use common_error::DaftResult;
+use common_error::{DaftError, DaftResult};
 use daft_core::schema::SchemaRef;
 use daft_io::{get_io_client, get_runtime, parse_url, IOClient, IOStatsContext, IOStatsRef};
 use daft_parquet::read::ParquetSchemaInferenceOptions;
+use futures::{stream::BoxStream, StreamExt};
+use snafu::{ResultExt, Snafu};
 #[cfg(feature = "python")]
-use {crate::PyIOSnafu, daft_core::schema::Schema, pyo3::Python, snafu::ResultExt};
+use {crate::PyIOSnafu, daft_core::schema::Schema, pyo3::Python};

 use crate::{
     file_format::{CsvSourceConfig, FileFormatConfig, JsonSourceConfig, ParquetSourceConfig},
@@ -14,30 +16,82 @@ use crate::{
 };

 #[derive(Debug, PartialEq, Hash)]
 pub struct GlobScanOperator {
-    glob_path: String,
+    glob_paths: Vec<String>,
     file_format_config: Arc<FileFormatConfig>,
     schema: SchemaRef,
     storage_config: Arc<StorageConfig>,
 }
+
+/// Wrapper struct that implements a sync Iterator for a BoxStream
+struct BoxStreamIterator<'a, T> {
+    boxstream: BoxStream<'a, T>,
+    runtime_handle: tokio::runtime::Handle,
+}
+
+impl<'a, T> Iterator for BoxStreamIterator<'a, T> {
+    type Item = T;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.runtime_handle
+            .block_on(async { self.boxstream.next().await })
+    }
+}
+
+#[derive(Snafu, Debug)]
+enum Error {
+    #[snafu(display("Glob path had no matches: \"{}\"", glob_path))]
+    GlobNoMatch { glob_path: String },
+    #[snafu(display("Error during glob: \"{}\"", glob_path))]
+    GlobIOError {
+        glob_path: String,
+        source: daft_io::Error,
+    },
+}
+
+impl From<Error> for DaftError {
+    fn from(value: Error) -> Self {
+        match &value {
+            Error::GlobNoMatch { glob_path } => DaftError::FileNotFound {
+                path: glob_path.clone(),
+                source: Box::new(value),
+            },
+            Error::GlobIOError { glob_path, source } => DaftError::InternalError(format!(
+                "Error when performing IO on path {glob_path}: {source}"
+            )),
+        }
+    }
+}

 fn run_glob(
     glob_path: &str,
     limit: Option<usize>,
     io_client: Arc<IOClient>,
     runtime: Arc<tokio::runtime::Runtime>,
     io_stats: Option<IOStatsRef>,
-) -> DaftResult<Vec<String>> {
+) -> DaftResult<Box<dyn Iterator<Item = DaftResult<String>>>> {
     let (_, parsed_glob_path) = parse_url(glob_path)?;
-    let _rt_guard = runtime.enter();
-    runtime.block_on(async {
-        Ok(io_client
-            .as_ref()
-            .glob(&parsed_glob_path, None, None, limit, io_stats)
-            .await?
-            .into_iter()
-            .map(|fm| fm.filepath)
-            .collect())
-    })
+
+    // Construct a static-lifetime BoxStream returning the FileMetadata
+    let glob_input = parsed_glob_path.as_ref().to_string();
+    let runtime_handle = runtime.handle();
+    let boxstream = runtime_handle.block_on(async move {
+        io_client
+            .glob(glob_input, None, None, limit, io_stats)
+            .await
+    })?;
+
+    // Construct a static-lifetime BoxStreamIterator
+    let glob_input = parsed_glob_path.as_ref().to_string();
+    let iterator = BoxStreamIterator {
+        boxstream,
+        runtime_handle: runtime_handle.clone(),
+    };
+    let iterator = iterator.map(move |fm| {
+        Ok(fm.map(|fm| fm.filepath).context(GlobIOSnafu {
+            glob_path: glob_input.clone(),
+        })?)
+    });
+    Ok(Box::new(iterator))
 }

 fn get_io_client_and_runtime(
@@ -72,38 +126,56 @@ impl GlobScanOperator {
     pub fn try_new(
-        glob_path: &str,
+        glob_paths: &[&str],
         file_format_config: Arc<FileFormatConfig>,
         storage_config: Arc<StorageConfig>,
         schema: Option<SchemaRef>,
     ) -> DaftResult<Self> {
+        let first_glob_path = match glob_paths.first() {
+            None => Err(DaftError::ValueError(
+                "Cannot glob empty list of files".to_string(),
+            )),
+            Some(path) => Ok(path),
+        }?;
+
         let schema = match schema {
             Some(s) => s,
             None => {
                 let (io_runtime, io_client) = get_io_client_and_runtime(storage_config.as_ref())?;
                 let io_stats = IOStatsContext::new(format!(
-                    "GlobScanOperator::try_new schema inference for {glob_path}"
+                    "GlobScanOperator::try_new schema inference for {first_glob_path}"
                 ));
-                let paths = run_glob(
-                    glob_path,
+                let mut paths = run_glob(
+                    first_glob_path,
                     Some(1),
                     io_client.clone(),
-                    io_runtime,
+                    io_runtime.clone(),
                     Some(io_stats.clone()),
                 )?;
-                let first_filepath = paths[0].as_str();
+                let first_filepath = match paths.next() {
+                    Some(path) => path,
+                    None => Err(Error::GlobNoMatch {
+                        glob_path: first_glob_path.to_string(),
+                    }
+                    .into()),
+                }?;
                 let inferred_schema = match file_format_config.as_ref() {
                     FileFormatConfig::Parquet(ParquetSourceConfig {
                         coerce_int96_timestamp_unit,
                         ..
-                    }) => daft_parquet::read::read_parquet_schema(
-                        first_filepath,
-                        io_client.clone(),
-                        Some(io_stats),
-                        ParquetSchemaInferenceOptions {
-                            coerce_int96_timestamp_unit: *coerce_int96_timestamp_unit,
-                        },
-                    )?,
+                    }) => {
+                        let io_stats = IOStatsContext::new(format!(
+                            "GlobScanOperator constructor read_parquet_schema: for uri {first_filepath}"
+                        ));
+                        daft_parquet::read::read_parquet_schema(
+                            first_filepath.as_str(),
+                            io_client.clone(),
+                            Some(io_stats),
+                            ParquetSchemaInferenceOptions {
+                                coerce_int96_timestamp_unit: *coerce_int96_timestamp_unit,
+                            },
+                        )?
+                    }
                     FileFormatConfig::Csv(CsvSourceConfig {
                         delimiter,
                         has_headers,
                         double_quote,
                         ..
                     }) => {
                         let (schema, _, _, _, _) = daft_csv::metadata::read_csv_schema(
-                            first_filepath,
+                            first_filepath.as_str(),
                             *has_headers,
                             Some(delimiter.as_bytes()[0]),
                             *double_quote,
@@ -132,7 +204,7 @@ impl GlobScanOperator {
                     StorageConfig::Python(_) => Python::with_gil(|py| {
                         crate::python::pylib::read_json_schema(
                             py,
-                            first_filepath,
+                            first_filepath.as_str(),
                             storage_config.clone().into(),
                         )
                         .and_then(|s| {
@@ -148,7 +220,7 @@ impl GlobScanOperator {
         };

         Ok(Self {
-            glob_path: glob_path.to_string(),
+            glob_paths: glob_paths.iter().map(|s| s.to_string()).collect(),
             file_format_config,
             schema,
             storage_config,
@@ -184,31 +256,40 @@ impl ScanOperator for GlobScanOperator {
     fn to_scan_tasks(
         &self,
         pushdowns: Pushdowns,
-    ) -> DaftResult<Box<dyn Iterator<Item = DaftResult<ScanTask>>>> {
+    ) -> DaftResult<Box<dyn Iterator<Item = DaftResult<ScanTask>> + 'static>> {
         let (io_runtime, io_client) = get_io_client_and_runtime(self.storage_config.as_ref())?;
         let io_stats = IOStatsContext::new(format!(
-            "GlobScanOperator::to_scan_tasks for {}",
-            self.glob_path
+            "GlobScanOperator::to_scan_tasks for {:#?}",
+            self.glob_paths
         ));

-        // TODO: This runs the glob to exhaustion, but we should return an iterator instead
-        let files = run_glob(
-            self.glob_path.as_str(),
-            None,
-            io_client,
-            io_runtime,
-            Some(io_stats),
-        )?;
+        // Run [`run_glob`] on each path and mux them into the same iterator
+        let files = self
+            .glob_paths
+            .clone()
+            .into_iter()
+            .flat_map(move |glob_path| {
+                match run_glob(
+                    glob_path.as_str(),
+                    None,
+                    io_client.clone(),
+                    io_runtime.clone(),
+                    Some(io_stats.clone()),
+                ) {
+                    Ok(paths) => paths,
+                    Err(err) => Box::new(vec![Err(err)].into_iter()),
+                }
+            });
+
         let file_format_config = self.file_format_config.clone();
         let schema = self.schema.clone();
         let storage_config = self.storage_config.clone();

-        // Create one ScanTask per file. We should find a way to perform streaming from the glob instead
-        // of materializing here.
-        Ok(Box::new(files.into_iter().map(move |f| {
+        // Create one ScanTask per file
+        Ok(Box::new(files.map(move |f| {
             Ok(ScanTask::new(
                 vec![DataFileSource::AnonymousDataFile {
-                    path: f.to_string(),
+                    path: f?.to_string(),
                     metadata: None,
                     partition_spec: None,
                     statistics: None,
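One consequence of the iterator-based `to_scan_tasks` above is that a failing glob surfaces as a single `Err` item in the muxed iterator rather than aborting the other paths. A simplified, dependency-free sketch of that `flat_map` shape; `glob_one` is a hypothetical stand-in for `run_glob`, not a function in the patch:

    // Each path yields its own fallible iterator; a failed glob becomes
    // one Err item while the remaining paths still produce results.
    fn glob_one(path: &str) -> Result<Box<dyn Iterator<Item = Result<String, String>>>, String> {
        // Stand-in: pretend every path matches two files.
        let path = path.to_string();
        Ok(Box::new((0..2).map(move |i| Ok(format!("{path}/part-{i}.parquet")))))
    }

    fn main() {
        let paths = vec!["s3://bucket/a", "s3://bucket/b"];
        let files = paths
            .into_iter()
            .flat_map(|p| -> Box<dyn Iterator<Item = Result<String, String>>> {
                match glob_one(p) {
                    Ok(iter) => iter,
                    Err(err) => Box::new(std::iter::once(Err(err))),
                }
            });
        for f in files {
            println!("{f:?}");
        }
    }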
diff --git a/src/daft-scan/src/python.rs b/src/daft-scan/src/python.rs
index c8b27d45ef..3fe38dfcbf 100644
--- a/src/daft-scan/src/python.rs
+++ b/src/daft-scan/src/python.rs
@@ -53,19 +53,18 @@ pub mod pylib {
         #[staticmethod]
         pub fn glob_scan(
             py: Python,
-            glob_path: &str,
+            glob_path: Vec<&str>,
             file_format_config: PyFileFormatConfig,
             storage_config: PyStorageConfig,
             schema: Option<PySchema>,
         ) -> PyResult<Self> {
             py.allow_threads(|| {
                 let operator = Arc::new(GlobScanOperator::try_new(
-                    glob_path,
+                    glob_path.as_slice(),
                     file_format_config.into(),
                     storage_config.into(),
                     schema.map(|s| s.schema),
                 )?);
-
                 Ok(ScanOperatorHandle {
                     scan_op: ScanOperatorRef(operator),
                 })