diff --git a/Cargo.toml b/Cargo.toml index f233148..e911244 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ async-graphql = { version = "6.0.11", features = ["bson", "chrono", "uuid", "log async-graphql-axum = "6.0.11" tokio = { version = "1.8", features = ["macros", "rt-multi-thread"] } hyper = "1.0.1" -axum = { version = "0.6.0", features = ["headers"] } +axum = { version = "0.6.0", features = ["headers", "macros"] } slab = "0.4.2" mongodb = "2.8.0" serde = "1.0.193" diff --git a/docker-compose-base.yaml b/docker-compose-base.yaml index 9c661ed..f9ae2eb 100644 --- a/docker-compose-base.yaml +++ b/docker-compose-base.yaml @@ -32,9 +32,9 @@ services: "--app-id", "review", "--app-port", - "50051", + "8080", "--app-protocol", - "grpc", + "http", "--dapr-http-port", "3500", "-placement-host-address", diff --git a/src/app_callback_service.rs b/src/app_callback_service.rs deleted file mode 100644 index c11fb45..0000000 --- a/src/app_callback_service.rs +++ /dev/null @@ -1,144 +0,0 @@ -use json::JsonValue; -use log::info; -use mongodb::Collection; -use tonic::{Request, Response, Status}; - -use bson::Uuid; -use dapr::{appcallback::*, dapr::dapr::proto::runtime::v1::app_callback_server::AppCallback}; - -use crate::{product_variant::ProductVariant, user::User}; - -pub struct AppCallbackService { - pub product_variant_collection: Collection, - pub user_collection: Collection, -} - -impl AppCallbackService { - /// Add a newly created product variant to MongoDB. - pub async fn add_product_variant_to_mongodb(&self, id: Uuid) -> Result<(), Status> { - let product_variant = ProductVariant { _id: id }; - match self - .product_variant_collection - .insert_one(product_variant, None) - .await - { - Ok(_) => Ok(()), - Err(_) => Err(Status::internal( - "Adding product variant failed in MongoDB.", - )), - } - } - - /// Add a newly created user to MongoDB. - pub async fn add_user_to_mongodb(&self, id: Uuid) -> Result<(), Status> { - let user = User { _id: id }; - match self.user_collection.insert_one(user, None).await { - Ok(_) => Ok(()), - Err(_) => Err(Status::internal("Adding user failed in MongoDB.")), - } - } -} - -#[tonic::async_trait] -impl AppCallback for AppCallbackService { - /// Invokes service method with InvokeRequest. - async fn on_invoke( - &self, - _request: Request, - ) -> Result, Status> { - Ok(Response::new(InvokeResponse::default())) - } - - /// Lists all topics subscribed by this app. - /// - /// NOTE: Dapr runtime will call this method to get - /// the list of topics the app wants to subscribe to. - /// In this example, the app is subscribing to topic `A`. - async fn list_topic_subscriptions( - &self, - _request: Request<()>, - ) -> Result, Status> { - let product_variant_topic = "catalog/product-variant/created".to_string(); - let user_topic = "user/user/created".to_string(); - let pubsub_name = "pubsub".to_string(); - let user_topic_subscription = TopicSubscription::new(pubsub_name.clone(), user_topic, None); - - let mut list_subscriptions = - ListTopicSubscriptionsResponse::topic(pubsub_name, product_variant_topic); - list_subscriptions - .subscriptions - .push(user_topic_subscription); - - Ok(Response::new(list_subscriptions)) - } - - /// Subscribes events from Pubsub. - async fn on_topic_event( - &self, - request: Request, - ) -> Result, Status> { - let r: dapr::dapr::dapr::proto::runtime::v1::TopicEventRequest = request.into_inner(); - let data = &r.data; - - let message = String::from_utf8_lossy(data); - let error_message = format!("Expected message to be parsable JSON, got: {}", message); - let message_json = json::parse(&message).map_err(|_| Status::internal(error_message))?; - let id_json_value = &message_json["id"]; - let id = parse_id(id_json_value)?; - - info!("Event with message was received: {}", &message); - - match r.topic.as_str() { - "catalog/product-variant/created" => self.add_product_variant_to_mongodb(id).await?, - "user/user/created" => self.add_user_to_mongodb(id).await?, - _ => { - let message = format!( - "Event of topic: `{}` is not a handable by this service.", - r.topic.as_str() - ); - Err(Status::internal(message))?; - } - } - - Ok(Response::new(TopicEventResponse::default())) - } - - /// Lists all input bindings subscribed by this app. - async fn list_input_bindings( - &self, - _request: Request<()>, - ) -> Result, Status> { - Ok(Response::new(ListInputBindingsResponse::default())) - } - - /// Listens events from the input bindings. - async fn on_binding_event( - &self, - _request: Request, - ) -> Result, Status> { - Ok(Response::new(BindingEventResponse::default())) - } -} - -/// Parses Uuid from JsonValue containing a String. -fn parse_id(id_json_value: &JsonValue) -> Result { - match id_json_value { - json::JsonValue::String(id_string) => match Uuid::parse_str(id_string) { - Ok(id_uuid) => Ok(id_uuid), - Err(_) => { - let error_message = format!( - "String value in `id` field cannot be parsed as bson::Uuid, got: {}", - id_string - ); - Err(Status::internal(error_message))? - } - }, - _ => { - let error_message = format!( - "`id` field does not exist or does not contain a String value, got: {}", - id_json_value - ); - Err(Status::internal(error_message))? - } - } -} diff --git a/src/http_event_service.rs b/src/http_event_service.rs new file mode 100644 index 0000000..87ee539 --- /dev/null +++ b/src/http_event_service.rs @@ -0,0 +1,110 @@ +use axum::{debug_handler, extract::State, http::StatusCode, Json}; +use bson::Uuid; +use log::info; +use mongodb::Collection; +use serde::{Deserialize, Serialize}; + +use crate::{product_variant::ProductVariant, user::User}; + +/// Data to send to Dapr in order to describe a subscription. +#[derive(Serialize)] +pub struct Pubsub { + #[serde(rename(serialize = "pubsubName"))] + pub pubsubname: String, + pub topic: String, + pub route: String, +} + +/// Reponse data to send to Dapr when receiving an event. +#[derive(Serialize)] +pub struct TopicEventResponse { + pub status: i32, +} + +/// Default status is `0` -> Ok, according to Dapr specs. +impl Default for TopicEventResponse { + fn default() -> Self { + Self { status: 0 } + } +} + +/// Relevant part of Dapr event wrapped in a CloudEnvelope. +#[derive(Deserialize, Debug)] +pub struct Event { + pub topic: String, + pub data: EventData, +} + +/// Relevant part of Dapr event.data. +#[derive(Deserialize, Debug)] +pub struct EventData { + pub id: Uuid, +} + +/// Service state containing database connections. +#[derive(Clone)] +pub struct HttpEventServiceState { + pub product_variant_collection: Collection, + pub user_collection: Collection, +} + +/// HTTP endpoint to list topic subsciptions. +pub async fn list_topic_subscriptions() -> Result>, StatusCode> { + let pubsub_user = Pubsub { + pubsubname: "pubsub".to_string(), + topic: "user/user/created".to_string(), + route: "/on-topic-event".to_string(), + }; + let pubsub_product_variant = Pubsub { + pubsubname: "pubsub".to_string(), + topic: "catalog/product-variant/created".to_string(), + route: "/on-topic-event".to_string(), + }; + Ok(Json(vec![pubsub_user, pubsub_product_variant])) +} + +/// HTTP endpoint to receive events. +#[debug_handler(state = HttpEventServiceState)] +pub async fn on_topic_event( + State(state): State, + Json(event): Json, +) -> Result, StatusCode> { + info!("{:?}", event); + + match event.topic.as_str() { + "catalog/product-variant/created" => { + add_product_variant_to_mongodb(state.product_variant_collection, event.data.id).await? + } + "user/user/created" => add_user_to_mongodb(state.user_collection, event.data.id).await?, + _ => { + // TODO: This message can be used for further Error visibility. + let _message = format!( + "Event of topic: `{}` is not a handleable by this service.", + event.topic.as_str() + ); + return Err(StatusCode::INTERNAL_SERVER_ERROR); + } + } + Ok(Json(TopicEventResponse::default())) +} + +/// Add a newly created product variant to MongoDB. +pub async fn add_product_variant_to_mongodb( + collection: Collection, + id: Uuid, +) -> Result<(), StatusCode> { + let product_variant = ProductVariant { _id: id }; + match collection.insert_one(product_variant, None).await { + Ok(_) => Ok(()), + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + } +} + +/// Add a newly created user to MongoDB. +pub async fn add_user_to_mongodb(collection: Collection, id: Uuid) -> Result<(), StatusCode> { + let user = User { _id: id }; + match collection.insert_one(user, None).await { + Ok(_) => Ok(()), + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + } +} diff --git a/src/main.rs b/src/main.rs index e1667b8..bdd25fb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,18 +6,16 @@ use async_graphql::{ use async_graphql_axum::GraphQL; use axum::{ response::{self, IntoResponse}, - routing::get, + routing::{get, post}, Router, Server, }; use clap::{arg, command, Parser}; +use http_event_service::{list_topic_subscriptions, on_topic_event, HttpEventServiceState}; use simple_logger::SimpleLogger; use log::info; use mongodb::{options::ClientOptions, Client, Database}; -use dapr::dapr::dapr::proto::runtime::v1::app_callback_server::AppCallbackServer; -use tonic::transport::Server as TonicServer; - use review::Review; mod review; @@ -28,12 +26,11 @@ use query::Query; mod mutation; use mutation::Mutation; -mod app_callback_service; -use app_callback_service::AppCallbackService; - mod user; use user::User; +mod http_event_service; + use product_variant::ProductVariant; mod product_variant; @@ -64,30 +61,23 @@ async fn db_connection() -> Client { Client::with_options(client_options).unwrap() } -/// Establishes connection to Dapr. +/// Returns Router that establishes connection to Dapr. /// -/// Adds AppCallbackService which defines pub/sub interaction with Dapr. -async fn dapr_connection(db_client: Database) { - let addr = "[::]:50051".parse().unwrap(); +/// Adds endpoints to define pub/sub interaction with Dapr. +async fn build_dapr_router(db_client: Database) -> Router { let product_variant_collection: mongodb::Collection = db_client.collection::("product_variants"); let user_collection: mongodb::Collection = db_client.collection::("users"); - let callback_service = AppCallbackService { - product_variant_collection, - user_collection, - }; - - //callback_service.add_user_to_mongodb(bson::Uuid::new()).await.unwrap(); - //callback_service.add_product_variant_to_mongodb(bson::Uuid::new()).await.unwrap(); - - info!("AppCallback server listening on: {}", addr); - // Create a gRPC server with the callback_service. - TonicServer::builder() - .add_service(AppCallbackServer::new(callback_service)) - .serve(addr) - .await - .unwrap(); + // Define routes. + let app = Router::new() + .route("/dapr/subscribe", get(list_topic_subscriptions)) + .route("/on-topic-event", post(on_topic_event)) + .with_state(HttpEventServiceState { + product_variant_collection, + user_collection, + }); + app } /// Command line argument to toggle schema generation instead of service execution. @@ -129,20 +119,13 @@ async fn start_service() { .enable_federation() .finish(); - let app = Router::new().route("/", get(graphiql).post_service(GraphQL::new(schema))); - - let t1 = tokio::spawn(async { - info!("GraphiQL IDE: http://0.0.0.0:8080"); - Server::bind(&"0.0.0.0:8080".parse().unwrap()) - .serve(app.into_make_service()) - .await - .unwrap(); - }); + let graphiql = Router::new().route("/", get(graphiql).post_service(GraphQL::new(schema))); + let dapr_router = build_dapr_router(db_client).await; + let app = Router::new().merge(graphiql).merge(dapr_router); - let t2 = tokio::spawn(async { - dapr_connection(db_client).await; - }); - - t1.await.unwrap(); - t2.await.unwrap(); + info!("GraphiQL IDE: http://0.0.0.0:8080"); + Server::bind(&"0.0.0.0:8080".parse().unwrap()) + .serve(app.into_make_service()) + .await + .unwrap(); }