Skip to content

Commit

Permalink
Add todos for prefix listing in non-s3 impls
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Oct 2, 2023
1 parent 0ef0609 commit 8902c03
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 34 deletions.
6 changes: 5 additions & 1 deletion src/daft-io/src/azure_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,11 +431,15 @@ impl ObjectSource for AzureBlobSource {
&self,
uri: &str,
delimiter: &str,
_posix: bool,
posix: bool,
_limit: Option<usize>,
) -> super::Result<BoxStream<super::Result<FileMetadata>>> {
let uri = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?;

if !posix {
todo!("Prefix-listing is not yet implemented for Azure");
}

// path can be root (buckets) or path prefix within a bucket.
let container = {
// "Container" is Azure's name for Bucket.
Expand Down
60 changes: 30 additions & 30 deletions src/daft-io/src/google_cloud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,41 +245,41 @@ impl GCSClientWrapper {
let (bucket, key) = parse_uri(&uri)?;
match self {
GCSClientWrapper::Native(client) => {
if posix {
// Attempt to forcefully ls the key as a directory (by ensuring a "/" suffix)
let forced_directory_key =
format!("{}{delimiter}", key.trim_end_matches(delimiter));
let forced_directory_ls_result = self
._ls_impl(
client,
bucket,
forced_directory_key.as_str(),
delimiter,
continuation_token,
)
.await?;
if !posix {
todo!("Prefix-listing is not yet implemented for GCS");
}

// If no items were obtained, then this is actually a file and we perform a second ls to obtain just the file's
// details as the one-and-only-one entry
if forced_directory_ls_result.files.is_empty() {
let file_result = self
._ls_impl(client, bucket, key, delimiter, continuation_token)
.await?;
// Attempt to forcefully ls the key as a directory (by ensuring a "/" suffix)
let forced_directory_key =
format!("{}{delimiter}", key.trim_end_matches(delimiter));
let forced_directory_ls_result = self
._ls_impl(
client,
bucket,
forced_directory_key.as_str(),
delimiter,
continuation_token,
)
.await?;

// Not dir and not file, so it is missing
if file_result.files.is_empty() {
return Err(Error::NotFound {
path: path.to_string(),
}
.into());
}
// If no items were obtained, then this is actually a file and we perform a second ls to obtain just the file's
// details as the one-and-only-one entry
if forced_directory_ls_result.files.is_empty() {
let file_result = self
._ls_impl(client, bucket, key, delimiter, continuation_token)
.await?;

Ok(file_result)
} else {
Ok(forced_directory_ls_result)
// Not dir and not file, so it is missing
if file_result.files.is_empty() {
return Err(Error::NotFound {
path: path.to_string(),
}
.into());
}

Ok(file_result)
} else {
todo!("Need to implement prefix-listing for GCS");
Ok(forced_directory_ls_result)
}
}
GCSClientWrapper::S3Compat(client) => {
Expand Down
6 changes: 5 additions & 1 deletion src/daft-io/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,13 @@ impl ObjectSource for HttpSource {
&self,
path: &str,
_delimiter: &str,
_posix: bool,
posix: bool,
_continuation_token: Option<&str>,
) -> super::Result<LSResult> {
if !posix {
todo!("Prefix-listing is not implemented for HTTP listing");
}

let request = self.client.get(path);
let response = request
.send()
Expand Down
6 changes: 5 additions & 1 deletion src/daft-io/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,13 @@ impl ObjectSource for LocalSource {
&self,
uri: &str,
_delimiter: &str,
_posix: bool,
posix: bool,
_limit: Option<usize>,
) -> super::Result<BoxStream<super::Result<FileMetadata>>> {
if !posix {
todo!("Prefix-listing is not implemented for local");
}

const LOCAL_PROTOCOL: &str = "file://";
let Some(uri) = uri.strip_prefix(LOCAL_PROTOCOL) else {
return Err(Error::InvalidFilePath { path: uri.into() }.into());
Expand Down
4 changes: 3 additions & 1 deletion src/daft-io/src/object_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,9 @@ pub(crate) async fn glob(
visit(
result_tx.clone(),
source.clone(),
state.clone().advance(full_dir_path, state.current_fragment_idx + 1, 1),
state
.clone()
.advance(full_dir_path, state.current_fragment_idx + 1, 1),
);
}
});
Expand Down

0 comments on commit 8902c03

Please sign in to comment.