diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 0000000..0770125 --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,7 @@ +## Overview + +- Summary of changes + +## Testing + +- Testing performed to validate the changes diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 0000000..9da3a1d --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,54 @@ +name: CI + +on: + pull_request: + branches: [main] + +jobs: + + changes: + runs-on: ubuntu-latest + outputs: + fee-estimator: ${{ steps.filter.outputs.fee-estimator}} + steps: + - uses: actions/checkout@v3 + - uses: dorny/paths-filter@v2.2.1 + id: filter + with: + filters: | + fee-estimator: + - '*/**' + + fee-estimator-tests: + needs: changes + if: ${{ needs.changes.outputs.fee-estimator == 'true' }} + runs-on: ubuntu-latest + env: + RUST_TOOLCHAIN: stable + CARGO_INCREMENTAL: 0 + SCCACHE_GHA_ENABLED: 'true' + RUSTC_WRAPPER: 'sccache' + ENV: 'test' + steps: + - uses: actions/checkout@v4 + - name: Cleanup Apt + run: sudo find /var/lib/apt/lists/ -type 'f' -name 'archive*' -delete && sudo apt-get update + - uses: rui314/setup-mold@v1 + with: + mold-version: 1.1.1 + make-default: true + - name: Ensure we use Mold + run: sudo ln -s /usr/local/bin/ld.mold /usr/bin/ld.lld + - name: Install rust + caching + uses: actions-rust-lang/setup-rust-toolchain@v1 + with: + rustflags: '-C link-arg=-fuse-ld=lld' + components: rustfmt, clippy + - name: Set up sccache + uses: mozilla-actions/sccache-action@v0.0.3 + - name: Clear sccache stats + run: sccache --zero-stats + - name: Run Tests + run: | + cargo test -- --nocapture + diff --git a/.gitignore b/.gitignore index 575a1af..6969b2c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,72 @@ -.vscode/ -target/ \ No newline at end of file +# Javascript/Typescript +node_modules/ +dist/ +build/ +coverage/ +vendor/ + +# Node Logs +logs +*.log +npm-debug.log* +AWSCLIV2.pkg + +# Mac +**/.DS_Store + +# IDEs +**/.vscode +**/.idea/ + +# Services +ts-services/parser-lambda/parser/test/data/local/* +ts-services/parser-lambda/parser/test/output/* +ts-services/parser-lambda/parser/playground.ts + +# Secrets +ts-services/dev-api-lambda/crypto-billing/*_secrets.ts +infra/ansible/vars/**/*-secrets.yaml +infra/ansible/vars/**/*-secrets.yml +infra/ansible/photon/vars/*-secrets.yaml +infra/ansible/photon/vars/*-secrets.yml +infra/ansible/rpc/templates/bt-creds*.json + +# Rust +**/target/ + +# CPU profiles +*.cpuprofile + +# Built helm charts +infra/**/*.tgz + +.gradle +.m2 +bin + +# Build output directies +/target +*/target +/build +*/build + +# Local Redis +dump.rdb + +infra/ansible/roles/Datadog.datadog +infra/ansible/photon/roles/datadog.datadog + + +scripts/data + +# Jupyter notebooks +*.ipynb + +# Personal Playground +personal_playground + + +# Used for local deployments of RPC operator +/rpc-operator/ +**/systest/*.json +**/router-benchmarks/*.json diff --git a/Cargo.lock b/Cargo.lock index fd8bd7c..e77bee0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2133,9 +2133,9 @@ dependencies = [ [[package]] name = "jsonrpsee" -version = "0.20.3" +version = "0.22.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "affdc52f7596ccb2d7645231fc6163bb314630c989b64998f3699a28b4d5d4dc" +checksum = "cfdb12a2381ea5b2e68c3469ec604a007b367778cdb14d09612c8069ebd616ad" dependencies = [ "jsonrpsee-core", "jsonrpsee-http-client", @@ -2148,9 +2148,9 @@ dependencies = [ [[package]] name = "jsonrpsee-core" 
-version = "0.20.3" +version = "0.22.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da2327ba8df2fdbd5e897e2b5ed25ce7f299d345b9736b6828814c3dbd1fd47b" +checksum = "b4b257e1ec385e07b0255dde0b933f948b5c8b8c28d42afda9587c3a967b896d" dependencies = [ "anyhow", "async-trait", @@ -2163,7 +2163,6 @@ dependencies = [ "rustc-hash", "serde", "serde_json", - "soketto", "thiserror", "tokio", "tracing", @@ -2171,9 +2170,9 @@ dependencies = [ [[package]] name = "jsonrpsee-http-client" -version = "0.20.3" +version = "0.22.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f80c17f62c7653ce767e3d7288b793dfec920f97067ceb189ebdd3570f2bc20" +checksum = "1ccf93fc4a0bfe05d851d37d7c32b7f370fe94336b52a2f0efc5f1981895c2e5" dependencies = [ "async-trait", "hyper", @@ -2191,28 +2190,29 @@ dependencies = [ [[package]] name = "jsonrpsee-proc-macros" -version = "0.20.3" +version = "0.22.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29110019693a4fa2dbda04876499d098fa16d70eba06b1e6e2b3f1b251419515" +checksum = "7d0bb047e79a143b32ea03974a6bf59b62c2a4c5f5d42a381c907a8bbb3f75c0" dependencies = [ "heck 0.4.1", - "proc-macro-crate 1.3.1", + "proc-macro-crate 3.1.0", "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.63", ] [[package]] name = "jsonrpsee-server" -version = "0.20.3" +version = "0.22.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82c39a00449c9ef3f50b84fc00fc4acba20ef8f559f07902244abf4c15c5ab9c" +checksum = "12d8b6a9674422a8572e0b0abb12feeb3f2aeda86528c80d0350c2bd0923ab41" dependencies = [ "futures-util", "http", "hyper", "jsonrpsee-core", "jsonrpsee-types", + "pin-project", "route-recognizer", "serde", "serde_json", @@ -2227,16 +2227,15 @@ dependencies = [ [[package]] name = "jsonrpsee-types" -version = "0.20.3" +version = "0.22.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5be0be325642e850ed0bdff426674d2e66b2b7117c9be23a7caef68a2902b7d9" +checksum = "150d6168405890a7a3231a3c74843f58b8959471f6df76078db2619ddee1d07d" dependencies = [ "anyhow", "beef", "serde", "serde_json", "thiserror", - "tracing", ] [[package]] @@ -2639,7 +2638,7 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "681030a937600a36906c185595136d26abfebb4aa9c65701cefcaf8578bb982b" dependencies = [ - "proc-macro-crate 3.1.0", + "proc-macro-crate 1.3.1", "proc-macro2", "quote", "syn 2.0.63", @@ -5102,6 +5101,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] @@ -5156,9 +5156,9 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.6.5" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1" +checksum = "0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41" dependencies = [ "serde", ] diff --git a/Cargo.toml b/Cargo.toml index ef67ced..9b64fed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ cadence = "0.29.0" cadence-macros = "0.29.0" dashmap = "5.5.3" queues = "1.1.0" -jsonrpsee = { version = "0.20.1", features = [ +jsonrpsee = { version = "0.22.5", features = [ "server", "http-client", "macros", @@ -38,4 +38,4 @@ yellowstone-grpc-geyser = { git = "https://github.com/helius-labs/yellowstone-gr rand = "0.8.5" futures = "0.3.24" figment = { version = "0.10.6", features = ["env", "test"] } -tower = { version = "0.4.13", features = ["full"] } +tower = { 
version = "0.4.13", features = ["full"] } \ No newline at end of file diff --git a/src/grpc_geyser.rs b/src/grpc_geyser.rs index 9d3fed1..01de114 100644 --- a/src/grpc_geyser.rs +++ b/src/grpc_geyser.rs @@ -1,10 +1,8 @@ -use std::sync::atomic::Ordering; use std::sync::Arc; -use std::time::Instant; use std::{collections::HashMap, time::Duration}; use cadence_macros::statsd_count; -use futures::{future::TryFutureExt, sink::SinkExt, stream::StreamExt}; +use futures::{sink::SinkExt, stream::StreamExt}; use rand::distributions::Alphanumeric; use rand::Rng; use tokio::time::sleep; diff --git a/src/main.rs b/src/main.rs index c1935c1..f926db0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,11 +5,13 @@ use cadence::{BufferedUdpMetricSink, QueuingMetricSink, StatsdClient}; use cadence_macros::set_global_default; use figment::{providers::Env, Figment}; use grpc_geyser::GrpcGeyserImpl; -use jsonrpsee::server::{middleware::ProxyGetRequestLayer, ServerBuilder}; +use jsonrpsee::server::{RpcServiceBuilder, ServerBuilder}; +use jsonrpsee::server::middleware::http::ProxyGetRequestLayer; use priority_fee::PriorityFeeTracker; use rpc_server::AtlasPriorityFeeEstimator; use serde::Deserialize; use tracing::{error, info}; + mod errors; mod grpc_consumer; mod grpc_geyser; @@ -17,6 +19,7 @@ mod priority_fee; mod rpc_server; mod slot_cache; mod solana; +mod temp_validator; #[derive(Debug, Deserialize, Clone)] struct EstimatorEnv { @@ -50,7 +53,8 @@ async fn main() { let port = env.port.unwrap_or(4141); let server = ServerBuilder::default() - .set_middleware( + .set_rpc_middleware(RpcServiceBuilder::new().layer(temp_validator::RpcValidatorLayer::new())) + .set_http_middleware( tower::ServiceBuilder::new() // Proxy `GET /health` requests to internal `health` method. .layer( diff --git a/src/priority_fee.rs b/src/priority_fee.rs index 8de8429..69d14cc 100644 --- a/src/priority_fee.rs +++ b/src/priority_fee.rs @@ -1,7 +1,5 @@ -use std::rc::Rc; use std::sync::Arc; -use std::time::Duration; - +use std::time::{Duration, Instant}; use cadence_macros::statsd_count; use cadence_macros::statsd_gauge; use dashmap::DashMap; @@ -10,7 +8,8 @@ use serde::Serialize; use solana_program_runtime::compute_budget::ComputeBudget; use solana_sdk::instruction::CompiledInstruction; use solana_sdk::{pubkey::Pubkey, slot_history::Slot}; -use tracing::error; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tracing::{error, warn}; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdate; @@ -18,7 +17,7 @@ use crate::grpc_consumer::GrpcConsumer; use crate::rpc_server::get_recommended_fee; use crate::slot_cache::SlotCache; -#[derive(Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub enum PriorityLevel { Min, // 0th percentile Low, // 25th percentile @@ -59,7 +58,7 @@ impl Into for PriorityLevel { type Percentile = usize; -#[derive(Deserialize, Serialize, Debug, Clone)] +#[derive(Deserialize, Serialize, Debug, Clone, Default)] #[serde(rename_all(serialize = "camelCase", deserialize = "camelCase"))] pub struct MicroLamportPriorityFeeEstimates { pub min: f64, @@ -97,6 +96,7 @@ pub struct PriorityFeeTracker { priority_fees: Arc, compute_budget: ComputeBudget, slot_cache: SlotCache, + sampling_sender: Sender<(Vec, bool, Option)> } impl GrpcConsumer for PriorityFeeTracker { @@ -193,58 +193,198 @@ impl GrpcConsumer for PriorityFeeTracker { impl PriorityFeeTracker { pub fn new(slot_cache_length: usize) -> Self { + let (sampling_txn, 
sampling_rxn) = channel::<(Vec, bool, Option)>(1_000); + let tracker = Self { priority_fees: Arc::new(DashMap::new()), slot_cache: SlotCache::new(slot_cache_length), compute_budget: ComputeBudget::default(), + sampling_sender: sampling_txn, }; - tracker.poll_fees(); + tracker.poll_fees(sampling_rxn); tracker } - fn poll_fees(&self) { - let priority_fee_tracker = self.clone(); - tokio::spawn(async move { - loop { - let global_fees = - priority_fee_tracker.get_priority_fee_estimates(vec![], false, None); - statsd_gauge!( - "min_priority_fee", - global_fees.min as u64, - "account" => "none" - ); - statsd_gauge!("low_priority_fee", global_fees.low as u64, "account" => "none"); - statsd_gauge!( - "medium_priority_fee", - global_fees.medium as u64, - "account" => "none" - ); - statsd_gauge!( - "high_priority_fee", - global_fees.high as u64, - "account" => "none" - ); - statsd_gauge!( - "very_high_priority_fee", - global_fees.very_high as u64, - "account" => "none" - ); - statsd_gauge!( - "unsafe_max_priority_fee", - global_fees.unsafe_max as u64, - "account" => "none" - ); - statsd_gauge!( - "recommended_priority_fee", - get_recommended_fee(global_fees) as u64, - "account" => "none" - ); - tokio::time::sleep(Duration::from_millis(1000)).await - } - }); + fn poll_fees(&self, mut sampling_rxn: Receiver<(Vec, bool, Option)>) { + { + let priority_fee_tracker = self.clone(); + // task to run global fee comparison every 1 second + tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_millis(1_000)).await; + priority_fee_tracker.record_general_fees(); + } + }); + } + + { + let priority_fee_tracker = self.clone(); + // task to poll the queue and run comparison to see what is the diff between algos + tokio::spawn(async move { + loop { + match sampling_rxn.recv().await + { + Some((accounts, include_vote, lookback_period)) => + priority_fee_tracker.record_specific_fees(accounts, include_vote, lookback_period), + _ => {}, + } + }; + }); + } + } + + fn record_general_fees(&self) { + let global_fees = self.calculation1(&vec![], false, &None); + statsd_gauge!( + "min_priority_fee", + global_fees.min as u64, + "account" => "none" + ); + statsd_gauge!("low_priority_fee", global_fees.low as u64, "account" => "none"); + statsd_gauge!( + "medium_priority_fee", + global_fees.medium as u64, + "account" => "none" + ); + statsd_gauge!( + "high_priority_fee", + global_fees.high as u64, + "account" => "none" + ); + statsd_gauge!( + "very_high_priority_fee", + global_fees.very_high as u64, + "account" => "none" + ); + statsd_gauge!( + "unsafe_max_priority_fee", + global_fees.unsafe_max as u64, + "account" => "none" + ); + statsd_gauge!( + "recommended_priority_fee", + get_recommended_fee(global_fees) as u64, + "account" => "none" + ); + } + + fn record_specific_fees(&self, accounts: Vec, include_vote: bool, lookback_period: Option) + { + let old_fee = self.calculation1(&accounts, include_vote, &lookback_period); + let new_fee = self.calculation2(&accounts, include_vote, &lookback_period); + let new_fee_last = self.calculation2(&accounts, include_vote, &Some(1)); + + statsd_gauge!( + "min_priority_fee", + old_fee.min, + "account" => "spec" + ); + statsd_gauge!( + "min_priority_fee_new", + new_fee.min, + "account" => "spec" + ); + statsd_gauge!( + "min_priority_fee_last", + new_fee_last.min, + "account" => "spec" + ); + + statsd_gauge!("low_priority_fee", + old_fee.low, + "account" => "spec" + ); + statsd_gauge!("low_priority_fee_new", + new_fee.low, + "account" => "spec" + ); + 
statsd_gauge!("low_priority_fee_last", + new_fee_last.low, + "account" => "spec" + ); + + statsd_gauge!( + "medium_priority_fee", + old_fee.medium, + "account" => "spec" + ); + statsd_gauge!( + "medium_priority_fee_new", + new_fee.medium, + "account" => "spec" + ); + statsd_gauge!( + "medium_priority_fee_last", + new_fee_last.medium, + "account" => "spec" + ); + + statsd_gauge!( + "high_priority_fee", + old_fee.high, + "account" => "spec" + ); + statsd_gauge!( + "high_priority_fee_new", + new_fee.high, + "account" => "spec" + ); + statsd_gauge!( + "high_priority_fee_last", + new_fee_last.high, + "account" => "spec" + ); + + statsd_gauge!( + "very_high_priority_fee", + old_fee.very_high, + "account" => "spec" + ); + statsd_gauge!( + "very_high_priority_fee_new", + new_fee.very_high, + "account" => "spec" + ); + statsd_gauge!( + "very_high_priority_fee_last", + new_fee_last.very_high, + "account" => "spec" + ); + + statsd_gauge!( + "unsafe_max_priority_fee", + old_fee.unsafe_max, + "account" => "spec" + ); + statsd_gauge!( + "unsafe_max_priority_fee_new", + new_fee.unsafe_max, + "account" => "spec" + ); + statsd_gauge!( + "very_high_priority_fee_last", + new_fee_last.unsafe_max, + "account" => "spec" + ); + + statsd_gauge!( + "recommended_priority_fee", + get_recommended_fee(old_fee), + "account" => "spec" + ); + statsd_gauge!( + "recommended_priority_fee_new", + get_recommended_fee(new_fee), + "account" => "spec" + ); + statsd_gauge!( + "recommended_priority_fee_last", + get_recommended_fee(new_fee_last), + "account" => "spec" + ); } - fn push_priority_fee_for_txn( + pub fn push_priority_fee_for_txn( &self, slot: Slot, accounts: Vec, @@ -275,19 +415,51 @@ impl PriorityFeeTracker { } } + + + // TODO: DKH - both algos should probably be in some enum (like algo1, algo2) and be passed to + // this method instead of sending a bool flag. I'll refactor this in next iteration. already too many changes pub fn get_priority_fee_estimates( &self, accounts: Vec, include_vote: bool, - lookback_period: Option, + lookback_period: Option, + calculation1: bool ) -> MicroLamportPriorityFeeEstimates { - let start = std::time::Instant::now(); + let start = Instant::now(); + let micro_lamport_priority_fee_estimates = if calculation1 + { + self.calculation1(&accounts, include_vote, &lookback_period) + } + else { + self.calculation2(&accounts, include_vote, &lookback_period) + }; + + statsd_gauge!( + "get_priority_fee_estimates_time", + start.elapsed().as_nanos() as u64 + ); + if let Err(e) = self.sampling_sender.try_send((accounts.to_owned(), include_vote, lookback_period.to_owned())) + { + warn!("Did not add sample for calculation {e}"); + } + + micro_lamport_priority_fee_estimates + } + + /* + Algo1: given the list of accounts the algorithm will: + 1. collect all the transactions fees over n slots + 2. collect all the transaction fees for all the accounts specified over n slots + 3. will calculate the percentile distributions for each of two groups + 4. 
will choose the highest value from each percentile between two groups + */ + fn calculation1(&self, accounts: &Vec, include_vote: bool, lookback_period: &Option) -> MicroLamportPriorityFeeEstimates { let mut account_fees = vec![]; let mut transaction_fees = vec![]; - let rc_accounts = Rc::new(accounts); for (i, slot_priority_fees) in self.priority_fees.iter().enumerate() { if let Some(lookback_period) = lookback_period { - if i >= lookback_period as usize { + if i >= *lookback_period as usize { break; } } @@ -295,7 +467,7 @@ impl PriorityFeeTracker { transaction_fees.extend_from_slice(&slot_priority_fees.fees.vote_fees); } transaction_fees.extend_from_slice(&slot_priority_fees.fees.non_vote_fees); - for account in rc_accounts.iter() { + for account in accounts { let account_priority_fees = slot_priority_fees.account_fees.get(account); if let Some(account_priority_fees) = account_priority_fees { if include_vote { @@ -313,13 +485,72 @@ impl PriorityFeeTracker { very_high: max_percentile(&mut account_fees, &mut transaction_fees, 95), unsafe_max: max_percentile(&mut account_fees, &mut transaction_fees, 100), }; - let end = std::time::Instant::now(); - statsd_gauge!( - "get_priority_fee_estimates_time", - end.duration_since(start).as_nanos() as u64 - ); micro_lamport_priority_fee_estimates } + + /* + Algo2: given the list of accounts the algorithm will: + 1. collect all the transactions fees over n slots + 2. for each specified account collect the fees and calculate the percentiles + 4. choose maximum values for each percentile between all transactions and each account + */ + fn calculation2(&self, accounts: &Vec, include_vote: bool, lookback_period: &Option) -> MicroLamportPriorityFeeEstimates { + let lookback = calculate_lookback_size(&lookback_period, self.slot_cache.len()); + + let mut slots_vec = Vec::with_capacity(self.slot_cache.len()); + self.slot_cache.copy_slots(&mut slots_vec); + slots_vec.sort(); + slots_vec.reverse(); + + let mut fees = vec![]; + let mut micro_lamport_priority_fee_estimates = MicroLamportPriorityFeeEstimates::default(); + + for slot in &slots_vec[..lookback] { + if let Some(slot_priority_fees) = self.priority_fees.get(slot) + { + if include_vote { + fees.extend_from_slice(&slot_priority_fees.fees.vote_fees); + } + fees.extend_from_slice(&slot_priority_fees.fees.non_vote_fees); + } + } + micro_lamport_priority_fee_estimates = estimate_max_values(&mut fees, + micro_lamport_priority_fee_estimates); + + for account in accounts { + fees.clear(); + + for slot in &slots_vec[..lookback] { + + if let Some(slot_priority_fees) = self.priority_fees.get(slot) + { + let account_priority_fees = slot_priority_fees.account_fees.get(account); + if let Some(account_priority_fees) = account_priority_fees { + if include_vote { + fees.extend_from_slice(&account_priority_fees.vote_fees); + } + fees.extend_from_slice(&account_priority_fees.non_vote_fees); + } + } + } + micro_lamport_priority_fee_estimates = estimate_max_values(&mut fees, + micro_lamport_priority_fee_estimates); + } + micro_lamport_priority_fee_estimates + } +} + +fn estimate_max_values( + mut fees: &mut Vec, + mut estimates: MicroLamportPriorityFeeEstimates, +) -> MicroLamportPriorityFeeEstimates { + estimates.min = percentile(&mut fees, 0).max(estimates.min); + estimates.low = percentile(&mut fees, 25).max(estimates.low); + estimates.medium = percentile(&mut fees, 50).max(estimates.medium); + estimates.high = percentile(&mut fees, 75).max(estimates.high); + estimates.very_high = percentile(&mut fees, 
95).max(estimates.very_high); + estimates.unsafe_max = percentile(&mut fees, 100).max(estimates.unsafe_max); + estimates } fn max(a: f64, b: f64) -> f64 { @@ -330,6 +561,14 @@ fn max(a: f64, b: f64) -> f64 { } } +fn calculate_lookback_size(pref_num_slots: &Option, max_available_slots: usize) -> usize { + max_available_slots.min( + pref_num_slots + .map(|v| v as usize) + .unwrap_or(max_available_slots), + ) +} + #[derive(Debug, Clone)] pub struct Fees { non_vote_fees: Vec, @@ -381,13 +620,15 @@ fn percentile(values: &mut Vec, percentile: Percentile) -> f64 { let n = values.len() as f64; let r = (percentile as f64 / 100.0) * (n - 1.0) + 1.0; - if r.fract() == 0.0 { + let val = + if r.fract() == 0.0 { values[r as usize - 1] } else { let ri = r.trunc() as usize - 1; let rf = r.fract(); values[ri] + rf * (values[ri + 1] - values[ri]) - } + }; + val } #[cfg(test)] @@ -426,7 +667,25 @@ mod tests { // Now test the fee estimates for a known priority level, let's say medium (50th percentile) let estimates = - tracker.get_priority_fee_estimates(vec![accounts.get(0).unwrap().clone()], false, None); + tracker.calculation1(&vec![accounts.get(0).unwrap().clone()], false, &None); + // Since the fixed fees are evenly distributed, the 50th percentile should be the middle value + let expected_min_fee = 0.0; + let expected_low_fee = 25.0; + let expected_medium_fee = 50.0; + let expected_high_fee = 75.0; + let expected_very_high_fee = 95.0; + let expected_max_fee = 100.0; + assert_eq!(estimates.min, expected_min_fee); + assert_eq!(estimates.low, expected_low_fee); + assert_eq!(estimates.medium, expected_medium_fee); + assert_eq!(estimates.high, expected_high_fee); + assert_eq!(estimates.very_high, expected_very_high_fee); + assert_eq!(estimates.unsafe_max, expected_max_fee); + + + // Now test the fee estimates for a known priority level, let's say medium (50th percentile) + let estimates = + tracker.calculation2(&vec![accounts.get(0).unwrap().clone()], false, &None); // Since the fixed fees are evenly distributed, the 50th percentile should be the middle value let expected_min_fee = 0.0; let expected_low_fee = 25.0; @@ -467,7 +726,24 @@ mod tests { // Now test the fee estimates for a known priority level, let's say medium (50th percentile) let estimates = - tracker.get_priority_fee_estimates(vec![accounts.get(0).unwrap().clone()], false, None); + tracker.calculation1(&vec![accounts.get(0).unwrap().clone()], false, &None); + let expected_min_fee = 1.0; + let expected_low_fee = 25.0; + let expected_medium_fee = 50.0; + let expected_high_fee = 75.0; + let expected_very_high_fee = 95.0; + let expected_max_fee = 100.0; + assert_eq!(estimates.min, expected_min_fee); + assert_eq!(estimates.low, expected_low_fee); + assert_eq!(estimates.medium, expected_medium_fee); + assert_eq!(estimates.high, expected_high_fee); + assert_eq!(estimates.very_high, expected_very_high_fee); + assert_eq!(estimates.unsafe_max, expected_max_fee); + + + // Now test the fee estimates for a known priority level, let's say medium (50th percentile) + let estimates = + tracker.calculation2(&vec![accounts.get(0).unwrap().clone()], false, &None); let expected_min_fee = 1.0; let expected_low_fee = 25.0; let expected_medium_fee = 50.0; @@ -508,7 +784,20 @@ mod tests { // Now test the fee estimates for a known priority level, let's say medium (50th percentile) let estimates = - tracker.get_priority_fee_estimates(vec![accounts.get(0).unwrap().clone()], false, None); + tracker.calculation1(&vec![accounts.get(0).unwrap().clone()], false, &None); + let 
expected_low_fee = 25.0; + let expected_medium_fee = 50.0; + let expected_high_fee = 75.0; + let expected_very_high_fee = 95.0; + assert_ne!(estimates.low, expected_low_fee); + assert_ne!(estimates.medium, expected_medium_fee); + assert_ne!(estimates.high, expected_high_fee); + assert_ne!(estimates.very_high, expected_very_high_fee); + + + // Now test the fee estimates for a known priority level, let's say medium (50th percentile) + let estimates = + tracker.calculation2(&vec![accounts.get(0).unwrap().clone()], false, &None); let expected_low_fee = 25.0; let expected_medium_fee = 50.0; let expected_high_fee = 75.0; @@ -519,6 +808,165 @@ mod tests { assert_ne!(estimates.very_high, expected_very_high_fee); } + #[tokio::test] + async fn test_with_transactions_for_different_accounts() { + // same test as above but with an extra slot to throw off the value + init_metrics(); + let tracker = PriorityFeeTracker::new(100); + + // adding set of fees at beginning that would mess up percentiles if they were not removed + let account_1 = Pubkey::new_unique(); + let account_2 = Pubkey::new_unique(); + let account_3 = Pubkey::new_unique(); + let account_4 = Pubkey::new_unique(); + + // Simulate adding the fixed fees as both account-specific and transaction fees + for val in 0..100 { + match val { + 0..=24 => tracker.push_priority_fee_for_txn( + val as Slot, + vec![account_1], + val as u64, + false, + ), + 25..=49 => tracker.push_priority_fee_for_txn( + val as Slot, + vec![account_2], + val as u64, + false, + ), + 50..=74 => tracker.push_priority_fee_for_txn( + val as Slot, + vec![account_3], + val as u64, + false, + ), + 75..=99 => tracker.push_priority_fee_for_txn( + val as Slot, + vec![account_4], + val as u64, + false, + ), + _ => {} + } + } + + // Now test the fee estimates for a known priority level, let's say medium (50th percentile) + let estimates = + tracker.calculation1(&vec![account_1, account_4], false, &None); + let expected_min_fee = 0.0; + let expected_low_fee = 24.75; + let expected_medium_fee = 49.5; + let expected_high_fee = 86.75; + let expected_very_high_fee = 96.55; + let expected_max_fee = 99.0; + assert_eq!(estimates.min, expected_min_fee); + assert_eq!(estimates.low, expected_low_fee); + assert_eq!(estimates.medium, expected_medium_fee); + assert_eq!(estimates.high, expected_high_fee); + assert_eq!(estimates.very_high, expected_very_high_fee); + assert_eq!(estimates.unsafe_max, expected_max_fee); + + + + // Now test the fee estimates for a known priority level, let's say medium (50th percentile) + let estimates = + tracker.calculation2(&vec![account_1, account_4], false, &None); + let expected_min_fee = 75.0; + let expected_low_fee = 81.0; + let expected_medium_fee = 87.0; + let expected_high_fee = 93.0; + let expected_very_high_fee = 97.8; + let expected_max_fee = 99.0; + assert_eq!(estimates.min, expected_min_fee); + assert_eq!(estimates.low, expected_low_fee); + assert_eq!(estimates.medium, expected_medium_fee); + assert_eq!(estimates.high, expected_high_fee); + assert_eq!(estimates.very_high, expected_very_high_fee); + assert_eq!(estimates.unsafe_max, expected_max_fee); + } + + #[tokio::test] + async fn test_with_multiple_transactions_for_different_slots() { + // same test as above but with an extra slot to throw off the value + init_metrics(); + let tracker = PriorityFeeTracker::new(10); + + // adding set of fees at beginning that would mess up percentiles if they were not removed + let account_1 = Pubkey::new_unique(); + let account_2 = Pubkey::new_unique(); + let account_3 
= Pubkey::new_unique(); + let account_4 = Pubkey::new_unique(); + + // Simulate adding the fixed fees as both account-specific and transaction fees + for val in 0..100 { + let slot = val / 10; // divide between 10 slots to have more than 1 transaction per slot + // also evenly distribute the orders + match val { + val if 0 == val % 4 => tracker.push_priority_fee_for_txn( + slot as Slot, + vec![account_1], + val as u64, + false, + ), + val if 1 == val % 4 => tracker.push_priority_fee_for_txn( + slot as Slot, + vec![account_2], + val as u64, + false, + ), + val if 2 == val % 4 => tracker.push_priority_fee_for_txn( + slot as Slot, + vec![account_3], + val as u64, + false, + ), + val if 3 == val % 4 => tracker.push_priority_fee_for_txn( + slot as Slot, + vec![account_4], + val as u64, + false, + ), + _ => {} + } + } + + + // Now test the fee estimates for a known priority level, let's say medium (50th percentile) + let estimates = + tracker.calculation1(&vec![account_1, account_4], false, &None); + let expected_min_fee = 0.0; + let expected_low_fee = 24.75; + let expected_medium_fee = 49.5; + let expected_high_fee = 74.25; + let expected_very_high_fee = 94.05; + let expected_max_fee = 99.0; + assert_eq!(estimates.min, expected_min_fee); + assert_eq!(estimates.low, expected_low_fee); + assert_eq!(estimates.medium, expected_medium_fee); + assert_eq!(estimates.high, expected_high_fee); + assert_eq!(estimates.very_high, expected_very_high_fee); + assert_eq!(estimates.unsafe_max, expected_max_fee); + + + + // Now test the fee estimates for a known priority level, let's say medium (50th percentile) + let estimates = + tracker.calculation2(&vec![account_1, account_4], false, &None); + let expected_min_fee = 3.0; + let expected_low_fee = 27.0; + let expected_medium_fee = 51.0; + let expected_high_fee = 75.0; + let expected_very_high_fee = 94.19999999999999; + let expected_max_fee = 99.0; + assert_eq!(estimates.min, expected_min_fee); + assert_eq!(estimates.low, expected_low_fee); + assert_eq!(estimates.medium, expected_medium_fee); + assert_eq!(estimates.high, expected_high_fee); + assert_eq!(estimates.very_high, expected_very_high_fee); + assert_eq!(estimates.unsafe_max, expected_max_fee); + } + #[tokio::test] async fn test_exclude_vote() { let tracker = PriorityFeeTracker::new(10); @@ -544,7 +992,25 @@ mod tests { // Now test the fee estimates for a known priority level, let's say medium (50th percentile) let estimates = - tracker.get_priority_fee_estimates(vec![accounts.get(0).unwrap().clone()], false, None); + tracker.calculation1(&vec![accounts.get(0).unwrap().clone()], false, &None); + // Since the fixed fees are evenly distributed, the 50th percentile should be the middle value + let expected_min_fee = 0.0; + let expected_low_fee = 25.0; + let expected_medium_fee = 50.0; + let expected_high_fee = 75.0; + let expected_very_high_fee = 95.0; + let expected_max_fee = 100.0; + assert_eq!(estimates.min, expected_min_fee); + assert_eq!(estimates.low, expected_low_fee); + assert_eq!(estimates.medium, expected_medium_fee); + assert_eq!(estimates.high, expected_high_fee); + assert_eq!(estimates.very_high, expected_very_high_fee); + assert_eq!(estimates.unsafe_max, expected_max_fee); + + + // Now test the fee estimates for a known priority level, let's say medium (50th percentile) + let estimates = + tracker.calculation2(&vec![accounts.get(0).unwrap().clone()], false, &None); // Since the fixed fees are evenly distributed, the 50th percentile should be the middle value let expected_min_fee = 0.0; let 
expected_low_fee = 25.0; diff --git a/src/rpc_server.rs b/src/rpc_server.rs index 43db208..6a7a83b 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -1,6 +1,6 @@ use std::{ collections::{HashMap, HashSet}, - env, fmt, + fmt, str::FromStr, sync::Arc, time::Instant, @@ -28,7 +28,7 @@ use tracing::info; pub struct AtlasPriorityFeeEstimator { pub priority_fee_tracker: Arc, - pub rpc_client: RpcClient, + pub rpc_client: Option, pub max_lookback_slots: usize, } @@ -41,29 +41,36 @@ impl fmt::Debug for AtlasPriorityFeeEstimator { } } -#[derive(Deserialize)] -#[serde(rename_all(serialize = "camelCase", deserialize = "camelCase"))] +#[derive(Serialize, Deserialize, Clone, Debug, Default)] +#[serde( + rename_all(serialize = "camelCase", deserialize = "camelCase"), +)] +// TODO: DKH - add deny_unknown_fields pub struct GetPriorityFeeEstimateRequest { pub transaction: Option, // estimate fee for a txn pub account_keys: Option>, // estimate fee for a list of accounts pub options: Option, } -#[derive(Deserialize, Clone)] -#[serde(rename_all(serialize = "camelCase", deserialize = "camelCase"))] +#[derive(Serialize, Deserialize, Clone, Debug, Default)] +#[serde( + rename_all(serialize = "camelCase", deserialize = "camelCase"), +)] +// TODO: DKH - add deny_unknown_fields pub struct GetPriorityFeeEstimateOptions { // controls input txn encoding pub transaction_encoding: Option, // controls custom priority fee level response pub priority_level: Option, // Default to MEDIUM pub include_all_priority_fee_levels: Option, // Include all priority level estimates in the response - pub lookback_slots: Option, // how many slots to look back on, default 50, min 1, max 300 - pub include_vote: Option, // include vote txns in the estimate + #[serde()] + pub lookback_slots: Option, // how many slots to look back on, default 50, min 1, max 300 + pub include_vote: Option, // include vote txns in the estimate // returns recommended fee, incompatible with custom controls. 
Currently the recommended fee is the median fee excluding vote txns pub recommended: Option, // return the recommended fee (median fee excluding vote txns) } -#[derive(Serialize, Clone)] +#[derive(Serialize, Clone, Debug, Default)] #[serde(rename_all(serialize = "camelCase", deserialize = "camelCase"))] pub struct GetPriorityFeeEstimateResponse { #[serde(skip_serializing_if = "Option::is_none")] @@ -81,6 +88,12 @@ pub trait AtlasPriorityFeeEstimatorRpc { &self, get_priority_fee_estimate_request: GetPriorityFeeEstimateRequest, ) -> RpcResult; + + #[method(name = "getTestPriorityFeeEstimate")] + fn get_test_priority_fee_estimate( + &self, + get_priority_fee_estimate_request: GetPriorityFeeEstimateRequest, + ) -> RpcResult; } fn validate_get_priority_fee_estimate_request( @@ -113,7 +126,7 @@ fn validate_get_priority_fee_estimate_request( None } -/// returns account keys from transcation +/// returns account keys from transaction fn get_from_account_keys(transaction: &VersionedTransaction) -> Vec { transaction .message @@ -198,33 +211,35 @@ fn get_from_address_lookup_tables( } fn get_accounts( - rpc_client: &RpcClient, + rpc_client: &Option, get_priority_fee_estimate_request: GetPriorityFeeEstimateRequest, ) -> RpcResult> { if let Some(account_keys) = get_priority_fee_estimate_request.account_keys { return Ok(account_keys); } - if let Some(transaction) = get_priority_fee_estimate_request.transaction { - let tx_encoding = if let Some(options) = get_priority_fee_estimate_request.options { - options - .transaction_encoding - .unwrap_or(UiTransactionEncoding::Base58) - } else { - UiTransactionEncoding::Base58 - }; - let binary_encoding = tx_encoding.into_binary_encoding().ok_or_else(|| { - invalid_request(&format!( - "unsupported encoding: {tx_encoding}. Supported encodings: base58, base64" - )) - })?; - let (_, transaction) = - decode_and_deserialize::(transaction, binary_encoding)?; - let account_keys: Vec = vec![ - get_from_account_keys(&transaction), - get_from_address_lookup_tables(rpc_client, &transaction), - ] - .concat(); - return Ok(account_keys); + if let Some(rpc_client) = rpc_client { + if let Some(transaction) = get_priority_fee_estimate_request.transaction { + let tx_encoding = if let Some(options) = get_priority_fee_estimate_request.options { + options + .transaction_encoding + .unwrap_or(UiTransactionEncoding::Base58) + } else { + UiTransactionEncoding::Base58 + }; + let binary_encoding = tx_encoding.into_binary_encoding().ok_or_else(|| { + invalid_request(&format!( + "unsupported encoding: {tx_encoding}. 
Supported encodings: base58, base64" + )) + })?; + let (_, transaction) = + decode_and_deserialize::(transaction, binary_encoding)?; + let account_keys: Vec = vec![ + get_from_account_keys(&transaction), + get_from_address_lookup_tables(rpc_client, &transaction), + ] + .concat(); + return Ok(account_keys); + } } Ok(vec![]) } @@ -238,6 +253,59 @@ impl AtlasPriorityFeeEstimatorRpcServer for AtlasPriorityFeeEstimator { &self, get_priority_fee_estimate_request: GetPriorityFeeEstimateRequest, ) -> RpcResult { + let algo_run_fn = |accounts: Vec, + include_vote: bool, + lookback_period: Option| + -> MicroLamportPriorityFeeEstimates { + self.priority_fee_tracker.get_priority_fee_estimates( + accounts, + include_vote, + lookback_period, + true, + ) + }; + self.execute_priority_fee_estimate_coordinator(get_priority_fee_estimate_request, algo_run_fn) + } + + fn get_test_priority_fee_estimate( + &self, + get_priority_fee_estimate_request: GetPriorityFeeEstimateRequest, + ) -> RpcResult { + let algo_run_fn = |accounts: Vec, + include_vote: bool, + lookback_period: Option| + -> MicroLamportPriorityFeeEstimates { + self.priority_fee_tracker.get_priority_fee_estimates( + accounts, + include_vote, + lookback_period, + false, + ) + }; + self.execute_priority_fee_estimate_coordinator(get_priority_fee_estimate_request, algo_run_fn) + } +} + +impl AtlasPriorityFeeEstimator { + pub fn new( + priority_fee_tracker: Arc, + rpc_url: String, + max_lookback_slots: usize, + ) -> Self { + let server = AtlasPriorityFeeEstimator { + priority_fee_tracker, + rpc_client: Some(RpcClient::new(rpc_url)), + max_lookback_slots, + }; + server + } + + fn execute_priority_fee_estimate_coordinator( + &self, + get_priority_fee_estimate_request: GetPriorityFeeEstimateRequest, + priority_fee_calc_fn: impl FnOnce(Vec, bool, Option) -> MicroLamportPriorityFeeEstimates, + ) -> RpcResult + { let options = get_priority_fee_estimate_request.options.clone(); let reason = validate_get_priority_fee_estimate_request(&get_priority_fee_estimate_request); if let Some(reason) = reason { @@ -254,16 +322,12 @@ impl AtlasPriorityFeeEstimatorRpcServer for AtlasPriorityFeeEstimator { .collect(); let lookback_slots = options.clone().map(|o| o.lookback_slots).flatten(); if let Some(lookback_slots) = lookback_slots { - if lookback_slots < 1 || lookback_slots > self.max_lookback_slots { + if lookback_slots < 1 || lookback_slots as usize > self.max_lookback_slots { return Err(invalid_request("lookback_slots must be between 1 and 150")); } } let include_vote = should_include_vote(&options); - let priority_fee_levels = self.priority_fee_tracker.get_priority_fee_estimates( - accounts, - include_vote, - lookback_slots, - ); + let priority_fee_levels = priority_fee_calc_fn(accounts, include_vote, lookback_slots); if let Some(options) = options.clone() { if options.include_all_priority_fee_levels == Some(true) { return Ok(GetPriorityFeeEstimateResponse { @@ -295,25 +359,10 @@ impl AtlasPriorityFeeEstimatorRpcServer for AtlasPriorityFeeEstimator { } else { priority_fee_levels.medium }; - return Ok(GetPriorityFeeEstimateResponse { + Ok(GetPriorityFeeEstimateResponse { priority_fee_estimate: Some(priority_fee), priority_fee_levels: None, - }); - } -} - -impl AtlasPriorityFeeEstimator { - pub fn new( - priority_fee_tracker: Arc, - rpc_url: String, - max_lookback_slots: usize, - ) -> Self { - let server = AtlasPriorityFeeEstimator { - priority_fee_tracker, - rpc_client: RpcClient::new(rpc_url), - max_lookback_slots, - }; - server + }) } } @@ -339,3 +388,177 @@ pub fn 
get_recommended_fee(priority_fee_levels: MicroLamportPriorityFeeEstimates let recommended_with_buffer = recommended * (1.0 + RECOMMENDED_FEE_SAFETY_BUFFER); recommended_with_buffer.ceil() } + +#[cfg(test)] +mod tests { + use crate::priority_fee::PriorityFeeTracker; + use crate::rpc_server::{ + AtlasPriorityFeeEstimator, AtlasPriorityFeeEstimatorRpcServer, + GetPriorityFeeEstimateOptions, GetPriorityFeeEstimateRequest, + }; + use cadence::{NopMetricSink, StatsdClient}; + use jsonrpsee::core::Cow; + use jsonrpsee::core::__reexports::serde_json; + use jsonrpsee::types::{Id, Request, TwoPointZero}; + use solana_sdk::clock::Slot; + use solana_sdk::pubkey::Pubkey; + use std::sync::Arc; + + #[tokio::test] + async fn test_calculating_fees_with_all_options_none() { + prep_statsd(); + + let acc1 = Pubkey::new_unique(); + let acc2 = Pubkey::new_unique(); + let tracker = PriorityFeeTracker::new(150); + tracker.push_priority_fee_for_txn(1 as Slot, vec![acc1, acc2], 100u64, false); + + let server = AtlasPriorityFeeEstimator { + priority_fee_tracker: Arc::new(tracker), + rpc_client: None, + max_lookback_slots: 150, + }; + + let result = server.get_priority_fee_estimate(GetPriorityFeeEstimateRequest { + account_keys: Some(vec![acc1.to_string(), acc2.to_string()]), + options: Some(GetPriorityFeeEstimateOptions::default()), + ..Default::default() + }); + let resp = result.unwrap(); + assert_eq!(resp.priority_fee_estimate, Some(100.0)); + assert!(resp.priority_fee_levels.is_none()); + } + + #[tokio::test] + async fn test_calculating_fees_with_no_options() { + prep_statsd(); + + let acc1 = Pubkey::new_unique(); + let acc2 = Pubkey::new_unique(); + let tracker = PriorityFeeTracker::new(150); + tracker.push_priority_fee_for_txn(1 as Slot, vec![acc1, acc2], 100u64, false); + + let server = AtlasPriorityFeeEstimator { + priority_fee_tracker: Arc::new(tracker), + rpc_client: None, + max_lookback_slots: 150, + }; + + let result = server.get_priority_fee_estimate(GetPriorityFeeEstimateRequest { + account_keys: Some(vec![acc1.to_string(), acc2.to_string()]), + ..Default::default() + }); + let resp = result.unwrap(); + assert_eq!(resp.priority_fee_estimate, Some(100.0)); + assert!(resp.priority_fee_levels.is_none()); + } + + #[tokio::test] + async fn test_calculating_all_fees() { + prep_statsd(); + + let acc1 = Pubkey::new_unique(); + let acc2 = Pubkey::new_unique(); + let tracker = PriorityFeeTracker::new(150); + tracker.push_priority_fee_for_txn(1 as Slot, vec![acc1], 100u64, false); + tracker.push_priority_fee_for_txn(1 as Slot, vec![acc2], 200u64, false); + + let server = AtlasPriorityFeeEstimator { + priority_fee_tracker: Arc::new(tracker), + rpc_client: None, + max_lookback_slots: 150, + }; + + let result = server.get_priority_fee_estimate(GetPriorityFeeEstimateRequest { + account_keys: Some(vec![acc1.to_string(), acc2.to_string()]), + options: Some(GetPriorityFeeEstimateOptions { + include_all_priority_fee_levels: Some(true), + ..Default::default() + }), + ..Default::default() + }); + let resp = result.unwrap(); + let levels = resp.priority_fee_levels.unwrap(); + assert_eq!(levels.min, 100.0); + assert_eq!(levels.low, 125.0); + assert_eq!(levels.medium, 150.0); + assert_eq!(levels.high, 175.0); + assert_eq!(levels.very_high, 195.0); + assert_eq!(levels.unsafe_max, 200.0); + assert!(resp.priority_fee_estimate.is_none()); + } + #[tokio::test] + async fn test_calculating_recommended_given_very_low_calculated_fee() { + prep_statsd(); + + let acc1 = Pubkey::new_unique(); + let acc2 = Pubkey::new_unique(); + let 
tracker = PriorityFeeTracker::new(150); + tracker.push_priority_fee_for_txn(1 as Slot, vec![acc1], 100u64, false); + tracker.push_priority_fee_for_txn(1 as Slot, vec![acc2], 200u64, false); + + let server = AtlasPriorityFeeEstimator { + priority_fee_tracker: Arc::new(tracker), + rpc_client: None, + max_lookback_slots: 150, + }; + + let result = server.get_priority_fee_estimate(GetPriorityFeeEstimateRequest { + account_keys: Some(vec![acc1.to_string(), acc2.to_string()]), + options: Some(GetPriorityFeeEstimateOptions { + recommended: Some(true), + ..Default::default() + }), + ..Default::default() + }); + let resp = result.unwrap(); + assert!(resp.priority_fee_levels.is_none()); + assert_eq!(resp.priority_fee_estimate, Some(10500.0)); + } + + // #[test] + // TODO: DKH - add the test back after we readd the validation + fn test_parsing_wrong_fields() { + for (param, error) in bad_params() { + let json_val = format!("{{\"jsonrpc\": \"2.0\",\"id\": \"1\", \"method\": \"getPriorityFeeEstimate\", \"params\": [{param}] }}"); + let res = serde_json::from_str::(json_val.as_str()); + let res = res.unwrap(); + assert_request(&res, Id::Str(Cow::const_str("1")), "getPriorityFeeEstimate"); + + if let Some(val) = res.params + { + let params: Result, _> = serde_json::from_str(val.get()); + assert!(params.is_err()); + assert_eq!(params.err().unwrap().to_string(), error, "testing {param}"); + } + } + } + + fn prep_statsd() { + let systemd_client = StatsdClient::builder("test", NopMetricSink) + .with_error_handler(|e| eprintln!("metric error: {}", e)) + .build(); + cadence_macros::set_global_default(systemd_client); + } + + fn assert_request<'a>(request: &Request<'a>, id: Id<'a>, method: &str) { + assert_eq!(request.jsonrpc, TwoPointZero); + assert_eq!(request.id, id); + assert_eq!(request.method, method); + } + + fn bad_params<'a>() -> Vec<(&'a str, &'a str)> { + vec![ + (r#"{"transactions": null}"#,"unknown field `transactions`, expected one of `transaction`, `accountKeys`, `options` at line 1 column 15"), + (r#"{"account_keys": null}"#,"unknown field `account_keys`, expected one of `transaction`, `accountKeys`, `options` at line 1 column 15"), + (r#"{"accountkeys": null}"#,"unknown field `accountkeys`, expected one of `transaction`, `accountKeys`, `options` at line 1 column 14"), + (r#"{"accountKeys": [1, 2]}"#, "invalid type: integer `1`, expected a string at line 1 column 18"), + (r#"{"option": null}"#, "unknown field `option`, expected one of `transaction`, `accountKeys`, `options` at line 1 column 9"), + (r#"{"options": {"transaction_encoding":null}}"#, "unknown field `transaction_encoding`, expected one of `transactionEncoding`, `priorityLevel`, `includeAllPriorityFeeLevels`, `lookbackSlots`, `includeVote`, `recommended` at line 1 column 35"), + (r#"{"options": {"priorityLevel":"HIGH"}}"#, "unknown variant `HIGH`, expected one of `Min`, `Low`, `Medium`, `High`, `VeryHigh`, `UnsafeMax`, `Default` at line 1 column 35"), + (r#"{"options": {"includeAllPriorityFeeLevels":"no"}}"#, "invalid type: string \"no\", expected a boolean at line 1 column 47"), + (r#"{"options": {"lookbackSlots":"no"}}"#, "invalid type: string \"no\", expected u32 at line 1 column 33"), + (r#"{"options": {"lookbackSlots":"-1"}}"#, "invalid type: string \"-1\", expected u32 at line 1 column 33"), + ] + } +} diff --git a/src/slot_cache.rs b/src/slot_cache.rs index f3aa776..2c97d5e 100644 --- a/src/slot_cache.rs +++ b/src/slot_cache.rs @@ -1,16 +1,14 @@ use std::sync::{Arc, RwLock}; -use dashmap::DashMap; -use queues::IsQueue; -use 
queues::Queue; +use dashmap::DashSet; +use queues::{CircularBuffer, IsQueue}; use solana_sdk::slot_history::Slot; use tracing::error; #[derive(Debug, Clone)] pub struct SlotCache { - slot_queue: Arc>>, - slot_set: Arc>, - slot_cache_length: usize, + slot_queue: Arc>>, + slot_set: Arc>, } /// SlotCache tracks slot_cache_length number of slots, when capacity is reached @@ -18,9 +16,8 @@ pub struct SlotCache { impl SlotCache { pub fn new(slot_cache_length: usize) -> Self { Self { - slot_queue: Arc::new(RwLock::new(Queue::new())), - slot_set: Arc::new(DashMap::new()), - slot_cache_length, + slot_queue: Arc::new(RwLock::new(CircularBuffer::new(slot_cache_length))), + slot_set: Arc::new(DashSet::new()), } } @@ -28,34 +25,37 @@ impl SlotCache { // and returns the oldest slot if the cache // is at capacity pub fn push_pop(&self, slot: Slot) -> Option { - if self.slot_set.contains_key(&slot) { + if self.slot_set.contains(&slot) { return None; } - let mut removed_slot = None; match self.slot_queue.write() { Ok(mut slot_queue) => { - if slot_queue.size() >= self.slot_cache_length { - match slot_queue.remove() { - Ok(oldest_slot) => { + match slot_queue.add(slot) { + Ok(maybe_oldest_slot) => { + if let Some(oldest_slot) = maybe_oldest_slot + { self.slot_set.remove(&oldest_slot); - removed_slot = Some(oldest_slot); - } - Err(e) => { - error!("error removing slot from slot queue: {}", e); } + self.slot_set.insert(slot); + maybe_oldest_slot + } + Err(e) => { + error!("error adding slot to slot queue: {}", e); + None } - } - self.slot_set.insert(slot, 0); - if let Err(e) = slot_queue.add(slot) { - error!("error adding slot to slot queue: {}", e); } } Err(e) => { error!("error getting write lock on slot queue: {}", e); + None } } - removed_slot } + + pub fn copy_slots<'a>(&self, vec: &mut Vec) { + vec.extend(self.slot_set.iter().map(|v| v.clone())); + } + pub fn len(&self) -> usize { self.slot_set.len() } @@ -74,7 +74,6 @@ mod tests { assert_eq!(slot_cache.push_pop(i), None); i += 1; } - // Now push one more and it should return the oldest (first inserted) assert_eq!(slot_cache.push_pop(101), Some(0)); @@ -89,4 +88,40 @@ mod tests { i += 1; } } + + #[test] + fn test_copy() { + // Create a SlotCache with a small length for testing + let slot_cache = SlotCache::new(100); + for i in 0..100 { + assert_eq!(slot_cache.push_pop(i), None); + assert_eq!(slot_cache.len(), (i + 1) as usize); + } + + let mut vec: Vec = Vec::new(); + slot_cache.copy_slots(&mut vec); + vec.sort(); + assert_eq!(vec, (0..100).collect::>()); + + vec.clear(); + slot_cache.copy_slots(&mut vec); + vec.sort(); + assert_eq!(vec, (0..100).collect::>()); + } + + + #[test] + fn test_copy_reversed() { + // Create a SlotCache with a small length for testing + let slot_cache = SlotCache::new(100); + for i in (0..100).rev() { + assert_eq!(slot_cache.push_pop(i), None); + assert_eq!(slot_cache.len(), 100-i as usize, "{i}"); + } + + let mut vec: Vec = Vec::new(); + slot_cache.copy_slots(&mut vec); + vec.sort(); + assert_eq!(vec, (0..100).collect::>()); + } } diff --git a/src/temp_validator.rs b/src/temp_validator.rs new file mode 100644 index 0000000..60caf0a --- /dev/null +++ b/src/temp_validator.rs @@ -0,0 +1,80 @@ +use cadence_macros::statsd_count; +use crate::priority_fee::PriorityLevel; +use jsonrpsee::core::__reexports::serde_json; +use jsonrpsee::server::middleware::rpc::RpcServiceT; +use jsonrpsee::types::Request; +use serde::{Deserialize, Serialize}; +use solana_transaction_status::UiTransactionEncoding; +use tracing::debug; + 
+#[derive(Serialize, Deserialize, Clone, Debug, Default)] +#[serde( + rename_all(serialize = "camelCase", deserialize = "camelCase"), + deny_unknown_fields +)] +struct GetPriorityFeeEstimateOptionsFake { + // controls input txn encoding + pub transaction_encoding: Option, + // controls custom priority fee level response + pub priority_level: Option, // Default to MEDIUM + pub include_all_priority_fee_levels: Option, // Include all priority level estimates in the response + #[serde()] + pub lookback_slots: Option, // how many slots to look back on, default 50, min 1, max 300 + pub include_vote: Option, // include vote txns in the estimate + // returns recommended fee, incompatible with custom controls. Currently the recommended fee is the median fee excluding vote txns + pub recommended: Option, // return the recommended fee (median fee excluding vote txns) +} + +#[derive(Serialize, Deserialize, Clone, Debug, Default)] +#[serde( + rename_all(serialize = "camelCase", deserialize = "camelCase"), + deny_unknown_fields +)] +struct GetPriorityFeeEstimateRequestFake { + transaction: Option, // estimate fee for a txn + account_keys: Option>, // estimate fee for a list of accounts + options: Option, +} + +/// RPC logger layer. +#[derive(Copy, Clone, Debug)] +pub struct RpcValidatorLayer; + +impl RpcValidatorLayer { + /// Create a new logging layer. + pub fn new() -> Self { + Self + } +} + +impl tower::Layer for RpcValidatorLayer { + type Service = RpcValidator; + + fn layer(&self, service: S) -> Self::Service { + RpcValidator { service } + } +} + +/// A middleware that logs each RPC call and response. +#[derive(Debug)] +pub struct RpcValidator { + service: S, +} + +impl<'a, S> RpcServiceT<'a> for RpcValidator +where + S: RpcServiceT<'a> + Send + Sync, +{ + type Future = S::Future; + + fn call(&self, req: Request<'a>) -> Self::Future { + if let Some(params) = &req.params { + if let Err(err_val) = serde_json::from_str::>(params.get()) { + statsd_count!("rpc_payload_parse_failed", 1); + debug!("RPC parse error: {}, {}", err_val, params); + } + } + + self.service.call(req) + } +}
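
Reviewer note (not part of the patch): the percentile math that both `calculation1` and `calculation2` rely on is the linear-interpolation rank `r = (p/100)·(n-1) + 1` over a sorted sample, and `calculation2` then folds each account's own fee distribution into the estimate by taking the per-percentile maximum against the pooled transaction fees. The standalone sketch below reproduces that arithmetic on illustrative data; it is not code from this PR, and the helper name only mirrors the one in `src/priority_fee.rs`.

```rust
// Illustrative sketch only: mirrors the linear-interpolation percentile used in
// src/priority_fee.rs and the max-across-groups folding done by calculation2.
fn percentile(values: &mut Vec<f64>, pct: usize) -> f64 {
    if values.is_empty() {
        return 0.0; // assumption: an empty sample contributes nothing
    }
    values.sort_by(|a, b| a.total_cmp(b));
    let n = values.len() as f64;
    // 1-based rank; e.g. pct = 50 over 100 values gives r = 50.5
    let r = (pct as f64 / 100.0) * (n - 1.0) + 1.0;
    if r.fract() == 0.0 {
        values[r as usize - 1]
    } else {
        let ri = r.trunc() as usize - 1;
        // interpolate between the two neighbouring ranks
        values[ri] + r.fract() * (values[ri + 1] - values[ri])
    }
}

fn main() {
    // Pooled fees 0..=99: the median interpolates to 49.5, matching the value
    // asserted for calculation1 in test_with_transactions_for_different_accounts.
    let mut pooled: Vec<f64> = (0..100).map(f64::from).collect();
    assert_eq!(percentile(&mut pooled, 50), 49.5);

    // calculation2-style folding: an account that only paid fees 75..=99 has a
    // local median of 87, and the per-percentile max lets it dominate the pool.
    let mut account: Vec<f64> = (75..100).map(f64::from).collect();
    let medium = percentile(&mut pooled, 50).max(percentile(&mut account, 50));
    assert_eq!(medium, 87.0); // same 87.0 the calculation2 test expects
    println!("medium estimate: {medium}");
}
```

This is also why the two algorithms diverge so sharply in that test: calculation1 pools the account fees into one distribution before taking percentiles, while calculation2 keeps each account's distribution separate and lets the hottest account pull every level up.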
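
A second sketch, also not from the patch: the tracker now feeds a background comparison task (spawned in `poll_fees`) through a bounded `tokio::sync::mpsc` channel, and `get_priority_fee_estimates` calls `try_send` so a full queue drops the sample instead of blocking the RPC path. The toy program below shows that shape under assumed names; the tuple payload mirrors `(accounts, include_vote, lookback_period)` but everything else is illustrative.

```rust
// Sketch of the sampling pattern introduced in poll_fees: a bounded channel,
// a background consumer, and a non-blocking try_send on the hot path.
use tokio::sync::mpsc::channel;

#[tokio::main]
async fn main() {
    // capacity 1_000, as in PriorityFeeTracker::new; String stands in for Pubkey
    // to keep the sketch dependency-free.
    let (tx, mut rx) = channel::<(Vec<String>, bool, Option<u32>)>(1_000);

    // background task: replay each sampled request through the alternate algorithm
    let worker = tokio::spawn(async move {
        while let Some((accounts, include_vote, lookback)) = rx.recv().await {
            // stand-in for record_specific_fees(accounts, include_vote, lookback)
            println!("sampled {} accounts, vote={include_vote}, lookback={lookback:?}", accounts.len());
        }
    });

    // request path: never await here; a full queue just drops the sample
    if let Err(e) = tx.try_send((vec!["some-account".to_string()], false, None)) {
        eprintln!("did not add sample for calculation: {e}");
    }

    drop(tx); // closing the last sender ends the worker's recv loop
    worker.await.unwrap();
}
```

The trade-off is that sampling is best-effort: under load the warned drops mean the comparison gauges undercount, which seems acceptable for an A/B measurement like this one.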