Skip to content

Commit

Permalink
[BUG] Fix gs listing to include 0 sized marker files (#1412)
Browse files Browse the repository at this point in the history
* Fix gs listing to include 0 sized marker files (these files are
created when manually creating folders in the UI)
* Fixes also for multiple trailing delimiters
* More test coverage for recursive cases

---------

Co-authored-by: Jay Chia <[email protected]>
  • Loading branch information
jaychia and Jay Chia authored Sep 23, 2023
1 parent f00f209 commit 1526233
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 54 deletions.
54 changes: 32 additions & 22 deletions src/daft-io/src/google_cloud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ enum Error {
},

#[snafu(display("Not a File: \"{}\"", path))]
NotAFile { path: String },
NotFound { path: String },
}

impl From<Error> for super::Error {
Expand Down Expand Up @@ -90,7 +90,10 @@ impl From<Error> for super::Error {
source: err,
},
},
NotAFile { path } => super::Error::NotAFile { path },
NotFound { ref path } => super::Error::NotFound {
path: path.into(),
source: error.into(),
},
InvalidUrl { path, source } => super::Error::InvalidUrl { path, source },
UnableToLoadCredentials { source } => super::Error::UnableToLoadCredentials {
store: super::SourceType::GCS,
Expand Down Expand Up @@ -192,7 +195,7 @@ impl GCSClientWrapper {
client: &Client,
bucket: &str,
key: &str,
delimiter: Option<&str>,
delimiter: &str,
continuation_token: Option<&str>,
) -> super::Result<LSResult> {
let req = ListObjectsRequest {
Expand All @@ -201,8 +204,8 @@ impl GCSClientWrapper {
end_offset: None,
start_offset: None,
page_token: continuation_token.map(|s| s.to_string()),
delimiter: Some(delimiter.unwrap_or("/").to_string()), // returns results in "directory mode"
max_results: Some(1000), // Recommended value from API docs
delimiter: Some(delimiter.to_string()), // returns results in "directory mode"
max_results: Some(1000), // Recommended value from API docs
include_trailing_delimiter: Some(false), // This will not populate "directories" in the response's .item[]
projection: None,
versions: None,
Expand All @@ -215,17 +218,10 @@ impl GCSClientWrapper {
})?;
let response_items = ls_response.items.unwrap_or_default();
let response_prefixes = ls_response.prefixes.unwrap_or_default();
let files = response_items.iter().filter_map(|obj| {
if obj.name.ends_with('/') {
// Sometimes the GCS API returns "folders" in .items[], so we manually filter here
None
} else {
Some(FileMetadata {
filepath: format!("gs://{}/{}", bucket, obj.name),
size: Some(obj.size as u64),
filetype: FileType::File,
})
}
let files = response_items.iter().map(|obj| FileMetadata {
filepath: format!("gs://{}/{}", bucket, obj.name),
size: Some(obj.size as u64),
filetype: FileType::File,
});
let dirs = response_prefixes.iter().map(|pref| FileMetadata {
filepath: format!("gs://{}/{}", bucket, pref),
Expand All @@ -246,12 +242,12 @@ impl GCSClientWrapper {
) -> super::Result<LSResult> {
let uri = url::Url::parse(path).with_context(|_| InvalidUrlSnafu { path })?;
let (bucket, key) = parse_uri(&uri)?;
let delimiter = delimiter.unwrap_or("/");
match self {
GCSClientWrapper::Native(client) => {
// Attempt to forcefully ls the key as a directory (by ensuring a "/" suffix)
// If no items were obtained, then this is actually a file and we perform a second ls to obtain just the file's
// details as the one-and-only-one entry
let forced_directory_key = format!("{}/", key.strip_suffix('/').unwrap_or(key));
let forced_directory_key =
format!("{}{delimiter}", key.trim_end_matches(delimiter));
let forced_directory_ls_result = self
._ls_impl(
client,
Expand All @@ -261,15 +257,29 @@ impl GCSClientWrapper {
continuation_token,
)
.await?;

// If no items were obtained, then this is actually a file and we perform a second ls to obtain just the file's
// details as the one-and-only-one entry
if forced_directory_ls_result.files.is_empty() {
self._ls_impl(client, bucket, key, delimiter, continuation_token)
.await
let file_result = self
._ls_impl(client, bucket, key, delimiter, continuation_token)
.await?;

// Not dir and not file, so it is missing
if file_result.files.is_empty() {
return Err(Error::NotFound {
path: path.to_string(),
}
.into());
}

Ok(file_result)
} else {
Ok(forced_directory_ls_result)
}
}
GCSClientWrapper::S3Compat(client) => {
client.ls(path, delimiter, continuation_token).await
client.ls(path, Some(delimiter), continuation_token).await
}
}
}
Expand Down
57 changes: 25 additions & 32 deletions tests/integration/io/test_list_files_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,30 @@
BUCKET = "daft-public-data-gs"


def gcsfs_recursive_list(fs, path) -> list:
    """Recursively list ``path`` via gcsfs, returning a flat list of detail dicts.

    Directory entries have a trailing "/" appended to their ``"name"`` so the
    output matches Daft's convention of emitting directories with a trailing
    delimiter.

    Args:
        fs: a gcsfs filesystem (anything exposing ``ls(path, detail=True)``).
        path: a ``gs://...`` path to list.

    Returns:
        A flat list of entry dicts covering ``path`` and all nested levels.
    """
    all_results = []
    curr_level_result = fs.ls(path, detail=True)
    for item in curr_level_result:
        if item["type"] == "directory":
            new_path = f'gs://{item["name"]}'
            all_results.extend(gcsfs_recursive_list(fs, new_path))
            # Copy before renaming: fs.ls() may hand back dicts from gcsfs's
            # internal listing cache (dircache), and appending "/" in place
            # would presumably corrupt entries seen by later ls() calls —
            # so mutate a copy, not the cached dict.
            item = {**item, "name": item["name"] + "/"}
            all_results.append(item)
        else:
            all_results.append(item)
    return all_results


def compare_gcs_result(daft_ls_result: list, fsspec_result: list):
    """Assert that Daft's listing matches gcsfs's, after normalizing gcsfs quirks."""
    daft_entries = [(entry["path"], entry["type"].lower()) for entry in daft_ls_result]

    # Post-process the fsspec results so they line up with Daft's expected output.
    gcsfs_entries = []
    for info in fsspec_result:
        full_path = f"gs://{info['name']}"
        kind = info["type"]
        if kind == "directory":
            # gcsfs sometimes omits the trailing / on directories, so force it.
            full_path = full_path.rstrip("/") + "/"
        gcsfs_entries.append((full_path, kind))

    # gcsfs may surface 0-sized marker files for manually-created folders; drop them.
    # Careful: this also prunes genuinely empty files, so fixtures must avoid those!
    marker_paths = {
        f"gs://{info['name']}" for info in fsspec_result if info["size"] == 0 and info["type"] == "file"
    }
    gcsfs_entries = [(path, kind) for path, kind in gcsfs_entries if path not in marker_paths]

    assert len(daft_entries) == len(gcsfs_entries)
    assert sorted(daft_entries) == sorted(gcsfs_entries)

Expand All @@ -36,22 +44,25 @@ def compare_gcs_result(daft_ls_result: list, fsspec_result: list):
f"gs://{BUCKET}/",
f"gs://{BUCKET}/test_ls",
f"gs://{BUCKET}/test_ls/",
f"gs://{BUCKET}/test_ls//",
f"gs://{BUCKET}/test_ls/paginated-1100-files/",
],
)
def test_gs_flat_directory_listing(path):
@pytest.mark.parametrize("recursive", [False, True])
def test_gs_flat_directory_listing(path, recursive):
fs = gcsfs.GCSFileSystem()
daft_ls_result = io_list(path)
fsspec_result = fs.ls(path, detail=True)
daft_ls_result = io_list(path, recursive=recursive)
fsspec_result = gcsfs_recursive_list(fs, path) if recursive else fs.ls(path, detail=True)
compare_gcs_result(daft_ls_result, fsspec_result)


@pytest.mark.integration()
def test_gs_single_file_listing():
@pytest.mark.parametrize("recursive", [False, True])
def test_gs_single_file_listing(recursive):
path = f"gs://{BUCKET}/test_ls/file.txt"
fs = gcsfs.GCSFileSystem()
daft_ls_result = io_list(path)
fsspec_result = fs.ls(path, detail=True)
daft_ls_result = io_list(path, recursive=recursive)
fsspec_result = gcsfs_recursive_list(fs, path) if recursive else fs.ls(path, detail=True)
compare_gcs_result(daft_ls_result, fsspec_result)


Expand All @@ -61,23 +72,5 @@ def test_gs_notfound():
fs = gcsfs.GCSFileSystem()
with pytest.raises(FileNotFoundError):
fs.ls(path, detail=True)

# NOTE: Google Cloud does not return a 404 to indicate anything missing, but just returns empty results
# Thus Daft is unable to differentiate between "missing" folders and "empty" folders
daft_ls_result = io_list(path)
assert daft_ls_result == []


@pytest.mark.integration()
@pytest.mark.parametrize(
"path",
[
f"gs://{BUCKET}/test_ls",
f"gs://{BUCKET}/test_ls/",
],
)
def test_gs_flat_directory_listing_recursive(path):
fs = gcsfs.GCSFileSystem()
daft_ls_result = io_list(path, recursive=True)
fsspec_result = list(fs.glob(path.rstrip("/") + "/**", detail=True).values())
compare_gcs_result(daft_ls_result, fsspec_result)
with pytest.raises(FileNotFoundError, match=path):
io_list(path)

0 comments on commit 1526233

Please sign in to comment.