diff --git a/Cargo.lock b/Cargo.lock index 5759d3ec..e4bade71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -696,6 +696,7 @@ dependencies = [ "chrono", "clap", "derive_builder", + "futures", "serde", "serde_json", "serde_with", diff --git a/cala-ledger-outbox-client/src/client.rs b/cala-ledger-outbox-client/src/client.rs index 2b3fe2ec..5db418d0 100644 --- a/cala-ledger-outbox-client/src/client.rs +++ b/cala-ledger-outbox-client/src/client.rs @@ -27,7 +27,7 @@ impl CalaLedgerOutboxClient { }) } - #[instrument(name = "cala_ledger.outbox_client.subscribe", skip(self))] + #[instrument(name = "cala_ledger_outbox_client.subscribe", skip(self))] pub async fn subscribe( &mut self, after_sequence: Option, diff --git a/cala-ledger/.sqlx/query-17f105ace5fb84fd62b400e2fc4c4480ca301daed55a93b7d34335c9a4ac6ae6.json b/cala-ledger/.sqlx/query-17f105ace5fb84fd62b400e2fc4c4480ca301daed55a93b7d34335c9a4ac6ae6.json new file mode 100644 index 00000000..610e92d1 --- /dev/null +++ b/cala-ledger/.sqlx/query-17f105ace5fb84fd62b400e2fc4c4480ca301daed55a93b7d34335c9a4ac6ae6.json @@ -0,0 +1,41 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n g.seq AS \"sequence!: EventSequence\",\n e.id AS \"id?\",\n e.payload AS \"payload?\",\n e.recorded_at AS \"recorded_at?\"\n FROM\n generate_series($1 + 1,\n LEAST($1 + $2, (SELECT MAX(sequence) FROM cala_outbox_events)))\n AS g(seq)\n LEFT JOIN\n cala_outbox_events e ON g.seq = e.sequence\n WHERE\n g.seq > $1\n ORDER BY\n g.seq ASC\n LIMIT $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "sequence!: EventSequence", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "id?", + "type_info": "Uuid" + }, + { + "ordinal": 2, + "name": "payload?", + "type_info": "Jsonb" + }, + { + "ordinal": 3, + "name": "recorded_at?", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Int4", + "Int4" + ] + }, + "nullable": [ + null, + false, + true, + false + ] + }, + "hash": "17f105ace5fb84fd62b400e2fc4c4480ca301daed55a93b7d34335c9a4ac6ae6" +} diff --git a/cala-ledger/.sqlx/query-2e0d2a9cb0f712b9aa694eb218d9526a3bb21f197440f2741f6bfd0adccd822f.json b/cala-ledger/.sqlx/query-2f03e709d0ae22f223c62907dfa09942d115792af4d92ebc6835a4b4dcd1b140.json similarity index 54% rename from cala-ledger/.sqlx/query-2e0d2a9cb0f712b9aa694eb218d9526a3bb21f197440f2741f6bfd0adccd822f.json rename to cala-ledger/.sqlx/query-2f03e709d0ae22f223c62907dfa09942d115792af4d92ebc6835a4b4dcd1b140.json index 2801d317..53b0b19f 100644 --- a/cala-ledger/.sqlx/query-2e0d2a9cb0f712b9aa694eb218d9526a3bb21f197440f2741f6bfd0adccd822f.json +++ b/cala-ledger/.sqlx/query-2f03e709d0ae22f223c62907dfa09942d115792af4d92ebc6835a4b4dcd1b140.json @@ -1,11 +1,11 @@ { "db_name": "PostgreSQL", - "query": "SELECT COALESCE(MAX(sequence), 0) AS \"max\" FROM cala_outbox_events", + "query": "SELECT COALESCE(MAX(sequence), 0) AS \"max!\" FROM cala_outbox_events", "describe": { "columns": [ { "ordinal": 0, - "name": "max", + "name": "max!", "type_info": "Int8" } ], @@ -16,5 +16,5 @@ null ] }, - "hash": "2e0d2a9cb0f712b9aa694eb218d9526a3bb21f197440f2741f6bfd0adccd822f" + "hash": "2f03e709d0ae22f223c62907dfa09942d115792af4d92ebc6835a4b4dcd1b140" } diff --git a/cala-ledger/.sqlx/query-40ef9cf667f0acfd014da9e9d99af2d67d02c36c41d29df4b7f47dd48e72f26a.json b/cala-ledger/.sqlx/query-40ef9cf667f0acfd014da9e9d99af2d67d02c36c41d29df4b7f47dd48e72f26a.json deleted file mode 100644 index be291b13..00000000 --- a/cala-ledger/.sqlx/query-40ef9cf667f0acfd014da9e9d99af2d67d02c36c41d29df4b7f47dd48e72f26a.json +++ /dev/null @@ -1,41 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n g.seq AS \"sequence!: EventSequence\",\n e.id,\n e.payload AS \"payload?\",\n e.recorded_at AS \"recorded_at?\"\n FROM\n generate_series($1 + 1, $1 + $2) AS g(seq)\n LEFT JOIN\n cala_outbox_events e ON g.seq = e.sequence\n WHERE\n g.seq > $1\n ORDER BY\n g.seq ASC\n LIMIT $2", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "sequence!: EventSequence", - "type_info": "Int4" - }, - { - "ordinal": 1, - "name": "id", - "type_info": "Uuid" - }, - { - "ordinal": 2, - "name": "payload?", - "type_info": "Jsonb" - }, - { - "ordinal": 3, - "name": "recorded_at?", - "type_info": "Timestamptz" - } - ], - "parameters": { - "Left": [ - "Int4", - "Int4" - ] - }, - "nullable": [ - null, - false, - false, - false - ] - }, - "hash": "40ef9cf667f0acfd014da9e9d99af2d67d02c36c41d29df4b7f47dd48e72f26a" -} diff --git a/cala-ledger/.sqlx/query-e983a467a77dbf5c21aa33f0ff7e70ff74e62037c3a84f74f8836756f610d333.json b/cala-ledger/.sqlx/query-e983a467a77dbf5c21aa33f0ff7e70ff74e62037c3a84f74f8836756f610d333.json new file mode 100644 index 00000000..601b3aa8 --- /dev/null +++ b/cala-ledger/.sqlx/query-e983a467a77dbf5c21aa33f0ff7e70ff74e62037c3a84f74f8836756f610d333.json @@ -0,0 +1,40 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO cala_outbox_events (sequence)\n SELECT unnest($1::bigint[]) AS sequence\n ON CONFLICT (sequence) DO UPDATE\n SET sequence = EXCLUDED.sequence\n RETURNING id, sequence AS \"sequence!: EventSequence\", payload, recorded_at\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "sequence!: EventSequence", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "payload", + "type_info": "Jsonb" + }, + { + "ordinal": 3, + "name": "recorded_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Int8Array" + ] + }, + "nullable": [ + false, + false, + true, + false + ] + }, + "hash": "e983a467a77dbf5c21aa33f0ff7e70ff74e62037c3a84f74f8836756f610d333" +} diff --git a/cala-ledger/migrations/20231208110808_cala_ledger_setup.sql b/cala-ledger/migrations/20231208110808_cala_ledger_setup.sql index 9e5b5346..9e1d8fdf 100644 --- a/cala-ledger/migrations/20231208110808_cala_ledger_setup.sql +++ b/cala-ledger/migrations/20231208110808_cala_ledger_setup.sql @@ -54,7 +54,7 @@ CREATE TABLE cala_journal_events ( CREATE TABLE cala_outbox_events ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), sequence BIGSERIAL UNIQUE, - payload JSONB NOT NULL, + payload JSONB, recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); diff --git a/cala-ledger/src/outbox/listener.rs b/cala-ledger/src/outbox/listener.rs index 6adbf4c4..eb73ffd0 100644 --- a/cala-ledger/src/outbox/listener.rs +++ b/cala-ledger/src/outbox/listener.rs @@ -106,7 +106,12 @@ impl Stream for OutboxListener { let last_sequence = self.last_returned_sequence; let buffer_size = self.buffer_size; self.next_page_handle = Some(tokio::spawn(async move { - repo.load_next_page(last_sequence, buffer_size).await + repo.load_next_page(last_sequence, buffer_size) + .await + .map_err(|e| { + eprintln!("Error loading next page: {:?}", e); + e + }) })); return self.poll_next(cx); } diff --git a/cala-ledger/src/outbox/repo.rs b/cala-ledger/src/outbox/repo.rs index 5f4bbca0..d38f4f7b 100644 --- a/cala-ledger/src/outbox/repo.rs +++ b/cala-ledger/src/outbox/repo.rs @@ -14,10 +14,10 @@ impl OutboxRepo { pub async fn highest_known_sequence(&self) -> Result { let row = - sqlx::query!(r#"SELECT COALESCE(MAX(sequence), 0) AS "max" FROM cala_outbox_events"#) + sqlx::query!(r#"SELECT COALESCE(MAX(sequence), 0) AS "max!" FROM cala_outbox_events"#) .fetch_one(&self.pool) .await?; - Ok(EventSequence::from(row.max.unwrap_or(0) as u64)) + Ok(EventSequence::from(row.max as u64)) } pub async fn persist_events( @@ -56,18 +56,20 @@ impl OutboxRepo { pub async fn load_next_page( &self, - sequence: EventSequence, + from_sequence: EventSequence, buffer_size: usize, ) -> Result, OutboxError> { let rows = sqlx::query!( r#" SELECT g.seq AS "sequence!: EventSequence", - e.id, + e.id AS "id?", e.payload AS "payload?", e.recorded_at AS "recorded_at?" FROM - generate_series($1 + 1, $1 + $2) AS g(seq) + generate_series($1 + 1, + LEAST($1 + $2, (SELECT MAX(sequence) FROM cala_outbox_events))) + AS g(seq) LEFT JOIN cala_outbox_events e ON g.seq = e.sequence WHERE @@ -75,15 +77,20 @@ impl OutboxRepo { ORDER BY g.seq ASC LIMIT $2"#, - sequence as EventSequence, + from_sequence as EventSequence, buffer_size as i64, ) .fetch_all(&self.pool) .await?; let mut events = Vec::new(); + let mut empty_ids = Vec::new(); for row in rows { + if row.id.is_none() { + empty_ids.push(row.sequence); + continue; + } events.push(OutboxEvent { - id: OutboxEventId::from(row.id), + id: OutboxEventId::from(row.id.expect("already checked")), sequence: row.sequence, payload: row .payload @@ -92,6 +99,34 @@ impl OutboxRepo { recorded_at: row.recorded_at.unwrap_or_default(), }); } + + if !empty_ids.is_empty() { + let rows = sqlx::query!( + r#" + INSERT INTO cala_outbox_events (sequence) + SELECT unnest($1::bigint[]) AS sequence + ON CONFLICT (sequence) DO UPDATE + SET sequence = EXCLUDED.sequence + RETURNING id, sequence AS "sequence!: EventSequence", payload, recorded_at + "#, + &empty_ids as &[EventSequence] + ) + .fetch_all(&self.pool) + .await?; + for row in rows { + events.push(OutboxEvent { + id: OutboxEventId::from(row.id), + sequence: row.sequence, + payload: row + .payload + .map(|p| serde_json::from_value(p).expect("Could not deserialize payload")) + .unwrap_or(OutboxEventPayload::Empty), + recorded_at: row.recorded_at, + }); + } + events.sort_by(|a, b| a.sequence.cmp(&b.sequence)); + } + Ok(events) } } diff --git a/cala-server/Cargo.toml b/cala-server/Cargo.toml index ad178fb7..c758bf95 100644 --- a/cala-server/Cargo.toml +++ b/cala-server/Cargo.toml @@ -20,6 +20,7 @@ axum-extra = { workspace = true } base64 = { workspace = true } chrono = { workspace = true } derive_builder = { workspace = true } +futures = { workspace = true } thiserror = { workspace = true } clap = { workspace = true } sqlx = { workspace = true } diff --git a/cala-server/src/import_job/cala_outbox/mod.rs b/cala-server/src/import_job/cala_outbox/mod.rs index 1735e67c..f2016470 100644 --- a/cala-server/src/import_job/cala_outbox/mod.rs +++ b/cala-server/src/import_job/cala_outbox/mod.rs @@ -5,6 +5,7 @@ use async_trait::async_trait; use cala_ledger_outbox_client::{ CalaLedgerOutboxClient as Client, CalaLedgerOutboxClientConfig as ClientConfig, }; +use futures::StreamExt; use tracing::instrument; use super::runner::ImportJobRunnerDeps; @@ -37,8 +38,13 @@ impl JobRunner for CalaOutboxImportJob { self.config.endpoint ); let mut client = Client::connect(ClientConfig::from(&self.config)).await?; - let _stream = client.subscribe(None).await?; + let mut stream = client.subscribe(Some(0)).await?; println!("created stream"); + while let Some(event) = stream.next().await { + let message = event?; + println!("message: {:?}", message); + } + tokio::time::sleep(tokio::time::Duration::from_secs(600)).await; Ok(()) } diff --git a/cala-server/src/jobs/mod.rs b/cala-server/src/jobs/mod.rs index eb30a881..d1f43ef0 100644 --- a/cala-server/src/jobs/mod.rs +++ b/cala-server/src/jobs/mod.rs @@ -156,7 +156,7 @@ impl JobExecutor { Ok(()) } - #[instrument(name = "job_executor.start_job", skip(registry, running_jobs))] + #[instrument(name = "job_executor.start_job", skip(registry, running_jobs), err)] async fn start_job( registry: &Arc, running_jobs: &Arc>>,