Skip to content

Commit

Permalink
chore: create velocity limit
Browse files Browse the repository at this point in the history
  • Loading branch information
bodymindarts committed Jun 28, 2024
1 parent 4a521c2 commit 4a37fb1
Show file tree
Hide file tree
Showing 16 changed files with 448 additions and 0 deletions.
1 change: 1 addition & 0 deletions cala-ledger-core-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ pub mod param;
pub mod primitives;
pub mod transaction;
pub mod tx_template;
pub mod velocity;
1 change: 1 addition & 0 deletions cala-ledger-core-types/src/primitives.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ crate::entity_id! { DataSourceId }
crate::entity_id! { TxTemplateId }
crate::entity_id! { TransactionId }
crate::entity_id! { EntryId }
crate::entity_id! { VelocityLimitId }

pub type BalanceId = (JournalId, AccountId, Currency);
impl From<&AccountSetId> for AccountId {
Expand Down
35 changes: 35 additions & 0 deletions cala-ledger-core-types/src/velocity/limit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use cel_interpreter::CelExpression;
use serde::{Deserialize, Serialize};

use crate::{param::*, primitives::*};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct VelocityLimitValues {
pub id: VelocityLimitId,
pub name: String,
pub description: String,
pub window: Vec<PartitionKeyInput>,
pub condition: Option<CelExpression>,
pub currency: Option<Currency>,
pub params: Option<Vec<ParamDefinition>>,
pub limit: LimitInput,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PartitionKeyInput {
pub alias: String,
pub value: CelExpression,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LimitInput {
pub timestamp_source: Option<CelExpression>,
pub balance: Vec<BalanceLimitInput>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BalanceLimitInput {
pub layer: CelExpression,
pub amount: CelExpression,
pub enforcement_direction: CelExpression,
}
3 changes: 3 additions & 0 deletions cala-ledger-core-types/src/velocity/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod limit;

pub use limit::*;

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions cala-ledger/migrations/20231208110808_cala_ledger_setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,27 @@ CREATE TABLE cala_balance_history (
FOREIGN KEY (data_source_id, latest_entry_id) REFERENCES cala_entries(data_source_id, id)
);

CREATE TABLE cala_velocity_limits (
data_source_id UUID NOT NULL DEFAULT '00000000-0000-0000-0000-000000000000',
id UUID NOT NULL,
name VARCHAR NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(data_source_id, id)
);
CREATE INDEX idx_cala_velocity_limits_name ON cala_velocity_limits (name);


CREATE TABLE cala_velocity_limit_events (
data_source_id UUID NOT NULL DEFAULT '00000000-0000-0000-0000-000000000000',
id UUID NOT NULL,
sequence INT NOT NULL,
event_type VARCHAR NOT NULL,
event JSONB NOT NULL,
recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(data_source_id, id, sequence),
FOREIGN KEY (data_source_id, id) REFERENCES cala_velocity_limits(data_source_id, id)
);

CREATE TABLE cala_outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
sequence BIGSERIAL UNIQUE,
Expand Down
8 changes: 8 additions & 0 deletions cala-ledger/src/ledger/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{
primitives::TransactionId,
transaction::{Transaction, Transactions},
tx_template::{Params, TxTemplates},
velocity::Velocities,
};
#[cfg(feature = "import")]
mod import_deps {
Expand All @@ -37,6 +38,7 @@ pub struct CalaLedger {
transactions: Transactions,
tx_templates: TxTemplates,
entries: Entries,
velocities: Velocities,
balances: Balances,
outbox: Outbox,
#[allow(clippy::type_complexity)]
Expand Down Expand Up @@ -76,6 +78,7 @@ impl CalaLedger {
let transactions = Transactions::new(&pool, outbox.clone());
let entries = Entries::new(&pool, outbox.clone());
let balances = Balances::new(&pool, outbox.clone());
let velocities = Velocities::new(&pool, outbox.clone());
let account_sets = AccountSets::new(&pool, outbox.clone(), &accounts, &entries, &balances);
Ok(Self {
accounts,
Expand All @@ -86,6 +89,7 @@ impl CalaLedger {
transactions,
entries,
balances,
velocities,
outbox_handle: Arc::new(Mutex::new(outbox_handle)),
pool,
})
Expand All @@ -103,6 +107,10 @@ impl CalaLedger {
&self.accounts
}

pub fn velocities(&self) -> &Velocities {
&self.velocities
}

pub fn account_sets(&self) -> &AccountSets {
&self.account_sets
}
Expand Down
1 change: 1 addition & 0 deletions cala-ledger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ pub mod journal;
pub mod migrate;
pub mod transaction;
pub mod tx_template;
pub mod velocity;

mod ledger;
pub mod outbox;
Expand Down
9 changes: 9 additions & 0 deletions cala-ledger/src/velocity/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use thiserror::Error;

#[derive(Error, Debug)]
pub enum VelocityError {
#[error("VelocityError - Sqlx: {0}")]
Sqlx(#[from] sqlx::Error),
#[error("VelocityError - EntityError: {0}")]
EntityError(#[from] crate::entity::EntityError),
}
220 changes: 220 additions & 0 deletions cala-ledger/src/velocity/limit/entity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
use cel_interpreter::CelExpression;
use derive_builder::Builder;
use serde::{Deserialize, Serialize};

pub use crate::{entity::*, param::definition::*};
pub use cala_types::{
primitives::{Currency, VelocityLimitId},
velocity::*,
};

#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum VelocityLimitEvent {
Initialized { values: VelocityLimitValues },
}

impl EntityEvent for VelocityLimitEvent {
type EntityId = VelocityLimitId;
fn event_table_name() -> &'static str {
"cala_velocity_limit_events"
}
}

#[derive(Builder)]
#[builder(pattern = "owned", build_fn(error = "EntityError"))]
pub struct VelocityLimit {
_values: VelocityLimitValues,
pub(super) _events: EntityEvents<VelocityLimitEvent>,
}

impl Entity for VelocityLimit {
type Event = VelocityLimitEvent;
}

impl TryFrom<EntityEvents<VelocityLimitEvent>> for VelocityLimit {
type Error = EntityError;

fn try_from(events: EntityEvents<VelocityLimitEvent>) -> Result<Self, Self::Error> {
let mut builder = VelocityLimitBuilder::default();
for event in events.iter() {
match event {
VelocityLimitEvent::Initialized { values } => {
builder = builder._values(values.clone());
}
}
}
builder._events(events).build()
}
}

/// Representation of a ***new*** velocity limit entity with required/optional properties and a builder.
#[derive(Builder, Debug)]
#[builder(build_fn(validate = "Self::validate"))]
pub struct NewVelocityLimit {
#[builder(setter(into))]
pub(super) id: VelocityLimitId,
#[builder(setter(into))]
pub(super) name: String,
#[builder(setter(into))]
description: String,
window: Vec<NewPartitionKeyInput>,
#[builder(setter(strip_option, into), default)]
condition: Option<String>,
currency: Option<Currency>,
#[builder(setter(strip_option), default)]
params: Option<Vec<NewParamDefinition>>,
limit: NewLimitInput,
}

impl NewVelocityLimit {
pub fn builder() -> NewVelocityLimitBuilder {
NewVelocityLimitBuilder::default()
}

pub(super) fn initial_events(self) -> EntityEvents<VelocityLimitEvent> {
let limit = self.limit;
EntityEvents::init(
self.id,
[VelocityLimitEvent::Initialized {
values: VelocityLimitValues {
id: self.id,
name: self.name,
description: self.description,
currency: self.currency,
window: self
.window
.into_iter()
.map(|input| PartitionKeyInput {
alias: input.alias,
value: CelExpression::try_from(input.value).expect("already validated"),
})
.collect(),
condition: self
.condition
.map(|expr| CelExpression::try_from(expr).expect("already validated")),
params: self
.params
.map(|params| params.into_iter().map(ParamDefinition::from).collect()),
limit: LimitInput {
timestamp_source: limit
.timestamp_source
.map(CelExpression::try_from)
.transpose()
.expect("already validated"),
balance: limit
.balance
.into_iter()
.map(|input| BalanceLimitInput {
layer: CelExpression::try_from(input.layer)
.expect("already validated"),
amount: CelExpression::try_from(input.amount)
.expect("already validated"),
enforcement_direction: CelExpression::try_from(
input.enforcement_direction,
)
.expect("already validated"),
})
.collect(),
},
},
}],
)
}
}

impl NewVelocityLimitBuilder {
fn validate(&self) -> Result<(), String> {
validate_optional_expression(&self.condition)?;
Ok(())
}
}

#[derive(Clone, Builder, Debug)]
#[builder(build_fn(validate = "Self::validate"))]
pub struct NewPartitionKeyInput {
#[builder(setter(into))]
alias: String,
#[builder(setter(into))]
value: String,
}
impl NewPartitionKeyInput {
pub fn builder() -> NewPartitionKeyInputBuilder {
NewPartitionKeyInputBuilder::default()
}
}
impl NewPartitionKeyInputBuilder {
fn validate(&self) -> Result<(), String> {
validate_expression(
self.value
.as_ref()
.expect("Mandatory field 'value' not set"),
)?;
Ok(())
}
}

#[derive(Clone, Builder, Debug)]
#[builder(build_fn(validate = "Self::validate"))]
pub struct NewLimitInput {
#[builder(setter(strip_option, into), default)]
timestamp_source: Option<String>,
balance: Vec<NewBalanceLimitInput>,
}
impl NewLimitInput {
pub fn builder() -> NewLimitInputBuilder {
NewLimitInputBuilder::default()
}
}
impl NewLimitInputBuilder {
fn validate(&self) -> Result<(), String> {
validate_optional_expression(&self.timestamp_source)
}
}

#[derive(Clone, Builder, Debug)]
#[builder(build_fn(validate = "Self::validate"))]
pub struct NewBalanceLimitInput {
#[builder(setter(into))]
layer: String,
#[builder(setter(into))]
amount: String,
#[builder(setter(into))]
enforcement_direction: String,
}
impl NewBalanceLimitInput {
pub fn builder() -> NewBalanceLimitInputBuilder {
NewBalanceLimitInputBuilder::default()
}
}
impl NewBalanceLimitInputBuilder {
fn validate(&self) -> Result<(), String> {
validate_expression(
self.layer
.as_ref()
.expect("Mandatory field 'value' not set"),
)?;
validate_expression(
self.amount
.as_ref()
.expect("Mandatory field 'value' not set"),
)?;
validate_expression(
self.enforcement_direction
.as_ref()
.expect("Mandatory field 'value' not set"),
)?;
Ok(())
}
}

fn validate_expression(expr: &str) -> Result<(), String> {
CelExpression::try_from(expr).map_err(|e| e.to_string())?;
Ok(())
}
fn validate_optional_expression(expr: &Option<Option<String>>) -> Result<(), String> {
if let Some(Some(expr)) = expr.as_ref() {
CelExpression::try_from(expr.as_str()).map_err(|e| e.to_string())?;
}
Ok(())
}
5 changes: 5 additions & 0 deletions cala-ledger/src/velocity/limit/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod entity;
mod repo;

pub use entity::*;
pub(super) use repo::*;
Loading

0 comments on commit 4a37fb1

Please sign in to comment.