diff --git a/.env.example b/.env.example index fc5dc314..e0355037 100644 --- a/.env.example +++ b/.env.example @@ -33,7 +33,6 @@ MONGODB_CONNECTION_STRING= PROVER_SERVICE= SHARP_CUSTOMER_ID= SHARP_URL= -# [IMP!!!] These are test certificates (they don't work) SHARP_USER_CRT= SHARP_USER_KEY= SHARP_SERVER_CRT= @@ -48,4 +47,4 @@ MADARA_RPC_URL= MEMORY_PAGES_CONTRACT_ADDRESS= PRIVATE_KEY= ETHEREUM_PRIVATE_KEY= -STARKNET_SOLIDITY_CORE_CONTRACT_ADDRESS= +STARKNET_SOLIDITY_CORE_CONTRACT_ADDRESS= \ No newline at end of file diff --git a/.env.test b/.env.test index b87267ed..4d09e45b 100644 --- a/.env.test +++ b/.env.test @@ -9,6 +9,7 @@ AWS_ACCESS_KEY_ID="AWS_ACCESS_KEY_ID" AWS_SECRET_ACCESS_KEY="AWS_SECRET_ACCESS_KEY" AWS_REGION="us-east-1" AWS_ENDPOINT_URL="http://localhost.localstack.cloud:4566" +AWS_DEFAULT_REGION="localhost" ##### STORAGE ##### @@ -20,8 +21,14 @@ AWS_S3_BUCKET_NAME="madara-orchestrator-test-bucket" QUEUE_PROVIDER="sqs" SQS_JOB_PROCESSING_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_processing_queue" SQS_JOB_VERIFICATION_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_verification_queue" -SQS_JOB_HANDLE_FAILURE_QUEUE_URL= -SQS_WORKER_TRIGGER_QUEUE_URL= +SQS_JOB_HANDLE_FAILURE_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_handle_failure_queue" +SQS_WORKER_TRIGGER_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_worker_trigger_queue" + +##### SNS ##### +ALERTS="sns" +AWS_SNS_REGION="us-east-1" +AWS_SNS_ARN="arn:aws:sns:us-east-1:000000000000:madara-orchestrator-arn" +AWS_SNS_ARN_NAME="madara-orchestrator-arn" ##### DATABASE ##### @@ -46,7 +53,15 @@ SETTLEMENT_LAYER="ethereum" SETTLEMENT_RPC_URL="http://localhost:3001" MADARA_RPC_URL="http://localhost:3000" L1_CORE_CONTRACT_ADDRESS="0xc662c410C0ECf747543f5bA90660f6ABeBD9C8c4" -MEMORY_PAGES_CONTRACT_ADDRESS="0x000000000000000000000000000000000001dead" +MEMORY_PAGES_CONTRACT_ADDRESS="0x47312450B3Ac8b5b8e247a6bB6d523e7605bDb60" PRIVATE_KEY="0xdead" ETHEREUM_PRIVATE_KEY="0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" -STARKNET_SOLIDITY_CORE_CONTRACT_ADDRESS="0x000000000000000000000000000000000002dead" +STARKNET_SOLIDITY_CORE_CONTRACT_ADDRESS="0xc662c410C0ECf747543f5bA90660f6ABeBD9C8c4" +DEFAULT_L1_CORE_CONTRACT_ADDRESS="0xc662c410C0ECf747543f5bA90660f6ABeBD9C8c4" +TEST_DUMMY_CONTRACT_ADDRESS="0xE5b6F5e695BA6E4aeD92B68c4CC8Df1160D69A81" +STARKNET_OPERATOR_ADDRESS="0x2C169DFe5fBbA12957Bdd0Ba47d9CEDbFE260CA7" +ETHEREUM_BLAST_RPC_URL="https://eth-mainnet.public.blastapi.io" + +##### E2E test vars ##### + +L2_BLOCK_NUMBER_FOR_TEST=671070 \ No newline at end of file diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index f6afb593..2284f89b 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -14,7 +14,7 @@ jobs: localstack: image: localstack/localstack env: - SERVICES: s3, sqs + SERVICES: s3, sqs, sns DEFAULT_REGION: us-east-1 AWS_ACCESS_KEY_ID: "AWS_ACCESS_KEY_ID" AWS_SECRET_ACCESS_KEY: "AWS_SECRET_ACCESS_KEY" diff --git a/CHANGELOG.md b/CHANGELOG.md index 952e7162..c79ba808 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## Added +- alerts module. - Tests for Settlement client. - Worker queues to listen for trigger events. - Tests for prover client. diff --git a/Cargo.lock b/Cargo.lock index 8414b14e..ccbd7a75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1697,9 +1697,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.3.1" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87c5f920ffd1e0526ec9e70e50bf444db50b204395a0fa7016bbf9e31ea1698f" +checksum = "f42c2d4218de4dcd890a109461e2f799a1a2ba3bcd2cde9af88360f5df9266c6" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -1713,6 +1713,7 @@ dependencies = [ "fastrand 2.1.0", "http 0.2.12", "http-body 0.4.6", + "once_cell", "percent-encoding", "pin-project-lite", "tracing", @@ -1754,6 +1755,29 @@ dependencies = [ "url", ] +[[package]] +name = "aws-sdk-sns" +version = "1.40.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dab2b9787b8d9d3094ace9585e785079cfc583199ec620ab067b599e8850c1a6" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + [[package]] name = "aws-sdk-sqs" version = "1.36.0" @@ -1957,9 +1981,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.6.2" +version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce87155eba55e11768b8c1afa607f3e864ae82f03caf63258b37455b0ad02537" +checksum = "0abbf454960d0db2ad12684a1640120e7557294b0ff8e2f11236290a1b293225" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -1984,9 +2008,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.7.1" +version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30819352ed0a04ecf6a2f3477e344d2d1ba33d43e0f09ad9047c12e0d923616f" +checksum = "e086682a53d3aa241192aa110fa8dfce98f2f5ac2ead0de84d41582c7e8fdb96" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -2001,9 +2025,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.2.0" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfe321a6b21f5d8eabd0ade9c55d3d0335f3c3157fc2b3e87f05f34b539e4df5" +checksum = "6cee7cadb433c781d3299b916fbf620fea813bf38f49db282fb6858141a05cc8" dependencies = [ "base64-simd", "bytes", @@ -6374,6 +6398,7 @@ dependencies = [ "async-trait", "aws-config", "aws-sdk-s3", + "aws-sdk-sns", "aws-sdk-sqs", "axum 0.7.5", "axum-macros", diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml index e5121529..1d67b237 100644 --- a/crates/orchestrator/Cargo.toml +++ b/crates/orchestrator/Cargo.toml @@ -19,6 +19,7 @@ async-std = "1.12.0" async-trait = { workspace = true } aws-config = { version = "1.1.7", features = ["behavior-version-latest"] } aws-sdk-s3 = { version = "1.38.0", features = ["behavior-version-latest"] } +aws-sdk-sns = { version = "1.40.0", features = ["behavior-version-latest"] } aws-sdk-sqs = "1.36.0" axum = { workspace = true, features = ["macros"] } axum-macros = { workspace = true } diff --git a/crates/orchestrator/src/alerts/aws_sns/mod.rs b/crates/orchestrator/src/alerts/aws_sns/mod.rs new file mode 100644 index 00000000..9aa95f9a --- /dev/null +++ b/crates/orchestrator/src/alerts/aws_sns/mod.rs @@ -0,0 +1,27 @@ +use crate::alerts::Alerts; +use async_trait::async_trait; +use aws_sdk_sns::config::Region; +use aws_sdk_sns::Client; +use utils::env_utils::get_env_var_or_panic; + +pub struct AWSSNS { + client: Client, +} + +impl AWSSNS { + /// To create a new SNS client + pub async fn new() -> Self { + let sns_region = get_env_var_or_panic("AWS_SNS_REGION"); + let config = aws_config::from_env().region(Region::new(sns_region)).load().await; + AWSSNS { client: Client::new(&config) } + } +} + +#[async_trait] +impl Alerts for AWSSNS { + async fn send_alert_message(&self, message_body: String) -> color_eyre::Result<()> { + let topic_arn = get_env_var_or_panic("AWS_SNS_ARN"); + self.client.publish().topic_arn(topic_arn).message(message_body).send().await?; + Ok(()) + } +} diff --git a/crates/orchestrator/src/alerts/mod.rs b/crates/orchestrator/src/alerts/mod.rs new file mode 100644 index 00000000..1e36129d --- /dev/null +++ b/crates/orchestrator/src/alerts/mod.rs @@ -0,0 +1,11 @@ +use async_trait::async_trait; +use mockall::automock; + +pub mod aws_sns; + +#[automock] +#[async_trait] +pub trait Alerts: Send + Sync { + /// To send an alert message to our alert service + async fn send_alert_message(&self, message_body: String) -> color_eyre::Result<()>; +} diff --git a/crates/orchestrator/src/config.rs b/crates/orchestrator/src/config.rs index e3b7b975..4f29cac5 100644 --- a/crates/orchestrator/src/config.rs +++ b/crates/orchestrator/src/config.rs @@ -1,5 +1,7 @@ use std::sync::Arc; +use crate::alerts::aws_sns::AWSSNS; +use crate::alerts::Alerts; use crate::data_storage::aws_s3::config::AWSS3Config; use crate::data_storage::aws_s3::AWSS3; use crate::data_storage::{DataStorage, DataStorageConfig}; @@ -43,6 +45,8 @@ pub struct Config { queue: Box, /// Storage client storage: Box, + /// Alerts client + alerts: Box, } /// Initializes the app config @@ -74,11 +78,23 @@ pub async fn init_config() -> Config { let storage_client = build_storage_client(&aws_config).await; - Config::new(Arc::new(provider), da_client, prover_client, settlement_client, database, queue, storage_client) + let alerts_client = build_alert_client().await; + + Config::new( + Arc::new(provider), + da_client, + prover_client, + settlement_client, + database, + queue, + storage_client, + alerts_client, + ) } impl Config { /// Create a new config + #[allow(clippy::too_many_arguments)] pub fn new( starknet_client: Arc>, da_client: Box, @@ -87,8 +103,9 @@ impl Config { database: Box, queue: Box, storage: Box, + alerts: Box, ) -> Self { - Self { starknet_client, da_client, prover_client, settlement_client, database, queue, storage } + Self { starknet_client, da_client, prover_client, settlement_client, database, queue, storage, alerts } } /// Returns the starknet client @@ -125,6 +142,11 @@ impl Config { pub fn storage(&self) -> &dyn DataStorage { self.storage.as_ref() } + + /// Returns the alerts client + pub fn alerts(&self) -> &dyn Alerts { + self.alerts.as_ref() + } } /// The app config. It can be accessed from anywhere inside the service. @@ -191,6 +213,12 @@ pub async fn build_storage_client(aws_config: &SdkConfig) -> Box Box { + match get_env_var_or_panic("ALERTS").as_str() { + "sns" => Box::new(AWSSNS::new().await), + _ => panic!("Unsupported Alert Client"), + } +} pub fn build_queue_client(_aws_config: &SdkConfig) -> Box { match get_env_var_or_panic("QUEUE_PROVIDER").as_str() { "sqs" => Box::new(SqsQueue {}), diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index e4ae1ed6..11da33d8 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -139,7 +139,8 @@ pub async fn create_job( } let job_handler = factory::get_job_handler(&job_type).await; - let job_item = job_handler.create_job(config.as_ref(), internal_id, metadata).await?; + let job_item = job_handler.create_job(config.as_ref(), internal_id.clone(), metadata.clone()).await?; + config.database().create_job(job_item.clone()).await.map_err(|e| JobError::Other(OtherError(e)))?; add_job_to_process_queue(job_item.id).await.map_err(|e| JobError::Other(OtherError(e)))?; @@ -173,6 +174,7 @@ pub async fn process_job(id: Uuid) -> Result<(), JobError> { let job_handler = factory::get_job_handler(&job.job_type).await; let external_id = job_handler.process_job(config.as_ref(), &mut job).await?; + let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?; // Fetching the job again because update status above will update the job version @@ -239,8 +241,6 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { ); add_job_to_process_queue(job.id).await.map_err(|e| JobError::Other(OtherError(e)))?; return Ok(()); - } else { - // TODO: send alert } } JobVerificationStatus::Pending => { @@ -248,7 +248,6 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { let verify_attempts = get_u64_from_metadata(&job.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY) .map_err(|e| JobError::Other(OtherError(e)))?; if verify_attempts >= job_handler.max_verification_attempts() { - // TODO: send alert log::info!("Verification attempts exceeded for job {}. Marking as timed out.", job.id); config .database() diff --git a/crates/orchestrator/src/lib.rs b/crates/orchestrator/src/lib.rs index 3d02378b..4212f381 100644 --- a/crates/orchestrator/src/lib.rs +++ b/crates/orchestrator/src/lib.rs @@ -1,3 +1,5 @@ +/// Contains the trait implementations for alerts +pub mod alerts; /// Config of the service. Contains configurations for DB, Queues and other services. pub mod config; pub mod constants; diff --git a/crates/orchestrator/src/queue/job_queue.rs b/crates/orchestrator/src/queue/job_queue.rs index d6769aea..44d492da 100644 --- a/crates/orchestrator/src/queue/job_queue.rs +++ b/crates/orchestrator/src/queue/job_queue.rs @@ -163,6 +163,13 @@ where } Err(e) => { log::error!("Failed to handle job with id {:?}. Error: {:?}", job_message.id, e); + config() + .await + .alerts() + .send_alert_message(e.to_string()) + .await + .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?; + match message.nack().await { Ok(_) => Err(ConsumptionError::FailedToHandleJob { job_id: job_message.id, @@ -201,6 +208,13 @@ where } Err(e) => { log::error!("Failed to handle worker trigger {:?}. Error: {:?}", job_message.worker, e); + config() + .await + .alerts() + .send_alert_message(e.to_string()) + .await + .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?; + message.nack().await.map_err(|(e, _)| ConsumptionError::Other(OtherError::from(e.to_string())))?; Err(ConsumptionError::FailedToSpawnWorker { worker_trigger_type: job_message.worker, diff --git a/crates/orchestrator/src/tests/alerts/mod.rs b/crates/orchestrator/src/tests/alerts/mod.rs new file mode 100644 index 00000000..2026061b --- /dev/null +++ b/crates/orchestrator/src/tests/alerts/mod.rs @@ -0,0 +1,61 @@ +use crate::config::config; +use crate::tests::common::{get_sns_client, get_sqs_client}; +use crate::tests::config::TestConfigBuilder; +use aws_sdk_sqs::types::QueueAttributeName::QueueArn; +use rstest::rstest; +use std::time::Duration; +use tokio::time::sleep; +use utils::env_utils::get_env_var_or_panic; + +pub const SNS_ALERT_TEST_QUEUE: &str = "orchestrator_sns_alert_testing_queue"; + +#[rstest] +#[tokio::test] +async fn sns_alert_subscribe_to_topic_receive_alert_works() { + TestConfigBuilder::new().build().await; + + let sqs_client = get_sqs_client().await; + let queue = sqs_client.create_queue().queue_name(SNS_ALERT_TEST_QUEUE).send().await.unwrap(); + let queue_url = queue.queue_url().unwrap(); + + let sns_client = get_sns_client().await; + let config = config().await; + + let queue_attributes = + sqs_client.get_queue_attributes().queue_url(queue_url).attribute_names(QueueArn).send().await.unwrap(); + + let queue_arn = queue_attributes.attributes().unwrap().get(&QueueArn).unwrap(); + + // subscribing the queue with the alerts + sns_client + .subscribe() + .topic_arn(get_env_var_or_panic("AWS_SNS_ARN").as_str()) + .protocol("sqs") + .endpoint(queue_arn) + .send() + .await + .unwrap(); + + let message_to_send = "Hello World :)"; + + // Getting sns client from the module + let alerts_client = config.alerts(); + // Sending the alert message + alerts_client.send_alert_message(message_to_send.to_string()).await.unwrap(); + + sleep(Duration::from_secs(5)).await; + + // Checking the queue for message + let receive_message_result = sqs_client + .receive_message() + .queue_url(queue_url) + .max_number_of_messages(1) + .send() + .await + .unwrap() + .messages + .unwrap(); + + assert_eq!(receive_message_result.len(), 1, "Alert message length assertion failed"); + assert!(receive_message_result[0].body.clone().unwrap().contains(message_to_send)); +} diff --git a/crates/orchestrator/src/tests/common/mod.rs b/crates/orchestrator/src/tests/common/mod.rs index ee29ac34..83cf7e68 100644 --- a/crates/orchestrator/src/tests/common/mod.rs +++ b/crates/orchestrator/src/tests/common/mod.rs @@ -4,6 +4,8 @@ use std::collections::HashMap; use ::uuid::Uuid; use aws_config::Region; +use aws_sdk_sns::error::SdkError; +use aws_sdk_sns::operation::create_topic::CreateTopicError; use mongodb::Client; use rstest::*; use serde::Deserialize; @@ -41,6 +43,18 @@ pub fn custom_job_item(default_job_item: JobItem, #[default(String::from("0"))] job_item } +pub async fn create_sns_arn() -> Result<(), SdkError> { + let sns_client = get_sns_client().await; + sns_client.create_topic().name(get_env_var_or_panic("AWS_SNS_ARN_NAME")).send().await?; + Ok(()) +} + +pub async fn get_sns_client() -> aws_sdk_sns::client::Client { + let sns_region = get_env_var_or_panic("AWS_SNS_REGION"); + let config = aws_config::from_env().region(Region::new(sns_region)).load().await; + aws_sdk_sns::Client::new(&config) +} + pub async fn drop_database() -> color_eyre::Result<()> { let db_client: Client = MongoDb::new(MongoDbConfig::new_from_env()).await.client(); // dropping all the collection. @@ -72,7 +86,7 @@ pub async fn create_sqs_queues() -> color_eyre::Result<()> { Ok(()) } -async fn get_sqs_client() -> aws_sdk_sqs::Client { +pub async fn get_sqs_client() -> aws_sdk_sqs::Client { // This function is for localstack. So we can hardcode the region for this as of now. let region_provider = Region::new("us-east-1"); let config = aws_config::from_env().region(region_provider).load().await; diff --git a/crates/orchestrator/src/tests/config.rs b/crates/orchestrator/src/tests/config.rs index 21510375..8424d6d7 100644 --- a/crates/orchestrator/src/tests/config.rs +++ b/crates/orchestrator/src/tests/config.rs @@ -1,10 +1,13 @@ use std::sync::Arc; -use crate::config::{build_da_client, build_prover_service, build_settlement_client, config_force_init, Config}; +use crate::config::{ + build_alert_client, build_da_client, build_prover_service, build_settlement_client, config_force_init, Config, +}; use crate::data_storage::DataStorage; use da_client_interface::DaClient; use httpmock::MockServer; +use crate::alerts::Alerts; use prover_client_interface::ProverClient; use settlement_client_interface::SettlementClient; use starknet::providers::jsonrpc::HttpTransport; @@ -17,7 +20,7 @@ use crate::database::mongodb::MongoDb; use crate::database::{Database, DatabaseConfig}; use crate::queue::sqs::SqsQueue; use crate::queue::QueueProvider; -use crate::tests::common::{create_sqs_queues, drop_database, get_storage_client}; +use crate::tests::common::{create_sns_arn, create_sqs_queues, drop_database, get_storage_client}; // Inspiration : https://rust-unofficial.github.io/patterns/patterns/creational/builder.html // TestConfigBuilder allows to heavily customise the global configs based on the test's requirement. @@ -39,6 +42,8 @@ pub struct TestConfigBuilder { queue: Option>, /// Storage client storage: Option>, + /// Alerts client + alerts: Option>, } impl Default for TestConfigBuilder { @@ -58,6 +63,7 @@ impl TestConfigBuilder { database: None, queue: None, storage: None, + alerts: None, } } @@ -96,6 +102,11 @@ impl TestConfigBuilder { self } + pub fn mock_alerts(mut self, alerts: Box) -> TestConfigBuilder { + self.alerts = Some(alerts); + self + } + pub async fn build(mut self) -> MockServer { dotenvy::from_filename("../.env.test").expect("Failed to load the .env file"); @@ -132,10 +143,16 @@ impl TestConfigBuilder { } } + if self.alerts.is_none() { + self.alerts = Some(build_alert_client().await); + } + // Deleting and Creating the queues in sqs. create_sqs_queues().await.expect("Not able to delete and create the queues."); // Deleting the database drop_database().await.expect("Unable to drop the database."); + // Creating the SNS ARN + create_sns_arn().await.expect("Unable to create the sns arn"); let config = Config::new( self.starknet_client.unwrap_or_else(|| { @@ -150,6 +167,7 @@ impl TestConfigBuilder { self.database.unwrap(), self.queue.unwrap_or_else(|| Box::new(SqsQueue {})), self.storage.unwrap(), + self.alerts.unwrap(), ); drop_database().await.unwrap(); diff --git a/crates/orchestrator/src/tests/mod.rs b/crates/orchestrator/src/tests/mod.rs index 1dbc21a2..4f264304 100644 --- a/crates/orchestrator/src/tests/mod.rs +++ b/crates/orchestrator/src/tests/mod.rs @@ -7,6 +7,7 @@ pub mod server; pub mod queue; +pub mod alerts; pub mod common; mod data_storage; pub mod workers;