Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Add put if not exists and put if match to s3 #3284

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 145 additions & 26 deletions rust/storage/src/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,11 +341,57 @@ impl S3Storage {

pub async fn put_bytes(&self, key: &str, bytes: Vec<u8>) -> Result<(), S3PutError> {
let bytes = Arc::new(Bytes::from(bytes));
let opts = PutOptions::default();
self.put_object(
key,
bytes.len(),
move |range| {
let bytes = bytes.clone();
async move { Ok(ByteStream::from(bytes.slice(range))) }.boxed()
},
opts,
)
.await
}

self.put_object(key, bytes.len(), move |range| {
let bytes = bytes.clone();
async move { Ok(ByteStream::from(bytes.slice(range))) }.boxed()
})
pub async fn put_bytes_if_not_exists(
&self,
key: &str,
bytes: Vec<u8>,
) -> Result<(), S3PutError> {
let bytes = Arc::new(Bytes::from(bytes));
// TODO: don't unwrap
let opts = PutOptions::new(true, None).unwrap();
self.put_object(
key,
bytes.len(),
move |range| {
let bytes = bytes.clone();
async move { Ok(ByteStream::from(bytes.slice(range))) }.boxed()
},
opts,
)
.await
}

pub async fn put_bytes_if_match(
&self,
key: &str,
bytes: Vec<u8>,
etag: impl Into<String>,
) -> Result<(), S3PutError> {
let bytes = Arc::new(Bytes::from(bytes));
// TODO: don't unwrap
let opts = PutOptions::new(false, etag.into().into()).unwrap();
self.put_object(
key,
bytes.len(),
move |range| {
let bytes = bytes.clone();
async move { Ok(ByteStream::from(bytes.slice(range))) }.boxed()
},
opts,
)
.await
}

Expand All @@ -356,21 +402,26 @@ impl S3Storage {
.len();

let path = path.to_string();

self.put_object(key, file_size as usize, move |range| {
let path = path.clone();

async move {
ByteStream::read_from()
.path(path)
.offset(range.start as u64)
.length(Length::Exact(range.len() as u64))
.build()
.await
.map_err(|err| S3PutError::S3PutError(err.to_string()))
}
.boxed()
})
let opts = PutOptions::default();
self.put_object(
key,
file_size as usize,
move |range| {
let path = path.clone();

async move {
ByteStream::read_from()
.path(path)
.offset(range.start as u64)
.length(Length::Exact(range.len() as u64))
.build()
.await
.map_err(|err| S3PutError::S3PutError(err.to_string()))
}
.boxed()
},
opts,
)
.await
}

Expand All @@ -381,14 +432,16 @@ impl S3Storage {
create_bytestream_fn: impl Fn(
Range<usize>,
) -> BoxFuture<'static, Result<ByteStream, S3PutError>>,
options: PutOptions,
) -> Result<(), S3PutError> {
// TODO: reorder the opts one arg ahead for clean
if total_size_bytes < self.upload_part_size_bytes {
return self
.oneshot_upload(key, total_size_bytes, create_bytestream_fn)
.oneshot_upload(key, total_size_bytes, create_bytestream_fn, options)
.await;
}

self.multipart_upload(key, total_size_bytes, create_bytestream_fn)
self.multipart_upload(key, total_size_bytes, create_bytestream_fn, options)
.await
}

Expand All @@ -399,13 +452,26 @@ impl S3Storage {
create_bytestream_fn: impl Fn(
Range<usize>,
) -> BoxFuture<'static, Result<ByteStream, S3PutError>>,
options: PutOptions,
) -> Result<(), S3PutError> {
self.client
let req = self
.client
.put_object()
.bucket(&self.bucket)
.key(key)
.body(create_bytestream_fn(0..total_size_bytes).await?)
.send()
.body(create_bytestream_fn(0..total_size_bytes).await?);

let req = match options.if_not_exists {
true => req.if_none_match('*'),
false => req,
};

let req = match options.if_match {
Some(etag) => req.if_match(etag),
None => req,
};

req.send()
.await
.map_err(|err| S3PutError::S3PutError(err.to_string()))?;

Expand All @@ -419,6 +485,7 @@ impl S3Storage {
create_bytestream_fn: impl Fn(
Range<usize>,
) -> BoxFuture<'static, Result<ByteStream, S3PutError>>,
options: PutOptions,
) -> Result<(), S3PutError> {
let mut part_count = (total_size_bytes / self.upload_part_size_bytes) + 1;
let mut size_of_last_part = total_size_bytes % self.upload_part_size_bytes;
Expand Down Expand Up @@ -478,7 +545,8 @@ impl S3Storage {
);
}

self.client
let complete_req = self
.client
.complete_multipart_upload()
.bucket(&self.bucket)
.key(key)
Expand All @@ -487,7 +555,19 @@ impl S3Storage {
.set_parts(Some(upload_parts))
.build(),
)
.upload_id(&upload_id)
.upload_id(&upload_id);

let complete_req = match options.if_not_exists {
true => complete_req.if_none_match('*'),
false => complete_req,
};

let complete_req = match options.if_match {
Some(etag) => complete_req.if_match(etag),
None => complete_req,
};

complete_req
.send()
.await
.map_err(|err| S3PutError::S3PutError(err.to_string()))?;
Expand All @@ -496,6 +576,35 @@ impl S3Storage {
}
}

// Internal struct to model user provided options
// for put
#[derive(Default)]
struct PutOptions {
if_not_exists: bool,
if_match: Option<String>,
}

#[derive(Error, Debug)]
enum PutOptionsCreateError {
#[error("If not exists and if match cannot both be used")]
IfNotExistsAndIfMatchEnabled,
}

impl PutOptions {
fn new(
if_not_exists: bool,
if_match: Option<String>,
) -> Result<PutOptions, PutOptionsCreateError> {
if !if_not_exists && if_match.is_some() {
return Err(PutOptionsCreateError::IfNotExistsAndIfMatchEnabled);
}
Ok(PutOptions {
if_not_exists,
if_match,
})
}
}

#[async_trait]
impl Configurable<StorageConfig> for S3Storage {
async fn try_from_config(config: &StorageConfig) -> Result<Self, Box<dyn ChromaError>> {
Expand Down Expand Up @@ -747,4 +856,14 @@ mod tests {
async fn test_k8s_integration_empty_file() {
test_multipart_get_for_size(0).await;
}

#[test]
fn test_put_options_default() {
let default = PutOptions::default();

assert!(!default.if_not_exists);
assert_eq!(default.if_match, None);
}

// TODO: both of these are trivially testable, add test_k8s_integration tests
}
Loading