Skip to content

Commit

Permalink
[PERF] Harden GCP Retries (#3253)
Browse files Browse the repository at this point in the history
* Introduces retries with exponential backoffs for GCS (default 5)
* Introduces connection and read timeouts (default 30 seconds)
* Introduces maximum connections for GCS (default 8/thread or 64)
* Introduces idle connection cleanup (max of 70)
  • Loading branch information
samster25 authored Nov 9, 2024
1 parent e27e2f5 commit 84e34d0
Show file tree
Hide file tree
Showing 10 changed files with 528 additions and 62 deletions.
256 changes: 233 additions & 23 deletions Cargo.lock

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -600,20 +600,35 @@ class GCSConfig:
credentials: str | None
token: str | None
anonymous: bool
max_connections: int
retry_initial_backoff_ms: int
connect_timeout_ms: int
read_timeout_ms: int
num_tries: int

def __init__(
self,
project_id: str | None = None,
credentials: str | None = None,
token: str | None = None,
anonymous: bool | None = None,
max_connections: int | None = None,
retry_initial_backoff_ms: int | None = None,
connect_timeout_ms: int | None = None,
read_timeout_ms: int | None = None,
num_tries: int | None = None,
): ...
def replace(
self,
project_id: str | None = None,
credentials: str | None = None,
token: str | None = None,
anonymous: bool | None = None,
max_connections: int | None = None,
retry_initial_backoff_ms: int | None = None,
connect_timeout_ms: int | None = None,
read_timeout_ms: int | None = None,
num_tries: int | None = None,
) -> GCSConfig:
"""Replaces values if provided, returning a new GCSConfig"""
...
Expand Down
1 change: 1 addition & 0 deletions src/common/io-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ aws-credential-types = {version = "0.55.3"}
chrono = {workspace = true}
common-error = {path = "../error", default-features = false}
common-py-serde = {path = "../py-serde", default-features = false}
derive_more = {workspace = true}
pyo3 = {workspace = true, optional = true}
secrecy = {version = "0.8.0", features = ["alloc"], default-features = false}
serde = {workspace = true}
Expand Down
59 changes: 44 additions & 15 deletions src/common/io-config/src/gcs.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,45 @@
use std::fmt::{Display, Formatter};

use derive_more::Display;
use serde::{Deserialize, Serialize};

use crate::ObfuscatedString;

#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
/// Configuration for accessing Google Cloud Storage (GCS).
///
/// The derived `Display` output (see the `#[display(...)]` template below) prints
/// `project_id`, `anonymous` and the connection/retry settings; `credentials` and
/// `token` are not part of the template.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash, Display)]
#[display(
"GCSConfig
project_id: {project_id:?}
anonymous: {anonymous}
max_connections_per_io_thread: {max_connections_per_io_thread}
retry_initial_backoff_ms: {retry_initial_backoff_ms}
connect_timeout_ms: {connect_timeout_ms}
read_timeout_ms: {read_timeout_ms}
num_tries: {num_tries}"
)]
pub struct GCSConfig {
/// Optional project ID; `None` by default.
pub project_id: Option<String>,
/// Path to a credentials file or a JSON string with credentials; `None` by default.
pub credentials: Option<ObfuscatedString>,
/// OAuth2 token to use for authentication; `None` by default.
pub token: Option<String>,
/// Whether to access GCS in "anonymous mode", i.e. without any credentials. Defaults to `false`.
pub anonymous: bool,
/// Maximum number of connections to GCS at any time per IO thread. Defaults to 8.
pub max_connections_per_io_thread: u32,
/// Initial backoff duration in milliseconds for a GCS retry. Defaults to 1000.
pub retry_initial_backoff_ms: u64,
/// Timeout in milliseconds to wait to make a connection to GCS. Defaults to 30_000 (30 seconds).
pub connect_timeout_ms: u64,
/// Timeout in milliseconds to wait to read the first byte from GCS. Defaults to 30_000 (30 seconds).
pub read_timeout_ms: u64,
/// Number of attempts to make a connection. Defaults to 5.
pub num_tries: u32,
}

impl Default for GCSConfig {
fn default() -> Self {
Self {
project_id: None,
credentials: None,
token: None,
anonymous: false,
max_connections_per_io_thread: 8,
retry_initial_backoff_ms: 1000,
connect_timeout_ms: 30_000,
read_timeout_ms: 30_000,
num_tries: 5,
}
}
}

impl GCSConfig {
Expand All @@ -20,18 +50,17 @@ impl GCSConfig {
res.push(format!("Project ID = {project_id}"));
}
res.push(format!("Anonymous = {}", self.anonymous));
res.push(format!(
"Max connections = {}",
self.max_connections_per_io_thread
));
res.push(format!(
"Retry initial backoff ms = {}",
self.retry_initial_backoff_ms
));
res.push(format!("Connect timeout ms = {}", self.connect_timeout_ms));
res.push(format!("Read timeout ms = {}", self.read_timeout_ms));
res.push(format!("Max retries = {}", self.num_tries));
res
}
}

impl Display for GCSConfig {
    /// Writes a `GCSConfig` header line followed by `project_id` and `anonymous`,
    /// each rendered with its `Debug` representation on its own line.
    ///
    /// NOTE(review): in this view the struct also carries `#[derive(..., Display)]`;
    /// only one `Display` impl can exist for the type, so confirm whether this
    /// hand-written impl is still meant to be present.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let project = &self.project_id;
        let anon = &self.anonymous;
        // The literal spans several lines on purpose: its embedded newlines are part of the output.
        f.write_fmt(format_args!(
            "GCSConfig
project_id: {:?}
anonymous: {:?}",
            project, anon
        ))
    }
}
64 changes: 59 additions & 5 deletions src/common/io-config/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ use crate::{config, s3::S3CredentialsProvider};
/// access_key (str, optional): AWS Secret Access Key, defaults to auto-detection from the current environment
/// credentials_provider (Callable[[], S3Credentials], optional): Custom credentials provider function, should return a `S3Credentials` object
/// buffer_time (int, optional): Amount of time in seconds before the actual credential expiration time where credentials given by `credentials_provider` are considered expired, defaults to 10s
/// max_connections (int, optional): Maximum number of connections to S3 at any time, defaults to 64
/// max_connections (int, optional): Maximum number of connections to S3 at any time per io thread, defaults to 8
/// session_token (str, optional): AWS Session Token, required only if `key_id` and `access_key` are temporary credentials
/// retry_initial_backoff_ms (int, optional): Initial backoff duration in milliseconds for an S3 retry, defaults to 1000ms
/// connect_timeout_ms (int, optional): Timeout duration to wait to make a connection to S3 in milliseconds, defaults to 10 seconds
/// read_timeout_ms (int, optional): Timeout duration to wait to read the first byte from S3 in milliseconds, defaults to 10 seconds
/// num_tries (int, optional): Number of attempts to make a connection, defaults to 5
/// connect_timeout_ms (int, optional): Timeout duration to wait to make a connection to S3 in milliseconds, defaults to 30 seconds
/// read_timeout_ms (int, optional): Timeout duration to wait to read the first byte from S3 in milliseconds, defaults to 30 seconds
/// num_tries (int, optional): Number of attempts to make a connection, defaults to 25
/// retry_mode (str, optional): Retry Mode when a request fails, current supported values are `standard` and `adaptive`, defaults to `adaptive`
/// anonymous (bool, optional): Whether or not to use "anonymous mode", which will access S3 without any credentials
/// use_ssl (bool, optional): Whether or not to use SSL, which require accessing S3 over HTTPS rather than HTTP, defaults to True
Expand Down Expand Up @@ -107,6 +107,11 @@ pub struct AzureConfig {
/// credentials (str, optional): Path to credentials file or JSON string with credentials
/// token (str, optional): OAuth2 token to use for authentication. You likely want to use `credentials` instead, since it can be used to refresh the token. This value is used when vended by a data catalog.
/// anonymous (bool, optional): Whether or not to use "anonymous mode", which will access Google Storage without any credentials. Defaults to false
/// max_connections (int, optional): Maximum number of connections to GCS at any time per io thread, defaults to 8
/// retry_initial_backoff_ms (int, optional): Initial backoff duration in milliseconds for a GCS retry, defaults to 1000ms
/// connect_timeout_ms (int, optional): Timeout duration to wait to make a connection to GCS in milliseconds, defaults to 30 seconds
/// read_timeout_ms (int, optional): Timeout duration to wait to read the first byte from GCS in milliseconds, defaults to 30 seconds
/// num_tries (int, optional): Number of attempts to make a connection, defaults to 5
///
/// Example:
/// >>> io_config = IOConfig(gcs=GCSConfig(anonymous=True))
Expand Down Expand Up @@ -848,6 +853,11 @@ impl GCSConfig {
credentials: Option<String>,
token: Option<String>,
anonymous: Option<bool>,
max_connections: Option<u32>,
retry_initial_backoff_ms: Option<u64>,
connect_timeout_ms: Option<u64>,
read_timeout_ms: Option<u64>,
num_tries: Option<u32>,
) -> Self {
let def = crate::GCSConfig::default();
Self {
Expand All @@ -858,17 +868,29 @@ impl GCSConfig {
.or(def.credentials),
token: token.or(def.token),
anonymous: anonymous.unwrap_or(def.anonymous),
max_connections_per_io_thread: max_connections
.unwrap_or(def.max_connections_per_io_thread),
retry_initial_backoff_ms: retry_initial_backoff_ms
.unwrap_or(def.retry_initial_backoff_ms),
connect_timeout_ms: connect_timeout_ms.unwrap_or(def.connect_timeout_ms),
read_timeout_ms: read_timeout_ms.unwrap_or(def.read_timeout_ms),
num_tries: num_tries.unwrap_or(def.num_tries),
},
}
}

#[allow(clippy::too_many_arguments)]
#[must_use]
pub fn replace(
&self,
project_id: Option<String>,
credentials: Option<String>,
token: Option<String>,
anonymous: Option<bool>,
max_connections: Option<u32>,
retry_initial_backoff_ms: Option<u64>,
connect_timeout_ms: Option<u64>,
read_timeout_ms: Option<u64>,
num_tries: Option<u32>,
) -> Self {
Self {
config: crate::GCSConfig {
Expand All @@ -878,6 +900,13 @@ impl GCSConfig {
.or_else(|| self.config.credentials.clone()),
token: token.or_else(|| self.config.token.clone()),
anonymous: anonymous.unwrap_or(self.config.anonymous),
max_connections_per_io_thread: max_connections
.unwrap_or(self.config.max_connections_per_io_thread),
retry_initial_backoff_ms: retry_initial_backoff_ms
.unwrap_or(self.config.retry_initial_backoff_ms),
connect_timeout_ms: connect_timeout_ms.unwrap_or(self.config.connect_timeout_ms),
read_timeout_ms: read_timeout_ms.unwrap_or(self.config.read_timeout_ms),
num_tries: num_tries.unwrap_or(self.config.num_tries),
},
}
}
Expand Down Expand Up @@ -913,6 +942,31 @@ impl GCSConfig {
pub fn anonymous(&self) -> PyResult<bool> {
Ok(self.config.anonymous)
}

/// Maximum number of connections to GCS per IO thread
/// (returns the config's `max_connections_per_io_thread`).
#[getter]
pub fn max_connections(&self) -> PyResult<u32> {
Ok(self.config.max_connections_per_io_thread)
}

/// Initial backoff duration in milliseconds for a GCS retry.
#[getter]
pub fn retry_initial_backoff_ms(&self) -> PyResult<u64> {
Ok(self.config.retry_initial_backoff_ms)
}

/// Timeout in milliseconds to wait to make a connection to GCS.
#[getter]
pub fn connect_timeout_ms(&self) -> PyResult<u64> {
Ok(self.config.connect_timeout_ms)
}

/// Timeout in milliseconds to wait to read the first byte from GCS.
#[getter]
pub fn read_timeout_ms(&self) -> PyResult<u64> {
Ok(self.config.read_timeout_ms)
}

/// Number of attempts to make a connection.
#[getter]
pub fn num_tries(&self) -> PyResult<u32> {
Ok(self.config.num_tries)
}
}

impl From<config::IOConfig> for IOConfig {
Expand Down
6 changes: 5 additions & 1 deletion src/daft-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ common-runtime = {path = "../common/runtime", default-features = false}
derive_builder = {workspace = true}
futures = {workspace = true}
globset = "0.4"
google-cloud-storage = {version = "0.15.0", default-features = false, features = ["default-tls", "auth"]}
google-cloud-storage = {version = "0.22.1", default-features = false, features = ["default-tls", "auth"]}
google-cloud-token = {version = "0.1.2"}
home = "0.5.9"
hyper = "0.14.27"
Expand All @@ -32,10 +32,14 @@ openssl-sys = {version = "0.9.102", features = ["vendored"]}
pyo3 = {workspace = true, optional = true}
rand = "0.8.5"
regex = {version = "1.10.4"}
reqwest-middleware = "0.3.3"
reqwest-retry = "0.6.1"
retry-policies = "0.4.0"
serde = {workspace = true}
snafu = {workspace = true}
tokio = {workspace = true}
tokio-stream = {workspace = true}
tracing = {workspace = true}
url = {workspace = true}

[dependencies.reqwest]
Expand Down
Loading

0 comments on commit 84e34d0

Please sign in to comment.