diff --git a/rust/storage/src/s3.rs b/rust/storage/src/s3.rs index bcecb987038..a6dbf59a751 100644 --- a/rust/storage/src/s3.rs +++ b/rust/storage/src/s3.rs @@ -341,11 +341,57 @@ impl S3Storage { pub async fn put_bytes(&self, key: &str, bytes: Vec) -> Result<(), S3PutError> { let bytes = Arc::new(Bytes::from(bytes)); + let opts = PutOptions::default(); + self.put_object( + key, + bytes.len(), + move |range| { + let bytes = bytes.clone(); + async move { Ok(ByteStream::from(bytes.slice(range))) }.boxed() + }, + opts, + ) + .await + } - self.put_object(key, bytes.len(), move |range| { - let bytes = bytes.clone(); - async move { Ok(ByteStream::from(bytes.slice(range))) }.boxed() - }) + pub async fn put_bytes_if_not_exists( + &self, + key: &str, + bytes: Vec, + ) -> Result<(), S3PutError> { + let bytes = Arc::new(Bytes::from(bytes)); + // TODO: don't unwrap + let opts = PutOptions::new(true, None).unwrap(); + self.put_object( + key, + bytes.len(), + move |range| { + let bytes = bytes.clone(); + async move { Ok(ByteStream::from(bytes.slice(range))) }.boxed() + }, + opts, + ) + .await + } + + pub async fn put_bytes_if_match( + &self, + key: &str, + bytes: Vec, + etag: impl Into, + ) -> Result<(), S3PutError> { + let bytes = Arc::new(Bytes::from(bytes)); + // TODO: don't unwrap + let opts = PutOptions::new(false, etag.into().into()).unwrap(); + self.put_object( + key, + bytes.len(), + move |range| { + let bytes = bytes.clone(); + async move { Ok(ByteStream::from(bytes.slice(range))) }.boxed() + }, + opts, + ) .await } @@ -356,21 +402,26 @@ impl S3Storage { .len(); let path = path.to_string(); - - self.put_object(key, file_size as usize, move |range| { - let path = path.clone(); - - async move { - ByteStream::read_from() - .path(path) - .offset(range.start as u64) - .length(Length::Exact(range.len() as u64)) - .build() - .await - .map_err(|err| S3PutError::S3PutError(err.to_string())) - } - .boxed() - }) + let opts = PutOptions::default(); + self.put_object( + key, + file_size as usize, + move |range| { + let path = path.clone(); + + async move { + ByteStream::read_from() + .path(path) + .offset(range.start as u64) + .length(Length::Exact(range.len() as u64)) + .build() + .await + .map_err(|err| S3PutError::S3PutError(err.to_string())) + } + .boxed() + }, + opts, + ) .await } @@ -381,14 +432,16 @@ impl S3Storage { create_bytestream_fn: impl Fn( Range, ) -> BoxFuture<'static, Result>, + options: PutOptions, ) -> Result<(), S3PutError> { + // TODO: reorder the opts one arg ahead for clean if total_size_bytes < self.upload_part_size_bytes { return self - .oneshot_upload(key, total_size_bytes, create_bytestream_fn) + .oneshot_upload(key, total_size_bytes, create_bytestream_fn, options) .await; } - self.multipart_upload(key, total_size_bytes, create_bytestream_fn) + self.multipart_upload(key, total_size_bytes, create_bytestream_fn, options) .await } @@ -399,13 +452,26 @@ impl S3Storage { create_bytestream_fn: impl Fn( Range, ) -> BoxFuture<'static, Result>, + options: PutOptions, ) -> Result<(), S3PutError> { - self.client + let req = self + .client .put_object() .bucket(&self.bucket) .key(key) - .body(create_bytestream_fn(0..total_size_bytes).await?) - .send() + .body(create_bytestream_fn(0..total_size_bytes).await?); + + let req = match options.if_not_exists { + true => req.if_none_match('*'), + false => req, + }; + + let req = match options.if_match { + Some(etag) => req.if_match(etag), + None => req, + }; + + req.send() .await .map_err(|err| S3PutError::S3PutError(err.to_string()))?; @@ -419,6 +485,7 @@ impl S3Storage { create_bytestream_fn: impl Fn( Range, ) -> BoxFuture<'static, Result>, + options: PutOptions, ) -> Result<(), S3PutError> { let mut part_count = (total_size_bytes / self.upload_part_size_bytes) + 1; let mut size_of_last_part = total_size_bytes % self.upload_part_size_bytes; @@ -478,7 +545,8 @@ impl S3Storage { ); } - self.client + let complete_req = self + .client .complete_multipart_upload() .bucket(&self.bucket) .key(key) @@ -487,7 +555,19 @@ impl S3Storage { .set_parts(Some(upload_parts)) .build(), ) - .upload_id(&upload_id) + .upload_id(&upload_id); + + let complete_req = match options.if_not_exists { + true => complete_req.if_none_match('*'), + false => complete_req, + }; + + let complete_req = match options.if_match { + Some(etag) => complete_req.if_match(etag), + None => complete_req, + }; + + complete_req .send() .await .map_err(|err| S3PutError::S3PutError(err.to_string()))?; @@ -496,6 +576,35 @@ impl S3Storage { } } +// Internal struct to model user provided options +// for put +#[derive(Default)] +struct PutOptions { + if_not_exists: bool, + if_match: Option, +} + +#[derive(Error, Debug)] +enum PutOptionsCreateError { + #[error("If not exists and if match cannot both be used")] + IfNotExistsAndIfMatchEnabled, +} + +impl PutOptions { + fn new( + if_not_exists: bool, + if_match: Option, + ) -> Result { + if !if_not_exists && if_match.is_some() { + return Err(PutOptionsCreateError::IfNotExistsAndIfMatchEnabled); + } + Ok(PutOptions { + if_not_exists, + if_match, + }) + } +} + #[async_trait] impl Configurable for S3Storage { async fn try_from_config(config: &StorageConfig) -> Result> { @@ -747,4 +856,12 @@ mod tests { async fn test_k8s_integration_empty_file() { test_multipart_get_for_size(0).await; } + + #[test] + fn test_put_options_default() { + let default = PutOptions::default(); + + assert!(!default.if_not_exists); + assert_eq!(default.if_match, None); + } }