feat: add new provider api and tests #637
Conversation
Looking really good
👍
👍
👍
I have added tests for the missing APIs.
Maybe this PR would be a good place to discuss whether the user should be able to change the pagination parameters. Currently, most of the APIs return all available elements.
In the last commit, I have changed the key of the hashmap that `get_balances` returns:

```rust
// old way
let balances: HashMap<String, u64> = wallet.get_balances().await?;
let asset_id_hex_key = format!("{:#x}", asset_id);
balances.contains_key(&asset_id_hex_key);

// new way
let balances: HashMap<AssetId, u64> = wallet.get_balances().await?;
assert!(balances.contains_key(&asset_id));
```
I've been pondering pagination as well, and there are some pitfalls I would like to avoid.
While working on #598, I came up with something like this:

```rust
fn paginate_all<T, F, Fut, S>(
    supplier: F,
    request_template: PaginationRequest<S>,
) -> impl TryStream<Ok = T, Error = ProviderError>
where
    F: Fn(PaginationRequest<S>) -> Fut,
    Fut: Future<Output = Result<PaginatedResult<T, S>, ProviderError>>,
    S: Clone,
{
    let supplier = Rc::new(supplier);
    let template = request_template;
    // Unfold the pages into a stream, threading (has_next_page, cursor) as state.
    stream::try_unfold((true, None), move |state| {
        let supplier = Rc::clone(&supplier);
        let template = template.clone();
        async move {
            let (has_next_page, cursor) = state;
            if has_next_page {
                let request = PaginationRequest { cursor, ..template };
                supplier(request)
                    .await
                    .map(|e| Some((e.results, (e.has_next_page, e.cursor))))
            } else {
                Ok(None)
            }
        }
    })
    // Each unfold step yields a whole page; flatten the pages into single items.
    .map_ok(|e| stream::iter(e).map(Ok))
    .try_flatten()
    .into_stream()
}
```

You would use it like so:

```rust
let request_template = PaginationRequest::<String> {
    cursor: None,
    results: 100,
    direction: PageDirection::Forward,
};

let a = paginate_all(
    |req| async move { self.get_transactions_by_owner(address, req).await },
    request_template,
)
.map_ok(|resp| resp.transaction.id().to_string())
.map_ok(move |id: String| async move {
    self.client
        .receipts(&id)
        .await
        .map_err(ProviderError::ClientRequestError)
})
.try_buffered(max_concurrent_requests.unwrap_or(5))
.try_collect::<Vec<_>>()
.await
.unwrap();
```
This allows you to wrap any paginated endpoint, but note that it returns a `TryStream`, so nothing is fetched until the stream is polled. The example above amounts to the following behavior: transactions are fetched page by page (100 at a time), each transaction id is mapped to a receipts request, and up to `max_concurrent_requests` (defaulting to 5) of those requests run concurrently while the results are collected into a `Vec`.
The stream API is powerful enough that even if I had to paginate the second API request, I could have easily done so and flattened the results so that the end user still only sees a stream of `Receipt`s.

So, we might do something like this, where our APIs accept the normal API parameters plus an optional paging template, and return an `impl Stream`:

```rust
pub fn get_something(arg1, arg2, paging: Option<PagingRequest>) -> impl Stream<Item = Something> {
    // ...
}
```

This way we still expose the pagination to the user but make the interface more forgiving. If they ever abandon the stream, it will not continue endlessly issuing requests, since it is lazy :) Dunno, haven't thought too much about it; I just ran into the problem of combining two potentially paging APIs in a friendly way.
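To make the laziness concrete, here is a sketch reusing the example above (`.take` comes from `futures::StreamExt`):

```rust
use futures::{StreamExt, TryStreamExt};

// Because the stream is lazy, truncating it bounds the work: only enough
// pages to yield ten transactions are ever requested from the node.
let first_ten = paginate_all(
    |req| async move { self.get_transactions_by_owner(address, req).await },
    request_template,
)
.take(10)
.try_collect::<Vec<_>>()
.await?;
```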
This is really cool @segfault-magnet, I love it. But I believe we should implement and experiment with this on the events-querying issue you're working on; it's a lot more likely that the number of logs will be huge, and users will likely want to query them in near real-time, which justifies using streams and having this wonderful interface you came up with. For the case of the current provider APIs, I think the simpler approach is enough.
In the last commit, I have added a POC for the new provider API. The code is a bit more complicated than before, but the UX is IMO much better. The idea was to give the user the possibility to do the pagination but still keep the new API clean and easy to use.

The old call looked like this:

```rust
let coins = provider.get_coins(wallet.address(), asset_id).await?;
```

With the new API the call looks like this:

```rust
let num_results = 2;
let coins = provider
    .get_coins_new(wallet.address(), asset_id, num_results)
    .call()
    .await?
    .results;
```

We use a builder pattern to create the request, so pagination stays available when you need it:

```rust
let paginated_result = provider
    .get_coins_new(wallet.address(), asset_id, 4)
    .call()
    .await?;
dbg!(paginated_result.results.len());

let paginated_result = provider
    .get_coins_new(wallet.address(), asset_id, 2)
    .with_cursor(paginated_result.cursor)
    .with_direction(PageDirection::Backward)
    .call()
    .await?;
dbg!(paginated_result.results.len());
```

Now about the code. If you look at the types:

```rust
type BoxFnFuture<'a, T, U> = Box<dyn Fn(PaginationRequest<T>) -> BoxFutureResult<'a, U> + 'a>;
type BoxFutureResult<'a, U> = Box<dyn Future<Output = Result<U, ProviderError>> + 'a + Unpin>;

pub struct ProviderPaginationCaller<'a, T, U>
where
    T: Debug,
{
    /// The cursor returned from a previous query to indicate an offset
    pub cursor: Option<T>,
    /// The number of results to take
    pub results: usize,
    /// The direction of the query (e.g. asc, desc order)
    pub direction: PageDirection,
    /// The function to call
    pub function: BoxFnFuture<'a, T, U>,
}
```
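For illustration, a minimal sketch of how the builder methods and `call` might look (a sketch only; the exact code in the PR may differ):

```rust
impl<'a, T: Debug, U> ProviderPaginationCaller<'a, T, U> {
    /// Set the cursor returned by a previous page.
    pub fn with_cursor(mut self, cursor: Option<T>) -> Self {
        self.cursor = cursor;
        self
    }

    /// Set the page direction.
    pub fn with_direction(mut self, direction: PageDirection) -> Self {
        self.direction = direction;
        self
    }

    /// Build the pagination request and invoke the stored function.
    pub async fn call(self) -> Result<U, ProviderError> {
        let request = PaginationRequest {
            cursor: self.cursor,
            results: self.results,
            direction: self.direction,
        };
        (self.function)(request).await
    }
}
```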
Talking strictly about the UX of this new API (and not about the actual implementation), I believe the API has a nicer UX this way. Important context is how this would look without @hal3e's proposal. We would need to always create a `PaginationRequest`:

```rust
let pagination_request = PaginationRequest { // I have to import this
    cursor: None, // Always passing None until I'm actually paginating
    results: 100,
    direction: PageDirection::Forward, // And also import this one
};

let coins = provider
    .get_coins_new(wallet.address(), asset_id, pagination_request) // Always need to pass a pagination request
    .await?
    .results;
```

So, as we can see, we gain something by not having to import and construct a `PaginationRequest` for every call. With @hal3e's proposal, we drop the mandatory request struct while still exposing pagination through the builder.

IMO, the nicer UX is worth the extra complexity in the implementation.

Let me know your thoughts @FuelLabs/sdk-rust.
Love what was done. Perhaps this can evolve further: `call` could accept `&mut self` and prepare the struct for getting the next page, so that one might exhaust the endpoint without having to create a bunch of pagination structs.
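Something along these lines might work (a rough sketch, under the assumption that `U` is a `PaginatedResult` whose cursor can be stored back into the caller):

```rust
impl<'a, T, R> ProviderPaginationCaller<'a, T, PaginatedResult<R, T>>
where
    T: Debug + Clone,
{
    /// Sketch: `call` takes `&mut self`, sends the stored cursor, and then
    /// remembers the cursor of the returned page, so that calling it again
    /// fetches the next page without building a new pagination struct.
    pub async fn call(&mut self) -> Result<PaginatedResult<R, T>, ProviderError> {
        let request = PaginationRequest {
            cursor: self.cursor.take(),
            results: self.results,
            direction: self.direction.clone(),
        };
        let page = (self.function)(request).await?;
        self.cursor = page.cursor.clone();
        Ok(page)
    }
}
```

Usage would then look like:

```rust
let mut pager = provider.get_coins_new(wallet.address(), asset_id, 100);
let first_page = pager.call().await?;
let second_page = pager.call().await?; // continues from the stored cursor
```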
```rust
    asset_id: &'a AssetId,
    num_results: usize,
) -> ProviderPaginationCaller<String, PaginatedResult<Coin, String>> {
    ProviderPaginationCaller::new(num_results, |pr: PaginationRequest<String>| {
```
Due to the way the closure is written, the returned `ProviderPaginationCaller` must live as long as the provider on which `get_coins_new` is being called. This is a bit weird: since we're cloning the provider regardless, why would it need to live as long as the original one? It is a consequence of the fact that we're capturing `&self` inside the closure. Had we done something like this:
```rust
pub fn get_coins_new(
    &self,
    from: &Bech32Address,
    asset_id: &AssetId,
    num_results: usize,
) -> ProviderPaginationCaller<String, PaginatedResult<Coin, String>> {
    let hash = Arc::new(from.hash().to_string());
    let asset_id = Arc::new(asset_id.to_string());
    let provider = Arc::new(self.clone());
    ProviderPaginationCaller::new(num_results, move |pr: PaginationRequest<String>| {
        let hash = Arc::clone(&hash);
        let asset_id_string = Arc::clone(&asset_id);
        let provider_clone = Arc::clone(&provider);
        Box::pin(async move {
            provider_clone
                .client
                .coins(&hash, Some(&asset_id_string), pr)
                .await
                .map_err(Into::into)
        })
    })
}
```
Then the `ProviderPaginationCaller` would have resolved `'static` as its lifetime parameter `'a` and thus be able to live longer than the provider it was called on. `'static` would be resolved because the returned closure moved the `Arc`s and thus became an owner of them, whereas previously it had first captured them by reference and then cloned them for the `async` block, forcing the closure and, by extension, the returned `ProviderPaginationCaller` to live as long as `self`.

Since this is `Fn` and not `FnOnce`, we have this pattern of `Arc`s being created outside the closure and then moved inside it; but the closure itself cannot give them to the `async` block, because losing what it originally captured would make it `FnOnce`. Rather, it has to clone the `Arc`s, which is what `Arc`s/`Rc`s are for, after all. You can use `Rc`s as well if you decide to use `LocalBoxFuture` for some reason.

If you decide not to implement the 'reuse `ProviderPaginationCaller`' suggestion I made, then you can relax this to `FnOnce` and simplify this capture process.
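For comparison, a rough sketch of that relaxed `FnOnce` version, mirroring the `Arc` example above (assuming the caller is consumed by a single `call`), where the captured values can be moved straight into the `async` block without any reference counting:

```rust
type BoxFnOnceFuture<'a, T, U> =
    Box<dyn FnOnce(PaginationRequest<T>) -> BoxFutureResult<'a, U> + 'a>;

pub fn get_coins_new(
    &self,
    from: &Bech32Address,
    asset_id: &AssetId,
    num_results: usize,
) -> ProviderPaginationCaller<String, PaginatedResult<Coin, String>> {
    let hash = from.hash().to_string();
    let asset_id = asset_id.to_string();
    let provider = self.clone();
    ProviderPaginationCaller::new(num_results, move |pr: PaginationRequest<String>| {
        // An FnOnce closure may give away its captures, so the owned values
        // move directly into the future: no Arcs and no per-call cloning.
        Box::pin(async move {
            provider
                .client
                .coins(&hash, Some(&asset_id), pr)
                .await
                .map_err(Into::into)
        })
    })
}
```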
@MujkicA and @leviathanbeak were able to reproduce the inconsistent behavior when getting paginated results from the client. @leviathanbeak will investigate it from the client side.
closes: #605 and #350
I have updated the provider with the new APIs and added respective unit tests.