From 1691a7a1ee449c77b564e396839b30801f58c828 Mon Sep 17 00:00:00 2001 From: hopeyen Date: Fri, 19 Apr 2024 16:11:40 -0700 Subject: [PATCH] feat: server can publish files/bundles through admin --- file-exchange/src/manifest/store.rs | 1 - file-service/src/admin.rs | 179 ++++++++++++++++++++++++++++ 2 files changed, 179 insertions(+), 1 deletion(-) diff --git a/file-exchange/src/manifest/store.rs b/file-exchange/src/manifest/store.rs index 4412a5a..a72355d 100644 --- a/file-exchange/src/manifest/store.rs +++ b/file-exchange/src/manifest/store.rs @@ -479,7 +479,6 @@ mod tests { main_dir: main_directory.to_string(), })) .unwrap(); - let _local_path = Path::from(""); let local = LocalBundle { bundle: bundle.clone(), local_path: Path::from(""), diff --git a/file-service/src/admin.rs b/file-service/src/admin.rs index 2185b19..cba37db 100644 --- a/file-service/src/admin.rs +++ b/file-service/src/admin.rs @@ -4,6 +4,10 @@ use std::sync::Arc; use async_graphql::{Context, EmptySubscription, MergedObject, Object, Schema}; use async_graphql_axum::{GraphQLRequest, GraphQLResponse}; use axum::{extract::State, routing::get, Router, Server}; +use file_exchange::{ + config::{BundleArgs, PublisherArgs}, + publisher::ManifestPublisher, +}; use http::HeaderMap; use tokio::sync::Mutex; @@ -119,6 +123,94 @@ pub struct StatusMutation; #[Object] impl StatusMutation { + // Publish a bundle; the location of files are relative to the main directory of the server + #[allow(clippy::too_many_arguments)] + async fn publish_and_serve_bundle( + &self, + ctx: &Context<'_>, + filenames: Vec, + chunk_size: Option, + bundle_name: Option, + file_type: Option, + bundle_version: Option, + identifier: Option, + start_block: Option, + end_block: Option, + description: Option, + chain_id: Option, + ) -> Result { + if ctx.data_opt::() + != ctx + .data_unchecked::() + .state + .admin_auth_token + .as_ref() + { + return Err(ServerError::InvalidAuthentication(format!( + "Failed to authenticate: {:#?}", + ctx.data_opt::(), + ))); + } + // publish bundle + let client = ctx.data_unchecked::().state.client.clone(); + let publisher = ManifestPublisher::new( + client, + PublisherArgs { + chunk_size: chunk_size.unwrap_or(1048576), + filenames, + bundle: Some(BundleArgs { + bundle_name, + file_type, + bundle_version, + identifier, + start_block, + end_block, + description, + chain_id, + }), + storage_method: ctx + .data_unchecked::() + .state + .store + .storage_method + .clone(), + ..Default::default() + }, + ); + + let published = publisher + .publish() + .await + .map_err(|e| ServerError::ContextError(e.to_string()))?; + let deployment = published + .first() + .ok_or(ServerError::ContextError("No bundle published".to_string()))?; + + let bundle = match read_bundle( + &ctx.data_unchecked::().state.client, + deployment, + ) + .await + { + Ok(s) => s, + Err(e) => return Err(ServerError::RequestBodyError(e.to_string())), + }; + let local_bundle = LocalBundle { + bundle: bundle.clone(), + //TODO: remove this field + local_path: "".into(), + }; + + ctx.data_unchecked::() + .state + .bundles + .lock() + .await + .insert(bundle.ipfs_hash.clone(), local_bundle); + + Ok(GraphQlBundle::from(bundle)) + } + // Add a bundle async fn add_bundle( &self, @@ -419,6 +511,93 @@ impl StatusMutation { Ok(resolved_files.unwrap_or_default()) } + // Publish a bundle; the location of files are relative to the main directory of the server + async fn publish_and_serve_files( + &self, + ctx: &Context<'_>, + filenames: Vec, + chunk_size: Option, + ) -> Result, ServerError> { + if ctx.data_opt::() + != ctx + .data_unchecked::() + .state + .admin_auth_token + .as_ref() + { + return Err(ServerError::InvalidAuthentication(format!( + "Failed to authenticate: {:#?}", + ctx.data_opt::(), + ))); + } + // publish bundle + let client = ctx.data_unchecked::().state.client.clone(); + let file_ref = ctx.data_unchecked::().state.files.clone(); + let publisher = ManifestPublisher::new( + client.clone(), + PublisherArgs { + chunk_size: chunk_size.unwrap_or(1048576), + filenames: filenames.clone(), + storage_method: ctx + .data_unchecked::() + .state + .store + .storage_method + .clone(), + ..Default::default() + }, + ); + + let published = publisher + .publish() + .await + .map_err(|e| ServerError::ContextError(e.to_string()))?; + + let files = published + .iter() + .zip(filenames) + .map(|(deployment, file_name)| { + let client = client.clone(); + let file_ref = file_ref.clone(); + + async move { + tracing::debug!(deployment, file_name, "Adding file"); + + let file_manifest = fetch_file_manifest_from_ipfs(&client.clone(), deployment) + .await + .map_err(|e| ServerError::ContextError(e.to_string()))?; + + let meta = FileManifestMeta { + meta_info: FileMetaInfo { + name: file_name.clone(), + hash: deployment.clone(), + }, + file_manifest, + }; + ctx.data_unchecked::() + .state + .store + .read_and_validate_file(&meta, None) + .await + .map_err(|e| ServerError::ContextError(e.to_string()))?; + file_ref + .clone() + .lock() + .await + .insert(deployment.clone(), meta.clone()); + + Ok::<_, crate::admin::ServerError>(GraphQlFileManifestMeta::from(meta)) + } + }) + .collect::>(); + + // Since collect() gathers futures, we need to resolve them. You can use `try_join_all` for this. + let resolved_files: Result, _> = + futures::future::try_join_all(files).await; + + Ok(resolved_files.unwrap_or_default()) + } + async fn remove_file( &self, ctx: &Context<'_>,