Skip to content

Commit

Permalink
kafka: fix message size for gRPC client (#195)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Oct 6, 2023
1 parent a090e97 commit b9e734f
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 48 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,20 @@ The minor version will be incremented upon a breaking change and the patch versi

### Features

### Features

- client: add `GeyserGrpcClient::subscribe_once2` ([#195](https://github.com/rpcpool/yellowstone-grpc/pull/195)).

### Fixes

## 2023-10-06

- yellowstone-grpc-kafka-1.0.0-rc.2+solana.1.16.15

### Fixes

- kafka: fix message size for gRPC client ([#195](https://github.com/rpcpool/yellowstone-grpc/pull/195)).

### Breaking

## 2023-10-05
Expand Down
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = [
"examples/rust", # 1.10.0+solana.1.16.15
"yellowstone-grpc-client", # 1.11.0+solana.1.16.15
"yellowstone-grpc-geyser", # 1.9.0+solana.1.16.15
"yellowstone-grpc-kafka", # 1.0.0-rc.1+solana.1.16.15
"yellowstone-grpc-kafka", # 1.0.0-rc.2+solana.1.16.15
"yellowstone-grpc-proto", # 1.10.0+solana.1.16.15
]

Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ docker-compose -f ./yellowstone-grpc-kafka/docker-kafka.yml up
kafka_2.13-3.5.0/bin/kafka-topics.sh --bootstrap-server localhost:29092 --create --topic grpc1
# send messages from gRPC to Kafka
cargo run --bin grpc-kafka -- --config yellowstone-grpc-kafka/config.json --prometheus 127.0.0.1:8873 grpc2kafka
# read messages from Kafka
kafka_2.13-3.5.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic grpc1
```

### License
Expand Down
31 changes: 19 additions & 12 deletions yellowstone-grpc-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,20 +190,27 @@ impl<F: Interceptor> GeyserGrpcClient<F> {
blocks_meta: HashMap<String, SubscribeRequestFilterBlocksMeta>,
commitment: Option<CommitmentLevel>,
accounts_data_slice: Vec<SubscribeRequestAccountsDataSlice>,
) -> GeyserGrpcClientResult<impl Stream<Item = Result<SubscribeUpdate, Status>>> {
self.subscribe_once2(SubscribeRequest {
slots,
accounts,
transactions,
entry,
blocks,
blocks_meta,
commitment: commitment.map(|value| value as i32),
accounts_data_slice,
})
.await
}

#[allow(clippy::too_many_arguments)]
pub async fn subscribe_once2(
&mut self,
request: SubscribeRequest,
) -> GeyserGrpcClientResult<impl Stream<Item = Result<SubscribeUpdate, Status>>> {
let (mut subscribe_tx, response) = self.subscribe().await?;
subscribe_tx
.send(SubscribeRequest {
slots,
accounts,
transactions,
entry,
blocks,
blocks_meta,
commitment: commitment.map(|value| value as i32),
accounts_data_slice,
})
.await?;
subscribe_tx.send(request).await?;
Ok(response)
}

Expand Down
3 changes: 2 additions & 1 deletion yellowstone-grpc-kafka/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-kafka"
version = "1.0.0-rc.1+solana.1.16.15"
version = "1.0.0-rc.2+solana.1.16.15"
authors = ["Triton One"]
edition = "2021"
description = "Yellowstone gRPC Kafka Producer/Dedup/Consumer"
Expand All @@ -26,6 +26,7 @@ tonic = { version = "0.10.2", features = ["gzip", "tls", "tls-roots"] }
tonic-health = "0.10.2"
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
yellowstone-grpc-client = { path = "../yellowstone-grpc-client" }
yellowstone-grpc-proto = { path = "../yellowstone-grpc-proto" }

[build-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion yellowstone-grpc-kafka/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
},
"grpc2kafka": {
"endpoint": "http://127.0.0.1:10000",
"x_token": "",
"x_token": null,
"request": {
"slots": ["client"],
"blocks": {
Expand Down
46 changes: 14 additions & 32 deletions yellowstone-grpc-kafka/src/bin/grpc-kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ use {
anyhow::Context,
clap::{Parser, Subcommand},
futures::{
channel::mpsc,
future::{BoxFuture, FutureExt},
sink::SinkExt,
stream::StreamExt,
},
rdkafka::{
Expand All @@ -14,31 +12,26 @@ use {
producer::{FutureProducer, FutureRecord},
},
sha2::{Digest, Sha256},
std::{net::SocketAddr, sync::Arc},
std::{net::SocketAddr, sync::Arc, time::Duration},
tokio::{
signal::unix::{signal, SignalKind},
task::JoinSet,
},
tonic::{
codec::Streaming,
metadata::AsciiMetadataValue,
transport::{Channel, ClientTlsConfig},
Request, Response,
},
tracing::{debug, trace, warn},
tracing_subscriber::{
filter::{EnvFilter, LevelFilter},
layer::SubscriberExt,
util::SubscriberInitExt,
},
yellowstone_grpc_client::GeyserGrpcClient,
yellowstone_grpc_kafka::{
config::{Config, ConfigDedup, ConfigGrpc2Kafka, ConfigKafka2Grpc, GrpcRequestToProto},
dedup::KafkaDedup,
grpc::GrpcService,
prom,
},
yellowstone_grpc_proto::{
prelude::{geyser_client::GeyserClient, subscribe_update::UpdateOneof, SubscribeUpdate},
prelude::{subscribe_update::UpdateOneof, SubscribeUpdate},
prost::Message as _,
},
};
Expand Down Expand Up @@ -218,28 +211,17 @@ impl ArgsAction {
.create()
.context("failed to create kafka producer")?;

// Create gRPC client
let mut endpoint = Channel::from_shared(config.endpoint)?;
if endpoint.uri().scheme_str() == Some("https") {
endpoint = endpoint.tls_config(ClientTlsConfig::new())?;
}
let channel = endpoint.connect().await?;
let x_token: Option<AsciiMetadataValue> = match config.x_token {
Some(x_token) => Some(x_token.try_into()?),
None => None,
};
let mut client = GeyserClient::with_interceptor(channel, move |mut req: Request<()>| {
if let Some(x_token) = x_token.clone() {
req.metadata_mut().insert("x-token", x_token);
}
Ok(req)
});

// Subscribe on Geyser events
let (mut subscribe_tx, subscribe_rx) = mpsc::unbounded();
subscribe_tx.send(config.request.to_proto()).await?;
let response: Response<Streaming<SubscribeUpdate>> = client.subscribe(subscribe_rx).await?;
let mut geyser = response.into_inner().boxed();
// Create gRPC client & subscribe
let mut client = GeyserGrpcClient::connect_with_timeout(
config.endpoint,
config.x_token,
None,
Some(Duration::from_secs(10)),
Some(Duration::from_secs(5)),
false,
)
.await?;
let mut geyser = client.subscribe_once2(config.request.to_proto()).await?;

// Receive-send loop
let mut send_tasks = JoinSet::new();
Expand Down

0 comments on commit b9e734f

Please sign in to comment.