Skip to content

Commit

Permalink
[BUG] Filter out size-0 directory marker files during s3 globs (#1629)
Browse files Browse the repository at this point in the history
This should fix CI failures:
https://github.com/Eventual-Inc/daft-benchmarking/actions/runs/6899343721/job/18770787773

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
  • Loading branch information
jaychia and Jay Chia authored Nov 17, 2023
1 parent a7bd5ec commit f18aeae
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
25 changes: 24 additions & 1 deletion src/daft-io/src/object_store_glob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,24 @@ async fn ls_with_prefix_fallback(
(s.boxed(), dir_count_so_far)
}

/// Helper to filter FileMetadata entries that should not be returned by globbing
fn _should_return(fm: &FileMetadata) -> bool {
match fm.filetype {
// Do not return size-0 File entries that end with "/"
// These are usually used to demarcate "empty folders", since S3 is not really a filesystem
// However they can lead to unexpected globbing behavior since most users do not expect them to exist
FileType::File
if fm.filepath.ends_with(GLOB_DELIMITER) && fm.size.is_some_and(|s| s == 0) =>
{
false
}
// Return all other File entries
FileType::File => true,
// Globbing does not return Directory results
FileType::Directory => false,
}
}

/// Globs an ObjectSource for Files
///
/// Uses the `globset` crate for matching, and thus supports all the syntax enabled by that crate.
Expand Down Expand Up @@ -333,7 +351,7 @@ pub(crate) async fn glob(
while let Some(result) = results.next().await && remaining_results.map(|rr| rr > 0).unwrap_or(true) {
match result {
Ok(fm) => {
if matches!(fm.filetype, FileType::File) {
if _should_return(&fm) {
remaining_results = remaining_results.map(|rr| rr - 1);
yield Ok(fm)
}
Expand Down Expand Up @@ -600,6 +618,11 @@ pub(crate) async fn glob(
let to_rtn_stream = stream! {
let mut remaining_results = limit;
while remaining_results.map(|rr| rr > 0).unwrap_or(true) && let Some(v) = to_rtn_rx.recv().await {

if v.as_ref().is_ok_and(|v| !_should_return(v)) {
continue
}

remaining_results = remaining_results.map(|rr| rr - 1);
yield v
}
Expand Down
3 changes: 3 additions & 0 deletions tests/integration/io/test_list_files_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ def compare_gcs_result(daft_ls_result: list, fsspec_result: list):
# Remove all directories: our glob utilities don't return dirs
gcsfs_files = [(path, type_) for path, type_ in gcsfs_files if type_ == "file"]

# Remove size-0 directory marker files
gcsfs_files = [(path, type_) for path, type_ in gcsfs_files if not path.endswith("/")]

assert len(daft_files) == len(gcsfs_files)
assert sorted(daft_files) == sorted(gcsfs_files)

Expand Down

0 comments on commit f18aeae

Please sign in to comment.