Skip to content

Commit

Permalink
Various fixes from Triton's branch (#38)
Browse files Browse the repository at this point in the history
* Added rust toolchain

* Remove auth (#3)

* Remove auth

Remove need for JWT auth for internal deploy

* Remove auth from server

* Implement optional authentication

IF you don't provide the JWT public key then the server enters into non authenticated mode.

* Update domain and company name for the certs (#4)

* Fixes cert validation (#6)

* Add support for connecting to multiple servers (#7)

Adds support for connecting to multiple servers.

* Fix metrics per host (#9)

* Add support for connecting to multiple servers

Adds support for connecting to multiple servers.

* Added metrics per server

Adds source tag to metrics so metrics are reported per server.

* Formatting

* Added basic health check (#10)

* Added basic health check

* Update server/rpc_server.rs

Co-authored-by: Kirill Fomichev <[email protected]>

---------

Co-authored-by: Kirill Fomichev <[email protected]>

* Add authentication identifier (#11)

* Fix reporting name

* Get auth information from header

Get auth information from header so we can populate in metrics.

* Update server/rpc_server.rs

Co-authored-by: Kirill Fomichev <[email protected]>

* Update server/rpc_server.rs

Co-authored-by: Kirill Fomichev <[email protected]>

---------

Co-authored-by: Kirill Fomichev <[email protected]>

* Fix init metrics (#13)

Fix init metrics so we get 0 value.

* Bump version (#14)

* Grpc reconnect on disconnect (#15)

* retry logic

Signed-off-by: Wilfred Almeida <[email protected]>

* refactor: retry on disconnect

Signed-off-by: Wilfred Almeida <[email protected]>

* debugging

Signed-off-by: Wilfred Almeida <[email protected]>

* mtransaction client debugging 2

Signed-off-by: Wilfred Almeida <[email protected]>

* revert metrics changes

Signed-off-by: Wilfred Almeida <[email protected]>

* refactor(client): code cleanup

Signed-off-by: Wilfred Almeida <[email protected]>

* refactor(client): PR changes

Signed-off-by: Wilfred Almeida <[email protected]>

* Fix return value and log

* Fixed reconnection message

* Added some updates to log messages

* Removed duplicate break

---------

Signed-off-by: Wilfred Almeida <[email protected]>
Co-authored-by: linuskendall <[email protected]>

* Add unlimited retries (#16)

* Add more retries

Add unlimited retries with a delay.

* Version bump

* CLose metric channel if metric receiver is closed

* Moved metrics closed

* Update client/main.rs

Co-authored-by: Kirill Fomichev <[email protected]>

---------

Co-authored-by: Kirill Fomichev <[email protected]>

* feat: sendTransaction aliased function (#18)

Signed-off-by: Wilfred Almeida <[email protected]>

* Grpc hot reload (#19)

* feat: sendTransaction aliased function

Signed-off-by: Wilfred Almeida <[email protected]>

* feat: grpc urls file watcher

Signed-off-by: Wilfred Almeida <[email protected]>

* refactor: file watcher to signals

Signed-off-by: Wilfred Almeida <[email protected]>

* refactor: spawn management

Signed-off-by: Wilfred Almeida <[email protected]>

* refactor: task spawn/kill management

Signed-off-by: Wilfred Almeida <[email protected]>

* refactor: PR review fixes

Signed-off-by: Wilfred Almeida <[email protected]>

---------

Signed-off-by: Wilfred Almeida <[email protected]>

* Enforce preflight (#21)

* feat: sendTransaction aliased function

Signed-off-by: Wilfred Almeida <[email protected]>

* feat: grpc urls file watcher

Signed-off-by: Wilfred Almeida <[email protected]>

* refactor: file watcher to signals

Signed-off-by: Wilfred Almeida <[email protected]>

* refactor: spawn management

Signed-off-by: Wilfred Almeida <[email protected]>

* refactor: task spawn/kill management

Signed-off-by: Wilfred Almeida <[email protected]>

* refactor: PR review fixes

Signed-off-by: Wilfred Almeida <[email protected]>

* feat: enforce skipPreflight true

Signed-off-by: Wilfred Almeida <[email protected]>

---------

Signed-off-by: Wilfred Almeida <[email protected]>

---------

Signed-off-by: Wilfred Almeida <[email protected]>
Co-authored-by: Pablo Fontoura <[email protected]>
Co-authored-by: Kirill Fomichev <[email protected]>
Co-authored-by: Wilfred Almeida <[email protected]>
  • Loading branch information
4 people authored May 15, 2024
1 parent 6787f55 commit d4c9553
Show file tree
Hide file tree
Showing 17 changed files with 495 additions and 87 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ certs/*.key
certs/*.req
certs/*.srl
certs/*.pub
.env
.sw?
26 changes: 25 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mtransaction"
version = "0.1.0"
version = "0.2.2"
edition = "2021"

[[bin]]
Expand Down Expand Up @@ -44,6 +44,8 @@ base64 = "0.13.0"
bincode = "1.3.3"
enum_dispatch = "0.3.8"
memory-stats = "1.1.0"
signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] }
signal-hook = "0.3.17"

[build-dependencies]
tonic-build = { version = "0.8.0", features = ["prost"] }
2 changes: 1 addition & 1 deletion certs/openssl.server.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
extendedKeyUsage = serverAuth
subjectAltName = DNS:*.marinade.finance,DNS:localhost,IP:0.0.0.0,IP:127.0.0.1
subjectAltName = DNS:*.rpcpool.wg,DNS:localhost,IP:0.0.0.0,IP:127.0.0.1
authorityKeyIdentifier = keyid,issuer
basicConstraints = CA:FALSE
23 changes: 16 additions & 7 deletions client/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,25 @@ enum TransactionForwarder {
Blackhole(BlackholeForwarder),
}

pub struct ForwardedTransaction {
pub source: String,
pub transaction: Transaction,
}

#[enum_dispatch(TransactionForwarder)]
pub trait Forwarder {
fn process(&self, transaction: Transaction) -> ();
fn process(&self, source: String, transaction: Transaction) -> ();
}

struct BlackholeForwarder {}
impl Forwarder for BlackholeForwarder {
fn process(&self, transaction: Transaction) {
metrics::TX_RECEIVED_COUNT.inc();
metrics::TX_FORWARD_SUCCEEDED_COUNT.inc();
fn process(&self, source: String, transaction: Transaction) {
metrics::TX_RECEIVED_COUNT
.with_label_values(&[source.as_str()])
.inc();
metrics::TX_FORWARD_SUCCEEDED_COUNT
.with_label_values(&[source.as_str()])
.inc();
info!(
"Tx {} -> blackhole ({:?})",
transaction.signature, transaction.tpu
Expand All @@ -38,7 +47,7 @@ pub fn spawn_forwarder(
rpc_url: Option<String>,
blackhole: bool,
throttle_parallel: usize,
) -> UnboundedSender<Transaction> {
) -> UnboundedSender<ForwardedTransaction> {
let forwarder = if blackhole {
warn!("Blackholing all transactions!");
TransactionForwarder::Blackhole(BlackholeForwarder {})
Expand All @@ -52,11 +61,11 @@ pub fn spawn_forwarder(
};

let (tx_transactions, mut rx_transactions) =
tokio::sync::mpsc::unbounded_channel::<Transaction>();
tokio::sync::mpsc::unbounded_channel::<ForwardedTransaction>();

tokio::spawn(async move {
while let Some(transaction) = rx_transactions.recv().await {
forwarder.process(transaction);
forwarder.process(transaction.source, transaction.transaction);
}
});

Expand Down
66 changes: 47 additions & 19 deletions client/grpc_client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
pub mod pb {
tonic::include_proto!("validator");
}
use crate::forwarder::ForwardedTransaction;
use log::{error, info, warn};
use pb::{
m_transaction_client::MTransactionClient, Ping, Pong, RequestMessageEnvelope,
ResponseMessageEnvelope, Transaction,
ResponseMessageEnvelope,
};
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::StreamExt;
Expand All @@ -23,24 +24,34 @@ fn process_ping(ping: Ping, tx_upstream_transactions: UnboundedSender<RequestMes
}
}

fn process_transaction(transaction: Transaction, tx_transactions: UnboundedSender<Transaction>) {
fn process_transaction(
transaction: ForwardedTransaction,
tx_transactions: UnboundedSender<ForwardedTransaction>,
) {
if let Err(err) = tx_transactions.send(transaction) {
error!("Failed to enqueue tx: {}", err);
}
}

fn process_upstream_message(
source: String,
response: Result<ResponseMessageEnvelope, Status>,
tx_upstream_transactions: UnboundedSender<RequestMessageEnvelope>,
tx_transactions: UnboundedSender<Transaction>,
tx_transactions: UnboundedSender<ForwardedTransaction>,
) {
match response {
Ok(response_message_envelope) => {
if let Some(ping) = response_message_envelope.ping {
process_ping(ping, tx_upstream_transactions);
}
if let Some(transaction) = response_message_envelope.transaction {
process_transaction(transaction, tx_transactions);
process_transaction(
ForwardedTransaction {
source: source,
transaction: transaction,
},
tx_transactions,
);
}
}
Err(err) => {
Expand All @@ -50,10 +61,11 @@ fn process_upstream_message(
}

async fn mtx_stream(
source: String,
client: &mut MTransactionClient<Channel>,
tx_transactions: tokio::sync::mpsc::UnboundedSender<Transaction>,
tx_transactions: tokio::sync::mpsc::UnboundedSender<ForwardedTransaction>,
mut metrics: tokio::sync::mpsc::Receiver<RequestMessageEnvelope>,
) {
) -> std::result::Result<(), Box<dyn std::error::Error>> {
let (tx_upstream_transactions, mut rx_upstream_transactions) =
tokio::sync::mpsc::unbounded_channel::<RequestMessageEnvelope>();
let request_stream = async_stream::stream! {
Expand All @@ -71,24 +83,26 @@ async fn mtx_stream(
if let Some(metrics) = metrics {
info!("Sending metrics: {:?}", metrics);
if let Err(err) = tx_upstream_transactions.send(metrics) {
error!("Failed to enqueue metrics: {}", err);
error!("Failed to enqueue metrics, source: {:?} err {}", source, err);
}
} else {
error!("Stream of metrics dropped!");
error!("Stream of metrics dropped: {:?}", source);
break
}
}

response = response_stream.next() => {
if let Some(response) = response {
process_upstream_message(response, tx_upstream_transactions.clone(), tx_transactions.clone());
process_upstream_message(source.clone(), response, tx_upstream_transactions.clone(), tx_transactions.clone());
} else {
error!("Upstream closed!");
break
metrics.close();
error!("Upstream closed: {:?}", source);
return Err("Upstream closed".into());
}
}
}
}
Ok(())
}

async fn get_tls_config(
Expand All @@ -111,7 +125,6 @@ async fn get_tls_config(
Some(
ClientTlsConfig::new()
.ca_certificate(ca_cert)
.domain_name("localhost")
.identity(client_identity),
)
} else {
Expand All @@ -127,23 +140,38 @@ pub async fn spawn_grpc_client(
tls_grpc_ca_cert: Option<String>,
tls_grpc_client_key: Option<String>,
tls_grpc_client_cert: Option<String>,
tx_transactions: tokio::sync::mpsc::UnboundedSender<Transaction>,
tx_transactions: tokio::sync::mpsc::UnboundedSender<ForwardedTransaction>,
metrics: tokio::sync::mpsc::Receiver<RequestMessageEnvelope>,
) -> std::result::Result<(), Box<dyn std::error::Error>> {
info!("Loading TLS configuration.");
let tls = get_tls_config(tls_grpc_ca_cert, tls_grpc_client_key, tls_grpc_client_cert).await?;

info!("Opening the gRPC channel.");
let domain_name = grpc_url.host();

info!("Opening the gRPC channel: {:?}", grpc_url.host());
let channel = match tls {
Some(tls) => Channel::builder(grpc_url).tls_config(tls)?,
_ => Channel::builder(grpc_url),
Some(tls) => Channel::builder(grpc_url.clone())
.tls_config(tls.domain_name(domain_name.unwrap_or("localhost")))?,
_ => Channel::builder(grpc_url.clone()),
}
.connect()
.await?;

info!("Streaming from gRPC server.");
let grpc_host = grpc_url.host();

info!("Streaming from gRPC server: {:?}", grpc_host);

let mut client = MTransactionClient::new(channel);
mtx_stream(&mut client, tx_transactions, metrics).await;

Ok(())
mtx_stream(
grpc_host.unwrap_or("unknown").to_string(),
&mut client,
tx_transactions,
metrics,
)
.await
.map(|v| {
info!("Stream ended from gRPC server: {:?}, {:?}", grpc_host, v);
v
})
}
Loading

0 comments on commit d4c9553

Please sign in to comment.