diff --git a/src/daft-io/src/azure_blob.rs b/src/daft-io/src/azure_blob.rs index cf4752fe97..a546a71422 100644 --- a/src/daft-io/src/azure_blob.rs +++ b/src/daft-io/src/azure_blob.rs @@ -381,10 +381,6 @@ impl AzureBlobSource { #[async_trait] impl ObjectSource for AzureBlobSource { - fn delimiter(&self) -> &'static str { - AZURE_DELIMITER - } - async fn get(&self, uri: &str, range: Option>) -> super::Result { let parsed = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?; let container = match parsed.host_str() { diff --git a/src/daft-io/src/google_cloud.rs b/src/daft-io/src/google_cloud.rs index bc190512dd..de2c6127a9 100644 --- a/src/daft-io/src/google_cloud.rs +++ b/src/daft-io/src/google_cloud.rs @@ -362,10 +362,6 @@ impl GCSSource { #[async_trait] impl ObjectSource for GCSSource { - fn delimiter(&self) -> &'static str { - GCS_DELIMITER - } - async fn get(&self, uri: &str, range: Option>) -> super::Result { self.client.get(uri, range).await } diff --git a/src/daft-io/src/http.rs b/src/daft-io/src/http.rs index 9bf80f9f6a..a433c99c80 100644 --- a/src/daft-io/src/http.rs +++ b/src/daft-io/src/http.rs @@ -169,10 +169,6 @@ impl HttpSource { #[async_trait] impl ObjectSource for HttpSource { - fn delimiter(&self) -> &'static str { - HTTP_DELIMITER - } - async fn get(&self, uri: &str, range: Option>) -> super::Result { let request = self.client.get(uri); let request = match range { diff --git a/src/daft-io/src/local.rs b/src/daft-io/src/local.rs index c3d617f5bb..9424a49669 100644 --- a/src/daft-io/src/local.rs +++ b/src/daft-io/src/local.rs @@ -17,7 +17,11 @@ use std::sync::Arc; use tokio::io::{AsyncReadExt, AsyncSeekExt}; use url::ParseError; -const PLATFORM_FS_DELIMITER: &str = std::path::MAIN_SEPARATOR_STR; +/// NOTE: We hardcode this even for Windows +/// +/// For the most part, Windows machines work quite well with POSIX-style paths +/// as long as there is no "mix" of "\" and "/". +const PATH_SEGMENT_DELIMITER: &str = "/"; pub(crate) struct LocalSource {} @@ -107,10 +111,6 @@ pub struct LocalFile { #[async_trait] impl ObjectSource for LocalSource { - fn delimiter(&self) -> &'static str { - PLATFORM_FS_DELIMITER - } - async fn get(&self, uri: &str, range: Option>) -> super::Result { const LOCAL_PROTOCOL: &str = "file://"; if let Some(uri) = uri.strip_prefix(LOCAL_PROTOCOL) { @@ -148,6 +148,14 @@ impl ObjectSource for LocalSource { let fanout_limit = None; let page_size = None; + // If on Windows, the delimiter provided may be "\" which is treated as an escape character by `glob` + // We sanitize our filepaths here but note that on-return we will be received POSIX-style paths as well + #[cfg(target_env = "msvc")] + { + let glob_path = glob_path.replace("\\", "/"); + return glob(self, glob_path.as_str(), fanout_limit, page_size).await; + } + glob(self, glob_path, fanout_limit, page_size).await } @@ -208,6 +216,15 @@ impl ObjectSource for LocalSource { let entry = entry.with_context(|_| UnableToFetchDirectoryEntriesSnafu { path: uri.to_string(), })?; + + // NOTE: `entry` returned by ReadDirStream can potentially mix posix-delimiters ("/") and windows-delimiter ("\") + // on Windows machines if we naively use `entry.path()`. Manually concatting the entries to the uri is safer. + let path = format!( + "{}{PATH_SEGMENT_DELIMITER}{}", + uri.trim_end_matches(PATH_SEGMENT_DELIMITER), + entry.file_name().to_string_lossy() + ); + let meta = tokio::fs::metadata(entry.path()).await.with_context(|_| { UnableToFetchFileMetadataSnafu { path: entry.path().to_string_lossy().to_string(), @@ -217,8 +234,12 @@ impl ObjectSource for LocalSource { filepath: format!( "{}{}{}", LOCAL_PROTOCOL, - entry.path().to_string_lossy(), - if meta.is_dir() { self.delimiter() } else { "" } + path, + if meta.is_dir() { + PATH_SEGMENT_DELIMITER + } else { + "" + } ), size: Some(meta.len()), filetype: meta.file_type().try_into().with_context(|_| { @@ -348,7 +369,7 @@ mod tests { write_remote_parquet_to_local_file(&mut file2).await?; let mut file3 = tempfile::NamedTempFile::new_in(dir.path()).unwrap(); write_remote_parquet_to_local_file(&mut file3).await?; - let dir_path = format!("file://{}", dir.path().to_string_lossy()); + let dir_path = format!("file://{}", dir.path().to_string_lossy().replace("\\", "/")); let client = LocalSource::get_client().await?; let ls_result = client.ls(dir_path.as_ref(), true, None, None).await?; @@ -357,17 +378,29 @@ mod tests { files.sort_by(|a, b| a.filepath.cmp(&b.filepath)); let mut expected = vec![ FileMetadata { - filepath: format!("file://{}", file1.path().to_string_lossy()), + filepath: format!( + "file://{}/{}", + dir.path().to_string_lossy().replace("\\", "/"), + file1.path().file_name().unwrap().to_string_lossy(), + ), size: Some(file1.as_file().metadata().unwrap().len()), filetype: FileType::File, }, FileMetadata { - filepath: format!("file://{}", file2.path().to_string_lossy()), + filepath: format!( + "file://{}/{}", + dir.path().to_string_lossy().replace("\\", "/"), + file2.path().file_name().unwrap().to_string_lossy(), + ), size: Some(file2.as_file().metadata().unwrap().len()), filetype: FileType::File, }, FileMetadata { - filepath: format!("file://{}", file3.path().to_string_lossy()), + filepath: format!( + "file://{}/{}", + dir.path().to_string_lossy().replace("\\", "/"), + file3.path().file_name().unwrap().to_string_lossy(), + ), size: Some(file3.as_file().metadata().unwrap().len()), filetype: FileType::File, }, diff --git a/src/daft-io/src/object_io.rs b/src/daft-io/src/object_io.rs index d7504b27ab..ba8d4d15a6 100644 --- a/src/daft-io/src/object_io.rs +++ b/src/daft-io/src/object_io.rs @@ -94,9 +94,6 @@ use async_stream::stream; #[async_trait] pub(crate) trait ObjectSource: Sync + Send { - /// Returns the delimiter for the platform (S3 vs GCS vs Azure vs local-unix vs Windows etc) - fn delimiter(&self) -> &'static str; - async fn get(&self, uri: &str, range: Option>) -> super::Result; async fn get_range(&self, uri: &str, range: Range) -> super::Result { self.get(uri, Some(range)).await diff --git a/src/daft-io/src/object_store_glob.rs b/src/daft-io/src/object_store_glob.rs index 5ffb6aec41..162462d891 100644 --- a/src/daft-io/src/object_store_glob.rs +++ b/src/daft-io/src/object_store_glob.rs @@ -18,6 +18,11 @@ lazy_static! { const SCHEME_SUFFIX_LEN: usize = "://".len(); +/// NOTE: Our globbing logic makes very strong assumptions about the delimiter being used to denote +/// directories. The concept of a "glob" is a Unix concept anyways, and so even for Windows machines +/// the `glob` utility can only be used with POSIX-style paths. +const GLOB_DELIMITER: &str = "/"; + #[derive(Clone)] pub(crate) struct GlobState { // Current path in dirtree and glob_fragments @@ -153,10 +158,7 @@ impl GlobFragment { /// 2. Non-wildcard fragments are joined and coalesced by delimiter /// 3. The first fragment is prefixed by "{scheme}://" /// 4. Preserves any leading delimiters -pub(crate) fn to_glob_fragments( - glob_str: &str, - delimiter: &str, -) -> super::Result> { +pub(crate) fn to_glob_fragments(glob_str: &str) -> super::Result> { // NOTE: We only use the URL parse library to get the scheme, because it will escape some of our glob special characters // such as ? and {} let glob_url = url::Url::parse(glob_str).map_err(|e| super::Error::InvalidUrl { @@ -169,8 +171,8 @@ pub(crate) fn to_glob_fragments( // NOTE: Leading delimiter may be important for absolute paths on local directory, and is considered // part of the first fragment - let leading_delimiter = if glob_str_after_scheme.starts_with(delimiter) { - delimiter + let leading_delimiter = if glob_str_after_scheme.starts_with(GLOB_DELIMITER) { + GLOB_DELIMITER } else { "" }; @@ -179,7 +181,7 @@ pub(crate) fn to_glob_fragments( let mut coalesced_fragments = vec![]; let mut nonspecial_fragments_so_far = vec![]; for fragment in glob_str_after_scheme - .split(delimiter) + .split(GLOB_DELIMITER) .map(GlobFragment::new) { match fragment { @@ -188,7 +190,7 @@ pub(crate) fn to_glob_fragments( if !nonspecial_fragments_so_far.is_empty() { coalesced_fragments.push(GlobFragment::join( nonspecial_fragments_so_far.drain(..).as_slice(), - delimiter, + GLOB_DELIMITER, )); } coalesced_fragments.push(fragment); @@ -201,7 +203,7 @@ pub(crate) fn to_glob_fragments( if !nonspecial_fragments_so_far.is_empty() { coalesced_fragments.push(GlobFragment::join( nonspecial_fragments_so_far.drain(..).as_slice(), - delimiter, + GLOB_DELIMITER, )); } @@ -293,6 +295,10 @@ async fn ls_with_prefix_fallback( /// Uses the `globset` crate for matching, and thus supports all the syntax enabled by that crate. /// See: https://docs.rs/globset/latest/globset/#syntax /// +/// NOTE: Users of this function are responsible for sanitizing their paths and delimiters to follow the `globset` crate's expectations +/// in terms of delimiters. E.g. on Windows machines, callers of [`glob`] must convert all Windows-style "\" delimiters to "/" because +/// `globset` treats "\" as escape characters. +/// /// Arguments: /// * source: the ObjectSource to use for file listing /// * glob: the string to glob @@ -306,9 +312,7 @@ pub(crate) async fn glob( glob: &str, fanout_limit: Option, page_size: Option, -) -> super::Result>> { - let delimiter = source.delimiter(); - +) -> super::Result>> { // If no special characters, we fall back to ls behavior let full_fragment = GlobFragment::new(glob); if !full_fragment.has_special_character() { @@ -331,14 +335,14 @@ pub(crate) async fn glob( // If user specifies a trailing / then we understand it as an attempt to list the folder(s) matched // and append a trailing * fragment - let glob = if glob.ends_with(source.delimiter()) { + let glob = if glob.ends_with(GLOB_DELIMITER) { glob.to_string() + "*" } else { glob.to_string() }; let glob = glob.as_str(); - let glob_fragments = to_glob_fragments(glob, delimiter)?; + let glob_fragments = to_glob_fragments(glob)?; let full_glob_matcher = GlobBuilder::new(glob) .literal_separator(true) .backslash_escape(true) @@ -468,7 +472,7 @@ pub(crate) async fn glob( let partial_glob_matcher = GlobBuilder::new( GlobFragment::join( &state.glob_fragments[..state.current_fragment_idx + 1], - source.delimiter(), + GLOB_DELIMITER, ) .raw_str(), ) @@ -492,7 +496,7 @@ pub(crate) async fn glob( Ok(fm) => match fm.filetype { FileType::Directory if partial_glob_matcher.is_match( - fm.filepath.as_str().trim_end_matches(source.delimiter()), + fm.filepath.as_str().trim_end_matches(GLOB_DELIMITER), ) => { visit( diff --git a/src/daft-io/src/s3_like.rs b/src/daft-io/src/s3_like.rs index fea3e1629e..3a549d2afb 100644 --- a/src/daft-io/src/s3_like.rs +++ b/src/daft-io/src/s3_like.rs @@ -692,10 +692,6 @@ impl S3LikeSource { #[async_trait] impl ObjectSource for S3LikeSource { - fn delimiter(&self) -> &'static str { - S3_DELIMITER - } - async fn get(&self, uri: &str, range: Option>) -> super::Result { let permit = self .connection_pool_sema diff --git a/tests/io/test_list_files_local.py b/tests/io/test_list_files_local.py index fc7cd5259c..e18f2acaf8 100644 --- a/tests/io/test_list_files_local.py +++ b/tests/io/test_list_files_local.py @@ -27,6 +27,9 @@ def compare_local_result(daft_ls_result: list, fs_result: list): # io_glob does not return directories fs_files = [(p, t) for p, t in fs_files if t == "file"] + # io_glob returns posix-style paths + fs_files = [(p.replace("\\", "/"), t) for p, t in fs_files] + assert sorted(daft_files) == sorted(fs_files) @@ -134,5 +137,5 @@ def test_missing_file_path(tmp_path, include_protocol): p = f"{d}/c/cc/ddd" if include_protocol: p = "file://" + p - with pytest.raises(FileNotFoundError, match=f"File: {d}/c/cc/ddd not found"): + with pytest.raises(FileNotFoundError, match=f"/c/cc/ddd not found"): io_glob(p)