Skip to content

Commit

Permalink
Merge branch 'main' into atlantic_refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Mohiiit authored Nov 12, 2024
2 parents a20ca26 + a9d0e30 commit f296a30
Show file tree
Hide file tree
Showing 17 changed files with 436 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
## Added

- tests: http_client tests added
- setup functions added for cloud and db
- panic handling in process job
- upgrade ETH L1 bridge for withdrawals to work
- added makefile and submodules
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ aws-config = { workspace = true, features = ["behavior-version-latest"] }
aws-credential-types = { workspace = true, features = [
"hardcoded-credentials",
] }
aws-sdk-eventbridge.workspace = true
aws-sdk-s3 = { workspace = true, features = ["behavior-version-latest"] }
aws-sdk-sns = { workspace = true, features = ["behavior-version-latest"] }
aws-sdk-sqs = { workspace = true }
Expand Down
7 changes: 7 additions & 0 deletions crates/orchestrator/src/alerts/aws_sns/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,11 @@ impl Alerts for AWSSNS {
self.client.publish().topic_arn(self.topic_arn.clone()).message(message_body).send().await?;
Ok(())
}

async fn create_alert(&self, topic_name: &str) -> color_eyre::Result<()> {
let response = self.client.create_topic().name(topic_name).send().await?;
let topic_arn = response.topic_arn().expect("Topic Not found");
log::info!("SNS topic created. Topic ARN: {}", topic_arn);
Ok(())
}
}
7 changes: 7 additions & 0 deletions crates/orchestrator/src/alerts/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use mockall::automock;
use utils::settings::Settings;

pub mod aws_sns;

Expand All @@ -8,4 +9,10 @@ pub mod aws_sns;
pub trait Alerts: Send + Sync {
/// To send an alert message to our alert service
async fn send_alert_message(&self, message_body: String) -> color_eyre::Result<()>;
async fn create_alert(&self, topic_name: &str) -> color_eyre::Result<()>;
async fn setup(&self, settings_provider: Box<dyn Settings>) -> color_eyre::Result<()> {
let sns_topic_name = settings_provider.get_settings_or_panic("ALERT_TOPIC_NAME");
self.create_alert(&sns_topic_name).await?;
Ok(())
}
}
111 changes: 111 additions & 0 deletions crates/orchestrator/src/cron/event_bridge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use std::time::Duration;

use async_trait::async_trait;
use aws_sdk_eventbridge::types::{InputTransformer, RuleState, Target};
use aws_sdk_sqs::types::QueueAttributeName;

use crate::cron::Cron;
use crate::setup::SetupConfig;

pub struct AWSEventBridge {}

#[async_trait]
#[allow(unreachable_patterns)]
impl Cron for AWSEventBridge {
async fn create_cron(
&self,
config: &SetupConfig,
cron_time: Duration,
trigger_rule_name: String,
) -> color_eyre::Result<()> {
let config = match config {
SetupConfig::AWS(config) => config,
_ => panic!("Unsupported Event Bridge configuration"),
};
let event_bridge_client = aws_sdk_eventbridge::Client::new(config);
event_bridge_client
.put_rule()
.name(&trigger_rule_name)
.schedule_expression(duration_to_rate_string(cron_time))
.state(RuleState::Enabled)
.send()
.await?;

Ok(())
}
async fn add_cron_target_queue(
&self,
config: &SetupConfig,
target_queue_name: String,
message: String,
trigger_rule_name: String,
) -> color_eyre::Result<()> {
let config = match config {
SetupConfig::AWS(config) => config,
_ => panic!("Unsupported Event Bridge configuration"),
};
let event_bridge_client = aws_sdk_eventbridge::Client::new(config);
let sqs_client = aws_sdk_sqs::Client::new(config);
let queue_url = sqs_client.get_queue_url().queue_name(target_queue_name).send().await?;

let queue_attributes = sqs_client
.get_queue_attributes()
.queue_url(queue_url.queue_url.unwrap())
.attribute_names(QueueAttributeName::QueueArn)
.send()
.await?;
let queue_arn = queue_attributes.attributes().unwrap().get(&QueueAttributeName::QueueArn).unwrap();

// Create the EventBridge target with the input transformer
let input_transformer =
InputTransformer::builder().input_paths_map("$.time", "time").input_template(message).build()?;

event_bridge_client
.put_targets()
.rule(trigger_rule_name)
.targets(
Target::builder()
.id(uuid::Uuid::new_v4().to_string())
.arn(queue_arn)
.input_transformer(input_transformer)
.build()?,
)
.send()
.await?;

Ok(())
}
}

fn duration_to_rate_string(duration: Duration) -> String {
let total_secs = duration.as_secs();
let total_mins = duration.as_secs() / 60;
let total_hours = duration.as_secs() / 3600;
let total_days = duration.as_secs() / 86400;

if total_days > 0 {
format!("rate({} day{})", total_days, if total_days == 1 { "" } else { "s" })
} else if total_hours > 0 {
format!("rate({} hour{})", total_hours, if total_hours == 1 { "" } else { "s" })
} else if total_mins > 0 {
format!("rate({} minute{})", total_mins, if total_mins == 1 { "" } else { "s" })
} else {
format!("rate({} second{})", total_secs, if total_secs == 1 { "" } else { "s" })
}
}

#[cfg(test)]
mod event_bridge_utils_test {
use rstest::rstest;

use super::*;

#[rstest]
fn test_duration_to_rate_string() {
assert_eq!(duration_to_rate_string(Duration::from_secs(60)), "rate(1 minute)");
assert_eq!(duration_to_rate_string(Duration::from_secs(120)), "rate(2 minutes)");
assert_eq!(duration_to_rate_string(Duration::from_secs(30)), "rate(30 seconds)");
assert_eq!(duration_to_rate_string(Duration::from_secs(3600)), "rate(1 hour)");
assert_eq!(duration_to_rate_string(Duration::from_secs(86400)), "rate(1 day)");
}
}
57 changes: 57 additions & 0 deletions crates/orchestrator/src/cron/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::time::Duration;

use async_trait::async_trait;
use lazy_static::lazy_static;

use crate::queue::job_queue::{WorkerTriggerMessage, WorkerTriggerType};
use crate::setup::SetupConfig;

pub mod event_bridge;

lazy_static! {
pub static ref CRON_DURATION: Duration = Duration::from_mins(1);
// TODO : we can take this from clap.
pub static ref TARGET_QUEUE_NAME: String = String::from("madara_orchestrator_worker_trigger_queue");
pub static ref WORKER_TRIGGERS: Vec<WorkerTriggerType> = vec![
WorkerTriggerType::Snos,
WorkerTriggerType::Proving,
WorkerTriggerType::DataSubmission,
WorkerTriggerType::UpdateState
];
pub static ref WORKER_TRIGGER_RULE_NAME: String = String::from("worker_trigger_scheduled");
}

#[async_trait]
pub trait Cron {
async fn create_cron(
&self,
config: &SetupConfig,
cron_time: Duration,
trigger_rule_name: String,
) -> color_eyre::Result<()>;
async fn add_cron_target_queue(
&self,
config: &SetupConfig,
target_queue_name: String,
message: String,
trigger_rule_name: String,
) -> color_eyre::Result<()>;
async fn setup(&self, config: SetupConfig) -> color_eyre::Result<()> {
self.create_cron(&config, *CRON_DURATION, WORKER_TRIGGER_RULE_NAME.clone()).await?;
for triggers in WORKER_TRIGGERS.iter() {
self.add_cron_target_queue(
&config,
TARGET_QUEUE_NAME.clone(),
get_worker_trigger_message(triggers.clone())?,
WORKER_TRIGGER_RULE_NAME.clone(),
)
.await?;
}
Ok(())
}
}

fn get_worker_trigger_message(worker_trigger_type: WorkerTriggerType) -> color_eyre::Result<String> {
let message = WorkerTriggerMessage { worker: worker_trigger_type };
Ok(serde_json::to_string(&message)?)
}
2 changes: 1 addition & 1 deletion crates/orchestrator/src/data_storage/aws_s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl DataStorage for AWSS3 {
Ok(())
}

async fn build_test_bucket(&self, bucket_name: &str) -> Result<()> {
async fn create_bucket(&self, bucket_name: &str) -> Result<()> {
self.client.create_bucket().bucket(bucket_name).send().await?;
Ok(())
}
Expand Down
6 changes: 5 additions & 1 deletion crates/orchestrator/src/data_storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ use utils::settings::Settings;
pub trait DataStorage: Send + Sync {
async fn get_data(&self, key: &str) -> Result<Bytes>;
async fn put_data(&self, data: Bytes, key: &str) -> Result<()>;
async fn build_test_bucket(&self, bucket_name: &str) -> Result<()>;
async fn create_bucket(&self, bucket_name: &str) -> Result<()>;
async fn setup(&self, settings_provider: Box<dyn Settings>) -> Result<()> {
let bucket_name = settings_provider.get_settings_or_panic("STORAGE_BUCKET_NAME");
self.create_bucket(&bucket_name).await
}
}

/// **DataStorageConfig** : Trait method to represent the config struct needed for
Expand Down
5 changes: 5 additions & 0 deletions crates/orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#![feature(duration_constructors)]

/// Contains the trait implementations for alerts
pub mod alerts;
/// Config of the service. Contains configurations for DB, Queues and other services.
pub mod config;
pub mod constants;
/// Controllers for the routes
pub mod controllers;
pub mod cron;
/// Contains the trait that implements the fetching functions
/// for blob and SNOS data from cloud for a particular block.
pub mod data_storage;
Expand All @@ -20,6 +23,8 @@ pub mod metrics;
pub mod queue;
/// Contains the routes for the service
pub mod routes;
/// Contains setup functions to set up db and cloud.
pub mod setup;
/// Contains telemetry collection services. (Metrics/Logs/Traces)
pub mod telemetry;
#[cfg(test)]
Expand Down
81 changes: 80 additions & 1 deletion crates/orchestrator/src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,82 @@ use std::time::Duration;

use async_trait::async_trait;
use color_eyre::Result as EyreResult;
use lazy_static::lazy_static;
use mockall::automock;
use omniqueue::{Delivery, QueueError};

use crate::config::Config;
use crate::jobs::JobError;
use crate::setup::SetupConfig;

#[derive(Clone)]
pub struct DlqConfig<'a> {
pub max_receive_count: i32,
pub dlq_name: &'a str,
}

#[derive(Clone)]
pub struct QueueConfig<'a> {
pub name: String,
pub visibility_timeout: i32,
pub dlq_config: Option<DlqConfig<'a>>,
}

lazy_static! {
pub static ref JOB_HANDLE_FAILURE_QUEUE: String = String::from("madara_orchestrator_job_handle_failure_queue");
pub static ref QUEUES: Vec<QueueConfig<'static>> = vec![
QueueConfig {
name: String::from("madara_orchestrator_snos_job_processing_queue"),
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE })
},
QueueConfig {
name: String::from("madara_orchestrator_snos_job_verification_queue"),
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE })
},
QueueConfig {
name: String::from("madara_orchestrator_proving_job_processing_queue"),
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE })
},
QueueConfig {
name: String::from("madara_orchestrator_proving_job_verification_queue"),
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE })
},
QueueConfig {
name: String::from("madara_orchestrator_data_submission_job_processing_queue"),
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE })
},
QueueConfig {
name: String::from("madara_orchestrator_data_submission_job_verification_queue"),
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE })
},
QueueConfig {
name: String::from("madara_orchestrator_update_state_job_processing_queue"),
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE })
},
QueueConfig {
name: String::from("madara_orchestrator_update_state_job_verification_queue"),
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE })
},
QueueConfig {
name: String::from("madara_orchestrator_job_handle_failure_queue"),
visibility_timeout: 300,
dlq_config: None
},
QueueConfig {
name: String::from("madara_orchestrator_worker_trigger_queue"),
visibility_timeout: 300,
dlq_config: None
},
];
}

/// Queue Provider Trait
///
Expand All @@ -21,7 +92,15 @@ use crate::jobs::JobError;
#[async_trait]
pub trait QueueProvider: Send + Sync {
async fn send_message_to_queue(&self, queue: String, payload: String, delay: Option<Duration>) -> EyreResult<()>;
async fn consume_message_from_queue(&self, queue: String) -> std::result::Result<Delivery, QueueError>;
async fn consume_message_from_queue(&self, queue: String) -> Result<Delivery, QueueError>;
async fn create_queue<'a>(&self, queue_config: &QueueConfig<'a>, config: &SetupConfig) -> EyreResult<()>;
async fn setup(&self, config: SetupConfig) -> EyreResult<()> {
// Creating the queues :
for queue in QUEUES.iter() {
self.create_queue(queue, &config).await?;
}
Ok(())
}
}

pub async fn init_consumers(config: Arc<Config>) -> Result<(), JobError> {
Expand Down
Loading

0 comments on commit f296a30

Please sign in to comment.