From 5e67fd436b9e4ec620c9a62faea38b8d0d1b607c Mon Sep 17 00:00:00 2001 From: hopeyen Date: Wed, 27 Mar 2024 16:09:34 -0700 Subject: [PATCH] feat: individual price mgmt, merge status/price query/mutation --- file-service/src/admin.rs | 187 ++++++++++++++++++++++++++- file-service/src/file_server/cost.rs | 8 +- file-service/src/file_server/mod.rs | 4 +- 3 files changed, 188 insertions(+), 11 deletions(-) diff --git a/file-service/src/admin.rs b/file-service/src/admin.rs index c70987d..60825cf 100644 --- a/file-service/src/admin.rs +++ b/file-service/src/admin.rs @@ -1,15 +1,18 @@ use std::collections::HashMap; use std::sync::Arc; -use async_graphql::{Context, EmptySubscription, Object, Schema}; +use async_graphql::{Context, EmptySubscription, MergedObject, Object, Schema}; use async_graphql_axum::{GraphQLRequest, GraphQLResponse}; use axum::{extract::State, routing::get, Router, Server}; use http::HeaderMap; use tokio::sync::Mutex; -use crate::file_server::status::{GraphQlBundle, StatusQuery}; -use crate::file_server::util::graphql_playground; -use crate::file_server::{FileServiceError, ServerContext}; +use crate::file_server::{ + cost::{GraphQlCostModel, PriceQuery}, + status::{GraphQlBundle, StatusQuery}, + util::graphql_playground, + FileServiceError, ServerContext, +}; use file_exchange::{ errors::{Error, ServerError}, manifest::{ @@ -21,6 +24,7 @@ use file_exchange::{ pub struct AdminState { pub client: IpfsClient, pub bundles: Arc>>, + pub prices: Arc>>, pub admin_auth_token: Option, pub admin_schema: AdminSchema, } @@ -36,10 +40,21 @@ impl AdminContext { } } -pub type AdminSchema = Schema; +#[derive(MergedObject, Default)] +pub struct MergedQuery(StatusQuery, PriceQuery); + +#[derive(MergedObject, Default)] +pub struct MergedMutation(StatusMutation, PriceMutation); + +pub type AdminSchema = Schema; pub async fn build_schema() -> AdminSchema { - Schema::build(StatusQuery, StatusMutation, EmptySubscription).finish() + Schema::build( + MergedQuery(StatusQuery, PriceQuery), + MergedMutation(StatusMutation, PriceMutation), + EmptySubscription, + ) + .finish() } fn get_token_from_headers(headers: &HeaderMap) -> Option { @@ -68,6 +83,7 @@ pub fn serve_admin(context: ServerContext) { AdminState { client: context.state.client.clone(), bundles: context.state.bundles.clone(), + prices: context.state.prices.clone(), admin_auth_token: context.state.admin_auth_token.clone(), admin_schema: build_schema().await, } @@ -267,3 +283,162 @@ impl StatusMutation { removed_bundles } } + +#[derive(Default)] +pub struct PriceMutation; + +#[Object] +impl PriceMutation { + // Set price for a deployment + async fn set_price( + &self, + ctx: &Context<'_>, + deployment: String, + price_per_byte: f64, + ) -> Result { + if ctx.data_opt::() + != ctx + .data_unchecked::() + .state + .admin_auth_token + .as_ref() + { + return Err(anyhow::anyhow!(format!( + "Failed to authenticate: {:#?} (admin: {:#?}", + ctx.data_opt::(), + ctx.data_unchecked::() + .state + .admin_auth_token + .as_ref() + ))); + } + + ctx.data_unchecked::() + .state + .prices + .lock() + .await + .insert(deployment.clone(), price_per_byte); + + Ok(GraphQlCostModel { + deployment, + price_per_byte, + }) + } + + // Add multiple bundles + async fn set_prices( + &self, + ctx: &Context<'_>, + deployments: Vec, + prices: Vec, + ) -> Result, anyhow::Error> { + if ctx.data_opt::() + != ctx + .data_unchecked::() + .state + .admin_auth_token + .as_ref() + { + return Err(anyhow::anyhow!("Failed to authenticate")); + } + let price_ref = ctx.data_unchecked::().state.prices.clone(); + let prices = deployments + .iter() + .zip(prices) + .map(|(deployment, price)| { + let price_ref = price_ref.clone(); + + async move { + price_ref + .clone() + .lock() + .await + .insert(deployment.clone(), price); + + Ok::<_, anyhow::Error>(GraphQlCostModel { + deployment: deployment.to_string(), + price_per_byte: price, + }) + } + }) + .collect::>(); + + // Since collect() gathers futures, we need to resolve them. You can use `try_join_all` for this. + let resolved_prices: Result, _> = + futures::future::try_join_all(prices).await; + + Ok(resolved_prices.unwrap_or_default()) + } + + async fn remove_price( + &self, + ctx: &Context<'_>, + deployment: String, + ) -> Result, anyhow::Error> { + if ctx.data_opt::() + != ctx + .data_unchecked::() + .state + .admin_auth_token + .as_ref() + { + return Err(anyhow::anyhow!("Failed to authenticate")); + } + + let bundle = ctx + .data_unchecked::() + .state + .prices + .lock() + .await + .remove(&deployment) + .map(|price| GraphQlCostModel { + deployment, + price_per_byte: price, + }); + + Ok(bundle) + } + + async fn remove_prices( + &self, + ctx: &Context<'_>, + deployments: Vec, + ) -> Result, anyhow::Error> { + if ctx.data_opt::() + != ctx + .data_unchecked::() + .state + .admin_auth_token + .as_ref() + { + return Err(anyhow::anyhow!("Failed to authenticate")); + } + + let prices = deployments + .iter() + .map(|deployment| async move { + ctx.data_unchecked::() + .state + .prices + .lock() + .await + .remove(deployment) + .map(|price| GraphQlCostModel { + deployment: deployment.to_string(), + price_per_byte: price, + }) + .ok_or(anyhow::anyhow!(format!( + "Deployment not found: {}", + deployment + ))) + }) + .collect::>(); + + let removed_prices: Result, _> = + futures::future::try_join_all(prices).await; + + removed_prices + } +} diff --git a/file-service/src/file_server/cost.rs b/file-service/src/file_server/cost.rs index 641d359..a86d426 100644 --- a/file-service/src/file_server/cost.rs +++ b/file-service/src/file_server/cost.rs @@ -13,10 +13,10 @@ pub struct GraphQlCostModel { } #[derive(Default)] -pub struct Query; +pub struct PriceQuery; #[Object] -impl Query { +impl PriceQuery { /// Provide an array of cost model to the queried deployment whether it is served or not async fn cost_models( &self, @@ -70,10 +70,10 @@ impl Query { } } -pub type CostSchema = Schema; +pub type CostSchema = Schema; pub async fn build_schema() -> CostSchema { - Schema::build(Query, EmptyMutation, EmptySubscription).finish() + Schema::build(PriceQuery, EmptyMutation, EmptySubscription).finish() } pub async fn cost(State(context): State, req: GraphQLRequest) -> GraphQLResponse { diff --git a/file-service/src/file_server/mod.rs b/file-service/src/file_server/mod.rs index d43adfc..0347b67 100644 --- a/file-service/src/file_server/mod.rs +++ b/file-service/src/file_server/mod.rs @@ -36,7 +36,8 @@ pub struct ServerState { pub client: IpfsClient, pub operator_public_key: String, pub bundles: Arc>>, // Keyed by IPFS hash, valued by Bundle and Local path - pub admin_auth_token: Option, // Add bearer prefix + pub prices: Arc>>, // Keyed by IPFS hash, valued by price per byte + pub admin_auth_token: Option, // Add bearer prefix pub config: Config, pub database: PgPool, pub cost_schema: crate::file_server::cost::CostSchema, @@ -116,6 +117,7 @@ pub async fn initialize_server_context(config: Config) -> Result