diff --git a/cala-server/src/extension/cala_outbox_import/config.rs b/cala-server/src/extension/cala_outbox_import/config.rs new file mode 100644 index 00000000..8c50aca6 --- /dev/null +++ b/cala-server/src/extension/cala_outbox_import/config.rs @@ -0,0 +1,15 @@ +use cala_ledger_outbox_client::CalaLedgerOutboxClientConfig; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct CalaOutboxImportConfig { + pub endpoint: String, +} + +impl From<&CalaOutboxImportConfig> for CalaLedgerOutboxClientConfig { + fn from(config: &CalaOutboxImportConfig) -> Self { + Self { + url: config.endpoint.clone(), + } + } +} diff --git a/cala-server/src/extension/cala_outbox_import/job.rs b/cala-server/src/extension/cala_outbox_import/job.rs new file mode 100644 index 00000000..cb12ecb3 --- /dev/null +++ b/cala-server/src/extension/cala_outbox_import/job.rs @@ -0,0 +1,50 @@ +use async_trait::async_trait; +use cala_ledger::{primitives::DataSourceId, CalaLedger}; +use cala_ledger_outbox_client::{ + CalaLedgerOutboxClient as Client, CalaLedgerOutboxClientConfig as ClientConfig, +}; +use cala_types::outbox::EventSequence; +use futures::StreamExt; +use serde::{Deserialize, Serialize}; +use tracing::instrument; + +use crate::job::{CurrentJob, JobRunner, JobType}; + +pub use super::config::*; + +pub const CALA_OUTBOX_IMPORT_JOB_TYPE: JobType = JobType::new("cala-outbox-import-job"); + +pub struct CalaOutboxImportJob { + config: CalaOutboxImportConfig, + ledger: CalaLedger, +} + +#[derive(Default, Serialize, Deserialize)] +struct CalaOutboxImportJobState { + last_synced: EventSequence, +} + +#[async_trait] +impl JobRunner for CalaOutboxImportJob { + #[instrument(name = "import_job.cala_outbox.run", skip(self, current_job), err)] + async fn run(&self, mut current_job: CurrentJob) -> Result<(), Box> { + println!( + "Executing CalaOutboxImportJob importing from endpoint: {}", + self.config.endpoint + ); + let mut client = Client::connect(ClientConfig::from(&self.config)).await?; + let mut state = current_job + .state::()? + .unwrap_or_default(); + let mut stream = client.subscribe(Some(state.last_synced)).await?; + while let Some(Ok(message)) = stream.next().await { + let mut tx = current_job.pool().begin().await?; + state.last_synced = message.sequence; + current_job.update_state(&mut tx, &state).await?; + self.ledger + .sync_outbox_event(tx, DataSourceId::from(current_job.id()), message) + .await?; + } + Ok(()) + } +} diff --git a/cala-server/src/extension/cala_outbox_import/mod.rs b/cala-server/src/extension/cala_outbox_import/mod.rs new file mode 100644 index 00000000..59f75c4b --- /dev/null +++ b/cala-server/src/extension/cala_outbox_import/mod.rs @@ -0,0 +1,5 @@ +mod config; +mod job; +mod mutation; + +pub use mutation::*; diff --git a/cala-server/src/extension/cala_outbox_import/mutation.rs b/cala-server/src/extension/cala_outbox_import/mutation.rs new file mode 100644 index 00000000..b110f908 --- /dev/null +++ b/cala-server/src/extension/cala_outbox_import/mutation.rs @@ -0,0 +1,50 @@ +use async_graphql::*; + +use super::config::*; +use crate::{app::CalaApp, graphql::Job, job::JobType}; + +#[derive(InputObject)] +pub struct CalaOutboxImportJobCreateInput { + pub job_name: String, + pub description: Option, + pub endpoint: String, +} + +#[derive(SimpleObject)] +pub struct CalaOutboxImportJobCreatePayload { + pub job: Job, +} + +#[derive(Default)] +pub struct Mutation; + +#[async_graphql::Object(name = "BigQueryMutation")] +impl Mutation { + async fn cala_outbox_import_job_create( + &self, + ctx: &Context<'_>, + input: CalaOutboxImportJobCreateInput, + ) -> async_graphql::Result { + let app = ctx.data_unchecked::(); + let name = input.job_name.clone(); + let job = app + .create_and_spawn_job( + name, + None, + super::job::CALA_OUTBOX_IMPORT_JOB_TYPE, + CalaOutboxImportConfig::from(input), + ) + .await?; + Ok(CalaOutboxImportJobCreatePayload { + job: Job::from(job), + }) + } +} + +impl From for CalaOutboxImportConfig { + fn from(input: CalaOutboxImportJobCreateInput) -> Self { + CalaOutboxImportConfig { + endpoint: input.endpoint, + } + } +} diff --git a/cala-server/src/extension/core.rs b/cala-server/src/extension/core.rs index 0abbff93..639dd1ab 100644 --- a/cala-server/src/extension/core.rs +++ b/cala-server/src/extension/core.rs @@ -1,6 +1,7 @@ #[derive(async_graphql::SimpleObject, Default)] pub struct CoreMutationExtension { - hello: String, + #[graphql(flatten)] + cala_outbox_import: super::cala_outbox_import::Mutation, } #[derive(async_graphql::SimpleObject, Default)] diff --git a/cala-server/src/extension/mod.rs b/cala-server/src/extension/mod.rs index e0dc2a96..ae897ff8 100644 --- a/cala-server/src/extension/mod.rs +++ b/cala-server/src/extension/mod.rs @@ -1,5 +1,6 @@ use async_graphql::*; +mod cala_outbox_import; pub mod core; pub trait MutationExtensionMarker: Default + OutputType + ContainerType + 'static {} diff --git a/cala-server/src/graphql/job.rs b/cala-server/src/graphql/job.rs index ad9f6d78..ddd07607 100644 --- a/cala-server/src/graphql/job.rs +++ b/cala-server/src/graphql/job.rs @@ -3,13 +3,6 @@ use serde::{Deserialize, Serialize}; use super::primitives::*; -#[derive(InputObject)] -pub struct JobCreateInput { - pub name: String, - pub description: Option, - pub endpoint: String, -} - #[derive(SimpleObject)] pub struct Job { pub id: ID, diff --git a/cala-server/src/graphql/mod.rs b/cala-server/src/graphql/mod.rs index ab67c53c..7e6575c2 100644 --- a/cala-server/src/graphql/mod.rs +++ b/cala-server/src/graphql/mod.rs @@ -8,6 +8,7 @@ mod timestamp; use async_graphql::*; +pub use job::Job; pub use schema::*; use crate::{app::CalaApp, extension::MutationExtensionMarker};