diff --git a/subfile-exchange/src/errors.rs b/subfile-exchange/src/errors.rs index 6f37644..f4a4929 100644 --- a/subfile-exchange/src/errors.rs +++ b/subfile-exchange/src/errors.rs @@ -15,7 +15,7 @@ pub enum Error { YamlError(serde_yaml::Error), InvalidPriceFormat(String), ContractError(String), - ObjectStoreError(String), + ObjectStoreError(object_store::Error), } impl fmt::Display for Error { @@ -34,7 +34,7 @@ impl fmt::Display for Error { Error::YamlError(ref err) => write!(f, "YAML error: {}", err), Error::InvalidPriceFormat(ref msg) => write!(f, "Price format error: {}", msg), Error::ContractError(ref msg) => write!(f, "Contract call error: {}", msg), - Error::ObjectStoreError(ref msg) => write!(f, "Object store error: {}", msg), + Error::ObjectStoreError(ref err) => write!(f, "Object store error: {}", err), } } } diff --git a/subfile-exchange/src/subfile/local_file_system.rs b/subfile-exchange/src/subfile/local_file_system.rs index 48b3b6e..a2d5b85 100644 --- a/subfile-exchange/src/subfile/local_file_system.rs +++ b/subfile-exchange/src/subfile/local_file_system.rs @@ -1,12 +1,13 @@ use bytes::Bytes; use futures::StreamExt; -use object_store::ObjectMeta; use object_store::{path::Path, ObjectStore}; +use object_store::{ObjectMeta, PutResult}; use object_store::local::LocalFileSystem; use tokio::io::AsyncWriteExt; use std::fs; +use std::ops::Range; use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; @@ -15,14 +16,17 @@ use crate::subfile::Error; pub struct Store { local_file_system: Arc, + read_concurrency: usize, + write_concurrency: usize, } impl Store { pub fn new(path: &str) -> Result { let path = PathBuf::from_str(path).map_err(|e| Error::InvalidConfig(e.to_string()))?; if !path.exists() || !path.is_dir() { + tracing::debug!("Store path doesn't exist or is not a directory, creating a directory at configured path"); fs::create_dir_all(&path).map_err(|e| { - Error::ObjectStoreError(format!( + Error::InvalidConfig(format!( "Unable to create local filesystem directory structure for object store: {:?}", e.to_string() )) @@ -31,9 +35,11 @@ impl Store { // As long as the provided path is correct, the following should never panic Ok(Store { local_file_system: Arc::new( - LocalFileSystem::new_with_prefix(path) - .map_err(|e| Error::ObjectStoreError(e.to_string()))?, + LocalFileSystem::new_with_prefix(path).map_err(Error::ObjectStoreError)?, ), + //TODO: Make configurable + read_concurrency: 16, + write_concurrency: 8, }) } @@ -58,17 +64,28 @@ impl Store { .cloned() } - pub async fn multipart_read(&self, file_name: &str) -> Result, Error> { - let object_meta = - self.find_object(file_name, None) - .await - .ok_or(Error::ObjectStoreError(format!( - "Did not find file {}", - file_name - )))?; - let read_concurrency = 16; - let step = object_meta.size / read_concurrency; - let ranges = (0..read_concurrency) + pub async fn range_read(&self, file_name: &str, range: Range) -> Result { + Ok(self + .local_file_system + .get_range(&Path::from(file_name), range) + .await + .unwrap()) + } + + pub async fn multipart_read( + &self, + file_name: &str, + chunk_size: Option, + ) -> Result, Error> { + let object_meta = self + .find_object(file_name, None) + .await + .ok_or(Error::DataUnavilable(format!( + "Did not find file {}", + file_name + )))?; + let step = chunk_size.unwrap_or(object_meta.size / self.read_concurrency); + let ranges = (0..(object_meta.size / step)) .map(|i| std::ops::Range:: { start: i * step, end: (i + 1) * step, @@ -85,7 +102,12 @@ impl Store { } /// Async write with concurrent uploads at a location path - pub async fn multipart_write(&self, location: &str, bytes: &[u8]) -> Result { + pub async fn multipart_write( + &self, + location: &str, + bytes: &[u8], + chunk_size: Option, + ) -> Result { let (write_id, mut write) = self .local_file_system .put_multipart(&Path::from(location)) @@ -93,9 +115,8 @@ impl Store { .unwrap(); let size = bytes.len(); - let write_concurrency = 8; - let step = size / write_concurrency; - for i in 0..write_concurrency { + let step = chunk_size.unwrap_or(size / self.write_concurrency); + for i in 0..(size / step) { let buf = &bytes[i * step..(i + 1) * step]; write.write_all(buf).await.unwrap(); } @@ -105,12 +126,20 @@ impl Store { Ok(write_id) } + /// Single write at a location path + pub async fn write(&self, location: &str, bytes: &[u8]) -> Result { + self.local_file_system + .put(&Path::from(location), bytes.to_vec().into()) + .await + .map_err(Error::ObjectStoreError) + } + /// Delete the file if exists pub async fn delete(&self, location: &str) -> Result<(), Error> { self.local_file_system .delete(&Path::from(location)) .await - .map_err(|e| Error::ObjectStoreError(e.to_string())) + .map_err(Error::ObjectStoreError) } } @@ -154,17 +183,37 @@ mod tests { let directory_pathbuf = std::env::current_dir().unwrap(); let directory = directory_pathbuf.to_str().unwrap(); - // Write + // Write with adjusted concurrency let object_store = Store::new(directory).unwrap(); let new_file_name = "tempfile"; let test_bytes = test_string.as_bytes(); let write_res = object_store - .multipart_write(new_file_name, test_bytes) + .multipart_write(new_file_name, test_bytes, None) .await; assert!(write_res.is_ok()); - // Read - let read_res: Vec = object_store.multipart_read(new_file_name).await.unwrap(); + // Read with fixed concurrency + let read_res: Vec = object_store + .multipart_read(new_file_name, Some(CHUNK_SIZE.try_into().unwrap())) + .await + .unwrap(); + let flattened_vec: Vec = read_res.into_iter().flat_map(|b| b.to_vec()).collect(); + assert!(flattened_vec.as_slice() == test_bytes); + + // Write with fixed concurrency + let object_store = Store::new(directory).unwrap(); + let new_file_name = "tempfile"; + let test_bytes = test_string.as_bytes(); + let write_res = object_store + .multipart_write(new_file_name, test_bytes, None) + .await; + assert!(write_res.is_ok()); + + // Read with adjusted concurrency + let read_res: Vec = object_store + .multipart_read(new_file_name, None) + .await + .unwrap(); let flattened_vec: Vec = read_res.into_iter().flat_map(|b| b.to_vec()).collect(); assert!(flattened_vec.as_slice() == test_bytes);