diff --git a/CHANGELOG.md b/CHANGELOG.md index 5fe67eb6..f7f3a550 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,8 +12,20 @@ The minor version will be incremented upon a breaking change and the patch versi ### Features +### Features + +- client: add `GeyserGrpcClient::subscribe_once2` ([#195](https://github.com/rpcpool/yellowstone-grpc/pull/195)). + +### Fixes + +## 2023-10-06 + +- yellowstone-grpc-kafka-1.0.0-rc.2+solana.1.16.15 + ### Fixes +- kafka: fix message size for gRPC client ([#195](https://github.com/rpcpool/yellowstone-grpc/pull/195)). + ### Breaking ## 2023-10-05 diff --git a/Cargo.lock b/Cargo.lock index b71d1f95..cbbf30b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4639,7 +4639,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-kafka" -version = "1.0.0-rc.1+solana.1.16.15" +version = "1.0.0-rc.2+solana.1.16.15" dependencies = [ "anyhow", "async-trait", @@ -4663,6 +4663,7 @@ dependencies = [ "tracing", "tracing-subscriber", "vergen", + "yellowstone-grpc-client", "yellowstone-grpc-proto", ] diff --git a/Cargo.toml b/Cargo.toml index b98bb6a7..fb1065d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = [ "examples/rust", # 1.10.0+solana.1.16.15 "yellowstone-grpc-client", # 1.11.0+solana.1.16.15 "yellowstone-grpc-geyser", # 1.9.0+solana.1.16.15 - "yellowstone-grpc-kafka", # 1.0.0-rc.1+solana.1.16.15 + "yellowstone-grpc-kafka", # 1.0.0-rc.2+solana.1.16.15 "yellowstone-grpc-proto", # 1.10.0+solana.1.16.15 ] diff --git a/README.md b/README.md index 6adaa91e..ef3b3a3f 100644 --- a/README.md +++ b/README.md @@ -138,6 +138,8 @@ docker-compose -f ./yellowstone-grpc-kafka/docker-kafka.yml up kafka_2.13-3.5.0/bin/kafka-topics.sh --bootstrap-server localhost:29092 --create --topic grpc1 # send messages from gRPC to Kafka cargo run --bin grpc-kafka -- --config yellowstone-grpc-kafka/config.json --prometheus 127.0.0.1:8873 grpc2kafka +# read messages from Kafka +kafka_2.13-3.5.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic grpc1 ``` ### License diff --git a/yellowstone-grpc-client/src/lib.rs b/yellowstone-grpc-client/src/lib.rs index 4517f4c3..c2e6d19e 100644 --- a/yellowstone-grpc-client/src/lib.rs +++ b/yellowstone-grpc-client/src/lib.rs @@ -190,20 +190,27 @@ impl GeyserGrpcClient { blocks_meta: HashMap, commitment: Option, accounts_data_slice: Vec, + ) -> GeyserGrpcClientResult>> { + self.subscribe_once2(SubscribeRequest { + slots, + accounts, + transactions, + entry, + blocks, + blocks_meta, + commitment: commitment.map(|value| value as i32), + accounts_data_slice, + }) + .await + } + + #[allow(clippy::too_many_arguments)] + pub async fn subscribe_once2( + &mut self, + request: SubscribeRequest, ) -> GeyserGrpcClientResult>> { let (mut subscribe_tx, response) = self.subscribe().await?; - subscribe_tx - .send(SubscribeRequest { - slots, - accounts, - transactions, - entry, - blocks, - blocks_meta, - commitment: commitment.map(|value| value as i32), - accounts_data_slice, - }) - .await?; + subscribe_tx.send(request).await?; Ok(response) } diff --git a/yellowstone-grpc-kafka/Cargo.toml b/yellowstone-grpc-kafka/Cargo.toml index d04f5b8d..c697a32b 100644 --- a/yellowstone-grpc-kafka/Cargo.toml +++ b/yellowstone-grpc-kafka/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-kafka" -version = "1.0.0-rc.1+solana.1.16.15" +version = "1.0.0-rc.2+solana.1.16.15" authors = ["Triton One"] edition = "2021" description = "Yellowstone gRPC Kafka Producer/Dedup/Consumer" @@ -26,6 +26,7 @@ tonic = { version = "0.10.2", features = ["gzip", "tls", "tls-roots"] } tonic-health = "0.10.2" tracing = "0.1.37" tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } +yellowstone-grpc-client = { path = "../yellowstone-grpc-client" } yellowstone-grpc-proto = { path = "../yellowstone-grpc-proto" } [build-dependencies] diff --git a/yellowstone-grpc-kafka/config.json b/yellowstone-grpc-kafka/config.json index 496c0f4f..b6f62d71 100644 --- a/yellowstone-grpc-kafka/config.json +++ b/yellowstone-grpc-kafka/config.json @@ -16,7 +16,7 @@ }, "grpc2kafka": { "endpoint": "http://127.0.0.1:10000", - "x_token": "", + "x_token": null, "request": { "slots": ["client"], "blocks": { diff --git a/yellowstone-grpc-kafka/src/bin/grpc-kafka.rs b/yellowstone-grpc-kafka/src/bin/grpc-kafka.rs index 850b1ac9..e801f59e 100644 --- a/yellowstone-grpc-kafka/src/bin/grpc-kafka.rs +++ b/yellowstone-grpc-kafka/src/bin/grpc-kafka.rs @@ -2,9 +2,7 @@ use { anyhow::Context, clap::{Parser, Subcommand}, futures::{ - channel::mpsc, future::{BoxFuture, FutureExt}, - sink::SinkExt, stream::StreamExt, }, rdkafka::{ @@ -14,23 +12,18 @@ use { producer::{FutureProducer, FutureRecord}, }, sha2::{Digest, Sha256}, - std::{net::SocketAddr, sync::Arc}, + std::{net::SocketAddr, sync::Arc, time::Duration}, tokio::{ signal::unix::{signal, SignalKind}, task::JoinSet, }, - tonic::{ - codec::Streaming, - metadata::AsciiMetadataValue, - transport::{Channel, ClientTlsConfig}, - Request, Response, - }, tracing::{debug, trace, warn}, tracing_subscriber::{ filter::{EnvFilter, LevelFilter}, layer::SubscriberExt, util::SubscriberInitExt, }, + yellowstone_grpc_client::GeyserGrpcClient, yellowstone_grpc_kafka::{ config::{Config, ConfigDedup, ConfigGrpc2Kafka, ConfigKafka2Grpc, GrpcRequestToProto}, dedup::KafkaDedup, @@ -38,7 +31,7 @@ use { prom, }, yellowstone_grpc_proto::{ - prelude::{geyser_client::GeyserClient, subscribe_update::UpdateOneof, SubscribeUpdate}, + prelude::{subscribe_update::UpdateOneof, SubscribeUpdate}, prost::Message as _, }, }; @@ -218,28 +211,17 @@ impl ArgsAction { .create() .context("failed to create kafka producer")?; - // Create gRPC client - let mut endpoint = Channel::from_shared(config.endpoint)?; - if endpoint.uri().scheme_str() == Some("https") { - endpoint = endpoint.tls_config(ClientTlsConfig::new())?; - } - let channel = endpoint.connect().await?; - let x_token: Option = match config.x_token { - Some(x_token) => Some(x_token.try_into()?), - None => None, - }; - let mut client = GeyserClient::with_interceptor(channel, move |mut req: Request<()>| { - if let Some(x_token) = x_token.clone() { - req.metadata_mut().insert("x-token", x_token); - } - Ok(req) - }); - - // Subscribe on Geyser events - let (mut subscribe_tx, subscribe_rx) = mpsc::unbounded(); - subscribe_tx.send(config.request.to_proto()).await?; - let response: Response> = client.subscribe(subscribe_rx).await?; - let mut geyser = response.into_inner().boxed(); + // Create gRPC client & subscribe + let mut client = GeyserGrpcClient::connect_with_timeout( + config.endpoint, + config.x_token, + None, + Some(Duration::from_secs(10)), + Some(Duration::from_secs(5)), + false, + ) + .await?; + let mut geyser = client.subscribe_once2(config.request.to_proto()).await?; // Receive-send loop let mut send_tasks = JoinSet::new();