Skip to content

Commit

Permalink
[ENH] Add put if not exists and put if match to s3
Browse files Browse the repository at this point in the history
  • Loading branch information
HammadB committed Dec 11, 2024
1 parent 96786b6 commit bb70ba7
Showing 1 changed file with 143 additions and 26 deletions.
169 changes: 143 additions & 26 deletions rust/storage/src/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,11 +341,57 @@ impl S3Storage {

pub async fn put_bytes(&self, key: &str, bytes: Vec<u8>) -> Result<(), S3PutError> {
let bytes = Arc::new(Bytes::from(bytes));
let opts = PutOptions::default();
self.put_object(
key,
bytes.len(),
move |range| {
let bytes = bytes.clone();
async move { Ok(ByteStream::from(bytes.slice(range))) }.boxed()
},
opts,
)
.await
}

self.put_object(key, bytes.len(), move |range| {
let bytes = bytes.clone();
async move { Ok(ByteStream::from(bytes.slice(range))) }.boxed()
})
pub async fn put_bytes_if_not_exists(
&self,
key: &str,
bytes: Vec<u8>,
) -> Result<(), S3PutError> {
let bytes = Arc::new(Bytes::from(bytes));
// TODO: don't unwrap
let opts = PutOptions::new(true, None).unwrap();
self.put_object(
key,
bytes.len(),
move |range| {
let bytes = bytes.clone();
async move { Ok(ByteStream::from(bytes.slice(range))) }.boxed()
},
opts,
)
.await
}

pub async fn put_bytes_if_match(
&self,
key: &str,
bytes: Vec<u8>,
etag: impl Into<String>,
) -> Result<(), S3PutError> {
let bytes = Arc::new(Bytes::from(bytes));
// TODO: don't unwrap
let opts = PutOptions::new(false, etag.into().into()).unwrap();
self.put_object(
key,
bytes.len(),
move |range| {
let bytes = bytes.clone();
async move { Ok(ByteStream::from(bytes.slice(range))) }.boxed()
},
opts,
)
.await
}

Expand All @@ -356,21 +402,26 @@ impl S3Storage {
.len();

let path = path.to_string();

self.put_object(key, file_size as usize, move |range| {
let path = path.clone();

async move {
ByteStream::read_from()
.path(path)
.offset(range.start as u64)
.length(Length::Exact(range.len() as u64))
.build()
.await
.map_err(|err| S3PutError::S3PutError(err.to_string()))
}
.boxed()
})
let opts = PutOptions::default();
self.put_object(
key,
file_size as usize,
move |range| {
let path = path.clone();

async move {
ByteStream::read_from()
.path(path)
.offset(range.start as u64)
.length(Length::Exact(range.len() as u64))
.build()
.await
.map_err(|err| S3PutError::S3PutError(err.to_string()))
}
.boxed()
},
opts,
)
.await
}

Expand All @@ -381,14 +432,16 @@ impl S3Storage {
create_bytestream_fn: impl Fn(
Range<usize>,
) -> BoxFuture<'static, Result<ByteStream, S3PutError>>,
options: PutOptions,
) -> Result<(), S3PutError> {
// TODO: reorder the opts one arg ahead for clean
if total_size_bytes < self.upload_part_size_bytes {
return self
.oneshot_upload(key, total_size_bytes, create_bytestream_fn)
.oneshot_upload(key, total_size_bytes, create_bytestream_fn, options)
.await;
}

self.multipart_upload(key, total_size_bytes, create_bytestream_fn)
self.multipart_upload(key, total_size_bytes, create_bytestream_fn, options)
.await
}

Expand All @@ -399,13 +452,26 @@ impl S3Storage {
create_bytestream_fn: impl Fn(
Range<usize>,
) -> BoxFuture<'static, Result<ByteStream, S3PutError>>,
options: PutOptions,
) -> Result<(), S3PutError> {
self.client
let req = self
.client
.put_object()
.bucket(&self.bucket)
.key(key)
.body(create_bytestream_fn(0..total_size_bytes).await?)
.send()
.body(create_bytestream_fn(0..total_size_bytes).await?);

let req = match options.if_not_exists {
true => req.if_none_match('*'),
false => req,
};

let req = match options.if_match {
Some(etag) => req.if_match(etag),
None => req,
};

req.send()
.await
.map_err(|err| S3PutError::S3PutError(err.to_string()))?;

Expand All @@ -419,6 +485,7 @@ impl S3Storage {
create_bytestream_fn: impl Fn(
Range<usize>,
) -> BoxFuture<'static, Result<ByteStream, S3PutError>>,
options: PutOptions,
) -> Result<(), S3PutError> {
let mut part_count = (total_size_bytes / self.upload_part_size_bytes) + 1;
let mut size_of_last_part = total_size_bytes % self.upload_part_size_bytes;
Expand Down Expand Up @@ -478,7 +545,8 @@ impl S3Storage {
);
}

self.client
let complete_req = self
.client
.complete_multipart_upload()
.bucket(&self.bucket)
.key(key)
Expand All @@ -487,7 +555,19 @@ impl S3Storage {
.set_parts(Some(upload_parts))
.build(),
)
.upload_id(&upload_id)
.upload_id(&upload_id);

let complete_req = match options.if_not_exists {
true => complete_req.if_none_match('*'),
false => complete_req,
};

let complete_req = match options.if_match {
Some(etag) => complete_req.if_match(etag),
None => complete_req,
};

complete_req
.send()
.await
.map_err(|err| S3PutError::S3PutError(err.to_string()))?;
Expand All @@ -496,6 +576,35 @@ impl S3Storage {
}
}

// Internal struct to model user provided options
// for put
#[derive(Default)]
struct PutOptions {
if_not_exists: bool,
if_match: Option<String>,
}

#[derive(Error, Debug)]
enum PutOptionsCreateError {
#[error("If not exists and if match cannot both be used")]
IfNotExistsAndIfMatchEnabled,
}

impl PutOptions {
fn new(
if_not_exists: bool,
if_match: Option<String>,
) -> Result<PutOptions, PutOptionsCreateError> {
if !if_not_exists && if_match.is_some() {
return Err(PutOptionsCreateError::IfNotExistsAndIfMatchEnabled);
}
Ok(PutOptions {
if_not_exists,
if_match,
})
}
}

#[async_trait]
impl Configurable<StorageConfig> for S3Storage {
async fn try_from_config(config: &StorageConfig) -> Result<Self, Box<dyn ChromaError>> {
Expand Down Expand Up @@ -747,4 +856,12 @@ mod tests {
async fn test_k8s_integration_empty_file() {
test_multipart_get_for_size(0).await;
}

#[test]
fn test_put_options_default() {
let default = PutOptions::default();

assert!(!default.if_not_exists);
assert_eq!(default.if_match, None);
}
}

0 comments on commit bb70ba7

Please sign in to comment.