From 0c469d8603cefe635d56595d65f6606bdb8f1e86 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Thu, 30 Nov 2023 12:19:46 -0800 Subject: [PATCH] drop s3 compat mode for gcs --- Cargo.lock | 17 +- src/daft-io/Cargo.toml | 10 +- src/daft-io/src/google_cloud.rs | 276 ++++++++++++++------------------ 3 files changed, 136 insertions(+), 167 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 672fec9c08..30b698ed21 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1732,8 +1732,7 @@ dependencies = [ [[package]] name = "google-cloud-auth" version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af1087f1fbd2dd3f58c17c7574ddd99cd61cbbbc2c4dc81114b8687209b196cb" +source = "git+https://github.com/yoshidan/google-cloud-rust?rev=6b067f3d14bd109af3705988e5cb28f620419d55#6b067f3d14bd109af3705988e5cb28f620419d55" dependencies = [ "async-trait", "base64 0.21.5", @@ -1754,8 +1753,7 @@ dependencies = [ [[package]] name = "google-cloud-metadata" version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc279bfb50487d7bcd900e8688406475fc750fe474a835b2ab9ade9eb1fc90e2" +source = "git+https://github.com/yoshidan/google-cloud-rust?rev=6b067f3d14bd109af3705988e5cb28f620419d55#6b067f3d14bd109af3705988e5cb28f620419d55" dependencies = [ "reqwest", "thiserror", @@ -1764,11 +1762,11 @@ dependencies = [ [[package]] name = "google-cloud-storage" -version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bea96059b656bc5f3332865c02cadc532238b269bd29b2fde64d8ecb878d1b13" +version = "0.15.0" +source = "git+https://github.com/yoshidan/google-cloud-rust?rev=6b067f3d14bd109af3705988e5cb28f620419d55#6b067f3d14bd109af3705988e5cb28f620419d55" dependencies = [ "async-stream", + "async-trait", "base64 0.21.5", "bytes", "futures-util", @@ -1794,9 +1792,8 @@ dependencies = [ [[package]] name = "google-cloud-token" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fcd62eb34e3de2f085bcc33a09c3e17c4f65650f36d53eb328b00d63bcb536a" +version = "0.1.2" +source = "git+https://github.com/yoshidan/google-cloud-rust?rev=6b067f3d14bd109af3705988e5cb28f620419d55#6b067f3d14bd109af3705988e5cb28f620419d55" dependencies = [ "async-trait", ] diff --git a/src/daft-io/Cargo.toml b/src/daft-io/Cargo.toml index fe6bfdfc07..a06bfabf67 100644 --- a/src/daft-io/Cargo.toml +++ b/src/daft-io/Cargo.toml @@ -17,7 +17,6 @@ common-io-config = {path = "../common/io-config", default-features = false} daft-core = {path = "../daft-core", default-features = false} futures = {workspace = true} globset = "0.4" -google-cloud-storage = {version = "0.14.0", default-features = false, features = ["default-tls", "auth"]} hyper = "0.14.27" hyper-tls = "0.5.0" itertools = {workspace = true} @@ -35,6 +34,15 @@ tokio = {workspace = true} tokio-stream = {workspace = true} url = {workspace = true} +[dependencies.google-cloud-storage] +default-features = false +features = ["default-tls", "auth"] +# branch = "anonymouse_storage_access" +git = "https://github.com/yoshidan/google-cloud-rust" +package = "google-cloud-storage" +rev = "6b067f3d14bd109af3705988e5cb28f620419d55" +version = "0.15.0" + [dependencies.reqwest] default-features = false features = ["stream", "native-tls"] diff --git a/src/daft-io/src/google_cloud.rs b/src/daft-io/src/google_cloud.rs index 2abc62c9c7..34cdde825f 100644 --- a/src/daft-io/src/google_cloud.rs +++ b/src/daft-io/src/google_cloud.rs @@ -19,7 +19,6 @@ use crate::object_io::FileMetadata; use crate::object_io::FileType; use crate::object_io::LSResult; use crate::object_io::ObjectSource; -use crate::s3_like; use crate::stats::IOStatsRef; use crate::stream_utils::io_stats_on_bytestream; use crate::GetResult; @@ -109,10 +108,7 @@ impl From for super::Error { } } -enum GCSClientWrapper { - Native(Client), - S3Compat(Arc), -} +struct GCSClientWrapper(Client); fn parse_uri(uri: &url::Url) -> super::Result<(&str, &str)> { let bucket = match uri.host_str() { @@ -136,79 +132,65 @@ impl GCSClientWrapper { ) -> super::Result { let uri = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?; let (bucket, key) = parse_uri(&uri)?; - match self { - GCSClientWrapper::Native(client) => { - let req = GetObjectRequest { - bucket: bucket.into(), - object: key.into(), - ..Default::default() - }; - use google_cloud_storage::http::objects::download::Range as GRange; - let (grange, size) = if let Some(range) = range { - ( - GRange(Some(range.start as u64), Some(range.end as u64)), - Some(range.len()), - ) - } else { - (GRange::default(), None) - }; - let owned_uri = uri.to_string(); - let response = client - .download_streamed_object(&req, &grange) - .await - .context(UnableToOpenFileSnafu { - path: uri.to_string(), - })?; - let response = response.map_err(move |e| { - UnableToReadBytesSnafu:: { - path: owned_uri.clone(), - } - .into_error(e) - .into() - }); - if let Some(is) = io_stats.as_ref() { - is.mark_get_requests(1) - } - Ok(GetResult::Stream( - io_stats_on_bytestream(response, io_stats), - size, - None, - )) - } - GCSClientWrapper::S3Compat(client) => { - let uri = format!("s3://{}/{}", bucket, key); - client.get(&uri, range, io_stats).await + let client = &self.0; + let req = GetObjectRequest { + bucket: bucket.into(), + object: key.into(), + ..Default::default() + }; + use google_cloud_storage::http::objects::download::Range as GRange; + let (grange, size) = if let Some(range) = range { + ( + GRange(Some(range.start as u64), Some(range.end as u64)), + Some(range.len()), + ) + } else { + (GRange::default(), None) + }; + let owned_uri = uri.to_string(); + let response = client + .download_streamed_object(&req, &grange) + .await + .context(UnableToOpenFileSnafu { + path: uri.to_string(), + })?; + let response = response.map_err(move |e| { + UnableToReadBytesSnafu:: { + path: owned_uri.clone(), } + .into_error(e) + .into() + }); + if let Some(is) = io_stats.as_ref() { + is.mark_get_requests(1) } + Ok(GetResult::Stream( + io_stats_on_bytestream(response, io_stats), + size, + None, + )) } async fn get_size(&self, uri: &str, io_stats: Option) -> super::Result { let uri = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?; let (bucket, key) = parse_uri(&uri)?; - match self { - GCSClientWrapper::Native(client) => { - let req = GetObjectRequest { - bucket: bucket.into(), - object: key.into(), - ..Default::default() - }; - - let response = client - .get_object(&req) - .await - .context(UnableToOpenFileSnafu { - path: uri.to_string(), - })?; - if let Some(is) = io_stats.as_ref() { - is.mark_head_requests(1) - } - Ok(response.size as usize) - } - GCSClientWrapper::S3Compat(client) => { - let uri = format!("s3://{}/{}", bucket, key); - client.get_size(&uri, io_stats).await - } + let client = &self.0; + let req = GetObjectRequest { + bucket: bucket.into(), + object: key.into(), + ..Default::default() + }; + + let response = client + .get_object(&req) + .await + .context(UnableToOpenFileSnafu { + path: uri.to_string(), + })?; + if let Some(is) = io_stats.as_ref() { + is.mark_head_requests(1) } + Ok(response.size as usize) } #[allow(clippy::too_many_arguments)] async fn _ls_impl( @@ -271,76 +253,69 @@ impl GCSClientWrapper { ) -> super::Result { let uri = url::Url::parse(path).with_context(|_| InvalidUrlSnafu { path })?; let (bucket, key) = parse_uri(&uri)?; - match self { - GCSClientWrapper::Native(client) => { - if posix { - // Attempt to forcefully ls the key as a directory (by ensuring a "/" suffix) - let forced_directory_key = if key.is_empty() { - "".to_string() - } else { - format!("{}{GCS_DELIMITER}", key.trim_end_matches(GCS_DELIMITER)) - }; - let forced_directory_ls_result = self - ._ls_impl( - client, - bucket, - forced_directory_key.as_str(), - Some(GCS_DELIMITER), - continuation_token, - page_size, - io_stats.as_ref(), - ) - .await?; - - // If no items were obtained, then this is actually a file and we perform a second ls to obtain just the file's - // details as the one-and-only-one entry - if forced_directory_ls_result.files.is_empty() { - let mut file_result = self - ._ls_impl( - client, - bucket, - key, - Some(GCS_DELIMITER), - continuation_token, - page_size, - io_stats.as_ref(), - ) - .await?; - - // Only retain exact matches (since the API does prefix lists by default) - let target_path = format!("{GCS_SCHEME}://{bucket}/{key}"); - file_result.files.retain(|fm| fm.filepath == target_path); - - // Not dir and not file, so it is missing - if file_result.files.is_empty() { - return Err(Error::NotFound { - path: path.to_string(), - } - .into()); - } - - Ok(file_result) - } else { - Ok(forced_directory_ls_result) - } - } else { - self._ls_impl( + let client = &self.0; + + if posix { + // Attempt to forcefully ls the key as a directory (by ensuring a "/" suffix) + let forced_directory_key = if key.is_empty() { + "".to_string() + } else { + format!("{}{GCS_DELIMITER}", key.trim_end_matches(GCS_DELIMITER)) + }; + let forced_directory_ls_result = self + ._ls_impl( + client, + bucket, + forced_directory_key.as_str(), + Some(GCS_DELIMITER), + continuation_token, + page_size, + io_stats.as_ref(), + ) + .await?; + + // If no items were obtained, then this is actually a file and we perform a second ls to obtain just the file's + // details as the one-and-only-one entry + if forced_directory_ls_result.files.is_empty() { + let mut file_result = self + ._ls_impl( client, bucket, key, - None, // Force a prefix-listing + Some(GCS_DELIMITER), continuation_token, page_size, io_stats.as_ref(), ) - .await + .await?; + + // Only retain exact matches (since the API does prefix lists by default) + let target_path = format!("{GCS_SCHEME}://{bucket}/{key}"); + file_result.files.retain(|fm| fm.filepath == target_path); + + // Not dir and not file, so it is missing + if file_result.files.is_empty() { + return Err(Error::NotFound { + path: path.to_string(), + } + .into()); } + + Ok(file_result) + } else { + Ok(forced_directory_ls_result) } - GCSClientWrapper::S3Compat(client) => { - client - .ls(path, posix, continuation_token, page_size, io_stats) - .await - } + } else { + self._ls_impl( + client, + bucket, + key, + None, // Force a prefix-listing + continuation_token, + page_size, + io_stats.as_ref(), + ) + .await } } } @@ -350,40 +325,29 @@ pub(crate) struct GCSSource { } impl GCSSource { - async fn build_s3_compat_client() -> super::Result> { - let s3_config = common_io_config::S3Config { - anonymous: true, - endpoint_url: Some("https://storage.googleapis.com".to_string()), - ..Default::default() - }; - let s3_client = s3_like::S3LikeSource::get_client(&s3_config).await?; - Ok(GCSSource { - client: GCSClientWrapper::S3Compat(s3_client), - } - .into()) - } pub async fn get_client(config: &GCSConfig) -> super::Result> { - if config.anonymous { - GCSSource::build_s3_compat_client().await - } else { - let config = ClientConfig::default() + let config = if !config.anonymous { + let attempted = ClientConfig::default() .with_auth() .await .context(UnableToLoadCredentialsSnafu {}); - match config { - Ok(config) => { - let client = Client::new(config); - Ok(GCSSource { - client: GCSClientWrapper::Native(client), - } - .into()) - } + + match attempted { + Ok(attempt) => attempt, Err(err) => { log::warn!("Google Cloud Storage Credentials not provided or found when making client. Reverting to Anonymous mode.\nDetails\n{err}"); - GCSSource::build_s3_compat_client().await + ClientConfig::default().anonymous() } } + } else { + ClientConfig::default().anonymous() + }; + + let client = Client::new(config); + Ok(GCSSource { + client: GCSClientWrapper(client), } + .into()) } }