diff --git a/Cargo.lock b/Cargo.lock
index 0c25dea84be..817e6d5d1db 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2945,7 +2945,6 @@ dependencies = [
  "thread-local-panic-hook",
  "tokio",
  "tracing",
- "uuid",
  "vergen",
  "warp",
 ]
@@ -3107,6 +3106,7 @@ dependencies = [
  "tempfile",
  "thiserror",
  "tokio",
+ "uuid",
  "wasmtime",
 ]
@@ -3445,7 +3445,9 @@ dependencies = [
 name = "iroha_smart_contract"
 version = "2.0.0-pre-rc.19"
 dependencies = [
+ "derive_more",
  "iroha_data_model",
+ "iroha_macro",
  "iroha_smart_contract_derive",
  "iroha_smart_contract_utils",
  "parity-scale-codec",
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
index a1b3f06cca9..b75abf0af1c 100644
--- a/cli/Cargo.toml
+++ b/cli/Cargo.toml
@@ -78,7 +78,6 @@ tempfile = { workspace = true }
 dashmap = { workspace = true }
 thread-local-panic-hook = { version = "0.1.0", optional = true }
-uuid = { version = "1.4.1", features = ["v4"] }
 
 [dev-dependencies]
 serial_test = "0.8.0"
diff --git a/cli/src/lib.rs b/cli/src/lib.rs
index 44efca1113f..e2bdb206436 100644
--- a/cli/src/lib.rs
+++ b/cli/src/lib.rs
@@ -25,6 +25,7 @@ use iroha_core::{
     handler::ThreadHandler,
     kura::Kura,
     prelude::{World, WorldStateView},
+    query::store::LiveQueryStore,
     queue::Queue,
     smartcontracts::isi::Registrable as _,
     snapshot::{try_read_snapshot, SnapshotMaker, SnapshotMakerHandle},
@@ -97,12 +98,13 @@ pub struct Iroha {
     pub kura: Arc<Kura>,
     /// Torii web server
     pub torii: Option<Torii>,
-    /// Snapshot service,
+    /// Snapshot service
     pub snapshot_maker: SnapshotMakerHandle,
     /// Thread handlers
     thread_handlers: Vec<ThreadHandler>,
-    /// A boolean value indicating whether or not the peers will recieve data from the network. Used in
-    /// sumeragi testing.
+
+    /// A boolean value indicating whether or not the peers will receive data from the network.
+    /// Used in sumeragi testing.
     #[cfg(debug_assertions)]
     pub freeze_status: Arc<AtomicBool>,
 }
@@ -246,13 +248,25 @@ impl Iroha {
             std::path::Path::new(&config.kura.block_store_path),
             config.kura.debug_output_new_blocks,
         )?;
+        let live_query_store_handle =
+            LiveQueryStore::from_configuration(config.live_query_store).start();
 
-        let notify_shutdown = Arc::new(Notify::new());
         let block_count = kura.init()?;
-        let wsv = try_read_snapshot(&config.snapshot.dir_path, &kura, block_count).map_or_else(
+        let wsv = try_read_snapshot(
+            &config.snapshot.dir_path,
+            &kura,
+            live_query_store_handle.clone(),
+            block_count,
+        )
+        .map_or_else(
             |error| {
                 iroha_logger::warn!(%error, "Failed to load wsv from snapshot, creating empty wsv");
-                WorldStateView::from_configuration(config.wsv, world, Arc::clone(&kura))
+                WorldStateView::from_configuration(
+                    config.wsv,
+                    world,
+                    Arc::clone(&kura),
+                    live_query_store_handle.clone(),
+                )
             },
             |wsv| {
                 iroha_logger::info!(
@@ -303,6 +317,8 @@ impl Iroha {
         #[cfg(debug_assertions)]
         let freeze_status = Arc::new(AtomicBool::new(false));
 
+        let notify_shutdown = Arc::new(Notify::new());
+
         NetworkRelay {
             sumeragi: sumeragi.clone(),
             block_sync,
@@ -323,6 +339,7 @@
             events_sender,
             Arc::clone(&notify_shutdown),
             sumeragi.clone(),
+            live_query_store_handle,
             Arc::clone(&kura),
         );
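The wiring above follows one pattern: the store is built from its own config section, started once, and its cloneable handle is threaded into every component that needs to answer queries (snapshot restore, WSV construction, Torii). A minimal sketch of that startup flow, using only the signatures visible in this diff (error handling elided):

    // Start the live query store and obtain a cloneable handle to it.
    let live_query_store_handle =
        LiveQueryStore::from_configuration(config.live_query_store).start();
    // Every consumer gets its own clone of the handle.
    let wsv = WorldStateView::from_configuration(
        config.wsv,
        world,
        Arc::clone(&kura),
        live_query_store_handle.clone(),
    );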
diff --git a/cli/src/torii/mod.rs b/cli/src/torii/mod.rs
index a478fe4f554..d27dd0c32ab 100644
--- a/cli/src/torii/mod.rs
+++ b/cli/src/torii/mod.rs
@@ -7,21 +7,18 @@ use std::{
     fmt::{Debug, Write as _},
     net::ToSocketAddrs,
     sync::Arc,
-    time::{Duration, Instant},
 };
 
-use dashmap::DashMap;
 use futures::{stream::FuturesUnordered, StreamExt};
 use iroha_core::{
     kura::Kura,
     prelude::*,
+    query::store::LiveQueryStoreHandle,
     queue::{self, Queue},
     sumeragi::SumeragiHandle,
     EventsSender,
 };
-use iroha_data_model::Value;
-use parity_scale_codec::Encode;
-use tokio::{sync::Notify, time::sleep};
+use tokio::sync::Notify;
 use utils::*;
 use warp::{
     http::StatusCode,
@@ -30,56 +27,10 @@ use warp::{
     Filter as _, Reply,
 };
 
-use self::cursor::Batched;
-
 #[macro_use]
 pub(crate) mod utils;
-mod cursor;
-mod pagination;
 mod routing;
 
-type LiveQuery = Batched<Vec<Value>>;
-
-#[derive(Default)]
-struct LiveQueryStore {
-    queries: DashMap<(String, Vec<u8>), (LiveQuery, Instant)>,
-}
-
-impl LiveQueryStore {
-    fn insert<T: Encode>(&self, query_id: String, request: T, live_query: LiveQuery) {
-        self.queries
-            .insert((query_id, request.encode()), (live_query, Instant::now()));
-    }
-
-    fn remove<T: Encode>(&self, query_id: &str, request: &T) -> Option<LiveQuery> {
-        self.queries
-            .remove(&(query_id.to_string(), request.encode()))
-            .map(|(_, (output, _))| output)
-    }
-
-    fn expired_query_cleanup(
-        self: Arc<Self>,
-        idle_time: Duration,
-        notify_shutdown: Arc<Notify>,
-    ) -> tokio::task::JoinHandle<()> {
-        tokio::task::spawn(async move {
-            loop {
-                tokio::select! {
-                    _ = sleep(idle_time) => {
-                        self.queries
-                            .retain(|_, (_, last_access_time)| last_access_time.elapsed() <= idle_time);
-                    },
-                    _ = notify_shutdown.notified() => {
-                        iroha_logger::info!("Query cleanup service is being shut down.");
-                        break;
-                    }
-                    else => break,
-                }
-            }
-        })
-    }
-}
-
 /// Main network handler and the only entrypoint of the Iroha.
 pub struct Torii {
     iroha_cfg: super::Configuration,
@@ -87,14 +38,14 @@ pub struct Torii {
     events: EventsSender,
     notify_shutdown: Arc<Notify>,
     sumeragi: SumeragiHandle,
-    query_store: Arc<LiveQueryStore>,
+    query_service: LiveQueryStoreHandle,
     kura: Arc<Kura>,
 }
 
 /// Torii errors.
#[derive(Debug, thiserror::Error, displaydoc::Display)] pub enum Error { - /// Failed to execute or validate query + /// Failed to process query Query(#[from] iroha_data_model::ValidationFail), /// Failed to accept transaction AcceptTransaction(#[from] iroha_core::tx::AcceptTransactionFail), @@ -107,43 +58,14 @@ pub enum Error { #[cfg(feature = "telemetry")] /// Error while getting Prometheus metrics Prometheus(#[source] eyre::Report), - /// Error while resuming cursor - UnknownCursor, -} - -/// Status code for query error response. -fn query_status_code(validation_error: &iroha_data_model::ValidationFail) -> StatusCode { - use iroha_data_model::{ - isi::error::InstructionExecutionError, query::error::QueryExecutionFail::*, - ValidationFail::*, - }; - - match validation_error { - NotPermitted(_) => StatusCode::FORBIDDEN, - QueryFailed(query_error) - | InstructionFailed(InstructionExecutionError::Query(query_error)) => match query_error { - Evaluate(_) | Conversion(_) => StatusCode::BAD_REQUEST, - Signature(_) => StatusCode::UNAUTHORIZED, - Find(_) => StatusCode::NOT_FOUND, - }, - TooComplex => StatusCode::UNPROCESSABLE_ENTITY, - InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR, - InstructionFailed(error) => { - iroha_logger::error!( - ?error, - "Query validation failed with unexpected error. This means a bug inside Runtime Executor", - ); - StatusCode::INTERNAL_SERVER_ERROR - } - } } impl Reply for Error { fn into_response(self) -> Response { - use Error::*; match self { - Query(err) => { - reply::with_status(utils::Scale(&err), query_status_code(&err)).into_response() + Self::Query(err) => { + reply::with_status(utils::Scale(&err), Self::query_status_code(&err)) + .into_response() } _ => reply::with_status(Self::to_string(&self), self.status_code()).into_response(), } @@ -153,11 +75,10 @@ impl Reply for Error { impl Error { fn status_code(&self) -> StatusCode { use Error::*; + match self { - Query(e) => query_status_code(e), - AcceptTransaction(_) | ConfigurationReload(_) | UnknownCursor => { - StatusCode::BAD_REQUEST - } + Query(e) => Self::query_status_code(e), + AcceptTransaction(_) | ConfigurationReload(_) => StatusCode::BAD_REQUEST, Config(_) => StatusCode::NOT_FOUND, PushIntoQueue(err) => match **err { queue::Error::Full => StatusCode::INTERNAL_SERVER_ERROR, @@ -169,6 +90,33 @@ impl Error { } } + fn query_status_code(validation_error: &iroha_data_model::ValidationFail) -> StatusCode { + use iroha_data_model::{ + isi::error::InstructionExecutionError, query::error::QueryExecutionFail::*, + ValidationFail::*, + }; + + match validation_error { + NotPermitted(_) => StatusCode::FORBIDDEN, + QueryFailed(query_error) + | InstructionFailed(InstructionExecutionError::Query(query_error)) => match query_error + { + Evaluate(_) | Conversion(_) | UnknownCursor => StatusCode::BAD_REQUEST, + Signature(_) => StatusCode::UNAUTHORIZED, + Find(_) => StatusCode::NOT_FOUND, + }, + TooComplex => StatusCode::UNPROCESSABLE_ENTITY, + InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR, + InstructionFailed(error) => { + iroha_logger::error!( + ?error, + "Query validation failed with unexpected error. 
This means a bug inside Runtime Executor",
+                );
+                StatusCode::INTERNAL_SERVER_ERROR
+            }
+        }
+    }
+
     fn to_string(err: &dyn std::error::Error) -> String {
         let mut s = "Error:\n".to_owned();
         let mut idx = 0_i32;
diff --git a/cli/src/torii/routing.rs b/cli/src/torii/routing.rs
index b61cd81cd78..7263cbd9a1d 100644
--- a/cli/src/torii/routing.rs
+++ b/cli/src/torii/routing.rs
@@ -5,12 +5,8 @@
 // FIXME: This can't be fixed, because one trait in `warp` is private.
 #![allow(opaque_hidden_inferred_bound)]
 
-use std::{
-    cmp::Ordering,
-    num::{NonZeroU64, NonZeroUsize},
-};
+use std::num::NonZeroUsize;
 
-use cursor::Batch;
 use eyre::WrapErr;
 use futures::TryStreamExt;
 use iroha_config::{
@@ -20,7 +16,8 @@ use iroha_config::{
     GetConfiguration, PostConfiguration,
 };
 use iroha_core::{
-    smartcontracts::{isi::query::ValidQueryRequest, query::LazyValue},
+    query::{pagination::Paginate, store::LiveQueryStoreHandle},
+    smartcontracts::query::ValidQueryRequest,
     sumeragi::SumeragiHandle,
 };
 use iroha_data_model::{
@@ -28,18 +25,38 @@ use iroha_data_model::{
         stream::{BlockMessage, BlockSubscriptionRequest},
         SignedBlock,
     },
-    http::{BatchedResponse, BatchedResponseV1},
     prelude::*,
-    query::{ForwardCursor, Pagination, Sorting},
+    query::{cursor::ForwardCursor, http, sorting::Sorting, Pagination, QueryWithParameters},
+    BatchedResponse,
 };
 #[cfg(feature = "telemetry")]
 use iroha_telemetry::metrics::Status;
-use pagination::Paginate;
 use tokio::task;
 
 use super::*;
 use crate::stream::{Sink, Stream};
 
+/// Filter for warp which extracts [`http::ClientQueryRequest`]
+fn client_query_request(
+) -> impl warp::Filter<Extract = (http::ClientQueryRequest,), Error = warp::Rejection> + Copy {
+    body::versioned::<SignedQuery>()
+        .and(sorting())
+        .and(paginate())
+        .and_then(|signed_query, sorting, pagination| async move {
+            Result::<_, std::convert::Infallible>::Ok(http::ClientQueryRequest::Query(
+                QueryWithParameters {
+                    query: signed_query,
+                    sorting,
+                    pagination,
+                },
+            ))
+        })
+        .or(cursor().and_then(|cursor| async move {
+            Result::<_, std::convert::Infallible>::Ok(http::ClientQueryRequest::Cursor(cursor))
+        }))
+        .unify()
+}
+
 /// Filter for warp which extracts sorting
 fn sorting() -> impl warp::Filter<Extract = (Sorting,), Error = warp::Rejection> + Copy {
     warp::query()
@@ -81,122 +98,33 @@ async fn handle_instructions(
 
 #[iroha_futures::telemetry_future]
 async fn handle_queries(
+    live_query_store: LiveQueryStoreHandle,
     sumeragi: SumeragiHandle,
-    query_store: Arc<LiveQueryStore>,
     fetch_size: NonZeroUsize,
 
-    request: SignedQuery,
-    sorting: Sorting,
-    pagination: Pagination,
-
-    cursor: ForwardCursor,
-) -> Result<Scale<BatchedResponse<Value>>> {
-    let valid_request = sumeragi.apply_wsv(|wsv| ValidQueryRequest::validate(request, wsv))?;
-    let request_id = (&valid_request, &sorting, &pagination);
-
-    if let Some(query_id) = cursor.query_id {
-        let live_query = query_store
-            .remove(&query_id, &request_id)
-            .ok_or(Error::UnknownCursor)?;
-
-        return construct_query_response(
-            request_id,
-            &query_store,
-            query_id,
-            cursor.cursor.map(NonZeroU64::get),
-            live_query,
-        );
-    }
-
-    sumeragi.apply_wsv(|wsv| {
-        let res = valid_request.execute(wsv).map_err(ValidationFail::from)?;
-
-        match res {
-            LazyValue::Value(batch) => {
-                let cursor = ForwardCursor::default();
-                let result = BatchedResponseV1 { batch, cursor };
-                Ok(Scale(result.into()))
-            }
-            LazyValue::Iter(iter) => {
-                let live_query = apply_sorting_and_pagination(iter, &sorting, pagination);
-                let query_id = uuid::Uuid::new_v4().to_string();
-
-                let curr_cursor = Some(0);
-                let live_query = live_query.batched(fetch_size);
-                construct_query_response(
-                    request_id,
-                    &query_store,
-                    query_id,
-                    curr_cursor,
-                    live_query,
-                )
-            }
-        }
-    })
-}
-
-fn construct_query_response(
-    request_id: (&ValidQueryRequest, &Sorting, &Pagination),
-    query_store: &LiveQueryStore,
-    query_id: String,
-    curr_cursor: Option<u64>,
-    mut live_query: Batched<Vec<Value>>,
+    query_request: http::ClientQueryRequest,
 ) -> Result<Scale<BatchedResponse<Value>>> {
-    let (batch, next_cursor) = live_query.next_batch(curr_cursor)?;
-
-    if !live_query.is_depleted() {
-        query_store.insert(query_id.clone(), request_id, live_query);
-    }
-
-    let query_response = BatchedResponseV1 {
-        batch: Value::Vec(batch),
-        cursor: ForwardCursor {
-            query_id: Some(query_id),
-            cursor: next_cursor,
-        },
-    };
-
-    Ok(Scale(query_response.into()))
-}
-
-fn apply_sorting_and_pagination(
-    iter: impl Iterator<Item = Value>,
-    sorting: &Sorting,
-    pagination: Pagination,
-) -> Vec<Value> {
-    if let Some(key) = &sorting.sort_by_metadata_key {
-        let mut pairs: Vec<(Option<Value>, Value)> = iter
-            .map(|value| {
-                let key = match &value {
-                    Value::Identifiable(IdentifiableBox::Asset(asset)) => match asset.value() {
-                        AssetValue::Store(store) => store.get(key).cloned(),
-                        _ => None,
-                    },
-                    Value::Identifiable(v) => TryInto::<&dyn HasMetadata>::try_into(v)
-                        .ok()
-                        .and_then(|has_metadata| has_metadata.metadata().get(key))
-                        .cloned(),
-                    _ => None,
-                };
-                (key, value)
-            })
-            .collect();
-        pairs.sort_by(
-            |(left_key, _), (right_key, _)| match (left_key, right_key) {
-                (Some(l), Some(r)) => l.cmp(r),
-                (Some(_), None) => Ordering::Less,
-                (None, Some(_)) => Ordering::Greater,
-                (None, None) => Ordering::Equal,
-            },
-        );
-        pairs
-            .into_iter()
-            .map(|(_, val)| val)
-            .paginate(pagination)
-            .collect()
-    } else {
-        iter.paginate(pagination).collect()
-    }
+    let handle = tokio::task::spawn_blocking(move || match query_request {
+        http::ClientQueryRequest::Query(QueryWithParameters {
+            query: signed_query,
+            sorting,
+            pagination,
+        }) => sumeragi.apply_wsv(|wsv| {
+            let valid_query = ValidQueryRequest::validate(signed_query, wsv)?;
+            let query_output = valid_query.execute(wsv)?;
+            live_query_store
+                .handle_query_output(query_output, fetch_size, &sorting, pagination)
+                .map_err(ValidationFail::from)
+        }),
+        http::ClientQueryRequest::Cursor(cursor) => live_query_store
+            .handle_query_cursor(cursor)
+            .map_err(ValidationFail::from),
+    });
+    handle
+        .await
+        .expect("Failed to join query handling task")
+        .map(Scale)
+        .map_err(Into::into)
 }
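The rewritten handle_queries above now distinguishes only two cases: a fresh signed query, which is validated and executed against the current WSV with any unconsumed tail parked in the store, and a bare cursor, which is served from the store without touching consensus state. A hedged sketch of the round trip a client performs against this endpoint, assuming hypothetical helpers post_query and post_cursor that wrap the two HTTP request shapes:

    // First request: signed query plus sorting/pagination -> first batch and a ForwardCursor.
    let (mut items, mut cursor) = post_query(signed_query, sorting, pagination)?;
    // Follow-up requests carry only the cursor, until the server stops returning one.
    while cursor.cursor.is_some() {
        let (batch, next_cursor) = post_cursor(cursor)?;
        items.extend(batch);
        cursor = next_cursor;
    }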
} - }) -} - -fn construct_query_response( - request_id: (&ValidQueryRequest, &Sorting, &Pagination), - query_store: &LiveQueryStore, - query_id: String, - curr_cursor: Option, - mut live_query: Batched>, + query_request: http::ClientQueryRequest, ) -> Result>> { - let (batch, next_cursor) = live_query.next_batch(curr_cursor)?; - - if !live_query.is_depleted() { - query_store.insert(query_id.clone(), request_id, live_query); - } - - let query_response = BatchedResponseV1 { - batch: Value::Vec(batch), - cursor: ForwardCursor { - query_id: Some(query_id), - cursor: next_cursor, - }, - }; - - Ok(Scale(query_response.into())) -} - -fn apply_sorting_and_pagination( - iter: impl Iterator, - sorting: &Sorting, - pagination: Pagination, -) -> Vec { - if let Some(key) = &sorting.sort_by_metadata_key { - let mut pairs: Vec<(Option, Value)> = iter - .map(|value| { - let key = match &value { - Value::Identifiable(IdentifiableBox::Asset(asset)) => match asset.value() { - AssetValue::Store(store) => store.get(key).cloned(), - _ => None, - }, - Value::Identifiable(v) => TryInto::<&dyn HasMetadata>::try_into(v) - .ok() - .and_then(|has_metadata| has_metadata.metadata().get(key)) - .cloned(), - _ => None, - }; - (key, value) - }) - .collect(); - pairs.sort_by( - |(left_key, _), (right_key, _)| match (left_key, right_key) { - (Some(l), Some(r)) => l.cmp(r), - (Some(_), None) => Ordering::Less, - (None, Some(_)) => Ordering::Greater, - (None, None) => Ordering::Equal, - }, - ); - pairs - .into_iter() - .map(|(_, val)| val) - .paginate(pagination) - .collect() - } else { - iter.paginate(pagination).collect() - } + let handle = tokio::task::spawn_blocking(move || match query_request { + http::ClientQueryRequest::Query(QueryWithParameters { + query: signed_query, + sorting, + pagination, + }) => sumeragi.apply_wsv(|wsv| { + let valid_query = ValidQueryRequest::validate(signed_query, wsv)?; + let query_output = valid_query.execute(wsv)?; + live_query_store + .handle_query_output(query_output, fetch_size, &sorting, pagination) + .map_err(ValidationFail::from) + }), + http::ClientQueryRequest::Cursor(cursor) => live_query_store + .handle_query_cursor(cursor) + .map_err(ValidationFail::from), + }); + handle + .await + .expect("Failed to join query handling task") + .map(Scale) + .map_err(Into::into) } #[derive(serde::Serialize)] @@ -454,7 +382,7 @@ async fn handle_status_precise(sumeragi: SumeragiHandle, segment: String) -> Res } impl Torii { - /// Construct `Torii` from `ToriiConfiguration`. + /// Construct `Torii`. 
#[allow(clippy::too_many_arguments)] pub fn from_configuration( iroha_cfg: Configuration, @@ -462,6 +390,7 @@ impl Torii { events: EventsSender, notify_shutdown: Arc, sumeragi: SumeragiHandle, + query_service: LiveQueryStoreHandle, kura: Arc, ) -> Self { Self { @@ -470,7 +399,7 @@ impl Torii { queue, notify_shutdown, sumeragi, - query_store: Arc::default(), + query_service, kura, } } @@ -540,19 +469,16 @@ impl Torii { )) .and(body::versioned()), ) - .or(endpoint7( + .or(endpoint4( handle_queries, warp::path(uri::QUERY) .and(add_state!( + self.query_service, self.sumeragi, - self.query_store, NonZeroUsize::try_from(self.iroha_cfg.torii.fetch_size) - .expect("u64 should always fit into usize"), + .expect("u64 should always fit into usize") )) - .and(body::versioned()) - .and(sorting()) - .and(paginate()) - .and(cursor()), + .and(client_query_request()), )) .or(endpoint2( handle_post_configuration, @@ -648,16 +574,10 @@ impl Torii { /// Can fail due to listening to network or if http server fails #[iroha_futures::telemetry_future] pub(crate) async fn start(self) -> eyre::Result<()> { - let query_idle_time = Duration::from_millis(self.iroha_cfg.torii.query_idle_time_ms.get()); - let torii = Arc::new(self); let mut handles = vec![]; handles.extend(Arc::clone(&torii).start_api()?); - handles.push( - Arc::clone(&torii.query_store) - .expired_query_cleanup(query_idle_time, Arc::clone(&torii.notify_shutdown)), - ); handles .into_iter() diff --git a/cli/src/torii/utils.rs b/cli/src/torii/utils.rs index 7d590ff4b48..5c0cbfd0d50 100644 --- a/cli/src/torii/utils.rs +++ b/cli/src/torii/utils.rs @@ -14,6 +14,7 @@ impl Reply for Empty { } /// Structure for response in scale codec in body +#[derive(Debug)] pub struct Scale(pub T); impl Reply for Scale { diff --git a/client/src/client.rs b/client/src/client.rs index 6703547e811..3b3330f0708 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -22,13 +22,12 @@ use iroha_config::{client::Configuration, torii::uri, GetConfiguration, PostConf use iroha_crypto::{HashOf, KeyPair}; use iroha_data_model::{ block::SignedBlock, - http::BatchedResponse, isi::Instruction, predicate::PredicateBox, prelude::*, - query::{ForwardCursor, Pagination, Query, Sorting}, + query::{Pagination, Query, Sorting}, transaction::TransactionPayload, - ValidationFail, + BatchedResponse, ValidationFail, }; use iroha_logger::prelude::*; use iroha_telemetry::metrics::Status; @@ -152,7 +151,7 @@ where .map_err(Into::into) .wrap_err("Unexpected type")?; - self.query_request.query_cursor = cursor; + self.query_request.request = iroha_data_model::query::QueryRequest::Cursor(cursor); Ok(value) } } @@ -262,13 +261,10 @@ where fn next(&mut self) -> Option { if self.client_cursor >= self.iter.len() { - if self - .query_handler - .query_request - .query_cursor - .cursor() - .is_none() - { + let iroha_data_model::query::QueryRequest::Cursor(cursor) = &self.query_handler.query_request.request else { + return None; + }; + if cursor.cursor().is_none() { return None; } @@ -310,7 +306,7 @@ where } } -macro_rules! impl_query_result { +macro_rules! impl_query_output { ( $($ident:ty),+ $(,)? ) => { $( impl QueryOutput for $ident { type Target = Self; @@ -321,7 +317,7 @@ macro_rules! impl_query_result { } )+ }; } -impl_query_result! { +impl_query_output! 
{
    bool,
    iroha_data_model::Value,
    iroha_data_model::numeric::NumericValue,
@@ -367,10 +363,7 @@ pub struct Client {
 pub struct QueryRequest {
     torii_url: Url,
     headers: HashMap<String, String>,
-    request: Vec<u8>,
-    sorting: Sorting,
-    pagination: Pagination,
-    query_cursor: ForwardCursor,
+    request: iroha_data_model::query::QueryRequest<Vec<u8>>,
 }
 
 impl QueryRequest {
@@ -381,22 +374,31 @@
         Self {
             torii_url: format!("http://{torii_url}").parse().unwrap(),
             headers: HashMap::new(),
-            request: Vec::new(),
-            sorting: Sorting::default(),
-            pagination: Pagination::default(),
-            query_cursor: ForwardCursor::default(),
+            request: iroha_data_model::query::QueryRequest::Query(
+                iroha_data_model::query::QueryWithParameters {
+                    query: Vec::default(),
+                    sorting: Sorting::default(),
+                    pagination: Pagination::default(),
+                },
+            ),
         }
     }
 
     fn assemble(self) -> DefaultRequestBuilder {
-        DefaultRequestBuilder::new(
+        let builder = DefaultRequestBuilder::new(
             HttpMethod::POST,
             self.torii_url.join(uri::QUERY).expect("Valid URI"),
         )
-        .headers(self.headers)
-        .params(Vec::from(self.sorting))
-        .params(Vec::from(self.pagination))
-        .params(Vec::from(self.query_cursor))
-        .body(self.request)
+        .headers(self.headers);
+
+        match self.request {
+            iroha_data_model::query::QueryRequest::Query(query_with_params) => builder
+                .params(Vec::from(query_with_params.sorting))
+                .params(Vec::from(query_with_params.pagination))
+                .body(query_with_params.query),
+            iroha_data_model::query::QueryRequest::Cursor(cursor) => {
+                builder.params(Vec::from(cursor))
+            }
+        }
     }
 }
 
@@ -810,10 +812,13 @@ impl Client {
         let query_request = QueryRequest {
             torii_url: self.torii_url.clone(),
             headers: self.headers.clone(),
-            request,
-            sorting,
-            pagination,
-            query_cursor: ForwardCursor::default(),
+            request: iroha_data_model::query::QueryRequest::Query(
+                iroha_data_model::query::QueryWithParameters {
+                    query: request,
+                    pagination,
+                    sorting,
+                },
+            ),
         };
 
         Ok((
diff --git a/client/tests/integration/smartcontracts/create_nft_for_every_user_trigger/src/lib.rs b/client/tests/integration/smartcontracts/create_nft_for_every_user_trigger/src/lib.rs
index e5cf1a30c5a..5fa18856b48 100644
--- a/client/tests/integration/smartcontracts/create_nft_for_every_user_trigger/src/lib.rs
+++ b/client/tests/integration/smartcontracts/create_nft_for_every_user_trigger/src/lib.rs
@@ -16,10 +16,14 @@ static ALLOC: LockedAllocator<FreeList> = LockedAllocator::new(FreeList
 
 #[iroha_trigger::main]
 fn main(_owner: AccountId, _event: Event) {
     iroha_trigger::log::info!("Executing trigger");
-    let accounts = FindAllAccounts.execute().dbg_unwrap();
+
+    let accounts_cursor = FindAllAccounts.execute().dbg_unwrap();
+
     let limits = MetadataLimits::new(256, 256);
 
-    for account in accounts {
+    for account in accounts_cursor {
+        let account = account.dbg_unwrap();
+
         let mut metadata = Metadata::new();
         let name = format!(
             "nft_for_{}_in_{}",
@@ -53,6 +57,7 @@ fn generate_new_nft_id(account_id: &AccountId) -> AssetDefinitionId {
 
     let new_number = assets
         .into_iter()
+        .map(|res| res.dbg_unwrap())
         .filter(|asset| asset.id().definition_id().to_string().starts_with("nft_"))
         .count()
         .checked_add(1)
diff --git a/client/tests/integration/smartcontracts/executor_with_custom_token/src/lib.rs b/client/tests/integration/smartcontracts/executor_with_custom_token/src/lib.rs
index bf54a6c8503..496ac1c326d 100644
--- a/client/tests/integration/smartcontracts/executor_with_custom_token/src/lib.rs
+++ b/client/tests/integration/smartcontracts/executor_with_custom_token/src/lib.rs
@@ -84,6 +84,9 @@ impl Executor {
         let mut
found_accounts = Vec::new(); for account in accounts { + let account = account.map_err(|error| { + format!("{:?}", anyhow!(error).context("Failed to get account")) + })?; let permission_tokens = FindPermissionTokensByAccountId::new(account.id().clone()) .execute() .map_err(|error| { @@ -97,6 +100,13 @@ impl Executor { })?; for token in permission_tokens { + let token = token.map_err(|error| { + format!( + "{:?}", + anyhow!(error).context("Failed to get permission token") + ) + })?; + if let Ok(can_unregister_domain_token) = iroha_executor::default::domain::tokens::CanUnregisterDomain::try_from(token) { diff --git a/config/iroha_test_config.json b/config/iroha_test_config.json index 3c55e840892..ad415a54587 100644 --- a/config/iroha_test_config.json +++ b/config/iroha_test_config.json @@ -44,8 +44,7 @@ "API_URL": "127.0.0.1:8080", "MAX_TRANSACTION_SIZE": 32768, "MAX_CONTENT_LEN": 16384000, - "FETCH_SIZE": 10, - "QUERY_IDLE_TIME_MS": 30000 + "FETCH_SIZE": 10 }, "BLOCK_SYNC": { "GOSSIP_PERIOD_MS": 10000, @@ -120,5 +119,8 @@ "CREATE_EVERY_MS": 60000, "DIR_PATH": "./storage", "CREATION_ENABLED": true + }, + "LIVE_QUERY_STORE": { + "QUERY_IDLE_TIME_MS": 30000 } } diff --git a/config/src/iroha.rs b/config/src/iroha.rs index 779295f6b4a..ed022853431 100644 --- a/config/src/iroha.rs +++ b/config/src/iroha.rs @@ -59,6 +59,9 @@ view! { /// SnapshotMaker configuration #[config(inner)] pub snapshot: snapshot::Configuration, + /// LiveQueryStore configuration + #[config(inner)] + pub live_query_store: live_query_store::Configuration, } } @@ -79,6 +82,7 @@ impl Default for ConfigurationProxy { network: Some(network::ConfigurationProxy::default()), telemetry: Some(telemetry::ConfigurationProxy::default()), snapshot: Some(snapshot::ConfigurationProxy::default()), + live_query_store: Some(live_query_store::ConfigurationProxy::default()), } } } @@ -221,9 +225,10 @@ mod tests { network in prop::option::of(network::tests::arb_proxy()), telemetry in prop::option::of(telemetry::tests::arb_proxy()), snapshot in prop::option::of(snapshot::tests::arb_proxy()), + live_query_store in prop::option::of(live_query_store::tests::arb_proxy()), ) -> ConfigurationProxy { ConfigurationProxy { public_key, private_key, disable_panic_terminal_colors, kura, sumeragi, torii, block_sync, queue, - logger, genesis, wsv, network, telemetry, snapshot } + logger, genesis, wsv, network, telemetry, snapshot, live_query_store } } } diff --git a/config/src/lib.rs b/config/src/lib.rs index 628fcff9271..6e80c5e1c88 100644 --- a/config/src/lib.rs +++ b/config/src/lib.rs @@ -7,6 +7,7 @@ pub mod client; pub mod genesis; pub mod iroha; pub mod kura; +pub mod live_query_store; pub mod logger; pub mod network; pub mod path; diff --git a/config/src/live_query_store.rs b/config/src/live_query_store.rs new file mode 100644 index 00000000000..79382fee2ca --- /dev/null +++ b/config/src/live_query_store.rs @@ -0,0 +1,44 @@ +//! Module for `LiveQueryStore`-related configuration and structs. + +use std::num::NonZeroU64; + +use iroha_config_base::derive::{Documented, Proxy}; +use serde::{Deserialize, Serialize}; + +/// Default max time a query can remain in the store unaccessed +pub static DEFAULT_QUERY_IDLE_TIME_MS: once_cell::sync::Lazy = + once_cell::sync::Lazy::new(|| NonZeroU64::new(30_000).unwrap()); + +/// Configuration for `QueryService`. 
+#[derive(Debug, Copy, Clone, PartialEq, Eq, Deserialize, Serialize, Documented, Proxy)] +#[serde(rename_all = "UPPERCASE")] +#[config(env_prefix = "LIVE_QUERY_STORE_")] +pub struct Configuration { + /// Time query can remain in the store if unaccessed + pub query_idle_time_ms: NonZeroU64, +} + +impl Default for ConfigurationProxy { + fn default() -> Self { + Self { + query_idle_time_ms: Some(*DEFAULT_QUERY_IDLE_TIME_MS), + } + } +} + +#[cfg(test)] +pub mod tests { + use proptest::prelude::*; + + use super::*; + + prop_compose! { + pub fn arb_proxy() + ( + query_idle_time_ms in prop::option::of(Just(*DEFAULT_QUERY_IDLE_TIME_MS)), + ) + -> ConfigurationProxy { + ConfigurationProxy { query_idle_time_ms } + } + } +} diff --git a/config/src/torii.rs b/config/src/torii.rs index 764c369ddd4..b0869b70881 100644 --- a/config/src/torii.rs +++ b/config/src/torii.rs @@ -15,9 +15,6 @@ pub const DEFAULT_TORII_MAX_CONTENT_LENGTH: u32 = 2_u32.pow(12) * 4000; /// Default max size of a single batch of results from a query pub static DEFAULT_TORII_FETCH_SIZE: once_cell::sync::Lazy = once_cell::sync::Lazy::new(|| NonZeroU64::new(10).unwrap()); -/// Default max time a query can remain in the store unaccessed -pub static DEFAULT_TORII_QUERY_IDLE_TIME_MS: once_cell::sync::Lazy = - once_cell::sync::Lazy::new(|| NonZeroU64::new(30_000).unwrap()); /// Structure that defines the configuration parameters of `Torii` which is the routing module. /// For example the `p2p_addr`, which is used for consensus and block-synchronisation purposes, @@ -38,8 +35,6 @@ pub struct Configuration { pub max_content_len: u32, /// How many query results are returned in one batch pub fetch_size: NonZeroU64, - /// Time query can remain in the store if unaccessed - pub query_idle_time_ms: NonZeroU64, } impl Default for ConfigurationProxy { @@ -50,7 +45,6 @@ impl Default for ConfigurationProxy { max_transaction_size: Some(DEFAULT_TORII_MAX_TRANSACTION_SIZE), max_content_len: Some(DEFAULT_TORII_MAX_CONTENT_LENGTH), fetch_size: Some(*DEFAULT_TORII_FETCH_SIZE), - query_idle_time_ms: Some(*DEFAULT_TORII_QUERY_IDLE_TIME_MS), } } } @@ -104,10 +98,9 @@ pub mod tests { max_transaction_size in prop::option::of(Just(DEFAULT_TORII_MAX_TRANSACTION_SIZE)), max_content_len in prop::option::of(Just(DEFAULT_TORII_MAX_CONTENT_LENGTH)), fetch_size in prop::option::of(Just(*DEFAULT_TORII_FETCH_SIZE)), - query_idle_time_ms in prop::option::of(Just(*DEFAULT_TORII_QUERY_IDLE_TIME_MS)), ) -> ConfigurationProxy { - ConfigurationProxy { p2p_addr, api_url, max_transaction_size, max_content_len, fetch_size, query_idle_time_ms } + ConfigurationProxy { p2p_addr, api_url, max_transaction_size, max_content_len, fetch_size } } } } diff --git a/configs/peer/config.json b/configs/peer/config.json index ef36a9f525c..02211ed3072 100644 --- a/configs/peer/config.json +++ b/configs/peer/config.json @@ -25,8 +25,7 @@ "API_URL": null, "MAX_TRANSACTION_SIZE": 32768, "MAX_CONTENT_LEN": 16384000, - "FETCH_SIZE": 10, - "QUERY_IDLE_TIME_MS": 30000 + "FETCH_SIZE": 10 }, "BLOCK_SYNC": { "GOSSIP_PERIOD_MS": 10000, @@ -94,5 +93,8 @@ "CREATE_EVERY_MS": 60000, "DIR_PATH": "./storage", "CREATION_ENABLED": true + }, + "LIVE_QUERY_STORE": { + "QUERY_IDLE_TIME_MS": 30000 } } diff --git a/configs/peer/executor.wasm b/configs/peer/executor.wasm index 9d5028a64b7..7747381b6f3 100644 Binary files a/configs/peer/executor.wasm and b/configs/peer/executor.wasm differ diff --git a/core/Cargo.toml b/core/Cargo.toml index b97c91d7d3e..c16d5b47ec6 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ 
-69,6 +69,7 @@ wasmtime = { workspace = true } parking_lot = { workspace = true, features = ["deadlock_detection"] } derive_more = { workspace = true } itertools = { workspace = true } +uuid = { version = "1.4.1", features = ["v4"] } [dev-dependencies] criterion = { workspace = true } diff --git a/core/benches/blocks/common.rs b/core/benches/blocks/common.rs index 56f769b1778..3accf513712 100644 --- a/core/benches/blocks/common.rs +++ b/core/benches/blocks/common.rs @@ -6,6 +6,7 @@ use eyre::Result; use iroha_core::{ block::{BlockBuilder, CommittedBlock}, prelude::*, + query::store::LiveQueryStore, smartcontracts::Execute, sumeragi::network_topology::Topology, wsv::World, @@ -175,7 +176,8 @@ pub fn restore_every_nth( pub fn build_wsv(account_id: &AccountId, key_pair: &KeyPair) -> WorldStateView { let kura = iroha_core::kura::Kura::blank_kura_for_testing(); - let mut wsv = WorldStateView::new(World::with([], UniqueVec::new()), kura); + let query_handle = LiveQueryStore::test().start(); + let mut wsv = WorldStateView::new(World::with([], UniqueVec::new()), kura, query_handle); wsv.config.transaction_limits = TransactionLimits::new(u64::MAX, u64::MAX); wsv.config.wasm_runtime_config.fuel_limit = u64::MAX; wsv.config.wasm_runtime_config.max_memory = u32::MAX; diff --git a/core/benches/kura.rs b/core/benches/kura.rs index 2f74c5ce1e4..d10bd567434 100644 --- a/core/benches/kura.rs +++ b/core/benches/kura.rs @@ -8,6 +8,7 @@ use iroha_core::{ block::*, kura::{BlockStore, LockStatus}, prelude::*, + query::store::LiveQueryStore, sumeragi::network_topology::Topology, wsv::World, }; @@ -42,7 +43,8 @@ async fn measure_block_size_for_n_executors(n_executors: u32) { iroha_core::kura::Kura::new(iroha_config::kura::Mode::Strict, dir.path(), false).unwrap(); let _thread_handle = iroha_core::kura::Kura::start(kura.clone()); - let mut wsv = WorldStateView::new(World::new(), kura); + let query_handle = LiveQueryStore::test().start(); + let mut wsv = WorldStateView::new(World::new(), kura, query_handle); let topology = Topology::new(UniqueVec::new()); let mut block = BlockBuilder::new(vec![tx], topology, Vec::new()) .chain(0, &mut wsv) diff --git a/core/benches/validation.rs b/core/benches/validation.rs index d2617396271..df41e743d39 100644 --- a/core/benches/validation.rs +++ b/core/benches/validation.rs @@ -6,6 +6,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; use iroha_core::{ block::*, prelude::*, + query::store::LiveQueryStore, smartcontracts::{isi::Registrable as _, Execute}, sumeragi::network_topology::Topology, tx::TransactionExecutor, @@ -55,6 +56,7 @@ fn build_test_transaction(keys: KeyPair) -> SignedTransaction { fn build_test_and_transient_wsv(keys: KeyPair) -> WorldStateView { let kura = iroha_core::kura::Kura::blank_kura_for_testing(); + let query_handle = LiveQueryStore::test().start(); let (public_key, _) = keys.into(); let mut wsv = WorldStateView::new( @@ -70,6 +72,7 @@ fn build_test_and_transient_wsv(keys: KeyPair) -> WorldStateView { World::with([domain], UniqueVec::new()) }, kura, + query_handle, ); { @@ -146,7 +149,8 @@ fn sign_blocks(criterion: &mut Criterion) { .expect("Failed to accept transaction."); let key_pair = KeyPair::generate().expect("Failed to generate KeyPair."); let kura = iroha_core::kura::Kura::blank_kura_for_testing(); - let mut wsv = WorldStateView::new(World::new(), kura); + let query_handle = LiveQueryStore::test().start(); + let mut wsv = WorldStateView::new(World::new(), kura, query_handle); let topology = Topology::new(UniqueVec::new()); let mut 
success_count = 0; diff --git a/core/src/block.rs b/core/src/block.rs index 177d8e90118..6d4a655029e 100644 --- a/core/src/block.rs +++ b/core/src/block.rs @@ -361,27 +361,21 @@ mod valid { let transaction_executor = wsv.transaction_executor(); let limits = &transaction_executor.transaction_limits; - if error.is_none() { - let tx = if is_genesis { + let tx = if is_genesis { AcceptedTransaction::accept_genesis(GenesisTransaction(value)) - } else { - AcceptedTransaction::accept(value, limits)? - }; - - transaction_executor.validate(tx, wsv).map_err(|(_tx, error)| { - TransactionValidationError::NotValid(error) - })?; } else { - let tx = if is_genesis { - AcceptedTransaction::accept_genesis(GenesisTransaction(value)) - } else { AcceptedTransaction::accept(value, limits)? - }; + }; + if error.is_some() { match transaction_executor.validate(tx, wsv) { Err(rejected_transaction) => Ok(rejected_transaction), Ok(_) => Err(TransactionValidationError::RejectedIsValid), }?; + } else { + transaction_executor.validate(tx, wsv).map_err(|(_tx, error)| { + TransactionValidationError::NotValid(error) + })?; } Ok(()) @@ -711,7 +705,7 @@ mod tests { use iroha_data_model::prelude::*; use super::*; - use crate::{kura::Kura, smartcontracts::isi::Registrable as _}; + use crate::{kura::Kura, query::store::LiveQueryStore, smartcontracts::isi::Registrable as _}; #[test] pub fn committed_and_valid_block_hashes_are_equal() { @@ -725,8 +719,8 @@ mod tests { ) } - #[test] - fn should_reject_due_to_repetition() { + #[tokio::test] + async fn should_reject_due_to_repetition() { // Predefined world state let alice_id = AccountId::from_str("alice@wonderland").expect("Valid"); let alice_keys = KeyPair::generate().expect("Valid"); @@ -737,7 +731,8 @@ mod tests { assert!(domain.add_account(account).is_none()); let world = World::with([domain], UniqueVec::new()); let kura = Kura::blank_kura_for_testing(); - let mut wsv = WorldStateView::new(world, kura); + let query_handle = LiveQueryStore::test().start(); + let mut wsv = WorldStateView::new(world, kura, query_handle); // Creating an instruction let asset_definition_id = AssetDefinitionId::from_str("xor#wonderland").expect("Valid"); @@ -767,8 +762,8 @@ mod tests { assert!(valid_block.payload().transactions[1].error.is_some()); } - #[test] - fn tx_order_same_in_validation_and_revalidation() { + #[tokio::test] + async fn tx_order_same_in_validation_and_revalidation() { // Predefined world state let alice_id = AccountId::from_str("alice@wonderland").expect("Valid"); let alice_keys = KeyPair::generate().expect("Valid"); @@ -779,7 +774,8 @@ mod tests { assert!(domain.add_account(account).is_none()); let world = World::with([domain], UniqueVec::new()); let kura = Kura::blank_kura_for_testing(); - let mut wsv = WorldStateView::new(world, kura); + let query_handle = LiveQueryStore::test().start(); + let mut wsv = WorldStateView::new(world, kura, query_handle); // Creating an instruction let asset_definition_id = AssetDefinitionId::from_str("xor#wonderland").expect("Valid"); @@ -834,8 +830,8 @@ mod tests { assert!(valid_block.payload().transactions[2].error.is_none()); } - #[test] - fn failed_transactions_revert() { + #[tokio::test] + async fn failed_transactions_revert() { // Predefined world state let alice_id = AccountId::from_str("alice@wonderland").expect("Valid"); let alice_keys = KeyPair::generate().expect("Valid"); @@ -849,7 +845,8 @@ mod tests { ); let world = World::with([domain], UniqueVec::new()); let kura = Kura::blank_kura_for_testing(); - let mut wsv = 
WorldStateView::new(world, kura);
+        let query_handle = LiveQueryStore::test().start();
+        let mut wsv = WorldStateView::new(world, kura, query_handle);
         let transaction_limits = &wsv.transaction_executor().transaction_limits;
 
         let domain_id = DomainId::from_str("domain").expect("Valid");
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 7337b88851e..e0c6109e31f 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -6,6 +6,7 @@ pub mod executor;
 pub mod gossiper;
 pub mod kura;
 pub mod modules;
+pub mod query;
 pub mod queue;
 pub mod smartcontracts;
 pub mod snapshot;
diff --git a/cli/src/torii/cursor.rs b/core/src/query/cursor.rs
similarity index 71%
rename from cli/src/torii/cursor.rs
rename to core/src/query/cursor.rs
index a3d1aca610c..0757a759081 100644
--- a/cli/src/torii/cursor.rs
+++ b/core/src/query/cursor.rs
@@ -1,8 +1,14 @@
+//! Module with cursor-based pagination functionality like [`Batched`].
+
 use std::num::{NonZeroU64, NonZeroUsize};
 
-use crate::torii::{Error, Result};
+use derive_more::Display;
+use parity_scale_codec::{Decode, Encode};
+use serde::{Deserialize, Serialize};
 
+/// Trait for iterators that can be batched.
 pub trait Batch: IntoIterator + Sized {
+    /// Pack iterator into batches of the given size.
     fn batched(self, fetch_size: NonZeroUsize) -> Batched<Self>;
 }
@@ -25,10 +31,20 @@ pub struct Batched {
     cursor: Option<u64>,
 }
 
+/// Unknown cursor error.
+///
+/// Happens when client sends a cursor that doesn't match any server's cursor.
+#[derive(Debug, Display, thiserror::Error, Copy, Clone, Serialize, Deserialize, Encode, Decode)]
+#[display(fmt = "Unknown cursor")]
+pub struct UnknownCursor;
+
 impl<I: IntoIterator + FromIterator<I::Item>> Batched<I> {
-    pub(crate) fn next_batch(&mut self, cursor: Option<u64>) -> Result<(I, Option<NonZeroU64>)> {
+    pub(crate) fn next_batch(
+        &mut self,
+        cursor: Option<u64>,
+    ) -> Result<(I, Option<NonZeroU64>), UnknownCursor> {
         if cursor != self.cursor {
-            return Err(Error::UnknownCursor);
+            return Err(UnknownCursor);
         }
 
         let mut batch_size = 0;
@@ -72,6 +88,7 @@
         ))
     }
 
+    /// Check if all values were drained from the iterator.
     pub fn is_depleted(&self) -> bool {
         self.cursor.is_none()
     }
diff --git a/core/src/query/mod.rs b/core/src/query/mod.rs
new file mode 100644
index 00000000000..9b6de03c3e9
--- /dev/null
+++ b/core/src/query/mod.rs
@@ -0,0 +1,5 @@
+//! This module contains the [`LiveQueryStore`](store::LiveQueryStore) actor and helpers.
+
+pub mod cursor;
+pub mod pagination;
+pub mod store;
diff --git a/cli/src/torii/pagination.rs b/core/src/query/pagination.rs
similarity index 97%
rename from cli/src/torii/pagination.rs
rename to core/src/query/pagination.rs
index e305edbaec1..d2cc87e88f8 100644
--- a/cli/src/torii/pagination.rs
+++ b/core/src/query/pagination.rs
@@ -1,3 +1,5 @@
+//! Module with the [`Paginate`] iterator adaptor which provides the [`paginate`] function.
+
 use iroha_data_model::query::Pagination;
 
 /// Describes a collection to which pagination can be applied.
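The Batched contract above is strict: every next_batch call must echo exactly the cursor produced by the previous call (the first call passes Some(0)), and any other value fails with UnknownCursor instead of re-serving data. A small in-crate sketch of the intended use (next_batch is pub(crate)), assuming the blanket Batch impl for iterators and Value: From<u32>, with error handling elided:

    let values: Vec<Value> = (0..25_u32).map(Value::from).collect();
    let mut live_query = values.batched(NonZeroUsize::new(10).unwrap());
    // First batch: items 0..10, plus the cursor to echo back next time.
    let (first, cursor) = live_query.next_batch(Some(0))?;
    // Subsequent batches: convert the returned Option<NonZeroU64> back to Option<u64>.
    let (second, cursor) = live_query.next_batch(cursor.map(NonZeroU64::get))?;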
diff --git a/core/src/query/store.rs b/core/src/query/store.rs
new file mode 100644
index 00000000000..c5ef705c140
--- /dev/null
+++ b/core/src/query/store.rs
@@ -0,0 +1,279 @@
+//! This module contains the [`LiveQueryStore`] actor.
+
+use std::{
+    cmp::Ordering,
+    num::{NonZeroU64, NonZeroUsize},
+    time::{Duration, Instant},
+};
+
+use dashmap::DashMap;
+use iroha_config::live_query_store::Configuration;
+use iroha_data_model::{
+    asset::AssetValue,
+    query::{
+        cursor::ForwardCursor, error::QueryExecutionFail, pagination::Pagination, sorting::Sorting,
+    },
+    BatchedResponse, BatchedResponseV1, HasMetadata, IdentifiableBox, ValidationFail, Value,
+};
+use parity_scale_codec::{Decode, Encode};
+use serde::{Deserialize, Serialize};
+use tokio::{
+    sync::{mpsc, oneshot},
+    time::sleep,
+};
+
+use super::{
+    cursor::{Batch as _, Batched, UnknownCursor},
+    pagination::Paginate as _,
+};
+use crate::smartcontracts::query::LazyValue;
+
+/// Query service error.
+#[derive(Debug, thiserror::Error, Copy, Clone, Serialize, Deserialize, Encode, Decode)]
+pub enum Error {
+    /// Unknown cursor error.
+    #[error(transparent)]
+    UnknownCursor(#[from] UnknownCursor),
+    /// Connection with [`LiveQueryStore`] is closed.
+    #[error("Connection with LiveQueryStore is closed")]
+    ConnectionClosed,
+}
+
+#[allow(clippy::fallible_impl_from)]
+impl From<Error> for ValidationFail {
+    fn from(error: Error) -> Self {
+        match error {
+            Error::UnknownCursor(_) => {
+                ValidationFail::QueryFailed(QueryExecutionFail::UnknownCursor)
+            }
+            Error::ConnectionClosed => {
+                panic!("Connection to `LiveQueryStore` was unexpectedly closed, this is a bug")
+            }
+        }
+    }
+}
+
+/// Result type for [`LiveQueryStore`] methods.
+pub type Result<T> = std::result::Result<T, Error>;
+
+type LiveQuery = Batched<Vec<Value>>;
+
+/// Service which stores queries that might not be fully consumed by a client.
+///
+/// Clients can handle their queries using [`LiveQueryStoreHandle`]
+#[derive(Debug)]
+pub struct LiveQueryStore {
+    queries: DashMap<String, (LiveQuery, Instant)>,
+    query_idle_time: Duration,
+}
+
+impl LiveQueryStore {
+    /// Construct [`LiveQueryStore`] from configuration.
+    pub fn from_configuration(cfg: Configuration) -> Self {
+        Self {
+            queries: DashMap::default(),
+            query_idle_time: Duration::from_millis(cfg.query_idle_time_ms.into()),
+        }
+    }
+
+    /// Construct [`LiveQueryStore`] for tests.
+    /// Default configuration will be used.
+    ///
+    /// Not marked as `#[cfg(test)]` because it is used in benches as well.
+    pub fn test() -> Self {
+        use iroha_config::base::proxy::Builder as _;
+
+        LiveQueryStore::from_configuration(
+            iroha_config::live_query_store::ConfigurationProxy::default()
+                .build()
+                .expect("Failed to build LiveQueryStore configuration from proxy"),
+        )
+    }
+
+    /// Start [`LiveQueryStore`]. Requires a [`tokio::runtime::Runtime`] being run
+    /// as it will create a new [`tokio::task`] and detach it.
+    ///
+    /// Returns a handle to interact with the service.
+    pub fn start(self) -> LiveQueryStoreHandle {
+        const ALL_HANDLERS_DROPPED: &str =
+            "All handles to LiveQueryStore are dropped. Shutting down...";
+
+        let (insert_sender, mut insert_receiver) = mpsc::channel(1);
+        let (remove_sender, mut remove_receiver) = mpsc::channel::<(String, oneshot::Sender<_>)>(1);
+
+        tokio::task::spawn(async move {
+            loop {
+                tokio::select! {
+                    _ = sleep(self.query_idle_time) => {
+                        self.queries
+                            .retain(|_, (_, last_access_time)| last_access_time.elapsed() <= self.query_idle_time);
+                    },
+                    to_insert = insert_receiver.recv() => {
+                        let Some((query_id, live_query)) = to_insert else {
+                            iroha_logger::info!(ALL_HANDLERS_DROPPED);
+                            break;
+                        };
+                        self.insert(query_id, live_query)
+                    }
+                    to_remove = remove_receiver.recv() => {
+                        let Some((query_id, response_sender)) = to_remove else {
+                            iroha_logger::info!(ALL_HANDLERS_DROPPED);
+                            break;
+                        };
+                        let live_query_opt = self.remove(&query_id);
+                        let _ = response_sender.send(live_query_opt);
+                    }
+                    else => break,
+                }
+                tokio::task::yield_now().await;
+            }
+        });
+
+        LiveQueryStoreHandle {
+            insert_sender,
+            remove_sender,
+        }
+    }
+
+    fn insert(&self, query_id: String, live_query: LiveQuery) {
+        self.queries.insert(query_id, (live_query, Instant::now()));
+    }
+
+    fn remove(&self, query_id: &str) -> Option<LiveQuery> {
+        self.queries.remove(query_id).map(|(_, (output, _))| output)
+    }
+}
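The task spawned in start() above is a standard Tokio actor: the store exclusively owns its state inside a detached task, and the handle below talks to it purely through channels, so no lock is shared across the async boundary. Stripped to its bones, the pattern looks like this (the names and the u64 payload are illustrative, not part of this diff):

    use std::collections::HashMap;
    use tokio::sync::{mpsc, oneshot};

    let mut map: HashMap<String, u64> = HashMap::new();
    let (remove_tx, mut remove_rx) = mpsc::channel::<(String, oneshot::Sender<Option<u64>>)>(1);
    tokio::task::spawn(async move {
        // The task is the sole owner of `map`; requests arrive over mpsc,
        // replies travel back over per-request oneshot channels.
        while let Some((key, reply)) = remove_rx.recv().await {
            let _ = reply.send(map.remove(&key));
        }
    });

The handle side then uses blocking_send/blocking_recv, which is what makes it callable from the spawn_blocking context in routing.rs.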
+
+/// Handle to interact with [`LiveQueryStore`].
+#[derive(Clone)]
+pub struct LiveQueryStoreHandle {
+    /// Sender to insert a new query with specified id.
+    insert_sender: mpsc::Sender<(String, LiveQuery)>,
+    /// Sender to send a tuple of query id and another sender, which will be
+    /// used by [`LiveQueryStore`] to write a response with optional live query.
+    remove_sender: mpsc::Sender<(String, oneshot::Sender<Option<LiveQuery>>)>,
+}
+
+impl LiveQueryStoreHandle {
+    /// Apply sorting and pagination to the query output.
+    ///
+    /// # Errors
+    ///
+    /// - Returns [`Error::ConnectionClosed`] if [`LiveQueryStore`] is dropped,
+    /// - Otherwise throws up query output handling errors.
+    pub fn handle_query_output(
+        &self,
+        query_output: LazyValue<'_>,
+        fetch_size: NonZeroUsize,
+        sorting: &Sorting,
+        pagination: Pagination,
+    ) -> Result<BatchedResponse<Value>> {
+        match query_output {
+            LazyValue::Value(batch) => {
+                let cursor = ForwardCursor::default();
+                let result = BatchedResponseV1 { batch, cursor };
+                Ok(result.into())
+            }
+            LazyValue::Iter(iter) => {
+                let live_query = Self::apply_sorting_and_pagination(iter, sorting, pagination);
+                let query_id = uuid::Uuid::new_v4().to_string();
+
+                let curr_cursor = Some(0);
+                let live_query = live_query.batched(fetch_size);
+                self.construct_query_response(query_id, curr_cursor, live_query)
+            }
+        }
+    }
+
+    /// Retrieve the next batch of query output using `cursor`.
+    ///
+    /// # Errors
+    ///
+    /// - Returns [`Error::ConnectionClosed`] if [`LiveQueryStore`] is dropped,
+    /// - Otherwise throws up query output handling errors.
+    pub fn handle_query_cursor(&self, cursor: ForwardCursor) -> Result<BatchedResponse<Value>> {
+        let query_id = cursor.query_id.ok_or(UnknownCursor)?;
+        let live_query = self.remove(query_id.clone())?.ok_or(UnknownCursor)?;
+
+        self.construct_query_response(query_id, cursor.cursor.map(NonZeroU64::get), live_query)
+    }
+
+    fn insert(&self, query_id: String, live_query: LiveQuery) -> Result<()> {
+        self.insert_sender
+            .blocking_send((query_id, live_query))
+            .map_err(|_| Error::ConnectionClosed)
+    }
+
+    fn remove(&self, query_id: String) -> Result<Option<LiveQuery>> {
+        let (sender, receiver) = oneshot::channel();
+
+        self.remove_sender
+            .blocking_send((query_id, sender))
+            .or(Err(Error::ConnectionClosed))?;
+
+        receiver.blocking_recv().or(Err(Error::ConnectionClosed))
+    }
+
+    fn construct_query_response(
+        &self,
+        query_id: String,
+        curr_cursor: Option<u64>,
+        mut live_query: Batched<Vec<Value>>,
+    ) -> Result<BatchedResponse<Value>> {
+        let (batch, next_cursor) = live_query.next_batch(curr_cursor)?;
+
+        if !live_query.is_depleted() {
+            self.insert(query_id.clone(), live_query)?
+        }
+
+        let query_response = BatchedResponseV1 {
+            batch: Value::Vec(batch),
+            cursor: ForwardCursor {
+                query_id: Some(query_id),
+                cursor: next_cursor,
+            },
+        };
+
+        Ok(query_response.into())
+    }
+
+    fn apply_sorting_and_pagination(
+        iter: impl Iterator<Item = Value>,
+        sorting: &Sorting,
+        pagination: Pagination,
+    ) -> Vec<Value> {
+        if let Some(key) = &sorting.sort_by_metadata_key {
+            let mut pairs: Vec<(Option<Value>, Value)> = iter
+                .map(|value| {
+                    let key = match &value {
+                        Value::Identifiable(IdentifiableBox::Asset(asset)) => match asset.value() {
+                            AssetValue::Store(store) => store.get(key).cloned(),
+                            _ => None,
+                        },
+                        Value::Identifiable(v) => TryInto::<&dyn HasMetadata>::try_into(v)
+                            .ok()
+                            .and_then(|has_metadata| has_metadata.metadata().get(key))
+                            .cloned(),
+                        _ => None,
+                    };
+                    (key, value)
+                })
+                .collect();
+            pairs.sort_by(
+                |(left_key, _), (right_key, _)| match (left_key, right_key) {
+                    (Some(l), Some(r)) => l.cmp(r),
+                    (Some(_), None) => Ordering::Less,
+                    (None, Some(_)) => Ordering::Greater,
+                    (None, None) => Ordering::Equal,
+                },
+            );
+            pairs
+                .into_iter()
+                .map(|(_, val)| val)
+                .paginate(pagination)
+                .collect()
+        } else {
+            iter.paginate(pagination).collect()
+        }
+    }
+}
diff --git a/core/src/queue.rs b/core/src/queue.rs
index 3d14dc0c628..210f95ef932 100644
--- a/core/src/queue.rs
+++ b/core/src/queue.rs
@@ -390,9 +390,13 @@ mod tests {
     use iroha_data_model::{prelude::*, transaction::TransactionLimits};
     use iroha_primitives::must_use::MustUse;
     use rand::Rng as _;
+    use tokio::test;
 
     use super::*;
-    use crate::{kura::Kura, smartcontracts::isi::Registrable as _, wsv::World, PeersIds};
+    use crate::{
+        kura::Kura, query::store::LiveQueryStore, smartcontracts::isi::Registrable as _,
+        wsv::World, PeersIds,
+    };
 
     fn accepted_tx(account_id: &str, key: KeyPair) -> AcceptedTransaction {
         let message = std::iter::repeat_with(rand::random::<char>)
@@ -422,12 +426,14 @@ }
 
     #[test]
-    fn push_tx() {
+    async fn push_tx() {
         let key_pair = KeyPair::generate().unwrap();
         let kura = Kura::blank_kura_for_testing();
+        let query_handle = LiveQueryStore::test().start();
         let wsv = Arc::new(WorldStateView::new(
             world_with_test_domains([key_pair.public_key().clone()]),
             kura.clone(),
+            query_handle,
         ));
 
         let queue = Queue::from_configuration(&Configuration {
@@ -444,14 +450,16 @@ }
 
     #[test]
-    fn push_tx_overflow() {
+    async fn push_tx_overflow() {
         let max_txs_in_queue = 10;
 
         let key_pair = KeyPair::generate().unwrap();
         let kura = Kura::blank_kura_for_testing();
+        let query_handle =
LiveQueryStore::test().start(); let wsv = Arc::new(WorldStateView::new( world_with_test_domains([key_pair.public_key().clone()]), kura.clone(), + query_handle, )); let queue = Queue::from_configuration(&Configuration { @@ -479,7 +487,7 @@ mod tests { } #[test] - fn push_multisignature_tx() { + async fn push_multisignature_tx() { let max_txs_in_block = 2; let key_pairs = [KeyPair::generate().unwrap(), KeyPair::generate().unwrap()]; let kura = Kura::blank_kura_for_testing(); @@ -494,9 +502,11 @@ mod tests { .build(&account_id); account.signature_check_condition = SignatureCheckCondition::all_account_signatures(); assert!(domain.add_account(account).is_none()); + let query_handle = LiveQueryStore::test().start(); Arc::new(WorldStateView::new( World::with([domain], PeersIds::new()), kura.clone(), + query_handle, )) }; @@ -564,13 +574,15 @@ mod tests { } #[test] - fn get_available_txs() { + async fn get_available_txs() { let max_txs_in_block = 2; let alice_key = KeyPair::generate().expect("Failed to generate keypair."); let kura = Kura::blank_kura_for_testing(); + let query_handle = LiveQueryStore::test().start(); let wsv = Arc::new(WorldStateView::new( world_with_test_domains([alice_key.public_key().clone()]), kura.clone(), + query_handle, )); let queue = Queue::from_configuration(&Configuration { transaction_time_to_live_ms: 100_000, @@ -591,12 +603,14 @@ mod tests { } #[test] - fn push_tx_already_in_blockchain() { + async fn push_tx_already_in_blockchain() { let alice_key = KeyPair::generate().expect("Failed to generate keypair."); let kura = Kura::blank_kura_for_testing(); + let query_handle = LiveQueryStore::test().start(); let mut wsv = WorldStateView::new( world_with_test_domains([alice_key.public_key().clone()]), kura.clone(), + query_handle, ); let tx = accepted_tx("alice@wonderland", alice_key); wsv.transactions.insert(tx.hash(), 1); @@ -618,13 +632,15 @@ mod tests { } #[test] - fn get_tx_drop_if_in_blockchain() { + async fn get_tx_drop_if_in_blockchain() { let max_txs_in_block = 2; let alice_key = KeyPair::generate().expect("Failed to generate keypair."); let kura = Kura::blank_kura_for_testing(); + let query_handle = LiveQueryStore::test().start(); let mut wsv = WorldStateView::new( world_with_test_domains([alice_key.public_key().clone()]), kura.clone(), + query_handle, ); let tx = accepted_tx("alice@wonderland", alice_key); let queue = Queue::from_configuration(&Configuration { @@ -646,13 +662,15 @@ mod tests { } #[test] - fn get_available_txs_with_timeout() { + async fn get_available_txs_with_timeout() { let max_txs_in_block = 6; let alice_key = KeyPair::generate().expect("Failed to generate keypair."); let kura = Kura::blank_kura_for_testing(); + let query_handle = LiveQueryStore::test().start(); let wsv = Arc::new(WorldStateView::new( world_with_test_domains([alice_key.public_key().clone()]), kura.clone(), + query_handle, )); let queue = Queue::from_configuration(&Configuration { transaction_time_to_live_ms: 200, @@ -694,13 +712,15 @@ mod tests { // Queue should only drop transactions which are already committed or ttl expired. // Others should stay in the queue until that moment. 
#[test] - fn transactions_available_after_pop() { + async fn transactions_available_after_pop() { let max_txs_in_block = 2; let alice_key = KeyPair::generate().expect("Failed to generate keypair."); let kura = Kura::blank_kura_for_testing(); + let query_handle = LiveQueryStore::test().start(); let wsv = Arc::new(WorldStateView::new( world_with_test_domains([alice_key.public_key().clone()]), kura.clone(), + query_handle, )); let queue = Queue::from_configuration(&Configuration { transaction_time_to_live_ms: 100_000, @@ -728,13 +748,15 @@ mod tests { } #[test] - fn custom_expired_transaction_is_rejected() { + async fn custom_expired_transaction_is_rejected() { let max_txs_in_block = 2; let alice_key = KeyPair::generate().expect("Failed to generate keypair."); let kura = Kura::blank_kura_for_testing(); + let query_handle = LiveQueryStore::test().start(); let wsv = Arc::new(WorldStateView::new( world_with_test_domains([alice_key.public_key().clone()]), kura.clone(), + query_handle, )); let queue = Queue::from_configuration(&Configuration { transaction_time_to_live_ms: 100_000, @@ -769,13 +791,15 @@ mod tests { } #[test] - fn concurrent_stress_test() { + async fn concurrent_stress_test() { let max_txs_in_block = 10; let alice_key = KeyPair::generate().expect("Failed to generate keypair."); let kura = Kura::blank_kura_for_testing(); + let query_handle = LiveQueryStore::test().start(); let wsv = WorldStateView::new( world_with_test_domains([alice_key.public_key().clone()]), kura.clone(), + query_handle, ); let queue = Arc::new(Queue::from_configuration(&Configuration { @@ -843,14 +867,16 @@ mod tests { } #[test] - fn push_tx_in_future() { + async fn push_tx_in_future() { let future_threshold_ms = 1000; let alice_key = KeyPair::generate().expect("Failed to generate keypair."); let kura = Kura::blank_kura_for_testing(); + let query_handle = LiveQueryStore::test().start(); let wsv = Arc::new(WorldStateView::new( world_with_test_domains([alice_key.public_key().clone()]), kura.clone(), + query_handle, )); let queue = Queue::from_configuration(&Configuration { @@ -876,7 +902,7 @@ mod tests { } #[test] - fn queue_throttling() { + async fn queue_throttling() { let alice_key_pair = KeyPair::generate().unwrap(); let bob_key_pair = KeyPair::generate().unwrap(); let kura = Kura::blank_kura_for_testing(); @@ -897,7 +923,8 @@ mod tests { assert!(domain.add_account(bob_account).is_none()); World::with([domain], PeersIds::new()) }; - let mut wsv = WorldStateView::new(world, kura.clone()); + let query_handle = LiveQueryStore::test().start(); + let mut wsv = WorldStateView::new(world, kura.clone(), query_handle); let queue = Queue::from_configuration(&Configuration { transaction_time_to_live_ms: 100_000, diff --git a/core/src/smartcontracts/isi/mod.rs b/core/src/smartcontracts/isi/mod.rs index c633adad28c..ae5b286aca5 100644 --- a/core/src/smartcontracts/isi/mod.rs +++ b/core/src/smartcontracts/isi/mod.rs @@ -479,13 +479,15 @@ mod tests { use std::sync::Arc; use iroha_crypto::KeyPair; + use tokio::test; use super::*; - use crate::{kura::Kura, wsv::World, PeersIds}; + use crate::{kura::Kura, query::store::LiveQueryStore, wsv::World, PeersIds}; fn wsv_with_test_domains(kura: &Arc) -> Result { let world = World::with([], PeersIds::new()); - let mut wsv = WorldStateView::new(world, kura.clone()); + let query_handle = LiveQueryStore::test().start(); + let mut wsv = WorldStateView::new(world, kura.clone(), query_handle); let genesis_account_id = AccountId::from_str("genesis@genesis")?; let account_id = 
AccountId::from_str("alice@wonderland")?; let (public_key, _) = KeyPair::generate()?.into(); @@ -500,7 +502,7 @@ mod tests { } #[test] - fn asset_store() -> Result<()> { + async fn asset_store() -> Result<()> { let kura = Kura::blank_kura_for_testing(); let mut wsv = wsv_with_test_domains(&kura)?; let account_id = AccountId::from_str("alice@wonderland")?; @@ -529,7 +531,7 @@ mod tests { } #[test] - fn account_metadata() -> Result<()> { + async fn account_metadata() -> Result<()> { let kura = Kura::blank_kura_for_testing(); let mut wsv = wsv_with_test_domains(&kura)?; let account_id = AccountId::from_str("alice@wonderland")?; @@ -557,7 +559,7 @@ mod tests { } #[test] - fn asset_definition_metadata() -> Result<()> { + async fn asset_definition_metadata() -> Result<()> { let kura = Kura::blank_kura_for_testing(); let mut wsv = wsv_with_test_domains(&kura)?; let definition_id = AssetDefinitionId::from_str("rose#wonderland")?; @@ -585,7 +587,7 @@ mod tests { } #[test] - fn domain_metadata() -> Result<()> { + async fn domain_metadata() -> Result<()> { let kura = Kura::blank_kura_for_testing(); let mut wsv = wsv_with_test_domains(&kura)?; let domain_id = DomainId::from_str("wonderland")?; @@ -613,7 +615,7 @@ mod tests { } #[test] - fn executing_unregistered_trigger_should_return_error() -> Result<()> { + async fn executing_unregistered_trigger_should_return_error() -> Result<()> { let kura = Kura::blank_kura_for_testing(); let mut wsv = wsv_with_test_domains(&kura)?; let account_id = AccountId::from_str("alice@wonderland")?; @@ -630,7 +632,7 @@ mod tests { } #[test] - fn unauthorized_trigger_execution_should_return_error() -> Result<()> { + async fn unauthorized_trigger_execution_should_return_error() -> Result<()> { let kura = Kura::blank_kura_for_testing(); let mut wsv = wsv_with_test_domains(&kura)?; let account_id = AccountId::from_str("alice@wonderland")?; diff --git a/core/src/smartcontracts/isi/query.rs b/core/src/smartcontracts/isi/query.rs index 5b7795495f5..d8ea3ec9877 100644 --- a/core/src/smartcontracts/isi/query.rs +++ b/core/src/smartcontracts/isi/query.rs @@ -56,7 +56,7 @@ impl_lazy! { } /// Query Request statefully validated on the Iroha node side. 
-#[derive(Debug, Decode, Encode)] +#[derive(Debug, Clone, Decode, Encode)] #[repr(transparent)] pub struct ValidQueryRequest(SignedQuery); @@ -178,10 +178,11 @@ mod tests { use iroha_data_model::{query::error::FindError, transaction::TransactionLimits}; use iroha_primitives::unique_vec::UniqueVec; use once_cell::sync::Lazy; + use tokio::test; use super::*; use crate::{ - block::*, kura::Kura, smartcontracts::isi::Registrable as _, + block::*, kura::Kura, query::store::LiveQueryStore, smartcontracts::isi::Registrable as _, sumeragi::network_topology::Topology, tx::AcceptedTransaction, wsv::World, PeersIds, }; @@ -256,7 +257,8 @@ mod tests { invalid_tx_per_block: usize, ) -> Result { let kura = Kura::blank_kura_for_testing(); - let mut wsv = WorldStateView::new(world_with_test_domains(), kura.clone()); + let query_handle = LiveQueryStore::test().start(); + let mut wsv = WorldStateView::new(world_with_test_domains(), kura.clone(), query_handle); let limits = TransactionLimits { max_instruction_number: 1, @@ -312,9 +314,10 @@ mod tests { } #[test] - fn asset_store() -> Result<()> { + async fn asset_store() -> Result<()> { let kura = Kura::blank_kura_for_testing(); - let wsv = WorldStateView::new(world_with_test_asset_with_metadata(), kura); + let query_handle = LiveQueryStore::test().start(); + let wsv = WorldStateView::new(world_with_test_asset_with_metadata(), kura, query_handle); let asset_definition_id = AssetDefinitionId::from_str("rose#wonderland")?; let asset_id = AssetId::new(asset_definition_id, ALICE_ID.clone()); @@ -328,9 +331,10 @@ mod tests { } #[test] - fn account_metadata() -> Result<()> { + async fn account_metadata() -> Result<()> { let kura = Kura::blank_kura_for_testing(); - let wsv = WorldStateView::new(world_with_test_account_with_metadata()?, kura); + let query_handle = LiveQueryStore::test().start(); + let wsv = WorldStateView::new(world_with_test_account_with_metadata()?, kura, query_handle); let bytes = FindAccountKeyValueByIdAndKey::new(ALICE_ID.clone(), Name::from_str("Bytes")?) 
.execute(&wsv)?; @@ -342,7 +346,7 @@ mod tests { } #[test] - fn find_all_blocks() -> Result<()> { + async fn find_all_blocks() -> Result<()> { let num_blocks = 100; let wsv = wsv_with_test_blocks_and_transactions(num_blocks, 1, 1)?; @@ -355,7 +359,7 @@ mod tests { } #[test] - fn find_all_block_headers() -> Result<()> { + async fn find_all_block_headers() -> Result<()> { let num_blocks = 100; let wsv = wsv_with_test_blocks_and_transactions(num_blocks, 1, 1)?; @@ -368,7 +372,7 @@ mod tests { } #[test] - fn find_block_header_by_hash() -> Result<()> { + async fn find_block_header_by_hash() -> Result<()> { let wsv = wsv_with_test_blocks_and_transactions(1, 1, 1)?; let block = wsv.all_blocks().last().expect("WSV is empty"); @@ -387,7 +391,7 @@ mod tests { } #[test] - fn find_all_transactions() -> Result<()> { + async fn find_all_transactions() -> Result<()> { let num_blocks = 100; let wsv = wsv_with_test_blocks_and_transactions(num_blocks, 1, 1)?; @@ -411,9 +415,10 @@ mod tests { } #[test] - fn find_transaction() -> Result<()> { + async fn find_transaction() -> Result<()> { let kura = Kura::blank_kura_for_testing(); - let mut wsv = WorldStateView::new(world_with_test_domains(), kura.clone()); + let query_handle = LiveQueryStore::test().start(); + let mut wsv = WorldStateView::new(world_with_test_domains(), kura.clone(), query_handle); let instructions: [InstructionExpr; 0] = []; let tx = TransactionBuilder::new(ALICE_ID.clone()) @@ -453,7 +458,7 @@ mod tests { } #[test] - fn domain_metadata() -> Result<()> { + async fn domain_metadata() -> Result<()> { let kura = Kura::blank_kura_for_testing(); let wsv = { let mut metadata = Metadata::new(); @@ -474,7 +479,8 @@ mod tests { AssetDefinition::quantity(asset_definition_id).build(&ALICE_ID) ) .is_none()); - WorldStateView::new(World::with([domain], PeersIds::new()), kura) + let query_handle = LiveQueryStore::test().start(); + WorldStateView::new(World::with([domain], PeersIds::new()), kura, query_handle) }; let domain_id = DomainId::from_str("wonderland")?; diff --git a/core/src/smartcontracts/wasm.rs b/core/src/smartcontracts/wasm.rs index c307455a06f..cdd271f17fa 100644 --- a/core/src/smartcontracts/wasm.rs +++ b/core/src/smartcontracts/wasm.rs @@ -3,6 +3,8 @@ //! 
to wasm format and submitted in a transaction #![allow(clippy::doc_link_with_quotes, clippy::arithmetic_side_effects)] +use std::num::NonZeroUsize; + use error::*; use import_traits::{ ExecuteOperations as _, GetExecutorPayloads as _, SetPermissionTokenSchema as _, @@ -17,9 +19,12 @@ use iroha_data_model::{ isi::InstructionExpr, permission::PermissionTokenSchema, prelude::*, - query::QueryBox, - smart_contract::payloads::{self, Validate}, - Level as LogLevel, ValidationFail, + query::{QueryBox, QueryWithParameters}, + smart_contract::{ + payloads::{self, Validate}, + SmartContractQueryRequest, + }, + BatchedResponse, Level as LogLevel, ValidationFail, }; use iroha_logger::debug; // NOTE: Using error_span so that span info is logged on every event @@ -31,11 +36,7 @@ use wasmtime::{ }; use self::state::Authority; -use super::query::LazyValue; -use crate::{ - smartcontracts::{Execute, ValidQuery as _}, - wsv::WorldStateView, -}; +use crate::{smartcontracts::Execute, wsv::WorldStateView, ValidQuery as _}; /// Name of the exported memory const WASM_MEMORY: &str = "memory"; @@ -79,7 +80,10 @@ mod import_traits { pub trait ExecuteOperations { /// Execute `query` on host #[codec::wrap_trait_fn] - fn execute_query(query: QueryBox, state: &S) -> Result; + fn execute_query( + query_request: SmartContractQueryRequest, + state: &S, + ) -> Result, ValidationFail>; /// Execute `instruction` on host #[codec::wrap_trait_fn] @@ -767,26 +771,36 @@ impl Runtime { #[allow(clippy::needless_pass_by_value)] impl Runtime { - fn default_execute_query(query: QueryBox, state: &S) -> Result { - iroha_logger::debug!(%query, "Executing"); + fn default_execute_query( + query_request: SmartContractQueryRequest, + state: &S, + ) -> Result, ValidationFail> { + iroha_logger::debug!(%query_request, "Executing"); let wsv = state.wsv(); - // NOTE: Smart contract (not executor) is trying to execute the query, validate it first - // TODO: Validation should be skipped when executing smart contract. - // There should be two steps validation and execution. First smart contract - // is validated and then it's executed. Here it's validating in both steps. 
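The hunks that follow replace one-shot query execution in the wasm runtime with a two-phase protocol: a fresh `SmartContractQueryRequest::Query` is validated and executed with its sorting and pagination applied, and any remainder is parked behind a cursor that later `SmartContractQueryRequest::Cursor` requests resume. A self-contained sketch of just that control flow, using local stand-ins rather than the real `iroha_data_model` types:

```rust
// Local stand-ins; only the shape of the dispatch mirrors the diff.
struct Sorting;
struct Pagination;
struct ForwardCursor;
struct Batch;

enum QueryRequest {
    /// First round trip: the query plus its presentation parameters.
    Query { query: String, sorting: Sorting, pagination: Pagination },
    /// Later round trips: only the cursor issued by the previous reply.
    Cursor(ForwardCursor),
}

fn handle(request: QueryRequest) -> Batch {
    match request {
        QueryRequest::Query { query, sorting, pagination } => {
            // validate, execute, sort and paginate, cache the tail, return batch one
            let _ = (query, sorting, pagination);
            Batch
        }
        QueryRequest::Cursor(cursor) => {
            // look up the cached result set and return its next batch
            let _ = cursor;
            Batch
        }
    }
}
```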
- // Add a flag indicating whether smart contract is being validated or executed - wsv.executor() - .validate_query(wsv, state.authority(), query.clone())?; - - query - .execute(wsv) - .map(|lazy_value| match lazy_value { - LazyValue::Value(value) => value, - LazyValue::Iter(iter) => Value::Vec(iter.collect()), - }) - .map_err(Into::into) + match query_request { + SmartContractQueryRequest::Query(QueryWithParameters { + query, + sorting, + pagination, + }) => { + wsv.executor() + .validate_query(wsv, state.authority(), query.clone())?; + let output = query.execute(wsv)?; + + wsv.query_handle().handle_query_output( + output, + NonZeroUsize::new(30_000).expect("30 000 is not zero"), + &sorting, + pagination, + ) + } + SmartContractQueryRequest::Cursor(cursor) => { + wsv.query_handle().handle_query_cursor(cursor) + } + } + .map_err(Into::into) } fn default_execute_instruction( @@ -884,10 +898,10 @@ impl<'wrld> import_traits::ExecuteOperations> { #[codec::wrap] fn execute_query( - query: QueryBox, + query_request: SmartContractQueryRequest, state: &state::SmartContract<'wrld>, - ) -> Result { - Self::default_execute_query(query, state) + ) -> Result, ValidationFail> { + Self::default_execute_query(query_request, state) } #[codec::wrap] @@ -950,10 +964,10 @@ impl<'wrld> import_traits::ExecuteOperations> { #[codec::wrap] fn execute_query( - query: QueryBox, + query_request: SmartContractQueryRequest, state: &state::Trigger<'wrld>, - ) -> Result { - Self::default_execute_query(query, state) + ) -> Result, ValidationFail> { + Self::default_execute_query(query_request, state) } #[codec::wrap] @@ -977,16 +991,34 @@ where S: state::Wsv + state::WsvMut + state::Authority, { #[codec::wrap] - fn execute_query(query: QueryBox, state: &S) -> Result { - debug!(%query, "Executing as executor"); - - query - .execute(state.wsv()) - .map(|lazy_value| match lazy_value { - LazyValue::Value(value) => value, - LazyValue::Iter(iter) => Value::Vec(iter.collect()), - }) - .map_err(Into::into) + fn execute_query( + query_request: SmartContractQueryRequest, + state: &S, + ) -> Result, ValidationFail> { + debug!(%query_request, "Executing as executor"); + + let wsv = state.wsv(); + + match query_request { + SmartContractQueryRequest::Query(QueryWithParameters { + query, + sorting, + pagination, + }) => { + let output = query.execute(wsv)?; + + wsv.query_handle().handle_query_output( + output, + NonZeroUsize::new(30_000).expect("30 000 is not zero"), + &sorting, + pagination, + ) + } + SmartContractQueryRequest::Cursor(cursor) => { + wsv.query_handle().handle_query_cursor(cursor) + } + } + .map_err(Into::into) } #[codec::wrap] @@ -1213,18 +1245,33 @@ impl<'wrld> import_traits::ExecuteOperations, - ) -> Result { - debug!(%query, "Executing as executor"); - - query - .execute(state.wsv()) - .map(|lazy_value| match lazy_value { - LazyValue::Value(value) => value, - LazyValue::Iter(iter) => Value::Vec(iter.collect()), - }) - .map_err(Into::into) + ) -> Result, ValidationFail> { + debug!(%query_request, "Executing as executor"); + + let wsv = state.wsv(); + + match query_request { + SmartContractQueryRequest::Query(QueryWithParameters { + query, + sorting, + pagination, + }) => { + let output = query.execute(wsv)?; + + wsv.query_handle().handle_query_output( + output, + NonZeroUsize::new(30_000).expect("30 000 is not zero"), + &sorting, + pagination, + ) + } + SmartContractQueryRequest::Cursor(cursor) => { + wsv.query_handle().handle_query_cursor(cursor) + } + } + .map_err(Into::into) } #[codec::wrap] @@ -1619,10 +1666,15 @@ 
mod tests { use std::str::FromStr as _; use iroha_crypto::KeyPair; + use iroha_data_model::query::{sorting::Sorting, Pagination}; use parity_scale_codec::Encode; + use tokio::test; use super::*; - use crate::{kura::Kura, smartcontracts::isi::Registrable as _, PeersIds, World}; + use crate::{ + kura::Kura, query::store::LiveQueryStore, smartcontracts::isi::Registrable as _, PeersIds, + World, + }; fn world_with_test_account(authority: &AccountId) -> World { let domain_id = authority.domain_id.clone(); @@ -1679,10 +1731,11 @@ mod tests { } #[test] - fn execute_instruction_exported() -> Result<(), Error> { + async fn execute_instruction_exported() -> Result<(), Error> { let authority = AccountId::from_str("alice@wonderland").expect("Valid"); let kura = Kura::blank_kura_for_testing(); - let mut wsv = WorldStateView::new(world_with_test_account(&authority), kura); + let query_handle = LiveQueryStore::test().start(); + let mut wsv = WorldStateView::new(world_with_test_account(&authority), kura, query_handle); let isi_hex = { let new_authority = AccountId::from_str("mad_hatter@wonderland").expect("Valid"); @@ -1720,11 +1773,16 @@ mod tests { } #[test] - fn execute_query_exported() -> Result<(), Error> { + async fn execute_query_exported() -> Result<(), Error> { let authority = AccountId::from_str("alice@wonderland").expect("Valid"); let kura = Kura::blank_kura_for_testing(); - let mut wsv = WorldStateView::new(world_with_test_account(&authority), kura); - let query_hex = encode_hex(QueryBox::from(FindAccountById::new(authority.clone()))); + let query_handle = LiveQueryStore::test().start(); + let mut wsv = WorldStateView::new(world_with_test_account(&authority), kura, query_handle); + let query_hex = encode_hex(SmartContractQueryRequest::Query(QueryWithParameters { + query: QueryBox::from(FindAccountById::new(authority.clone())), + sorting: Sorting::default(), + pagination: Pagination::default(), + })); let wat = format!( r#" @@ -1757,11 +1815,12 @@ mod tests { } #[test] - fn instruction_limit_reached() -> Result<(), Error> { + async fn instruction_limit_reached() -> Result<(), Error> { let authority = AccountId::from_str("alice@wonderland").expect("Valid"); let kura = Kura::blank_kura_for_testing(); + let query_handle = LiveQueryStore::test().start(); - let mut wsv = WorldStateView::new(world_with_test_account(&authority), kura); + let mut wsv = WorldStateView::new(world_with_test_account(&authority), kura, query_handle); let isi_hex = { let new_authority = AccountId::from_str("mad_hatter@wonderland").expect("Valid"); @@ -1806,10 +1865,11 @@ mod tests { } #[test] - fn instructions_not_allowed() -> Result<(), Error> { + async fn instructions_not_allowed() -> Result<(), Error> { let authority = AccountId::from_str("alice@wonderland").expect("Valid"); let kura = Kura::blank_kura_for_testing(); - let mut wsv = WorldStateView::new(world_with_test_account(&authority), kura); + let query_handle = LiveQueryStore::test().start(); + let mut wsv = WorldStateView::new(world_with_test_account(&authority), kura, query_handle); let isi_hex = { let new_authority = AccountId::from_str("mad_hatter@wonderland").expect("Valid"); @@ -1854,10 +1914,11 @@ mod tests { } #[test] - fn queries_not_allowed() -> Result<(), Error> { + async fn queries_not_allowed() -> Result<(), Error> { let authority = AccountId::from_str("alice@wonderland").expect("Valid"); let kura = Kura::blank_kura_for_testing(); - let mut wsv = WorldStateView::new(world_with_test_account(&authority), kura); + let query_handle = 
LiveQueryStore::test().start(); + let mut wsv = WorldStateView::new(world_with_test_account(&authority), kura, query_handle); let query_hex = encode_hex(QueryBox::from(FindAccountById::new(authority.clone()))); let wat = format!( @@ -1895,10 +1956,11 @@ mod tests { } #[test] - fn trigger_related_func_is_not_linked_for_smart_contract() -> Result<(), Error> { + async fn trigger_related_func_is_not_linked_for_smart_contract() -> Result<(), Error> { let authority = AccountId::from_str("alice@wonderland").expect("Valid"); let kura = Kura::blank_kura_for_testing(); - let mut wsv = WorldStateView::new(world_with_test_account(&authority), kura); + let query_handle = LiveQueryStore::test().start(); + let mut wsv = WorldStateView::new(world_with_test_account(&authority), kura, query_handle); let query_hex = encode_hex(QueryBox::from(FindAccountById::new(authority.clone()))); let wat = format!( diff --git a/core/src/snapshot.rs b/core/src/snapshot.rs index bc8b9f8b376..182ecf9dbd8 100644 --- a/core/src/snapshot.rs +++ b/core/src/snapshot.rs @@ -22,6 +22,7 @@ use tokio::sync::mpsc; use crate::{ kura::{BlockCount, Kura}, + query::store::LiveQueryStoreHandle, sumeragi::SumeragiHandle, wsv::{KuraSeed, WorldStateView}, }; @@ -162,6 +163,7 @@ impl SnapshotMaker { pub fn try_read_snapshot( snapshot_dir: impl AsRef, kura: &Arc, + query_handle: LiveQueryStoreHandle, BlockCount(block_count): BlockCount, ) -> Result { let mut bytes = Vec::new(); @@ -175,6 +177,7 @@ pub fn try_read_snapshot( let mut deserializer = serde_json::Deserializer::from_slice(&bytes); let seed = KuraSeed { kura: Arc::clone(kura), + query_handle, }; let wsv = seed.deserialize(&mut deserializer)?; let snapshot_height = wsv.block_hashes.len(); diff --git a/core/src/sumeragi/main_loop.rs b/core/src/sumeragi/main_loop.rs index 3a0aef20b68..10f9c063e92 100644 --- a/core/src/sumeragi/main_loop.rs +++ b/core/src/sumeragi/main_loop.rs @@ -47,11 +47,11 @@ pub struct Sumeragi { pub debug_force_soft_fork: bool, /// The current network topology. pub current_topology: Topology, - /// The sumeragi internal `WorldStateView`. This will probably + /// The sumeragi internal [`WorldStateView`]. This will probably /// morph into a wsv + various patches as we attempt to /// multithread isi execution. In the future we might also once /// again merge the internal wsv with the public facing one. But - /// as of now we keep them seperate for greater flexibility when + /// as of now we keep them separate for greater flexibility when /// optimizing. pub wsv: WorldStateView, /// A copy of wsv that is kept one block behind at all times. 
Because @@ -1161,9 +1161,10 @@ fn handle_block_sync( #[cfg(test)] mod tests { use iroha_primitives::unique_vec; + use tokio::test; use super::*; - use crate::smartcontracts::Registrable; + use crate::{query::store::LiveQueryStore, smartcontracts::Registrable}; fn create_data_for_test( topology: &Topology, @@ -1179,7 +1180,8 @@ mod tests { assert!(domain.add_account(account).is_none()); let world = World::with([domain], topology.ordered_peers.clone()); let kura = Kura::blank_kura_for_testing(); - let mut wsv = WorldStateView::new(world, Arc::clone(&kura)); + let query_handle = LiveQueryStore::test().start(); + let mut wsv = WorldStateView::new(world, Arc::clone(&kura), query_handle); // Create "genesis" block // Creating an instruction @@ -1237,7 +1239,7 @@ mod tests { #[test] #[allow(clippy::redundant_clone)] - fn block_sync_invalid_block() { + async fn block_sync_invalid_block() { let leader_key_pair = KeyPair::generate().unwrap(); let topology = Topology::new(unique_vec![PeerId::new( &"127.0.0.1:8080".parse().unwrap(), @@ -1254,7 +1256,7 @@ mod tests { } #[test] - fn block_sync_invalid_soft_fork_block() { + async fn block_sync_invalid_soft_fork_block() { let leader_key_pair = KeyPair::generate().unwrap(); let topology = Topology::new(unique_vec![PeerId::new( &"127.0.0.1:8080".parse().unwrap(), @@ -1281,7 +1283,7 @@ mod tests { #[test] #[allow(clippy::redundant_clone)] - fn block_sync_not_proper_height() { + async fn block_sync_not_proper_height() { let topology = Topology::new(UniqueVec::new()); let leader_key_pair = KeyPair::generate().unwrap(); let (finalized_wsv, _, mut block) = create_data_for_test(&topology, leader_key_pair); @@ -1305,7 +1307,7 @@ mod tests { #[test] #[allow(clippy::redundant_clone)] - fn block_sync_commit_block() { + async fn block_sync_commit_block() { let leader_key_pair = KeyPair::generate().unwrap(); let topology = Topology::new(unique_vec![PeerId::new( &"127.0.0.1:8080".parse().unwrap(), @@ -1318,7 +1320,7 @@ mod tests { } #[test] - fn block_sync_replace_top_block() { + async fn block_sync_replace_top_block() { let leader_key_pair = KeyPair::generate().unwrap(); let topology = Topology::new(unique_vec![PeerId::new( &"127.0.0.1:8080".parse().unwrap(), @@ -1342,7 +1344,7 @@ mod tests { } #[test] - fn block_sync_small_view_change_index() { + async fn block_sync_small_view_change_index() { let leader_key_pair = KeyPair::generate().unwrap(); let topology = Topology::new(unique_vec![PeerId::new( &"127.0.0.1:8080".parse().unwrap(), @@ -1379,7 +1381,7 @@ mod tests { #[test] #[allow(clippy::redundant_clone)] - fn block_sync_genesis_block_do_not_replace() { + async fn block_sync_genesis_block_do_not_replace() { let topology = Topology::new(UniqueVec::new()); let leader_key_pair = KeyPair::generate().unwrap(); let (finalized_wsv, _, mut block) = create_data_for_test(&topology, leader_key_pair); diff --git a/core/src/wsv.rs b/core/src/wsv.rs index 049947db282..dbcb732c441 100644 --- a/core/src/wsv.rs +++ b/core/src/wsv.rs @@ -46,6 +46,7 @@ use crate::{ block::CommittedBlock, executor::Executor, kura::Kura, + query::store::LiveQueryStoreHandle, smartcontracts::{ triggers::{ self, @@ -295,6 +296,9 @@ pub struct WorldStateView { /// Reference to Kura subsystem. #[serde(skip)] kura: Arc, + /// Handle to the [`LiveQueryStore`]. + #[serde(skip)] + query_handle: LiveQueryStoreHandle, /// Temporary metrics buffer of amounts of any asset that has been transacted. 
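Every `WorldStateView` constructor in `core` now threads a `LiveQueryStoreHandle` through as a third argument, which is why the test fixtures in these hunks all gain a `query_handle` line. The recurring idiom, as a sketch assuming the paths used in the hunks (`LiveQueryStore::test()` builds a store with test defaults; `start()` spawns its task and hands back a cloneable handle):

```rust
use iroha_core::{
    kura::Kura,
    prelude::{World, WorldStateView},
    query::store::LiveQueryStore,
};

#[tokio::test]
async fn build_test_wsv() {
    let kura = Kura::blank_kura_for_testing();
    let query_handle = LiveQueryStore::test().start();
    // the handle is stored inside the WSV and cloned along with it
    let _wsv = WorldStateView::new(World::default(), kura, query_handle);
}
```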
#[serde(skip)] pub new_tx_amounts: Arc>>, @@ -304,6 +308,8 @@ pub struct WorldStateView { pub struct KuraSeed { /// Kura subsystem reference pub kura: Arc, + /// Handle to the [`LiveQueryStore`](crate::query::store::LiveQueryStore). + pub query_handle: LiveQueryStoreHandle, } impl<'de> DeserializeSeed<'de> for KuraSeed { @@ -366,6 +372,7 @@ impl<'de> DeserializeSeed<'de> for KuraSeed { transactions: transactions .ok_or_else(|| serde::de::Error::missing_field("transactions"))?, kura: self.loader.kura, + query_handle: self.loader.query_handle, engine, events_buffer: Vec::new(), new_tx_amounts: Arc::new(Mutex::new(Vec::new())), @@ -392,6 +399,7 @@ impl Clone for WorldStateView { new_tx_amounts: Arc::clone(&self.new_tx_amounts), engine: self.engine.clone(), kura: Arc::clone(&self.kura), + query_handle: self.query_handle.clone(), } } } @@ -401,12 +409,12 @@ impl WorldStateView { /// Construct [`WorldStateView`] with given [`World`]. #[must_use] #[inline] - pub fn new(world: World, kura: Arc) -> Self { + pub fn new(world: World, kura: Arc, query_handle: LiveQueryStoreHandle) -> Self { // Added to remain backward compatible with other code primary in tests let config = ConfigurationProxy::default() .build() .expect("Wsv proxy always builds"); - Self::from_configuration(config, world, kura) + Self::from_configuration(config, world, kura, query_handle) } /// Get `Account`'s `Asset`s @@ -914,7 +922,12 @@ impl WorldStateView { /// Construct [`WorldStateView`] with specific [`Configuration`]. #[inline] - pub fn from_configuration(config: Configuration, world: World, kura: Arc) -> Self { + pub fn from_configuration( + config: Configuration, + world: World, + kura: Arc, + query_handle: LiveQueryStoreHandle, + ) -> Self { Self { world, config, @@ -924,6 +937,7 @@ impl WorldStateView { new_tx_amounts: Arc::new(Mutex::new(Vec::new())), engine: wasm::create_engine(), kura, + query_handle, } } @@ -1262,6 +1276,11 @@ impl WorldStateView { }, ))) } + + /// Get reference to the [`LiveQueryStoreHandle`]. 
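Snapshot restoration has to supply the same handle: `KuraSeed` above carries it so a WSV deserialized from disk comes back already wired to a live query store. A sketch of the updated restore call, assuming `try_read_snapshot` is reachable at the path its hunk suggests and that `BlockCount` is the public tuple struct its pattern implies:

```rust
use iroha_core::{
    kura::{BlockCount, Kura},
    query::store::LiveQueryStore,
    snapshot::try_read_snapshot,
};

fn restore_wsv() {
    let kura = Kura::blank_kura_for_testing();
    let query_handle = LiveQueryStore::test().start();
    // the handle travels through `KuraSeed` into the deserialized WSV
    let result = try_read_snapshot("./storage", &kura, query_handle, BlockCount(0));
    drop(result);
}
```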
+ pub fn query_handle(&self) -> &LiveQueryStoreHandle { + &self.query_handle + } } /// Bounds for `range` queries @@ -1335,16 +1354,20 @@ mod tests { use iroha_primitives::unique_vec::UniqueVec; use super::*; - use crate::{block::ValidBlock, role::RoleIdWithOwner, sumeragi::network_topology::Topology}; + use crate::{ + block::ValidBlock, query::store::LiveQueryStore, role::RoleIdWithOwner, + sumeragi::network_topology::Topology, + }; - #[test] - fn get_block_hashes_after_hash() { + #[tokio::test] + async fn get_block_hashes_after_hash() { const BLOCK_CNT: usize = 10; let topology = Topology::new(UniqueVec::new()); let block = ValidBlock::new_dummy().commit(&topology).unwrap(); let kura = Kura::blank_kura_for_testing(); - let mut wsv = WorldStateView::new(World::default(), kura); + let query_handle = LiveQueryStore::test().start(); + let mut wsv = WorldStateView::new(World::default(), kura, query_handle); let mut block_hashes = vec![]; for i in 1..=BLOCK_CNT { @@ -1364,14 +1387,15 @@ mod tests { .eq(block_hashes.into_iter().skip(7))); } - #[test] - fn get_blocks_from_height() { + #[tokio::test] + async fn get_blocks_from_height() { const BLOCK_CNT: usize = 10; let topology = Topology::new(UniqueVec::new()); let block = ValidBlock::new_dummy().commit(&topology).unwrap(); let kura = Kura::blank_kura_for_testing(); - let mut wsv = WorldStateView::new(World::default(), kura.clone()); + let query_handle = LiveQueryStore::test().start(); + let mut wsv = WorldStateView::new(World::default(), kura.clone(), query_handle); for i in 1..=BLOCK_CNT { let mut block = block.clone(); diff --git a/data_model/src/lib.rs b/data_model/src/lib.rs index 614e2d8d625..8038d813b3a 100644 --- a/data_model/src/lib.rs +++ b/data_model/src/lib.rs @@ -35,7 +35,7 @@ use core::{ use block::SignedBlock; #[cfg(not(target_arch = "aarch64"))] use derive_more::Into; -use derive_more::{AsRef, DebugCustom, Deref, Display, From, FromStr}; +use derive_more::{AsRef, Constructor, DebugCustom, Deref, Display, From, FromStr}; use events::TriggeringFilterBox; use getset::Getters; use iroha_crypto::{HashOf, PublicKey}; @@ -48,6 +48,7 @@ use iroha_primitives::{ small::{Array as SmallArray, SmallVec}, }; use iroha_schema::IntoSchema; +use iroha_version::{declare_versioned_with_scale, version_with_scale}; pub use numeric::model::NumericValue; use parity_scale_codec::{Decode, Encode}; use prelude::{Executable, SignedTransaction, TransactionQueryOutput}; @@ -990,6 +991,21 @@ pub mod model { /// Error ERROR, } + + /// Batched response of a query sent to torii + #[derive( + Debug, Clone, Constructor, Getters, Decode, Encode, Deserialize, Serialize, IntoSchema, + )] + #[version_with_scale(version = 1, versioned_alias = "BatchedResponse")] + #[getset(get = "pub")] + #[must_use] + pub struct BatchedResponseV1 { + /// Current batch + pub batch: T, + /// Index of the next element in the result set. Client will use this value + /// in the next request to continue fetching results of the original query + pub cursor: crate::query::cursor::ForwardCursor, + } } impl Identifiable for TriggerBox { @@ -1843,55 +1859,13 @@ pub fn current_time() -> core::time::Duration { .expect("Failed to get the current system time") } -#[cfg(feature = "http")] -pub mod http { - //! 
Structures related to HTTP communication - - use iroha_data_model_derive::model; - use iroha_schema::IntoSchema; - use iroha_version::declare_versioned_with_scale; - - pub use self::model::*; - use crate::prelude::QueryOutput; - - declare_versioned_with_scale!(BatchedResponse 1..2, Debug, Clone, iroha_macro::FromVariant, IntoSchema); - - #[model] - pub mod model { - use getset::Getters; - use iroha_version::version_with_scale; - use parity_scale_codec::{Decode, Encode}; - use serde::{Deserialize, Serialize}; - - use super::*; - - /// Batched response of a query sent to torii - #[derive(Debug, Clone, Getters, Decode, Encode, Deserialize, Serialize, IntoSchema)] - #[version_with_scale(version = 1, versioned_alias = "BatchedResponse")] - #[getset(get = "pub")] - #[must_use] - pub struct BatchedResponseV1 { - /// Current batch - pub batch: T, - /// Index of the next element in the result set. Client will use this value - /// in the next request to continue fetching results of the original query - pub cursor: crate::query::cursor::ForwardCursor, - } - } - - impl From> for QueryOutput { - #[inline] - fn from(source: BatchedResponseV1) -> Self { - source.batch - } - } +declare_versioned_with_scale!(BatchedResponse 1..2, Debug, Clone, iroha_macro::FromVariant, IntoSchema); - impl From> for (T, crate::query::cursor::ForwardCursor) { - fn from(source: BatchedResponseV1) -> Self { - let BatchedResponseV1 { batch, cursor } = source; +impl From> for (T, crate::query::cursor::ForwardCursor) { + fn from(source: BatchedResponseV1) -> Self { + let BatchedResponseV1 { batch, cursor } = source; - (batch, cursor) - } + (batch, cursor) } } diff --git a/data_model/src/query/cursor.rs b/data_model/src/query/cursor.rs index 1c3f4a24542..12870d971f3 100644 --- a/data_model/src/query/cursor.rs +++ b/data_model/src/query/cursor.rs @@ -1,5 +1,12 @@ //! Structures and traits related to server-side cursor. +#[cfg(not(feature = "std"))] +use alloc::{ + format, + string::{String, ToString as _}, + vec, + vec::Vec, +}; use core::num::{NonZeroU64, NonZeroUsize}; use getset::Getters; @@ -27,6 +34,15 @@ pub mod model { /// Pointer to the next element in the result set pub cursor: Option, } + + impl ForwardCursor { + /// Create a new cursor. + // + // `derive_more::Constructor` isn't used because we need `const`. 
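The comment just above states the motivation: `derive_more::Constructor` cannot emit a `const fn`, and a `const` constructor lets a cursor be built in constant contexts, as the wasm test mock later in this diff does with its `const QUERY_RESULT`. A small sketch, assuming the type's re-exported path:

```rust
use iroha_data_model::query::cursor::ForwardCursor;

// Possible only because `ForwardCursor::new` is `const`:
const EMPTY_CURSOR: ForwardCursor = ForwardCursor::new(None, None);
```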
+        pub const fn new(query_id: Option, cursor: Option) -> Self {
+            Self { query_id, cursor }
+        }
+    }
 }

 mod candidate {
diff --git a/data_model/src/query/mod.rs b/data_model/src/query/mod.rs
index ca6d7e67651..5aa9026332a 100644
--- a/data_model/src/query/mod.rs
+++ b/data_model/src/query/mod.rs
@@ -3,22 +3,24 @@
 #![allow(clippy::missing_inline_in_public_items, unused_imports)]

 #[cfg(not(feature = "std"))]
-use alloc::{boxed::Box, format, string::String, vec::Vec};
+use alloc::{
+    boxed::Box,
+    format,
+    string::{String, ToString as _},
+    vec::Vec,
+};
 use core::cmp::Ordering;

-#[cfg(feature = "http")]
 pub use cursor::ForwardCursor;
-use derive_more::Display;
+use derive_more::{Constructor, Display};
 use iroha_crypto::{PublicKey, SignatureOf};
 use iroha_data_model_derive::model;
 use iroha_macro::FromVariant;
 use iroha_schema::IntoSchema;
 use iroha_version::prelude::*;
-#[cfg(feature = "http")]
 pub use pagination::Pagination;
 use parity_scale_codec::{Decode, Encode};
 use serde::{Deserialize, Serialize};
-#[cfg(feature = "http")]
 pub use sorting::Sorting;

 pub use self::model::*;
@@ -34,11 +36,8 @@ use crate::{
     Identifiable, Value,
 };

-#[cfg(feature = "http")]
 pub mod cursor;
-#[cfg(feature = "http")]
 pub mod pagination;
-#[cfg(feature = "http")]
 pub mod sorting;

 macro_rules! queries {
@@ -164,6 +163,27 @@ pub mod model {
     )]
     #[ffi_type]
     pub struct MetadataValue(pub Value);
+
+    /// Request type clients (like http clients or wasm) can send to a query endpoint.
+    #[derive(Debug, Clone, Encode, Decode, Serialize, Deserialize)]
+    pub enum QueryRequest {
+        /// The query itself.
+        /// Basically used for one-time queries or to get a cursor for big queries.
+        Query(QueryWithParameters),
+        /// Cursor over a previously sent [`Query`](QueryRequest::Query).
+        Cursor(ForwardCursor),
+    }
+
+    /// Query with parameters the client can specify.
+    #[derive(
+        Debug, Constructor, Getters, Clone, PartialEq, Eq, Encode, Decode, Serialize, Deserialize,
+    )]
+    #[getset(get = "pub")]
+    pub struct QueryWithParameters {
+        pub query: Q,
+        pub sorting: Sorting,
+        pub pagination: Pagination,
+    }
 }

 impl From for Value {
@@ -209,6 +229,25 @@ impl Ord for TransactionQueryOutput {
     }
 }

+impl core::fmt::Display for QueryRequest {
+    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+        match self {
+            Self::Query(query) => write!(f, "{query}"),
+            Self::Cursor(cursor) => write!(f, "{cursor:?}"),
+        }
+    }
+}
+
+impl core::fmt::Display for QueryWithParameters {
+    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+        f.debug_struct("QueryWithParameters")
+            .field("query", &self.query.to_string())
+            .field("sorting", &self.sorting)
+            .field("pagination", &self.pagination)
+            .finish()
+    }
+}
+
 pub mod role {
     //! Queries related to [`Role`].

@@ -1237,10 +1276,6 @@ pub mod http {
     use super::*;
     use crate::{account::AccountId, predicate::PredicateBox};

-    // TODO: Could we make a variant of `Value` that holds only query results?
-    /// Type representing Result of executing a query
-    pub type QueryOutput = Value;
-
     declare_versioned_with_scale!(SignedQuery 1..2, Debug, Clone, iroha_macro::FromVariant, IntoSchema);

     #[model]
@@ -1280,6 +1315,9 @@ pub mod http {
         /// Payload
         pub payload: QueryPayload,
     }
+
+    /// Concrete type of query request that HTTP clients send to the query endpoint.
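`QueryRequest` above is the wire shape for both halves of the conversation: the first message carries the query with its `Sorting` and `Pagination`, every reply is a `BatchedResponse` whose `ForwardCursor` signals whether the server still caches results, and each follow-up resends only that cursor. A self-contained sketch of the client-side loop, with stand-ins for the transport and payload types:

```rust
// Stand-ins for `ForwardCursor` / `BatchedResponseV1`; `send` fakes the
// endpoint round trip. Only the control flow mirrors the diff.
struct Cursor {
    query_id: Option<String>,
    position: Option<u64>,
}

struct Response {
    batch: Vec<u32>,
    cursor: Cursor,
}

fn send(_continuation: Option<&Cursor>) -> Response {
    Response {
        batch: Vec::new(),
        cursor: Cursor { query_id: None, position: None },
    }
}

fn fetch_all() -> Vec<u32> {
    let mut results = Vec::new();
    let mut next = None; // `None` means "this is the initial query message"
    loop {
        let Response { batch, cursor } = send(next.as_ref());
        results.extend(batch);
        if cursor.position.is_none() {
            break; // nothing left cached on the server for this query
        }
        next = Some(cursor); // resend only the cursor to continue
    }
    results
}
```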
+ pub type ClientQueryRequest = QueryRequest; } mod candidate { @@ -1454,6 +1492,8 @@ pub mod error { #[skip_try_from] String, ), + /// Unknown query cursor + UnknownCursor, } /// Type assertion error diff --git a/data_model/src/query/pagination.rs b/data_model/src/query/pagination.rs index 71a12d95f8c..f93a1dadc25 100644 --- a/data_model/src/query/pagination.rs +++ b/data_model/src/query/pagination.rs @@ -17,17 +17,14 @@ use derive_more::{Constructor, Display}; use iroha_data_model_derive::model; use parity_scale_codec::{Decode, Encode}; use serde::{Deserialize, Serialize}; -use warp::{ - http::StatusCode, - reply::{self, Response}, - Reply, -}; const PAGINATION_START: &str = "start"; const PAGINATION_LIMIT: &str = "limit"; /// Structure for pagination requests -#[derive(Debug, Display, Clone, Copy, Default, Decode, Encode, Deserialize, Serialize)] +#[derive( + Debug, Display, Clone, Copy, PartialEq, Eq, Default, Decode, Encode, Deserialize, Serialize, +)] #[display( fmt = "{}--{}", "start.map(NonZeroU64::get).unwrap_or(0)", diff --git a/data_model/src/query/sorting.rs b/data_model/src/query/sorting.rs index 9ccbc83c610..015aeb3f34d 100644 --- a/data_model/src/query/sorting.rs +++ b/data_model/src/query/sorting.rs @@ -4,6 +4,7 @@ use alloc::{ format, string::{String, ToString as _}, + vec, vec::Vec, }; @@ -21,7 +22,7 @@ pub mod model { use super::*; /// Struct for sorting requests - #[derive(Debug, Clone, Default, Decode, Encode, Deserialize, Serialize)] + #[derive(Debug, Clone, Default, PartialEq, Eq, Decode, Encode, Deserialize, Serialize)] pub struct Sorting { /// Sort query result using [`Name`] of the key in [`Asset`]'s metadata. pub sort_by_metadata_key: Option, diff --git a/data_model/src/smart_contract.rs b/data_model/src/smart_contract.rs index f159fa69531..7091fc9101d 100644 --- a/data_model/src/smart_contract.rs +++ b/data_model/src/smart_contract.rs @@ -1,5 +1,7 @@ //! This module contains data and structures related only to smart contract execution +use crate::query::{QueryBox, QueryRequest}; + pub mod payloads { //! Payloads with function arguments for different entrypoints @@ -41,3 +43,6 @@ pub mod payloads { pub to_validate: T, } } + +/// Request type for `execute_query()` function. +pub type SmartContractQueryRequest = QueryRequest; diff --git a/docs/source/references/config.md b/docs/source/references/config.md index b43fe9111f2..d8153062d07 100644 --- a/docs/source/references/config.md +++ b/docs/source/references/config.md @@ -56,8 +56,7 @@ The following is the default configuration used by Iroha. "API_URL": null, "MAX_TRANSACTION_SIZE": 32768, "MAX_CONTENT_LEN": 16384000, - "FETCH_SIZE": 10, - "QUERY_IDLE_TIME_MS": 30000 + "FETCH_SIZE": 10 }, "BLOCK_SYNC": { "GOSSIP_PERIOD_MS": 10000, @@ -125,6 +124,9 @@ The following is the default configuration used by Iroha. "CREATE_EVERY_MS": 60000, "DIR_PATH": "./storage", "CREATION_ENABLED": true + }, + "LIVE_QUERY_STORE": { + "QUERY_IDLE_TIME_MS": 30000 } } ``` @@ -282,6 +284,28 @@ Has type `Option`[^1]. Can be configured via environment variable `KURA_IN "strict" ``` +## `live_query_store` + +LiveQueryStore configuration + +Has type `Option`[^1]. Can be configured via environment variable `IROHA_LIVE_QUERY_STORE` + +```json +{ + "QUERY_IDLE_TIME_MS": 30000 +} +``` + +### `live_query_store.query_idle_time_ms` + +Time query can remain in the store if unaccessed + +Has type `Option`[^1]. 
Can be configured via environment variable `LIVE_QUERY_STORE_QUERY_IDLE_TIME_MS` + +```json +30000 +``` + ## `logger` `Logger` configuration @@ -677,8 +701,7 @@ Has type `Option`[^1]. Can be configured via environm "FETCH_SIZE": 10, "MAX_CONTENT_LEN": 16384000, "MAX_TRANSACTION_SIZE": 32768, - "P2P_ADDR": null, - "QUERY_IDLE_TIME_MS": 30000 + "P2P_ADDR": null } ``` @@ -732,16 +755,6 @@ Has type `Option`[^1]. Can be configured via environment variable `T null ``` -### `torii.query_idle_time_ms` - -Time query can remain in the store if unaccessed - -Has type `Option`[^1]. Can be configured via environment variable `TORII_QUERY_IDLE_TIME_MS` - -```json -30000 -``` - ## `wsv` `WorldStateView` configuration diff --git a/docs/source/references/schema.json b/docs/source/references/schema.json index f7f2fe68aaf..d60d8ffdefa 100644 --- a/docs/source/references/schema.json +++ b/docs/source/references/schema.json @@ -3663,6 +3663,10 @@ "tag": "Conversion", "discriminant": 3, "type": "String" + }, + { + "tag": "UnknownCursor", + "discriminant": 4 } ] }, diff --git a/schema/gen/src/lib.rs b/schema/gen/src/lib.rs index b89d32ff949..b808576c469 100644 --- a/schema/gen/src/lib.rs +++ b/schema/gen/src/lib.rs @@ -6,8 +6,8 @@ use iroha_crypto::MerkleTree; use iroha_data_model::{ block::stream::{BlockMessage, BlockSubscriptionRequest}, - http::BatchedResponse, query::error::QueryExecutionFail, + BatchedResponse, }; use iroha_genesis::RawGenesisBlock; use iroha_schema::prelude::*; @@ -413,7 +413,6 @@ mod tests { }, domain::NewDomain, executor::Executor, - http::{BatchedResponse, BatchedResponseV1}, ipfs::IpfsPath, predicate::{ ip_addr::{Ipv4Predicate, Ipv6Predicate}, @@ -428,7 +427,7 @@ mod tests { ForwardCursor, }, transaction::{error::TransactionLimitError, SignedTransactionV1, TransactionLimits}, - SignedBlockWrapper, + BatchedResponse, BatchedResponseV1, SignedBlockWrapper, }; use iroha_genesis::RawGenesisBlock; use iroha_primitives::{ diff --git a/smart_contract/Cargo.toml b/smart_contract/Cargo.toml index 64ba4666156..b0d9950600f 100644 --- a/smart_contract/Cargo.toml +++ b/smart_contract/Cargo.toml @@ -16,10 +16,12 @@ debug = [] [dependencies] iroha_data_model.workspace = true +iroha_macro.workspace = true iroha_smart_contract_utils.workspace = true iroha_smart_contract_derive.workspace = true parity-scale-codec.workspace = true +derive_more.workspace = true [dev-dependencies] webassembly-test = "0.1.0" diff --git a/smart_contract/derive/src/entrypoint.rs b/smart_contract/derive/src/entrypoint.rs index 9ed630b7b1d..e2ddb99e0e6 100644 --- a/smart_contract/derive/src/entrypoint.rs +++ b/smart_contract/derive/src/entrypoint.rs @@ -29,7 +29,9 @@ pub fn impl_entrypoint(_attr: TokenStream, item: TokenStream) -> TokenStream { block.stmts.insert( 0, parse_quote!( - use ::iroha_wasm::{debug::DebugExpectExt as _, ExecuteOnHost as _, QueryHost as _}; + use ::iroha_wasm::{ + debug::DebugExpectExt as _, ExecuteOnHost as _, ExecuteQueryOnHost as _, + }; ), ); diff --git a/smart_contract/executor/derive/src/entrypoint.rs b/smart_contract/executor/derive/src/entrypoint.rs index d89414f4e23..a518fcbd7a2 100644 --- a/smart_contract/executor/derive/src/entrypoint.rs +++ b/smart_contract/executor/derive/src/entrypoint.rs @@ -88,7 +88,7 @@ fn impl_validate_entrypoint( block.stmts.insert( 0, parse_quote!( - use ::iroha_executor::smart_contract::{ExecuteOnHost as _, QueryHost as _}; + use ::iroha_executor::smart_contract::{ExecuteOnHost as _, ExecuteQueryOnHost as _}; ), ); diff --git 
a/smart_contract/executor/derive/src/token.rs b/smart_contract/executor/derive/src/token.rs index 97b1b3b4edb..6851ee53372 100644 --- a/smart_contract/executor/derive/src/token.rs +++ b/smart_contract/executor/derive/src/token.rs @@ -26,17 +26,21 @@ fn impl_token(ident: &syn::Ident, generics: &syn::Generics) -> proc_macro2::Toke quote! { impl #impl_generics ::iroha_executor::permission::Token for #ident #ty_generics #where_clause { fn is_owned_by(&self, account_id: &::iroha_executor::data_model::account::AccountId) -> bool { - let all_account_tokens: Vec = ::iroha_executor::smart_contract::debug::DebugExpectExt::dbg_expect( - ::iroha_executor::smart_contract::QueryHost::execute( - &::iroha_executor::data_model::query::permission::FindPermissionTokensByAccountId::new( + let account_tokens_cursor = ::iroha_executor::smart_contract::debug::DebugExpectExt::dbg_expect( + ::iroha_executor::smart_contract::ExecuteQueryOnHost::execute( + ::iroha_executor::data_model::query::permission::FindPermissionTokensByAccountId::new( account_id.clone(), ) ), "Failed to execute `FindPermissionTokensByAccountId` query" - ).try_into().unwrap(); + ); - all_account_tokens + account_tokens_cursor .into_iter() + .map(|res| ::iroha_executor::smart_contract::debug::DebugExpectExt::dbg_expect( + res, + "Failed to get permission token from cursor" + )) .filter_map(|token| Self::try_from(token).ok()) .any(|token| self == &token) } diff --git a/smart_contract/executor/src/default.rs b/smart_contract/executor/src/default.rs index 816e9e58ab3..784a50fb2b9 100644 --- a/smart_contract/executor/src/default.rs +++ b/smart_contract/executor/src/default.rs @@ -1326,7 +1326,7 @@ pub mod role { let role_id = $isi.object; let find_role_query_res = match FindRoleByRoleId::new(role_id).execute() { - Ok(res) => res, + Ok(res) => res.into_inner(), Err(error) => { deny!($executor, error); } diff --git a/smart_contract/executor/src/permission.rs b/smart_contract/executor/src/permission.rs index a0f60a6c35e..5a4b0f45ebf 100644 --- a/smart_contract/executor/src/permission.rs +++ b/smart_contract/executor/src/permission.rs @@ -3,6 +3,7 @@ use alloc::borrow::ToOwned as _; use iroha_schema::IntoSchema; +use iroha_smart_contract::QueryOutputCursor; use iroha_smart_contract_utils::debug::DebugExpectExt as _; use serde::{de::DeserializeOwned, Serialize}; @@ -139,8 +140,9 @@ pub mod asset_definition { asset_definition_id: &AssetDefinitionId, authority: &AccountId, ) -> Result { - let asset_definition = - FindAssetDefinitionById::new(asset_definition_id.clone()).execute()?; + let asset_definition = FindAssetDefinitionById::new(asset_definition_id.clone()) + .execute() + .map(QueryOutputCursor::into_inner)?; if asset_definition.owned_by() == authority { Ok(true) } else { @@ -224,7 +226,9 @@ pub mod trigger { /// - `FindTrigger` fails /// - `is_domain_owner` fails pub fn is_trigger_owner(trigger_id: &TriggerId, authority: &AccountId) -> Result { - let trigger = FindTriggerById::new(trigger_id.clone()).execute()?; + let trigger = FindTriggerById::new(trigger_id.clone()) + .execute() + .map(QueryOutputCursor::into_inner)?; if trigger.action().authority() == authority { Ok(true) } else { @@ -268,6 +272,7 @@ pub mod domain { pub fn is_domain_owner(domain_id: &DomainId, authority: &AccountId) -> Result { FindDomainById::new(domain_id.clone()) .execute() + .map(QueryOutputCursor::into_inner) .map(|domain| domain.owned_by() == authority) } diff --git a/smart_contract/src/lib.rs b/smart_contract/src/lib.rs index 62e0a570e77..e481cf04f63 100644 --- 
a/smart_contract/src/lib.rs +++ b/smart_contract/src/lib.rs @@ -4,16 +4,22 @@ extern crate alloc; -use alloc::{boxed::Box, collections::BTreeMap}; +use alloc::{boxed::Box, collections::BTreeMap, vec::Vec}; #[cfg(not(test))] use data_model::smart_contract::payloads; use data_model::{ isi::Instruction, prelude::*, - query::{Query, QueryBox}, + query::{ + cursor::ForwardCursor, sorting::Sorting, Pagination, Query, QueryBox, QueryWithParameters, + }, + smart_contract::SmartContractQueryRequest, + BatchedResponse, }; +use derive_more::Display; pub use iroha_data_model as data_model; +use iroha_macro::error::ErrorTryFromEnum; pub use iroha_smart_contract_derive::main; pub use iroha_smart_contract_utils::{debug, log}; use iroha_smart_contract_utils::{ @@ -50,17 +56,6 @@ pub trait ExecuteOnHost: Instruction { fn execute(&self) -> Result<(), ValidationFail>; } -/// Implementing queries can be executed on the host -pub trait QueryHost: Query { - /// Execute query on the host - /// - /// # Errors - /// - /// - If query validation failed - /// - If query execution failed - fn execute(&self) -> Result; -} - // TODO: Remove the Clone bound. It can be done by custom serialization to InstructionExpr impl ExecuteOnHost for I { fn execute(&self) -> Result<(), ValidationFail> { @@ -81,30 +76,260 @@ impl ExecuteOnHost for I { } } -// TODO: Remove the Clone bound. It can be done by custom serialization/deserialization to QueryBox -impl + Encode + Clone> QueryHost for Q +/// Generic query request containing additional parameters. +#[derive(Debug)] +pub struct QueryRequest { + query: Q, + sorting: Sorting, + pagination: Pagination, +} + +impl From> for SmartContractQueryRequest { + fn from(query_request: QueryRequest) -> Self { + SmartContractQueryRequest::Query(QueryWithParameters::new( + query_request.query.into(), + query_request.sorting, + query_request.pagination, + )) + } +} + +/// Implementing queries can be executed on the host +/// +/// TODO: `&self` should be enough +pub trait ExecuteQueryOnHost: Sized { + /// Query output type. + type Output; + + /// Type of [`QueryRequest`]. 
+    type QueryRequest;
+
+    /// Apply sorting to a query
+    fn sort(self, sorting: Sorting) -> Self::QueryRequest;
+
+    /// Apply pagination to a query
+    fn paginate(self, pagination: Pagination) -> Self::QueryRequest;
+
+    /// Execute query on the host
+    ///
+    /// # Errors
+    ///
+    /// - If query validation failed
+    /// - If query execution failed
+    fn execute(self) -> Result, ValidationFail>;
+}
+
+impl ExecuteQueryOnHost for Q
+where
+    Q::Output: DecodeAll,
+    >::Error: core::fmt::Debug,
+{
+    type Output = Q::Output;
+    type QueryRequest = QueryRequest;
+
+    fn sort(self, sorting: Sorting) -> Self::QueryRequest {
+        QueryRequest {
+            query: self,
+            sorting,
+            pagination: Pagination::default(),
+        }
+    }
+
+    fn paginate(self, pagination: Pagination) -> Self::QueryRequest {
+        QueryRequest {
+            query: self,
+            sorting: Sorting::default(),
+            pagination,
+        }
+    }
+
+    fn execute(self) -> Result, ValidationFail> {
+        QueryRequest {
+            query: self,
+            sorting: Sorting::default(),
+            pagination: Pagination::default(),
+        }
+        .execute()
+    }
+}
+
+impl ExecuteQueryOnHost for QueryRequest
 where
     Q::Output: DecodeAll,
     >::Error: core::fmt::Debug,
 {
-    fn execute(&self) -> Result {
+    type Output = Q::Output;
+    type QueryRequest = Self;
+
+    fn sort(mut self, sorting: Sorting) -> Self {
+        self.sorting = sorting;
+        self
+    }
+
+    fn paginate(mut self, pagination: Pagination) -> Self {
+        self.pagination = pagination;
+        self
+    }
+
+    #[allow(irrefutable_let_patterns)]
+    fn execute(self) -> Result, ValidationFail> {
+        #[cfg(not(test))]
+        use host::execute_query as host_execute_query;
+        #[cfg(test)]
+        use tests::_iroha_smart_contract_execute_query_mock as host_execute_query;
+
+        let wasm_query_request = SmartContractQueryRequest::from(self);
+
+        // Safety: - `host_execute_query` doesn't take ownership of its pointer parameter
+        //         - ownership of the returned result is transferred into `_decode_from_raw`
+        let res: Result, ValidationFail> = unsafe {
+            decode_with_length_prefix_from_raw(encode_and_execute(
+                &wasm_query_request,
+                host_execute_query,
+            ))
+        };
+        let BatchedResponse::V1(response) = res? else {
+            panic!("Unsupported response version")
+        };
+        let (value, cursor) = response.into();
+        let typed_value = Self::Output::try_from(value).expect("Query output has incorrect type");
+        Ok(QueryOutputCursor {
+            batch: typed_value,
+            cursor,
+        })
+    }
+}
+
+/// Cursor over query results implementing [`IntoIterator`].
+///
+/// If you execute [`QueryBox`], you probably want to use the [`collect()`](Self::collect) method
+/// instead of [`into_iter()`](Self::into_iter) to ensure that all results were consumed.
+#[derive(Debug, Encode, PartialEq, Eq)]
+pub struct QueryOutputCursor {
+    batch: T,
+    cursor: ForwardCursor,
+}
+
+impl QueryOutputCursor {
+    /// Get inner value consuming [`Self`].
+    pub fn into_inner(self) -> T {
+        self.batch
+    }
+}
+
+impl QueryOutputCursor {
+    /// Same as [`into_inner()`](Self::into_inner) but collects all values of [`Value::Vec`]
+    /// in case there are some cached results left on the host side.
+    ///
+    /// # Errors
+    ///
+    /// May fail for the same reasons [`QueryOutputCursorIterator`] can fail to iterate.
+    pub fn collect(self) -> Result>> {
+        let Value::Vec(v) = self.batch else {
+            return Ok(self.batch)
+        };
+
+        // Making sure we received all values
+        let cursor = QueryOutputCursor {
+            batch: v,
+            cursor: self.cursor,
+        };
+        cursor
+            .into_iter()
+            .collect::, _>>()
+            .map(Value::Vec)
+    }
+}
+
+impl> IntoIterator for QueryOutputCursor> {
+    type Item = Result>>;
+    type IntoIter = QueryOutputCursorIterator;
+
+    fn into_iter(self) -> Self::IntoIter {
+        QueryOutputCursorIterator {
+            iter: self.batch.into_iter(),
+            cursor: self.cursor,
+        }
+    }
+}
+
+/// Iterator over query results.
+///
+/// # Errors
+///
+/// Iteration may fail due to the following reasons:
+///
+/// - Failed to get next batch of results from the host
+/// - Failed to convert batch of results into the requested type
+///
+/// # Panics
+///
+/// Panics if response from host is not [`BatchedResponse::V1`].
+pub struct QueryOutputCursorIterator {
+    iter: as IntoIterator>::IntoIter,
+    cursor: ForwardCursor,
+}
+
+impl> QueryOutputCursorIterator {
+    #[allow(irrefutable_let_patterns)]
+    fn next_batch(&self) -> Result>> {
         #[cfg(not(test))]
         use host::execute_query as host_execute_query;
         #[cfg(test)]
         use tests::_iroha_smart_contract_execute_query_mock as host_execute_query;

-        // TODO: Redundant conversion into `QueryBox`
-        let query_box: QueryBox = self.clone().into();
+        let wasm_query_request = SmartContractQueryRequest::Cursor(self.cursor.clone());
+
         // Safety: - `host_execute_query` doesn't take ownership of its pointer parameter
         //         - ownership of the returned result is transferred into `_decode_from_raw`
-        let res: Result = unsafe {
-            decode_with_length_prefix_from_raw(encode_and_execute(&query_box, host_execute_query))
+        let res: Result, ValidationFail> = unsafe {
+            decode_with_length_prefix_from_raw(encode_and_execute(
+                &wasm_query_request,
+                host_execute_query,
+            ))
+        };
+        let BatchedResponse::V1(response) = res? else {
+            panic!("Unsupported response version")
+        };
+        let (value, cursor) = response.into();
+        let vec = Vec::::try_from(value)?;
+        Ok(Self {
+            iter: vec.into_iter(),
+            cursor,
+        })
+    }
+}
+
+impl> Iterator for QueryOutputCursorIterator {
+    type Item = Result>>;
+
+    fn next(&mut self) -> Option {
+        if let Some(item) = self.iter.next() {
+            return Some(Ok(item));
+        }
+
+        let mut next_iter = match self.next_batch() {
+            Ok(next_iter) => next_iter,
+            Err(QueryOutputCursorError::Validation(ValidationFail::QueryFailed(
+                iroha_data_model::query::error::QueryExecutionFail::UnknownCursor,
+            ))) => return None,
+            Err(err) => return Some(Err(err)),
         };
-        res.map(|value| value.try_into().expect("Query returned invalid type"))
+        core::mem::swap(self, &mut next_iter);
+        self.iter.next().map(Ok)
     }
 }

+/// Error while iterating over query results.
+#[derive(Debug, Display, iroha_macro::FromVariant)]
+pub enum QueryOutputCursorError {
+    /// Validation error on the host side during next batch retrieval.
+    Validation(ValidationFail),
+    /// Host returned unexpected output type.
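For smart-contract authors the visible surface is the builder API above: `sort` and `paginate` wrap a query into a `QueryRequest`, `execute` returns a `QueryOutputCursor`, and iterating that cursor transparently pulls further batches from the host (`collect()` on a `Value` cursor does the draining in one call). A hedged usage sketch; `FindAllAssets` is a stock data-model query whose `Vec` output is assumed from the pre-existing data model rather than shown in this diff:

```rust
use iroha_data_model::{
    prelude::FindAllAssets,
    query::{Pagination, Sorting},
};
use iroha_smart_contract::ExecuteQueryOnHost as _;

fn count_assets() -> usize {
    let cursor = FindAllAssets
        .sort(Sorting::default())
        .paginate(Pagination::default())
        .execute()
        .expect("query rejected by the executor");

    cursor
        .into_iter()
        // each `next()` may fetch another batch from the host behind the scenes
        .map(|item| item.expect("cursor expired on the host"))
        .count()
}
```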
+ Conversion(ErrorTryFromEnum), +} + /// World state view of the host #[derive(Debug, Clone, Copy)] pub struct Host; @@ -136,7 +361,14 @@ impl Context { impl iroha_data_model::evaluate::Context for Context { fn query(&self, query: &QueryBox) -> Result { - query.execute() + let value_cursor = query.clone().execute()?; + match value_cursor.collect() { + Ok(value) => Ok(value), + Err(QueryOutputCursorError::Validation(err)) => Err(err), + Err(QueryOutputCursorError::Conversion(err)) => { + panic!("Conversion error during collecting query result: {err:?}") + } + } } fn get(&self, name: &Name) -> Option<&Value> { @@ -188,7 +420,7 @@ mod host { /// Most used items pub mod prelude { - pub use crate::{ExecuteOnHost, QueryHost}; + pub use crate::{ExecuteOnHost, ExecuteQueryOnHost}; } #[cfg(test)] @@ -198,13 +430,16 @@ mod tests { use core::{mem::ManuallyDrop, slice}; + use data_model::{query::asset::FindAssetQuantityById, BatchedResponseV1}; use iroha_smart_contract_utils::encode_with_length_prefix; use webassembly_test::webassembly_test; use super::*; - const QUERY_RESULT: Result = - Ok(Value::Numeric(NumericValue::U32(1234_u32))); + const QUERY_RESULT: Result, ValidationFail> = Ok(QueryOutputCursor { + batch: Value::Numeric(NumericValue::U32(1234_u32)), + cursor: ForwardCursor::new(None, None), + }); const ISI_RESULT: Result<(), ValidationFail> = Ok(()); const EXPRESSION_RESULT: NumericValue = NumericValue::U32(5_u32); @@ -216,8 +451,8 @@ mod tests { } fn get_test_query() -> QueryBox { - let account_id: AccountId = "alice@wonderland".parse().expect("Valid"); - FindAccountById::new(account_id).into() + let asset_id: AssetId = "rose##alice@wonderland".parse().expect("Valid"); + FindAssetQuantityById::new(asset_id).into() } fn get_test_expression() -> EvaluatesTo { @@ -242,10 +477,18 @@ mod tests { len: usize, ) -> *const u8 { let bytes = slice::from_raw_parts(ptr, len); - let query = QueryBox::decode_all(&mut &*bytes).unwrap(); - assert_eq!(query, get_test_query()); - - ManuallyDrop::new(encode_with_length_prefix(&QUERY_RESULT)).as_ptr() + let query = SmartContractQueryRequest::decode_all(&mut &*bytes).unwrap(); + let SmartContractQueryRequest::Query(query_with_parameters) = query else { + panic!("Expected query, not a cursor"); + }; + assert_eq!(query_with_parameters.query(), &get_test_query()); + + let response: Result, ValidationFail> = Ok(BatchedResponseV1::new( + QUERY_RESULT.unwrap().into_inner(), + ForwardCursor::new(None, None), + ) + .into()); + ManuallyDrop::new(encode_with_length_prefix(&response)).as_ptr() } #[webassembly_test] diff --git a/smart_contract/trigger/derive/src/entrypoint.rs b/smart_contract/trigger/derive/src/entrypoint.rs index 71387f19be9..5fe5d822f71 100644 --- a/smart_contract/trigger/derive/src/entrypoint.rs +++ b/smart_contract/trigger/derive/src/entrypoint.rs @@ -27,7 +27,7 @@ pub fn impl_entrypoint(_attr: TokenStream, item: TokenStream) -> TokenStream { 0, parse_quote!( use ::iroha_trigger::smart_contract::{ - debug::DebugExpectExt as _, ExecuteOnHost as _, QueryHost as _, + debug::DebugExpectExt as _, ExecuteOnHost as _, ExecuteQueryOnHost as _, }; ), ); diff --git a/telemetry/derive/tests/ui_fail/args_no_wsv.rs b/telemetry/derive/tests/ui_fail/args_no_wsv.rs index 388231c5a8b..d85e46478e6 100644 --- a/telemetry/derive/tests/ui_fail/args_no_wsv.rs +++ b/telemetry/derive/tests/ui_fail/args_no_wsv.rs @@ -1,4 +1,3 @@ -use iroha_core::wsv::WorldStateView; use iroha_telemetry_derive::metrics; #[metrics(+"test_query", "another_test_query_without_timing")] @@ -7,6 +6,5 
@@ fn execute(_wsv: &World) -> Result<(), ()> { } fn main() { - let kura = iroha_core::kura::Kura::blank_kura_for_testing(); - let _world = WorldStateView::new(iroha_core::prelude::World::default(), kura); + } diff --git a/telemetry/derive/tests/ui_fail/args_no_wsv.stderr b/telemetry/derive/tests/ui_fail/args_no_wsv.stderr index 4e229217d34..4aa2e1da1c3 100644 --- a/telemetry/derive/tests/ui_fail/args_no_wsv.stderr +++ b/telemetry/derive/tests/ui_fail/args_no_wsv.stderr @@ -1,5 +1,5 @@ error: At least one argument must be a `WorldStateView`. - --> tests/ui_fail/args_no_wsv.rs:5:12 + --> tests/ui_fail/args_no_wsv.rs:4:12 | -5 | fn execute(_wsv: &World) -> Result<(), ()> { +4 | fn execute(_wsv: &World) -> Result<(), ()> { | ^^^^^^^^^^^^ diff --git a/telemetry/derive/tests/ui_fail/bare_spec.rs b/telemetry/derive/tests/ui_fail/bare_spec.rs index 5ea32d7fb4e..bb6029fddf1 100644 --- a/telemetry/derive/tests/ui_fail/bare_spec.rs +++ b/telemetry/derive/tests/ui_fail/bare_spec.rs @@ -1,4 +1,3 @@ -use iroha_core::wsv::WorldStateView; use iroha_telemetry_derive::metrics; #[metrics(test_query, "another_test_query_without_timing")] @@ -7,6 +6,4 @@ fn execute(wsv: &WorldStateView) -> Result<(), ()> { } fn main() { - let kura = iroha_core::kura::Kura::blank_kura_for_testing(); - let _world = WorldStateView::new(iroha_core::prelude::World::default(), kura); } diff --git a/telemetry/derive/tests/ui_fail/bare_spec.stderr b/telemetry/derive/tests/ui_fail/bare_spec.stderr index 72c5f11a848..0bb06d93ac6 100644 --- a/telemetry/derive/tests/ui_fail/bare_spec.stderr +++ b/telemetry/derive/tests/ui_fail/bare_spec.stderr @@ -1,5 +1,5 @@ error: expected literal - --> tests/ui_fail/bare_spec.rs:4:11 + --> tests/ui_fail/bare_spec.rs:3:11 | -4 | #[metrics(test_query, "another_test_query_without_timing")] +3 | #[metrics(test_query, "another_test_query_without_timing")] | ^^^^^^^^^^ diff --git a/telemetry/derive/tests/ui_fail/doubled_plus.rs b/telemetry/derive/tests/ui_fail/doubled_plus.rs index 58fd7eae068..61db9e0dda1 100644 --- a/telemetry/derive/tests/ui_fail/doubled_plus.rs +++ b/telemetry/derive/tests/ui_fail/doubled_plus.rs @@ -1,4 +1,3 @@ -use iroha_core::wsv::WorldStateView; use iroha_telemetry_derive::metrics; #[metrics(+"test_query", ++"another_test_query_without_timing")] @@ -7,6 +6,5 @@ fn execute(wsv: &WorldStateView) -> Result<(), ()> { } fn main() { - let kura = iroha_core::kura::Kura::blank_kura_for_testing(); - let _world = WorldStateView::new(iroha_core::prelude::World::default(), kura); + } diff --git a/telemetry/derive/tests/ui_fail/doubled_plus.stderr b/telemetry/derive/tests/ui_fail/doubled_plus.stderr index becb8bf32c5..751d4f27b17 100644 --- a/telemetry/derive/tests/ui_fail/doubled_plus.stderr +++ b/telemetry/derive/tests/ui_fail/doubled_plus.stderr @@ -1,5 +1,5 @@ error: expected literal - --> tests/ui_fail/doubled_plus.rs:4:27 + --> tests/ui_fail/doubled_plus.rs:3:27 | -4 | #[metrics(+"test_query", ++"another_test_query_without_timing")] +3 | #[metrics(+"test_query", ++"another_test_query_without_timing")] | ^ diff --git a/telemetry/derive/tests/ui_fail/no_args.rs b/telemetry/derive/tests/ui_fail/no_args.rs index 00a0f60dd1f..73c27db3bab 100644 --- a/telemetry/derive/tests/ui_fail/no_args.rs +++ b/telemetry/derive/tests/ui_fail/no_args.rs @@ -1,4 +1,3 @@ -use iroha_core::wsv::WorldStateView; use iroha_telemetry_derive::metrics; #[metrics(+"test_query", "another_test_query_without_timing")] @@ -7,6 +6,4 @@ fn execute() -> Result<(), ()> { } fn main() { - let kura = 
iroha_core::kura::Kura::blank_kura_for_testing(); - let _world = WorldStateView::new(iroha_core::prelude::World::default(), kura); } diff --git a/telemetry/derive/tests/ui_fail/no_args.stderr b/telemetry/derive/tests/ui_fail/no_args.stderr index 5b1e88c34d2..bf2d6e9b557 100644 --- a/telemetry/derive/tests/ui_fail/no_args.stderr +++ b/telemetry/derive/tests/ui_fail/no_args.stderr @@ -1,5 +1,5 @@ error: Function must have at least one argument of type `WorldStateView`. - --> tests/ui_fail/no_args.rs:5:1 + --> tests/ui_fail/no_args.rs:4:1 | -5 | fn execute() -> Result<(), ()> { +4 | fn execute() -> Result<(), ()> { | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/telemetry/derive/tests/ui_fail/non_snake_case_name.rs b/telemetry/derive/tests/ui_fail/non_snake_case_name.rs index 065cf621c56..97c83ab152f 100644 --- a/telemetry/derive/tests/ui_fail/non_snake_case_name.rs +++ b/telemetry/derive/tests/ui_fail/non_snake_case_name.rs @@ -1,5 +1,3 @@ -#![allow(unused_imports)] // Unused because macro will no generate anything -use iroha_core::wsv::WorldStateView; use iroha_telemetry_derive::metrics; #[metrics(+"test query", "another_test_query_without_timing")] diff --git a/telemetry/derive/tests/ui_fail/non_snake_case_name.stderr b/telemetry/derive/tests/ui_fail/non_snake_case_name.stderr index 2b2d9f0ee51..6bb8fe44028 100644 --- a/telemetry/derive/tests/ui_fail/non_snake_case_name.stderr +++ b/telemetry/derive/tests/ui_fail/non_snake_case_name.stderr @@ -1,7 +1,7 @@ error: Spaces are not allowed. Use underscores '_' - --> tests/ui_fail/non_snake_case_name.rs:5:1 + --> tests/ui_fail/non_snake_case_name.rs:3:1 | -5 | #[metrics(+"test query", "another_test_query_without_timing")] +3 | #[metrics(+"test query", "another_test_query_without_timing")] | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = note: this error originates in the attribute macro `metrics` (in Nightly builds, run with -Z macro-backtrace for more info) diff --git a/telemetry/derive/tests/ui_fail/not_execute.rs b/telemetry/derive/tests/ui_fail/not_execute.rs index 4d300ed88fa..7a63c17d08f 100644 --- a/telemetry/derive/tests/ui_fail/not_execute.rs +++ b/telemetry/derive/tests/ui_fail/not_execute.rs @@ -1,4 +1,3 @@ -use iroha_core::wsv::{World, WorldStateView}; use iroha_telemetry_derive::metrics; #[metrics(+"test_query", "another_test_query_without_timing")] @@ -7,7 +6,5 @@ fn exequte(wsv: &WorldStateView) -> Result<(), ()> { } fn main() { - let kura = iroha_core::kura::Kura::blank_kura_for_testing(); - let _something: World = World::default(); - let _world = WorldStateView::new(_something, kura); + } diff --git a/telemetry/derive/tests/ui_fail/not_execute.stderr b/telemetry/derive/tests/ui_fail/not_execute.stderr index 4146a4ab62b..7f4ab728dae 100644 --- a/telemetry/derive/tests/ui_fail/not_execute.stderr +++ b/telemetry/derive/tests/ui_fail/not_execute.stderr @@ -1,5 +1,5 @@ error: Function should be an `impl execute` - --> tests/ui_fail/not_execute.rs:5:4 + --> tests/ui_fail/not_execute.rs:4:4 | -5 | fn exequte(wsv: &WorldStateView) -> Result<(), ()> { +4 | fn exequte(wsv: &WorldStateView) -> Result<(), ()> { | ^^^^^^^ diff --git a/telemetry/derive/tests/ui_fail/not_return_result.rs b/telemetry/derive/tests/ui_fail/not_return_result.rs index 18fbf19d0ca..ca779d8e5ec 100644 --- a/telemetry/derive/tests/ui_fail/not_return_result.rs +++ b/telemetry/derive/tests/ui_fail/not_return_result.rs @@ -1,4 +1,3 @@ -use iroha_core::wsv::{World, WorldStateView}; use iroha_telemetry_derive::metrics; #[metrics(+"test_query", 
"another_test_query_without_timing")] @@ -7,8 +6,6 @@ fn execute(_wsv: &WorldStateView) -> iroha_core::RESULT { } fn main() { - let kura = iroha_core::kura::Kura::blank_kura_for_testing(); - let _something: World = World::default(); - let _world = WorldStateView::new(_something, kura); + } diff --git a/telemetry/derive/tests/ui_fail/not_return_result.stderr b/telemetry/derive/tests/ui_fail/not_return_result.stderr index 37ef12869a2..6652f72014d 100644 --- a/telemetry/derive/tests/ui_fail/not_return_result.stderr +++ b/telemetry/derive/tests/ui_fail/not_return_result.stderr @@ -1,5 +1,5 @@ error: Should return `Result`. Found RESULT - --> tests/ui_fail/not_return_result.rs:5:50 + --> tests/ui_fail/not_return_result.rs:4:50 | -5 | fn execute(_wsv: &WorldStateView) -> iroha_core::RESULT { +4 | fn execute(_wsv: &WorldStateView) -> iroha_core::RESULT { | ^^^^^^ diff --git a/telemetry/derive/tests/ui_fail/return_nothing.rs b/telemetry/derive/tests/ui_fail/return_nothing.rs index 759960622a6..419325ac0ba 100644 --- a/telemetry/derive/tests/ui_fail/return_nothing.rs +++ b/telemetry/derive/tests/ui_fail/return_nothing.rs @@ -1,4 +1,3 @@ -use iroha_core::wsv::WorldStateView; use iroha_telemetry_derive::metrics; #[metrics(+"test_query", "another_test_query_without_timing")] @@ -7,7 +6,5 @@ fn execute(wsv: &WorldStateView) { } fn main() { - let kura = iroha_core::kura::Kura::blank_kura_for_testing(); - let _something: iroha_core::wsv::World = iroha_core::wsv::World::default(); - let _world = WorldStateView::new(_something, kura); + } diff --git a/telemetry/derive/tests/ui_fail/return_nothing.stderr b/telemetry/derive/tests/ui_fail/return_nothing.stderr index f92d23ee205..93385e20c54 100644 --- a/telemetry/derive/tests/ui_fail/return_nothing.stderr +++ b/telemetry/derive/tests/ui_fail/return_nothing.stderr @@ -1,7 +1,7 @@ error: `Fn` must return `Result`. Returns nothing instead. 
diff --git a/telemetry/derive/tests/ui_fail/return_nothing.rs b/telemetry/derive/tests/ui_fail/return_nothing.rs
index 759960622a6..419325ac0ba 100644
--- a/telemetry/derive/tests/ui_fail/return_nothing.rs
+++ b/telemetry/derive/tests/ui_fail/return_nothing.rs
@@ -1,4 +1,3 @@
-use iroha_core::wsv::WorldStateView;
 use iroha_telemetry_derive::metrics;
 
 #[metrics(+"test_query", "another_test_query_without_timing")]
@@ -7,7 +6,5 @@ fn execute(wsv: &WorldStateView) {
 }
 
 fn main() {
-    let kura = iroha_core::kura::Kura::blank_kura_for_testing();
-    let _something: iroha_core::wsv::World = iroha_core::wsv::World::default();
-    let _world = WorldStateView::new(_something, kura);
+
 }
diff --git a/telemetry/derive/tests/ui_fail/return_nothing.stderr b/telemetry/derive/tests/ui_fail/return_nothing.stderr
index f92d23ee205..93385e20c54 100644
--- a/telemetry/derive/tests/ui_fail/return_nothing.stderr
+++ b/telemetry/derive/tests/ui_fail/return_nothing.stderr
@@ -1,7 +1,7 @@
 error: `Fn` must return `Result`. Returns nothing instead.
- --> tests/ui_fail/return_nothing.rs:4:1
+ --> tests/ui_fail/return_nothing.rs:3:1
   |
-4 | #[metrics(+"test_query", "another_test_query_without_timing")]
+3 | #[metrics(+"test_query", "another_test_query_without_timing")]
   | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
   = note: this error originates in the attribute macro `metrics` (in Nightly builds, run with -Z macro-backtrace for more info)
diff --git a/telemetry/derive/tests/ui_fail/trailing_plus.rs b/telemetry/derive/tests/ui_fail/trailing_plus.rs
index c692d625ae1..3034f0c7f1d 100644
--- a/telemetry/derive/tests/ui_fail/trailing_plus.rs
+++ b/telemetry/derive/tests/ui_fail/trailing_plus.rs
@@ -1,4 +1,3 @@
-use iroha_core::wsv::WorldStateView;
 use iroha_telemetry_derive::metrics;
 
 #[metrics(+"test_query", "another_test_query_without_timing"+)]
@@ -7,7 +6,5 @@ fn execute(wsv: &WorldStateView) -> Result<(), ()> {
 }
 
 fn main() {
-    let kura = iroha_core::kura::Kura::blank_kura_for_testing();
-    let _something: iroha_core::wsv::World = iroha_core::wsv::World::default();
-    let _world = WorldStateView::new(_something, kura);
+
 }
diff --git a/telemetry/derive/tests/ui_fail/trailing_plus.stderr b/telemetry/derive/tests/ui_fail/trailing_plus.stderr
index f613692ad1c..6378d3ff10d 100644
--- a/telemetry/derive/tests/ui_fail/trailing_plus.stderr
+++ b/telemetry/derive/tests/ui_fail/trailing_plus.stderr
@@ -1,5 +1,5 @@
 error: expected `,`
- --> tests/ui_fail/trailing_plus.rs:4:61
+ --> tests/ui_fail/trailing_plus.rs:3:61
   |
-4 | #[metrics(+"test_query", "another_test_query_without_timing"+)]
+3 | #[metrics(+"test_query", "another_test_query_without_timing"+)]
   |                                                             ^
diff --git a/tools/parity_scale_decoder/src/main.rs b/tools/parity_scale_decoder/src/main.rs
index 2f3d3a6f220..f821730be72 100644
--- a/tools/parity_scale_decoder/src/main.rs
+++ b/tools/parity_scale_decoder/src/main.rs
@@ -29,7 +29,6 @@ use iroha_data_model::{
     },
     domain::NewDomain,
     executor::Executor,
-    http::{BatchedResponse, BatchedResponseV1},
     ipfs::IpfsPath,
     predicate::{
         ip_addr::{Ipv4Predicate, Ipv6Predicate},
@@ -44,7 +43,7 @@ use iroha_data_model::{
         ForwardCursor,
     },
     transaction::{error::TransactionLimitError, SignedTransactionV1, TransactionLimits},
-    SignedBlockWrapper,
+    BatchedResponse, BatchedResponseV1, SignedBlockWrapper,
 };
 use iroha_primitives::{
     addr::{Ipv4Addr, Ipv6Addr},