diff --git a/Makefile b/Makefile index e4dbcdec..6f1e9fb3 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,7 @@ start-deps: docker compose up -d integration-deps setup-db: - cd cala-ledger && cargo sqlx migrate run + cd cala-ledger && cargo sqlx migrate run || cargo sqlx migrate run cd cala-server && cargo sqlx migrate run --ignore-missing reset-deps: clean-deps start-deps setup-db diff --git a/cala-cel-interpreter/src/cel_type.rs b/cala-cel-interpreter/src/cel_type.rs index 097f3f2b..cadf7599 100644 --- a/cala-cel-interpreter/src/cel_type.rs +++ b/cala-cel-interpreter/src/cel_type.rs @@ -11,8 +11,9 @@ pub enum CelType { Bool, Null, - // Addons + // Abstract Date, + Timestamp, Uuid, Decimal, } diff --git a/cala-cel-interpreter/src/context/decimal.rs b/cala-cel-interpreter/src/context/decimal.rs index 818ce35e..d14e9b03 100644 --- a/cala-cel-interpreter/src/context/decimal.rs +++ b/cala-cel-interpreter/src/context/decimal.rs @@ -1,6 +1,6 @@ use lazy_static::lazy_static; -use std::collections::HashMap; +use std::{borrow::Cow, collections::HashMap}; use crate::builtins; @@ -10,11 +10,11 @@ lazy_static! { pub static ref CEL_CONTEXT: CelContext = { let mut idents = HashMap::new(); idents.insert( - SELF_PACKAGE_NAME.to_string(), + SELF_PACKAGE_NAME, ContextItem::Function(Box::new(builtins::decimal::cast)), ); idents.insert( - "Add".to_string(), + Cow::Borrowed("Add"), ContextItem::Function(Box::new(builtins::decimal::add)), ); CelContext { idents } diff --git a/cala-cel-interpreter/src/context/mod.rs b/cala-cel-interpreter/src/context/mod.rs index 9b9fcfd9..219b57c5 100644 --- a/cala-cel-interpreter/src/context/mod.rs +++ b/cala-cel-interpreter/src/context/mod.rs @@ -1,19 +1,19 @@ mod decimal; -use std::collections::HashMap; +use std::{borrow::Cow, collections::HashMap}; use crate::{builtins, error::*, value::*}; -const SELF_PACKAGE_NAME: &str = "self"; +const SELF_PACKAGE_NAME: Cow<'static, str> = Cow::Borrowed("self"); type CelFunction = Box) -> Result + Sync>; #[derive(Debug)] pub struct CelContext { - idents: HashMap, + idents: HashMap, ContextItem>, } impl CelContext { - pub fn add_variable(&mut self, name: impl Into, value: impl Into) { + pub fn add_variable(&mut self, name: impl Into>, value: impl Into) { self.idents .insert(name.into(), ContextItem::Value(value.into())); } @@ -21,22 +21,22 @@ impl CelContext { pub fn new() -> Self { let mut idents = HashMap::new(); idents.insert( - "date".to_string(), + Cow::Borrowed("date"), ContextItem::Function(Box::new(builtins::date)), ); idents.insert( - "uuid".to_string(), + Cow::Borrowed("uuid"), ContextItem::Function(Box::new(builtins::uuid)), ); idents.insert( - "decimal".to_string(), + Cow::Borrowed("decimal"), ContextItem::Package(&decimal::CEL_CONTEXT), ); Self { idents } } pub(crate) fn package_self(&self) -> Result<&ContextItem, CelError> { - self.lookup(SELF_PACKAGE_NAME) + self.lookup(&SELF_PACKAGE_NAME) } pub(crate) fn lookup(&self, name: &str) -> Result<&ContextItem, CelError> { diff --git a/cala-cel-interpreter/src/interpreter.rs b/cala-cel-interpreter/src/interpreter.rs index e2fc2453..5c027469 100755 --- a/cala-cel-interpreter/src/interpreter.rs +++ b/cala-cel-interpreter/src/interpreter.rs @@ -156,6 +156,7 @@ fn evaluate_member<'a>( use ast::Member::*; match member { Attribute(name) => match target { + EvalType::Value(CelValue::Map(map)) => Ok(EvalType::Value(map.get(name))), EvalType::ContextItem(ContextItem::Value(CelValue::Map(map))) => { Ok(EvalType::Value(map.get(name))) } @@ -379,10 +380,12 @@ mod tests { #[test] fn lookup() { - let expression = "params.hello".parse::().unwrap(); - let mut context = CelContext::new(); + let expression = "params.hello.world".parse::().unwrap(); + let mut hello = CelMap::new(); + hello.insert("world", 42); let mut params = CelMap::new(); - params.insert("hello", 42); + params.insert("hello", hello); + let mut context = CelContext::new(); context.add_variable("params", params); assert_eq!(expression.evaluate(&context).unwrap(), CelValue::Int(42)); } diff --git a/cala-cel-interpreter/src/value.rs b/cala-cel-interpreter/src/value.rs index 19d15417..4e4ad73d 100644 --- a/cala-cel-interpreter/src/value.rs +++ b/cala-cel-interpreter/src/value.rs @@ -1,5 +1,5 @@ use cel_parser::{ast::Literal, Expression}; -use chrono::NaiveDate; +use chrono::{DateTime, NaiveDate, Utc}; use rust_decimal::Decimal; use uuid::Uuid; @@ -25,9 +25,10 @@ pub enum CelValue { Bool(bool), Null, - // Addons + // Abstract Decimal(Decimal), Date(NaiveDate), + Timestamp(DateTime), Uuid(Uuid), } @@ -226,6 +227,7 @@ impl From<&CelValue> for CelType { CelValue::Decimal(_) => CelType::Decimal, CelValue::Date(_) => CelType::Date, CelValue::Uuid(_) => CelType::Uuid, + CelValue::Timestamp(_) => CelType::Timestamp, } } } @@ -269,6 +271,22 @@ impl<'a> TryFrom<&'a CelValue> for &'a Decimal { } } +impl<'a> TryFrom> for bool { + type Error = ResultCoercionError; + + fn try_from(CelResult { expr, val }: CelResult) -> Result { + if let CelValue::Bool(b) = val { + Ok(b) + } else { + Err(ResultCoercionError::BadCoreTypeCoercion( + format!("{expr:?}"), + CelType::from(&val), + CelType::Bool, + )) + } + } +} + impl<'a> TryFrom> for NaiveDate { type Error = ResultCoercionError; @@ -285,6 +303,22 @@ impl<'a> TryFrom> for NaiveDate { } } +impl<'a> TryFrom> for DateTime { + type Error = ResultCoercionError; + + fn try_from(CelResult { expr, val }: CelResult) -> Result { + if let CelValue::Timestamp(d) = val { + Ok(d) + } else { + Err(ResultCoercionError::BadCoreTypeCoercion( + format!("{expr:?}"), + CelType::from(&val), + CelType::Timestamp, + )) + } + } +} + impl<'a> TryFrom> for Uuid { type Error = ResultCoercionError; diff --git a/cala-ledger-core-types/src/entry.rs b/cala-ledger-core-types/src/entry.rs index cb68fa26..258c9b01 100644 --- a/cala-ledger-core-types/src/entry.rs +++ b/cala-ledger-core-types/src/entry.rs @@ -18,3 +18,21 @@ pub struct EntryValues { pub direction: DebitOrCredit, pub description: Option, } + +mod cel { + use cel_interpreter::{CelMap, CelValue}; + + impl From<&super::EntryValues> for CelValue { + fn from(entry: &super::EntryValues) -> Self { + let mut map = CelMap::new(); + map.insert("id", entry.id); + map.insert("entry_type", entry.entry_type.clone()); + map.insert("sequence", CelValue::UInt(entry.sequence as u64)); + map.insert("layer", entry.layer); + map.insert("direction", entry.direction); + map.insert("units", entry.units); + map.insert("currency", entry.currency); + map.into() + } + } +} diff --git a/cala-ledger-core-types/src/primitives.rs b/cala-ledger-core-types/src/primitives.rs index 2b7c4b3c..3f99a16d 100644 --- a/cala-ledger-core-types/src/primitives.rs +++ b/cala-ledger-core-types/src/primitives.rs @@ -57,6 +57,15 @@ impl<'a> TryFrom> for DebitOrCredit { } } +impl From for CelValue { + fn from(v: DebitOrCredit) -> Self { + match v { + DebitOrCredit::Debit => "DEBIT".into(), + DebitOrCredit::Credit => "CREDIT".into(), + } + } +} + #[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, sqlx::Type)] #[sqlx(type_name = "Status", rename_all = "snake_case")] #[serde(rename_all = "snake_case")] @@ -108,6 +117,16 @@ impl Default for Layer { } } +impl From for CelValue { + fn from(l: Layer) -> Self { + match l { + Layer::Settled => "SETTLED".into(), + Layer::Pending => "PENDING".into(), + Layer::Encumbrance => "ENCUMBRANCE".into(), + } + } +} + #[derive(Debug, Clone, Copy, Eq, Serialize, Deserialize)] #[serde(try_from = "String")] #[serde(into = "&str")] @@ -131,6 +150,12 @@ impl std::fmt::Display for Currency { } } +impl From for CelValue { + fn from(c: Currency) -> Self { + c.code().into() + } +} + impl std::hash::Hash for Currency { fn hash(&self, state: &mut H) { self.code().hash(state); diff --git a/cala-ledger-core-types/src/transaction.rs b/cala-ledger-core-types/src/transaction.rs index 7e33e409..b69e72e6 100644 --- a/cala-ledger-core-types/src/transaction.rs +++ b/cala-ledger-core-types/src/transaction.rs @@ -15,3 +15,22 @@ pub struct TransactionValues { pub description: Option, pub metadata: Option, } + +mod cel { + use cel_interpreter::{CelMap, CelValue}; + + impl From<&super::TransactionValues> for CelValue { + fn from(tx: &super::TransactionValues) -> Self { + let mut map = CelMap::new(); + map.insert("id", tx.id); + map.insert("journal_id", tx.journal_id); + map.insert("tx_template_id", tx.tx_template_id); + map.insert("effective", tx.effective); + map.insert("correlation_id", tx.correlation_id.clone()); + if let Some(metadata) = &tx.metadata { + map.insert("metadata", metadata.clone()); + } + map.into() + } + } +} diff --git a/cala-ledger-core-types/src/velocity/balance.rs b/cala-ledger-core-types/src/velocity/balance.rs new file mode 100644 index 00000000..11cb71a9 --- /dev/null +++ b/cala-ledger-core-types/src/velocity/balance.rs @@ -0,0 +1,34 @@ +use rust_decimal::Decimal; + +use crate::{balance::BalanceSnapshot, primitives::*}; + +#[derive(Debug, Clone, sqlx::Type, PartialEq, Eq, Hash)] +#[sqlx(transparent)] +pub struct Window(serde_json::Value); + +impl Window { + pub fn inner(&self) -> &serde_json::Value { + &self.0 + } +} + +impl From> for Window { + fn from(map: serde_json::Map) -> Self { + Window(map.into()) + } +} + +impl From for Window { + fn from(map: serde_json::Value) -> Self { + Window(map) + } +} + +pub struct VelocityBalance { + pub control_id: VelocityControlId, + pub limit_id: VelocityLimitId, + pub spent: Decimal, + pub remaining: Decimal, + pub currency: Currency, + pub balance: BalanceSnapshot, +} diff --git a/cala-ledger-core-types/src/velocity/limit.rs b/cala-ledger-core-types/src/velocity/limit.rs index 3c447579..d275d216 100644 --- a/cala-ledger-core-types/src/velocity/limit.rs +++ b/cala-ledger-core-types/src/velocity/limit.rs @@ -33,4 +33,6 @@ pub struct BalanceLimit { pub layer: CelExpression, pub amount: CelExpression, pub enforcement_direction: CelExpression, + pub start: Option, + pub end: Option, } diff --git a/cala-ledger-core-types/src/velocity/mod.rs b/cala-ledger-core-types/src/velocity/mod.rs index e42453d3..0a07c5ab 100644 --- a/cala-ledger-core-types/src/velocity/mod.rs +++ b/cala-ledger-core-types/src/velocity/mod.rs @@ -1,5 +1,7 @@ +mod balance; mod control; mod limit; +pub use balance::*; pub use control::*; pub use limit::*; diff --git a/cala-server/.sqlx/query-67f2b33262254fd0d5d54f5bf92ead9ca880e166919d3e9fd52650bcef348488.json b/cala-ledger/.sqlx/query-1bdbfe685cc6583cb82612f37bc6013c2246cb79e265f48b3c51d06b55db28bd.json similarity index 64% rename from cala-server/.sqlx/query-67f2b33262254fd0d5d54f5bf92ead9ca880e166919d3e9fd52650bcef348488.json rename to cala-ledger/.sqlx/query-1bdbfe685cc6583cb82612f37bc6013c2246cb79e265f48b3c51d06b55db28bd.json index 91b6adb0..4ca5ed7e 100644 --- a/cala-server/.sqlx/query-67f2b33262254fd0d5d54f5bf92ead9ca880e166919d3e9fd52650bcef348488.json +++ b/cala-ledger/.sqlx/query-1bdbfe685cc6583cb82612f37bc6013c2246cb79e265f48b3c51d06b55db28bd.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "UPDATE cala_accounts\n SET code = $2, name = $3, external_id = $4, normal_balance_type = $5\n WHERE id = $1 AND data_source_id = '00000000-0000-0000-0000-000000000000'", + "query": "UPDATE cala_accounts\n SET code = $2, name = $3, external_id = $4, normal_balance_type = $5, latest_values = $6\n WHERE id = $1 AND data_source_id = '00000000-0000-0000-0000-000000000000'", "describe": { "columns": [], "parameters": { @@ -19,10 +19,11 @@ ] } } - } + }, + "Jsonb" ] }, "nullable": [] }, - "hash": "67f2b33262254fd0d5d54f5bf92ead9ca880e166919d3e9fd52650bcef348488" + "hash": "1bdbfe685cc6583cb82612f37bc6013c2246cb79e265f48b3c51d06b55db28bd" } diff --git a/cala-server/.sqlx/query-de85f101b071aab683abad0a452087b012330d423849a0a20c719816cf0b7343.json b/cala-ledger/.sqlx/query-2d1a0955c79d7be68272211f35fba584e6a170427c89b39940217bb743d1049b.json similarity index 68% rename from cala-server/.sqlx/query-de85f101b071aab683abad0a452087b012330d423849a0a20c719816cf0b7343.json rename to cala-ledger/.sqlx/query-2d1a0955c79d7be68272211f35fba584e6a170427c89b39940217bb743d1049b.json index d7e7b7d6..4c58ab38 100644 --- a/cala-server/.sqlx/query-de85f101b071aab683abad0a452087b012330d423849a0a20c719816cf0b7343.json +++ b/cala-ledger/.sqlx/query-2d1a0955c79d7be68272211f35fba584e6a170427c89b39940217bb743d1049b.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO cala_accounts (id, code, name, external_id, normal_balance_type, eventually_consistent)\n VALUES ($1, $2, $3, $4, $5, $6)", + "query": "INSERT INTO cala_accounts (id, code, name, external_id, normal_balance_type, eventually_consistent, latest_values)\n VALUES ($1, $2, $3, $4, $5, $6, $7)", "describe": { "columns": [], "parameters": { @@ -20,10 +20,11 @@ } } }, - "Bool" + "Bool", + "Jsonb" ] }, "nullable": [] }, - "hash": "de85f101b071aab683abad0a452087b012330d423849a0a20c719816cf0b7343" + "hash": "2d1a0955c79d7be68272211f35fba584e6a170427c89b39940217bb743d1049b" } diff --git a/cala-ledger/.sqlx/query-35ac870c6d687c4e91412a1465238bf150fb646b646a3f12159817caab4fc7aa.json b/cala-ledger/.sqlx/query-49602a3b0dbe7780cdd64d5a04883d934490afa99104ab790689a9181e28a0e2.json similarity index 74% rename from cala-ledger/.sqlx/query-35ac870c6d687c4e91412a1465238bf150fb646b646a3f12159817caab4fc7aa.json rename to cala-ledger/.sqlx/query-49602a3b0dbe7780cdd64d5a04883d934490afa99104ab790689a9181e28a0e2.json index e21da570..dbdefce4 100644 --- a/cala-ledger/.sqlx/query-35ac870c6d687c4e91412a1465238bf150fb646b646a3f12159817caab4fc7aa.json +++ b/cala-ledger/.sqlx/query-49602a3b0dbe7780cdd64d5a04883d934490afa99104ab790689a9181e28a0e2.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO cala_accounts (data_source_id, id, code, name, external_id, normal_balance_type, eventually_consistent, created_at)\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", + "query": "INSERT INTO cala_accounts (data_source_id, id, code, name, external_id, normal_balance_type, eventually_consistent, created_at, latest_values)\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", "describe": { "columns": [], "parameters": { @@ -22,10 +22,11 @@ } }, "Bool", - "Timestamptz" + "Timestamptz", + "Jsonb" ] }, "nullable": [] }, - "hash": "35ac870c6d687c4e91412a1465238bf150fb646b646a3f12159817caab4fc7aa" + "hash": "49602a3b0dbe7780cdd64d5a04883d934490afa99104ab790689a9181e28a0e2" } diff --git a/cala-ledger/.sqlx/query-8b87fd1b4f25b8265528a02375e937c3c446b11076dd83e2bd202cee8c95e2c2.json b/cala-ledger/.sqlx/query-8b87fd1b4f25b8265528a02375e937c3c446b11076dd83e2bd202cee8c95e2c2.json new file mode 100644 index 00000000..281dff95 --- /dev/null +++ b/cala-ledger/.sqlx/query-8b87fd1b4f25b8265528a02375e937c3c446b11076dd83e2bd202cee8c95e2c2.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT values FROM cala_velocity_account_controls\n WHERE data_source_id = '00000000-0000-0000-0000-000000000000' AND account_id = ANY($1)", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "values", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "UuidArray" + ] + }, + "nullable": [ + false + ] + }, + "hash": "8b87fd1b4f25b8265528a02375e937c3c446b11076dd83e2bd202cee8c95e2c2" +} diff --git a/cala-ledger/.sqlx/query-d4640c733860f62b22b5d9e5bca022b01dd30aedc0092708459e7f49d563e053.json b/cala-ledger/.sqlx/query-d4640c733860f62b22b5d9e5bca022b01dd30aedc0092708459e7f49d563e053.json new file mode 100644 index 00000000..ac919a0b --- /dev/null +++ b/cala-ledger/.sqlx/query-d4640c733860f62b22b5d9e5bca022b01dd30aedc0092708459e7f49d563e053.json @@ -0,0 +1,46 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT c.id, e.sequence, e.event,\n c.created_at AS entity_created_at, e.recorded_at AS event_recorded_at\n FROM cala_velocity_controls c\n JOIN cala_velocity_control_events e\n ON c.data_source_id = e.data_source_id\n AND c.id = e.id\n WHERE c.data_source_id = '00000000-0000-0000-0000-000000000000'\n AND c.id = $1\n ORDER BY e.sequence", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "sequence", + "type_info": "Int4" + }, + { + "ordinal": 2, + "name": "event", + "type_info": "Jsonb" + }, + { + "ordinal": 3, + "name": "entity_created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 4, + "name": "event_recorded_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "d4640c733860f62b22b5d9e5bca022b01dd30aedc0092708459e7f49d563e053" +} diff --git a/cala-ledger/migrations/20231208110808_cala_ledger_setup.sql b/cala-ledger/migrations/20231208110808_cala_ledger_setup.sql index 172f75de..9d72634c 100644 --- a/cala-ledger/migrations/20231208110808_cala_ledger_setup.sql +++ b/cala-ledger/migrations/20231208110808_cala_ledger_setup.sql @@ -10,6 +10,7 @@ CREATE TABLE cala_accounts ( external_id VARCHAR, normal_balance_type DebitOrCredit NOT NULL, -- For quick lookup when querying balances eventually_consistent BOOLEAN NOT NULL, -- For balance locking + latest_values JSONB NOT NULL, -- Cached for quicker velocity enforcement created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), UNIQUE(data_source_id, id), UNIQUE(data_source_id, code) @@ -255,6 +256,40 @@ CREATE TABLE cala_velocity_account_controls ( FOREIGN KEY (data_source_id, velocity_control_id) REFERENCES cala_velocity_controls(data_source_id, id) ); +CREATE TABLE cala_velocity_current_balances ( + data_source_id UUID NOT NULL DEFAULT '00000000-0000-0000-0000-000000000000', + journal_id UUID NOT NULL, + account_id UUID NOT NULL, + currency VARCHAR NOT NULL, + velocity_control_id UUID NOT NULL, + velocity_limit_id UUID NOT NULL, + partition_window JSONB NOT NULL, + latest_version INT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE(data_source_id, partition_window, currency, journal_id, account_id, velocity_limit_id, velocity_control_id), + FOREIGN KEY (data_source_id, journal_id) REFERENCES cala_journals(data_source_id, id), + FOREIGN KEY (data_source_id, account_id) REFERENCES cala_accounts(data_source_id, id), + FOREIGN KEY (data_source_id, velocity_control_id) REFERENCES cala_velocity_controls(data_source_id, id), + FOREIGN KEY (data_source_id, velocity_limit_id) REFERENCES cala_velocity_limits(data_source_id, id) +); + +CREATE TABLE cala_velocity_balance_history ( + data_source_id UUID NOT NULL DEFAULT '00000000-0000-0000-0000-000000000000', + journal_id UUID NOT NULL, + account_id UUID NOT NULL, + currency VARCHAR NOT NULL, + velocity_control_id UUID NOT NULL, + velocity_limit_id UUID NOT NULL, + partition_window JSONB NOT NULL, + latest_entry_id UUID NOT NULL, + version INT NOT NULL, + values JSONB NOT NULL, + recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE(data_source_id, partition_window, currency, journal_id, account_id, velocity_limit_id, velocity_control_id, version), + FOREIGN KEY (data_source_id, partition_window, currency, journal_id, account_id, velocity_limit_id, velocity_control_id) REFERENCES cala_velocity_current_balances(data_source_id, partition_window, currency, journal_id, account_id, velocity_limit_id, velocity_control_id), + FOREIGN KEY (data_source_id, latest_entry_id) REFERENCES cala_entries(data_source_id, id) +); + CREATE TABLE cala_outbox_events ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), sequence BIGSERIAL UNIQUE, diff --git a/cala-ledger/src/account/entity.rs b/cala-ledger/src/account/entity.rs index 524215a1..f317d93c 100644 --- a/cala-ledger/src/account/entity.rs +++ b/cala-ledger/src/account/entity.rs @@ -246,7 +246,7 @@ impl From<(AccountValues, Vec)> for AccountUpdate { } /// Representation of a ***new*** ledger account entity with required/optional properties and a builder. -#[derive(Builder, Debug)] +#[derive(Builder, Debug, Clone)] pub struct NewAccount { #[builder(setter(into))] pub id: AccountId, @@ -275,27 +275,27 @@ impl NewAccount { NewAccountBuilder::default() } + pub(super) fn into_values(self) -> AccountValues { + AccountValues { + id: self.id, + version: 1, + code: self.code, + name: self.name, + external_id: self.external_id, + normal_balance_type: self.normal_balance_type, + status: self.status, + description: self.description, + metadata: self.metadata, + config: AccountConfig { + is_account_set: self.is_account_set, + eventually_consistent: false, + }, + } + } + pub(super) fn initial_events(self) -> EntityEvents { - EntityEvents::init( - self.id, - [AccountEvent::Initialized { - values: AccountValues { - id: self.id, - version: 1, - code: self.code, - name: self.name, - external_id: self.external_id, - normal_balance_type: self.normal_balance_type, - status: self.status, - description: self.description, - metadata: self.metadata, - config: AccountConfig { - is_account_set: self.is_account_set, - eventually_consistent: false, - }, - }, - }], - ) + let values = self.into_values(); + EntityEvents::init(values.id, [AccountEvent::Initialized { values }]) } } diff --git a/cala-ledger/src/account/repo.rs b/cala-ledger/src/account/repo.rs index 5b01e69b..378d1699 100644 --- a/cala-ledger/src/account/repo.rs +++ b/cala-ledger/src/account/repo.rs @@ -26,14 +26,15 @@ impl AccountRepo { ) -> Result { let id = new_account.id; sqlx::query!( - r#"INSERT INTO cala_accounts (id, code, name, external_id, normal_balance_type, eventually_consistent) - VALUES ($1, $2, $3, $4, $5, $6)"#, + r#"INSERT INTO cala_accounts (id, code, name, external_id, normal_balance_type, eventually_consistent, latest_values) + VALUES ($1, $2, $3, $4, $5, $6, $7)"#, id as AccountId, new_account.code, new_account.name, new_account.external_id, new_account.normal_balance_type as DebitOrCredit, - new_account.eventually_consistent + new_account.eventually_consistent, + serde_json::to_value(&new_account.clone().into_values()).expect("Failed to serialize account values"), ) .execute(&mut **db) .await?; @@ -50,13 +51,14 @@ impl AccountRepo { ) -> Result<(), AccountError> { sqlx::query!( r#"UPDATE cala_accounts - SET code = $2, name = $3, external_id = $4, normal_balance_type = $5 + SET code = $2, name = $3, external_id = $4, normal_balance_type = $5, latest_values = $6 WHERE id = $1 AND data_source_id = '00000000-0000-0000-0000-000000000000'"#, account.values().id as AccountId, account.values().code, account.values().name, account.values().external_id, account.values().normal_balance_type as DebitOrCredit, + serde_json::to_value(account.values()).expect("Failed to serialize account values"), ) .execute(&mut **db) .await?; @@ -230,8 +232,8 @@ impl AccountRepo { account: &mut Account, ) -> Result<(), AccountError> { sqlx::query!( - r#"INSERT INTO cala_accounts (data_source_id, id, code, name, external_id, normal_balance_type, eventually_consistent, created_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8)"#, + r#"INSERT INTO cala_accounts (data_source_id, id, code, name, external_id, normal_balance_type, eventually_consistent, created_at, latest_values) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)"#, origin as DataSourceId, account.values().id as AccountId, account.values().code, @@ -239,7 +241,8 @@ impl AccountRepo { account.values().external_id, account.values().normal_balance_type as DebitOrCredit, account.values().config.eventually_consistent, - recorded_at + recorded_at, + serde_json::to_value(account.values()).expect("Failed to serialize account values"), ) .execute(&mut **db) .await?; diff --git a/cala-ledger/src/balance/account_balance.rs b/cala-ledger/src/balance/account_balance.rs index 7848fe9d..35919d5c 100644 --- a/cala-ledger/src/balance/account_balance.rs +++ b/cala-ledger/src/balance/account_balance.rs @@ -6,11 +6,18 @@ use cala_types::balance::*; /// Representation of account's balance tracked in 3 distinct layers. #[derive(Debug, Clone)] pub struct AccountBalance { - pub(super) balance_type: DebitOrCredit, + balance_type: DebitOrCredit, pub details: BalanceSnapshot, } impl AccountBalance { + pub(crate) fn new(balance_type: DebitOrCredit, details: BalanceSnapshot) -> Self { + Self { + balance_type, + details, + } + } + pub(super) fn derive_diff(mut self, since: &Self) -> Self { self.details.settled = BalanceAmount { dr_balance: self.details.settled.dr_balance - since.details.settled.dr_balance, @@ -31,7 +38,50 @@ impl AccountBalance { } pub fn pending(&self) -> Decimal { - if self.balance_type == DebitOrCredit::Credit { + BalanceWithDirection { + direction: self.balance_type, + details: &self.details, + } + .pending() + } + + pub fn settled(&self) -> Decimal { + BalanceWithDirection { + direction: self.balance_type, + details: &self.details, + } + .settled() + } + + pub fn encumbrance(&self) -> Decimal { + BalanceWithDirection { + direction: self.balance_type, + details: &self.details, + } + .encumbrance() + } + + pub fn available(&self, layer: Layer) -> Decimal { + BalanceWithDirection { + direction: self.balance_type, + details: &self.details, + } + .available(layer) + } +} + +pub(crate) struct BalanceWithDirection<'a> { + direction: DebitOrCredit, + details: &'a BalanceSnapshot, +} + +impl<'a> BalanceWithDirection<'a> { + pub fn new(direction: DebitOrCredit, details: &'a BalanceSnapshot) -> Self { + Self { direction, details } + } + + pub fn pending(&self) -> Decimal { + if self.direction == DebitOrCredit::Credit { self.details.pending.cr_balance - self.details.pending.dr_balance } else { self.details.pending.dr_balance - self.details.pending.cr_balance @@ -39,7 +89,7 @@ impl AccountBalance { } pub fn settled(&self) -> Decimal { - if self.balance_type == DebitOrCredit::Credit { + if self.direction == DebitOrCredit::Credit { self.details.settled.cr_balance - self.details.settled.dr_balance } else { self.details.settled.dr_balance - self.details.settled.cr_balance @@ -47,7 +97,7 @@ impl AccountBalance { } pub fn encumbrance(&self) -> Decimal { - if self.balance_type == DebitOrCredit::Credit { + if self.direction == DebitOrCredit::Credit { self.details.encumbrance.cr_balance - self.details.encumbrance.dr_balance } else { self.details.encumbrance.dr_balance - self.details.encumbrance.cr_balance diff --git a/cala-ledger/src/balance/mod.rs b/cala-ledger/src/balance/mod.rs index 75675756..552c4098 100644 --- a/cala-ledger/src/balance/mod.rs +++ b/cala-ledger/src/balance/mod.rs @@ -159,8 +159,8 @@ impl Balances { ) -> Vec { let mut latest_balances: HashMap<(AccountId, &Currency), BalanceSnapshot> = HashMap::new(); let mut new_balances = Vec::new(); + let empty = Vec::new(); for entry in entries.iter() { - let empty = Vec::new(); for account_id in mappings .get(&entry.account_id) .unwrap_or(&empty) @@ -198,7 +198,7 @@ impl Balances { new_balances } - fn new_snapshot( + pub(crate) fn new_snapshot( time: DateTime, account_id: AccountId, entry: &EntryValues, @@ -237,7 +237,7 @@ impl Balances { ) } - fn update_snapshot( + pub(crate) fn update_snapshot( time: DateTime, mut snapshot: BalanceSnapshot, entry: &EntryValues, diff --git a/cala-ledger/src/balance/repo.rs b/cala-ledger/src/balance/repo.rs index 557b627a..08c18da0 100644 --- a/cala-ledger/src/balance/repo.rs +++ b/cala-ledger/src/balance/repo.rs @@ -78,10 +78,7 @@ impl BalanceRepo { if let Some(row) = row { let details: BalanceSnapshot = serde_json::from_value(row.values).expect("Failed to deserialize balance snapshot"); - Ok(AccountBalance { - balance_type: row.normal_balance_type, - details, - }) + Ok(AccountBalance::new(row.normal_balance_type, details)) } else { Err(BalanceError::NotFound(journal_id, account_id, currency)) } @@ -149,18 +146,12 @@ impl BalanceRepo { let details: BalanceSnapshot = serde_json::from_value(row.values.expect("values is not null")) .expect("Failed to deserialize balance snapshot"); - first = Some(AccountBalance { - balance_type: row.normal_balance_type, - details, - }); + first = Some(AccountBalance::new(row.normal_balance_type, details)); } else { let details: BalanceSnapshot = serde_json::from_value(row.values.expect("values is not null")) .expect("Failed to deserialize balance snapshot"); - last = Some(AccountBalance { - balance_type: row.normal_balance_type, - details, - }); + last = Some(AccountBalance::new(row.normal_balance_type, details)); } } Ok((first, last)) @@ -201,10 +192,7 @@ impl BalanceRepo { let normal_balance_type: DebitOrCredit = row.get("normal_balance_type"); ret.insert( (details.journal_id, details.account_id, details.currency), - AccountBalance { - details, - balance_type: normal_balance_type, - }, + AccountBalance::new(normal_balance_type, details), ); } Ok(ret) diff --git a/cala-ledger/src/cel_context.rs b/cala-ledger/src/cel_context.rs index 2ae41100..84b9aebb 100644 --- a/cala-ledger/src/cel_context.rs +++ b/cala-ledger/src/cel_context.rs @@ -1,4 +1,4 @@ -use cel_interpreter::CelContext; +pub use cel_interpreter::CelContext; pub(crate) fn initialize() -> CelContext { let mut ctx = CelContext::new(); diff --git a/cala-ledger/src/ledger/error.rs b/cala-ledger/src/ledger/error.rs index c90d862f..d0da9caa 100644 --- a/cala-ledger/src/ledger/error.rs +++ b/cala-ledger/src/ledger/error.rs @@ -5,7 +5,7 @@ use crate::{ account::error::AccountError, account_set::error::AccountSetError, balance::error::BalanceError, entry::error::EntryError, journal::error::JournalError, outbox::server::error::OutboxServerError, transaction::error::TransactionError, - tx_template::error::TxTemplateError, + tx_template::error::TxTemplateError, velocity::error::VelocityError, }; #[derive(Error, Debug)] @@ -34,6 +34,8 @@ pub enum LedgerError { EntryError(#[from] EntryError), #[error("LedgerError - BalanceError: {0}")] BalanceError(#[from] BalanceError), + #[error("LedgerError - VelocityError: {0}")] + VelocityError(#[from] VelocityError), } impl From for LedgerError { diff --git a/cala-ledger/src/ledger/mod.rs b/cala-ledger/src/ledger/mod.rs index 2caa6e46..c05f534f 100644 --- a/cala-ledger/src/ledger/mod.rs +++ b/cala-ledger/src/ledger/mod.rs @@ -186,6 +186,16 @@ impl CalaLedger { .fetch_mappings(transaction.values().journal_id, &account_ids) .await?; + self.velocities + .update_balances_in_op( + op, + transaction.created_at(), + transaction.values(), + &entries, + &account_ids, + ) + .await?; + self.balances .update_balances_in_op( op, diff --git a/cala-ledger/src/velocity/account_control/mod.rs b/cala-ledger/src/velocity/account_control/mod.rs index 98988496..93f628b0 100644 --- a/cala-ledger/src/velocity/account_control/mod.rs +++ b/cala-ledger/src/velocity/account_control/mod.rs @@ -1,20 +1,23 @@ mod repo; mod value; +use chrono::{DateTime, Utc}; use rust_decimal::Decimal; use sqlx::PgPool; +use std::collections::HashMap; + use crate::{ atomic_operation::*, param::Params, - primitives::{AccountId, DebitOrCredit, Layer, VelocityControlId}, + primitives::{AccountId, DebitOrCredit, Layer}, }; -use cala_types::velocity::VelocityLimitValues; +use cala_types::velocity::{VelocityControlValues, VelocityLimitValues}; use super::error::VelocityError; use repo::*; -use value::*; +pub(super) use value::*; #[derive(Clone)] pub struct AccountControls { @@ -33,7 +36,8 @@ impl AccountControls { pub async fn attach_control_in_op( &self, op: &mut AtomicOperation<'_>, - control: VelocityControlId, + created_at: DateTime, + control: VelocityControlValues, account_id: AccountId, limits: Vec, params: impl Into + std::fmt::Debug, @@ -50,14 +54,26 @@ impl AccountControls { let amount: Decimal = limit.amount.try_evaluate(&ctx)?; let enforcement_direction: DebitOrCredit = limit.enforcement_direction.try_evaluate(&ctx)?; + let start = if let Some(start) = limit.start { + start.try_evaluate(&ctx)? + } else { + created_at + }; + let end = if let Some(end) = limit.end { + Some(end.try_evaluate(&ctx)?) + } else { + None + }; limits.push(AccountBalanceLimit { layer, amount, enforcement_direction, + start, + end, }) } velocity_limits.push(AccountVelocityLimit { - velocity_limit_id: velocity.id, + limit_id: velocity.id, window: velocity.window, condition: velocity.condition, currency: velocity.currency, @@ -70,7 +86,9 @@ impl AccountControls { let control = AccountVelocityControl { account_id, - control_id: control, + control_id: control.id, + condition: control.condition, + enforcement: control.enforcement, velocity_limits, }; @@ -78,4 +96,12 @@ impl AccountControls { Ok(()) } + + pub async fn find_for_enforcement( + &self, + op: &mut AtomicOperation<'_>, + account_ids: &[AccountId], + ) -> Result>, VelocityError> { + self.repo.find_for_enforcement(op.tx(), account_ids).await + } } diff --git a/cala-ledger/src/velocity/account_control/repo.rs b/cala-ledger/src/velocity/account_control/repo.rs index 87dfb3e6..81f6e1a6 100644 --- a/cala-ledger/src/velocity/account_control/repo.rs +++ b/cala-ledger/src/velocity/account_control/repo.rs @@ -1,5 +1,7 @@ use sqlx::{PgPool, Postgres, Transaction}; +use std::collections::HashMap; + use crate::primitives::{AccountId, VelocityControlId}; use super::{super::error::*, value::*}; @@ -32,4 +34,28 @@ impl AccountControlRepo { .await?; Ok(()) } + + pub async fn find_for_enforcement( + &self, + db: &mut Transaction<'_, Postgres>, + account_ids: &[AccountId], + ) -> Result>, VelocityError> { + let rows = sqlx::query!( + r#"SELECT values FROM cala_velocity_account_controls + WHERE data_source_id = '00000000-0000-0000-0000-000000000000' AND account_id = ANY($1)"#, + account_ids as &[AccountId], + ) + .fetch_all(&mut **db) + .await?; + + let mut res: HashMap> = HashMap::new(); + + for row in rows { + let values: AccountVelocityControl = + serde_json::from_value(row.values).expect("Failed to deserialize control values"); + res.entry(values.account_id).or_default().push(values); + } + + Ok(res) + } } diff --git a/cala-ledger/src/velocity/account_control/value.rs b/cala-ledger/src/velocity/account_control/value.rs index 1938658a..bfe36d44 100644 --- a/cala-ledger/src/velocity/account_control/value.rs +++ b/cala-ledger/src/velocity/account_control/value.rs @@ -1,28 +1,83 @@ -use cel_interpreter::CelExpression; +use cel_interpreter::{CelContext, CelExpression}; +use chrono::{DateTime, Utc}; use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; -use crate::primitives::{ - AccountId, Currency, DebitOrCredit, Layer, VelocityControlId, VelocityLimitId, +use cala_types::{ + balance::BalanceSnapshot, + velocity::{PartitionKey, VelocityEnforcement}, +}; + +use crate::{ + primitives::{AccountId, Currency, DebitOrCredit, Layer, VelocityControlId, VelocityLimitId}, + velocity::error::*, }; -use cala_types::velocity::PartitionKey; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AccountVelocityControl { pub account_id: AccountId, pub control_id: VelocityControlId, + pub enforcement: VelocityEnforcement, + pub condition: Option, pub velocity_limits: Vec, } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AccountVelocityLimit { - pub velocity_limit_id: VelocityLimitId, + pub limit_id: VelocityLimitId, pub window: Vec, pub condition: Option, pub currency: Option, pub limit: AccountLimit, } +impl AccountVelocityLimit { + pub fn enforce( + &self, + ctx: &CelContext, + time: DateTime, + snapshot: &BalanceSnapshot, + ) -> Result<(), VelocityError> { + if let Some(currency) = &self.currency { + if currency != &snapshot.currency { + return Ok(()); + } + } + let time = if let Some(source) = &self.limit.timestamp_source { + source.try_evaluate(ctx)? + } else { + time + }; + for limit in self.limit.balance.iter() { + if limit.start > time { + continue; + } + if let Some(end) = limit.end { + if end <= time { + continue; + } + } + let balance = + crate::balance::BalanceWithDirection::new(limit.enforcement_direction, snapshot); + let requested = balance.available(limit.layer); + + if requested > limit.amount { + return Err(LimitExceededError { + account_id: snapshot.account_id, + currency: snapshot.currency.code().to_string(), + limit_id: self.limit_id, + layer: limit.layer, + limit: limit.amount, + requested, + } + .into()); + } + } + + Ok(()) + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AccountLimit { pub timestamp_source: Option, @@ -34,4 +89,6 @@ pub struct AccountBalanceLimit { pub layer: Layer, pub amount: Decimal, pub enforcement_direction: DebitOrCredit, + pub start: DateTime, + pub end: Option>, } diff --git a/cala-ledger/src/velocity/balance/mod.rs b/cala-ledger/src/velocity/balance/mod.rs new file mode 100644 index 00000000..41c1b024 --- /dev/null +++ b/cala-ledger/src/velocity/balance/mod.rs @@ -0,0 +1,192 @@ +mod repo; + +use chrono::{DateTime, Utc}; +use sqlx::PgPool; + +use std::collections::HashMap; + +use cala_types::{ + balance::BalanceSnapshot, + entry::EntryValues, + transaction::TransactionValues, + velocity::{PartitionKey, Window}, +}; + +use crate::{atomic_operation::*, primitives::AccountId}; + +use super::{account_control::*, error::*}; + +use repo::*; + +#[derive(Clone)] +pub(super) struct VelocityBalances { + repo: VelocityBalanceRepo, +} + +impl VelocityBalances { + pub fn new(pool: &PgPool) -> Self { + Self { + repo: VelocityBalanceRepo::new(pool), + } + } + + pub(crate) async fn update_balances_in_op( + &self, + op: &mut AtomicOperation<'_>, + created_at: DateTime, + transaction: &TransactionValues, + entries: &[EntryValues], + controls: HashMap>, + ) -> Result<(), VelocityError> { + let empty = Vec::new(); + + let mut context = super::context::EvalContext::new(transaction); + + let mut entries_to_add: HashMap< + VelocityBalanceKey, + Vec<(&AccountVelocityLimit, &EntryValues)>, + > = HashMap::new(); + for entry in entries { + for control in controls.get(&entry.account_id).unwrap_or(&empty) { + let ctx = context.control_context(entry); + let control_active = if let Some(condition) = &control.condition { + let control_active: bool = condition.try_evaluate(&ctx)?; + control_active + } else { + true + }; + if control_active { + for limit in &control.velocity_limits { + if let Some(currency) = &limit.currency { + if currency != &entry.currency { + continue; + } + } + + let limit_active = if let Some(condition) = &limit.condition { + let limit_active: bool = condition.try_evaluate(&ctx)?; + limit_active + } else { + true + }; + if limit_active { + let window = determine_window(&limit.window, &ctx)?; + entries_to_add + .entry(( + window, + entry.currency, + entry.journal_id, + entry.account_id, + control.control_id, + limit.limit_id, + )) + .or_default() + .push((limit, entry)); + } + } + } + } + } + + if entries_to_add.is_empty() { + return Ok(()); + } + + let current_balances = self + .repo + .find_for_update(op.tx(), entries_to_add.keys()) + .await?; + + let new_balances = + Self::new_snapshots(context, created_at, current_balances, &entries_to_add)?; + + self.repo + .insert_new_snapshots(op.tx(), new_balances) + .await?; + + Ok(()) + } + + fn new_snapshots<'a>( + mut context: super::context::EvalContext, + time: DateTime, + mut current_balances: HashMap>, + entries_to_add: &'a HashMap>, + ) -> Result>, VelocityError> { + let mut res = HashMap::new(); + + for (key, entries) in entries_to_add.iter() { + let mut latest_balance: Option = None; + let mut new_balances = Vec::new(); + + for (limit, entry) in entries { + let ctx = context.control_context(entry); + let balance = match (latest_balance.take(), current_balances.remove(key)) { + (Some(latest), _) => { + new_balances.push(latest.clone()); + latest + } + (_, Some(Some(balance))) => balance, + (_, Some(None)) => { + let new_snapshot = + crate::balance::Balances::new_snapshot(time, entry.account_id, entry); + limit.enforce(&ctx, time, &new_snapshot)?; + latest_balance = Some(new_snapshot); + continue; + } + _ => unreachable!(), + }; + let new_snapshot = crate::balance::Balances::update_snapshot(time, balance, entry); + limit.enforce(&ctx, time, &new_snapshot)?; + new_balances.push(new_snapshot); + } + if let Some(latest) = latest_balance.take() { + new_balances.push(latest) + } + res.insert(key, new_balances); + } + Ok(res) + } +} + +fn determine_window( + keys: &[PartitionKey], + ctx: &cel_interpreter::CelContext, +) -> Result { + let mut map = serde_json::Map::new(); + for key in keys { + let value: serde_json::Value = key.value.try_evaluate(ctx)?; + map.insert(key.alias.clone(), value); + } + Ok(map.into()) +} + +#[cfg(test)] +mod test { + #[test] + fn window_determination() { + use super::*; + use cala_types::velocity::PartitionKey; + use cel_interpreter::CelContext; + use serde_json::json; + + let keys = vec![ + PartitionKey { + alias: "foo".to_string(), + value: "'bar'".parse().expect("Failed to parse"), + }, + PartitionKey { + alias: "baz".to_string(), + value: "'qux'".parse().expect("Failed to parse"), + }, + ]; + + let ctx = CelContext::new(); + let result = determine_window(&keys, &ctx).unwrap(); + let expected = json!({ + "foo": "bar", + "baz": "qux", + }); + assert_eq!(Window::from(expected), result); + } +} diff --git a/cala-ledger/src/velocity/balance/repo.rs b/cala-ledger/src/velocity/balance/repo.rs new file mode 100644 index 00000000..f1db19be --- /dev/null +++ b/cala-ledger/src/velocity/balance/repo.rs @@ -0,0 +1,194 @@ +use sqlx::{PgPool, Postgres, QueryBuilder, Row, Transaction}; + +use std::collections::HashMap; + +use cala_types::{balance::BalanceSnapshot, velocity::Window}; + +use crate::{primitives::*, velocity::error::VelocityError}; + +pub(super) type VelocityBalanceKey = ( + Window, + Currency, + JournalId, + AccountId, + VelocityControlId, + VelocityLimitId, +); + +#[derive(Clone)] +pub(super) struct VelocityBalanceRepo { + _pool: PgPool, +} + +impl VelocityBalanceRepo { + pub fn new(pool: &PgPool) -> Self { + Self { + _pool: pool.clone(), + } + } + + pub async fn find_for_update( + &self, + db: &mut Transaction<'_, Postgres>, + keys: impl Iterator, + ) -> Result>, VelocityError> { + let mut query_builder = QueryBuilder::new( + r#" + WITH inputs AS ( + SELECT * + FROM ( + "#, + ); + query_builder.push_values( + keys, + |mut builder, (window, currency, journal_id, account_id, control_id, limit_id)| { + builder.push_bind(window); + builder.push_bind(currency.code()); + builder.push_bind(journal_id); + builder.push_bind(account_id); + builder.push_bind(control_id); + builder.push_bind(limit_id); + }, + ); + query_builder.push( + r#" + ) AS v(partition_window, currency, journal_id, account_id, velocity_control_id, velocity_limit_id) + ), + locked_balances AS ( + SELECT data_source_id, b.partition_window, b.currency, b.journal_id, b.account_id, b.velocity_control_id, b.velocity_limit_id, b.latest_version + FROM cala_velocity_current_balances b + WHERE data_source_id = '00000000-0000-0000-0000-000000000000' AND ((partition_window, currency, journal_id, account_id, velocity_control_id, velocity_limit_id) IN (SELECT * FROM inputs)) + FOR UPDATE + ) + SELECT i.partition_window, i.currency, i.journal_id, i.account_id, i.velocity_control_id, i.velocity_limit_id, h.values + FROM inputs i + LEFT JOIN locked_balances b + ON i.partition_window = b.partition_window + AND i.currency = b.currency + AND i.journal_id = b.journal_id + AND i.account_id = b.account_id + AND i.velocity_control_id = b.velocity_control_id + AND i.velocity_limit_id = b.velocity_limit_id + LEFT JOIN cala_velocity_balance_history h + ON b.data_source_id = h.data_source_id + AND b.partition_window = h.partition_window + AND b.currency = h.currency + AND b.journal_id = h.journal_id + AND b.account_id = h.account_id + AND b.velocity_control_id = h.velocity_control_id + AND b.velocity_limit_id = h.velocity_limit_id + AND b.latest_version = h.version + "# + ); + let query = query_builder.build(); + let rows = query.fetch_all(&mut **db).await?; + + let mut ret = HashMap::new(); + for row in rows { + let values: Option = row.get("values"); + let snapshot = values.map(|v| { + serde_json::from_value::(v) + .expect("Failed to deserialize balance snapshot") + }); + ret.insert( + ( + row.get("partition_window"), + row.get::<&str, _>("currency") + .parse() + .expect("Could not parse currency"), + row.get("journal_id"), + row.get("account_id"), + row.get("velocity_control_id"), + row.get("velocity_limit_id"), + ), + snapshot, + ); + } + Ok(ret) + } + + pub(crate) async fn insert_new_snapshots( + &self, + db: &mut Transaction<'_, Postgres>, + new_balances: HashMap<&VelocityBalanceKey, Vec>, + ) -> Result<(), VelocityError> { + dbg!("insert_new_snapshots"); + dbg!(&new_balances); + let mut query_builder = QueryBuilder::new( + r#" + WITH new_snapshots AS ( + INSERT INTO cala_velocity_balance_history ( + journal_id, account_id, currency, velocity_control_id, velocity_limit_id, partition_window, latest_entry_id, version, values + ) + "#, + ); + + query_builder.push_values( + new_balances.into_iter().flat_map(|(key, snapshots)| { + snapshots.into_iter().map(move |snapshot| (key, snapshot)) + }), + |mut builder, (key, b)| { + let ( + window, + currency, + journal_id, + account_id, + velocity_control_id, + velocity_limit_id, + ) = key; + builder.push_bind(journal_id); + builder.push_bind(account_id); + builder.push_bind(currency.code()); + builder.push_bind(velocity_control_id); + builder.push_bind(velocity_limit_id); + builder.push_bind(window.inner()); + builder.push_bind(b.entry_id); + builder.push_bind(b.version as i32); + builder.push_bind( + serde_json::to_value(b).expect("Failed to serialize balance snapshot"), + ); + }, + ); + + query_builder.push( + r#" + RETURNING * + ), + ranked_balances AS ( + SELECT *, + ROW_NUMBER() OVER ( + PARTITION BY partition_window, currency, journal_id, account_id, velocity_control_id, velocity_limit_id ORDER BY version + ) AS rn, + MAX(version) OVER ( + PARTITION BY partition_window, currency, journal_id, account_id, velocity_control_id, velocity_limit_id + ) AS max + FROM new_snapshots + ), + initial_balances AS ( + INSERT INTO cala_velocity_current_balances ( + journal_id, account_id, currency, velocity_control_id, velocity_limit_id, + partition_window, latest_version + ) + SELECT + journal_id, account_id, currency, velocity_control_id, velocity_limit_id, + partition_window, version + FROM ranked_balances + WHERE version = rn AND rn = max + ) + UPDATE cala_velocity_current_balances c + SET latest_version = n.version + FROM ranked_balances n + WHERE c.journal_id = n.journal_id + AND c.account_id = n.account_id + AND c.currency = n.currency + AND c.velocity_control_id = n.velocity_control_id + AND c.velocity_limit_id = n.velocity_limit_id + AND c.partition_window = n.partition_window + AND c.data_source_id = '00000000-0000-0000-0000-000000000000' + AND version = max AND version != rn + "#, + ); + query_builder.build().execute(&mut **db).await?; + Ok(()) + } +} diff --git a/cala-ledger/src/velocity/context.rs b/cala-ledger/src/velocity/context.rs new file mode 100644 index 00000000..37177b0a --- /dev/null +++ b/cala-ledger/src/velocity/context.rs @@ -0,0 +1,39 @@ +use std::collections::HashMap; + +use cala_types::{entry::EntryValues, transaction::TransactionValues}; +use cel_interpreter::{CelMap, CelValue}; + +use crate::{cel_context::*, primitives::EntryId}; + +pub struct EvalContext { + transaction: CelValue, + entry_values: HashMap, +} + +impl EvalContext { + pub fn new(transaction: &TransactionValues) -> Self { + Self { + transaction: transaction.into(), + entry_values: HashMap::new(), + } + } + + pub fn control_context(&mut self, entry: &EntryValues) -> CelContext { + let entry = self + .entry_values + .entry(entry.id) + .or_insert_with(|| entry.into()); + + let mut vars = CelMap::new(); + vars.insert("transaction", self.transaction.clone()); + vars.insert("entry", entry.clone()); + + let mut context = CelMap::new(); + context.insert("vars", vars); + + let mut ctx = initialize(); + ctx.add_variable("context", context); + + ctx + } +} diff --git a/cala-ledger/src/velocity/control/entity.rs b/cala-ledger/src/velocity/control/entity.rs index 30fb30bd..a019742d 100644 --- a/cala-ledger/src/velocity/control/entity.rs +++ b/cala-ledger/src/velocity/control/entity.rs @@ -25,13 +25,23 @@ impl EntityEvent for VelocityControlEvent { #[builder(pattern = "owned", build_fn(error = "EntityError"))] pub struct VelocityControl { values: VelocityControlValues, - pub(super) _events: EntityEvents, + pub(super) events: EntityEvents, } impl VelocityControl { pub fn id(&self) -> VelocityControlId { self.values.id } + + pub fn into_values(self) -> VelocityControlValues { + self.values + } + + pub fn created_at(&self) -> chrono::DateTime { + self.events + .entity_first_persisted_at + .expect("No events for account") + } } impl Entity for VelocityControl { @@ -50,7 +60,7 @@ impl TryFrom> for VelocityControl { } } } - builder._events(events).build() + builder.events(events).build() } } diff --git a/cala-ledger/src/velocity/control/repo.rs b/cala-ledger/src/velocity/control/repo.rs index 8444f4d6..eec4e7f2 100644 --- a/cala-ledger/src/velocity/control/repo.rs +++ b/cala-ledger/src/velocity/control/repo.rs @@ -33,4 +33,33 @@ impl VelocityControlRepo { let control = VelocityControl::try_from(events)?; Ok(control) } + + pub async fn find_by_id( + &self, + db: &mut Transaction<'_, Postgres>, + id: VelocityControlId, + ) -> Result { + let rows = sqlx::query_as!( + GenericEvent, + r#"SELECT c.id, e.sequence, e.event, + c.created_at AS entity_created_at, e.recorded_at AS event_recorded_at + FROM cala_velocity_controls c + JOIN cala_velocity_control_events e + ON c.data_source_id = e.data_source_id + AND c.id = e.id + WHERE c.data_source_id = '00000000-0000-0000-0000-000000000000' + AND c.id = $1 + ORDER BY e.sequence"#, + id as VelocityControlId, + ) + .fetch_all(&mut **db) + .await?; + match EntityEvents::load_first(rows) { + Ok(account) => Ok(account), + Err(EntityError::NoEntityEventsPresent) => { + Err(VelocityError::CouldNotFindControlById(id)) + } + Err(e) => Err(e.into()), + } + } } diff --git a/cala-ledger/src/velocity/error.rs b/cala-ledger/src/velocity/error.rs index 3c67edcf..9771b4d3 100644 --- a/cala-ledger/src/velocity/error.rs +++ b/cala-ledger/src/velocity/error.rs @@ -1,7 +1,10 @@ +use rust_decimal::Decimal; use thiserror::Error; use cel_interpreter::CelError; +use crate::primitives::*; + #[derive(Error, Debug)] pub enum VelocityError { #[error("VelocityError - Sqlx: {0}")] @@ -12,4 +15,19 @@ pub enum VelocityError { CelError(#[from] CelError), #[error("{0}")] ParamError(#[from] crate::param::error::ParamError), + #[error("VelocityError - Could not find control by id: {0}")] + CouldNotFindControlById(VelocityControlId), + #[error("VelocityError - Enforcement: {0}")] + Enforcement(#[from] LimitExceededError), +} + +#[derive(Error, Debug)] +#[error("Velocity limit exceeded for account {account_id} in currency {currency}: Limit ({limit_id}): {limit}, Requested: {requested}")] +pub struct LimitExceededError { + pub account_id: AccountId, + pub currency: String, + pub limit_id: VelocityLimitId, + pub layer: Layer, + pub limit: Decimal, + pub requested: Decimal, } diff --git a/cala-ledger/src/velocity/limit/entity.rs b/cala-ledger/src/velocity/limit/entity.rs index 4ceb36ae..390050ed 100644 --- a/cala-ledger/src/velocity/limit/entity.rs +++ b/cala-ledger/src/velocity/limit/entity.rs @@ -124,6 +124,12 @@ impl NewVelocityLimit { input.enforcement_direction, ) .expect("already validated"), + start: input.start.map(|expr| { + CelExpression::try_from(expr).expect("already validated") + }), + end: input.end.map(|expr| { + CelExpression::try_from(expr).expect("already validated") + }), }) .collect(), }, @@ -191,6 +197,10 @@ pub struct NewBalanceLimit { amount: String, #[builder(setter(into))] enforcement_direction: String, + #[builder(setter(into), default)] + start: Option, + #[builder(setter(into), default)] + end: Option, } impl NewBalanceLimit { pub fn builder() -> NewBalanceLimitBuilder { @@ -214,6 +224,8 @@ impl NewBalanceLimitBuilder { .as_ref() .expect("Mandatory field 'enforcement_direction' not set"), )?; + validate_optional_expression(&self.start)?; + validate_optional_expression(&self.end)?; Ok(()) } } diff --git a/cala-ledger/src/velocity/limit/repo.rs b/cala-ledger/src/velocity/limit/repo.rs index 06163dda..66a06978 100644 --- a/cala-ledger/src/velocity/limit/repo.rs +++ b/cala-ledger/src/velocity/limit/repo.rs @@ -5,12 +5,14 @@ use crate::primitives::VelocityControlId; #[derive(Debug, Clone)] pub struct VelocityLimitRepo { - pool: PgPool, + _pool: PgPool, } impl VelocityLimitRepo { pub fn new(pool: &PgPool) -> Self { - Self { pool: pool.clone() } + Self { + _pool: pool.clone(), + } } pub async fn create_in_tx( @@ -52,6 +54,7 @@ impl VelocityLimitRepo { pub async fn list_for_control( &self, + db: &mut Transaction<'_, Postgres>, control: VelocityControlId, ) -> Result, VelocityError> { let rows = sqlx::query_as!( @@ -71,7 +74,7 @@ impl VelocityLimitRepo { ORDER BY l.id, e.sequence"#, control as VelocityControlId, ) - .fetch_all(&self.pool) + .fetch_all(&mut **db) .await?; let n = rows.len(); let ret = EntityEvents::load_n(rows, n)? diff --git a/cala-ledger/src/velocity/mod.rs b/cala-ledger/src/velocity/mod.rs index cff117f5..faef9729 100644 --- a/cala-ledger/src/velocity/mod.rs +++ b/cala-ledger/src/velocity/mod.rs @@ -1,14 +1,20 @@ mod account_control; +mod balance; +mod context; mod control; pub mod error; mod limit; +use chrono::{DateTime, Utc}; use sqlx::PgPool; +use cala_types::{entry::EntryValues, transaction::TransactionValues}; + pub use crate::param::Params; use crate::{atomic_operation::*, outbox::*, primitives::AccountId}; use account_control::*; +use balance::*; pub use control::*; use error::*; pub use limit::*; @@ -20,6 +26,7 @@ pub struct Velocities { limits: VelocityLimitRepo, controls: VelocityControlRepo, account_controls: AccountControls, + balances: VelocityBalances, } impl Velocities { @@ -28,6 +35,7 @@ impl Velocities { limits: VelocityLimitRepo::new(pool), controls: VelocityControlRepo::new(pool), account_controls: AccountControls::new(pool), + balances: VelocityBalances::new(pool), pool: pool.clone(), outbox, } @@ -108,14 +116,40 @@ impl Velocities { pub async fn attach_control_to_account_in_op( &self, op: &mut AtomicOperation<'_>, - control: VelocityControlId, + control_id: VelocityControlId, account_id: AccountId, params: impl Into + std::fmt::Debug, ) -> Result<(), VelocityError> { - let limits = self.limits.list_for_control(control).await?; + let control = self.controls.find_by_id(op.tx(), control_id).await?; + let limits = self.limits.list_for_control(op.tx(), control_id).await?; self.account_controls - .attach_control_in_op(op, control, account_id, limits, params) + .attach_control_in_op( + op, + control.created_at(), + control.into_values(), + account_id, + limits, + params, + ) .await?; Ok(()) } + + pub(crate) async fn update_balances_in_op( + &self, + op: &mut AtomicOperation<'_>, + created_at: DateTime, + transaction: &TransactionValues, + entries: &[EntryValues], + account_ids: &[AccountId], + ) -> Result<(), VelocityError> { + let controls = self + .account_controls + .find_for_enforcement(op, account_ids) + .await?; + + self.balances + .update_balances_in_op(op, created_at, transaction, entries, controls) + .await + } } diff --git a/cala-ledger/tests/account_set.rs b/cala-ledger/tests/account_set.rs index f83a6cd2..defd6369 100644 --- a/cala-ledger/tests/account_set.rs +++ b/cala-ledger/tests/account_set.rs @@ -100,7 +100,7 @@ async fn balances() -> anyhow::Result<()> { let recipient_account = cala.accounts().create(receiver).await.unwrap(); let tx_code = Alphanumeric.sample_string(&mut rand::thread_rng(), 32); - let new_template = helpers::test_template(&tx_code); + let new_template = helpers::currency_conversion_template(&tx_code); cala.tx_templates().create(new_template).await.unwrap(); let before_account_set = NewAccountSet::builder() diff --git a/cala-ledger/tests/balance.rs b/cala-ledger/tests/balance.rs index fbbbde0a..558e8e1e 100644 --- a/cala-ledger/tests/balance.rs +++ b/cala-ledger/tests/balance.rs @@ -25,7 +25,7 @@ async fn balance_in_range() -> anyhow::Result<()> { let recipient_account = cala.accounts().create(receiver).await.unwrap(); let tx_code = Alphanumeric.sample_string(&mut rand::thread_rng(), 32); - let new_template = helpers::test_template(&tx_code); + let new_template = helpers::currency_conversion_template(&tx_code); cala.tx_templates().create(new_template).await.unwrap(); let mut params = Params::new(); diff --git a/cala-ledger/tests/helpers.rs b/cala-ledger/tests/helpers.rs index f50de23f..6cf8dd3e 100644 --- a/cala-ledger/tests/helpers.rs +++ b/cala-ledger/tests/helpers.rs @@ -37,7 +37,7 @@ pub fn test_accounts() -> (NewAccount, NewAccount) { (sender_account, recipient_account) } -pub fn test_template(code: &str) -> NewTxTemplate { +pub fn currency_conversion_template(code: &str) -> NewTxTemplate { let params = vec![ NewParamDefinition::builder() .name("recipient") @@ -133,3 +133,87 @@ pub fn test_template(code: &str) -> NewTxTemplate { .build() .unwrap() } + +pub fn velocity_template(code: &str) -> NewTxTemplate { + let params = vec![ + NewParamDefinition::builder() + .name("recipient") + .r#type(ParamDataType::Uuid) + .build() + .unwrap(), + NewParamDefinition::builder() + .name("sender") + .r#type(ParamDataType::Uuid) + .build() + .unwrap(), + NewParamDefinition::builder() + .name("journal_id") + .r#type(ParamDataType::Uuid) + .build() + .unwrap(), + NewParamDefinition::builder() + .name("amount") + .r#type(ParamDataType::Decimal) + .build() + .unwrap(), + NewParamDefinition::builder() + .name("currency") + .r#type(ParamDataType::String) + .default_expr("'USD'") + .build() + .unwrap(), + NewParamDefinition::builder() + .name("layer") + .r#type(ParamDataType::String) + .default_expr("'SETTLED'") + .build() + .unwrap(), + NewParamDefinition::builder() + .name("meta") + .r#type(ParamDataType::Json) + .default_expr(r#"{"foo": "bar"}"#) + .build() + .unwrap(), + NewParamDefinition::builder() + .name("effective") + .r#type(ParamDataType::Date) + .default_expr("date()") + .build() + .unwrap(), + ]; + let entries = vec![ + NewTxTemplateEntry::builder() + .entry_type("'TEST_DR'") + .account_id("params.sender") + .layer("params.layer") + .direction("DEBIT") + .units("params.amount") + .currency("params.currency") + .build() + .unwrap(), + NewTxTemplateEntry::builder() + .entry_type("'TEST_CR'") + .account_id("params.recipient") + .layer("params.layer") + .direction("CREDIT") + .units("params.amount") + .currency("params.currency") + .build() + .unwrap(), + ]; + NewTxTemplate::builder() + .id(uuid::Uuid::new_v4()) + .code(code) + .params(params) + .transaction( + NewTxTemplateTransaction::builder() + .effective("params.effective") + .journal_id("params.journal_id") + .metadata("params.meta") + .build() + .unwrap(), + ) + .entries(entries) + .build() + .unwrap() +} diff --git a/cala-ledger/tests/transaction_post.rs b/cala-ledger/tests/transaction_post.rs index 99facb7a..25a8fd7b 100644 --- a/cala-ledger/tests/transaction_post.rs +++ b/cala-ledger/tests/transaction_post.rs @@ -1,9 +1,9 @@ mod helpers; +use rand::distributions::{Alphanumeric, DistString}; use rust_decimal::Decimal; use cala_ledger::{tx_template::*, *}; -use rand::distributions::{Alphanumeric, DistString}; #[tokio::test] async fn transaction_post() -> anyhow::Result<()> { @@ -22,7 +22,7 @@ async fn transaction_post() -> anyhow::Result<()> { let recipient_account = cala.accounts().create(receiver).await.unwrap(); let tx_code = Alphanumeric.sample_string(&mut rand::thread_rng(), 32); - let new_template = helpers::test_template(&tx_code); + let new_template = helpers::currency_conversion_template(&tx_code); cala.tx_templates().create(new_template).await.unwrap(); diff --git a/cala-ledger/tests/velocity.rs b/cala-ledger/tests/velocity.rs index 179da167..e72eee29 100644 --- a/cala-ledger/tests/velocity.rs +++ b/cala-ledger/tests/velocity.rs @@ -1,5 +1,6 @@ mod helpers; +use rand::distributions::{Alphanumeric, DistString}; use rust_decimal::Decimal; use cala_ledger::{velocity::*, *}; @@ -13,6 +14,9 @@ async fn create_control() -> anyhow::Result<()> { .build()?; let cala = CalaLedger::init(cala_config).await?; + let new_journal = helpers::test_journal(); + let journal = cala.journals().create(new_journal).await.unwrap(); + let velocity = cala.velocities(); let withdrawal_limit = NewVelocityLimit::builder() @@ -83,13 +87,37 @@ async fn create_control() -> anyhow::Result<()> { .add_limit_to_control(control.id(), deposit_limit.id()) .await?; - let (one, _) = helpers::test_accounts(); - let one = cala.accounts().create(one).await.unwrap(); + let (sender, receiver) = helpers::test_accounts(); + let sender_account = cala.accounts().create(sender).await.unwrap(); + let recipient_account = cala.accounts().create(receiver).await.unwrap(); + let mut params = Params::new(); - params.insert("withdrawal_limit", Decimal::from(100)); - params.insert("deposit_limit", Decimal::from(100)); + let limit = Decimal::ONE_HUNDRED; + params.insert("withdrawal_limit", limit); + params.insert("deposit_limit", limit); velocity - .attach_control_to_account(control.id(), one.id(), params) + .attach_control_to_account(control.id(), sender_account.id(), params) .await?; + + let tx_code = Alphanumeric.sample_string(&mut rand::thread_rng(), 32); + let new_template = helpers::velocity_template(&tx_code); + + cala.tx_templates().create(new_template).await.unwrap(); + + let mut params = Params::new(); + params.insert("journal_id", journal.id().to_string()); + params.insert("sender", sender_account.id()); + params.insert("recipient", recipient_account.id()); + params.insert("amount", limit); + + let _ = cala + .post_transaction(TransactionId::new(), &tx_code, params.clone()) + .await?; + params.insert("amount", Decimal::ONE); + let res = cala + .post_transaction(TransactionId::new(), &tx_code, params.clone()) + .await; + assert!(res.is_err()); + Ok(()) } diff --git a/cala-ledger/.sqlx/query-67f2b33262254fd0d5d54f5bf92ead9ca880e166919d3e9fd52650bcef348488.json b/cala-server/.sqlx/query-1bdbfe685cc6583cb82612f37bc6013c2246cb79e265f48b3c51d06b55db28bd.json similarity index 64% rename from cala-ledger/.sqlx/query-67f2b33262254fd0d5d54f5bf92ead9ca880e166919d3e9fd52650bcef348488.json rename to cala-server/.sqlx/query-1bdbfe685cc6583cb82612f37bc6013c2246cb79e265f48b3c51d06b55db28bd.json index 91b6adb0..4ca5ed7e 100644 --- a/cala-ledger/.sqlx/query-67f2b33262254fd0d5d54f5bf92ead9ca880e166919d3e9fd52650bcef348488.json +++ b/cala-server/.sqlx/query-1bdbfe685cc6583cb82612f37bc6013c2246cb79e265f48b3c51d06b55db28bd.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "UPDATE cala_accounts\n SET code = $2, name = $3, external_id = $4, normal_balance_type = $5\n WHERE id = $1 AND data_source_id = '00000000-0000-0000-0000-000000000000'", + "query": "UPDATE cala_accounts\n SET code = $2, name = $3, external_id = $4, normal_balance_type = $5, latest_values = $6\n WHERE id = $1 AND data_source_id = '00000000-0000-0000-0000-000000000000'", "describe": { "columns": [], "parameters": { @@ -19,10 +19,11 @@ ] } } - } + }, + "Jsonb" ] }, "nullable": [] }, - "hash": "67f2b33262254fd0d5d54f5bf92ead9ca880e166919d3e9fd52650bcef348488" + "hash": "1bdbfe685cc6583cb82612f37bc6013c2246cb79e265f48b3c51d06b55db28bd" } diff --git a/cala-ledger/.sqlx/query-de85f101b071aab683abad0a452087b012330d423849a0a20c719816cf0b7343.json b/cala-server/.sqlx/query-2d1a0955c79d7be68272211f35fba584e6a170427c89b39940217bb743d1049b.json similarity index 68% rename from cala-ledger/.sqlx/query-de85f101b071aab683abad0a452087b012330d423849a0a20c719816cf0b7343.json rename to cala-server/.sqlx/query-2d1a0955c79d7be68272211f35fba584e6a170427c89b39940217bb743d1049b.json index d7e7b7d6..4c58ab38 100644 --- a/cala-ledger/.sqlx/query-de85f101b071aab683abad0a452087b012330d423849a0a20c719816cf0b7343.json +++ b/cala-server/.sqlx/query-2d1a0955c79d7be68272211f35fba584e6a170427c89b39940217bb743d1049b.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO cala_accounts (id, code, name, external_id, normal_balance_type, eventually_consistent)\n VALUES ($1, $2, $3, $4, $5, $6)", + "query": "INSERT INTO cala_accounts (id, code, name, external_id, normal_balance_type, eventually_consistent, latest_values)\n VALUES ($1, $2, $3, $4, $5, $6, $7)", "describe": { "columns": [], "parameters": { @@ -20,10 +20,11 @@ } } }, - "Bool" + "Bool", + "Jsonb" ] }, "nullable": [] }, - "hash": "de85f101b071aab683abad0a452087b012330d423849a0a20c719816cf0b7343" + "hash": "2d1a0955c79d7be68272211f35fba584e6a170427c89b39940217bb743d1049b" } diff --git a/cala-server/.sqlx/query-35ac870c6d687c4e91412a1465238bf150fb646b646a3f12159817caab4fc7aa.json b/cala-server/.sqlx/query-49602a3b0dbe7780cdd64d5a04883d934490afa99104ab790689a9181e28a0e2.json similarity index 74% rename from cala-server/.sqlx/query-35ac870c6d687c4e91412a1465238bf150fb646b646a3f12159817caab4fc7aa.json rename to cala-server/.sqlx/query-49602a3b0dbe7780cdd64d5a04883d934490afa99104ab790689a9181e28a0e2.json index e21da570..dbdefce4 100644 --- a/cala-server/.sqlx/query-35ac870c6d687c4e91412a1465238bf150fb646b646a3f12159817caab4fc7aa.json +++ b/cala-server/.sqlx/query-49602a3b0dbe7780cdd64d5a04883d934490afa99104ab790689a9181e28a0e2.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO cala_accounts (data_source_id, id, code, name, external_id, normal_balance_type, eventually_consistent, created_at)\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", + "query": "INSERT INTO cala_accounts (data_source_id, id, code, name, external_id, normal_balance_type, eventually_consistent, created_at, latest_values)\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", "describe": { "columns": [], "parameters": { @@ -22,10 +22,11 @@ } }, "Bool", - "Timestamptz" + "Timestamptz", + "Jsonb" ] }, "nullable": [] }, - "hash": "35ac870c6d687c4e91412a1465238bf150fb646b646a3f12159817caab4fc7aa" + "hash": "49602a3b0dbe7780cdd64d5a04883d934490afa99104ab790689a9181e28a0e2" } diff --git a/cala-server/.sqlx/query-8b87fd1b4f25b8265528a02375e937c3c446b11076dd83e2bd202cee8c95e2c2.json b/cala-server/.sqlx/query-8b87fd1b4f25b8265528a02375e937c3c446b11076dd83e2bd202cee8c95e2c2.json new file mode 100644 index 00000000..281dff95 --- /dev/null +++ b/cala-server/.sqlx/query-8b87fd1b4f25b8265528a02375e937c3c446b11076dd83e2bd202cee8c95e2c2.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT values FROM cala_velocity_account_controls\n WHERE data_source_id = '00000000-0000-0000-0000-000000000000' AND account_id = ANY($1)", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "values", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "UuidArray" + ] + }, + "nullable": [ + false + ] + }, + "hash": "8b87fd1b4f25b8265528a02375e937c3c446b11076dd83e2bd202cee8c95e2c2" +} diff --git a/cala-server/.sqlx/query-d4640c733860f62b22b5d9e5bca022b01dd30aedc0092708459e7f49d563e053.json b/cala-server/.sqlx/query-d4640c733860f62b22b5d9e5bca022b01dd30aedc0092708459e7f49d563e053.json new file mode 100644 index 00000000..ac919a0b --- /dev/null +++ b/cala-server/.sqlx/query-d4640c733860f62b22b5d9e5bca022b01dd30aedc0092708459e7f49d563e053.json @@ -0,0 +1,46 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT c.id, e.sequence, e.event,\n c.created_at AS entity_created_at, e.recorded_at AS event_recorded_at\n FROM cala_velocity_controls c\n JOIN cala_velocity_control_events e\n ON c.data_source_id = e.data_source_id\n AND c.id = e.id\n WHERE c.data_source_id = '00000000-0000-0000-0000-000000000000'\n AND c.id = $1\n ORDER BY e.sequence", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "sequence", + "type_info": "Int4" + }, + { + "ordinal": 2, + "name": "event", + "type_info": "Jsonb" + }, + { + "ordinal": 3, + "name": "entity_created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 4, + "name": "event_recorded_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "d4640c733860f62b22b5d9e5bca022b01dd30aedc0092708459e7f49d563e053" +}