diff --git a/object_store/src/client/header.rs b/object_store/src/client/header.rs index 9ce5db4339d1..79cdd2c16683 100644 --- a/object_store/src/client/header.rs +++ b/object_store/src/client/header.rs @@ -119,6 +119,7 @@ pub fn header_meta( let content_length = headers .get(CONTENT_LENGTH) + .or_else(|| headers.get("x-goog-stored-content-length")) .context(MissingContentLengthSnafu)?; let content_length = content_length.to_str().context(BadHeaderSnafu)?; diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs index 43fd65892c2c..4a0b598410c6 100644 --- a/object_store/src/client/mod.rs +++ b/object_store/src/client/mod.rs @@ -684,7 +684,7 @@ mod cloud { } /// Override the minimum remaining TTL for a cached token to be used - #[cfg(feature = "aws")] + #[cfg(any(feature = "aws", feature = "gcp"))] pub fn with_min_ttl(mut self, min_ttl: Duration) -> Self { self.cache = self.cache.with_min_ttl(min_ttl); self diff --git a/object_store/src/client/token.rs b/object_store/src/client/token.rs index 7a3c8079449b..59a12793458c 100644 --- a/object_store/src/client/token.rs +++ b/object_store/src/client/token.rs @@ -33,8 +33,9 @@ pub struct TemporaryToken { /// [`TemporaryToken`] based on its expiry #[derive(Debug)] pub struct TokenCache { - cache: Mutex>>, + cache: Mutex, Instant)>>, min_ttl: Duration, + fetch_backoff: Duration, } impl Default for TokenCache { @@ -42,13 +43,16 @@ impl Default for TokenCache { Self { cache: Default::default(), min_ttl: Duration::from_secs(300), + // How long to wait before re-attempting a token fetch after receiving one that + // is still within the min-ttl + fetch_backoff: Duration::from_millis(100), } } } impl TokenCache { /// Override the minimum remaining TTL for a cached token to be used - #[cfg(feature = "aws")] + #[cfg(any(feature = "aws", feature = "gcp"))] pub fn with_min_ttl(self, min_ttl: Duration) -> Self { Self { min_ttl, ..self } } @@ -61,19 +65,24 @@ impl TokenCache { let now = Instant::now(); let mut locked = self.cache.lock().await; - if let Some(cached) = locked.as_ref() { + if let Some((cached, fetched_at)) = locked.as_ref() { match cached.expiry { - Some(ttl) if ttl.checked_duration_since(now).unwrap_or_default() > self.min_ttl => { - return Ok(cached.token.clone()); + Some(ttl) => { + if ttl.checked_duration_since(now).unwrap_or_default() > self.min_ttl || + // if we've recently attempted to fetch this token and it's not actually + // expired, we'll wait to re-fetch it and return the cached one + (fetched_at.elapsed() < self.fetch_backoff && ttl.checked_duration_since(now).is_some()) + { + return Ok(cached.token.clone()); + } } None => return Ok(cached.token.clone()), - _ => (), } } let cached = f().await?; let token = cached.token.clone(); - *locked = Some(cached); + *locked = Some((cached, Instant::now())); Ok(token) } diff --git a/object_store/src/gcp/builder.rs b/object_store/src/gcp/builder.rs index 82dab14437d7..4f8dbadcd18f 100644 --- a/object_store/src/gcp/builder.rs +++ b/object_store/src/gcp/builder.rs @@ -30,10 +30,13 @@ use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, Snafu}; use std::str::FromStr; use std::sync::Arc; +use std::time::Duration; use url::Url; use super::credential::{AuthorizedUserSigningCredentials, InstanceSigningCredentialProvider}; +const TOKEN_MIN_TTL: Duration = Duration::from_secs(4 * 60); + #[derive(Debug, Snafu)] enum Error { #[snafu(display("Missing bucket name"))] @@ -463,13 +466,14 @@ impl GoogleCloudStorageBuilder { )) as _ } else if let Some(credentials) = application_default_credentials.clone() { match credentials { - ApplicationDefaultCredentials::AuthorizedUser(token) => { - Arc::new(TokenCredentialProvider::new( + ApplicationDefaultCredentials::AuthorizedUser(token) => Arc::new( + TokenCredentialProvider::new( token, self.client_options.client()?, self.retry_config.clone(), - )) as _ - } + ) + .with_min_ttl(TOKEN_MIN_TTL), + ) as _, ApplicationDefaultCredentials::ServiceAccount(token) => { Arc::new(TokenCredentialProvider::new( token.token_provider()?, @@ -479,11 +483,14 @@ impl GoogleCloudStorageBuilder { } } } else { - Arc::new(TokenCredentialProvider::new( - InstanceCredentialProvider::default(), - self.client_options.metadata_client()?, - self.retry_config.clone(), - )) as _ + Arc::new( + TokenCredentialProvider::new( + InstanceCredentialProvider::default(), + self.client_options.metadata_client()?, + self.retry_config.clone(), + ) + .with_min_ttl(TOKEN_MIN_TTL), + ) as _ }; let signing_credentials = if let Some(signing_credentials) = self.signing_credentials { diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs index 5def93120a68..74091979c6c8 100644 --- a/object_store/src/http/client.rs +++ b/object_store/src/http/client.rs @@ -109,7 +109,7 @@ impl Client { fn path_url(&self, location: &Path) -> Url { let mut url = self.url.clone(); - url.path_segments_mut().unwrap().extend(location.parts()); + url.path_segments_mut().unwrap().pop_if_empty().extend(location.parts()); url }