From 15262331ec65f10f5eea0deac840cdac2edd161e Mon Sep 17 00:00:00 2001
From: Jay Chia <17691182+jaychia@users.noreply.github.com>
Date: Fri, 22 Sep 2023 17:31:42 -0700
Subject: [PATCH] [BUG] Fix gs listing to include 0 sized marker files (#1412)

* Fix gs listing to include 0 sized marker files (these files are created when manually creating folders in the UI)

* Fixes also for multiple trailing delimiters

* More test coverage for recursive cases

---------

Co-authored-by: Jay Chia
---
 src/daft-io/src/google_cloud.rs              | 54 +++++++++++--------
 tests/integration/io/test_list_files_gcs.py | 57 +++++++++------------
 2 files changed, 57 insertions(+), 54 deletions(-)

diff --git a/src/daft-io/src/google_cloud.rs b/src/daft-io/src/google_cloud.rs
index fe8973d1e9..7f3a2a3849 100644
--- a/src/daft-io/src/google_cloud.rs
+++ b/src/daft-io/src/google_cloud.rs
@@ -45,7 +45,7 @@ enum Error {
     },
 
     #[snafu(display("Not a File: \"{}\"", path))]
-    NotAFile { path: String },
+    NotFound { path: String },
 }
 
 impl From<Error> for super::Error {
@@ -90,7 +90,10 @@ impl From<Error> for super::Error {
                     source: err,
                 },
             },
-            NotAFile { path } => super::Error::NotAFile { path },
+            NotFound { ref path } => super::Error::NotFound {
+                path: path.into(),
+                source: error.into(),
+            },
             InvalidUrl { path, source } => super::Error::InvalidUrl { path, source },
             UnableToLoadCredentials { source } => super::Error::UnableToLoadCredentials {
                 store: super::SourceType::GCS,
@@ -192,7 +195,7 @@ impl GCSClientWrapper {
         client: &Client,
         bucket: &str,
         key: &str,
-        delimiter: Option<&str>,
+        delimiter: &str,
         continuation_token: Option<&str>,
     ) -> super::Result<LSResult> {
         let req = ListObjectsRequest {
@@ -201,8 +204,8 @@ impl GCSClientWrapper {
             end_offset: None,
             start_offset: None,
             page_token: continuation_token.map(|s| s.to_string()),
-            delimiter: Some(delimiter.unwrap_or("/").to_string()), // returns results in "directory mode"
-            max_results: Some(1000),                               // Recommended value from API docs
+            delimiter: Some(delimiter.to_string()), // returns results in "directory mode"
+            max_results: Some(1000),                // Recommended value from API docs
             include_trailing_delimiter: Some(false), // This will not populate "directories" in the response's .item[]
             projection: None,
             versions: None,
@@ -215,17 +218,10 @@ impl GCSClientWrapper {
             })?;
         let response_items = ls_response.items.unwrap_or_default();
         let response_prefixes = ls_response.prefixes.unwrap_or_default();
-        let files = response_items.iter().filter_map(|obj| {
-            if obj.name.ends_with('/') {
-                // Sometimes the GCS API returns "folders" in .items[], so we manually filter here
-                None
-            } else {
-                Some(FileMetadata {
-                    filepath: format!("gs://{}/{}", bucket, obj.name),
-                    size: Some(obj.size as u64),
-                    filetype: FileType::File,
-                })
-            }
+        let files = response_items.iter().map(|obj| FileMetadata {
+            filepath: format!("gs://{}/{}", bucket, obj.name),
+            size: Some(obj.size as u64),
+            filetype: FileType::File,
         });
         let dirs = response_prefixes.iter().map(|pref| FileMetadata {
             filepath: format!("gs://{}/{}", bucket, pref),
@@ -246,12 +242,12 @@ impl GCSClientWrapper {
     ) -> super::Result<LSResult> {
         let uri = url::Url::parse(path).with_context(|_| InvalidUrlSnafu { path })?;
         let (bucket, key) = parse_uri(&uri)?;
+        let delimiter = delimiter.unwrap_or("/");
         match self {
             GCSClientWrapper::Native(client) => {
                 // Attempt to forcefully ls the key as a directory (by ensuring a "/" suffix)
-                // If no items were obtained, then this is actually a file and we perform a second ls to obtain just the file's
-                // details as the one-and-only-one entry
-                let forced_directory_key = format!("{}/", key.strip_suffix('/').unwrap_or(key));
+                let forced_directory_key =
+                    format!("{}{delimiter}", key.trim_end_matches(delimiter));
                 let forced_directory_ls_result = self
                     ._ls_impl(
                         client,
@@ -261,15 +257,29 @@ impl GCSClientWrapper {
                         continuation_token,
                     )
                     .await?;
+
+                // If no items were obtained, then this is actually a file and we perform a second ls to obtain just the file's
+                // details as the one-and-only-one entry
                 if forced_directory_ls_result.files.is_empty() {
-                    self._ls_impl(client, bucket, key, delimiter, continuation_token)
-                        .await
+                    let file_result = self
+                        ._ls_impl(client, bucket, key, delimiter, continuation_token)
+                        .await?;
+
+                    // Not dir and not file, so it is missing
+                    if file_result.files.is_empty() {
+                        return Err(Error::NotFound {
+                            path: path.to_string(),
+                        }
+                        .into());
+                    }
+
+                    Ok(file_result)
                 } else {
                     Ok(forced_directory_ls_result)
                 }
             }
             GCSClientWrapper::S3Compat(client) => {
-                client.ls(path, delimiter, continuation_token).await
+                client.ls(path, Some(delimiter), continuation_token).await
             }
         }
     }
diff --git a/tests/integration/io/test_list_files_gcs.py b/tests/integration/io/test_list_files_gcs.py
index ddbb29bc2d..640d9d5be4 100644
--- a/tests/integration/io/test_list_files_gcs.py
+++ b/tests/integration/io/test_list_files_gcs.py
@@ -8,22 +8,30 @@
 BUCKET = "daft-public-data-gs"
 
 
+def gcsfs_recursive_list(fs, path) -> list:
+    all_results = []
+    curr_level_result = fs.ls(path, detail=True)
+    for item in curr_level_result:
+        if item["type"] == "directory":
+            new_path = f'gs://{item["name"]}'
+            all_results.extend(gcsfs_recursive_list(fs, new_path))
+            item["name"] += "/"
+            all_results.append(item)
+        else:
+            all_results.append(item)
+    return all_results
+
+
 def compare_gcs_result(daft_ls_result: list, fsspec_result: list):
     daft_files = [(f["path"], f["type"].lower()) for f in daft_ls_result]
     gcsfs_files = [(f"gs://{f['name']}", f["type"]) for f in fsspec_result]
 
     # Perform necessary post-processing of fsspec results to match expected behavior from Daft:
-
     # NOTE: gcsfs sometimes does not return the trailing / for directories, so we have to ensure it
     gcsfs_files = [
         (f"{path.rstrip('/')}/", type_) if type_ == "directory" else (path, type_) for path, type_ in gcsfs_files
     ]
 
-    # NOTE: gcsfs will sometimes return 0-sized marker files for manually-created folders, which we ignore here
-    # Be careful here because this will end up pruning any truly size-0 files that are actually files and not folders!
-    size_0_files = {f"gs://{f['name']}" for f in fsspec_result if f["size"] == 0 and f["type"] == "file"}
-    gcsfs_files = [(path, type_) for path, type_ in gcsfs_files if path not in size_0_files]
-
     assert len(daft_files) == len(gcsfs_files)
     assert sorted(daft_files) == sorted(gcsfs_files)
 
@@ -36,22 +44,25 @@ def compare_gcs_result(daft_ls_result: list, fsspec_result: list):
         f"gs://{BUCKET}/",
         f"gs://{BUCKET}/test_ls",
         f"gs://{BUCKET}/test_ls/",
+        f"gs://{BUCKET}/test_ls//",
         f"gs://{BUCKET}/test_ls/paginated-1100-files/",
     ],
 )
-def test_gs_flat_directory_listing(path):
+@pytest.mark.parametrize("recursive", [False, True])
+def test_gs_flat_directory_listing(path, recursive):
     fs = gcsfs.GCSFileSystem()
-    daft_ls_result = io_list(path)
-    fsspec_result = fs.ls(path, detail=True)
+    daft_ls_result = io_list(path, recursive=recursive)
+    fsspec_result = gcsfs_recursive_list(fs, path) if recursive else fs.ls(path, detail=True)
     compare_gcs_result(daft_ls_result, fsspec_result)
 
 
 @pytest.mark.integration()
-def test_gs_single_file_listing():
+@pytest.mark.parametrize("recursive", [False, True])
+def test_gs_single_file_listing(recursive):
     path = f"gs://{BUCKET}/test_ls/file.txt"
     fs = gcsfs.GCSFileSystem()
-    daft_ls_result = io_list(path)
-    fsspec_result = fs.ls(path, detail=True)
+    daft_ls_result = io_list(path, recursive=recursive)
+    fsspec_result = gcsfs_recursive_list(fs, path) if recursive else fs.ls(path, detail=True)
     compare_gcs_result(daft_ls_result, fsspec_result)
 
 
@@ -61,23 +72,5 @@ def test_gs_notfound():
     fs = gcsfs.GCSFileSystem()
     with pytest.raises(FileNotFoundError):
         fs.ls(path, detail=True)
-
-    # NOTE: Google Cloud does not return a 404 to indicate anything missing, but just returns empty results
-    # Thus Daft is unable to differentiate between "missing" folders and "empty" folders
-    daft_ls_result = io_list(path)
-    assert daft_ls_result == []
-
-
-@pytest.mark.integration()
-@pytest.mark.parametrize(
-    "path",
-    [
-        f"gs://{BUCKET}/test_ls",
-        f"gs://{BUCKET}/test_ls/",
-    ],
-)
-def test_gs_flat_directory_listing_recursive(path):
-    fs = gcsfs.GCSFileSystem()
-    daft_ls_result = io_list(path, recursive=True)
-    fsspec_result = list(fs.glob(path.rstrip("/") + "/**", detail=True).values())
-    compare_gcs_result(daft_ls_result, fsspec_result)
+    with pytest.raises(FileNotFoundError, match=path):
+        io_list(path)
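Reviewer note (not part of the patch): a minimal usage sketch of the behavior the new tests pin down, mirroring how the test module above calls io_list. It assumes io_list is imported from daft.daft as in Daft's integration tests, reuses the public daft-public-data-gs bucket referenced in the tests, and uses a made-up prefix for the missing-path case.

# Usage sketch only. Assumptions: `io_list` comes from `daft.daft` as in the tests;
# the bucket/prefix below are the test fixtures; the missing prefix is hypothetical.
from daft.daft import io_list

BUCKET = "daft-public-data-gs"

# Trailing delimiters, including repeated ones, are normalized, so these three paths
# should yield the same listing, with 0-sized marker "folder" files included as files.
for path in (f"gs://{BUCKET}/test_ls", f"gs://{BUCKET}/test_ls/", f"gs://{BUCKET}/test_ls//"):
    for entry in io_list(path):
        print(entry["path"], entry["type"])

# A key that is neither a file nor a directory prefix now raises FileNotFoundError
# instead of silently returning an empty listing.
try:
    io_list(f"gs://{BUCKET}/this-prefix-does-not-exist")  # hypothetical missing path
except FileNotFoundError as err:
    print("missing path raises:", err)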