From 9992cce5c550acfa35a05cbee991b8bc79537f3b Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Sat, 4 May 2024 22:06:01 +0200 Subject: [PATCH] chore: job execution --- Cargo.lock | 85 +++++++++++++++++++ Cargo.toml | 1 + cala-server/Cargo.toml | 1 + .../20231211151809_cala_server_setup.sql | 10 +++ cala-server/src/app/config.rs | 9 ++ cala-server/src/app/error.rs | 4 +- cala-server/src/app/mod.rs | 35 ++++++-- cala-server/src/cli/config.rs | 12 ++- cala-server/src/cli/mod.rs | 12 ++- cala-server/src/import_job/repo.rs | 14 +-- cala-server/src/job_execution/config.rs | 30 +++++++ cala-server/src/job_execution/error.rs | 7 ++ cala-server/src/job_execution/mod.rs | 76 +++++++++++++++++ cala-server/src/lib.rs | 1 + 14 files changed, 276 insertions(+), 21 deletions(-) create mode 100644 cala-server/src/app/config.rs create mode 100644 cala-server/src/job_execution/config.rs create mode 100644 cala-server/src/job_execution/error.rs create mode 100644 cala-server/src/job_execution/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 17256cd7..aa358cc1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -651,6 +651,7 @@ dependencies = [ "derive_builder", "serde", "serde_json", + "serde_with", "serde_yaml", "sqlx", "thiserror", @@ -907,6 +908,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "deranged" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" +dependencies = [ + "powerfmt", + "serde", +] + [[package]] name = "derive_builder" version = "0.20.0" @@ -1525,6 +1536,7 @@ checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", "hashbrown 0.12.3", + "serde", ] [[package]] @@ -1828,6 +1840,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-integer" version = "0.1.46" @@ -2111,6 +2129,12 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -2681,6 +2705,36 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "3.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ad483d2ab0149d5a5ebcd9972a3852711e0153d863bf5a5d0391d28883c4a20" +dependencies = [ + "base64 0.22.1", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.2.6", + "serde", + "serde_derive", + "serde_json", + "serde_with_macros", + "time", +] + +[[package]] +name = "serde_with_macros" +version = "3.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65569b702f41443e8bc8bbb1c5779bd0450bbe723b56198980e80ec45780bce2" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.60", +] + [[package]] name = "serde_yaml" version = "0.9.34+deprecated" @@ -3182,6 +3236,37 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.3.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" + +[[package]] +name = "time-macros" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinyvec" version = "1.6.0" diff --git a/Cargo.toml b/Cargo.toml index 5b15070b..6c2b5901 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ tokio-stream = { version = "0.1.14", features = ["sync"] } serde = "1.0" serde_yaml = "0.9.25" serde_json = "1.0" +serde_with = "3.8.1" tonic = "0.11" tonic-health = "0.11" prost = "0.12.1" diff --git a/cala-server/Cargo.toml b/cala-server/Cargo.toml index 8dbf5f08..79279b94 100644 --- a/cala-server/Cargo.toml +++ b/cala-server/Cargo.toml @@ -24,6 +24,7 @@ sqlx = { workspace = true } serde = { workspace = true } serde_yaml = { workspace = true } serde_json = { workspace = true } +serde_with = { workspace = true } uuid = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } diff --git a/cala-server/migrations/20231211151809_cala_server_setup.sql b/cala-server/migrations/20231211151809_cala_server_setup.sql index ca906b6d..bb44d6d8 100644 --- a/cala-server/migrations/20231211151809_cala_server_setup.sql +++ b/cala-server/migrations/20231211151809_cala_server_setup.sql @@ -14,3 +14,13 @@ CREATE TABLE import_job_events ( recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), UNIQUE(id, sequence) ); + +CREATE TYPE JobType AS ENUM ('import'); + +CREATE TABLE job_executions ( + id UUID NOT NULL UNIQUE, + type JobType NOT NULL, + executing_server_id VARCHAR, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + reschedule_after TIMESTAMPTZ NOT NULL DEFAULT NOW() +); diff --git a/cala-server/src/app/config.rs b/cala-server/src/app/config.rs new file mode 100644 index 00000000..3c19a07b --- /dev/null +++ b/cala-server/src/app/config.rs @@ -0,0 +1,9 @@ +use serde::{Deserialize, Serialize}; + +use crate::job_execution::JobExecutionConfig; + +#[derive(Clone, Default, Debug, Deserialize, Serialize)] +pub struct AppConfig { + #[serde(default)] + pub job_execution: JobExecutionConfig, +} diff --git a/cala-server/src/app/error.rs b/cala-server/src/app/error.rs index 9d85f11e..7b222ee9 100644 --- a/cala-server/src/app/error.rs +++ b/cala-server/src/app/error.rs @@ -1,6 +1,6 @@ use thiserror::Error; -use crate::import_job::error::*; +use crate::{import_job::error::*, job_execution::error::*}; #[derive(Error, Debug)] pub enum ApplicationError { @@ -8,4 +8,6 @@ pub enum ApplicationError { Sqlx(#[from] sqlx::Error), #[error("ApplicationError - ImportJobError: {0}")] ImportJob(#[from] ImportJobError), + #[error("ApplicationError - JobExecutionError: {0}")] + JobExecution(#[from] JobExecutionError), } diff --git a/cala-server/src/app/mod.rs b/cala-server/src/app/mod.rs index 2a4b5bce..5472a1e1 100644 --- a/cala-server/src/app/mod.rs +++ b/cala-server/src/app/mod.rs @@ -1,28 +1,38 @@ +mod config; mod error; -use sqlx::{Pool, Postgres}; +use sqlx::PgPool; use tracing::instrument; use cala_ledger::{query::*, CalaLedger}; -use crate::import_job::*; +use crate::{import_job::*, job_execution::*}; +pub use config::*; pub use error::*; #[derive(Clone)] pub struct CalaApp { - _pool: Pool, + pool: PgPool, ledger: CalaLedger, import_jobs: ImportJobs, + job_execution: JobExecution, } impl CalaApp { - pub fn new(pool: Pool, ledger: CalaLedger) -> Self { + pub async fn run( + pool: PgPool, + config: AppConfig, + ledger: CalaLedger, + ) -> Result { let import_jobs = ImportJobs::new(&pool); - Self { - _pool: pool, + let mut job_execution = JobExecution::new(&pool, config.job_execution.clone()); + job_execution.start_poll().await?; + Ok(Self { + pool, ledger, import_jobs, - } + job_execution, + }) } pub fn ledger(&self) -> &CalaLedger { @@ -44,7 +54,16 @@ impl CalaApp { })) .build() .expect("Could not build import job"); - Ok(self.import_jobs.create(new_import_job).await?) + let mut tx = self.pool.begin().await?; + let job = self + .import_jobs + .create_in_tx(&mut tx, new_import_job) + .await?; + self.job_execution + .register_import_job(&mut tx, &job) + .await?; + tx.commit().await?; + Ok(job) } #[instrument(name = "cala_server.list_import_jobs", skip(self))] diff --git a/cala-server/src/cli/config.rs b/cala-server/src/cli/config.rs index b1706162..fb26ed50 100644 --- a/cala-server/src/cli/config.rs +++ b/cala-server/src/cli/config.rs @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize}; use std::path::Path; use super::db::*; -use crate::server::ServerConfig; +use crate::{app::AppConfig, server::ServerConfig}; #[derive(Clone, Default, Serialize, Deserialize)] pub struct Config { @@ -13,25 +13,29 @@ pub struct Config { pub db: DbConfig, #[serde(default)] pub server: ServerConfig, - // #[serde(default)] - // pub app: AppConfig, + #[serde(default)] + pub app: AppConfig, #[serde(default)] pub tracing: TracingConfig, } pub struct EnvOverride { pub db_con: String, + pub server_id: Option, } impl Config { pub fn from_path( path: impl AsRef, - EnvOverride { db_con }: EnvOverride, + EnvOverride { db_con, server_id }: EnvOverride, ) -> anyhow::Result { let config_file = std::fs::read_to_string(path).context("Couldn't read config file")?; let mut config: Config = serde_yaml::from_str(&config_file).context("Couldn't parse config file")?; config.db.pg_con = db_con; + if let Some(server_id) = server_id { + config.app.job_execution.server_id = server_id; + } Ok(config) } diff --git a/cala-server/src/cli/mod.rs b/cala-server/src/cli/mod.rs index 307473ef..700ad948 100644 --- a/cala-server/src/cli/mod.rs +++ b/cala-server/src/cli/mod.rs @@ -25,6 +25,8 @@ struct Cli { value_name = "DIRECTORY" )] cala_home: String, + #[clap(long, env = "CALA_SERVER_ID")] + server_id: Option, #[clap(env = "PG_CON")] pg_con: String, } @@ -32,7 +34,13 @@ struct Cli { pub async fn run() -> anyhow::Result<()> { let cli = Cli::parse(); - let config = Config::from_path(cli.config, EnvOverride { db_con: cli.pg_con })?; + let config = Config::from_path( + cli.config, + EnvOverride { + db_con: cli.pg_con, + server_id: cli.server_id, + }, + )?; run_cmd(&cli.cala_home, config).await?; @@ -46,7 +54,7 @@ async fn run_cmd(cala_home: &str, config: Config) -> anyhow::Result<()> { let pool = db::init_pool(&config.db).await?; let ledger_config = CalaLedgerConfig::builder().pool(pool.clone()).build()?; let ledger = CalaLedger::init(ledger_config).await?; - let app = crate::app::CalaApp::new(pool, ledger); + let app = crate::app::CalaApp::run(pool, config.app, ledger).await?; crate::server::run(config.server, app).await?; Ok(()) } diff --git a/cala-server/src/import_job/repo.rs b/cala-server/src/import_job/repo.rs index 5cc17760..7f6b9816 100644 --- a/cala-server/src/import_job/repo.rs +++ b/cala-server/src/import_job/repo.rs @@ -1,4 +1,4 @@ -use sqlx::PgPool; +use sqlx::{PgPool, Postgres, Transaction}; use super::{cursor::*, entity::*, error::*}; use crate::primitives::ImportJobId; @@ -14,8 +14,11 @@ impl ImportJobs { Self { pool: pool.clone() } } - pub async fn create(&self, new_import_job: NewImportJob) -> Result { - let mut tx = self.pool.begin().await?; + pub async fn create_in_tx( + &self, + tx: &mut Transaction<'_, Postgres>, + new_import_job: NewImportJob, + ) -> Result { let id = new_import_job.id; sqlx::query!( r#"INSERT INTO import_jobs (id, name) @@ -23,12 +26,11 @@ impl ImportJobs { id as ImportJobId, new_import_job.name, ) - .execute(&mut *tx) + .execute(&mut **tx) .await?; let mut events = new_import_job.initial_events(); - events.persist(&mut tx).await?; + events.persist(tx).await?; let import_job = ImportJob::try_from(events)?; - tx.commit().await?; Ok(import_job) } diff --git a/cala-server/src/job_execution/config.rs b/cala-server/src/job_execution/config.rs new file mode 100644 index 00000000..6103abe0 --- /dev/null +++ b/cala-server/src/job_execution/config.rs @@ -0,0 +1,30 @@ +use serde::{Deserialize, Serialize}; + +use std::time::Duration; + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde_with::serde_as] +pub struct JobExecutionConfig { + #[serde(default = "random_server_id")] + pub server_id: String, + #[serde_as(as = "serde_with::DurationSeconds")] + #[serde(default = "default_poll_interval")] + pub poll_interval: Duration, +} + +impl Default for JobExecutionConfig { + fn default() -> Self { + Self { + server_id: random_server_id(), + poll_interval: default_poll_interval(), + } + } +} + +fn random_server_id() -> String { + uuid::Uuid::new_v4().to_string() +} + +fn default_poll_interval() -> Duration { + Duration::from_secs(5) +} diff --git a/cala-server/src/job_execution/error.rs b/cala-server/src/job_execution/error.rs new file mode 100644 index 00000000..22e5c6ee --- /dev/null +++ b/cala-server/src/job_execution/error.rs @@ -0,0 +1,7 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum JobExecutionError { + #[error("JobExecutionError - Sqlx: {0}")] + Sqlx(#[from] sqlx::Error), +} diff --git a/cala-server/src/job_execution/mod.rs b/cala-server/src/job_execution/mod.rs new file mode 100644 index 00000000..7d43a5b1 --- /dev/null +++ b/cala-server/src/job_execution/mod.rs @@ -0,0 +1,76 @@ +mod config; +pub mod error; + +use sqlx::{PgPool, Postgres, Transaction}; + +use std::sync::Arc; + +use crate::{import_job::ImportJob, primitives::ImportJobId}; +pub use config::*; +use error::JobExecutionError; + +#[derive(Clone)] +pub struct JobExecution { + pool: PgPool, + config: JobExecutionConfig, + poller_handle: Option>>, +} + +impl JobExecution { + pub fn new(pool: &PgPool, config: JobExecutionConfig) -> Self { + Self { + pool: pool.clone(), + poller_handle: None, + config, + } + } + + pub async fn start_poll(&mut self) -> Result<(), JobExecutionError> { + let pool = self.pool.clone(); + let server_id = self.config.server_id.clone(); + let poll_interval = self.config.poll_interval; + let handle = tokio::spawn(async move { + loop { + let res = sqlx::query!( + r#" + UPDATE job_executions + SET reschedule_after = NOW() + INTERVAL '20 second', + executing_server_id = $1 + WHERE reschedule_after < NOW() + "#, + server_id + ) + .fetch_all(&pool) + .await; + tokio::time::sleep(poll_interval).await; + } + }); + self.poller_handle = Some(Arc::new(handle)); + Ok(()) + } + + pub async fn register_import_job( + &self, + tx: &mut Transaction<'_, Postgres>, + job: &ImportJob, + ) -> Result<(), JobExecutionError> { + sqlx::query!( + r#" + INSERT INTO job_executions (id, type) + VALUES ($1, 'import') + "#, + job.id as ImportJobId, + ) + .execute(&mut **tx) + .await?; + Ok(()) + } +} + +impl Drop for JobExecution { + fn drop(&mut self) { + if let Some(handle) = self.poller_handle.take() { + handle.abort(); + } + } +} diff --git a/cala-server/src/lib.rs b/cala-server/src/lib.rs index 066fde15..678b9da7 100644 --- a/cala-server/src/lib.rs +++ b/cala-server/src/lib.rs @@ -5,5 +5,6 @@ pub mod app; pub mod cli; pub mod graphql; pub mod import_job; +mod job_execution; pub mod primitives; pub mod server;