Skip to content

Commit

Permalink
[refactor] #4315: split pipeline events
Browse files Browse the repository at this point in the history
Signed-off-by: Marin Veršić <[email protected]>
  • Loading branch information
mversic committed Mar 22, 2024
1 parent 47f278d commit 2dcde0e
Show file tree
Hide file tree
Showing 83 changed files with 1,497 additions and 1,074 deletions.
10 changes: 5 additions & 5 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,12 @@ Set the `LOG_FILE_PATH` environment variable to an appropriate location to store
<details> <summary> Expand to learn how to compile iroha with tokio console support.</summary>
Sometimes it might be helpful for debugging to analyze tokio tasks using [tokio-console](https://github.com/tokio-rs/console).
Sometimes it might be helpful for debugging to analyze tokio tasks using [tokio_console](https://github.com/tokio-rs/console).
In this case you should compile iroha with support of tokio console like that:
```bash
RUSTFLAGS="--cfg tokio_unstable" cargo build --features tokio-console
RUSTFLAGS="--cfg tokio_unstable" cargo build --features tokio_console
```

Port for tokio console can by configured through `LOG_TOKIO_CONSOLE_ADDR` configuration parameter (or environment variable).
Expand All @@ -257,11 +257,11 @@ Example of running iroha with tokio console support using `scripts/test_env.sh`:

```bash
# 1. Compile iroha
RUSTFLAGS="--cfg tokio_unstable" cargo build --features tokio-console
RUSTFLAGS="--cfg tokio_unstable" cargo build --features tokio_console
# 2. Run iroha with TRACE log level
LOG_LEVEL=TRACE ./scripts/test_env.sh setup
# 3. Access iroha. Peers will be available on ports 5555, 5556, ...
tokio-console http://127.0.0.1:5555
tokio_console http://127.0.0.1:5555
```

</details>
Expand All @@ -272,7 +272,7 @@ tokio-console http://127.0.0.1:5555

To optimize performance it's useful to profile iroha.

To do that you should compile iroha with `profiling` profile and with `profiling` feature:
To do that you should compile iroha with `profiling` profile and with `profiling` feature:

```bash
RUSTFLAGS="-C force-frame-pointers=on" cargo +nightly -Z build-std build --target your-desired-target --profile profiling --features profiling
Expand Down
12 changes: 6 additions & 6 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@ categories.workspace = true
workspace = true

[features]
default = ["telemetry", "schema-endpoint"]
default = ["telemetry", "schema_endpoint"]

# Support lightweight telemetry, including diagnostics
telemetry = ["iroha_telemetry", "iroha_core/telemetry", "iroha_torii/telemetry"]
# Support developer-specific telemetry.
# Should not be enabled on production builds.
dev-telemetry = ["iroha_core/dev-telemetry", "iroha_telemetry"]
dev_telemetry = ["iroha_core/dev_telemetry", "iroha_telemetry"]
# Support schema generation from the `schema` endpoint in the local binary.
# Useful for debugging issues with decoding in SDKs.
schema-endpoint = ["iroha_torii/schema"]
schema_endpoint = ["iroha_torii/schema"]
# Support internal testing infrastructure for integration tests.
# Disable in production.
test-network = ["thread-local-panic-hook"]
test_network = ["thread-local-panic-hook"]

[badges]
is-it-maintained-issue-resolution = { repository = "https://github.com/hyperledger/iroha" }
Expand Down Expand Up @@ -79,8 +79,8 @@ vergen = { workspace = true, features = ["cargo"] }

[package.metadata.cargo-all-features]
denylist = [
"schema-endpoint",
"schema_endpoint",
"telemetry",
"test-network",
"test_network",
]
skip_optional_dependencies = true
4 changes: 2 additions & 2 deletions cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ The results of the compilation can be found in `<IROHA REPO ROOT>/target/release
To add optional features, use ``--features``. For example, to add the support for _dev_telemetry_, run:

```bash
cargo build --release --features dev-telemetry
cargo build --release --features dev_telemetry
```

A full list of features can be found in the [cargo manifest file](Cargo.toml) for this crate.

### Disable default features

By default, the Iroha binary is compiled with the `telemetry`, and `schema-endpoint` features. If you wish to remove those features, add `--no-default-features` to the command.
By default, the Iroha binary is compiled with the `telemetry`, and `schema_endpoint` features. If you wish to remove those features, add `--no-default-features` to the command.

```bash
cargo build --release --no-default-features
Expand Down
12 changes: 6 additions & 6 deletions cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl NetworkRelay {

impl Iroha {
fn prepare_panic_hook(notify_shutdown: Arc<Notify>) {
#[cfg(not(feature = "test-network"))]
#[cfg(not(feature = "test_network"))]
use std::panic::set_hook;

// This is a hot-fix for tests
Expand All @@ -160,7 +160,7 @@ impl Iroha {
//
// Remove this when all Rust integrations tests will be converted to a
// separate Python tests.
#[cfg(feature = "test-network")]
#[cfg(feature = "test_network")]
use thread_local_panic_hook::set_hook;

set_hook(Box::new(move |info| {
Expand Down Expand Up @@ -251,7 +251,7 @@ impl Iroha {
});
let state = Arc::new(state);

let queue = Arc::new(Queue::from_config(config.queue));
let queue = Arc::new(Queue::from_config(config.queue, events_sender.clone()));
match Self::start_telemetry(&logger, &config).await? {
TelemetryStartStatus::Started => iroha_logger::info!("Telemetry started"),
TelemetryStartStatus::NotStarted => iroha_logger::warn!("Telemetry not started"),
Expand Down Expand Up @@ -369,7 +369,7 @@ impl Iroha {
///
/// # Errors
/// - Forwards initialisation error.
#[cfg(feature = "test-network")]
#[cfg(feature = "test_network")]
pub fn start_as_task(&mut self) -> Result<tokio::task::JoinHandle<eyre::Result<()>>> {
iroha_logger::info!("Starting Iroha as task");
let torii = self
Expand All @@ -386,7 +386,7 @@ impl Iroha {
logger: &LoggerHandle,
config: &Config,
) -> Result<TelemetryStartStatus> {
#[cfg(feature = "dev-telemetry")]
#[cfg(feature = "dev_telemetry")]
{
if let Some(config) = &config.dev_telemetry {
let receiver = logger
Expand Down Expand Up @@ -539,7 +539,7 @@ mod tests {

use super::*;

#[cfg(not(feature = "test-network"))]
#[cfg(not(feature = "test_network"))]
mod no_test_network {
use std::{iter::repeat, panic, thread};

Expand Down
12 changes: 6 additions & 6 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,24 @@ maintenance = { status = "actively-developed" }

[features]
# Use rustls by default to avoid OpenSSL dependency, simplifying compilation with musl
default = ["tls-rustls-native-roots"]
default = ["tls_rustls_native_roots"]

tls-native = [
tls_native = [
"attohttpc/tls-native",
"tokio-tungstenite/native-tls",
"tungstenite/native-tls",
]
tls-native-vendored = [
tls_native_vendored = [
"attohttpc/tls-native-vendored",
"tokio-tungstenite/native-tls-vendored",
"tungstenite/native-tls-vendored",
]
tls-rustls-native-roots = [
tls_rustls_native_roots = [
"attohttpc/tls-rustls-native-roots",
"tokio-tungstenite/rustls-tls-native-roots",
"tungstenite/rustls-tls-native-roots",
]
tls-rustls-webpki-roots = [
tls_rustls_webpki_roots = [
"attohttpc/tls-rustls-webpki-roots",
"tokio-tungstenite/rustls-tls-webpki-roots",
"tungstenite/rustls-tls-webpki-roots",
Expand Down Expand Up @@ -83,7 +83,7 @@ iroha_wasm_builder = { workspace = true }
# TODO: These three activate `transparent_api` but client should never activate this feature.
# Additionally there is a dependency on iroha_core in dev-dependencies in telemetry/derive
# Hopefully, once the integration tests migration is finished these can be removed
iroha = { workspace = true, features = ["dev-telemetry", "telemetry"] }
iroha = { workspace = true, features = ["dev_telemetry", "telemetry"] }
iroha_genesis = { workspace = true }
test_network = { workspace = true }

Expand Down
7 changes: 3 additions & 4 deletions client/benches/tps/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use iroha_client::{
prelude::*,
},
};
use iroha_data_model::events::pipeline::{BlockEventFilter, BlockStatus};
use serde::Deserialize;
use test_network::*;

Expand Down Expand Up @@ -172,13 +173,11 @@ impl MeasurerUnit {
fn spawn_event_counter(&self) -> thread::JoinHandle<Result<()>> {
let listener = self.client.clone();
let (init_sender, init_receiver) = mpsc::channel();
let event_filter = PipelineEventFilter::new()
.for_entity(PipelineEntityKind::Block)
.for_status(PipelineStatusKind::Committed);
let event_filter = BlockEventFilter::default().for_status(BlockStatus::Applied);
let blocks_expected = self.config.blocks as usize;
let name = self.name;
let handle = thread::spawn(move || -> Result<()> {
let mut event_iterator = listener.listen_for_events(event_filter)?;
let mut event_iterator = listener.listen_for_events([event_filter])?;
init_sender.send(())?;
for i in 1..=blocks_expected {
let _event = event_iterator.next().expect("Event stream closed")?;
Expand Down
98 changes: 61 additions & 37 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ use eyre::{eyre, Result, WrapErr};
use futures_util::StreamExt;
use http_default::{AsyncWebSocketStream, WebSocketStream};
pub use iroha_config::client_api::ConfigDTO;
use iroha_data_model::query::QueryOutputBox;
use iroha_data_model::{
events::pipeline::{
BlockEventFilter, BlockStatus, PipelineEventBox, PipelineEventFilterBox,
TransactionEventFilter, TransactionStatus,
},
query::QueryOutputBox,
};
use iroha_logger::prelude::*;
use iroha_telemetry::metrics::Status;
use iroha_torii_const::uri as torii_uri;
Expand Down Expand Up @@ -603,14 +609,19 @@ impl Client {

rt.block_on(async {
let mut event_iterator = {
let event_iterator_result = tokio::time::timeout_at(
deadline,
self.listen_for_events_async(PipelineEventFilter::new().for_hash(hash.into())),
)
.await
.map_err(Into::into)
.and_then(std::convert::identity)
.wrap_err("Failed to establish event listener connection");
let filters = vec![
TransactionEventFilter::default().for_hash(hash).into(),
PipelineEventFilterBox::from(
BlockEventFilter::default().for_status(BlockStatus::Applied),
),
];

let event_iterator_result =
tokio::time::timeout_at(deadline, self.listen_for_events_async(filters))
.await
.map_err(Into::into)
.and_then(std::convert::identity)
.wrap_err("Failed to establish event listener connection");
let _send_result = init_sender.send(event_iterator_result.is_ok());
event_iterator_result?
};
Expand All @@ -631,17 +642,34 @@ impl Client {
event_iterator: &mut AsyncEventStream,
hash: HashOf<SignedTransaction>,
) -> Result<HashOf<SignedTransaction>> {
let mut block_height = None;

while let Some(event) = event_iterator.next().await {
if let Event::Pipeline(this_event) = event? {
match this_event.status() {
PipelineStatus::Validating => {}
PipelineStatus::Rejected(ref reason) => {
return Err(reason.clone().into());
if let EventBox::Pipeline(this_event) = event? {
match this_event {
PipelineEventBox::Transaction(transaction_event) => {
match transaction_event.status() {
TransactionStatus::Queued => {}
TransactionStatus::Approved => {
block_height = transaction_event.block_height;
}
TransactionStatus::Rejected(reason) => {
return Err((Clone::clone(&**reason)).into());
}
TransactionStatus::Expired => return Err(eyre!("Transaction expired")),
}
}
PipelineEventBox::Block(block_event) => {
if Some(block_event.header().height()) == block_height {
if let BlockStatus::Applied = block_event.status() {
return Ok(hash);
}
}
}
PipelineStatus::Committed => return Ok(hash),
}
}
}

Err(eyre!(
"Connection dropped without `Committed` or `Rejected` event"
))
Expand Down Expand Up @@ -903,11 +931,9 @@ impl Client {
/// - Forwards from [`events_api::EventIterator::new`]
pub fn listen_for_events(
&self,
event_filter: impl Into<EventFilterBox>,
) -> Result<impl Iterator<Item = Result<Event>>> {
let event_filter = event_filter.into();
iroha_logger::trace!(?event_filter);
events_api::EventIterator::new(self.events_handler(event_filter)?)
event_filters: impl IntoIterator<Item = impl Into<EventFilterBox>>,
) -> Result<impl Iterator<Item = Result<EventBox>>> {
events_api::EventIterator::new(self.events_handler(event_filters)?)
}

/// Connect asynchronously (through `WebSocket`) to listen for `Iroha` `pipeline` and `data` events.
Expand All @@ -917,11 +943,9 @@ impl Client {
/// - Forwards from [`events_api::AsyncEventStream::new`]
pub async fn listen_for_events_async(
&self,
event_filter: impl Into<EventFilterBox> + Send,
event_filters: impl IntoIterator<Item = impl Into<EventFilterBox>> + Send,
) -> Result<AsyncEventStream> {
let event_filter = event_filter.into();
iroha_logger::trace!(?event_filter, "Async listening with");
events_api::AsyncEventStream::new(self.events_handler(event_filter)?).await
events_api::AsyncEventStream::new(self.events_handler(event_filters)?).await
}

/// Constructs an Events API handler. With it, you can use any WS client you want.
Expand All @@ -931,10 +955,10 @@ impl Client {
#[inline]
pub fn events_handler(
&self,
event_filter: impl Into<EventFilterBox>,
event_filters: impl IntoIterator<Item = impl Into<EventFilterBox>>,
) -> Result<events_api::flow::Init> {
events_api::flow::Init::new(
event_filter.into(),
event_filters,
self.headers.clone(),
self.torii_url
.join(torii_uri::SUBSCRIPTION)
Expand Down Expand Up @@ -1237,12 +1261,12 @@ pub mod events_api {

/// Initialization struct for Events API flow.
pub struct Init {
/// Event filter
filter: EventFilterBox,
/// HTTP request headers
headers: HashMap<String, String>,
/// TORII URL
url: Url,
/// HTTP request headers
headers: HashMap<String, String>,
/// Event filter
filters: Vec<EventFilterBox>,
}

impl Init {
Expand All @@ -1252,14 +1276,14 @@ pub mod events_api {
/// Fails if [`transform_ws_url`] fails.
#[inline]
pub(in super::super) fn new(
filter: EventFilterBox,
filters: impl IntoIterator<Item = impl Into<EventFilterBox>>,
headers: HashMap<String, String>,
url: Url,
) -> Result<Self> {
Ok(Self {
filter,
headers,
url: transform_ws_url(url)?,
headers,
filters: filters.into_iter().map(Into::into).collect(),
})
}
}
Expand All @@ -1269,12 +1293,12 @@ pub mod events_api {

fn init(self) -> InitData<R, Self::Next> {
let Self {
filter,
headers,
url,
headers,
filters,
} = self;

let msg = EventSubscriptionRequest::new(filter).encode();
let msg = EventSubscriptionRequest::new(filters).encode();
InitData::new(R::new(HttpMethod::GET, url).headers(headers), msg, Events)
}
}
Expand All @@ -1284,7 +1308,7 @@ pub mod events_api {
pub struct Events;

impl FlowEvents for Events {
type Event = crate::data_model::prelude::Event;
type Event = crate::data_model::prelude::EventBox;

fn message(&self, message: Vec<u8>) -> Result<Self::Event> {
let event_socket_message = EventMessage::decode_all(&mut message.as_slice())?;
Expand Down
Loading

0 comments on commit 2dcde0e

Please sign in to comment.