diff --git a/.editorconfig b/.editorconfig index 6dddb172..3bc4f166 100644 --- a/.editorconfig +++ b/.editorconfig @@ -9,11 +9,7 @@ insert_final_newline = true [*.{diff,md}] trim_trailing_whitespace = false -[*.{js,json}] -indent_style = space -indent_size = 2 - -[*.proto] +[*.{js,json,proto,toml}] indent_style = space indent_size = 2 diff --git a/.gitignore b/.gitignore index 2fdec21a..ace60611 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ test-ledger # Rust target +yellowstone-grpc-proto/src/bin/raw.rs # Node.js examples/typescript/dist diff --git a/CHANGELOG.md b/CHANGELOG.md index 06e1100b..965b1af1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ The minor version will be incremented upon a breaking change and the patch versi - node: remove generated grpc files ([#447](https://github.com/rpcpool/yellowstone-grpc/pull/447)) - proto: add txn_signature filter ([#445](https://github.com/rpcpool/yellowstone-grpc/pull/445)) - geyser: limit length of filter name ([#448](https://github.com/rpcpool/yellowstone-grpc/pull/448)) +- geyser: wrap messages in Arc ([#449](https://github.com/rpcpool/yellowstone-grpc/pull/449)) - examples: add progress bar to client tool ([#456](https://github.com/rpcpool/yellowstone-grpc/pull/456)) - proto: change error type in mod `convert_from` ([#457](https://github.com/rpcpool/yellowstone-grpc/pull/457)) - proto: add mod `plugin` with `FilterNames` cache ([#458](https://github.com/rpcpool/yellowstone-grpc/pull/458)) diff --git a/Cargo.lock b/Cargo.lock index 7e1ca08f..87c1c970 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -126,6 +126,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + [[package]] name = "anstream" version = "0.6.15" @@ -717,6 +723,12 @@ dependencies = [ "url", ] +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "cc" version = "1.1.8" @@ -754,6 +766,33 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "cipher" version = "0.4.4" @@ -792,7 +831,7 @@ version = "4.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "501d359d5f3dcaf6ecdeee48833ae73ec6e42723a1e52419c79abf9507eec0a0" dependencies = [ - "heck", + "heck 0.5.0", "proc-macro2", "quote", "syn 2.0.72", @@ -896,6 +935,42 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "criterion" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap", + "criterion-plot", + "is-terminal", + "itertools 0.10.5", + "num-traits", + "once_cell", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools 0.10.5", +] + [[package]] name = "crossbeam-channel" version = "0.5.13" @@ -905,6 +980,25 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.20" @@ -1472,6 +1566,16 @@ dependencies = [ "tracing", ] +[[package]] +name = "half" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" +dependencies = [ + "cfg-if", + "crunchy", +] + [[package]] name = "hash32" version = "0.2.1" @@ -1502,6 +1606,12 @@ version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + [[package]] name = "heck" version = "0.5.0" @@ -1523,6 +1633,12 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "hermit-abi" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc" + [[package]] name = "hex" version = "0.4.3" @@ -1559,6 +1675,15 @@ dependencies = [ "hmac 0.8.1", ] +[[package]] +name = "home" +version = "0.5.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "hostname" version = "0.4.0" @@ -1842,6 +1967,17 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +[[package]] +name = "is-terminal" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "261f68e344040fbd0edea105bef17c66edf46f984ddb1115b775ce31be948f4b" +dependencies = [ + "hermit-abi 0.4.0", + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -2056,6 +2192,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + [[package]] name = "multimap" version = "0.10.0" @@ -2216,6 +2358,12 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "oorandom" +version = "11.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9" + [[package]] name = "opaque-debug" version = "0.3.1" @@ -2335,6 +2483,34 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" +[[package]] +name = "plotters" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" + +[[package]] +name = "plotters-svg" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" +dependencies = [ + "plotters-backend", +] + [[package]] name = "polyval" version = "0.6.2" @@ -2368,6 +2544,16 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "prettyplease" +version = "0.1.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c8646e95016a7a6c4adea95bafa8a16baab64b583356217f2c85db4a39d9a86" +dependencies = [ + "proc-macro2", + "syn 1.0.109", +] + [[package]] name = "prettyplease" version = "0.2.20" @@ -2444,6 +2630,16 @@ dependencies = [ "thiserror", ] +[[package]] +name = "prost" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" +dependencies = [ + "bytes", + "prost-derive 0.11.9", +] + [[package]] name = "prost" version = "0.13.1" @@ -2451,7 +2647,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e13db3d3fde688c61e2446b4d843bc27a7e8af269a69440c0308021dc92333cc" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.13.1", +] + +[[package]] +name = "prost-build" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" +dependencies = [ + "bytes", + "heck 0.4.1", + "itertools 0.10.5", + "lazy_static", + "log", + "multimap 0.8.3", + "petgraph", + "prettyplease 0.1.25", + "prost 0.11.9", + "prost-types 0.11.9", + "regex", + "syn 1.0.109", + "tempfile", + "which", ] [[package]] @@ -2461,20 +2679,33 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1" dependencies = [ "bytes", - "heck", + "heck 0.5.0", "itertools 0.12.1", "log", - "multimap", + "multimap 0.10.0", "once_cell", "petgraph", - "prettyplease", - "prost", - "prost-types", + "prettyplease 0.2.20", + "prost 0.13.1", + "prost-types 0.13.1", "regex", "syn 2.0.72", "tempfile", ] +[[package]] +name = "prost-derive" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" +dependencies = [ + "anyhow", + "itertools 0.10.5", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "prost-derive" version = "0.13.1" @@ -2488,13 +2719,22 @@ dependencies = [ "syn 2.0.72", ] +[[package]] +name = "prost-types" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" +dependencies = [ + "prost 0.11.9", +] + [[package]] name = "prost-types" version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cee5168b05f49d4b0ca581206eb14a7b22fafd963efe729ac48eb03266e25cc2" dependencies = [ - "prost", + "prost 0.13.1", ] [[package]] @@ -2601,6 +2841,26 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "rayon" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.5.3" @@ -2823,6 +3083,15 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.23" @@ -3308,6 +3577,23 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "468aa43b7edb1f9b7b7b686d5c3aeb6630dc1708e86e31343499dd5c4d775183" +[[package]] +name = "solana-storage-proto" +version = "2.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bedde2051ddfa8408a504db80a3007f7b0e9aba537ace9dd06c2187084e0a1a" +dependencies = [ + "bincode", + "bs58", + "prost 0.11.9", + "protobuf-src", + "serde", + "solana-account-decoder", + "solana-sdk", + "solana-transaction-status", + "tonic-build 0.9.2", +] + [[package]] name = "solana-transaction-status" version = "2.0.10" @@ -3778,6 +4064,16 @@ dependencies = [ "time-core", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "1.8.0" @@ -3940,7 +4236,7 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "prost", + "prost 0.13.1", "rustls-native-certs", "rustls-pemfile 2.1.3", "socket2", @@ -3954,15 +4250,28 @@ dependencies = [ "zstd 0.13.2", ] +[[package]] +name = "tonic-build" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6fdaae4c2c638bb70fe42803a26fbd6fc6ac8c72f5c59f67ecc2a2dcabf4b07" +dependencies = [ + "prettyplease 0.1.25", + "proc-macro2", + "prost-build 0.11.9", + "quote", + "syn 1.0.109", +] + [[package]] name = "tonic-build" version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "568392c5a2bd0020723e3f387891176aabafe36fd9fcd074ad309dfa0c8eb964" dependencies = [ - "prettyplease", + "prettyplease 0.2.20", "proc-macro2", - "prost-build", + "prost-build 0.13.1", "quote", "syn 2.0.72", ] @@ -3974,7 +4283,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1e10e6a96ee08b6ce443487d4368442d328d0e746f3681f81127f7dc41b4955" dependencies = [ "async-stream", - "prost", + "prost 0.13.1", "tokio", "tokio-stream", "tonic", @@ -4178,6 +4487,16 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -4291,6 +4610,18 @@ version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +[[package]] +name = "which" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" +dependencies = [ + "either", + "home", + "once_cell", + "rustix", +] + [[package]] name = "winapi" version = "0.3.9" @@ -4571,11 +4902,11 @@ dependencies = [ "prometheus", "serde", "serde_json", + "smallvec", "solana-logger", "solana-sdk", "solana-transaction-status", "spl-token-2022", - "thiserror", "tokio", "tokio-stream", "tonic", @@ -4591,14 +4922,19 @@ dependencies = [ "agave-geyser-plugin-interface", "anyhow", "bincode", - "prost", + "bytes", + "criterion", + "prost 0.11.9", + "prost 0.13.1", "protobuf-src", + "smallvec", "solana-account-decoder", "solana-sdk", + "solana-storage-proto", "solana-transaction-status", "thiserror", "tonic", - "tonic-build", + "tonic-build 0.12.1", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 1973e387..a5dcfe87 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,10 @@ [workspace] resolver = "2" members = [ - "examples/rust", # 2.0.0 - "yellowstone-grpc-client", # 2.0.0 - "yellowstone-grpc-geyser", # 2.0.0 - "yellowstone-grpc-proto", # 2.0.0 + "examples/rust", # 2.0.0 + "yellowstone-grpc-client", # 2.0.0 + "yellowstone-grpc-geyser", # 2.0.0 + "yellowstone-grpc-proto", # 2.0.0 ] [workspace.package] @@ -27,6 +27,7 @@ bytes = "1.3.0" cargo-lock = "9.0.0" chrono = "0.4.26" clap = "4.3.0" +criterion = "0.5.1" crossbeam-channel = "0.5.8" env_logger = "0.11.3" futures = "0.3.24" @@ -45,6 +46,7 @@ log = "0.4.17" maplit = "1.0.2" prometheus = "0.13.2" prost = "0.13.1" +prost_011 = { package = "prost", version = "0.11.9" } protobuf-src = "1.1.0" scylla = "0.13.0" serde = "1.0.145" @@ -52,7 +54,9 @@ serde_json = "1.0.86" solana-account-decoder = "~2.0.10" solana-logger = "~2.0.10" solana-sdk = "~2.0.10" +solana-storage-proto = "~2.0.10" solana-transaction-status = "~2.0.10" +smallvec = "1.13.2" spl-token-2022 = "4.0.0" thiserror = "1.0" tokio = "1.21.2" diff --git a/examples/rust/Cargo.toml b/examples/rust/Cargo.toml index ee0ca7b9..cb300d0d 100644 --- a/examples/rust/Cargo.toml +++ b/examples/rust/Cargo.toml @@ -28,10 +28,10 @@ maplit = { workspace = true } serde_json = { workspace = true } solana-sdk = { workspace = true } solana-transaction-status = { workspace = true } -tokio = { workspace = true, features = ["rt-multi-thread"] } +tokio = { workspace = true, features = ["rt-multi-thread", "fs"] } tonic = { workspace = true } yellowstone-grpc-client = { workspace = true } -yellowstone-grpc-proto = { workspace = true, default-features = true } +yellowstone-grpc-proto = { workspace = true, default-features = true, features = ["plugin"] } [lints] workspace = true diff --git a/examples/rust/src/bin/client.rs b/examples/rust/src/bin/client.rs index 0ba205b9..418bcfe6 100644 --- a/examples/rust/src/bin/client.rs +++ b/examples/rust/src/bin/client.rs @@ -8,12 +8,19 @@ use { serde_json::{json, Value}, solana_sdk::{hash::Hash, pubkey::Pubkey, signature::Signature}, solana_transaction_status::UiTransactionEncoding, - std::{collections::HashMap, env, fs::File, sync::Arc, time::Duration}, - tokio::sync::Mutex, + std::{ + collections::HashMap, + env, + fs::File, + sync::Arc, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, + }, + tokio::{fs, sync::Mutex}, tonic::transport::channel::ClientTlsConfig, yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError, Interceptor}, yellowstone_grpc_proto::{ convert_from, + plugin::{filter::FilterName, message_ref::Message as MessageRef}, prelude::{ subscribe_request_filter_accounts_filter::Filter as AccountsFilterOneof, subscribe_request_filter_accounts_filter_lamports::Cmp as AccountsFilterLamports, @@ -258,13 +265,16 @@ struct ActionSubscribe { /// Show total stat instead of messages #[clap(long, default_value_t = false)] stats: bool, + + #[clap(long, default_value_t = false)] + verify_encoding: bool, } impl Action { async fn get_subscribe_request( &self, commitment: Option, - ) -> anyhow::Result> { + ) -> anyhow::Result> { Ok(match self { Self::Subscribe(args) => { let mut accounts: AccountFilterMap = HashMap::new(); @@ -439,6 +449,7 @@ impl Action { }, args.resub.unwrap_or(0), args.stats, + args.verify_encoding, )) } _ => None, @@ -485,7 +496,7 @@ async fn main() -> anyhow::Result<()> { .map(|response| info!("response: {response:?}")), Action::HealthWatch => geyser_health_watch(client).await, Action::Subscribe(_) => { - let (request, resub, stats) = args + let (request, resub, stats, verify_encoding) = args .action .get_subscribe_request(commitment) .await @@ -494,7 +505,7 @@ async fn main() -> anyhow::Result<()> { "expect subscribe action" )))?; - geyser_subscribe(client, request, resub, stats).await + geyser_subscribe(client, request, resub, stats, verify_encoding).await } Action::Ping { count } => client .ping(*count) @@ -552,26 +563,29 @@ async fn geyser_subscribe( request: SubscribeRequest, resub: usize, stats: bool, + verify_encoding: bool, ) -> anyhow::Result<()> { let pb_multi = MultiProgress::new(); let mut pb_accounts_c = 0; - let pb_accounts = crate_progress_bar(&pb_multi, "accounts", false)?; + let pb_accounts = crate_progress_bar(&pb_multi, ProgressBarTpl::Msg("accounts"))?; let mut pb_slots_c = 0; - let pb_slots = crate_progress_bar(&pb_multi, "slots", false)?; + let pb_slots = crate_progress_bar(&pb_multi, ProgressBarTpl::Msg("slots"))?; let mut pb_txs_c = 0; - let pb_txs = crate_progress_bar(&pb_multi, "transactions", false)?; + let pb_txs = crate_progress_bar(&pb_multi, ProgressBarTpl::Msg("transactions"))?; let mut pb_txs_st_c = 0; - let pb_txs_st = crate_progress_bar(&pb_multi, "transactions statuses", false)?; + let pb_txs_st = crate_progress_bar(&pb_multi, ProgressBarTpl::Msg("transactions statuses"))?; let mut pb_entries_c = 0; - let pb_entries = crate_progress_bar(&pb_multi, "entries", false)?; + let pb_entries = crate_progress_bar(&pb_multi, ProgressBarTpl::Msg("entries"))?; let mut pb_blocks_mt_c = 0; - let pb_blocks_mt = crate_progress_bar(&pb_multi, "blocks meta", false)?; + let pb_blocks_mt = crate_progress_bar(&pb_multi, ProgressBarTpl::Msg("blocks meta"))?; let mut pb_blocks_c = 0; - let pb_blocks = crate_progress_bar(&pb_multi, "blocks", false)?; + let pb_blocks = crate_progress_bar(&pb_multi, ProgressBarTpl::Msg("blocks"))?; let mut pb_pp_c = 0; - let pb_pp = crate_progress_bar(&pb_multi, "ping/pong", false)?; + let pb_pp = crate_progress_bar(&pb_multi, ProgressBarTpl::Msg("ping/pong"))?; let mut pb_total_c = 0; - let pb_total = crate_progress_bar(&pb_multi, "total", true)?; + let pb_total = crate_progress_bar(&pb_multi, ProgressBarTpl::Total)?; + let mut pb_verify_c = verify_encoding.then_some((0, 0)); + let pb_verify = crate_progress_bar(&pb_multi, ProgressBarTpl::Verify)?; let (mut subscribe_tx, mut stream) = client.subscribe_with_request(Some(request)).await?; @@ -603,6 +617,38 @@ async fn geyser_subscribe( pb_total_c += 1; pb_total.set_message(format_thousands(pb_total_c)); pb_total.inc(encoded_len); + if let Some((prost_c, ref_c)) = &mut pb_verify_c { + // let msg = msg.update_oneof.ok_or(anyhow::anyhow!("no update"))?; + let ts = Instant::now(); + let encoded_len_prost = msg.encoded_len(); + let encoded_prost = msg.clone().encode_to_vec(); + *prost_c += ts.elapsed().as_nanos(); + // Temporary `convert`, need to implement in proto crate + let message = MessageRef::new( + msg.filters.into_iter().map(FilterName::new).collect(), + msg.update_oneof.expect("no update message").into(), + ); + let ts = Instant::now(); + let encoded_len_ref = message.encoded_len(); + let encoded_ref = message.encode_to_vec(); + *ref_c += ts.elapsed().as_nanos(); + pb_verify.set_message(format!( + "{:.2?}%", + 100f64 * (*ref_c as f64) / (*prost_c as f64) + )); + if encoded_len_prost != encoded_len_ref || encoded_prost != encoded_ref { + let dir = "grpc-client-verify"; + let name = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos(); + let path = format!("{dir}/{name}"); + error!("found unmached message, save to `{path}`"); + fs::create_dir(dir) + .await + .context("failed to create dir for unmached")?; + fs::write(path, encoded_prost) + .await + .context("failed to save unmached")?; + } + } continue; } @@ -752,14 +798,29 @@ async fn geyser_subscribe( Ok(()) } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum ProgressBarTpl { + Msg(&'static str), + Total, + Verify, +} + fn crate_progress_bar( pb: &MultiProgress, - kind: &str, - elapsed: bool, + pb_t: ProgressBarTpl, ) -> Result { let pb = pb.add(ProgressBar::no_length()); - let elapsed = if elapsed { " in {elapsed_precise}" } else { "" }; - let tpl = format!("{{spinner}} {kind}: {{msg}} / ~{{bytes}} (~{{bytes_per_sec}}){elapsed}"); + let tpl = match pb_t { + ProgressBarTpl::Msg(kind) => { + format!("{{spinner}} {kind}: {{msg}} / ~{{bytes}} (~{{bytes_per_sec}})") + } + ProgressBarTpl::Total => { + "{spinner} total: {msg} / ~{bytes} (~{bytes_per_sec}) in {elapsed_precise}".to_owned() + } + ProgressBarTpl::Verify => { + "{spinner} verify: {msg} (elapsed time, compare to prost)".to_owned() + } + }; pb.set_style(ProgressStyle::with_template(&tpl)?); Ok(pb) } diff --git a/yellowstone-grpc-geyser/Cargo.toml b/yellowstone-grpc-geyser/Cargo.toml index a2bb09e0..0a999bbd 100644 --- a/yellowstone-grpc-geyser/Cargo.toml +++ b/yellowstone-grpc-geyser/Cargo.toml @@ -39,8 +39,8 @@ serde_json = { workspace = true } solana-logger = { workspace = true } solana-sdk = { workspace = true } solana-transaction-status = { workspace = true } +smallvec = { workspace = true } spl-token-2022 = { workspace = true, features = ["no-entrypoint"] } -thiserror = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "macros", "fs"] } tokio-stream = { workspace = true } tonic = { workspace = true, features = ["gzip", "zstd", "tls", "tls-roots"] } diff --git a/yellowstone-grpc-geyser/src/filters.rs b/yellowstone-grpc-geyser/src/filters.rs index dd74b333..0b782175 100644 --- a/yellowstone-grpc-geyser/src/filters.rs +++ b/yellowstone-grpc-geyser/src/filters.rs @@ -5,6 +5,7 @@ use { ConfigGrpcFiltersTransactions, }, base64::{engine::general_purpose::STANDARD as base64_engine, Engine}, + smallvec::SmallVec, solana_sdk::{pubkey::Pubkey, signature::Signature}, spl_token_2022::{generic_token_account::GenericTokenAccount, state::Account as TokenAccount}, std::{ @@ -14,178 +15,55 @@ use { sync::Arc, }, yellowstone_grpc_proto::{ - convert_to, plugin::{ filter::{FilterAccountsDataSlice, FilterName, FilterNames}, message::{ - CommitmentLevel, Message, MessageAccount, MessageAccountInfo, MessageBlock, - MessageBlockMeta, MessageEntry, MessageSlot, MessageTransaction, - MessageTransactionInfo, + CommitmentLevel, Message, MessageAccount, MessageBlock, MessageBlockMeta, + MessageEntry, MessageSlot, MessageTransaction, + }, + message_ref::{ + Message as FilteredMessage, MessageFilters as FilteredMessageFilters, + MessageRef as FilteredMessageRef, MessageRefBlock as FilteredMessageRefBlock, }, }, prelude::{ subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof, subscribe_request_filter_accounts_filter_lamports::Cmp as AccountsFilterLamports, subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof, - subscribe_update::UpdateOneof, CommitmentLevel as CommitmentLevelProto, - SubscribeRequest, SubscribeRequestAccountsDataSlice, SubscribeRequestFilterAccounts, + CommitmentLevel as CommitmentLevelProto, SubscribeRequest, + SubscribeRequestAccountsDataSlice, SubscribeRequestFilterAccounts, SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterAccountsFilterLamports, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterEntry, SubscribeRequestFilterSlots, - SubscribeRequestFilterTransactions, SubscribeUpdate, SubscribeUpdateAccount, - SubscribeUpdateAccountInfo, SubscribeUpdateBlock, SubscribeUpdateBlockMeta, - SubscribeUpdateEntry, SubscribeUpdatePong, SubscribeUpdateSlot, - SubscribeUpdateTransaction, SubscribeUpdateTransactionInfo, - SubscribeUpdateTransactionStatus, TransactionError as SubscribeUpdateTransactionError, + SubscribeRequestFilterTransactions, }, }, }; -#[derive(Debug, Clone)] -pub enum FilteredMessage<'a> { - Slot(&'a MessageSlot), - Account(&'a MessageAccount), - Transaction(&'a MessageTransaction), - TransactionStatus(&'a MessageTransaction), - Entry(&'a MessageEntry), - Block(MessageBlock), - BlockMeta(&'a MessageBlockMeta), -} +pub type FilteredMessages = SmallVec<[FilteredMessage; 2]>; -impl<'a> FilteredMessage<'a> { - fn as_proto_account( - message: &MessageAccountInfo, - data_slice: &FilterAccountsDataSlice, - ) -> SubscribeUpdateAccountInfo { - let data_slice = data_slice.as_ref(); - let data = if data_slice.is_empty() { - message.data.clone() - } else { - let mut data = Vec::with_capacity(data_slice.iter().map(|ds| ds.end - ds.start).sum()); - for data_slice in data_slice { - if message.data.len() >= data_slice.end { - data.extend_from_slice(&message.data[data_slice.start..data_slice.end]); - } - } - data - }; - SubscribeUpdateAccountInfo { - pubkey: message.pubkey.as_ref().into(), - lamports: message.lamports, - owner: message.owner.as_ref().into(), - executable: message.executable, - rent_epoch: message.rent_epoch, - data, - write_version: message.write_version, - txn_signature: message.txn_signature.map(|s| s.as_ref().into()), +macro_rules! filtered_messages_once_owned { + ($filters:ident, $message:expr) => {{ + let mut messages = FilteredMessages::new(); + if !$filters.is_empty() { + messages.push(FilteredMessage::new($filters, $message)); } - } - - fn as_proto_transaction(message: &MessageTransactionInfo) -> SubscribeUpdateTransactionInfo { - SubscribeUpdateTransactionInfo { - signature: message.signature.as_ref().into(), - is_vote: message.is_vote, - transaction: Some(convert_to::create_transaction(&message.transaction)), - meta: Some(convert_to::create_transaction_meta(&message.meta)), - index: message.index as u64, - } - } - - fn as_proto_entry(message: &MessageEntry) -> SubscribeUpdateEntry { - SubscribeUpdateEntry { - slot: message.slot, - index: message.index as u64, - num_hashes: message.num_hashes, - hash: message.hash.into(), - executed_transaction_count: message.executed_transaction_count, - starting_transaction_index: message.starting_transaction_index, - } - } + messages + }}; +} - pub fn as_proto(&self, accounts_data_slice: &FilterAccountsDataSlice) -> UpdateOneof { - match self { - Self::Slot(message) => UpdateOneof::Slot(SubscribeUpdateSlot { - slot: message.slot, - parent: message.parent, - status: message.status as i32, - }), - Self::Account(message) => UpdateOneof::Account(SubscribeUpdateAccount { - account: Some(Self::as_proto_account( - message.account.as_ref(), - accounts_data_slice, - )), - slot: message.slot, - is_startup: message.is_startup, - }), - Self::Transaction(message) => UpdateOneof::Transaction(SubscribeUpdateTransaction { - transaction: Some(Self::as_proto_transaction(message.transaction.as_ref())), - slot: message.slot, - }), - Self::TransactionStatus(message) => { - UpdateOneof::TransactionStatus(SubscribeUpdateTransactionStatus { - slot: message.slot, - signature: message.transaction.signature.as_ref().into(), - is_vote: message.transaction.is_vote, - index: message.transaction.index as u64, - err: match &message.transaction.meta.status { - Ok(()) => None, - Err(err) => Some(SubscribeUpdateTransactionError { - err: bincode::serialize(&err) - .expect("transaction error to serialize to bytes"), - }), - }, - }) +macro_rules! filtered_messages_once_ref { + ($filters:ident, $message:expr) => {{ + let mut messages = FilteredMessages::new(); + if !$filters.is_empty() { + let mut message_filters = FilteredMessageFilters::new(); + for filter in $filters { + message_filters.push(filter.clone()); } - Self::Entry(message) => UpdateOneof::Entry(Self::as_proto_entry(message)), - Self::Block(message) => UpdateOneof::Block(SubscribeUpdateBlock { - slot: message.meta.slot, - blockhash: message.meta.blockhash.clone(), - rewards: Some(convert_to::create_rewards_obj( - message.meta.rewards.as_slice(), - message.meta.num_partitions, - )), - block_time: message.meta.block_time.map(convert_to::create_timestamp), - block_height: message - .meta - .block_height - .map(convert_to::create_block_height), - parent_slot: message.meta.parent_slot, - parent_blockhash: message.meta.parent_blockhash.clone(), - executed_transaction_count: message.meta.executed_transaction_count, - transactions: message - .transactions - .iter() - .map(|tx| Self::as_proto_transaction(tx.as_ref())) - .collect(), - updated_account_count: message.updated_account_count, - accounts: message - .accounts - .iter() - .map(|acc| Self::as_proto_account(acc.as_ref(), accounts_data_slice)) - .collect(), - entries_count: message.meta.entries_count, - entries: message - .entries - .iter() - .map(|entry| Self::as_proto_entry(entry.as_ref())) - .collect(), - }), - Self::BlockMeta(message) => UpdateOneof::BlockMeta(SubscribeUpdateBlockMeta { - slot: message.slot, - blockhash: message.blockhash.clone(), - rewards: Some(convert_to::create_rewards_obj( - message.rewards.as_slice(), - message.num_partitions, - )), - block_time: message.block_time.map(convert_to::create_timestamp), - block_height: message.block_height.map(convert_to::create_block_height), - parent_slot: message.parent_slot, - parent_blockhash: message.parent_blockhash.clone(), - executed_transaction_count: message.executed_transaction_count, - entries_count: message.entries_count, - }), + messages.push(FilteredMessage::new(message_filters, $message)); } - } + messages + }}; } #[derive(Debug, Clone)] @@ -319,53 +197,39 @@ impl Filter { self.commitment } - pub fn get_filters<'a>( - &'a self, - message: &'a Message, + pub fn get_filters( + &self, + message: &Message, commitment: Option, - ) -> Box, FilteredMessage<'a>)> + Send + 'a> { + ) -> FilteredMessages { match message { - Message::Account(message) => self.accounts.get_filters(message), + Message::Account(message) => self + .accounts + .get_filters(message, &self.accounts_data_slice), Message::Slot(message) => self.slots.get_filters(message, commitment), - Message::Transaction(message) => Box::new( - self.transactions - .get_filters(message) - .chain(self.transactions_status.get_filters(message)), - ), + Message::Transaction(message) => { + let mut messages = self.transactions.get_filters(message); + messages.append(&mut self.transactions_status.get_filters(message)); + messages + } Message::Entry(message) => self.entries.get_filters(message), - Message::Block(message) => self.blocks.get_filters(message), + Message::Block(message) => self.blocks.get_filters(message, &self.accounts_data_slice), Message::BlockMeta(message) => self.blocks_meta.get_filters(message), } } - pub fn get_update<'a>( - &'a self, - message: &'a Message, - commitment: Option, - ) -> Box + Send + 'a> { - Box::new( - self.get_filters(message, commitment) - .filter_map(|(filters, message)| { - if filters.is_empty() { - None - } else { - Some(SubscribeUpdate { - filters: filters - .iter() - .map(|name| name.as_ref().to_string()) - .collect(), - update_oneof: Some(message.as_proto(&self.accounts_data_slice)), - }) - } - }), - ) + pub fn get_pong_msg(&self) -> Option { + self.ping.map(|id| FilteredMessage { + filters: FilteredMessageFilters::new(), + message: FilteredMessageRef::pong(id), + }) } - pub fn get_pong_msg(&self) -> Option { - self.ping.map(|id| SubscribeUpdate { - filters: vec![], - update_oneof: Some(UpdateOneof::Pong(SubscribeUpdatePong { id })), - }) + pub fn create_ping_message() -> FilteredMessage { + FilteredMessage { + filters: FilteredMessageFilters::new(), + message: FilteredMessageRef::Ping, + } } } @@ -446,19 +310,21 @@ impl FilterAccounts { Ok(required) } - fn get_filters<'a>( - &'a self, - message: &'a MessageAccount, - ) -> Box, FilteredMessage<'a>)> + Send + 'a> { + fn get_filters( + &self, + message: &MessageAccount, + accounts_data_slice: &FilterAccountsDataSlice, + ) -> FilteredMessages { let mut filter = FilterAccountsMatch::new(self); filter.match_txn_signature(&message.account.txn_signature); filter.match_account(&message.account.pubkey); filter.match_owner(&message.account.owner); filter.match_data_lamports(&message.account.data, message.account.lamports); - Box::new(std::iter::once(( - filter.get_filters(), - FilteredMessage::Account(message), - ))) + let filters = filter.get_filters(); + filtered_messages_once_owned!( + filters, + FilteredMessageRef::account(message, accounts_data_slice.clone()) + ) } } @@ -649,7 +515,7 @@ impl<'a> FilterAccountsMatch<'a> { } } - pub fn get_filters(&self) -> Vec { + pub fn get_filters(&self) -> FilteredMessageFilters { self.filter .filters .iter() @@ -717,24 +583,23 @@ impl FilterSlots { }) } - fn get_filters<'a>( - &'a self, - message: &'a MessageSlot, + fn get_filters( + &self, + message: &MessageSlot, commitment: Option, - ) -> Box, FilteredMessage<'a>)> + Send + 'a> { - Box::new(std::iter::once(( - self.filters - .iter() - .filter_map(|(name, inner)| { - if !inner.filter_by_commitment || commitment == Some(message.status) { - Some(name.clone()) - } else { - None - } - }) - .collect(), - FilteredMessage::Slot(message), - ))) + ) -> FilteredMessages { + let filters = self + .filters + .iter() + .filter_map(|(name, inner)| { + if !inner.filter_by_commitment || commitment == Some(message.status) { + Some(name.clone()) + } else { + None + } + }) + .collect::(); + filtered_messages_once_owned!(filters, FilteredMessageRef::slot(*message)) } } @@ -827,10 +692,7 @@ impl FilterTransactions { }) } - pub fn get_filters<'a>( - &'a self, - message: &'a MessageTransaction, - ) -> Box, FilteredMessage<'a>)> + Send + 'a> { + pub fn get_filters(&self, message: &MessageTransaction) -> FilteredMessages { let filters = self .filters .iter() @@ -903,14 +765,17 @@ impl FilterTransactions { Some(name.clone()) }) - .collect(); - let message = match self.filter_type { - FilterTransactionsType::Transaction => FilteredMessage::Transaction(message), - FilterTransactionsType::TransactionStatus => { - FilteredMessage::TransactionStatus(message) + .collect::(); + + filtered_messages_once_owned!( + filters, + match self.filter_type { + FilterTransactionsType::Transaction => FilteredMessageRef::transaction(message), + FilterTransactionsType::TransactionStatus => { + FilteredMessageRef::transaction_status(message) + } } - }; - Box::new(std::iter::once((filters, message))) + ) } } @@ -935,14 +800,9 @@ impl FilterEntries { }) } - fn get_filters<'a>( - &'a self, - message: &'a MessageEntry, - ) -> Box, FilteredMessage<'a>)> + Send + 'a> { - Box::new(std::iter::once(( - self.filters.clone(), - FilteredMessage::Entry(message), - ))) + fn get_filters(&self, message: &Arc) -> FilteredMessages { + let filters = self.filters.as_slice(); + filtered_messages_once_ref!(filters, FilteredMessageRef::entry(Arc::clone(message))) } } @@ -1006,11 +866,13 @@ impl FilterBlocks { Ok(this) } - fn get_filters<'a>( - &'a self, - message: &'a MessageBlock, - ) -> Box, FilteredMessage<'a>)> + Send + 'a> { - Box::new(self.filters.iter().map(move |(filter, inner)| { + fn get_filters( + &self, + message: &Arc, + accounts_data_slice: &FilterAccountsDataSlice, + ) -> FilteredMessages { + let mut messages = FilteredMessages::new(); + for (filter, inner) in self.filters.iter() { #[allow(clippy::unnecessary_filter_map)] let transactions = if matches!(inner.include_transactions, None | Some(true)) { message @@ -1063,17 +925,21 @@ impl FilterBlocks { vec![] }; - ( - vec![filter.clone()], - FilteredMessage::Block(MessageBlock { + let mut message_filters = FilteredMessageFilters::new(); + message_filters.push(filter.clone()); + messages.push(FilteredMessage::new( + message_filters, + FilteredMessageRef::block(Box::new(FilteredMessageRefBlock { meta: Arc::clone(&message.meta), transactions, updated_account_count: message.updated_account_count, + accounts_data_slice: accounts_data_slice.clone(), accounts, entries, - }), - ) - })) + })), + )); + } + messages } } @@ -1098,14 +964,9 @@ impl FilterBlocksMeta { }) } - fn get_filters<'a>( - &'a self, - message: &'a MessageBlockMeta, - ) -> Box, FilteredMessage<'a>)> + Send + 'a> { - Box::new(std::iter::once(( - self.filters.clone(), - FilteredMessage::BlockMeta(message), - ))) + fn get_filters(&self, message: &Arc) -> FilteredMessages { + let filters = self.filters.as_slice(); + filtered_messages_once_ref!(filters, FilteredMessageRef::block_meta(Arc::clone(message))) } } @@ -1145,7 +1006,6 @@ pub fn parse_accounts_data_slice_create( #[cfg(test)] mod tests { use { - super::{FilterName, FilterNames, FilteredMessage}, crate::{config::ConfigGrpcFilters, filters::Filter}, solana_sdk::{ hash::Hash, @@ -1161,7 +1021,13 @@ mod tests { SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterTransactions, }, - plugin::message::{Message, MessageTransaction, MessageTransactionInfo}, + plugin::{ + filter::{FilterName, FilterNames}, + message::{Message, MessageTransaction, MessageTransactionInfo}, + message_ref::{ + MessageFilters as FilteredMessageFilters, MessageRef as FilteredMessageRef, + }, + }, }, }; @@ -1355,10 +1221,10 @@ mod tests { }, ); - let config = SubscribeRequest { + let mut config = SubscribeRequest { accounts: HashMap::new(), slots: HashMap::new(), - transactions, + transactions: transactions.clone(), transactions_status: HashMap::new(), blocks: HashMap::new(), blocks_meta: HashMap::new(), @@ -1373,14 +1239,28 @@ mod tests { let message_transaction = create_message_transaction(&keypair_b, vec![account_key_b, account_key_a]); let message = Message::Transaction(message_transaction); - let updates = filter.get_filters(&message, None).collect::>(); + let updates = filter.get_filters(&message, None); + assert_eq!(updates.len(), 1); + assert_eq!( + updates[0].filters, + FilteredMessageFilters::from_vec(vec![FilterName::new("serum")]) + ); + assert!(matches!( + updates[0].message, + FilteredMessageRef::Transaction(_) + )); + + config.transactions_status = transactions; + let filter = Filter::new(&config, &limit, &mut create_filter_names()).unwrap(); + let updates = filter.get_filters(&message, None); assert_eq!(updates.len(), 2); - assert_eq!(updates[0].0, vec![FilterName::new("serum")]); - assert!(matches!(updates[0].1, FilteredMessage::Transaction(_))); - assert_eq!(updates[1].0, Vec::::new()); + assert_eq!( + updates[1].filters, + FilteredMessageFilters::from_vec(vec![FilterName::new("serum")]) + ); assert!(matches!( - updates[1].1, - FilteredMessage::TransactionStatus(_) + updates[1].message, + FilteredMessageRef::TransactionStatus(_) )); } @@ -1405,10 +1285,10 @@ mod tests { }, ); - let config = SubscribeRequest { + let mut config = SubscribeRequest { accounts: HashMap::new(), slots: HashMap::new(), - transactions, + transactions: transactions.clone(), transactions_status: HashMap::new(), blocks: HashMap::new(), blocks_meta: HashMap::new(), @@ -1423,14 +1303,28 @@ mod tests { let message_transaction = create_message_transaction(&keypair_b, vec![account_key_b, account_key_a]); let message = Message::Transaction(message_transaction); - let updates = filter.get_filters(&message, None).collect::>(); + let updates = filter.get_filters(&message, None); + assert_eq!(updates.len(), 1); + assert_eq!( + updates[0].filters, + FilteredMessageFilters::from_vec(vec![FilterName::new("serum")]) + ); + assert!(matches!( + updates[0].message, + FilteredMessageRef::Transaction(_) + )); + + config.transactions_status = transactions; + let filter = Filter::new(&config, &limit, &mut create_filter_names()).unwrap(); + let updates = filter.get_filters(&message, None); assert_eq!(updates.len(), 2); - assert_eq!(updates[0].0, vec![FilterName::new("serum")]); - assert!(matches!(updates[0].1, FilteredMessage::Transaction(_))); - assert_eq!(updates[1].0, Vec::::new()); + assert_eq!( + updates[1].filters, + FilteredMessageFilters::from_vec(vec![FilterName::new("serum")]) + ); assert!(matches!( - updates[1].1, - FilteredMessage::TransactionStatus(_) + updates[1].message, + FilteredMessageRef::TransactionStatus(_) )); } @@ -1473,8 +1367,8 @@ mod tests { let message_transaction = create_message_transaction(&keypair_b, vec![account_key_b, account_key_a]); let message = Message::Transaction(message_transaction); - for (filters, _message) in filter.get_filters(&message, None) { - assert!(filters.is_empty()); + for message in filter.get_filters(&message, None) { + assert!(message.filters.is_empty()); } } @@ -1505,10 +1399,10 @@ mod tests { }, ); - let config = SubscribeRequest { + let mut config = SubscribeRequest { accounts: HashMap::new(), slots: HashMap::new(), - transactions, + transactions: transactions.clone(), transactions_status: HashMap::new(), blocks: HashMap::new(), blocks_meta: HashMap::new(), @@ -1525,14 +1419,28 @@ mod tests { vec![account_key_x, account_key_y, account_key_z], ); let message = Message::Transaction(message_transaction); - let updates = filter.get_filters(&message, None).collect::>(); + let updates = filter.get_filters(&message, None); + assert_eq!(updates.len(), 1); + assert_eq!( + updates[0].filters, + FilteredMessageFilters::from_vec(vec![FilterName::new("serum")]) + ); + assert!(matches!( + updates[0].message, + FilteredMessageRef::Transaction(_) + )); + + config.transactions_status = transactions; + let filter = Filter::new(&config, &limit, &mut create_filter_names()).unwrap(); + let updates = filter.get_filters(&message, None); assert_eq!(updates.len(), 2); - assert_eq!(updates[0].0, vec![FilterName::new("serum")]); - assert!(matches!(updates[0].1, FilteredMessage::Transaction(_))); - assert_eq!(updates[1].0, Vec::::new()); + assert_eq!( + updates[1].filters, + FilteredMessageFilters::from_vec(vec![FilterName::new("serum")]) + ); assert!(matches!( - updates[1].1, - FilteredMessage::TransactionStatus(_) + updates[1].message, + FilteredMessageRef::TransactionStatus(_) )); } @@ -1581,8 +1489,8 @@ mod tests { let message_transaction = create_message_transaction(&keypair_x, vec![account_key_x, account_key_z]); let message = Message::Transaction(message_transaction); - for (filters, _message) in filter.get_filters(&message, None) { - assert!(filters.is_empty()); + for message in filter.get_filters(&message, None) { + assert!(message.filters.is_empty()); } } } diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index b398e0a1..437c2786 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -5,7 +5,7 @@ use { metrics::{self, DebugClientMessage}, version::GrpcVersionInfo, }, - anyhow::Context, + anyhow::Context as _, log::{error, info}, solana_sdk::{ clock::{Slot, MAX_RECENT_BLOCKHASHES}, @@ -42,15 +42,14 @@ use { CommitmentLevel, Message, MessageBlockMeta, MessageEntry, MessageSlot, MessageTransactionInfo, }, + message_ref::Message as FilteredMessage, + proto::geyser_server::{Geyser, GeyserServer}, }, prelude::{ - geyser_server::{Geyser, GeyserServer}, - subscribe_update::UpdateOneof, CommitmentLevel as CommitmentLevelProto, GetBlockHeightRequest, GetBlockHeightResponse, GetLatestBlockhashRequest, GetLatestBlockhashResponse, GetSlotRequest, GetSlotResponse, GetVersionRequest, GetVersionResponse, IsBlockhashValidRequest, - IsBlockhashValidResponse, PingRequest, PongResponse, SubscribeRequest, SubscribeUpdate, - SubscribeUpdatePing, + IsBlockhashValidResponse, PingRequest, PongResponse, SubscribeRequest, }, }, }; @@ -753,7 +752,7 @@ impl GrpcService { async fn client_loop( id: usize, endpoint: String, - stream_tx: mpsc::Sender>, + stream_tx: mpsc::Sender>, mut client_rx: mpsc::UnboundedReceiver>, mut snapshot_rx: Option>>, mut messages_rx: broadcast::Receiver<(CommitmentLevel, Arc>)>, @@ -842,7 +841,7 @@ impl GrpcService { if commitment == filter.get_commitment_level() { for message in messages.iter() { - for message in filter.get_update(message, Some(commitment)) { + for message in filter.get_filters(message, Some(commitment)) { match stream_tx.try_send(Ok(message)) { Ok(()) => {} Err(mpsc::error::TrySendError::Full(_)) => { @@ -883,7 +882,7 @@ impl GrpcService { async fn client_loop_snapshot( id: usize, endpoint: &str, - stream_tx: &mpsc::Sender>, + stream_tx: &mpsc::Sender>, client_rx: &mut mpsc::UnboundedReceiver>, snapshot_rx: crossbeam_channel::Receiver>, is_alive: &mut bool, @@ -933,7 +932,7 @@ impl GrpcService { } }; - for message in filter.get_update(&message, None) { + for message in filter.get_filters(&message, None) { if stream_tx.send(Ok(message)).await.is_err() { error!("client #{id}: stream closed"); *is_alive = false; @@ -946,7 +945,7 @@ impl GrpcService { #[tonic::async_trait] impl Geyser for GrpcService { - type SubscribeStream = ReceiverStream>; + type SubscribeStream = ReceiverStream>; async fn subscribe( &self, @@ -976,18 +975,14 @@ impl Geyser for GrpcService { let exit = ping_exit.notified(); tokio::pin!(exit); - let ping_msg = SubscribeUpdate { - filters: vec![], - update_oneof: Some(UpdateOneof::Ping(SubscribeUpdatePing {})), - }; - loop { tokio::select! { _ = &mut exit => { break; } _ = sleep(Duration::from_secs(10)) => { - match ping_stream_tx.try_send(Ok(ping_msg.clone())) { + let msg = Filter::create_ping_message(); + match ping_stream_tx.try_send(Ok(msg)) { Ok(()) => {} Err(mpsc::error::TrySendError::Full(_)) => {} Err(mpsc::error::TrySendError::Closed(_)) => { @@ -1025,7 +1020,9 @@ impl Geyser for GrpcService { let mut filter_names = filter_names.lock().await; filter_names.try_clean(); - if let Err(error) = match Filter::new(&request, &config_filters, &mut filter_names) { + let maybe_filter = Filter::new(&request, &config_filters, &mut filter_names); + drop(filter_names); + if let Err(error) = match maybe_filter { Ok(filter) => match incoming_client_tx.send(Some(filter)) { Ok(()) => Ok(()), Err(error) => Err(error.to_string()), diff --git a/yellowstone-grpc-proto/Cargo.toml b/yellowstone-grpc-proto/Cargo.toml index 7cc95a80..0fcd7186 100644 --- a/yellowstone-grpc-proto/Cargo.toml +++ b/yellowstone-grpc-proto/Cargo.toml @@ -10,16 +10,30 @@ license = "Apache-2.0" keywords = { workspace = true } publish = true +[[bench]] +name = "encode" +harness = false +required-features = ["plugin-bench"] + [dependencies] agave-geyser-plugin-interface = { workspace = true, optional = true } bincode = { workspace = true, optional = true } +bytes = { workspace = true, optional = true } prost = { workspace = true } +prost_011 = { workspace = true, optional = true } +smallvec = { workspace = true, optional = true } solana-account-decoder = { workspace = true, optional = true } solana-sdk = { workspace = true, optional = true } +solana-storage-proto = { workspace = true, optional = true } solana-transaction-status = { workspace = true, optional = true } thiserror = { workspace = true, optional = true } tonic = { workspace = true } +[dev-dependencies] +criterion = { workspace = true } +prost_011 = { workspace = true } +solana-storage-proto = { workspace = true } + [build-dependencies] anyhow = { workspace = true } protobuf-src = { workspace = true } @@ -28,16 +42,19 @@ tonic-build = { workspace = true } [features] default = ["convert", "tonic-compression"] convert = [ - "dep:bincode", - "dep:solana-account-decoder", - "dep:solana-sdk", - "dep:solana-transaction-status" + "dep:bincode", + "dep:solana-account-decoder", + "dep:solana-sdk", + "dep:solana-transaction-status", ] plugin = [ - "convert", - "dep:agave-geyser-plugin-interface", - "dep:thiserror" + "convert", + "dep:agave-geyser-plugin-interface", + "dep:bytes", + "dep:smallvec", + "dep:thiserror", ] +plugin-bench = ["plugin", "dep:prost_011", "dep:solana-storage-proto"] tonic-compression = ["tonic/gzip", "tonic/zstd"] [lints] diff --git a/yellowstone-grpc-proto/benches/encode.rs b/yellowstone-grpc-proto/benches/encode.rs new file mode 100644 index 00000000..deb66586 --- /dev/null +++ b/yellowstone-grpc-proto/benches/encode.rs @@ -0,0 +1,208 @@ +use { + criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}, + prost::Message as _, + std::{sync::Arc, time::Duration}, + yellowstone_grpc_proto::{ + geyser::{ + subscribe_update::UpdateOneof, SubscribeUpdate, SubscribeUpdateAccount, + SubscribeUpdateTransaction, + }, + plugin::{ + filter::FilterAccountsDataSlice, + message::{MessageAccount, MessageTransaction, MessageTransactionInfo}, + message_ref::{ + tests::{ + create_accounts, create_message_filters, load_predefined_blocks, + load_predefined_transactions, + }, + Message, MessageFilters, MessageRef, MessageRefBlock, + }, + }, + }, +}; + +fn build_subscribe_update(filters: &MessageFilters, update: UpdateOneof) -> SubscribeUpdate { + SubscribeUpdate { + filters: filters.iter().map(|f| f.as_ref().to_owned()).collect(), + update_oneof: Some(update), + } +} + +fn build_subscribe_update_account( + message: &MessageAccount, + data_slice: &FilterAccountsDataSlice, +) -> UpdateOneof { + UpdateOneof::Account(SubscribeUpdateAccount { + account: Some((message.account.as_ref(), data_slice).into()), + slot: message.slot, + is_startup: message.is_startup, + }) +} + +fn build_subscribe_transaction(transaction: &MessageTransactionInfo, slot: u64) -> UpdateOneof { + UpdateOneof::Transaction(SubscribeUpdateTransaction { + transaction: Some(transaction.into()), + slot, + }) +} + +fn build_subscribe_block(block: &MessageRefBlock) -> UpdateOneof { + UpdateOneof::Block(block.into()) +} + +fn bench_account(c: &mut Criterion) { + let filters = create_message_filters(&["my special filter"]); + + let accounts = create_accounts(); + c.bench_with_input( + BenchmarkId::new("accounts", "ref"), + &(&accounts, &filters), + |b, (accounts, filters)| { + b.iter(|| { + for (account, data_slice) in accounts.iter() { + let msg = Message { + filters: (*filters).clone(), + message: MessageRef::account(account, data_slice.clone()), + }; + msg.encode_to_vec().len(); + } + }) + }, + ); + c.bench_with_input( + BenchmarkId::new("accounts", "prost"), + &(&accounts, &filters), + |b, (accounts, filters)| { + b.iter(|| { + for (account, data_slice) in accounts.iter() { + let msg = build_subscribe_update( + filters, + build_subscribe_update_account(account, data_slice), + ); + msg.encode_to_vec().len(); + } + }) + }, + ); + + let accounts = accounts + .iter() + .map(|(account, data_slice)| build_subscribe_update_account(account, data_slice)) + .collect::>(); + c.bench_with_input( + BenchmarkId::new("accounts", "prost clone"), + &(&accounts, &filters), + |b, (accounts, filters)| { + b.iter(|| { + for account in accounts.iter() { + let msg = build_subscribe_update(filters, account.clone()); + msg.encode_to_vec().len(); + } + }) + }, + ); + + let transactions = load_predefined_transactions(); + c.bench_with_input( + BenchmarkId::new("transactions", "ref"), + &(&transactions, &filters), + |b, (transactions, filters)| { + b.iter(|| { + for transaction in transactions.iter() { + let msg = Message { + filters: (*filters).clone(), + message: MessageRef::transaction(&MessageTransaction { + transaction: Arc::clone(transaction), + slot: 42, + }), + }; + msg.encode_to_vec().len(); + } + }) + }, + ); + c.bench_with_input( + BenchmarkId::new("transactions", "prost"), + &(&transactions, &filters), + |b, (transactions, filters)| { + b.iter(|| { + for transaction in transactions.iter() { + let msg = build_subscribe_update( + filters, + build_subscribe_transaction(transaction, 42), + ); + msg.encode_to_vec().len(); + } + }) + }, + ); + + let transactions = transactions + .into_iter() + .map(|transaction| build_subscribe_transaction(transaction.as_ref(), 42)) + .collect::>(); + c.bench_with_input( + BenchmarkId::new("transactions", "prost clone"), + &(&transactions, &filters), + |b, (transactions, filters)| { + b.iter(|| { + for transaction in transactions.iter() { + let msg = build_subscribe_update(filters, transaction.clone()); + msg.encode_to_vec().len(); + } + }) + }, + ); + + let blocks = load_predefined_blocks(); + c.bench_with_input( + BenchmarkId::new("blocks", "ref"), + &(blocks.as_slice(), &filters), + |b, (blocks, filters)| { + b.iter(|| { + for block in blocks.iter() { + let msg = Message { + filters: (*filters).clone(), + message: MessageRef::block(Box::new(block.clone())), + }; + msg.encode_to_vec().len(); + } + }) + }, + ); + c.bench_with_input( + BenchmarkId::new("blocks", "prost"), + &(blocks.as_slice(), &filters), + |b, (blocks, filters)| { + b.iter(|| { + for block in blocks.iter() { + let msg = build_subscribe_update(filters, build_subscribe_block(block)); + msg.encode_to_vec().len(); + } + }) + }, + ); + + let blocks = blocks.iter().map(build_subscribe_block).collect::>(); + c.bench_with_input( + BenchmarkId::new("blocks", "prost clone"), + &(blocks.as_slice(), &filters), + |b, (blocks, filters)| { + b.iter(|| { + for block in blocks.iter() { + let msg = build_subscribe_update(filters, block.clone()); + msg.encode_to_vec().len(); + } + }) + }, + ); +} + +criterion_group!( + name = benches; + config = Criterion::default() + .warm_up_time(Duration::from_secs(3)) // default 3 + .measurement_time(Duration::from_secs(5)); // default 5 + targets = bench_account +); +criterion_main!(benches); diff --git a/yellowstone-grpc-proto/build.rs b/yellowstone-grpc-proto/build.rs index 66db00ba..c66ae43e 100644 --- a/yellowstone-grpc-proto/build.rs +++ b/yellowstone-grpc-proto/build.rs @@ -1,5 +1,96 @@ +use tonic_build::manual::{Builder, Method, Service}; + fn main() -> anyhow::Result<()> { std::env::set_var("PROTOC", protobuf_src::protoc()); + + // build protos tonic_build::compile_protos("proto/geyser.proto")?; + + // build with accepting our custom struct + let geyser_service = Service::builder() + .name("Geyser") + .package("geyser") + .method( + Method::builder() + .name("subscribe") + .route_name("Subscribe") + .input_type("crate::geyser::SubscribeRequest") + // .output_type("crate::geyser::SubscribeUpdate") + .codec_path("tonic::codec::ProstCodec") + .output_type("crate::plugin::message_ref::Message") + // .codec_path("crate::plugin::codec::SubscribeCodec") + .client_streaming() + .server_streaming() + .build(), + ) + .method( + Method::builder() + .name("ping") + .route_name("Ping") + .input_type("crate::geyser::PingRequest") + .output_type("crate::geyser::PongResponse") + .codec_path("tonic::codec::ProstCodec") + .build(), + ) + .method( + Method::builder() + .name("get_latest_blockhash") + .route_name("GetLatestBlockhash") + .input_type("crate::geyser::GetLatestBlockhashRequest") + .output_type("crate::geyser::GetLatestBlockhashResponse") + .codec_path("tonic::codec::ProstCodec") + .build(), + ) + .method( + Method::builder() + .name("get_block_height") + .route_name("GetBlockHeight") + .input_type("crate::geyser::GetBlockHeightRequest") + .output_type("crate::geyser::GetBlockHeightResponse") + .codec_path("tonic::codec::ProstCodec") + .build(), + ) + .method( + Method::builder() + .name("get_slot") + .route_name("GetSlot") + .input_type("crate::geyser::GetSlotRequest") + .output_type("crate::geyser::GetSlotResponse") + .codec_path("tonic::codec::ProstCodec") + .build(), + ) + .method( + Method::builder() + .name("is_blockhash_valid") + .route_name("IsBlockhashValid") + .input_type("crate::geyser::IsBlockhashValidRequest") + .output_type("crate::geyser::IsBlockhashValidResponse") + .codec_path("tonic::codec::ProstCodec") + .build(), + ) + .method( + Method::builder() + .name("get_version") + .route_name("GetVersion") + .input_type("crate::geyser::GetVersionRequest") + .output_type("crate::geyser::GetVersionResponse") + .codec_path("tonic::codec::ProstCodec") + .build(), + ) + .build(); + Builder::new() + .build_client(false) + .compile(&[geyser_service]); + + // patching generated custom struct (if custom Codec is used) + // let mut location = std::path::PathBuf::from(std::env::var("OUT_DIR")?); + // location.push("geyser.Geyser.rs"); + // let geyser_rs = std::fs::read_to_string(location.clone())?; + // let geyser_rs = geyser_rs.replace( + // "let codec = crate::plugin::codec::SubscribeCodec::default();", + // "let codec = crate::plugin::codec::SubscribeCodec::::default();", + // ); + // std::fs::write(location, geyser_rs)?; + Ok(()) } diff --git a/yellowstone-grpc-proto/src/lib.rs b/yellowstone-grpc-proto/src/lib.rs index ee61332d..f3bb8cea 100644 --- a/yellowstone-grpc-proto/src/lib.rs +++ b/yellowstone-grpc-proto/src/lib.rs @@ -235,8 +235,7 @@ pub mod convert_to { pub fn create_rewards_obj(rewards: &[Reward], num_partitions: Option) -> proto::Rewards { proto::Rewards { rewards: create_rewards(rewards), - num_partitions: num_partitions - .map(|num_partitions| proto::NumPartitions { num_partitions }), + num_partitions: num_partitions.map(create_num_partitions), } } @@ -249,17 +248,25 @@ pub mod convert_to { pubkey: reward.pubkey.clone(), lamports: reward.lamports, post_balance: reward.post_balance, - reward_type: match reward.reward_type { - None => proto::RewardType::Unspecified, - Some(RewardType::Fee) => proto::RewardType::Fee, - Some(RewardType::Rent) => proto::RewardType::Rent, - Some(RewardType::Staking) => proto::RewardType::Staking, - Some(RewardType::Voting) => proto::RewardType::Voting, - } as i32, + reward_type: create_reward_type(reward.reward_type) as i32, commission: reward.commission.map(|c| c.to_string()).unwrap_or_default(), } } + pub const fn create_reward_type(reward_type: Option) -> proto::RewardType { + match reward_type { + None => proto::RewardType::Unspecified, + Some(RewardType::Fee) => proto::RewardType::Fee, + Some(RewardType::Rent) => proto::RewardType::Rent, + Some(RewardType::Staking) => proto::RewardType::Staking, + Some(RewardType::Voting) => proto::RewardType::Voting, + } + } + + pub const fn create_num_partitions(num_partitions: u64) -> proto::NumPartitions { + proto::NumPartitions { num_partitions } + } + pub fn create_return_data(return_data: &TransactionReturnData) -> proto::ReturnData { proto::ReturnData { program_id: return_data.program_id.to_bytes().into(), diff --git a/yellowstone-grpc-proto/src/plugin/blocks/18144001.bincode b/yellowstone-grpc-proto/src/plugin/blocks/18144001.bincode new file mode 100644 index 00000000..3d0a65b7 Binary files /dev/null and b/yellowstone-grpc-proto/src/plugin/blocks/18144001.bincode differ diff --git a/yellowstone-grpc-proto/src/plugin/blocks/43200000.bincode b/yellowstone-grpc-proto/src/plugin/blocks/43200000.bincode new file mode 100644 index 00000000..acb1f1c6 Binary files /dev/null and b/yellowstone-grpc-proto/src/plugin/blocks/43200000.bincode differ diff --git a/yellowstone-grpc-proto/src/plugin/blocks/64800004.bincode b/yellowstone-grpc-proto/src/plugin/blocks/64800004.bincode new file mode 100644 index 00000000..232addc3 Binary files /dev/null and b/yellowstone-grpc-proto/src/plugin/blocks/64800004.bincode differ diff --git a/yellowstone-grpc-proto/src/plugin/message_ref.rs b/yellowstone-grpc-proto/src/plugin/message_ref.rs new file mode 100644 index 00000000..acca99fa --- /dev/null +++ b/yellowstone-grpc-proto/src/plugin/message_ref.rs @@ -0,0 +1,2180 @@ +use { + super::{ + filter::{FilterAccountsDataSlice, FilterName}, + message::{ + MessageAccount, MessageAccountInfo, MessageBlockMeta, MessageEntry, MessageSlot, + MessageTransaction, MessageTransactionInfo, + }, + }, + crate::{ + convert_from, convert_to, + geyser::{ + subscribe_update::UpdateOneof, CommitmentLevel as CommitmentLevelProto, + SubscribeUpdate, SubscribeUpdateAccount, SubscribeUpdateAccountInfo, + SubscribeUpdateBlock, SubscribeUpdateBlockMeta, SubscribeUpdateEntry, + SubscribeUpdatePing, SubscribeUpdatePong, SubscribeUpdateSlot, + SubscribeUpdateTransaction, SubscribeUpdateTransactionInfo, + SubscribeUpdateTransactionStatus, + }, + solana::storage::confirmed_block::RewardType as RewardTypeProto, + }, + bytes::buf::{Buf, BufMut}, + prost::{ + encoding::{ + encode_key, encode_varint, encoded_len_varint, key_len, message, DecodeContext, + WireType, + }, + DecodeError, + }, + smallvec::SmallVec, + solana_account_decoder::parse_token::UiTokenAmount, + solana_sdk::{ + hash::{Hash, HASH_BYTES}, + instruction::CompiledInstruction, + message::{ + v0::{LoadedAddresses, LoadedMessage, MessageAddressTableLookup}, + AddressLoader, AddressLoaderError, LegacyMessage, MessageHeader, SanitizedMessage, + }, + pubkey::Pubkey, + signature::{Keypair, Signature, Signer}, + transaction::{MessageHash, SanitizedTransaction, Transaction}, + transaction_context::TransactionReturnData, + }, + solana_transaction_status::{ + InnerInstruction, InnerInstructions, Reward, TransactionStatusMeta, TransactionTokenBalance, + }, + std::{borrow::Cow, collections::HashSet, sync::Arc}, +}; + +#[inline] +fn prost_bytes_encode_raw(tag: u32, value: &[u8], buf: &mut impl BufMut) { + encode_key(tag, WireType::LengthDelimited, buf); + encode_varint(value.len() as u64, buf); + buf.put(value); +} + +#[inline] +pub fn prost_field_encoded_len(tag: u32, len: usize) -> usize { + key_len(tag) + encoded_len_varint(len as u64) + len +} + +#[inline] +pub fn prost_bytes_encoded_len(tag: u32, value: &[u8]) -> usize { + prost_field_encoded_len(tag, value.len()) +} + +macro_rules! prost_message_repeated_encoded_len { + ($tag:expr, $values:expr, $get_len:expr) => {{ + key_len($tag) * $values.len() + + $values + .iter() + .map($get_len) + .map(|len| encoded_len_varint(len as u64) + len) + .sum::() + }}; +} + +#[derive(Debug)] +pub struct Message { + pub filters: MessageFilters, + pub message: MessageRef, +} + +impl From<&Message> for SubscribeUpdate { + fn from(message: &Message) -> Self { + SubscribeUpdate { + filters: message + .filters + .iter() + .map(|f| f.as_ref().to_owned()) + .collect(), + update_oneof: Some((&message.message).into()), + } + } +} + +impl prost::Message for Message { + fn encode_raw(&self, buf: &mut impl BufMut) { + for name in self.filters.iter().map(|filter| filter.as_ref()) { + encode_key(1u32, WireType::LengthDelimited, buf); + encode_varint(name.len() as u64, buf); + buf.put_slice(name.as_bytes()); + } + self.message.encode_raw(buf) + } + + fn encoded_len(&self) -> usize { + prost_message_repeated_encoded_len!(1u32, self.filters, |filter| filter.as_ref().len()) + + self.message.encoded_len() + } + + fn merge_field( + &mut self, + _tag: u32, + _wire_type: WireType, + _buf: &mut impl Buf, + _ctx: DecodeContext, + ) -> Result<(), DecodeError> { + unimplemented!() + } + + fn clear(&mut self) { + unimplemented!() + } +} + +impl Message { + pub fn new(filters: MessageFilters, message: MessageRef) -> Self { + Self { filters, message } + } +} + +pub type MessageFilters = SmallVec<[FilterName; 4]>; + +#[derive(Debug)] +pub enum MessageRef { + Account(MessageAccountRef), // 2 + Slot(MessageSlot), // 3 + Transaction(MessageTransactionRef), // 4 + TransactionStatus(MessageTransactionStatusRef), // 10 + Block(Box), // 5 + Ping, // 6 + Pong(MessageRefPong), // 9 + BlockMeta(Arc), // 7 + Entry(Arc), // 8 +} + +impl From<&MessageRef> for UpdateOneof { + fn from(message: &MessageRef) -> Self { + match message { + MessageRef::Account(msg) => Self::Account(msg.into()), + MessageRef::Slot(msg) => Self::Slot(msg.into()), + MessageRef::Transaction(msg) => Self::Transaction(msg.into()), + MessageRef::TransactionStatus(msg) => Self::TransactionStatus(msg.into()), + MessageRef::Block(msg) => Self::Block(msg.as_ref().into()), + MessageRef::Ping => Self::Ping(SubscribeUpdatePing {}), + MessageRef::Pong(msg) => Self::Pong(SubscribeUpdatePong { id: msg.id }), + MessageRef::BlockMeta(msg) => Self::BlockMeta(msg.as_ref().into()), + MessageRef::Entry(msg) => Self::Entry(msg.as_ref().into()), + } + } +} + +impl From for MessageRef { + fn from(message: UpdateOneof) -> Self { + match message { + UpdateOneof::Account(msg) => Self::Account(msg.into()), + UpdateOneof::Slot(msg) => Self::Slot(msg.into()), + UpdateOneof::Transaction(msg) => Self::Transaction(msg.into()), + UpdateOneof::TransactionStatus(msg) => Self::TransactionStatus(msg.into()), + UpdateOneof::Entry(msg) => Self::Entry(Arc::new(msg.into())), + UpdateOneof::BlockMeta(msg) => Self::BlockMeta(Arc::new(msg.into())), + UpdateOneof::Block(msg) => Self::Block(Box::new(msg.into())), + UpdateOneof::Ping(SubscribeUpdatePing {}) => Self::Ping, + UpdateOneof::Pong(SubscribeUpdatePong { id }) => Self::pong(id), + } + } +} + +impl prost::Message for MessageRef { + fn encode_raw(&self, buf: &mut impl BufMut) { + match self { + MessageRef::Account(msg) => message::encode(2u32, msg, buf), + MessageRef::Slot(msg) => message::encode(3u32, msg, buf), + MessageRef::Transaction(msg) => message::encode(4u32, msg, buf), + MessageRef::TransactionStatus(msg) => message::encode(10u32, msg, buf), + MessageRef::Block(msg) => message::encode(5u32, msg, buf), + MessageRef::Ping => { + encode_key(6u32, WireType::LengthDelimited, buf); + encode_varint(0, buf); + } + MessageRef::Pong(msg) => message::encode(9u32, msg, buf), + MessageRef::BlockMeta(msg) => message::encode(7u32, msg.as_ref(), buf), + MessageRef::Entry(msg) => message::encode(8u32, msg.as_ref(), buf), + } + } + + fn encoded_len(&self) -> usize { + match self { + MessageRef::Account(msg) => message::encoded_len(2u32, msg), + MessageRef::Slot(msg) => message::encoded_len(3u32, msg), + MessageRef::Transaction(msg) => message::encoded_len(4u32, msg), + MessageRef::TransactionStatus(msg) => message::encoded_len(10u32, msg), + MessageRef::Block(msg) => message::encoded_len(5u32, msg), + MessageRef::Ping => key_len(6u32) + encoded_len_varint(0), + MessageRef::Pong(msg) => message::encoded_len(9u32, msg), + MessageRef::BlockMeta(msg) => message::encoded_len(7u32, msg.as_ref()), + MessageRef::Entry(msg) => message::encoded_len(8u32, msg.as_ref()), + } + } + + fn merge_field( + &mut self, + _tag: u32, + _wire_type: WireType, + _buf: &mut impl Buf, + _ctx: DecodeContext, + ) -> Result<(), DecodeError> { + unimplemented!() + } + + fn clear(&mut self) { + unimplemented!() + } +} + +impl MessageRef { + pub fn account(message: &MessageAccount, data_slice: FilterAccountsDataSlice) -> Self { + Self::Account(MessageAccountRef { + slot: message.slot, + account: Arc::clone(&message.account), + is_startup: message.is_startup, + data_slice, + }) + } + + pub const fn slot(message: MessageSlot) -> Self { + Self::Slot(message) + } + + pub fn transaction(message: &MessageTransaction) -> Self { + Self::Transaction(MessageTransactionRef { + transaction: Arc::clone(&message.transaction), + slot: message.slot, + }) + } + + pub fn transaction_status(message: &MessageTransaction) -> Self { + Self::TransactionStatus(MessageTransactionStatusRef { + transaction: Arc::clone(&message.transaction), + slot: message.slot, + }) + } + + pub const fn block(message: Box) -> Self { + Self::Block(message) + } + + pub const fn pong(id: i32) -> Self { + Self::Pong(MessageRefPong { id }) + } + + pub fn block_meta(message: Arc) -> Self { + Self::BlockMeta(message) + } + + pub fn entry(message: Arc) -> Self { + Self::Entry(message) + } +} + +#[derive(Debug)] +pub struct MessageAccountRef { + pub account: Arc, + pub slot: u64, + pub is_startup: bool, + pub data_slice: FilterAccountsDataSlice, +} + +impl From<&MessageAccountRef> for SubscribeUpdateAccount { + fn from(msg: &MessageAccountRef) -> Self { + Self { + account: Some((msg.account.as_ref(), &msg.data_slice).into()), + slot: msg.slot, + is_startup: msg.is_startup, + } + } +} + +impl From for MessageAccountRef { + fn from(msg: SubscribeUpdateAccount) -> Self { + Self { + account: Arc::new(msg.account.expect("no account message").into()), + slot: msg.slot, + is_startup: msg.is_startup, + data_slice: FilterAccountsDataSlice::default(), + } + } +} + +impl From<(&MessageAccountInfo, &FilterAccountsDataSlice)> for SubscribeUpdateAccountInfo { + fn from((account, data_slice): (&MessageAccountInfo, &FilterAccountsDataSlice)) -> Self { + SubscribeUpdateAccountInfo { + pubkey: account.pubkey.as_ref().into(), + lamports: account.lamports, + owner: account.owner.as_ref().into(), + executable: account.executable, + rent_epoch: account.rent_epoch, + data: MessageAccountRef::accout_data_slice(account, data_slice).into_owned(), + write_version: account.write_version, + txn_signature: account.txn_signature.map(|s| s.as_ref().into()), + } + } +} + +impl From for MessageAccountInfo { + fn from(account: SubscribeUpdateAccountInfo) -> Self { + Self { + pubkey: Pubkey::try_from(account.pubkey).expect("invalid pubkey"), + lamports: account.lamports, + owner: Pubkey::try_from(account.owner).expect("invalid owner"), + executable: account.executable, + rent_epoch: account.rent_epoch, + data: account.data, + write_version: account.write_version, + txn_signature: account + .txn_signature + .map(|sig| Signature::try_from(sig).expect("invalid signature")), + } + } +} + +impl prost::Message for MessageAccountRef { + fn encode_raw(&self, buf: &mut impl BufMut) { + Self::account_encode_raw(1u32, &self.account, &self.data_slice, buf); + if self.slot != 0u64 { + ::prost::encoding::uint64::encode(2u32, &self.slot, buf); + } + if self.is_startup { + ::prost::encoding::bool::encode(3u32, &self.is_startup, buf); + } + } + + fn encoded_len(&self) -> usize { + prost_field_encoded_len( + 1u32, + Self::account_encoded_len(self.account.as_ref(), &self.data_slice), + ) + if self.slot != 0u64 { + ::prost::encoding::uint64::encoded_len(2u32, &self.slot) + } else { + 0 + } + if self.is_startup { + ::prost::encoding::bool::encoded_len(3u32, &self.is_startup) + } else { + 0 + } + } + + fn merge_field( + &mut self, + _tag: u32, + _wire_type: WireType, + _buf: &mut impl Buf, + _ctx: DecodeContext, + ) -> Result<(), DecodeError> { + unimplemented!() + } + + fn clear(&mut self) { + unimplemented!() + } +} + +impl MessageAccountRef { + fn accout_data_slice<'a>( + account: &'a MessageAccountInfo, + data_slice: &'a FilterAccountsDataSlice, + ) -> Cow<'a, Vec> { + let data_slice = data_slice.as_ref(); + if data_slice.is_empty() { + Cow::Borrowed(&account.data) + } else { + let mut data = Vec::with_capacity(data_slice.iter().map(|s| s.end - s.start).sum()); + for slice in data_slice.iter() { + if account.data.len() >= slice.end { + data.extend_from_slice(&account.data[slice.start..slice.end]); + } + } + Cow::Owned(data) + } + } + + fn account_data_slice_len( + account: &MessageAccountInfo, + data_slice: &FilterAccountsDataSlice, + ) -> usize { + let data_slice = data_slice.as_ref(); + if data_slice.is_empty() { + account.data.len() + } else { + let mut len = 0; + for slice in data_slice.iter() { + if account.data.len() >= slice.end { + len += account.data[slice.start..slice.end].len(); + } + } + len + } + } + + fn account_encode_raw( + tag: u32, + account: &MessageAccountInfo, + data_slice: &FilterAccountsDataSlice, + buf: &mut impl BufMut, + ) { + encode_key(tag, WireType::LengthDelimited, buf); + encode_varint(Self::account_encoded_len(account, data_slice) as u64, buf); + + prost_bytes_encode_raw(1u32, account.pubkey.as_ref(), buf); + if account.lamports != 0u64 { + ::prost::encoding::uint64::encode(2u32, &account.lamports, buf); + } + prost_bytes_encode_raw(3u32, account.owner.as_ref(), buf); + if account.executable { + ::prost::encoding::bool::encode(4u32, &account.executable, buf); + } + if account.rent_epoch != 0u64 { + ::prost::encoding::uint64::encode(5u32, &account.rent_epoch, buf); + } + let data = Self::accout_data_slice(account, data_slice); + if !data.is_empty() { + prost_bytes_encode_raw(6u32, data.as_ref(), buf); + } + if account.write_version != 0u64 { + ::prost::encoding::uint64::encode(7u32, &account.write_version, buf); + } + if let Some(value) = &account.txn_signature { + prost_bytes_encode_raw(8u32, value.as_ref(), buf); + } + } + + fn account_encoded_len( + account: &MessageAccountInfo, + data_slice: &FilterAccountsDataSlice, + ) -> usize { + let data_len = Self::account_data_slice_len(account, data_slice); + + prost_bytes_encoded_len(1u32, account.pubkey.as_ref()) + + if account.lamports != 0u64 { + ::prost::encoding::uint64::encoded_len(2u32, &account.lamports) + } else { + 0 + } + + prost_bytes_encoded_len(3u32, account.owner.as_ref()) + + if account.executable { + ::prost::encoding::bool::encoded_len(4u32, &account.executable) + } else { + 0 + } + + if account.rent_epoch != 0u64 { + ::prost::encoding::uint64::encoded_len(5u32, &account.rent_epoch) + } else { + 0 + } + + if data_len != 0 { + prost_field_encoded_len(6u32, data_len) + } else { + 0 + } + + if account.write_version != 0u64 { + ::prost::encoding::uint64::encoded_len(7u32, &account.write_version) + } else { + 0 + } + + account + .txn_signature + .map_or(0, |sig| prost_bytes_encoded_len(8u32, sig.as_ref())) + } +} + +impl From<&MessageSlot> for SubscribeUpdateSlot { + fn from(msg: &MessageSlot) -> Self { + Self { + slot: msg.slot, + parent: msg.parent, + status: CommitmentLevelProto::from(msg.status) as i32, + } + } +} + +impl From for MessageSlot { + fn from(msg: SubscribeUpdateSlot) -> Self { + Self { + slot: msg.slot, + parent: msg.parent, + status: CommitmentLevelProto::try_from(msg.status) + .expect("valid commitment") + .into(), + } + } +} + +impl prost::Message for MessageSlot { + fn encode_raw(&self, buf: &mut impl BufMut) { + let status = CommitmentLevelProto::from(self.status) as i32; + if self.slot != 0u64 { + ::prost::encoding::uint64::encode(1u32, &self.slot, buf); + } + if let ::core::option::Option::Some(ref value) = self.parent { + ::prost::encoding::uint64::encode(2u32, value, buf); + } + if status != CommitmentLevelProto::default() as i32 { + ::prost::encoding::int32::encode(3u32, &status, buf); + } + } + + fn encoded_len(&self) -> usize { + let status = CommitmentLevelProto::from(self.status) as i32; + + (if self.slot != 0u64 { + ::prost::encoding::uint64::encoded_len(1u32, &self.slot) + } else { + 0 + }) + self.parent.as_ref().map_or(0, |value| { + ::prost::encoding::uint64::encoded_len(2u32, value) + }) + if status != CommitmentLevelProto::default() as i32 { + ::prost::encoding::int32::encoded_len(3u32, &status) + } else { + 0 + } + } + + fn merge_field( + &mut self, + _tag: u32, + _wire_type: WireType, + _buf: &mut impl Buf, + _ctx: DecodeContext, + ) -> Result<(), DecodeError> { + unimplemented!() + } + + fn clear(&mut self) { + unimplemented!() + } +} + +#[derive(Debug)] +pub struct MessageTransactionRef { + pub transaction: Arc, + pub slot: u64, +} + +impl From<&MessageTransactionRef> for SubscribeUpdateTransaction { + fn from(msg: &MessageTransactionRef) -> Self { + Self { + transaction: Some(msg.transaction.as_ref().into()), + slot: msg.slot, + } + } +} + +impl From for MessageTransactionRef { + fn from(msg: SubscribeUpdateTransaction) -> Self { + Self { + transaction: Arc::new(msg.transaction.expect("no transaction message").into()), + slot: msg.slot, + } + } +} + +impl From<&MessageTransactionInfo> for SubscribeUpdateTransactionInfo { + fn from(tx: &MessageTransactionInfo) -> Self { + SubscribeUpdateTransactionInfo { + signature: tx.signature.as_ref().into(), + is_vote: tx.is_vote, + transaction: Some(convert_to::create_transaction(&tx.transaction)), + meta: Some(convert_to::create_transaction_meta(&tx.meta)), + index: tx.index as u64, + } + } +} + +impl From for MessageTransactionInfo { + fn from(msg: SubscribeUpdateTransactionInfo) -> Self { + #[derive(Debug, Clone)] + struct SimpleAddressLoader; + + impl AddressLoader for SimpleAddressLoader { + fn load_addresses( + self, + _lookups: &[MessageAddressTableLookup], + ) -> Result { + Ok(LoadedAddresses::default()) + } + } + + let versioned_tx = + convert_from::create_tx_versioned(msg.transaction.expect("no transaction message")) + .expect("invalid transaction message"); + let transaction = SanitizedTransaction::try_create( + versioned_tx, + MessageHash::Compute, + None, + SimpleAddressLoader, + &HashSet::new(), + ) + .expect("failed to create tx"); + + Self { + signature: Signature::try_from(msg.signature).expect("invalid signature"), + is_vote: msg.is_vote, + transaction, + meta: convert_from::create_tx_meta(msg.meta.expect("no meta message")) + .expect("invalid meta message"), + index: msg.index as usize, + } + } +} + +impl prost::Message for MessageTransactionRef { + fn encode_raw(&self, buf: &mut impl BufMut) { + Self::tx_meta_encode_raw(1u32, &self.transaction, buf); + if self.slot != 0u64 { + ::prost::encoding::uint64::encode(2u32, &self.slot, buf); + } + } + + fn encoded_len(&self) -> usize { + prost_field_encoded_len(1u32, Self::tx_meta_encoded_len(&self.transaction)) + + if self.slot != 0u64 { + ::prost::encoding::uint64::encoded_len(2u32, &self.slot) + } else { + 0 + } + } + + fn merge_field( + &mut self, + _tag: u32, + _wire_type: WireType, + _buf: &mut impl Buf, + _ctx: DecodeContext, + ) -> Result<(), DecodeError> { + unimplemented!() + } + + fn clear(&mut self) { + unimplemented!() + } +} + +impl MessageTransactionRef { + fn tx_meta_encode_raw(tag: u32, tx: &MessageTransactionInfo, buf: &mut impl BufMut) { + encode_key(tag, WireType::LengthDelimited, buf); + encode_varint(Self::tx_meta_encoded_len(tx) as u64, buf); + + let index = tx.index as u64; + + prost_bytes_encode_raw(1u32, tx.signature.as_ref(), buf); + if tx.is_vote { + ::prost::encoding::bool::encode(2u32, &tx.is_vote, buf); + } + Self::tx_encode_raw(3u32, &tx.transaction, buf); + Self::meta_encode_raw(4u32, &tx.meta, buf); + if index != 0u64 { + ::prost::encoding::uint64::encode(5u32, &index, buf); + } + } + + fn tx_encode_raw(tag: u32, tx: &SanitizedTransaction, buf: &mut impl BufMut) { + encode_key(tag, WireType::LengthDelimited, buf); + encode_varint(Self::tx_encoded_len(tx) as u64, buf); + + for sig in tx.signatures() { + prost_bytes_encode_raw(1u32, sig.as_ref(), buf); + } + Self::message_encode_raw(2u32, tx.message(), buf); + } + + fn message_encode_raw(tag: u32, message: &SanitizedMessage, buf: &mut impl BufMut) { + encode_key(tag, WireType::LengthDelimited, buf); + encode_varint(Self::message_encoded_len(message) as u64, buf); + + let (header, account_keys, recent_blockhash, cixs, versioned, atls) = match message { + SanitizedMessage::Legacy(LegacyMessage { message, .. }) => ( + message.header, + &message.account_keys, + &message.recent_blockhash, + &message.instructions, + false, + None, + ), + SanitizedMessage::V0(LoadedMessage { message, .. }) => ( + message.header, + &message.account_keys, + &message.recent_blockhash, + &message.instructions, + true, + Some(&message.address_table_lookups), + ), + }; + + Self::header_encode_raw(1u32, header, buf); + for pubkey in account_keys { + prost_bytes_encode_raw(2u32, pubkey.as_ref(), buf); + } + prost_bytes_encode_raw(3u32, recent_blockhash.as_ref(), buf); + for cix in cixs { + Self::cix_encode_raw(4u32, cix, buf); + } + if versioned { + ::prost::encoding::bool::encode(5u32, &versioned, buf); + } + if let Some(atls) = atls { + for atl in atls { + Self::atl_encode_raw(6u32, atl, buf); + } + } + } + + fn header_encode_raw(tag: u32, header: MessageHeader, buf: &mut impl BufMut) { + encode_key(tag, WireType::LengthDelimited, buf); + encode_varint(Self::header_encoded_len(header) as u64, buf); + + let num_required_signatures = header.num_required_signatures as u32; + let num_readonly_signed_accounts = header.num_readonly_signed_accounts as u32; + let num_readonly_unsigned_accounts = header.num_readonly_unsigned_accounts as u32; + + if num_required_signatures != 0u32 { + ::prost::encoding::uint32::encode(1u32, &num_required_signatures, buf); + } + if num_readonly_signed_accounts != 0u32 { + ::prost::encoding::uint32::encode(2u32, &num_readonly_signed_accounts, buf); + } + if num_readonly_unsigned_accounts != 0u32 { + ::prost::encoding::uint32::encode(3u32, &num_readonly_unsigned_accounts, buf); + } + } + + fn cix_encode_raw(tag: u32, cix: &CompiledInstruction, buf: &mut impl BufMut) { + encode_key(tag, WireType::LengthDelimited, buf); + encode_varint(Self::cix_encoded_len(cix) as u64, buf); + + let program_id_index = cix.program_id_index as u32; + + if program_id_index != 0u32 { + ::prost::encoding::uint32::encode(1u32, &program_id_index, buf); + } + if !cix.accounts.is_empty() { + prost_bytes_encode_raw(2u32, cix.accounts.as_ref(), buf); + } + if !cix.data.is_empty() { + prost_bytes_encode_raw(3u32, cix.data.as_ref(), buf); + } + } + + fn atl_encode_raw(tag: u32, atl: &MessageAddressTableLookup, buf: &mut impl BufMut) { + encode_key(tag, WireType::LengthDelimited, buf); + encode_varint(Self::atl_encoded_len(atl) as u64, buf); + + prost_bytes_encode_raw(1u32, atl.account_key.as_ref(), buf); + if !atl.writable_indexes.is_empty() { + prost_bytes_encode_raw(2u32, atl.writable_indexes.as_ref(), buf); + } + if !atl.readonly_indexes.is_empty() { + prost_bytes_encode_raw(3u32, atl.readonly_indexes.as_ref(), buf); + } + } + + fn meta_encode_raw(tag: u32, meta: &TransactionStatusMeta, buf: &mut impl BufMut) { + encode_key(tag, WireType::LengthDelimited, buf); + encode_varint(Self::meta_encoded_len(meta) as u64, buf); + + let err = convert_to::create_transaction_error(&meta.status); + + if let Some(msg) = err { + ::prost::encoding::message::encode(1u32, &msg, buf); + } + if meta.fee != 0u64 { + ::prost::encoding::uint64::encode(2u32, &meta.fee, buf); + } + ::prost::encoding::uint64::encode_packed(3u32, &meta.pre_balances, buf); + ::prost::encoding::uint64::encode_packed(4u32, &meta.post_balances, buf); + if let Some(vec) = &meta.inner_instructions { + for ixs in vec { + Self::ixs_encode_raw(5u32, ixs, buf); + } + } + if let Some(log_messages) = &meta.log_messages { + ::prost::encoding::string::encode_repeated(6u32, log_messages, buf); + } + if let Some(vec) = &meta.pre_token_balances { + for pre_token_balances in vec { + Self::token_balance_encode_raw(7u32, pre_token_balances, buf); + } + } + if let Some(vec) = &meta.post_token_balances { + for post_token_balances in vec { + Self::token_balance_encode_raw(8u32, post_token_balances, buf); + } + } + if let Some(vec) = &meta.rewards { + for reward in vec { + MessageBlockMeta::reward_encode_raw(9u32, reward, buf); + } + } + if meta.inner_instructions.is_none() { + ::prost::encoding::bool::encode(10u32, &true, buf); + } + if meta.log_messages.is_none() { + ::prost::encoding::bool::encode(11u32, &true, buf); + } + for pubkey in meta.loaded_addresses.writable.iter() { + prost_bytes_encode_raw(12u32, pubkey.as_ref(), buf); + } + for pubkey in meta.loaded_addresses.readonly.iter() { + prost_bytes_encode_raw(13u32, pubkey.as_ref(), buf); + } + if let Some(rd) = &meta.return_data { + Self::return_data_encode_raw(14u32, rd, buf); + } + if meta.return_data.is_none() { + ::prost::encoding::bool::encode(15u32, &true, buf); + } + if let Some(value) = &meta.compute_units_consumed { + ::prost::encoding::uint64::encode(16u32, value, buf); + } + } + + fn ixs_encode_raw(tag: u32, ixs: &InnerInstructions, buf: &mut impl BufMut) { + encode_key(tag, WireType::LengthDelimited, buf); + encode_varint(Self::ixs_encoded_len(ixs) as u64, buf); + + let index = ixs.index as u32; + + if index != 0u32 { + ::prost::encoding::uint32::encode(1u32, &index, buf); + } + for ix in ixs.instructions.iter() { + Self::ix_encode_raw(2u32, ix, buf); + } + } + + fn ix_encode_raw(tag: u32, ix: &InnerInstruction, buf: &mut impl BufMut) { + encode_key(tag, WireType::LengthDelimited, buf); + encode_varint(Self::ix_encoded_len(ix) as u64, buf); + + let program_id_index = ix.instruction.program_id_index as u32; + + if program_id_index != 0u32 { + ::prost::encoding::uint32::encode(1u32, &program_id_index, buf); + } + if !ix.instruction.accounts.is_empty() { + prost_bytes_encode_raw(2u32, &ix.instruction.accounts, buf); + } + if !ix.instruction.data.is_empty() { + prost_bytes_encode_raw(3u32, &ix.instruction.data, buf); + } + if let Some(value) = &ix.stack_height { + ::prost::encoding::uint32::encode(4u32, value, buf); + } + } + + fn token_balance_encode_raw( + tag: u32, + balance: &TransactionTokenBalance, + buf: &mut impl BufMut, + ) { + encode_key(tag, WireType::LengthDelimited, buf); + encode_varint(Self::token_balance_encoded_len(balance) as u64, buf); + + let account_index = balance.account_index as u32; + + if account_index != 0u32 { + ::prost::encoding::uint32::encode(1u32, &account_index, buf); + } + if !balance.mint.is_empty() { + ::prost::encoding::string::encode(2u32, &balance.mint, buf); + } + Self::ui_token_amount_encode_raw(3u32, &balance.ui_token_amount, buf); + if !balance.owner.is_empty() { + ::prost::encoding::string::encode(4u32, &balance.owner, buf); + } + if !balance.program_id.is_empty() { + ::prost::encoding::string::encode(5u32, &balance.program_id, buf); + } + } + + fn ui_token_amount_encode_raw(tag: u32, amount: &UiTokenAmount, buf: &mut impl BufMut) { + let ui_amount = amount.ui_amount.unwrap_or_default(); + let decimals = amount.decimals as u32; + + encode_key(tag, WireType::LengthDelimited, buf); + encode_varint(Self::ui_token_amount_encoded_len(amount) as u64, buf); + + if ui_amount != 0f64 { + ::prost::encoding::double::encode(1u32, &ui_amount, buf); + } + if decimals != 0u32 { + ::prost::encoding::uint32::encode(2u32, &decimals, buf); + } + if !amount.amount.is_empty() { + ::prost::encoding::string::encode(3u32, &amount.amount, buf); + } + if !amount.ui_amount_string.is_empty() { + ::prost::encoding::string::encode(4u32, &amount.ui_amount_string, buf); + } + } + + fn return_data_encode_raw( + tag: u32, + return_data: &TransactionReturnData, + buf: &mut impl BufMut, + ) { + encode_key(tag, WireType::LengthDelimited, buf); + encode_varint(Self::return_data_encoded_len(return_data) as u64, buf); + + prost_bytes_encode_raw(1u32, return_data.program_id.as_ref(), buf); + if !return_data.data.is_empty() { + prost_bytes_encode_raw(2u32, return_data.data.as_ref(), buf); + } + } + + fn tx_meta_encoded_len(tx: &MessageTransactionInfo) -> usize { + let index = tx.index as u64; + + prost_bytes_encoded_len(1u32, tx.signature.as_ref()) + + if tx.is_vote { + ::prost::encoding::bool::encoded_len(2u32, &tx.is_vote) + } else { + 0 + } + + prost_field_encoded_len(3u32, Self::tx_encoded_len(&tx.transaction)) + + prost_field_encoded_len(4u32, Self::meta_encoded_len(&tx.meta)) + + if index != 0u64 { + ::prost::encoding::uint64::encoded_len(5u32, &index) + } else { + 0 + } + } + + fn tx_encoded_len(tx: &SanitizedTransaction) -> usize { + prost_message_repeated_encoded_len!(1u32, tx.signatures(), |sig| sig.as_ref().len()) + + prost_field_encoded_len(2u32, Self::message_encoded_len(tx.message())) + } + + fn message_encoded_len(message: &SanitizedMessage) -> usize { + let (header, account_keys, recent_blockhash, cixs, versioned, atls) = match message { + SanitizedMessage::Legacy(LegacyMessage { message, .. }) => ( + message.header, + &message.account_keys, + &message.recent_blockhash, + &message.instructions, + false, + None, + ), + SanitizedMessage::V0(LoadedMessage { message, .. }) => ( + message.header, + &message.account_keys, + &message.recent_blockhash, + &message.instructions, + true, + Some(&message.address_table_lookups), + ), + }; + + prost_field_encoded_len(1u32, Self::header_encoded_len(header)) + + prost_message_repeated_encoded_len!(2u32, account_keys, |key| key.as_ref().len()) + + prost_bytes_encoded_len(3u32, recent_blockhash.as_ref()) + + prost_message_repeated_encoded_len!(4u32, cixs, Self::cix_encoded_len) + + if versioned { + ::prost::encoding::bool::encoded_len(5u32, &versioned) + } else { + 0 + } + + if let Some(atls) = atls { + prost_message_repeated_encoded_len!(6u32, atls, Self::atl_encoded_len) + } else { + 0 + } + } + + fn header_encoded_len(header: MessageHeader) -> usize { + let num_required_signatures = header.num_required_signatures as u32; + let num_readonly_signed_accounts = header.num_readonly_signed_accounts as u32; + let num_readonly_unsigned_accounts = header.num_readonly_unsigned_accounts as u32; + + (if num_required_signatures != 0u32 { + ::prost::encoding::uint32::encoded_len(1u32, &num_required_signatures) + } else { + 0 + }) + if num_readonly_signed_accounts != 0u32 { + ::prost::encoding::uint32::encoded_len(2u32, &num_readonly_signed_accounts) + } else { + 0 + } + if num_readonly_unsigned_accounts != 0u32 { + ::prost::encoding::uint32::encoded_len(3u32, &num_readonly_unsigned_accounts) + } else { + 0 + } + } + + fn cix_encoded_len(cix: &CompiledInstruction) -> usize { + let program_id_index = cix.program_id_index as u32; + + (if program_id_index != 0u32 { + ::prost::encoding::uint32::encoded_len(1u32, &program_id_index) + } else { + 0 + }) + if !cix.accounts.is_empty() { + prost_bytes_encoded_len(2u32, &cix.accounts) + } else { + 0 + } + if !cix.data.is_empty() { + prost_bytes_encoded_len(3u32, &cix.data) + } else { + 0 + } + } + + fn atl_encoded_len(atl: &MessageAddressTableLookup) -> usize { + prost_bytes_encoded_len(1u32, atl.account_key.as_ref()) + + if !atl.writable_indexes.is_empty() { + prost_bytes_encoded_len(2u32, &atl.writable_indexes) + } else { + 0 + } + + if !atl.readonly_indexes.is_empty() { + prost_bytes_encoded_len(3u32, &atl.readonly_indexes) + } else { + 0 + } + } + + fn meta_encoded_len(meta: &TransactionStatusMeta) -> usize { + convert_to::create_transaction_error(&meta.status) + .map_or(0, |msg| ::prost::encoding::message::encoded_len(1u32, &msg)) + + if meta.fee != 0u64 { + ::prost::encoding::uint64::encoded_len(2u32, &meta.fee) + } else { + 0 + } + + ::prost::encoding::uint64::encoded_len_packed(3u32, &meta.pre_balances) + + ::prost::encoding::uint64::encoded_len_packed(4u32, &meta.post_balances) + + if let Some(ixs) = &meta.inner_instructions { + prost_message_repeated_encoded_len!(5u32, ixs, Self::ixs_encoded_len) + } else { + 0 + } + + if let Some(log_messages) = &meta.log_messages { + ::prost::encoding::string::encoded_len_repeated(6u32, log_messages) + } else { + 0 + } + + if let Some(pre_token_balances) = &meta.pre_token_balances { + prost_message_repeated_encoded_len!( + 7u32, + pre_token_balances, + Self::token_balance_encoded_len + ) + } else { + 0 + } + + if let Some(post_token_balances) = &meta.post_token_balances { + prost_message_repeated_encoded_len!( + 8u32, + post_token_balances, + Self::token_balance_encoded_len + ) + } else { + 0 + } + + if let Some(rewards) = &meta.rewards { + prost_message_repeated_encoded_len!( + 9u32, + rewards, + MessageBlockMeta::reward_encoded_len + ) + } else { + 0 + } + + if meta.inner_instructions.is_none() { + ::prost::encoding::bool::encoded_len(10u32, &true) + } else { + 0 + } + + if meta.log_messages.is_none() { + ::prost::encoding::bool::encoded_len(11u32, &true) + } else { + 0 + } + + prost_message_repeated_encoded_len!(12u32, meta.loaded_addresses.writable, |pk| pk + .as_ref() + .len()) + + prost_message_repeated_encoded_len!(13u32, meta.loaded_addresses.readonly, |pk| pk + .as_ref() + .len()) + + if let Some(rd) = &meta.return_data { + prost_field_encoded_len(14u32, Self::return_data_encoded_len(rd)) + } else { + 0 + } + + if meta.return_data.is_none() { + ::prost::encoding::bool::encoded_len(15u32, &true) + } else { + 0 + } + + meta.compute_units_consumed.as_ref().map_or(0, |value| { + ::prost::encoding::uint64::encoded_len(16u32, value) + }) + } + + fn ixs_encoded_len(ixs: &InnerInstructions) -> usize { + let index = ixs.index as u32; + + (if index != 0u32 { + ::prost::encoding::uint32::encoded_len(1u32, &index) + } else { + 0 + }) + prost_message_repeated_encoded_len!(2u32, &ixs.instructions, Self::ix_encoded_len) + } + + fn ix_encoded_len(ix: &InnerInstruction) -> usize { + let program_id_index = ix.instruction.program_id_index as u32; + + (if program_id_index != 0u32 { + ::prost::encoding::uint32::encoded_len(1u32, &program_id_index) + } else { + 0 + }) + if !ix.instruction.accounts.is_empty() { + prost_bytes_encoded_len(2u32, &ix.instruction.accounts) + } else { + 0 + } + if !ix.instruction.data.is_empty() { + prost_bytes_encoded_len(3u32, &ix.instruction.data) + } else { + 0 + } + ix.stack_height.map_or(0, |value| { + ::prost::encoding::uint32::encoded_len(4u32, &value) + }) + } + + fn token_balance_encoded_len(balance: &TransactionTokenBalance) -> usize { + let account_index = balance.account_index as u32; + + (if account_index != 0u32 { + ::prost::encoding::uint32::encoded_len(1u32, &account_index) + } else { + 0 + }) + if !balance.mint.is_empty() { + ::prost::encoding::string::encoded_len(2u32, &balance.mint) + } else { + 0 + } + prost_field_encoded_len( + 3u32, + Self::ui_token_amount_encoded_len(&balance.ui_token_amount), + ) + if !balance.owner.is_empty() { + ::prost::encoding::string::encoded_len(4u32, &balance.owner) + } else { + 0 + } + if !balance.program_id.is_empty() { + ::prost::encoding::string::encoded_len(5u32, &balance.program_id) + } else { + 0 + } + } + + fn ui_token_amount_encoded_len(amount: &UiTokenAmount) -> usize { + let ui_amount = amount.ui_amount.unwrap_or_default(); + let decimals = amount.decimals as u32; + + (if ui_amount != 0f64 { + ::prost::encoding::double::encoded_len(1u32, &ui_amount) + } else { + 0 + }) + if decimals != 0u32 { + ::prost::encoding::uint32::encoded_len(2u32, &decimals) + } else { + 0 + } + if !amount.amount.is_empty() { + ::prost::encoding::string::encoded_len(3u32, &amount.amount) + } else { + 0 + } + if !amount.ui_amount_string.is_empty() { + ::prost::encoding::string::encoded_len(4u32, &amount.ui_amount_string) + } else { + 0 + } + } + + fn return_data_encoded_len(return_data: &TransactionReturnData) -> usize { + prost_bytes_encoded_len(1u32, return_data.program_id.as_ref()) + + if !return_data.data.is_empty() { + prost_bytes_encoded_len(2u32, return_data.data.as_ref()) + } else { + 0 + } + } +} + +#[derive(Debug)] +pub struct MessageTransactionStatusRef { + pub transaction: Arc, + pub slot: u64, +} + +impl From<&MessageTransactionStatusRef> for SubscribeUpdateTransactionStatus { + fn from(msg: &MessageTransactionStatusRef) -> Self { + Self { + slot: msg.slot, + signature: msg.transaction.signature.as_ref().to_vec(), + is_vote: msg.transaction.is_vote, + index: msg.transaction.index as u64, + err: convert_to::create_transaction_error(&msg.transaction.meta.status), + } + } +} + +impl From for MessageTransactionStatusRef { + fn from(msg: SubscribeUpdateTransactionStatus) -> Self { + let keypair = Keypair::new(); + let message = solana_sdk::message::Message { + header: MessageHeader { + num_required_signatures: 1, + ..MessageHeader::default() + }, + account_keys: vec![keypair.pubkey()], + ..solana_sdk::message::Message::default() + }; + + let transaction = SanitizedTransaction::from_transaction_for_tests(Transaction::new( + &[&keypair], + message, + Hash::default(), + )); + + Self { + slot: msg.slot, + transaction: Arc::new(MessageTransactionInfo { + signature: Signature::try_from(msg.signature).expect("invalid signature"), + is_vote: msg.is_vote, + transaction, + meta: TransactionStatusMeta { + status: match convert_from::create_tx_error(msg.err.as_ref()) + .expect("invalid meta err") + { + Some(err) => Err(err), + None => Ok(()), + }, + ..Default::default() + }, + index: msg.index as usize, + }), + } + } +} + +impl prost::Message for MessageTransactionStatusRef { + fn encode_raw(&self, buf: &mut impl BufMut) { + if self.slot != 0u64 { + ::prost::encoding::uint64::encode(1u32, &self.slot, buf); + } + let tx = &self.transaction; + prost_bytes_encode_raw(2u32, tx.signature.as_ref(), buf); + if tx.is_vote { + ::prost::encoding::bool::encode(3u32, &tx.is_vote, buf); + } + let index = tx.index as u64; + if index != 0u64 { + ::prost::encoding::uint64::encode(4u32, &index, buf); + } + let err = convert_to::create_transaction_error(&tx.meta.status); + if let Some(msg) = err { + ::prost::encoding::message::encode(5u32, &msg, buf); + } + } + + fn encoded_len(&self) -> usize { + let tx = &self.transaction; + let index = tx.index as u64; + + (if self.slot != 0u64 { + ::prost::encoding::uint64::encoded_len(1u32, &self.slot) + } else { + 0 + }) + prost_bytes_encoded_len(2u32, tx.signature.as_ref()) + + if tx.is_vote { + ::prost::encoding::bool::encoded_len(3u32, &tx.is_vote) + } else { + 0 + } + + if index != 0u64 { + ::prost::encoding::uint64::encoded_len(4u32, &index) + } else { + 0 + } + + convert_to::create_transaction_error(&tx.meta.status) + .map_or(0, |msg| ::prost::encoding::message::encoded_len(5u32, &msg)) + } + + fn merge_field( + &mut self, + _tag: u32, + _wire_type: WireType, + _buf: &mut impl Buf, + _ctx: DecodeContext, + ) -> Result<(), DecodeError> { + unimplemented!() + } + + fn clear(&mut self) { + unimplemented!() + } +} + +#[derive(Debug, Clone)] +pub struct MessageRefBlock { + pub meta: Arc, + pub transactions: Vec>, + pub updated_account_count: u64, + pub accounts: Vec>, + pub accounts_data_slice: FilterAccountsDataSlice, + pub entries: Vec>, +} + +impl From<&MessageRefBlock> for SubscribeUpdateBlock { + fn from(msg: &MessageRefBlock) -> Self { + Self { + slot: msg.meta.slot, + blockhash: msg.meta.blockhash.clone(), + rewards: Some(convert_to::create_rewards_obj( + msg.meta.rewards.as_slice(), + msg.meta.num_partitions, + )), + block_time: msg.meta.block_time.map(convert_to::create_timestamp), + block_height: msg.meta.block_height.map(convert_to::create_block_height), + parent_slot: msg.meta.parent_slot, + parent_blockhash: msg.meta.parent_blockhash.clone(), + executed_transaction_count: msg.meta.executed_transaction_count, + transactions: msg + .transactions + .iter() + .map(|tx| tx.as_ref().into()) + .collect(), + updated_account_count: msg.updated_account_count, + accounts: msg + .accounts + .iter() + .map(|account| (account.as_ref(), &msg.accounts_data_slice).into()) + .collect(), + entries_count: msg.meta.entries_count, + entries: msg + .entries + .iter() + .map(|entry| entry.as_ref().into()) + .collect(), + } + } +} + +impl From for MessageRefBlock { + fn from(msg: SubscribeUpdateBlock) -> Self { + let mut rewards = msg + .rewards + .map(|rewards| convert_from::create_rewards_obj(rewards).expect("invalid rewards")); + + let meta = Arc::new(MessageBlockMeta { + parent_slot: msg.parent_slot, + slot: msg.slot, + parent_blockhash: msg.parent_blockhash, + blockhash: msg.blockhash, + rewards: rewards + .as_mut() + .map(|rewards| std::mem::take(&mut rewards.rewards)) + .unwrap_or_default(), + num_partitions: rewards.and_then(|obj| obj.num_partitions), + block_time: msg.block_time.map(|wrapper| wrapper.timestamp), + block_height: msg.block_height.map(|wrapper| wrapper.block_height), + executed_transaction_count: msg.executed_transaction_count, + entries_count: msg.entries_count, + }); + + Self { + meta, + transactions: msg + .transactions + .into_iter() + .map(Into::into) + .map(Arc::new) + .collect(), + updated_account_count: msg.updated_account_count, + accounts: msg + .accounts + .into_iter() + .map(Into::into) + .map(Arc::new) + .collect(), + accounts_data_slice: FilterAccountsDataSlice::default(), + entries: msg + .entries + .into_iter() + .map(Into::into) + .map(Arc::new) + .collect(), + } + } +} + +impl prost::Message for MessageRefBlock { + fn encode_raw(&self, buf: &mut impl BufMut) { + if self.meta.slot != 0u64 { + ::prost::encoding::uint64::encode(1u32, &self.meta.slot, buf); + } + if !self.meta.blockhash.is_empty() { + ::prost::encoding::string::encode(2u32, &self.meta.blockhash, buf); + } + self.meta.rewards_encode_raw(3u32, buf); + if let Some(block_time) = self.meta.block_time { + let msg = convert_to::create_timestamp(block_time); + ::prost::encoding::message::encode(4u32, &msg, buf); + } + if let Some(block_height) = self.meta.block_height { + let msg = convert_to::create_block_height(block_height); + ::prost::encoding::message::encode(5u32, &msg, buf); + } + for tx in &self.transactions { + MessageTransactionRef::tx_meta_encode_raw(6u32, tx.as_ref(), buf); + } + if self.meta.parent_slot != 0u64 { + ::prost::encoding::uint64::encode(7u32, &self.meta.parent_slot, buf); + } + if !self.meta.parent_blockhash.is_empty() { + ::prost::encoding::string::encode(8u32, &self.meta.parent_blockhash, buf); + } + if self.meta.executed_transaction_count != 0u64 { + ::prost::encoding::uint64::encode(9u32, &self.meta.executed_transaction_count, buf); + } + if self.updated_account_count != 0u64 { + ::prost::encoding::uint64::encode(10u32, &self.updated_account_count, buf); + } + for account in &self.accounts { + MessageAccountRef::account_encode_raw( + 11u32, + account.as_ref(), + &self.accounts_data_slice, + buf, + ); + } + if self.meta.entries_count != 0u64 { + ::prost::encoding::uint64::encode(12u32, &self.meta.entries_count, buf); + } + for entry in &self.entries { + encode_key(13u32, WireType::LengthDelimited, buf); + encode_varint(entry.encoded_len() as u64, buf); + entry.encode_raw(buf); + } + } + + fn encoded_len(&self) -> usize { + (if self.meta.slot != 0u64 { + ::prost::encoding::uint64::encoded_len(1u32, &self.meta.slot) + } else { + 0 + }) + if !self.meta.blockhash.is_empty() { + ::prost::encoding::string::encoded_len(2u32, &self.meta.blockhash) + } else { + 0 + } + prost_field_encoded_len(3u32, self.meta.rewards_encoded_len()) + + self + .meta + .block_time + .map(convert_to::create_timestamp) + .map_or(0, |msg| ::prost::encoding::message::encoded_len(4u32, &msg)) + + self + .meta + .block_height + .map(convert_to::create_block_height) + .map_or(0, |msg| ::prost::encoding::message::encoded_len(5u32, &msg)) + + prost_message_repeated_encoded_len!(6u32, self.transactions, |tx| { + MessageTransactionRef::tx_meta_encoded_len(tx.as_ref()) + }) + + if self.meta.parent_slot != 0u64 { + ::prost::encoding::uint64::encoded_len(7u32, &self.meta.parent_slot) + } else { + 0 + } + + if !self.meta.parent_blockhash.is_empty() { + ::prost::encoding::string::encoded_len(8u32, &self.meta.parent_blockhash) + } else { + 0 + } + + if self.meta.executed_transaction_count != 0u64 { + ::prost::encoding::uint64::encoded_len(9u32, &self.meta.executed_transaction_count) + } else { + 0 + } + + if self.updated_account_count != 0u64 { + ::prost::encoding::uint64::encoded_len(10u32, &self.updated_account_count) + } else { + 0 + } + + prost_message_repeated_encoded_len!(11u32, self.accounts, |account| { + MessageAccountRef::account_encoded_len(account.as_ref(), &self.accounts_data_slice) + }) + + if self.meta.entries_count != 0u64 { + ::prost::encoding::uint64::encoded_len(12u32, &self.meta.entries_count) + } else { + 0 + } + + prost_message_repeated_encoded_len!(13u32, self.entries, |entry| entry.encoded_len()) + } + + fn merge_field( + &mut self, + _tag: u32, + _wire_type: WireType, + _buf: &mut impl Buf, + _ctx: DecodeContext, + ) -> Result<(), DecodeError> { + unimplemented!() + } + + fn clear(&mut self) { + unimplemented!() + } +} + +#[derive(prost::Message)] +pub struct MessageRefPong { + #[prost(int32, tag = "1")] + pub id: i32, +} + +impl From<&MessageBlockMeta> for SubscribeUpdateBlockMeta { + fn from(msg: &MessageBlockMeta) -> Self { + Self { + slot: msg.slot, + blockhash: msg.blockhash.clone(), + rewards: Some(convert_to::create_rewards_obj( + msg.rewards.as_slice(), + msg.num_partitions, + )), + block_time: msg.block_time.map(convert_to::create_timestamp), + block_height: msg.block_height.map(convert_to::create_block_height), + parent_slot: msg.parent_slot, + parent_blockhash: msg.parent_blockhash.clone(), + executed_transaction_count: msg.executed_transaction_count, + entries_count: msg.entries_count, + } + } +} + +impl From for MessageBlockMeta { + fn from(msg: SubscribeUpdateBlockMeta) -> Self { + let mut rewards = msg + .rewards + .map(|rewards| convert_from::create_rewards_obj(rewards).expect("invalid rewards")); + + Self { + parent_slot: msg.parent_slot, + slot: msg.slot, + parent_blockhash: msg.parent_blockhash, + blockhash: msg.blockhash, + rewards: rewards + .as_mut() + .map(|rewards| std::mem::take(&mut rewards.rewards)) + .unwrap_or_default(), + num_partitions: rewards.and_then(|obj| obj.num_partitions), + block_time: msg.block_time.map(|wrapper| wrapper.timestamp), + block_height: msg.block_height.map(|wrapper| wrapper.block_height), + executed_transaction_count: msg.executed_transaction_count, + entries_count: msg.entries_count, + } + } +} + +impl prost::Message for MessageBlockMeta { + fn encode_raw(&self, buf: &mut impl BufMut) { + if self.slot != 0u64 { + ::prost::encoding::uint64::encode(1u32, &self.slot, buf); + } + if !self.blockhash.is_empty() { + ::prost::encoding::string::encode(2u32, &self.blockhash, buf); + } + self.rewards_encode_raw(3u32, buf); + if let Some(block_time) = self.block_time { + let msg = convert_to::create_timestamp(block_time); + ::prost::encoding::message::encode(4u32, &msg, buf); + } + if let Some(block_height) = self.block_height { + let msg = convert_to::create_block_height(block_height); + ::prost::encoding::message::encode(5u32, &msg, buf); + } + if self.parent_slot != 0u64 { + ::prost::encoding::uint64::encode(6u32, &self.parent_slot, buf); + } + if !self.parent_blockhash.is_empty() { + ::prost::encoding::string::encode(7u32, &self.parent_blockhash, buf); + } + if self.executed_transaction_count != 0u64 { + ::prost::encoding::uint64::encode(8u32, &self.executed_transaction_count, buf); + } + if self.entries_count != 0u64 { + ::prost::encoding::uint64::encode(9u32, &self.entries_count, buf); + } + } + + fn encoded_len(&self) -> usize { + (if self.slot != 0u64 { + ::prost::encoding::uint64::encoded_len(1u32, &self.slot) + } else { + 0 + }) + if !self.blockhash.is_empty() { + ::prost::encoding::string::encoded_len(2u32, &self.blockhash) + } else { + 0 + } + prost_field_encoded_len(3u32, self.rewards_encoded_len()) + + self + .block_time + .map(convert_to::create_timestamp) + .map_or(0, |msg| ::prost::encoding::message::encoded_len(4u32, &msg)) + + self + .block_height + .map(convert_to::create_block_height) + .map_or(0, |msg| ::prost::encoding::message::encoded_len(5u32, &msg)) + + if self.parent_slot != 0u64 { + ::prost::encoding::uint64::encoded_len(6u32, &self.parent_slot) + } else { + 0 + } + + if !self.parent_blockhash.is_empty() { + ::prost::encoding::string::encoded_len(7u32, &self.parent_blockhash) + } else { + 0 + } + + if self.executed_transaction_count != 0u64 { + ::prost::encoding::uint64::encoded_len(8u32, &self.executed_transaction_count) + } else { + 0 + } + + if self.entries_count != 0u64 { + ::prost::encoding::uint64::encoded_len(9u32, &self.entries_count) + } else { + 0 + } + } + + fn merge_field( + &mut self, + _tag: u32, + _wire_type: WireType, + _buf: &mut impl Buf, + _ctx: DecodeContext, + ) -> Result<(), DecodeError> { + unimplemented!() + } + + fn clear(&mut self) { + unimplemented!() + } +} + +impl MessageBlockMeta { + fn rewards_encode_raw(&self, tag: u32, buf: &mut impl BufMut) { + encode_key(tag, WireType::LengthDelimited, buf); + encode_varint(self.rewards_encoded_len() as u64, buf); + + for reward in &self.rewards { + Self::reward_encode_raw(1u32, reward, buf); + } + if let Some(num_partitions) = self.num_partitions { + let msg = convert_to::create_num_partitions(num_partitions); + ::prost::encoding::message::encode(2u32, &msg, buf); + } + } + + fn reward_encode_raw(tag: u32, reward: &Reward, buf: &mut impl BufMut) { + encode_key(tag, WireType::LengthDelimited, buf); + encode_varint(Self::reward_encoded_len(reward) as u64, buf); + + if !reward.pubkey.is_empty() { + ::prost::encoding::string::encode(1u32, &reward.pubkey, buf); + } + if reward.lamports != 0i64 { + ::prost::encoding::int64::encode(2u32, &reward.lamports, buf); + } + if reward.post_balance != 0u64 { + ::prost::encoding::uint64::encode(3u32, &reward.post_balance, buf); + } + let reward_type = convert_to::create_reward_type(reward.reward_type) as i32; + if reward_type != RewardTypeProto::default() as i32 { + ::prost::encoding::int32::encode(4u32, &reward_type, buf); + } + let commission = Self::commission_to_str(reward.commission); + if commission != b"" { + prost_bytes_encode_raw(5u32, commission, buf); + } + } + + fn rewards_encoded_len(&self) -> usize { + prost_message_repeated_encoded_len!(1u32, self.rewards, Self::reward_encoded_len) + + self + .num_partitions + .map(convert_to::create_num_partitions) + .map_or(0, |msg| ::prost::encoding::message::encoded_len(2u32, &msg)) + } + + fn reward_encoded_len(reward: &Reward) -> usize { + let reward_type = convert_to::create_reward_type(reward.reward_type) as i32; + let commission = Self::commission_to_str(reward.commission); + + (if !reward.pubkey.is_empty() { + ::prost::encoding::string::encoded_len(1u32, &reward.pubkey) + } else { + 0 + }) + if reward.lamports != 0i64 { + ::prost::encoding::int64::encoded_len(2u32, &reward.lamports) + } else { + 0 + } + if reward.post_balance != 0u64 { + ::prost::encoding::uint64::encoded_len(3u32, &reward.post_balance) + } else { + 0 + } + if reward_type != RewardTypeProto::default() as i32 { + ::prost::encoding::int32::encoded_len(4u32, &reward_type) + } else { + 0 + } + if !commission.is_empty() { + prost_bytes_encoded_len(5u32, commission) + } else { + 0 + } + } + + const fn commission_to_str(commission: Option) -> &'static [u8] { + const TABLE: [&[u8]; 256] = [ + b"0", b"1", b"2", b"3", b"4", b"5", b"6", b"7", b"8", b"9", b"10", b"11", b"12", b"13", + b"14", b"15", b"16", b"17", b"18", b"19", b"20", b"21", b"22", b"23", b"24", b"25", + b"26", b"27", b"28", b"29", b"30", b"31", b"32", b"33", b"34", b"35", b"36", b"37", + b"38", b"39", b"40", b"41", b"42", b"43", b"44", b"45", b"46", b"47", b"48", b"49", + b"50", b"51", b"52", b"53", b"54", b"55", b"56", b"57", b"58", b"59", b"60", b"61", + b"62", b"63", b"64", b"65", b"66", b"67", b"68", b"69", b"70", b"71", b"72", b"73", + b"74", b"75", b"76", b"77", b"78", b"79", b"80", b"81", b"82", b"83", b"84", b"85", + b"86", b"87", b"88", b"89", b"90", b"91", b"92", b"93", b"94", b"95", b"96", b"97", + b"98", b"99", b"100", b"101", b"102", b"103", b"104", b"105", b"106", b"107", b"108", + b"109", b"110", b"111", b"112", b"113", b"114", b"115", b"116", b"117", b"118", b"119", + b"120", b"121", b"122", b"123", b"124", b"125", b"126", b"127", b"128", b"129", b"130", + b"131", b"132", b"133", b"134", b"135", b"136", b"137", b"138", b"139", b"140", b"141", + b"142", b"143", b"144", b"145", b"146", b"147", b"148", b"149", b"150", b"151", b"152", + b"153", b"154", b"155", b"156", b"157", b"158", b"159", b"160", b"161", b"162", b"163", + b"164", b"165", b"166", b"167", b"168", b"169", b"170", b"171", b"172", b"173", b"174", + b"175", b"176", b"177", b"178", b"179", b"180", b"181", b"182", b"183", b"184", b"185", + b"186", b"187", b"188", b"189", b"190", b"191", b"192", b"193", b"194", b"195", b"196", + b"197", b"198", b"199", b"200", b"201", b"202", b"203", b"204", b"205", b"206", b"207", + b"208", b"209", b"210", b"211", b"212", b"213", b"214", b"215", b"216", b"217", b"218", + b"219", b"220", b"221", b"222", b"223", b"224", b"225", b"226", b"227", b"228", b"229", + b"230", b"231", b"232", b"233", b"234", b"235", b"236", b"237", b"238", b"239", b"240", + b"241", b"242", b"243", b"244", b"245", b"246", b"247", b"248", b"249", b"250", b"251", + b"252", b"253", b"254", b"255", + ]; + if let Some(index) = commission { + TABLE[index as usize] + } else { + &[] + } + } +} + +impl From<&MessageEntry> for SubscribeUpdateEntry { + fn from(msg: &MessageEntry) -> Self { + Self { + slot: msg.slot, + index: msg.index as u64, + num_hashes: msg.num_hashes, + hash: msg.hash.into(), + executed_transaction_count: msg.executed_transaction_count, + starting_transaction_index: msg.starting_transaction_index, + } + } +} + +impl From for MessageEntry { + fn from(msg: SubscribeUpdateEntry) -> Self { + Self { + slot: msg.slot, + index: msg.index as usize, + num_hashes: msg.num_hashes, + hash: <[u8; HASH_BYTES]>::try_from(msg.hash).expect("invalid entry hash"), + executed_transaction_count: msg.executed_transaction_count, + starting_transaction_index: msg.starting_transaction_index, + } + } +} + +impl prost::Message for MessageEntry { + fn encode_raw(&self, buf: &mut impl BufMut) { + let index = self.index as u64; + if self.slot != 0u64 { + ::prost::encoding::uint64::encode(1u32, &self.slot, buf); + } + if index != 0u64 { + ::prost::encoding::uint64::encode(2u32, &index, buf); + } + if self.num_hashes != 0u64 { + ::prost::encoding::uint64::encode(3u32, &self.num_hashes, buf); + } + prost_bytes_encode_raw(4u32, &self.hash, buf); + if self.executed_transaction_count != 0u64 { + ::prost::encoding::uint64::encode(5u32, &self.executed_transaction_count, buf); + } + if self.starting_transaction_index != 0u64 { + ::prost::encoding::uint64::encode(6u32, &self.starting_transaction_index, buf); + } + } + + fn encoded_len(&self) -> usize { + let index = self.index as u64; + + (if self.slot != 0u64 { + ::prost::encoding::uint64::encoded_len(1u32, &self.slot) + } else { + 0 + }) + if index != 0u64 { + ::prost::encoding::uint64::encoded_len(2u32, &index) + } else { + 0 + } + if self.num_hashes != 0u64 { + ::prost::encoding::uint64::encoded_len(3u32, &self.num_hashes) + } else { + 0 + } + prost_bytes_encoded_len(4u32, &self.hash) + + if self.executed_transaction_count != 0u64 { + ::prost::encoding::uint64::encoded_len(5u32, &self.executed_transaction_count) + } else { + 0 + } + + if self.starting_transaction_index != 0u64 { + ::prost::encoding::uint64::encoded_len(6u32, &self.starting_transaction_index) + } else { + 0 + } + } + + fn merge_field( + &mut self, + _tag: u32, + _wire_type: WireType, + _buf: &mut impl Buf, + _ctx: DecodeContext, + ) -> Result<(), DecodeError> { + unimplemented!() + } + + fn clear(&mut self) { + unimplemented!() + } +} + +#[cfg(any(test, feature = "plugin-bench"))] +pub mod tests { + #![cfg_attr(feature = "plugin-bench", allow(dead_code))] + #![cfg_attr(feature = "plugin-bench", allow(unused_imports))] + use { + super::*, + crate::plugin::message::CommitmentLevel, + prost::Message as _, + prost_011::Message as _, + solana_sdk::{ + message::SimpleAddressLoader, + pubkey::Pubkey, + signature::Signature, + transaction::{MessageHash, SanitizedTransaction}, + }, + solana_storage_proto::convert::generated, + solana_transaction_status::{ConfirmedBlock, TransactionWithStatusMeta}, + std::{ + collections::{HashMap, HashSet}, + fs, + ops::Range, + str::FromStr, + }, + }; + + pub fn create_message_filters(names: &[&str]) -> MessageFilters { + let mut filters = MessageFilters::new(); + for name in names { + filters.push(FilterName::new(*name)); + } + filters + } + + pub fn create_account_data_slice() -> Vec { + [ + vec![], + vec![Range { start: 0, end: 0 }], + vec![Range { start: 2, end: 3 }], + vec![Range { start: 1, end: 3 }, Range { start: 5, end: 10 }], + ] + .into_iter() + .map(FilterAccountsDataSlice::new) + .collect() + } + + pub fn create_accounts_raw() -> Vec> { + let pubkey = Pubkey::from_str("28Dncoh8nmzXYEGLUcBA5SUw5WDwDBn15uUCwrWBbyuu").unwrap(); + let owner = Pubkey::from_str("5jrPJWVGrFvQ2V9wRZC3kHEZhxo9pmMir15x73oHT6mn").unwrap(); + let txn_signature = Signature::from_str("4V36qYhukXcLFuvhZaudSoJpPaFNB7d5RqYKjL2xiSKrxaBfEajqqL4X6viZkEvHJ8XcTJsqVjZxFegxhN7EC9V5").unwrap(); + + let mut accounts = vec![]; + for lamports in [0, 8123] { + for executable in [true, false] { + for rent_epoch in [0, 4242] { + for data in [ + vec![], + [42; 165].to_vec(), + [42; 1024].to_vec(), + [42; 2 * 1024 * 1024].to_vec(), + ] { + for write_version in [0, 1] { + for txn_signature in [None, Some(txn_signature)] { + accounts.push(Arc::new(MessageAccountInfo { + pubkey, + lamports, + owner, + executable, + rent_epoch, + data: data.clone(), + write_version, + txn_signature, + })); + } + } + } + } + } + } + accounts + } + + pub fn create_accounts() -> Vec<(MessageAccount, FilterAccountsDataSlice)> { + let mut vec = vec![]; + for account in create_accounts_raw() { + for slot in [0, 42] { + for is_startup in [true, false] { + for data_slice in create_account_data_slice() { + let msg = MessageAccount { + account: Arc::clone(&account), + slot, + is_startup, + }; + vec.push((msg, data_slice)); + } + } + } + } + vec + } + + pub fn create_entries() -> Vec> { + [ + MessageEntry { + slot: 299888121, + index: 42, + num_hashes: 128, + hash: [98; 32], + executed_transaction_count: 32, + starting_transaction_index: 1000, + }, + MessageEntry { + slot: 299888121, + index: 0, + num_hashes: 16, + hash: [42; 32], + executed_transaction_count: 32, + starting_transaction_index: 1000, + }, + ] + .into_iter() + .map(Arc::new) + .collect() + } + + pub fn load_predefined() -> Vec { + fs::read_dir("./src/plugin/blocks") + .expect("failed to read `blocks` dir") + .map(|entry| { + let path = entry.expect("failed to read `blocks` dir entry").path(); + let data = fs::read(path).expect("failed to read block"); + generated::ConfirmedBlock::decode(data.as_slice()) + .expect("failed to decode block") + .try_into() + .expect("failed to convert decoded block") + }) + .collect() + } + + pub fn load_predefined_blockmeta() -> Vec> { + load_predefined_blocks() + .into_iter() + .map(|block| (block.meta.blockhash.clone(), block.meta)) + .collect::>() + .into_values() + .collect() + } + + pub fn load_predefined_transactions() -> Vec> { + load_predefined_blocks() + .into_iter() + .flat_map(|block| block.transactions.into_iter().map(|tx| (tx.signature, tx))) + .collect::>() + .into_values() + .collect() + } + + pub fn load_predefined_blocks() -> Vec { + load_predefined() + .into_iter() + .flat_map(|block| { + let transactions = block + .transactions + .into_iter() + .enumerate() + .map(|(index, tx)| { + let TransactionWithStatusMeta::Complete(tx) = tx else { + panic!("tx with missed meta"); + }; + let transaction = SanitizedTransaction::try_create( + tx.transaction.clone(), + MessageHash::Compute, + None, + SimpleAddressLoader::Disabled, + &HashSet::new(), + ) + .expect("failed to create tx"); + MessageTransactionInfo { + signature: tx.transaction.signatures[0], + is_vote: true, + transaction, + meta: tx.meta.clone(), + index, + } + }) + .map(Arc::new) + .collect::>(); + + let entries = create_entries(); + + let slot = block.parent_slot + 1; + let block_meta1 = MessageBlockMeta { + parent_slot: block.parent_slot, + slot, + parent_blockhash: block.previous_blockhash, + blockhash: block.blockhash, + rewards: block.rewards, + num_partitions: block.num_partitions, + block_time: block.block_time, + block_height: block.block_height, + executed_transaction_count: transactions.len() as u64, + entries_count: entries.len() as u64, + }; + let mut block_meta2 = block_meta1.clone(); + block_meta2.num_partitions = Some(42); + + let block_meta1 = Arc::new(block_meta1); + let block_meta2 = Arc::new(block_meta2); + + let accounts = create_accounts_raw(); + create_account_data_slice() + .into_iter() + .flat_map(move |data_slice| { + vec![ + MessageRefBlock { + meta: Arc::clone(&block_meta1), + transactions: transactions.clone(), + updated_account_count: accounts.len() as u64, + accounts: accounts.clone(), + accounts_data_slice: data_slice.clone(), + entries: entries.clone(), + }, + MessageRefBlock { + meta: Arc::clone(&block_meta2), + transactions: transactions.clone(), + updated_account_count: accounts.len() as u64, + accounts: accounts.clone(), + accounts_data_slice: data_slice, + entries: entries.clone(), + }, + ] + }) + }) + .collect() + } + + fn encode_decode_cmp(filters: &[&str], message: MessageRef) { + let msg = Message { + filters: create_message_filters(filters), + message, + }; + let update = SubscribeUpdate::from(&msg); + assert_eq!(msg.encoded_len(), update.encoded_len()); + assert_eq!( + SubscribeUpdate::decode(msg.encode_to_vec().as_slice()).expect("failed to decode"), + update + ); + } + + #[test] + fn test_message_account() { + for (msg, data_slice) in create_accounts() { + encode_decode_cmp(&["123"], MessageRef::account(&msg, data_slice)); + } + } + + #[test] + fn test_message_slot() { + for slot in [0, 42] { + for parent in [None, Some(0), Some(42)] { + for status in [ + CommitmentLevel::Processed, + CommitmentLevel::Confirmed, + CommitmentLevel::Finalized, + ] { + encode_decode_cmp( + &["123"], + MessageRef::slot(MessageSlot { + slot, + parent, + status, + }), + ) + } + } + } + } + + #[test] + fn test_message_transaction() { + for transaction in load_predefined_transactions() { + let msg = MessageTransaction { + transaction, + slot: 42, + }; + encode_decode_cmp(&["123"], MessageRef::transaction(&msg)); + encode_decode_cmp(&["123"], MessageRef::transaction_status(&msg)); + } + } + + #[test] + fn test_message_block() { + for block in load_predefined_blocks() { + encode_decode_cmp(&["123"], MessageRef::block(Box::new(block))); + } + } + + #[test] + fn test_message_ping() { + encode_decode_cmp(&["123"], MessageRef::Ping) + } + + #[test] + fn test_message_pong() { + encode_decode_cmp(&["123"], MessageRef::pong(0)); + encode_decode_cmp(&["123"], MessageRef::pong(42)); + } + + #[test] + fn test_message_blockmeta() { + for block_meta in load_predefined_blockmeta() { + encode_decode_cmp(&["123"], MessageRef::block_meta(block_meta)); + } + } + + #[test] + fn test_message_entry() { + for entry in create_entries() { + encode_decode_cmp(&["123"], MessageRef::entry(entry)); + } + } +} diff --git a/yellowstone-grpc-proto/src/plugin/mod.rs b/yellowstone-grpc-proto/src/plugin/mod.rs index 39a8a487..2d5c06d1 100644 --- a/yellowstone-grpc-proto/src/plugin/mod.rs +++ b/yellowstone-grpc-proto/src/plugin/mod.rs @@ -1,2 +1,9 @@ pub mod filter; pub mod message; +pub mod message_ref; + +pub mod proto { + #![allow(clippy::clone_on_ref_ptr)] + #![allow(clippy::missing_const_for_fn)] + tonic::include_proto!("geyser.Geyser"); +}