Skip to content

Commit

Permalink
chore: outbox importer receiving
Browse files Browse the repository at this point in the history
  • Loading branch information
bodymindarts committed May 8, 2024
1 parent 31060e9 commit b56ac07
Show file tree
Hide file tree
Showing 12 changed files with 144 additions and 56 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cala-ledger-outbox-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl CalaLedgerOutboxClient {
})
}

#[instrument(name = "cala_ledger.outbox_client.subscribe", skip(self))]
#[instrument(name = "cala_ledger_outbox_client.subscribe", skip(self))]
pub async fn subscribe(
&mut self,
after_sequence: Option<u64>,
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ CREATE TABLE cala_journal_events (
CREATE TABLE cala_outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
sequence BIGSERIAL UNIQUE,
payload JSONB NOT NULL,
payload JSONB,
recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

Expand Down
7 changes: 6 additions & 1 deletion cala-ledger/src/outbox/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,12 @@ impl Stream for OutboxListener {
let last_sequence = self.last_returned_sequence;
let buffer_size = self.buffer_size;
self.next_page_handle = Some(tokio::spawn(async move {
repo.load_next_page(last_sequence, buffer_size).await
repo.load_next_page(last_sequence, buffer_size)
.await
.map_err(|e| {
eprintln!("Error loading next page: {:?}", e);
e
})
}));
return self.poll_next(cx);
}
Expand Down
49 changes: 42 additions & 7 deletions cala-ledger/src/outbox/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ impl OutboxRepo {

pub async fn highest_known_sequence(&self) -> Result<EventSequence, OutboxError> {
let row =
sqlx::query!(r#"SELECT COALESCE(MAX(sequence), 0) AS "max" FROM cala_outbox_events"#)
sqlx::query!(r#"SELECT COALESCE(MAX(sequence), 0) AS "max!" FROM cala_outbox_events"#)
.fetch_one(&self.pool)
.await?;
Ok(EventSequence::from(row.max.unwrap_or(0) as u64))
Ok(EventSequence::from(row.max as u64))
}

pub async fn persist_events(
Expand Down Expand Up @@ -56,34 +56,41 @@ impl OutboxRepo {

pub async fn load_next_page(
&self,
sequence: EventSequence,
from_sequence: EventSequence,
buffer_size: usize,
) -> Result<Vec<OutboxEvent>, OutboxError> {
let rows = sqlx::query!(
r#"
SELECT
g.seq AS "sequence!: EventSequence",
e.id,
e.id AS "id?",
e.payload AS "payload?",
e.recorded_at AS "recorded_at?"
FROM
generate_series($1 + 1, $1 + $2) AS g(seq)
generate_series($1 + 1,
LEAST($1 + $2, (SELECT MAX(sequence) FROM cala_outbox_events)))
AS g(seq)
LEFT JOIN
cala_outbox_events e ON g.seq = e.sequence
WHERE
g.seq > $1
ORDER BY
g.seq ASC
LIMIT $2"#,
sequence as EventSequence,
from_sequence as EventSequence,
buffer_size as i64,
)
.fetch_all(&self.pool)
.await?;
let mut events = Vec::new();
let mut empty_ids = Vec::new();
for row in rows {
if row.id.is_none() {
empty_ids.push(row.sequence);
continue;
}
events.push(OutboxEvent {
id: OutboxEventId::from(row.id),
id: OutboxEventId::from(row.id.expect("already checked")),
sequence: row.sequence,
payload: row
.payload
Expand All @@ -92,6 +99,34 @@ impl OutboxRepo {
recorded_at: row.recorded_at.unwrap_or_default(),
});
}

if !empty_ids.is_empty() {
let rows = sqlx::query!(
r#"
INSERT INTO cala_outbox_events (sequence)
SELECT unnest($1::bigint[]) AS sequence
ON CONFLICT (sequence) DO UPDATE
SET sequence = EXCLUDED.sequence
RETURNING id, sequence AS "sequence!: EventSequence", payload, recorded_at
"#,
&empty_ids as &[EventSequence]
)
.fetch_all(&self.pool)
.await?;
for row in rows {
events.push(OutboxEvent {
id: OutboxEventId::from(row.id),
sequence: row.sequence,
payload: row
.payload
.map(|p| serde_json::from_value(p).expect("Could not deserialize payload"))
.unwrap_or(OutboxEventPayload::Empty),
recorded_at: row.recorded_at,
});
}
events.sort_by(|a, b| a.sequence.cmp(&b.sequence));
}

Ok(events)
}
}
1 change: 1 addition & 0 deletions cala-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ axum-extra = { workspace = true }
base64 = { workspace = true }
chrono = { workspace = true }
derive_builder = { workspace = true }
futures = { workspace = true }
thiserror = { workspace = true }
clap = { workspace = true }
sqlx = { workspace = true }
Expand Down
8 changes: 7 additions & 1 deletion cala-server/src/import_job/cala_outbox/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use async_trait::async_trait;
use cala_ledger_outbox_client::{
CalaLedgerOutboxClient as Client, CalaLedgerOutboxClientConfig as ClientConfig,
};
use futures::StreamExt;
use tracing::instrument;

use super::runner::ImportJobRunnerDeps;
Expand Down Expand Up @@ -37,8 +38,13 @@ impl JobRunner for CalaOutboxImportJob {
self.config.endpoint
);
let mut client = Client::connect(ClientConfig::from(&self.config)).await?;
let _stream = client.subscribe(None).await?;
let mut stream = client.subscribe(Some(0)).await?;
println!("created stream");
while let Some(event) = stream.next().await {
let message = event?;
println!("message: {:?}", message);
}

tokio::time::sleep(tokio::time::Duration::from_secs(600)).await;
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion cala-server/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl JobExecutor {
Ok(())
}

#[instrument(name = "job_executor.start_job", skip(registry, running_jobs))]
#[instrument(name = "job_executor.start_job", skip(registry, running_jobs), err)]
async fn start_job(
registry: &Arc<JobRegistry>,
running_jobs: &Arc<RwLock<HashMap<Uuid, JobHandle>>>,
Expand Down

0 comments on commit b56ac07

Please sign in to comment.