Skip to content

Commit

Permalink
feat: Data Submission Worker Integration. (#51)
Browse files Browse the repository at this point in the history
* update: DA job draft #1

* docs: changelog updated

* update: is_worker_enabled impl & usage in da_submission, removal of String from VerificationFailed

* update: renamed  to

* update: run worker only if it's enabled using is_worker_enabled check

* build: linter fixes

* Update CHANGELOG.md

Co-authored-by: Apoorv Sadana <[email protected]>

* update: limit_to_one on get_jobs_by_status

* update: removed get_last_successful_job_by_type, added get_latest_job_by_type_and_status

* update: added error to job metadata

* update: pr resolution, simplifying get_jobs_by_status, rejected status in verify_jobs

* update: linting fixes

* Update crates/orchestrator/src/jobs/mod.rs

Co-authored-by: Apoorv Sadana <[email protected]>

* update: removing .expect from mongodb mod file

* update: fixed testcase for snos worker

* chore: correct variable name

* update: added support to check againt multiple status - is_worker_enabled, get_jobs_by_statuses

* docs: rewrote 1 job per block assumption

* docs: DataSubmissionWorker -> DataAvailabilitySynchronizer

* chore: liniting fix

* update: changed name : DataAvailabilitySynchronizer -> DataSubmissionWorker

---------

Co-authored-by: Apoorv Sadana <[email protected]>
  • Loading branch information
heemankv and apoorvsadana authored Aug 3, 2024
1 parent d3e57f9 commit 43db46e
Show file tree
Hide file tree
Showing 13 changed files with 152 additions and 52 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Added

- implemented DA worker.
- Function to calculate the kzg proof of x_0.
- Tests for updating the state.
- Function to update the state and publish blob on ethereum in state update job.
Expand Down
11 changes: 9 additions & 2 deletions crates/orchestrator/src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,27 @@ pub trait Database: Send + Sync {
async fn update_job(&self, job: &JobItem) -> Result<()>;
async fn update_job_status(&self, job: &JobItem, new_status: JobStatus) -> Result<()>;
async fn update_metadata(&self, job: &JobItem, metadata: HashMap<String, String>) -> Result<()>;
async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result<Option<JobItem>>;
async fn get_latest_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>>;
async fn get_jobs_without_successor(
&self,
job_a_type: JobType,
job_a_status: JobStatus,
job_b_type: JobType,
) -> Result<Vec<JobItem>>;
async fn get_last_successful_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>>;
async fn get_latest_job_by_type_and_status(
&self,
job_type: JobType,
job_status: JobStatus,
) -> Result<Option<JobItem>>;
async fn get_jobs_after_internal_id_by_job_type(
&self,
job_type: JobType,
job_status: JobStatus,
internal_id: String,
) -> Result<Vec<JobItem>>;

// TODO: can be extendible to support multiple status.
async fn get_jobs_by_statuses(&self, status: Vec<JobStatus>, limit: Option<i64>) -> Result<Vec<JobItem>>;
}

pub trait DatabaseConfig {
Expand Down
56 changes: 26 additions & 30 deletions crates/orchestrator/src/database/mongodb/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use async_std::stream::StreamExt;
use futures::TryStreamExt;
use std::collections::HashMap;

use async_trait::async_trait;
use color_eyre::eyre::eyre;
use color_eyre::Result;
use mongodb::bson::{Bson, Document};
use mongodb::options::{FindOneOptions, UpdateOptions};
use mongodb::options::{FindOneOptions, FindOptions, UpdateOptions};
use mongodb::{
bson,
bson::doc,
Expand Down Expand Up @@ -112,16 +113,12 @@ impl Database for MongoDb {
Ok(())
}

async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result<Option<JobItem>> {
async fn get_latest_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>> {
let filter = doc! {
"job_type": mongodb::bson::to_bson(&job_type)?,
};
let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build();
Ok(self
.get_job_collection()
.find_one(filter, find_options)
.await
.expect("Failed to fetch latest job by given job type"))
Ok(self.get_job_collection().find_one(filter, find_options).await?)
}

/// function to get jobs that don't have a successor job.
Expand Down Expand Up @@ -226,8 +223,7 @@ impl Database for MongoDb {
// }
// }

let collection = self.get_job_collection();
let mut cursor = collection.aggregate(pipeline, None).await?;
let mut cursor = self.get_job_collection().aggregate(pipeline, None).await?;

let mut vec_jobs: Vec<JobItem> = Vec::new();

Expand All @@ -245,18 +241,18 @@ impl Database for MongoDb {
Ok(vec_jobs)
}

async fn get_last_successful_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>> {
async fn get_latest_job_by_type_and_status(
&self,
job_type: JobType,
job_status: JobStatus,
) -> Result<Option<JobItem>> {
let filter = doc! {
"job_type": bson::to_bson(&job_type)?,
"job_status": bson::to_bson(&JobStatus::Completed)?
"job_status": bson::to_bson(&job_status)?
};
let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build();

Ok(self
.get_job_collection()
.find_one(filter, find_options)
.await
.expect("Failed to fetch latest job by given job type"))
Ok(self.get_job_collection().find_one(filter, find_options).await?)
}

async fn get_jobs_after_internal_id_by_job_type(
Expand All @@ -271,23 +267,23 @@ impl Database for MongoDb {
"internal_id": { "$gt": internal_id }
};

let mut jobs = self
.get_job_collection()
.find(filter, None)
.await
.expect("Failed to fetch latest jobs by given job type and internal_od conditions");
let jobs = self.get_job_collection().find(filter, None).await?.try_collect().await?;

let mut results = Vec::new();
Ok(jobs)
}

while let Some(result) = jobs.next().await {
match result {
Ok(job_item) => {
results.push(job_item);
}
Err(e) => return Err(e.into()),
async fn get_jobs_by_statuses(&self, job_status: Vec<JobStatus>, limit: Option<i64>) -> Result<Vec<JobItem>> {
let filter = doc! {
"job_status": {
// TODO: Check that the conversion leads to valid output!
"$in": job_status.iter().map(|status| bson::to_bson(status).unwrap_or(Bson::Null)).collect::<Vec<Bson>>()
}
}
};

let find_options = limit.map(|val| FindOptions::builder().limit(Some(val)).build());

let jobs = self.get_job_collection().find(filter, find_options).await?.try_collect().await?;

Ok(results)
Ok(jobs)
}
}
10 changes: 8 additions & 2 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub async fn process_job(id: Uuid) -> Result<()> {
match job.status {
// we only want to process jobs that are in the created or verification failed state.
// verification failed state means that the previous processing failed and we want to retry
JobStatus::Created | JobStatus::VerificationFailed(_) => {
JobStatus::Created | JobStatus::VerificationFailed => {
log::info!("Processing job with id {:?}", id);
}
_ => {
Expand Down Expand Up @@ -135,7 +135,13 @@ pub async fn verify_job(id: Uuid) -> Result<()> {
config.database().update_job_status(&job, JobStatus::Completed).await?;
}
JobVerificationStatus::Rejected(e) => {
config.database().update_job_status(&job, JobStatus::VerificationFailed(e)).await?;
let mut new_job = job.clone();
new_job.metadata.insert("error".to_string(), e);
new_job.status = JobStatus::VerificationFailed;

config.database().update_job(&new_job).await?;

log::error!("Verification failed for job with id {:?}. Cannot verify.", id);

// retry job processing if we haven't exceeded the max limit
let process_attempts = get_u64_from_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?;
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/jobs/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub enum JobStatus {
/// The job was processed but the was unable to be verified under the given time
VerificationTimeout,
/// The job failed processing
VerificationFailed(String),
VerificationFailed,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
Expand Down
4 changes: 3 additions & 1 deletion crates/orchestrator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use dotenvy::dotenv;
use orchestrator::config::config;
use orchestrator::queue::init_consumers;
use orchestrator::routes::app_router;
use orchestrator::workers::data_submission_worker::DataSubmissionWorker;
use orchestrator::workers::proof_registration::ProofRegistrationWorker;
use orchestrator::workers::proving::ProvingWorker;
use orchestrator::workers::snos::SnosWorker;
Expand Down Expand Up @@ -33,14 +34,15 @@ async fn main() {
tokio::spawn(start_cron(Box::new(ProvingWorker), 60));
tokio::spawn(start_cron(Box::new(ProofRegistrationWorker), 60));
tokio::spawn(start_cron(Box::new(UpdateStateWorker), 60));
tokio::spawn(start_cron(Box::new(DataSubmissionWorker), 60));

tracing::info!("Listening on http://{}", address);
axum::serve(listener, app).await.expect("Failed to start axum server");
}

async fn start_cron(worker: Box<dyn Worker>, interval: u64) {
loop {
worker.run_worker().await.expect("Error in running the worker.");
worker.run_worker_if_enabled().await.expect("Error in running the worker.");
tokio::time::sleep(tokio::time::Duration::from_secs(interval)).await;
}
}
13 changes: 8 additions & 5 deletions crates/orchestrator/src/tests/workers/snos/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::config::config_force_init;
use crate::database::MockDatabase;
use crate::jobs::types::JobType;
use crate::jobs::types::{JobStatus, JobType};
use crate::queue::MockQueueProvider;
use crate::tests::common::init_config;
use crate::tests::workers::utils::get_job_item_mock_by_id;
Expand Down Expand Up @@ -30,15 +30,18 @@ async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box<dyn Error>> {

// Mocking db function expectations
if !db_val {
db.expect_get_last_successful_job_by_type().times(1).with(eq(JobType::SnosRun)).returning(|_| Ok(None));
db.expect_get_latest_job_by_type_and_status()
.times(1)
.with(eq(JobType::SnosRun), eq(JobStatus::Completed))
.returning(|_, _| Ok(None));
start_job_index = 1;
block = 5;
} else {
let uuid_temp = Uuid::new_v4();

db.expect_get_last_successful_job_by_type()
.with(eq(JobType::SnosRun))
.returning(move |_| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp))));
db.expect_get_latest_job_by_type_and_status()
.with(eq(JobType::SnosRun), eq(JobStatus::Completed))
.returning(move |_, _| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp))));
block = 6;
start_job_index = 2;
}
Expand Down
11 changes: 7 additions & 4 deletions crates/orchestrator/src/tests/workers/update_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,18 @@ async fn test_update_state_worker(
// Mocking db function expectations
// If no successful state update jobs exist
if !last_successful_job_exists {
db.expect_get_last_successful_job_by_type().with(eq(JobType::StateTransition)).times(1).returning(|_| Ok(None));
db.expect_get_latest_job_by_type_and_status()
.with(eq(JobType::StateTransition), eq(JobStatus::Completed))
.times(1)
.returning(|_, _| Ok(None));
} else {
// if successful state update job exists

// mocking the return value of first function call (getting last successful jobs):
db.expect_get_last_successful_job_by_type()
.with(eq(JobType::StateTransition))
db.expect_get_latest_job_by_type_and_status()
.with(eq(JobType::StateTransition), eq(JobStatus::Completed))
.times(1)
.returning(|_| Ok(Some(get_job_item_mock_by_id("1".to_string(), Uuid::new_v4()))));
.returning(|_, _| Ok(Some(get_job_item_mock_by_id("1".to_string(), Uuid::new_v4()))));

// mocking the return values of second function call (getting completed proving worker jobs)
db.expect_get_jobs_after_internal_id_by_job_type()
Expand Down
48 changes: 48 additions & 0 deletions crates/orchestrator/src/workers/data_submission_worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use crate::config::config;
use crate::jobs::create_job;
use crate::jobs::types::{JobStatus, JobType};
use crate::workers::Worker;
use async_trait::async_trait;
use std::collections::HashMap;
use std::error::Error;

pub struct DataSubmissionWorker;

#[async_trait]
impl Worker for DataSubmissionWorker {
// 0. All ids are assumed to be block numbers.
// 1. Fetch the latest completed Proving job.
// 2. Fetch the latest DA job creation.
// 3. Create jobs from after the lastest DA job already created till latest completed proving job.
async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
let config = config().await;

// provides latest completed proof creation job id
let latest_proven_job_id = config
.database()
.get_latest_job_by_type_and_status(JobType::ProofCreation, JobStatus::Completed)
.await
.unwrap()
.map(|item| item.internal_id)
.unwrap_or("0".to_string());

// provides latest triggered data submission job id
let latest_data_submission_job_id = config
.database()
.get_latest_job_by_type(JobType::DataSubmission)
.await
.unwrap()
.map(|item| item.internal_id)
.unwrap_or("0".to_string());

let latest_data_submission_id: u64 = latest_data_submission_job_id.parse()?;
let latest_proven_id: u64 = latest_proven_job_id.parse()?;

// creating data submission jobs for latest blocks that don't have existing data submission jobs yet.
for new_job_id in latest_data_submission_id + 1..latest_proven_id + 1 {
create_job(JobType::DataSubmission, new_job_id.to_string(), HashMap::new()).await?;
}

Ok(())
}
}
37 changes: 35 additions & 2 deletions crates/orchestrator/src/workers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,46 @@
use std::error::Error;

use crate::{config::config, jobs::types::JobStatus};
use async_trait::async_trait;
use std::error::Error;

pub mod data_submission_worker;
pub mod proof_registration;
pub mod proving;
pub mod snos;
pub mod update_state;

#[async_trait]
pub trait Worker: Send + Sync {
async fn run_worker_if_enabled(&self) -> Result<(), Box<dyn Error>> {
if !self.is_worker_enabled().await? {
return Ok(());
}
self.run_worker().await
}

async fn run_worker(&self) -> Result<(), Box<dyn Error>>;

// Assumption
// If say a job for block X fails, we don't want the worker to respawn another job for the same block
// we will resolve the existing failed job first.

// We assume the system to keep working till a job hasn't failed,
// as soon as it fails we currently halt any more execution and wait for manual intervention.

// Checks if any of the jobs have failed
// Failure : JobStatus::VerificationFailed, JobStatus::VerificationTimeout, JobStatus::Failed
// Halts any new job creation till all the count of failed jobs is not Zero.
async fn is_worker_enabled(&self) -> Result<bool, Box<dyn Error>> {
let config = config().await;

let failed_jobs = config
.database()
.get_jobs_by_statuses(vec![JobStatus::VerificationFailed, JobStatus::VerificationTimeout], Some(1))
.await?;

if !failed_jobs.is_empty() {
return Ok(false);
}

Ok(true)
}
}
4 changes: 2 additions & 2 deletions crates/orchestrator/src/workers/snos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use starknet::providers::Provider;

use crate::config::config;
use crate::jobs::create_job;
use crate::jobs::types::JobType;
use crate::jobs::types::{JobStatus, JobType};
use crate::workers::Worker;

pub struct SnosWorker;
Expand All @@ -22,7 +22,7 @@ impl Worker for SnosWorker {
let latest_block_number = provider.block_number().await?;
let latest_block_processed_data = config
.database()
.get_last_successful_job_by_type(JobType::SnosRun)
.get_latest_job_by_type_and_status(JobType::SnosRun, JobStatus::Completed)
.await
.unwrap()
.map(|item| item.internal_id)
Expand Down
3 changes: 2 additions & 1 deletion crates/orchestrator/src/workers/update_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ impl Worker for UpdateStateWorker {
/// 3. Create state updates for all the blocks that don't have a state update job
async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
let config = config().await;
let latest_successful_job = config.database().get_last_successful_job_by_type(JobType::StateTransition).await?;
let latest_successful_job =
config.database().get_latest_job_by_type_and_status(JobType::StateTransition, JobStatus::Completed).await?;

match latest_successful_job {
Some(job) => {
Expand Down
4 changes: 2 additions & 2 deletions crates/prover-services/gps-fact-checker/src/fact_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
//! constructed using a stack of nodes (initialized to an empty stack) by repeating for each pair:
//! 1. Add #n_pages lead nodes to the stack.
//! 2. Pop the top #n_nodes, construct a parent node for them, and push it back to the stack.
//! After applying the steps above, the stack must contain exactly one node, which will
//! constitute the root of the Merkle tree.
//! After applying the steps above, the stack must contain exactly one node, which will
//! constitute the root of the Merkle tree.
//!
//! For example, [(2, 2)] will create a Merkle tree with a root and two direct children, while
//! [(3, 2), (0, 2)] will create a Merkle tree with a root whose left child is a leaf and
Expand Down

0 comments on commit 43db46e

Please sign in to comment.