From 722eed9d1b9eb84dab23bf4e3b66fc6c6b544538 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Wed, 4 Oct 2023 17:11:15 +0400 Subject: [PATCH] kafka: support strings for queue size --- CHANGELOG.md | 2 + Cargo.lock | 1 + Cargo.toml | 2 +- README.md | 38 +++++++++++++++++++ yellowstone-grpc-kafka/Cargo.toml | 1 + yellowstone-grpc-kafka/config.json | 4 +- yellowstone-grpc-kafka/src/bin/grpc-kafka.rs | 2 +- yellowstone-grpc-kafka/src/config.rs | 39 +++++++++++++++++++- yellowstone-grpc-kafka/src/grpc.rs | 6 +++ 9 files changed, 90 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 663f0718..ed8308ec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ The minor version will be incremented upon a breaking change and the patch versi ### Features +- kafka: support strings for queue size ([#191](https://github.com/rpcpool/yellowstone-grpc/pull/191)). + ### Fixes ### Breaking diff --git a/Cargo.lock b/Cargo.lock index caec30c9..d9277f56 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4659,6 +4659,7 @@ dependencies = [ "tokio", "tokio-stream", "tonic", + "tonic-health", "tracing", "tracing-subscriber", "vergen", diff --git a/Cargo.toml b/Cargo.toml index a809444b..f93dd70b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = [ "examples/rust", # 1.10.0+solana.1.16.14 "yellowstone-grpc-client", # 1.11.0+solana.1.16.14 "yellowstone-grpc-geyser", # 1.9.0+solana.1.16.14 - "yellowstone-grpc-kafka", # 1.0.0+solana.1.16.14 + "yellowstone-grpc-kafka", # 1.0.0-rc.0+solana.1.16.14 "yellowstone-grpc-proto", # 1.10.0+solana.1.16.14 ] diff --git a/README.md b/README.md index d1a0b19c..6adaa91e 100644 --- a/README.md +++ b/README.md @@ -102,6 +102,44 @@ It's possible to add limits for filters in config. If `filters` field is omitted - [Rust](examples/rust) - [TypeScript](examples/typescript) +### Kafka producer / consumer + +In addition to gRPC Geyser Plugin we provide Kafka tool. 
This tool can work in 3 modes: +- `grpc2kafka` — connect to gRPC with the specified filter and send all incoming messages to Kafka +- `dedup` — consume messages from Kafka and send deduplicated messages to another topic (right now only `memory` is supported as the deduplication backend) +- `kafka2grpc` — provide a gRPC endpoint that sends messages from Kafka
--git a/yellowstone-grpc-kafka/config.json b/yellowstone-grpc-kafka/config.json index 2de53aae..496c0f4f 100644 --- a/yellowstone-grpc-kafka/config.json +++ b/yellowstone-grpc-kafka/config.json @@ -9,7 +9,7 @@ }, "kafka_input": "grpc1", "kafka_output": "grpc2", - "kafka_queue_size": 10000, + "kafka_queue_size": "10_000", "backend": { "type": "memory" } @@ -30,7 +30,7 @@ }, "kafka": {}, "kafka_topic": "grpc1", - "kafka_queue_size": 10000 + "kafka_queue_size": "10_000" }, "kafka2grpc": { "kafka": { diff --git a/yellowstone-grpc-kafka/src/bin/grpc-kafka.rs b/yellowstone-grpc-kafka/src/bin/grpc-kafka.rs index 546d1d4d..850b1ac9 100644 --- a/yellowstone-grpc-kafka/src/bin/grpc-kafka.rs +++ b/yellowstone-grpc-kafka/src/bin/grpc-kafka.rs @@ -60,7 +60,7 @@ struct Args { #[derive(Debug, Clone, Subcommand)] enum ArgsAction { - // Receive data from Kafka, deduplicate and send them back to Kafka + /// Receive data from Kafka, deduplicate and send them back to Kafka Dedup, /// Receive data from gRPC and send them to the Kafka #[command(name = "grpc2kafka")] diff --git a/yellowstone-grpc-kafka/src/config.rs b/yellowstone-grpc-kafka/src/config.rs index 4a2abcf3..bbd96247 100644 --- a/yellowstone-grpc-kafka/src/config.rs +++ b/yellowstone-grpc-kafka/src/config.rs @@ -1,7 +1,10 @@ use { crate::dedup::{KafkaDedup, KafkaDedupMemory}, anyhow::Context, - serde::{Deserialize, Serialize}, + serde::{ + de::{self, Deserializer}, + Deserialize, Serialize, + }, std::{ collections::{HashMap, HashSet}, net::SocketAddr, @@ -47,6 +50,10 @@ pub struct ConfigDedup { pub kafka: HashMap, pub kafka_input: String, pub kafka_output: String, + #[serde( + default = "ConfigGrpc2Kafka::default_kafka_queue_size", + deserialize_with = "ConfigGrpc2Kafka::deserialize_usize_str" + )] pub kafka_queue_size: usize, pub backend: ConfigDedupBackend, } @@ -73,9 +80,39 @@ pub struct ConfigGrpc2Kafka { #[serde(default)] pub kafka: HashMap, pub kafka_topic: String, + #[serde( + default = 
"ConfigGrpc2Kafka::default_kafka_queue_size", + deserialize_with = "ConfigGrpc2Kafka::deserialize_usize_str" + )] pub kafka_queue_size: usize, } +impl ConfigGrpc2Kafka { + const fn default_kafka_queue_size() -> usize { + 10_000 + } + + fn deserialize_usize_str<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + #[derive(Deserialize)] + #[serde(untagged)] + enum Value { + Integer(usize), + String(String), + } + + match Value::deserialize(deserializer)? { + Value::Integer(value) => Ok(value), + Value::String(value) => value + .replace('_', "") + .parse::() + .map_err(de::Error::custom), + } + } +} + #[derive(Debug, Default, Deserialize, Serialize)] #[serde(default)] pub struct ConfigGrpc2KafkaRequest { diff --git a/yellowstone-grpc-kafka/src/grpc.rs b/yellowstone-grpc-kafka/src/grpc.rs index c1a455ff..c8772e50 100644 --- a/yellowstone-grpc-kafka/src/grpc.rs +++ b/yellowstone-grpc-kafka/src/grpc.rs @@ -22,6 +22,7 @@ use { }, Request, Response, Result as TonicResult, Status, }, + tonic_health::server::health_reporter, tracing::{error, info}, yellowstone_grpc_proto::prelude::{ geyser_server::{Geyser, GeyserServer}, @@ -73,8 +74,13 @@ impl GrpcService { let shutdown_grpc = Arc::clone(&shutdown); let server = tokio::spawn(async move { + // gRPC Health check service + let (mut health_reporter, health_service) = health_reporter(); + health_reporter.set_serving::>().await; + Server::builder() .http2_keepalive_interval(Some(Duration::from_secs(5))) + .add_service(health_service) .add_service(service) .serve_with_incoming_shutdown(incoming, shutdown_grpc.notified()) .await