Skip to content

Commit

Permalink
feat: local store multi-range read, write, delete with unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Dec 20, 2023
1 parent 3610af7 commit b962d5f
Showing 1 changed file with 97 additions and 0 deletions.
97 changes: 97 additions & 0 deletions subfile-exchange/src/subfile/local_file_system.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use bytes::Bytes;
use futures::StreamExt;
use object_store::ObjectMeta;
use object_store::{path::Path, ObjectStore};

use object_store::local::LocalFileSystem;
use tokio::io::AsyncWriteExt;

use std::fs;
use std::path::PathBuf;
Expand Down Expand Up @@ -46,10 +48,76 @@ impl Store {
}
Ok(objects)
}

/// Find a specific object by file name, listing under an optional prefix.
///
/// Returns the metadata of the first listed object whose location, rendered as a
/// string, equals `file_name`. Returns `None` when nothing matches or when the
/// listing itself fails.
pub async fn find_object(&self, file_name: &str, prefix: Option<&Path>) -> Option<ObjectMeta> {
    // A failed listing is reported as "not found" rather than panicking the caller.
    let listed = self.list(prefix).await.ok()?;
    // Consume the listing so the match is moved out instead of cloned.
    listed
        .into_iter()
        .find(|obj| obj.location.to_string() == file_name)
}

pub async fn multipart_read(&self, file_name: &str) -> Result<Vec<Bytes>, Error> {
let object_meta =
self.find_object(file_name, None)
.await
.ok_or(Error::ObjectStoreError(format!(
"Did not find file {}",
file_name
)))?;
let read_concurrency = 16;
let step = object_meta.size / read_concurrency;
let ranges = (0..read_concurrency)
.map(|i| std::ops::Range::<usize> {
start: i * step,
end: (i + 1) * step,
})
.collect::<Vec<std::ops::Range<usize>>>();

let result = self
.local_file_system
.get_ranges(&Path::from(file_name), ranges.as_slice())
.await
.unwrap();

Ok(result)
}

/// Async write with concurrent uploads at a location path
pub async fn multipart_write(&self, location: &str, bytes: &[u8]) -> Result<String, Error> {
let (write_id, mut write) = self
.local_file_system
.put_multipart(&Path::from(location))
.await
.unwrap();
let size = bytes.len();

let write_concurrency = 8;
let step = size / write_concurrency;
for i in 0..write_concurrency {
let buf = &bytes[i * step..(i + 1) * step];
write.write_all(buf).await.unwrap();
}
write.flush().await.unwrap();
write.shutdown().await.unwrap();
drop(write);
Ok(write_id)
}

/// Delete the file if exists
pub async fn delete(&self, location: &str) -> Result<(), Error> {
self.local_file_system
.delete(&Path::from(location))
.await
.map_err(|e| Error::ObjectStoreError(e.to_string()))
}
}

#[cfg(test)]
mod tests {
use rand::{distributions::DistString, thread_rng};

use crate::{
subfile::local_file_system::*,
test_util::{create_random_temp_file, CHUNK_SIZE},
Expand All @@ -74,4 +142,33 @@ mod tests {

drop(temp_file);
}

/// Round-trips a random payload through `multipart_write`, `multipart_read` and
/// `delete` on a store rooted at the current working directory.
#[tokio::test]
async fn test_local_rw() {
    // Random alphanumeric payload, 25 chunks long
    let payload_len: usize = (CHUNK_SIZE * 25).try_into().unwrap();
    let payload = rand::distributions::Alphanumeric.sample_string(&mut thread_rng(), payload_len);
    let expected = payload.as_bytes();

    // Store rooted at the current working directory
    let cwd = std::env::current_dir().unwrap();
    let store = Store::new(cwd.to_str().unwrap()).unwrap();
    let name = "tempfile";

    // Write
    assert!(store.multipart_write(name, expected).await.is_ok());

    // Read back and stitch the chunks together in order
    let chunks: Vec<Bytes> = store.multipart_read(name).await.unwrap();
    let actual: Vec<u8> = chunks
        .iter()
        .flat_map(|chunk| chunk.iter().copied())
        .collect();
    assert!(actual.as_slice() == expected);

    // Delete
    assert!(store.delete(name).await.is_ok());
}
}

0 comments on commit b962d5f

Please sign in to comment.