Skip to content

Commit

Permalink
fix: admin endpoint serves status and cost queries
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Apr 15, 2024
1 parent 18ae7ef commit b7b060f
Show file tree
Hide file tree
Showing 6 changed files with 354 additions and 118 deletions.
249 changes: 240 additions & 9 deletions file-service/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,15 @@ use axum::{extract::State, routing::get, Router, Server};
use http::HeaderMap;
use tokio::sync::Mutex;

use crate::file_server::{
cost::{GraphQlCostModel, PriceQuery},
status::{GraphQlBundle, GraphQlFileManifestMeta, StatusQuery},
util::graphql_playground,
FileServiceError, ServerContext,
};
use crate::file_server::{util::graphql_playground, FileServiceError, ServerContext};
use crate::graphql_types::{GraphQlBundle, GraphQlCostModel, GraphQlFileManifestMeta};
use file_exchange::{
errors::{Error, ServerError},
manifest::{
ipfs::IpfsClient,
manifest_fetcher::{fetch_file_manifest_from_ipfs, read_bundle},
store::Store,
FileManifestMeta, FileMetaInfo, LocalBundle,
Bundle, FileManifestMeta, FileMetaInfo, LocalBundle,
},
};

Expand Down Expand Up @@ -96,13 +92,15 @@ pub fn serve_admin(context: ServerContext) {
}
.into(),
);
tracing::info!(address = %context.state.config.server.admin_host_and_port, "Serve admin metrics");
let addr = context.state.config.server.admin_host_and_port;
tracing::info!(address = %addr
, "Serve admin metrics");

let router = Router::new()
.route("/admin", get(graphql_playground).post(graphql_handler))
.with_state(admin_context);

Server::bind(&context.state.config.server.admin_host_and_port)
Server::bind(&addr)
.serve(router.into_make_service())
.await
.expect("Failed to initialize admin server")
Expand Down Expand Up @@ -658,3 +656,236 @@ impl PriceMutation {
removed_prices
}
}

/* StatusQuery and CostQuer are repeated from the status/cost endpoints
** This is due to the difference in ServerContext
** It isn't possible to use generic types for these queries due to async-graphql
** But we can later restructure admin/server contexts with the arc bundles/files/cost */
#[derive(Default)]
pub struct StatusQuery;

#[Object]
impl StatusQuery {
/// Files inside some bundles
async fn bundled_files(
&self,
ctx: &Context<'_>,
deployments: Option<Vec<String>>,
) -> Result<Vec<GraphQlFileManifestMeta>, anyhow::Error> {
let bundles: Vec<Bundle> = ctx
.data_unchecked::<AdminContext>()
.state
.bundles
.lock()
.await
.values()
.map(|b| b.bundle.clone())
.collect();
let file_metas: Vec<FileManifestMeta> = bundles
.iter()
.flat_map(|b| b.file_manifests.clone())
.collect();

if deployments.is_none() {
return Ok(file_metas
.iter()
.map(|m| GraphQlFileManifestMeta::from(m.clone()))
.collect::<Vec<GraphQlFileManifestMeta>>());
};
let ids = deployments.unwrap();
Ok(file_metas
.iter()
.filter(|m| ids.contains(&m.meta_info.hash))
.cloned()
.map(GraphQlFileManifestMeta::from)
.collect())
}

/// A file inside some bundles
async fn bundled_file(
&self,
ctx: &Context<'_>,
deployment: String,
) -> Result<Option<GraphQlFileManifestMeta>, anyhow::Error> {
let bundles: Vec<Bundle> = ctx
.data_unchecked::<AdminContext>()
.state
.bundles
.lock()
.await
.values()
.map(|b| b.bundle.clone())
.collect();
let file_metas: Vec<FileManifestMeta> = bundles
.iter()
.flat_map(|b| b.file_manifests.clone())
.collect();
let manifest_graphql = file_metas
.iter()
.find(|m| m.meta_info.hash == deployment)
.cloned()
.map(GraphQlFileManifestMeta::from);

Ok(manifest_graphql)
}

/// Bundles, optional deployments filter
async fn bundles(
&self,
ctx: &Context<'_>,
deployments: Option<Vec<String>>,
) -> Result<Vec<GraphQlBundle>, anyhow::Error> {
tracing::trace!("received bundles request");
let all_bundles = &ctx
.data_unchecked::<AdminContext>()
.state
.bundles
.lock()
.await
.clone();

let bundles = if deployments.is_none() {
tracing::trace!(
bundles = tracing::field::debug(&all_bundles),
"no deployment filter"
);
all_bundles
.values()
.cloned()
.map(|b| GraphQlBundle::from(b.bundle))
.collect()
} else {
let ids = deployments.unwrap();
ids.iter()
.filter_map(|key| all_bundles.get(key))
.cloned()
.map(|b| GraphQlBundle::from(b.bundle))
.collect()
};
tracing::debug!(bundles = tracing::field::debug(&bundles), "queried bundles");
Ok(bundles)
}

/// A single bundle by deployment hash
async fn bundle(
&self,
ctx: &Context<'_>,
deployment: String,
) -> Result<Option<GraphQlBundle>, anyhow::Error> {
tracing::trace!("received bundle request");
let bundle: Option<Bundle> = ctx
.data_unchecked::<AdminContext>()
.state
.bundles
.lock()
.await
.get(&deployment)
.map(|b| b.bundle.clone());

Ok(bundle.map(GraphQlBundle::from))
}

/// Serving files with optional deployments filter
async fn files(
&self,
ctx: &Context<'_>,
deployments: Option<Vec<String>>,
) -> Result<Vec<GraphQlFileManifestMeta>, anyhow::Error> {
let file_metas: Vec<FileManifestMeta> = ctx
.data_unchecked::<AdminContext>()
.state
.files
.lock()
.await
.values()
.cloned()
.collect();

if deployments.is_none() {
return Ok(file_metas
.iter()
.map(|m| GraphQlFileManifestMeta::from(m.clone()))
.collect::<Vec<GraphQlFileManifestMeta>>());
};
let ids = deployments.unwrap();
Ok(file_metas
.iter()
.filter(|m| ids.contains(&m.meta_info.hash))
.cloned()
.map(GraphQlFileManifestMeta::from)
.collect())
}

/// A single file by deployment hash
async fn file(
&self,
ctx: &Context<'_>,
deployment: String,
) -> Result<Option<GraphQlFileManifestMeta>, anyhow::Error> {
let file_meta: Option<GraphQlFileManifestMeta> = ctx
.data_unchecked::<AdminContext>()
.state
.files
.lock()
.await
.values()
.find(|m| m.meta_info.hash == deployment)
.cloned()
.map(GraphQlFileManifestMeta::from);

Ok(file_meta)
}
}

#[derive(Default)]
pub struct PriceQuery;

#[Object]
impl PriceQuery {
/// Provide an array of cost model to the queried deployment whether it is served or not
async fn cost_models(
&self,
ctx: &Context<'_>,
deployments: Vec<String>,
) -> Result<Vec<GraphQlCostModel>, anyhow::Error> {
let mut cost_models = vec![];
for deployment in deployments {
let price: Option<f64> = ctx
.data_unchecked::<AdminContext>()
.state
.prices
.lock()
.await
.get(&deployment)
.cloned();
if let Some(p) = price {
cost_models.push(GraphQlCostModel {
deployment,
price_per_byte: p,
})
}
}
Ok(cost_models)
}

/// provide a cost model for a specific file/bundle served
async fn cost_model(
&self,
ctx: &Context<'_>,
deployment: String,
) -> Result<Option<GraphQlCostModel>, anyhow::Error> {
let model: Option<GraphQlCostModel> = ctx
.data_unchecked::<AdminContext>()
.state
.prices
.lock()
.await
.get(&deployment)
.cloned()
.map(|p| GraphQlCostModel {
deployment,
price_per_byte: p,
});
Ok(model)
}
}
11 changes: 2 additions & 9 deletions file-service/src/file_server/cost.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,9 @@
use async_graphql::{Context, EmptyMutation, EmptySubscription, Object, Schema, SimpleObject};
use async_graphql::{Context, EmptyMutation, EmptySubscription, Object, Schema};
use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
use axum::extract::State;

use serde::{Deserialize, Serialize};

use crate::file_server::ServerContext;

#[derive(Clone, Debug, Serialize, Deserialize, SimpleObject)]
pub struct GraphQlCostModel {
pub deployment: String,
pub price_per_byte: f64,
}
use crate::graphql_types::GraphQlCostModel;

#[derive(Default)]
pub struct PriceQuery;
Expand Down
2 changes: 1 addition & 1 deletion file-service/src/file_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct ServerState {
pub client: IpfsClient,
pub operator_public_key: String,
pub bundles: Arc<Mutex<HashMap<String, LocalBundle>>>, // Keyed by IPFS hash, valued by Bundle and Local path
pub files: Arc<Mutex<HashMap<String, FileManifestMeta>>>, // Keyed by IPFS hash, valued by Bundle and Local path
pub files: Arc<Mutex<HashMap<String, FileManifestMeta>>>, // Keyed by IPFS hash, valued by file and Local path
pub prices: Arc<Mutex<HashMap<String, f64>>>, // Keyed by IPFS hash, valued by price per byte
pub admin_auth_token: Option<String>, // Add bearer prefix
pub config: Config,
Expand Down
Loading

0 comments on commit b7b060f

Please sign in to comment.