Skip to content

Commit

Permalink
[refactor] #1981, #4195, #3068: Unify various event filter APIs, intr…
Browse files Browse the repository at this point in the history
…oduce a fluent builder API

- All event filters implement Debug, Clone, Eq, Ord
- All event filters (except TimeEventFilter) have a similar fluent builder API
    Event filter starts by accepting anything, with each method call limiting the accepted events
- Structs with hidden fields provide getters & builder APIs to create them
- Data event matchers are no longer prefixed with `By` to make them read better inside the `only_events` method call

Signed-off-by: Nikita Strygin <[email protected]>
  • Loading branch information
DCNick3 committed Mar 18, 2024
1 parent 580d1c4 commit a5113ee
Show file tree
Hide file tree
Showing 22 changed files with 850 additions and 400 deletions.
4 changes: 2 additions & 2 deletions client/benches/tps/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ impl MeasurerUnit {
let listener = self.client.clone();
let (init_sender, init_receiver) = mpsc::channel();
let event_filter = PipelineEventFilter::new()
.entity_kind(PipelineEntityKind::Block)
.status_kind(PipelineStatusKind::Committed);
.from_entity_of_kind(PipelineEntityKind::Block)
.with_status(PipelineStatusKind::Committed);
let blocks_expected = self.config.blocks as usize;
let name = self.name;
let handle = thread::spawn(move || -> Result<()> {
Expand Down
6 changes: 4 additions & 2 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,9 @@ impl Client {
let mut event_iterator = {
let event_iterator_result = tokio::time::timeout_at(
deadline,
self.listen_for_events_async(PipelineEventFilter::new().hash(hash.into())),
self.listen_for_events_async(
PipelineEventFilter::new().from_entity_with_hash(hash.into()),
),
)
.await
.map_err(Into::into)
Expand Down Expand Up @@ -917,7 +919,7 @@ impl Client {
/// - Forwards from [`events_api::AsyncEventStream::new`]
pub async fn listen_for_events_async(
&self,
event_filter: impl Into<EventFilterBox>,
event_filter: impl Into<EventFilterBox> + Send,
) -> Result<AsyncEventStream> {
let event_filter = event_filter.into();
iroha_logger::trace!(?event_filter, "Async listening with");
Expand Down
8 changes: 3 additions & 5 deletions client/tests/integration/domain_owner_permissions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,9 @@ fn domain_owner_trigger_permissions() -> Result<()> {
trigger_instructions,
Repeats::from(2_u32),
bob_id,
// FIXME: due to restriction in `ExecuteTriggerEventFilter` it's impossible to execute trigger on behalf of multiple users
TriggeringEventFilterBox::ExecuteTrigger(ExecuteTriggerEventFilter::new(
trigger_id.clone(),
alice_id,
)),
TriggeringEventFilterBox::ExecuteTrigger(
ExecuteTriggerEventFilter::new().from_trigger(trigger_id.clone()),
),
),
));
test_client.submit_blocking(register_trigger)?;
Expand Down
4 changes: 2 additions & 2 deletions client/tests/integration/events/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ fn transaction_execution_should_produce_events(
let listener = client.clone();
let (init_sender, init_receiver) = mpsc::channel();
let (event_sender, event_receiver) = mpsc::channel();
let event_filter = DataEventFilter::ByAny;
let event_filter = DataEventFilter::Any;
thread::spawn(move || -> Result<()> {
let event_iterator = listener.listen_for_events(event_filter)?;
init_sender.send(())?;
Expand Down Expand Up @@ -182,7 +182,7 @@ fn produce_multiple_events() -> Result<()> {
let listener = client.clone();
let (init_sender, init_receiver) = mpsc::channel();
let (event_sender, event_receiver) = mpsc::channel();
let event_filter = DataEventFilter::ByAny;
let event_filter = DataEventFilter::Any;
thread::spawn(move || -> Result<()> {
let event_iterator = listener.listen_for_events(event_filter)?;
init_sender.send(())?;
Expand Down
42 changes: 22 additions & 20 deletions client/tests/integration/events/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ fn trigger_completion_success_should_produce_event() -> Result<()> {
vec![InstructionBox::from(instruction)],
Repeats::Indefinitely,
asset_id.account_id.clone(),
TriggeringEventFilterBox::ExecuteTrigger(ExecuteTriggerEventFilter::new(
trigger_id.clone(),
asset_id.account_id,
)),
TriggeringEventFilterBox::ExecuteTrigger(
ExecuteTriggerEventFilter::new()
.from_trigger(trigger_id.clone())
.under_authority(asset_id.account_id),
),
),
));
test_client.submit_blocking(register_trigger)?;
Expand All @@ -34,12 +35,12 @@ fn trigger_completion_success_should_produce_event() -> Result<()> {
let thread_client = test_client.clone();
let (sender, receiver) = mpsc::channel();
let _handle = thread::spawn(move || -> Result<()> {
let mut event_it = thread_client.listen_for_events(
NotificationEventFilter::ByTriggerCompleted(TriggerCompletedEventFilter::new(
Some(trigger_id),
Some(TriggerCompletedOutcomeType::Success),
)),
)?;
let mut event_it =
thread_client.listen_for_events(NotificationEventFilter::ByTriggerCompleted(
TriggerCompletedEventFilter::new()
.from_trigger(trigger_id)
.with_outcome(TriggerCompletedOutcomeType::Success),
))?;
if event_it.next().is_some() {
sender.send(())?;
return Ok(());
Expand Down Expand Up @@ -69,10 +70,11 @@ fn trigger_completion_failure_should_produce_event() -> Result<()> {
vec![InstructionBox::from(instruction)],
Repeats::Indefinitely,
account_id.clone(),
TriggeringEventFilterBox::ExecuteTrigger(ExecuteTriggerEventFilter::new(
trigger_id.clone(),
account_id,
)),
TriggeringEventFilterBox::ExecuteTrigger(
ExecuteTriggerEventFilter::new()
.from_trigger(trigger_id.clone())
.under_authority(account_id),
),
),
));
test_client.submit_blocking(register_trigger)?;
Expand All @@ -82,12 +84,12 @@ fn trigger_completion_failure_should_produce_event() -> Result<()> {
let thread_client = test_client.clone();
let (sender, receiver) = mpsc::channel();
let _handle = thread::spawn(move || -> Result<()> {
let mut event_it = thread_client.listen_for_events(
NotificationEventFilter::ByTriggerCompleted(TriggerCompletedEventFilter::new(
Some(trigger_id),
Some(TriggerCompletedOutcomeType::Failure),
)),
)?;
let mut event_it =
thread_client.listen_for_events(NotificationEventFilter::ByTriggerCompleted(
TriggerCompletedEventFilter::new()
.from_trigger(trigger_id)
.with_outcome(TriggerCompletedOutcomeType::Failure),
))?;
if event_it.next().is_some() {
sender.send(())?;
return Ok(());
Expand Down
10 changes: 5 additions & 5 deletions client/tests/integration/events/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ impl Checker {
.listener
.listen_for_events(
PipelineEventFilter::new()
.entity_kind(PipelineEntityKind::Transaction)
.status_kind(status_kind)
.hash(*self.hash),
.from_entity_of_kind(PipelineEntityKind::Transaction)
.with_status(status_kind)
.from_entity_with_hash(*self.hash),
)
.expect("Failed to create event iterator.");
let event_result = event_iterator.next().expect("Stream closed");
Expand All @@ -101,8 +101,8 @@ fn committed_block_must_be_available_in_kura() {
wait_for_genesis_committed(&[client.clone()], 0);

let event_filter = PipelineEventFilter::new()
.entity_kind(PipelineEntityKind::Block)
.status_kind(PipelineStatusKind::Committed);
.from_entity_of_kind(PipelineEntityKind::Block)
.with_status(PipelineStatusKind::Committed);
let mut event_iter = client
.listen_for_events(event_filter)
.expect("Failed to subscribe for events");
Expand Down
97 changes: 55 additions & 42 deletions client/tests/integration/triggers/by_call_trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,11 @@ fn execute_trigger_should_produce_event() -> Result<()> {
let thread_client = test_client.clone();
let (sender, receiver) = mpsc::channel();
let _handle = thread::spawn(move || -> Result<()> {
let mut event_it = thread_client
.listen_for_events(ExecuteTriggerEventFilter::new(trigger_id, account_id))?;
let mut event_it = thread_client.listen_for_events(
ExecuteTriggerEventFilter::new()
.from_trigger(trigger_id)
.under_authority(account_id),
)?;
if event_it.next().is_some() {
sender.send(())?;
return Ok(());
Expand Down Expand Up @@ -121,10 +124,11 @@ fn trigger_failure_should_not_cancel_other_triggers_execution() -> Result<()> {
bad_trigger_instructions,
Repeats::Indefinitely,
account_id.clone(),
TriggeringEventFilterBox::ExecuteTrigger(ExecuteTriggerEventFilter::new(
bad_trigger_id.clone(),
account_id.clone(),
)),
TriggeringEventFilterBox::ExecuteTrigger(
ExecuteTriggerEventFilter::new()
.from_trigger(bad_trigger_id.clone())
.under_authority(account_id.clone()),
),
),
));
test_client.submit(register_bad_trigger)?;
Expand Down Expand Up @@ -176,10 +180,11 @@ fn trigger_should_not_be_executed_with_zero_repeats_count() -> Result<()> {
trigger_instructions,
Repeats::from(1_u32),
account_id.clone(),
TriggeringEventFilterBox::ExecuteTrigger(ExecuteTriggerEventFilter::new(
trigger_id.clone(),
account_id,
)),
TriggeringEventFilterBox::ExecuteTrigger(
ExecuteTriggerEventFilter::new()
.from_trigger(trigger_id.clone())
.under_authority(account_id),
),
),
));
test_client.submit_blocking(register_trigger)?;
Expand Down Expand Up @@ -240,10 +245,11 @@ fn trigger_should_be_able_to_modify_its_own_repeats_count() -> Result<()> {
trigger_instructions,
Repeats::from(1_u32),
account_id.clone(),
TriggeringEventFilterBox::ExecuteTrigger(ExecuteTriggerEventFilter::new(
trigger_id.clone(),
account_id,
)),
TriggeringEventFilterBox::ExecuteTrigger(
ExecuteTriggerEventFilter::new()
.from_trigger(trigger_id.clone())
.under_authority(account_id),
),
),
));
test_client.submit_blocking(register_trigger)?;
Expand Down Expand Up @@ -283,10 +289,11 @@ fn unregister_trigger() -> Result<()> {
Vec::<InstructionBox>::new(),
Repeats::Indefinitely,
account_id.clone(),
TriggeringEventFilterBox::ExecuteTrigger(ExecuteTriggerEventFilter::new(
trigger_id.clone(),
account_id,
)),
TriggeringEventFilterBox::ExecuteTrigger(
ExecuteTriggerEventFilter::new()
.from_trigger(trigger_id.clone())
.under_authority(account_id),
),
),
);
let register_trigger = Register::trigger(trigger.clone());
Expand Down Expand Up @@ -360,10 +367,11 @@ fn trigger_in_genesis_using_base64() -> Result<()> {
.wrap_err("Can't deserialize wasm using base64")?,
Repeats::Indefinitely,
account_id.clone(),
TriggeringEventFilterBox::ExecuteTrigger(ExecuteTriggerEventFilter::new(
trigger_id.clone(),
account_id.clone(),
)),
TriggeringEventFilterBox::ExecuteTrigger(
ExecuteTriggerEventFilter::new()
.from_trigger(trigger_id.clone())
.under_authority(account_id.clone()),
),
),
);

Expand Down Expand Up @@ -410,10 +418,11 @@ fn trigger_should_be_able_to_modify_other_trigger() -> Result<()> {
trigger_unregister_instructions,
Repeats::from(1_u32),
account_id.clone(),
TriggeringEventFilterBox::ExecuteTrigger(ExecuteTriggerEventFilter::new(
trigger_id_unregister.clone(),
account_id.clone(),
)),
TriggeringEventFilterBox::ExecuteTrigger(
ExecuteTriggerEventFilter::new()
.from_trigger(trigger_id_unregister.clone())
.under_authority(account_id.clone()),
),
),
));
test_client.submit_blocking(register_trigger)?;
Expand All @@ -426,10 +435,11 @@ fn trigger_should_be_able_to_modify_other_trigger() -> Result<()> {
trigger_should_be_unregistered_instructions,
Repeats::from(1_u32),
account_id.clone(),
TriggeringEventFilterBox::ExecuteTrigger(ExecuteTriggerEventFilter::new(
trigger_id_to_be_unregistered.clone(),
account_id,
)),
TriggeringEventFilterBox::ExecuteTrigger(
ExecuteTriggerEventFilter::new()
.from_trigger(trigger_id_to_be_unregistered.clone())
.under_authority(account_id),
),
),
));
test_client.submit_blocking(register_trigger)?;
Expand Down Expand Up @@ -470,10 +480,11 @@ fn trigger_burn_repetitions() -> Result<()> {
trigger_instructions,
Repeats::from(1_u32),
account_id.clone(),
TriggeringEventFilterBox::ExecuteTrigger(ExecuteTriggerEventFilter::new(
trigger_id.clone(),
account_id,
)),
TriggeringEventFilterBox::ExecuteTrigger(
ExecuteTriggerEventFilter::new()
.from_trigger(trigger_id.clone())
.under_authority(account_id),
),
),
));
test_client.submit_blocking(register_trigger)?;
Expand Down Expand Up @@ -514,10 +525,11 @@ fn unregistering_one_of_two_triggers_with_identical_wasm_should_not_cause_origin
wasm.clone(),
Repeats::Indefinitely,
account_id.clone(),
TriggeringEventFilterBox::ExecuteTrigger(ExecuteTriggerEventFilter::new(
trigger_id,
account_id.clone(),
)),
TriggeringEventFilterBox::ExecuteTrigger(
ExecuteTriggerEventFilter::new()
.from_trigger(trigger_id)
.under_authority(account_id.clone()),
),
),
)
};
Expand Down Expand Up @@ -564,10 +576,11 @@ fn build_register_trigger_isi(
trigger_instructions,
Repeats::Indefinitely,
asset_id.account_id.clone(),
TriggeringEventFilterBox::ExecuteTrigger(ExecuteTriggerEventFilter::new(
trigger_id,
asset_id.account_id,
)),
TriggeringEventFilterBox::ExecuteTrigger(
ExecuteTriggerEventFilter::new()
.from_trigger(trigger_id)
.under_authority(asset_id.account_id),
),
),
))
}
21 changes: 9 additions & 12 deletions client/tests/integration/triggers/data_trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ fn must_execute_both_triggers() -> Result<()> {
[instruction.clone()],
Repeats::Indefinitely,
account_id.clone(),
TriggeringEventFilterBox::Data(DataEventFilter::ByAccount(AccountEventFilter {
id_matcher: None,
event_matcher: Some(AccountEventMatcher::ByCreated),
})),
TriggeringEventFilterBox::Data(DataEventFilter::Account(
AccountEventFilter::new().only_events(AccountEventMatcher::Created),
)),
),
));
test_client.submit_blocking(register_trigger)?;
Expand All @@ -37,10 +36,9 @@ fn must_execute_both_triggers() -> Result<()> {
[instruction],
Repeats::Indefinitely,
account_id,
TriggeringEventFilterBox::Data(DataEventFilter::ByDomain(DomainEventFilter {
id_matcher: None,
event_matcher: Some(DomainEventMatcher::ByCreated),
})),
TriggeringEventFilterBox::Data(DataEventFilter::Domain(
DomainEventFilter::new().only_events(DomainEventMatcher::Created),
)),
),
));
test_client.submit_blocking(register_trigger)?;
Expand Down Expand Up @@ -90,10 +88,9 @@ fn domain_scoped_trigger_must_be_executed_only_on_events_in_its_domain() -> Resu
[Mint::asset_numeric(1u32, asset_id.clone())],
Repeats::Indefinitely,
account_id,
TriggeringEventFilterBox::Data(DataEventFilter::ByAccount(AccountEventFilter {
id_matcher: None,
event_matcher: Some(AccountEventMatcher::ByCreated),
})),
TriggeringEventFilterBox::Data(DataEventFilter::Account(
AccountEventFilter::new().only_events(AccountEventMatcher::Created),
)),
),
));
test_client.submit_blocking(register_trigger)?;
Expand Down
7 changes: 2 additions & 5 deletions client/tests/integration/triggers/event_trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,8 @@ fn test_mint_asset_when_new_asset_definition_created() -> Result<()> {
vec![instruction],
Repeats::Indefinitely,
account_id,
TriggeringEventFilterBox::Data(DataEventFilter::ByAssetDefinition(
AssetDefinitionEventFilter {
id_matcher: None,
event_matcher: Some(AssetDefinitionEventMatcher::ByCreated),
},
TriggeringEventFilterBox::Data(DataEventFilter::AssetDefinition(
AssetDefinitionEventFilter::new().only_events(AssetDefinitionEventMatcher::Created),
)),
),
));
Expand Down
4 changes: 2 additions & 2 deletions client/tests/integration/triggers/time_trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ fn get_block_committed_event_listener(
client: &Client,
) -> Result<impl Iterator<Item = Result<Event>>> {
let block_filter = PipelineEventFilter::new()
.entity_kind(PipelineEntityKind::Block)
.status_kind(PipelineStatusKind::Committed);
.from_entity_of_kind(PipelineEntityKind::Block)
.with_status(PipelineStatusKind::Committed);
client.listen_for_events(block_filter)
}

Expand Down
Loading

0 comments on commit a5113ee

Please sign in to comment.