Skip to content

Commit

Permalink
Ensure that errors are sent back into the channel instead of swallowe…
Browse files Browse the repository at this point in the history
…d silently
  • Loading branch information
Jay Chia committed Sep 28, 2023
1 parent 3101cbe commit a1a1513
Showing 1 changed file with 83 additions and 87 deletions.
170 changes: 83 additions & 87 deletions src/daft-io/src/object_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,98 +199,98 @@ pub(crate) async fn glob(
.expect("Cannot parse glob")
.compile_matcher();

let mut next_level_file_metadata =
source.iter_dir(path.as_str(), Some("/"), None).await?;

while let Some(fm) = next_level_file_metadata.next().await {
match fm {
Ok(fm) => {
// Recursively visit each sub-directory, do not increment `i` so as to keep visiting the "**" fragmemt
if matches!(fm.filetype, FileType::Directory) {
visit(
result_tx.clone(),
source.clone(),
&fm.filepath,
(glob_fragments.clone(), i),
);
}
// Return any Files that match
if glob_matcher.is_match(fm.filepath.as_str())
&& matches!(fm.filetype, FileType::File)
{
result_tx.send(Ok(fm)).await.map_err(|se| {
super::Error::UnableToSendDataOverChannel { source: se.into() }
})?;
let next_level_file_metadata =
source.iter_dir(path.as_str(), Some("/"), None).await;

match next_level_file_metadata {
Ok(mut next_level_file_metadata) => {
while let Some(fm) = next_level_file_metadata.next().await {
match fm {
Ok(fm) => {
// Recursively visit each sub-directory, do not increment `i` so as to keep visiting the "**" fragmemt
if matches!(fm.filetype, FileType::Directory) {
visit(
result_tx.clone(),
source.clone(),
&fm.filepath,
(glob_fragments.clone(), i),
);
}
// Return any Files that match
if glob_matcher.is_match(fm.filepath.as_str())
&& matches!(fm.filetype, FileType::File)
{
result_tx.send(Ok(fm)).await.expect("Internal multithreading channel is broken: results may be incorrect");
}
}
Err(super::Error::NotFound { .. }) => {}
Err(e) => {
result_tx.send(Err(e)).await.expect("Internal multithreading channel is broken: results may be incorrect");
}
}
}
Err(super::Error::NotFound { .. }) => {}
Err(e) => {
result_tx.send(Err(e)).await.map_err(|se| {
super::Error::UnableToSendDataOverChannel { source: se.into() }
})?;
}
}
}
Ok(())

Err(e) => result_tx.send(Err(e)).await.expect(
"Internal multithreading channel is broken: results may be incorrect",
),
};
// CASE: current_fragment contains a special character (e.g. *)
// Perform a directory listing of the just next level and run it against the (partial) glob filter
} else if contains_special_character(current_fragment) {
let mut next_level_file_metadata =
source.iter_dir(path.as_str(), Some("/"), None).await?;

let glob_matcher = GlobBuilder::new(glob_fragments[..i + 1].join("/").as_str())
.literal_separator(true)
.build()
.expect("Cannot parse glob")
.compile_matcher();

// BASE CASE: we've reached the last remaining glob fragment and these glob_matches are the final glob_matches
if i == glob_fragments.len() - 1 {
while let Some(fm) = next_level_file_metadata.next().await {
match fm {
Ok(fm) => {
if glob_matcher.is_match(fm.filepath.as_str().trim_end_matches('/'))
&& matches!(fm.filetype, FileType::File)
{
result_tx.send(Ok(fm)).await.map_err(|se| {
super::Error::UnableToSendDataOverChannel {
source: se.into(),
let next_level_file_metadata =
source.iter_dir(path.as_str(), Some("/"), None).await;

match next_level_file_metadata {
Ok(mut next_level_file_metadata) => {
let glob_matcher =
GlobBuilder::new(glob_fragments[..i + 1].join("/").as_str())
.literal_separator(true)
.build()
.expect("Cannot parse glob")
.compile_matcher();

// BASE CASE: we've reached the last remaining glob fragment and these glob_matches are the final glob_matches
if i == glob_fragments.len() - 1 {
while let Some(fm) = next_level_file_metadata.next().await {
match fm {
Ok(fm) => {
if glob_matcher
.is_match(fm.filepath.as_str().trim_end_matches('/'))
&& matches!(fm.filetype, FileType::File)
{
result_tx.send(Ok(fm)).await.expect("Internal multithreading channel is broken: results may be incorrect");
}
})?;
}
Err(super::Error::NotFound { .. }) => (),
Err(e) => {
result_tx.send(Err(e)).await.expect("Internal multithreading channel is broken: results may be incorrect");
}
}
}
Err(super::Error::NotFound { .. }) => {}
Err(e) => {
result_tx.send(Err(e)).await.map_err(|se| {
super::Error::UnableToSendDataOverChannel { source: se.into() }
})?;
}
}
}
return Ok(());
}

// RECURSIVE CASE: keep visiting recursively
while let Some(fm) = next_level_file_metadata.next().await {
match fm {
Ok(fm) => {
if matches!(fm.filetype, FileType::Directory) {
visit(
result_tx.clone(),
source.clone(),
fm.filepath.as_str(),
(glob_fragments.clone(), i + 1),
);
// RECURSIVE CASE: keep visiting recursively
while let Some(fm) = next_level_file_metadata.next().await {
match fm {
Ok(fm) => {
if matches!(fm.filetype, FileType::Directory) {
visit(
result_tx.clone(),
source.clone(),
fm.filepath.as_str(),
(glob_fragments.clone(), i + 1),
);
}
}
Err(super::Error::NotFound { .. }) => (),
Err(e) => result_tx.send(Err(e)).await.expect("Internal multithreading channel is broken: results may be incorrect"),
}
}
Err(super::Error::NotFound { .. }) => {}
Err(e) => {
return Err(e);
}
}
Err(e) => result_tx.send(Err(e)).await.expect(
"Internal multithreading channel is broken: results may be incorrect",
),
}
Ok(())

// CASE: current_fragment contains no special characters, and is a path to a specific File or Directory
// We just append it to the current `path` and check whether or not it exists.
Expand All @@ -301,20 +301,17 @@ pub(crate) async fn glob(
// We need to verify that it exists before returning.
if i == glob_fragments.len() - 1 {
let single_file_ls = source.ls(full_dir_path.as_str(), Some("/"), None).await;
return match single_file_ls {
match single_file_ls {
Ok(mut single_file_ls) => {
if single_file_ls.files.len() == 1 {
let fm = single_file_ls.files.drain(..).next().unwrap();
result_tx.send(Ok(fm)).await.map_err(|se| {
super::Error::UnableToSendDataOverChannel { source: se.into() }
})?;
result_tx.send(Ok(fm)).await.expect("Internal multithreading channel is broken: results may be incorrect");
}
Ok(())
}
Err(super::Error::NotFound { .. }) => Ok(()),
Err(e) => result_tx.send(Err(e)).await.map_err(|se| {
super::Error::UnableToSendDataOverChannel { source: se.into() }
}),
Err(super::Error::NotFound { .. }) => (),
Err(e) => result_tx.send(Err(e)).await.expect(
"Internal multithreading channel is broken: results may be incorrect",
),
};
}

Expand All @@ -325,7 +322,6 @@ pub(crate) async fn glob(
full_dir_path.as_str(),
(glob_fragments.clone(), i + 1),
);
Ok(())
}
});
}
Expand Down

0 comments on commit a1a1513

Please sign in to comment.