Skip to content

Commit

Permalink
[FEAT] Native Rust listing of GCS
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Sep 20, 2023
1 parent 1efeb33 commit 94a0e3f
Showing 1 changed file with 128 additions and 67 deletions.
195 changes: 128 additions & 67 deletions src/daft-io/src/google_cloud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ use async_trait::async_trait;
use google_cloud_storage::client::Client;
use google_cloud_storage::http::objects::get::GetObjectRequest;

use google_cloud_storage::http::objects::list::ListObjectsRequest;
use google_cloud_storage::http::Error as GError;
use snafu::IntoError;
use snafu::ResultExt;
use snafu::Snafu;

use crate::object_io::FileMetadata;
use crate::object_io::FileType;
use crate::object_io::LSResult;
use crate::object_io::ObjectSource;
use crate::s3_like;
Expand All @@ -25,6 +28,9 @@ enum Error {
#[snafu(display("Unable to open {}: {}", path, source))]
UnableToOpenFile { path: String, source: GError },

#[snafu(display("Unable to list objects: \"{}\"", path))]
UnableToListObjects { path: String, source: GError },

#[snafu(display("Unable to read data from {}: {}", path, source))]
UnableToReadBytes { path: String, source: GError },

Expand All @@ -46,44 +52,44 @@ impl From<Error> for super::Error {
fn from(error: Error) -> Self {
use Error::*;
match error {
UnableToReadBytes { path, source } | UnableToOpenFile { path, source } => {
match source {
GError::HttpClient(err) => match err.status().map(|s| s.as_u16()) {
Some(404) | Some(410) => super::Error::NotFound {
path,
source: err.into(),
},
Some(401) => super::Error::Unauthorized {
store: super::SourceType::GCS,
path,
source: err.into(),
},
_ => super::Error::UnableToOpenFile {
path,
source: err.into(),
},
UnableToReadBytes { path, source }
| UnableToOpenFile { path, source }
| UnableToListObjects { path, source } => match source {
GError::HttpClient(err) => match err.status().map(|s| s.as_u16()) {
Some(404) | Some(410) => super::Error::NotFound {
path,
source: err.into(),
},
Some(401) => super::Error::Unauthorized {
store: super::SourceType::GCS,
path,
source: err.into(),
},
GError::Response(err) => match err.code {
404 | 410 => super::Error::NotFound {
path,
source: err.into(),
},
401 => super::Error::Unauthorized {
store: super::SourceType::GCS,
path,
source: err.into(),
},
_ => super::Error::UnableToOpenFile {
path,
source: err.into(),
},
_ => super::Error::UnableToOpenFile {
path,
source: err.into(),
},
GError::TokenSource(err) => super::Error::UnableToLoadCredentials {
},
GError::Response(err) => match err.code {
404 | 410 => super::Error::NotFound {
path,
source: err.into(),
},
401 => super::Error::Unauthorized {
store: super::SourceType::GCS,
source: err,
path,
source: err.into(),
},
}
}
_ => super::Error::UnableToOpenFile {
path,
source: err.into(),
},
},
GError::TokenSource(err) => super::Error::UnableToLoadCredentials {
store: super::SourceType::GCS,
source: err,
},
},
NotAFile { path } => super::Error::NotAFile { path },
InvalidUrl { path, source } => super::Error::InvalidUrl { path, source },
UnableToLoadCredentials { source } => super::Error::UnableToLoadCredentials {
Expand All @@ -99,23 +105,32 @@ enum GCSClientWrapper {
S3Compat(Arc<s3_like::S3LikeSource>),
}

fn parse_uri(uri: &url::Url) -> super::Result<(&str, &str)> {
let bucket = match uri.host_str() {
Some(s) => Ok(s),
None => Err(Error::InvalidUrl {
path: uri.to_string(),
source: url::ParseError::EmptyHost,
}),
}?;
let key = uri.path();

let key = if let Some(key) = key.strip_prefix('/') {
key
} else {
return Err(Error::NotAFile {
path: uri.to_string(),
}
.into());
};

Ok((bucket, key))
}

impl GCSClientWrapper {
async fn get(&self, uri: &str, range: Option<Range<usize>>) -> super::Result<GetResult> {
let parsed = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?;
let bucket = match parsed.host_str() {
Some(s) => Ok(s),
None => Err(Error::InvalidUrl {
path: uri.into(),
source: url::ParseError::EmptyHost,
}),
}?;
let key = parsed.path();
let key = if let Some(key) = key.strip_prefix('/') {
key
} else {
return Err(Error::NotAFile { path: uri.into() }.into());
};

let uri = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?;
let (bucket, key) = parse_uri(&uri)?;
match self {
GCSClientWrapper::Native(client) => {
let req = GetObjectRequest {
Expand Down Expand Up @@ -156,20 +171,8 @@ impl GCSClientWrapper {
}

async fn get_size(&self, uri: &str) -> super::Result<usize> {
let parsed = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?;
let bucket = match parsed.host_str() {
Some(s) => Ok(s),
None => Err(Error::InvalidUrl {
path: uri.into(),
source: url::ParseError::EmptyHost,
}),
}?;
let key = parsed.path();
let key = if let Some(key) = key.strip_prefix('/') {
key
} else {
return Err(Error::NotAFile { path: uri.into() }.into());
};
let uri = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?;
let (bucket, key) = parse_uri(&uri)?;
match self {
GCSClientWrapper::Native(client) => {
let req = GetObjectRequest {
Expand All @@ -192,6 +195,64 @@ impl GCSClientWrapper {
}
}
}

async fn ls(
&self,
path: &str,
delimiter: Option<&str>,
continuation_token: Option<&str>,
) -> super::Result<LSResult> {
let uri = url::Url::parse(path).with_context(|_| InvalidUrlSnafu { path })?;
let (bucket, key) = parse_uri(&uri)?;
match self {
GCSClientWrapper::Native(client) => {
let req = ListObjectsRequest {
bucket: bucket.to_string(),
prefix: Some(key.to_string()),
end_offset: None,
start_offset: None,
page_token: continuation_token.map(|s| s.to_string()),
delimiter: Some("/".to_string()), // returns results in "directory mode"
max_results: Some(1000), // Recommended value from API docs
include_trailing_delimiter: Some(false), // This will not populate "directories" in the response's .item[]
projection: None,
versions: None,
};
let ls_response = client
.list_objects(&req)
.await
.context(UnableToListObjectsSnafu { path })?;
let mut files = ls_response.items.map_or(vec![], |items| {
items
.into_iter()
.map(|obj| FileMetadata {
filepath: obj.name,
size: Some(obj.size as u64),
filetype: FileType::File,
})
.collect::<Vec<_>>()
});
let mut dirs = ls_response.prefixes.map_or(vec![], |prefixes| {
prefixes
.into_iter()
.map(|pref| FileMetadata {
filepath: pref,
size: None,
filetype: FileType::Directory,
})
.collect::<Vec<_>>()
});
files.append(&mut dirs);
Ok(LSResult {
files,
continuation_token: ls_response.next_page_token,
})
}
GCSClientWrapper::S3Compat(client) => {
client.ls(path, delimiter, continuation_token).await
}
}
}
}

pub(crate) struct GCSSource {
Expand Down Expand Up @@ -248,10 +309,10 @@ impl ObjectSource for GCSSource {

async fn ls(
&self,
_path: &str,
_delimiter: Option<&str>,
_continuation_token: Option<&str>,
path: &str,
delimiter: Option<&str>,
continuation_token: Option<&str>,
) -> super::Result<LSResult> {
unimplemented!("gcs ls");
self.client.ls(path, delimiter, continuation_token).await
}
}

0 comments on commit 94a0e3f

Please sign in to comment.