Skip to content

Commit

Permalink
refactor: move state_json from job_execution -> jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
bodymindarts committed Oct 11, 2024
1 parent 8750e6e commit 49ce79b
Show file tree
Hide file tree
Showing 16 changed files with 95 additions and 107 deletions.
6 changes: 6 additions & 0 deletions cala-ledger-outbox-client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,9 @@ use serde::{Deserialize, Serialize};
pub struct CalaLedgerOutboxClientConfig {
pub url: String,
}

impl CalaLedgerOutboxClientConfig {
pub fn new(url: String) -> Self {
Self { url }
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions cala-server/migrations/20231211151809_cala_server_setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ CREATE TABLE jobs (
name VARCHAR NOT NULL,
type VARCHAR NOT NULL,
description VARCHAR,
data JSONB,
state_json JSONB,
last_error VARCHAR,
completed_at TIMESTAMPTZ,
modified_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
Expand All @@ -18,9 +18,8 @@ CREATE TABLE job_executions (
next_attempt INT NOT NULL DEFAULT 1,
name VARCHAR NOT NULL,
state JobExecutionState NOT NULL DEFAULT 'pending',
state_json JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
reschedule_after TIMESTAMPTZ NOT NULL DEFAULT NOW()
reschedule_after TIMESTAMPTZ NOT NULL DEFAULT NOW(),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE integrations (
Expand Down
15 changes: 0 additions & 15 deletions cala-server/src/extension/cala_outbox_import/config.rs
Original file line number Diff line number Diff line change
@@ -1,15 +0,0 @@
use cala_ledger_outbox_client::CalaLedgerOutboxClientConfig;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CalaOutboxImportConfig {
pub endpoint: String,
}

impl From<&CalaOutboxImportConfig> for CalaLedgerOutboxClientConfig {
fn from(config: &CalaOutboxImportConfig) -> Self {
Self {
url: config.endpoint.clone(),
}
}
}
23 changes: 10 additions & 13 deletions cala-server/src/extension/cala_outbox_import/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ use tracing::instrument;

use crate::job::*;

pub use super::config::*;

pub const CALA_OUTBOX_IMPORT_JOB_TYPE: JobType = JobType::new("cala-outbox-import-job");

#[derive(Default)]
Expand All @@ -26,24 +24,23 @@ impl JobInitializer for CalaOutboxImportJobInitializer {

fn init(
&self,
job: Job,
_job: Job,
ledger: &CalaLedger,
) -> Result<Box<dyn JobRunner>, Box<dyn std::error::Error>> {
Ok(Box::new(CalaOutboxImportJob {
config: job.data()?,
ledger: ledger.clone(),
}))
}
}

pub struct CalaOutboxImportJob {
config: CalaOutboxImportConfig,
ledger: CalaLedger,
}

#[derive(Default, Serialize, Deserialize)]
struct CalaOutboxImportJobState {
last_synced: EventSequence,
pub struct CalaOutboxImportJobState {
pub endpoint: String,
pub last_synced: EventSequence,
}

#[async_trait]
Expand All @@ -53,21 +50,21 @@ impl JobRunner for CalaOutboxImportJob {
&mut self,
mut current_job: CurrentJob,
) -> Result<JobCompletion, Box<dyn std::error::Error>> {
let mut state = current_job
.state::<CalaOutboxImportJobState>()?
.expect("Job state");
println!(
"Executing CalaOutboxImportJob importing from endpoint: {}",
self.config.endpoint
state.endpoint
);
let mut client = Client::connect(ClientConfig::from(&self.config)).await?;
let mut state = current_job
.state::<CalaOutboxImportJobState>()?
.unwrap_or_default();
let mut client = Client::connect(ClientConfig::new(state.endpoint.clone())).await?;
let mut stream = client.subscribe(Some(state.last_synced)).await?;
loop {
match stream.next().await {
Some(Ok(message)) => {
let mut tx = current_job.pool().begin().await?;
state.last_synced = message.sequence;
current_job.update_execution_state(&mut tx, &state).await?;
current_job.update_state(&mut tx, &state).await?;
self.ledger
.sync_outbox_event(
tx,
Expand Down
1 change: 0 additions & 1 deletion cala-server/src/extension/cala_outbox_import/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
mod config;
mod job;
mod mutation;

Expand Down
7 changes: 4 additions & 3 deletions cala-server/src/extension/cala_outbox_import/mutation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl Mutation {
input.job_id,
input.name.clone(),
input.description.clone(),
CalaOutboxImportConfig::from(input),
CalaOutboxImportJobState::from(input),
)
.await?;
Ok(CalaOutboxImportJobCreatePayload {
Expand All @@ -50,10 +50,11 @@ impl Mutation {
}
}

impl From<CalaOutboxImportJobCreateInput> for CalaOutboxImportConfig {
impl From<CalaOutboxImportJobCreateInput> for CalaOutboxImportJobState {
fn from(input: CalaOutboxImportJobCreateInput) -> Self {
CalaOutboxImportConfig {
Self {
endpoint: input.endpoint,
..Default::default()
}
}
}
4 changes: 2 additions & 2 deletions cala-server/src/job/current.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ impl CurrentJob {
}
}

pub async fn update_execution_state<T: Serialize>(
pub async fn update_state<T: Serialize>(
&mut self,
db: &mut Transaction<'_, Postgres>,
state: T,
) -> Result<(), JobError> {
let state_json = serde_json::to_value(state).map_err(JobError::CouldNotSerializeState)?;
sqlx::query!(
r#"
UPDATE job_executions
UPDATE jobs
SET state_json = $1
WHERE id = $2
"#,
Expand Down
10 changes: 5 additions & 5 deletions cala-server/src/job/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct Job {
pub job_type: JobType,
pub description: Option<String>,
pub last_error: Option<String>,
data: serde_json::Value,
state: serde_json::Value,
pub completed_at: Option<DateTime<Utc>>,
}

Expand All @@ -41,21 +41,21 @@ impl Job {
name: String,
job_type: JobType,
description: Option<String>,
data: T,
initial_state: T,
) -> Self {
Self {
id: JobId::new(),
name,
job_type,
description,
data: serde_json::to_value(data).expect("could not serialize job data"),
state: serde_json::to_value(initial_state).expect("could not serialize job state"),
last_error: None,
completed_at: None,
}
}

pub fn data<T: serde::de::DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
serde_json::from_value(self.data.clone())
pub fn state<T: serde::de::DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
serde_json::from_value(self.state.clone())
}

pub(super) fn success(&mut self) {
Expand Down
9 changes: 5 additions & 4 deletions cala-server/src/job/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,18 +145,19 @@ impl JobExecutor {
let rows = sqlx::query!(
r#"
WITH selected_jobs AS (
SELECT id
FROM job_executions
SELECT je.id, state_json
FROM job_executions je
JOIN jobs ON je.id = jobs.id
WHERE reschedule_after < NOW()
AND state = 'pending'
AND je.state = 'pending'
LIMIT $1
FOR UPDATE
)
UPDATE job_executions AS je
SET state = 'running', reschedule_after = NOW() + $2::interval
FROM selected_jobs
WHERE je.id = selected_jobs.id
RETURNING je.id AS "id!: JobId", je.state_json, je.next_attempt
RETURNING je.id AS "id!: JobId", selected_jobs.state_json, je.next_attempt
"#,
poll_limit as i32,
pg_interval
Expand Down
10 changes: 5 additions & 5 deletions cala-server/src/job/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ impl JobRepo {
job: Job,
) -> Result<Job, JobError> {
sqlx::query!(
r#"INSERT INTO jobs (id, type, name, description, data)
r#"INSERT INTO jobs (id, type, name, description, state_json)
VALUES ($1, $2, $3, $4, $5)"#,
job.id as JobId,
&job.job_type as &JobType,
&job.name,
job.description.as_ref(),
job.data::<serde_json::Value>()
.expect("Could not serialize data")
job.state::<serde_json::Value>()
.expect("Could not serialize state")
)
.execute(&mut **db)
.await?;
Expand Down Expand Up @@ -56,7 +56,7 @@ impl JobRepo {

pub async fn find_by_id(&self, id: JobId) -> Result<Job, JobError> {
let row = sqlx::query!(
r#"SELECT id as "id: JobId", type AS job_type, name, description, data, completed_at, last_error
r#"SELECT id as "id: JobId", type AS job_type, name, description, state_json, completed_at, last_error
FROM jobs
WHERE id = $1"#,
id as JobId
Expand All @@ -67,7 +67,7 @@ impl JobRepo {
row.name,
JobType::from_string(row.job_type),
row.description,
row.data,
row.state_json,
);
job.id = row.id;
job.completed_at = row.completed_at;
Expand Down

0 comments on commit 49ce79b

Please sign in to comment.