more clippy fixes
apoorvsadana committed Mar 8, 2024
1 parent 6e14ef0 commit c81d279
Showing 16 changed files with 106 additions and 51 deletions.
Binary file modified .DS_Store
Binary file not shown.
Binary file modified src/.DS_Store
Binary file not shown.
29 changes: 22 additions & 7 deletions src/config.rs
@@ -5,41 +5,54 @@ use crate::database::mongodb::config::MongoDbConfig;
use crate::database::mongodb::MongoDb;
use crate::database::{Database, DatabaseConfig};
use crate::queue::sqs::SqsQueue;
use crate::queue::{QueueProvider};
use crate::queue::QueueProvider;
use crate::utils::env_utils::get_env_var_or_panic;
use dotenvy::dotenv;
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::{JsonRpcClient, Url};
use std::sync::Arc;
use tokio::sync::OnceCell;

/// The app config. It can be accessed from anywhere inside the service
/// by calling `config` function.
pub struct Config {
/// The starknet client to get data from the node
starknet_client: Arc<JsonRpcClient<HttpTransport>>,
/// The DA client to interact with the DA layer
da_client: Box<dyn DaClient>,
/// The database client
database: Box<dyn Database>,
/// The queue provider
queue: Box<dyn QueueProvider>,
}

impl Config {
/// Returns the starknet client
pub fn starknet_client(&self) -> &Arc<JsonRpcClient<HttpTransport>> {
&self.starknet_client
}

pub fn da_client(&self) -> &Box<dyn DaClient> {
&self.da_client
/// Returns the DA client
pub fn da_client(&self) -> &dyn DaClient {
self.da_client.as_ref()
}

pub fn database(&self) -> &Box<dyn Database> {
&self.database
/// Returns the database client
pub fn database(&self) -> &dyn Database {
self.database.as_ref()
}

pub fn queue(&self) -> &Box<dyn QueueProvider> {
&self.queue
/// Returns the queue provider
pub fn queue(&self) -> &dyn QueueProvider {
self.queue.as_ref()
}
}

/// The app config. It can be accessed from anywhere inside the service.
/// It's initialized only once.
pub static CONFIG: OnceCell<Config> = OnceCell::const_new();

/// Initializes the app config
async fn init_config() -> Config {
dotenv().ok();

@@ -57,10 +70,12 @@ async fn init_config() -> Config {
Config { starknet_client: Arc::new(provider), da_client: build_da_client(), database, queue }
}

/// Returns the app config. Initializes if not already done.
pub async fn config() -> &'static Config {
CONFIG.get_or_init(init_config).await
}

/// Builds the DA client based on the environment variable DA_LAYER
fn build_da_client() -> Box<dyn DaClient + Send + Sync> {
match get_env_var_or_panic("DA_LAYER").as_str() {
"ethereum" => {
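For illustration, a call site for the updated accessors might look like the sketch below. It is not part of this commit; the function name and the external id value are placeholders, and only the paths shown in this diff are assumed.

// Sketch only: shows how the accessors are used now that they return trait
// objects (`&dyn DaClient`) instead of `&Box<dyn ...>`.
use crate::config::config;
use crate::jobs::types::JobVerificationStatus;
use color_eyre::Result;

async fn check_da_inclusion_example() -> Result<JobVerificationStatus> {
    // `config()` lazily initializes the global CONFIG on first call.
    let config = config().await;
    // No explicit deref of a Box is needed any more; the trait object is
    // returned directly. The external id here is a placeholder value.
    config.da_client().verify_inclusion("0xabc").await
}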
3 changes: 3 additions & 0 deletions src/controllers/errors.rs
@@ -4,12 +4,15 @@ use color_eyre::eyre::ErrReport;
use serde_json::json;
use tracing::log;

/// Root level error which is sent back to the client
#[derive(thiserror::Error, Debug)]
pub enum AppError {
/// Internal server error
#[error("Internal Server Error {0}")]
InternalServerError(#[from] ErrReport),
}

/// Convert the error into a response so that it can be sent back to the client
impl IntoResponse for AppError {
fn into_response(self) -> axum::http::Response<axum::body::Body> {
log::error!("Error: {:?}", self);
13 changes: 7 additions & 6 deletions src/controllers/jobs_controller.rs
@@ -1,20 +1,21 @@
use crate::controllers::errors::AppError;
use crate::jobs::types::JobType;
use crate::AppState;
use axum::extract::Json;
use axum::extract::State;
use serde::Deserialize;

/// Client request to create a job
#[derive(Debug, Deserialize)]
pub struct CreateJobRequest {
/// Job type
job_type: JobType,
/// Internal id must be a way to identify the job. For example
/// block_no, transaction_hash etc. The (job_type, internal_id)
/// pair must be unique.
internal_id: String,
}

pub async fn create_job(
State(_state): State<AppState>,
Json(payload): Json<CreateJobRequest>,
) -> Result<Json<()>, AppError> {
/// Create a job
pub async fn create_job(Json(payload): Json<CreateJobRequest>) -> Result<Json<()>, AppError> {
crate::jobs::create_job(payload.job_type, payload.internal_id).await?;
Ok(Json::from(()))
}
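As a hedged sketch of how the simplified handler could be mounted: the actual routes module is not part of this diff, so the "/jobs" path and router layout below are assumptions, but the wiring shows why dropping the `State` extractor means no shared state needs to be attached for this handler.

// Sketch only: route path and router layout are assumptions, not taken from
// this commit's routes module.
use axum::routing::post;
use axum::Router;

use crate::controllers::jobs_controller::create_job;

fn jobs_router_example() -> Router {
    // With the `State` extractor gone, the handler only needs the JSON body,
    // so no application state has to be attached to the router.
    Router::new().route("/jobs", post(create_job))
}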
3 changes: 3 additions & 0 deletions src/controllers/mod.rs
@@ -1,2 +1,5 @@
/// Errors
mod errors;

/// Job controllers
pub mod jobs_controller;
5 changes: 4 additions & 1 deletion src/da_clients/ethereum/mod.rs
@@ -1,3 +1,5 @@
#![allow(missing_docs)]
#![allow(clippy::missing_docs_in_private_items)]
use alloy::rpc::client::RpcClient;
use alloy::transports::http::Http;
use async_trait::async_trait;
@@ -13,6 +15,7 @@ use crate::jobs::types::JobVerificationStatus;

pub mod config;
pub struct EthereumDaClient {
#[allow(dead_code)]
provider: RpcClient<Http<Client>>,
}

@@ -22,7 +25,7 @@ impl DaClient for EthereumDaClient {
unimplemented!()
}

async fn verify_inclusion(&self, _external_id: &String) -> Result<JobVerificationStatus> {
async fn verify_inclusion(&self, _external_id: &str) -> Result<JobVerificationStatus> {
todo!()
}
}
9 changes: 8 additions & 1 deletion src/da_clients/mod.rs
@@ -3,14 +3,21 @@ use axum::async_trait
use color_eyre::Result;
use starknet::core::types::FieldElement;

/// Ethereum client
pub mod ethereum;

/// Trait for every new DaClient to implement
#[async_trait]
pub trait DaClient: Send + Sync {
/// Should publish the state diff to the DA layer and return an external id
/// which can be used to track the status of the DA transaction.
async fn publish_state_diff(&self, state_diff: Vec<FieldElement>) -> Result<String>;
async fn verify_inclusion(&self, external_id: &String) -> Result<JobVerificationStatus>;
/// Should verify the inclusion of the state diff in the DA layer and return the status
async fn verify_inclusion(&self, external_id: &str) -> Result<JobVerificationStatus>;
}

/// Trait for every new DaConfig to implement
pub trait DaConfig {
/// Should create a new instance of the DaConfig from the environment variables
fn new_from_env() -> Self;
}
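A minimal sketch of implementing the updated trait, in particular `verify_inclusion` taking `&str` rather than `&String`. The `NoopDaClient` type and its stubbed behaviour are illustrative only, not part of the codebase.

// Sketch only: a stub DA client illustrating the trait after this change.
use async_trait::async_trait;
use color_eyre::Result;
use starknet::core::types::FieldElement;

use crate::da_clients::DaClient;
use crate::jobs::types::JobVerificationStatus;

struct NoopDaClient;

#[async_trait]
impl DaClient for NoopDaClient {
    async fn publish_state_diff(&self, _state_diff: Vec<FieldElement>) -> Result<String> {
        // Pretend the diff was published and hand back a fake external id.
        Ok("noop-external-id".to_string())
    }

    async fn verify_inclusion(&self, _external_id: &str) -> Result<JobVerificationStatus> {
        // A real client would query the DA layer; the stub reports success.
        Ok(JobVerificationStatus::Verified)
    }
}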
7 changes: 2 additions & 5 deletions src/database/mod.rs
@@ -4,6 +4,7 @@ use color_eyre::Result;
use std::collections::HashMap;
use uuid::Uuid;

/// MongoDB
pub mod mongodb;

/// The Database trait is used to define the methods that a database
@@ -19,11 +20,7 @@ pub mod mongodb;
pub trait Database: Send + Sync {
async fn create_job(&self, job: JobItem) -> Result<JobItem>;
async fn get_job_by_id(&self, id: Uuid) -> Result<Option<JobItem>>;
async fn get_job_by_internal_id_and_type(
&self,
internal_id: &String,
job_type: &JobType,
) -> Result<Option<JobItem>>;
async fn get_job_by_internal_id_and_type(&self, internal_id: &str, job_type: &JobType) -> Result<Option<JobItem>>;
async fn update_job_status(&self, job: &JobItem, new_status: JobStatus) -> Result<()>;
async fn update_external_id_and_status_and_metadata(
&self,
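The same `&String` to `&str` change shows up in the `Database` trait. As a small sketch (the internal id value is a placeholder), a string literal can now be passed directly, and an owned `String` still works via `as_str()` or deref coercion:

// Sketch only: illustrates calling the updated trait method.
use crate::config::config;
use crate::jobs::types::JobType;

async fn lookup_job_example(job_type: &JobType) -> color_eyre::Result<()> {
    let config = config().await;
    let internal_id = String::from("42");

    // An owned String works through `as_str()` (or deref coercion) ...
    let _a = config.database().get_job_by_internal_id_and_type(internal_id.as_str(), job_type).await?;
    // ... and so does a plain string literal, which a `&String` parameter did not accept.
    let _b = config.database().get_job_by_internal_id_and_type("42", job_type).await?;
    Ok(())
}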
8 changes: 2 additions & 6 deletions src/database/mongodb/mod.rs
@@ -4,7 +4,7 @@ use crate::jobs::types::{JobItem, JobStatus, JobType};
use async_trait::async_trait;
use color_eyre::eyre::eyre;
use color_eyre::Result;
use mongodb::bson::{Document};
use mongodb::bson::Document;
use mongodb::options::UpdateOptions;
use mongodb::{
bson::doc,
@@ -70,11 +70,7 @@ impl Database for MongoDb {
Ok(self.get_job_collection().find_one(filter, None).await?)
}

async fn get_job_by_internal_id_and_type(
&self,
internal_id: &String,
job_type: &JobType,
) -> Result<Option<JobItem>> {
async fn get_job_by_internal_id_and_type(&self, internal_id: &str, job_type: &JobType) -> Result<Option<JobItem>> {
let filter = doc! {
"internal_id": internal_id,
"job_type": mongodb::bson::to_bson(&job_type)?,
4 changes: 2 additions & 2 deletions src/jobs/da_job/mod.rs
@@ -1,6 +1,6 @@
use crate::config::Config;
use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use crate::jobs::{Job};
use crate::jobs::Job;
use async_trait::async_trait;
use color_eyre::eyre::eyre;
use color_eyre::Result;
@@ -49,7 +49,7 @@ impl Job for DaJob {
}

async fn verify_job(&self, config: &Config, job: &JobItem) -> Result<JobVerificationStatus> {
Ok(config.da_client().verify_inclusion(&job.external_id).await?)
Ok(config.da_client().verify_inclusion(job.external_id.as_str()).await?)
}

fn max_process_attempts(&self) -> u64 {
29 changes: 25 additions & 4 deletions src/jobs/mod.rs
@@ -14,19 +14,35 @@ mod constants;
pub mod da_job;
pub mod types;

/// The Job trait is used to define the methods that a job
/// should implement to be used as a job for the orchestrator. The orchestrator automatically
/// handles queueing and processing of jobs as long as they implement the trait.
#[async_trait]
pub trait Job: Send + Sync {
/// Should build a new job item and return it
async fn create_job(&self, config: &Config, internal_id: String) -> Result<JobItem>;
/// Should process the job and return the external_id which can be used to
/// track the status of the job. For example, a DA job will submit the state diff
/// to the DA layer and return the txn hash.
async fn process_job(&self, config: &Config, job: &JobItem) -> Result<String>;
/// Should verify the job and return the status of the verification. For example,
/// a DA job will verify the inclusion of the state diff in the DA layer and return
/// the status of the verification.
async fn verify_job(&self, config: &Config, job: &JobItem) -> Result<JobVerificationStatus>;
/// Should return the maximum number of attempts to process the job. A new attempt is made
/// every time the verification returns `JobVerificationStatus::Rejected`
fn max_process_attempts(&self) -> u64;
/// Should return the maximum number of attempts to verify the job. A new attempt is made
/// every few seconds depending on the result `verification_polling_delay_seconds`
fn max_verification_attempts(&self) -> u64;
/// Should return the number of seconds to wait before polling for verification
fn verification_polling_delay_seconds(&self) -> u64;
}

/// Creates the job in the DB in the created state and adds it to the process queue
pub async fn create_job(job_type: JobType, internal_id: String) -> Result<()> {
let config = config().await;
let existing_job = config.database().get_job_by_internal_id_and_type(&internal_id, &job_type).await?;
let existing_job = config.database().get_job_by_internal_id_and_type(internal_id.as_str(), &job_type).await?;
if existing_job.is_some() {
log::debug!("Job already exists for internal_id {:?} and job_type {:?}. Skipping.", internal_id, job_type);
return Err(eyre!(
@@ -44,6 +60,8 @@ pub async fn create_job(job_type: JobType, internal_id: String) -> Result<()> {
Ok(())
}

/// Processes the job, increments the process attempt count and updates the status of the job in the DB.
/// It then adds the job to the verification queue.
pub async fn process_job(id: Uuid) -> Result<()> {
let config = config().await;
let job = get_job(id).await?;
@@ -78,6 +96,9 @@ pub async fn process_job(id: Uuid) -> Result<()> {
Ok(())
}

/// Verifies the job and updates the status of the job in the DB. If the verification fails, it retries
/// processing the job if the max attempts have not been exceeded. If the max attempts have been exceeded,
/// it marks the job as timedout. If the verification is still pending, it pushes the job back to the queue.
pub async fn verify_job(id: Uuid) -> Result<()> {
let config = config().await;
let job = get_job(id).await?;
@@ -96,10 +117,10 @@ pub async fn verify_job(id: Uuid) -> Result<()> {
let verification_status = job_handler.verify_job(config, &job).await?;

match verification_status {
JobVerificationStatus::VERIFIED => {
JobVerificationStatus::Verified => {
config.database().update_job_status(&job, JobStatus::Completed).await?;
}
JobVerificationStatus::REJECTED => {
JobVerificationStatus::Rejected => {
config.database().update_job_status(&job, JobStatus::VerificationFailed).await?;

// retry job processing if we haven't exceeded the max limit
@@ -116,7 +137,7 @@ pub async fn verify_job(id: Uuid) -> Result<()> {
// TODO: send alert
}
}
JobVerificationStatus::PENDING => {
JobVerificationStatus::Pending => {
log::info!("Inclusion is still pending for job {}. Pushing back to queue.", job.id);
let verify_attempts = get_u64_from_metadata(&job.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY)?;
if verify_attempts >= job_handler.max_verification_attempts() {
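Taken together, the root-level functions drive a job through its lifecycle roughly as sketched below. In the real service the `process_job` and `verify_job` calls are triggered by queue consumers rather than called inline, and the internal id here is a placeholder.

// Sketch only: the calls are shown back to back for clarity; in practice they
// are driven by the process and verification queue consumers.
use uuid::Uuid;

use crate::jobs::types::JobType;
use crate::jobs::{create_job, process_job, verify_job};

async fn job_lifecycle_example(job_type: JobType, job_id: Uuid) -> color_eyre::Result<()> {
    // 1. Insert the job in the Created state and push it to the process queue.
    //    Fails if a job with the same (job_type, internal_id) already exists.
    create_job(job_type, "1234".to_string()).await?;

    // 2. A consumer later picks the id off the queue, processes the job and
    //    pushes it onto the verification queue.
    process_job(job_id).await?;

    // 3. Verification either completes the job, retries processing, or pushes
    //    the job back to the queue while the result is still pending.
    verify_job(job_id).await?;
    Ok(())
}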
9 changes: 6 additions & 3 deletions src/jobs/types.rs
@@ -55,7 +55,10 @@ pub struct JobItem {

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum JobVerificationStatus {
PENDING,
VERIFIED,
REJECTED,
#[allow(dead_code)]
Pending,
#[allow(dead_code)]
Verified,
#[allow(dead_code)]
Rejected,
}
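Since the variants are now PascalCase, an exhaustive match reads as in the sketch below; the description strings are illustrative.

// Sketch only: exhaustive match over the renamed variants.
use crate::jobs::types::JobVerificationStatus;

fn describe_verification_example(status: JobVerificationStatus) -> &'static str {
    match status {
        JobVerificationStatus::Pending => "inclusion not confirmed yet",
        JobVerificationStatus::Verified => "inclusion confirmed",
        JobVerificationStatus::Rejected => "inclusion failed, job will be retried",
    }
}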
20 changes: 12 additions & 8 deletions src/main.rs
@@ -1,13 +1,20 @@
#![warn(missing_docs)]
#![warn(clippy::missing_docs_in_private_items)]

/// Config of the service. Contains configurations for DB, Queues and other services.
mod config;
/// Controllers for the routes
mod controllers;
/// Contains the trait that all DA clients must implement
mod da_clients;
/// Contains the trait that all database clients must implement
mod database;
/// Contains the trait that all jobs must implement. Also
/// contains the root level functions for which detect the job
/// type and call the corresponding job
mod jobs;
/// Contains the trait that all queues must implement
mod queue;
/// Contains the routes for the service
mod routes;
/// Contains the utils
mod utils;

use crate::config::config;
@@ -16,9 +23,7 @@ use crate::routes::app_router;
use crate::utils::env_utils::get_env_var_or_default;
use dotenvy::dotenv;

#[derive(Clone)]
pub struct AppState {}

/// Start the server
#[tokio::main]
async fn main() {
dotenv().ok();
@@ -30,8 +35,7 @@ async fn main() {
let port = get_env_var_or_default("PORT", "3000").parse::<u16>().expect("PORT must be a u16");
let address = format!("{}:{}", host, port);
let listener = tokio::net::TcpListener::bind(address.clone()).await.expect("Failed to get listener");
let state = AppState {};
let app = app_router(state.clone()).with_state(state);
let app = app_router();

// init consumer
init_consumers().await.expect("Failed to init consumers");
3 changes: 3 additions & 0 deletions src/queue/mod.rs
@@ -7,6 +7,9 @@ use omniqueue::{Delivery, QueueError};

use std::time::Duration;

/// The QueueProvider trait is used to define the methods that a queue
/// should implement to be used as a queue for the orchestrator. The
/// purpose of this trait is to allow developers to use any queue of their choice.
#[async_trait]
pub trait QueueProvider: Send + Sync {
async fn send_message_to_queue(&self, queue: String, payload: String, delay: Option<Duration>) -> Result<()>;
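A hedged sketch of pushing a message through the provider; the queue name, payload and delay below are placeholders, and the error is logged with the same `tracing::log` pattern the crate already uses in its error controller.

// Sketch only: queue name, payload and delay are placeholders.
use std::time::Duration;

use tracing::log;

use crate::config::config;

async fn enqueue_example() {
    let config = config().await;
    // `queue()` now hands back `&dyn QueueProvider` directly.
    let result = config
        .queue()
        .send_message_to_queue("example_queue".to_string(), "payload".to_string(), Some(Duration::from_secs(30)))
        .await;
    if let Err(e) = result {
        log::error!("failed to enqueue message: {:?}", e);
    }
}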