Skip to content

Commit

Permalink
Event server from GRPC to HTTP
Browse files Browse the repository at this point in the history
  • Loading branch information
legendofa committed Feb 6, 2024
1 parent 8f7b1dc commit fd1e93e
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 188 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ async-graphql = { version = "6.0.11", features = ["bson", "chrono", "uuid", "log
async-graphql-axum = "6.0.11"
tokio = { version = "1.8", features = ["macros", "rt-multi-thread"] }
hyper = "1.0.1"
axum = { version = "0.6.0", features = ["headers"] }
axum = { version = "0.6.0", features = ["headers", "macros"] }
slab = "0.4.2"
mongodb = "2.8.0"
serde = "1.0.193"
Expand Down
4 changes: 2 additions & 2 deletions docker-compose-base.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ services:
"--app-id",
"review",
"--app-port",
"50051",
"8080",
"--app-protocol",
"grpc",
"http",
"--dapr-http-port",
"3500",
"-placement-host-address",
Expand Down
144 changes: 0 additions & 144 deletions src/app_callback_service.rs

This file was deleted.

110 changes: 110 additions & 0 deletions src/http_event_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use axum::{debug_handler, extract::State, http::StatusCode, Json};
use bson::Uuid;
use log::info;
use mongodb::Collection;
use serde::{Deserialize, Serialize};

use crate::{product_variant::ProductVariant, user::User};

/// Data to send to Dapr in order to describe a subscription.
#[derive(Serialize)]
pub struct Pubsub {
#[serde(rename(serialize = "pubsubName"))]
pub pubsubname: String,
pub topic: String,
pub route: String,
}

/// Reponse data to send to Dapr when receiving an event.
#[derive(Serialize)]
pub struct TopicEventResponse {
pub status: i32,
}

/// Default status is `0` -> Ok, according to Dapr specs.
impl Default for TopicEventResponse {
fn default() -> Self {
Self { status: 0 }
}
}

/// Relevant part of Dapr event wrapped in a CloudEnvelope.
#[derive(Deserialize, Debug)]
pub struct Event {
pub topic: String,
pub data: EventData,
}

/// Relevant part of Dapr event.data.
#[derive(Deserialize, Debug)]
pub struct EventData {
pub id: Uuid,
}

/// Service state containing database connections.
#[derive(Clone)]
pub struct HttpEventServiceState {
pub product_variant_collection: Collection<ProductVariant>,
pub user_collection: Collection<User>,
}

/// HTTP endpoint to list topic subsciptions.
pub async fn list_topic_subscriptions() -> Result<Json<Vec<Pubsub>>, StatusCode> {
let pubsub_user = Pubsub {
pubsubname: "pubsub".to_string(),
topic: "user/user/created".to_string(),
route: "/on-topic-event".to_string(),
};
let pubsub_product_variant = Pubsub {
pubsubname: "pubsub".to_string(),
topic: "catalog/product-variant/created".to_string(),
route: "/on-topic-event".to_string(),
};
Ok(Json(vec![pubsub_user, pubsub_product_variant]))
}

/// HTTP endpoint to receive events.
#[debug_handler(state = HttpEventServiceState)]
pub async fn on_topic_event(
State(state): State<HttpEventServiceState>,
Json(event): Json<Event>,
) -> Result<Json<TopicEventResponse>, StatusCode> {
info!("{:?}", event);

match event.topic.as_str() {
"catalog/product-variant/created" => {
add_product_variant_to_mongodb(state.product_variant_collection, event.data.id).await?
}
"user/user/created" => add_user_to_mongodb(state.user_collection, event.data.id).await?,
_ => {
// TODO: This message can be used for further Error visibility.
let _message = format!(
"Event of topic: `{}` is not a handleable by this service.",
event.topic.as_str()
);
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
}
Ok(Json(TopicEventResponse::default()))
}

/// Add a newly created product variant to MongoDB.
pub async fn add_product_variant_to_mongodb(
collection: Collection<ProductVariant>,
id: Uuid,
) -> Result<(), StatusCode> {
let product_variant = ProductVariant { _id: id };
match collection.insert_one(product_variant, None).await {
Ok(_) => Ok(()),
Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
}
}

/// Add a newly created user to MongoDB.
pub async fn add_user_to_mongodb(collection: Collection<User>, id: Uuid) -> Result<(), StatusCode> {
let user = User { _id: id };
match collection.insert_one(user, None).await {
Ok(_) => Ok(()),
Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
}
}
65 changes: 24 additions & 41 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,16 @@ use async_graphql::{
use async_graphql_axum::GraphQL;
use axum::{
response::{self, IntoResponse},
routing::get,
routing::{get, post},
Router, Server,
};
use clap::{arg, command, Parser};
use http_event_service::{list_topic_subscriptions, on_topic_event, HttpEventServiceState};
use simple_logger::SimpleLogger;

use log::info;
use mongodb::{options::ClientOptions, Client, Database};

use dapr::dapr::dapr::proto::runtime::v1::app_callback_server::AppCallbackServer;
use tonic::transport::Server as TonicServer;

use review::Review;

mod review;
Expand All @@ -28,12 +26,11 @@ use query::Query;
mod mutation;
use mutation::Mutation;

mod app_callback_service;
use app_callback_service::AppCallbackService;

mod user;
use user::User;

mod http_event_service;

use product_variant::ProductVariant;
mod product_variant;

Expand Down Expand Up @@ -64,30 +61,23 @@ async fn db_connection() -> Client {
Client::with_options(client_options).unwrap()
}

/// Establishes connection to Dapr.
/// Returns Router that establishes connection to Dapr.
///
/// Adds AppCallbackService which defines pub/sub interaction with Dapr.
async fn dapr_connection(db_client: Database) {
let addr = "[::]:50051".parse().unwrap();
/// Adds endpoints to define pub/sub interaction with Dapr.
async fn build_dapr_router(db_client: Database) -> Router {
let product_variant_collection: mongodb::Collection<ProductVariant> =
db_client.collection::<ProductVariant>("product_variants");
let user_collection: mongodb::Collection<User> = db_client.collection::<User>("users");

let callback_service = AppCallbackService {
product_variant_collection,
user_collection,
};

//callback_service.add_user_to_mongodb(bson::Uuid::new()).await.unwrap();
//callback_service.add_product_variant_to_mongodb(bson::Uuid::new()).await.unwrap();

info!("AppCallback server listening on: {}", addr);
// Create a gRPC server with the callback_service.
TonicServer::builder()
.add_service(AppCallbackServer::new(callback_service))
.serve(addr)
.await
.unwrap();
// Define routes.
let app = Router::new()
.route("/dapr/subscribe", get(list_topic_subscriptions))
.route("/on-topic-event", post(on_topic_event))
.with_state(HttpEventServiceState {
product_variant_collection,
user_collection,
});
app
}

/// Command line argument to toggle schema generation instead of service execution.
Expand Down Expand Up @@ -129,20 +119,13 @@ async fn start_service() {
.enable_federation()
.finish();

let app = Router::new().route("/", get(graphiql).post_service(GraphQL::new(schema)));

let t1 = tokio::spawn(async {
info!("GraphiQL IDE: http://0.0.0.0:8080");
Server::bind(&"0.0.0.0:8080".parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap();
});
let graphiql = Router::new().route("/", get(graphiql).post_service(GraphQL::new(schema)));
let dapr_router = build_dapr_router(db_client).await;
let app = Router::new().merge(graphiql).merge(dapr_router);

let t2 = tokio::spawn(async {
dapr_connection(db_client).await;
});

t1.await.unwrap();
t2.await.unwrap();
info!("GraphiQL IDE: http://0.0.0.0:8080");
Server::bind(&"0.0.0.0:8080".parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap();
}

0 comments on commit fd1e93e

Please sign in to comment.