Skip to content

Commit

Permalink
kafka: support strings for queue size
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Oct 4, 2023
1 parent 91a1c50 commit 722eed9
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ The minor version will be incremented upon a breaking change and the patch versi

### Features

- kafka: support strings for queue size ([#191](https://github.com/rpcpool/yellowstone-grpc/pull/191)).

### Fixes

### Breaking
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = [
"examples/rust", # 1.10.0+solana.1.16.14
"yellowstone-grpc-client", # 1.11.0+solana.1.16.14
"yellowstone-grpc-geyser", # 1.9.0+solana.1.16.14
"yellowstone-grpc-kafka", # 1.0.0+solana.1.16.14
"yellowstone-grpc-kafka", # 1.0.0-rc.0+solana.1.16.14
"yellowstone-grpc-proto", # 1.10.0+solana.1.16.14
]

Expand Down
38 changes: 38 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,44 @@ It's possible to add limits for filters in config. If `filters` field is omitted
- [Rust](examples/rust)
- [TypeScript](examples/typescript)

### Kafka producer / consumer

In addition to the gRPC Geyser Plugin we provide a Kafka tool. This tool can work in 3 modes:

- `grpc2kafka` — connect to gRPC with the specified filter and send all incoming messages to Kafka
- `dedup` — consume messages from Kafka and send deduplicated messages to another topic (right now `memory` is the only supported deduplication backend)
- `kafka2grpc` — provide a gRPC endpoint that streams messages from Kafka

```bash
$ cargo run --bin grpc-kafka -- --help
Yellowstone gRPC Kafka Producer/Dedup/Consumer

Usage: grpc-kafka [OPTIONS] --config <CONFIG> <COMMAND>

Commands:
dedup Receive data from Kafka, deduplicate and send them back to Kafka
grpc2kafka Receive data from gRPC and send them to the Kafka
kafka2grpc Receive data from Kafka and send them over gRPC
help Print this message or the help of the given subcommand(s)

Options:
-c, --config <CONFIG> Path to config file
--prometheus <PROMETHEUS> Prometheus listen address
-h, --help Print help
-V, --version Print version
```

#### Development

```bash
# run kafka locally
docker-compose -f ./yellowstone-grpc-kafka/docker-kafka.yml up
# create topic
kafka_2.13-3.5.0/bin/kafka-topics.sh --bootstrap-server localhost:29092 --create --topic grpc1
# send messages from gRPC to Kafka
cargo run --bin grpc-kafka -- --config yellowstone-grpc-kafka/config.json --prometheus 127.0.0.1:8873 grpc2kafka
```

### License

This project and all source code in this repository is licensed as follows:
Expand Down
1 change: 1 addition & 0 deletions yellowstone-grpc-kafka/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ sha2 = "0.10.7"
tokio = { version = "1.21.2", features = ["rt-multi-thread", "macros", "time", "fs", "signal"] }
tokio-stream = "0.1.11"
tonic = { version = "0.10.2", features = ["gzip", "tls", "tls-roots"] }
tonic-health = "0.10.2"
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
yellowstone-grpc-proto = { path = "../yellowstone-grpc-proto" }
Expand Down
4 changes: 2 additions & 2 deletions yellowstone-grpc-kafka/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
},
"kafka_input": "grpc1",
"kafka_output": "grpc2",
"kafka_queue_size": 10000,
"kafka_queue_size": "10_000",
"backend": {
"type": "memory"
}
Expand All @@ -30,7 +30,7 @@
},
"kafka": {},
"kafka_topic": "grpc1",
"kafka_queue_size": 10000
"kafka_queue_size": "10_000"
},
"kafka2grpc": {
"kafka": {
Expand Down
2 changes: 1 addition & 1 deletion yellowstone-grpc-kafka/src/bin/grpc-kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ struct Args {

#[derive(Debug, Clone, Subcommand)]
enum ArgsAction {
// Receive data from Kafka, deduplicate and send them back to Kafka
/// Receive data from Kafka, deduplicate and send them back to Kafka
Dedup,
/// Receive data from gRPC and send them to the Kafka
#[command(name = "grpc2kafka")]
Expand Down
39 changes: 38 additions & 1 deletion yellowstone-grpc-kafka/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use {
crate::dedup::{KafkaDedup, KafkaDedupMemory},
anyhow::Context,
serde::{Deserialize, Serialize},
serde::{
de::{self, Deserializer},
Deserialize, Serialize,
},
std::{
collections::{HashMap, HashSet},
net::SocketAddr,
Expand Down Expand Up @@ -47,6 +50,10 @@ pub struct ConfigDedup {
pub kafka: HashMap<String, String>,
pub kafka_input: String,
pub kafka_output: String,
#[serde(
default = "ConfigGrpc2Kafka::default_kafka_queue_size",
deserialize_with = "ConfigGrpc2Kafka::deserialize_usize_str"
)]
pub kafka_queue_size: usize,
pub backend: ConfigDedupBackend,
}
Expand All @@ -73,9 +80,39 @@ pub struct ConfigGrpc2Kafka {
#[serde(default)]
pub kafka: HashMap<String, String>,
pub kafka_topic: String,
#[serde(
default = "ConfigGrpc2Kafka::default_kafka_queue_size",
deserialize_with = "ConfigGrpc2Kafka::deserialize_usize_str"
)]
pub kafka_queue_size: usize,
}

impl ConfigGrpc2Kafka {
const fn default_kafka_queue_size() -> usize {
10_000
}

fn deserialize_usize_str<'de, D>(deserializer: D) -> Result<usize, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum Value {
Integer(usize),
String(String),
}

match Value::deserialize(deserializer)? {
Value::Integer(value) => Ok(value),
Value::String(value) => value
.replace('_', "")
.parse::<usize>()
.map_err(de::Error::custom),
}
}
}

#[derive(Debug, Default, Deserialize, Serialize)]
#[serde(default)]
pub struct ConfigGrpc2KafkaRequest {
Expand Down
6 changes: 6 additions & 0 deletions yellowstone-grpc-kafka/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use {
},
Request, Response, Result as TonicResult, Status,
},
tonic_health::server::health_reporter,
tracing::{error, info},
yellowstone_grpc_proto::prelude::{
geyser_server::{Geyser, GeyserServer},
Expand Down Expand Up @@ -73,8 +74,13 @@ impl GrpcService {
let shutdown_grpc = Arc::clone(&shutdown);

let server = tokio::spawn(async move {
// gRPC Health check service
let (mut health_reporter, health_service) = health_reporter();
health_reporter.set_serving::<GeyserServer<Self>>().await;

Server::builder()
.http2_keepalive_interval(Some(Duration::from_secs(5)))
.add_service(health_service)
.add_service(service)
.serve_with_incoming_shutdown(incoming, shutdown_grpc.notified())
.await
Expand Down

0 comments on commit 722eed9

Please sign in to comment.