From 3610af721c7ae3e01f966ee487fff41b045fdc5c Mon Sep 17 00:00:00 2001 From: hopeyen Date: Wed, 20 Dec 2023 11:33:50 -0600 Subject: [PATCH] feat: object store for local file system - list objects, update e2e tests --- Cargo.lock | 75 ++++++++++++++++++ subfile-exchange/Cargo.toml | 1 + subfile-exchange/src/errors.rs | 4 +- .../src/subfile/local_file_system.rs | 77 +++++++++++++++++++ subfile-exchange/src/subfile/mod.rs | 1 + subfile-exchange/src/subfile_client/mod.rs | 5 +- subfile-exchange/tests/discovery.rs | 21 ++--- subfile-exchange/tests/file_transfer.rs | 1 + 8 files changed, 172 insertions(+), 13 deletions(-) create mode 100644 subfile-exchange/src/subfile/local_file_system.rs diff --git a/Cargo.lock b/Cargo.lock index 48d5580..168e89c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1195,6 +1195,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "doc-comment" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + [[package]] name = "dotenv" version = "0.15.0" @@ -2041,6 +2047,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.27" @@ -2690,6 +2702,36 @@ dependencies = [ "memchr", ] +[[package]] +name = "object_store" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2524735495ea1268be33d200e1ee97455096a0846295a21548cd2f3541de7050" +dependencies = [ + "async-trait", + "base64 0.21.5", + "bytes", + "chrono", + "futures 0.3.29", + "humantime", + "hyper", + "itertools 0.11.0", + "parking_lot", + "percent-encoding", + "quick-xml", + "rand", + "reqwest", + 
"ring 0.17.7", + "rustls-pemfile", + "serde", + "serde_json", + "snafu", + "tokio", + "tracing", + "url", + "walkdir", +] + [[package]] name = "once_cell" version = "1.19.0" @@ -3180,6 +3222,16 @@ version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" +[[package]] +name = "quick-xml" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.33" @@ -3925,6 +3977,28 @@ dependencies = [ "serde", ] +[[package]] +name = "snafu" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4de37ad025c587a29e8f3f5605c00f70b98715ef90b9061a815b9e59e9042d6" +dependencies = [ + "doc-comment", + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990079665f075b699031e9c08fd3ab99be5029b96f3b78dc0709e8f77e4efebf" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "socket2" version = "0.4.10" @@ -4074,6 +4148,7 @@ dependencies = [ "ipfs-api-backend-hyper", "ipfs-api-prelude", "merkle-cbt", + "object_store", "rand", "reqwest", "rustls", diff --git a/subfile-exchange/Cargo.toml b/subfile-exchange/Cargo.toml index ada7b78..5dedada 100644 --- a/subfile-exchange/Cargo.toml +++ b/subfile-exchange/Cargo.toml @@ -34,6 +34,7 @@ hyper = { version = "0.14.27", features = [ "server" ]} ipfs-api-backend-hyper = "0.6" ipfs-api-prelude = "0.6" merkle-cbt = "0.3.2" +object_store = {version = "0.8.0", features = [ "http", "aws", "gcp", "azure" ]} rand = "0.8.4" reqwest = { version = "0.11", features = ["json", "stream", "multipart"] } rustls = "0.21.8" diff --git a/subfile-exchange/src/errors.rs 
b/subfile-exchange/src/errors.rs
index bf4c3c1..6f37644 100644
--- a/subfile-exchange/src/errors.rs
+++ b/subfile-exchange/src/errors.rs
@@ -15,6 +15,7 @@ pub enum Error {
     YamlError(serde_yaml::Error),
     InvalidPriceFormat(String),
     ContractError(String),
+    ObjectStoreError(String),
 }
 
 impl fmt::Display for Error {
@@ -32,7 +33,8 @@ impl fmt::Display for Error {
             Error::JsonError(ref err) => write!(f, "JSON error: {}", err),
             Error::YamlError(ref err) => write!(f, "YAML error: {}", err),
             Error::InvalidPriceFormat(ref msg) => write!(f, "Price format error: {}", msg),
-            Error::ContractError(ref msg) => write!(f, "Price format error: {}", msg),
+            Error::ContractError(ref msg) => write!(f, "Contract call error: {}", msg),
+            Error::ObjectStoreError(ref msg) => write!(f, "Object store error: {}", msg),
         }
     }
 }
diff --git a/subfile-exchange/src/subfile/local_file_system.rs b/subfile-exchange/src/subfile/local_file_system.rs
new file mode 100644
index 0000000..bf289e4
--- /dev/null
+++ b/subfile-exchange/src/subfile/local_file_system.rs
@@ -0,0 +1,89 @@
+use futures::StreamExt;
+use object_store::ObjectMeta;
+use object_store::{path::Path, ObjectStore};
+
+use object_store::local::LocalFileSystem;
+
+use std::fs;
+use std::path::PathBuf;
+use std::str::FromStr;
+use std::sync::Arc;
+
+use crate::subfile::Error;
+
+/// Object store rooted at a directory on the local file system.
+pub struct Store {
+    local_file_system: Arc<LocalFileSystem>,
+}
+
+impl Store {
+    /// Open a store rooted at `path`, creating the directory and any missing parents.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Error::ObjectStoreError` if the directory cannot be created or if
+    /// `LocalFileSystem` rejects the path as a prefix.
+    pub fn new(path: &str) -> Result<Self, Error> {
+        let path = PathBuf::from_str(path).map_err(|e| Error::InvalidConfig(e.to_string()))?;
+        // `is_dir` is already false for a missing path, so no separate `exists` check.
+        if !path.is_dir() {
+            fs::create_dir_all(&path).map_err(|e| {
+                Error::ObjectStoreError(format!(
+                    "Unable to create local filesystem directory structure for object store: {:?}",
+                    e.to_string()
+                ))
+            })?;
+        }
+        let local_file_system = LocalFileSystem::new_with_prefix(path)
+            .map_err(|e| Error::ObjectStoreError(e.to_string()))?;
+        Ok(Store {
+            local_file_system: Arc::new(local_file_system),
+        })
+    }
+
+    /// List every object in the store, optionally restricted to those under `prefix`.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Error::ObjectStoreError` for the first entry that cannot be read, so a
+    /// failed walk is reported instead of being mistaken for a complete listing.
+    pub async fn list(&self, prefix: Option<&Path>) -> Result<Vec<ObjectMeta>, Error> {
+        let mut list_stream = self.local_file_system.list(prefix);
+
+        let mut objects = vec![];
+        while let Some(entry) = list_stream.next().await {
+            let meta = entry.map_err(|e| Error::ObjectStoreError(e.to_string()))?;
+            tracing::trace!("File name: {}, size: {}", meta.location, meta.size);
+            objects.push(meta);
+        }
+        Ok(objects)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::{
+        subfile::local_file_system::*,
+        test_util::{create_random_temp_file, CHUNK_SIZE},
+    };
+
+    #[tokio::test]
+    async fn test_local_list() {
+        let file_size = CHUNK_SIZE * 25;
+        let (temp_file, temp_path) = create_random_temp_file(file_size as usize).unwrap();
+
+        let path = std::path::Path::new(&temp_path);
+        let readdir = path.parent().unwrap().to_str().unwrap();
+        let file_name = path.file_name().unwrap().to_str().unwrap();
+
+        let object_store = Store::new(readdir).unwrap();
+        let res = object_store.list(None).await.unwrap();
+        let found_obj = res
+            .iter()
+            .find(|obj| obj.location.to_string() == file_name)
+            .unwrap();
+        assert_eq!(found_obj.size, file_size as usize);
+
+        drop(temp_file);
+    }
+}
diff --git a/subfile-exchange/src/subfile/mod.rs b/subfile-exchange/src/subfile/mod.rs
index 17e06cc..e611f66 100644
--- a/subfile-exchange/src/subfile/mod.rs
+++ b/subfile-exchange/src/subfile/mod.rs
@@ -1,6 +1,7 @@
 pub mod file_hasher;
 pub mod file_reader;
 pub mod ipfs;
+pub mod local_file_system;
 pub mod subfile_reader;
 
 use std::{
diff --git a/subfile-exchange/src/subfile_client/mod.rs
b/subfile-exchange/src/subfile_client/mod.rs @@ -67,9 +67,8 @@ impl SubfileDownloader { ) .await; - let transaction_manager = TransactionManager::new(&args.provider, wallet.clone()) - .await - .expect("Initiate Transaction manager"); + //TODO: Factor away from client, Transactions could be a separate entity + let transaction_manager = TransactionManager::new(&args.provider, wallet.clone()).await; tracing::info!( transaction_manager = tracing::field::debug(&transaction_manager), "transaction_manager" diff --git a/subfile-exchange/tests/discovery.rs b/subfile-exchange/tests/discovery.rs index 0d8bb4c..f339841 100644 --- a/subfile-exchange/tests/discovery.rs +++ b/subfile-exchange/tests/discovery.rs @@ -14,6 +14,9 @@ mod tests { std::env::set_var("RUST_LOG", "off,subfile_exchange=debug,file_transfer=trace"); subfile_exchange::config::init_tracing(String::from("pretty")).unwrap(); + let server_0 = "http://0.0.0.0:5677"; + let server_1 = "http://0.0.0.0:5679"; + let chunk_file_hash_a = "QmeKabcCQBtgU6QjM3rp3w6pDHFW4r54ee89nGdhuyDuhi".to_string(); let chunk_file_hash_b = "QmeE38uPSqT5XuHfM8X2JZAYgDCEwmDyMYULmZaRnNqPCj".to_string(); let chunk_file_hash_c = "QmWs8dkshZ7abxFYQ3h9ie1Em7SqzAkwtVJXaBapwEWqR9".to_string(); @@ -25,7 +28,7 @@ mod tests { let indexer_0: IndexerEndpoint = ( "0xead22a75679608952db6e85537fbfdca02dae9cb".to_string(), - "http://0.0.0.0:5678".to_string(), + server_0.to_string(), ); let indexer_1: IndexerEndpoint = ( "0x19804e50af1b72db4ce22a3c028e80c78d75af62".to_string(), @@ -36,8 +39,10 @@ mod tests { let mut server_process_0 = Command::new("cargo") .arg("run") .arg("-p") - .arg("subfile-exchange") - .arg("server") + .arg("subfile-service") + .arg("--") + .arg("--port") + .arg("5677") .arg("--mnemonic") .arg("sheriff obscure trick beauty army fat wink legal flee leader section suit") .arg("--subfiles") @@ -48,8 +53,8 @@ mod tests { let mut server_process_1 = Command::new("cargo") .arg("run") .arg("-p") - .arg("subfile-exchange") - .arg("server") + 
.arg("subfile-service") + .arg("--") .arg("--mnemonic") .arg("ice palace drill gadget biology glow tray equip heavy wolf toddler menu") .arg("--host") @@ -66,8 +71,6 @@ mod tests { tracing::debug!("Server initializing, wait 10 seconds..."); tokio::time::sleep(Duration::from_secs(10)).await; - let server_0 = "http://0.0.0.0:5678"; - let server_1 = "http://0.0.0.0:5679"; let _ = server_ready(server_0).await; let _ = server_ready(server_1).await; @@ -86,7 +89,7 @@ mod tests { .await; assert!(endpoints.len() == 1); assert!(endpoints.first().unwrap().0 == "0xead22a75679608952db6e85537fbfdca02dae9cb"); - assert!(endpoints.first().unwrap().1 == "http://0.0.0.0:5678"); + assert!(endpoints.first().unwrap().1 == server_0); // 3.2 find subfile_1 with server 0 and 1, get server 1 let endpoints = finder @@ -97,7 +100,7 @@ mod tests { .await; assert!(endpoints.len() == 1); assert!(endpoints.first().unwrap().0 == "0x19804e50af1b72db4ce22a3c028e80c78d75af62"); - assert!(endpoints.first().unwrap().1 == "http://0.0.0.0:5679"); + assert!(endpoints.first().unwrap().1 == server_1); // 3.3 find subfile_0 with sieved availability let map = finder diff --git a/subfile-exchange/tests/file_transfer.rs b/subfile-exchange/tests/file_transfer.rs index 821e59f..8beee13 100644 --- a/subfile-exchange/tests/file_transfer.rs +++ b/subfile-exchange/tests/file_transfer.rs @@ -51,6 +51,7 @@ mod tests { "sheriff obscure trick beauty army fat wink legal flee leader section suit", ), free_query_auth_token: Some("Bearer free-token".to_string()), + provider: String::from("https://arbitrum-sepolia.infura.io/v3/aaaaaaaaaaaaaaaaaaaa"), ..Default::default() };