Skip to content

Commit

Permalink
[refactor] #4315: split pipeline events
Browse files Browse the repository at this point in the history
Signed-off-by: Marin Veršić <[email protected]>
  • Loading branch information
mversic committed Mar 14, 2024
1 parent be0e67f commit 52ee097
Show file tree
Hide file tree
Showing 42 changed files with 1,103 additions and 697 deletions.
2 changes: 1 addition & 1 deletion cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ impl Iroha {
},
);

let queue = Arc::new(Queue::from_config(config.queue));
let queue = Arc::new(Queue::from_config(config.queue, events_sender.clone()));
match Self::start_telemetry(&logger, &config).await? {
TelemetryStartStatus::Started => iroha_logger::info!("Telemetry started"),
TelemetryStartStatus::NotStarted => iroha_logger::warn!("Telemetry not started"),
Expand Down
6 changes: 2 additions & 4 deletions client/benches/tps/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use iroha_client::{
prelude::*,
},
};
use iroha_data_model::events::pipeline::{BlockEventFilter, BlockStatus};
use serde::Deserialize;
use test_network::*;

Expand Down Expand Up @@ -171,10 +172,7 @@ impl MeasurerUnit {
fn spawn_event_counter(&self) -> thread::JoinHandle<Result<()>> {
let listener = self.client.clone();
let (init_sender, init_receiver) = mpsc::channel();
let event_filter = PipelineEventFilter::new()
.entity_kind(PipelineEntityKind::Block)
.status_kind(PipelineStatusKind::Committed)
.into();
let event_filter = BlockEventFilter::default().with_status(BlockStatus::Applied);
let blocks_expected = self.config.blocks as usize;
let name = self.name;
let handle = thread::spawn(move || -> Result<()> {
Expand Down
56 changes: 39 additions & 17 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use eyre::{eyre, Result, WrapErr};
use futures_util::StreamExt;
use http_default::{AsyncWebSocketStream, WebSocketStream};
pub use iroha_config::client_api::ConfigDTO;
use iroha_data_model::query::QueryOutputBox;
use iroha_data_model::{
events::pipeline::{BlockStatus, PipelineEventBox, TransactionEventFilter, TransactionStatus},
query::QueryOutputBox,
};
use iroha_logger::prelude::*;
use iroha_telemetry::metrics::Status;
use iroha_torii_const::uri as torii_uri;
Expand Down Expand Up @@ -605,9 +608,7 @@ impl Client {
let mut event_iterator = {
let event_iterator_result = tokio::time::timeout_at(
deadline,
self.listen_for_events_async(
PipelineEventFilter::new().hash(hash.into()).into(),
),
self.listen_for_events_async(TransactionEventFilter::default().with_hash(hash)),
)
.await
.map_err(Into::into)
Expand All @@ -633,17 +634,34 @@ impl Client {
event_iterator: &mut AsyncEventStream,
hash: HashOf<SignedTransaction>,
) -> Result<HashOf<SignedTransaction>> {
let mut block_height = None;

while let Some(event) = event_iterator.next().await {
if let Event::Pipeline(this_event) = event? {
match this_event.status() {
PipelineStatus::Validating => {}
PipelineStatus::Rejected(ref reason) => {
return Err(reason.clone().into());
if let EventBox::Pipeline(this_event) = event? {
match this_event {
PipelineEventBox::Transaction(transaction_event) => {
match transaction_event.status() {
TransactionStatus::Queued => {}
TransactionStatus::Approved => {
block_height = transaction_event.block_height;
}
TransactionStatus::Rejected(reason) => {
return Err(reason.clone().into());
}
TransactionStatus::Expired => return Err(eyre!("Transaction expired")),
}
}
PipelineEventBox::Block(block_event) => {
if Some(block_event.height) == block_height {
if let BlockStatus::Applied = block_event.status() {
return Ok(hash);
}
}
}
PipelineStatus::Committed => return Ok(hash),
}
}
}

Err(eyre!(
"Connection dropped without `Committed` or `Rejected` event"
))
Expand Down Expand Up @@ -905,9 +923,8 @@ impl Client {
/// - Forwards from [`events_api::EventIterator::new`]
pub fn listen_for_events(
&self,
event_filter: FilterBox,
) -> Result<impl Iterator<Item = Result<Event>>> {
iroha_logger::trace!(?event_filter);
event_filter: impl Into<FilterBox>,
) -> Result<impl Iterator<Item = Result<EventBox>>> {
events_api::EventIterator::new(self.events_handler(event_filter)?)
}

Expand All @@ -918,9 +935,8 @@ impl Client {
/// - Forwards from [`events_api::AsyncEventStream::new`]
pub async fn listen_for_events_async(
&self,
event_filter: FilterBox,
event_filter: impl Into<FilterBox> + Send,
) -> Result<AsyncEventStream> {
iroha_logger::trace!(?event_filter, "Async listening with");
events_api::AsyncEventStream::new(self.events_handler(event_filter)?).await
}

Expand All @@ -929,7 +945,13 @@ impl Client {
/// # Errors
/// Fails if handler construction fails
#[inline]
pub fn events_handler(&self, event_filter: FilterBox) -> Result<events_api::flow::Init> {
pub fn events_handler(
&self,
event_filter: impl Into<FilterBox>,
) -> Result<events_api::flow::Init> {
let event_filter = event_filter.into();
iroha_logger::trace!(?event_filter);

events_api::flow::Init::new(
event_filter,
self.headers.clone(),
Expand Down Expand Up @@ -1281,7 +1303,7 @@ pub mod events_api {
pub struct Events;

impl FlowEvents for Events {
type Event = crate::data_model::prelude::Event;
type Event = crate::data_model::prelude::EventBox;

fn message(&self, message: Vec<u8>) -> Result<Self::Event> {
let event_socket_message = EventMessage::decode_all(&mut message.as_slice())?;
Expand Down
13 changes: 7 additions & 6 deletions client/tests/integration/asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use iroha_config::parameters::actual::Root as Config;
use iroha_data_model::{
asset::{AssetId, AssetValue, AssetValueType},
isi::error::{InstructionEvaluationError, InstructionExecutionError, Mismatch, TypeError},
transaction::error::TransactionRejectionReason,
};
use serde_json::json;
use test_network::*;
Expand Down Expand Up @@ -463,17 +464,17 @@ fn fail_if_dont_satisfy_spec() {
.expect_err("Should be rejected due to non integer value");

let rejection_reason = err
.downcast_ref::<PipelineRejectionReason>()
.unwrap_or_else(|| panic!("Error {err} is not PipelineRejectionReason"));
.downcast_ref::<TransactionRejectionReason>()
.unwrap_or_else(|| panic!("Error {err} is not TransactionRejectionReason"));

assert_eq!(
rejection_reason,
&PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation(
ValidationFail::InstructionFailed(InstructionExecutionError::Evaluate(
InstructionEvaluationError::Type(TypeError::from(Mismatch {
&TransactionRejectionReason::Validation(ValidationFail::InstructionFailed(
InstructionExecutionError::Evaluate(InstructionEvaluationError::Type(
TypeError::from(Mismatch {
expected: AssetValueType::Numeric(NumericSpec::integer()),
actual: AssetValueType::Numeric(NumericSpec::fractional(2))
}))
})
))
))
);
Expand Down
9 changes: 4 additions & 5 deletions client/tests/integration/domain_owner_permissions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use iroha_client::{
crypto::KeyPair,
data_model::{account::SignatureCheckCondition, prelude::*},
};
use iroha_data_model::transaction::error::TransactionRejectionReason;
use serde_json::json;
use test_network::*;

Expand Down Expand Up @@ -37,14 +38,12 @@ fn domain_owner_domain_permissions() -> Result<()> {
.expect_err("Tx should fail due to permissions");

let rejection_reason = err
.downcast_ref::<PipelineRejectionReason>()
.unwrap_or_else(|| panic!("Error {err} is not PipelineRejectionReason"));
.downcast_ref::<TransactionRejectionReason>()
.unwrap_or_else(|| panic!("Error {err} is not TransactionRejectionReason"));

assert!(matches!(
rejection_reason,
&PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation(
ValidationFail::NotPermitted(_)
))
&TransactionRejectionReason::Validation(ValidationFail::NotPermitted(_))
));

// "alice@wonderland" owns the domain and can register AssetDefinitions by default as domain owner
Expand Down
4 changes: 2 additions & 2 deletions client/tests/integration/events/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ fn transaction_execution_should_produce_events(
let listener = client.clone();
let (init_sender, init_receiver) = mpsc::channel();
let (event_sender, event_receiver) = mpsc::channel();
let event_filter = DataEventFilter::AcceptAll.into();
let event_filter = DataEventFilter::AcceptAll;
thread::spawn(move || -> Result<()> {
let event_iterator = listener.listen_for_events(event_filter)?;
init_sender.send(())?;
Expand Down Expand Up @@ -182,7 +182,7 @@ fn produce_multiple_events() -> Result<()> {
let listener = client.clone();
let (init_sender, init_receiver) = mpsc::channel();
let (event_sender, event_receiver) = mpsc::channel();
let event_filter = DataEventFilter::AcceptAll.into();
let event_filter = DataEventFilter::AcceptAll;
thread::spawn(move || -> Result<()> {
let event_iterator = listener.listen_for_events(event_filter)?;
init_sender.send(())?;
Expand Down
6 changes: 2 additions & 4 deletions client/tests/integration/events/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ fn trigger_completion_success_should_produce_event() -> Result<()> {
NotificationEventFilter::TriggerCompleted(TriggerCompletedEventFilter::new(
Some(trigger_id),
Some(TriggerCompletedOutcomeType::Success),
))
.into(),
)),
)?;
if event_it.next().is_some() {
sender.send(())?;
Expand Down Expand Up @@ -87,8 +86,7 @@ fn trigger_completion_failure_should_produce_event() -> Result<()> {
NotificationEventFilter::TriggerCompleted(TriggerCompletedEventFilter::new(
Some(trigger_id),
Some(TriggerCompletedOutcomeType::Failure),
))
.into(),
)),
)?;
if event_it.next().is_some() {
sender.send(())?;
Expand Down
62 changes: 33 additions & 29 deletions client/tests/integration/events/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ use iroha_client::{
},
};
use iroha_config::parameters::actual::Root as Config;
use iroha_data_model::{
events::pipeline::{
BlockEvent, BlockEventFilter, BlockStatus, TransactionEventFilter, TransactionStatus,
},
isi::error::InstructionExecutionError,
transaction::error::TransactionRejectionReason,
ValidationFail,
};
use test_network::*;

// Needed to re-enable ignored tests.
Expand All @@ -17,24 +25,28 @@ const PEER_COUNT: usize = 7;
#[ignore = "ignore, more in #2851"]
#[test]
fn transaction_with_no_instructions_should_be_committed() -> Result<()> {
test_with_instruction_and_status_and_port(None, PipelineStatusKind::Committed, 10_250)
test_with_instruction_and_status_and_port(None, TransactionStatus::Approved, 10_250)
}

#[ignore = "ignore, more in #2851"]
// #[ignore = "Experiment"]
#[test]
fn transaction_with_fail_instruction_should_be_rejected() -> Result<()> {
let fail = Fail::new("Should be rejected".to_owned());
let msg = "Should be rejected".to_owned();

let fail = Fail::new(msg.clone());
test_with_instruction_and_status_and_port(
Some(fail.into()),
PipelineStatusKind::Rejected,
TransactionStatus::Rejected(Box::new(TransactionRejectionReason::Validation(
ValidationFail::InstructionFailed(InstructionExecutionError::Fail(msg)),
))),
10_350,
)
}

fn test_with_instruction_and_status_and_port(
instruction: Option<InstructionBox>,
should_be: PipelineStatusKind,
should_be: TransactionStatus,
port: u16,
) -> Result<()> {
let (_rt, network, client) =
Expand All @@ -56,9 +68,9 @@ fn test_with_instruction_and_status_and_port(
let mut handles = Vec::new();
for listener in clients {
let checker = Checker { listener, hash };
let handle_validating = checker.clone().spawn(PipelineStatusKind::Validating);
let handle_validating = checker.clone().spawn(TransactionStatus::Queued);
handles.push(handle_validating);
let handle_validated = checker.spawn(should_be);
let handle_validated = checker.spawn(should_be.clone());
handles.push(handle_validated);
}
// When
Expand All @@ -78,16 +90,15 @@ struct Checker {
}

impl Checker {
fn spawn(self, status_kind: PipelineStatusKind) -> JoinHandle<()> {
fn spawn(self, status_kind: TransactionStatus) -> JoinHandle<()> {
thread::spawn(move || {
let mut event_iterator = self
.listener
.listen_for_events(FilterBox::Pipeline(
PipelineEventFilter::new()
.entity_kind(PipelineEntityKind::Transaction)
.status_kind(status_kind)
.hash(*self.hash),
))
.listen_for_events(
TransactionEventFilter::default()
.with_status(status_kind)
.with_hash(self.hash),
)
.expect("Failed to create event iterator.");
let event_result = event_iterator.next().expect("Stream closed");
let _event = event_result.expect("Must be valid");
Expand All @@ -96,14 +107,11 @@ impl Checker {
}

#[test]
fn committed_block_must_be_available_in_kura() {
fn applied_block_must_be_available_in_kura() {
let (_rt, peer, client) = <PeerBuilder>::new().with_port(11_040).start_with_runtime();
wait_for_genesis_committed(&[client.clone()], 0);

let event_filter = PipelineEventFilter::new()
.entity_kind(PipelineEntityKind::Block)
.status_kind(PipelineStatusKind::Committed)
.into();
let event_filter = BlockEventFilter::default().with_status(BlockStatus::Committed);
let mut event_iter = client
.listen_for_events(event_filter)
.expect("Failed to subscribe for events");
Expand All @@ -112,21 +120,17 @@ fn committed_block_must_be_available_in_kura() {
.submit(Fail::new("Dummy instruction".to_owned()))
.expect("Failed to submit transaction");

let event = event_iter.next().expect("Block must be committed");
let Ok(Event::Pipeline(PipelineEvent {
entity_kind: PipelineEntityKind::Block,
status: PipelineStatus::Committed,
hash,
})) = event
else {
panic!("Received unexpected event")
};
let hash = HashOf::from_untyped_unchecked(hash);
let event: BlockEvent = event_iter
.next()
.expect("Block must be committed")
.expect("Block must be committed")
.try_into()
.expect("Received unexpected event");

peer.iroha
.as_ref()
.expect("Must be some")
.kura
.get_block_height_by_hash(&hash)
.get_block_by_height(event.height)
.expect("Block committed event was received earlier");
}
Loading

0 comments on commit 52ee097

Please sign in to comment.