[refactor] #3982: Clear live queries after smart contract end #4024

Merged
merged 1 commit on Nov 20, 2023
46 changes: 38 additions & 8 deletions client/src/client.rs
@@ -346,20 +346,20 @@ impl_query_output! {
#[display(fmt = "{}@{torii_url}", "key_pair.public_key()")]
pub struct Client {
/// Url for accessing iroha node
-torii_url: Url,
+pub torii_url: Url,
/// Accounts keypair
-key_pair: KeyPair,
+pub key_pair: KeyPair,
/// Transaction time to live in milliseconds
-transaction_ttl: Option<Duration>,
+pub transaction_ttl: Option<Duration>,
/// Transaction status timeout
-transaction_status_timeout: Duration,
+pub transaction_status_timeout: Duration,
/// Current account
-account_id: AccountId,
+pub account_id: AccountId,
/// Http headers which will be appended to each request
-headers: HashMap<String, String>,
+pub headers: HashMap<String, String>,
/// If `true` add nonce, which makes different hashes for
/// transactions which occur repeatedly and/or simultaneously
-add_transaction_nonce: bool,
+pub add_transaction_nonce: bool,
}

/// Query request
@@ -388,6 +388,7 @@ impl QueryRequest {
),
}
}

fn assemble(self) -> DefaultRequestBuilder {
let builder = DefaultRequestBuilder::new(
HttpMethod::POST,
@@ -837,7 +838,7 @@ impl Client {
///
/// # Errors
/// Fails if sending request fails
-pub fn request_with_filter_and_pagination_and_sorting<R: Query + Debug>(
+pub(crate) fn request_with_filter_and_pagination_and_sorting<R: Query + Debug>(
&self,
request: R,
pagination: Pagination,
@@ -873,6 +874,35 @@ impl Client {
self.build_query(request).execute()
}

/// Query API entry point using cursor.
///
/// You should probably not use this function directly.
///
/// # Errors
/// Fails if sending request fails
#[cfg(debug_assertions)]
pub fn request_with_cursor<O>(
&self,
cursor: iroha_data_model::query::cursor::ForwardCursor,
) -> QueryResult<O::Target>
where
O: QueryOutput,
<O as TryFrom<Value>>::Error: Into<eyre::Error>,
{
let request = QueryRequest {
torii_url: self.torii_url.clone(),
headers: self.headers.clone(),
request: iroha_data_model::query::QueryRequest::Cursor(cursor),
};
let response = request.clone().assemble().build()?.send()?;

let mut resp_handler = QueryResponseHandler::<O>::new(request);
let value = resp_handler.handle(&response)?;
let output = O::new(value, resp_handler);

Ok(output)
}

/// Query API entry point.
/// Creates a [`QueryRequestBuilder`] which can be used to configure query requests to `Iroha` peers.
///
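For orientation, here is a minimal client-side sketch of how `request_with_cursor` is meant to be exercised: a JSON-encoded `ForwardCursor` obtained out of band (for example, read back from account metadata, as the integration test below does) is deserialized and replayed against the peer. The helper name `resume_asset_query` and the `cursor_json` parameter are illustrative, not part of this diff, and the call is only available in debug builds because of the `#[cfg(debug_assertions)]` gate above.

use iroha_client::client::Client;
use iroha_data_model::{asset::Asset, query::cursor::ForwardCursor};

// Hypothetical helper: replay a previously opened live query from its cursor.
fn resume_asset_query(client: &Client, cursor_json: &str) -> eyre::Result<()> {
    let cursor: ForwardCursor = serde_json::from_str(cursor_json)?;
    // Fails with `QueryExecutionFail::UnknownCursor` when the live query behind
    // the cursor has already been dropped on the peer.
    let _next_batch = client.request_with_cursor::<Vec<Asset>>(cursor)?;
    Ok(())
}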
49 changes: 47 additions & 2 deletions client/tests/integration/queries/mod.rs
@@ -1,7 +1,10 @@
use std::str::FromStr as _;

use eyre::{bail, Result};
use iroha_client::client::{self, ClientQueryError};
use iroha_data_model::{
-query::{error::QueryExecutionFail, FetchSize, MAX_FETCH_SIZE},
-ValidationFail,
+prelude::*,
+query::{cursor::ForwardCursor, error::QueryExecutionFail, MAX_FETCH_SIZE},
};
use test_network::*;

@@ -27,3 +30,45 @@ fn too_big_fetch_size_is_not_allowed() {
))
));
}

#[test]
fn live_query_is_dropped_after_smart_contract_end() -> Result<()> {
let (_rt, _peer, client) = <PeerBuilder>::new().with_port(11_140).start_with_runtime();
wait_for_genesis_committed(&[client.clone()], 0);

let wasm = iroha_wasm_builder::Builder::new(
"tests/integration/smartcontracts/query_assets_and_save_cursor",
)
.show_output()
.build()?
.optimize()?
.into_bytes()?;

let transaction = client.build_transaction(
WasmSmartContract::from_compiled(wasm),
UnlimitedMetadata::default(),
)?;
client.submit_transaction_blocking(&transaction)?;
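// The smart contract has now finished executing, so the peer is expected to
// have dropped the live query it opened; only the saved cursor remains.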

let metadata_value = client.request(FindAccountKeyValueByIdAndKey::new(
client.account_id.clone(),
Name::from_str("cursor").unwrap(),
))?;
let Value::String(cursor) = metadata_value.0 else {
bail!("Expected `Value::String`, got {:?}", metadata_value.0);
};
let asset_cursor = serde_json::from_str::<ForwardCursor>(&cursor)?;

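// Replaying the contract's cursor from the client must fail, because the
// live query was cleared when the smart contract ended.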
let err = client
.request_with_cursor::<Vec<Asset>>(asset_cursor)
.expect_err("Request with cursor from smart contract should fail");

assert!(matches!(
err,
ClientQueryError::Validation(ValidationFail::QueryFailed(
QueryExecutionFail::UnknownCursor
))
));

Ok(())
}
2 changes: 2 additions & 0 deletions client/tests/integration/smartcontracts/Cargo.toml
@@ -14,6 +14,7 @@ members = [
"executor_with_admin",
"executor_with_custom_token",
"executor_with_migration_fail",
"query_assets_and_save_cursor",
]

[profile.dev]
@@ -27,6 +28,7 @@ opt-level = "z" # Optimize for size vs speed with "s"/"z" (removes vectorization)
codegen-units = 1 # Further reduces binary size but increases compilation time

[workspace.dependencies]
iroha_smart_contract = { version = "=2.0.0-pre-rc.20", path = "../../../../smart_contract", features = ["debug"]}
iroha_trigger = { version = "=2.0.0-pre-rc.20", path = "../../../../smart_contract/trigger", features = ["debug"]}
iroha_executor = { version = "=2.0.0-pre-rc.20", path = "../../../../smart_contract/executor" }
iroha_schema = { version = "=2.0.0-pre-rc.20", path = "../../../../schema" }
@@ -170,6 +170,7 @@ impl Executor {
}
}

// TODO (#4049): Fix unused `visit_register_domain()`
fn visit_register_domain(executor: &mut Executor, authority: &AccountId, _isi: Register<Domain>) {
if executor.block_height() == 0 {
pass!(executor)
@@ -0,0 +1,18 @@
[package]
name = "query_assets_and_save_cursor"

edition.workspace = true
version.workspace = true
authors.workspace = true

license.workspace = true

[lib]
crate-type = ['cdylib']

[dependencies]
iroha_smart_contract.workspace = true

panic-halt.workspace = true
lol_alloc.workspace = true
serde_json = { version = "1.0.108", default-features = false }
@@ -0,0 +1,38 @@
//! Smart contract which executes [`FindAllAssets`] and saves cursor to the owner's metadata.

#![no_std]

#[cfg(not(test))]
extern crate panic_halt;

extern crate alloc;

use alloc::string::ToString as _;
use core::num::NonZeroU32;

use iroha_smart_contract::{parse, prelude::*};
use lol_alloc::{FreeListAllocator, LockedAllocator};

#[global_allocator]
static ALLOC: LockedAllocator<FreeListAllocator> = LockedAllocator::new(FreeListAllocator::new());

/// Execute [`FindAllAssets`] and save cursor to the owner's metadata.
#[iroha_smart_contract::main]
fn main(owner: AccountId) {
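// A fetch size of 1 keeps the result batched (assuming more than one asset
// exists), so the query stays live on the peer and a cursor is returned.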
let asset_cursor = FindAllAssets
.fetch_size(FetchSize::new(Some(NonZeroU32::try_from(1).dbg_unwrap())))
.execute()
.dbg_unwrap();

let (_batch, cursor) = asset_cursor.into_raw_parts();

SetKeyValueExpr::new(
owner,
parse!("cursor" as Name),
serde_json::to_value(cursor)
.dbg_expect("Failed to convert cursor to JSON")
.to_string(),
)
.execute()
.dbg_expect("Failed to save cursor to the owner's metadata");
}
1 change: 1 addition & 0 deletions client/tests/integration/triggers/by_call_trigger.rs
@@ -329,6 +329,7 @@ fn trigger_in_genesis_using_base64() -> Result<()> {
info!("Building trigger");
let wasm =
iroha_wasm_builder::Builder::new("tests/integration/smartcontracts/mint_rose_trigger")
.show_output()
.build()?
.optimize()?
.into_bytes()?;
1 change: 1 addition & 0 deletions client/tests/integration/triggers/time_trigger.rs
@@ -204,6 +204,7 @@ fn mint_nft_for_every_user_every_1_sec() -> Result<()> {
let wasm = iroha_wasm_builder::Builder::new(
"tests/integration/smartcontracts/create_nft_for_every_user_trigger",
)
.show_output()
.build()?
.optimize()?
.into_bytes()?;
1 change: 1 addition & 0 deletions client/tests/integration/upgrade.rs
@@ -143,6 +143,7 @@ fn upgrade_executor(client: &Client, executor: impl AsRef<Path>) -> Result<()> {
info!("Building executor");

let wasm = iroha_wasm_builder::Builder::new(executor.as_ref())
.show_output()
.build()?
.optimize()?
.into_bytes()?;
Binary file modified configs/peer/executor.wasm
28 changes: 20 additions & 8 deletions core/src/query/store.rs
@@ -12,7 +12,7 @@ use iroha_data_model::{
asset::AssetValue,
query::{
cursor::ForwardCursor, error::QueryExecutionFail, pagination::Pagination, sorting::Sorting,
-FetchSize, DEFAULT_FETCH_SIZE, MAX_FETCH_SIZE,
+FetchSize, QueryId, DEFAULT_FETCH_SIZE, MAX_FETCH_SIZE,
},
BatchedResponse, BatchedResponseV1, HasMetadata, IdentifiableBox, ValidationFail, Value,
};
@@ -67,7 +67,7 @@ type LiveQuery = Batched<Vec<Value>>;
/// Clients can handle their queries using [`LiveQueryStoreHandle`]
#[derive(Debug)]
pub struct LiveQueryStore {
-queries: HashMap<String, (LiveQuery, Instant)>,
+queries: HashMap<QueryId, (LiveQuery, Instant)>,
query_idle_time: Duration,
}

@@ -138,7 +138,7 @@ impl LiveQueryStore {
LiveQueryStoreHandle { message_sender }
}

-fn insert(&mut self, query_id: String, live_query: LiveQuery) {
+fn insert(&mut self, query_id: QueryId, live_query: LiveQuery) {
self.queries.insert(query_id, (live_query, Instant::now()));
}

@@ -148,8 +148,8 @@ impl LiveQueryStore {
}

enum Message {
-Insert(String, Batched<Vec<Value>>),
-Remove(String, oneshot::Sender<Option<Batched<Vec<Value>>>>),
+Insert(QueryId, Batched<Vec<Value>>),
+Remove(QueryId, oneshot::Sender<Option<Batched<Vec<Value>>>>),
}

/// Handle to interact with [`LiveQueryStore`].
@@ -207,13 +207,25 @@ impl LiveQueryStoreHandle {
self.construct_query_response(query_id, cursor.cursor.map(NonZeroU64::get), live_query)
}

-fn insert(&self, query_id: String, live_query: LiveQuery) -> Result<()> {
+/// Remove query from the storage if there is any.
+///
+/// Returns `true` if query was removed, `false` otherwise.
+///
+/// # Errors
+///
+/// - Returns [`Error::ConnectionClosed`] if [`QueryService`] is dropped,
+/// - Otherwise throws up query output handling errors.
+pub fn drop_query(&self, query_id: QueryId) -> Result<bool> {
+self.remove(query_id).map(|query_opt| query_opt.is_some())
+}
+
+fn insert(&self, query_id: QueryId, live_query: LiveQuery) -> Result<()> {
self.message_sender
.blocking_send(Message::Insert(query_id, live_query))
.map_err(|_| Error::ConnectionClosed)
}

-fn remove(&self, query_id: String) -> Result<Option<LiveQuery>> {
+fn remove(&self, query_id: QueryId) -> Result<Option<LiveQuery>> {
let (sender, receiver) = oneshot::channel();

self.message_sender
@@ -225,7 +237,7 @@

fn construct_query_response(
&self,
-query_id: String,
+query_id: QueryId,
curr_cursor: Option<u64>,
mut live_query: Batched<Vec<Value>>,
) -> Result<BatchedResponse<Value>> {
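The runtime-side hook that gives this PR its name, dropping a contract's live queries once the contract finishes, is not part of the hunks shown here. As a rough sketch only, using the module's own `Result` alias and assuming the wasm runtime keeps some bookkeeping of the `QueryId`s a contract opened (the `opened_queries` argument below is hypothetical), the cleanup could look like this:

use iroha_data_model::query::QueryId;

// Hypothetical post-execution hook: drop every live query a contract started.
fn clear_contract_queries(
    handle: &LiveQueryStoreHandle,
    opened_queries: Vec<QueryId>,
) -> Result<()> {
    for query_id in opened_queries {
        // `drop_query` returns Ok(false) when the query was already consumed
        // or evicted by the idle-time sweep, which is fine for cleanup.
        let _was_live = handle.drop_query(query_id)?;
    }
    Ok(())
}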