Skip to content

Commit

Permalink
feat : added snos worker implementation and unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Arun Jangra authored and Arun Jangra committed Jun 13, 2024
1 parent 2a9d77b commit 2e9cf16
Show file tree
Hide file tree
Showing 13 changed files with 184 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
- name: Run llvm-cov
run: |
cargo llvm-cov nextest --release --lcov --output-path lcov.info
cargo llvm-cov nextest --release --lcov --output-path lcov.info --test-threads=1
- name: Upload coverage to codecov.io
uses: codecov/codecov-action@v3
Expand Down
9 changes: 8 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ httpmock = { version = "0.7.0" }
da-client-interface = { path = "crates/da_clients/da-client-interface" }
ethereum-da-client = { path = "crates/da_clients/ethereum" }
utils = { path = "crates/utils" }
arc-swap = { version = "1.7.1" }
1 change: 1 addition & 0 deletions crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ name = "orchestrator"
path = "src/main.rs"

[dependencies]
arc-swap = { workspace = true }
async-trait = { workspace = true }
axum = { workspace = true, features = ["macros"] }
axum-macros = { workspace = true }
Expand Down
24 changes: 21 additions & 3 deletions crates/orchestrator/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::database::{Database, DatabaseConfig};
use crate::queue::sqs::SqsQueue;
use crate::queue::QueueProvider;
use crate::utils::env_utils::get_env_var_or_panic;
use arc_swap::{ArcSwap, Guard};
use da_client_interface::DaClient;
use da_client_interface::DaConfig;
use dotenvy::dotenv;
Expand Down Expand Up @@ -79,11 +80,28 @@ impl Config {

/// The app config. It can be accessed from anywhere inside the service.
/// It's initialized only once.
pub static CONFIG: OnceCell<Config> = OnceCell::const_new();
/// We are using `ArcSwap` as it allow us to replace the new `Config` with
/// a new one which is required when running test cases. This approach was
/// inspired from here - https://github.com/matklad/once_cell/issues/127
pub static CONFIG: OnceCell<ArcSwap<Config>> = OnceCell::const_new();

/// Returns the app config. Initializes if not already done.
pub async fn config() -> &'static Config {
CONFIG.get_or_init(init_config).await
pub async fn config() -> Guard<Arc<Config>> {
let cfg = CONFIG.get_or_init(|| async { ArcSwap::from_pointee(init_config().await) }).await;
cfg.load()
}

/// OnceCell only allows us to initialize the config once and that's how it should be on production.
/// However, when running tests, we often want to reinitialize because we want to clear the DB and
/// set it up again for reuse in new tests. By calling `config_force_init` we replace the already
/// stored config inside `ArcSwap` with the new configuration and pool settings.
pub async fn config_force_init(config: Config) {
match CONFIG.get() {
Some(arc) => arc.store(Arc::new(config)),
None => {
CONFIG.get_or_init(|| async { ArcSwap::from_pointee(config) }).await;
}
}
}

/// Builds the DA client based on the environment variable DA_LAYER
Expand Down
3 changes: 3 additions & 0 deletions crates/orchestrator/src/database/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::jobs::types::{JobItem, JobStatus, JobType};
use ::mongodb::Cursor;
use async_trait::async_trait;
use color_eyre::Result;
use mockall::automock;
Expand Down Expand Up @@ -33,6 +34,8 @@ pub trait Database: Send + Sync {
) -> Result<()>;

async fn update_metadata(&self, job: &JobItem, metadata: HashMap<String, String>) -> Result<()>;
async fn get_latest_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>>;
async fn get_all_jobs(&self, job_type: JobType) -> Result<Cursor<JobItem>>;
}

pub trait DatabaseConfig {
Expand Down
23 changes: 21 additions & 2 deletions crates/orchestrator/src/database/mongodb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use async_trait::async_trait;
use color_eyre::eyre::eyre;
use color_eyre::Result;
use mongodb::bson::Document;
use mongodb::options::UpdateOptions;
use mongodb::options::{FindOneOptions, UpdateOptions};
use mongodb::{
bson::doc,
options::{ClientOptions, ServerApi, ServerApiVersion},
Client, Collection,
Client, Collection, Cursor,
};
use std::collections::HashMap;
use uuid::Uuid;
Expand Down Expand Up @@ -115,4 +115,23 @@ impl Database for MongoDb {
self.update_job_optimistically(job, update).await?;
Ok(())
}

async fn get_latest_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>> {
let filter = doc! {
"job_type": mongodb::bson::to_bson(&job_type)?,
};
let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build();
Ok(self
.get_job_collection()
.find_one(filter, find_options)
.await
.expect("Failed to fetch latest job by given job type"))
}

async fn get_all_jobs(&self, job_type: JobType) -> Result<Cursor<JobItem>> {
let filter = doc! {
"job_type": mongodb::bson::to_bson(&job_type)?,
};
Ok(self.get_job_collection().find(filter, None).await.expect("Failed to fetch jobs with given job type"))
}
}
7 changes: 4 additions & 3 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub async fn create_job(job_type: JobType, internal_id: String, metadata: HashMa
}

let job_handler = get_job_handler(&job_type);
let job_item = job_handler.create_job(config, internal_id, metadata).await?;
let job_item = job_handler.create_job(config.as_ref(), internal_id, metadata).await?;
config.database().create_job(job_item.clone()).await?;

add_job_to_process_queue(job_item.id).await?;
Expand Down Expand Up @@ -90,7 +90,7 @@ pub async fn process_job(id: Uuid) -> Result<()> {
config.database().update_job_status(&job, JobStatus::LockedForProcessing).await?;

let job_handler = get_job_handler(&job.job_type);
let external_id = job_handler.process_job(config, &job).await?;
let external_id = job_handler.process_job(config.as_ref(), &job).await?;

let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?;
config
Expand Down Expand Up @@ -122,7 +122,7 @@ pub async fn verify_job(id: Uuid) -> Result<()> {
}

let job_handler = get_job_handler(&job.job_type);
let verification_status = job_handler.verify_job(config, &job).await?;
let verification_status = job_handler.verify_job(config.as_ref(), &job).await?;

match verification_status {
JobVerificationStatus::Verified => {
Expand Down Expand Up @@ -170,6 +170,7 @@ pub async fn verify_job(id: Uuid) -> Result<()> {
fn get_job_handler(job_type: &JobType) -> Box<dyn Job> {
match job_type {
JobType::DataSubmission => Box::new(da_job::DaJob),
JobType::SnosRun => Box::new(da_job::DaJob),
_ => unimplemented!("Job type not implemented yet."),
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async fn main() {
// init consumer
init_consumers().await.expect("Failed to init consumers");

// spawn a thread for each worker
// spawn a thread for each workers
// changes in rollup mode - sovereign, validity, validiums etc.
// will likely involve changes in these workers as well
tokio::spawn(start_cron(Box::new(SnosWorker), 60));
Expand Down
4 changes: 2 additions & 2 deletions crates/orchestrator/src/queue/job_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue";
const JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_job_verification_queue";

#[derive(Debug, Serialize, Deserialize)]
struct JobQueueMessage {
id: Uuid,
pub struct JobQueueMessage {
pub(crate) id: Uuid,
}

pub async fn add_job_to_process_queue(id: Uuid) -> Result<()> {
Expand Down
1 change: 1 addition & 0 deletions crates/orchestrator/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ pub mod server;
pub mod queue;

pub mod common;
mod workers;
85 changes: 85 additions & 0 deletions crates/orchestrator/src/tests/workers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use crate::config::config_force_init;
use crate::database::MockDatabase;
use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType};
use crate::queue::job_queue::JobQueueMessage;
use crate::queue::MockQueueProvider;
use crate::tests::common::init_config;
use crate::workers::snos::SnosWorker;
use crate::workers::Worker;
use da_client_interface::MockDaClient;
use httpmock::MockServer;
use rstest::rstest;
use serde_json::json;
use std::collections::HashMap;
use uuid::Uuid;

#[rstest]
#[tokio::test]
async fn test_create_job() {
let server = MockServer::start();
let da_client = MockDaClient::new();
let mut db = MockDatabase::new();
let mut queue = MockQueueProvider::new();

// Mocking db functions
db.expect_get_latest_job_by_type().returning(|_| Ok(None)).call(JobType::SnosRun).expect("Failed to call.");
// Getting jobs for check expectations
for i in 1..6 {
db.expect_get_job_by_internal_id_and_type()
.returning(|_, _| Ok(None))
.call(&i.to_string(), &JobType::SnosRun)
.expect("Failed to call.");
}

// Creating jobs expectations
for i in 1..6 {
db.expect_create_job()
.returning(|_| Ok(get_job_item_mock_by_id("1".to_string())))
.call(get_job_item_mock_by_id(i.to_string()))
.expect("Failed to call");
}

// Queue function call simulations
queue
.expect_send_message_to_queue()
.returning(|_, _, _| Ok(()))
.call(
"madara_orchestrator_job_processing_queue".to_string(),
serde_json::to_string(&JobQueueMessage { id: Uuid::new_v4() }).unwrap(),
None,
)
.expect("Failed to call");

// mock block number (madara) : 5
let rpc_response_block_number = 5;
let response = json!({ "id": 1,"jsonrpc":"2.0","result": rpc_response_block_number });

let config =
init_config(Some(format!("http://localhost:{}", server.port())), Some(db), Some(queue), Some(da_client)).await;
config_force_init(config).await;

// mocking block call
let rpc_block_call_mock = server.mock(|when, then| {
when.path("/").body_contains("starknet_blockNumber");
then.status(200).body(serde_json::to_vec(&response).unwrap());
});

let snos_worker = SnosWorker {};
snos_worker.run_worker().await;

rpc_block_call_mock.assert();
}

fn get_job_item_mock_by_id(id: String) -> JobItem {
let uuid = Uuid::new_v4();

JobItem {
id: uuid,
internal_id: id.clone(),
job_type: JobType::SnosRun,
status: JobStatus::Created,
external_id: ExternalId::Number(0),
metadata: HashMap::new(),
version: 0,
}
}
36 changes: 35 additions & 1 deletion crates/orchestrator/src/workers/snos.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
use crate::config::config;
use crate::jobs::create_job;
use crate::jobs::types::JobType;
use crate::workers::Worker;
use async_trait::async_trait;
use starknet::providers::Provider;
use std::collections::HashMap;
use tracing::log;

pub struct SnosWorker;

Expand All @@ -8,7 +14,35 @@ impl Worker for SnosWorker {
/// 1. Fetch the latest completed block from the Starknet chain
/// 2. Fetch the last block that had a SNOS job run.
/// 3. Create SNOS run jobs for all the remaining blocks
// TEST : added config temporarily to test
async fn run_worker(&self) {
todo!()
let config = config().await;
let provider = config.starknet_client();
let latest_block_number = provider.block_number().await.expect("Failed to fetch block number from rpc");
let latest_block_processed_data = config
.database()
.get_latest_job_by_type(JobType::SnosRun)
.await
.unwrap()
.map(|item| item.internal_id)
.unwrap_or("0".to_string());

let latest_block_processed: u64 =
latest_block_processed_data.parse().expect("Failed to convert block number from JobItem into u64");

let block_diff = latest_block_number - latest_block_processed;

// if all blocks are processed
if block_diff == 0 {
return;
}

for x in latest_block_processed + 1..latest_block_number + 1 {
create_job(JobType::SnosRun, x.to_string(), HashMap::new())
.await
.expect("Error : failed to create job for snos workers.");
}

log::info!("jobs created !!");
}
}

0 comments on commit 2e9cf16

Please sign in to comment.