diff --git a/quic-forward-proxy/src/outbound/tx_forward.rs b/quic-forward-proxy/src/outbound/tx_forward.rs index a71eb7bf..8525d627 100644 --- a/quic-forward-proxy/src/outbound/tx_forward.rs +++ b/quic-forward-proxy/src/outbound/tx_forward.rs @@ -34,7 +34,7 @@ const MAX_PARALLEL_STREAMS: usize = 6; pub const PARALLEL_TPU_CONNECTION_COUNT: usize = 4; const AGENT_SHUTDOWN_IDLE: Duration = Duration::from_millis(2500); // ms; should be 4x400ms+buffer -const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameters { +pub const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameters { connection_timeout: Duration::from_secs(2), connection_retry_count: 10, finalize_timeout: Duration::from_secs(2), @@ -135,7 +135,7 @@ pub async fn tx_forwarder( for raw_tx in &forward_packet.transactions { - // TODO add to ForwardPacket + // TODO remove - duplicate to .convert() let tx = bincode::deserialize::(&raw_tx).unwrap(); let tsi = SentTransactionInfo { diff --git a/quic-forward-proxy/src/outbound/tx_sender.rs b/quic-forward-proxy/src/outbound/tx_sender.rs index cbfca107..60bab10d 100644 --- a/quic-forward-proxy/src/outbound/tx_sender.rs +++ b/quic-forward-proxy/src/outbound/tx_sender.rs @@ -1,14 +1,21 @@ +use std::collections::HashMap; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::sync::Arc; use std::time::{Duration, Instant}; use anyhow::bail; use chrono::Utc; -use log::{trace, warn}; +use log::{info, trace, warn}; use prometheus::{ core::GenericGauge, histogram_opts, opts, register_histogram, register_int_counter, register_int_gauge, Histogram, IntCounter, }; -use tokio::sync::mpsc::Receiver; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::transaction::VersionedTransaction; +use solana_streamer::nonblocking::quic::ConnectionPeerType; +use solana_streamer::tls_certificates::new_self_signed_tls_certificate; +use tokio::sync::mpsc::{Receiver, Sender}; // use crate::tpu_utils::tpu_service::TpuService; use solana_lite_rpc_core::{ @@ -19,6 +26,12 @@ use solana_lite_rpc_core::{ }, AnyhowJoinHandle, }; +use solana_lite_rpc_core::solana_utils::SerializableTransaction; +use solana_lite_rpc_core::structures::identity_stakes::IdentityStakesData; +use solana_lite_rpc_services::tpu_utils::tpu_connection_manager::TpuConnectionManager; +use crate::outbound::tx_forward::QUIC_CONNECTION_PARAMS; +use crate::shared::ForwardPacket; +use crate::validator_identity::ValidatorIdentity; lazy_static::lazy_static! { static ref TXS_SENT: IntCounter = @@ -42,13 +55,99 @@ const INTERVAL_PER_BATCH_IN_MS: u64 = 50; const MAX_BATCH_SIZE_IN_PER_INTERVAL: usize = 2000; +// adapter for forward_packets to TransactionSentInfo +pub fn adapter_packets_to_tsi(forwarder_channel2: Sender, + mut forward_channel: Receiver, + broadcast_sender: Arc>) -> AnyhowJoinHandle { + + tokio::spawn(async move { + + let fanout_slots = 4; + + // TODO pass in + let validator_identity = ValidatorIdentity::new(None); + + + let (certificate, key) = new_self_signed_tls_certificate( + &validator_identity.get_keypair_for_tls(), + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + ) + .expect("Failed to initialize QUIC connection certificates"); + + // TODO move somewhere else + // TODO make copy of TpuConnectionManager in proxy crate an strip unused features + let tpu_connection_manager = + TpuConnectionManager::new(certificate, key, fanout_slots as usize).await; + // TODO implement cleanup + let mut connections_to_keep: HashMap = HashMap::new(); + + // TODO remove + let identity_stakes = IdentityStakesData { + peer_type: ConnectionPeerType::Staked, + stakes: 30, + min_stakes: 0, + max_stakes: 40, + total_stakes: 100, + }; + + + loop { + let forward_packet = forward_channel.recv().await.unwrap(); + + for raw_tx in &forward_packet.transactions { + + // TODO remove - duplicate to .convert() + let tx = bincode::deserialize::(&raw_tx).unwrap(); + + let tsi = SentTransactionInfo { + signature: tx.get_signature().to_string(), + slot: 4242, + transaction: raw_tx.clone(), + last_valid_block_height: 999, + }; + + + // configure ActiveConnections + { + + connections_to_keep.insert(forward_packet.tpu_identity, forward_packet.tpu_address); + + + tpu_connection_manager + .update_connections( + broadcast_sender.clone(), + &connections_to_keep, + identity_stakes, + DataCache::new_for_tests(), + QUIC_CONNECTION_PARAMS, // TODO improve + ) + .await; + + info!("connections_to_keep: {}", connections_to_keep.len()); + } + + info!("!!!converted "); + // send_transaction + forwarder_channel2.send(tsi).await.unwrap(); + + } + } + + }) +} + /// retry and confirm transactions every 2ms (avg time to confirm tx) pub fn execute( // self, mut recv: Receiver, - notifier: Option, + broadcast_sender: Arc>, ) -> AnyhowJoinHandle { tokio::spawn(async move { + + + + + loop { let mut transaction_infos = Vec::with_capacity(MAX_BATCH_SIZE_IN_PER_INTERVAL); let mut timeout_interval = INTERVAL_PER_BATCH_IN_MS; @@ -95,7 +194,8 @@ pub fn execute( TX_BATCH_SIZES.set(transaction_infos.len() as i64); - forward_txs(transaction_infos).await; + info!("!!!forward_txs {}", transaction_infos.len()); + forward_txs(transaction_infos, broadcast_sender.clone()).await; } }) } @@ -103,6 +203,7 @@ pub fn execute( async fn forward_txs( // &self, transaction_infos: Vec, + broadcast_sender: Arc>, // notifier: Option, ) { if transaction_infos.is_empty() { @@ -136,7 +237,7 @@ async fn forward_txs( // TxProps::new(transaction_info.last_valid_block_height), // ); - let quic_response = match send_transaction(transaction_info) { + let quic_response = match broadcast_sender.send(transaction_info.clone()) { Ok(_) => { TXS_SENT.inc_by(1); 1 @@ -176,7 +277,7 @@ async fn forward_txs( } // was tpu_client.send_transaction -pub fn send_transaction(transaction: &SentTransactionInfo) -> anyhow::Result<()> { +pub fn send_transaction_sdf(transaction: &SentTransactionInfo) -> anyhow::Result<()> { // self.broadcast_sender.send(transaction.clone())?; Ok(()) } diff --git a/quic-forward-proxy/src/proxy.rs b/quic-forward-proxy/src/proxy.rs index de974fe8..840bb1dd 100644 --- a/quic-forward-proxy/src/proxy.rs +++ b/quic-forward-proxy/src/proxy.rs @@ -10,6 +10,10 @@ use crate::tls_self_signed_pair_generator::SelfSignedTlsConfigProvider; use crate::util::AnyhowJoinHandle; use crate::validator_identity::ValidatorIdentity; use log::info; +use tokio::sync::mpsc; +use solana_lite_rpc_core::structures::transaction_sent_info::SentTransactionInfo; +use crate::outbound::tx_sender::{execute, adapter_packets_to_tsi}; +use crate::shared::ForwardPacket; pub struct QuicForwardProxy { // endpoint: Endpoint, @@ -36,31 +40,47 @@ impl QuicForwardProxy { pub async fn start_services(self) -> anyhow::Result<()> { let exit_signal = Arc::new(AtomicBool::new(false)); - let (forwarder_channel, forward_receiver) = tokio::sync::mpsc::channel(1000); + // let (transaction_channel, tx_recv) = mpsc::channel(self.max_nb_txs_in_queue); + let (transaction_channel, forwarder_channel) = mpsc::channel::(1000); + let (forwarder_channel2, tx_recv) = mpsc::channel::(1000); let proxy_listener = proxy_listener::ProxyListener::new(self.proxy_listener_addr, self.tls_config); let quic_proxy = tokio::spawn(async move { proxy_listener - .listen(&forwarder_channel) + .listen(&transaction_channel) .await .expect("proxy listen service"); }); let validator_identity = self.validator_identity.clone(); let exit_signal_clone = exit_signal.clone(); - let forwarder: AnyhowJoinHandle = tokio::spawn(tx_forwarder( - validator_identity, - forward_receiver, - exit_signal_clone, - )); + + + // let forwarder: AnyhowJoinHandle = tokio::spawn(tx_forwarder( + // validator_identity, + // forward_receiver, + // exit_signal_clone, + // )); + + // broadcast to active connections + let (sender_active_connections, _) = + tokio::sync::broadcast::channel::(1000); + let broadcast_sender = Arc::new(sender_active_connections); + + let adapter = adapter_packets_to_tsi(forwarder_channel2, forwarder_channel, broadcast_sender.clone()); + + let tx_sender_jh = execute(tx_recv, broadcast_sender.clone()); tokio::select! { res = quic_proxy => { bail!("TPU Quic Proxy server exited unexpectedly {res:?}"); }, - res = forwarder => { + res = adapter => { + bail!("TPU Quic packet adapter exited unexpectedly {res:?}"); + }, + res = tx_sender_jh => { bail!("TPU Quic Tx forwarder exited unexpectedly {res:?}"); }, }