diff --git a/src/app_callback_service.rs b/src/app_callback_service.rs index e9873c7..92eb2e8 100644 --- a/src/app_callback_service.rs +++ b/src/app_callback_service.rs @@ -6,28 +6,37 @@ use tonic::{Request, Response, Status}; use bson::Uuid; use dapr::{appcallback::*, dapr::dapr::proto::runtime::v1::app_callback_server::AppCallback}; -use crate::foreign_types::ProductVariant; +use crate::{foreign_types::ProductVariant, user::User}; pub struct AppCallbackService { - pub collection: Collection, + pub product_variant_collection: Collection, + pub user_collection: Collection, } impl AppCallbackService { /// Add a newly created product variant to MongoDB. - pub async fn add_product_variant_to_mongodb( - &self, - product_variant_id: Uuid, - ) -> Result<(), Status> { - let product_variant = ProductVariant { - _id: product_variant_id, - }; - match self.collection.insert_one(product_variant, None).await { + pub async fn add_product_variant_to_mongodb(&self, id: Uuid) -> Result<(), Status> { + let product_variant = ProductVariant { _id: id }; + match self + .product_variant_collection + .insert_one(product_variant, None) + .await + { Ok(_) => Ok(()), Err(_) => Err(Status::internal( "Adding product variant failed in MongoDB.", )), } } + + /// Add a newly created user to MongoDB. + pub async fn add_user_to_mongodb(&self, id: Uuid) -> Result<(), Status> { + let user = User { _id: id }; + match self.user_collection.insert_one(user, None).await { + Ok(_) => Ok(()), + Err(_) => Err(Status::internal("Adding user failed in MongoDB.")), + } + } } #[tonic::async_trait] @@ -49,10 +58,16 @@ impl AppCallback for AppCallbackService { &self, _request: Request<()>, ) -> Result, Status> { - let topic = "catalog/product-variant/created".to_string(); + let product_variant_topic = "catalog/product-variant/created".to_string(); + let user_topic = "user/user/created".to_string(); let pubsub_name = "pubsub".to_string(); + let user_topic_subscription = TopicSubscription::new(pubsub_name.clone(), user_topic, None); - let list_subscriptions = ListTopicSubscriptionsResponse::topic(pubsub_name, topic); + let mut list_subscriptions = + ListTopicSubscriptionsResponse::topic(pubsub_name, product_variant_topic); + list_subscriptions + .subscriptions + .push(user_topic_subscription); Ok(Response::new(list_subscriptions)) } @@ -62,19 +77,28 @@ impl AppCallback for AppCallbackService { &self, request: Request, ) -> Result, Status> { - let r = request.into_inner(); + let r: dapr::dapr::dapr::proto::runtime::v1::TopicEventRequest = request.into_inner(); let data = &r.data; let message = String::from_utf8_lossy(data); let error_message = format!("Expected message to be parsable JSON, got: {}", message); let message_json = json::parse(&message).map_err(|_| Status::internal(error_message))?; - let product_variant_id_json_value = &message_json["id"]; - let product_variant_id = parse_product_variant_id(product_variant_id_json_value)?; + let id_json_value = &message_json["id"]; + let id = parse_id(id_json_value)?; info!("Event with message was received: {}", &message); - self.add_product_variant_to_mongodb(product_variant_id) - .await?; + match r.topic.as_str() { + "catalog/product-variant/created" => self.add_product_variant_to_mongodb(id).await?, + "user/user/created" => self.add_user_to_mongodb(id).await?, + _ => { + let message = format!( + "Event of topic: `{}` is not a handable by this service.", + r.topic.as_str() + ); + Err(Status::internal(message))?; + } + } Ok(Response::new(TopicEventResponse::default())) } @@ -97,24 +121,22 @@ impl AppCallback for AppCallbackService { } /// Parses Uuid from JsonValue containing a String. -fn parse_product_variant_id(product_variant_id_json_value: &JsonValue) -> Result { - match product_variant_id_json_value { - json::JsonValue::String(product_variant_id_string) => { - match Uuid::parse_str(product_variant_id_string) { - Ok(product_variant_id_uuid) => Ok(product_variant_id_uuid), - Err(_) => { - let error_message = format!( - "String value in `id` field cannot be parsed as bson::Uuid, got: {}", - product_variant_id_string - ); - Err(Status::internal(error_message))? - } +fn parse_id(id_json_value: &JsonValue) -> Result { + match id_json_value { + json::JsonValue::String(id_string) => match Uuid::parse_str(id_string) { + Ok(id_uuid) => Ok(id_uuid), + Err(_) => { + let error_message = format!( + "String value in `id` field cannot be parsed as bson::Uuid, got: {}", + id_string + ); + Err(Status::internal(error_message))? } - } + }, _ => { let error_message = format!( "`id` field does not exist or does not contain a String value, got: {}", - product_variant_id_json_value + id_json_value ); Err(Status::internal(error_message))? } diff --git a/src/main.rs b/src/main.rs index f4d9778..0ad135c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -71,10 +71,18 @@ async fn db_connection() -> Client { /// Adds AppCallbackService which defines pub/sub interaction with Dapr. async fn dapr_connection(db_client: Database) { let addr = "[::]:50051".parse().unwrap(); - let collection: mongodb::Collection = + let product_variant_collection: mongodb::Collection = db_client.collection::("product_variants"); + let user_collection: mongodb::Collection = db_client.collection::("users"); - let callback_service = AppCallbackService { collection }; + let callback_service = AppCallbackService { + product_variant_collection, + user_collection, + }; + callback_service + .add_user_to_mongodb(Uuid::parse_str("cef15b33-b7f6-45ab-a697-cdf136ec5289").unwrap()) + .await + .unwrap(); info!("AppCallback server listening on: {}", addr); // Create a gRPC server with the callback_service. diff --git a/src/mutation.rs b/src/mutation.rs index 852fab4..9494fce 100644 --- a/src/mutation.rs +++ b/src/mutation.rs @@ -9,6 +9,7 @@ use mongodb::{ Collection, Database, }; +use crate::query::query_user; use crate::user::User; use crate::{ foreign_types::ProductVariant, @@ -32,10 +33,7 @@ impl Mutation { ) -> Result { let db_client = ctx.data_unchecked::(); let collection: Collection = db_client.collection::("wishlists"); - let product_variant_collection: Collection = - db_client.collection::("product_variants"); - validate_product_variant_ids(&product_variant_collection, &input.product_variant_ids) - .await?; + validate_input(db_client, &input).await?; let normalized_product_variants: HashSet = input .product_variant_ids .iter() @@ -169,9 +167,19 @@ async fn update_name( Ok(()) } +/// Checks if product variants and user in AddWishlistInput are in the system (MongoDB database populated with events). +async fn validate_input(db_client: &Database, input: &AddWishlistInput) -> Result<()> { + let product_variant_collection: Collection = + db_client.collection::("product_variants"); + let user_collection: Collection = db_client.collection::("users"); + validate_product_variant_ids(&product_variant_collection, &input.product_variant_ids).await?; + validate_user(&user_collection, input.user_id).await?; + Ok(()) +} + /// Checks if product variants are in the system (MongoDB database populated with events). /// -/// Used before adding or modifying product variants. +/// Used before adding or modifying product variants / wishlists. async fn validate_product_variant_ids( collection: &Collection, product_variant_ids: &HashSet, @@ -201,3 +209,10 @@ async fn validate_product_variant_ids( )), } } + +/// Checks if user is in the system (MongoDB database populated with events). +/// +/// Used before adding wishlists. +async fn validate_user(collection: &Collection, id: Uuid) -> Result<()> { + query_user(&collection, id).await.map(|_| ()) +} diff --git a/src/query.rs b/src/query.rs index ec04907..05ed33d 100644 --- a/src/query.rs +++ b/src/query.rs @@ -12,9 +12,12 @@ impl Query { /// Retrieve user with wishlists. async fn user<'a>( &self, + ctx: &Context<'a>, #[graphql(desc = "UUID of user to retrieve.")] id: Uuid, ) -> Result { - Ok(User { _id: id }) + let db_client = ctx.data_unchecked::(); + let collection: Collection = db_client.collection::("users"); + query_user(&collection, id).await } /// Retrieves wishlist of specific id. @@ -44,7 +47,7 @@ impl Query { /// Shared function to query a wishlist from a MongoDB collection of wishlists /// /// * `connection` - MongoDB database connection. -/// * `stringified_uuid` - UUID of wishlist as String. +/// * `id` - UUID of wishlist. pub async fn query_wishlist(collection: &Collection, id: Uuid) -> Result { match collection.find_one(doc! {"_id": id }, None).await { Ok(maybe_wishlist) => match maybe_wishlist { @@ -60,3 +63,23 @@ pub async fn query_wishlist(collection: &Collection, id: Uuid) -> Resu } } } + +/// Shared function to query a user from a MongoDB collection of users. +/// +/// * `connection` - MongoDB database connection. +/// * `id` - UUID of user. +pub async fn query_user(collection: &Collection, id: Uuid) -> Result { + match collection.find_one(doc! {"_id": id }, None).await { + Ok(maybe_user) => match maybe_user { + Some(user) => Ok(user), + None => { + let message = format!("User with UUID id: `{}` not found.", id); + Err(Error::new(message)) + } + }, + Err(_) => { + let message = format!("User with UUID id: `{}` not found.", id); + Err(Error::new(message)) + } + } +}