diff --git a/cala-ledger-core-types/src/lib.rs b/cala-ledger-core-types/src/lib.rs index b23a4088..55c95066 100644 --- a/cala-ledger-core-types/src/lib.rs +++ b/cala-ledger-core-types/src/lib.rs @@ -13,3 +13,4 @@ pub mod param; pub mod primitives; pub mod transaction; pub mod tx_template; +pub mod velocity; diff --git a/cala-ledger-core-types/src/primitives.rs b/cala-ledger-core-types/src/primitives.rs index cff597f5..2d16bd5b 100644 --- a/cala-ledger-core-types/src/primitives.rs +++ b/cala-ledger-core-types/src/primitives.rs @@ -12,6 +12,7 @@ crate::entity_id! { DataSourceId } crate::entity_id! { TxTemplateId } crate::entity_id! { TransactionId } crate::entity_id! { EntryId } +crate::entity_id! { VelocityLimitId } pub type BalanceId = (JournalId, AccountId, Currency); impl From<&AccountSetId> for AccountId { diff --git a/cala-ledger-core-types/src/velocity/limit.rs b/cala-ledger-core-types/src/velocity/limit.rs new file mode 100644 index 00000000..ae1ee661 --- /dev/null +++ b/cala-ledger-core-types/src/velocity/limit.rs @@ -0,0 +1,35 @@ +use cel_interpreter::CelExpression; +use serde::{Deserialize, Serialize}; + +use crate::{param::*, primitives::*}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct VelocityLimitValues { + pub id: VelocityLimitId, + pub name: String, + pub description: String, + pub window: Vec, + pub condition: Option, + pub currency: Option, + pub params: Option>, + pub limit: LimitInput, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct PartitionKeyInput { + pub alias: String, + pub value: CelExpression, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct LimitInput { + pub timestamp_source: Option, + pub balance: Vec, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct BalanceLimitInput { + pub layer: CelExpression, + pub amount: CelExpression, + pub enforcement_direction: CelExpression, +} diff --git a/cala-ledger-core-types/src/velocity/mod.rs b/cala-ledger-core-types/src/velocity/mod.rs new file mode 100644 index 00000000..d73b8fb4 --- /dev/null +++ b/cala-ledger-core-types/src/velocity/mod.rs @@ -0,0 +1,3 @@ +mod limit; + +pub use limit::*; diff --git a/cala-ledger/.sqlx/query-399cd06c60999a20e5e076a9b32cc7bfcad9af2ac2cdeb11905f1dd0a715f2e2.json b/cala-ledger/.sqlx/query-399cd06c60999a20e5e076a9b32cc7bfcad9af2ac2cdeb11905f1dd0a715f2e2.json new file mode 100644 index 00000000..f7008891 --- /dev/null +++ b/cala-ledger/.sqlx/query-399cd06c60999a20e5e076a9b32cc7bfcad9af2ac2cdeb11905f1dd0a715f2e2.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO cala_velocity_limits (id, name)\n VALUES ($1, $2)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Varchar" + ] + }, + "nullable": [] + }, + "hash": "399cd06c60999a20e5e076a9b32cc7bfcad9af2ac2cdeb11905f1dd0a715f2e2" +} diff --git a/cala-ledger/migrations/20231208110808_cala_ledger_setup.sql b/cala-ledger/migrations/20231208110808_cala_ledger_setup.sql index 115219b1..7a173ab3 100644 --- a/cala-ledger/migrations/20231208110808_cala_ledger_setup.sql +++ b/cala-ledger/migrations/20231208110808_cala_ledger_setup.sql @@ -191,6 +191,27 @@ CREATE TABLE cala_balance_history ( FOREIGN KEY (data_source_id, latest_entry_id) REFERENCES cala_entries(data_source_id, id) ); +CREATE TABLE cala_velocity_limits ( + data_source_id UUID NOT NULL DEFAULT '00000000-0000-0000-0000-000000000000', + id UUID NOT NULL, + name VARCHAR NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE(data_source_id, id) +); +CREATE INDEX idx_cala_velocity_limits_name ON cala_velocity_limits (name); + + +CREATE TABLE cala_velocity_limit_events ( + data_source_id UUID NOT NULL DEFAULT '00000000-0000-0000-0000-000000000000', + id UUID NOT NULL, + sequence INT NOT NULL, + event_type VARCHAR NOT NULL, + event JSONB NOT NULL, + recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE(data_source_id, id, sequence), + FOREIGN KEY (data_source_id, id) REFERENCES cala_velocity_limits(data_source_id, id) +); + CREATE TABLE cala_outbox_events ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), sequence BIGSERIAL UNIQUE, diff --git a/cala-ledger/src/ledger/mod.rs b/cala-ledger/src/ledger/mod.rs index 90b4a5c6..adacea98 100644 --- a/cala-ledger/src/ledger/mod.rs +++ b/cala-ledger/src/ledger/mod.rs @@ -19,6 +19,7 @@ use crate::{ primitives::TransactionId, transaction::{Transaction, Transactions}, tx_template::{Params, TxTemplates}, + velocity::Velocities, }; #[cfg(feature = "import")] mod import_deps { @@ -37,6 +38,7 @@ pub struct CalaLedger { transactions: Transactions, tx_templates: TxTemplates, entries: Entries, + velocities: Velocities, balances: Balances, outbox: Outbox, #[allow(clippy::type_complexity)] @@ -76,6 +78,7 @@ impl CalaLedger { let transactions = Transactions::new(&pool, outbox.clone()); let entries = Entries::new(&pool, outbox.clone()); let balances = Balances::new(&pool, outbox.clone()); + let velocities = Velocities::new(&pool, outbox.clone()); let account_sets = AccountSets::new(&pool, outbox.clone(), &accounts, &entries, &balances); Ok(Self { accounts, @@ -86,6 +89,7 @@ impl CalaLedger { transactions, entries, balances, + velocities, outbox_handle: Arc::new(Mutex::new(outbox_handle)), pool, }) @@ -103,6 +107,10 @@ impl CalaLedger { &self.accounts } + pub fn velocities(&self) -> &Velocities { + &self.velocities + } + pub fn account_sets(&self) -> &AccountSets { &self.account_sets } diff --git a/cala-ledger/src/lib.rs b/cala-ledger/src/lib.rs index 787a2a27..5e9cdf50 100644 --- a/cala-ledger/src/lib.rs +++ b/cala-ledger/src/lib.rs @@ -137,6 +137,7 @@ pub mod journal; pub mod migrate; pub mod transaction; pub mod tx_template; +pub mod velocity; mod ledger; pub mod outbox; diff --git a/cala-ledger/src/velocity/error.rs b/cala-ledger/src/velocity/error.rs new file mode 100644 index 00000000..d82cb4ab --- /dev/null +++ b/cala-ledger/src/velocity/error.rs @@ -0,0 +1,9 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum VelocityError { + #[error("VelocityError - Sqlx: {0}")] + Sqlx(#[from] sqlx::Error), + #[error("VelocityError - EntityError: {0}")] + EntityError(#[from] crate::entity::EntityError), +} diff --git a/cala-ledger/src/velocity/limit/entity.rs b/cala-ledger/src/velocity/limit/entity.rs new file mode 100644 index 00000000..274f8728 --- /dev/null +++ b/cala-ledger/src/velocity/limit/entity.rs @@ -0,0 +1,220 @@ +use cel_interpreter::CelExpression; +use derive_builder::Builder; +use serde::{Deserialize, Serialize}; + +pub use crate::{entity::*, param::definition::*}; +pub use cala_types::{ + primitives::{Currency, VelocityLimitId}, + velocity::*, +}; + +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum VelocityLimitEvent { + Initialized { values: VelocityLimitValues }, +} + +impl EntityEvent for VelocityLimitEvent { + type EntityId = VelocityLimitId; + fn event_table_name() -> &'static str { + "cala_velocity_limit_events" + } +} + +#[derive(Builder)] +#[builder(pattern = "owned", build_fn(error = "EntityError"))] +pub struct VelocityLimit { + _values: VelocityLimitValues, + pub(super) _events: EntityEvents, +} + +impl Entity for VelocityLimit { + type Event = VelocityLimitEvent; +} + +impl TryFrom> for VelocityLimit { + type Error = EntityError; + + fn try_from(events: EntityEvents) -> Result { + let mut builder = VelocityLimitBuilder::default(); + for event in events.iter() { + match event { + VelocityLimitEvent::Initialized { values } => { + builder = builder._values(values.clone()); + } + } + } + builder._events(events).build() + } +} + +/// Representation of a ***new*** velocity limit entity with required/optional properties and a builder. +#[derive(Builder, Debug)] +#[builder(build_fn(validate = "Self::validate"))] +pub struct NewVelocityLimit { + #[builder(setter(into))] + pub(super) id: VelocityLimitId, + #[builder(setter(into))] + pub(super) name: String, + #[builder(setter(into))] + description: String, + window: Vec, + #[builder(setter(strip_option, into), default)] + condition: Option, + currency: Option, + #[builder(setter(strip_option), default)] + params: Option>, + limit: NewLimitInput, +} + +impl NewVelocityLimit { + pub fn builder() -> NewVelocityLimitBuilder { + NewVelocityLimitBuilder::default() + } + + pub(super) fn initial_events(self) -> EntityEvents { + let limit = self.limit; + EntityEvents::init( + self.id, + [VelocityLimitEvent::Initialized { + values: VelocityLimitValues { + id: self.id, + name: self.name, + description: self.description, + currency: self.currency, + window: self + .window + .into_iter() + .map(|input| PartitionKeyInput { + alias: input.alias, + value: CelExpression::try_from(input.value).expect("already validated"), + }) + .collect(), + condition: self + .condition + .map(|expr| CelExpression::try_from(expr).expect("already validated")), + params: self + .params + .map(|params| params.into_iter().map(ParamDefinition::from).collect()), + limit: LimitInput { + timestamp_source: limit + .timestamp_source + .map(CelExpression::try_from) + .transpose() + .expect("already validated"), + balance: limit + .balance + .into_iter() + .map(|input| BalanceLimitInput { + layer: CelExpression::try_from(input.layer) + .expect("already validated"), + amount: CelExpression::try_from(input.amount) + .expect("already validated"), + enforcement_direction: CelExpression::try_from( + input.enforcement_direction, + ) + .expect("already validated"), + }) + .collect(), + }, + }, + }], + ) + } +} + +impl NewVelocityLimitBuilder { + fn validate(&self) -> Result<(), String> { + validate_optional_expression(&self.condition)?; + Ok(()) + } +} + +#[derive(Clone, Builder, Debug)] +#[builder(build_fn(validate = "Self::validate"))] +pub struct NewPartitionKeyInput { + #[builder(setter(into))] + alias: String, + #[builder(setter(into))] + value: String, +} +impl NewPartitionKeyInput { + pub fn builder() -> NewPartitionKeyInputBuilder { + NewPartitionKeyInputBuilder::default() + } +} +impl NewPartitionKeyInputBuilder { + fn validate(&self) -> Result<(), String> { + validate_expression( + self.value + .as_ref() + .expect("Mandatory field 'value' not set"), + )?; + Ok(()) + } +} + +#[derive(Clone, Builder, Debug)] +#[builder(build_fn(validate = "Self::validate"))] +pub struct NewLimitInput { + #[builder(setter(strip_option, into), default)] + timestamp_source: Option, + balance: Vec, +} +impl NewLimitInput { + pub fn builder() -> NewLimitInputBuilder { + NewLimitInputBuilder::default() + } +} +impl NewLimitInputBuilder { + fn validate(&self) -> Result<(), String> { + validate_optional_expression(&self.timestamp_source) + } +} + +#[derive(Clone, Builder, Debug)] +#[builder(build_fn(validate = "Self::validate"))] +pub struct NewBalanceLimitInput { + #[builder(setter(into))] + layer: String, + #[builder(setter(into))] + amount: String, + #[builder(setter(into))] + enforcement_direction: String, +} +impl NewBalanceLimitInput { + pub fn builder() -> NewBalanceLimitInputBuilder { + NewBalanceLimitInputBuilder::default() + } +} +impl NewBalanceLimitInputBuilder { + fn validate(&self) -> Result<(), String> { + validate_expression( + self.layer + .as_ref() + .expect("Mandatory field 'value' not set"), + )?; + validate_expression( + self.amount + .as_ref() + .expect("Mandatory field 'value' not set"), + )?; + validate_expression( + self.enforcement_direction + .as_ref() + .expect("Mandatory field 'value' not set"), + )?; + Ok(()) + } +} + +fn validate_expression(expr: &str) -> Result<(), String> { + CelExpression::try_from(expr).map_err(|e| e.to_string())?; + Ok(()) +} +fn validate_optional_expression(expr: &Option>) -> Result<(), String> { + if let Some(Some(expr)) = expr.as_ref() { + CelExpression::try_from(expr.as_str()).map_err(|e| e.to_string())?; + } + Ok(()) +} diff --git a/cala-ledger/src/velocity/limit/mod.rs b/cala-ledger/src/velocity/limit/mod.rs new file mode 100644 index 00000000..79b01243 --- /dev/null +++ b/cala-ledger/src/velocity/limit/mod.rs @@ -0,0 +1,5 @@ +mod entity; +mod repo; + +pub use entity::*; +pub(super) use repo::*; diff --git a/cala-ledger/src/velocity/limit/repo.rs b/cala-ledger/src/velocity/limit/repo.rs new file mode 100644 index 00000000..021d05e2 --- /dev/null +++ b/cala-ledger/src/velocity/limit/repo.rs @@ -0,0 +1,36 @@ +use sqlx::{PgPool, Postgres, Transaction}; + +use super::{super::error::*, entity::*}; + +#[derive(Debug, Clone)] +pub struct VelocityLimitRepo { + _pool: PgPool, +} + +impl VelocityLimitRepo { + pub fn new(pool: &PgPool) -> Self { + Self { + _pool: pool.clone(), + } + } + + pub async fn create_in_tx( + &self, + db: &mut Transaction<'_, Postgres>, + new_limit: NewVelocityLimit, + ) -> Result { + let id = new_limit.id; + sqlx::query!( + r#"INSERT INTO cala_velocity_limits (id, name) + VALUES ($1, $2)"#, + id as VelocityLimitId, + new_limit.name, + ) + .execute(&mut **db) + .await?; + let mut events = new_limit.initial_events(); + events.persist(db).await?; + let limit = VelocityLimit::try_from(events)?; + Ok(limit) + } +} diff --git a/cala-ledger/src/velocity/mod.rs b/cala-ledger/src/velocity/mod.rs new file mode 100644 index 00000000..c00ba487 --- /dev/null +++ b/cala-ledger/src/velocity/mod.rs @@ -0,0 +1,44 @@ +pub mod error; +mod limit; + +use sqlx::PgPool; + +use crate::{atomic_operation::*, outbox::*}; + +use error::*; +pub use limit::*; + +#[derive(Clone)] +pub struct Velocities { + outbox: Outbox, + pool: PgPool, + limits: VelocityLimitRepo, +} + +impl Velocities { + pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self { + Self { + limits: VelocityLimitRepo::new(pool), + pool: pool.clone(), + outbox, + } + } + + pub async fn create_limit( + &self, + new_limit: NewVelocityLimit, + ) -> Result { + let mut op = AtomicOperation::init(&self.pool, &self.outbox).await?; + let limit = self.create_limit_in_op(&mut op, new_limit).await?; + op.commit().await?; + Ok(limit) + } + + pub async fn create_limit_in_op( + &self, + op: &mut AtomicOperation<'_>, + new_limit: NewVelocityLimit, + ) -> Result { + self.limits.create_in_tx(op.tx(), new_limit).await + } +} diff --git a/cala-ledger/tests/helpers.rs b/cala-ledger/tests/helpers.rs index 4cec213a..11230dce 100644 --- a/cala-ledger/tests/helpers.rs +++ b/cala-ledger/tests/helpers.rs @@ -1,3 +1,4 @@ +#![allow(dead_code)] use rand::distributions::{Alphanumeric, DistString}; use cala_ledger::{account::*, journal::*, tx_template::*}; diff --git a/cala-ledger/tests/velocity.rs b/cala-ledger/tests/velocity.rs new file mode 100644 index 00000000..8ed8f2ae --- /dev/null +++ b/cala-ledger/tests/velocity.rs @@ -0,0 +1,33 @@ +mod helpers; + +use cala_ledger::{velocity::*, *}; + +#[tokio::test] +async fn create_control() -> anyhow::Result<()> { + let pool = helpers::init_pool().await?; + let cala_config = CalaLedgerConfig::builder() + .pool(pool) + .exec_migrations(false) + .build()?; + let cala = CalaLedger::init(cala_config).await?; + + let velocity = cala.velocities(); + + let limit = NewVelocityLimit::builder() + .id(VelocityLimitId::new()) + .name("Test") + .description("test") + .window(vec![]) + .currency(None) + .limit( + NewLimitInput::builder() + .balance(vec![]) + .build() + .expect("limit"), + ) + .build() + .expect("build control"); + + velocity.create_limit(limit).await?; + Ok(()) +} diff --git a/cala-server/.sqlx/query-399cd06c60999a20e5e076a9b32cc7bfcad9af2ac2cdeb11905f1dd0a715f2e2.json b/cala-server/.sqlx/query-399cd06c60999a20e5e076a9b32cc7bfcad9af2ac2cdeb11905f1dd0a715f2e2.json new file mode 100644 index 00000000..f7008891 --- /dev/null +++ b/cala-server/.sqlx/query-399cd06c60999a20e5e076a9b32cc7bfcad9af2ac2cdeb11905f1dd0a715f2e2.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO cala_velocity_limits (id, name)\n VALUES ($1, $2)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Varchar" + ] + }, + "nullable": [] + }, + "hash": "399cd06c60999a20e5e076a9b32cc7bfcad9af2ac2cdeb11905f1dd0a715f2e2" +}