diff --git a/CHANGELOG.md b/CHANGELOG.md index 5642d252..1c9124af 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ The minor version will be incremented upon a breaking change and the patch versi - proto: add mod `plugin` with `FilterNames` cache ([#458](https://github.com/rpcpool/yellowstone-grpc/pull/458)) - proto: move enum Message from geyser crate ([#459](https://github.com/rpcpool/yellowstone-grpc/pull/459)) - proto: move `Filter` from geyser crate ([#466](https://github.com/rpcpool/yellowstone-grpc/pull/466)) +- geyser: serialize from custom message istead of generated ([#467](https://github.com/rpcpool/yellowstone-grpc/pull/467)) ### Breaking diff --git a/Cargo.lock b/Cargo.lock index 1f35b40d..daf15600 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -792,7 +792,7 @@ version = "4.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "501d359d5f3dcaf6ecdeee48833ae73ec6e42723a1e52419c79abf9507eec0a0" dependencies = [ - "heck", + "heck 0.5.0", "proc-macro2", "quote", "syn 2.0.72", @@ -1502,6 +1502,12 @@ version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + [[package]] name = "heck" version = "0.5.0" @@ -1559,6 +1565,15 @@ dependencies = [ "hmac 0.8.1", ] +[[package]] +name = "home" +version = "0.5.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "hostname" version = "0.4.0" @@ -2056,6 +2071,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + [[package]] name = "multimap" version = "0.10.0" @@ -2368,6 +2389,16 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "prettyplease" +version = "0.1.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c8646e95016a7a6c4adea95bafa8a16baab64b583356217f2c85db4a39d9a86" +dependencies = [ + "proc-macro2", + "syn 1.0.109", +] + [[package]] name = "prettyplease" version = "0.2.20" @@ -2444,6 +2475,16 @@ dependencies = [ "thiserror", ] +[[package]] +name = "prost" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" +dependencies = [ + "bytes", + "prost-derive 0.11.9", +] + [[package]] name = "prost" version = "0.13.1" @@ -2451,7 +2492,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e13db3d3fde688c61e2446b4d843bc27a7e8af269a69440c0308021dc92333cc" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.13.1", +] + +[[package]] +name = "prost-build" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" +dependencies = [ + "bytes", + "heck 0.4.1", + "itertools 0.10.5", + "lazy_static", + "log", + "multimap 0.8.3", + "petgraph", + "prettyplease 0.1.25", + "prost 0.11.9", + "prost-types 0.11.9", + "regex", + "syn 1.0.109", + "tempfile", + "which", ] [[package]] @@ -2461,20 +2524,33 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1" dependencies = [ "bytes", - "heck", + "heck 0.5.0", "itertools 0.12.1", "log", - "multimap", + "multimap 0.10.0", "once_cell", "petgraph", - "prettyplease", - "prost", - "prost-types", + "prettyplease 0.2.20", + "prost 0.13.1", + "prost-types 0.13.1", "regex", "syn 2.0.72", "tempfile", ] +[[package]] +name = "prost-derive" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" +dependencies = [ + "anyhow", + "itertools 0.10.5", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "prost-derive" version = "0.13.1" @@ -2488,13 +2564,22 @@ dependencies = [ "syn 2.0.72", ] +[[package]] +name = "prost-types" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" +dependencies = [ + "prost 0.11.9", +] + [[package]] name = "prost-types" version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cee5168b05f49d4b0ca581206eb14a7b22fafd963efe729ac48eb03266e25cc2" dependencies = [ - "prost", + "prost 0.13.1", ] [[package]] @@ -3308,6 +3393,23 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "468aa43b7edb1f9b7b7b686d5c3aeb6630dc1708e86e31343499dd5c4d775183" +[[package]] +name = "solana-storage-proto" +version = "2.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bedde2051ddfa8408a504db80a3007f7b0e9aba537ace9dd06c2187084e0a1a" +dependencies = [ + "bincode", + "bs58", + "prost 0.11.9", + "protobuf-src", + "serde", + "solana-account-decoder", + "solana-sdk", + "solana-transaction-status", + "tonic-build 0.9.2", +] + [[package]] name = "solana-transaction-status" version = "2.0.10" @@ -3940,7 +4042,7 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "prost", + "prost 0.13.1", "rustls-native-certs", "rustls-pemfile 2.1.3", "socket2", @@ -3954,15 +4056,28 @@ dependencies = [ "zstd 0.13.2", ] +[[package]] +name = "tonic-build" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6fdaae4c2c638bb70fe42803a26fbd6fc6ac8c72f5c59f67ecc2a2dcabf4b07" +dependencies = [ + "prettyplease 0.1.25", + "proc-macro2", + "prost-build 0.11.9", + "quote", + "syn 1.0.109", +] + [[package]] name = "tonic-build" version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "568392c5a2bd0020723e3f387891176aabafe36fd9fcd074ad309dfa0c8eb964" dependencies = [ - "prettyplease", + "prettyplease 0.2.20", "proc-macro2", - "prost-build", + "prost-build 0.13.1", "quote", "syn 2.0.72", ] @@ -3974,7 +4089,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1e10e6a96ee08b6ce443487d4368442d328d0e746f3681f81127f7dc41b4955" dependencies = [ "async-stream", - "prost", + "prost 0.13.1", "tokio", "tokio-stream", "tonic", @@ -4291,6 +4406,18 @@ version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +[[package]] +name = "which" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" +dependencies = [ + "either", + "home", + "once_cell", + "rustix", +] + [[package]] name = "winapi" version = "0.3.9" @@ -4593,16 +4720,20 @@ dependencies = [ "base64 0.22.1", "bincode", "bs58", - "prost", + "bytes", + "prost 0.11.9", + "prost 0.13.1", "protobuf-src", "serde", + "smallvec", "solana-account-decoder", "solana-sdk", + "solana-storage-proto", "solana-transaction-status", "spl-token-2022", "thiserror", "tonic", - "tonic-build", + "tonic-build 0.12.1", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 1973e387..35a55a3d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ log = "0.4.17" maplit = "1.0.2" prometheus = "0.13.2" prost = "0.13.1" +prost_011 = { package = "prost", version = "0.11.9" } protobuf-src = "1.1.0" scylla = "0.13.0" serde = "1.0.145" @@ -52,7 +53,9 @@ serde_json = "1.0.86" solana-account-decoder = "~2.0.10" solana-logger = "~2.0.10" solana-sdk = "~2.0.10" +solana-storage-proto = "~2.0.10" solana-transaction-status = "~2.0.10" +smallvec = "1.13.2" spl-token-2022 = "4.0.0" thiserror = "1.0" tokio = "1.21.2" diff --git a/yellowstone-grpc-geyser/src/config.rs b/yellowstone-grpc-geyser/src/config.rs index 3d227ffe..1906ae50 100644 --- a/yellowstone-grpc-geyser/src/config.rs +++ b/yellowstone-grpc-geyser/src/config.rs @@ -6,7 +6,7 @@ use { std::{fs::read_to_string, net::SocketAddr, path::Path, time::Duration}, tokio::sync::Semaphore, tonic::codec::CompressionEncoding, - yellowstone_grpc_proto::plugin::filter::FilterLimits, + yellowstone_grpc_proto::plugin::filter::limits::FilterLimits, }; #[derive(Debug, Clone, Deserialize)] diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index 79bccded..5360b02f 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -36,20 +36,23 @@ use { tonic_health::server::health_reporter, yellowstone_grpc_proto::{ plugin::{ - filter::{Filter, FilterLimits, FilterNames}, + filter::{ + limits::FilterLimits, + message::{FilteredUpdate, FilteredUpdateOneof}, + name::FilterNames, + Filter, + }, message::{ CommitmentLevel, Message, MessageBlock, MessageBlockMeta, MessageEntry, MessageSlot, MessageTransactionInfo, }, + proto::geyser_server::{Geyser, GeyserServer}, }, prelude::{ - geyser_server::{Geyser, GeyserServer}, - subscribe_update::UpdateOneof, CommitmentLevel as CommitmentLevelProto, GetBlockHeightRequest, GetBlockHeightResponse, GetLatestBlockhashRequest, GetLatestBlockhashResponse, GetSlotRequest, GetSlotResponse, GetVersionRequest, GetVersionResponse, IsBlockhashValidRequest, - IsBlockhashValidResponse, PingRequest, PongResponse, SubscribeRequest, SubscribeUpdate, - SubscribeUpdatePing, + IsBlockhashValidResponse, PingRequest, PongResponse, SubscribeRequest, }, }, }; @@ -724,7 +727,7 @@ impl GrpcService { async fn client_loop( id: usize, endpoint: String, - stream_tx: mpsc::Sender>, + stream_tx: mpsc::Sender>, mut client_rx: mpsc::UnboundedReceiver>, mut snapshot_rx: Option>>, mut messages_rx: broadcast::Receiver<(CommitmentLevel, Arc>)>, @@ -813,7 +816,7 @@ impl GrpcService { if commitment == filter.get_commitment_level() { for message in messages.iter() { - for message in filter.get_update(message, Some(commitment)) { + for message in filter.get_updates(message, Some(commitment)) { match stream_tx.try_send(Ok(message)) { Ok(()) => {} Err(mpsc::error::TrySendError::Full(_)) => { @@ -854,7 +857,7 @@ impl GrpcService { async fn client_loop_snapshot( id: usize, endpoint: &str, - stream_tx: &mpsc::Sender>, + stream_tx: &mpsc::Sender>, client_rx: &mut mpsc::UnboundedReceiver>, snapshot_rx: crossbeam_channel::Receiver>, is_alive: &mut bool, @@ -904,7 +907,7 @@ impl GrpcService { } }; - for message in filter.get_update(&message, None) { + for message in filter.get_updates(&message, None) { if stream_tx.send(Ok(message)).await.is_err() { error!("client #{id}: stream closed"); *is_alive = false; @@ -917,7 +920,7 @@ impl GrpcService { #[tonic::async_trait] impl Geyser for GrpcService { - type SubscribeStream = ReceiverStream>; + type SubscribeStream = ReceiverStream>; async fn subscribe( &self, @@ -947,18 +950,14 @@ impl Geyser for GrpcService { let exit = ping_exit.notified(); tokio::pin!(exit); - let ping_msg = SubscribeUpdate { - filters: vec![], - update_oneof: Some(UpdateOneof::Ping(SubscribeUpdatePing {})), - }; - loop { tokio::select! { _ = &mut exit => { break; } _ = sleep(Duration::from_secs(10)) => { - match ping_stream_tx.try_send(Ok(ping_msg.clone())) { + let msg = FilteredUpdate::new_empty(FilteredUpdateOneof::ping()); + match ping_stream_tx.try_send(Ok(msg)) { Ok(()) => {} Err(mpsc::error::TrySendError::Full(_)) => {} Err(mpsc::error::TrySendError::Closed(_)) => { diff --git a/yellowstone-grpc-proto/Cargo.toml b/yellowstone-grpc-proto/Cargo.toml index 5a3dc459..7723e99d 100644 --- a/yellowstone-grpc-proto/Cargo.toml +++ b/yellowstone-grpc-proto/Cargo.toml @@ -13,17 +13,23 @@ publish = true [dependencies] agave-geyser-plugin-interface = { workspace = true, optional = true } base64 = { workspace = true, optional = true } -bs58 = { workspace = true, optional = true } bincode = { workspace = true, optional = true } +bs58 = { workspace = true, optional = true } +bytes = { workspace = true, optional = true } prost = { workspace = true } serde = { workspace = true, optional = true } solana-account-decoder = { workspace = true, optional = true } solana-sdk = { workspace = true, optional = true } solana-transaction-status = { workspace = true, optional = true } +smallvec = { workspace = true, optional = true } spl-token-2022 = { workspace = true, optional = true } thiserror = { workspace = true, optional = true } tonic = { workspace = true } +[dev-dependencies] +prost_011 = { workspace = true } +solana-storage-proto = { workspace = true } + [build-dependencies] anyhow = { workspace = true } protobuf-src = { workspace = true } @@ -42,7 +48,9 @@ plugin = [ "dep:agave-geyser-plugin-interface", "dep:base64", "dep:bs58", + "dep:bytes", "dep:serde", + "dep:smallvec", "dep:spl-token-2022", "dep:thiserror" ] diff --git a/yellowstone-grpc-proto/build.rs b/yellowstone-grpc-proto/build.rs index 66db00ba..9df65193 100644 --- a/yellowstone-grpc-proto/build.rs +++ b/yellowstone-grpc-proto/build.rs @@ -1,5 +1,96 @@ +use tonic_build::manual::{Builder, Method, Service}; + fn main() -> anyhow::Result<()> { std::env::set_var("PROTOC", protobuf_src::protoc()); + + // build protos tonic_build::compile_protos("proto/geyser.proto")?; + + // build with accepting our custom struct + let geyser_service = Service::builder() + .name("Geyser") + .package("geyser") + .method( + Method::builder() + .name("subscribe") + .route_name("Subscribe") + .input_type("crate::geyser::SubscribeRequest") + // .output_type("crate::geyser::SubscribeUpdate") + .output_type("crate::plugin::filter::message::FilteredUpdate") + .codec_path("tonic::codec::ProstCodec") + // .codec_path("crate::plugin::codec::SubscribeCodec") + .client_streaming() + .server_streaming() + .build(), + ) + .method( + Method::builder() + .name("ping") + .route_name("Ping") + .input_type("crate::geyser::PingRequest") + .output_type("crate::geyser::PongResponse") + .codec_path("tonic::codec::ProstCodec") + .build(), + ) + .method( + Method::builder() + .name("get_latest_blockhash") + .route_name("GetLatestBlockhash") + .input_type("crate::geyser::GetLatestBlockhashRequest") + .output_type("crate::geyser::GetLatestBlockhashResponse") + .codec_path("tonic::codec::ProstCodec") + .build(), + ) + .method( + Method::builder() + .name("get_block_height") + .route_name("GetBlockHeight") + .input_type("crate::geyser::GetBlockHeightRequest") + .output_type("crate::geyser::GetBlockHeightResponse") + .codec_path("tonic::codec::ProstCodec") + .build(), + ) + .method( + Method::builder() + .name("get_slot") + .route_name("GetSlot") + .input_type("crate::geyser::GetSlotRequest") + .output_type("crate::geyser::GetSlotResponse") + .codec_path("tonic::codec::ProstCodec") + .build(), + ) + .method( + Method::builder() + .name("is_blockhash_valid") + .route_name("IsBlockhashValid") + .input_type("crate::geyser::IsBlockhashValidRequest") + .output_type("crate::geyser::IsBlockhashValidResponse") + .codec_path("tonic::codec::ProstCodec") + .build(), + ) + .method( + Method::builder() + .name("get_version") + .route_name("GetVersion") + .input_type("crate::geyser::GetVersionRequest") + .output_type("crate::geyser::GetVersionResponse") + .codec_path("tonic::codec::ProstCodec") + .build(), + ) + .build(); + Builder::new() + .build_client(false) + .compile(&[geyser_service]); + + // patching generated custom struct (if custom Codec is used) + // let mut location = std::path::PathBuf::from(std::env::var("OUT_DIR")?); + // location.push("geyser.Geyser.rs"); + // let geyser_rs = std::fs::read_to_string(location.clone())?; + // let geyser_rs = geyser_rs.replace( + // "let codec = crate::plugin::codec::SubscribeCodec::default();", + // "let codec = crate::plugin::codec::SubscribeCodec::::default();", + // ); + // std::fs::write(location, geyser_rs)?; + Ok(()) } diff --git a/yellowstone-grpc-proto/fixtures/blocks/18144001.bincode b/yellowstone-grpc-proto/fixtures/blocks/18144001.bincode new file mode 100644 index 00000000..3d0a65b7 Binary files /dev/null and b/yellowstone-grpc-proto/fixtures/blocks/18144001.bincode differ diff --git a/yellowstone-grpc-proto/fixtures/blocks/43200000.bincode b/yellowstone-grpc-proto/fixtures/blocks/43200000.bincode new file mode 100644 index 00000000..acb1f1c6 Binary files /dev/null and b/yellowstone-grpc-proto/fixtures/blocks/43200000.bincode differ diff --git a/yellowstone-grpc-proto/fixtures/blocks/64800004.bincode b/yellowstone-grpc-proto/fixtures/blocks/64800004.bincode new file mode 100644 index 00000000..232addc3 Binary files /dev/null and b/yellowstone-grpc-proto/fixtures/blocks/64800004.bincode differ diff --git a/yellowstone-grpc-proto/src/plugin/filter/filter.rs b/yellowstone-grpc-proto/src/plugin/filter/filter.rs index 8eb4c079..bae12393 100644 --- a/yellowstone-grpc-proto/src/plugin/filter/filter.rs +++ b/yellowstone-grpc-proto/src/plugin/filter/filter.rs @@ -4,16 +4,12 @@ use { subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof, subscribe_request_filter_accounts_filter_lamports::Cmp as AccountsFilterLamports, subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof, - subscribe_update::UpdateOneof, CommitmentLevel as CommitmentLevelProto, - SubscribeRequest, SubscribeRequestAccountsDataSlice, SubscribeRequestFilterAccounts, + CommitmentLevel as CommitmentLevelProto, SubscribeRequest, + SubscribeRequestAccountsDataSlice, SubscribeRequestFilterAccounts, SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterAccountsFilterLamports, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterEntry, SubscribeRequestFilterSlots, - SubscribeRequestFilterTransactions, SubscribeUpdate, SubscribeUpdateAccount, - SubscribeUpdateAccountInfo, SubscribeUpdateBlock, SubscribeUpdateBlockMeta, - SubscribeUpdateEntry, SubscribeUpdatePong, SubscribeUpdateSlot, - SubscribeUpdateTransaction, SubscribeUpdateTransactionInfo, - SubscribeUpdateTransactionStatus, + SubscribeRequestFilterTransactions, }, plugin::{ filter::{ @@ -22,12 +18,15 @@ use { FilterLimitsCheckError, FilterLimitsEntries, FilterLimitsSlots, FilterLimitsTransactions, }, + message::{ + FilteredUpdate, FilteredUpdateBlock, FilteredUpdateFilters, + FilteredUpdateOneof, FilteredUpdates, + }, name::{FilterName, FilterNameError, FilterNames}, }, message::{ - CommitmentLevel, Message, MessageAccount, MessageAccountInfo, MessageBlock, - MessageBlockMeta, MessageEntry, MessageSlot, MessageTransaction, - MessageTransactionInfo, + CommitmentLevel, Message, MessageAccount, MessageBlock, MessageBlockMeta, + MessageEntry, MessageSlot, MessageTransaction, }, }, }, @@ -45,138 +44,6 @@ use { }, }; -#[derive(Debug, Clone)] -pub enum FilteredMessage<'a> { - Slot(&'a MessageSlot), - Account(&'a MessageAccount), - Transaction(&'a MessageTransaction), - TransactionStatus(&'a MessageTransaction), - Entry(&'a MessageEntry), - Block(MessageBlock), - BlockMeta(&'a MessageBlockMeta), -} - -impl<'a> FilteredMessage<'a> { - fn as_proto_account( - message: &MessageAccountInfo, - data_slice: &FilterAccountsDataSlice, - ) -> SubscribeUpdateAccountInfo { - let data_slice = data_slice.as_ref(); - let data = if data_slice.is_empty() { - message.data.clone() - } else { - let mut data = Vec::with_capacity(data_slice.iter().map(|ds| ds.end - ds.start).sum()); - for data_slice in data_slice { - if message.data.len() >= data_slice.end { - data.extend_from_slice(&message.data[data_slice.start..data_slice.end]); - } - } - data - }; - SubscribeUpdateAccountInfo { - pubkey: message.pubkey.as_ref().into(), - lamports: message.lamports, - owner: message.owner.as_ref().into(), - executable: message.executable, - rent_epoch: message.rent_epoch, - data, - write_version: message.write_version, - txn_signature: message.txn_signature.map(|s| s.as_ref().into()), - } - } - - fn as_proto_transaction(message: &MessageTransactionInfo) -> SubscribeUpdateTransactionInfo { - SubscribeUpdateTransactionInfo { - signature: message.signature.as_ref().into(), - is_vote: message.is_vote, - transaction: Some(message.transaction.clone()), - meta: Some(message.meta.clone()), - index: message.index as u64, - } - } - - fn as_proto_entry(message: &MessageEntry) -> SubscribeUpdateEntry { - SubscribeUpdateEntry { - slot: message.slot, - index: message.index as u64, - num_hashes: message.num_hashes, - hash: message.hash.to_bytes().to_vec(), - executed_transaction_count: message.executed_transaction_count, - starting_transaction_index: message.starting_transaction_index, - } - } - - pub fn as_proto(&self, accounts_data_slice: &FilterAccountsDataSlice) -> UpdateOneof { - match self { - Self::Slot(message) => UpdateOneof::Slot(SubscribeUpdateSlot { - slot: message.slot, - parent: message.parent, - status: message.status as i32, - }), - Self::Account(message) => UpdateOneof::Account(SubscribeUpdateAccount { - account: Some(Self::as_proto_account( - message.account.as_ref(), - accounts_data_slice, - )), - slot: message.slot, - is_startup: message.is_startup, - }), - Self::Transaction(message) => UpdateOneof::Transaction(SubscribeUpdateTransaction { - transaction: Some(Self::as_proto_transaction(message.transaction.as_ref())), - slot: message.slot, - }), - Self::TransactionStatus(message) => { - UpdateOneof::TransactionStatus(SubscribeUpdateTransactionStatus { - slot: message.slot, - signature: message.transaction.signature.as_ref().into(), - is_vote: message.transaction.is_vote, - index: message.transaction.index as u64, - err: message.transaction.meta.err.clone(), - }) - } - Self::Entry(message) => UpdateOneof::Entry(Self::as_proto_entry(message)), - Self::Block(message) => UpdateOneof::Block(SubscribeUpdateBlock { - slot: message.meta.slot, - blockhash: message.meta.blockhash.clone(), - rewards: Some(message.meta.rewards.clone()), - block_time: message.meta.block_time, - block_height: message.meta.block_height, - parent_slot: message.meta.parent_slot, - parent_blockhash: message.meta.parent_blockhash.clone(), - executed_transaction_count: message.meta.executed_transaction_count, - transactions: message - .transactions - .iter() - .map(|tx| Self::as_proto_transaction(tx.as_ref())) - .collect(), - updated_account_count: message.updated_account_count, - accounts: message - .accounts - .iter() - .map(|acc| Self::as_proto_account(acc.as_ref(), accounts_data_slice)) - .collect(), - entries_count: message.meta.entries_count, - entries: message - .entries - .iter() - .map(|entry| Self::as_proto_entry(entry.as_ref())) - .collect(), - }), - Self::BlockMeta(message) => UpdateOneof::BlockMeta(SubscribeUpdateBlockMeta { - slot: message.slot, - blockhash: message.blockhash.clone(), - rewards: Some(message.rewards.clone()), - block_time: message.block_time, - block_height: message.block_height, - parent_slot: message.parent_slot, - parent_blockhash: message.parent_blockhash.clone(), - executed_transaction_count: message.executed_transaction_count, - entries_count: message.entries_count, - }), - } - } -} - #[derive(Debug, thiserror::Error)] pub enum FilterError { #[error(transparent)] @@ -205,6 +72,30 @@ pub enum FilterError { pub type FilterResult = Result; +macro_rules! filtered_updates_once_owned { + ($filters:ident, $message:expr) => {{ + let mut messages = FilteredUpdates::new(); + if !$filters.is_empty() { + messages.push(FilteredUpdate::new($filters, $message)); + } + messages + }}; +} + +macro_rules! filtered_updates_once_ref { + ($filters:ident, $message:expr) => {{ + let mut messages = FilteredUpdates::new(); + if !$filters.is_empty() { + let mut message_filters = FilteredUpdateFilters::new(); + for filter in $filters { + message_filters.push(filter.clone()); + } + messages.push(FilteredUpdate::new(message_filters, $message)); + } + messages + }}; +} + #[derive(Debug, Clone)] pub struct Filter { accounts: FilterAccounts, @@ -329,53 +220,30 @@ impl Filter { self.commitment } - pub fn get_filters<'a>( - &'a self, - message: &'a Message, + pub fn get_updates( + &self, + message: &Message, commitment: Option, - ) -> Box, FilteredMessage<'a>)> + Send + 'a> { + ) -> FilteredUpdates { match message { - Message::Account(message) => self.accounts.get_filters(message), - Message::Slot(message) => self.slots.get_filters(message, commitment), - Message::Transaction(message) => Box::new( - self.transactions - .get_filters(message) - .chain(self.transactions_status.get_filters(message)), - ), - Message::Entry(message) => self.entries.get_filters(message), - Message::Block(message) => self.blocks.get_filters(message), - Message::BlockMeta(message) => self.blocks_meta.get_filters(message), + Message::Account(message) => self + .accounts + .get_updates(message, &self.accounts_data_slice), + Message::Slot(message) => self.slots.get_updates(message, commitment), + Message::Transaction(message) => { + let mut updates = self.transactions.get_updates(message); + updates.append(&mut self.transactions_status.get_updates(message)); + updates + } + Message::Entry(message) => self.entries.get_updates(message), + Message::Block(message) => self.blocks.get_updates(message, &self.accounts_data_slice), + Message::BlockMeta(message) => self.blocks_meta.get_updates(message), } } - pub fn get_update<'a>( - &'a self, - message: &'a Message, - commitment: Option, - ) -> Box + Send + 'a> { - Box::new( - self.get_filters(message, commitment) - .filter_map(|(filters, message)| { - if filters.is_empty() { - None - } else { - Some(SubscribeUpdate { - filters: filters - .iter() - .map(|name| name.as_ref().to_string()) - .collect(), - update_oneof: Some(message.as_proto(&self.accounts_data_slice)), - }) - } - }), - ) - } - - pub fn get_pong_msg(&self) -> Option { - self.ping.map(|id| SubscribeUpdate { - filters: vec![], - update_oneof: Some(UpdateOneof::Pong(SubscribeUpdatePong { id })), - }) + pub fn get_pong_msg(&self) -> Option { + self.ping + .map(|id| FilteredUpdate::new_empty(FilteredUpdateOneof::pong(id))) } } @@ -456,19 +324,21 @@ impl FilterAccounts { Ok(required) } - fn get_filters<'a>( - &'a self, - message: &'a MessageAccount, - ) -> Box, FilteredMessage<'a>)> + Send + 'a> { + fn get_updates( + &self, + message: &MessageAccount, + accounts_data_slice: &FilterAccountsDataSlice, + ) -> FilteredUpdates { let mut filter = FilterAccountsMatch::new(self); filter.match_txn_signature(&message.account.txn_signature); filter.match_account(&message.account.pubkey); filter.match_owner(&message.account.owner); filter.match_data_lamports(&message.account.data, message.account.lamports); - Box::new(std::iter::once(( - filter.get_filters(), - FilteredMessage::Account(message), - ))) + let filters = filter.get_filters(); + filtered_updates_once_owned!( + filters, + FilteredUpdateOneof::account(message, accounts_data_slice.clone()) + ) } } @@ -675,7 +545,7 @@ impl<'a> FilterAccountsMatch<'a> { } } - fn get_filters(&self) -> Vec { + fn get_filters(&self) -> FilteredUpdateFilters { self.filter .filters .iter() @@ -743,24 +613,23 @@ impl FilterSlots { }) } - fn get_filters<'a>( - &'a self, - message: &'a MessageSlot, + fn get_updates( + &self, + message: &MessageSlot, commitment: Option, - ) -> Box, FilteredMessage<'a>)> + Send + 'a> { - Box::new(std::iter::once(( - self.filters - .iter() - .filter_map(|(name, inner)| { - if !inner.filter_by_commitment || commitment == Some(message.status) { - Some(name.clone()) - } else { - None - } - }) - .collect(), - FilteredMessage::Slot(message), - ))) + ) -> FilteredUpdates { + let filters = self + .filters + .iter() + .filter_map(|(name, inner)| { + if !inner.filter_by_commitment || commitment == Some(message.status) { + Some(name.clone()) + } else { + None + } + }) + .collect::(); + filtered_updates_once_owned!(filters, FilteredUpdateOneof::slot(*message)) } } @@ -851,10 +720,7 @@ impl FilterTransactions { }) } - fn get_filters<'a>( - &'a self, - message: &'a MessageTransaction, - ) -> Box, FilteredMessage<'a>)> + Send + 'a> { + pub fn get_updates(&self, message: &MessageTransaction) -> FilteredUpdates { let filters = self .filters .iter() @@ -908,14 +774,17 @@ impl FilterTransactions { Some(name.clone()) }) - .collect(); - let message = match self.filter_type { - FilterTransactionsType::Transaction => FilteredMessage::Transaction(message), - FilterTransactionsType::TransactionStatus => { - FilteredMessage::TransactionStatus(message) + .collect::(); + + filtered_updates_once_owned!( + filters, + match self.filter_type { + FilterTransactionsType::Transaction => FilteredUpdateOneof::transaction(message), + FilterTransactionsType::TransactionStatus => { + FilteredUpdateOneof::transaction_status(message) + } } - }; - Box::new(std::iter::once((filters, message))) + ) } } @@ -940,14 +809,9 @@ impl FilterEntries { }) } - fn get_filters<'a>( - &'a self, - message: &'a MessageEntry, - ) -> Box, FilteredMessage<'a>)> + Send + 'a> { - Box::new(std::iter::once(( - self.filters.clone(), - FilteredMessage::Entry(message), - ))) + fn get_updates(&self, message: &Arc) -> FilteredUpdates { + let filters = self.filters.as_slice(); + filtered_updates_once_ref!(filters, FilteredUpdateOneof::entry(Arc::clone(message))) } } @@ -1008,11 +872,13 @@ impl FilterBlocks { Ok(this) } - fn get_filters<'a>( - &'a self, - message: &'a MessageBlock, - ) -> Box, FilteredMessage<'a>)> + Send + 'a> { - Box::new(self.filters.iter().map(move |(filter, inner)| { + fn get_updates( + &self, + message: &Arc, + accounts_data_slice: &FilterAccountsDataSlice, + ) -> FilteredUpdates { + let mut updates = FilteredUpdates::new(); + for (filter, inner) in self.filters.iter() { #[allow(clippy::unnecessary_filter_map)] let transactions = if matches!(inner.include_transactions, None | Some(true)) { message @@ -1061,17 +927,21 @@ impl FilterBlocks { vec![] }; - ( - vec![filter.clone()], - FilteredMessage::Block(MessageBlock { + let mut filters = FilteredUpdateFilters::new(); + filters.push(filter.clone()); + updates.push(FilteredUpdate::new( + filters, + FilteredUpdateOneof::block(Box::new(FilteredUpdateBlock { meta: Arc::clone(&message.meta), transactions, updated_account_count: message.updated_account_count, + accounts_data_slice: accounts_data_slice.clone(), accounts, entries, - }), - ) - })) + })), + )); + } + updates } } @@ -1096,14 +966,12 @@ impl FilterBlocksMeta { }) } - fn get_filters<'a>( - &'a self, - message: &'a MessageBlockMeta, - ) -> Box, FilteredMessage<'a>)> + Send + 'a> { - Box::new(std::iter::once(( - self.filters.clone(), - FilteredMessage::BlockMeta(message), - ))) + fn get_updates(&self, message: &Arc) -> FilteredUpdates { + let filters = self.filters.as_slice(); + filtered_updates_once_ref!( + filters, + FilteredUpdateOneof::block_meta(Arc::clone(message)) + ) } } @@ -1118,7 +986,7 @@ impl AsRef<[Range]> for FilterAccountsDataSlice { } impl FilterAccountsDataSlice { - fn new(slices: &[SubscribeRequestAccountsDataSlice], limits: usize) -> FilterResult { + pub fn new(slices: &[SubscribeRequestAccountsDataSlice], limits: usize) -> FilterResult { FilterLimits::check_max(slices.len(), limits)?; let slices = slices @@ -1145,14 +1013,32 @@ impl FilterAccountsDataSlice { } } - Ok(Self(Arc::new(slices))) + Ok(Self::new_unchecked(Arc::new(slices))) + } + + pub fn new_unchecked(slices: Arc>>) -> Self { + Self(slices) + } + + pub fn apply(&self, source: &[u8]) -> Vec { + if self.0.is_empty() { + source.to_vec() + } else { + let mut data = Vec::with_capacity(self.0.iter().map(|ds| ds.end - ds.start).sum()); + for data_slice in self.0.iter() { + if source.len() >= data_slice.end { + data.extend_from_slice(&source[data_slice.start..data_slice.end]); + } + } + data + } } } #[cfg(test)] mod tests { use { - super::{Filter, FilteredMessage}, + super::Filter, crate::{ convert_to, geyser::{ @@ -1162,6 +1048,7 @@ mod tests { plugin::{ filter::{ limits::FilterLimits, + message::{FilteredUpdateFilters, FilteredUpdateOneof}, name::{FilterName, FilterNames}, }, message::{Message, MessageTransaction, MessageTransactionInfo}, @@ -1375,10 +1262,10 @@ mod tests { }, ); - let config = SubscribeRequest { + let mut config = SubscribeRequest { accounts: HashMap::new(), slots: HashMap::new(), - transactions, + transactions: transactions.clone(), transactions_status: HashMap::new(), blocks: HashMap::new(), blocks_meta: HashMap::new(), @@ -1393,14 +1280,28 @@ mod tests { let message_transaction = create_message_transaction(&keypair_b, vec![account_key_b, account_key_a]); let message = Message::Transaction(message_transaction); - let updates = filter.get_filters(&message, None).collect::>(); + let updates = filter.get_updates(&message, None); + assert_eq!(updates.len(), 1); + assert_eq!( + updates[0].filters, + FilteredUpdateFilters::from_vec(vec![FilterName::new("serum")]) + ); + assert!(matches!( + updates[0].message, + FilteredUpdateOneof::Transaction(_) + )); + + config.transactions_status = transactions; + let filter = Filter::new(&config, &limit, &mut create_filter_names()).unwrap(); + let updates = filter.get_updates(&message, None); assert_eq!(updates.len(), 2); - assert_eq!(updates[0].0, vec![FilterName::new("serum")]); - assert!(matches!(updates[0].1, FilteredMessage::Transaction(_))); - assert_eq!(updates[1].0, Vec::::new()); + assert_eq!( + updates[1].filters, + FilteredUpdateFilters::from_vec(vec![FilterName::new("serum")]) + ); assert!(matches!( - updates[1].1, - FilteredMessage::TransactionStatus(_) + updates[1].message, + FilteredUpdateOneof::TransactionStatus(_) )); } @@ -1425,10 +1326,10 @@ mod tests { }, ); - let config = SubscribeRequest { + let mut config = SubscribeRequest { accounts: HashMap::new(), slots: HashMap::new(), - transactions, + transactions: transactions.clone(), transactions_status: HashMap::new(), blocks: HashMap::new(), blocks_meta: HashMap::new(), @@ -1443,14 +1344,28 @@ mod tests { let message_transaction = create_message_transaction(&keypair_b, vec![account_key_b, account_key_a]); let message = Message::Transaction(message_transaction); - let updates = filter.get_filters(&message, None).collect::>(); + let updates = filter.get_updates(&message, None); + assert_eq!(updates.len(), 1); + assert_eq!( + updates[0].filters, + FilteredUpdateFilters::from_vec(vec![FilterName::new("serum")]) + ); + assert!(matches!( + updates[0].message, + FilteredUpdateOneof::Transaction(_) + )); + + config.transactions_status = transactions; + let filter = Filter::new(&config, &limit, &mut create_filter_names()).unwrap(); + let updates = filter.get_updates(&message, None); assert_eq!(updates.len(), 2); - assert_eq!(updates[0].0, vec![FilterName::new("serum")]); - assert!(matches!(updates[0].1, FilteredMessage::Transaction(_))); - assert_eq!(updates[1].0, Vec::::new()); + assert_eq!( + updates[1].filters, + FilteredUpdateFilters::from_vec(vec![FilterName::new("serum")]) + ); assert!(matches!( - updates[1].1, - FilteredMessage::TransactionStatus(_) + updates[1].message, + FilteredUpdateOneof::TransactionStatus(_) )); } @@ -1493,8 +1408,8 @@ mod tests { let message_transaction = create_message_transaction(&keypair_b, vec![account_key_b, account_key_a]); let message = Message::Transaction(message_transaction); - for (filters, _message) in filter.get_filters(&message, None) { - assert!(filters.is_empty()); + for message in filter.get_updates(&message, None) { + assert!(message.filters.is_empty()); } } @@ -1525,10 +1440,10 @@ mod tests { }, ); - let config = SubscribeRequest { + let mut config = SubscribeRequest { accounts: HashMap::new(), slots: HashMap::new(), - transactions, + transactions: transactions.clone(), transactions_status: HashMap::new(), blocks: HashMap::new(), blocks_meta: HashMap::new(), @@ -1545,14 +1460,28 @@ mod tests { vec![account_key_x, account_key_y, account_key_z], ); let message = Message::Transaction(message_transaction); - let updates = filter.get_filters(&message, None).collect::>(); + let updates = filter.get_updates(&message, None); + assert_eq!(updates.len(), 1); + assert_eq!( + updates[0].filters, + FilteredUpdateFilters::from_vec(vec![FilterName::new("serum")]) + ); + assert!(matches!( + updates[0].message, + FilteredUpdateOneof::Transaction(_) + )); + + config.transactions_status = transactions; + let filter = Filter::new(&config, &limit, &mut create_filter_names()).unwrap(); + let updates = filter.get_updates(&message, None); assert_eq!(updates.len(), 2); - assert_eq!(updates[0].0, vec![FilterName::new("serum")]); - assert!(matches!(updates[0].1, FilteredMessage::Transaction(_))); - assert_eq!(updates[1].0, Vec::::new()); + assert_eq!( + updates[1].filters, + FilteredUpdateFilters::from_vec(vec![FilterName::new("serum")]) + ); assert!(matches!( - updates[1].1, - FilteredMessage::TransactionStatus(_) + updates[1].message, + FilteredUpdateOneof::TransactionStatus(_) )); } @@ -1601,8 +1530,8 @@ mod tests { let message_transaction = create_message_transaction(&keypair_x, vec![account_key_x, account_key_z]); let message = Message::Transaction(message_transaction); - for (filters, _message) in filter.get_filters(&message, None) { - assert!(filters.is_empty()); + for message in filter.get_updates(&message, None) { + assert!(message.filters.is_empty()); } } } diff --git a/yellowstone-grpc-proto/src/plugin/filter/message.rs b/yellowstone-grpc-proto/src/plugin/filter/message.rs new file mode 100644 index 00000000..46fad082 --- /dev/null +++ b/yellowstone-grpc-proto/src/plugin/filter/message.rs @@ -0,0 +1,625 @@ +use { + crate::{ + geyser::{ + subscribe_update::UpdateOneof, SubscribeUpdate, SubscribeUpdateAccount, + SubscribeUpdateAccountInfo, SubscribeUpdateBlock, SubscribeUpdateEntry, + SubscribeUpdatePing, SubscribeUpdatePong, SubscribeUpdateSlot, + SubscribeUpdateTransaction, SubscribeUpdateTransactionInfo, + SubscribeUpdateTransactionStatus, + }, + plugin::{ + filter::{name::FilterName, FilterAccountsDataSlice}, + message::{ + MessageAccount, MessageAccountInfo, MessageBlockMeta, MessageEntry, MessageSlot, + MessageTransaction, MessageTransactionInfo, + }, + }, + }, + bytes::buf::{Buf, BufMut}, + prost::{ + encoding::{DecodeContext, WireType}, + DecodeError, + }, + smallvec::SmallVec, + std::sync::Arc, +}; + +pub type FilteredUpdates = SmallVec<[FilteredUpdate; 2]>; + +#[derive(Debug, Clone)] +pub struct FilteredUpdate { + pub filters: FilteredUpdateFilters, + pub message: FilteredUpdateOneof, +} + +impl prost::Message for FilteredUpdate { + fn encode_raw(&self, buf: &mut impl BufMut) { + self.as_subscribe_update().encode_raw(buf) + } + + fn encoded_len(&self) -> usize { + self.as_subscribe_update().encoded_len() + } + + fn merge_field( + &mut self, + _tag: u32, + _wire_type: WireType, + _buf: &mut impl Buf, + _ctx: DecodeContext, + ) -> Result<(), DecodeError> { + unimplemented!() + } + + fn clear(&mut self) { + unimplemented!() + } +} + +impl FilteredUpdate { + pub fn new(filters: FilteredUpdateFilters, message: FilteredUpdateOneof) -> Self { + Self { filters, message } + } + + pub fn new_empty(message: FilteredUpdateOneof) -> Self { + Self::new(FilteredUpdateFilters::new(), message) + } + + fn as_subscribe_update_account( + message: &MessageAccountInfo, + data_slice: &FilterAccountsDataSlice, + ) -> SubscribeUpdateAccountInfo { + SubscribeUpdateAccountInfo { + pubkey: message.pubkey.as_ref().into(), + lamports: message.lamports, + owner: message.owner.as_ref().into(), + executable: message.executable, + rent_epoch: message.rent_epoch, + data: data_slice.apply(&message.data), + write_version: message.write_version, + txn_signature: message.txn_signature.map(|s| s.as_ref().into()), + } + } + + fn as_subscribe_update_transaction( + message: &MessageTransactionInfo, + ) -> SubscribeUpdateTransactionInfo { + SubscribeUpdateTransactionInfo { + signature: message.signature.as_ref().into(), + is_vote: message.is_vote, + transaction: Some(message.transaction.clone()), + meta: Some(message.meta.clone()), + index: message.index as u64, + } + } + + fn as_subscribe_update_entry(message: &MessageEntry) -> SubscribeUpdateEntry { + SubscribeUpdateEntry { + slot: message.slot, + index: message.index as u64, + num_hashes: message.num_hashes, + hash: message.hash.to_bytes().to_vec(), + executed_transaction_count: message.executed_transaction_count, + starting_transaction_index: message.starting_transaction_index, + } + } + + pub fn as_subscribe_update(&self) -> SubscribeUpdate { + let message = match &self.message { + FilteredUpdateOneof::Account(msg) => UpdateOneof::Account(SubscribeUpdateAccount { + account: Some(Self::as_subscribe_update_account( + msg.account.as_ref(), + &msg.data_slice, + )), + slot: msg.slot, + is_startup: msg.is_startup, + }), + FilteredUpdateOneof::Slot(msg) => UpdateOneof::Slot(SubscribeUpdateSlot { + slot: msg.0.slot, + parent: msg.0.parent, + status: msg.0.status as i32, + }), + FilteredUpdateOneof::Transaction(msg) => { + UpdateOneof::Transaction(SubscribeUpdateTransaction { + transaction: Some(Self::as_subscribe_update_transaction( + msg.transaction.as_ref(), + )), + slot: msg.slot, + }) + } + FilteredUpdateOneof::TransactionStatus(msg) => { + UpdateOneof::TransactionStatus(SubscribeUpdateTransactionStatus { + slot: msg.slot, + signature: msg.transaction.signature.as_ref().into(), + is_vote: msg.transaction.is_vote, + index: msg.transaction.index as u64, + err: msg.transaction.meta.err.clone(), + }) + } + FilteredUpdateOneof::Block(msg) => UpdateOneof::Block(SubscribeUpdateBlock { + slot: msg.meta.slot, + blockhash: msg.meta.blockhash.clone(), + rewards: msg.meta.rewards.clone(), + block_time: msg.meta.block_time, + block_height: msg.meta.block_height, + parent_slot: msg.meta.parent_slot, + parent_blockhash: msg.meta.parent_blockhash.clone(), + executed_transaction_count: msg.meta.executed_transaction_count, + transactions: msg + .transactions + .iter() + .map(|tx| Self::as_subscribe_update_transaction(tx.as_ref())) + .collect(), + updated_account_count: msg.updated_account_count, + accounts: msg + .accounts + .iter() + .map(|acc| { + Self::as_subscribe_update_account(acc.as_ref(), &msg.accounts_data_slice) + }) + .collect(), + entries_count: msg.meta.entries_count, + entries: msg + .entries + .iter() + .map(|entry| Self::as_subscribe_update_entry(entry.as_ref())) + .collect(), + }), + FilteredUpdateOneof::Ping => UpdateOneof::Ping(SubscribeUpdatePing {}), + FilteredUpdateOneof::Pong(msg) => UpdateOneof::Pong(*msg), + FilteredUpdateOneof::BlockMeta(msg) => UpdateOneof::BlockMeta(msg.0.as_ref().0.clone()), + FilteredUpdateOneof::Entry(msg) => { + UpdateOneof::Entry(Self::as_subscribe_update_entry(&msg.0)) + } + }; + + SubscribeUpdate { + filters: self + .filters + .iter() + .map(|name| name.as_ref().to_string()) + .collect(), + update_oneof: Some(message), + } + } +} + +pub type FilteredUpdateFilters = SmallVec<[FilterName; 4]>; + +#[derive(Debug, Clone)] +pub enum FilteredUpdateOneof { + Account(FilteredUpdateAccount), // 2 + Slot(FilteredUpdateSlot), // 3 + Transaction(FilteredUpdateTransaction), // 4 + TransactionStatus(FilteredUpdateTransactionStatus), // 10 + Block(Box), // 5 + Ping, // 6 + Pong(SubscribeUpdatePong), // 9 + BlockMeta(FilteredUpdateBlockMeta), // 7 + Entry(FilteredUpdateEntry), // 8 +} + +impl FilteredUpdateOneof { + pub fn account(message: &MessageAccount, data_slice: FilterAccountsDataSlice) -> Self { + Self::Account(FilteredUpdateAccount { + slot: message.slot, + account: Arc::clone(&message.account), + is_startup: message.is_startup, + data_slice, + }) + } + + pub const fn slot(message: MessageSlot) -> Self { + Self::Slot(FilteredUpdateSlot(message)) + } + + pub fn transaction(message: &MessageTransaction) -> Self { + Self::Transaction(FilteredUpdateTransaction { + transaction: Arc::clone(&message.transaction), + slot: message.slot, + }) + } + + pub fn transaction_status(message: &MessageTransaction) -> Self { + Self::TransactionStatus(FilteredUpdateTransactionStatus { + transaction: Arc::clone(&message.transaction), + slot: message.slot, + }) + } + + pub const fn block(message: Box) -> Self { + Self::Block(message) + } + + pub const fn ping() -> Self { + Self::Ping + } + + pub const fn pong(id: i32) -> Self { + Self::Pong(SubscribeUpdatePong { id }) + } + + pub const fn block_meta(message: Arc) -> Self { + Self::BlockMeta(FilteredUpdateBlockMeta(message)) + } + + pub const fn entry(message: Arc) -> Self { + Self::Entry(FilteredUpdateEntry(message)) + } +} + +#[derive(Debug, Clone)] +pub struct FilteredUpdateAccount { + pub account: Arc, + pub slot: u64, + pub is_startup: bool, + pub data_slice: FilterAccountsDataSlice, +} + +#[derive(Debug, Clone)] +pub struct FilteredUpdateSlot(MessageSlot); + +#[derive(Debug, Clone)] +pub struct FilteredUpdateTransaction { + pub transaction: Arc, + pub slot: u64, +} + +#[derive(Debug, Clone)] +pub struct FilteredUpdateTransactionStatus { + pub transaction: Arc, + pub slot: u64, +} + +#[derive(Debug, Clone)] +pub struct FilteredUpdateBlock { + pub meta: Arc, + pub transactions: Vec>, + pub updated_account_count: u64, + pub accounts: Vec>, + pub accounts_data_slice: FilterAccountsDataSlice, + pub entries: Vec>, +} + +#[derive(Debug, Clone)] +pub struct FilteredUpdateBlockMeta(Arc); + +#[derive(Debug, Clone)] +pub struct FilteredUpdateEntry(Arc); + +#[cfg(test)] +pub mod tests { + use { + super::{FilteredUpdate, FilteredUpdateBlock, FilteredUpdateFilters, FilteredUpdateOneof}, + crate::{ + convert_to, + geyser::{SubscribeUpdate, SubscribeUpdateBlockMeta}, + plugin::{ + filter::{name::FilterName, FilterAccountsDataSlice}, + message::{ + CommitmentLevel, MessageAccount, MessageAccountInfo, MessageBlockMeta, + MessageEntry, MessageSlot, MessageTransaction, MessageTransactionInfo, + }, + }, + }, + prost::Message as _, + prost_011::Message as _, + solana_sdk::{ + hash::Hash, + message::SimpleAddressLoader, + pubkey::Pubkey, + signature::Signature, + transaction::{MessageHash, SanitizedTransaction}, + }, + solana_storage_proto::convert::generated, + solana_transaction_status::{ConfirmedBlock, TransactionWithStatusMeta}, + std::{ + collections::{HashMap, HashSet}, + fs, + ops::Range, + str::FromStr, + sync::Arc, + }, + }; + + pub fn create_message_filters(names: &[&str]) -> FilteredUpdateFilters { + let mut filters = FilteredUpdateFilters::new(); + for name in names { + filters.push(FilterName::new(*name)); + } + filters + } + + pub fn create_account_data_slice() -> Vec { + [ + vec![], + vec![Range { start: 0, end: 0 }], + vec![Range { start: 2, end: 3 }], + vec![Range { start: 1, end: 3 }, Range { start: 5, end: 10 }], + ] + .into_iter() + .map(Arc::new) + .map(FilterAccountsDataSlice::new_unchecked) + .collect() + } + + pub fn create_accounts_raw() -> Vec> { + let pubkey = Pubkey::from_str("28Dncoh8nmzXYEGLUcBA5SUw5WDwDBn15uUCwrWBbyuu").unwrap(); + let owner = Pubkey::from_str("5jrPJWVGrFvQ2V9wRZC3kHEZhxo9pmMir15x73oHT6mn").unwrap(); + let txn_signature = Signature::from_str("4V36qYhukXcLFuvhZaudSoJpPaFNB7d5RqYKjL2xiSKrxaBfEajqqL4X6viZkEvHJ8XcTJsqVjZxFegxhN7EC9V5").unwrap(); + + let mut accounts = vec![]; + for lamports in [0, 8123] { + for executable in [true, false] { + for rent_epoch in [0, 4242] { + for data in [ + vec![], + [42; 165].to_vec(), + [42; 1024].to_vec(), + [42; 2 * 1024 * 1024].to_vec(), + ] { + for write_version in [0, 1] { + for txn_signature in [None, Some(txn_signature)] { + accounts.push(Arc::new(MessageAccountInfo { + pubkey, + lamports, + owner, + executable, + rent_epoch, + data: data.clone(), + write_version, + txn_signature, + })); + } + } + } + } + } + } + accounts + } + + pub fn create_accounts() -> Vec<(MessageAccount, FilterAccountsDataSlice)> { + let mut vec = vec![]; + for account in create_accounts_raw() { + for slot in [0, 42] { + for is_startup in [true, false] { + for data_slice in create_account_data_slice() { + let msg = MessageAccount { + account: Arc::clone(&account), + slot, + is_startup, + }; + vec.push((msg, data_slice)); + } + } + } + } + vec + } + + pub fn create_entries() -> Vec> { + [ + MessageEntry { + slot: 299888121, + index: 42, + num_hashes: 128, + hash: Hash::new_from_array([98; 32]), + executed_transaction_count: 32, + starting_transaction_index: 1000, + }, + MessageEntry { + slot: 299888121, + index: 0, + num_hashes: 16, + hash: Hash::new_from_array([42; 32]), + executed_transaction_count: 32, + starting_transaction_index: 1000, + }, + ] + .into_iter() + .map(Arc::new) + .collect() + } + + pub fn load_predefined() -> Vec { + fs::read_dir("./fixtures/blocks") + .expect("failed to read `blocks` dir") + .map(|entry| { + let path = entry.expect("failed to read `blocks` dir entry").path(); + let data = fs::read(path).expect("failed to read block"); + generated::ConfirmedBlock::decode(data.as_slice()) + .expect("failed to decode block") + .try_into() + .expect("failed to convert decoded block") + }) + .collect() + } + + pub fn load_predefined_blockmeta() -> Vec> { + load_predefined_blocks() + .into_iter() + .map(|block| (block.meta.blockhash.clone(), block.meta)) + .collect::>() + .into_values() + .collect() + } + + pub fn load_predefined_transactions() -> Vec> { + load_predefined_blocks() + .into_iter() + .flat_map(|block| block.transactions.into_iter().map(|tx| (tx.signature, tx))) + .collect::>() + .into_values() + .collect() + } + + pub fn load_predefined_blocks() -> Vec { + load_predefined() + .into_iter() + .flat_map(|block| { + let transactions = block + .transactions + .into_iter() + .enumerate() + .map(|(index, tx)| { + let TransactionWithStatusMeta::Complete(tx) = tx else { + panic!("tx with missed meta"); + }; + let transaction = SanitizedTransaction::try_create( + tx.transaction.clone(), + MessageHash::Compute, + None, + SimpleAddressLoader::Disabled, + &HashSet::new(), + ) + .expect("failed to create tx"); + MessageTransactionInfo { + signature: tx.transaction.signatures[0], + is_vote: true, + transaction: convert_to::create_transaction(&transaction), + meta: convert_to::create_transaction_meta(&tx.meta), + index, + account_keys: HashSet::new(), + } + }) + .map(Arc::new) + .collect::>(); + + let entries = create_entries(); + + let slot = block.parent_slot + 1; + let block_meta1 = MessageBlockMeta(SubscribeUpdateBlockMeta { + parent_slot: block.parent_slot, + slot, + parent_blockhash: block.previous_blockhash, + blockhash: block.blockhash, + rewards: Some(convert_to::create_rewards_obj( + &block.rewards, + block.num_partitions, + )), + block_time: block.block_time.map(convert_to::create_timestamp), + block_height: block.block_height.map(convert_to::create_block_height), + executed_transaction_count: transactions.len() as u64, + entries_count: entries.len() as u64, + }); + let mut block_meta2 = block_meta1.clone(); + block_meta2.rewards = + Some(convert_to::create_rewards_obj(&block.rewards, Some(42))); + + let block_meta1 = Arc::new(block_meta1); + let block_meta2 = Arc::new(block_meta2); + + let accounts = create_accounts_raw(); + create_account_data_slice() + .into_iter() + .flat_map(move |data_slice| { + vec![ + FilteredUpdateBlock { + meta: Arc::clone(&block_meta1), + transactions: transactions.clone(), + updated_account_count: accounts.len() as u64, + accounts: accounts.clone(), + accounts_data_slice: data_slice.clone(), + entries: entries.clone(), + }, + FilteredUpdateBlock { + meta: Arc::clone(&block_meta2), + transactions: transactions.clone(), + updated_account_count: accounts.len() as u64, + accounts: accounts.clone(), + accounts_data_slice: data_slice, + entries: entries.clone(), + }, + ] + }) + }) + .collect() + } + + fn encode_decode_cmp(filters: &[&str], message: FilteredUpdateOneof) { + let msg = FilteredUpdate { + filters: create_message_filters(filters), + message, + }; + let update = msg.as_subscribe_update(); + assert_eq!(msg.encoded_len(), update.encoded_len()); + assert_eq!( + SubscribeUpdate::decode(msg.encode_to_vec().as_slice()).expect("failed to decode"), + update + ); + } + + #[test] + fn test_message_account() { + for (msg, data_slice) in create_accounts() { + encode_decode_cmp(&["123"], FilteredUpdateOneof::account(&msg, data_slice)); + } + } + + #[test] + fn test_message_slot() { + for slot in [0, 42] { + for parent in [None, Some(0), Some(42)] { + for status in [ + CommitmentLevel::Processed, + CommitmentLevel::Confirmed, + CommitmentLevel::Finalized, + ] { + encode_decode_cmp( + &["123"], + FilteredUpdateOneof::slot(MessageSlot { + slot, + parent, + status, + }), + ) + } + } + } + } + + #[test] + fn test_message_transaction() { + for transaction in load_predefined_transactions() { + let msg = MessageTransaction { + transaction, + slot: 42, + }; + encode_decode_cmp(&["123"], FilteredUpdateOneof::transaction(&msg)); + encode_decode_cmp(&["123"], FilteredUpdateOneof::transaction_status(&msg)); + } + } + + #[test] + fn test_message_block() { + for block in load_predefined_blocks() { + encode_decode_cmp(&["123"], FilteredUpdateOneof::block(Box::new(block))); + } + } + + #[test] + fn test_message_ping() { + encode_decode_cmp(&["123"], FilteredUpdateOneof::Ping) + } + + #[test] + fn test_message_pong() { + encode_decode_cmp(&["123"], FilteredUpdateOneof::pong(0)); + encode_decode_cmp(&["123"], FilteredUpdateOneof::pong(42)); + } + + #[test] + fn test_message_blockmeta() { + for block_meta in load_predefined_blockmeta() { + encode_decode_cmp(&["123"], FilteredUpdateOneof::block_meta(block_meta)); + } + } + + #[test] + fn test_message_entry() { + for entry in create_entries() { + encode_decode_cmp(&["123"], FilteredUpdateOneof::entry(entry)); + } + } +} diff --git a/yellowstone-grpc-proto/src/plugin/filter/mod.rs b/yellowstone-grpc-proto/src/plugin/filter/mod.rs index e141b432..a3323f2a 100644 --- a/yellowstone-grpc-proto/src/plugin/filter/mod.rs +++ b/yellowstone-grpc-proto/src/plugin/filter/mod.rs @@ -1,6 +1,7 @@ #[allow(clippy::module_inception)] -pub mod filter; +mod filter; pub mod limits; +pub mod message; pub mod name; -pub use {filter::*, limits::*, name::*}; +pub use filter::{Filter, FilterAccountsDataSlice, FilterError, FilterResult}; diff --git a/yellowstone-grpc-proto/src/plugin/filter/name.rs b/yellowstone-grpc-proto/src/plugin/filter/name.rs index 97033ddf..a9f9b846 100644 --- a/yellowstone-grpc-proto/src/plugin/filter/name.rs +++ b/yellowstone-grpc-proto/src/plugin/filter/name.rs @@ -1,6 +1,7 @@ use std::{ borrow::Borrow, collections::HashSet, + ops::Deref, sync::Arc, time::{Duration, Instant}, }; @@ -23,6 +24,14 @@ impl AsRef for FilterName { } } +impl Deref for FilterName { + type Target = str; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + impl Borrow for FilterName { #[inline] fn borrow(&self) -> &str { diff --git a/yellowstone-grpc-proto/src/plugin/message.rs b/yellowstone-grpc-proto/src/plugin/message.rs index 557e64ab..d019453c 100644 --- a/yellowstone-grpc-proto/src/plugin/message.rs +++ b/yellowstone-grpc-proto/src/plugin/message.rs @@ -1,6 +1,7 @@ use { crate::{ - convert_to, geyser::CommitmentLevel as CommitmentLevelProto, + convert_to, + geyser::{CommitmentLevel as CommitmentLevelProto, SubscribeUpdateBlockMeta}, solana::storage::confirmed_block, }, agave_geyser_plugin_interface::geyser_plugin_interface::{ @@ -8,7 +9,11 @@ use { SlotStatus, }, solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey, signature::Signature}, - std::{collections::HashSet, sync::Arc}, + std::{ + collections::HashSet, + ops::{Deref, DerefMut}, + sync::Arc, + }, }; #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] @@ -182,34 +187,38 @@ impl MessageEntry { } #[derive(Debug, Clone)] -pub struct MessageBlockMeta { - pub parent_slot: u64, - pub slot: u64, - pub parent_blockhash: String, - pub blockhash: String, - pub rewards: confirmed_block::Rewards, - pub block_time: Option, - pub block_height: Option, - pub executed_transaction_count: u64, - pub entries_count: u64, +pub struct MessageBlockMeta(pub SubscribeUpdateBlockMeta); + +impl Deref for MessageBlockMeta { + type Target = SubscribeUpdateBlockMeta; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for MessageBlockMeta { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } } impl MessageBlockMeta { pub fn from_geyser(info: &ReplicaBlockInfoV4<'_>) -> Self { - Self { + Self(SubscribeUpdateBlockMeta { parent_slot: info.parent_slot, slot: info.slot, parent_blockhash: info.parent_blockhash.to_string(), blockhash: info.blockhash.to_string(), - rewards: convert_to::create_rewards_obj( + rewards: Some(convert_to::create_rewards_obj( &info.rewards.rewards, info.rewards.num_partitions, - ), + )), block_time: info.block_time.map(convert_to::create_timestamp), block_height: info.block_height.map(convert_to::create_block_height), executed_transaction_count: info.executed_transaction_count, entries_count: info.entry_count, - } + }) } } diff --git a/yellowstone-grpc-proto/src/plugin/mod.rs b/yellowstone-grpc-proto/src/plugin/mod.rs index 39a8a487..7cac8b9b 100644 --- a/yellowstone-grpc-proto/src/plugin/mod.rs +++ b/yellowstone-grpc-proto/src/plugin/mod.rs @@ -1,2 +1,8 @@ pub mod filter; pub mod message; + +pub mod proto { + #![allow(clippy::clone_on_ref_ptr)] + #![allow(clippy::missing_const_for_fn)] + tonic::include_proto!("geyser.Geyser"); +}