Skip to content

Commit

Permalink
feat: server can publish files/bundles through admin
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Apr 19, 2024
1 parent 0ba4782 commit 842f044
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 1 deletion.
1 change: 0 additions & 1 deletion file-exchange/src/manifest/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,6 @@ mod tests {
main_dir: main_directory.to_string(),
}))
.unwrap();
let _local_path = Path::from("");
let local = LocalBundle {
bundle: bundle.clone(),
local_path: Path::from(""),
Expand Down
179 changes: 179 additions & 0 deletions file-service/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ use std::sync::Arc;
use async_graphql::{Context, EmptySubscription, MergedObject, Object, Schema};
use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
use axum::{extract::State, routing::get, Router, Server};
use file_exchange::{
config::{BundleArgs, PublisherArgs},
publisher::ManifestPublisher,
};
use http::HeaderMap;
use tokio::sync::Mutex;

Expand Down Expand Up @@ -119,6 +123,94 @@ pub struct StatusMutation;

#[Object]
impl StatusMutation {
// Publish a bundle; the location of files are relative to the main directory of the server
#[allow(clippy::too_many_arguments)]
async fn publish_and_serve_bundle(
&self,
ctx: &Context<'_>,
filenames: Vec<String>,
chunk_size: Option<u64>,
bundle_name: Option<String>,
file_type: Option<String>,
bundle_version: Option<String>,
identifier: Option<String>,
start_block: Option<u64>,
end_block: Option<u64>,
description: Option<String>,
chain_id: Option<String>,
) -> Result<GraphQlBundle, ServerError> {
if ctx.data_opt::<String>()
!= ctx
.data_unchecked::<AdminContext>()
.state
.admin_auth_token
.as_ref()
{
return Err(ServerError::InvalidAuthentication(format!(
"Failed to authenticate: {:#?}",
ctx.data_opt::<String>(),
)));
}
// publish bundle
let client = ctx.data_unchecked::<AdminContext>().state.client.clone();
let publisher = ManifestPublisher::new(
client,
PublisherArgs {
chunk_size: chunk_size.unwrap_or(1048576),
filenames,
bundle: Some(BundleArgs {
bundle_name,
file_type,
bundle_version,
identifier,
start_block,
end_block,
description,
chain_id,
}),
storage_method: ctx
.data_unchecked::<AdminContext>()
.state
.store
.storage_method
.clone(),
..Default::default()
},
);

let published = publisher
.publish()
.await
.map_err(|e| ServerError::ContextError(e.to_string()))?;
let deployment = published
.first()
.ok_or(ServerError::ContextError("No bundle published".to_string()))?;

let bundle = match read_bundle(
&ctx.data_unchecked::<AdminContext>().state.client,
deployment,
)
.await
{
Ok(s) => s,
Err(e) => return Err(ServerError::RequestBodyError(e.to_string())),
};
let local_bundle = LocalBundle {
bundle: bundle.clone(),
//TODO: remove this field
local_path: "".into(),
};

ctx.data_unchecked::<AdminContext>()
.state
.bundles
.lock()
.await
.insert(bundle.ipfs_hash.clone(), local_bundle);

Ok(GraphQlBundle::from(bundle))
}

// Add a bundle
async fn add_bundle(
&self,
Expand Down Expand Up @@ -419,6 +511,93 @@ impl StatusMutation {
Ok(resolved_files.unwrap_or_default())
}

// Publish a bundle; the location of files are relative to the main directory of the server
async fn publish_and_serve_files(
&self,
ctx: &Context<'_>,
filenames: Vec<String>,
chunk_size: Option<u64>,
) -> Result<Vec<GraphQlFileManifestMeta>, ServerError> {
if ctx.data_opt::<String>()
!= ctx
.data_unchecked::<AdminContext>()
.state
.admin_auth_token
.as_ref()
{
return Err(ServerError::InvalidAuthentication(format!(
"Failed to authenticate: {:#?}",
ctx.data_opt::<String>(),
)));
}
// publish bundle
let client = ctx.data_unchecked::<AdminContext>().state.client.clone();
let file_ref = ctx.data_unchecked::<AdminContext>().state.files.clone();
let publisher = ManifestPublisher::new(
client.clone(),
PublisherArgs {
chunk_size: chunk_size.unwrap_or(1048576),
filenames: filenames.clone(),
storage_method: ctx
.data_unchecked::<AdminContext>()
.state
.store
.storage_method
.clone(),
..Default::default()
},
);

let published = publisher
.publish()
.await
.map_err(|e| ServerError::ContextError(e.to_string()))?;

let files = published
.iter()
.zip(filenames)
.map(|(deployment, file_name)| {
let client = client.clone();
let file_ref = file_ref.clone();

async move {
tracing::debug!(deployment, file_name, "Adding file");

let file_manifest = fetch_file_manifest_from_ipfs(&client.clone(), deployment)
.await
.map_err(|e| ServerError::ContextError(e.to_string()))?;

let meta = FileManifestMeta {
meta_info: FileMetaInfo {
name: file_name.clone(),
hash: deployment.clone(),
},
file_manifest,
};
ctx.data_unchecked::<AdminContext>()
.state
.store
.read_and_validate_file(&meta, None)
.await
.map_err(|e| ServerError::ContextError(e.to_string()))?;
file_ref
.clone()
.lock()
.await
.insert(deployment.clone(), meta.clone());

Ok::<_, crate::admin::ServerError>(GraphQlFileManifestMeta::from(meta))
}
})
.collect::<Vec<_>>();

// Since collect() gathers futures, we need to resolve them. You can use `try_join_all` for this.
let resolved_files: Result<Vec<GraphQlFileManifestMeta>, _> =
futures::future::try_join_all(files).await;

Ok(resolved_files.unwrap_or_default())
}

async fn remove_file(
&self,
ctx: &Context<'_>,
Expand Down

0 comments on commit 842f044

Please sign in to comment.