From f0e8ca441c9d785e4e1786bd71c1268a8fcbf06c Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Wed, 2 Oct 2024 10:25:18 +0200 Subject: [PATCH 01/14] chore: enforcement boilerplate --- cala-ledger/src/ledger/error.rs | 4 ++- cala-ledger/src/ledger/mod.rs | 10 +++++++ .../src/velocity/account_control/mod.rs | 20 ++++++++++--- .../src/velocity/account_control/repo.rs | 26 +++++++++++++++++ .../src/velocity/account_control/value.rs | 4 ++- cala-ledger/src/velocity/control/entity.rs | 4 +++ cala-ledger/src/velocity/control/repo.rs | 29 +++++++++++++++++++ cala-ledger/src/velocity/error.rs | 4 +++ cala-ledger/src/velocity/limit/repo.rs | 3 +- cala-ledger/src/velocity/mod.rs | 28 ++++++++++++++++-- 10 files changed, 122 insertions(+), 10 deletions(-) diff --git a/cala-ledger/src/ledger/error.rs b/cala-ledger/src/ledger/error.rs index c90d862f..d0da9caa 100644 --- a/cala-ledger/src/ledger/error.rs +++ b/cala-ledger/src/ledger/error.rs @@ -5,7 +5,7 @@ use crate::{ account::error::AccountError, account_set::error::AccountSetError, balance::error::BalanceError, entry::error::EntryError, journal::error::JournalError, outbox::server::error::OutboxServerError, transaction::error::TransactionError, - tx_template::error::TxTemplateError, + tx_template::error::TxTemplateError, velocity::error::VelocityError, }; #[derive(Error, Debug)] @@ -34,6 +34,8 @@ pub enum LedgerError { EntryError(#[from] EntryError), #[error("LedgerError - BalanceError: {0}")] BalanceError(#[from] BalanceError), + #[error("LedgerError - VelocityError: {0}")] + VelocityError(#[from] VelocityError), } impl From for LedgerError { diff --git a/cala-ledger/src/ledger/mod.rs b/cala-ledger/src/ledger/mod.rs index 2caa6e46..c05f534f 100644 --- a/cala-ledger/src/ledger/mod.rs +++ b/cala-ledger/src/ledger/mod.rs @@ -186,6 +186,16 @@ impl CalaLedger { .fetch_mappings(transaction.values().journal_id, &account_ids) .await?; + self.velocities + .update_balances_in_op( + op, + transaction.created_at(), + transaction.values(), + &entries, + &account_ids, + ) + .await?; + self.balances .update_balances_in_op( op, diff --git a/cala-ledger/src/velocity/account_control/mod.rs b/cala-ledger/src/velocity/account_control/mod.rs index 98988496..cf1c4908 100644 --- a/cala-ledger/src/velocity/account_control/mod.rs +++ b/cala-ledger/src/velocity/account_control/mod.rs @@ -4,12 +4,14 @@ mod value; use rust_decimal::Decimal; use sqlx::PgPool; +use std::collections::HashMap; + use crate::{ atomic_operation::*, param::Params, - primitives::{AccountId, DebitOrCredit, Layer, VelocityControlId}, + primitives::{AccountId, DebitOrCredit, Layer}, }; -use cala_types::velocity::VelocityLimitValues; +use cala_types::velocity::{VelocityControlValues, VelocityLimitValues}; use super::error::VelocityError; @@ -33,7 +35,7 @@ impl AccountControls { pub async fn attach_control_in_op( &self, op: &mut AtomicOperation<'_>, - control: VelocityControlId, + control: VelocityControlValues, account_id: AccountId, limits: Vec, params: impl Into + std::fmt::Debug, @@ -70,7 +72,9 @@ impl AccountControls { let control = AccountVelocityControl { account_id, - control_id: control, + control_id: control.id, + condition: control.condition, + enforcement: control.enforcement, velocity_limits, }; @@ -78,4 +82,12 @@ impl AccountControls { Ok(()) } + + pub async fn find_for_enforcement( + &self, + op: &mut AtomicOperation<'_>, + account_ids: &[AccountId], + ) -> Result>, VelocityError> { + self.repo.find_for_enforcement(op.tx(), account_ids).await + } } diff --git a/cala-ledger/src/velocity/account_control/repo.rs b/cala-ledger/src/velocity/account_control/repo.rs index 87dfb3e6..81f6e1a6 100644 --- a/cala-ledger/src/velocity/account_control/repo.rs +++ b/cala-ledger/src/velocity/account_control/repo.rs @@ -1,5 +1,7 @@ use sqlx::{PgPool, Postgres, Transaction}; +use std::collections::HashMap; + use crate::primitives::{AccountId, VelocityControlId}; use super::{super::error::*, value::*}; @@ -32,4 +34,28 @@ impl AccountControlRepo { .await?; Ok(()) } + + pub async fn find_for_enforcement( + &self, + db: &mut Transaction<'_, Postgres>, + account_ids: &[AccountId], + ) -> Result>, VelocityError> { + let rows = sqlx::query!( + r#"SELECT values FROM cala_velocity_account_controls + WHERE data_source_id = '00000000-0000-0000-0000-000000000000' AND account_id = ANY($1)"#, + account_ids as &[AccountId], + ) + .fetch_all(&mut **db) + .await?; + + let mut res: HashMap> = HashMap::new(); + + for row in rows { + let values: AccountVelocityControl = + serde_json::from_value(row.values).expect("Failed to deserialize control values"); + res.entry(values.account_id).or_default().push(values); + } + + Ok(res) + } } diff --git a/cala-ledger/src/velocity/account_control/value.rs b/cala-ledger/src/velocity/account_control/value.rs index 1938658a..ad320129 100644 --- a/cala-ledger/src/velocity/account_control/value.rs +++ b/cala-ledger/src/velocity/account_control/value.rs @@ -5,12 +5,14 @@ use serde::{Deserialize, Serialize}; use crate::primitives::{ AccountId, Currency, DebitOrCredit, Layer, VelocityControlId, VelocityLimitId, }; -use cala_types::velocity::PartitionKey; +use cala_types::velocity::{PartitionKey, VelocityEnforcement}; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AccountVelocityControl { pub account_id: AccountId, pub control_id: VelocityControlId, + pub enforcement: VelocityEnforcement, + pub condition: Option, pub velocity_limits: Vec, } diff --git a/cala-ledger/src/velocity/control/entity.rs b/cala-ledger/src/velocity/control/entity.rs index 30fb30bd..c1f8e755 100644 --- a/cala-ledger/src/velocity/control/entity.rs +++ b/cala-ledger/src/velocity/control/entity.rs @@ -32,6 +32,10 @@ impl VelocityControl { pub fn id(&self) -> VelocityControlId { self.values.id } + + pub fn into_values(self) -> VelocityControlValues { + self.values + } } impl Entity for VelocityControl { diff --git a/cala-ledger/src/velocity/control/repo.rs b/cala-ledger/src/velocity/control/repo.rs index 8444f4d6..eec4e7f2 100644 --- a/cala-ledger/src/velocity/control/repo.rs +++ b/cala-ledger/src/velocity/control/repo.rs @@ -33,4 +33,33 @@ impl VelocityControlRepo { let control = VelocityControl::try_from(events)?; Ok(control) } + + pub async fn find_by_id( + &self, + db: &mut Transaction<'_, Postgres>, + id: VelocityControlId, + ) -> Result { + let rows = sqlx::query_as!( + GenericEvent, + r#"SELECT c.id, e.sequence, e.event, + c.created_at AS entity_created_at, e.recorded_at AS event_recorded_at + FROM cala_velocity_controls c + JOIN cala_velocity_control_events e + ON c.data_source_id = e.data_source_id + AND c.id = e.id + WHERE c.data_source_id = '00000000-0000-0000-0000-000000000000' + AND c.id = $1 + ORDER BY e.sequence"#, + id as VelocityControlId, + ) + .fetch_all(&mut **db) + .await?; + match EntityEvents::load_first(rows) { + Ok(account) => Ok(account), + Err(EntityError::NoEntityEventsPresent) => { + Err(VelocityError::CouldNotFindControlById(id)) + } + Err(e) => Err(e.into()), + } + } } diff --git a/cala-ledger/src/velocity/error.rs b/cala-ledger/src/velocity/error.rs index 3c67edcf..fa2227c3 100644 --- a/cala-ledger/src/velocity/error.rs +++ b/cala-ledger/src/velocity/error.rs @@ -2,6 +2,8 @@ use thiserror::Error; use cel_interpreter::CelError; +use crate::primitives::VelocityControlId; + #[derive(Error, Debug)] pub enum VelocityError { #[error("VelocityError - Sqlx: {0}")] @@ -12,4 +14,6 @@ pub enum VelocityError { CelError(#[from] CelError), #[error("{0}")] ParamError(#[from] crate::param::error::ParamError), + #[error("VelocityError - Could not find control by id: {0}")] + CouldNotFindControlById(VelocityControlId), } diff --git a/cala-ledger/src/velocity/limit/repo.rs b/cala-ledger/src/velocity/limit/repo.rs index 06163dda..bf8544f6 100644 --- a/cala-ledger/src/velocity/limit/repo.rs +++ b/cala-ledger/src/velocity/limit/repo.rs @@ -52,6 +52,7 @@ impl VelocityLimitRepo { pub async fn list_for_control( &self, + db: &mut Transaction<'_, Postgres>, control: VelocityControlId, ) -> Result, VelocityError> { let rows = sqlx::query_as!( @@ -71,7 +72,7 @@ impl VelocityLimitRepo { ORDER BY l.id, e.sequence"#, control as VelocityControlId, ) - .fetch_all(&self.pool) + .fetch_all(&mut **db) .await?; let n = rows.len(); let ret = EntityEvents::load_n(rows, n)? diff --git a/cala-ledger/src/velocity/mod.rs b/cala-ledger/src/velocity/mod.rs index cff117f5..751dbbbe 100644 --- a/cala-ledger/src/velocity/mod.rs +++ b/cala-ledger/src/velocity/mod.rs @@ -3,6 +3,8 @@ mod control; pub mod error; mod limit; +use cala_types::{entry::EntryValues, transaction::TransactionValues}; +use chrono::{DateTime, Utc}; use sqlx::PgPool; pub use crate::param::Params; @@ -108,14 +110,34 @@ impl Velocities { pub async fn attach_control_to_account_in_op( &self, op: &mut AtomicOperation<'_>, - control: VelocityControlId, + control_id: VelocityControlId, account_id: AccountId, params: impl Into + std::fmt::Debug, ) -> Result<(), VelocityError> { - let limits = self.limits.list_for_control(control).await?; + let control = self.controls.find_by_id(op.tx(), control_id).await?; + let limits = self.limits.list_for_control(op.tx(), control_id).await?; self.account_controls - .attach_control_in_op(op, control, account_id, limits, params) + .attach_control_in_op(op, control.into_values(), account_id, limits, params) + .await?; + Ok(()) + } + + pub(crate) async fn update_balances_in_op( + &self, + op: &mut AtomicOperation<'_>, + created_at: DateTime, + transaction: &TransactionValues, + entries: &Vec, + account_ids: &[AccountId], + ) -> Result<(), VelocityError> { + let controls = self + .account_controls + .find_for_enforcement(op, account_ids) .await?; + + for control in controls { + // + } Ok(()) } } From 8c14a7704efa054e0eb64f09619901338e613b78 Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Wed, 2 Oct 2024 14:46:07 +0200 Subject: [PATCH 02/14] chore: check control / limit condition --- cala-cel-interpreter/src/value.rs | 16 ++++++++++ cala-ledger-core-types/src/entry.rs | 18 +++++++++++ cala-ledger-core-types/src/primitives.rs | 25 +++++++++++++++ cala-ledger-core-types/src/transaction.rs | 19 +++++++++++ cala-ledger/src/cel_context.rs | 2 +- cala-ledger/src/velocity/context.rs | 39 +++++++++++++++++++++++ cala-ledger/src/velocity/mod.rs | 38 +++++++++++++++++++--- cala-ledger/tests/velocity.rs | 4 ++- 8 files changed, 155 insertions(+), 6 deletions(-) create mode 100644 cala-ledger/src/velocity/context.rs diff --git a/cala-cel-interpreter/src/value.rs b/cala-cel-interpreter/src/value.rs index 19d15417..d96f35b6 100644 --- a/cala-cel-interpreter/src/value.rs +++ b/cala-cel-interpreter/src/value.rs @@ -269,6 +269,22 @@ impl<'a> TryFrom<&'a CelValue> for &'a Decimal { } } +impl<'a> TryFrom> for bool { + type Error = ResultCoercionError; + + fn try_from(CelResult { expr, val }: CelResult) -> Result { + if let CelValue::Bool(b) = val { + Ok(b) + } else { + Err(ResultCoercionError::BadCoreTypeCoercion( + format!("{expr:?}"), + CelType::from(&val), + CelType::Bool, + )) + } + } +} + impl<'a> TryFrom> for NaiveDate { type Error = ResultCoercionError; diff --git a/cala-ledger-core-types/src/entry.rs b/cala-ledger-core-types/src/entry.rs index cb68fa26..258c9b01 100644 --- a/cala-ledger-core-types/src/entry.rs +++ b/cala-ledger-core-types/src/entry.rs @@ -18,3 +18,21 @@ pub struct EntryValues { pub direction: DebitOrCredit, pub description: Option, } + +mod cel { + use cel_interpreter::{CelMap, CelValue}; + + impl From<&super::EntryValues> for CelValue { + fn from(entry: &super::EntryValues) -> Self { + let mut map = CelMap::new(); + map.insert("id", entry.id); + map.insert("entry_type", entry.entry_type.clone()); + map.insert("sequence", CelValue::UInt(entry.sequence as u64)); + map.insert("layer", entry.layer); + map.insert("direction", entry.direction); + map.insert("units", entry.units); + map.insert("currency", entry.currency); + map.into() + } + } +} diff --git a/cala-ledger-core-types/src/primitives.rs b/cala-ledger-core-types/src/primitives.rs index 2b7c4b3c..aa014f68 100644 --- a/cala-ledger-core-types/src/primitives.rs +++ b/cala-ledger-core-types/src/primitives.rs @@ -57,6 +57,15 @@ impl<'a> TryFrom> for DebitOrCredit { } } +impl Into for DebitOrCredit { + fn into(self) -> CelValue { + match self { + DebitOrCredit::Debit => "DEBIT".into(), + DebitOrCredit::Credit => "CREDIT".into(), + } + } +} + #[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, sqlx::Type)] #[sqlx(type_name = "Status", rename_all = "snake_case")] #[serde(rename_all = "snake_case")] @@ -108,6 +117,16 @@ impl Default for Layer { } } +impl Into for Layer { + fn into(self) -> CelValue { + match self { + Layer::Settled => "SETTLED".into(), + Layer::Pending => "PENDING".into(), + Layer::Encumbrance => "ENCUMBRANCE".into(), + } + } +} + #[derive(Debug, Clone, Copy, Eq, Serialize, Deserialize)] #[serde(try_from = "String")] #[serde(into = "&str")] @@ -131,6 +150,12 @@ impl std::fmt::Display for Currency { } } +impl Into for Currency { + fn into(self) -> CelValue { + self.code().into() + } +} + impl std::hash::Hash for Currency { fn hash(&self, state: &mut H) { self.code().hash(state); diff --git a/cala-ledger-core-types/src/transaction.rs b/cala-ledger-core-types/src/transaction.rs index 7e33e409..b69e72e6 100644 --- a/cala-ledger-core-types/src/transaction.rs +++ b/cala-ledger-core-types/src/transaction.rs @@ -15,3 +15,22 @@ pub struct TransactionValues { pub description: Option, pub metadata: Option, } + +mod cel { + use cel_interpreter::{CelMap, CelValue}; + + impl From<&super::TransactionValues> for CelValue { + fn from(tx: &super::TransactionValues) -> Self { + let mut map = CelMap::new(); + map.insert("id", tx.id); + map.insert("journal_id", tx.journal_id); + map.insert("tx_template_id", tx.tx_template_id); + map.insert("effective", tx.effective); + map.insert("correlation_id", tx.correlation_id.clone()); + if let Some(metadata) = &tx.metadata { + map.insert("metadata", metadata.clone()); + } + map.into() + } + } +} diff --git a/cala-ledger/src/cel_context.rs b/cala-ledger/src/cel_context.rs index 2ae41100..84b9aebb 100644 --- a/cala-ledger/src/cel_context.rs +++ b/cala-ledger/src/cel_context.rs @@ -1,4 +1,4 @@ -use cel_interpreter::CelContext; +pub use cel_interpreter::CelContext; pub(crate) fn initialize() -> CelContext { let mut ctx = CelContext::new(); diff --git a/cala-ledger/src/velocity/context.rs b/cala-ledger/src/velocity/context.rs new file mode 100644 index 00000000..37177b0a --- /dev/null +++ b/cala-ledger/src/velocity/context.rs @@ -0,0 +1,39 @@ +use std::collections::HashMap; + +use cala_types::{entry::EntryValues, transaction::TransactionValues}; +use cel_interpreter::{CelMap, CelValue}; + +use crate::{cel_context::*, primitives::EntryId}; + +pub struct EvalContext { + transaction: CelValue, + entry_values: HashMap, +} + +impl EvalContext { + pub fn new(transaction: &TransactionValues) -> Self { + Self { + transaction: transaction.into(), + entry_values: HashMap::new(), + } + } + + pub fn control_context(&mut self, entry: &EntryValues) -> CelContext { + let entry = self + .entry_values + .entry(entry.id) + .or_insert_with(|| entry.into()); + + let mut vars = CelMap::new(); + vars.insert("transaction", self.transaction.clone()); + vars.insert("entry", entry.clone()); + + let mut context = CelMap::new(); + context.insert("vars", vars); + + let mut ctx = initialize(); + ctx.add_variable("context", context); + + ctx + } +} diff --git a/cala-ledger/src/velocity/mod.rs b/cala-ledger/src/velocity/mod.rs index 751dbbbe..a1cf807f 100644 --- a/cala-ledger/src/velocity/mod.rs +++ b/cala-ledger/src/velocity/mod.rs @@ -1,12 +1,14 @@ mod account_control; +mod context; mod control; pub mod error; mod limit; -use cala_types::{entry::EntryValues, transaction::TransactionValues}; use chrono::{DateTime, Utc}; use sqlx::PgPool; +use cala_types::{entry::EntryValues, transaction::TransactionValues}; + pub use crate::param::Params; use crate::{atomic_operation::*, outbox::*, primitives::AccountId}; @@ -127,7 +129,7 @@ impl Velocities { op: &mut AtomicOperation<'_>, created_at: DateTime, transaction: &TransactionValues, - entries: &Vec, + entries: &[EntryValues], account_ids: &[AccountId], ) -> Result<(), VelocityError> { let controls = self @@ -135,8 +137,36 @@ impl Velocities { .find_for_enforcement(op, account_ids) .await?; - for control in controls { - // + let empty = Vec::new(); + + let mut context = context::EvalContext::new(transaction); + + for entry in entries { + for control in controls.get(&entry.account_id).unwrap_or(&empty) { + let ctx = context.control_context(entry); + let control_active = if let Some(condition) = &control.condition { + let control_active: bool = condition.try_evaluate(&ctx)?; + control_active + } else { + true + }; + if control_active { + for limit in &control.velocity_limits { + if let Some(currenty) = &limit.currency { + if currenty != &entry.currency { + continue; + } + } + + let limit_active = if let Some(condition) = &limit.condition { + let limit_active: bool = condition.try_evaluate(&ctx)?; + limit_active + } else { + true + }; + } + } + } } Ok(()) } diff --git a/cala-ledger/tests/velocity.rs b/cala-ledger/tests/velocity.rs index 179da167..fc7cdae1 100644 --- a/cala-ledger/tests/velocity.rs +++ b/cala-ledger/tests/velocity.rs @@ -83,8 +83,10 @@ async fn create_control() -> anyhow::Result<()> { .add_limit_to_control(control.id(), deposit_limit.id()) .await?; - let (one, _) = helpers::test_accounts(); + let (one, two) = helpers::test_accounts(); let one = cala.accounts().create(one).await.unwrap(); + let _ = cala.accounts().create(two).await.unwrap(); + let mut params = Params::new(); params.insert("withdrawal_limit", Decimal::from(100)); params.insert("deposit_limit", Decimal::from(100)); From 621c3216ecd1592e7883a2c09bf05eec761338ac Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Wed, 2 Oct 2024 15:02:16 +0200 Subject: [PATCH 03/14] chore: add Timestamp to cel types --- cala-cel-interpreter/src/cel_type.rs | 3 ++- cala-cel-interpreter/src/value.rs | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/cala-cel-interpreter/src/cel_type.rs b/cala-cel-interpreter/src/cel_type.rs index 097f3f2b..cadf7599 100644 --- a/cala-cel-interpreter/src/cel_type.rs +++ b/cala-cel-interpreter/src/cel_type.rs @@ -11,8 +11,9 @@ pub enum CelType { Bool, Null, - // Addons + // Abstract Date, + Timestamp, Uuid, Decimal, } diff --git a/cala-cel-interpreter/src/value.rs b/cala-cel-interpreter/src/value.rs index d96f35b6..ed7336ed 100644 --- a/cala-cel-interpreter/src/value.rs +++ b/cala-cel-interpreter/src/value.rs @@ -1,5 +1,5 @@ use cel_parser::{ast::Literal, Expression}; -use chrono::NaiveDate; +use chrono::{DateTime, NaiveDate, Utc}; use rust_decimal::Decimal; use uuid::Uuid; @@ -25,9 +25,10 @@ pub enum CelValue { Bool(bool), Null, - // Addons + // Abstract Decimal(Decimal), Date(NaiveDate), + Timestamp(DateTime), Uuid(Uuid), } @@ -226,6 +227,7 @@ impl From<&CelValue> for CelType { CelValue::Decimal(_) => CelType::Decimal, CelValue::Date(_) => CelType::Date, CelValue::Uuid(_) => CelType::Uuid, + CelValue::Timestamp(_) => CelType::Timestamp, } } } From ba6ad67885862d6a5605d1adf2bcfc65b19e5f4b Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Wed, 2 Oct 2024 15:03:55 +0200 Subject: [PATCH 04/14] fix: typo --- cala-ledger/src/velocity/mod.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/cala-ledger/src/velocity/mod.rs b/cala-ledger/src/velocity/mod.rs index a1cf807f..11ab7c12 100644 --- a/cala-ledger/src/velocity/mod.rs +++ b/cala-ledger/src/velocity/mod.rs @@ -152,8 +152,8 @@ impl Velocities { }; if control_active { for limit in &control.velocity_limits { - if let Some(currenty) = &limit.currency { - if currenty != &entry.currency { + if let Some(currency) = &limit.currency { + if currency != &entry.currency { continue; } } @@ -164,6 +164,9 @@ impl Velocities { } else { true }; + if limit_active { + // load balance in window + } } } } From 5cef1b81f54dd18e55401fb47e67fe2fe62ab8af Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Wed, 2 Oct 2024 15:24:57 +0200 Subject: [PATCH 05/14] chore: support nested lookups in cel --- cala-cel-interpreter/src/context/decimal.rs | 6 +++--- cala-cel-interpreter/src/context/mod.rs | 16 ++++++++-------- cala-cel-interpreter/src/interpreter.rs | 9 ++++++--- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/cala-cel-interpreter/src/context/decimal.rs b/cala-cel-interpreter/src/context/decimal.rs index 818ce35e..d14e9b03 100644 --- a/cala-cel-interpreter/src/context/decimal.rs +++ b/cala-cel-interpreter/src/context/decimal.rs @@ -1,6 +1,6 @@ use lazy_static::lazy_static; -use std::collections::HashMap; +use std::{borrow::Cow, collections::HashMap}; use crate::builtins; @@ -10,11 +10,11 @@ lazy_static! { pub static ref CEL_CONTEXT: CelContext = { let mut idents = HashMap::new(); idents.insert( - SELF_PACKAGE_NAME.to_string(), + SELF_PACKAGE_NAME, ContextItem::Function(Box::new(builtins::decimal::cast)), ); idents.insert( - "Add".to_string(), + Cow::Borrowed("Add"), ContextItem::Function(Box::new(builtins::decimal::add)), ); CelContext { idents } diff --git a/cala-cel-interpreter/src/context/mod.rs b/cala-cel-interpreter/src/context/mod.rs index 9b9fcfd9..219b57c5 100644 --- a/cala-cel-interpreter/src/context/mod.rs +++ b/cala-cel-interpreter/src/context/mod.rs @@ -1,19 +1,19 @@ mod decimal; -use std::collections::HashMap; +use std::{borrow::Cow, collections::HashMap}; use crate::{builtins, error::*, value::*}; -const SELF_PACKAGE_NAME: &str = "self"; +const SELF_PACKAGE_NAME: Cow<'static, str> = Cow::Borrowed("self"); type CelFunction = Box) -> Result + Sync>; #[derive(Debug)] pub struct CelContext { - idents: HashMap, + idents: HashMap, ContextItem>, } impl CelContext { - pub fn add_variable(&mut self, name: impl Into, value: impl Into) { + pub fn add_variable(&mut self, name: impl Into>, value: impl Into) { self.idents .insert(name.into(), ContextItem::Value(value.into())); } @@ -21,22 +21,22 @@ impl CelContext { pub fn new() -> Self { let mut idents = HashMap::new(); idents.insert( - "date".to_string(), + Cow::Borrowed("date"), ContextItem::Function(Box::new(builtins::date)), ); idents.insert( - "uuid".to_string(), + Cow::Borrowed("uuid"), ContextItem::Function(Box::new(builtins::uuid)), ); idents.insert( - "decimal".to_string(), + Cow::Borrowed("decimal"), ContextItem::Package(&decimal::CEL_CONTEXT), ); Self { idents } } pub(crate) fn package_self(&self) -> Result<&ContextItem, CelError> { - self.lookup(SELF_PACKAGE_NAME) + self.lookup(&SELF_PACKAGE_NAME) } pub(crate) fn lookup(&self, name: &str) -> Result<&ContextItem, CelError> { diff --git a/cala-cel-interpreter/src/interpreter.rs b/cala-cel-interpreter/src/interpreter.rs index e2fc2453..5c027469 100755 --- a/cala-cel-interpreter/src/interpreter.rs +++ b/cala-cel-interpreter/src/interpreter.rs @@ -156,6 +156,7 @@ fn evaluate_member<'a>( use ast::Member::*; match member { Attribute(name) => match target { + EvalType::Value(CelValue::Map(map)) => Ok(EvalType::Value(map.get(name))), EvalType::ContextItem(ContextItem::Value(CelValue::Map(map))) => { Ok(EvalType::Value(map.get(name))) } @@ -379,10 +380,12 @@ mod tests { #[test] fn lookup() { - let expression = "params.hello".parse::().unwrap(); - let mut context = CelContext::new(); + let expression = "params.hello.world".parse::().unwrap(); + let mut hello = CelMap::new(); + hello.insert("world", 42); let mut params = CelMap::new(); - params.insert("hello", 42); + params.insert("hello", hello); + let mut context = CelContext::new(); context.add_variable("params", params); assert_eq!(expression.evaluate(&context).unwrap(), CelValue::Int(42)); } From 0971708ff7fa5a89287c43855960633a11f88c22 Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Wed, 2 Oct 2024 15:52:31 +0200 Subject: [PATCH 06/14] chore: determin_window --- .../src/velocity/account_control/mod.rs | 2 +- .../src/velocity/account_control/value.rs | 2 +- cala-ledger/src/velocity/balance.rs | 13 +++++ cala-ledger/src/velocity/mod.rs | 52 ++++++++++++++++++- 4 files changed, 66 insertions(+), 3 deletions(-) create mode 100644 cala-ledger/src/velocity/balance.rs diff --git a/cala-ledger/src/velocity/account_control/mod.rs b/cala-ledger/src/velocity/account_control/mod.rs index cf1c4908..3e2b3abb 100644 --- a/cala-ledger/src/velocity/account_control/mod.rs +++ b/cala-ledger/src/velocity/account_control/mod.rs @@ -59,7 +59,7 @@ impl AccountControls { }) } velocity_limits.push(AccountVelocityLimit { - velocity_limit_id: velocity.id, + limit_id: velocity.id, window: velocity.window, condition: velocity.condition, currency: velocity.currency, diff --git a/cala-ledger/src/velocity/account_control/value.rs b/cala-ledger/src/velocity/account_control/value.rs index ad320129..ca6135ef 100644 --- a/cala-ledger/src/velocity/account_control/value.rs +++ b/cala-ledger/src/velocity/account_control/value.rs @@ -18,7 +18,7 @@ pub struct AccountVelocityControl { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AccountVelocityLimit { - pub velocity_limit_id: VelocityLimitId, + pub limit_id: VelocityLimitId, pub window: Vec, pub condition: Option, pub currency: Option, diff --git a/cala-ledger/src/velocity/balance.rs b/cala-ledger/src/velocity/balance.rs new file mode 100644 index 00000000..5378d2b5 --- /dev/null +++ b/cala-ledger/src/velocity/balance.rs @@ -0,0 +1,13 @@ +use rust_decimal::Decimal; + +use crate::primitives::{Currency, VelocityControlId, VelocityLimitId}; +use cala_types::balance::BalanceSnapshot; + +pub struct VelocityBalance { + control_id: VelocityControlId, + limit_id: VelocityLimitId, + spend: Decimal, + remaining: Decimal, + currency: Currency, + balance: BalanceSnapshot, +} diff --git a/cala-ledger/src/velocity/mod.rs b/cala-ledger/src/velocity/mod.rs index 11ab7c12..32069e4e 100644 --- a/cala-ledger/src/velocity/mod.rs +++ b/cala-ledger/src/velocity/mod.rs @@ -1,4 +1,5 @@ mod account_control; +mod balance; mod context; mod control; pub mod error; @@ -141,6 +142,7 @@ impl Velocities { let mut context = context::EvalContext::new(transaction); + let mut balances_to_load = Vec::new(); for entry in entries { for control in controls.get(&entry.account_id).unwrap_or(&empty) { let ctx = context.control_context(entry); @@ -165,7 +167,13 @@ impl Velocities { true }; if limit_active { - // load balance in window + let window = determin_window(&limit.window, &ctx); + balances_to_load.push(( + entry.account_id, + control.control_id, + limit.limit_id, + window, + )); } } } @@ -174,3 +182,45 @@ impl Velocities { Ok(()) } } + +fn determin_window( + keys: &[PartitionKey], + ctx: &cel_interpreter::CelContext, +) -> Result { + let mut map = serde_json::Map::new(); + for key in keys { + let value: serde_json::Value = key.value.try_evaluate(ctx)?; + map.insert(key.alias.clone(), value); + } + Ok(map.into()) +} + +#[cfg(test)] +mod test { + #[test] + fn window_determination() { + use super::*; + use cala_types::velocity::PartitionKey; + use cel_interpreter::CelContext; + use serde_json::json; + + let keys = vec![ + PartitionKey { + alias: "foo".to_string(), + value: "'bar'".parse().expect("Failed to parse"), + }, + PartitionKey { + alias: "baz".to_string(), + value: "'qux'".parse().expect("Failed to parse"), + }, + ]; + + let ctx = CelContext::new(); + let result = determin_window(&keys, &ctx).unwrap(); + let expected = json!({ + "foo": "bar", + "baz": "qux", + }); + assert_eq!(expected, result); + } +} From ddfa605e0e44f064025c4e383ccf57bbf2d894a7 Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Fri, 4 Oct 2024 14:29:22 +0200 Subject: [PATCH 07/14] refactor: introduce velocity/balance module --- .../src/velocity/balance.rs | 28 ++++ cala-ledger-core-types/src/velocity/mod.rs | 2 + ...937c3c446b11076dd83e2bd202cee8c95e2c2.json | 22 +++ ...022b01dd30aedc0092708459e7f49d563e053.json | 46 ++++++ .../20231208110808_cala_ledger_setup.sql | 34 +++++ .../src/velocity/account_control/mod.rs | 2 +- cala-ledger/src/velocity/balance.rs | 13 -- cala-ledger/src/velocity/balance/mod.rs | 133 ++++++++++++++++++ cala-ledger/src/velocity/balance/repo.rs | 21 +++ cala-ledger/src/velocity/mod.rs | 90 +----------- 10 files changed, 293 insertions(+), 98 deletions(-) create mode 100644 cala-ledger-core-types/src/velocity/balance.rs create mode 100644 cala-ledger/.sqlx/query-8b87fd1b4f25b8265528a02375e937c3c446b11076dd83e2bd202cee8c95e2c2.json create mode 100644 cala-ledger/.sqlx/query-d4640c733860f62b22b5d9e5bca022b01dd30aedc0092708459e7f49d563e053.json delete mode 100644 cala-ledger/src/velocity/balance.rs create mode 100644 cala-ledger/src/velocity/balance/mod.rs create mode 100644 cala-ledger/src/velocity/balance/repo.rs diff --git a/cala-ledger-core-types/src/velocity/balance.rs b/cala-ledger-core-types/src/velocity/balance.rs new file mode 100644 index 00000000..f9d91950 --- /dev/null +++ b/cala-ledger-core-types/src/velocity/balance.rs @@ -0,0 +1,28 @@ +use rust_decimal::Decimal; + +use crate::{balance::BalanceSnapshot, primitives::*}; + +#[derive(Debug, sqlx::Type, PartialEq)] +#[sqlx(transparent)] +pub struct Window(serde_json::Value); + +impl From> for Window { + fn from(map: serde_json::Map) -> Self { + Window(map.into()) + } +} + +impl From for Window { + fn from(map: serde_json::Value) -> Self { + Window(map) + } +} + +pub struct VelocityBalance { + pub control_id: VelocityControlId, + pub limit_id: VelocityLimitId, + pub spent: Decimal, + pub remaining: Decimal, + pub currency: Currency, + pub balance: BalanceSnapshot, +} diff --git a/cala-ledger-core-types/src/velocity/mod.rs b/cala-ledger-core-types/src/velocity/mod.rs index e42453d3..0a07c5ab 100644 --- a/cala-ledger-core-types/src/velocity/mod.rs +++ b/cala-ledger-core-types/src/velocity/mod.rs @@ -1,5 +1,7 @@ +mod balance; mod control; mod limit; +pub use balance::*; pub use control::*; pub use limit::*; diff --git a/cala-ledger/.sqlx/query-8b87fd1b4f25b8265528a02375e937c3c446b11076dd83e2bd202cee8c95e2c2.json b/cala-ledger/.sqlx/query-8b87fd1b4f25b8265528a02375e937c3c446b11076dd83e2bd202cee8c95e2c2.json new file mode 100644 index 00000000..281dff95 --- /dev/null +++ b/cala-ledger/.sqlx/query-8b87fd1b4f25b8265528a02375e937c3c446b11076dd83e2bd202cee8c95e2c2.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT values FROM cala_velocity_account_controls\n WHERE data_source_id = '00000000-0000-0000-0000-000000000000' AND account_id = ANY($1)", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "values", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "UuidArray" + ] + }, + "nullable": [ + false + ] + }, + "hash": "8b87fd1b4f25b8265528a02375e937c3c446b11076dd83e2bd202cee8c95e2c2" +} diff --git a/cala-ledger/.sqlx/query-d4640c733860f62b22b5d9e5bca022b01dd30aedc0092708459e7f49d563e053.json b/cala-ledger/.sqlx/query-d4640c733860f62b22b5d9e5bca022b01dd30aedc0092708459e7f49d563e053.json new file mode 100644 index 00000000..ac919a0b --- /dev/null +++ b/cala-ledger/.sqlx/query-d4640c733860f62b22b5d9e5bca022b01dd30aedc0092708459e7f49d563e053.json @@ -0,0 +1,46 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT c.id, e.sequence, e.event,\n c.created_at AS entity_created_at, e.recorded_at AS event_recorded_at\n FROM cala_velocity_controls c\n JOIN cala_velocity_control_events e\n ON c.data_source_id = e.data_source_id\n AND c.id = e.id\n WHERE c.data_source_id = '00000000-0000-0000-0000-000000000000'\n AND c.id = $1\n ORDER BY e.sequence", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "sequence", + "type_info": "Int4" + }, + { + "ordinal": 2, + "name": "event", + "type_info": "Jsonb" + }, + { + "ordinal": 3, + "name": "entity_created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 4, + "name": "event_recorded_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "d4640c733860f62b22b5d9e5bca022b01dd30aedc0092708459e7f49d563e053" +} diff --git a/cala-ledger/migrations/20231208110808_cala_ledger_setup.sql b/cala-ledger/migrations/20231208110808_cala_ledger_setup.sql index 172f75de..67ab06f2 100644 --- a/cala-ledger/migrations/20231208110808_cala_ledger_setup.sql +++ b/cala-ledger/migrations/20231208110808_cala_ledger_setup.sql @@ -255,6 +255,40 @@ CREATE TABLE cala_velocity_account_controls ( FOREIGN KEY (data_source_id, velocity_control_id) REFERENCES cala_velocity_controls(data_source_id, id) ); +CREATE TABLE cala_velocity_current_balances ( + data_source_id UUID NOT NULL DEFAULT '00000000-0000-0000-0000-000000000000', + journal_id UUID NOT NULL, + account_id UUID NOT NULL, + currency VARCHAR NOT NULL, + velocity_control_id UUID NOT NULL, + velocity_limit_id UUID NOT NULL, + partition_window JSONB NOT NULL, + latest_version INT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE(data_source_id, partition_window, currency, journal_id, account_id, velocity_limit_id, velocity_control_id), + FOREIGN KEY (data_source_id, journal_id) REFERENCES cala_journals(data_source_id, id), + FOREIGN KEY (data_source_id, account_id) REFERENCES cala_accounts(data_source_id, id), + FOREIGN KEY (data_source_id, velocity_control_id) REFERENCES cala_velocity_controls(data_source_id, id), + FOREIGN KEY (data_source_id, velocity_limit_id) REFERENCES cala_velocity_limits(data_source_id, id) +); + +CREATE TABLE cala_velocity_balance_history ( + data_source_id UUID NOT NULL DEFAULT '00000000-0000-0000-0000-000000000000', + journal_id UUID NOT NULL, + account_id UUID NOT NULL, + currency VARCHAR NOT NULL, + velocity_control_id UUID NOT NULL, + velocity_limit_id UUID NOT NULL, + partition_window JSONB NOT NULL, + latest_entry_id UUID NOT NULL, + version INT NOT NULL, + values JSONB NOT NULL, + recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE(data_source_id, partition_window, currency, journal_id, account_id, velocity_limit_id, velocity_control_id, version), + FOREIGN KEY (data_source_id, partition_window, currency, journal_id, account_id, velocity_limit_id, velocity_control_id) REFERENCES cala_velocity_current_balances(data_source_id, partition_window, currency, journal_id, account_id, velocity_limit_id, velocity_control_id), + FOREIGN KEY (data_source_id, latest_entry_id) REFERENCES cala_entries(data_source_id, id) +); + CREATE TABLE cala_outbox_events ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), sequence BIGSERIAL UNIQUE, diff --git a/cala-ledger/src/velocity/account_control/mod.rs b/cala-ledger/src/velocity/account_control/mod.rs index 3e2b3abb..acb7a1ce 100644 --- a/cala-ledger/src/velocity/account_control/mod.rs +++ b/cala-ledger/src/velocity/account_control/mod.rs @@ -16,7 +16,7 @@ use cala_types::velocity::{VelocityControlValues, VelocityLimitValues}; use super::error::VelocityError; use repo::*; -use value::*; +pub(super) use value::*; #[derive(Clone)] pub struct AccountControls { diff --git a/cala-ledger/src/velocity/balance.rs b/cala-ledger/src/velocity/balance.rs deleted file mode 100644 index 5378d2b5..00000000 --- a/cala-ledger/src/velocity/balance.rs +++ /dev/null @@ -1,13 +0,0 @@ -use rust_decimal::Decimal; - -use crate::primitives::{Currency, VelocityControlId, VelocityLimitId}; -use cala_types::balance::BalanceSnapshot; - -pub struct VelocityBalance { - control_id: VelocityControlId, - limit_id: VelocityLimitId, - spend: Decimal, - remaining: Decimal, - currency: Currency, - balance: BalanceSnapshot, -} diff --git a/cala-ledger/src/velocity/balance/mod.rs b/cala-ledger/src/velocity/balance/mod.rs new file mode 100644 index 00000000..6efb6778 --- /dev/null +++ b/cala-ledger/src/velocity/balance/mod.rs @@ -0,0 +1,133 @@ +mod repo; + +use chrono::{DateTime, Utc}; +use rust_decimal::Decimal; +use sqlx::PgPool; + +use std::collections::HashMap; + +use cala_types::{ + balance::BalanceSnapshot, + entry::EntryValues, + transaction::TransactionValues, + velocity::{PartitionKey, Window}, +}; + +use crate::{ + atomic_operation::*, + outbox::*, + primitives::{AccountId, Currency, VelocityControlId, VelocityLimitId}, +}; + +use super::{account_control::AccountVelocityControl, error::*}; + +use repo::VelocityBalanceRepo; + +#[derive(Clone)] +pub(super) struct VelocityBalances { + repo: VelocityBalanceRepo, +} + +impl VelocityBalances { + pub fn new(pool: &PgPool) -> Self { + Self { + repo: VelocityBalanceRepo::new(pool), + } + } + + pub(crate) async fn update_balances_in_op( + &self, + op: &mut AtomicOperation<'_>, + _created_at: DateTime, + transaction: &TransactionValues, + entries: &[EntryValues], + controls: HashMap>, + ) -> Result<(), VelocityError> { + let empty = Vec::new(); + + let mut context = super::context::EvalContext::new(transaction); + + let mut balances_to_load = Vec::new(); + for entry in entries { + for control in controls.get(&entry.account_id).unwrap_or(&empty) { + let ctx = context.control_context(entry); + let control_active = if let Some(condition) = &control.condition { + let control_active: bool = condition.try_evaluate(&ctx)?; + control_active + } else { + true + }; + if control_active { + for limit in &control.velocity_limits { + if let Some(currency) = &limit.currency { + if currency != &entry.currency { + continue; + } + } + + let limit_active = if let Some(condition) = &limit.condition { + let limit_active: bool = condition.try_evaluate(&ctx)?; + limit_active + } else { + true + }; + if limit_active { + let window = determin_window(&limit.window, &ctx); + balances_to_load.push(( + entry.journal_id, + entry.account_id, + entry.currency, + control.control_id, + limit.limit_id, + window, + )); + } + } + } + } + } + Ok(()) + } +} + +fn determin_window( + keys: &[PartitionKey], + ctx: &cel_interpreter::CelContext, +) -> Result { + let mut map = serde_json::Map::new(); + for key in keys { + let value: serde_json::Value = key.value.try_evaluate(ctx)?; + map.insert(key.alias.clone(), value); + } + Ok(map.into()) +} + +#[cfg(test)] +mod test { + #[test] + fn window_determination() { + use super::*; + use cala_types::velocity::PartitionKey; + use cel_interpreter::CelContext; + use serde_json::json; + + let keys = vec![ + PartitionKey { + alias: "foo".to_string(), + value: "'bar'".parse().expect("Failed to parse"), + }, + PartitionKey { + alias: "baz".to_string(), + value: "'qux'".parse().expect("Failed to parse"), + }, + ]; + + let ctx = CelContext::new(); + let result = determin_window(&keys, &ctx).unwrap(); + let expected = json!({ + "foo": "bar", + "baz": "qux", + }); + assert_eq!(Window::from(expected), result); + } +} diff --git a/cala-ledger/src/velocity/balance/repo.rs b/cala-ledger/src/velocity/balance/repo.rs new file mode 100644 index 00000000..1e75b72b --- /dev/null +++ b/cala-ledger/src/velocity/balance/repo.rs @@ -0,0 +1,21 @@ +use sqlx::PgPool; + +#[derive(Clone)] +pub(super) struct VelocityBalanceRepo { + pool: PgPool, +} + +impl VelocityBalanceRepo { + pub fn new(pool: &PgPool) -> Self { + Self { pool: pool.clone() } + } + + pub async fn find_for_update( + &self, + // control_id: VelocityControlId, + // limit_id: VelocityLimitId, + // currency: Currency, + ) -> Result<(), sqlx::Error> { + unimplemented!() + } +} diff --git a/cala-ledger/src/velocity/mod.rs b/cala-ledger/src/velocity/mod.rs index 32069e4e..841560a0 100644 --- a/cala-ledger/src/velocity/mod.rs +++ b/cala-ledger/src/velocity/mod.rs @@ -14,6 +14,7 @@ pub use crate::param::Params; use crate::{atomic_operation::*, outbox::*, primitives::AccountId}; use account_control::*; +use balance::*; pub use control::*; use error::*; pub use limit::*; @@ -25,6 +26,7 @@ pub struct Velocities { limits: VelocityLimitRepo, controls: VelocityControlRepo, account_controls: AccountControls, + balances: VelocityBalances, } impl Velocities { @@ -33,6 +35,7 @@ impl Velocities { limits: VelocityLimitRepo::new(pool), controls: VelocityControlRepo::new(pool), account_controls: AccountControls::new(pool), + balances: VelocityBalances::new(pool), pool: pool.clone(), outbox, } @@ -138,89 +141,8 @@ impl Velocities { .find_for_enforcement(op, account_ids) .await?; - let empty = Vec::new(); - - let mut context = context::EvalContext::new(transaction); - - let mut balances_to_load = Vec::new(); - for entry in entries { - for control in controls.get(&entry.account_id).unwrap_or(&empty) { - let ctx = context.control_context(entry); - let control_active = if let Some(condition) = &control.condition { - let control_active: bool = condition.try_evaluate(&ctx)?; - control_active - } else { - true - }; - if control_active { - for limit in &control.velocity_limits { - if let Some(currency) = &limit.currency { - if currency != &entry.currency { - continue; - } - } - - let limit_active = if let Some(condition) = &limit.condition { - let limit_active: bool = condition.try_evaluate(&ctx)?; - limit_active - } else { - true - }; - if limit_active { - let window = determin_window(&limit.window, &ctx); - balances_to_load.push(( - entry.account_id, - control.control_id, - limit.limit_id, - window, - )); - } - } - } - } - } - Ok(()) - } -} - -fn determin_window( - keys: &[PartitionKey], - ctx: &cel_interpreter::CelContext, -) -> Result { - let mut map = serde_json::Map::new(); - for key in keys { - let value: serde_json::Value = key.value.try_evaluate(ctx)?; - map.insert(key.alias.clone(), value); - } - Ok(map.into()) -} - -#[cfg(test)] -mod test { - #[test] - fn window_determination() { - use super::*; - use cala_types::velocity::PartitionKey; - use cel_interpreter::CelContext; - use serde_json::json; - - let keys = vec![ - PartitionKey { - alias: "foo".to_string(), - value: "'bar'".parse().expect("Failed to parse"), - }, - PartitionKey { - alias: "baz".to_string(), - value: "'qux'".parse().expect("Failed to parse"), - }, - ]; - - let ctx = CelContext::new(); - let result = determin_window(&keys, &ctx).unwrap(); - let expected = json!({ - "foo": "bar", - "baz": "qux", - }); - assert_eq!(expected, result); + self.balances + .update_balances_in_op(op, created_at, transaction, entries, controls) + .await } } From 7af1fbaf27685438958f7f61e9b4a0ea99d7ebbc Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Fri, 4 Oct 2024 15:30:53 +0200 Subject: [PATCH 08/14] chore: load current_balances for limits --- .../src/velocity/balance.rs | 2 +- cala-ledger/src/velocity/balance/mod.rs | 52 +++++---- cala-ledger/src/velocity/balance/repo.rs | 106 ++++++++++++++++-- cala-ledger/src/velocity/limit/repo.rs | 6 +- cala-ledger/tests/transaction_post.rs | 2 +- cala-ledger/tests/velocity.rs | 27 ++++- ...937c3c446b11076dd83e2bd202cee8c95e2c2.json | 22 ++++ ...022b01dd30aedc0092708459e7f49d563e053.json | 46 ++++++++ 8 files changed, 226 insertions(+), 37 deletions(-) create mode 100644 cala-server/.sqlx/query-8b87fd1b4f25b8265528a02375e937c3c446b11076dd83e2bd202cee8c95e2c2.json create mode 100644 cala-server/.sqlx/query-d4640c733860f62b22b5d9e5bca022b01dd30aedc0092708459e7f49d563e053.json diff --git a/cala-ledger-core-types/src/velocity/balance.rs b/cala-ledger-core-types/src/velocity/balance.rs index f9d91950..274d4a1f 100644 --- a/cala-ledger-core-types/src/velocity/balance.rs +++ b/cala-ledger-core-types/src/velocity/balance.rs @@ -2,7 +2,7 @@ use rust_decimal::Decimal; use crate::{balance::BalanceSnapshot, primitives::*}; -#[derive(Debug, sqlx::Type, PartialEq)] +#[derive(Debug, sqlx::Type, PartialEq, Eq, Hash)] #[sqlx(transparent)] pub struct Window(serde_json::Value); diff --git a/cala-ledger/src/velocity/balance/mod.rs b/cala-ledger/src/velocity/balance/mod.rs index 6efb6778..816b830d 100644 --- a/cala-ledger/src/velocity/balance/mod.rs +++ b/cala-ledger/src/velocity/balance/mod.rs @@ -1,27 +1,21 @@ mod repo; use chrono::{DateTime, Utc}; -use rust_decimal::Decimal; use sqlx::PgPool; use std::collections::HashMap; use cala_types::{ - balance::BalanceSnapshot, entry::EntryValues, transaction::TransactionValues, velocity::{PartitionKey, Window}, }; -use crate::{ - atomic_operation::*, - outbox::*, - primitives::{AccountId, Currency, VelocityControlId, VelocityLimitId}, -}; +use crate::{atomic_operation::*, primitives::AccountId}; -use super::{account_control::AccountVelocityControl, error::*}; +use super::{account_control::*, error::*}; -use repo::VelocityBalanceRepo; +use repo::*; #[derive(Clone)] pub(super) struct VelocityBalances { @@ -47,7 +41,10 @@ impl VelocityBalances { let mut context = super::context::EvalContext::new(transaction); - let mut balances_to_load = Vec::new(); + let mut entries_to_add: HashMap< + VelocityBalanceKey, + Vec<(&AccountVelocityLimit, &EntryValues)>, + > = HashMap::new(); for entry in entries { for control in controls.get(&entry.account_id).unwrap_or(&empty) { let ctx = context.control_context(entry); @@ -72,25 +69,38 @@ impl VelocityBalances { true }; if limit_active { - let window = determin_window(&limit.window, &ctx); - balances_to_load.push(( - entry.journal_id, - entry.account_id, - entry.currency, - control.control_id, - limit.limit_id, - window, - )); + let window = determine_window(&limit.window, &ctx)?; + entries_to_add + .entry(( + window, + entry.currency, + entry.journal_id, + entry.account_id, + control.control_id, + limit.limit_id, + )) + .or_default() + .push((limit, entry)); } } } } } + + if entries_to_add.is_empty() { + return Ok(()); + } + + let _current_balances = self + .repo + .find_for_update(op.tx(), entries_to_add.keys()) + .await?; + Ok(()) } } -fn determin_window( +fn determine_window( keys: &[PartitionKey], ctx: &cel_interpreter::CelContext, ) -> Result { @@ -123,7 +133,7 @@ mod test { ]; let ctx = CelContext::new(); - let result = determin_window(&keys, &ctx).unwrap(); + let result = determine_window(&keys, &ctx).unwrap(); let expected = json!({ "foo": "bar", "baz": "qux", diff --git a/cala-ledger/src/velocity/balance/repo.rs b/cala-ledger/src/velocity/balance/repo.rs index 1e75b72b..ab7c93fa 100644 --- a/cala-ledger/src/velocity/balance/repo.rs +++ b/cala-ledger/src/velocity/balance/repo.rs @@ -1,21 +1,111 @@ -use sqlx::PgPool; +use sqlx::{PgPool, Postgres, QueryBuilder, Row, Transaction}; + +use std::collections::HashMap; + +use cala_types::{balance::BalanceSnapshot, velocity::Window}; + +use crate::primitives::*; + +pub(super) type VelocityBalanceKey = ( + Window, + Currency, + JournalId, + AccountId, + VelocityControlId, + VelocityLimitId, +); #[derive(Clone)] pub(super) struct VelocityBalanceRepo { - pool: PgPool, + _pool: PgPool, } impl VelocityBalanceRepo { pub fn new(pool: &PgPool) -> Self { - Self { pool: pool.clone() } + Self { + _pool: pool.clone(), + } } pub async fn find_for_update( &self, - // control_id: VelocityControlId, - // limit_id: VelocityLimitId, - // currency: Currency, - ) -> Result<(), sqlx::Error> { - unimplemented!() + db: &mut Transaction<'_, Postgres>, + keys: impl Iterator, + ) -> Result>, sqlx::Error> { + let mut query_builder = QueryBuilder::new( + r#" + WITH inputs AS ( + SELECT * + FROM ( + "#, + ); + query_builder.push_values( + keys, + |mut builder, (window, currency, journal_id, account_id, control_id, limit_id)| { + dbg!(window); + builder.push_bind(window); + builder.push_bind(currency.code()); + builder.push_bind(journal_id); + builder.push_bind(account_id); + builder.push_bind(control_id); + builder.push_bind(limit_id); + }, + ); + query_builder.push( + r#" + ) AS v(partition_window, currency, journal_id, account_id, velocity_control_id, velocity_limit_id) + ), + locked_balances AS ( + SELECT data_source_id, b.partition_window, b.currency, b.journal_id, b.account_id, b.velocity_control_id, b.velocity_limit_id, b.latest_version + FROM cala_velocity_current_balances b + WHERE data_source_id = '00000000-0000-0000-0000-000000000000' AND ((partition_window, currency, journal_id, account_id, velocity_control_id, velocity_limit_id) IN (SELECT * FROM inputs)) + FOR UPDATE + ) + SELECT i.partition_window, i.currency, i.journal_id, i.account_id, i.velocity_control_id, i.velocity_limit_id, h.values + FROM inputs i + LEFT JOIN locked_balances b + ON i.partition_window = b.partition_window + AND i.currency = b.currency + AND i.journal_id = b.journal_id + AND i.account_id = b.account_id + AND i.velocity_control_id = b.velocity_control_id + AND i.velocity_limit_id = b.velocity_limit_id + LEFT JOIN cala_velocity_balance_history h + ON b.data_source_id = h.data_source_id + AND b.partition_window = h.partition_window + AND b.currency = h.currency + AND b.journal_id = h.journal_id + AND b.account_id = h.account_id + AND b.velocity_control_id = h.velocity_control_id + AND b.velocity_limit_id = h.velocity_limit_id + AND b.latest_version = h.version + "# + ); + dbg!(query_builder.sql()); + let query = query_builder.build(); + let rows = query.fetch_all(&mut **db).await?; + + let mut ret = HashMap::new(); + for row in rows { + let values: Option = row.get("values"); + let snapshot = values.map(|v| { + serde_json::from_value::(v) + .expect("Failed to deserialize balance snapshot") + }); + ret.insert( + ( + row.get("partition_window"), + row.get::<&str, _>("currency") + .parse() + .expect("Could not parse currency"), + row.get("journal_id"), + row.get("account_id"), + row.get("velocity_control_id"), + row.get("velocity_limit_id"), + ), + snapshot, + ); + } + Ok(ret) } } diff --git a/cala-ledger/src/velocity/limit/repo.rs b/cala-ledger/src/velocity/limit/repo.rs index bf8544f6..66a06978 100644 --- a/cala-ledger/src/velocity/limit/repo.rs +++ b/cala-ledger/src/velocity/limit/repo.rs @@ -5,12 +5,14 @@ use crate::primitives::VelocityControlId; #[derive(Debug, Clone)] pub struct VelocityLimitRepo { - pool: PgPool, + _pool: PgPool, } impl VelocityLimitRepo { pub fn new(pool: &PgPool) -> Self { - Self { pool: pool.clone() } + Self { + _pool: pool.clone(), + } } pub async fn create_in_tx( diff --git a/cala-ledger/tests/transaction_post.rs b/cala-ledger/tests/transaction_post.rs index 99facb7a..dd65e659 100644 --- a/cala-ledger/tests/transaction_post.rs +++ b/cala-ledger/tests/transaction_post.rs @@ -1,9 +1,9 @@ mod helpers; +use rand::distributions::{Alphanumeric, DistString}; use rust_decimal::Decimal; use cala_ledger::{tx_template::*, *}; -use rand::distributions::{Alphanumeric, DistString}; #[tokio::test] async fn transaction_post() -> anyhow::Result<()> { diff --git a/cala-ledger/tests/velocity.rs b/cala-ledger/tests/velocity.rs index fc7cdae1..8e5c7dd4 100644 --- a/cala-ledger/tests/velocity.rs +++ b/cala-ledger/tests/velocity.rs @@ -1,5 +1,6 @@ mod helpers; +use rand::distributions::{Alphanumeric, DistString}; use rust_decimal::Decimal; use cala_ledger::{velocity::*, *}; @@ -13,6 +14,9 @@ async fn create_control() -> anyhow::Result<()> { .build()?; let cala = CalaLedger::init(cala_config).await?; + let new_journal = helpers::test_journal(); + let journal = cala.journals().create(new_journal).await.unwrap(); + let velocity = cala.velocities(); let withdrawal_limit = NewVelocityLimit::builder() @@ -83,15 +87,30 @@ async fn create_control() -> anyhow::Result<()> { .add_limit_to_control(control.id(), deposit_limit.id()) .await?; - let (one, two) = helpers::test_accounts(); - let one = cala.accounts().create(one).await.unwrap(); - let _ = cala.accounts().create(two).await.unwrap(); + let (sender, receiver) = helpers::test_accounts(); + let sender_account = cala.accounts().create(sender).await.unwrap(); + let recipient_account = cala.accounts().create(receiver).await.unwrap(); let mut params = Params::new(); params.insert("withdrawal_limit", Decimal::from(100)); params.insert("deposit_limit", Decimal::from(100)); velocity - .attach_control_to_account(control.id(), one.id(), params) + .attach_control_to_account(control.id(), sender_account.id(), params) .await?; + + let tx_code = Alphanumeric.sample_string(&mut rand::thread_rng(), 32); + let new_template = helpers::test_template(&tx_code); + + cala.tx_templates().create(new_template).await.unwrap(); + + let mut params = Params::new(); + params.insert("journal_id", journal.id().to_string()); + params.insert("sender", sender_account.id()); + params.insert("recipient", recipient_account.id()); + + cala.post_transaction(TransactionId::new(), &tx_code, params) + .await + .unwrap(); + Ok(()) } diff --git a/cala-server/.sqlx/query-8b87fd1b4f25b8265528a02375e937c3c446b11076dd83e2bd202cee8c95e2c2.json b/cala-server/.sqlx/query-8b87fd1b4f25b8265528a02375e937c3c446b11076dd83e2bd202cee8c95e2c2.json new file mode 100644 index 00000000..281dff95 --- /dev/null +++ b/cala-server/.sqlx/query-8b87fd1b4f25b8265528a02375e937c3c446b11076dd83e2bd202cee8c95e2c2.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT values FROM cala_velocity_account_controls\n WHERE data_source_id = '00000000-0000-0000-0000-000000000000' AND account_id = ANY($1)", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "values", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "UuidArray" + ] + }, + "nullable": [ + false + ] + }, + "hash": "8b87fd1b4f25b8265528a02375e937c3c446b11076dd83e2bd202cee8c95e2c2" +} diff --git a/cala-server/.sqlx/query-d4640c733860f62b22b5d9e5bca022b01dd30aedc0092708459e7f49d563e053.json b/cala-server/.sqlx/query-d4640c733860f62b22b5d9e5bca022b01dd30aedc0092708459e7f49d563e053.json new file mode 100644 index 00000000..ac919a0b --- /dev/null +++ b/cala-server/.sqlx/query-d4640c733860f62b22b5d9e5bca022b01dd30aedc0092708459e7f49d563e053.json @@ -0,0 +1,46 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT c.id, e.sequence, e.event,\n c.created_at AS entity_created_at, e.recorded_at AS event_recorded_at\n FROM cala_velocity_controls c\n JOIN cala_velocity_control_events e\n ON c.data_source_id = e.data_source_id\n AND c.id = e.id\n WHERE c.data_source_id = '00000000-0000-0000-0000-000000000000'\n AND c.id = $1\n ORDER BY e.sequence", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "sequence", + "type_info": "Int4" + }, + { + "ordinal": 2, + "name": "event", + "type_info": "Jsonb" + }, + { + "ordinal": 3, + "name": "entity_created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 4, + "name": "event_recorded_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "d4640c733860f62b22b5d9e5bca022b01dd30aedc0092708459e7f49d563e053" +} From c2916b7fd4da946fc47ab11101f735fe4ea853af Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Fri, 4 Oct 2024 20:54:01 +0200 Subject: [PATCH 09/14] chore: create new_snapshots for balance --- .../src/velocity/balance.rs | 2 +- ...013c2246cb79e265f48b3c51d06b55db28bd.json} | 7 +-- ...a584e6a170427c89b39940217bb743d1049b.json} | 7 +-- .../20231208110808_cala_ledger_setup.sql | 1 + cala-ledger/src/account/entity.rs | 42 ++++++++--------- cala-ledger/src/account/repo.rs | 10 ++-- cala-ledger/src/balance/mod.rs | 6 +-- .../src/velocity/account_control/value.rs | 41 +++++++++++++++-- cala-ledger/src/velocity/balance/mod.rs | 46 ++++++++++++++++++- cala-ledger/src/velocity/balance/repo.rs | 2 - cala-ledger/src/velocity/error.rs | 8 ++++ ...013c2246cb79e265f48b3c51d06b55db28bd.json} | 7 +-- ...a584e6a170427c89b39940217bb743d1049b.json} | 7 +-- 13 files changed, 138 insertions(+), 48 deletions(-) rename cala-ledger/.sqlx/{query-67f2b33262254fd0d5d54f5bf92ead9ca880e166919d3e9fd52650bcef348488.json => query-1bdbfe685cc6583cb82612f37bc6013c2246cb79e265f48b3c51d06b55db28bd.json} (64%) rename cala-ledger/.sqlx/{query-de85f101b071aab683abad0a452087b012330d423849a0a20c719816cf0b7343.json => query-2d1a0955c79d7be68272211f35fba584e6a170427c89b39940217bb743d1049b.json} (68%) rename cala-server/.sqlx/{query-67f2b33262254fd0d5d54f5bf92ead9ca880e166919d3e9fd52650bcef348488.json => query-1bdbfe685cc6583cb82612f37bc6013c2246cb79e265f48b3c51d06b55db28bd.json} (64%) rename cala-server/.sqlx/{query-de85f101b071aab683abad0a452087b012330d423849a0a20c719816cf0b7343.json => query-2d1a0955c79d7be68272211f35fba584e6a170427c89b39940217bb743d1049b.json} (68%) diff --git a/cala-ledger-core-types/src/velocity/balance.rs b/cala-ledger-core-types/src/velocity/balance.rs index 274d4a1f..7e0f24d9 100644 --- a/cala-ledger-core-types/src/velocity/balance.rs +++ b/cala-ledger-core-types/src/velocity/balance.rs @@ -2,7 +2,7 @@ use rust_decimal::Decimal; use crate::{balance::BalanceSnapshot, primitives::*}; -#[derive(Debug, sqlx::Type, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, sqlx::Type, PartialEq, Eq, Hash)] #[sqlx(transparent)] pub struct Window(serde_json::Value); diff --git a/cala-ledger/.sqlx/query-67f2b33262254fd0d5d54f5bf92ead9ca880e166919d3e9fd52650bcef348488.json b/cala-ledger/.sqlx/query-1bdbfe685cc6583cb82612f37bc6013c2246cb79e265f48b3c51d06b55db28bd.json similarity index 64% rename from cala-ledger/.sqlx/query-67f2b33262254fd0d5d54f5bf92ead9ca880e166919d3e9fd52650bcef348488.json rename to cala-ledger/.sqlx/query-1bdbfe685cc6583cb82612f37bc6013c2246cb79e265f48b3c51d06b55db28bd.json index 91b6adb0..4ca5ed7e 100644 --- a/cala-ledger/.sqlx/query-67f2b33262254fd0d5d54f5bf92ead9ca880e166919d3e9fd52650bcef348488.json +++ b/cala-ledger/.sqlx/query-1bdbfe685cc6583cb82612f37bc6013c2246cb79e265f48b3c51d06b55db28bd.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "UPDATE cala_accounts\n SET code = $2, name = $3, external_id = $4, normal_balance_type = $5\n WHERE id = $1 AND data_source_id = '00000000-0000-0000-0000-000000000000'", + "query": "UPDATE cala_accounts\n SET code = $2, name = $3, external_id = $4, normal_balance_type = $5, latest_values = $6\n WHERE id = $1 AND data_source_id = '00000000-0000-0000-0000-000000000000'", "describe": { "columns": [], "parameters": { @@ -19,10 +19,11 @@ ] } } - } + }, + "Jsonb" ] }, "nullable": [] }, - "hash": "67f2b33262254fd0d5d54f5bf92ead9ca880e166919d3e9fd52650bcef348488" + "hash": "1bdbfe685cc6583cb82612f37bc6013c2246cb79e265f48b3c51d06b55db28bd" } diff --git a/cala-ledger/.sqlx/query-de85f101b071aab683abad0a452087b012330d423849a0a20c719816cf0b7343.json b/cala-ledger/.sqlx/query-2d1a0955c79d7be68272211f35fba584e6a170427c89b39940217bb743d1049b.json similarity index 68% rename from cala-ledger/.sqlx/query-de85f101b071aab683abad0a452087b012330d423849a0a20c719816cf0b7343.json rename to cala-ledger/.sqlx/query-2d1a0955c79d7be68272211f35fba584e6a170427c89b39940217bb743d1049b.json index d7e7b7d6..4c58ab38 100644 --- a/cala-ledger/.sqlx/query-de85f101b071aab683abad0a452087b012330d423849a0a20c719816cf0b7343.json +++ b/cala-ledger/.sqlx/query-2d1a0955c79d7be68272211f35fba584e6a170427c89b39940217bb743d1049b.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO cala_accounts (id, code, name, external_id, normal_balance_type, eventually_consistent)\n VALUES ($1, $2, $3, $4, $5, $6)", + "query": "INSERT INTO cala_accounts (id, code, name, external_id, normal_balance_type, eventually_consistent, latest_values)\n VALUES ($1, $2, $3, $4, $5, $6, $7)", "describe": { "columns": [], "parameters": { @@ -20,10 +20,11 @@ } } }, - "Bool" + "Bool", + "Jsonb" ] }, "nullable": [] }, - "hash": "de85f101b071aab683abad0a452087b012330d423849a0a20c719816cf0b7343" + "hash": "2d1a0955c79d7be68272211f35fba584e6a170427c89b39940217bb743d1049b" } diff --git a/cala-ledger/migrations/20231208110808_cala_ledger_setup.sql b/cala-ledger/migrations/20231208110808_cala_ledger_setup.sql index 67ab06f2..9d72634c 100644 --- a/cala-ledger/migrations/20231208110808_cala_ledger_setup.sql +++ b/cala-ledger/migrations/20231208110808_cala_ledger_setup.sql @@ -10,6 +10,7 @@ CREATE TABLE cala_accounts ( external_id VARCHAR, normal_balance_type DebitOrCredit NOT NULL, -- For quick lookup when querying balances eventually_consistent BOOLEAN NOT NULL, -- For balance locking + latest_values JSONB NOT NULL, -- Cached for quicker velocity enforcement created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), UNIQUE(data_source_id, id), UNIQUE(data_source_id, code) diff --git a/cala-ledger/src/account/entity.rs b/cala-ledger/src/account/entity.rs index 524215a1..f317d93c 100644 --- a/cala-ledger/src/account/entity.rs +++ b/cala-ledger/src/account/entity.rs @@ -246,7 +246,7 @@ impl From<(AccountValues, Vec)> for AccountUpdate { } /// Representation of a ***new*** ledger account entity with required/optional properties and a builder. -#[derive(Builder, Debug)] +#[derive(Builder, Debug, Clone)] pub struct NewAccount { #[builder(setter(into))] pub id: AccountId, @@ -275,27 +275,27 @@ impl NewAccount { NewAccountBuilder::default() } + pub(super) fn into_values(self) -> AccountValues { + AccountValues { + id: self.id, + version: 1, + code: self.code, + name: self.name, + external_id: self.external_id, + normal_balance_type: self.normal_balance_type, + status: self.status, + description: self.description, + metadata: self.metadata, + config: AccountConfig { + is_account_set: self.is_account_set, + eventually_consistent: false, + }, + } + } + pub(super) fn initial_events(self) -> EntityEvents { - EntityEvents::init( - self.id, - [AccountEvent::Initialized { - values: AccountValues { - id: self.id, - version: 1, - code: self.code, - name: self.name, - external_id: self.external_id, - normal_balance_type: self.normal_balance_type, - status: self.status, - description: self.description, - metadata: self.metadata, - config: AccountConfig { - is_account_set: self.is_account_set, - eventually_consistent: false, - }, - }, - }], - ) + let values = self.into_values(); + EntityEvents::init(values.id, [AccountEvent::Initialized { values }]) } } diff --git a/cala-ledger/src/account/repo.rs b/cala-ledger/src/account/repo.rs index 5b01e69b..5208f61e 100644 --- a/cala-ledger/src/account/repo.rs +++ b/cala-ledger/src/account/repo.rs @@ -26,14 +26,15 @@ impl AccountRepo { ) -> Result { let id = new_account.id; sqlx::query!( - r#"INSERT INTO cala_accounts (id, code, name, external_id, normal_balance_type, eventually_consistent) - VALUES ($1, $2, $3, $4, $5, $6)"#, + r#"INSERT INTO cala_accounts (id, code, name, external_id, normal_balance_type, eventually_consistent, latest_values) + VALUES ($1, $2, $3, $4, $5, $6, $7)"#, id as AccountId, new_account.code, new_account.name, new_account.external_id, new_account.normal_balance_type as DebitOrCredit, - new_account.eventually_consistent + new_account.eventually_consistent, + serde_json::to_value(&new_account.clone().into_values()).expect("Failed to serialize account values"), ) .execute(&mut **db) .await?; @@ -50,13 +51,14 @@ impl AccountRepo { ) -> Result<(), AccountError> { sqlx::query!( r#"UPDATE cala_accounts - SET code = $2, name = $3, external_id = $4, normal_balance_type = $5 + SET code = $2, name = $3, external_id = $4, normal_balance_type = $5, latest_values = $6 WHERE id = $1 AND data_source_id = '00000000-0000-0000-0000-000000000000'"#, account.values().id as AccountId, account.values().code, account.values().name, account.values().external_id, account.values().normal_balance_type as DebitOrCredit, + serde_json::to_value(account.values()).expect("Failed to serialize account values"), ) .execute(&mut **db) .await?; diff --git a/cala-ledger/src/balance/mod.rs b/cala-ledger/src/balance/mod.rs index 75675756..552c4098 100644 --- a/cala-ledger/src/balance/mod.rs +++ b/cala-ledger/src/balance/mod.rs @@ -159,8 +159,8 @@ impl Balances { ) -> Vec { let mut latest_balances: HashMap<(AccountId, &Currency), BalanceSnapshot> = HashMap::new(); let mut new_balances = Vec::new(); + let empty = Vec::new(); for entry in entries.iter() { - let empty = Vec::new(); for account_id in mappings .get(&entry.account_id) .unwrap_or(&empty) @@ -198,7 +198,7 @@ impl Balances { new_balances } - fn new_snapshot( + pub(crate) fn new_snapshot( time: DateTime, account_id: AccountId, entry: &EntryValues, @@ -237,7 +237,7 @@ impl Balances { ) } - fn update_snapshot( + pub(crate) fn update_snapshot( time: DateTime, mut snapshot: BalanceSnapshot, entry: &EntryValues, diff --git a/cala-ledger/src/velocity/account_control/value.rs b/cala-ledger/src/velocity/account_control/value.rs index ca6135ef..7a7dffad 100644 --- a/cala-ledger/src/velocity/account_control/value.rs +++ b/cala-ledger/src/velocity/account_control/value.rs @@ -2,10 +2,15 @@ use cel_interpreter::CelExpression; use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; -use crate::primitives::{ - AccountId, Currency, DebitOrCredit, Layer, VelocityControlId, VelocityLimitId, +use cala_types::{ + balance::BalanceSnapshot, + velocity::{PartitionKey, VelocityEnforcement}, +}; + +use crate::{ + primitives::{AccountId, Currency, DebitOrCredit, Layer, VelocityControlId, VelocityLimitId}, + velocity::error::VelocityEnforcementError, }; -use cala_types::velocity::{PartitionKey, VelocityEnforcement}; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AccountVelocityControl { @@ -25,6 +30,36 @@ pub struct AccountVelocityLimit { pub limit: AccountLimit, } +impl AccountVelocityLimit { + pub fn enforce(&self, _balance: &BalanceSnapshot) -> Result<(), VelocityEnforcementError> { + // return Err(VelocityEnforcementError::LimitExceeded); + // let mut spent = Decimal::ZERO; + // let mut remaining = Decimal::ZERO; + // for limit in &self.limit.balance { + // let layer = limit.layer; + // let amount = limit.amount; + // let enforcement_direction = limit.enforcement_direction; + // let balance = balance.get(layer).unwrap_or(&Decimal::ZERO); + // match enforcement_direction { + // DebitOrCredit::Debit => { + // spent += balance; + // remaining += amount - balance; + // } + // DebitOrCredit::Credit => { + // spent += amount - balance; + // remaining += balance; + // } + // } + // } + // if spent > Decimal::ZERO { + // if spent > self.limit.balance.iter().map(|l| l.amount).sum() { + // return Err(VelocityError::LimitExceeded); + // } + // } + Ok(()) + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AccountLimit { pub timestamp_source: Option, diff --git a/cala-ledger/src/velocity/balance/mod.rs b/cala-ledger/src/velocity/balance/mod.rs index 816b830d..a05a91c9 100644 --- a/cala-ledger/src/velocity/balance/mod.rs +++ b/cala-ledger/src/velocity/balance/mod.rs @@ -6,6 +6,7 @@ use sqlx::PgPool; use std::collections::HashMap; use cala_types::{ + balance::BalanceSnapshot, entry::EntryValues, transaction::TransactionValues, velocity::{PartitionKey, Window}, @@ -32,7 +33,7 @@ impl VelocityBalances { pub(crate) async fn update_balances_in_op( &self, op: &mut AtomicOperation<'_>, - _created_at: DateTime, + created_at: DateTime, transaction: &TransactionValues, entries: &[EntryValues], controls: HashMap>, @@ -91,13 +92,54 @@ impl VelocityBalances { return Ok(()); } - let _current_balances = self + let current_balances = self .repo .find_for_update(op.tx(), entries_to_add.keys()) .await?; + let _new_balances = Self::new_snapshots(created_at, current_balances, &entries_to_add)?; Ok(()) } + + fn new_snapshots<'a>( + time: DateTime, + mut current_balances: HashMap>, + entries_to_add: &'a HashMap>, + ) -> Result>, VelocityEnforcementError> + { + let mut res = HashMap::new(); + + for (key, entries) in entries_to_add.iter() { + let mut latest_balance: Option = None; + let mut new_balances = Vec::new(); + + for (limit, entry) in entries { + let balance = match (latest_balance.take(), current_balances.remove(key)) { + (Some(latest), _) => { + new_balances.push(latest.clone()); + latest + } + (_, Some(Some(balance))) => balance, + (_, Some(None)) => { + let new_snapshot = + crate::balance::Balances::new_snapshot(time, entry.account_id, entry); + limit.enforce(&new_snapshot)?; + latest_balance = Some(new_snapshot); + continue; + } + _ => unreachable!(), + }; + let new_snapshot = crate::balance::Balances::update_snapshot(time, balance, entry); + limit.enforce(&new_snapshot)?; + new_balances.push(new_snapshot); + } + if let Some(latest) = latest_balance.take() { + new_balances.push(latest) + } + res.insert(key, new_balances); + } + Ok(res) + } } fn determine_window( diff --git a/cala-ledger/src/velocity/balance/repo.rs b/cala-ledger/src/velocity/balance/repo.rs index ab7c93fa..7d030a09 100644 --- a/cala-ledger/src/velocity/balance/repo.rs +++ b/cala-ledger/src/velocity/balance/repo.rs @@ -42,7 +42,6 @@ impl VelocityBalanceRepo { query_builder.push_values( keys, |mut builder, (window, currency, journal_id, account_id, control_id, limit_id)| { - dbg!(window); builder.push_bind(window); builder.push_bind(currency.code()); builder.push_bind(journal_id); @@ -81,7 +80,6 @@ impl VelocityBalanceRepo { AND b.latest_version = h.version "# ); - dbg!(query_builder.sql()); let query = query_builder.build(); let rows = query.fetch_all(&mut **db).await?; diff --git a/cala-ledger/src/velocity/error.rs b/cala-ledger/src/velocity/error.rs index fa2227c3..543e0791 100644 --- a/cala-ledger/src/velocity/error.rs +++ b/cala-ledger/src/velocity/error.rs @@ -16,4 +16,12 @@ pub enum VelocityError { ParamError(#[from] crate::param::error::ParamError), #[error("VelocityError - Could not find control by id: {0}")] CouldNotFindControlById(VelocityControlId), + #[error("VelocityError - VelocityEnforcement: {0}")] + Enforcement(#[from] VelocityEnforcementError), +} + +#[derive(Error, Debug)] +pub enum VelocityEnforcementError { + #[error("VelocityEnforcement - LimitExceeded")] + LimitExceeded, } diff --git a/cala-server/.sqlx/query-67f2b33262254fd0d5d54f5bf92ead9ca880e166919d3e9fd52650bcef348488.json b/cala-server/.sqlx/query-1bdbfe685cc6583cb82612f37bc6013c2246cb79e265f48b3c51d06b55db28bd.json similarity index 64% rename from cala-server/.sqlx/query-67f2b33262254fd0d5d54f5bf92ead9ca880e166919d3e9fd52650bcef348488.json rename to cala-server/.sqlx/query-1bdbfe685cc6583cb82612f37bc6013c2246cb79e265f48b3c51d06b55db28bd.json index 91b6adb0..4ca5ed7e 100644 --- a/cala-server/.sqlx/query-67f2b33262254fd0d5d54f5bf92ead9ca880e166919d3e9fd52650bcef348488.json +++ b/cala-server/.sqlx/query-1bdbfe685cc6583cb82612f37bc6013c2246cb79e265f48b3c51d06b55db28bd.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "UPDATE cala_accounts\n SET code = $2, name = $3, external_id = $4, normal_balance_type = $5\n WHERE id = $1 AND data_source_id = '00000000-0000-0000-0000-000000000000'", + "query": "UPDATE cala_accounts\n SET code = $2, name = $3, external_id = $4, normal_balance_type = $5, latest_values = $6\n WHERE id = $1 AND data_source_id = '00000000-0000-0000-0000-000000000000'", "describe": { "columns": [], "parameters": { @@ -19,10 +19,11 @@ ] } } - } + }, + "Jsonb" ] }, "nullable": [] }, - "hash": "67f2b33262254fd0d5d54f5bf92ead9ca880e166919d3e9fd52650bcef348488" + "hash": "1bdbfe685cc6583cb82612f37bc6013c2246cb79e265f48b3c51d06b55db28bd" } diff --git a/cala-server/.sqlx/query-de85f101b071aab683abad0a452087b012330d423849a0a20c719816cf0b7343.json b/cala-server/.sqlx/query-2d1a0955c79d7be68272211f35fba584e6a170427c89b39940217bb743d1049b.json similarity index 68% rename from cala-server/.sqlx/query-de85f101b071aab683abad0a452087b012330d423849a0a20c719816cf0b7343.json rename to cala-server/.sqlx/query-2d1a0955c79d7be68272211f35fba584e6a170427c89b39940217bb743d1049b.json index d7e7b7d6..4c58ab38 100644 --- a/cala-server/.sqlx/query-de85f101b071aab683abad0a452087b012330d423849a0a20c719816cf0b7343.json +++ b/cala-server/.sqlx/query-2d1a0955c79d7be68272211f35fba584e6a170427c89b39940217bb743d1049b.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO cala_accounts (id, code, name, external_id, normal_balance_type, eventually_consistent)\n VALUES ($1, $2, $3, $4, $5, $6)", + "query": "INSERT INTO cala_accounts (id, code, name, external_id, normal_balance_type, eventually_consistent, latest_values)\n VALUES ($1, $2, $3, $4, $5, $6, $7)", "describe": { "columns": [], "parameters": { @@ -20,10 +20,11 @@ } } }, - "Bool" + "Bool", + "Jsonb" ] }, "nullable": [] }, - "hash": "de85f101b071aab683abad0a452087b012330d423849a0a20c719816cf0b7343" + "hash": "2d1a0955c79d7be68272211f35fba584e6a170427c89b39940217bb743d1049b" } From ea69f58a63438b3dd7aec34b056db8c07bf54410 Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Fri, 4 Oct 2024 21:20:04 +0200 Subject: [PATCH 10/14] chore: add start/end to limit --- cala-cel-interpreter/src/value.rs | 16 ++++++++++++++++ cala-ledger-core-types/src/primitives.rs | 18 +++++++++--------- cala-ledger-core-types/src/velocity/limit.rs | 2 ++ .../src/velocity/account_control/mod.rs | 14 ++++++++++++++ .../src/velocity/account_control/value.rs | 3 +++ cala-ledger/src/velocity/control/entity.rs | 10 ++++++++-- cala-ledger/src/velocity/limit/entity.rs | 12 ++++++++++++ cala-ledger/src/velocity/mod.rs | 9 ++++++++- 8 files changed, 72 insertions(+), 12 deletions(-) diff --git a/cala-cel-interpreter/src/value.rs b/cala-cel-interpreter/src/value.rs index ed7336ed..4e4ad73d 100644 --- a/cala-cel-interpreter/src/value.rs +++ b/cala-cel-interpreter/src/value.rs @@ -303,6 +303,22 @@ impl<'a> TryFrom> for NaiveDate { } } +impl<'a> TryFrom> for DateTime { + type Error = ResultCoercionError; + + fn try_from(CelResult { expr, val }: CelResult) -> Result { + if let CelValue::Timestamp(d) = val { + Ok(d) + } else { + Err(ResultCoercionError::BadCoreTypeCoercion( + format!("{expr:?}"), + CelType::from(&val), + CelType::Timestamp, + )) + } + } +} + impl<'a> TryFrom> for Uuid { type Error = ResultCoercionError; diff --git a/cala-ledger-core-types/src/primitives.rs b/cala-ledger-core-types/src/primitives.rs index aa014f68..3f99a16d 100644 --- a/cala-ledger-core-types/src/primitives.rs +++ b/cala-ledger-core-types/src/primitives.rs @@ -57,9 +57,9 @@ impl<'a> TryFrom> for DebitOrCredit { } } -impl Into for DebitOrCredit { - fn into(self) -> CelValue { - match self { +impl From for CelValue { + fn from(v: DebitOrCredit) -> Self { + match v { DebitOrCredit::Debit => "DEBIT".into(), DebitOrCredit::Credit => "CREDIT".into(), } @@ -117,9 +117,9 @@ impl Default for Layer { } } -impl Into for Layer { - fn into(self) -> CelValue { - match self { +impl From for CelValue { + fn from(l: Layer) -> Self { + match l { Layer::Settled => "SETTLED".into(), Layer::Pending => "PENDING".into(), Layer::Encumbrance => "ENCUMBRANCE".into(), @@ -150,9 +150,9 @@ impl std::fmt::Display for Currency { } } -impl Into for Currency { - fn into(self) -> CelValue { - self.code().into() +impl From for CelValue { + fn from(c: Currency) -> Self { + c.code().into() } } diff --git a/cala-ledger-core-types/src/velocity/limit.rs b/cala-ledger-core-types/src/velocity/limit.rs index 3c447579..d275d216 100644 --- a/cala-ledger-core-types/src/velocity/limit.rs +++ b/cala-ledger-core-types/src/velocity/limit.rs @@ -33,4 +33,6 @@ pub struct BalanceLimit { pub layer: CelExpression, pub amount: CelExpression, pub enforcement_direction: CelExpression, + pub start: Option, + pub end: Option, } diff --git a/cala-ledger/src/velocity/account_control/mod.rs b/cala-ledger/src/velocity/account_control/mod.rs index acb7a1ce..93f628b0 100644 --- a/cala-ledger/src/velocity/account_control/mod.rs +++ b/cala-ledger/src/velocity/account_control/mod.rs @@ -1,6 +1,7 @@ mod repo; mod value; +use chrono::{DateTime, Utc}; use rust_decimal::Decimal; use sqlx::PgPool; @@ -35,6 +36,7 @@ impl AccountControls { pub async fn attach_control_in_op( &self, op: &mut AtomicOperation<'_>, + created_at: DateTime, control: VelocityControlValues, account_id: AccountId, limits: Vec, @@ -52,10 +54,22 @@ impl AccountControls { let amount: Decimal = limit.amount.try_evaluate(&ctx)?; let enforcement_direction: DebitOrCredit = limit.enforcement_direction.try_evaluate(&ctx)?; + let start = if let Some(start) = limit.start { + start.try_evaluate(&ctx)? + } else { + created_at + }; + let end = if let Some(end) = limit.end { + Some(end.try_evaluate(&ctx)?) + } else { + None + }; limits.push(AccountBalanceLimit { layer, amount, enforcement_direction, + start, + end, }) } velocity_limits.push(AccountVelocityLimit { diff --git a/cala-ledger/src/velocity/account_control/value.rs b/cala-ledger/src/velocity/account_control/value.rs index 7a7dffad..1a2f8013 100644 --- a/cala-ledger/src/velocity/account_control/value.rs +++ b/cala-ledger/src/velocity/account_control/value.rs @@ -1,4 +1,5 @@ use cel_interpreter::CelExpression; +use chrono::{DateTime, Utc}; use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; @@ -71,4 +72,6 @@ pub struct AccountBalanceLimit { pub layer: Layer, pub amount: Decimal, pub enforcement_direction: DebitOrCredit, + pub start: DateTime, + pub end: Option>, } diff --git a/cala-ledger/src/velocity/control/entity.rs b/cala-ledger/src/velocity/control/entity.rs index c1f8e755..a019742d 100644 --- a/cala-ledger/src/velocity/control/entity.rs +++ b/cala-ledger/src/velocity/control/entity.rs @@ -25,7 +25,7 @@ impl EntityEvent for VelocityControlEvent { #[builder(pattern = "owned", build_fn(error = "EntityError"))] pub struct VelocityControl { values: VelocityControlValues, - pub(super) _events: EntityEvents, + pub(super) events: EntityEvents, } impl VelocityControl { @@ -36,6 +36,12 @@ impl VelocityControl { pub fn into_values(self) -> VelocityControlValues { self.values } + + pub fn created_at(&self) -> chrono::DateTime { + self.events + .entity_first_persisted_at + .expect("No events for account") + } } impl Entity for VelocityControl { @@ -54,7 +60,7 @@ impl TryFrom> for VelocityControl { } } } - builder._events(events).build() + builder.events(events).build() } } diff --git a/cala-ledger/src/velocity/limit/entity.rs b/cala-ledger/src/velocity/limit/entity.rs index 4ceb36ae..390050ed 100644 --- a/cala-ledger/src/velocity/limit/entity.rs +++ b/cala-ledger/src/velocity/limit/entity.rs @@ -124,6 +124,12 @@ impl NewVelocityLimit { input.enforcement_direction, ) .expect("already validated"), + start: input.start.map(|expr| { + CelExpression::try_from(expr).expect("already validated") + }), + end: input.end.map(|expr| { + CelExpression::try_from(expr).expect("already validated") + }), }) .collect(), }, @@ -191,6 +197,10 @@ pub struct NewBalanceLimit { amount: String, #[builder(setter(into))] enforcement_direction: String, + #[builder(setter(into), default)] + start: Option, + #[builder(setter(into), default)] + end: Option, } impl NewBalanceLimit { pub fn builder() -> NewBalanceLimitBuilder { @@ -214,6 +224,8 @@ impl NewBalanceLimitBuilder { .as_ref() .expect("Mandatory field 'enforcement_direction' not set"), )?; + validate_optional_expression(&self.start)?; + validate_optional_expression(&self.end)?; Ok(()) } } diff --git a/cala-ledger/src/velocity/mod.rs b/cala-ledger/src/velocity/mod.rs index 841560a0..faef9729 100644 --- a/cala-ledger/src/velocity/mod.rs +++ b/cala-ledger/src/velocity/mod.rs @@ -123,7 +123,14 @@ impl Velocities { let control = self.controls.find_by_id(op.tx(), control_id).await?; let limits = self.limits.list_for_control(op.tx(), control_id).await?; self.account_controls - .attach_control_in_op(op, control.into_values(), account_id, limits, params) + .attach_control_in_op( + op, + control.created_at(), + control.into_values(), + account_id, + limits, + params, + ) .await?; Ok(()) } From 7617fc6e675c22d3d74adabc1d064fc421eb1eb9 Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Sat, 5 Oct 2024 06:45:28 +0200 Subject: [PATCH 11/14] chore: enforcement --- cala-ledger/src/balance/account_balance.rs | 58 ++++++++++++- cala-ledger/src/balance/repo.rs | 20 +---- .../src/velocity/account_control/value.rs | 71 +++++++++------ cala-ledger/src/velocity/balance/mod.rs | 12 +-- cala-ledger/src/velocity/error.rs | 18 ++-- cala-ledger/tests/account_set.rs | 2 +- cala-ledger/tests/balance.rs | 2 +- cala-ledger/tests/helpers.rs | 86 ++++++++++++++++++- cala-ledger/tests/transaction_post.rs | 2 +- cala-ledger/tests/velocity.rs | 15 ++-- 10 files changed, 218 insertions(+), 68 deletions(-) diff --git a/cala-ledger/src/balance/account_balance.rs b/cala-ledger/src/balance/account_balance.rs index 7848fe9d..35919d5c 100644 --- a/cala-ledger/src/balance/account_balance.rs +++ b/cala-ledger/src/balance/account_balance.rs @@ -6,11 +6,18 @@ use cala_types::balance::*; /// Representation of account's balance tracked in 3 distinct layers. #[derive(Debug, Clone)] pub struct AccountBalance { - pub(super) balance_type: DebitOrCredit, + balance_type: DebitOrCredit, pub details: BalanceSnapshot, } impl AccountBalance { + pub(crate) fn new(balance_type: DebitOrCredit, details: BalanceSnapshot) -> Self { + Self { + balance_type, + details, + } + } + pub(super) fn derive_diff(mut self, since: &Self) -> Self { self.details.settled = BalanceAmount { dr_balance: self.details.settled.dr_balance - since.details.settled.dr_balance, @@ -31,7 +38,50 @@ impl AccountBalance { } pub fn pending(&self) -> Decimal { - if self.balance_type == DebitOrCredit::Credit { + BalanceWithDirection { + direction: self.balance_type, + details: &self.details, + } + .pending() + } + + pub fn settled(&self) -> Decimal { + BalanceWithDirection { + direction: self.balance_type, + details: &self.details, + } + .settled() + } + + pub fn encumbrance(&self) -> Decimal { + BalanceWithDirection { + direction: self.balance_type, + details: &self.details, + } + .encumbrance() + } + + pub fn available(&self, layer: Layer) -> Decimal { + BalanceWithDirection { + direction: self.balance_type, + details: &self.details, + } + .available(layer) + } +} + +pub(crate) struct BalanceWithDirection<'a> { + direction: DebitOrCredit, + details: &'a BalanceSnapshot, +} + +impl<'a> BalanceWithDirection<'a> { + pub fn new(direction: DebitOrCredit, details: &'a BalanceSnapshot) -> Self { + Self { direction, details } + } + + pub fn pending(&self) -> Decimal { + if self.direction == DebitOrCredit::Credit { self.details.pending.cr_balance - self.details.pending.dr_balance } else { self.details.pending.dr_balance - self.details.pending.cr_balance @@ -39,7 +89,7 @@ impl AccountBalance { } pub fn settled(&self) -> Decimal { - if self.balance_type == DebitOrCredit::Credit { + if self.direction == DebitOrCredit::Credit { self.details.settled.cr_balance - self.details.settled.dr_balance } else { self.details.settled.dr_balance - self.details.settled.cr_balance @@ -47,7 +97,7 @@ impl AccountBalance { } pub fn encumbrance(&self) -> Decimal { - if self.balance_type == DebitOrCredit::Credit { + if self.direction == DebitOrCredit::Credit { self.details.encumbrance.cr_balance - self.details.encumbrance.dr_balance } else { self.details.encumbrance.dr_balance - self.details.encumbrance.cr_balance diff --git a/cala-ledger/src/balance/repo.rs b/cala-ledger/src/balance/repo.rs index 557b627a..08c18da0 100644 --- a/cala-ledger/src/balance/repo.rs +++ b/cala-ledger/src/balance/repo.rs @@ -78,10 +78,7 @@ impl BalanceRepo { if let Some(row) = row { let details: BalanceSnapshot = serde_json::from_value(row.values).expect("Failed to deserialize balance snapshot"); - Ok(AccountBalance { - balance_type: row.normal_balance_type, - details, - }) + Ok(AccountBalance::new(row.normal_balance_type, details)) } else { Err(BalanceError::NotFound(journal_id, account_id, currency)) } @@ -149,18 +146,12 @@ impl BalanceRepo { let details: BalanceSnapshot = serde_json::from_value(row.values.expect("values is not null")) .expect("Failed to deserialize balance snapshot"); - first = Some(AccountBalance { - balance_type: row.normal_balance_type, - details, - }); + first = Some(AccountBalance::new(row.normal_balance_type, details)); } else { let details: BalanceSnapshot = serde_json::from_value(row.values.expect("values is not null")) .expect("Failed to deserialize balance snapshot"); - last = Some(AccountBalance { - balance_type: row.normal_balance_type, - details, - }); + last = Some(AccountBalance::new(row.normal_balance_type, details)); } } Ok((first, last)) @@ -201,10 +192,7 @@ impl BalanceRepo { let normal_balance_type: DebitOrCredit = row.get("normal_balance_type"); ret.insert( (details.journal_id, details.account_id, details.currency), - AccountBalance { - details, - balance_type: normal_balance_type, - }, + AccountBalance::new(normal_balance_type, details), ); } Ok(ret) diff --git a/cala-ledger/src/velocity/account_control/value.rs b/cala-ledger/src/velocity/account_control/value.rs index 1a2f8013..bfe36d44 100644 --- a/cala-ledger/src/velocity/account_control/value.rs +++ b/cala-ledger/src/velocity/account_control/value.rs @@ -1,4 +1,4 @@ -use cel_interpreter::CelExpression; +use cel_interpreter::{CelContext, CelExpression}; use chrono::{DateTime, Utc}; use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; @@ -10,7 +10,7 @@ use cala_types::{ use crate::{ primitives::{AccountId, Currency, DebitOrCredit, Layer, VelocityControlId, VelocityLimitId}, - velocity::error::VelocityEnforcementError, + velocity::error::*, }; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -32,31 +32,48 @@ pub struct AccountVelocityLimit { } impl AccountVelocityLimit { - pub fn enforce(&self, _balance: &BalanceSnapshot) -> Result<(), VelocityEnforcementError> { - // return Err(VelocityEnforcementError::LimitExceeded); - // let mut spent = Decimal::ZERO; - // let mut remaining = Decimal::ZERO; - // for limit in &self.limit.balance { - // let layer = limit.layer; - // let amount = limit.amount; - // let enforcement_direction = limit.enforcement_direction; - // let balance = balance.get(layer).unwrap_or(&Decimal::ZERO); - // match enforcement_direction { - // DebitOrCredit::Debit => { - // spent += balance; - // remaining += amount - balance; - // } - // DebitOrCredit::Credit => { - // spent += amount - balance; - // remaining += balance; - // } - // } - // } - // if spent > Decimal::ZERO { - // if spent > self.limit.balance.iter().map(|l| l.amount).sum() { - // return Err(VelocityError::LimitExceeded); - // } - // } + pub fn enforce( + &self, + ctx: &CelContext, + time: DateTime, + snapshot: &BalanceSnapshot, + ) -> Result<(), VelocityError> { + if let Some(currency) = &self.currency { + if currency != &snapshot.currency { + return Ok(()); + } + } + let time = if let Some(source) = &self.limit.timestamp_source { + source.try_evaluate(ctx)? + } else { + time + }; + for limit in self.limit.balance.iter() { + if limit.start > time { + continue; + } + if let Some(end) = limit.end { + if end <= time { + continue; + } + } + let balance = + crate::balance::BalanceWithDirection::new(limit.enforcement_direction, snapshot); + let requested = balance.available(limit.layer); + + if requested > limit.amount { + return Err(LimitExceededError { + account_id: snapshot.account_id, + currency: snapshot.currency.code().to_string(), + limit_id: self.limit_id, + layer: limit.layer, + limit: limit.amount, + requested, + } + .into()); + } + } + Ok(()) } } diff --git a/cala-ledger/src/velocity/balance/mod.rs b/cala-ledger/src/velocity/balance/mod.rs index a05a91c9..ef027027 100644 --- a/cala-ledger/src/velocity/balance/mod.rs +++ b/cala-ledger/src/velocity/balance/mod.rs @@ -97,16 +97,17 @@ impl VelocityBalances { .find_for_update(op.tx(), entries_to_add.keys()) .await?; - let _new_balances = Self::new_snapshots(created_at, current_balances, &entries_to_add)?; + let _new_balances = + Self::new_snapshots(context, created_at, current_balances, &entries_to_add)?; Ok(()) } fn new_snapshots<'a>( + mut context: super::context::EvalContext, time: DateTime, mut current_balances: HashMap>, entries_to_add: &'a HashMap>, - ) -> Result>, VelocityEnforcementError> - { + ) -> Result>, VelocityError> { let mut res = HashMap::new(); for (key, entries) in entries_to_add.iter() { @@ -114,6 +115,7 @@ impl VelocityBalances { let mut new_balances = Vec::new(); for (limit, entry) in entries { + let ctx = context.control_context(entry); let balance = match (latest_balance.take(), current_balances.remove(key)) { (Some(latest), _) => { new_balances.push(latest.clone()); @@ -123,14 +125,14 @@ impl VelocityBalances { (_, Some(None)) => { let new_snapshot = crate::balance::Balances::new_snapshot(time, entry.account_id, entry); - limit.enforce(&new_snapshot)?; + limit.enforce(&ctx, time, &new_snapshot)?; latest_balance = Some(new_snapshot); continue; } _ => unreachable!(), }; let new_snapshot = crate::balance::Balances::update_snapshot(time, balance, entry); - limit.enforce(&new_snapshot)?; + limit.enforce(&ctx, time, &new_snapshot)?; new_balances.push(new_snapshot); } if let Some(latest) = latest_balance.take() { diff --git a/cala-ledger/src/velocity/error.rs b/cala-ledger/src/velocity/error.rs index 543e0791..9771b4d3 100644 --- a/cala-ledger/src/velocity/error.rs +++ b/cala-ledger/src/velocity/error.rs @@ -1,8 +1,9 @@ +use rust_decimal::Decimal; use thiserror::Error; use cel_interpreter::CelError; -use crate::primitives::VelocityControlId; +use crate::primitives::*; #[derive(Error, Debug)] pub enum VelocityError { @@ -16,12 +17,17 @@ pub enum VelocityError { ParamError(#[from] crate::param::error::ParamError), #[error("VelocityError - Could not find control by id: {0}")] CouldNotFindControlById(VelocityControlId), - #[error("VelocityError - VelocityEnforcement: {0}")] - Enforcement(#[from] VelocityEnforcementError), + #[error("VelocityError - Enforcement: {0}")] + Enforcement(#[from] LimitExceededError), } #[derive(Error, Debug)] -pub enum VelocityEnforcementError { - #[error("VelocityEnforcement - LimitExceeded")] - LimitExceeded, +#[error("Velocity limit exceeded for account {account_id} in currency {currency}: Limit ({limit_id}): {limit}, Requested: {requested}")] +pub struct LimitExceededError { + pub account_id: AccountId, + pub currency: String, + pub limit_id: VelocityLimitId, + pub layer: Layer, + pub limit: Decimal, + pub requested: Decimal, } diff --git a/cala-ledger/tests/account_set.rs b/cala-ledger/tests/account_set.rs index f83a6cd2..defd6369 100644 --- a/cala-ledger/tests/account_set.rs +++ b/cala-ledger/tests/account_set.rs @@ -100,7 +100,7 @@ async fn balances() -> anyhow::Result<()> { let recipient_account = cala.accounts().create(receiver).await.unwrap(); let tx_code = Alphanumeric.sample_string(&mut rand::thread_rng(), 32); - let new_template = helpers::test_template(&tx_code); + let new_template = helpers::currency_conversion_template(&tx_code); cala.tx_templates().create(new_template).await.unwrap(); let before_account_set = NewAccountSet::builder() diff --git a/cala-ledger/tests/balance.rs b/cala-ledger/tests/balance.rs index fbbbde0a..558e8e1e 100644 --- a/cala-ledger/tests/balance.rs +++ b/cala-ledger/tests/balance.rs @@ -25,7 +25,7 @@ async fn balance_in_range() -> anyhow::Result<()> { let recipient_account = cala.accounts().create(receiver).await.unwrap(); let tx_code = Alphanumeric.sample_string(&mut rand::thread_rng(), 32); - let new_template = helpers::test_template(&tx_code); + let new_template = helpers::currency_conversion_template(&tx_code); cala.tx_templates().create(new_template).await.unwrap(); let mut params = Params::new(); diff --git a/cala-ledger/tests/helpers.rs b/cala-ledger/tests/helpers.rs index f50de23f..6cf8dd3e 100644 --- a/cala-ledger/tests/helpers.rs +++ b/cala-ledger/tests/helpers.rs @@ -37,7 +37,7 @@ pub fn test_accounts() -> (NewAccount, NewAccount) { (sender_account, recipient_account) } -pub fn test_template(code: &str) -> NewTxTemplate { +pub fn currency_conversion_template(code: &str) -> NewTxTemplate { let params = vec![ NewParamDefinition::builder() .name("recipient") @@ -133,3 +133,87 @@ pub fn test_template(code: &str) -> NewTxTemplate { .build() .unwrap() } + +pub fn velocity_template(code: &str) -> NewTxTemplate { + let params = vec![ + NewParamDefinition::builder() + .name("recipient") + .r#type(ParamDataType::Uuid) + .build() + .unwrap(), + NewParamDefinition::builder() + .name("sender") + .r#type(ParamDataType::Uuid) + .build() + .unwrap(), + NewParamDefinition::builder() + .name("journal_id") + .r#type(ParamDataType::Uuid) + .build() + .unwrap(), + NewParamDefinition::builder() + .name("amount") + .r#type(ParamDataType::Decimal) + .build() + .unwrap(), + NewParamDefinition::builder() + .name("currency") + .r#type(ParamDataType::String) + .default_expr("'USD'") + .build() + .unwrap(), + NewParamDefinition::builder() + .name("layer") + .r#type(ParamDataType::String) + .default_expr("'SETTLED'") + .build() + .unwrap(), + NewParamDefinition::builder() + .name("meta") + .r#type(ParamDataType::Json) + .default_expr(r#"{"foo": "bar"}"#) + .build() + .unwrap(), + NewParamDefinition::builder() + .name("effective") + .r#type(ParamDataType::Date) + .default_expr("date()") + .build() + .unwrap(), + ]; + let entries = vec![ + NewTxTemplateEntry::builder() + .entry_type("'TEST_DR'") + .account_id("params.sender") + .layer("params.layer") + .direction("DEBIT") + .units("params.amount") + .currency("params.currency") + .build() + .unwrap(), + NewTxTemplateEntry::builder() + .entry_type("'TEST_CR'") + .account_id("params.recipient") + .layer("params.layer") + .direction("CREDIT") + .units("params.amount") + .currency("params.currency") + .build() + .unwrap(), + ]; + NewTxTemplate::builder() + .id(uuid::Uuid::new_v4()) + .code(code) + .params(params) + .transaction( + NewTxTemplateTransaction::builder() + .effective("params.effective") + .journal_id("params.journal_id") + .metadata("params.meta") + .build() + .unwrap(), + ) + .entries(entries) + .build() + .unwrap() +} diff --git a/cala-ledger/tests/transaction_post.rs b/cala-ledger/tests/transaction_post.rs index dd65e659..25a8fd7b 100644 --- a/cala-ledger/tests/transaction_post.rs +++ b/cala-ledger/tests/transaction_post.rs @@ -22,7 +22,7 @@ async fn transaction_post() -> anyhow::Result<()> { let recipient_account = cala.accounts().create(receiver).await.unwrap(); let tx_code = Alphanumeric.sample_string(&mut rand::thread_rng(), 32); - let new_template = helpers::test_template(&tx_code); + let new_template = helpers::currency_conversion_template(&tx_code); cala.tx_templates().create(new_template).await.unwrap(); diff --git a/cala-ledger/tests/velocity.rs b/cala-ledger/tests/velocity.rs index 8e5c7dd4..77d2155a 100644 --- a/cala-ledger/tests/velocity.rs +++ b/cala-ledger/tests/velocity.rs @@ -92,14 +92,15 @@ async fn create_control() -> anyhow::Result<()> { let recipient_account = cala.accounts().create(receiver).await.unwrap(); let mut params = Params::new(); - params.insert("withdrawal_limit", Decimal::from(100)); - params.insert("deposit_limit", Decimal::from(100)); + let limit = Decimal::ONE_HUNDRED; + params.insert("withdrawal_limit", limit); + params.insert("deposit_limit", limit); velocity .attach_control_to_account(control.id(), sender_account.id(), params) .await?; let tx_code = Alphanumeric.sample_string(&mut rand::thread_rng(), 32); - let new_template = helpers::test_template(&tx_code); + let new_template = helpers::velocity_template(&tx_code); cala.tx_templates().create(new_template).await.unwrap(); @@ -107,10 +108,12 @@ async fn create_control() -> anyhow::Result<()> { params.insert("journal_id", journal.id().to_string()); params.insert("sender", sender_account.id()); params.insert("recipient", recipient_account.id()); + params.insert("amount", limit + Decimal::ONE); - cala.post_transaction(TransactionId::new(), &tx_code, params) - .await - .unwrap(); + let res = cala + .post_transaction(TransactionId::new(), &tx_code, params.clone()) + .await; + assert!(res.is_err()); Ok(()) } From 590790603a4a4816cb710a4d0605fe4646564f76 Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Sat, 5 Oct 2024 06:53:03 +0200 Subject: [PATCH 12/14] chore: import latest_values for account --- ...d5a04883d934490afa99104ab790689a9181e28a0e2.json} | 7 ++++--- cala-ledger/src/account/repo.rs | 7 ++++--- cala-ledger/src/velocity/balance/mod.rs | 7 ++++++- cala-ledger/src/velocity/balance/repo.rs | 12 ++++++++++-- ...d5a04883d934490afa99104ab790689a9181e28a0e2.json} | 7 ++++--- 5 files changed, 28 insertions(+), 12 deletions(-) rename cala-ledger/.sqlx/{query-35ac870c6d687c4e91412a1465238bf150fb646b646a3f12159817caab4fc7aa.json => query-49602a3b0dbe7780cdd64d5a04883d934490afa99104ab790689a9181e28a0e2.json} (74%) rename cala-server/.sqlx/{query-35ac870c6d687c4e91412a1465238bf150fb646b646a3f12159817caab4fc7aa.json => query-49602a3b0dbe7780cdd64d5a04883d934490afa99104ab790689a9181e28a0e2.json} (74%) diff --git a/cala-ledger/.sqlx/query-35ac870c6d687c4e91412a1465238bf150fb646b646a3f12159817caab4fc7aa.json b/cala-ledger/.sqlx/query-49602a3b0dbe7780cdd64d5a04883d934490afa99104ab790689a9181e28a0e2.json similarity index 74% rename from cala-ledger/.sqlx/query-35ac870c6d687c4e91412a1465238bf150fb646b646a3f12159817caab4fc7aa.json rename to cala-ledger/.sqlx/query-49602a3b0dbe7780cdd64d5a04883d934490afa99104ab790689a9181e28a0e2.json index e21da570..dbdefce4 100644 --- a/cala-ledger/.sqlx/query-35ac870c6d687c4e91412a1465238bf150fb646b646a3f12159817caab4fc7aa.json +++ b/cala-ledger/.sqlx/query-49602a3b0dbe7780cdd64d5a04883d934490afa99104ab790689a9181e28a0e2.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO cala_accounts (data_source_id, id, code, name, external_id, normal_balance_type, eventually_consistent, created_at)\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", + "query": "INSERT INTO cala_accounts (data_source_id, id, code, name, external_id, normal_balance_type, eventually_consistent, created_at, latest_values)\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", "describe": { "columns": [], "parameters": { @@ -22,10 +22,11 @@ } }, "Bool", - "Timestamptz" + "Timestamptz", + "Jsonb" ] }, "nullable": [] }, - "hash": "35ac870c6d687c4e91412a1465238bf150fb646b646a3f12159817caab4fc7aa" + "hash": "49602a3b0dbe7780cdd64d5a04883d934490afa99104ab790689a9181e28a0e2" } diff --git a/cala-ledger/src/account/repo.rs b/cala-ledger/src/account/repo.rs index 5208f61e..378d1699 100644 --- a/cala-ledger/src/account/repo.rs +++ b/cala-ledger/src/account/repo.rs @@ -232,8 +232,8 @@ impl AccountRepo { account: &mut Account, ) -> Result<(), AccountError> { sqlx::query!( - r#"INSERT INTO cala_accounts (data_source_id, id, code, name, external_id, normal_balance_type, eventually_consistent, created_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8)"#, + r#"INSERT INTO cala_accounts (data_source_id, id, code, name, external_id, normal_balance_type, eventually_consistent, created_at, latest_values) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)"#, origin as DataSourceId, account.values().id as AccountId, account.values().code, @@ -241,7 +241,8 @@ impl AccountRepo { account.values().external_id, account.values().normal_balance_type as DebitOrCredit, account.values().config.eventually_consistent, - recorded_at + recorded_at, + serde_json::to_value(account.values()).expect("Failed to serialize account values"), ) .execute(&mut **db) .await?; diff --git a/cala-ledger/src/velocity/balance/mod.rs b/cala-ledger/src/velocity/balance/mod.rs index ef027027..41c1b024 100644 --- a/cala-ledger/src/velocity/balance/mod.rs +++ b/cala-ledger/src/velocity/balance/mod.rs @@ -97,8 +97,13 @@ impl VelocityBalances { .find_for_update(op.tx(), entries_to_add.keys()) .await?; - let _new_balances = + let new_balances = Self::new_snapshots(context, created_at, current_balances, &entries_to_add)?; + + self.repo + .insert_new_snapshots(op.tx(), new_balances) + .await?; + Ok(()) } diff --git a/cala-ledger/src/velocity/balance/repo.rs b/cala-ledger/src/velocity/balance/repo.rs index 7d030a09..550dae6a 100644 --- a/cala-ledger/src/velocity/balance/repo.rs +++ b/cala-ledger/src/velocity/balance/repo.rs @@ -4,7 +4,7 @@ use std::collections::HashMap; use cala_types::{balance::BalanceSnapshot, velocity::Window}; -use crate::primitives::*; +use crate::{primitives::*, velocity::error::VelocityError}; pub(super) type VelocityBalanceKey = ( Window, @@ -31,7 +31,7 @@ impl VelocityBalanceRepo { &self, db: &mut Transaction<'_, Postgres>, keys: impl Iterator, - ) -> Result>, sqlx::Error> { + ) -> Result>, VelocityError> { let mut query_builder = QueryBuilder::new( r#" WITH inputs AS ( @@ -106,4 +106,12 @@ impl VelocityBalanceRepo { } Ok(ret) } + + pub(crate) async fn insert_new_snapshots( + &self, + _db: &mut Transaction<'_, Postgres>, + _new_balances: HashMap<&VelocityBalanceKey, Vec>, + ) -> Result<(), VelocityError> { + unimplemented!() + } } diff --git a/cala-server/.sqlx/query-35ac870c6d687c4e91412a1465238bf150fb646b646a3f12159817caab4fc7aa.json b/cala-server/.sqlx/query-49602a3b0dbe7780cdd64d5a04883d934490afa99104ab790689a9181e28a0e2.json similarity index 74% rename from cala-server/.sqlx/query-35ac870c6d687c4e91412a1465238bf150fb646b646a3f12159817caab4fc7aa.json rename to cala-server/.sqlx/query-49602a3b0dbe7780cdd64d5a04883d934490afa99104ab790689a9181e28a0e2.json index e21da570..dbdefce4 100644 --- a/cala-server/.sqlx/query-35ac870c6d687c4e91412a1465238bf150fb646b646a3f12159817caab4fc7aa.json +++ b/cala-server/.sqlx/query-49602a3b0dbe7780cdd64d5a04883d934490afa99104ab790689a9181e28a0e2.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO cala_accounts (data_source_id, id, code, name, external_id, normal_balance_type, eventually_consistent, created_at)\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", + "query": "INSERT INTO cala_accounts (data_source_id, id, code, name, external_id, normal_balance_type, eventually_consistent, created_at, latest_values)\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", "describe": { "columns": [], "parameters": { @@ -22,10 +22,11 @@ } }, "Bool", - "Timestamptz" + "Timestamptz", + "Jsonb" ] }, "nullable": [] }, - "hash": "35ac870c6d687c4e91412a1465238bf150fb646b646a3f12159817caab4fc7aa" + "hash": "49602a3b0dbe7780cdd64d5a04883d934490afa99104ab790689a9181e28a0e2" } From a44b161d5dee2491a95548bd836b65e4c6e9d919 Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Sat, 5 Oct 2024 20:58:41 +0200 Subject: [PATCH 13/14] chore: re-run setup-db --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index e4dbcdec..6f1e9fb3 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,7 @@ start-deps: docker compose up -d integration-deps setup-db: - cd cala-ledger && cargo sqlx migrate run + cd cala-ledger && cargo sqlx migrate run || cargo sqlx migrate run cd cala-server && cargo sqlx migrate run --ignore-missing reset-deps: clean-deps start-deps setup-db From aa804cd4edd0caf9b6578d8eb3d44e46cb869ff1 Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Sun, 6 Oct 2024 07:00:34 +0200 Subject: [PATCH 14/14] test: e2e velocity limit working --- .../src/velocity/balance.rs | 6 ++ cala-ledger/src/velocity/balance/repo.rs | 83 ++++++++++++++++++- cala-ledger/tests/velocity.rs | 6 +- 3 files changed, 91 insertions(+), 4 deletions(-) diff --git a/cala-ledger-core-types/src/velocity/balance.rs b/cala-ledger-core-types/src/velocity/balance.rs index 7e0f24d9..11cb71a9 100644 --- a/cala-ledger-core-types/src/velocity/balance.rs +++ b/cala-ledger-core-types/src/velocity/balance.rs @@ -6,6 +6,12 @@ use crate::{balance::BalanceSnapshot, primitives::*}; #[sqlx(transparent)] pub struct Window(serde_json::Value); +impl Window { + pub fn inner(&self) -> &serde_json::Value { + &self.0 + } +} + impl From> for Window { fn from(map: serde_json::Map) -> Self { Window(map.into()) diff --git a/cala-ledger/src/velocity/balance/repo.rs b/cala-ledger/src/velocity/balance/repo.rs index 550dae6a..f1db19be 100644 --- a/cala-ledger/src/velocity/balance/repo.rs +++ b/cala-ledger/src/velocity/balance/repo.rs @@ -109,9 +109,86 @@ impl VelocityBalanceRepo { pub(crate) async fn insert_new_snapshots( &self, - _db: &mut Transaction<'_, Postgres>, - _new_balances: HashMap<&VelocityBalanceKey, Vec>, + db: &mut Transaction<'_, Postgres>, + new_balances: HashMap<&VelocityBalanceKey, Vec>, ) -> Result<(), VelocityError> { - unimplemented!() + dbg!("insert_new_snapshots"); + dbg!(&new_balances); + let mut query_builder = QueryBuilder::new( + r#" + WITH new_snapshots AS ( + INSERT INTO cala_velocity_balance_history ( + journal_id, account_id, currency, velocity_control_id, velocity_limit_id, partition_window, latest_entry_id, version, values + ) + "#, + ); + + query_builder.push_values( + new_balances.into_iter().flat_map(|(key, snapshots)| { + snapshots.into_iter().map(move |snapshot| (key, snapshot)) + }), + |mut builder, (key, b)| { + let ( + window, + currency, + journal_id, + account_id, + velocity_control_id, + velocity_limit_id, + ) = key; + builder.push_bind(journal_id); + builder.push_bind(account_id); + builder.push_bind(currency.code()); + builder.push_bind(velocity_control_id); + builder.push_bind(velocity_limit_id); + builder.push_bind(window.inner()); + builder.push_bind(b.entry_id); + builder.push_bind(b.version as i32); + builder.push_bind( + serde_json::to_value(b).expect("Failed to serialize balance snapshot"), + ); + }, + ); + + query_builder.push( + r#" + RETURNING * + ), + ranked_balances AS ( + SELECT *, + ROW_NUMBER() OVER ( + PARTITION BY partition_window, currency, journal_id, account_id, velocity_control_id, velocity_limit_id ORDER BY version + ) AS rn, + MAX(version) OVER ( + PARTITION BY partition_window, currency, journal_id, account_id, velocity_control_id, velocity_limit_id + ) AS max + FROM new_snapshots + ), + initial_balances AS ( + INSERT INTO cala_velocity_current_balances ( + journal_id, account_id, currency, velocity_control_id, velocity_limit_id, + partition_window, latest_version + ) + SELECT + journal_id, account_id, currency, velocity_control_id, velocity_limit_id, + partition_window, version + FROM ranked_balances + WHERE version = rn AND rn = max + ) + UPDATE cala_velocity_current_balances c + SET latest_version = n.version + FROM ranked_balances n + WHERE c.journal_id = n.journal_id + AND c.account_id = n.account_id + AND c.currency = n.currency + AND c.velocity_control_id = n.velocity_control_id + AND c.velocity_limit_id = n.velocity_limit_id + AND c.partition_window = n.partition_window + AND c.data_source_id = '00000000-0000-0000-0000-000000000000' + AND version = max AND version != rn + "#, + ); + query_builder.build().execute(&mut **db).await?; + Ok(()) } } diff --git a/cala-ledger/tests/velocity.rs b/cala-ledger/tests/velocity.rs index 77d2155a..e72eee29 100644 --- a/cala-ledger/tests/velocity.rs +++ b/cala-ledger/tests/velocity.rs @@ -108,8 +108,12 @@ async fn create_control() -> anyhow::Result<()> { params.insert("journal_id", journal.id().to_string()); params.insert("sender", sender_account.id()); params.insert("recipient", recipient_account.id()); - params.insert("amount", limit + Decimal::ONE); + params.insert("amount", limit); + let _ = cala + .post_transaction(TransactionId::new(), &tx_code, params.clone()) + .await?; + params.insert("amount", Decimal::ONE); let res = cala .post_transaction(TransactionId::new(), &tx_code, params.clone()) .await;