Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add sets query on account sets #116

Merged
merged 2 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 41 additions & 7 deletions cala-ledger/src/account_set/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,18 +291,52 @@ impl AccountSets {
}

#[instrument(
name = "cala_ledger.account_sets.find_where_account_is_member",
skip(self),
name = "cala_ledger.account_sets.find_where_member",
skip(self, member),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't think we should skip member... you can add + std::fmt::Debug to the impl

err
)]
pub async fn find_where_account_is_member(
pub async fn find_where_member(
&self,
account_id: AccountId,
member: impl Into<AccountSetMember>,
query: PaginatedQueryArgs<AccountSetByNameCursor>,
) -> Result<PaginatedQueryRet<AccountSet, AccountSetByNameCursor>, AccountSetError> {
self.repo
.find_where_account_is_member(account_id, query)
.await
match member.into() {
AccountSetMember::Account(account_id) => {
self.repo
.find_where_account_is_member(account_id, query)
.await
}
AccountSetMember::AccountSet(account_set_id) => {
self.repo
.find_where_account_set_is_member(account_set_id, query)
.await
}
}
}

#[instrument(
name = "cala_ledger.account_sets.find_where_member_in_op",
skip(self, op, member),
err
)]
pub async fn find_where_member_in_op(
&self,
op: &mut AtomicOperation<'_>,
member: impl Into<AccountSetMember>,
query: PaginatedQueryArgs<AccountSetByNameCursor>,
) -> Result<PaginatedQueryRet<AccountSet, AccountSetByNameCursor>, AccountSetError> {
match member.into() {
AccountSetMember::Account(account_id) => {
self.repo
.find_where_account_is_member_in_tx(op.tx(), account_id, query)
.await
}
AccountSetMember::AccountSet(account_set_id) => {
self.repo
.find_where_account_set_is_member_in_tx(op.tx(), account_set_id, query)
.await
}
}
}

pub(crate) async fn fetch_mappings(
Expand Down
92 changes: 90 additions & 2 deletions cala-ledger/src/account_set/repo.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use chrono::{DateTime, Utc};
use sqlx::{PgPool, Postgres, Transaction};
use sqlx::{Executor, PgPool, Postgres, Transaction};

use std::collections::HashMap;

Expand Down Expand Up @@ -367,6 +367,26 @@ impl AccountSetRepo {
&self,
account_id: AccountId,
query: query::PaginatedQueryArgs<AccountSetByNameCursor>,
) -> Result<query::PaginatedQueryRet<AccountSet, AccountSetByNameCursor>, AccountSetError> {
self.find_where_account_is_member_in_executor(&self.pool, account_id, query)
.await
}

pub async fn find_where_account_is_member_in_tx(
&self,
tx: &mut Transaction<'_, Postgres>,
account_id: AccountId,
query: query::PaginatedQueryArgs<AccountSetByNameCursor>,
) -> Result<query::PaginatedQueryRet<AccountSet, AccountSetByNameCursor>, AccountSetError> {
self.find_where_account_is_member_in_executor(&mut **tx, account_id, query)
.await
}

async fn find_where_account_is_member_in_executor(
&self,
executor: impl Executor<'_, Database = Postgres>,
account_id: AccountId,
query: query::PaginatedQueryArgs<AccountSetByNameCursor>,
) -> Result<query::PaginatedQueryRet<AccountSet, AccountSetByNameCursor>, AccountSetError> {
let rows = sqlx::query_as!(
GenericEvent,
Expand All @@ -393,7 +413,75 @@ impl AccountSetRepo {
query.after.map(|c| c.name),
query.first as i64 + 1
)
.fetch_all(&self.pool)
.fetch_all(executor)
.await?;

let (entities, has_next_page) = EntityEvents::load_n::<AccountSet>(rows, query.first)?;
let mut end_cursor = None;
if let Some(last) = entities.last() {
end_cursor = Some(AccountSetByNameCursor {
id: last.values().id,
name: last.values().name.clone(),
});
}
Ok(query::PaginatedQueryRet {
entities,
has_next_page,
end_cursor,
})
}

pub async fn find_where_account_set_is_member(
&self,
account_set_id: AccountSetId,
query: query::PaginatedQueryArgs<AccountSetByNameCursor>,
) -> Result<query::PaginatedQueryRet<AccountSet, AccountSetByNameCursor>, AccountSetError> {
self.find_where_account_set_is_member_in_executor(&self.pool, account_set_id, query)
.await
}

pub async fn find_where_account_set_is_member_in_tx(
&self,
tx: &mut Transaction<'_, Postgres>,
account_set_id: AccountSetId,
query: query::PaginatedQueryArgs<AccountSetByNameCursor>,
) -> Result<query::PaginatedQueryRet<AccountSet, AccountSetByNameCursor>, AccountSetError> {
self.find_where_account_set_is_member_in_executor(&mut **tx, account_set_id, query)
.await
}

async fn find_where_account_set_is_member_in_executor(
&self,
executor: impl Executor<'_, Database = Postgres>,
account_set_id: AccountSetId,
query: query::PaginatedQueryArgs<AccountSetByNameCursor>,
) -> Result<query::PaginatedQueryRet<AccountSet, AccountSetByNameCursor>, AccountSetError> {
let rows = sqlx::query_as!(
GenericEvent,
r#"
WITH member_account_sets AS (
SELECT a.id, a.name, a.created_at
FROM cala_account_set_member_account_sets asm
JOIN cala_account_sets a ON asm.data_source_id = a.data_source_id AND
asm.account_set_id = a.id
WHERE asm.data_source_id = '00000000-0000-0000-0000-000000000000' AND
asm.member_account_set_id = $1
AND ((a.name, a.id) > ($3, $2) OR ($3 IS NULL AND $2 IS NULL))
ORDER BY a.name, a.id
LIMIT $4
)
SELECT mas.id, e.sequence, e.event,
mas.created_at AS entity_created_at, e.recorded_at AS event_recorded_at
FROM member_account_sets mas
JOIN cala_account_set_events e ON mas.id = e.id
ORDER BY mas.name, mas.id, e.sequence
"#,
account_set_id as AccountSetId,
query.after.as_ref().map(|c| c.id) as Option<AccountSetId>,
query.after.map(|c| c.name),
query.first as i64 + 1
)
.fetch_all(executor)
.await?;

let (entities, has_next_page) = EntityEvents::load_n::<AccountSet>(rows, query.first)?;
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cala-server/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type AccountSet {
createdAt: Timestamp!
modifiedAt: Timestamp!
balance(currency: CurrencyCode!): Balance
sets(first: Int!, after: String): AccountSetConnection!
}

type AccountSetConnection {
Expand Down
33 changes: 21 additions & 12 deletions cala-server/src/graphql/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,25 +68,34 @@ impl Account {
after: Option<String>,
) -> Result<Connection<AccountSetByNameCursor, AccountSet, EmptyFields, EmptyFields>> {
let app = ctx.data_unchecked::<CalaApp>();
let account_id = AccountId::from(self.account_id);
query(
after,
None,
Some(first),
None,
|after, _, first, _| async move {
let first = first.expect("First always exists");
let result = app
.ledger()
.account_sets()
.find_where_account_is_member(
self.account_id.into(),
cala_ledger::query::PaginatedQueryArgs {
first,
after: after
.map(cala_ledger::account_set::AccountSetByNameCursor::from),
},
)
.await?;
let query_args = cala_ledger::query::PaginatedQueryArgs {
first,
after: after.map(cala_ledger::account_set::AccountSetByNameCursor::from),
};

let result = match ctx.data_opt::<DbOp>() {
Some(op) => {
let mut op = op.try_lock().expect("Lock held concurrently");
app.ledger()
.account_sets()
.find_where_member_in_op(&mut op, account_id, query_args)
.await?
}
None => {
app.ledger()
.account_sets()
.find_where_member(account_id, query_args)
.await?
}
};
let mut connection = Connection::new(false, result.has_next_page);
connection
.edges
Expand Down
51 changes: 51 additions & 0 deletions cala-server/src/graphql/account_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,57 @@ impl AccountSet {
};
Ok(balance.map(Balance::from))
}

async fn sets(
&self,
ctx: &Context<'_>,
first: i32,
after: Option<String>,
) -> Result<Connection<AccountSetByNameCursor, AccountSet, EmptyFields, EmptyFields>> {
let app = ctx.data_unchecked::<CalaApp>();
let account_set_id = AccountSetId::from(self.account_set_id);

query(
after.clone(),
None,
Some(first),
None,
|after, _, first, _| async move {
let first = first.expect("First always exists");
let query_args = cala_ledger::query::PaginatedQueryArgs {
first,
after: after.map(cala_ledger::account_set::AccountSetByNameCursor::from),
};

let result = match ctx.data_opt::<DbOp>() {
Some(op) => {
let mut op = op.try_lock().expect("Lock held concurrently");
app.ledger()
.account_sets()
.find_where_member_in_op(&mut op, account_set_id, query_args)
.await?
}
None => {
app.ledger()
.account_sets()
.find_where_member(account_set_id, query_args)
.await?
}
};

let mut connection = Connection::new(false, result.has_next_page);
connection
.edges
.extend(result.entities.into_iter().map(|entity| {
let cursor = AccountSetByNameCursor::from(entity.values());
Edge::new(cursor, AccountSet::from(entity))
}));

Ok::<_, async_graphql::Error>(connection)
},
)
.await
}
}

#[derive(InputObject)]
Expand Down
Loading