From 94a0e3fe4460a81b8069806437ce91f31454798f Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 19 Sep 2023 23:14:43 -0700 Subject: [PATCH] [FEAT] Native Rust listing of GCS --- src/daft-io/src/google_cloud.rs | 195 +++++++++++++++++++++----------- 1 file changed, 128 insertions(+), 67 deletions(-) diff --git a/src/daft-io/src/google_cloud.rs b/src/daft-io/src/google_cloud.rs index 04a6f45d2a..478065f16f 100644 --- a/src/daft-io/src/google_cloud.rs +++ b/src/daft-io/src/google_cloud.rs @@ -9,11 +9,14 @@ use async_trait::async_trait; use google_cloud_storage::client::Client; use google_cloud_storage::http::objects::get::GetObjectRequest; +use google_cloud_storage::http::objects::list::ListObjectsRequest; use google_cloud_storage::http::Error as GError; use snafu::IntoError; use snafu::ResultExt; use snafu::Snafu; +use crate::object_io::FileMetadata; +use crate::object_io::FileType; use crate::object_io::LSResult; use crate::object_io::ObjectSource; use crate::s3_like; @@ -25,6 +28,9 @@ enum Error { #[snafu(display("Unable to open {}: {}", path, source))] UnableToOpenFile { path: String, source: GError }, + #[snafu(display("Unable to list objects: \"{}\"", path))] + UnableToListObjects { path: String, source: GError }, + #[snafu(display("Unable to read data from {}: {}", path, source))] UnableToReadBytes { path: String, source: GError }, @@ -46,44 +52,44 @@ impl From for super::Error { fn from(error: Error) -> Self { use Error::*; match error { - UnableToReadBytes { path, source } | UnableToOpenFile { path, source } => { - match source { - GError::HttpClient(err) => match err.status().map(|s| s.as_u16()) { - Some(404) | Some(410) => super::Error::NotFound { - path, - source: err.into(), - }, - Some(401) => super::Error::Unauthorized { - store: super::SourceType::GCS, - path, - source: err.into(), - }, - _ => super::Error::UnableToOpenFile { - path, - source: err.into(), - }, + UnableToReadBytes { path, source } + | UnableToOpenFile { path, source } + | UnableToListObjects { path, source } => match source { + GError::HttpClient(err) => match err.status().map(|s| s.as_u16()) { + Some(404) | Some(410) => super::Error::NotFound { + path, + source: err.into(), + }, + Some(401) => super::Error::Unauthorized { + store: super::SourceType::GCS, + path, + source: err.into(), }, - GError::Response(err) => match err.code { - 404 | 410 => super::Error::NotFound { - path, - source: err.into(), - }, - 401 => super::Error::Unauthorized { - store: super::SourceType::GCS, - path, - source: err.into(), - }, - _ => super::Error::UnableToOpenFile { - path, - source: err.into(), - }, + _ => super::Error::UnableToOpenFile { + path, + source: err.into(), }, - GError::TokenSource(err) => super::Error::UnableToLoadCredentials { + }, + GError::Response(err) => match err.code { + 404 | 410 => super::Error::NotFound { + path, + source: err.into(), + }, + 401 => super::Error::Unauthorized { store: super::SourceType::GCS, - source: err, + path, + source: err.into(), }, - } - } + _ => super::Error::UnableToOpenFile { + path, + source: err.into(), + }, + }, + GError::TokenSource(err) => super::Error::UnableToLoadCredentials { + store: super::SourceType::GCS, + source: err, + }, + }, NotAFile { path } => super::Error::NotAFile { path }, InvalidUrl { path, source } => super::Error::InvalidUrl { path, source }, UnableToLoadCredentials { source } => super::Error::UnableToLoadCredentials { @@ -99,23 +105,32 @@ enum GCSClientWrapper { S3Compat(Arc), } +fn parse_uri(uri: &url::Url) -> super::Result<(&str, &str)> { + let bucket = match uri.host_str() { + Some(s) => Ok(s), + None => Err(Error::InvalidUrl { + path: uri.to_string(), + source: url::ParseError::EmptyHost, + }), + }?; + let key = uri.path(); + + let key = if let Some(key) = key.strip_prefix('/') { + key + } else { + return Err(Error::NotAFile { + path: uri.to_string(), + } + .into()); + }; + + Ok((bucket, key)) +} + impl GCSClientWrapper { async fn get(&self, uri: &str, range: Option>) -> super::Result { - let parsed = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?; - let bucket = match parsed.host_str() { - Some(s) => Ok(s), - None => Err(Error::InvalidUrl { - path: uri.into(), - source: url::ParseError::EmptyHost, - }), - }?; - let key = parsed.path(); - let key = if let Some(key) = key.strip_prefix('/') { - key - } else { - return Err(Error::NotAFile { path: uri.into() }.into()); - }; - + let uri = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?; + let (bucket, key) = parse_uri(&uri)?; match self { GCSClientWrapper::Native(client) => { let req = GetObjectRequest { @@ -156,20 +171,8 @@ impl GCSClientWrapper { } async fn get_size(&self, uri: &str) -> super::Result { - let parsed = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?; - let bucket = match parsed.host_str() { - Some(s) => Ok(s), - None => Err(Error::InvalidUrl { - path: uri.into(), - source: url::ParseError::EmptyHost, - }), - }?; - let key = parsed.path(); - let key = if let Some(key) = key.strip_prefix('/') { - key - } else { - return Err(Error::NotAFile { path: uri.into() }.into()); - }; + let uri = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?; + let (bucket, key) = parse_uri(&uri)?; match self { GCSClientWrapper::Native(client) => { let req = GetObjectRequest { @@ -192,6 +195,64 @@ impl GCSClientWrapper { } } } + + async fn ls( + &self, + path: &str, + delimiter: Option<&str>, + continuation_token: Option<&str>, + ) -> super::Result { + let uri = url::Url::parse(path).with_context(|_| InvalidUrlSnafu { path })?; + let (bucket, key) = parse_uri(&uri)?; + match self { + GCSClientWrapper::Native(client) => { + let req = ListObjectsRequest { + bucket: bucket.to_string(), + prefix: Some(key.to_string()), + end_offset: None, + start_offset: None, + page_token: continuation_token.map(|s| s.to_string()), + delimiter: Some("/".to_string()), // returns results in "directory mode" + max_results: Some(1000), // Recommended value from API docs + include_trailing_delimiter: Some(false), // This will not populate "directories" in the response's .item[] + projection: None, + versions: None, + }; + let ls_response = client + .list_objects(&req) + .await + .context(UnableToListObjectsSnafu { path })?; + let mut files = ls_response.items.map_or(vec![], |items| { + items + .into_iter() + .map(|obj| FileMetadata { + filepath: obj.name, + size: Some(obj.size as u64), + filetype: FileType::File, + }) + .collect::>() + }); + let mut dirs = ls_response.prefixes.map_or(vec![], |prefixes| { + prefixes + .into_iter() + .map(|pref| FileMetadata { + filepath: pref, + size: None, + filetype: FileType::Directory, + }) + .collect::>() + }); + files.append(&mut dirs); + Ok(LSResult { + files, + continuation_token: ls_response.next_page_token, + }) + } + GCSClientWrapper::S3Compat(client) => { + client.ls(path, delimiter, continuation_token).await + } + } + } } pub(crate) struct GCSSource { @@ -248,10 +309,10 @@ impl ObjectSource for GCSSource { async fn ls( &self, - _path: &str, - _delimiter: Option<&str>, - _continuation_token: Option<&str>, + path: &str, + delimiter: Option<&str>, + continuation_token: Option<&str>, ) -> super::Result { - unimplemented!("gcs ls"); + self.client.ls(path, delimiter, continuation_token).await } }