feat : added snos worker implementation and unit tests (#16)
* feat : added snos worker implementation and unit tests

* feat : added review #1 changes : added error handling for snos workers

* feat : added review #1 changes : added error handling for snos workers

* fix : lint

* fix : lint errors

---------

Co-authored-by: Arun Jangra <[email protected]>
ocdbytes and Arun Jangra authored Jun 17, 2024
1 parent 4343a41 commit b625f90
Showing 18 changed files with 199 additions and 22 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/coverage.yml
@@ -26,7 +26,7 @@ jobs:
- name: Run llvm-cov
run: |
-cargo llvm-cov nextest --release --lcov --output-path lcov.info
+cargo llvm-cov nextest --release --lcov --output-path lcov.info --test-threads=1
- name: Upload coverage to codecov.io
uses: codecov/codecov-action@v3
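Pinning nextest to a single thread is presumably required by the new worker tests: they replace the process-wide `CONFIG` via `config_force_init` (see `config.rs` below), so tests running concurrently would race on that shared state.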
10 changes: 9 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -42,5 +42,6 @@ uuid = { version = "1.7.0" }
num-bigint = { version = "0.4.4" }
httpmock = { version = "0.7.0" }
utils = { path = "crates/utils" }
+arc-swap = { version = "1.7.1" }
num-traits = "0.2"
lazy_static = "1.4.0"
3 changes: 2 additions & 1 deletion crates/orchestrator/Cargo.toml
@@ -12,7 +12,8 @@ name = "orchestrator"
path = "src/main.rs"

[dependencies]
-
+arc-swap = { workspace = true }
+async-std = "1.12.0"
async-trait = { workspace = true }
axum = { workspace = true, features = ["macros"] }
axum-macros = { workspace = true }
25 changes: 22 additions & 3 deletions crates/orchestrator/src/config.rs
@@ -4,6 +4,7 @@ use crate::database::{Database, DatabaseConfig};
use crate::queue::sqs::SqsQueue;
use crate::queue::QueueProvider;
use crate::utils::env_utils::get_env_var_or_panic;
+use arc_swap::{ArcSwap, Guard};
use da_client_interface::DaClient;
use da_client_interface::DaConfig;
use dotenvy::dotenv;
@@ -79,11 +80,29 @@ impl Config {

/// The app config. It can be accessed from anywhere inside the service.
/// It's initialized only once.
-pub static CONFIG: OnceCell<Config> = OnceCell::const_new();
+/// We are using `ArcSwap` as it allows us to replace the stored `Config` with
+/// a new one, which is required when running test cases. This approach was
+/// inspired by https://github.com/matklad/once_cell/issues/127
+pub static CONFIG: OnceCell<ArcSwap<Config>> = OnceCell::const_new();

/// Returns the app config. Initializes if not already done.
-pub async fn config() -> &'static Config {
-CONFIG.get_or_init(init_config).await
+pub async fn config() -> Guard<Arc<Config>> {
+let cfg = CONFIG.get_or_init(|| async { ArcSwap::from_pointee(init_config().await) }).await;
+cfg.load()
}

+/// `OnceCell` only allows us to initialize the config once, and that's how it should be in
+/// production. However, when running tests, we often want to reinitialize because we want to
+/// clear the DB and set it up again for reuse in new tests. By calling `config_force_init` we
+/// replace the config already stored inside `ArcSwap` with the new configuration and pool settings.
+#[cfg(test)]
+pub async fn config_force_init(config: Config) {
+match CONFIG.get() {
+Some(arc) => arc.store(Arc::new(config)),
+None => {
+CONFIG.get_or_init(|| async { ArcSwap::from_pointee(config) }).await;
+}
+}
+}

/// Builds the DA client based on the environment variable DA_LAYER
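The `OnceCell<ArcSwap<Config>>` combination keeps one-time initialization while allowing the stored value to be swapped atomically afterwards. A minimal self-contained sketch of the same pattern, with an illustrative `Settings` type standing in for `Config`:

```rust
use std::sync::Arc;

use arc_swap::{ArcSwap, Guard};
use tokio::sync::OnceCell;

#[derive(Debug)]
struct Settings {
    db_url: String,
}

static SETTINGS: OnceCell<ArcSwap<Settings>> = OnceCell::const_new();

/// Readers get a cheap guard onto the current value.
async fn settings() -> Guard<Arc<Settings>> {
    SETTINGS
        .get_or_init(|| async {
            ArcSwap::from_pointee(Settings { db_url: "mongodb://localhost:27017".into() })
        })
        .await
        .load()
}

/// Swap in a new value; initialize the cell if this runs first.
async fn settings_force_init(new: Settings) {
    match SETTINGS.get() {
        Some(cell) => cell.store(Arc::new(new)),
        None => {
            SETTINGS.get_or_init(|| async { ArcSwap::from_pointee(new) }).await;
        }
    }
}

#[tokio::main]
async fn main() {
    println!("before: {}", settings().await.db_url);
    settings_force_init(Settings { db_url: "mongodb://test-host:27017".into() }).await;
    println!("after:  {}", settings().await.db_url);
}
```

Note that `store` does not invalidate guards handed out earlier; a reader holding a `Guard` keeps the previous `Arc` alive until the guard is dropped.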
1 change: 1 addition & 0 deletions crates/orchestrator/src/database/mod.rs
@@ -33,6 +33,7 @@ pub trait Database: Send + Sync {
) -> Result<()>;

async fn update_metadata(&self, job: &JobItem, metadata: HashMap<String, String>) -> Result<()>;
+async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result<Option<JobItem>>;
}

pub trait DatabaseConfig {
14 changes: 13 additions & 1 deletion crates/orchestrator/src/database/mongodb/mod.rs
@@ -5,7 +5,7 @@ use async_trait::async_trait;
use color_eyre::eyre::eyre;
use color_eyre::Result;
use mongodb::bson::Document;
-use mongodb::options::UpdateOptions;
+use mongodb::options::{FindOneOptions, UpdateOptions};
use mongodb::{
bson::doc,
options::{ClientOptions, ServerApi, ServerApiVersion},
@@ -115,4 +115,16 @@ impl Database for MongoDb {
self.update_job_optimistically(job, update).await?;
Ok(())
}

+async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result<Option<JobItem>> {
+let filter = doc! {
+"job_type": mongodb::bson::to_bson(&job_type)?,
+};
+let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build();
+Ok(self
+.get_job_collection()
+.find_one(filter, find_options)
+.await
+.expect("Failed to fetch latest job by given job type"))
+}
}
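Sorting by `internal_id` descending and taking the first document yields the job with the highest internal id for the given type. A hypothetical caller, not part of this diff and assuming `internal_id` stores a block number as a string, could use it to resume where the last run stopped:

```rust
use color_eyre::Result;

// Hypothetical helper; `Database` and `JobType` are the crate items above.
async fn next_block_to_schedule(db: &dyn Database) -> Result<u64> {
    Ok(match db.get_latest_job_by_type_and_internal_id(JobType::SnosRun).await? {
        Some(job) => job.internal_id.parse::<u64>()? + 1,
        None => 0,
    })
}
```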
7 changes: 4 additions & 3 deletions crates/orchestrator/src/jobs/mod.rs
@@ -61,7 +61,7 @@ pub async fn create_job(job_type: JobType, internal_id: String, metadata: HashMa
}

let job_handler = get_job_handler(&job_type);
-let job_item = job_handler.create_job(config, internal_id, metadata).await?;
+let job_item = job_handler.create_job(config.as_ref(), internal_id, metadata).await?;
config.database().create_job(job_item.clone()).await?;

add_job_to_process_queue(job_item.id).await?;
@@ -90,7 +90,7 @@ pub async fn process_job(id: Uuid) -> Result<()> {
config.database().update_job_status(&job, JobStatus::LockedForProcessing).await?;

let job_handler = get_job_handler(&job.job_type);
-let external_id = job_handler.process_job(config, &job).await?;
+let external_id = job_handler.process_job(config.as_ref(), &job).await?;

let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?;
config
@@ -122,7 +122,7 @@ pub async fn verify_job(id: Uuid) -> Result<()> {
}

let job_handler = get_job_handler(&job.job_type);
-let verification_status = job_handler.verify_job(config, &job).await?;
+let verification_status = job_handler.verify_job(config.as_ref(), &job).await?;

match verification_status {
JobVerificationStatus::Verified => {
@@ -170,6 +170,7 @@ pub async fn verify_job(id: Uuid) -> Result<()> {
fn get_job_handler(job_type: &JobType) -> Box<dyn Job> {
match job_type {
JobType::DataSubmission => Box::new(da_job::DaJob),
+JobType::SnosRun => Box::new(snos_job::SnosJob),
_ => unimplemented!("Job type not implemented yet."),
}
}
4 changes: 2 additions & 2 deletions crates/orchestrator/src/jobs/types.rs
@@ -7,7 +7,7 @@ use std::collections::HashMap;
use uuid::Uuid;

/// An external id.
-#[derive(Serialize, Deserialize, Debug, Clone)]
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
#[serde(untagged)]
pub enum ExternalId {
/// A string.
@@ -98,7 +98,7 @@ pub enum JobStatus {
VerificationFailed,
}

-#[derive(Serialize, Deserialize, Debug, Clone)]
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct JobItem {
/// an uuid to identify a job
#[serde(with = "uuid_1_as_binary")]
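The `PartialEq` derives on `ExternalId` and `JobItem` are presumably what let the new tests compare whole job values for equality.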
4 changes: 2 additions & 2 deletions crates/orchestrator/src/main.rs
@@ -26,7 +26,7 @@ async fn main() {
// init consumer
init_consumers().await.expect("Failed to init consumers");

-// spawn a thread for each worker
+// spawn a thread for each workers
// changes in rollup mode - sovereign, validity, validiums etc.
// will likely involve changes in these workers as well
tokio::spawn(start_cron(Box::new(SnosWorker), 60));
@@ -40,7 +40,7 @@

async fn start_cron(worker: Box<dyn Worker>, interval: u64) {
loop {
-worker.run_worker().await;
+worker.run_worker().await.expect("Error in running the worker.");
tokio::time::sleep(tokio::time::Duration::from_secs(interval)).await;
}
}
4 changes: 2 additions & 2 deletions crates/orchestrator/src/queue/job_queue.rs
@@ -14,8 +14,8 @@ const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue";
const JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_job_verification_queue";

#[derive(Debug, Serialize, Deserialize)]
-struct JobQueueMessage {
-id: Uuid,
+pub struct JobQueueMessage {
+pub(crate) id: Uuid,
}

pub async fn add_job_to_process_queue(id: Uuid) -> Result<()> {
1 change: 1 addition & 0 deletions crates/orchestrator/src/tests/mod.rs
@@ -7,3 +7,4 @@ pub mod server;
pub mod queue;

pub mod common;
+mod workers;
99 changes: 99 additions & 0 deletions crates/orchestrator/src/tests/workers/mod.rs
@@ -0,0 +1,99 @@
use crate::config::config_force_init;
use crate::database::MockDatabase;
use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType};
use crate::queue::MockQueueProvider;
use crate::tests::common::init_config;
use crate::workers::snos::SnosWorker;
use crate::workers::Worker;
use da_client_interface::MockDaClient;
use httpmock::MockServer;
use mockall::predicate::eq;
use rstest::rstest;
use serde_json::json;
use std::collections::HashMap;
use std::error::Error;
use uuid::Uuid;

#[rstest]
#[case(false)]
#[case(true)]
#[tokio::test]
async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box<dyn Error>> {
let server = MockServer::start();
let da_client = MockDaClient::new();
let mut db = MockDatabase::new();
let mut queue = MockQueueProvider::new();
let start_job_index;
let block;

const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue";

// Mocking db function expectations
if !db_val {
db.expect_get_latest_job_by_type_and_internal_id().times(1).with(eq(JobType::SnosRun)).returning(|_| Ok(None));
start_job_index = 1;
block = 5;
} else {
let uuid_temp = Uuid::new_v4();

db.expect_get_latest_job_by_type_and_internal_id()
.with(eq(JobType::SnosRun))
.returning(move |_| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp))));
block = 6;
start_job_index = 2;
}

for i in start_job_index..block + 1 {
// Expectations for checking whether a job already exists for this block
db.expect_get_job_by_internal_id_and_type()
.times(1)
.with(eq(i.clone().to_string()), eq(JobType::SnosRun))
.returning(|_, _| Ok(None));

let uuid = Uuid::new_v4();

// Expectations for the job creation calls
db.expect_create_job()
.times(1)
.withf(move |item| item.internal_id == i.clone().to_string())
.returning(move |_| Ok(get_job_item_mock_by_id(i.clone().to_string(), uuid)));
}

// Queue function call simulations
queue
.expect_send_message_to_queue()
.returning(|_, _, _| Ok(()))
.withf(|queue, _payload, _delay| queue == JOB_PROCESSING_QUEUE);

// mocked latest block number returned by the madara node (5 or 6 depending on the case)
let rpc_response_block_number = block;
let response = json!({ "id": 1,"jsonrpc":"2.0","result": rpc_response_block_number });
let config =
init_config(Some(format!("http://localhost:{}", server.port())), Some(db), Some(queue), Some(da_client)).await;
config_force_init(config).await;

// mocking block call
let rpc_block_call_mock = server.mock(|when, then| {
when.path("/").body_contains("starknet_blockNumber");
then.status(200).body(serde_json::to_vec(&response).unwrap());
});

let snos_worker = SnosWorker {};
snos_worker.run_worker().await?;

rpc_block_call_mock.assert();

Ok(())
}

fn get_job_item_mock_by_id(id: String, uuid: Uuid) -> JobItem {
JobItem {
id: uuid,
internal_id: id.clone(),
job_type: JobType::SnosRun,
status: JobStatus::Created,
external_id: ExternalId::Number(0),
metadata: HashMap::new(),
version: 0,
}
}
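The two `#[case]`s pin down the worker's contract: with an empty database it creates `SnosRun` jobs for blocks 1 through the node's latest block, and with an existing job for block 1 it starts from block 2. The snos worker source itself is not rendered on this page, so the following is only a sketch of the behaviour these expectations imply; the `starknet_client().block_number()` accessor is an assumption, and crate-internal imports are elided:

```rust
use std::collections::HashMap;
use std::error::Error;

// Sketch of SnosWorker::run_worker as implied by the mock expectations.
async fn run_snos_worker_sketch() -> Result<(), Box<dyn Error>> {
    let config = config().await;

    // Latest block on the node; the test mocks the `starknet_blockNumber`
    // JSON-RPC call. The accessor name is assumed.
    let latest_block: u64 = config.starknet_client().block_number().await?;

    // Latest SnosRun job already recorded; `None` on a fresh database.
    let first_missing = match config
        .database()
        .get_latest_job_by_type_and_internal_id(JobType::SnosRun)
        .await?
    {
        Some(job) => job.internal_id.parse::<u64>()? + 1,
        None => 1, // the empty-database case starts at block 1
    };

    // One job per missing block; creating a job also pushes it onto the
    // processing queue, which the queue mock asserts.
    for block_no in first_missing..=latest_block {
        create_job(JobType::SnosRun, block_no.to_string(), HashMap::new()).await?;
    }
    Ok(())
}
```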
3 changes: 2 additions & 1 deletion crates/orchestrator/src/workers/mod.rs
@@ -1,4 +1,5 @@
use async_trait::async_trait;
+use std::error::Error;

pub mod proof_registration;
pub mod proving;
@@ -7,5 +8,5 @@ pub mod update_state;

#[async_trait]
pub trait Worker: Send + Sync {
-async fn run_worker(&self);
+async fn run_worker(&self) -> Result<(), Box<dyn Error>>;
}
3 changes: 2 additions & 1 deletion crates/orchestrator/src/workers/proof_registration.rs
@@ -1,5 +1,6 @@
use crate::workers::Worker;
use async_trait::async_trait;
+use std::error::Error;

pub struct ProofRegistrationWorker;

@@ -8,7 +9,7 @@ impl Worker for ProofRegistrationWorker {
/// 1. Fetch all blocks with a successful proving job run
/// 2. Group blocks that have the same proof
/// 3. For each group, create a proof registration job with from and to block in metadata
-async fn run_worker(&self) {
+async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
todo!()
}
}
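`run_worker` is still `todo!()` here; a rough sketch of the three documented steps follows. The database query, the `JobType`/`JobStatus` variants used, and the `proof_id` metadata key are all assumptions, not part of this diff:

```rust
use std::collections::HashMap;
use std::error::Error;

async fn run_proof_registration_sketch() -> Result<(), Box<dyn Error>> {
    let config = config().await;

    // 1. Blocks with a successful proving job run (hypothetical query).
    let proven_jobs = config
        .database()
        .get_jobs_by_type_and_status(JobType::ProofCreation, JobStatus::Completed)
        .await?;

    // 2. Group blocks that share the same proof, keyed by assumed metadata.
    let mut groups: HashMap<String, Vec<u64>> = HashMap::new();
    for job in proven_jobs {
        let proof_id = job.metadata.get("proof_id").cloned().unwrap_or_default();
        groups.entry(proof_id).or_default().push(job.internal_id.parse()?);
    }

    // 3. One registration job per group, with the block range in metadata.
    for (proof_id, mut blocks) in groups {
        blocks.sort_unstable();
        let metadata = HashMap::from([
            ("proof_id".to_string(), proof_id),
            ("from_block".to_string(), blocks[0].to_string()),
            ("to_block".to_string(), blocks[blocks.len() - 1].to_string()),
        ]);
        create_job(JobType::ProofRegistration, blocks[0].to_string(), metadata).await?;
    }
    Ok(())
}
```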
3 changes: 2 additions & 1 deletion crates/orchestrator/src/workers/proving.rs
@@ -1,13 +1,14 @@
use crate::workers::Worker;
use async_trait::async_trait;
+use std::error::Error;

pub struct ProvingWorker;

#[async_trait]
impl Worker for ProvingWorker {
/// 1. Fetch all successful SNOS job runs that don't have a proving job
/// 2. Create a proving job for each SNOS job run
-async fn run_worker(&self) {
+async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
todo!()
}
}
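The documented steps for the proving worker translate into a similar sketch; `get_jobs_without_successor` is a hypothetical query method (this diff only adds `get_latest_job_by_type_and_internal_id`):

```rust
use std::collections::HashMap;
use std::error::Error;

async fn run_proving_worker_sketch() -> Result<(), Box<dyn Error>> {
    let config = config().await;

    // 1. Successful SNOS runs that have no proving job yet (hypothetical query).
    let snos_jobs = config
        .database()
        .get_jobs_without_successor(JobType::SnosRun, JobStatus::Completed, JobType::ProofCreation)
        .await?;

    // 2. One proving job per SNOS run, reusing the block number as internal id.
    for job in snos_jobs {
        create_job(JobType::ProofCreation, job.internal_id.clone(), HashMap::new()).await?;
    }
    Ok(())
}
```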