diff --git a/Cargo.lock b/Cargo.lock index c0338772..d926e2e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4833,6 +4833,17 @@ dependencies = [ "syn 2.0.58", ] +[[package]] +name = "settlement-client-interface" +version = "0.1.0" +dependencies = [ + "async-trait", + "axum 0.7.5", + "color-eyre", + "mockall", + "starknet", +] + [[package]] name = "sha-1" version = "0.10.1" diff --git a/Cargo.toml b/Cargo.toml index 5c3ca3aa..2f90e85a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "crates/da_clients/da-client-interface", "crates/da_clients/ethereum", "crates/utils", + "crates/settlement_clients/settlement-client-interface", ] [workspace.package] diff --git a/crates/orchestrator/src/controllers/jobs_controller.rs b/crates/orchestrator/src/controllers/jobs_controller.rs index 199ff633..4ac8c388 100644 --- a/crates/orchestrator/src/controllers/jobs_controller.rs +++ b/crates/orchestrator/src/controllers/jobs_controller.rs @@ -2,6 +2,7 @@ use crate::controllers::errors::AppError; use crate::jobs::types::JobType; use axum::extract::Json; use serde::Deserialize; +use std::collections::HashMap; /// Client request to create a job #[derive(Debug, Deserialize)] @@ -16,6 +17,6 @@ pub struct CreateJobRequest { /// Create a job pub async fn create_job(Json(payload): Json) -> Result, AppError> { - crate::jobs::create_job(payload.job_type, payload.internal_id).await?; + crate::jobs::create_job(payload.job_type, payload.internal_id, HashMap::new()).await?; Ok(Json::from(())) } diff --git a/crates/orchestrator/src/jobs/da_job/mod.rs b/crates/orchestrator/src/jobs/da_job/mod.rs index 5c00930d..b5108898 100644 --- a/crates/orchestrator/src/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/jobs/da_job/mod.rs @@ -14,14 +14,19 @@ pub struct DaJob; #[async_trait] impl Job for DaJob { - async fn create_job(&self, _config: &Config, internal_id: String) -> Result { + async fn create_job( + &self, + _config: &Config, + internal_id: String, + metadata: HashMap, + ) -> Result { Ok(JobItem { id: Uuid::new_v4(), internal_id, job_type: JobType::DataSubmission, status: JobStatus::Created, external_id: String::new().into(), - metadata: HashMap::new(), + metadata, version: 0, }) } diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index c75aac0d..f7011180 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -12,6 +12,9 @@ use uuid::Uuid; mod constants; pub mod da_job; +mod register_proof_job; +pub mod snos_job; +mod state_update_job; pub mod types; /// The Job trait is used to define the methods that a job @@ -20,7 +23,12 @@ pub mod types; #[async_trait] pub trait Job: Send + Sync { /// Should build a new job item and return it - async fn create_job(&self, config: &Config, internal_id: String) -> Result; + async fn create_job( + &self, + config: &Config, + internal_id: String, + metadata: HashMap, + ) -> Result; /// Should process the job and return the external_id which can be used to /// track the status of the job. For example, a DA job will submit the state diff /// to the DA layer and return the txn hash. @@ -40,7 +48,7 @@ pub trait Job: Send + Sync { } /// Creates the job in the DB in the created state and adds it to the process queue -pub async fn create_job(job_type: JobType, internal_id: String) -> Result<()> { +pub async fn create_job(job_type: JobType, internal_id: String, metadata: HashMap) -> Result<()> { let config = config().await; let existing_job = config.database().get_job_by_internal_id_and_type(internal_id.as_str(), &job_type).await?; if existing_job.is_some() { @@ -53,7 +61,7 @@ pub async fn create_job(job_type: JobType, internal_id: String) -> Result<()> { } let job_handler = get_job_handler(&job_type); - let job_item = job_handler.create_job(config, internal_id).await?; + let job_item = job_handler.create_job(config, internal_id, metadata).await?; config.database().create_job(job_item.clone()).await?; add_job_to_process_queue(job_item.id).await?; diff --git a/crates/orchestrator/src/jobs/register_proof_job/mod.rs b/crates/orchestrator/src/jobs/register_proof_job/mod.rs new file mode 100644 index 00000000..5f7a36d4 --- /dev/null +++ b/crates/orchestrator/src/jobs/register_proof_job/mod.rs @@ -0,0 +1,55 @@ +use crate::config::Config; +use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; +use crate::jobs::Job; +use async_trait::async_trait; +use color_eyre::Result; +use std::collections::HashMap; +use uuid::Uuid; + +pub struct RegisterProofJob; + +#[async_trait] +impl Job for RegisterProofJob { + async fn create_job( + &self, + _config: &Config, + internal_id: String, + metadata: HashMap, + ) -> Result { + Ok(JobItem { + id: Uuid::new_v4(), + internal_id, + job_type: JobType::ProofRegistration, + status: JobStatus::Created, + external_id: String::new().into(), + // metadata must contain the blocks that have been included inside this proof + // this will allow state update jobs to be created for each block + metadata, + version: 0, + }) + } + + async fn process_job(&self, _config: &Config, _job: &JobItem) -> Result { + // Get proof from S3 and submit on chain for verification + // We need to implement a generic trait for this to support multiple + // base layers + todo!() + } + + async fn verify_job(&self, _config: &Config, _job: &JobItem) -> Result { + // verify that the proof transaction has been included on chain + todo!() + } + + fn max_process_attempts(&self) -> u64 { + todo!() + } + + fn max_verification_attempts(&self) -> u64 { + todo!() + } + + fn verification_polling_delay_seconds(&self) -> u64 { + todo!() + } +} diff --git a/crates/orchestrator/src/jobs/snos_job/mod.rs b/crates/orchestrator/src/jobs/snos_job/mod.rs new file mode 100644 index 00000000..50fe0013 --- /dev/null +++ b/crates/orchestrator/src/jobs/snos_job/mod.rs @@ -0,0 +1,54 @@ +use crate::config::Config; +use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; +use crate::jobs::Job; +use async_trait::async_trait; +use color_eyre::Result; +use std::collections::HashMap; +use uuid::Uuid; + +pub struct SnosJob; + +#[async_trait] +impl Job for SnosJob { + async fn create_job( + &self, + _config: &Config, + internal_id: String, + metadata: HashMap, + ) -> Result { + Ok(JobItem { + id: Uuid::new_v4(), + internal_id, + job_type: JobType::SnosRun, + status: JobStatus::Created, + external_id: String::new().into(), + metadata, + version: 0, + }) + } + + async fn process_job(&self, _config: &Config, _job: &JobItem) -> Result { + // 1. Fetch SNOS input data from Madara + // 2. Import SNOS in Rust and execute it with the input data + // 3. Store the received PIE in DB + todo!() + } + + async fn verify_job(&self, _config: &Config, _job: &JobItem) -> Result { + // No need for verification as of now. If we later on decide to outsource SNOS run + // to another servicehow a, verify_job can be used to poll on the status of the job + todo!() + } + + fn max_process_attempts(&self) -> u64 { + todo!() + } + + fn max_verification_attempts(&self) -> u64 { + todo!() + } + + fn verification_polling_delay_seconds(&self) -> u64 { + todo!() + } +} diff --git a/crates/orchestrator/src/jobs/state_update_job/mod.rs b/crates/orchestrator/src/jobs/state_update_job/mod.rs new file mode 100644 index 00000000..384eaf71 --- /dev/null +++ b/crates/orchestrator/src/jobs/state_update_job/mod.rs @@ -0,0 +1,54 @@ +use crate::config::Config; +use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; +use crate::jobs::Job; +use async_trait::async_trait; +use color_eyre::Result; +use std::collections::HashMap; +use uuid::Uuid; + +pub struct StateUpdateJob; + +#[async_trait] +impl Job for StateUpdateJob { + async fn create_job( + &self, + _config: &Config, + internal_id: String, + metadata: HashMap, + ) -> Result { + Ok(JobItem { + id: Uuid::new_v4(), + internal_id, + job_type: JobType::ProofRegistration, + status: JobStatus::Created, + external_id: String::new().into(), + // metadata must contain the blocks for which state update will be performed + // we don't do one job per state update as that makes nonce management complicated + metadata, + version: 0, + }) + } + + async fn process_job(&self, _config: &Config, _job: &JobItem) -> Result { + // Read the metadata to get the blocks for which state update will be performed. + // For each block, get the program output (from the PIE?) and the + todo!() + } + + async fn verify_job(&self, _config: &Config, _job: &JobItem) -> Result { + // verify that the proof transaction has been included on chain + todo!() + } + + fn max_process_attempts(&self) -> u64 { + todo!() + } + + fn max_verification_attempts(&self) -> u64 { + todo!() + } + + fn verification_polling_delay_seconds(&self) -> u64 { + todo!() + } +} diff --git a/crates/orchestrator/src/jobs/types.rs b/crates/orchestrator/src/jobs/types.rs index 1aba242c..bcd8556b 100644 --- a/crates/orchestrator/src/jobs/types.rs +++ b/crates/orchestrator/src/jobs/types.rs @@ -68,12 +68,14 @@ fn unwrap_external_id_failed(expected: &str, got: &ExternalId) -> color_eyre::ey #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub enum JobType { + /// Running SNOS for a block + SnosRun, /// Submitting DA data to the DA layer DataSubmission, /// Getting a proof from the proving service ProofCreation, /// Verifying the proof on the base layer - ProofVerification, + ProofRegistration, /// Updaing the state root on the base layer StateUpdation, } diff --git a/crates/orchestrator/src/lib.rs b/crates/orchestrator/src/lib.rs index c578041d..618c8481 100644 --- a/crates/orchestrator/src/lib.rs +++ b/crates/orchestrator/src/lib.rs @@ -14,6 +14,8 @@ pub mod queue; pub mod routes; /// Contains the utils pub mod utils; +/// Contains workers which act like cron jobs +pub mod workers; #[cfg(test)] mod tests; diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index f7146110..b6502683 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -3,6 +3,11 @@ use orchestrator::config::config; use orchestrator::queue::init_consumers; use orchestrator::routes::app_router; use orchestrator::utils::env_utils::get_env_var_or_default; +use orchestrator::workers::proof_registration::ProofRegistrationWorker; +use orchestrator::workers::proving::ProvingWorker; +use orchestrator::workers::snos::SnosWorker; +use orchestrator::workers::update_state::UpdateStateWorker; +use orchestrator::workers::*; /// Start the server #[tokio::main] @@ -21,6 +26,21 @@ async fn main() { // init consumer init_consumers().await.expect("Failed to init consumers"); + // spawn a thread for each worker + // changes in rollup mode - sovereign, validity, validiums etc. + // will likely involve changes in these workers as well + tokio::spawn(start_cron(Box::new(SnosWorker), 60)); + tokio::spawn(start_cron(Box::new(ProvingWorker), 60)); + tokio::spawn(start_cron(Box::new(ProofRegistrationWorker), 60)); + tokio::spawn(start_cron(Box::new(UpdateStateWorker), 60)); + tracing::info!("Listening on http://{}", address); axum::serve(listener, app).await.expect("Failed to start axum server"); } + +async fn start_cron(worker: Box, interval: u64) { + loop { + worker.run_worker().await; + tokio::time::sleep(tokio::time::Duration::from_secs(interval)).await; + } +} diff --git a/crates/orchestrator/src/tests/jobs/da_job/mod.rs b/crates/orchestrator/src/tests/jobs/da_job/mod.rs index 6b29ea27..81166995 100644 --- a/crates/orchestrator/src/tests/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/da_job/mod.rs @@ -22,7 +22,7 @@ use da_client_interface::{DaVerificationStatus, MockDaClient}; #[tokio::test] async fn test_create_job() { let config = init_config(None, None, None, None).await; - let job = DaJob.create_job(&config, String::from("0")).await; + let job = DaJob.create_job(&config, String::from("0"), HashMap::new()).await; assert!(job.is_ok()); let job = job.unwrap(); diff --git a/crates/orchestrator/src/workers/mod.rs b/crates/orchestrator/src/workers/mod.rs new file mode 100644 index 00000000..846d0c8c --- /dev/null +++ b/crates/orchestrator/src/workers/mod.rs @@ -0,0 +1,11 @@ +use async_trait::async_trait; + +pub mod proof_registration; +pub mod proving; +pub mod snos; +pub mod update_state; + +#[async_trait] +pub trait Worker: Send + Sync { + async fn run_worker(&self); +} diff --git a/crates/orchestrator/src/workers/proof_registration.rs b/crates/orchestrator/src/workers/proof_registration.rs new file mode 100644 index 00000000..5ad5bc2d --- /dev/null +++ b/crates/orchestrator/src/workers/proof_registration.rs @@ -0,0 +1,14 @@ +use crate::workers::Worker; +use async_trait::async_trait; + +pub struct ProofRegistrationWorker; + +#[async_trait] +impl Worker for ProofRegistrationWorker { + /// 1. Fetch all blocks with a successful proving job run + /// 2. Group blocks that have the same proof + /// 3. For each group, create a proof registration job with from and to block in metadata + async fn run_worker(&self) { + todo!() + } +} diff --git a/crates/orchestrator/src/workers/proving.rs b/crates/orchestrator/src/workers/proving.rs new file mode 100644 index 00000000..9476ea71 --- /dev/null +++ b/crates/orchestrator/src/workers/proving.rs @@ -0,0 +1,13 @@ +use crate::workers::Worker; +use async_trait::async_trait; + +pub struct ProvingWorker; + +#[async_trait] +impl Worker for ProvingWorker { + /// 1. Fetch all successful SNOS job runs that don't have a proving job + /// 2. Create a proving job for each SNOS job run + async fn run_worker(&self) { + todo!() + } +} diff --git a/crates/orchestrator/src/workers/snos.rs b/crates/orchestrator/src/workers/snos.rs new file mode 100644 index 00000000..bdc04169 --- /dev/null +++ b/crates/orchestrator/src/workers/snos.rs @@ -0,0 +1,14 @@ +use crate::workers::Worker; +use async_trait::async_trait; + +pub struct SnosWorker; + +#[async_trait] +impl Worker for SnosWorker { + /// 1. Fetch the latest completed block from the Starknet chain + /// 2. Fetch the last block that had a SNOS job run. + /// 3. Create SNOS run jobs for all the remaining blocks + async fn run_worker(&self) { + todo!() + } +} diff --git a/crates/orchestrator/src/workers/update_state.rs b/crates/orchestrator/src/workers/update_state.rs new file mode 100644 index 00000000..c359e99e --- /dev/null +++ b/crates/orchestrator/src/workers/update_state.rs @@ -0,0 +1,14 @@ +use crate::workers::Worker; +use async_trait::async_trait; + +pub struct UpdateStateWorker; + +#[async_trait] +impl Worker for UpdateStateWorker { + /// 1. Fetch the last succesful state update job + /// 2. Fetch all succesful proving jobs covering blocks after the last state update + /// 3. Create state updates for all the blocks that don't have a state update job + async fn run_worker(&self) { + todo!() + } +} diff --git a/crates/settlement_clients/settlement-client-interface/Cargo.toml b/crates/settlement_clients/settlement-client-interface/Cargo.toml new file mode 100644 index 00000000..1e74b976 --- /dev/null +++ b/crates/settlement_clients/settlement-client-interface/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "settlement-client-interface" +version.workspace = true +edition.workspace = true +authors.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = { workspace = true } +axum = { workspace = true } +color-eyre = { workspace = true } +mockall = "0.12.1" +starknet = { workspace = true } diff --git a/crates/settlement_clients/settlement-client-interface/src/lib.rs b/crates/settlement_clients/settlement-client-interface/src/lib.rs new file mode 100644 index 00000000..da5a471c --- /dev/null +++ b/crates/settlement_clients/settlement-client-interface/src/lib.rs @@ -0,0 +1,43 @@ +use async_trait::async_trait; +use color_eyre::Result; +use mockall::{automock, predicate::*}; +use starknet::core::types::FieldElement; + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum SettlementVerificationStatus { + #[allow(dead_code)] + Pending, + #[allow(dead_code)] + Verified, + #[allow(dead_code)] + Rejected, +} + +/// Trait for every new DaClient to implement +#[automock] +#[async_trait] +pub trait SettlementClient: Send + Sync { + /// Should register the proof on the base layer and return an external id + /// which can be used to track the status. + async fn register_proof(&self, proof: Vec) -> Result; + + /// Should be used to update state on core contract when DA is done in calldata + async fn update_state_calldata( + &self, + program_output: Vec, + onchain_data_hash: FieldElement, + onchain_data_size: FieldElement, + ) -> Result; + + /// Should be used to update state on core contract when DA is in blobs/alt DA + async fn update_state_blobs(&self, program_output: Vec, kzg_proof: Vec) -> Result; + + /// Should verify the inclusion of the state diff in the DA layer and return the status + async fn verify_inclusion(&self, external_id: &str) -> Result; +} + +/// Trait for every new DaConfig to implement +pub trait SettlementConfig { + /// Should create a new instance of the DaConfig from the environment variables + fn new_from_env() -> Self; +}