Skip to content

Commit

Permalink
chore: cala_outbox_import extension
Browse files Browse the repository at this point in the history
  • Loading branch information
bodymindarts committed May 12, 2024
1 parent ea98ce9 commit fd991c0
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 8 deletions.
15 changes: 15 additions & 0 deletions cala-server/src/extension/cala_outbox_import/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use cala_ledger_outbox_client::CalaLedgerOutboxClientConfig;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CalaOutboxImportConfig {
pub endpoint: String,
}

impl From<&CalaOutboxImportConfig> for CalaLedgerOutboxClientConfig {
fn from(config: &CalaOutboxImportConfig) -> Self {
Self {
url: config.endpoint.clone(),
}
}
}
50 changes: 50 additions & 0 deletions cala-server/src/extension/cala_outbox_import/job.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use async_trait::async_trait;
use cala_ledger::{primitives::DataSourceId, CalaLedger};
use cala_ledger_outbox_client::{
CalaLedgerOutboxClient as Client, CalaLedgerOutboxClientConfig as ClientConfig,
};
use cala_types::outbox::EventSequence;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tracing::instrument;

use crate::job::{CurrentJob, JobRunner, JobType};

pub use super::config::*;

pub const CALA_OUTBOX_IMPORT_JOB_TYPE: JobType = JobType::new("cala-outbox-import-job");

pub struct CalaOutboxImportJob {
config: CalaOutboxImportConfig,
ledger: CalaLedger,
}

#[derive(Default, Serialize, Deserialize)]
struct CalaOutboxImportJobState {
last_synced: EventSequence,
}

#[async_trait]
impl JobRunner for CalaOutboxImportJob {
#[instrument(name = "import_job.cala_outbox.run", skip(self, current_job), err)]
async fn run(&self, mut current_job: CurrentJob) -> Result<(), Box<dyn std::error::Error>> {
println!(
"Executing CalaOutboxImportJob importing from endpoint: {}",
self.config.endpoint
);
let mut client = Client::connect(ClientConfig::from(&self.config)).await?;
let mut state = current_job
.state::<CalaOutboxImportJobState>()?
.unwrap_or_default();
let mut stream = client.subscribe(Some(state.last_synced)).await?;
while let Some(Ok(message)) = stream.next().await {
let mut tx = current_job.pool().begin().await?;
state.last_synced = message.sequence;
current_job.update_state(&mut tx, &state).await?;
self.ledger
.sync_outbox_event(tx, DataSourceId::from(current_job.id()), message)
.await?;
}
Ok(())
}
}
5 changes: 5 additions & 0 deletions cala-server/src/extension/cala_outbox_import/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod config;
mod job;
mod mutation;

pub use mutation::*;
50 changes: 50 additions & 0 deletions cala-server/src/extension/cala_outbox_import/mutation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use async_graphql::*;

use super::config::*;
use crate::{app::CalaApp, graphql::Job, job::JobType};

#[derive(InputObject)]
pub struct CalaOutboxImportJobCreateInput {
pub job_name: String,
pub description: Option<String>,
pub endpoint: String,
}

#[derive(SimpleObject)]
pub struct CalaOutboxImportJobCreatePayload {
pub job: Job,
}

#[derive(Default)]
pub struct Mutation;

#[async_graphql::Object(name = "BigQueryMutation")]
impl Mutation {
async fn cala_outbox_import_job_create(
&self,
ctx: &Context<'_>,
input: CalaOutboxImportJobCreateInput,
) -> async_graphql::Result<CalaOutboxImportJobCreatePayload> {
let app = ctx.data_unchecked::<CalaApp>();
let name = input.job_name.clone();
let job = app
.create_and_spawn_job(
name,
None,
super::job::CALA_OUTBOX_IMPORT_JOB_TYPE,
CalaOutboxImportConfig::from(input),
)
.await?;
Ok(CalaOutboxImportJobCreatePayload {
job: Job::from(job),
})
}
}

impl From<CalaOutboxImportJobCreateInput> for CalaOutboxImportConfig {
fn from(input: CalaOutboxImportJobCreateInput) -> Self {
CalaOutboxImportConfig {
endpoint: input.endpoint,
}
}
}
3 changes: 2 additions & 1 deletion cala-server/src/extension/core.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#[derive(async_graphql::SimpleObject, Default)]
pub struct CoreMutationExtension {
hello: String,
#[graphql(flatten)]
cala_outbox_import: super::cala_outbox_import::Mutation,
}

#[derive(async_graphql::SimpleObject, Default)]
Expand Down
1 change: 1 addition & 0 deletions cala-server/src/extension/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_graphql::*;

mod cala_outbox_import;
pub mod core;

pub trait MutationExtensionMarker: Default + OutputType + ContainerType + 'static {}
7 changes: 0 additions & 7 deletions cala-server/src/graphql/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,6 @@ use serde::{Deserialize, Serialize};

use super::primitives::*;

#[derive(InputObject)]
pub struct JobCreateInput {
pub name: String,
pub description: Option<String>,
pub endpoint: String,
}

#[derive(SimpleObject)]
pub struct Job {
pub id: ID,
Expand Down
1 change: 1 addition & 0 deletions cala-server/src/graphql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod timestamp;

use async_graphql::*;

pub use job::Job;
pub use schema::*;

use crate::{app::CalaApp, extension::MutationExtensionMarker};
Expand Down

0 comments on commit fd991c0

Please sign in to comment.