Skip to content

Commit

Permalink
geyser: add compression option to config (#356)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Jun 6, 2024
1 parent a8b1bae commit 87fc3c1
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 19 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ The minor version will be incremented upon a breaking change and the patch versi

### Features

- geyser: add compression option to config ([#356](https://github.com/rpcpool/yellowstone-grpc/pull/356))

### Breaking

## 2024-06-05
Expand Down
32 changes: 25 additions & 7 deletions yellowstone-grpc-geyser/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@
"cert_path": "",
"key_path": ""
},
"compression": {
"accept": [
"gzip"
],
"send": [
"gzip"
]
},
"max_decoding_message_size": "4_194_304",
"snapshot_plugin_channel_capacity": null,
"snapshot_client_channel_capacity": "50_000_000",
Expand All @@ -20,9 +28,13 @@
"max": 1,
"any": false,
"account_max": 10,
"account_reject": ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"],
"account_reject": [
"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"
],
"owner_max": 10,
"owner_reject": ["11111111111111111111111111111111"]
"owner_reject": [
"11111111111111111111111111111111"
]
},
"slots": {
"max": 1
Expand All @@ -31,26 +43,32 @@
"max": 1,
"any": false,
"account_include_max": 10,
"account_include_reject": ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"],
"account_include_reject": [
"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"
],
"account_exclude_max": 10,
"account_required_max": 10
},
"transactions_status": {
"max": 1,
"any": false,
"account_include_max": 10,
"account_include_reject": ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"],
"account_include_reject": [
"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"
],
"account_exclude_max": 10,
"account_required_max": 10
},
"blocks": {
"max": 1,
"account_include_max": 10,
"account_include_any": false,
"account_include_reject": ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"],
"account_include_reject": [
"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"
],
"include_transactions": true,
"include_accounts" : false,
"include_entries" : false
"include_accounts": false,
"include_entries": false
},
"blocks_meta": {
"max": 1
Expand Down
51 changes: 51 additions & 0 deletions yellowstone-grpc-geyser/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use {
solana_sdk::pubkey::Pubkey,
std::{collections::HashSet, fs::read_to_string, net::SocketAddr, path::Path},
tokio::sync::Semaphore,
tonic::codec::CompressionEncoding,
};

#[derive(Debug, Clone, Deserialize)]
Expand Down Expand Up @@ -67,6 +68,9 @@ pub struct ConfigGrpc {
pub address: SocketAddr,
/// TLS config
pub tls_config: Option<ConfigGrpcServerTls>,
/// Possible compression options
#[serde(default)]
pub compression: ConfigGrpcCompression,
/// Limits the maximum size of a decoded message, default is 4MiB
#[serde(
default = "ConfigGrpc::max_decoding_message_size_default",
Expand Down Expand Up @@ -137,6 +141,53 @@ pub struct ConfigGrpcServerTls {
pub key_path: String,
}

#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigGrpcCompression {
#[serde(
deserialize_with = "ConfigGrpcCompression::deserialize_compression",
default = "ConfigGrpcCompression::default_compression"
)]
pub accept: Vec<CompressionEncoding>,
#[serde(
deserialize_with = "ConfigGrpcCompression::deserialize_compression",
default = "ConfigGrpcCompression::default_compression"
)]
pub send: Vec<CompressionEncoding>,
}

impl Default for ConfigGrpcCompression {
fn default() -> Self {
Self {
accept: Self::default_compression(),
send: Self::default_compression(),
}
}
}

impl ConfigGrpcCompression {
fn deserialize_compression<'de, D>(
deserializer: D,
) -> Result<Vec<CompressionEncoding>, D::Error>
where
D: Deserializer<'de>,
{
Vec::<&str>::deserialize(deserializer)?
.into_iter()
.map(|value| match value {
"gzip" => Ok(CompressionEncoding::Gzip),
value => Err(de::Error::custom(format!(
"Unknown compression format: {value}"
))),
})
.collect::<Result<_, _>>()
}

fn default_compression() -> Vec<CompressionEncoding> {
vec![CompressionEncoding::Gzip]
}
}

#[derive(Debug, Default, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct ConfigGrpcFilters {
Expand Down
27 changes: 15 additions & 12 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use {
},
tokio_stream::wrappers::ReceiverStream,
tonic::{
codec::CompressionEncoding,
service::{interceptor::InterceptedService, Interceptor},
transport::{
server::{Server, TcpIncoming},
Expand Down Expand Up @@ -712,7 +711,8 @@ impl SlotMessages {

#[derive(Debug)]
pub struct GrpcService {
config: ConfigGrpc,
config_snapshot_client_channel_capacity: usize,
config_channel_capacity: usize,
config_filters: Arc<ConfigGrpcFilters>,
blocks_meta: Option<BlockMetaStorage>,
subscribe_id: AtomicUsize,
Expand Down Expand Up @@ -777,21 +777,24 @@ impl GrpcService {

// Create Server
let max_decoding_message_size = config.max_decoding_message_size;
let x_token = XTokenChecker::new(config.x_token.clone());
let config_filters = Arc::new(config.filters.clone());
let service = GeyserServer::new(Self {
config,
config_filters,
let mut service = GeyserServer::new(Self {
config_snapshot_client_channel_capacity: config.snapshot_client_channel_capacity,
config_channel_capacity: config.channel_capacity,
config_filters: Arc::new(config.filters),
blocks_meta,
subscribe_id: AtomicUsize::new(0),
snapshot_rx: Mutex::new(snapshot_rx),
broadcast_tx: broadcast_tx.clone(),
debug_clients_tx,
})
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip)
.max_decoding_message_size(max_decoding_message_size);
let service = InterceptedService::new(service, x_token);
for encoding in config.compression.accept {
service = service.accept_compressed(encoding);
}
for encoding in config.compression.send {
service = service.send_compressed(encoding);
}
let service = InterceptedService::new(service, XTokenChecker::new(config.x_token));

// Run geyser message loop
let (messages_tx, messages_rx) = mpsc::unbounded_channel();
Expand Down Expand Up @@ -1284,9 +1287,9 @@ impl Geyser for GrpcService {
let id = self.subscribe_id.fetch_add(1, Ordering::Relaxed);
let snapshot_rx = self.snapshot_rx.lock().await.take();
let (stream_tx, stream_rx) = mpsc::channel(if snapshot_rx.is_some() {
self.config.snapshot_client_channel_capacity
self.config_snapshot_client_channel_capacity
} else {
self.config.channel_capacity
self.config_channel_capacity
});
let (client_tx, client_rx) = mpsc::unbounded_channel();
let notify_exit1 = Arc::new(Notify::new());
Expand Down

0 comments on commit 87fc3c1

Please sign in to comment.