Skip to content

Commit

Permalink
geyser: add worker_threads and affinity (#481)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Dec 1, 2024
1 parent 75df985 commit 0c62550
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 14 deletions.
15 changes: 12 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,21 @@ The minor version will be incremented upon a breaking change and the patch versi
### Features

- proto: add tonic feature ([#474](https://github.com/rpcpool/yellowstone-grpc/pull/474))
- geyser: use default compression as gzip and zstd ([#475](https://github.com/rpcpool/yellowstone-grpc/pull/475))
- example: add connection options to Rust client ([#478](https://github.com/rpcpool/yellowstone-grpc/pull/478))
- nodejs: add parse tx function ([#471](https://github.com/rpcpool/yellowstone-grpc/pull/471))

### Breaking

## 2024-12-01

- yellowstone-grpc-client-simple-3.1.0
- yellowstone-grpc-geyser-3.1.0

### Features

- nodejs: add parse tx function ([#471](https://github.com/rpcpool/yellowstone-grpc/pull/471))
- geyser: use default compression as gzip and zstd ([#475](https://github.com/rpcpool/yellowstone-grpc/pull/475))
- example: add connection options to Rust client ([#478](https://github.com/rpcpool/yellowstone-grpc/pull/478))
- geyser: add worker_threads and affinity ([#481](https://github.com/rpcpool/yellowstone-grpc/pull/481))

## 2024-11-28

- yellowstone-grpc-geyser-3.0.1
Expand Down
27 changes: 25 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
[workspace]
resolver = "2"
members = [
"examples/rust", # 3.0.0
"examples/rust", # 3.1.0
"yellowstone-grpc-client", # 3.0.0
"yellowstone-grpc-geyser", # 3.0.1
"yellowstone-grpc-geyser", # 3.1.0
"yellowstone-grpc-proto", # 3.0.0
]
exclude = [
Expand All @@ -20,6 +20,7 @@ keywords = ["solana"]
publish = false

[workspace.dependencies]
affinity = "0.1.2"
agave-geyser-plugin-interface = "~2.0.15"
anyhow = "1.0.62"
backoff = "0.4.0"
Expand Down
2 changes: 1 addition & 1 deletion examples/rust/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-client-simple"
version = "3.0.0"
version = "3.1.0"
authors = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion yellowstone-grpc-geyser/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-geyser"
version = "3.0.1"
version = "3.1.0"
authors = { workspace = true }
edition = { workspace = true }
description = "Yellowstone gRPC Geyser Plugin"
Expand All @@ -17,6 +17,7 @@ crate-type = ["cdylib", "rlib"]
name = "config-check"

[dependencies]
affinity = { workspace = true }
agave-geyser-plugin-interface = { workspace = true }
anyhow = { workspace = true }
base64 = { workspace = true }
Expand Down
4 changes: 4 additions & 0 deletions yellowstone-grpc-geyser/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
"log": {
"level": "info"
},
"tokio": {
"worker_threads": 8,
"affinity": "0-1,12-13"
},
"grpc": {
"address": "0.0.0.0:10000",
"tls_config": {
Expand Down
72 changes: 71 additions & 1 deletion yellowstone-grpc-geyser/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use {
GeyserPluginError, Result as PluginResult,
},
serde::{de, Deserialize, Deserializer},
std::{fs::read_to_string, net::SocketAddr, path::Path, time::Duration},
std::{collections::HashSet, fs::read_to_string, net::SocketAddr, path::Path, time::Duration},
tokio::sync::Semaphore,
tonic::codec::CompressionEncoding,
yellowstone_grpc_proto::plugin::filter::limits::FilterLimits,
Expand All @@ -15,6 +15,8 @@ pub struct Config {
pub libpath: String,
#[serde(default)]
pub log: ConfigLog,
#[serde(default)]
pub tokio: ConfigTokio,
pub grpc: ConfigGrpc,
#[serde(default)]
pub prometheus: Option<ConfigPrometheus>,
Expand Down Expand Up @@ -58,6 +60,74 @@ impl ConfigLog {
}
}

#[derive(Debug, Clone, Default, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigTokio {
/// Number of worker threads in Tokio runtime
pub worker_threads: Option<usize>,
/// Threads affinity
#[serde(deserialize_with = "ConfigTokio::deserialize_affinity")]
pub affinity: Option<Vec<usize>>,
}

impl ConfigTokio {
fn deserialize_affinity<'de, D>(deserializer: D) -> Result<Option<Vec<usize>>, D::Error>
where
D: Deserializer<'de>,
{
match Option::<&str>::deserialize(deserializer)? {
Some(taskset) => parse_taskset(taskset).map(Some).map_err(de::Error::custom),
None => Ok(None),
}
}
}

fn parse_taskset(taskset: &str) -> Result<Vec<usize>, String> {
let mut set = HashSet::new();
for taskset2 in taskset.split(',') {
match taskset2.split_once('-') {
Some((start, end)) => {
let start: usize = start
.parse()
.map_err(|_error| format!("failed to parse {start:?} from {taskset:?}"))?;
let end: usize = end
.parse()
.map_err(|_error| format!("failed to parse {end:?} from {taskset:?}"))?;
if start > end {
return Err(format!("invalid interval {taskset2:?} in {taskset:?}"));
}
for idx in start..=end {
set.insert(idx);
}
}
None => {
set.insert(
taskset2.parse().map_err(|_error| {
format!("failed to parse {taskset2:?} from {taskset:?}")
})?,
);
}
}
}

let mut vec = set.into_iter().collect::<Vec<usize>>();
vec.sort();

if let Some(set_max_index) = vec.last().copied() {
let max_index = affinity::get_thread_affinity()
.map_err(|_err| "failed to get affinity".to_owned())?
.into_iter()
.max()
.unwrap_or(0);

if set_max_index > max_index {
return Err(format!("core index must be in the range [0, {max_index}]"));
}
}

Ok(vec)
}

#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigGrpc {
Expand Down
15 changes: 12 additions & 3 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
crate::{
config::ConfigGrpc,
config::{ConfigGrpc, ConfigTokio},
metrics::{self, DebugClientMessage},
version::GrpcVersionInfo,
},
Expand Down Expand Up @@ -309,6 +309,7 @@ pub struct GrpcService {
impl GrpcService {
#[allow(clippy::type_complexity)]
pub async fn create(
config_tokio: ConfigTokio,
config: ConfigGrpc,
debug_clients_tx: Option<mpsc::UnboundedSender<DebugClientMessage>>,
is_reload: bool,
Expand Down Expand Up @@ -389,9 +390,17 @@ impl GrpcService {
// Run geyser message loop
let (messages_tx, messages_rx) = mpsc::unbounded_channel();
spawn_blocking(move || {
Builder::new_multi_thread()
let mut builder = Builder::new_multi_thread();
if let Some(worker_threads) = config_tokio.worker_threads {
builder.worker_threads(worker_threads);
}
if let Some(tokio_cpus) = config_tokio.affinity.clone() {
builder.on_thread_start(move || {
affinity::set_thread_affinity(&tokio_cpus).expect("failed to set affinity")
});
}
builder
.thread_name_fn(crate::get_thread_name)
.worker_threads(4)
.enable_all()
.build()
.expect("Failed to create a new runtime for geyser loop")
Expand Down
12 changes: 11 additions & 1 deletion yellowstone-grpc-geyser/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,16 @@ impl GeyserPlugin for Plugin {
solana_logger::setup_with_default(&config.log.level);

// Create inner
let runtime = Builder::new_multi_thread()
let mut builder = Builder::new_multi_thread();
if let Some(worker_threads) = config.tokio.worker_threads {
builder.worker_threads(worker_threads);
}
if let Some(tokio_cpus) = config.tokio.affinity.clone() {
builder.on_thread_start(move || {
affinity::set_thread_affinity(&tokio_cpus).expect("failed to set affinity")
});
}
let runtime = builder
.thread_name_fn(crate::get_thread_name)
.enable_all()
.build()
Expand All @@ -81,6 +90,7 @@ impl GeyserPlugin for Plugin {
runtime.block_on(async move {
let (debug_client_tx, debug_client_rx) = mpsc::unbounded_channel();
let (snapshot_channel, grpc_channel, grpc_shutdown) = GrpcService::create(
config.tokio,
config.grpc,
config.debug_clients_http.then_some(debug_client_tx),
is_reload,
Expand Down

0 comments on commit 0c62550

Please sign in to comment.