Skip to content

Commit

Permalink
feat : db generic fucntion
Browse files Browse the repository at this point in the history
  • Loading branch information
Arun Jangra authored and Arun Jangra committed Jun 24, 2024
1 parent c65693c commit 9358c18
Show file tree
Hide file tree
Showing 5 changed files with 6,454 additions and 28 deletions.
11 changes: 8 additions & 3 deletions crates/orchestrator/src/database/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use ::mongodb::bson::doc;
use std::collections::HashMap;

use async_trait::async_trait;
Expand Down Expand Up @@ -33,11 +34,15 @@ pub trait Database: Send + Sync {
new_status: JobStatus,
metadata: HashMap<String, String>,
) -> Result<()>;

async fn update_metadata(&self, job: &JobItem, metadata: HashMap<String, String>) -> Result<()>;
async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result<Option<JobItem>>;

async fn get_successful_snos_jobs_without_proving(&self) -> Result<Vec<JobItem>>;
async fn get_jobs_without_successor(
&self,
job_a_type: JobType,
job_a_status: JobStatus,
job_b_type: JobType,
job_b_status: Option<JobStatus>,
) -> Result<Vec<JobItem>>;
}

pub trait DatabaseConfig {
Expand Down
109 changes: 92 additions & 17 deletions crates/orchestrator/src/database/mongodb/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use async_std::stream::StreamExt;
use std::collections::HashMap;

use async_trait::async_trait;
use color_eyre::eyre::eyre;
use color_eyre::Result;
use futures::TryStreamExt;
use mongodb::bson::{Bson, Document};
use mongodb::options::{FindOneOptions, UpdateOptions};
use mongodb::{
Expand Down Expand Up @@ -133,16 +133,54 @@ impl Database for MongoDb {
.expect("Failed to fetch latest job by given job type"))
}

async fn get_successful_snos_jobs_without_proving(&self) -> Result<Vec<JobItem>> {
let filter = vec![
// Stage 1: Match successful SNOS job runs
/// function to get jobs that don't have a successor job.
///
/// `job_a_type` : Type of job that we need to get that doesn't have any successor.
///
/// `job_a_status` : Status of job A.
///
/// `job_b_type` : Type of job that we need to have as a successor for Job A.
///
/// `job_b_status` : Status of job B which we want to check with.
///
/// Eg :
///
/// Getting SNOS jobs that do not have a successive proving job initiated yet.
///
/// job_a_type : SnosRun
///
/// job_a_status : Completed
///
/// job_b_type : ProofCreation
///
/// job_b_status : Status of Job B / None
///
/// **IMP** : For now Job B status implementation is pending so we can pass None
async fn get_jobs_without_successor(
&self,
job_a_type: JobType,
job_a_status: JobStatus,
job_b_type: JobType,
_job_b_status: Option<JobStatus>,
) -> Result<Vec<JobItem>> {
// Convert enums to Bson strings
let job_a_type_bson = Bson::String(format!("{:?}", job_a_type));
let job_a_status_bson = Bson::String(format!("{:?}", job_a_status));
let job_b_type_bson = Bson::String(format!("{:?}", job_b_type));

// TODO :
// implement job_b_status here in the pipeline

// Construct the initial pipeline
let pipeline = vec![
// Stage 1: Match job_a_type with job_a_status
doc! {
"$match": {
"job_type": "SnosRun",
"status": "Completed",
"job_type": job_a_type_bson,
"status": job_a_status_bson,
}
},
// Stage 2: Lookup to find corresponding proving jobs
// Stage 2: Lookup to find corresponding job_b_type jobs
doc! {
"$lookup": {
"from": "jobs",
Expand All @@ -152,30 +190,67 @@ impl Database for MongoDb {
"$match": {
"$expr": {
"$and": [
{ "$eq": ["$job_type", "ProofCreation"] },
{ "$eq": ["$job_type", job_b_type_bson] },
// Conditionally match job_b_status if provided
{ "$eq": ["$internal_id", "$$internal_id"] }
]
}
}
}
},
// TODO : Job B status code :
// // Add status matching if job_b_status is provided
// if let Some(status) = job_b_status {
// doc! {
// "$match": {
// "$expr": { "$eq": ["$status", status] }
// }
// }
// } else {
// doc! {}
// }
// ].into_iter().filter(|d| !d.is_empty()).collect::<Vec<_>>(),
],
"as": "proving_jobs"
"as": "successor_jobs"
}
},
// Stage 3: Filter out SNOS runs that have corresponding proving jobs
// Stage 3: Filter out job_a_type jobs that have corresponding job_b_type jobs
doc! {
"$match": {
"proving_jobs": { "$eq": [] }
"successor_jobs": { "$eq": [] }
}
},
];

let mut cursor = self.get_job_collection().aggregate(filter, None).await?;
// TODO : Job B status code :
// // Conditionally add status matching for job_b_status
// if let Some(status) = job_b_status {
// let job_b_status_bson = Bson::String(format!("{:?}", status));
//
// // Access the "$lookup" stage in the pipeline and modify the "pipeline" array inside it
// if let Ok(lookup_stage) = pipeline[1].get_document_mut("pipeline") {
// if let Ok(lookup_pipeline) = lookup_stage.get_array_mut(0) {
// lookup_pipeline.push(Bson::Document(doc! {
// "$match": {
// "$expr": { "$eq": ["$status", job_b_status_bson] }
// }
// }));
// }
// }
// }

let collection = self.get_job_collection();
let mut cursor = collection.aggregate(pipeline, None).await?;

let mut vec_jobs: Vec<JobItem> = Vec::new();
while let Some(val) = cursor.try_next().await? {
match bson::from_bson(Bson::Document(val)) {
Ok(job_item) => vec_jobs.push(job_item),
Err(e) => eprintln!("Failed to deserialize JobItem: {:?}", e),

// Iterate over the cursor and process each document
while let Some(result) = cursor.next().await {
match result {
Ok(document) => match bson::from_bson(Bson::Document(document)) {
Ok(job_item) => vec_jobs.push(job_item),
Err(e) => eprintln!("Failed to deserialize JobItem: {:?}", e),
},
Err(e) => eprintln!("Error retrieving document: {:?}", e),
}
}

Expand Down
12 changes: 6 additions & 6 deletions crates/orchestrator/src/tests/workers/proving/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ async fn test_proving_worker(#[case] incomplete_runs: bool) -> Result<(), Box<dy
let jobs_vec_temp: Vec<JobItem> =
get_job_item_mock_by_id_vec(5).into_iter().filter(|val| val.internal_id != "3").collect();
// Mocking db call for getting successful snos jobs
db.expect_get_successful_snos_jobs_without_proving()
db.expect_get_jobs_without_successor()
.times(1)
.with()
.returning(move || Ok(jobs_vec_temp.clone()));
.withf(|_, _, _, _| true)
.returning(move |_, _, _, _| Ok(jobs_vec_temp.clone()));

let num_vec: Vec<i32> = vec![1, 2, 4, 5];

Expand All @@ -56,10 +56,10 @@ async fn test_proving_worker(#[case] incomplete_runs: bool) -> Result<(), Box<dy
}

// Mocking db call for getting successful snos jobs
db.expect_get_successful_snos_jobs_without_proving()
db.expect_get_jobs_without_successor()
.times(1)
.with()
.returning(move || Ok(get_job_item_mock_by_id_vec(5)));
.withf(|_, _, _, _| true)
.returning(move |_, _, _, _| Ok(get_job_item_mock_by_id_vec(5)));

prover_client.expect_submit_task().times(5).returning(|_| Ok("task_id".to_string()));
}
Expand Down
7 changes: 5 additions & 2 deletions crates/orchestrator/src/workers/proving.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::config::config;
use crate::jobs::create_job;
use crate::jobs::types::JobType;
use crate::jobs::types::{JobStatus, JobType};
use crate::workers::Worker;
use async_trait::async_trait;
use std::error::Error;
Expand All @@ -13,7 +13,10 @@ impl Worker for ProvingWorker {
/// 2. Create a proving job for each SNOS job run
async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
let config = config().await;
let successful_snos_jobs = config.database().get_successful_snos_jobs_without_proving().await?;
let successful_snos_jobs = config
.database()
.get_jobs_without_successor(JobType::SnosRun, JobStatus::Completed, JobType::ProofCreation, None)
.await?;

for job in successful_snos_jobs {
create_job(JobType::ProofCreation, job.internal_id.to_string(), job.metadata).await?
Expand Down
Loading

0 comments on commit 9358c18

Please sign in to comment.