Skip to content

Commit

Permalink
update: better alert impl. (#111)
Browse files Browse the repository at this point in the history
* update: drilled config

* update: TestConfigBuilder with configurations

* chore: lint fixes

* chore: lint fixes #2

* update: Non-Arc Impl for TestConfigBuilder

* update: New TestConfigBuilder accomodating changed on TestCases

* update: uncomment fft tests

* update: better alert impl

* update: PR review changes #1

* ignore: empty comment to trigger CI

* update: TestConfigBuilder object name changes

* update: optimised init_<service> functions

* update: removed new_from_env() from AWSSNS

* ignore: empty comment to trigger CI
  • Loading branch information
heemankv authored Sep 5, 2024
1 parent f84ba41 commit 21ec2cb
Show file tree
Hide file tree
Showing 36 changed files with 766 additions and 614 deletions.
1 change: 0 additions & 1 deletion .env.test
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ SQS_WORKER_TRIGGER_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:45

##### SNS #####
ALERTS="sns"
AWS_SNS_REGION="us-east-1"
AWS_SNS_ARN="arn:aws:sns:us-east-1:000000000000:madara-orchestrator-arn"
AWS_SNS_ARN_NAME="madara-orchestrator-arn"

Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Changed

- AWS config built from TestConfigBuilder.
- Better TestConfigBuilder, with sync config clients.
- Drilled Config, removing dirty global reads.
- refactor AWS config usage and clean .env files
- GitHub's coverage CI yml file for localstack and db testing.
- Orchestrator :Moved TestConfigBuilder to `config.rs` in tests folder.
Expand Down
7 changes: 0 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ url = { version = "2.5.0", features = ["serde"] }
uuid = { version = "1.7.0", features = ["v4", "serde"] }
httpmock = { version = "0.7.0", features = ["remote"] }
num-bigint = { version = "0.4.4" }
arc-swap = { version = "1.7.1" }
num-traits = "0.2"
lazy_static = "1.4.0"
stark_evm_adapter = "0.1.1"
Expand Down
1 change: 0 additions & 1 deletion crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ path = "src/main.rs"

[dependencies]
alloy = { workspace = true }
arc-swap = { workspace = true }
assert_matches = "1.5.0"
async-std = "1.12.0"
async-trait = { workspace = true }
Expand Down
8 changes: 3 additions & 5 deletions crates/orchestrator/src/alerts/aws_sns/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::alerts::Alerts;
use async_trait::async_trait;
use aws_sdk_sns::config::Region;
use aws_config::SdkConfig;
use aws_sdk_sns::Client;
use utils::env_utils::get_env_var_or_panic;

Expand All @@ -10,10 +10,8 @@ pub struct AWSSNS {

impl AWSSNS {
/// To create a new SNS client
pub async fn new() -> Self {
let sns_region = get_env_var_or_panic("AWS_SNS_REGION");
let config = aws_config::from_env().region(Region::new(sns_region)).load().await;
AWSSNS { client: Client::new(&config) }
pub async fn new(config: &SdkConfig) -> Self {
AWSSNS { client: Client::new(config) }
}
}

Expand Down
63 changes: 17 additions & 46 deletions crates/orchestrator/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,26 @@
use std::sync::Arc;

use crate::alerts::aws_sns::AWSSNS;
use crate::alerts::Alerts;
use crate::data_storage::aws_s3::config::AWSS3Config;
use crate::data_storage::aws_s3::AWSS3;
use crate::data_storage::{DataStorage, DataStorageConfig};
use arc_swap::{ArcSwap, Guard};
use aws_config::SdkConfig;
use da_client_interface::{DaClient, DaConfig};
use dotenvy::dotenv;
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::{JsonRpcClient, Url};

use da_client_interface::{DaClient, DaConfig};
use ethereum_da_client::config::EthereumDaConfig;
use ethereum_settlement_client::EthereumSettlementClient;
use prover_client_interface::ProverClient;
use settlement_client_interface::SettlementClient;
use sharp_service::SharpProverService;
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::{JsonRpcClient, Url};
use starknet_settlement_client::StarknetSettlementClient;
use tokio::sync::OnceCell;
use utils::env_utils::get_env_var_or_panic;
use utils::settings::default::DefaultSettingsProvider;
use utils::settings::SettingsProvider;

use crate::alerts::aws_sns::AWSSNS;
use crate::alerts::Alerts;
use crate::data_storage::aws_s3::config::AWSS3Config;
use crate::data_storage::aws_s3::AWSS3;
use crate::data_storage::{DataStorage, DataStorageConfig};
use crate::database::mongodb::config::MongoDbConfig;
use crate::database::mongodb::MongoDb;
use crate::database::{Database, DatabaseConfig};
Expand Down Expand Up @@ -50,7 +49,7 @@ pub struct Config {
}

/// Initializes the app config
pub async fn init_config() -> Config {
pub async fn init_config() -> Arc<Config> {
dotenv().ok();

// init starknet client
Expand All @@ -68,7 +67,7 @@ pub async fn init_config() -> Config {
// TODO: we use omniqueue for now which doesn't support loading AWS config
// from `SdkConfig`. We can later move to using `aws_sdk_sqs`. This would require
// us stop using the generic omniqueue abstractions for message ack/nack
let queue = build_queue_client(&aws_config);
let queue = build_queue_client();

let da_client = build_da_client().await;

Expand All @@ -77,10 +76,9 @@ pub async fn init_config() -> Config {
let prover_client = build_prover_service(&settings_provider);

let storage_client = build_storage_client(&aws_config).await;
let alerts_client = build_alert_client(&aws_config).await;

let alerts_client = build_alert_client().await;

Config::new(
Arc::new(Config::new(
Arc::new(provider),
da_client,
prover_client,
Expand All @@ -89,7 +87,7 @@ pub async fn init_config() -> Config {
queue,
storage_client,
alerts_client,
)
))
}

impl Config {
Expand Down Expand Up @@ -149,33 +147,6 @@ impl Config {
}
}

/// The app config. It can be accessed from anywhere inside the service.
/// It's initialized only once.
/// We are using `ArcSwap` as it allow us to replace the new `Config` with
/// a new one which is required when running test cases. This approach was
/// inspired from here - https://github.com/matklad/once_cell/issues/127
pub static CONFIG: OnceCell<ArcSwap<Config>> = OnceCell::const_new();

/// Returns the app config. Initializes if not already done.
pub async fn config() -> Guard<Arc<Config>> {
let cfg = CONFIG.get_or_init(|| async { ArcSwap::from_pointee(init_config().await) }).await;
cfg.load()
}

/// OnceCell only allows us to initialize the config once and that's how it should be on production.
/// However, when running tests, we often want to reinitialize because we want to clear the DB and
/// set it up again for reuse in new tests. By calling `config_force_init` we replace the already
/// stored config inside `ArcSwap` with the new configuration and pool settings.
#[cfg(test)]
pub async fn config_force_init(config: Config) {
match CONFIG.get() {
Some(arc) => arc.store(Arc::new(config)),
None => {
CONFIG.get_or_init(|| async { ArcSwap::from_pointee(config) }).await;
}
}
}

/// Builds the DA client based on the environment variable DA_LAYER
pub async fn build_da_client() -> Box<dyn DaClient + Send + Sync> {
match get_env_var_or_panic("DA_LAYER").as_str() {
Expand Down Expand Up @@ -213,13 +184,13 @@ pub async fn build_storage_client(aws_config: &SdkConfig) -> Box<dyn DataStorage
}
}

pub async fn build_alert_client() -> Box<dyn Alerts + Send + Sync> {
pub async fn build_alert_client(aws_config: &SdkConfig) -> Box<dyn Alerts + Send + Sync> {
match get_env_var_or_panic("ALERTS").as_str() {
"sns" => Box::new(AWSSNS::new().await),
"sns" => Box::new(AWSSNS::new(aws_config).await),
_ => panic!("Unsupported Alert Client"),
}
}
pub fn build_queue_client(_aws_config: &SdkConfig) -> Box<dyn QueueProvider + Send + Sync> {
pub fn build_queue_client() -> Box<dyn QueueProvider + Send + Sync> {
match get_env_var_or_panic("QUEUE_PROVIDER").as_str() {
"sqs" => Box::new(SqsQueue {}),
_ => panic!("Unsupported Queue Client"),
Expand Down
46 changes: 24 additions & 22 deletions crates/orchestrator/src/jobs/da_job/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::ops::{Add, Mul, Rem};
use std::str::FromStr;
use std::sync::Arc;

use async_trait::async_trait;
use color_eyre::eyre::WrapErr;
Expand All @@ -13,11 +14,12 @@ use thiserror::Error;
use tracing::log;
use uuid::Uuid;

use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use super::{Job, JobError, OtherError};
use crate::config::Config;
use crate::constants::BLOB_DATA_FILE_NAME;

use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use super::{Job, JobError, OtherError};

lazy_static! {
/// EIP-4844 BLS12-381 modulus.
///
Expand Down Expand Up @@ -59,7 +61,7 @@ pub struct DaJob;
impl Job for DaJob {
async fn create_job(
&self,
_config: &Config,
_config: Arc<Config>,
internal_id: String,
metadata: HashMap<String, String>,
) -> Result<JobItem, JobError> {
Expand All @@ -74,7 +76,7 @@ impl Job for DaJob {
})
}

async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result<String, JobError> {
async fn process_job(&self, config: Arc<Config>, job: &mut JobItem) -> Result<String, JobError> {
let block_no = job
.internal_id
.parse::<u64>()
Expand All @@ -95,7 +97,7 @@ impl Job for DaJob {
MaybePendingStateUpdate::Update(state_update) => state_update,
};
// constructing the data from the rpc
let blob_data = state_update_to_blob_data(block_no, state_update, config)
let blob_data = state_update_to_blob_data(block_no, state_update, config.clone())
.await
.map_err(|e| JobError::Other(OtherError(e)))?;
// transforming the data so that we can apply FFT on this.
Expand Down Expand Up @@ -135,7 +137,7 @@ impl Job for DaJob {
Ok(external_id)
}

async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
async fn verify_job(&self, config: Arc<Config>, job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
Ok(config
.da_client()
.verify_inclusion(job.external_id.unwrap_string().map_err(|e| JobError::Other(OtherError(e)))?)
Expand Down Expand Up @@ -230,7 +232,7 @@ fn data_to_blobs(blob_size: u64, block_data: Vec<BigUint>) -> Result<Vec<Vec<u8>
pub async fn state_update_to_blob_data(
block_no: u64,
state_update: StateUpdate,
config: &Config,
config: Arc<Config>,
) -> color_eyre::Result<Vec<FieldElement>> {
let state_diff = state_update.state_diff;
let mut blob_data: Vec<FieldElement> = vec![
Expand Down Expand Up @@ -308,7 +310,7 @@ pub async fn state_update_to_blob_data(
}

/// To store the blob data using the storage client with path <block_number>/blob_data.txt
async fn store_blob_data(blob_data: Vec<FieldElement>, block_number: u64, config: &Config) -> Result<(), JobError> {
async fn store_blob_data(blob_data: Vec<FieldElement>, block_number: u64, config: Arc<Config>) -> Result<(), JobError> {
let storage_client = config.storage();
let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME;
let data_blob_big_uint = convert_to_biguint(blob_data.clone());
Expand Down Expand Up @@ -370,18 +372,12 @@ fn da_word(class_flag: bool, nonce_change: Option<FieldElement>, num_changes: u6
#[cfg(test)]

pub mod test {
use crate::jobs::da_job::da_word;
use std::fs;
use std::fs::File;
use std::io::Read;
use std::sync::Arc;

use crate::config::config;
use crate::data_storage::MockDataStorage;
use crate::tests::config::TestConfigBuilder;
use ::serde::{Deserialize, Serialize};
use color_eyre::Result;
use da_client_interface::MockDaClient;
use httpmock::prelude::*;
use majin_blob_core::blob;
use majin_blob_types::serde;
Expand All @@ -393,6 +389,11 @@ pub mod test {
use starknet_core::types::{FieldElement, StateUpdate};
use url::Url;

use da_client_interface::MockDaClient;

use crate::data_storage::MockDataStorage;
use crate::jobs::da_job::da_word;

/// Tests `da_word` function with various inputs for class flag, new nonce, and number of changes.
/// Verifies that `da_word` produces the correct FieldElement based on the provided parameters.
/// Uses test cases with different combinations of inputs and expected output strings.
Expand Down Expand Up @@ -444,7 +445,10 @@ pub mod test {
#[case] file_path: &str,
#[case] nonce_file_path: &str,
) {
use crate::jobs::da_job::{convert_to_biguint, state_update_to_blob_data};
use crate::{
jobs::da_job::{convert_to_biguint, state_update_to_blob_data},
tests::config::TestConfigBuilder,
};

let server = MockServer::start();
let mut da_client = MockDaClient::new();
Expand All @@ -462,19 +466,17 @@ pub mod test {
));

// mock block number (madara) : 5
TestConfigBuilder::new()
.mock_starknet_client(Arc::new(provider))
.mock_da_client(Box::new(da_client))
.mock_storage_client(Box::new(storage_client))
let services = TestConfigBuilder::new()
.configure_starknet_client(provider.into())
.configure_da_client(da_client.into())
.configure_storage_client(storage_client.into())
.build()
.await;

let config = config().await;

get_nonce_attached(&server, nonce_file_path);

let state_update = read_state_update_from_file(state_update_file_path).expect("issue while reading");
let blob_data = state_update_to_blob_data(block_no, state_update, &config)
let blob_data = state_update_to_blob_data(block_no, state_update, services.config)
.await
.expect("issue while converting state update to blob data");

Expand Down
Loading

0 comments on commit 21ec2cb

Please sign in to comment.