Skip to content

Commit

Permalink
feat : fixed tests and updated worker code
Browse files Browse the repository at this point in the history
  • Loading branch information
ocdbytes committed Jul 16, 2024
1 parent 375cb15 commit d2d3c26
Show file tree
Hide file tree
Showing 7 changed files with 9,366 additions and 4,700 deletions.
3 changes: 2 additions & 1 deletion crates/orchestrator/src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ pub trait Database: Send + Sync {
job_b_type: JobType,
) -> Result<Vec<JobItem>>;
async fn get_last_successful_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>>;
async fn get_completed_jobs_after_internal_id_by_job_type(
async fn get_jobs_after_internal_id_by_job_type(
&self,
job_type: JobType,
job_status: JobStatus,
internal_id: String,
) -> Result<Vec<JobItem>>;
}
Expand Down
5 changes: 3 additions & 2 deletions crates/orchestrator/src/database/mongodb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,15 @@ impl Database for MongoDb {
.expect("Failed to fetch latest job by given job type"))
}

async fn get_completed_jobs_after_internal_id_by_job_type(
async fn get_jobs_after_internal_id_by_job_type(
&self,
job_type: JobType,
job_status: JobStatus,
internal_id: String,
) -> Result<Vec<JobItem>> {
let filter = doc! {
"job_type": bson::to_bson(&job_type)?,
"job_status": bson::to_bson(&JobStatus::Completed)?,
"job_status": bson::to_bson(&job_status)?,
"internal_id": { "$gt": internal_id }
};

Expand Down
1 change: 1 addition & 0 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ fn get_job_handler(job_type: &JobType) -> Box<dyn Job> {
JobType::DataSubmission => Box::new(da_job::DaJob),
JobType::SnosRun => Box::new(snos_job::SnosJob),
JobType::ProofCreation => Box::new(proving_job::ProvingJob),
JobType::StateTransition => Box::new(state_update_job::StateUpdateJob),
_ => unimplemented!("Job type not implemented yet."),
}
}
Expand Down
12 changes: 6 additions & 6 deletions crates/orchestrator/src/tests/workers/update_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use crate::jobs::types::{JobStatus, JobType};
use crate::queue::MockQueueProvider;
use crate::tests::common::init_config;
use crate::tests::workers::utils::{
db_create_job_expectations_update_state_worker, db_get_job_expectations_update_state_worker,
get_job_by_mock_id_vector, get_job_item_mock_by_id,
db_create_job_expectations_update_state_worker, get_job_by_mock_id_vector, get_job_item_mock_by_id,
};
use crate::workers::update_state::UpdateStateWorker;
use crate::workers::Worker;
Expand Down Expand Up @@ -45,9 +44,9 @@ async fn test_update_state_worker(
.returning(|_| Ok(Some(get_job_item_mock_by_id("1".to_string(), Uuid::new_v4()))));

// mocking the return values of second function call (getting completed proving worker jobs)
db.expect_get_completed_jobs_after_internal_id_by_job_type()
.with(eq(JobType::ProofCreation), eq("1".to_string()))
.returning(move |_, _| {
db.expect_get_jobs_after_internal_id_by_job_type()
.with(eq(JobType::ProofCreation), eq(JobStatus::Completed), eq("1".to_string()))
.returning(move |_, _, _| {
Ok(get_job_by_mock_id_vector(
JobType::ProofCreation,
JobStatus::Completed,
Expand All @@ -57,7 +56,8 @@ async fn test_update_state_worker(
});

// mocking getting of the jobs
let completed_jobs = get_job_by_mock_id_vector(JobType::ProofCreation, JobStatus::Completed, number_of_processed_jobs as u64, 2);
let completed_jobs =
get_job_by_mock_id_vector(JobType::ProofCreation, JobStatus::Completed, number_of_processed_jobs as u64, 2);
for job in completed_jobs {
db.expect_get_job_by_internal_id_and_type()
.times(1)
Expand Down
19 changes: 11 additions & 8 deletions crates/orchestrator/src/tests/workers/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ pub fn get_job_item_mock_by_id(id: String, uuid: Uuid) -> JobItem {
}
}

/// Function to get the vector of JobItems with mock IDs
///
/// Arguments :
///
/// `job_type` : Type of job you want to create the vector for.
///
/// `job_status` : State of the job you want to create the vector for.
///
/// `number_of_jobs` : Number of jobs (length of the vector you need).
///
/// `start_index` : Start index of the `internal_id` for the JobItem in the vector.
pub fn get_job_by_mock_id_vector(
job_type: JobType,
job_status: JobStatus,
Expand All @@ -41,14 +52,6 @@ pub fn get_job_by_mock_id_vector(
jobs_vec
}

pub fn db_get_job_expectations_update_state_worker(db: &mut MockDatabase, internal_id: &str) {
db.expect_get_job_by_internal_id_and_type()
.times(1)
.with(eq(internal_id.to_string()), eq(JobType::StateTransition))
.returning(|_, _| Ok(None));
println!("expect set for job id : {}", internal_id);
}

pub fn db_create_job_expectations_update_state_worker(db: &mut MockDatabase, proof_creation_jobs: Vec<JobItem>) {
for job in proof_creation_jobs {
let internal_id = job.internal_id.clone();
Expand Down
18 changes: 4 additions & 14 deletions crates/orchestrator/src/workers/update_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::error::Error;

use crate::config::config;
use crate::jobs::create_job;
use crate::jobs::types::JobType;
use crate::jobs::types::{JobStatus, JobType};
use async_trait::async_trait;

use crate::workers::Worker;
Expand All @@ -24,25 +24,15 @@ impl Worker for UpdateStateWorker {

let successful_proving_jobs = config
.database()
.get_completed_jobs_after_internal_id_by_job_type(
.get_jobs_after_internal_id_by_job_type(
JobType::ProofCreation,
JobStatus::Completed,
latest_successful_job_internal_id,
)
.await?;

for job in successful_proving_jobs {
let existing_job = config
.database()
.get_job_by_internal_id_and_type(&job.internal_id, &JobType::StateTransition)
.await?;
match existing_job {
Some(job) => {
log::info!("State Update Job already exists for internal id : {}", job.internal_id)
}
None => {
create_job(JobType::StateTransition, job.internal_id, job.metadata).await?;
}
}
create_job(JobType::StateTransition, job.internal_id, job.metadata).await?;
}

Ok(())
Expand Down
Loading

0 comments on commit d2d3c26

Please sign in to comment.