Skip to content

Commit

Permalink
[refactor] hyperledger-iroha#1981, hyperledger-iroha#4195: make liste…
Browse files Browse the repository at this point in the history
…n_for_events and friends implicitly convert event filter types

Signed-off-by: Nikita Strygin <[email protected]>
  • Loading branch information
DCNick3 committed Mar 18, 2024
1 parent 7227946 commit 20e9c6d
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 25 deletions.
3 changes: 1 addition & 2 deletions client/benches/tps/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,7 @@ impl MeasurerUnit {
let (init_sender, init_receiver) = mpsc::channel();
let event_filter = PipelineEventFilter::new()
.entity_kind(PipelineEntityKind::Block)
.status_kind(PipelineStatusKind::Committed)
.into();
.status_kind(PipelineStatusKind::Committed);
let blocks_expected = self.config.blocks as usize;
let name = self.name;
let handle = thread::spawn(move || -> Result<()> {
Expand Down
17 changes: 10 additions & 7 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,9 +605,7 @@ impl Client {
let mut event_iterator = {
let event_iterator_result = tokio::time::timeout_at(
deadline,
self.listen_for_events_async(
PipelineEventFilter::new().hash(hash.into()).into(),
),
self.listen_for_events_async(PipelineEventFilter::new().hash(hash.into())),
)
.await
.map_err(Into::into)
Expand Down Expand Up @@ -905,8 +903,9 @@ impl Client {
/// - Forwards from [`events_api::EventIterator::new`]
pub fn listen_for_events(
&self,
event_filter: EventFilterBox,
event_filter: impl Into<EventFilterBox>,
) -> Result<impl Iterator<Item = Result<Event>>> {
let event_filter = event_filter.into();
iroha_logger::trace!(?event_filter);
events_api::EventIterator::new(self.events_handler(event_filter)?)
}
Expand All @@ -918,8 +917,9 @@ impl Client {
/// - Forwards from [`events_api::AsyncEventStream::new`]
pub async fn listen_for_events_async(
&self,
event_filter: EventFilterBox,
event_filter: impl Into<EventFilterBox>,
) -> Result<AsyncEventStream> {
let event_filter = event_filter.into();
iroha_logger::trace!(?event_filter, "Async listening with");
events_api::AsyncEventStream::new(self.events_handler(event_filter)?).await
}
Expand All @@ -929,9 +929,12 @@ impl Client {
/// # Errors
/// Fails if handler construction fails
#[inline]
pub fn events_handler(&self, event_filter: EventFilterBox) -> Result<events_api::flow::Init> {
pub fn events_handler(
&self,
event_filter: impl Into<EventFilterBox>,
) -> Result<events_api::flow::Init> {
events_api::flow::Init::new(
event_filter,
event_filter.into(),
self.headers.clone(),
self.torii_url
.join(torii_uri::SUBSCRIPTION)
Expand Down
4 changes: 2 additions & 2 deletions client/tests/integration/events/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ fn transaction_execution_should_produce_events(
let listener = client.clone();
let (init_sender, init_receiver) = mpsc::channel();
let (event_sender, event_receiver) = mpsc::channel();
let event_filter = EventFilterBox::Data(DataEventFilter::ByAny);
let event_filter = DataEventFilter::ByAny;
thread::spawn(move || -> Result<()> {
let event_iterator = listener.listen_for_events(event_filter)?;
init_sender.send(())?;
Expand Down Expand Up @@ -182,7 +182,7 @@ fn produce_multiple_events() -> Result<()> {
let listener = client.clone();
let (init_sender, init_receiver) = mpsc::channel();
let (event_sender, event_receiver) = mpsc::channel();
let event_filter = EventFilterBox::Data(DataEventFilter::ByAny);
let event_filter = DataEventFilter::ByAny;
thread::spawn(move || -> Result<()> {
let event_iterator = listener.listen_for_events(event_filter)?;
init_sender.send(())?;
Expand Down
6 changes: 2 additions & 4 deletions client/tests/integration/events/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ fn trigger_completion_success_should_produce_event() -> Result<()> {
NotificationEventFilter::ByTriggerCompleted(TriggerCompletedEventFilter::new(
Some(trigger_id),
Some(TriggerCompletedOutcomeType::Success),
))
.into(),
)),
)?;
if event_it.next().is_some() {
sender.send(())?;
Expand Down Expand Up @@ -87,8 +86,7 @@ fn trigger_completion_failure_should_produce_event() -> Result<()> {
NotificationEventFilter::ByTriggerCompleted(TriggerCompletedEventFilter::new(
Some(trigger_id),
Some(TriggerCompletedOutcomeType::Failure),
))
.into(),
)),
)?;
if event_it.next().is_some() {
sender.send(())?;
Expand Down
7 changes: 3 additions & 4 deletions client/tests/integration/events/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ impl Checker {
thread::spawn(move || {
let mut event_iterator = self
.listener
.listen_for_events(EventFilterBox::Pipeline(
.listen_for_events(
PipelineEventFilter::new()
.entity_kind(PipelineEntityKind::Transaction)
.status_kind(status_kind)
.hash(*self.hash),
))
)
.expect("Failed to create event iterator.");
let event_result = event_iterator.next().expect("Stream closed");
let _event = event_result.expect("Must be valid");
Expand All @@ -102,8 +102,7 @@ fn committed_block_must_be_available_in_kura() {

let event_filter = PipelineEventFilter::new()
.entity_kind(PipelineEntityKind::Block)
.status_kind(PipelineStatusKind::Committed)
.into();
.status_kind(PipelineStatusKind::Committed);
let mut event_iter = client
.listen_for_events(event_filter)
.expect("Failed to subscribe for events");
Expand Down
2 changes: 1 addition & 1 deletion client/tests/integration/triggers/by_call_trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ fn execute_trigger_should_produce_event() -> Result<()> {
let (sender, receiver) = mpsc::channel();
let _handle = thread::spawn(move || -> Result<()> {
let mut event_it = thread_client
.listen_for_events(ExecuteTriggerEventFilter::new(trigger_id, account_id).into())?;
.listen_for_events(ExecuteTriggerEventFilter::new(trigger_id, account_id))?;
if event_it.next().is_some() {
sender.send(())?;
return Ok(());
Expand Down
8 changes: 3 additions & 5 deletions client/tests/integration/triggers/time_trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,9 @@ fn mint_nft_for_every_user_every_1_sec() -> Result<()> {
fn get_block_committed_event_listener(
client: &Client,
) -> Result<impl Iterator<Item = Result<Event>>> {
let block_filter = EventFilterBox::Pipeline(
PipelineEventFilter::new()
.entity_kind(PipelineEntityKind::Block)
.status_kind(PipelineStatusKind::Committed),
);
let block_filter = PipelineEventFilter::new()
.entity_kind(PipelineEntityKind::Block)
.status_kind(PipelineStatusKind::Committed);
client.listen_for_events(block_filter)
}

Expand Down

0 comments on commit 20e9c6d

Please sign in to comment.