From d2c4659f761de613436b857958556f9ddc4e58b7 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Thu, 4 Apr 2024 18:04:26 -0400 Subject: [PATCH] client: add `GeyserGrpcBuilder` (#309) --- CHANGELOG.md | 15 + Cargo.lock | 67 ++- Cargo.toml | 24 +- examples/rust/Cargo.toml | 2 +- examples/rust/src/bin/client.rs | 46 +- examples/rust/src/bin/subscribe-ping.rs | 5 +- examples/rust/src/bin/tx-blocktime.rs | 5 +- examples/typescript/package-lock.json | 6 +- examples/typescript/package.json | 2 +- examples/typescript/src/client.ts | 50 ++- .../package-lock.json | 6 +- yellowstone-grpc-client-nodejs/package.json | 4 +- yellowstone-grpc-client/Cargo.toml | 3 +- yellowstone-grpc-client/src/lib.rs | 416 ++++++++++++------ yellowstone-grpc-geyser/Cargo.toml | 2 +- yellowstone-grpc-proto/Cargo.toml | 2 +- yellowstone-grpc-tools/Cargo.toml | 2 +- .../src/bin/grpc-google-pubsub.rs | 24 +- yellowstone-grpc-tools/src/bin/grpc-kafka.rs | 17 +- 19 files changed, 444 insertions(+), 254 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 44c021f8..7e853ac5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,19 @@ The minor version will be incremented upon a breaking change and the patch versi ### Fixes +### Features + +### Breaking + +## 2024-04-04 + +- yellowstone-grpc-client-1.15.0+solana.1.18.9 +- yellowstone-grpc-geyser-1.14.0+solana.1.18.9 +- yellowstone-grpc-proto-1.14.0+solana.1.18.9 +- yellowstone-grpc-tools-1.0.0-rc.11+solana.1.18.9 + +### Fixes + - deps: update `h2` crate (`RUSTSEC-2024-0332`) ([#316](https://github.com/rpcpool/yellowstone-grpc/pull/316)) ### Features @@ -25,6 +38,8 @@ The minor version will be incremented upon a breaking change and the patch versi ### Breaking +- client: add `GeyserGrpcBuilder` ([#309](https://github.com/rpcpool/yellowstone-grpc/pull/309)) + ## 2024-03-20 - yellowstone-grpc-client-1.14.0+solana.1.18.7 diff --git a/Cargo.lock b/Cargo.lock index eaa7f7b9..c3d1394c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3624,9 +3624,9 @@ dependencies = [ [[package]] name = "solana-account-decoder" -version = "1.18.7" +version = "1.18.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9204641d327fc2dc7cebcd07bd63fccbc2695057040728924395d5e4ab491b17" +checksum = "6c5f49893e9e7cd1a45869e03ad6d767666544e47aa39cff5bcfd10c9b156177" dependencies = [ "Inflector", "base64 0.21.7", @@ -3649,9 +3649,9 @@ dependencies = [ [[package]] name = "solana-config-program" -version = "1.18.7" +version = "1.18.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7bf92f66a8643b2a1da993a90d2fbd1860eda936d8ed0f931621360a0a707bf" +checksum = "617228f9569238e544b526c422b632de368a5ce748a7af04f7f762bd937f42f3" dependencies = [ "bincode", "chrono", @@ -3663,9 +3663,9 @@ dependencies = [ [[package]] name = "solana-frozen-abi" -version = "1.18.7" +version = "1.18.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90bc310416403e8bd4f10799d7736a3a17126b217236983a884f0e1989b93040" +checksum = "cc46edad65d122c8b8634aa9ad11276a804c1aa2404755577fdfdea67484b620" dependencies = [ "block-buffer 0.10.4", "bs58", @@ -3688,9 +3688,9 @@ dependencies = [ [[package]] name = "solana-frozen-abi-macro" -version = "1.18.7" +version = "1.18.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fc8882e99fb0589cafdd48690c0c1b589d419110630a25e221de68ee1c10fb3" +checksum = "c71c603d2203da423cfd8862572ffca7165268e76cab181035f50d106c3710eb" dependencies = [ "proc-macro2", "quote", @@ -3700,9 +3700,9 @@ dependencies = [ [[package]] name = "solana-geyser-plugin-interface" -version = "1.18.7" +version = "1.18.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd63fd201e8d6b48ca9ca5bd454d30d99f2df4db5686e2cf2e56c39d937a705e" +checksum = "a80cbffaa91e84aa9031ccf63b7f21265ec787cd05280956026d4580ddbbc707" dependencies = [ "log", "solana-sdk", @@ -3712,9 +3712,9 @@ dependencies = [ [[package]] name = "solana-logger" -version = "1.18.7" +version = "1.18.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d232989da2972107e5e40dcc179f8f4ca2ddcf1cb9ac2b27305ca97f2a0c0408" +checksum = "f5b3eef85d82c2c3030acb1d3272d77984d118d2a026d9b1a2cecc6e4c6602eb" dependencies = [ "env_logger 0.9.3", "lazy_static", @@ -3723,9 +3723,9 @@ dependencies = [ [[package]] name = "solana-measure" -version = "1.18.7" +version = "1.18.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31ae59f9a98ff5d6e62a516f35a30035d57b07bca4330df265f964936ece573a" +checksum = "bd072023ab870dcb7d524980d9481317bfd29c1dfd74a067747c8d1d249d1075" dependencies = [ "log", "solana-sdk", @@ -3733,9 +3733,9 @@ dependencies = [ [[package]] name = "solana-metrics" -version = "1.18.7" +version = "1.18.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d80cc2dfca741baa590d11e9fbe2f9d5c10396b738b7224c276532ffb5ee2e0" +checksum = "6f594ca2a2698983b515580925d10be20b079617154bbed07d6cd455d1661b90" dependencies = [ "crossbeam-channel", "gethostname", @@ -3748,9 +3748,9 @@ dependencies = [ [[package]] name = "solana-program" -version = "1.18.7" +version = "1.18.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9f49c1f01637722d95bbcc9de05c49112bd4aa6aeacad377fa5b3524785cb43" +checksum = "760e9e923050f30f03a159aec9ba1fe09ae7c7494ebd8ba74dc5b7429b11085b" dependencies = [ "ark-bn254", "ark-ec", @@ -3803,9 +3803,9 @@ dependencies = [ [[package]] name = "solana-program-runtime" -version = "1.18.7" +version = "1.18.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aafb4bb43f3beaa971ca30fe6d7b6b15723f4cf67262c88e5ca62d520888e091" +checksum = "a61b65be846413bc504ecae468b6a3fa91b1b37631c80074c41ada8cdc36d165" dependencies = [ "base64 0.21.7", "bincode", @@ -3831,9 +3831,9 @@ dependencies = [ [[package]] name = "solana-sdk" -version = "1.18.7" +version = "1.18.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32067a43e8911d442c50c2fb3a40ac3e73f5645b8ae911ba5f4c5af9190f3b29" +checksum = "1788023924ebb072288868f8b4b72f5459c1ff653238d769700da9c8043a8aea" dependencies = [ "assert_matches", "base64 0.21.7", @@ -3886,9 +3886,9 @@ dependencies = [ [[package]] name = "solana-sdk-macro" -version = "1.18.7" +version = "1.18.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "457e23df0b7b859eba7b72bc525f63f3a415500fb0a17d554e5166378f4c9b4b" +checksum = "6f3b24f46820e8912b81719a828a3d05f4fbd2f6afdc13826b0327df065ab795" dependencies = [ "bs58", "proc-macro2", @@ -3905,9 +3905,9 @@ checksum = "468aa43b7edb1f9b7b7b686d5c3aeb6630dc1708e86e31343499dd5c4d775183" [[package]] name = "solana-transaction-status" -version = "1.18.7" +version = "1.18.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ce29d9356c0467d2659103ec9f13edb185be2919a3b034b7f602c6eb89fa1b9" +checksum = "a5fa023f9a09216e809bf28c1dc16c8dfce726dfa64133f9016e8a1f01267f39" dependencies = [ "Inflector", "base64 0.21.7", @@ -3930,9 +3930,9 @@ dependencies = [ [[package]] name = "solana-zk-token-sdk" -version = "1.18.7" +version = "1.18.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8b429abaacfd233305d996bed2d453e33835cb3bf2508bc72d12cbc4483514f" +checksum = "76cba1f80c9001dc788655f2d7d5671af55c7a0a49b95de819f2fc45d8b803b0" dependencies = [ "aes-gcm-siv", "base64 0.21.7", @@ -5241,11 +5241,10 @@ dependencies = [ [[package]] name = "yellowstone-grpc-client" -version = "1.14.0+solana.1.18.7" +version = "1.15.0+solana.1.18.9" dependencies = [ "bytes", "futures", - "http", "thiserror", "tokio", "tonic 0.10.2", @@ -5255,7 +5254,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-client-simple" -version = "1.12.0+solana.1.18.7" +version = "1.13.0+solana.1.18.9" dependencies = [ "anyhow", "backoff", @@ -5278,7 +5277,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-geyser" -version = "1.13.0+solana.1.18.7" +version = "1.14.0+solana.1.18.9" dependencies = [ "anyhow", "base64 0.21.7", @@ -5311,7 +5310,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-proto" -version = "1.13.0+solana.1.18.7" +version = "1.14.0+solana.1.18.9" dependencies = [ "anyhow", "bincode", @@ -5326,7 +5325,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-tools" -version = "1.0.0-rc.10+solana.1.18.7" +version = "1.0.0-rc.11+solana.1.18.9" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 092abf3b..f2ba278c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,11 +1,11 @@ [workspace] resolver = "2" members = [ - "examples/rust", # 1.12.0+solana.1.18.7 - "yellowstone-grpc-client", # 1.14.0+solana.1.18.7 - "yellowstone-grpc-geyser", # 1.13.0+solana.1.18.7 - "yellowstone-grpc-proto", # 1.13.0+solana.1.18.7 - "yellowstone-grpc-tools", # 1.0.0-rc.10+solana.1.18.7 + "examples/rust", # 1.13.0+solana.1.18.9 + "yellowstone-grpc-client", # 1.15.0+solana.1.18.9 + "yellowstone-grpc-geyser", # 1.14.0+solana.1.18.9 + "yellowstone-grpc-proto", # 1.14.0+solana.1.18.9 + "yellowstone-grpc-tools", # 1.0.0-rc.11+solana.1.18.9 ] [workspace.package] @@ -51,11 +51,11 @@ serde = "1.0.145" serde_json = "1.0.86" serde_yaml = "0.9.25" sha2 = "0.10.7" -solana-account-decoder = "=1.18.7" -solana-geyser-plugin-interface = "=1.18.7" -solana-logger = "=1.18.7" -solana-sdk = "=1.18.7" -solana-transaction-status = "=1.18.7" +solana-account-decoder = "=1.18.9" +solana-geyser-plugin-interface = "=1.18.9" +solana-logger = "=1.18.9" +solana-sdk = "=1.18.9" +solana-transaction-status = "=1.18.9" spl-token-2022 = "0.9.0" thiserror = "1.0" tokio = "1.21.2" @@ -66,8 +66,8 @@ tonic-health = "0.10.2" tracing = "0.1.37" tracing-subscriber = "0.3.17" vergen = "8.2.1" -yellowstone-grpc-client = { path = "yellowstone-grpc-client", version = "=1.14.0+solana.1.18.7" } -yellowstone-grpc-proto = { path = "yellowstone-grpc-proto", version = "=1.13.0+solana.1.18.7" } +yellowstone-grpc-client = { path = "yellowstone-grpc-client", version = "=1.15.0+solana.1.18.9" } +yellowstone-grpc-proto = { path = "yellowstone-grpc-proto", version = "=1.14.0+solana.1.18.9" } [profile.release] debug = true diff --git a/examples/rust/Cargo.toml b/examples/rust/Cargo.toml index abd184dd..4dec6faf 100644 --- a/examples/rust/Cargo.toml +++ b/examples/rust/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-client-simple" -version = "1.12.0+solana.1.18.7" +version = "1.13.0+solana.1.18.9" authors = { workspace = true } edition = { workspace = true } homepage = { workspace = true } diff --git a/examples/rust/src/bin/client.rs b/examples/rust/src/bin/client.rs index d67f567a..72572165 100644 --- a/examples/rust/src/bin/client.rs +++ b/examples/rust/src/bin/client.rs @@ -7,20 +7,17 @@ use { solana_transaction_status::{EncodedTransactionWithStatusMeta, UiTransactionEncoding}, std::{collections::HashMap, env, fmt, fs::File, sync::Arc, time::Duration}, tokio::sync::Mutex, - yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError}, - yellowstone_grpc_proto::{ - prelude::{ - subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof, - subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof, - subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest, - SubscribeRequestAccountsDataSlice, SubscribeRequestFilterAccounts, - SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterAccountsFilterMemcmp, - SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, - SubscribeRequestFilterEntry, SubscribeRequestFilterSlots, - SubscribeRequestFilterTransactions, SubscribeRequestPing, SubscribeUpdateAccount, - SubscribeUpdateTransaction, SubscribeUpdateTransactionStatus, - }, - tonic::service::Interceptor, + yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError, Interceptor}, + yellowstone_grpc_proto::prelude::{ + subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof, + subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof, + subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest, + SubscribeRequestAccountsDataSlice, SubscribeRequestFilterAccounts, + SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterAccountsFilterMemcmp, + SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, + SubscribeRequestFilterEntry, SubscribeRequestFilterSlots, + SubscribeRequestFilterTransactions, SubscribeRequestPing, SubscribeUpdateAccount, + SubscribeUpdateTransaction, SubscribeUpdateTransactionStatus, }, }; @@ -54,6 +51,16 @@ impl Args { fn get_commitment(&self) -> Option { Some(self.commitment.unwrap_or_default().into()) } + + async fn connect(&self) -> anyhow::Result> { + GeyserGrpcClient::build_from_shared(self.endpoint.clone())? + .x_token(self.x_token.clone())? + .connect_timeout(Duration::from_secs(10)) + .timeout(Duration::from_secs(10)) + .connect() + .await + .map_err(Into::into) + } } #[derive(Debug, Clone, Copy, Default, ValueEnum)] @@ -518,16 +525,7 @@ async fn main() -> anyhow::Result<()> { drop(zero_attempts); let commitment = args.get_commitment(); - let mut client = GeyserGrpcClient::connect_with_timeout( - args.endpoint, - args.x_token, - None, - Some(Duration::from_secs(10)), - Some(Duration::from_secs(10)), - false, - ) - .await - .map_err(|e| backoff::Error::transient(anyhow::Error::new(e)))?; + let mut client = args.connect().await.map_err(backoff::Error::transient)?; info!("Connected"); match &args.action { diff --git a/examples/rust/src/bin/subscribe-ping.rs b/examples/rust/src/bin/subscribe-ping.rs index 7e8a1449..89540a80 100644 --- a/examples/rust/src/bin/subscribe-ping.rs +++ b/examples/rust/src/bin/subscribe-ping.rs @@ -33,7 +33,10 @@ async fn main() -> anyhow::Result<()> { let args = Args::parse(); - let mut client = GeyserGrpcClient::connect(args.endpoint, args.x_token, None)?; + let mut client = GeyserGrpcClient::build_from_shared(args.endpoint)? + .x_token(args.x_token)? + .connect() + .await?; let (mut subscribe_tx, mut stream) = client.subscribe().await?; futures::try_join!( diff --git a/examples/rust/src/bin/tx-blocktime.rs b/examples/rust/src/bin/tx-blocktime.rs index aa83f733..18cdb0fe 100644 --- a/examples/rust/src/bin/tx-blocktime.rs +++ b/examples/rust/src/bin/tx-blocktime.rs @@ -83,7 +83,10 @@ async fn main() -> anyhow::Result<()> { let args = Args::parse(); - let mut client = GeyserGrpcClient::connect(args.endpoint, args.x_token, None)?; + let mut client = GeyserGrpcClient::build_from_shared(args.endpoint)? + .x_token(args.x_token)? + .connect() + .await?; let (mut subscribe_tx, mut stream) = client.subscribe().await?; let commitment: CommitmentLevel = args.commitment.unwrap_or_default().into(); diff --git a/examples/typescript/package-lock.json b/examples/typescript/package-lock.json index 2ab28f28..cd59873d 100644 --- a/examples/typescript/package-lock.json +++ b/examples/typescript/package-lock.json @@ -13,19 +13,19 @@ "yargs": "^17.6.2" }, "devDependencies": { - "prettier": "2.8.3", + "prettier": "^2.8.3", "typescript": "^4.9.5" } }, "../../yellowstone-grpc-client-nodejs": { "name": "@triton-one/yellowstone-grpc", - "version": "0.4.0", + "version": "0.5.0", "license": "Apache-2.0", "dependencies": { "@grpc/grpc-js": "^1.8.0" }, "devDependencies": { - "prettier": "2.8.3", + "prettier": "^2.8.3", "ts-proto": "^1.139.0", "typescript": "^4.9.5" } diff --git a/examples/typescript/package.json b/examples/typescript/package.json index b96a9672..53252f82 100644 --- a/examples/typescript/package.json +++ b/examples/typescript/package.json @@ -16,7 +16,7 @@ "start": "npm run build && node dist/client.js" }, "devDependencies": { - "prettier": "2.8.3", + "prettier": "^2.8.3", "typescript": "^4.9.5" } } diff --git a/examples/typescript/src/client.ts b/examples/typescript/src/client.ts index c1b6f84e..9bb44373 100644 --- a/examples/typescript/src/client.ts +++ b/examples/typescript/src/client.ts @@ -10,7 +10,7 @@ async function main() { // Open connection. const client = new Client(args.endpoint, args.xToken, { - 'grpc.max_receive_message_length': 64 * 1024 * 1024 // 64MiB + "grpc.max_receive_message_length": 64 * 1024 * 1024, // 64MiB }); const commitment = parseCommitmentLevel(args.commitment); @@ -90,6 +90,7 @@ async function subscribeCommand(client, args) { accounts: {}, slots: {}, transactions: {}, + transactionsStatus: {}, entry: {}, blocks: {}, blocksMeta: {}, @@ -147,6 +148,17 @@ async function subscribeCommand(client, args) { }; } + if (args.transactionsStatus) { + request.transactionsStatus.client = { + vote: args.transactionsStatusVote, + failed: args.transactionsStatusFailed, + signature: args.transactionsStatusSignature, + accountInclude: args.transactionsStatusAccountInclude, + accountExclude: args.transactionsStatusAccountExclude, + accountRequired: args.transactionsStatusAccountRequired, + }; + } + if (args.entry) { request.entry.client = {}; } @@ -318,6 +330,38 @@ function parseCommandLineArgs() { description: "filter required account in transactions", type: "array", }, + "transactions-status": { + default: false, + describe: "subscribe on transactionsStatus updates", + type: "boolean", + }, + "transactions-status-vote": { + description: "filter vote transactions", + type: "boolean", + }, + "transactions-status-failed": { + description: "filter failed transactions", + type: "boolean", + }, + "transactions-status-signature": { + description: "filter by transaction signature", + type: "string", + }, + "transactions-status-account-include": { + default: [], + description: "filter included account in transactions", + type: "array", + }, + "transactions-status-account-exclude": { + default: [], + description: "filter excluded account in transactions", + type: "array", + }, + "transactions-status-account-required": { + default: [], + description: "filter required account in transactions", + type: "array", + }, entry: { default: false, description: "subscribe on entry updates", @@ -356,8 +400,8 @@ function parseCommandLineArgs() { ping: { default: undefined, description: "send ping request in subscribe", - type: "number" - } + type: "number", + }, }); }) .demandCommand(1) diff --git a/yellowstone-grpc-client-nodejs/package-lock.json b/yellowstone-grpc-client-nodejs/package-lock.json index 3aa8e759..4bf73b70 100644 --- a/yellowstone-grpc-client-nodejs/package-lock.json +++ b/yellowstone-grpc-client-nodejs/package-lock.json @@ -1,18 +1,18 @@ { "name": "@triton-one/yellowstone-grpc", - "version": "0.4.0", + "version": "0.5.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@triton-one/yellowstone-grpc", - "version": "0.4.0", + "version": "0.5.0", "license": "Apache-2.0", "dependencies": { "@grpc/grpc-js": "^1.8.0" }, "devDependencies": { - "prettier": "2.8.3", + "prettier": "^2.8.3", "ts-proto": "^1.139.0", "typescript": "^4.9.5" } diff --git a/yellowstone-grpc-client-nodejs/package.json b/yellowstone-grpc-client-nodejs/package.json index 5f207ff9..af1d4e42 100644 --- a/yellowstone-grpc-client-nodejs/package.json +++ b/yellowstone-grpc-client-nodejs/package.json @@ -1,6 +1,6 @@ { "name": "@triton-one/yellowstone-grpc", - "version": "0.4.0", + "version": "0.5.0", "license": "Apache-2.0", "author": "Triton One", "description": "Yellowstone gRPC Geyser Node.js Client", @@ -29,7 +29,7 @@ "@grpc/grpc-js": "^1.8.0" }, "devDependencies": { - "prettier": "2.8.3", + "prettier": "^2.8.3", "ts-proto": "^1.139.0", "typescript": "^4.9.5" }, diff --git a/yellowstone-grpc-client/Cargo.toml b/yellowstone-grpc-client/Cargo.toml index 49adb7f6..f7045637 100644 --- a/yellowstone-grpc-client/Cargo.toml +++ b/yellowstone-grpc-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-client" -version = "1.14.0+solana.1.18.7" +version = "1.15.0+solana.1.18.9" authors = { workspace = true } edition = { workspace = true } description = "Yellowstone gRPC Geyser Simple Client" @@ -13,7 +13,6 @@ publish = true [dependencies] bytes = { workspace = true } futures = { workspace = true } -http = { workspace = true } thiserror ={ workspace = true } tonic = { workspace = true, features = ["tls", "tls-roots"] } tonic-health = { workspace = true } diff --git a/yellowstone-grpc-client/src/lib.rs b/yellowstone-grpc-client/src/lib.rs index df14d146..61fcf804 100644 --- a/yellowstone-grpc-client/src/lib.rs +++ b/yellowstone-grpc-client/src/lib.rs @@ -5,12 +5,11 @@ use { sink::{Sink, SinkExt}, stream::Stream, }, - http::uri::InvalidUri, - std::{collections::HashMap, time::Duration}, + std::time::Duration, tonic::{ - codec::Streaming, + codec::{CompressionEncoding, Streaming}, metadata::{errors::InvalidMetadataValue, AsciiMetadataValue}, - service::{interceptor::InterceptedService, Interceptor}, + service::interceptor::InterceptedService, transport::channel::{Channel, ClientTlsConfig, Endpoint}, Request, Response, Status, }, @@ -20,18 +19,23 @@ use { GetBlockHeightResponse, GetLatestBlockhashRequest, GetLatestBlockhashResponse, GetSlotRequest, GetSlotResponse, GetVersionRequest, GetVersionResponse, IsBlockhashValidRequest, IsBlockhashValidResponse, PingRequest, PongResponse, - SubscribeRequest, SubscribeRequestAccountsDataSlice, SubscribeRequestFilterAccounts, - SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, - SubscribeRequestFilterEntry, SubscribeRequestFilterSlots, - SubscribeRequestFilterTransactions, SubscribeRequestPing, SubscribeUpdate, + SubscribeRequest, SubscribeUpdate, }, }; +pub use tonic::service::Interceptor; + #[derive(Debug, Clone)] pub struct InterceptorXToken { pub x_token: Option, } +impl From> for InterceptorXToken { + fn from(x_token: Option) -> Self { + Self { x_token } + } +} + impl Interceptor for InterceptorXToken { fn call(&mut self, mut request: Request<()>) -> Result, Status> { if let Some(x_token) = self.x_token.clone() { @@ -43,14 +47,6 @@ impl Interceptor for InterceptorXToken { #[derive(Debug, thiserror::Error)] pub enum GeyserGrpcClientError { - #[error("Invalid URI: {0}")] - InvalidUri(#[from] InvalidUri), - #[error("Failed to parse x-token: {0}")] - MetadataValueError(#[from] InvalidMetadataValue), - #[error("Invalid X-Token length: {0}, expected 28")] - InvalidXTokenLength(usize), - #[error("gRPC transport error: {0}")] - TonicError(#[from] tonic::transport::Error), #[error("gRPC status: {0}")] TonicStatus(#[from] Status), #[error("Failed to send subscribe request: {0}")] @@ -65,90 +61,14 @@ pub struct GeyserGrpcClient { } impl GeyserGrpcClient<()> { - pub const fn max_decoding_message_size() -> usize { - 64 * 1024 * 1024 // 64 MiB + pub fn build_from_shared( + endpoint: impl Into, + ) -> GeyserGrpcBuilderResult { + Ok(GeyserGrpcBuilder::new(Endpoint::from_shared(endpoint)?)) } - fn connect2( - endpoint: E, - tls_config: Option, - x_token: Option, - ) -> GeyserGrpcClientResult<(Endpoint, InterceptorXToken)> - where - E: Into, - T: TryInto, - { - let mut endpoint = Channel::from_shared(endpoint)?; - if let Some(tls_config) = tls_config { - endpoint = endpoint.tls_config(tls_config)?; - } else if endpoint.uri().scheme_str() == Some("https") { - endpoint = endpoint.tls_config(ClientTlsConfig::new())?; - } - - let x_token: Option = match x_token { - Some(x_token) => Some(x_token.try_into()?), - None => None, - }; - match x_token { - Some(token) if token.is_empty() => { - return Err(GeyserGrpcClientError::InvalidXTokenLength(token.len())); - } - _ => {} - } - let interceptor = InterceptorXToken { x_token }; - - Ok((endpoint, interceptor)) - } - - pub fn connect( - endpoint: E, - x_token: Option, - tls_config: Option, - ) -> GeyserGrpcClientResult> - where - E: Into, - T: TryInto, - { - let (endpoint, interceptor) = Self::connect2(endpoint, tls_config, x_token)?; - let channel = endpoint.connect_lazy(); - Ok(GeyserGrpcClient::new( - HealthClient::with_interceptor(channel.clone(), interceptor.clone()), - GeyserClient::with_interceptor(channel, interceptor) - .max_decoding_message_size(Self::max_decoding_message_size()), - )) - } - - pub async fn connect_with_timeout( - endpoint: E, - x_token: Option, - tls_config: Option, - connect_timeout: Option, - request_timeout: Option, - connect_lazy: bool, - ) -> GeyserGrpcClientResult> - where - E: Into, - T: TryInto, - { - let (mut endpoint, interceptor) = Self::connect2(endpoint, tls_config, x_token)?; - - if let Some(timeout) = connect_timeout { - endpoint = endpoint.connect_timeout(timeout); - } - if let Some(timeout) = request_timeout { - endpoint = endpoint.timeout(timeout); - } - let channel = if connect_lazy { - endpoint.connect_lazy() - } else { - endpoint.connect().await? - }; - - Ok(GeyserGrpcClient::new( - HealthClient::with_interceptor(channel.clone(), interceptor.clone()), - GeyserClient::with_interceptor(channel, interceptor) - .max_decoding_message_size(Self::max_decoding_message_size()), - )) + pub fn build_from_static(endpoint: &'static str) -> GeyserGrpcBuilder { + GeyserGrpcBuilder::new(Endpoint::from_static(endpoint)) } } @@ -160,6 +80,7 @@ impl GeyserGrpcClient { Self { health, geyser } } + // Health pub async fn health_check(&mut self) -> GeyserGrpcClientResult { let request = HealthCheckRequest { service: "geyser.Geyser".to_owned(), @@ -178,6 +99,7 @@ impl GeyserGrpcClient { Ok(response.into_inner()) } + // Subscribe pub async fn subscribe( &mut self, ) -> GeyserGrpcClientResult<( @@ -206,36 +128,7 @@ impl GeyserGrpcClient { Ok((subscribe_tx, response.into_inner())) } - #[allow(clippy::too_many_arguments)] pub async fn subscribe_once( - &mut self, - slots: HashMap, - accounts: HashMap, - transactions: HashMap, - transactions_status: HashMap, - entry: HashMap, - blocks: HashMap, - blocks_meta: HashMap, - commitment: Option, - accounts_data_slice: Vec, - ping: Option, - ) -> GeyserGrpcClientResult>> { - self.subscribe_once2(SubscribeRequest { - slots, - accounts, - transactions, - transactions_status, - entry, - blocks, - blocks_meta, - commitment: commitment.map(|value| value as i32), - accounts_data_slice, - ping, - }) - .await - } - - pub async fn subscribe_once2( &mut self, request: SubscribeRequest, ) -> GeyserGrpcClientResult>> { @@ -244,6 +137,7 @@ impl GeyserGrpcClient { .map(|(_sink, stream)| stream) } + // RPC calls pub async fn ping(&mut self, count: i32) -> GeyserGrpcClientResult { let message = PingRequest { count }; let request = tonic::Request::new(message); @@ -304,49 +198,295 @@ impl GeyserGrpcClient { } } +#[derive(Debug, thiserror::Error)] +pub enum GeyserGrpcBuilderError { + #[error("Failed to parse x-token: {0}")] + MetadataValueError(#[from] InvalidMetadataValue), + #[error("Invalid X-Token length: {0}, expected 28")] + InvalidXTokenLength(usize), + #[error("gRPC transport error: {0}")] + TonicError(#[from] tonic::transport::Error), + #[error("tonic::transport::Channel should be created, use `connect` or `connect_lazy` first")] + EmptyChannel, +} + +pub type GeyserGrpcBuilderResult = Result; + +#[derive(Debug)] +pub struct GeyserGrpcBuilder { + pub endpoint: Endpoint, + pub x_token: Option, + pub send_compressed: Option, + pub accept_compressed: Option, + pub max_decoding_message_size: Option, + pub max_encoding_message_size: Option, +} + +impl GeyserGrpcBuilder { + // Create new builder + fn new(endpoint: Endpoint) -> Self { + Self { + endpoint, + x_token: None, + send_compressed: None, + accept_compressed: None, + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + + pub fn from_shared(endpoint: impl Into) -> GeyserGrpcBuilderResult { + Ok(Self::new(Endpoint::from_shared(endpoint)?)) + } + + pub fn from_static(endpoint: &'static str) -> Self { + Self::new(Endpoint::from_static(endpoint)) + } + + // Create client + fn build( + self, + channel: Channel, + ) -> GeyserGrpcBuilderResult> { + let interceptor: InterceptorXToken = self.x_token.into(); + + let mut geyser = GeyserClient::with_interceptor(channel.clone(), interceptor.clone()); + if let Some(encoding) = self.send_compressed { + geyser = geyser.send_compressed(encoding); + } + if let Some(encoding) = self.accept_compressed { + geyser = geyser.accept_compressed(encoding); + } + if let Some(limit) = self.max_decoding_message_size { + geyser = geyser.max_decoding_message_size(limit); + } + if let Some(limit) = self.max_encoding_message_size { + geyser = geyser.max_encoding_message_size(limit); + } + + Ok(GeyserGrpcClient::new( + HealthClient::with_interceptor(channel, interceptor), + geyser, + )) + } + + pub async fn connect(self) -> GeyserGrpcBuilderResult> { + let channel = self.endpoint.connect().await?; + self.build(channel) + } + + pub fn connect_lazy(self) -> GeyserGrpcBuilderResult> { + let channel = self.endpoint.connect_lazy(); + self.build(channel) + } + + // Set x-token + pub fn x_token(self, x_token: Option) -> GeyserGrpcBuilderResult + where + T: TryInto, + { + Ok(Self { + x_token: match x_token { + Some(x_token) => { + let x_token = x_token.try_into()?; + if x_token.is_empty() { + return Err(GeyserGrpcBuilderError::InvalidXTokenLength(x_token.len())); + } + Some(x_token) + } + None => None, + }, + ..self + }) + } + + // Endpoint options + pub fn connect_timeout(self, dur: Duration) -> Self { + Self { + endpoint: self.endpoint.connect_timeout(dur), + ..self + } + } + + pub fn timeout(self, dur: Duration) -> Self { + Self { + endpoint: self.endpoint.timeout(dur), + ..self + } + } + + pub fn tls_config(self, tls_config: ClientTlsConfig) -> GeyserGrpcBuilderResult { + Ok(Self { + endpoint: self.endpoint.tls_config(tls_config)?, + ..self + }) + } + + pub fn buffer_size(self, sz: impl Into>) -> Self { + Self { + endpoint: self.endpoint.buffer_size(sz), + ..self + } + } + + pub fn http2_adaptive_window(self, enabled: bool) -> Self { + Self { + endpoint: self.endpoint.http2_adaptive_window(enabled), + ..self + } + } + + pub fn http2_keep_alive_interval(self, interval: Duration) -> Self { + Self { + endpoint: self.endpoint.http2_keep_alive_interval(interval), + ..self + } + } + + pub fn initial_connection_window_size(self, sz: impl Into>) -> Self { + Self { + endpoint: self.endpoint.initial_connection_window_size(sz), + ..self + } + } + + pub fn initial_stream_window_size(self, sz: impl Into>) -> Self { + Self { + endpoint: self.endpoint.initial_stream_window_size(sz), + ..self + } + } + + pub fn keep_alive_timeout(self, duration: Duration) -> Self { + Self { + endpoint: self.endpoint.keep_alive_timeout(duration), + ..self + } + } + + pub fn keep_alive_while_idle(self, enabled: bool) -> Self { + Self { + endpoint: self.endpoint.keep_alive_while_idle(enabled), + ..self + } + } + + pub fn tcp_keepalive(self, tcp_keepalive: Option) -> Self { + Self { + endpoint: self.endpoint.tcp_keepalive(tcp_keepalive), + ..self + } + } + + pub fn tcp_nodelay(self, enabled: bool) -> Self { + Self { + endpoint: self.endpoint.tcp_nodelay(enabled), + ..self + } + } + + // Geyser options + pub fn send_compressed(self, encoding: CompressionEncoding) -> Self { + Self { + send_compressed: Some(encoding), + ..self + } + } + + pub fn accept_compressed(self, encoding: CompressionEncoding) -> Self { + Self { + accept_compressed: Some(encoding), + ..self + } + } + + pub fn max_decoding_message_size(self, limit: usize) -> Self { + Self { + max_decoding_message_size: Some(limit), + ..self + } + } + + pub fn max_encoding_message_size(self, limit: usize) -> Self { + Self { + max_encoding_message_size: Some(limit), + ..self + } + } +} + #[cfg(test)] mod tests { - use super::{GeyserGrpcClient, GeyserGrpcClientError}; + use super::{GeyserGrpcBuilderError, GeyserGrpcClient}; #[tokio::test] async fn test_channel_https_success() { let endpoint = "https://ams17.rpcpool.com:443"; let x_token = "1000000000000000000000000007"; - let res = GeyserGrpcClient::connect(endpoint, Some(x_token), None); - assert!(res.is_ok()) + + let res = GeyserGrpcClient::build_from_shared(endpoint); + assert!(res.is_ok()); + + let res = res.unwrap().x_token(Some(x_token)); + assert!(res.is_ok()); + + let res = res.unwrap().connect_lazy(); + assert!(res.is_ok()); } #[tokio::test] async fn test_channel_http_success() { let endpoint = "http://127.0.0.1:10000"; let x_token = "1234567891012141618202224268"; - let res = GeyserGrpcClient::connect(endpoint, Some(x_token), None); - assert!(res.is_ok()) + + let res = GeyserGrpcClient::build_from_shared(endpoint); + assert!(res.is_ok()); + + let res = res.unwrap().x_token(Some(x_token)); + assert!(res.is_ok()); + + let res = res.unwrap().connect_lazy(); + assert!(res.is_ok()); } #[tokio::test] async fn test_channel_invalid_token_some() { let endpoint = "http://127.0.0.1:10000"; let x_token = ""; - let res = GeyserGrpcClient::connect(endpoint, Some(x_token), None); + + let res = GeyserGrpcClient::build_from_shared(endpoint); + assert!(res.is_ok()); + + let res = res.unwrap().x_token(Some(x_token)); assert!(matches!( res, - Err(GeyserGrpcClientError::InvalidXTokenLength(_)) + Err(GeyserGrpcBuilderError::InvalidXTokenLength(_)) )); } #[tokio::test] async fn test_channel_invalid_token_none() { let endpoint = "http://127.0.0.1:10000"; - let res = GeyserGrpcClient::connect::<_, String>(endpoint, None, None); + + let res = GeyserGrpcClient::build_from_shared(endpoint); + assert!(res.is_ok()); + + let res = res.unwrap().x_token::(None); + assert!(res.is_ok()); + + let res = res.unwrap().connect_lazy(); assert!(res.is_ok()); } #[tokio::test] async fn test_channel_invalid_uri() { let endpoint = "sites/files/images/picture.png"; - let x_token = "1234567891012141618202224268"; - let res = GeyserGrpcClient::connect(endpoint, Some(x_token), None); - assert!(matches!(res, Err(GeyserGrpcClientError::InvalidUri(_)))); + + let res = GeyserGrpcClient::build_from_shared(endpoint); + assert_eq!( + format!("{:?}", res), + "Err(TonicError(tonic::transport::Error(InvalidUri, InvalidUri(InvalidFormat))))" + .to_owned() + ); } } diff --git a/yellowstone-grpc-geyser/Cargo.toml b/yellowstone-grpc-geyser/Cargo.toml index 641dd058..a46a861f 100644 --- a/yellowstone-grpc-geyser/Cargo.toml +++ b/yellowstone-grpc-geyser/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-geyser" -version = "1.13.0+solana.1.18.7" +version = "1.14.0+solana.1.18.9" authors = { workspace = true } edition = { workspace = true } description = "Yellowstone gRPC Geyser Plugin" diff --git a/yellowstone-grpc-proto/Cargo.toml b/yellowstone-grpc-proto/Cargo.toml index ec091129..7f8919a8 100644 --- a/yellowstone-grpc-proto/Cargo.toml +++ b/yellowstone-grpc-proto/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-proto" -version = "1.13.0+solana.1.18.7" +version = "1.14.0+solana.1.18.9" authors = { workspace = true } edition = { workspace = true } description = "Yellowstone gRPC Geyser Protobuf Definitions" diff --git a/yellowstone-grpc-tools/Cargo.toml b/yellowstone-grpc-tools/Cargo.toml index 0a139daa..1fb6e601 100644 --- a/yellowstone-grpc-tools/Cargo.toml +++ b/yellowstone-grpc-tools/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-tools" -version = "1.0.0-rc.10+solana.1.18.7" +version = "1.0.0-rc.11+solana.1.18.9" authors = { workspace = true } edition = { workspace = true } description = "Yellowstone gRPC Tools" diff --git a/yellowstone-grpc-tools/src/bin/grpc-google-pubsub.rs b/yellowstone-grpc-tools/src/bin/grpc-google-pubsub.rs index 605828bc..c29538e1 100644 --- a/yellowstone-grpc-tools/src/bin/grpc-google-pubsub.rs +++ b/yellowstone-grpc-tools/src/bin/grpc-google-pubsub.rs @@ -117,22 +117,14 @@ impl ArgsAction { let publisher = topic.new_publisher(Some(config.publisher.get_publisher_config())); // Create gRPC client & subscribe - let client = GeyserGrpcClient::connect_with_timeout( - config.endpoint, - config.x_token, - None, - Some(Duration::from_secs(10)), - Some(Duration::from_secs(5)), - false, - ) - .await?; - let mut client = GeyserGrpcClient::new( - client.health, - client - .geyser - .max_decoding_message_size(config.max_message_size), - ); - let mut geyser = client.subscribe_once2(config.request.to_proto()).await?; + let mut client = GeyserGrpcClient::build_from_shared(config.endpoint)? + .x_token(config.x_token)? + .connect_timeout(Duration::from_secs(10)) + .timeout(Duration::from_secs(5)) + .max_decoding_message_size(config.max_message_size) + .connect() + .await?; + let mut geyser = client.subscribe_once(config.request.to_proto()).await?; // Receive-send loop let mut send_tasks = JoinSet::new(); diff --git a/yellowstone-grpc-tools/src/bin/grpc-kafka.rs b/yellowstone-grpc-tools/src/bin/grpc-kafka.rs index 69a01d52..ea9bdaa0 100644 --- a/yellowstone-grpc-tools/src/bin/grpc-kafka.rs +++ b/yellowstone-grpc-tools/src/bin/grpc-kafka.rs @@ -226,16 +226,13 @@ impl ArgsAction { tokio::pin!(kafka_error_rx); // Create gRPC client & subscribe - let mut client = GeyserGrpcClient::connect_with_timeout( - config.endpoint, - config.x_token, - None, - Some(Duration::from_secs(10)), - Some(Duration::from_secs(5)), - false, - ) - .await?; - let mut geyser = client.subscribe_once2(config.request.to_proto()).await?; + let mut client = GeyserGrpcClient::build_from_shared(config.endpoint)? + .x_token(config.x_token)? + .connect_timeout(Duration::from_secs(10)) + .timeout(Duration::from_secs(5)) + .connect() + .await?; + let mut geyser = client.subscribe_once(config.request.to_proto()).await?; // Receive-send loop let mut send_tasks = JoinSet::new();