Skip to content

Commit

Permalink
feat: object store multi-range rw size config, single rw, better errors
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Dec 20, 2023
1 parent b962d5f commit 62703c5
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 26 deletions.
4 changes: 2 additions & 2 deletions subfile-exchange/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub enum Error {
YamlError(serde_yaml::Error),
InvalidPriceFormat(String),
ContractError(String),
ObjectStoreError(String),
ObjectStoreError(object_store::Error),
}

impl fmt::Display for Error {
Expand All @@ -34,7 +34,7 @@ impl fmt::Display for Error {
Error::YamlError(ref err) => write!(f, "YAML error: {}", err),
Error::InvalidPriceFormat(ref msg) => write!(f, "Price format error: {}", msg),
Error::ContractError(ref msg) => write!(f, "Contract call error: {}", msg),
Error::ObjectStoreError(ref msg) => write!(f, "Object store error: {}", msg),
Error::ObjectStoreError(ref err) => write!(f, "Object store error: {}", err),
}
}
}
Expand Down
97 changes: 73 additions & 24 deletions subfile-exchange/src/subfile/local_file_system.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use bytes::Bytes;
use futures::StreamExt;
use object_store::ObjectMeta;
use object_store::{path::Path, ObjectStore};
use object_store::{ObjectMeta, PutResult};

use object_store::local::LocalFileSystem;
use tokio::io::AsyncWriteExt;

use std::fs;
use std::ops::Range;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
Expand All @@ -15,14 +16,17 @@ use crate::subfile::Error;

/// Object store wrapper whose methods operate on a `LocalFileSystem` backend.
pub struct Store {
/// Shared handle to the `LocalFileSystem` that the store's methods call into.
local_file_system: Arc<LocalFileSystem>,
/// Divisor applied to the object size by `multipart_read` to derive a chunk
/// size when the caller does not pass one.
read_concurrency: usize,
/// Divisor applied to the payload size by `multipart_write` to derive a chunk
/// size when the caller does not pass one.
write_concurrency: usize,
}

impl Store {
pub fn new(path: &str) -> Result<Self, Error> {
let path = PathBuf::from_str(path).map_err(|e| Error::InvalidConfig(e.to_string()))?;
if !path.exists() || !path.is_dir() {
tracing::debug!("Store path doesn't exist or is not a directory, creating a directory at configured path");
fs::create_dir_all(&path).map_err(|e| {
Error::ObjectStoreError(format!(
Error::InvalidConfig(format!(
"Unable to create local filesystem directory structure for object store: {:?}",
e.to_string()
))
Expand All @@ -31,9 +35,11 @@ impl Store {
// As long as the provided path is correct, the following should never panic
Ok(Store {
local_file_system: Arc::new(
LocalFileSystem::new_with_prefix(path)
.map_err(|e| Error::ObjectStoreError(e.to_string()))?,
LocalFileSystem::new_with_prefix(path).map_err(Error::ObjectStoreError)?,
),
//TODO: Make configurable
read_concurrency: 16,
write_concurrency: 8,
})
}

Expand All @@ -58,17 +64,28 @@ impl Store {
.cloned()
}

pub async fn multipart_read(&self, file_name: &str) -> Result<Vec<Bytes>, Error> {
let object_meta =
self.find_object(file_name, None)
.await
.ok_or(Error::ObjectStoreError(format!(
"Did not find file {}",
file_name
)))?;
let read_concurrency = 16;
let step = object_meta.size / read_concurrency;
let ranges = (0..read_concurrency)
pub async fn range_read(&self, file_name: &str, range: Range<usize>) -> Result<Bytes, Error> {
Ok(self
.local_file_system
.get_range(&Path::from(file_name), range)
.await
.unwrap())
}

pub async fn multipart_read(
&self,
file_name: &str,
chunk_size: Option<usize>,
) -> Result<Vec<Bytes>, Error> {
let object_meta = self
.find_object(file_name, None)
.await
.ok_or(Error::DataUnavilable(format!(
"Did not find file {}",
file_name
)))?;
let step = chunk_size.unwrap_or(object_meta.size / self.read_concurrency);
let ranges = (0..(object_meta.size / step))
.map(|i| std::ops::Range::<usize> {
start: i * step,
end: (i + 1) * step,
Expand All @@ -85,17 +102,21 @@ impl Store {
}

/// Async write with concurrent uploads at a location path
pub async fn multipart_write(&self, location: &str, bytes: &[u8]) -> Result<String, Error> {
pub async fn multipart_write(
&self,
location: &str,
bytes: &[u8],
chunk_size: Option<usize>,
) -> Result<String, Error> {
let (write_id, mut write) = self
.local_file_system
.put_multipart(&Path::from(location))
.await
.unwrap();
let size = bytes.len();

let write_concurrency = 8;
let step = size / write_concurrency;
for i in 0..write_concurrency {
let step = chunk_size.unwrap_or(size / self.write_concurrency);
for i in 0..(size / step) {
let buf = &bytes[i * step..(i + 1) * step];
write.write_all(buf).await.unwrap();
}
Expand All @@ -105,12 +126,20 @@ impl Store {
Ok(write_id)
}

/// Single write at a location path
pub async fn write(&self, location: &str, bytes: &[u8]) -> Result<PutResult, Error> {
self.local_file_system
.put(&Path::from(location), bytes.to_vec().into())
.await
.map_err(Error::ObjectStoreError)
}

/// Delete the object stored at `location`.
///
/// Calls `delete` on the underlying local file system store for the path built
/// from `location`.
///
/// # Errors
///
/// A failure reported by the store is returned as `Error::ObjectStoreError`.
/// NOTE(review): whether a missing file counts as a failure depends on
/// `LocalFileSystem::delete` — confirm against the `object_store` docs.
pub async fn delete(&self, location: &str) -> Result<(), Error> {
self.local_file_system
.delete(&Path::from(location))
.await
// NOTE(review): the two consecutive `map_err` lines below look like the
// removed/added pair of this commit's diff; presumably only one of them is
// present in the actual file — confirm.
.map_err(|e| Error::ObjectStoreError(e.to_string()))
.map_err(Error::ObjectStoreError)
}
}

Expand Down Expand Up @@ -154,17 +183,37 @@ mod tests {
let directory_pathbuf = std::env::current_dir().unwrap();
let directory = directory_pathbuf.to_str().unwrap();

// Write
// Write with adjusted concurrency
let object_store = Store::new(directory).unwrap();
let new_file_name = "tempfile";
let test_bytes = test_string.as_bytes();
let write_res = object_store
.multipart_write(new_file_name, test_bytes)
.multipart_write(new_file_name, test_bytes, None)
.await;
assert!(write_res.is_ok());

// Read
let read_res: Vec<Bytes> = object_store.multipart_read(new_file_name).await.unwrap();
// Read with fixed concurrency
let read_res: Vec<Bytes> = object_store
.multipart_read(new_file_name, Some(CHUNK_SIZE.try_into().unwrap()))
.await
.unwrap();
let flattened_vec: Vec<u8> = read_res.into_iter().flat_map(|b| b.to_vec()).collect();
assert!(flattened_vec.as_slice() == test_bytes);

// Write with fixed concurrency
let object_store = Store::new(directory).unwrap();
let new_file_name = "tempfile";
let test_bytes = test_string.as_bytes();
let write_res = object_store
.multipart_write(new_file_name, test_bytes, None)
.await;
assert!(write_res.is_ok());

// Read with adjusted concurrency
let read_res: Vec<Bytes> = object_store
.multipart_read(new_file_name, None)
.await
.unwrap();
let flattened_vec: Vec<u8> = read_res.into_iter().flat_map(|b| b.to_vec()).collect();
assert!(flattened_vec.as_slice() == test_bytes);

Expand Down

0 comments on commit 62703c5

Please sign in to comment.