Skip to content

Commit

Permalink
feat: object store for local file system - list objects, update e2e t…
Browse files Browse the repository at this point in the history
…ests
  • Loading branch information
hopeyen committed Dec 20, 2023
1 parent 7a88209 commit 3610af7
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 13 deletions.
75 changes: 75 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions subfile-exchange/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ hyper = { version = "0.14.27", features = [ "server" ]}
ipfs-api-backend-hyper = "0.6"
ipfs-api-prelude = "0.6"
merkle-cbt = "0.3.2"
object_store = {version = "0.8.0", features = [ "http", "aws", "gcp", "azure" ]}
rand = "0.8.4"
reqwest = { version = "0.11", features = ["json", "stream", "multipart"] }
rustls = "0.21.8"
Expand Down
4 changes: 3 additions & 1 deletion subfile-exchange/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub enum Error {
YamlError(serde_yaml::Error),
InvalidPriceFormat(String),
ContractError(String),
ObjectStoreError(String),
}

impl fmt::Display for Error {
Expand All @@ -32,7 +33,8 @@ impl fmt::Display for Error {
Error::JsonError(ref err) => write!(f, "JSON error: {}", err),
Error::YamlError(ref err) => write!(f, "YAML error: {}", err),
Error::InvalidPriceFormat(ref msg) => write!(f, "Price format error: {}", msg),
Error::ContractError(ref msg) => write!(f, "Price format error: {}", msg),
Error::ContractError(ref msg) => write!(f, "Contract call error: {}", msg),
Error::ObjectStoreError(ref msg) => write!(f, "Object store error: {}", msg),
}
}
}
Expand Down
77 changes: 77 additions & 0 deletions subfile-exchange/src/subfile/local_file_system.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use futures::StreamExt;
use object_store::ObjectMeta;
use object_store::{path::Path, ObjectStore};

use object_store::local::LocalFileSystem;

use std::fs;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;

use crate::subfile::Error;

pub struct Store {
local_file_system: Arc<LocalFileSystem>,
}

impl Store {
pub fn new(path: &str) -> Result<Self, Error> {
let path = PathBuf::from_str(path).map_err(|e| Error::InvalidConfig(e.to_string()))?;
if !path.exists() || !path.is_dir() {
fs::create_dir_all(&path).map_err(|e| {
Error::ObjectStoreError(format!(
"Unable to create local filesystem directory structure for object store: {:?}",
e.to_string()
))
})?
}
// As long as the provided path is correct, the following should never panic
Ok(Store {
local_file_system: Arc::new(
LocalFileSystem::new_with_prefix(path)
.map_err(|e| Error::ObjectStoreError(e.to_string()))?,
),
})
}

/// List out all files in the path, optionally filtered by a prefix to the filesystem
pub async fn list(&self, prefix: Option<&Path>) -> Result<Vec<ObjectMeta>, Error> {
let mut list_stream = self.local_file_system.list(prefix);

let mut objects = vec![];
while let Ok(Some(meta)) = list_stream.next().await.transpose() {
tracing::trace!("File name: {}, size: {}", meta.location, meta.size);
objects.push(meta.clone());
}
Ok(objects)
}
}

#[cfg(test)]
mod tests {
use crate::{
subfile::local_file_system::*,
test_util::{create_random_temp_file, CHUNK_SIZE},
};

#[tokio::test]
async fn test_local_list() {
let file_size = CHUNK_SIZE * 25;
let (temp_file, temp_path) = create_random_temp_file(file_size as usize).unwrap();

let path = std::path::Path::new(&temp_path);
let readdir = path.parent().unwrap().to_str().unwrap();
let file_name = path.file_name().unwrap().to_str().unwrap();

let object_store = Store::new(readdir).unwrap();
let res = object_store.list(None).await.unwrap();
let found_obj = res
.iter()
.find(|obj| obj.location.to_string() == file_name)
.unwrap();
assert!(found_obj.size == file_size as usize);

drop(temp_file);
}
}
1 change: 1 addition & 0 deletions subfile-exchange/src/subfile/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod file_hasher;
pub mod file_reader;
pub mod ipfs;
pub mod local_file_system;
pub mod subfile_reader;

use std::{
Expand Down
5 changes: 2 additions & 3 deletions subfile-exchange/src/subfile_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,8 @@ impl SubfileDownloader {
)
.await;

let transaction_manager = TransactionManager::new(&args.provider, wallet.clone())
.await
.expect("Initiate Transaction manager");
//TODO: Factor away from client, Transactions could be a separate entity
let transaction_manager = TransactionManager::new(&args.provider, wallet.clone()).await;
tracing::info!(
transaction_manager = tracing::field::debug(&transaction_manager),
"transaction_manager"
Expand Down
21 changes: 12 additions & 9 deletions subfile-exchange/tests/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ mod tests {
std::env::set_var("RUST_LOG", "off,subfile_exchange=debug,file_transfer=trace");
subfile_exchange::config::init_tracing(String::from("pretty")).unwrap();

let server_0 = "http://0.0.0.0:5677";
let server_1 = "http://0.0.0.0:5679";

let chunk_file_hash_a = "QmeKabcCQBtgU6QjM3rp3w6pDHFW4r54ee89nGdhuyDuhi".to_string();
let chunk_file_hash_b = "QmeE38uPSqT5XuHfM8X2JZAYgDCEwmDyMYULmZaRnNqPCj".to_string();
let chunk_file_hash_c = "QmWs8dkshZ7abxFYQ3h9ie1Em7SqzAkwtVJXaBapwEWqR9".to_string();
Expand All @@ -25,7 +28,7 @@ mod tests {

let indexer_0: IndexerEndpoint = (
"0xead22a75679608952db6e85537fbfdca02dae9cb".to_string(),
"http://0.0.0.0:5678".to_string(),
server_0.to_string(),
);
let indexer_1: IndexerEndpoint = (
"0x19804e50af1b72db4ce22a3c028e80c78d75af62".to_string(),
Expand All @@ -36,8 +39,10 @@ mod tests {
let mut server_process_0 = Command::new("cargo")
.arg("run")
.arg("-p")
.arg("subfile-exchange")
.arg("server")
.arg("subfile-service")
.arg("--")
.arg("--port")
.arg("5677")
.arg("--mnemonic")
.arg("sheriff obscure trick beauty army fat wink legal flee leader section suit")
.arg("--subfiles")
Expand All @@ -48,8 +53,8 @@ mod tests {
let mut server_process_1 = Command::new("cargo")
.arg("run")
.arg("-p")
.arg("subfile-exchange")
.arg("server")
.arg("subfile-service")
.arg("--")
.arg("--mnemonic")
.arg("ice palace drill gadget biology glow tray equip heavy wolf toddler menu")
.arg("--host")
Expand All @@ -66,8 +71,6 @@ mod tests {

tracing::debug!("Server initializing, wait 10 seconds...");
tokio::time::sleep(Duration::from_secs(10)).await;
let server_0 = "http://0.0.0.0:5678";
let server_1 = "http://0.0.0.0:5679";
let _ = server_ready(server_0).await;
let _ = server_ready(server_1).await;

Expand All @@ -86,7 +89,7 @@ mod tests {
.await;
assert!(endpoints.len() == 1);
assert!(endpoints.first().unwrap().0 == "0xead22a75679608952db6e85537fbfdca02dae9cb");
assert!(endpoints.first().unwrap().1 == "http://0.0.0.0:5678");
assert!(endpoints.first().unwrap().1 == server_0);

// 3.2 find subfile_1 with server 0 and 1, get server 1
let endpoints = finder
Expand All @@ -97,7 +100,7 @@ mod tests {
.await;
assert!(endpoints.len() == 1);
assert!(endpoints.first().unwrap().0 == "0x19804e50af1b72db4ce22a3c028e80c78d75af62");
assert!(endpoints.first().unwrap().1 == "http://0.0.0.0:5679");
assert!(endpoints.first().unwrap().1 == server_1);

// 3.3 find subfile_0 with sieved availability
let map = finder
Expand Down
1 change: 1 addition & 0 deletions subfile-exchange/tests/file_transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ mod tests {
"sheriff obscure trick beauty army fat wink legal flee leader section suit",
),
free_query_auth_token: Some("Bearer free-token".to_string()),
provider: String::from("https://arbitrum-sepolia.infura.io/v3/aaaaaaaaaaaaaaaaaaaa"),
..Default::default()
};

Expand Down

0 comments on commit 3610af7

Please sign in to comment.