From 41f5d503c2ec931ba17506890a7456dab25ba3ed Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Tue, 26 Sep 2023 19:19:05 +0200 Subject: [PATCH] flatten fanout --- .../src/inbound/proxy_listener.rs | 33 +++++++------------ quic-forward-proxy/src/outbound/ng_forward.rs | 18 +++++----- .../src/outbound/tpu_connection_manager.rs | 22 ++++++------- quic-forward-proxy/src/shared/mod.rs | 31 +---------------- 4 files changed, 33 insertions(+), 71 deletions(-) diff --git a/quic-forward-proxy/src/inbound/proxy_listener.rs b/quic-forward-proxy/src/inbound/proxy_listener.rs index edea669d..2137a018 100644 --- a/quic-forward-proxy/src/inbound/proxy_listener.rs +++ b/quic-forward-proxy/src/inbound/proxy_listener.rs @@ -1,6 +1,5 @@ use crate::proxy_request_format::TpuForwardingRequest; use crate::quic_util::connection_stats; -use crate::shared::ForwardPacket; use crate::tls_config_provider_server::ProxyTlsConfigProvider; use crate::tls_self_signed_pair_generator::SelfSignedTlsConfigProvider; use crate::util::FALLBACK_TIMEOUT; @@ -11,6 +10,7 @@ use solana_sdk::packet::PACKET_DATA_SIZE; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; +use itertools::Itertools; use tokio::sync::mpsc::Sender; // note: setting this to "1" did not make a difference! @@ -33,7 +33,7 @@ impl ProxyListener { } } - pub async fn listen(&self, forwarder_channel: &Sender) -> anyhow::Result<()> { + pub async fn listen(&self, forwarder_channel: &Sender) -> anyhow::Result<()> { info!( "TPU Quic Proxy server listening on {}", self.proxy_listener_addr @@ -88,7 +88,7 @@ impl ProxyListener { #[tracing::instrument(skip_all, level = "debug")] async fn handle_client_connection( client_conn_handshake: Connecting, - forwarder_channel: Sender, + forwarder_channel: Sender, ) -> anyhow::Result<()> { let client_connection = client_conn_handshake.await.context("handshake")?; @@ -123,8 +123,7 @@ impl ProxyListener { txs.len(), proxy_request.get_tpu_nodes().len(), ); - if forwarder_channel_copy.capacity() < forwarder_channel_copy.max_capacity() - { + if forwarder_channel_copy.capacity() < forwarder_channel_copy.max_capacity() { debug!( "forward channel buffered: {} packets", forwarder_channel_copy.max_capacity() @@ -132,23 +131,13 @@ impl ProxyListener { ); } - for tpu_node in proxy_request.get_tpu_nodes() { - let tpu_address = tpu_node.tpu_socket_addr; - let tpu_identity = tpu_node.identity_tpunode; - forwarder_channel_copy - .send_timeout( - ForwardPacket::new( - txs.clone(), - tpu_address, - tpu_identity, - proxy_request.get_hash(), - ), - FALLBACK_TIMEOUT, - ) - .await - .context("sending internal packet from proxy to forwarder") - .unwrap(); - } + forwarder_channel_copy + .send_timeout(proxy_request, + FALLBACK_TIMEOUT, + ) + .await + .context("sending internal packet from proxy to forwarder") + .unwrap(); }); debug!( diff --git a/quic-forward-proxy/src/outbound/ng_forward.rs b/quic-forward-proxy/src/outbound/ng_forward.rs index 050d00a8..839d8b47 100644 --- a/quic-forward-proxy/src/outbound/ng_forward.rs +++ b/quic-forward-proxy/src/outbound/ng_forward.rs @@ -2,7 +2,6 @@ use crate::outbound::debouncer::Debouncer; use crate::outbound::sharder::Sharder; use crate::quic_util::SkipServerVerification; use crate::quinn_auto_reconnect::AutoReconnect; -use crate::shared::{ForwardPacket}; use crate::util::timeout_fallback; use crate::validator_identity::ValidatorIdentity; use anyhow::{bail, Context}; @@ -31,6 +30,7 @@ use solana_lite_rpc_core::stores::data_cache::DataCache; use solana_lite_rpc_core::structures::identity_stakes::IdentityStakesData; use solana_lite_rpc_core::structures::transaction_sent_info::SentTransactionInfo; use crate::outbound::tpu_connection_manager::{ProxiedTransaction, TpuConnectionManager}; +use crate::proxy_request_format::TpuForwardingRequest; // TODO @@ -48,9 +48,10 @@ const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameter }; + pub async fn ng_forwarder( validator_identity: ValidatorIdentity, - mut transaction_channel: Receiver, + mut transaction_channel: Receiver, exit_signal: Arc, ) -> anyhow::Result<()> { @@ -82,7 +83,6 @@ pub async fn ng_forwarder( let mut connections_to_keep: HashMap = HashMap::new(); - loop { if exit_signal.load(Ordering::Relaxed) { bail!("exit signal received"); @@ -93,11 +93,11 @@ pub async fn ng_forwarder( .recv() .await .expect("channel closed unexpectedly"); - let tpu_address = forward_packet.tpu_address; - let tpu_identity = forward_packet.tpu_identity; - // TODO optimize move into tpu_connection_manager and implement shutdown based on not used - connections_to_keep.insert(tpu_identity, tpu_address); + for tpu_node in forward_packet.get_tpu_nodes() { + // TODO optimize move into tpu_connection_manager and implement shutdown based on not used + connections_to_keep.insert(tpu_node.identity_tpunode, tpu_node.tpu_socket_addr); + } tpu_connection_manager .update_connections( @@ -111,7 +111,9 @@ pub async fn ng_forwarder( tpu_connection_manager.cleanup_unused_connections(&connections_to_keep).await; - for raw_tx in forward_packet.transactions { + info!("broadcast {}", broadcast_sender.receiver_count()); + + for raw_tx in forward_packet.get_transaction_bytes() { let transaction = ProxiedTransaction { transaction: raw_tx, }; diff --git a/quic-forward-proxy/src/outbound/tpu_connection_manager.rs b/quic-forward-proxy/src/outbound/tpu_connection_manager.rs index c4302cf9..186b2399 100644 --- a/quic-forward-proxy/src/outbound/tpu_connection_manager.rs +++ b/quic-forward-proxy/src/outbound/tpu_connection_manager.rs @@ -1,5 +1,5 @@ use dashmap::DashMap; -use log::{error, trace}; +use log::{error, info, trace}; use prometheus::{core::GenericGauge, opts, register_int_gauge}; use quinn::Endpoint; use solana_lite_rpc_core::{ @@ -36,7 +36,7 @@ lazy_static::lazy_static! { pub type WireTransaction = Vec; -#[derive(Clone, Debug, PartialEq, PartialOrd)] +#[derive(Debug, Clone)] pub struct ProxiedTransaction { // pub signature: String, pub transaction: WireTransaction, @@ -205,8 +205,8 @@ impl TpuConnectionManager { } } - // #[tracing::instrument(skip_all, level = "warn")] - // update_connections: solana_lite_rpc_quic_forward_proxy::outbound::tpu_connection_manager: close time.busy=565µs time.idle=666ns + #[tracing::instrument(skip_all, level = "warn")] + // update_connections: solana_lite_rpc_quic_forward_proxy::outbound::tpu_connection_manager: close time.busy=10.6µs time.idle=4.00µs pub async fn update_connections( &self, broadcast_sender: Arc>, @@ -216,15 +216,15 @@ impl TpuConnectionManager { connection_parameters: QuicConnectionParameters, ) { NB_CONNECTIONS_TO_KEEP.set(connections_to_keep.len() as i64); - for (identity, socket_addr) in connections_to_keep { - if self.identity_to_active_connection.get(identity).is_some() { + for (tpu_identity, tpu_addr) in connections_to_keep { + if self.identity_to_active_connection.get(tpu_identity).is_some() { continue; } - trace!("added a connection for {}, {}", identity, socket_addr); + trace!("added a connection for {}, {}", tpu_identity, tpu_addr); let active_connection = ActiveConnection::new( self.endpoints.clone(), - *socket_addr, - *identity, + *tpu_addr, + *tpu_identity, data_cache.clone(), connection_parameters, ); @@ -235,7 +235,7 @@ impl TpuConnectionManager { active_connection.start_listening(broadcast_receiver, rx, identity_stakes); self.identity_to_active_connection.insert( - *identity, + *tpu_identity, Arc::new(ActiveConnectionWithExitChannel { active_connection, exit_stream: sx, @@ -245,7 +245,7 @@ impl TpuConnectionManager { } - + #[tracing::instrument(skip_all, level = "warn")] pub async fn cleanup_unused_connections( &self, connections_to_keep: &HashMap, diff --git a/quic-forward-proxy/src/shared/mod.rs b/quic-forward-proxy/src/shared/mod.rs index f9ce25d3..bb408fc6 100644 --- a/quic-forward-proxy/src/shared/mod.rs +++ b/quic-forward-proxy/src/shared/mod.rs @@ -1,33 +1,4 @@ use std::net::SocketAddr; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signature; - - -#[derive(Debug)] -pub struct TxRawData { - pub signature: String, - pub transaction: Vec, -} - -/// internal structure with transactions and target TPU -#[derive(Debug)] -pub struct ForwardPacket { - pub transactions: Vec>, - pub tpu_address: SocketAddr, - pub tpu_identity: Pubkey, - pub shard_hash: u64, -} - -impl ForwardPacket { - pub fn new(transactions: Vec>, tpu_address: SocketAddr, - tpu_identity: Pubkey, - hash: u64) -> Self { - assert!(!transactions.is_empty(), "no transactions"); - Self { - transactions, - tpu_address, - tpu_identity, - shard_hash: hash, - } - } -} +use crate::proxy_request_format::TpuNode;