Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pagination queries for balances endpoint #2490

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [2327](https://github.com/FuelLabs/fuel-core/pull/2327): Add more services tests and more checks of the pool. Also add an high level documentation for users of the pool and contributors.
- [2416](https://github.com/FuelLabs/fuel-core/issues/2416): Define the `GasPriceServiceV1` task.
- [2033](https://github.com/FuelLabs/fuel-core/pull/2033): Remove `Option<BlockHeight>` in favor of `BlockHeightQuery` where applicable.
- [2490](https://github.com/FuelLabs/fuel-core/pull/2490): Added pagination support for the `balances` GraphQL query, available only when 'balances indexation' is enabled.
- [2472](https://github.com/FuelLabs/fuel-core/pull/2472): Added the `amountU128` field to the `Balance` GraphQL schema, providing the total balance as a `U128`. The existing `amount` field clamps any balance exceeding `U64` to `u64::MAX`.

### Fixed
Expand Down
9 changes: 5 additions & 4 deletions crates/fuel-core/src/graphql_api/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,13 @@ pub trait OffChainDatabase: Send + Sync {
base_asset_id: &AssetId,
) -> StorageResult<TotalBalanceAmount>;

fn balances(
&self,
fn balances<'a>(
&'a self,
owner: &Address,
base_asset_id: &AssetId,
start: Option<AssetId>,
base_asset_id: &'a AssetId,
direction: IterDirection,
) -> BoxedIter<'_, StorageResult<(AssetId, TotalBalanceAmount)>>;
) -> BoxedIter<'a, StorageResult<(AssetId, TotalBalanceAmount)>>;

fn owned_coins_ids(
&self,
Expand Down
24 changes: 15 additions & 9 deletions crates/fuel-core/src/query/balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,14 @@ impl ReadView {
pub fn balances<'a>(
&'a self,
owner: &'a Address,
start: Option<AssetId>,
direction: IterDirection,
base_asset_id: &'a AssetId,
) -> impl Stream<Item = StorageResult<AddressBalance>> + 'a {
if self.balances_enabled {
futures::future::Either::Left(self.balances_with_cache(
owner,
start,
base_asset_id,
direction,
))
Expand Down Expand Up @@ -140,17 +142,21 @@ impl ReadView {
fn balances_with_cache<'a>(
&'a self,
owner: &'a Address,
base_asset_id: &AssetId,
start: Option<AssetId>,
base_asset_id: &'a AssetId,
direction: IterDirection,
) -> impl Stream<Item = StorageResult<AddressBalance>> + 'a {
stream::iter(self.off_chain.balances(owner, base_asset_id, direction))
.map(move |result| {
result.map(|(asset_id, amount)| AddressBalance {
owner: *owner,
asset_id,
amount,
})
stream::iter(
self.off_chain
.balances(owner, start, base_asset_id, direction),
)
.map(move |result| {
result.map(|(asset_id, amount)| AddressBalance {
owner: *owner,
asset_id,
amount,
})
.yield_each(self.batch_size)
})
.yield_each(self.batch_size)
}
}
15 changes: 8 additions & 7 deletions crates/fuel-core/src/schema/balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ impl BalanceQuery {
Ok(balance)
}

// TODO: This API should be migrated to the indexer for better support and
// discontinued within fuel-core.
#[graphql(complexity = "query_costs().balance_query")]
async fn balances(
&self,
Expand All @@ -92,18 +90,21 @@ impl BalanceQuery {
before: Option<String>,
) -> async_graphql::Result<Connection<AssetId, Balance, EmptyFields, EmptyFields>>
{
if before.is_some() || after.is_some() {
return Err(anyhow!("pagination is not yet supported").into())
}
let query = ctx.read_view()?;
if !query.balances_enabled && (before.is_some() || after.is_some()) {
return Err(anyhow!(
"Can not use pagination when balances indexation is not available"
)
.into())
}
let base_asset_id = *ctx
.data_unchecked::<ConsensusProvider>()
.latest_consensus_params()
.base_asset_id();
let owner = filter.owner.into();
crate::schema::query_pagination(after, before, first, last, |_, direction| {
crate::schema::query_pagination(after, before, first, last, |start, direction| {
Ok(query
.balances(&owner, direction, &base_asset_id)
.balances(&owner, (*start).map(Into::into), direction, &base_asset_id)
.map(|result| {
result.map(|balance| (balance.asset_id.into(), balance.into()))
}))
Expand Down
102 changes: 68 additions & 34 deletions crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,15 +233,48 @@ impl OffChainDatabase for OffChainIterableKeyValueView {
}
}

fn balances(
&self,
fn balances<'a>(
&'a self,
owner: &Address,
base_asset_id: &AssetId,
start: Option<AssetId>,
base_asset_id: &'a AssetId,
direction: IterDirection,
) -> BoxedIter<'_, StorageResult<(AssetId, TotalBalanceAmount)>> {
) -> BoxedIter<'a, StorageResult<(AssetId, TotalBalanceAmount)>> {
match (direction, start) {
(IterDirection::Forward, None) => {
let base_asset_balance = self.base_asset_balance(base_asset_id, owner);
let non_base_asset_balance =
self.non_base_asset_balance(owner, None, direction, base_asset_id);
base_asset_balance
.chain(non_base_asset_balance)
.into_boxed()
}
(IterDirection::Forward, Some(asset_id)) => {
let start = CoinBalancesKey::new(owner, &asset_id);
self.non_base_asset_balance(owner, Some(start), direction, base_asset_id)
}
(IterDirection::Reverse, _) => {
let start = start.map(|asset_id| CoinBalancesKey::new(owner, &asset_id));
let base_asset_balance = self.base_asset_balance(base_asset_id, owner);
let non_base_asset_balance =
self.non_base_asset_balance(owner, start, direction, base_asset_id);
non_base_asset_balance
.chain(base_asset_balance)
.into_boxed()
}
}
}
}

impl OffChainIterableKeyValueView {
fn base_asset_balance(
&self,
base_asset_id: &AssetId,
owner: &Address,
) -> BoxedIter<'_, Result<(AssetId, u128), StorageError>> {
let base_asset_id = *base_asset_id;
let base_balance = self.balance(owner, &base_asset_id, &base_asset_id);
let base_asset_balance = match base_balance {
match base_balance {
Ok(base_asset_balance) => {
if base_asset_balance != 0 {
iter::once(Ok((base_asset_id, base_asset_balance))).into_boxed()
Expand All @@ -250,37 +283,38 @@ impl OffChainDatabase for OffChainIterableKeyValueView {
}
}
Err(err) => iter::once(Err(err)).into_boxed(),
};
}
}

let non_base_asset_balance = self
.iter_all_filtered_keys::<CoinBalances, _>(Some(owner), None, Some(direction))
.filter_map(move |result| match result {
Ok(key) if *key.asset_id() != base_asset_id => Some(Ok(key)),
Ok(_) => None,
Err(err) => Some(Err(err)),
})
.map(move |result| {
result.and_then(|key| {
let asset_id = key.asset_id();
let coin_balance =
self.storage_as_ref::<CoinBalances>()
.get(&key)?
.unwrap_or_default()
.into_owned() as TotalBalanceAmount;
Ok((*asset_id, coin_balance))
})
fn non_base_asset_balance<'a>(
&'a self,
owner: &Address,
start: Option<CoinBalancesKey>,
direction: IterDirection,
base_asset_id: &'a AssetId,
) -> BoxedIter<'_, Result<(AssetId, u128), StorageError>> {
self.iter_all_filtered_keys::<CoinBalances, _>(
Some(owner),
start.as_ref(),
Some(direction),
)
.filter_map(move |result| match result {
Ok(key) if *key.asset_id() != *base_asset_id => Some(Ok(key)),
Ok(_) => None,
Err(err) => Some(Err(err)),
})
.map(move |result| {
result.and_then(|key| {
let asset_id = key.asset_id();
let coin_balance =
self.storage_as_ref::<CoinBalances>()
.get(&key)?
.unwrap_or_default()
.into_owned() as TotalBalanceAmount;
Ok((*asset_id, coin_balance))
})
.into_boxed();

if direction == IterDirection::Forward {
base_asset_balance
.chain(non_base_asset_balance)
.into_boxed()
} else {
non_base_asset_balance
.chain(base_asset_balance)
.into_boxed()
}
})
.into_boxed()
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/services/gas_price_service/src/common/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub enum Error {
},
#[error("Failed to initialize updater: {0:?}")]
CouldNotInitUpdater(anyhow::Error),
#[error("Failed to convert metadata to concrete type. THere is no migration path for this metadata version")]
#[error("Failed to convert metadata to concrete type. There is no migration path for this metadata version")]
CouldNotConvertMetadata, // todo(https://github.com/FuelLabs/fuel-core/issues/2286)
}

Expand Down
2 changes: 1 addition & 1 deletion crates/services/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ where
match shutdown.catch_unwind().await {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
tracing::error!("Go an error during shutdown of the task: {e}");
tracing::error!("Got an error during shutdown of the task: {e}");
}
Err(e) => {
if got_panic.is_some() {
Expand Down
Loading
Loading