diff --git a/cala-ledger-outbox-client/src/config.rs b/cala-ledger-outbox-client/src/config.rs index b421900e..96cffb27 100644 --- a/cala-ledger-outbox-client/src/config.rs +++ b/cala-ledger-outbox-client/src/config.rs @@ -4,3 +4,9 @@ use serde::{Deserialize, Serialize}; pub struct CalaLedgerOutboxClientConfig { pub url: String, } + +impl CalaLedgerOutboxClientConfig { + pub fn new(url: String) -> Self { + Self { url } + } +} diff --git a/cala-server/.sqlx/query-6b997e2db24d461ce3558091f26e1db37ed5635b50c84a1cd2a2d6c8f94932c1.json b/cala-server/.sqlx/query-2089d470f13ecf990b01d97ce5d9c8e42ac034b6e4db1c0c04f064deda1b7c24.json similarity index 54% rename from cala-server/.sqlx/query-6b997e2db24d461ce3558091f26e1db37ed5635b50c84a1cd2a2d6c8f94932c1.json rename to cala-server/.sqlx/query-2089d470f13ecf990b01d97ce5d9c8e42ac034b6e4db1c0c04f064deda1b7c24.json index c9413b7c..a91e8496 100644 --- a/cala-server/.sqlx/query-6b997e2db24d461ce3558091f26e1db37ed5635b50c84a1cd2a2d6c8f94932c1.json +++ b/cala-server/.sqlx/query-2089d470f13ecf990b01d97ce5d9c8e42ac034b6e4db1c0c04f064deda1b7c24.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO jobs (id, type, name, description, data)\n VALUES ($1, $2, $3, $4, $5)", + "query": "INSERT INTO jobs (id, type, name, description, state_json)\n VALUES ($1, $2, $3, $4, $5)", "describe": { "columns": [], "parameters": { @@ -14,5 +14,5 @@ }, "nullable": [] }, - "hash": "6b997e2db24d461ce3558091f26e1db37ed5635b50c84a1cd2a2d6c8f94932c1" + "hash": "2089d470f13ecf990b01d97ce5d9c8e42ac034b6e4db1c0c04f064deda1b7c24" } diff --git a/cala-server/.sqlx/query-210e2319ae5fa66ac0b06be370965450c1ea71f2e0b9c8e194be31732db46f8b.json b/cala-server/.sqlx/query-210e2319ae5fa66ac0b06be370965450c1ea71f2e0b9c8e194be31732db46f8b.json deleted file mode 100644 index 386f96b6..00000000 --- a/cala-server/.sqlx/query-210e2319ae5fa66ac0b06be370965450c1ea71f2e0b9c8e194be31732db46f8b.json +++ /dev/null @@ -1,35 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n WITH selected_jobs AS (\n SELECT id\n FROM job_executions\n WHERE reschedule_after < NOW()\n AND state = 'pending'\n LIMIT $1\n FOR UPDATE\n )\n UPDATE job_executions AS je\n SET state = 'running', reschedule_after = NOW() + $2::interval\n FROM selected_jobs\n WHERE je.id = selected_jobs.id\n RETURNING je.id AS \"id!: JobId\", je.state_json, je.next_attempt\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "id!: JobId", - "type_info": "Uuid" - }, - { - "ordinal": 1, - "name": "state_json", - "type_info": "Jsonb" - }, - { - "ordinal": 2, - "name": "next_attempt", - "type_info": "Int4" - } - ], - "parameters": { - "Left": [ - "Int8", - "Interval" - ] - }, - "nullable": [ - false, - true, - false - ] - }, - "hash": "210e2319ae5fa66ac0b06be370965450c1ea71f2e0b9c8e194be31732db46f8b" -} diff --git a/cala-server/.sqlx/query-25de3171e846bacfaffb81c7e1785fa9321982e3396d797b0573884149e9b937.json b/cala-server/.sqlx/query-25de3171e846bacfaffb81c7e1785fa9321982e3396d797b0573884149e9b937.json deleted file mode 100644 index a8b08100..00000000 --- a/cala-server/.sqlx/query-25de3171e846bacfaffb81c7e1785fa9321982e3396d797b0573884149e9b937.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE job_executions\n SET state_json = $1\n WHERE id = $2\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Jsonb", - "Uuid" - ] - }, - "nullable": [] - }, - "hash": "25de3171e846bacfaffb81c7e1785fa9321982e3396d797b0573884149e9b937" -} diff --git a/cala-server/.sqlx/query-0ff4c7b86806df5eff0b3189b809c217a7fe77da062c51ae7cf7a3b0e01beb29.json b/cala-server/.sqlx/query-4d290b36ed9d75a80f77f6f0f4916b47ad8825dab856682433165c332a761b78.json similarity index 82% rename from cala-server/.sqlx/query-0ff4c7b86806df5eff0b3189b809c217a7fe77da062c51ae7cf7a3b0e01beb29.json rename to cala-server/.sqlx/query-4d290b36ed9d75a80f77f6f0f4916b47ad8825dab856682433165c332a761b78.json index 2325d07b..36d78591 100644 --- a/cala-server/.sqlx/query-0ff4c7b86806df5eff0b3189b809c217a7fe77da062c51ae7cf7a3b0e01beb29.json +++ b/cala-server/.sqlx/query-4d290b36ed9d75a80f77f6f0f4916b47ad8825dab856682433165c332a761b78.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT id as \"id: JobId\", type AS job_type, name, description, data, completed_at, last_error\n FROM jobs\n WHERE id = $1", + "query": "SELECT id as \"id: JobId\", type AS job_type, name, description, state_json, completed_at, last_error\n FROM jobs\n WHERE id = $1", "describe": { "columns": [ { @@ -25,7 +25,7 @@ }, { "ordinal": 4, - "name": "data", + "name": "state_json", "type_info": "Jsonb" }, { @@ -54,5 +54,5 @@ true ] }, - "hash": "0ff4c7b86806df5eff0b3189b809c217a7fe77da062c51ae7cf7a3b0e01beb29" + "hash": "4d290b36ed9d75a80f77f6f0f4916b47ad8825dab856682433165c332a761b78" } diff --git a/cala-server/.sqlx/query-8069f648bfca81fd8f081308f3d74b0002c83a9556f5d0969eb8652b0c23090a.json b/cala-server/.sqlx/query-8069f648bfca81fd8f081308f3d74b0002c83a9556f5d0969eb8652b0c23090a.json new file mode 100644 index 00000000..2627de87 --- /dev/null +++ b/cala-server/.sqlx/query-8069f648bfca81fd8f081308f3d74b0002c83a9556f5d0969eb8652b0c23090a.json @@ -0,0 +1,35 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH selected_jobs AS (\n SELECT je.id, state_json\n FROM job_executions je\n JOIN jobs ON je.id = jobs.id\n WHERE reschedule_after < NOW()\n AND je.state = 'pending'\n LIMIT $1\n FOR UPDATE\n )\n UPDATE job_executions AS je\n SET state = 'running', reschedule_after = NOW() + $2::interval\n FROM selected_jobs\n WHERE je.id = selected_jobs.id\n RETURNING je.id AS \"id!: JobId\", selected_jobs.state_json, je.next_attempt\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!: JobId", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "state_json", + "type_info": "Jsonb" + }, + { + "ordinal": 2, + "name": "next_attempt", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Int8", + "Interval" + ] + }, + "nullable": [ + false, + true, + false + ] + }, + "hash": "8069f648bfca81fd8f081308f3d74b0002c83a9556f5d0969eb8652b0c23090a" +} diff --git a/cala-server/.sqlx/query-8f1dce84b193d193a2e161e6cd90768b4a84a6a9664209c6de998ce1ea65dd64.json b/cala-server/.sqlx/query-8f1dce84b193d193a2e161e6cd90768b4a84a6a9664209c6de998ce1ea65dd64.json new file mode 100644 index 00000000..7b11a84b --- /dev/null +++ b/cala-server/.sqlx/query-8f1dce84b193d193a2e161e6cd90768b4a84a6a9664209c6de998ce1ea65dd64.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE jobs\n SET state_json = $1\n WHERE id = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Jsonb", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "8f1dce84b193d193a2e161e6cd90768b4a84a6a9664209c6de998ce1ea65dd64" +} diff --git a/cala-server/migrations/20231211151809_cala_server_setup.sql b/cala-server/migrations/20231211151809_cala_server_setup.sql index a1e924b0..83500e83 100644 --- a/cala-server/migrations/20231211151809_cala_server_setup.sql +++ b/cala-server/migrations/20231211151809_cala_server_setup.sql @@ -3,7 +3,7 @@ CREATE TABLE jobs ( name VARCHAR NOT NULL, type VARCHAR NOT NULL, description VARCHAR, - data JSONB, + state_json JSONB, last_error VARCHAR, completed_at TIMESTAMPTZ, modified_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), @@ -18,9 +18,8 @@ CREATE TABLE job_executions ( next_attempt INT NOT NULL DEFAULT 1, name VARCHAR NOT NULL, state JobExecutionState NOT NULL DEFAULT 'pending', - state_json JSONB, - created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), - reschedule_after TIMESTAMPTZ NOT NULL DEFAULT NOW() + reschedule_after TIMESTAMPTZ NOT NULL DEFAULT NOW(), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE TABLE integrations ( diff --git a/cala-server/src/extension/cala_outbox_import/config.rs b/cala-server/src/extension/cala_outbox_import/config.rs index 8c50aca6..e69de29b 100644 --- a/cala-server/src/extension/cala_outbox_import/config.rs +++ b/cala-server/src/extension/cala_outbox_import/config.rs @@ -1,15 +0,0 @@ -use cala_ledger_outbox_client::CalaLedgerOutboxClientConfig; -use serde::{Deserialize, Serialize}; - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct CalaOutboxImportConfig { - pub endpoint: String, -} - -impl From<&CalaOutboxImportConfig> for CalaLedgerOutboxClientConfig { - fn from(config: &CalaOutboxImportConfig) -> Self { - Self { - url: config.endpoint.clone(), - } - } -} diff --git a/cala-server/src/extension/cala_outbox_import/job.rs b/cala-server/src/extension/cala_outbox_import/job.rs index 3da87f81..ff9818b0 100644 --- a/cala-server/src/extension/cala_outbox_import/job.rs +++ b/cala-server/src/extension/cala_outbox_import/job.rs @@ -12,8 +12,6 @@ use tracing::instrument; use crate::job::*; -pub use super::config::*; - pub const CALA_OUTBOX_IMPORT_JOB_TYPE: JobType = JobType::new("cala-outbox-import-job"); #[derive(Default)] @@ -26,24 +24,23 @@ impl JobInitializer for CalaOutboxImportJobInitializer { fn init( &self, - job: Job, + _job: Job, ledger: &CalaLedger, ) -> Result, Box> { Ok(Box::new(CalaOutboxImportJob { - config: job.data()?, ledger: ledger.clone(), })) } } pub struct CalaOutboxImportJob { - config: CalaOutboxImportConfig, ledger: CalaLedger, } #[derive(Default, Serialize, Deserialize)] -struct CalaOutboxImportJobState { - last_synced: EventSequence, +pub struct CalaOutboxImportJobState { + pub endpoint: String, + pub last_synced: EventSequence, } #[async_trait] @@ -53,21 +50,21 @@ impl JobRunner for CalaOutboxImportJob { &mut self, mut current_job: CurrentJob, ) -> Result> { + let mut state = current_job + .state::()? + .expect("Job state"); println!( "Executing CalaOutboxImportJob importing from endpoint: {}", - self.config.endpoint + state.endpoint ); - let mut client = Client::connect(ClientConfig::from(&self.config)).await?; - let mut state = current_job - .state::()? - .unwrap_or_default(); + let mut client = Client::connect(ClientConfig::new(state.endpoint.clone())).await?; let mut stream = client.subscribe(Some(state.last_synced)).await?; loop { match stream.next().await { Some(Ok(message)) => { let mut tx = current_job.pool().begin().await?; state.last_synced = message.sequence; - current_job.update_execution_state(&mut tx, &state).await?; + current_job.update_state(&mut tx, &state).await?; self.ledger .sync_outbox_event( tx, diff --git a/cala-server/src/extension/cala_outbox_import/mod.rs b/cala-server/src/extension/cala_outbox_import/mod.rs index 06611b17..f5c969a0 100644 --- a/cala-server/src/extension/cala_outbox_import/mod.rs +++ b/cala-server/src/extension/cala_outbox_import/mod.rs @@ -1,4 +1,3 @@ -mod config; mod job; mod mutation; diff --git a/cala-server/src/extension/cala_outbox_import/mutation.rs b/cala-server/src/extension/cala_outbox_import/mutation.rs index f8687642..e5d63f3b 100644 --- a/cala-server/src/extension/cala_outbox_import/mutation.rs +++ b/cala-server/src/extension/cala_outbox_import/mutation.rs @@ -41,7 +41,7 @@ impl Mutation { input.job_id, input.name.clone(), input.description.clone(), - CalaOutboxImportConfig::from(input), + CalaOutboxImportJobState::from(input), ) .await?; Ok(CalaOutboxImportJobCreatePayload { @@ -50,10 +50,11 @@ impl Mutation { } } -impl From for CalaOutboxImportConfig { +impl From for CalaOutboxImportJobState { fn from(input: CalaOutboxImportJobCreateInput) -> Self { - CalaOutboxImportConfig { + Self { endpoint: input.endpoint, + ..Default::default() } } } diff --git a/cala-server/src/job/current.rs b/cala-server/src/job/current.rs index 27074bd1..c9e04316 100644 --- a/cala-server/src/job/current.rs +++ b/cala-server/src/job/current.rs @@ -38,7 +38,7 @@ impl CurrentJob { } } - pub async fn update_execution_state( + pub async fn update_state( &mut self, db: &mut Transaction<'_, Postgres>, state: T, @@ -46,7 +46,7 @@ impl CurrentJob { let state_json = serde_json::to_value(state).map_err(JobError::CouldNotSerializeState)?; sqlx::query!( r#" - UPDATE job_executions + UPDATE jobs SET state_json = $1 WHERE id = $2 "#, diff --git a/cala-server/src/job/entity.rs b/cala-server/src/job/entity.rs index 5715fc3e..ef377afd 100644 --- a/cala-server/src/job/entity.rs +++ b/cala-server/src/job/entity.rs @@ -32,7 +32,7 @@ pub struct Job { pub job_type: JobType, pub description: Option, pub last_error: Option, - data: serde_json::Value, + state: serde_json::Value, pub completed_at: Option>, } @@ -41,21 +41,21 @@ impl Job { name: String, job_type: JobType, description: Option, - data: T, + initial_state: T, ) -> Self { Self { id: JobId::new(), name, job_type, description, - data: serde_json::to_value(data).expect("could not serialize job data"), + state: serde_json::to_value(initial_state).expect("could not serialize job state"), last_error: None, completed_at: None, } } - pub fn data(&self) -> Result { - serde_json::from_value(self.data.clone()) + pub fn state(&self) -> Result { + serde_json::from_value(self.state.clone()) } pub(super) fn success(&mut self) { diff --git a/cala-server/src/job/executor.rs b/cala-server/src/job/executor.rs index e8e96433..233cc30e 100644 --- a/cala-server/src/job/executor.rs +++ b/cala-server/src/job/executor.rs @@ -145,10 +145,11 @@ impl JobExecutor { let rows = sqlx::query!( r#" WITH selected_jobs AS ( - SELECT id - FROM job_executions + SELECT je.id, state_json + FROM job_executions je + JOIN jobs ON je.id = jobs.id WHERE reschedule_after < NOW() - AND state = 'pending' + AND je.state = 'pending' LIMIT $1 FOR UPDATE ) @@ -156,7 +157,7 @@ impl JobExecutor { SET state = 'running', reschedule_after = NOW() + $2::interval FROM selected_jobs WHERE je.id = selected_jobs.id - RETURNING je.id AS "id!: JobId", je.state_json, je.next_attempt + RETURNING je.id AS "id!: JobId", selected_jobs.state_json, je.next_attempt "#, poll_limit as i32, pg_interval diff --git a/cala-server/src/job/repo.rs b/cala-server/src/job/repo.rs index 81be81f5..994a8650 100644 --- a/cala-server/src/job/repo.rs +++ b/cala-server/src/job/repo.rs @@ -20,14 +20,14 @@ impl JobRepo { job: Job, ) -> Result { sqlx::query!( - r#"INSERT INTO jobs (id, type, name, description, data) + r#"INSERT INTO jobs (id, type, name, description, state_json) VALUES ($1, $2, $3, $4, $5)"#, job.id as JobId, &job.job_type as &JobType, &job.name, job.description.as_ref(), - job.data::() - .expect("Could not serialize data") + job.state::() + .expect("Could not serialize state") ) .execute(&mut **db) .await?; @@ -56,7 +56,7 @@ impl JobRepo { pub async fn find_by_id(&self, id: JobId) -> Result { let row = sqlx::query!( - r#"SELECT id as "id: JobId", type AS job_type, name, description, data, completed_at, last_error + r#"SELECT id as "id: JobId", type AS job_type, name, description, state_json, completed_at, last_error FROM jobs WHERE id = $1"#, id as JobId @@ -67,7 +67,7 @@ impl JobRepo { row.name, JobType::from_string(row.job_type), row.description, - row.data, + row.state_json, ); job.id = row.id; job.completed_at = row.completed_at;