Skip to content

Commit

Permalink
[FEAT] Enable Requester Pay for S3 reads (#1856)
Browse files Browse the repository at this point in the history
This PR enables reads from S3 buckets with a requester pays policy. This
configuration is set in the S3 IO Config.

TODO: integration tests? Will we need to set up a requester-pays bucket
for this? I did test it on the 1tr row parquets though, so it is working.
  • Loading branch information
colin-ho authored Feb 12, 2024
1 parent 05e3e3f commit 8565272
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 2 deletions.
3 changes: 3 additions & 0 deletions daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ class S3Config:
anonymous: bool
verify_ssl: bool
check_hostname_ssl: bool
requester_pays: bool | None

def __init__(
self,
Expand All @@ -439,6 +440,7 @@ class S3Config:
anonymous: bool | None = None,
verify_ssl: bool | None = None,
check_hostname_ssl: bool | None = None,
requester_pays: bool | None = None,
): ...
def replace(
self,
Expand All @@ -456,6 +458,7 @@ class S3Config:
anonymous: bool | None = None,
verify_ssl: bool | None = None,
check_hostname_ssl: bool | None = None,
requester_pays: bool | None = None,
) -> S3Config:
"""Replaces values if provided, returning a new S3Config"""
...
Expand Down
11 changes: 11 additions & 0 deletions src/common/io-config/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::config;
/// anonymous: Whether or not to use "anonymous mode", which will access S3 without any credentials
/// verify_ssl: Whether or not to verify ssl certificates, which will access S3 without checking if the certs are valid, defaults to True
/// check_hostname_ssl: Whether or not to verify the hostname when verifying ssl certificates, this was the legacy behavior for openssl, defaults to True
/// requester_pays: Whether or not the authenticated user will assume transfer costs, which is required by some providers of bulk data, defaults to False
///
/// Example:
/// >>> io_config = IOConfig(s3=S3Config(key_id="xxx", access_key="xxx"))
Expand Down Expand Up @@ -182,6 +183,7 @@ impl S3Config {
anonymous: Option<bool>,
verify_ssl: Option<bool>,
check_hostname_ssl: Option<bool>,
requester_pays: Option<bool>,
) -> Self {
let def = crate::S3Config::default();
S3Config {
Expand All @@ -202,6 +204,7 @@ impl S3Config {
anonymous: anonymous.unwrap_or(def.anonymous),
verify_ssl: verify_ssl.unwrap_or(def.verify_ssl),
check_hostname_ssl: check_hostname_ssl.unwrap_or(def.check_hostname_ssl),
requester_pays: requester_pays.unwrap_or(def.requester_pays),
},
}
}
Expand All @@ -223,6 +226,7 @@ impl S3Config {
anonymous: Option<bool>,
verify_ssl: Option<bool>,
check_hostname_ssl: Option<bool>,
requester_pays: Option<bool>,
) -> Self {
S3Config {
config: crate::S3Config {
Expand All @@ -242,6 +246,7 @@ impl S3Config {
anonymous: anonymous.unwrap_or(self.config.anonymous),
verify_ssl: verify_ssl.unwrap_or(self.config.verify_ssl),
check_hostname_ssl: check_hostname_ssl.unwrap_or(self.config.check_hostname_ssl),
requester_pays: requester_pays.unwrap_or(self.config.requester_pays),
},
}
}
Expand Down Expand Up @@ -333,6 +338,12 @@ impl S3Config {
pub fn check_hostname_ssl(&self) -> PyResult<Option<bool>> {
Ok(Some(self.config.check_hostname_ssl))
}

/// AWS Requester Pays
///
/// Reads the `requester_pays` flag stored on the wrapped config and
/// returns it wrapped in `Some`; this getter never yields `None`.
#[getter]
pub fn requester_pays(&self) -> PyResult<Option<bool>> {
    let enabled = self.config.requester_pays;
    Ok(Some(enabled))
}
}

#[pymethods]
Expand Down
9 changes: 7 additions & 2 deletions src/common/io-config/src/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub struct S3Config {
pub anonymous: bool,
pub verify_ssl: bool,
pub check_hostname_ssl: bool,
pub requester_pays: bool,
}

impl S3Config {
Expand Down Expand Up @@ -57,6 +58,7 @@ impl S3Config {
res.push(format!("Anonymous = {}", self.anonymous));
res.push(format!("Verify SSL = {}", self.verify_ssl));
res.push(format!("Check hostname SSL = {}", self.check_hostname_ssl));
res.push(format!("Requester pays = {}", self.requester_pays));
res
}
}
Expand All @@ -80,6 +82,7 @@ impl Default for S3Config {
anonymous: false,
verify_ssl: true,
check_hostname_ssl: true,
requester_pays: false,
}
}
}
Expand All @@ -102,7 +105,8 @@ impl Display for S3Config {
retry_mode: {:?},
anonymous: {},
verify_ssl: {},
check_hostname_ssl: {}",
check_hostname_ssl: {}
requester_pays: {}",
self.region_name,
self.endpoint_url,
self.key_id,
Expand All @@ -116,7 +120,8 @@ impl Display for S3Config {
self.retry_mode,
self.anonymous,
self.verify_ssl,
self.check_hostname_ssl
self.check_hostname_ssl,
self.requester_pays
)
}
}
18 changes: 18 additions & 0 deletions src/daft-io/src/s3_like.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,12 @@ impl S3LikeSource {
.bucket(bucket)
.key(key);

let request = if self.s3_config.requester_pays {
request.request_payer(s3::types::RequestPayer::Requester)
} else {
request
};

let request = match &range {
None => request,
Some(range) => request.range(format!(
Expand Down Expand Up @@ -552,6 +558,12 @@ impl S3LikeSource {
.bucket(bucket)
.key(key);

let request = if self.s3_config.requester_pays {
request.request_payer(s3::types::RequestPayer::Requester)
} else {
request
};

let response = if self.anonymous {
request
.customize_middleware()
Expand Down Expand Up @@ -643,6 +655,12 @@ impl S3LikeSource {
} else {
request
};
let request = if self.s3_config.requester_pays {
request.request_payer(s3::types::RequestPayer::Requester)
} else {
request
};

let response = if self.anonymous {
request
.customize_middleware()
Expand Down

0 comments on commit 8565272

Please sign in to comment.