From 580d1c448668886978b6dba924bcf48aaaada3bb Mon Sep 17 00:00:00 2001 From: Nikita Strygin Date: Tue, 20 Feb 2024 10:29:52 +0300 Subject: [PATCH] [refactor] #1981, #4195: make listen_for_events and friends implicitly convert event filter types Signed-off-by: Nikita Strygin --- client/benches/tps/utils.rs | 3 +-- client/src/client.rs | 17 ++++++++++------- client/tests/integration/events/data.rs | 4 ++-- client/tests/integration/events/notification.rs | 6 ++---- client/tests/integration/events/pipeline.rs | 7 +++---- .../integration/triggers/by_call_trigger.rs | 2 +- .../tests/integration/triggers/time_trigger.rs | 8 +++----- 7 files changed, 22 insertions(+), 25 deletions(-) diff --git a/client/benches/tps/utils.rs b/client/benches/tps/utils.rs index c00aed40439..0f791382e5e 100644 --- a/client/benches/tps/utils.rs +++ b/client/benches/tps/utils.rs @@ -173,8 +173,7 @@ impl MeasurerUnit { let (init_sender, init_receiver) = mpsc::channel(); let event_filter = PipelineEventFilter::new() .entity_kind(PipelineEntityKind::Block) - .status_kind(PipelineStatusKind::Committed) - .into(); + .status_kind(PipelineStatusKind::Committed); let blocks_expected = self.config.blocks as usize; let name = self.name; let handle = thread::spawn(move || -> Result<()> { diff --git a/client/src/client.rs b/client/src/client.rs index 5bf9ce65880..0ca27786689 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -605,9 +605,7 @@ impl Client { let mut event_iterator = { let event_iterator_result = tokio::time::timeout_at( deadline, - self.listen_for_events_async( - PipelineEventFilter::new().hash(hash.into()).into(), - ), + self.listen_for_events_async(PipelineEventFilter::new().hash(hash.into())), ) .await .map_err(Into::into) @@ -905,8 +903,9 @@ impl Client { /// - Forwards from [`events_api::EventIterator::new`] pub fn listen_for_events( &self, - event_filter: EventFilterBox, + event_filter: impl Into, ) -> Result>> { + let event_filter = event_filter.into(); iroha_logger::trace!(?event_filter); events_api::EventIterator::new(self.events_handler(event_filter)?) } @@ -918,8 +917,9 @@ impl Client { /// - Forwards from [`events_api::AsyncEventStream::new`] pub async fn listen_for_events_async( &self, - event_filter: EventFilterBox, + event_filter: impl Into, ) -> Result { + let event_filter = event_filter.into(); iroha_logger::trace!(?event_filter, "Async listening with"); events_api::AsyncEventStream::new(self.events_handler(event_filter)?).await } @@ -929,9 +929,12 @@ impl Client { /// # Errors /// Fails if handler construction fails #[inline] - pub fn events_handler(&self, event_filter: EventFilterBox) -> Result { + pub fn events_handler( + &self, + event_filter: impl Into, + ) -> Result { events_api::flow::Init::new( - event_filter, + event_filter.into(), self.headers.clone(), self.torii_url .join(torii_uri::SUBSCRIPTION) diff --git a/client/tests/integration/events/data.rs b/client/tests/integration/events/data.rs index e1f41807972..3136f369860 100644 --- a/client/tests/integration/events/data.rs +++ b/client/tests/integration/events/data.rs @@ -138,7 +138,7 @@ fn transaction_execution_should_produce_events( let listener = client.clone(); let (init_sender, init_receiver) = mpsc::channel(); let (event_sender, event_receiver) = mpsc::channel(); - let event_filter = EventFilterBox::Data(DataEventFilter::ByAny); + let event_filter = DataEventFilter::ByAny; thread::spawn(move || -> Result<()> { let event_iterator = listener.listen_for_events(event_filter)?; init_sender.send(())?; @@ -182,7 +182,7 @@ fn produce_multiple_events() -> Result<()> { let listener = client.clone(); let (init_sender, init_receiver) = mpsc::channel(); let (event_sender, event_receiver) = mpsc::channel(); - let event_filter = EventFilterBox::Data(DataEventFilter::ByAny); + let event_filter = DataEventFilter::ByAny; thread::spawn(move || -> Result<()> { let event_iterator = listener.listen_for_events(event_filter)?; init_sender.send(())?; diff --git a/client/tests/integration/events/notification.rs b/client/tests/integration/events/notification.rs index 5b3f0e9e7bc..8ca576a6f33 100644 --- a/client/tests/integration/events/notification.rs +++ b/client/tests/integration/events/notification.rs @@ -38,8 +38,7 @@ fn trigger_completion_success_should_produce_event() -> Result<()> { NotificationEventFilter::ByTriggerCompleted(TriggerCompletedEventFilter::new( Some(trigger_id), Some(TriggerCompletedOutcomeType::Success), - )) - .into(), + )), )?; if event_it.next().is_some() { sender.send(())?; @@ -87,8 +86,7 @@ fn trigger_completion_failure_should_produce_event() -> Result<()> { NotificationEventFilter::ByTriggerCompleted(TriggerCompletedEventFilter::new( Some(trigger_id), Some(TriggerCompletedOutcomeType::Failure), - )) - .into(), + )), )?; if event_it.next().is_some() { sender.send(())?; diff --git a/client/tests/integration/events/pipeline.rs b/client/tests/integration/events/pipeline.rs index 292a0f97e95..fc4bce6f2d9 100644 --- a/client/tests/integration/events/pipeline.rs +++ b/client/tests/integration/events/pipeline.rs @@ -82,12 +82,12 @@ impl Checker { thread::spawn(move || { let mut event_iterator = self .listener - .listen_for_events(EventFilterBox::Pipeline( + .listen_for_events( PipelineEventFilter::new() .entity_kind(PipelineEntityKind::Transaction) .status_kind(status_kind) .hash(*self.hash), - )) + ) .expect("Failed to create event iterator."); let event_result = event_iterator.next().expect("Stream closed"); let _event = event_result.expect("Must be valid"); @@ -102,8 +102,7 @@ fn committed_block_must_be_available_in_kura() { let event_filter = PipelineEventFilter::new() .entity_kind(PipelineEntityKind::Block) - .status_kind(PipelineStatusKind::Committed) - .into(); + .status_kind(PipelineStatusKind::Committed); let mut event_iter = client .listen_for_events(event_filter) .expect("Failed to subscribe for events"); diff --git a/client/tests/integration/triggers/by_call_trigger.rs b/client/tests/integration/triggers/by_call_trigger.rs index 7df6f7c1878..7d56c0e59ef 100644 --- a/client/tests/integration/triggers/by_call_trigger.rs +++ b/client/tests/integration/triggers/by_call_trigger.rs @@ -60,7 +60,7 @@ fn execute_trigger_should_produce_event() -> Result<()> { let (sender, receiver) = mpsc::channel(); let _handle = thread::spawn(move || -> Result<()> { let mut event_it = thread_client - .listen_for_events(ExecuteTriggerEventFilter::new(trigger_id, account_id).into())?; + .listen_for_events(ExecuteTriggerEventFilter::new(trigger_id, account_id))?; if event_it.next().is_some() { sender.send(())?; return Ok(()); diff --git a/client/tests/integration/triggers/time_trigger.rs b/client/tests/integration/triggers/time_trigger.rs index 3f05fc05229..e74679661fc 100644 --- a/client/tests/integration/triggers/time_trigger.rs +++ b/client/tests/integration/triggers/time_trigger.rs @@ -273,11 +273,9 @@ fn mint_nft_for_every_user_every_1_sec() -> Result<()> { fn get_block_committed_event_listener( client: &Client, ) -> Result>> { - let block_filter = EventFilterBox::Pipeline( - PipelineEventFilter::new() - .entity_kind(PipelineEntityKind::Block) - .status_kind(PipelineStatusKind::Committed), - ); + let block_filter = PipelineEventFilter::new() + .entity_kind(PipelineEntityKind::Block) + .status_kind(PipelineStatusKind::Committed); client.listen_for_events(block_filter) }