Skip to content

Commit

Permalink
feat: add sets query on account sets (#116)
Browse files Browse the repository at this point in the history
* feat: add sets query on account sets

* refactor: unify queries at service layer
  • Loading branch information
thevaibhav-dixit authored Jun 17, 2024
1 parent 54a5e1d commit a3a3624
Show file tree
Hide file tree
Showing 7 changed files with 302 additions and 21 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 41 additions & 7 deletions cala-ledger/src/account_set/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,18 +291,52 @@ impl AccountSets {
}

#[instrument(
name = "cala_ledger.account_sets.find_where_account_is_member",
skip(self),
name = "cala_ledger.account_sets.find_where_member",
skip(self, member),
err
)]
pub async fn find_where_account_is_member(
pub async fn find_where_member(
&self,
account_id: AccountId,
member: impl Into<AccountSetMember>,
query: PaginatedQueryArgs<AccountSetByNameCursor>,
) -> Result<PaginatedQueryRet<AccountSet, AccountSetByNameCursor>, AccountSetError> {
self.repo
.find_where_account_is_member(account_id, query)
.await
match member.into() {
AccountSetMember::Account(account_id) => {
self.repo
.find_where_account_is_member(account_id, query)
.await
}
AccountSetMember::AccountSet(account_set_id) => {
self.repo
.find_where_account_set_is_member(account_set_id, query)
.await
}
}
}

#[instrument(
name = "cala_ledger.account_sets.find_where_member_in_op",
skip(self, op, member),
err
)]
pub async fn find_where_member_in_op(
&self,
op: &mut AtomicOperation<'_>,
member: impl Into<AccountSetMember>,
query: PaginatedQueryArgs<AccountSetByNameCursor>,
) -> Result<PaginatedQueryRet<AccountSet, AccountSetByNameCursor>, AccountSetError> {
match member.into() {
AccountSetMember::Account(account_id) => {
self.repo
.find_where_account_is_member_in_tx(op.tx(), account_id, query)
.await
}
AccountSetMember::AccountSet(account_set_id) => {
self.repo
.find_where_account_set_is_member_in_tx(op.tx(), account_set_id, query)
.await
}
}
}

pub(crate) async fn fetch_mappings(
Expand Down
92 changes: 90 additions & 2 deletions cala-ledger/src/account_set/repo.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use chrono::{DateTime, Utc};
use sqlx::{PgPool, Postgres, Transaction};
use sqlx::{Executor, PgPool, Postgres, Transaction};

use std::collections::HashMap;

Expand Down Expand Up @@ -367,6 +367,26 @@ impl AccountSetRepo {
&self,
account_id: AccountId,
query: query::PaginatedQueryArgs<AccountSetByNameCursor>,
) -> Result<query::PaginatedQueryRet<AccountSet, AccountSetByNameCursor>, AccountSetError> {
self.find_where_account_is_member_in_executor(&self.pool, account_id, query)
.await
}

pub async fn find_where_account_is_member_in_tx(
&self,
tx: &mut Transaction<'_, Postgres>,
account_id: AccountId,
query: query::PaginatedQueryArgs<AccountSetByNameCursor>,
) -> Result<query::PaginatedQueryRet<AccountSet, AccountSetByNameCursor>, AccountSetError> {
self.find_where_account_is_member_in_executor(&mut **tx, account_id, query)
.await
}

async fn find_where_account_is_member_in_executor(
&self,
executor: impl Executor<'_, Database = Postgres>,
account_id: AccountId,
query: query::PaginatedQueryArgs<AccountSetByNameCursor>,
) -> Result<query::PaginatedQueryRet<AccountSet, AccountSetByNameCursor>, AccountSetError> {
let rows = sqlx::query_as!(
GenericEvent,
Expand All @@ -393,7 +413,75 @@ impl AccountSetRepo {
query.after.map(|c| c.name),
query.first as i64 + 1
)
.fetch_all(&self.pool)
.fetch_all(executor)
.await?;

let (entities, has_next_page) = EntityEvents::load_n::<AccountSet>(rows, query.first)?;
let mut end_cursor = None;
if let Some(last) = entities.last() {
end_cursor = Some(AccountSetByNameCursor {
id: last.values().id,
name: last.values().name.clone(),
});
}
Ok(query::PaginatedQueryRet {
entities,
has_next_page,
end_cursor,
})
}

pub async fn find_where_account_set_is_member(
&self,
account_set_id: AccountSetId,
query: query::PaginatedQueryArgs<AccountSetByNameCursor>,
) -> Result<query::PaginatedQueryRet<AccountSet, AccountSetByNameCursor>, AccountSetError> {
self.find_where_account_set_is_member_in_executor(&self.pool, account_set_id, query)
.await
}

pub async fn find_where_account_set_is_member_in_tx(
&self,
tx: &mut Transaction<'_, Postgres>,
account_set_id: AccountSetId,
query: query::PaginatedQueryArgs<AccountSetByNameCursor>,
) -> Result<query::PaginatedQueryRet<AccountSet, AccountSetByNameCursor>, AccountSetError> {
self.find_where_account_set_is_member_in_executor(&mut **tx, account_set_id, query)
.await
}

async fn find_where_account_set_is_member_in_executor(
&self,
executor: impl Executor<'_, Database = Postgres>,
account_set_id: AccountSetId,
query: query::PaginatedQueryArgs<AccountSetByNameCursor>,
) -> Result<query::PaginatedQueryRet<AccountSet, AccountSetByNameCursor>, AccountSetError> {
let rows = sqlx::query_as!(
GenericEvent,
r#"
WITH member_account_sets AS (
SELECT a.id, a.name, a.created_at
FROM cala_account_set_member_account_sets asm
JOIN cala_account_sets a ON asm.data_source_id = a.data_source_id AND
asm.account_set_id = a.id
WHERE asm.data_source_id = '00000000-0000-0000-0000-000000000000' AND
asm.member_account_set_id = $1
AND ((a.name, a.id) > ($3, $2) OR ($3 IS NULL AND $2 IS NULL))
ORDER BY a.name, a.id
LIMIT $4
)
SELECT mas.id, e.sequence, e.event,
mas.created_at AS entity_created_at, e.recorded_at AS event_recorded_at
FROM member_account_sets mas
JOIN cala_account_set_events e ON mas.id = e.id
ORDER BY mas.name, mas.id, e.sequence
"#,
account_set_id as AccountSetId,
query.after.as_ref().map(|c| c.id) as Option<AccountSetId>,
query.after.map(|c| c.name),
query.first as i64 + 1
)
.fetch_all(executor)
.await?;

let (entities, has_next_page) = EntityEvents::load_n::<AccountSet>(rows, query.first)?;
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cala-server/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type AccountSet {
createdAt: Timestamp!
modifiedAt: Timestamp!
balance(currency: CurrencyCode!): Balance
sets(first: Int!, after: String): AccountSetConnection!
}

type AccountSetConnection {
Expand Down
33 changes: 21 additions & 12 deletions cala-server/src/graphql/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,25 +68,34 @@ impl Account {
after: Option<String>,
) -> Result<Connection<AccountSetByNameCursor, AccountSet, EmptyFields, EmptyFields>> {
let app = ctx.data_unchecked::<CalaApp>();
let account_id = AccountId::from(self.account_id);
query(
after,
None,
Some(first),
None,
|after, _, first, _| async move {
let first = first.expect("First always exists");
let result = app
.ledger()
.account_sets()
.find_where_account_is_member(
self.account_id.into(),
cala_ledger::query::PaginatedQueryArgs {
first,
after: after
.map(cala_ledger::account_set::AccountSetByNameCursor::from),
},
)
.await?;
let query_args = cala_ledger::query::PaginatedQueryArgs {
first,
after: after.map(cala_ledger::account_set::AccountSetByNameCursor::from),
};

let result = match ctx.data_opt::<DbOp>() {
Some(op) => {
let mut op = op.try_lock().expect("Lock held concurrently");
app.ledger()
.account_sets()
.find_where_member_in_op(&mut op, account_id, query_args)
.await?
}
None => {
app.ledger()
.account_sets()
.find_where_member(account_id, query_args)
.await?
}
};
let mut connection = Connection::new(false, result.has_next_page);
connection
.edges
Expand Down
51 changes: 51 additions & 0 deletions cala-server/src/graphql/account_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,57 @@ impl AccountSet {
};
Ok(balance.map(Balance::from))
}

async fn sets(
&self,
ctx: &Context<'_>,
first: i32,
after: Option<String>,
) -> Result<Connection<AccountSetByNameCursor, AccountSet, EmptyFields, EmptyFields>> {
let app = ctx.data_unchecked::<CalaApp>();
let account_set_id = AccountSetId::from(self.account_set_id);

query(
after.clone(),
None,
Some(first),
None,
|after, _, first, _| async move {
let first = first.expect("First always exists");
let query_args = cala_ledger::query::PaginatedQueryArgs {
first,
after: after.map(cala_ledger::account_set::AccountSetByNameCursor::from),
};

let result = match ctx.data_opt::<DbOp>() {
Some(op) => {
let mut op = op.try_lock().expect("Lock held concurrently");
app.ledger()
.account_sets()
.find_where_member_in_op(&mut op, account_set_id, query_args)
.await?
}
None => {
app.ledger()
.account_sets()
.find_where_member(account_set_id, query_args)
.await?
}
};

let mut connection = Connection::new(false, result.has_next_page);
connection
.edges
.extend(result.entities.into_iter().map(|entity| {
let cursor = AccountSetByNameCursor::from(entity.values());
Edge::new(cursor, AccountSet::from(entity))
}));

Ok::<_, async_graphql::Error>(connection)
},
)
.await
}
}

#[derive(InputObject)]
Expand Down

0 comments on commit a3a3624

Please sign in to comment.