Skip to content

Commit

Permalink
refactor: server better entry parsing and storage validation
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Apr 9, 2024
1 parent 16ce0da commit 1155f59
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 91 deletions.
4 changes: 2 additions & 2 deletions docs/server_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ CLI with configuration file
cargo run -p file-service -- --config ./file-server/template.toml
```

(You might need to set an additional envrionmental variable for logs, `RUST_LOG=file-service=debug`)
(You might need to set an additional envrionmental variable for logs, `RUST_LOG=file_service=debug`)

3. Access services via the additional endpoints:
5. Access services via the additional endpoints:



Expand Down
4 changes: 1 addition & 3 deletions file-exchange/benches/read_and_validate_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use file_exchange::{
config::{LocalDirectory, StorageMethod},
manifest::store::Store,
};
use object_store::path::Path;

fn read_and_validate_file_benchmark(c: &mut Criterion) {
let store = black_box(
Expand All @@ -16,12 +15,11 @@ fn read_and_validate_file_benchmark(c: &mut Criterion) {
.unwrap(),
);
let bundle = black_box(simple_bundle());
let path = black_box(Path::from(""));

c.bench_function("read_and_validate_file", |b| {
let meta = black_box(bundle.file_manifests.first().unwrap());
b.to_async(FuturesExecutor)
.iter(|| store.read_and_validate_file(meta, &path))
.iter(|| store.read_and_validate_file(meta, None))
});
}

Expand Down
56 changes: 12 additions & 44 deletions file-exchange/src/manifest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ pub fn validate_bundle_entries(entries: Vec<String>) -> Result<Vec<(String, Path
let mut results = Vec::new();

for entry in entries {
results.push(validate_bundle_entry(entry)?);
results.push(parse_bundle_entry(entry)?);
}

Ok(results)
Expand All @@ -125,14 +125,14 @@ pub fn validate_file_entries(entries: Vec<String>) -> Result<Vec<(String, Path)>
let mut results = Vec::new();

for entry in entries {
results.push(validate_file_entry(entry)?);
results.push(parse_file_entry(entry)?);
}

Ok(results)
}

/// Bundle entry must be in the format of "valid_ipfs_hash:valid_local_path"
pub fn validate_bundle_entry(entry: String) -> Result<(String, Path), Error> {
pub fn parse_bundle_entry(entry: String) -> Result<(String, Path), Error> {
let parts: Vec<&str> = entry.split(':').collect();
if parts.len() != 2 {
return Err(Error::InvalidConfig(format!(
Expand All @@ -143,11 +143,18 @@ pub fn validate_bundle_entry(entry: String) -> Result<(String, Path), Error> {

let ipfs_hash = parts[0];
let local_path = parts[1];
validate_bundle_and_location(ipfs_hash, local_path)
if !is_valid_ipfs_hash(ipfs_hash) {
return Err(Error::InvalidConfig(format!(
"Invalid IPFS hash: {}",
ipfs_hash
)));
}

Ok((ipfs_hash.to_string(), Path::from(local_path)))
}

/// Bundle entry must be in the format of "valid_ipfs_hash:valid_local_path"
pub fn validate_file_entry(entry: String) -> Result<(String, Path), Error> {
pub fn parse_file_entry(entry: String) -> Result<(String, Path), Error> {
let parts: Vec<&str> = entry.split(':').collect();
if parts.len() != 2 {
return Err(Error::InvalidConfig(format!(
Expand All @@ -158,50 +165,11 @@ pub fn validate_file_entry(entry: String) -> Result<(String, Path), Error> {

let ipfs_hash = parts[0];
let file_name = parts[1];
validate_file_and_location(ipfs_hash, file_name)
}

// Check for valid ipfs hash and path for a bundle
pub fn validate_bundle_and_location(
ipfs_hash: &str,
local_path: &str,
) -> Result<(String, Path), Error> {
if !is_valid_ipfs_hash(ipfs_hash) {
return Err(Error::InvalidConfig(format!(
"Invalid IPFS hash: {}",
ipfs_hash
)));
}

// Validate local path

Ok((ipfs_hash.to_string(), Path::from(local_path)))
}

// Check for valid ipfs hash and path for a file
pub fn validate_file_and_location(
ipfs_hash: &str,
file_name: &str,
) -> Result<(String, Path), Error> {
if !is_valid_ipfs_hash(ipfs_hash) {
return Err(Error::InvalidConfig(format!(
"Invalid IPFS hash: {}",
ipfs_hash
)));
}

// // Validate filename
// // TODO: consider better validation here: file should actually exist
// let full_path = Path::new(directory).join(file_path);
// fs::metadata(full_path).is_ok()

Ok((ipfs_hash.to_string(), Path::from(file_name)))
}

// use std::path::Path;
// use std::fs;

// fn validate_and_check_path(directory: &str, file_path: &str) -> bool {
// let full_path = Path::new(directory).join(file_path);
// fs::metadata(full_path).is_ok()
// }
9 changes: 4 additions & 5 deletions file-exchange/src/manifest/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ impl Store {

// Read all files in bundle to verify locally. This may cause a long initialization time
for file_meta in &local.bundle.file_manifests {
self.read_and_validate_file(file_meta, &local.local_path)
self.read_and_validate_file(file_meta, Some(&local.local_path))
.await?;
}

Expand All @@ -258,7 +258,7 @@ impl Store {
pub async fn read_and_validate_file(
&self,
file: &FileManifestMeta,
prefix: &Path,
prefix: Option<&Path>,
) -> Result<(), Error> {
// read file by file_manifest.file_name
let meta_info = &file.meta_info;
Expand Down Expand Up @@ -422,8 +422,7 @@ mod tests {
.unwrap();
let mut bundle = simple_bundle();
let file_meta = bundle.file_manifests.first().unwrap();
let path = Path::from("");
let res = store.read_and_validate_file(file_meta, &path).await;
let res = store.read_and_validate_file(file_meta, None).await;
assert!(res.is_ok());

// Add tests for failure cases
Expand All @@ -433,7 +432,7 @@ mod tests {
}
}
let file_meta = bundle.file_manifests.first().unwrap();
let res = store.read_and_validate_file(file_meta, &path).await;
let res = store.read_and_validate_file(file_meta, None).await;
assert!(res.is_err());
}

Expand Down
2 changes: 1 addition & 1 deletion file-exchange/tests/file_transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ mod tests {

let client = IpfsClient::new("https://ipfs.network.thegraph.com")
.expect("Could not create client to thegraph IPFS gateway");
let target_file = "QmeE38uPSqT5XuHfM8X2JZAYgDCEwmDyMYULmZaRnNqPCj".to_string();
let target_file = "QmeKabcCQBtgU6QjM3rp3w6pDHFW4r54ee89nGdhuyDuhi".to_string();
// 1. Setup server
let mut server_process = Command::new("cargo")
.arg("run")
Expand Down
1 change: 1 addition & 0 deletions file-exchange/tests/test0.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[server]
initial_bundles = ["QmeaPp764FjQjPB66M9ijmQKmLhwBpHQhA7dEbH2FA1j3v:"]
initial_files = ["QmeKabcCQBtgU6QjM3rp3w6pDHFW4r54ee89nGdhuyDuhi:0017234500.dbin.zst"]
admin_auth_token = "slayyy"
admin_host_and_port = "0.0.0.0:5664"
default_price_per_byte = 1
Expand Down
1 change: 1 addition & 0 deletions file-exchange/tests/test1.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ initial_bundles = [
"QmeD3dRVV6Gs84TRwiNj3tLt9mBEMVqy3GoWm7WN8oDzGz:",
"QmTSwj1BGkkmVSnhw6uEGkcxGZvP5nq4pDhzHjwJvsQC2Z:"
]
initial_files = ["QmeKabcCQBtgU6QjM3rp3w6pDHFW4r54ee89nGdhuyDuhi:0017234500.dbin.zst"]
admin_auth_token = "kueen"
admin_host_and_port = "0.0.0.0:5665"
default_price_per_byte = 1
Expand Down
88 changes: 52 additions & 36 deletions file-service/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use file_exchange::{
manifest::{
ipfs::IpfsClient,
manifest_fetcher::{fetch_file_manifest_from_ipfs, read_bundle},
validate_bundle_and_location, validate_file_and_location, FileManifestMeta, FileMetaInfo,
LocalBundle,
store::Store,
FileManifestMeta, FileMetaInfo, LocalBundle,
},
};

Expand All @@ -31,6 +31,7 @@ pub struct AdminState {
pub prices: Arc<Mutex<HashMap<String, f64>>>,
pub admin_auth_token: Option<String>,
pub admin_schema: AdminSchema,
pub store: Store,
}

#[derive(Clone)]
Expand Down Expand Up @@ -91,6 +92,7 @@ pub fn serve_admin(context: ServerContext) {
prices: context.state.prices.clone(),
admin_auth_token: context.state.admin_auth_token.clone(),
admin_schema: build_schema().await,
store: context.state.store.clone(),
}
.into(),
);
Expand Down Expand Up @@ -142,28 +144,32 @@ impl StatusMutation {
.as_ref()
)));
}
let (hash, loc) = match validate_bundle_and_location(&deployment, &location) {
let bundle = match read_bundle(
&ctx.data_unchecked::<AdminContext>().state.client,
&deployment,
)
.await
{
Ok(s) => s,
Err(e) => return Err(anyhow::anyhow!("Invalid input: {}", e.to_string())),
Err(e) => return Err(anyhow::anyhow!(e.to_string(),)),
};
let bundle =
match read_bundle(&ctx.data_unchecked::<AdminContext>().state.client, &hash).await {
Ok(s) => s,
Err(e) => return Err(anyhow::anyhow!(e.to_string(),)),
};
let local_bundle = LocalBundle {
bundle: bundle.clone(),
local_path: location.into(),
};
let _ = ctx
.data_unchecked::<AdminContext>()
.state
.store
.validate_local_bundle(&local_bundle)
.await;

ctx.data_unchecked::<AdminContext>()
.state
.bundles
.lock()
.await
.insert(
bundle.ipfs_hash.clone(),
LocalBundle {
bundle: bundle.clone(),
local_path: loc,
},
);
.insert(bundle.ipfs_hash.clone(), local_bundle);

Ok(GraphQlBundle::from(bundle))
}
Expand Down Expand Up @@ -196,20 +202,25 @@ impl StatusMutation {
async move {
tracing::debug!(deployment, location, "Adding bundle");

let (hash, loc) = validate_bundle_and_location(deployment, &location)
.map_err(|e| anyhow::anyhow!("Invalid input: {}", e))?;

let bundle = read_bundle(&client.clone(), &hash)
let bundle = read_bundle(&client.clone(), deployment)
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;

bundle_ref.clone().lock().await.insert(
bundle.ipfs_hash.clone(),
LocalBundle {
bundle: bundle.clone(),
local_path: loc,
},
);
let local_bundle = LocalBundle {
bundle: bundle.clone(),
local_path: location.into(),
};
let _ = ctx
.data_unchecked::<AdminContext>()
.state
.store
.validate_local_bundle(&local_bundle)
.await;
bundle_ref
.clone()
.lock()
.await
.insert(bundle.ipfs_hash.clone(), local_bundle);

Ok::<_, anyhow::Error>(GraphQlBundle::from(bundle))
}
Expand Down Expand Up @@ -311,13 +322,9 @@ impl StatusMutation {
.as_ref()
)));
}
let (hash, _loc) = match validate_file_and_location(&deployment, &file_name) {
Ok(s) => s,
Err(e) => return Err(anyhow::anyhow!("Invalid input: {}", e.to_string())),
};
let file_manifest = match fetch_file_manifest_from_ipfs(
&ctx.data_unchecked::<AdminContext>().state.client,
&hash,
&deployment,
)
.await
{
Expand All @@ -332,6 +339,12 @@ impl StatusMutation {
},
file_manifest,
};
let _ = ctx
.data_unchecked::<AdminContext>()
.state
.store
.read_and_validate_file(&meta, None)
.await;
ctx.data_unchecked::<AdminContext>()
.state
.files
Expand Down Expand Up @@ -370,10 +383,7 @@ impl StatusMutation {
async move {
tracing::debug!(deployment, file_name, "Adding file");

let (hash, _loc) = validate_file_and_location(deployment, &file_name)
.map_err(|e| anyhow::anyhow!("Invalid input: {}", e))?;

let file_manifest = fetch_file_manifest_from_ipfs(&client.clone(), &hash)
let file_manifest = fetch_file_manifest_from_ipfs(&client.clone(), deployment)
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;

Expand All @@ -384,6 +394,12 @@ impl StatusMutation {
},
file_manifest,
};
let _ = ctx
.data_unchecked::<AdminContext>()
.state
.store
.read_and_validate_file(&meta, None)
.await;
file_ref
.clone()
.lock()
Expand Down

0 comments on commit 1155f59

Please sign in to comment.