Skip to content

Commit

Permalink
feat: server cost config, route, and admin
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Dec 12, 2023
1 parent d889146 commit 5465c23
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 45 deletions.
9 changes: 9 additions & 0 deletions subfile-exchange/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ pub struct ServerArgs {
help = "Mnemonic for the operator wallet"
)]
pub mnemonic: String,
//TODO: More complex price management
#[arg(
long,
value_name = "PRICE_PER_BYTE",
default_value = "1",
env = "PRICE_PER_BYTE",
help = "Price per byte; price do not currently have a unit, perhaps use DAI or GRT, refer to TAP"
)]
pub price_per_byte: f32,
}

#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)]
Expand Down
2 changes: 2 additions & 0 deletions subfile-exchange/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub enum Error {
ServerError(ServerError),
JsonError(serde_json::Error),
YamlError(serde_yaml::Error),
InvalidPriceFormat(String),
}

impl fmt::Display for Error {
Expand All @@ -29,6 +30,7 @@ impl fmt::Display for Error {
Error::ServerError(ref err) => write!(f, "Server error: {}", err),
Error::JsonError(ref err) => write!(f, "JSON error: {}", err),
Error::YamlError(ref err) => write!(f, "YAML error: {}", err),
Error::InvalidPriceFormat(ref msg) => write!(f, "Price format error: {}", msg),
}
}
}
Expand Down
170 changes: 125 additions & 45 deletions subfile-exchange/src/subfile_server/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,49 +38,27 @@ pub async fn handle_admin_request(
});
}

let body_bytes = to_bytes(req.into_body()).await.map_err(|e| {
Error::ServerError(crate::errors::ServerError::RequestBodyError(e.to_string()))
})?;

let json: Value = serde_json::from_slice(&body_bytes).map_err(Error::JsonError)?;

let method = json.get("method").and_then(Value::as_str).ok_or_else(|| {
Error::ServerError(crate::errors::ServerError::MethodParseError(
"Method not found in request".to_string(),
))
})?;
let params = json.get("params");
let (method, params) = match parse_admin_request(req).await {
Ok(r) => r,
Err(e) => {
return Ok(create_error_response(
&e.to_string(),
StatusCode::BAD_REQUEST,
))
}
};

tracing::info!("Received valid/authorized subfiles management request");
tracing::debug!(
method = tracing::field::debug(&method),
params = tracing::field::debug(&params),
"Received valid/authorized subfiles management request"
);

match method {
match method.as_str() {
"get_subfiles" => get_subfiles(context).await,
"add_subfile" => {
add_subfile(
params
.ok_or_else(|| {
Error::ServerError(crate::errors::ServerError::ParamsParseError(
"Params not found in request".to_string(),
))
})?
.clone(),
context,
)
.await
}
"remove_subfile" => {
remove_subfile(
params
.ok_or_else(|| {
Error::ServerError(crate::errors::ServerError::ParamsParseError(
"Params not found in request".to_string(),
))
})?
.clone(),
context,
)
.await
}
"add_subfile" => add_subfile(params, context).await,
"remove_subfile" => remove_subfile(params, context).await,
"update_price_per_byte" => update_price_per_byte(params, context).await,
_ => Ok(hyper::Response::builder()
.status(hyper::StatusCode::METHOD_NOT_ALLOWED)
.body("Method not supported".into())
Expand All @@ -92,6 +70,22 @@ pub async fn handle_admin_request(
}
}

async fn parse_admin_request(req: Request<hyper::Body>) -> Result<(String, Option<Value>), Error> {
let body_bytes = to_bytes(req.into_body()).await.map_err(|e| {
Error::ServerError(crate::errors::ServerError::RequestBodyError(e.to_string()))
})?;

let json: Value = serde_json::from_slice(&body_bytes).map_err(Error::JsonError)?;

let method = json.get("method").and_then(Value::as_str).ok_or_else(|| {
Error::ServerError(crate::errors::ServerError::MethodParseError(
"Method not found in request".to_string(),
))
})?;
let params = json.get("params");

Ok((method.to_string(), params.cloned()))
}
//TODO: rich the details
/// Function to retrieve all subfiles and their details
async fn get_subfiles(context: &ServerContext) -> Result<Response<Body>, Error> {
Expand All @@ -104,7 +98,15 @@ async fn get_subfiles(context: &ServerContext) -> Result<Response<Body>, Error>
.collect::<Vec<_>>();
drop(server_state);

let body = serde_json::to_string(&subfiles_info).map_err(Error::JsonError)?;
let body = match serde_json::to_string(&subfiles_info).map_err(Error::JsonError) {
Ok(b) => b,
Err(e) => {
return Ok(create_error_response(
&e.to_string(),
StatusCode::BAD_REQUEST,
))
}
};
tracing::trace!("Built get_subfile response");

Ok(Response::builder()
Expand All @@ -114,7 +116,20 @@ async fn get_subfiles(context: &ServerContext) -> Result<Response<Body>, Error>
}

/// Add a subfile to the server state
async fn add_subfile(params: Value, context: &ServerContext) -> Result<Response<Body>, Error> {
async fn add_subfile(
params: Option<Value>,
context: &ServerContext,
) -> Result<Response<Body>, Error> {
let params = match params {
Some(p) => p,
None => {
return Ok(create_error_response(
"Missing params",
StatusCode::BAD_REQUEST,
))
}
};

let entries: Vec<String> = serde_json::from_value(params).map_err(Error::JsonError)?;

// Validate before adding to the server state
Expand All @@ -129,7 +144,15 @@ async fn add_subfile(params: Value, context: &ServerContext) -> Result<Response<
};
let mut server_state = context.lock().await;
for (ipfs_hash, local_path) in subfile_entries {
let subfile = read_subfile(&server_state.client, &ipfs_hash, local_path).await?;
let subfile = match read_subfile(&server_state.client, &ipfs_hash, local_path).await {
Ok(s) => s,
Err(e) => {
return Ok(create_error_response(
&e.to_string(),
StatusCode::BAD_REQUEST,
))
}
};
if let Err(e) = subfile.validate_local_subfile() {
return Ok(create_error_response(
&e.to_string(),
Expand All @@ -149,8 +172,28 @@ async fn add_subfile(params: Value, context: &ServerContext) -> Result<Response<
}

/// Remove a subfile from the server state
async fn remove_subfile(params: Value, context: &ServerContext) -> Result<Response<Body>, Error> {
let ipfs_hashes: Vec<String> = serde_json::from_value(params).map_err(Error::JsonError)?;
async fn remove_subfile(
params: Option<Value>,
context: &ServerContext,
) -> Result<Response<Body>, Error> {
let params = match params {
Some(p) => p,
None => {
return Ok(create_error_response(
"Missing params",
StatusCode::BAD_REQUEST,
))
}
};
let ipfs_hashes: Vec<String> = match serde_json::from_value(params).map_err(Error::JsonError) {
Ok(h) => h,
Err(e) => {
return Ok(create_error_response(
&e.to_string(),
StatusCode::BAD_REQUEST,
))
}
};

for ipfs_hash in &ipfs_hashes {
match !is_valid_ipfs_hash(ipfs_hash) {
Expand All @@ -177,3 +220,40 @@ async fn remove_subfile(params: Value, context: &ServerContext) -> Result<Respon
.body("Subfile(s) removed successfully".into())
.unwrap())
}

/// Update price per byte
async fn update_price_per_byte(
params: Option<Value>,
context: &ServerContext,
) -> Result<Response<Body>, Error> {
let params = match params {
Some(p) => p,
None => {
return Ok(create_error_response(
"Missing params",
StatusCode::BAD_REQUEST,
))
}
};

let new_price: f32 = match serde_json::from_value(params) {
Ok(p) => p,
Err(e) => {
return Ok(create_error_response(
&e.to_string(),
StatusCode::BAD_REQUEST,
))
}
};

// Access the server state
let mut server_state = context.lock().await;

// Remove the valid IPFS hashes from the server state's subfiles
server_state.price_per_byte = new_price;

Ok(Response::builder()
.status(StatusCode::OK)
.body("Price successfully updated".into())
.unwrap())
}
16 changes: 16 additions & 0 deletions subfile-exchange/src/subfile_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub struct ServerState {
pub release: PackageVersion,
pub free_query_auth_token: Option<String>, // Add bearer prefix
pub admin_auth_token: Option<String>, // Add bearer prefix
pub price_per_byte: f32,
}

pub type ServerContext = Arc<Mutex<ServerState>>;
Expand Down Expand Up @@ -109,6 +110,7 @@ async fn initialize_subfile_server_context(
admin_auth_token,
operator_public_key: public_key(&config.mnemonic)
.expect("Failed to initiate with operator wallet"),
price_per_byte: config.price_per_byte,
};

// Fetch the file using IPFS client
Expand Down Expand Up @@ -140,6 +142,7 @@ pub async fn handle_request(
"/status" => status(&context).await,
"/health" => health().await,
"/version" => version(&context).await,
"/cost" => cost(&context).await,
"/admin" => handle_admin_request(req, &context).await,
//TODO: consider routing through file level IPFS
path if path.starts_with("/subfiles/id/") => file_service(path, &req, &context).await,
Expand Down Expand Up @@ -177,6 +180,19 @@ pub async fn version(context: &ServerContext) -> Result<Response<Body>, Error> {
})
}

/// Endpoint for cost to download per byte
pub async fn cost(context: &ServerContext) -> Result<Response<Body>, Error> {
let price = context.lock().await.price_per_byte.to_string();
Response::builder()
.status(StatusCode::OK)
.body(Body::from(price))
.map_err(|e| {
Error::ServerError(crate::errors::ServerError::BuildResponseError(
e.to_string(),
))
})
}

/// Endpoint for status availability
pub async fn status(context: &ServerContext) -> Result<Response<Body>, Error> {
let subfile_mapping = context.lock().await.subfiles.clone();
Expand Down

0 comments on commit 5465c23

Please sign in to comment.