From 852daa2e566d1bf3a11dab98e394e572312f3d37 Mon Sep 17 00:00:00 2001 From: Derek Date: Wed, 7 Aug 2024 18:11:08 -0700 Subject: [PATCH] add lru cache backend; update arg parsing; add version to Cargo.lock; add caching for eth_maxPriorityFeePerGas --- Cargo.lock | 84 ++++++++++++++++++- Cargo.toml | 4 +- src/args.rs | 16 ++++ src/cache/lru_backend.rs | 57 +++++++++++++ src/cache/mod.rs | 1 + src/main.rs | 68 +++++++++++---- src/rpc_cache_handler/common.rs | 8 ++ .../eth_max_priority_fee_per_gas.rs | 16 ++++ src/rpc_cache_handler/mod.rs | 2 + 9 files changed, 236 insertions(+), 20 deletions(-) create mode 100644 src/cache/lru_backend.rs create mode 100644 src/rpc_cache_handler/eth_max_priority_fee_per_gas.rs diff --git a/Cargo.lock b/Cargo.lock index 1cd6ab7..92fc527 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -236,6 +236,12 @@ dependencies = [ "alloc-no-stdlib", ] +[[package]] +name = "allocator-api2" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" + [[package]] name = "alloy-primitives" version = "0.7.7" @@ -268,6 +274,21 @@ dependencies = [ "bytes", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anstream" version = "0.6.11" @@ -792,16 +813,18 @@ dependencies = [ [[package]] name = "cached-eth-rpc" -version = "0.1.0" +version = "1.0.3" dependencies = [ "actix-web", "alloy-primitives", "anyhow", "async-trait", + "chrono", "clap", "dashmap", "env_logger", "hex", + "lru", "r2d2", "redis", "reqwest", @@ -827,6 +850,20 @@ version = 
"1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-targets 0.52.0", +] + [[package]] name = "clap" version = "4.4.18" @@ -1431,6 +1468,10 @@ name = "hashbrown" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +dependencies = [ + "ahash", + "allocator-api2", +] [[package]] name = "heck" @@ -1545,6 +1586,29 @@ dependencies = [ "tokio-native-tls", ] +[[package]] +name = "iana-time-zone" +version = "0.1.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "idna" version = "0.5.0" @@ -1748,6 +1812,15 @@ dependencies = [ "value-bag", ] +[[package]] +name = "lru" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37ee39891760e7d94734f6f63fedc29a2e4a152f836120753a72503f09fcf904" +dependencies = [ + "hashbrown", +] + [[package]] name = "memchr" version = "2.7.1" @@ -3182,6 +3255,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.0", +] + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/Cargo.toml b/Cargo.toml index 6d199d0..af75a0f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cached-eth-rpc" -version = "0.1.0" +version = "1.0.3" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -10,10 +10,12 @@ actix-web = "4.8" alloy-primitives = { version = "0.7", features = ["serde"] } anyhow = "1.0" async-trait = "0.1" +chrono = "0.4.38" clap = { version = "4.4", features = ["derive"] } dashmap = { version = "6.0", features = ["serde"] } env_logger = "0.11" hex = "0.4" +lru = "0.12.4" r2d2 = "0.8" redis = { version = "0.24", features = ["r2d2", "async-std"] } reqwest = { version = "0.11", features = ["rustls", "json", "serde_json"] } diff --git a/src/args.rs b/src/args.rs index ffab607..168cde2 100644 --- a/src/args.rs +++ b/src/args.rs @@ -14,6 +14,12 @@ pub struct Args { #[arg(short, long = "endpoint", value_parser = endpoint_parser)] pub endpoints: Vec<(String, Url)>, + #[arg(short, long, default_value = "100000")] + pub lru_max_items: usize, + + #[arg(short, long = "cache", default_value = "lru", value_parser = cache_backend_parser)] + pub cache_type: String, + #[arg( short, long, @@ -34,3 +40,13 @@ fn endpoint_parser(s: &str) -> Result<(String, Url), String> { Ok((name, url)) } + +fn cache_backend_parser(s: &str) -> Result<String, String> { + match s { + "memory" => {} + "lru" => {} + "redis" => {} + _ => return Err(format!("Invalid cache backend: {}", s)), + } + Ok(s.to_owned()) +} diff --git a/src/cache/lru_backend.rs b/src/cache/lru_backend.rs new file mode 100644 index 0000000..77eef79 --- 
/dev/null +++ b/src/cache/lru_backend.rs @@ -0,0 +1,57 @@ +use lru::LruCache; +use std::num::NonZeroUsize; +use std::sync::{Arc, Mutex}; + +use anyhow::Context; +use serde_json::{from_str, Value}; + +use super::{CacheBackend, CacheBackendFactory, CacheStatus}; + +pub struct LruBackendFactory { + data: Arc<Mutex<LruCache<String, String>>>, +} + +impl LruBackendFactory { + pub fn new(cap: usize) -> Self { + Self { + data: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(cap).unwrap()))), + } + } +} + +impl CacheBackendFactory for LruBackendFactory { + fn get_instance(&self) -> anyhow::Result<Box<dyn CacheBackend>> { + Ok(Box::new(LruBackend { + data: self.data.clone(), + })) + } +} + +pub struct LruBackend { + data: Arc<Mutex<LruCache<String, String>>>, +} + +impl CacheBackend for LruBackend { + fn read(&mut self, method: &str, params_key: &str) -> anyhow::Result<CacheStatus> { + let key = format!("{method}:{params_key}"); + + let mut lru_cache = self.data.lock().unwrap(); + let v = match lru_cache.get(&key) { + Some(value) => { + let value = from_str::<Value>(&value).context("fail to deserialize cache value")?; + + CacheStatus::Cached { key, value } + } + + None => CacheStatus::Missed { key }, + }; + + Ok(v) + } + + fn write(&mut self, key: &str, value: &str) -> anyhow::Result<()> { + let mut lru_cache = self.data.lock().unwrap(); + let _ = lru_cache.put(key.to_string(), value.to_string()); + Ok(()) + } +} diff --git a/src/cache/mod.rs b/src/cache/mod.rs index c5d826f..6f80b3d 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -1,3 +1,4 @@ +pub mod lru_backend; pub mod memory_backend; pub mod redis_backend; diff --git a/src/main.rs b/src/main.rs index d8b133f..d20538e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use actix_web::{error, web, App, Error, HttpResponse, HttpServer}; use anyhow::Context; -use cache::{memory_backend, CacheBackendFactory}; +use cache::{lru_backend, memory_backend, CacheBackendFactory}; use clap::Parser; use env_logger::Env; use reqwest::Url; @@ -45,6 +45,7 @@ async fn rpc_call( // Scope the 
redis connection { + // retrieve the caching backend (memory, redis, etc) let mut cache_backend = match chain_state.cache_factory.get_instance() { Ok(v) => v, Err(err) => { @@ -60,6 +61,7 @@ async fn rpc_call( } }; + // iterate through each request looking for the result in cache and aggregating uncached requests for (index, request) in requests.into_iter().enumerate() { let (id, method, params) = match extract_single_request_info(request) { Ok(v) => v, @@ -86,6 +88,7 @@ async fn rpc_call( }}; } + // retrieve the handler for the requested method let cache_entry = match chain_state.cache_entries.get(&method) { Some(cache_entry) => cache_entry, None => { @@ -94,6 +97,7 @@ async fn rpc_call( } }; + // get the cache key from the handler based on the request params let params_key = match cache_entry.handler.extract_cache_key(¶ms) { Ok(Some(params_key)) => params_key, Ok(None) => push_uncached_request_and_continue!(), @@ -107,6 +111,7 @@ async fn rpc_call( } }; + // read results from cache match cache_backend.read(&method, ¶ms_key) { Ok(CacheStatus::Cached { key, value }) => { tracing::info!("cache hit for method {} with key {}", method, key); @@ -133,16 +138,19 @@ async fn rpc_call( }; } + // if nothing to cache then return empty response if uncached_requests.is_empty() { return_response!(); } + // prepare rpc and return the result future let rpc_result = utils::do_rpc_request( &data.http_client, chain_state.rpc_url.clone(), &uncached_requests, ); + // await the rpc response, for each cache miss record the response let rpc_result = match rpc_result.await { Ok(v) => v, Err(err) => { @@ -162,6 +170,7 @@ async fn rpc_call( } }; + // unwrap rpc_result into a vector of responses let result_values = match rpc_result { Value::Array(v) => v, _ => { @@ -185,6 +194,7 @@ async fn rpc_call( } }; + // ensure we got the expected number of responses if result_values.len() != uncached_requests.len() { tracing::warn!( "rpc response length mismatch, expected: {}, got: {}", @@ -193,6 
+203,7 @@ async fn rpc_call( ); } + // get the cache backend let mut cache_backend = match chain_state.cache_factory.get_instance() { Ok(v) => v, Err(err) => { @@ -212,6 +223,10 @@ async fn rpc_call( } }; + // for each response, get the corresponding request + // if the response was an error, record an error result and continue + // else assign the response and extract the cache key for insertion + // into the cache backend. for (index, mut response) in result_values.into_iter().enumerate() { let rpc_request = match RequestId::try_from(response["id"].clone()) { Ok(id) if request_id_index_map.get(&id).is_some() => { @@ -355,26 +370,43 @@ fn new_cache_backend_factory( args: &Args, chain_id: u64, ) -> anyhow::Result<Box<dyn CacheBackendFactory>> { - let factory: Box<dyn CacheBackendFactory> = match &args.redis_url { - Some(redis_url) => { - tracing::info!("Using redis cache backend"); - - let client = - redis::Client::open(redis_url.as_ref()).context("fail to create redis client")?; - - let conn_pool = r2d2::Pool::builder() - .max_size(300) - .test_on_check_out(false) - .build(client) - .context("fail to create redis connection pool")?; - let factory = RedisBackendFactory::new(chain_id, conn_pool); - - Box::new(factory) - } - None => { + let factory: Box<dyn CacheBackendFactory> = match args.cache_type.as_str() { + "redis" => match &args.redis_url { + Some(redis_url) => { + tracing::info!("Using redis cache backend"); + + let client = redis::Client::open(redis_url.as_ref()) + .context("fail to create redis client")?; + + let conn_pool = r2d2::Pool::builder() + .max_size(300) + .test_on_check_out(false) + .build(client) + .context("fail to create redis connection pool")?; + let factory = RedisBackendFactory::new(chain_id, conn_pool); + + Box::new(factory) + } + None => { + return Err(anyhow::anyhow!( + "Must specify redis url when using redis cache backend!" 
+ )); + } + }, + "memory" => { tracing::info!("Using in memory cache backend"); Box::new(memory_backend::MemoryBackendFactory::new()) } + "lru" => { tracing::info!("Using in LRU cache backend"); Box::new(lru_backend::LruBackendFactory::new(args.lru_max_items)) } + _ => { return Err(anyhow::anyhow!( "Unknown cache backend specified: {}!", args.cache_type )); } }; Ok(factory) diff --git a/src/rpc_cache_handler/common.rs b/src/rpc_cache_handler/common.rs index 7d3f1a1..9e7abb4 100644 --- a/src/rpc_cache_handler/common.rs +++ b/src/rpc_cache_handler/common.rs @@ -2,6 +2,7 @@ use std::str::FromStr; use alloy_primitives::{Address, B256, U64}; use anyhow::{bail, Context}; +use chrono::Local; use serde_json::Value; use sha1::Digest; @@ -100,6 +101,13 @@ pub fn hash_string(s: &str) -> String { hex::encode(result.as_slice()) } +pub fn compute_cache_bucket(ttl: i64) -> i64 { + let now = Local::now(); + let ts = now.timestamp(); + let bucket = ts - ts % ttl; + bucket +} + #[cfg(test)] mod test { mod test_extract_and_format_block_tag { diff --git a/src/rpc_cache_handler/eth_max_priority_fee_per_gas.rs b/src/rpc_cache_handler/eth_max_priority_fee_per_gas.rs new file mode 100644 index 0000000..6aaee85 --- /dev/null +++ b/src/rpc_cache_handler/eth_max_priority_fee_per_gas.rs @@ -0,0 +1,16 @@ +use crate::rpc_cache_handler::{common, RpcCacheHandler}; +use serde_json::Value; + +#[derive(Default, Clone)] +pub struct Handler {} + +impl RpcCacheHandler for Handler { + fn method_name(&self) -> &'static str { + "eth_maxPriorityFeePerGas" + } + + fn extract_cache_key(&self, _: &Value) -> anyhow::Result<Option<String>> { + let bucket = common::compute_cache_bucket(2); + Ok(Some(format!("eth_maxPriorityFeePerGas-{bucket}"))) + } +} diff --git a/src/rpc_cache_handler/mod.rs b/src/rpc_cache_handler/mod.rs index f169c85..4445409 100644 --- a/src/rpc_cache_handler/mod.rs +++ b/src/rpc_cache_handler/mod.rs @@ -21,6 +21,7 @@ mod eth_get_transaction_by_block_number_and_index; mod 
eth_get_transaction_by_hash; mod eth_get_transaction_count; mod eth_get_transaction_receipt; +mod eth_max_priority_fee_per_gas; pub trait RpcCacheHandler: Send + Sync { fn method_name(&self) -> &'static str; @@ -62,5 +63,6 @@ pub fn factories() -> Vec<Box<dyn RpcCacheHandler>> { get_factory::(), get_factory::(), get_factory::(), + get_factory::<eth_max_priority_fee_per_gas::Handler>(), ] }