Skip to content

Commit

Permalink
chore: job execution
Browse files Browse the repository at this point in the history
  • Loading branch information
bodymindarts committed May 4, 2024
1 parent e3c02db commit 9992cce
Show file tree
Hide file tree
Showing 14 changed files with 276 additions and 21 deletions.
85 changes: 85 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ tokio-stream = { version = "0.1.14", features = ["sync"] }
serde = "1.0"
serde_yaml = "0.9.25"
serde_json = "1.0"
serde_with = "3.8.1"
tonic = "0.11"
tonic-health = "0.11"
prost = "0.12.1"
Expand Down
1 change: 1 addition & 0 deletions cala-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ sqlx = { workspace = true }
serde = { workspace = true }
serde_yaml = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
uuid = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
10 changes: 10 additions & 0 deletions cala-server/migrations/20231211151809_cala_server_setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,13 @@ CREATE TABLE import_job_events (
recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(id, sequence)
);

CREATE TYPE JobType AS ENUM ('import');

CREATE TABLE job_executions (
id UUID NOT NULL UNIQUE,
type JobType NOT NULL,
executing_server_id VARCHAR,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
reschedule_after TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
9 changes: 9 additions & 0 deletions cala-server/src/app/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use serde::{Deserialize, Serialize};

use crate::job_execution::JobExecutionConfig;

#[derive(Clone, Default, Debug, Deserialize, Serialize)]
pub struct AppConfig {
#[serde(default)]
pub job_execution: JobExecutionConfig,
}
4 changes: 3 additions & 1 deletion cala-server/src/app/error.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use thiserror::Error;

use crate::import_job::error::*;
use crate::{import_job::error::*, job_execution::error::*};

#[derive(Error, Debug)]
pub enum ApplicationError {
#[error("ApplicationError - Sqlx: {0}")]
Sqlx(#[from] sqlx::Error),
#[error("ApplicationError - ImportJobError: {0}")]
ImportJob(#[from] ImportJobError),
#[error("ApplicationError - JobExecutionError: {0}")]
JobExecution(#[from] JobExecutionError),
}
35 changes: 27 additions & 8 deletions cala-server/src/app/mod.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,38 @@
mod config;
mod error;

use sqlx::{Pool, Postgres};
use sqlx::PgPool;
use tracing::instrument;

use cala_ledger::{query::*, CalaLedger};

use crate::import_job::*;
use crate::{import_job::*, job_execution::*};
pub use config::*;
pub use error::*;

#[derive(Clone)]
pub struct CalaApp {
_pool: Pool<Postgres>,
pool: PgPool,
ledger: CalaLedger,
import_jobs: ImportJobs,
job_execution: JobExecution,
}

impl CalaApp {
pub fn new(pool: Pool<Postgres>, ledger: CalaLedger) -> Self {
pub async fn run(
pool: PgPool,
config: AppConfig,
ledger: CalaLedger,
) -> Result<Self, ApplicationError> {
let import_jobs = ImportJobs::new(&pool);
Self {
_pool: pool,
let mut job_execution = JobExecution::new(&pool, config.job_execution.clone());
job_execution.start_poll().await?;
Ok(Self {
pool,
ledger,
import_jobs,
}
job_execution,
})
}

pub fn ledger(&self) -> &CalaLedger {
Expand All @@ -44,7 +54,16 @@ impl CalaApp {
}))
.build()
.expect("Could not build import job");
Ok(self.import_jobs.create(new_import_job).await?)
let mut tx = self.pool.begin().await?;
let job = self
.import_jobs
.create_in_tx(&mut tx, new_import_job)
.await?;
self.job_execution
.register_import_job(&mut tx, &job)
.await?;
tx.commit().await?;
Ok(job)
}

#[instrument(name = "cala_server.list_import_jobs", skip(self))]
Expand Down
12 changes: 8 additions & 4 deletions cala-server/src/cli/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,37 @@ use serde::{Deserialize, Serialize};
use std::path::Path;

use super::db::*;
use crate::server::ServerConfig;
use crate::{app::AppConfig, server::ServerConfig};

#[derive(Clone, Default, Serialize, Deserialize)]
pub struct Config {
#[serde(default)]
pub db: DbConfig,
#[serde(default)]
pub server: ServerConfig,
// #[serde(default)]
// pub app: AppConfig,
#[serde(default)]
pub app: AppConfig,
#[serde(default)]
pub tracing: TracingConfig,
}

pub struct EnvOverride {
pub db_con: String,
pub server_id: Option<String>,
}

impl Config {
pub fn from_path(
path: impl AsRef<Path>,
EnvOverride { db_con }: EnvOverride,
EnvOverride { db_con, server_id }: EnvOverride,
) -> anyhow::Result<Self> {
let config_file = std::fs::read_to_string(path).context("Couldn't read config file")?;
let mut config: Config =
serde_yaml::from_str(&config_file).context("Couldn't parse config file")?;
config.db.pg_con = db_con;
if let Some(server_id) = server_id {
config.app.job_execution.server_id = server_id;
}

Ok(config)
}
Expand Down
12 changes: 10 additions & 2 deletions cala-server/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,22 @@ struct Cli {
value_name = "DIRECTORY"
)]
cala_home: String,
#[clap(long, env = "CALA_SERVER_ID")]
server_id: Option<String>,
#[clap(env = "PG_CON")]
pg_con: String,
}

pub async fn run() -> anyhow::Result<()> {
let cli = Cli::parse();

let config = Config::from_path(cli.config, EnvOverride { db_con: cli.pg_con })?;
let config = Config::from_path(
cli.config,
EnvOverride {
db_con: cli.pg_con,
server_id: cli.server_id,
},
)?;

run_cmd(&cli.cala_home, config).await?;

Expand All @@ -46,7 +54,7 @@ async fn run_cmd(cala_home: &str, config: Config) -> anyhow::Result<()> {
let pool = db::init_pool(&config.db).await?;
let ledger_config = CalaLedgerConfig::builder().pool(pool.clone()).build()?;
let ledger = CalaLedger::init(ledger_config).await?;
let app = crate::app::CalaApp::new(pool, ledger);
let app = crate::app::CalaApp::run(pool, config.app, ledger).await?;
crate::server::run(config.server, app).await?;
Ok(())
}
Expand Down
14 changes: 8 additions & 6 deletions cala-server/src/import_job/repo.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use sqlx::PgPool;
use sqlx::{PgPool, Postgres, Transaction};

use super::{cursor::*, entity::*, error::*};
use crate::primitives::ImportJobId;
Expand All @@ -14,21 +14,23 @@ impl ImportJobs {
Self { pool: pool.clone() }
}

pub async fn create(&self, new_import_job: NewImportJob) -> Result<ImportJob, ImportJobError> {
let mut tx = self.pool.begin().await?;
pub async fn create_in_tx(
&self,
tx: &mut Transaction<'_, Postgres>,
new_import_job: NewImportJob,
) -> Result<ImportJob, ImportJobError> {
let id = new_import_job.id;
sqlx::query!(
r#"INSERT INTO import_jobs (id, name)
VALUES ($1, $2)"#,
id as ImportJobId,
new_import_job.name,
)
.execute(&mut *tx)
.execute(&mut **tx)
.await?;
let mut events = new_import_job.initial_events();
events.persist(&mut tx).await?;
events.persist(tx).await?;
let import_job = ImportJob::try_from(events)?;
tx.commit().await?;
Ok(import_job)
}

Expand Down
30 changes: 30 additions & 0 deletions cala-server/src/job_execution/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use serde::{Deserialize, Serialize};

use std::time::Duration;

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde_with::serde_as]
pub struct JobExecutionConfig {
#[serde(default = "random_server_id")]
pub server_id: String,
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
#[serde(default = "default_poll_interval")]
pub poll_interval: Duration,
}

impl Default for JobExecutionConfig {
fn default() -> Self {
Self {
server_id: random_server_id(),
poll_interval: default_poll_interval(),
}
}
}

fn random_server_id() -> String {
uuid::Uuid::new_v4().to_string()
}

fn default_poll_interval() -> Duration {
Duration::from_secs(5)
}
Loading

0 comments on commit 9992cce

Please sign in to comment.