Skip to content

Commit

Permalink
Add core plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
GamePad64 committed Nov 27, 2024
1 parent 322bba1 commit e850f40
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 13 deletions.
43 changes: 43 additions & 0 deletions notifico-core/src/engine/core.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use crate::engine::{EnginePlugin, PipelineContext, StepOutput};
use crate::error::EngineError;
use crate::pipeline::runner::RecipientSelector;
use crate::step::SerializedStep;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;

#[derive(Serialize, Deserialize)]
#[serde(tag = "step")]
pub enum Step {
#[serde(rename = "core.set_recipient")]
SetRecipient { recipient: RecipientSelector },
}

pub const STEPS: &[&str] = &["core.set_recipient"];

struct CorePlugin {}

#[async_trait]
impl EnginePlugin for CorePlugin {
async fn execute_step(
&self,
context: &mut PipelineContext,
step: &SerializedStep,
) -> Result<StepOutput, EngineError> {
let step: Step = step.clone().convert_step()?;

match step {
Step::SetRecipient { recipient } => {
let recipient = match recipient {
RecipientSelector::Recipient(r) => r,
};
context.recipient = Some(recipient);
Ok(StepOutput::Continue)
}
}
}

fn steps(&self) -> Vec<Cow<'static, str>> {
STEPS.iter().map(|&s| s.into()).collect()
}
}
4 changes: 4 additions & 0 deletions notifico-core/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use tracing::instrument;
use utoipa::ToSchema;
use uuid::Uuid;

mod core;
mod plugin;

pub use plugin::{EnginePlugin, StepOutput};

#[derive(Debug, Default, Clone, Serialize, Deserialize, ToSchema)]
Expand All @@ -27,6 +29,8 @@ pub struct Message {

#[derive(Default, Debug, Serialize, Deserialize)]
pub struct PipelineContext {
pub step_number: usize,

pub project_id: Uuid,
pub event_id: Uuid,
pub notification_id: Uuid,
Expand Down
20 changes: 13 additions & 7 deletions notifico-core/src/pipeline/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ use uuid::Uuid;
#[derive(Serialize, Deserialize, ToSchema, Debug)]
pub struct ProcessEventRequest {
#[serde(default = "Uuid::now_v7")]
pub(crate) id: Uuid,
pub id: Uuid,
#[serde(default = "Uuid::nil")]
pub(crate) project_id: Uuid,
pub(crate) event: String,
pub(crate) recipient: Option<RecipientSelector>,
pub(crate) context: EventContext,
pub project_id: Uuid,
pub event: String,
pub recipient: Option<RecipientSelector>,
pub context: EventContext,
}

#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
Expand Down Expand Up @@ -97,6 +97,8 @@ impl PipelineRunner {

join_handles.spawn(async move {
let context = PipelineContext {
step_number: 0,

project_id,
recipient,
event_name,
Expand Down Expand Up @@ -124,10 +126,14 @@ impl PipelineRunner {
pipeline: Pipeline,
mut context: PipelineContext,
) {
for step in pipeline.steps.iter() {
for (step_number, step) in pipeline.steps.iter().enumerate() {
if step_number < context.step_number {
continue;
}

let result = engine.execute_step(&mut context, step).await;
match result {
Ok(StepOutput::Continue) => continue,
Ok(StepOutput::Continue) => context.step_number += 1,
Ok(StepOutput::Interrupt) => break,
Err(err) => {
error!("Error executing step: {:?}", err);
Expand Down
7 changes: 4 additions & 3 deletions notifico-ingest/src/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ pub async fn run(amqp_url: Url, worker_addr: String, event_rx: Receiver<ProcessE
})
.await;

info!("Connected to AMQP broker: {amqp_url}.");

let mut connection = match connection {
Ok(conn) => conn,
Ok(conn) => {
info!("Connected to AMQP broker: {amqp_url}.");
conn
}
Err(err) => {
error!("Failed to connect to AMQP broker: {err:?}");
continue;
Expand Down
37 changes: 34 additions & 3 deletions notifico-ingest/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
use axum::extract::Query;
use axum::http::StatusCode;
use axum::routing::post;
use axum::{Extension, Json, Router};
use flume::Sender;
use notifico_core::engine::EventContext;
use notifico_core::pipeline::runner::ProcessEventRequest;
use serde::Deserialize;
use std::net::SocketAddr;
use tokio::net::TcpListener;
use utoipa::OpenApi;
use utoipa_redoc::Redoc;
use utoipa_redoc::Servable;
use utoipa_swagger_ui::SwaggerUi;
use uuid::Uuid;

#[derive(Clone)]
pub(crate) struct HttpExtensions {
pub sender: Sender<ProcessEventRequest>,
}

#[derive(OpenApi)]
#[openapi(info(description = "Notifico Ingest API"), paths(event_send))]
#[openapi(info(description = "Notifico Ingest API"), paths(send, send_webhook))]
struct ApiDoc;

pub(crate) async fn start(serviceapi_bind: SocketAddr, ext: HttpExtensions) {
Expand All @@ -25,7 +29,8 @@ pub(crate) async fn start(serviceapi_bind: SocketAddr, ext: HttpExtensions) {

// Service API
let app = Router::new()
.route("/v1/send", post(event_send))
.route("/v1/send", post(send))
.route("/v1/send_webhook", post(send_webhook))
.layer(Extension(ext.sender));

let app =
Expand All @@ -36,11 +41,37 @@ pub(crate) async fn start(serviceapi_bind: SocketAddr, ext: HttpExtensions) {
}

#[utoipa::path(post, path = "/v1/send")]
async fn event_send(
async fn send(
Extension(sender): Extension<Sender<ProcessEventRequest>>,
Json(payload): Json<ProcessEventRequest>,
) -> StatusCode {
sender.send_async(payload).await.unwrap();

StatusCode::ACCEPTED
}

#[derive(Deserialize)]
struct WebhookParameters {
#[serde(default = "Uuid::nil")]
project_id: Uuid,
event: String,
}

#[utoipa::path(post, path = "/v1/send_webhook")]
async fn send_webhook(
Extension(sender): Extension<Sender<ProcessEventRequest>>,
parameters: Query<WebhookParameters>,
Json(context): Json<EventContext>,
) -> StatusCode {
let process_event_request = ProcessEventRequest {
id: Uuid::now_v7(),
project_id: parameters.project_id,
event: parameters.event.clone(),
recipient: None,
context,
};

sender.send_async(process_event_request).await.unwrap();

StatusCode::ACCEPTED
}

0 comments on commit e850f40

Please sign in to comment.