diff --git a/src/daft-io/src/azure_blob.rs b/src/daft-io/src/azure_blob.rs index bf868dffae..e63d77e6c1 100644 --- a/src/daft-io/src/azure_blob.rs +++ b/src/daft-io/src/azure_blob.rs @@ -430,11 +430,16 @@ impl ObjectSource for AzureBlobSource { async fn iter_dir( &self, uri: &str, - delimiter: Option<&str>, + delimiter: &str, + posix: bool, + _page_size: Option, _limit: Option, ) -> super::Result>> { let uri = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?; - let delimiter = delimiter.unwrap_or("/"); + + if !posix { + todo!("Prefix-listing is not yet implemented for Azure"); + } // path can be root (buckets) or path prefix within a bucket. let container = { @@ -473,8 +478,10 @@ impl ObjectSource for AzureBlobSource { async fn ls( &self, path: &str, - delimiter: Option<&str>, + delimiter: &str, + posix: bool, continuation_token: Option<&str>, + page_size: Option, ) -> super::Result { // It looks like the azure rust library API // does not currently allow using the continuation token: @@ -489,7 +496,7 @@ impl ObjectSource for AzureBlobSource { }?; let files = self - .iter_dir(path, delimiter, None) + .iter_dir(path, delimiter, posix, page_size, None) .await? .try_collect::>() .await?; diff --git a/src/daft-io/src/glob.rs b/src/daft-io/src/glob.rs index edd709b9d4..823af44d5d 100644 --- a/src/daft-io/src/glob.rs +++ b/src/daft-io/src/glob.rs @@ -19,12 +19,17 @@ pub(crate) struct GlobState { pub current_path: String, pub current_fragment_idx: usize, + // How large of a fanout this level of iteration is currently experiencing + pub current_fanout: usize, + // Whether we have encountered wildcards yet in the process of parsing pub wildcard_mode: bool, // Carry along expensive data as Arcs to avoid recomputation pub glob_fragments: Arc>, pub full_glob_matcher: Arc, + pub fanout_limit: usize, + pub page_size: Option, } impl GlobState { @@ -32,10 +37,11 @@ impl GlobState { &self.glob_fragments[self.current_fragment_idx] } - pub fn advance(self, path: String, idx: usize) -> Self { + pub fn advance(self, path: String, idx: usize, fanout_factor: usize) -> Self { GlobState { current_path: path, current_fragment_idx: idx, + current_fanout: self.current_fanout * fanout_factor, ..self.clone() } } @@ -87,7 +93,7 @@ impl GlobFragment { let mut ptr = 0; while ptr < data.len() { let remaining = &data[ptr..]; - match remaining.find("\\\\") { + match remaining.find(r"\\") { Some(backslash_idx) => { escaped_data.push_str(&remaining[..backslash_idx].replace('\\', "")); escaped_data.extend(std::iter::once('\\')); diff --git a/src/daft-io/src/google_cloud.rs b/src/daft-io/src/google_cloud.rs index 7f3a2a3849..15a612caa2 100644 --- a/src/daft-io/src/google_cloud.rs +++ b/src/daft-io/src/google_cloud.rs @@ -197,6 +197,7 @@ impl GCSClientWrapper { key: &str, delimiter: &str, continuation_token: Option<&str>, + page_size: Option, ) -> super::Result { let req = ListObjectsRequest { bucket: bucket.to_string(), @@ -205,7 +206,7 @@ impl GCSClientWrapper { start_offset: None, page_token: continuation_token.map(|s| s.to_string()), delimiter: Some(delimiter.to_string()), // returns results in "directory mode" - max_results: Some(1000), // Recommended value from API docs + max_results: page_size, include_trailing_delimiter: Some(false), // This will not populate "directories" in the response's .item[] projection: None, versions: None, @@ -237,14 +238,19 @@ impl GCSClientWrapper { async fn ls( &self, path: &str, - delimiter: Option<&str>, + delimiter: &str, + posix: bool, continuation_token: Option<&str>, 
+ page_size: Option, ) -> super::Result { let uri = url::Url::parse(path).with_context(|_| InvalidUrlSnafu { path })?; let (bucket, key) = parse_uri(&uri)?; - let delimiter = delimiter.unwrap_or("/"); match self { GCSClientWrapper::Native(client) => { + if !posix { + todo!("Prefix-listing is not yet implemented for GCS"); + } + // Attempt to forcefully ls the key as a directory (by ensuring a "/" suffix) let forced_directory_key = format!("{}{delimiter}", key.trim_end_matches(delimiter)); @@ -255,6 +261,7 @@ impl GCSClientWrapper { forced_directory_key.as_str(), delimiter, continuation_token, + page_size, ) .await?; @@ -262,7 +269,14 @@ impl GCSClientWrapper { // details as the one-and-only-one entry if forced_directory_ls_result.files.is_empty() { let file_result = self - ._ls_impl(client, bucket, key, delimiter, continuation_token) + ._ls_impl( + client, + bucket, + key, + delimiter, + continuation_token, + page_size, + ) .await?; // Not dir and not file, so it is missing @@ -279,7 +293,9 @@ impl GCSClientWrapper { } } GCSClientWrapper::S3Compat(client) => { - client.ls(path, Some(delimiter), continuation_token).await + client + .ls(path, delimiter, posix, continuation_token, page_size) + .await } } } @@ -340,9 +356,13 @@ impl ObjectSource for GCSSource { async fn ls( &self, path: &str, - delimiter: Option<&str>, + delimiter: &str, + posix: bool, continuation_token: Option<&str>, + page_size: Option, ) -> super::Result { - self.client.ls(path, delimiter, continuation_token).await + self.client + .ls(path, delimiter, posix, continuation_token, page_size) + .await } } diff --git a/src/daft-io/src/http.rs b/src/daft-io/src/http.rs index 5c2f3b252f..d47008d89e 100644 --- a/src/daft-io/src/http.rs +++ b/src/daft-io/src/http.rs @@ -225,9 +225,15 @@ impl ObjectSource for HttpSource { async fn ls( &self, path: &str, - _delimiter: Option<&str>, + _delimiter: &str, + posix: bool, _continuation_token: Option<&str>, + _page_size: Option, ) -> super::Result { + if !posix { + todo!("Prefix-listing is not implemented for HTTP listing"); + } + let request = self.client.get(path); let response = request .send() diff --git a/src/daft-io/src/local.rs b/src/daft-io/src/local.rs index 6cf62e9634..3a039ed623 100644 --- a/src/daft-io/src/local.rs +++ b/src/daft-io/src/local.rs @@ -132,10 +132,14 @@ impl ObjectSource for LocalSource { async fn ls( &self, path: &str, - _delimiter: Option<&str>, + delimiter: &str, + posix: bool, _continuation_token: Option<&str>, + page_size: Option, ) -> super::Result { - let s = self.iter_dir(path, None, None).await?; + let s = self + .iter_dir(path, delimiter, posix, page_size, None) + .await?; let files = s.try_collect::>().await?; Ok(LSResult { files, @@ -146,9 +150,15 @@ impl ObjectSource for LocalSource { async fn iter_dir( &self, uri: &str, - _delimiter: Option<&str>, + _delimiter: &str, + posix: bool, + _page_size: Option, _limit: Option, ) -> super::Result>> { + if !posix { + todo!("Prefix-listing is not implemented for local"); + } + const LOCAL_PROTOCOL: &str = "file://"; let Some(uri) = uri.strip_prefix(LOCAL_PROTOCOL) else { return Err(Error::InvalidFilePath { path: uri.into() }.into()); @@ -324,7 +334,7 @@ mod tests { let dir_path = format!("file://{}", dir.path().to_string_lossy()); let client = LocalSource::get_client().await?; - let ls_result = client.ls(dir_path.as_ref(), None, None).await?; + let ls_result = client.ls(dir_path.as_ref(), "/", true, None, None).await?; let mut files = ls_result.files.clone(); // Ensure stable sort ordering of file paths before 
comparing with expected payload. files.sort_by(|a, b| a.filepath.cmp(&b.filepath)); diff --git a/src/daft-io/src/object_io.rs b/src/daft-io/src/object_io.rs index 455cf6b26b..fa2776ebcb 100644 --- a/src/daft-io/src/object_io.rs +++ b/src/daft-io/src/object_io.rs @@ -16,6 +16,9 @@ use crate::{ local::{collect_file, LocalFile}, }; +/// Default limit before we fallback onto parallel prefix list streams +static DEFAULT_FANOUT_LIMIT: usize = 1024; + pub enum GetResult { File(LocalFile), Stream( @@ -108,30 +111,34 @@ pub(crate) trait ObjectSource: Sync + Send { async fn ls( &self, path: &str, - delimiter: Option<&str>, + delimiter: &str, + posix: bool, continuation_token: Option<&str>, + page_size: Option, ) -> super::Result; async fn iter_dir( &self, uri: &str, - delimiter: Option<&str>, + delimiter: &str, + posix: bool, + page_size: Option, _limit: Option, ) -> super::Result>> { let uri = uri.to_string(); - let delimiter = delimiter.map(String::from); + let delimiter = delimiter.to_string(); let s = stream! { - let lsr = self.ls(&uri, delimiter.as_deref(), None).await?; - let mut continuation_token = lsr.continuation_token.clone(); - for file in lsr.files { - yield Ok(file); + let lsr = self.ls(&uri, delimiter.as_str(), posix, None, page_size).await?; + for fm in lsr.files { + yield Ok(fm); } + let mut continuation_token = lsr.continuation_token.clone(); while continuation_token.is_some() { - let lsr = self.ls(&uri, delimiter.as_deref(), continuation_token.as_deref()).await?; + let lsr = self.ls(&uri, delimiter.as_str(), posix, continuation_token.as_deref(), page_size).await?; continuation_token = lsr.continuation_token.clone(); - for file in lsr.files { - yield Ok(file); + for fm in lsr.files { + yield Ok(fm); } } }; @@ -139,16 +146,98 @@ pub(crate) trait ObjectSource: Sync + Send { } } +/// Helper method to iterate on a directory with the following behavior +/// +/// * First attempts to non-recursively list all Files and Directories under the current `uri` +/// * If during iteration we detect the number of Directories being returned exceeds `max_dirs`, we +/// fall back onto a prefix list of all Files with the current `uri` as the prefix +/// +/// Returns a tuple `(file_metadata_stream: BoxStream<...>, dir_count: usize)` where the second element +/// indicates the number of Directory entries contained within the stream +async fn ls_with_prefix_fallback( + source: Arc, + uri: &str, + delimiter: &str, + max_dirs: usize, + page_size: Option, +) -> (BoxStream<'static, super::Result>, usize) { + // Prefix list function that only returns Files + fn prefix_ls( + source: Arc, + path: String, + delimiter: String, + page_size: Option, + ) -> BoxStream<'static, super::Result> { + stream! 
{ + match source.iter_dir(&path, delimiter.as_str(), false, page_size, None).await { + Ok(mut result_stream) => { + while let Some(fm) = result_stream.next().await { + match fm { + Ok(fm) => { + if matches!(fm.filetype, FileType::File) + { + yield Ok(fm) + } + } + Err(e) => yield Err(e), + } + } + }, + Err(e) => yield Err(e), + } + } + .boxed() + } + + // Buffer results in memory as we go along + let mut results_buffer = vec![]; + let mut fm_stream = source + .iter_dir(uri, delimiter, true, page_size, None) + .await + .unwrap_or_else(|e| futures::stream::iter([Err(e)]).boxed()); + + // Iterate and collect results into the `results_buffer`, but terminate early if too many directories are found + let mut dir_count_so_far = 0; + while let Some(fm) = fm_stream.next().await { + if let Ok(fm) = &fm { + if matches!(fm.filetype, FileType::Directory) { + dir_count_so_far += 1; + // STOP EARLY!! + // If the number of directory results are more than `max_dirs`, we terminate the function early, + // throw away our results buffer and return a stream of FileType::File files using `prefix_ls` instead + if dir_count_so_far > max_dirs { + return ( + prefix_ls( + source.clone(), + uri.to_string(), + delimiter.to_string(), + page_size, + ), + 0, + ); + } + } + } + results_buffer.push(fm); + } + + // No early termination: we unwrap the results in our results buffer and yield data as a stream + let s = futures::stream::iter(results_buffer); + (s.boxed(), dir_count_so_far) +} + pub(crate) async fn glob( source: Arc, glob: &str, + fanout_limit: Option, + page_size: Option, ) -> super::Result>> { // If no special characters, we fall back to ls behavior let full_fragment = GlobFragment::new(glob); if !full_fragment.has_special_character() { let glob = full_fragment.escaped_str().to_string(); return Ok(stream! 
{ - let mut results = source.iter_dir(glob.as_str(), Some("/"), None).await?; + let mut results = source.iter_dir(glob.as_str(), "/", true, page_size, None).await?; while let Some(val) = results.next().await { match &val { // Ignore non-File results @@ -169,6 +258,7 @@ pub(crate) async fn glob( }; let glob = glob.as_str(); + let fanout_limit = fanout_limit.unwrap_or(DEFAULT_FANOUT_LIMIT); let glob_fragments = to_glob_fragments(glob)?; let full_glob_matcher = GlobBuilder::new(glob) .literal_separator(true) @@ -199,10 +289,14 @@ pub(crate) async fn glob( // BASE CASE: current_fragment is a ** // We perform a recursive ls and filter on the results for only FileType::File results that match the full glob if current_fragment.escaped_str() == "**" { - let mut results = source - .iter_dir(&state.current_path, Some("/"), None) - .await - .unwrap_or_else(|e| futures::stream::iter([Err(e)]).boxed()); + let (mut results, stream_dir_count) = ls_with_prefix_fallback( + source.clone(), + &state.current_path, + "/", + state.fanout_limit / state.current_fanout, + state.page_size, + ) + .await; while let Some(val) = results.next().await { match val { @@ -217,6 +311,7 @@ pub(crate) async fn glob( state.clone().advance( fm.filepath.clone(), state.current_fragment_idx, + stream_dir_count, ), ); } @@ -241,7 +336,7 @@ pub(crate) async fn glob( // Last fragment contains a wildcard: we list the last level and match against the full glob if current_fragment.has_special_character() { let mut results = source - .iter_dir(&state.current_path, Some("/"), None) + .iter_dir(&state.current_path, "/", true, state.page_size, None) .await .unwrap_or_else(|e| futures::stream::iter([Err(e)]).boxed()); @@ -264,7 +359,9 @@ pub(crate) async fn glob( // Last fragment does not contain wildcard: we return it if the full path exists and is a FileType::File } else { let full_dir_path = state.current_path.clone() + current_fragment.escaped_str(); - let single_file_ls = source.ls(full_dir_path.as_str(), Some("/"), None).await; + let single_file_ls = source + .ls(full_dir_path.as_str(), "/", true, None, state.page_size) + .await; match single_file_ls { Ok(mut single_file_ls) => { if single_file_ls.files.len() == 1 @@ -295,28 +392,43 @@ pub(crate) async fn glob( .build() .expect("Cannot parse glob") .compile_matcher(); - let mut results = source - .iter_dir(&state.current_path, Some("/"), None) - .await - .unwrap_or_else(|e| futures::stream::iter([Err(e)]).boxed()); + + let (mut results, stream_dir_count) = ls_with_prefix_fallback( + source.clone(), + &state.current_path, + "/", + state.fanout_limit / state.current_fanout, + state.page_size, + ) + .await; while let Some(val) = results.next().await { match val { - Ok(fm) => { - if matches!(fm.filetype, FileType::Directory) - && partial_glob_matcher - .is_match(fm.filepath.as_str().trim_end_matches('/')) + Ok(fm) => match fm.filetype { + FileType::Directory + if partial_glob_matcher + .is_match(fm.filepath.as_str().trim_end_matches('/')) => { visit( result_tx.clone(), source.clone(), state .clone() - .advance(fm.filepath, state.current_fragment_idx + 1) + .advance( + fm.filepath, + state.current_fragment_idx + 1, + stream_dir_count, + ) .with_wildcard_mode(), ); } - } + FileType::File + if state.full_glob_matcher.is_match(fm.filepath.as_str()) => + { + result_tx.send(Ok(fm)).await.expect("Internal multithreading channel is broken: results may be incorrect"); + } + _ => (), + }, // Always silence NotFound since we are in wildcard "search" mode here by definition Err(super::Error::NotFound { 
.. }) => (), Err(e) => result_tx.send(Err(e)).await.expect( @@ -333,7 +445,7 @@ pub(crate) async fn glob( source.clone(), state .clone() - .advance(full_dir_path, state.current_fragment_idx + 1), + .advance(full_dir_path, state.current_fragment_idx + 1, 1), ); } }); @@ -348,6 +460,9 @@ pub(crate) async fn glob( glob_fragments: Arc::new(glob_fragments), full_glob_matcher: Arc::new(full_glob_matcher), wildcard_mode: false, + current_fanout: 1, + fanout_limit, + page_size, }, ); @@ -374,7 +489,7 @@ pub(crate) async fn recursive_iter( log::debug!(target: "recursive_iter", "recursive_iter: spawning task to list: {dir}"); let source = source.clone(); tokio::spawn(async move { - let s = source.iter_dir(&dir, None, None).await; + let s = source.iter_dir(&dir, "/", true, Some(1000), None).await; log::debug!(target: "recursive_iter", "started listing task for {dir}"); let mut s = match s { Ok(s) => s, diff --git a/src/daft-io/src/python.rs b/src/daft-io/src/python.rs index 33b399ae8e..c4a666bd54 100644 --- a/src/daft-io/src/python.rs +++ b/src/daft-io/src/python.rs @@ -20,6 +20,8 @@ mod py { path: String, multithreaded_io: Option, io_config: Option, + fanout_limit: Option, + page_size: Option, ) -> PyResult<&PyList> { let multithreaded_io = multithreaded_io.unwrap_or(true); let lsr: DaftResult> = py.allow_threads(|| { @@ -33,7 +35,10 @@ mod py { runtime_handle.block_on(async move { let source = io_client.get_source(&scheme).await?; - let files = glob(source, path.as_ref()).await?.try_collect().await?; + let files = glob(source, path.as_ref(), fanout_limit, page_size) + .await? + .try_collect() + .await?; Ok(files) }) }); @@ -75,7 +80,7 @@ mod py { .await? } else { source - .iter_dir(&path, None, None) + .iter_dir(&path, "/", true, None, None) .await? .try_collect::>() .await? diff --git a/src/daft-io/src/s3_like.rs b/src/daft-io/src/s3_like.rs index 8dba518e46..5004abc87b 100644 --- a/src/daft-io/src/s3_like.rs +++ b/src/daft-io/src/s3_like.rs @@ -549,9 +549,10 @@ impl S3LikeSource { scheme: &str, bucket: &str, key: &str, - delimiter: String, + delimiter: Option, continuation_token: Option, region: &Region, + page_size: Option, ) -> super::Result { log::debug!("S3 list_objects: Bucket: {bucket}, Key: {key}, continuation_token: {continuation_token:?} in region: {region}"); let request = self @@ -559,13 +560,22 @@ impl S3LikeSource { .await? 
.list_objects_v2() .bucket(bucket) - .delimiter(&delimiter) .prefix(key); + let request = if let Some(delimiter) = delimiter.as_ref() { + request.delimiter(delimiter) + } else { + request + }; let request = if let Some(ref continuation_token) = continuation_token { request.continuation_token(continuation_token) } else { request }; + let request = if let Some(page_size) = page_size { + request.max_keys(page_size) + } else { + request + }; let response = if self.anonymous { request .customize_middleware() @@ -660,6 +670,7 @@ impl S3LikeSource { delimiter, continuation_token.clone(), &new_region, + page_size, ) .await } @@ -696,15 +707,17 @@ impl ObjectSource for S3LikeSource { .context(UnableToGrabSemaphoreSnafu)?; self._head_impl(permit, uri, &self.default_region).await } + async fn ls( &self, path: &str, - delimiter: Option<&str>, + delimiter: &str, + posix: bool, continuation_token: Option<&str>, + page_size: Option, ) -> super::Result { let parsed = url::Url::parse(path).with_context(|_| InvalidUrlSnafu { path })?; let scheme = parsed.scheme(); - let delimiter = delimiter.unwrap_or("/"); let bucket = match parsed.host_str() { Some(s) => Ok(s), @@ -713,63 +726,87 @@ impl ObjectSource for S3LikeSource { source: ParseError::EmptyHost, }), }?; - let key = parsed.path(); - let key = key - .trim_start_matches(delimiter) - .trim_end_matches(delimiter); - let key = if key.is_empty() { - "".to_string() - } else { - format!("{key}{delimiter}") - }; + let key = parsed.path().trim_start_matches(delimiter); - // assume its a directory first - let lsr = { - let permit = self - .connection_pool_sema - .acquire() - .await - .context(UnableToGrabSemaphoreSnafu)?; - - self._list_impl( - permit, - scheme, - bucket, - &key, - delimiter.into(), - continuation_token.map(String::from), - &self.default_region, - ) - .await? - }; - if lsr.files.is_empty() && key.contains(delimiter) { - let permit = self - .connection_pool_sema - .acquire() - .await - .context(UnableToGrabSemaphoreSnafu)?; - // Might be a File - let key = key.trim_end_matches(delimiter); - let mut lsr = self - ._list_impl( + if posix { + // Perform a directory-based list of entries in the next level + // assume its a directory first + let key = if key.is_empty() { + "".to_string() + } else { + format!("{}{delimiter}", key.trim_end_matches(delimiter)) + }; + let lsr = { + let permit = self + .connection_pool_sema + .acquire() + .await + .context(UnableToGrabSemaphoreSnafu)?; + + self._list_impl( permit, scheme, bucket, - key, - delimiter.into(), + &key, + Some(delimiter.into()), continuation_token.map(String::from), &self.default_region, + page_size, ) - .await?; - let target_path = format!("{scheme}://{bucket}/{key}"); - lsr.files.retain(|f| f.filepath == target_path); - - if lsr.files.is_empty() { - // Isnt a file or a directory - return Err(Error::NotFound { path: path.into() }.into()); + .await? 
+ }; + if lsr.files.is_empty() && key.contains(delimiter) { + let permit = self + .connection_pool_sema + .acquire() + .await + .context(UnableToGrabSemaphoreSnafu)?; + // Might be a File + let key = key.trim_end_matches(delimiter); + let mut lsr = self + ._list_impl( + permit, + scheme, + bucket, + key, + Some(delimiter.into()), + continuation_token.map(String::from), + &self.default_region, + page_size, + ) + .await?; + let target_path = format!("{scheme}://{bucket}/{key}"); + lsr.files.retain(|f| f.filepath == target_path); + + if lsr.files.is_empty() { + // Isnt a file or a directory + return Err(Error::NotFound { path: path.into() }.into()); + } + Ok(lsr) + } else { + Ok(lsr) } - Ok(lsr) } else { + // Perform a prefix-based list of all entries with this prefix + let lsr = { + let permit = self + .connection_pool_sema + .acquire() + .await + .context(UnableToGrabSemaphoreSnafu)?; + + self._list_impl( + permit, + scheme, + bucket, + key, + None, // triggers prefix-based list + continuation_token.map(String::from), + &self.default_region, + page_size, + ) + .await? + }; Ok(lsr) } } @@ -842,7 +879,7 @@ mod tests { }; let client = S3LikeSource::get_client(&config).await?; - client.ls(file_path, None, None).await?; + client.ls(file_path, "/", true, None, None).await?; Ok(()) } diff --git a/tests/integration/io/benchmarks/__init__.py b/tests/integration/io/benchmarks/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/integration/io/benchmarks/test_benchmark_glob.py b/tests/integration/io/benchmarks/test_benchmark_glob.py new file mode 100644 index 0000000000..09a1d12fd4 --- /dev/null +++ b/tests/integration/io/benchmarks/test_benchmark_glob.py @@ -0,0 +1,228 @@ +from __future__ import annotations + +import itertools + +import pytest +import s3fs + +from daft.daft import io_glob, io_list + +from ..conftest import minio_create_bucket + +NUM_FILES = 10000 +NUM_LEVELS = 4 +FANOUT_PER_LEVEL = 12 +BUCKET = "bucket" + + +def generate_one_file_per_dir(): + # Total of 10k files (10^4 * 1) + NUM_LEVELS = 4 + FANOUT_PER_LEVEL = 10 + return [ + "/".join(f"part_col_{i}={val}" for i, val in enumerate(part_vals)) + f"/0.parquet" + for part_vals in itertools.product([str(i) for i in range(FANOUT_PER_LEVEL)], repeat=NUM_LEVELS) + ] + + +def generate_balanced_partitioned_data(): + # Total of 10k files (10^3 * 10) + NUM_LEVELS = 3 + FANOUT_PER_LEVEL = 10 + FILES_PER_PARTITION = 10 + return [ + "/".join(f"part_col_{i}={val}" for i, val in enumerate(part_vals)) + f"/{i}.parquet" + for i in range(FILES_PER_PARTITION) + for part_vals in itertools.product([str(i) for i in range(FANOUT_PER_LEVEL)], repeat=NUM_LEVELS) + ] + + +def generate_left_skew_partitioned_data(): + # Total of 10k files (10^3 * 10) + NUM_LEVELS = 3 + FANOUT_PER_LEVEL = 10 + + # First partition contains 1009 files, and the other partitions have 9 files each + num_files_per_partition = [1009] + [9 for i in range(999)] + + return [ + "/".join(f"part_col_{i}={val}" for i, val in enumerate(part_vals)) + f"/{i}.parquet" + for part_vals, num_files_for_this_partition in zip( + itertools.product([str(i) for i in range(FANOUT_PER_LEVEL)], repeat=NUM_LEVELS), num_files_per_partition + ) + for i in range(num_files_for_this_partition) + ] + + +def generate_right_skew_partitioned_data(): + # Total of 10k files (10^3 * 10) + NUM_LEVELS = 3 + FANOUT_PER_LEVEL = 10 + + # Last partition contains 1009 files, and the other partitions have 9 files each + num_files_per_partition = [9 for i in range(999)] + [1009] + + return [ + 
"/".join(f"part_col_{i}={val}" for i, val in enumerate(part_vals)) + f"/{i}.parquet" + for part_vals, num_files_for_this_partition in zip( + itertools.product([str(i) for i in range(FANOUT_PER_LEVEL)], repeat=NUM_LEVELS), num_files_per_partition + ) + for i in range(num_files_for_this_partition) + ] + + +def generate_left_skew_dirs_partitioned_data(): + # First partition in level 0 fans out (1 * 20 * 20 * 10 files = 4000 files) + # The rest of the partitions in level 0 fan out much smaller (1 * 8 * 8 * 10 = 10 files) * 9 = 5940 files + first_partition_paths = [ + "/".join(f"part_col_{i}={val}" for i, val in enumerate([0, *part_vals])) + f"/{i}.parquet" + for i in range(10) + for part_vals in itertools.product([str(i) for i in range(20)], repeat=2) + ] + other_partition_paths = [ + "/".join(f"part_col_{i}={val}" for i, val in enumerate([first_level_val, *part_vals])) + f"/{i}.parquet" + for first_level_val in range(1, 10) + for part_vals in itertools.product([str(i) for i in range(8)], repeat=2) + for i in range(10) + ] + + return first_partition_paths + other_partition_paths + + +def generate_right_skew_dirs_partitioned_data(): + # Last partition in level 0 fans out (1 * 20 * 20 * 10 files = 4000 files) + # The rest of the partitions in level 0 fan out much smaller (1 * 8 * 8 * 10 = 10 files) * 9 = 5940 files + last_partition_paths = [ + "/".join(f"part_col_{i}={val}" for i, val in enumerate([9, *part_vals])) + f"/{i}.parquet" + for i in range(10) + for part_vals in itertools.product([str(i) for i in range(20)], repeat=2) + ] + other_partition_paths = [ + "/".join(f"part_col_{i}={val}" for i, val in enumerate([first_level_val, *part_vals])) + f"/{i}.parquet" + for first_level_val in range(0, 9) + for part_vals in itertools.product([str(i) for i in range(8)], repeat=2) + for i in range(10) + ] + + return other_partition_paths + last_partition_paths + + +def generate_bushy_late_partitioned_data(): + # Total of 10k files (10^3 * 10) + return [f"single/single/part_col={val}" + f"/{i}.parquet" for i in range(10) for val in range(1000)] + + +def generate_bushy_early_partitioned_data(): + # Total of 10k files (10^3 * 10) + return [f"part_col={val}/single/single" + f"/{i}.parquet" for i in range(10) for val in range(1000)] + + +FILE_NAME_GENERATORS = { + "one-file-per-dir": generate_one_file_per_dir, + "partitioned-data-balanced": generate_balanced_partitioned_data, + "partitioned-data-left-skew-files": generate_left_skew_partitioned_data, + "partitioned-data-right-skew-files": generate_right_skew_partitioned_data, + "partitioned-data-left-skew-dirs": generate_left_skew_dirs_partitioned_data, + "partitioned-data-right-skew-dirs": generate_right_skew_dirs_partitioned_data, + "partitioned-data-bushy-early": generate_bushy_early_partitioned_data, + "partitioned-data-bushy-late": generate_bushy_late_partitioned_data, +} + + +@pytest.fixture( + scope="module", + params=[ + "one-file-per-dir", + "partitioned-data-balanced", + "partitioned-data-left-skew-files", + "partitioned-data-right-skew-files", + "partitioned-data-right-skew-dirs", + "partitioned-data-left-skew-dirs", + "partitioned-data-bushy-early", + "partitioned-data-bushy-late", + ], +) +def setup_bucket(request, minio_io_config): + test_name = request.param + file_name_generator = FILE_NAME_GENERATORS[test_name] + with minio_create_bucket(minio_io_config, bucket_name=BUCKET) as fs: + files = file_name_generator() + print(f"Num files: {len(files)}") + for name in files: + fs.touch(f"{BUCKET}/{name}") + yield len(set(files)) + + 
+@pytest.mark.benchmark(group="glob") +@pytest.mark.integration() +def test_benchmark_glob_s3fs(benchmark, setup_bucket, minio_io_config): + fs = s3fs.S3FileSystem( + key=minio_io_config.s3.key_id, + password=minio_io_config.s3.access_key, + client_kwargs={"endpoint_url": minio_io_config.s3.endpoint_url}, + ) + results = benchmark( + lambda: fs.glob( + f"s3://{BUCKET}/**/*.parquet", + # Can't set page size for s3fs + # max_items=PAGE_SIZE, + ) + ) + assert len(results) == setup_bucket + + +@pytest.mark.benchmark(group="glob") +@pytest.mark.integration() +@pytest.mark.parametrize("page_size", [100, 1000]) +def test_benchmark_glob_boto3_list(benchmark, setup_bucket, minio_io_config, page_size): + import boto3 + + s3 = boto3.client( + "s3", + aws_access_key_id=minio_io_config.s3.key_id, + aws_secret_access_key=minio_io_config.s3.access_key, + endpoint_url=minio_io_config.s3.endpoint_url, + ) + + def f(): + continuation_token = None + has_next = True + opts = {"MaxKeys": page_size} + data = [] + while has_next: + if continuation_token is not None: + opts["ContinuationToken"] = continuation_token + + response = s3.list_objects_v2( + Bucket=BUCKET, + Prefix="", + **opts, + ) + data.extend([{"path": f"s3://{BUCKET}/{d['Key']}", "type": "File"} for d in response.get("Contents", [])]) + continuation_token = response.get("NextContinuationToken") + has_next = continuation_token is not None + + return data + + data = benchmark(f) + assert len(data) == setup_bucket + + +@pytest.mark.benchmark(group="glob") +@pytest.mark.integration() +@pytest.mark.parametrize("fanout_limit", [128, 256]) +@pytest.mark.parametrize("page_size", [100, 1000]) +def test_benchmark_glob_daft(benchmark, setup_bucket, minio_io_config, fanout_limit, page_size): + results = benchmark( + lambda: io_glob( + f"s3://{BUCKET}/**/*.parquet", io_config=minio_io_config, fanout_limit=fanout_limit, page_size=page_size + ) + ) + assert len(results) == setup_bucket + + +@pytest.mark.benchmark(group="glob") +@pytest.mark.integration() +def test_benchmark_io_list_recursive_daft(benchmark, setup_bucket, minio_io_config): + results = benchmark(lambda: io_list(f"s3://{BUCKET}/", io_config=minio_io_config, recursive=True)) + assert len([r for r in results if r["type"] == "File"]) == setup_bucket diff --git a/tests/integration/io/test_list_files_s3_minio.py b/tests/integration/io/test_list_files_s3_minio.py index 80b0439fac..3f0fc9a715 100644 --- a/tests/integration/io/test_list_files_s3_minio.py +++ b/tests/integration/io/test_list_files_s3_minio.py @@ -174,7 +174,8 @@ def s3fs_recursive_list(fs, path) -> list: ), ], ) -def test_directory_globbing_fragment_wildcard(minio_io_config, path_expect_pair): +@pytest.mark.parametrize("fanout_limit", [None, 1]) +def test_directory_globbing_fragment_wildcard(minio_io_config, path_expect_pair, fanout_limit): globpath, expect = path_expect_pair with minio_create_bucket(minio_io_config, bucket_name="bucket") as fs: files = [ @@ -193,9 +194,9 @@ def test_directory_globbing_fragment_wildcard(minio_io_config, path_expect_pair) if type(expect) == type and issubclass(expect, BaseException): with pytest.raises(expect): - io_glob(globpath, io_config=minio_io_config) + io_glob(globpath, io_config=minio_io_config, fanout_limit=fanout_limit) else: - daft_ls_result = io_glob(globpath, io_config=minio_io_config) + daft_ls_result = io_glob(globpath, io_config=minio_io_config, fanout_limit=fanout_limit) assert sorted(daft_ls_result, key=lambda d: d["path"]) == sorted(expect, key=lambda d: d["path"]) @@ -264,6 +265,32 @@ def 
test_directory_globbing_special_characters(minio_io_config, path_expect_pair
         assert sorted(daft_ls_result, key=lambda d: d["path"]) == sorted(expect, key=lambda d: d["path"])
 
 
+@pytest.mark.integration()
+def test_directory_globbing_common_prefix_cornercase(minio_io_config):
+    with minio_create_bucket(minio_io_config, bucket_name="bucket") as fs:
+        files = [
+            "1/a/file.txt",
+            "1/b/file.txt",
+            # share a prefix with `1` which may cause issues if we drop the trailing / in a prefix list
+            "11/a/file.txt",
+            "11/b/file.txt",
+        ]
+        for name in files:
+            fs.touch(f"bucket/{name}")
+
+        # Force a prefix listing on the second level when the fanout becomes more than 2
+        daft_ls_result = io_glob("s3://bucket/**", io_config=minio_io_config, fanout_limit=2)
+        assert sorted(daft_ls_result, key=lambda d: d["path"]) == sorted(
+            [
+                {"type": "File", "path": "s3://bucket/1/a/file.txt", "size": 0},
+                {"type": "File", "path": "s3://bucket/1/b/file.txt", "size": 0},
+                {"type": "File", "path": "s3://bucket/11/a/file.txt", "size": 0},
+                {"type": "File", "path": "s3://bucket/11/b/file.txt", "size": 0},
+            ],
+            key=lambda d: d["path"],
+        )
+
+
 @pytest.mark.integration()
 def test_flat_directory_listing(minio_io_config):
     bucket_name = "bucket"
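A note on `test_directory_globbing_common_prefix_cornercase` above: when the glob falls back to a prefix listing, the prefix sent to the store must keep its trailing delimiter, otherwise keys under a sibling directory that merely shares a textual prefix (here `11/` vs `1/`) leak into the results. A two-line illustration of the failure mode the test guards against:

```python
keys = ["1/a/file.txt", "1/b/file.txt", "11/a/file.txt", "11/b/file.txt"]

print([k for k in keys if k.startswith("1")])   # all four keys: the bug if the trailing "/" is dropped
print([k for k in keys if k.startswith("1/")])  # only the two keys under "1/": the intended result
```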
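For readers skimming the Rust changes in `object_io.rs`: the core mechanism added by this PR is `ls_with_prefix_fallback`, which buffers a delimiter-based ("posix") listing of one directory level and, once the number of sub-directories exceeds the remaining fanout budget, abandons the buffer and switches to a flat prefix listing of files. The Python below is a simplified, non-authoritative model of that control flow only; `list_dir` and `list_prefix` are stand-ins for the `ObjectSource` calls, not real APIs.

```python
from typing import Callable, Iterable


def ls_with_prefix_fallback_model(
    list_dir: Callable[[str], Iterable[dict]],     # delimiter-based ("posix") listing of one level
    list_prefix: Callable[[str], Iterable[dict]],  # flat prefix listing of everything under `uri`
    uri: str,
    max_dirs: int,
) -> tuple[list[dict], int]:
    buffered, dir_count = [], 0
    for entry in list_dir(uri):
        if entry["type"] == "Directory":
            dir_count += 1
            if dir_count > max_dirs:
                # Too wide: discard the buffer and return only Files from a prefix listing,
                # reporting a directory count of 0 so the caller stops recursing at this level.
                return [e for e in list_prefix(uri) if e["type"] == "File"], 0
        buffered.append(entry)
    return buffered, dir_count
```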
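The budget that feeds `max_dirs` comes from the new `GlobState` bookkeeping: `current_fanout` starts at 1 and is multiplied by the directory count observed at each level, and each level's budget is `fanout_limit / current_fanout` (integer division). A worked example of just that arithmetic, assuming the balanced 10x10x10 layout from the benchmarks and a fanout limit of 128:

```python
fanout_limit, current_fanout = 128, 1
for level, dirs_seen in enumerate([10, 10, 10]):
    budget = fanout_limit // current_fanout
    print(f"level {level}: budget={budget}, dirs={dirs_seen}, prefix-fallback={dirs_seen > budget}")
    current_fanout *= dirs_seen
# level 0: budget=128 -> keep listing directories
# level 1: budget=12  -> keep listing directories
# level 2: budget=1   -> fall back to a flat prefix list of files
```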
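On the S3 side, the `posix: bool` flag maps to whether `_list_impl` sets a `Delimiter` on `ListObjectsV2`, and `page_size` maps to `MaxKeys`. For anyone reproducing the behaviour outside Daft, a boto3 sketch of the two listing modes against a MinIO setup like the one these tests use (the endpoint URL, credentials, bucket, and prefix below are assumptions; substitute your own):

```python
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://127.0.0.1:9000",  # assumed local MinIO endpoint
    aws_access_key_id="minioadmin",        # assumed credentials
    aws_secret_access_key="minioadmin",
)

# "posix" mode: one level at a time; sub-directories come back as CommonPrefixes.
page = s3.list_objects_v2(Bucket="bucket", Prefix="part_col_0=0/", Delimiter="/", MaxKeys=1000)
dirs = [p["Prefix"] for p in page.get("CommonPrefixes", [])]
files = [o["Key"] for o in page.get("Contents", [])]

# Prefix mode (the fallback): no Delimiter, so every key under the prefix streams back flat,
# paginated via NextContinuationToken -- the same loop the boto3 benchmark above times.
keys, token = [], None
while True:
    kwargs = {"Bucket": "bucket", "Prefix": "part_col_0=0/", "MaxKeys": 1000}
    if token:
        kwargs["ContinuationToken"] = token
    page = s3.list_objects_v2(**kwargs)
    keys.extend(o["Key"] for o in page.get("Contents", []))
    token = page.get("NextContinuationToken")
    if not token:
        break
```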