diff --git a/CHANGELOG.md b/CHANGELOG.md index 691d4dae..1e33d301 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,12 +15,21 @@ The minor version will be incremented upon a breaking change and the patch versi ### Features - proto: add tonic feature ([#474](https://github.com/rpcpool/yellowstone-grpc/pull/474)) -- geyser: use default compression as gzip and zstd ([#475](https://github.com/rpcpool/yellowstone-grpc/pull/475)) -- example: add connection options to Rust client ([#478](https://github.com/rpcpool/yellowstone-grpc/pull/478)) -- nodejs: add parse tx function ([#471](https://github.com/rpcpool/yellowstone-grpc/pull/471)) ### Breaking +## 2024-12-01 + +- yellowstone-grpc-client-simple-4.1.0 +- yellowstone-grpc-geyser-4.1.0 + +### Features + +- nodejs: add parse tx function ([#471](https://github.com/rpcpool/yellowstone-grpc/pull/471)) +- geyser: use default compression as gzip and zstd ([#475](https://github.com/rpcpool/yellowstone-grpc/pull/475)) +- example: add connection options to Rust client ([#478](https://github.com/rpcpool/yellowstone-grpc/pull/478)) +- geyser: add worker_threads and affinity ([#481](https://github.com/rpcpool/yellowstone-grpc/pull/481)) + ## 2024-11-28 - yellowstone-grpc-geyser-4.0.1 diff --git a/Cargo.lock b/Cargo.lock index bab34331..0537fb16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -63,6 +63,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "affinity" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "763e484feceb7dd021b21c5c6f81aee06b1594a743455ec7efbf72e6355e447b" +dependencies = [ + "cfg-if", + "errno", + "libc", + "num_cpus", +] + [[package]] name = "agave-geyser-plugin-interface" version = "2.1.1" @@ -2507,6 +2519,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi 0.3.9", + "libc", +] + [[package]] name = "num_enum" version = "0.7.3" @@ -5951,7 +5973,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-client-simple" -version = "4.0.0" +version = "4.1.0" dependencies = [ "anyhow", "backoff", @@ -5976,8 +5998,9 @@ dependencies = [ [[package]] name = "yellowstone-grpc-geyser" -version = "4.0.1" +version = "4.1.0" dependencies = [ + "affinity", "agave-geyser-plugin-interface", "anyhow", "base64 0.22.1", diff --git a/Cargo.toml b/Cargo.toml index 774ad987..bc3b2795 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,9 +1,9 @@ [workspace] resolver = "2" members = [ - "examples/rust", # 4.0.0 + "examples/rust", # 4.1.0 "yellowstone-grpc-client", # 4.0.0 - "yellowstone-grpc-geyser", # 4.0.1 + "yellowstone-grpc-geyser", # 4.1.0 "yellowstone-grpc-proto", # 4.0.0 ] exclude = [ @@ -20,6 +20,7 @@ keywords = ["solana"] publish = false [workspace.dependencies] +affinity = "0.1.2" agave-geyser-plugin-interface = "~2.1.1" anyhow = "1.0.62" backoff = "0.4.0" diff --git a/examples/rust/Cargo.toml b/examples/rust/Cargo.toml index 61db0f83..ffa89eca 100644 --- a/examples/rust/Cargo.toml +++ b/examples/rust/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-client-simple" -version = "4.0.0" +version = "4.1.0" authors = { workspace = true } edition = { workspace = true } homepage = { workspace = true } diff --git a/yellowstone-grpc-geyser/Cargo.toml b/yellowstone-grpc-geyser/Cargo.toml index d2391a89..cc1767f1 100644 --- a/yellowstone-grpc-geyser/Cargo.toml +++ b/yellowstone-grpc-geyser/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-geyser" -version = "4.0.1" +version = "4.1.0" authors = { workspace = true } edition = { workspace = true } description = "Yellowstone gRPC Geyser Plugin" @@ -17,6 +17,7 @@ crate-type = ["cdylib", "rlib"] name = "config-check" [dependencies] +affinity = { workspace = true } agave-geyser-plugin-interface = { workspace = true } anyhow = { workspace = true } base64 = { workspace = true } diff --git a/yellowstone-grpc-geyser/config.json b/yellowstone-grpc-geyser/config.json index 5f347210..180c913a 100644 --- a/yellowstone-grpc-geyser/config.json +++ b/yellowstone-grpc-geyser/config.json @@ -3,6 +3,10 @@ "log": { "level": "info" }, + "tokio": { + "worker_threads": 8, + "affinity": "0-1,12-13" + }, "grpc": { "address": "0.0.0.0:10000", "tls_config": { diff --git a/yellowstone-grpc-geyser/src/config.rs b/yellowstone-grpc-geyser/src/config.rs index d03b20fe..0d6b2ec0 100644 --- a/yellowstone-grpc-geyser/src/config.rs +++ b/yellowstone-grpc-geyser/src/config.rs @@ -3,7 +3,7 @@ use { GeyserPluginError, Result as PluginResult, }, serde::{de, Deserialize, Deserializer}, - std::{fs::read_to_string, net::SocketAddr, path::Path, time::Duration}, + std::{collections::HashSet, fs::read_to_string, net::SocketAddr, path::Path, time::Duration}, tokio::sync::Semaphore, tonic::codec::CompressionEncoding, yellowstone_grpc_proto::plugin::filter::limits::FilterLimits, @@ -15,6 +15,8 @@ pub struct Config { pub libpath: String, #[serde(default)] pub log: ConfigLog, + #[serde(default)] + pub tokio: ConfigTokio, pub grpc: ConfigGrpc, #[serde(default)] pub prometheus: Option, @@ -58,6 +60,74 @@ impl ConfigLog { } } +#[derive(Debug, Clone, Default, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct ConfigTokio { + /// Number of worker threads in Tokio runtime + pub worker_threads: Option, + /// Threads affinity + #[serde(deserialize_with = "ConfigTokio::deserialize_affinity")] + pub affinity: Option>, +} + +impl ConfigTokio { + fn deserialize_affinity<'de, D>(deserializer: D) -> Result>, D::Error> + where + D: Deserializer<'de>, + { + match Option::<&str>::deserialize(deserializer)? { + Some(taskset) => parse_taskset(taskset).map(Some).map_err(de::Error::custom), + None => Ok(None), + } + } +} + +fn parse_taskset(taskset: &str) -> Result, String> { + let mut set = HashSet::new(); + for taskset2 in taskset.split(',') { + match taskset2.split_once('-') { + Some((start, end)) => { + let start: usize = start + .parse() + .map_err(|_error| format!("failed to parse {start:?} from {taskset:?}"))?; + let end: usize = end + .parse() + .map_err(|_error| format!("failed to parse {end:?} from {taskset:?}"))?; + if start > end { + return Err(format!("invalid interval {taskset2:?} in {taskset:?}")); + } + for idx in start..=end { + set.insert(idx); + } + } + None => { + set.insert( + taskset2.parse().map_err(|_error| { + format!("failed to parse {taskset2:?} from {taskset:?}") + })?, + ); + } + } + } + + let mut vec = set.into_iter().collect::>(); + vec.sort(); + + if let Some(set_max_index) = vec.last().copied() { + let max_index = affinity::get_thread_affinity() + .map_err(|_err| "failed to get affinity".to_owned())? + .into_iter() + .max() + .unwrap_or(0); + + if set_max_index > max_index { + return Err(format!("core index must be in the range [0, {max_index}]")); + } + } + + Ok(vec) +} + #[derive(Debug, Clone, Deserialize)] #[serde(deny_unknown_fields)] pub struct ConfigGrpc { diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index 79c8f17e..40dba924 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -1,6 +1,6 @@ use { crate::{ - config::ConfigGrpc, + config::{ConfigGrpc, ConfigTokio}, metrics::{self, DebugClientMessage}, version::GrpcVersionInfo, }, @@ -325,6 +325,7 @@ pub struct GrpcService { impl GrpcService { #[allow(clippy::type_complexity)] pub async fn create( + config_tokio: ConfigTokio, config: ConfigGrpc, debug_clients_tx: Option>, is_reload: bool, @@ -405,9 +406,17 @@ impl GrpcService { // Run geyser message loop let (messages_tx, messages_rx) = mpsc::unbounded_channel(); spawn_blocking(move || { - Builder::new_multi_thread() + let mut builder = Builder::new_multi_thread(); + if let Some(worker_threads) = config_tokio.worker_threads { + builder.worker_threads(worker_threads); + } + if let Some(tokio_cpus) = config_tokio.affinity.clone() { + builder.on_thread_start(move || { + affinity::set_thread_affinity(&tokio_cpus).expect("failed to set affinity") + }); + } + builder .thread_name_fn(crate::get_thread_name) - .worker_threads(4) .enable_all() .build() .expect("Failed to create a new runtime for geyser loop") diff --git a/yellowstone-grpc-geyser/src/plugin.rs b/yellowstone-grpc-geyser/src/plugin.rs index b32f7bb6..696f2926 100644 --- a/yellowstone-grpc-geyser/src/plugin.rs +++ b/yellowstone-grpc-geyser/src/plugin.rs @@ -71,7 +71,16 @@ impl GeyserPlugin for Plugin { solana_logger::setup_with_default(&config.log.level); // Create inner - let runtime = Builder::new_multi_thread() + let mut builder = Builder::new_multi_thread(); + if let Some(worker_threads) = config.tokio.worker_threads { + builder.worker_threads(worker_threads); + } + if let Some(tokio_cpus) = config.tokio.affinity.clone() { + builder.on_thread_start(move || { + affinity::set_thread_affinity(&tokio_cpus).expect("failed to set affinity") + }); + } + let runtime = builder .thread_name_fn(crate::get_thread_name) .enable_all() .build() @@ -81,6 +90,7 @@ impl GeyserPlugin for Plugin { runtime.block_on(async move { let (debug_client_tx, debug_client_rx) = mpsc::unbounded_channel(); let (snapshot_channel, grpc_channel, grpc_shutdown) = GrpcService::create( + config.tokio, config.grpc, config.debug_clients_http.then_some(debug_client_tx), is_reload,