Skip to content

Commit

Permalink
chore: add timestamps to Transaction (#247)
Browse files Browse the repository at this point in the history
  • Loading branch information
bodymindarts authored Oct 10, 2024
1 parent 95b00b7 commit f3a5058
Show file tree
Hide file tree
Showing 19 changed files with 141 additions and 54 deletions.
4 changes: 4 additions & 0 deletions cala-ledger-core-types/src/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

use super::primitives::*;
Expand All @@ -6,6 +7,8 @@ use super::primitives::*;
pub struct TransactionValues {
pub id: TransactionId,
pub version: u32,
pub created_at: DateTime<Utc>,
pub modified_at: DateTime<Utc>,
pub journal_id: JournalId,
pub tx_template_id: TxTemplateId,
pub entry_ids: Vec<EntryId>,
Expand All @@ -23,6 +26,7 @@ mod cel {
fn from(tx: &super::TransactionValues) -> Self {
let mut map = CelMap::new();
map.insert("id", tx.id);
map.insert("createdAt", tx.created_at);
map.insert("journalId", tx.journal_id);
map.insert("txTemplateId", tx.tx_template_id);
map.insert("effective", tx.effective);
Expand Down
2 changes: 1 addition & 1 deletion cala-ledger-outbox-client/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.extern_path(".google.protobuf.Struct", "::prost_wkt_types::Struct")
.extern_path(".google.protobuf.Timestamp", "::prost_wkt_types::Timestamp")
.compile(&["proto/ledger/outbox_service.proto"], &["proto"])?;
.compile_protos(&["proto/ledger/outbox_service.proto"], &["proto"])?;
Ok(())
}
8 changes: 8 additions & 0 deletions cala-ledger-outbox-client/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,8 @@ impl TryFrom<proto::Transaction> for TransactionValues {
proto::Transaction {
id,
version,
created_at,
modified_at,
journal_id,
tx_template_id,
entry_ids,
Expand All @@ -435,6 +437,12 @@ impl TryFrom<proto::Transaction> for TransactionValues {
let res = Self {
id: id.parse()?,
version,
created_at: created_at
.ok_or(CalaLedgerOutboxClientError::MissingField)?
.into(),
modified_at: modified_at
.ok_or(CalaLedgerOutboxClientError::MissingField)?
.into(),
journal_id: journal_id.parse()?,
tx_template_id: tx_template_id.parse()?,
entry_ids: entry_ids
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cala-ledger/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.extern_path(".google.protobuf.Struct", "::prost_wkt_types::Struct")
.extern_path(".google.protobuf.Timestamp", "::prost_wkt_types::Timestamp")
.compile(&["proto/ledger/outbox_service.proto"], &["proto"])?;
.compile_protos(&["proto/ledger/outbox_service.proto"], &["proto"])?;
Ok(())
}
11 changes: 10 additions & 1 deletion cala-ledger/src/atomic_operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,24 @@ use sqlx::{PgPool, Postgres, Transaction};
use crate::outbox::*;

pub struct AtomicOperation<'t> {
pub(crate) now: chrono::DateTime<chrono::Utc>,

tx: Transaction<'t, Postgres>,
outbox: Outbox,
accumulated_events: Vec<OutboxEventPayload>,
}

impl<'t> AtomicOperation<'t> {
pub(crate) async fn init(pool: &PgPool, outbox: &Outbox) -> Result<Self, sqlx::Error> {
let mut tx = pool.begin().await?;
let now = sqlx::query!("SELECT NOW()")
.fetch_one(&mut *tx)
.await?
.now
.expect("NOW() is not NULL");
Ok(Self {
tx: pool.begin().await?,
tx,
now,
outbox: outbox.clone(),
accumulated_events: Vec::new(),
})
Expand Down
8 changes: 8 additions & 0 deletions cala-ledger/src/entity/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ where
self.persist_inner(db, None, None).await
}

pub async fn persist_at(
&mut self,
db: &mut sqlx::Transaction<'_, sqlx::Postgres>,
recorded_at: DateTime<Utc>,
) -> Result<usize, sqlx::Error> {
self.persist_inner(db, None, Some(recorded_at)).await
}

pub async fn persisted_at(
&mut self,
db: &mut sqlx::Transaction<'_, sqlx::Postgres>,
Expand Down
2 changes: 1 addition & 1 deletion cala-ledger/src/ledger/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl CalaLedger {
) -> Result<Transaction, LedgerError> {
let prepared_tx = self
.tx_templates
.prepare_transaction(tx_id, tx_template_code, params.into())
.prepare_transaction(op.now, tx_id, tx_template_code, params.into())
.await?;

let transaction = self
Expand Down
4 changes: 4 additions & 0 deletions cala-ledger/src/outbox/server/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,8 @@ impl From<TransactionValues> for proto::Transaction {
TransactionValues {
id,
version,
created_at,
modified_at,
journal_id,
tx_template_id,
correlation_id,
Expand All @@ -403,6 +405,8 @@ impl From<TransactionValues> for proto::Transaction {
proto::Transaction {
id: id.to_string(),
version,
created_at: Some(created_at.into()),
modified_at: Some(modified_at.into()),
journal_id: journal_id.to_string(),
tx_template_id: tx_template_id.to_string(),
entry_ids: entry_ids.into_iter().map(|id| id.to_string()).collect(),
Expand Down
6 changes: 6 additions & 0 deletions cala-ledger/src/transaction/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ impl TryFrom<EntityEvents<TransactionEvent>> for Transaction {
pub(crate) struct NewTransaction {
#[builder(setter(custom))]
pub(super) id: TransactionId,
pub(super) created_at: chrono::DateTime<chrono::Utc>,
#[builder(setter(into))]
pub(super) journal_id: JournalId,
#[builder(setter(into))]
Expand All @@ -123,6 +124,7 @@ impl NewTransaction {
pub fn builder() -> NewTransactionBuilder {
NewTransactionBuilder::default()
}

#[allow(dead_code)]
pub(super) fn initial_events(self) -> EntityEvents<TransactionEvent> {
EntityEvents::init(
Expand All @@ -131,6 +133,8 @@ impl NewTransaction {
values: TransactionValues {
id: self.id,
version: 1,
created_at: self.created_at,
modified_at: self.created_at,
journal_id: self.journal_id,
tx_template_id: self.tx_template_id,
effective: self.effective,
Expand Down Expand Up @@ -164,6 +168,7 @@ mod tests {
let id = uuid::Uuid::new_v4();
let new_transaction = NewTransaction::builder()
.id(id)
.created_at(chrono::Utc::now())
.journal_id(uuid::Uuid::new_v4())
.tx_template_id(uuid::Uuid::new_v4())
.entry_ids(vec![EntryId::new()])
Expand All @@ -185,6 +190,7 @@ mod tests {
use serde_json::json;
let new_transaction = NewTransaction::builder()
.id(uuid::Uuid::new_v4())
.created_at(chrono::Utc::now())
.journal_id(uuid::Uuid::new_v4())
.tx_template_id(uuid::Uuid::new_v4())
.effective(chrono::NaiveDate::default())
Expand Down
8 changes: 5 additions & 3 deletions cala-ledger/src/transaction/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ impl TransactionRepo {
db: &mut DbTransaction<'_, Postgres>,
new_transaction: NewTransaction,
) -> Result<Transaction, TransactionError> {
let created_at = new_transaction.created_at;
sqlx::query!(
r#"INSERT INTO cala_transactions (id, journal_id, tx_template_id, correlation_id, external_id)
VALUES ($1, $2, $3, $4, $5)"#,
r#"INSERT INTO cala_transactions (id, created_at, journal_id, tx_template_id, correlation_id, external_id)
VALUES ($1, $2, $3, $4, $5, $6)"#,
new_transaction.id as TransactionId,
created_at,
new_transaction.journal_id as JournalId,
new_transaction.tx_template_id as TxTemplateId,
new_transaction.correlation_id,
Expand All @@ -40,7 +42,7 @@ impl TransactionRepo {
.execute(&mut **db)
.await?;
let mut events = new_transaction.initial_events();
events.persist(db).await?;
events.persist_at(db, created_at).await?;
let transaction = Transaction::try_from(events)?;
Ok(transaction)
}
Expand Down
6 changes: 3 additions & 3 deletions cala-ledger/src/tx_template/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ mod repo;

pub mod error;

use chrono::NaiveDate;
#[cfg(feature = "import")]
use chrono::{DateTime, Utc};
use chrono::{DateTime, NaiveDate, Utc};
use rust_decimal::Decimal;
use sqlx::PgPool;
use std::collections::HashMap;
Expand Down Expand Up @@ -86,6 +84,7 @@ impl TxTemplates {
)]
pub(crate) async fn prepare_transaction(
&self,
time: DateTime<Utc>,
tx_id: TransactionId,
code: &str,
params: Params,
Expand All @@ -101,6 +100,7 @@ impl TxTemplates {
let mut tx_builder = NewTransaction::builder();
tx_builder
.id(tx_id)
.created_at(time)
.tx_template_id(tmpl.id)
.entry_ids(entries.iter().map(|e| e.id).collect());

Expand Down
2 changes: 2 additions & 0 deletions cala-ledger/src/velocity/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ mod tests {
TransactionValues {
id: TransactionId::new(),
version: 1,
created_at: chrono::Utc::now(),
modified_at: chrono::Utc::now(),
journal_id: JournalId::new(),
tx_template_id: TxTemplateId::new(),
entry_ids: vec![],
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 10 additions & 8 deletions proto/ledger/outbox_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,16 @@ message TransactionCreated {
message Transaction {
string id = 1;
uint32 version = 2;
string journal_id = 3;
string tx_template_id = 4;
repeated string entry_ids = 7;
string effective = 5;
string correlation_id = 6;
optional string external_id = 8;
optional string description = 9;
optional google.protobuf.Struct metadata = 10;
google.protobuf.Timestamp created_at = 3;
google.protobuf.Timestamp modified_at = 4;
string journal_id = 5;
string tx_template_id = 6;
string effective = 7;
string correlation_id = 8;
repeated string entry_ids = 9;
optional string external_id = 10;
optional string description = 11;
optional google.protobuf.Struct metadata = 12;
}

message EntryCreated {
Expand Down

0 comments on commit f3a5058

Please sign in to comment.