Skip to content

Commit

Permalink
[refactor] hyperledger-iroha#3982: Clear live queries after smart con…
Browse files Browse the repository at this point in the history
…tract end

Signed-off-by: Daniil Polyakov <[email protected]>
  • Loading branch information
Arjentix committed Nov 7, 2023
1 parent c3adf25 commit e7b539a
Show file tree
Hide file tree
Showing 4 changed files with 401 additions and 385 deletions.
30 changes: 22 additions & 8 deletions core/src/query/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use iroha_data_model::{
asset::AssetValue,
query::{
cursor::ForwardCursor, error::QueryExecutionFail, pagination::Pagination, sorting::Sorting,
QueryId,
},
BatchedResponse, BatchedResponseV1, HasMetadata, IdentifiableBox, ValidationFail, Value,
};
Expand Down Expand Up @@ -60,7 +61,7 @@ type LiveQuery = Batched<Vec<Value>>;
/// Clients can handle their queries using [`LiveQueryStoreHandle`]
#[derive(Debug)]
pub struct LiveQueryStore {
queries: HashMap<String, (LiveQuery, Instant)>,
queries: HashMap<QueryId, (LiveQuery, Instant)>,
query_idle_time: Duration,
}

Expand Down Expand Up @@ -96,7 +97,8 @@ impl LiveQueryStore {
"All handler to LiveQueryStore are dropped. Shutting down...";

let (insert_sender, mut insert_receiver) = mpsc::channel(1);
let (remove_sender, mut remove_receiver) = mpsc::channel::<(String, oneshot::Sender<_>)>(1);
let (remove_sender, mut remove_receiver) =
mpsc::channel::<(QueryId, oneshot::Sender<_>)>(1);

let mut idle_interval = tokio::time::interval(self.query_idle_time);

Expand Down Expand Up @@ -134,7 +136,7 @@ impl LiveQueryStore {
}
}

fn insert(&mut self, query_id: String, live_query: LiveQuery) {
fn insert(&mut self, query_id: QueryId, live_query: LiveQuery) {
self.queries.insert(query_id, (live_query, Instant::now()));
}

Expand All @@ -147,10 +149,10 @@ impl LiveQueryStore {
#[derive(Clone)]
pub struct LiveQueryStoreHandle {
/// Sender to insert a new query with specified id.
insert_sender: mpsc::Sender<(String, LiveQuery)>,
insert_sender: mpsc::Sender<(QueryId, LiveQuery)>,
/// Sender to send a tuple of query id and another sender, which will be
/// used by [`LiveQueryStore`] to write a response with optional live query.
remove_sender: mpsc::Sender<(String, oneshot::Sender<Option<LiveQuery>>)>,
remove_sender: mpsc::Sender<(QueryId, oneshot::Sender<Option<LiveQuery>>)>,
}

impl LiveQueryStoreHandle {
Expand Down Expand Up @@ -197,13 +199,25 @@ impl LiveQueryStoreHandle {
self.construct_query_response(query_id, cursor.cursor.map(NonZeroU64::get), live_query)
}

fn insert(&self, query_id: String, live_query: LiveQuery) -> Result<()> {
/// Remove query from the storage if there is any.
///
/// Returns `true` if query was removed, `false` otherwise.
///
/// # Errors
///
/// - Returns [`Error::ConnectionClosed`] if [`QueryService`] is dropped,
/// - Otherwise throws up query output handling errors.
pub fn drop_query(&self, query_id: QueryId) -> Result<bool> {
self.remove(query_id).map(|query_opt| query_opt.is_some())
}

fn insert(&self, query_id: QueryId, live_query: LiveQuery) -> Result<()> {
self.insert_sender
.blocking_send((query_id, live_query))
.map_err(|_| Error::ConnectionClosed)
}

fn remove(&self, query_id: String) -> Result<Option<LiveQuery>> {
fn remove(&self, query_id: QueryId) -> Result<Option<LiveQuery>> {
let (sender, receiver) = oneshot::channel();

self.remove_sender
Expand All @@ -215,7 +229,7 @@ impl LiveQueryStoreHandle {

fn construct_query_response(
&self,
query_id: String,
query_id: QueryId,
curr_cursor: Option<u64>,
mut live_query: Batched<Vec<Value>>,
) -> Result<BatchedResponse<Value>> {
Expand Down
Loading

0 comments on commit e7b539a

Please sign in to comment.