Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

geyser: add worker_threads and affinity #481

Merged
merged 1 commit into from
Dec 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,21 @@ The minor version will be incremented upon a breaking change and the patch versi
### Features

- proto: add tonic feature ([#474](https://github.com/rpcpool/yellowstone-grpc/pull/474))
- geyser: use default compression as gzip and zstd ([#475](https://github.com/rpcpool/yellowstone-grpc/pull/475))
- example: add connection options to Rust client ([#478](https://github.com/rpcpool/yellowstone-grpc/pull/478))
- nodejs: add parse tx function ([#471](https://github.com/rpcpool/yellowstone-grpc/pull/471))

### Breaking

## 2024-12-01

- yellowstone-grpc-client-simple-4.1.0
- yellowstone-grpc-geyser-4.1.0

### Features

- nodejs: add parse tx function ([#471](https://github.com/rpcpool/yellowstone-grpc/pull/471))
- geyser: use default compression as gzip and zstd ([#475](https://github.com/rpcpool/yellowstone-grpc/pull/475))
- example: add connection options to Rust client ([#478](https://github.com/rpcpool/yellowstone-grpc/pull/478))
- geyser: add worker_threads and affinity ([#481](https://github.com/rpcpool/yellowstone-grpc/pull/481))

## 2024-11-28

- yellowstone-grpc-geyser-4.0.1
Expand Down
27 changes: 25 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
[workspace]
resolver = "2"
members = [
"examples/rust", # 4.0.0
"examples/rust", # 4.1.0
"yellowstone-grpc-client", # 4.0.0
"yellowstone-grpc-geyser", # 4.0.1
"yellowstone-grpc-geyser", # 4.1.0
"yellowstone-grpc-proto", # 4.0.0
]
exclude = [
Expand All @@ -20,6 +20,7 @@ keywords = ["solana"]
publish = false

[workspace.dependencies]
affinity = "0.1.2"
agave-geyser-plugin-interface = "~2.1.1"
anyhow = "1.0.62"
backoff = "0.4.0"
Expand Down
2 changes: 1 addition & 1 deletion examples/rust/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-client-simple"
version = "4.0.0"
version = "4.1.0"
authors = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion yellowstone-grpc-geyser/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-geyser"
version = "4.0.1"
version = "4.1.0"
authors = { workspace = true }
edition = { workspace = true }
description = "Yellowstone gRPC Geyser Plugin"
Expand All @@ -17,6 +17,7 @@ crate-type = ["cdylib", "rlib"]
name = "config-check"

[dependencies]
affinity = { workspace = true }
agave-geyser-plugin-interface = { workspace = true }
anyhow = { workspace = true }
base64 = { workspace = true }
Expand Down
4 changes: 4 additions & 0 deletions yellowstone-grpc-geyser/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
"log": {
"level": "info"
},
"tokio": {
"worker_threads": 8,
"affinity": "0-1,12-13"
},
"grpc": {
"address": "0.0.0.0:10000",
"tls_config": {
Expand Down
72 changes: 71 additions & 1 deletion yellowstone-grpc-geyser/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use {
GeyserPluginError, Result as PluginResult,
},
serde::{de, Deserialize, Deserializer},
std::{fs::read_to_string, net::SocketAddr, path::Path, time::Duration},
std::{collections::HashSet, fs::read_to_string, net::SocketAddr, path::Path, time::Duration},
tokio::sync::Semaphore,
tonic::codec::CompressionEncoding,
yellowstone_grpc_proto::plugin::filter::limits::FilterLimits,
Expand All @@ -15,6 +15,8 @@ pub struct Config {
pub libpath: String,
#[serde(default)]
pub log: ConfigLog,
#[serde(default)]
pub tokio: ConfigTokio,
pub grpc: ConfigGrpc,
#[serde(default)]
pub prometheus: Option<ConfigPrometheus>,
Expand Down Expand Up @@ -58,6 +60,74 @@ impl ConfigLog {
}
}

#[derive(Debug, Clone, Default, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigTokio {
/// Number of worker threads in Tokio runtime
pub worker_threads: Option<usize>,
/// Threads affinity
#[serde(deserialize_with = "ConfigTokio::deserialize_affinity")]
pub affinity: Option<Vec<usize>>,
}

impl ConfigTokio {
fn deserialize_affinity<'de, D>(deserializer: D) -> Result<Option<Vec<usize>>, D::Error>
where
D: Deserializer<'de>,
{
match Option::<&str>::deserialize(deserializer)? {
Some(taskset) => parse_taskset(taskset).map(Some).map_err(de::Error::custom),
None => Ok(None),
}
}
}

fn parse_taskset(taskset: &str) -> Result<Vec<usize>, String> {
let mut set = HashSet::new();
for taskset2 in taskset.split(',') {
match taskset2.split_once('-') {
Some((start, end)) => {
let start: usize = start
.parse()
.map_err(|_error| format!("failed to parse {start:?} from {taskset:?}"))?;
let end: usize = end
.parse()
.map_err(|_error| format!("failed to parse {end:?} from {taskset:?}"))?;
if start > end {
return Err(format!("invalid interval {taskset2:?} in {taskset:?}"));
}
for idx in start..=end {
set.insert(idx);
}
}
None => {
set.insert(
taskset2.parse().map_err(|_error| {
format!("failed to parse {taskset2:?} from {taskset:?}")
})?,
);
}
}
}

let mut vec = set.into_iter().collect::<Vec<usize>>();
vec.sort();

if let Some(set_max_index) = vec.last().copied() {
let max_index = affinity::get_thread_affinity()
.map_err(|_err| "failed to get affinity".to_owned())?
.into_iter()
.max()
.unwrap_or(0);

if set_max_index > max_index {
return Err(format!("core index must be in the range [0, {max_index}]"));
}
}

Ok(vec)
}

#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigGrpc {
Expand Down
15 changes: 12 additions & 3 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
crate::{
config::ConfigGrpc,
config::{ConfigGrpc, ConfigTokio},
metrics::{self, DebugClientMessage},
version::GrpcVersionInfo,
},
Expand Down Expand Up @@ -325,6 +325,7 @@ pub struct GrpcService {
impl GrpcService {
#[allow(clippy::type_complexity)]
pub async fn create(
config_tokio: ConfigTokio,
config: ConfigGrpc,
debug_clients_tx: Option<mpsc::UnboundedSender<DebugClientMessage>>,
is_reload: bool,
Expand Down Expand Up @@ -405,9 +406,17 @@ impl GrpcService {
// Run geyser message loop
let (messages_tx, messages_rx) = mpsc::unbounded_channel();
spawn_blocking(move || {
Builder::new_multi_thread()
let mut builder = Builder::new_multi_thread();
if let Some(worker_threads) = config_tokio.worker_threads {
builder.worker_threads(worker_threads);
}
if let Some(tokio_cpus) = config_tokio.affinity.clone() {
builder.on_thread_start(move || {
affinity::set_thread_affinity(&tokio_cpus).expect("failed to set affinity")
});
}
builder
.thread_name_fn(crate::get_thread_name)
.worker_threads(4)
.enable_all()
.build()
.expect("Failed to create a new runtime for geyser loop")
Expand Down
12 changes: 11 additions & 1 deletion yellowstone-grpc-geyser/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,16 @@ impl GeyserPlugin for Plugin {
solana_logger::setup_with_default(&config.log.level);

// Create inner
let runtime = Builder::new_multi_thread()
let mut builder = Builder::new_multi_thread();
if let Some(worker_threads) = config.tokio.worker_threads {
builder.worker_threads(worker_threads);
}
if let Some(tokio_cpus) = config.tokio.affinity.clone() {
builder.on_thread_start(move || {
affinity::set_thread_affinity(&tokio_cpus).expect("failed to set affinity")
});
}
let runtime = builder
.thread_name_fn(crate::get_thread_name)
.enable_all()
.build()
Expand All @@ -81,6 +90,7 @@ impl GeyserPlugin for Plugin {
runtime.block_on(async move {
let (debug_client_tx, debug_client_rx) = mpsc::unbounded_channel();
let (snapshot_channel, grpc_channel, grpc_shutdown) = GrpcService::create(
config.tokio,
config.grpc,
config.debug_clients_http.then_some(debug_client_tx),
is_reload,
Expand Down
Loading