From 46977ac8f59b62ae0eee76eba4ce7baf4d78d372 Mon Sep 17 00:00:00 2001 From: nanocryk <6422796+nanocryk@users.noreply.github.com> Date: Fri, 15 Mar 2024 17:10:34 +0100 Subject: [PATCH] handle subscriptions --- Cargo.lock | 61 +++--- Cargo.toml | 3 +- .../Cargo.toml | 5 +- .../src/lib.rs | 26 ++- .../src/ws_client.rs | 182 +++++++++++++++++- 5 files changed, 238 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d67ca7d..7b948c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1879,6 +1879,36 @@ dependencies = [ "thiserror", ] +[[package]] +name = "dc-orchestrator-chain-rpc-interface" +version = "0.1.0" +dependencies = [ + "async-io 1.13.0", + "async-trait", + "dc-orchestrator-chain-interface", + "dp-core", + "futures", + "jsonrpsee", + "parity-scale-codec", + "polkadot-overseer", + "sc-client-api", + "sc-rpc-api", + "sc-service", + "schnellru", + "serde", + "serde_json", + "sp-api", + "sp-blockchain", + "sp-core", + "sp-state-machine", + "sp-storage 13.0.0", + "thiserror", + "tokio", + "tokio-stream", + "tracing", + "url", +] + [[package]] name = "der" version = "0.7.8" @@ -5111,33 +5141,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "orchestrator-chain-rpc-interface" -version = "0.1.0" -dependencies = [ - "async-io 1.13.0", - "async-trait", - "dc-orchestrator-chain-interface", - "futures", - "jsonrpsee", - "parity-scale-codec", - "polkadot-overseer", - "sc-client-api", - "sc-rpc-api", - "sc-service", - "serde", - "serde_json", - "sp-api", - "sp-blockchain", - "sp-core", - "sp-state-machine", - "sp-storage 13.0.0", - "thiserror", - "tokio", - "tracing", - "url", -] - [[package]] name = "ordered-float" version = "1.1.1" @@ -9492,9 +9495,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.14" +version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" dependencies = [ "futures-core", "pin-project-lite 0.2.13", diff --git a/Cargo.toml b/Cargo.toml index 3cad777..9a1293f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -92,8 +92,10 @@ async-trait = "0.1" futures = { version = "0.3.1" } hex = { version = "0.4.3", default-features = false } jsonrpsee = { version = "0.16.2" } +schnellru = "0.2.1" thiserror = { version = "1.0.40" } tokio = { version = "1.32.0", default-features = false } +tokio-stream = "0.1.15" tracing = { version = "0.1.37", default-features = false } url = "2.2.2" @@ -102,7 +104,6 @@ codegen-units = 1 inherits = "release" lto = true - [profile.release] opt-level = 3 panic = "unwind" diff --git a/client/orchestrator-chain-rpc-interface/Cargo.toml b/client/orchestrator-chain-rpc-interface/Cargo.toml index 03d5161..04a7069 100644 --- a/client/orchestrator-chain-rpc-interface/Cargo.toml +++ b/client/orchestrator-chain-rpc-interface/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "orchestrator-chain-rpc-interface" +name = "dc-orchestrator-chain-rpc-interface" authors = { workspace = true } edition = "2021" license = "GPL-3.0-only" @@ -10,15 +10,18 @@ async-io = { workspace = true } async-trait = { workspace = true } futures = { workspace = true } jsonrpsee = { workspace = true, features = [ "ws-client" ] } +schnellru = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = [ "sync" ] } +tokio-stream = { workspace = true } tracing = { workspace = true } url = { workspace = true } # Dancekit dc-orchestrator-chain-interface = { workspace = true } +dp-core = { workspace = true } # Substrate parity-scale-codec = { workspace = true } diff --git a/client/orchestrator-chain-rpc-interface/src/lib.rs b/client/orchestrator-chain-rpc-interface/src/lib.rs index 267710b..12b686f 100644 --- a/client/orchestrator-chain-rpc-interface/src/lib.rs +++ b/client/orchestrator-chain-rpc-interface/src/lib.rs @@ -22,7 +22,7 @@ use { dc_orchestrator_chain_interface::{ OrchestratorChainError, OrchestratorChainInterface, OrchestratorChainResult, PHash, PHeader, }, - futures::Stream, + futures::{Stream, StreamExt}, jsonrpsee::{core::params::ArrayParams, rpc_params}, sc_client_api::{StorageData, StorageProof}, sc_rpc_api::state::ReadProof, @@ -37,6 +37,7 @@ use { }; const LOG_TARGET: &str = "orchestrator-rpc-client"; +const NOTIFICATION_CHANNEL_SIZE_LIMIT: usize = 20; /// Format url and force addition of a port fn url_to_string_with_port(url: Url) -> Option { @@ -138,6 +139,17 @@ impl OrchestratorChainRpcClient { ).await } + fn send_register_message( + &self, + message_builder: impl FnOnce(mpsc::Sender) -> WsClientRequest, + ) -> OrchestratorChainResult> { + let (tx, rx) = mpsc::channel(NOTIFICATION_CHANNEL_SIZE_LIMIT); + self.request_sender + .try_send(message_builder(tx)) + .map_err(|e| OrchestratorChainError::WorkerCommunicationError(e.to_string()))?; + Ok(rx) + } + /// Send a request to the RPC worker and awaits for a response. The worker is responsible /// for retrying requests if connection dies. async fn request_tracing<'a, R, OR>( @@ -247,20 +259,26 @@ impl OrchestratorChainInterface for OrchestratorChainRpcClient { async fn import_notification_stream( &self, ) -> OrchestratorChainResult + Send>>> { - todo!() + let rx = self.send_register_message(WsClientRequest::RegisterImportListener)?; + let stream = tokio_stream::wrappers::ReceiverStream::new(rx); + Ok(stream.boxed()) } /// Get a stream of new best block notifications. async fn new_best_notification_stream( &self, ) -> OrchestratorChainResult + Send>>> { - todo!() + let rx = self.send_register_message(WsClientRequest::RegisterBestHeadListener)?; + let stream = tokio_stream::wrappers::ReceiverStream::new(rx); + Ok(stream.boxed()) } /// Get a stream of finality notifications. async fn finality_notification_stream( &self, ) -> OrchestratorChainResult + Send>>> { - todo!() + let rx = self.send_register_message(WsClientRequest::RegisterFinalizationListener)?; + let stream = tokio_stream::wrappers::ReceiverStream::new(rx); + Ok(stream.boxed()) } } diff --git a/client/orchestrator-chain-rpc-interface/src/ws_client.rs b/client/orchestrator-chain-rpc-interface/src/ws_client.rs index ac742d7..c2c1105 100644 --- a/client/orchestrator-chain-rpc-interface/src/ws_client.rs +++ b/client/orchestrator-chain-rpc-interface/src/ws_client.rs @@ -22,12 +22,14 @@ use { }, jsonrpsee::{ core::{ - client::{Client as JsonRpcClient, ClientT as _}, + client::{Client as JsonRpcClient, ClientT as _, Subscription}, params::ArrayParams, Error as JsonRpseeError, JsonValue, }, ws_client::WsClientBuilder, }, + sc_rpc_api::chain::ChainApiClient, + schnellru::{ByLength, LruMap}, std::sync::Arc, tokio::sync::{mpsc, oneshot}, }; @@ -45,7 +47,9 @@ pub struct JsonRpcRequest { pub enum WsClientRequest { JsonRpcRequest(JsonRpcRequest), - // TODO: Add subscriptions once interface needs it. + RegisterBestHeadListener(mpsc::Sender), + RegisterImportListener(mpsc::Sender), + RegisterFinalizationListener(mpsc::Sender), } enum ConnectionStatus { @@ -68,6 +72,16 @@ pub struct ReconnectingWsClientWorker { active_index: usize, request_receiver: mpsc::Receiver, + + imported_header_listeners: Vec>, + finalized_header_listeners: Vec>, + best_header_listeners: Vec>, +} + +struct OrchestratorSubscription { + import_subscription: Subscription, + finalized_subscription: Subscription, + best_subscription: Subscription, } /// Connects to a ws server by cycle throught all provided urls from the starting position until @@ -118,6 +132,9 @@ impl ReconnectingWsClientWorker { active_client, active_index, request_receiver, + best_header_listeners: vec![], + imported_header_listeners: vec![], + finalized_header_listeners: vec![], }, request_sender, )) @@ -170,6 +187,62 @@ impl ReconnectingWsClientWorker { .boxed() } + async fn get_subscriptions(&self) -> Result { + let import_subscription = >::subscribe_all_heads(&self.active_client) + .await + .map_err(|e| { + tracing::error!( + target: LOG_TARGET, + ?e, + "Unable to open `chain_subscribeAllHeads` subscription." + ); + e + })?; + + let best_subscription = >::subscribe_new_heads(&self.active_client) + .await + .map_err(|e| { + tracing::error!( + target: LOG_TARGET, + ?e, + "Unable to open `chain_subscribeNewHeads` subscription." + ); + e + })?; + + let finalized_subscription = >::subscribe_finalized_heads(&self.active_client) + .await + .map_err(|e| { + tracing::error!( + target: LOG_TARGET, + ?e, + "Unable to open `chain_subscribeFinalizedHeads` subscription." + ); + e + })?; + + Ok(OrchestratorSubscription { + import_subscription, + best_subscription, + finalized_subscription, + }) + } + /// Handle a reconnection by fnding a new RPC server and sending all pending requests. async fn handle_reconnect( &mut self, @@ -199,7 +272,10 @@ impl ReconnectingWsClientWorker { pending_requests.push(self.send_request(req)); } - // TODO: Add subscriptions once interface needs it. + // Get subscriptions from new endpoint. + self.get_subscriptions().await.map_err(|e| { + format!("Not able to create streams from newly connected RPC server, shutting down. err: {:?}", e) + })?; Ok(()) } @@ -208,6 +284,14 @@ impl ReconnectingWsClientWorker { let mut pending_requests = FuturesUnordered::new(); let mut connection_status = ConnectionStatus::Connected; + let Ok(mut subscriptions) = self.get_subscriptions().await else { + tracing::error!(target: LOG_TARGET, "Unable to fetch subscriptions on initial connection."); + return; + }; + + let mut imported_blocks_cache = LruMap::new(ByLength::new(40)); + let mut last_seen_finalized_num: dp_core::BlockNumber = 0; + loop { // Handle reconnection. if let ConnectionStatus::Disconnected { failed_request } = connection_status { @@ -232,6 +316,15 @@ impl ReconnectingWsClientWorker { Some(WsClientRequest::JsonRpcRequest(req)) => { pending_requests.push(self.send_request(req)); }, + Some(WsClientRequest::RegisterBestHeadListener(tx)) => { + self.best_header_listeners.push(tx); + }, + Some(WsClientRequest::RegisterImportListener(tx)) => { + self.imported_header_listeners.push(tx); + }, + Some(WsClientRequest::RegisterFinalizationListener(tx)) => { + self.finalized_header_listeners.push(tx); + }, None => { tracing::error!(target: LOG_TARGET, "RPC client receiver closed. Stopping RPC Worker."); return; @@ -244,8 +337,89 @@ impl ReconnectingWsClientWorker { connection_status = ConnectionStatus::Disconnected { failed_request: Some(req) }; } }, - // TODO: Add subscriptions once interface needs it. + import_event = subscriptions.import_subscription.next() => { + match import_event { + Some(Ok(header)) => { + let hash = header.hash(); + if imported_blocks_cache.peek(&hash).is_some() { + tracing::debug!( + target: LOG_TARGET, + number = header.number, + ?hash, + "Duplicate imported block header. This might happen after switching to a new RPC node. Skipping distribution." + ); + continue; + } + imported_blocks_cache.insert(hash, ()); + distribute(header, &mut self.imported_header_listeners); + }, + None => { + tracing::error!(target: LOG_TARGET, "Subscription closed."); + connection_status = ConnectionStatus::Disconnected { failed_request: None}; + }, + Some(Err(error)) => { + tracing::error!(target: LOG_TARGET, ?error, "Error in RPC subscription."); + connection_status = ConnectionStatus::Disconnected { failed_request: None}; + }, + } + }, + best_header_event = subscriptions.best_subscription.next() => { + match best_header_event { + Some(Ok(header)) => distribute(header, &mut self.best_header_listeners), + None => { + tracing::error!(target: LOG_TARGET, "Subscription closed."); + connection_status = ConnectionStatus::Disconnected { failed_request: None}; + }, + Some(Err(error)) => { + tracing::error!(target: LOG_TARGET, ?error, "Error in RPC subscription."); + connection_status = ConnectionStatus::Disconnected { failed_request: None}; + }, + } + } + finalized_event = subscriptions.finalized_subscription.next() => { + match finalized_event { + Some(Ok(header)) if header.number > last_seen_finalized_num => { + last_seen_finalized_num = header.number; + distribute(header, &mut self.finalized_header_listeners); + }, + Some(Ok(header)) => { + tracing::debug!( + target: LOG_TARGET, + number = header.number, + last_seen_finalized_num, + "Duplicate finalized block header. This might happen after switching to a new RPC node. Skipping distribution." + ); + }, + None => { + tracing::error!(target: LOG_TARGET, "Subscription closed."); + connection_status = ConnectionStatus::Disconnected { failed_request: None}; + }, + Some(Err(error)) => { + tracing::error!(target: LOG_TARGET, ?error, "Error in RPC subscription."); + connection_status = ConnectionStatus::Disconnected { failed_request: None}; + }, + } + } } } } } + +/// Send `value` through all channels contained in `senders`. +/// If no one is listening to the sender, it is removed from the vector. +pub fn distribute(value: T, senders: &mut Vec>) { + senders.retain_mut(|e| { + match e.try_send(value.clone()) { + // Receiver has been dropped, remove Sender from list. + Err(mpsc::error::TrySendError::Closed(_)) => false, + // Channel is full. This should not happen. + // TODO: Improve error handling here + // https://github.com/paritytech/cumulus/issues/1482 + Err(error) => { + tracing::error!(target: LOG_TARGET, ?error, "Event distribution channel has reached its limit. This can lead to missed notifications."); + true + }, + _ => true, + } + }); +}