Skip to content

Commit

Permalink
boilerplate code (#9)
Browse files Browse the repository at this point in the history
* boilerplate code

* fix taplo

* fmt fix
  • Loading branch information
apoorvsadana authored and Tranduy1dol committed Aug 1, 2024
1 parent a703ef5 commit c2ba064
Show file tree
Hide file tree
Showing 19 changed files with 344 additions and 8 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ members = [
"crates/da_clients/da-client-interface",
"crates/da_clients/ethereum",
"crates/utils",
"crates/settlement_clients/settlement-client-interface",
]

[workspace.package]
Expand Down
3 changes: 2 additions & 1 deletion crates/orchestrator/src/controllers/jobs_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::controllers::errors::AppError;
use crate::jobs::types::JobType;
use axum::extract::Json;
use serde::Deserialize;
use std::collections::HashMap;

/// Client request to create a job
#[derive(Debug, Deserialize)]
Expand All @@ -16,6 +17,6 @@ pub struct CreateJobRequest {

/// Create a job
pub async fn create_job(Json(payload): Json<CreateJobRequest>) -> Result<Json<()>, AppError> {
crate::jobs::create_job(payload.job_type, payload.internal_id).await?;
crate::jobs::create_job(payload.job_type, payload.internal_id, HashMap::new()).await?;
Ok(Json::from(()))
}
9 changes: 7 additions & 2 deletions crates/orchestrator/src/jobs/da_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@ pub struct DaJob;

#[async_trait]
impl Job for DaJob {
async fn create_job(&self, _config: &Config, internal_id: String) -> Result<JobItem> {
async fn create_job(
&self,
_config: &Config,
internal_id: String,
metadata: HashMap<String, String>,
) -> Result<JobItem> {
Ok(JobItem {
id: Uuid::new_v4(),
internal_id,
job_type: JobType::DataSubmission,
status: JobStatus::Created,
external_id: String::new().into(),
metadata: HashMap::new(),
metadata,
version: 0,
})
}
Expand Down
14 changes: 11 additions & 3 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ use uuid::Uuid;

mod constants;
pub mod da_job;
mod register_proof_job;
pub mod snos_job;
mod state_update_job;
pub mod types;

/// The Job trait is used to define the methods that a job
Expand All @@ -20,7 +23,12 @@ pub mod types;
#[async_trait]
pub trait Job: Send + Sync {
/// Should build a new job item and return it
async fn create_job(&self, config: &Config, internal_id: String) -> Result<JobItem>;
async fn create_job(
&self,
config: &Config,
internal_id: String,
metadata: HashMap<String, String>,
) -> Result<JobItem>;
/// Should process the job and return the external_id which can be used to
/// track the status of the job. For example, a DA job will submit the state diff
/// to the DA layer and return the txn hash.
Expand All @@ -40,7 +48,7 @@ pub trait Job: Send + Sync {
}

/// Creates the job in the DB in the created state and adds it to the process queue
pub async fn create_job(job_type: JobType, internal_id: String) -> Result<()> {
pub async fn create_job(job_type: JobType, internal_id: String, metadata: HashMap<String, String>) -> Result<()> {
let config = config().await;
let existing_job = config.database().get_job_by_internal_id_and_type(internal_id.as_str(), &job_type).await?;
if existing_job.is_some() {
Expand All @@ -53,7 +61,7 @@ pub async fn create_job(job_type: JobType, internal_id: String) -> Result<()> {
}

let job_handler = get_job_handler(&job_type);
let job_item = job_handler.create_job(config, internal_id).await?;
let job_item = job_handler.create_job(config, internal_id, metadata).await?;
config.database().create_job(job_item.clone()).await?;

add_job_to_process_queue(job_item.id).await?;
Expand Down
55 changes: 55 additions & 0 deletions crates/orchestrator/src/jobs/register_proof_job/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use crate::config::Config;
use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use crate::jobs::Job;
use async_trait::async_trait;
use color_eyre::Result;
use std::collections::HashMap;
use uuid::Uuid;

pub struct RegisterProofJob;

#[async_trait]
impl Job for RegisterProofJob {
async fn create_job(
&self,
_config: &Config,
internal_id: String,
metadata: HashMap<String, String>,
) -> Result<JobItem> {
Ok(JobItem {
id: Uuid::new_v4(),
internal_id,
job_type: JobType::ProofRegistration,
status: JobStatus::Created,
external_id: String::new().into(),
// metadata must contain the blocks that have been included inside this proof
// this will allow state update jobs to be created for each block
metadata,
version: 0,
})
}

async fn process_job(&self, _config: &Config, _job: &JobItem) -> Result<String> {
// Get proof from S3 and submit on chain for verification
// We need to implement a generic trait for this to support multiple
// base layers
todo!()
}

async fn verify_job(&self, _config: &Config, _job: &JobItem) -> Result<JobVerificationStatus> {
// verify that the proof transaction has been included on chain
todo!()
}

fn max_process_attempts(&self) -> u64 {
todo!()
}

fn max_verification_attempts(&self) -> u64 {
todo!()
}

fn verification_polling_delay_seconds(&self) -> u64 {
todo!()
}
}
54 changes: 54 additions & 0 deletions crates/orchestrator/src/jobs/snos_job/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use crate::config::Config;
use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use crate::jobs::Job;
use async_trait::async_trait;
use color_eyre::Result;
use std::collections::HashMap;
use uuid::Uuid;

pub struct SnosJob;

#[async_trait]
impl Job for SnosJob {
async fn create_job(
&self,
_config: &Config,
internal_id: String,
metadata: HashMap<String, String>,
) -> Result<JobItem> {
Ok(JobItem {
id: Uuid::new_v4(),
internal_id,
job_type: JobType::SnosRun,
status: JobStatus::Created,
external_id: String::new().into(),
metadata,
version: 0,
})
}

async fn process_job(&self, _config: &Config, _job: &JobItem) -> Result<String> {
// 1. Fetch SNOS input data from Madara
// 2. Import SNOS in Rust and execute it with the input data
// 3. Store the received PIE in DB
todo!()
}

async fn verify_job(&self, _config: &Config, _job: &JobItem) -> Result<JobVerificationStatus> {
// No need for verification as of now. If we later on decide to outsource SNOS run
// to another servicehow a, verify_job can be used to poll on the status of the job
todo!()
}

fn max_process_attempts(&self) -> u64 {
todo!()
}

fn max_verification_attempts(&self) -> u64 {
todo!()
}

fn verification_polling_delay_seconds(&self) -> u64 {
todo!()
}
}
54 changes: 54 additions & 0 deletions crates/orchestrator/src/jobs/state_update_job/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use crate::config::Config;
use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use crate::jobs::Job;
use async_trait::async_trait;
use color_eyre::Result;
use std::collections::HashMap;
use uuid::Uuid;

pub struct StateUpdateJob;

#[async_trait]
impl Job for StateUpdateJob {
async fn create_job(
&self,
_config: &Config,
internal_id: String,
metadata: HashMap<String, String>,
) -> Result<JobItem> {
Ok(JobItem {
id: Uuid::new_v4(),
internal_id,
job_type: JobType::ProofRegistration,
status: JobStatus::Created,
external_id: String::new().into(),
// metadata must contain the blocks for which state update will be performed
// we don't do one job per state update as that makes nonce management complicated
metadata,
version: 0,
})
}

async fn process_job(&self, _config: &Config, _job: &JobItem) -> Result<String> {
// Read the metadata to get the blocks for which state update will be performed.
// For each block, get the program output (from the PIE?) and the
todo!()
}

async fn verify_job(&self, _config: &Config, _job: &JobItem) -> Result<JobVerificationStatus> {
// verify that the proof transaction has been included on chain
todo!()
}

fn max_process_attempts(&self) -> u64 {
todo!()
}

fn max_verification_attempts(&self) -> u64 {
todo!()
}

fn verification_polling_delay_seconds(&self) -> u64 {
todo!()
}
}
4 changes: 3 additions & 1 deletion crates/orchestrator/src/jobs/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,14 @@ fn unwrap_external_id_failed(expected: &str, got: &ExternalId) -> color_eyre::ey

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub enum JobType {
/// Running SNOS for a block
SnosRun,
/// Submitting DA data to the DA layer
DataSubmission,
/// Getting a proof from the proving service
ProofCreation,
/// Verifying the proof on the base layer
ProofVerification,
ProofRegistration,
/// Updaing the state root on the base layer
StateUpdation,
}
Expand Down
2 changes: 2 additions & 0 deletions crates/orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ pub mod queue;
pub mod routes;
/// Contains the utils
pub mod utils;
/// Contains workers which act like cron jobs
pub mod workers;

#[cfg(test)]
mod tests;
20 changes: 20 additions & 0 deletions crates/orchestrator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ use orchestrator::config::config;
use orchestrator::queue::init_consumers;
use orchestrator::routes::app_router;
use orchestrator::utils::env_utils::get_env_var_or_default;
use orchestrator::workers::proof_registration::ProofRegistrationWorker;
use orchestrator::workers::proving::ProvingWorker;
use orchestrator::workers::snos::SnosWorker;
use orchestrator::workers::update_state::UpdateStateWorker;
use orchestrator::workers::*;

/// Start the server
#[tokio::main]
Expand All @@ -21,6 +26,21 @@ async fn main() {
// init consumer
init_consumers().await.expect("Failed to init consumers");

// spawn a thread for each worker
// changes in rollup mode - sovereign, validity, validiums etc.
// will likely involve changes in these workers as well
tokio::spawn(start_cron(Box::new(SnosWorker), 60));
tokio::spawn(start_cron(Box::new(ProvingWorker), 60));
tokio::spawn(start_cron(Box::new(ProofRegistrationWorker), 60));
tokio::spawn(start_cron(Box::new(UpdateStateWorker), 60));

tracing::info!("Listening on http://{}", address);
axum::serve(listener, app).await.expect("Failed to start axum server");
}

async fn start_cron(worker: Box<dyn Worker>, interval: u64) {
loop {
worker.run_worker().await;
tokio::time::sleep(tokio::time::Duration::from_secs(interval)).await;
}
}
2 changes: 1 addition & 1 deletion crates/orchestrator/src/tests/jobs/da_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use da_client_interface::{DaVerificationStatus, MockDaClient};
#[tokio::test]
async fn test_create_job() {
let config = init_config(None, None, None, None).await;
let job = DaJob.create_job(&config, String::from("0")).await;
let job = DaJob.create_job(&config, String::from("0"), HashMap::new()).await;
assert!(job.is_ok());

let job = job.unwrap();
Expand Down
11 changes: 11 additions & 0 deletions crates/orchestrator/src/workers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use async_trait::async_trait;

pub mod proof_registration;
pub mod proving;
pub mod snos;
pub mod update_state;

#[async_trait]
pub trait Worker: Send + Sync {
async fn run_worker(&self);
}
14 changes: 14 additions & 0 deletions crates/orchestrator/src/workers/proof_registration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use crate::workers::Worker;
use async_trait::async_trait;

pub struct ProofRegistrationWorker;

#[async_trait]
impl Worker for ProofRegistrationWorker {
/// 1. Fetch all blocks with a successful proving job run
/// 2. Group blocks that have the same proof
/// 3. For each group, create a proof registration job with from and to block in metadata
async fn run_worker(&self) {
todo!()
}
}
13 changes: 13 additions & 0 deletions crates/orchestrator/src/workers/proving.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use crate::workers::Worker;
use async_trait::async_trait;

pub struct ProvingWorker;

#[async_trait]
impl Worker for ProvingWorker {
/// 1. Fetch all successful SNOS job runs that don't have a proving job
/// 2. Create a proving job for each SNOS job run
async fn run_worker(&self) {
todo!()
}
}
14 changes: 14 additions & 0 deletions crates/orchestrator/src/workers/snos.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use crate::workers::Worker;
use async_trait::async_trait;

pub struct SnosWorker;

#[async_trait]
impl Worker for SnosWorker {
/// 1. Fetch the latest completed block from the Starknet chain
/// 2. Fetch the last block that had a SNOS job run.
/// 3. Create SNOS run jobs for all the remaining blocks
async fn run_worker(&self) {
todo!()
}
}
Loading

0 comments on commit c2ba064

Please sign in to comment.