diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 09f11a1a..9ae6dbe3 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -96,6 +96,8 @@ jobs: run: cargo check -p yellowstone-grpc-geyser --all-targets --tests - name: check features in `proto` run: cargo check -p yellowstone-grpc-proto --all-targets --tests + - name: check features in `proto` + run: cargo check -p yellowstone-grpc-proto --all-targets --tests --all-features - name: Build run: ./ci/cargo-build-test.sh diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ffcbf88..06e1100b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ The minor version will be incremented upon a breaking change and the patch versi - examples: add progress bar to client tool ([#456](https://github.com/rpcpool/yellowstone-grpc/pull/456)) - proto: change error type in mod `convert_from` ([#457](https://github.com/rpcpool/yellowstone-grpc/pull/457)) - proto: add mod `plugin` with `FilterNames` cache ([#458](https://github.com/rpcpool/yellowstone-grpc/pull/458)) +- proto: move enum Message from geyser crate ([#459](https://github.com/rpcpool/yellowstone-grpc/pull/459)) ### Breaking diff --git a/Cargo.lock b/Cargo.lock index 5410f80d..7e1ca08f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4588,6 +4588,7 @@ dependencies = [ name = "yellowstone-grpc-proto" version = "2.0.0" dependencies = [ + "agave-geyser-plugin-interface", "anyhow", "bincode", "prost", diff --git a/yellowstone-grpc-geyser/config.json b/yellowstone-grpc-geyser/config.json index 7a23fa0e..2e15d719 100644 --- a/yellowstone-grpc-geyser/config.json +++ b/yellowstone-grpc-geyser/config.json @@ -70,7 +70,7 @@ "blocks_meta": { "max": 1 }, - "entry": { + "entries": { "max": 1 } } diff --git a/yellowstone-grpc-geyser/src/config.rs b/yellowstone-grpc-geyser/src/config.rs index a84d5d59..31024b32 100644 --- a/yellowstone-grpc-geyser/src/config.rs +++ b/yellowstone-grpc-geyser/src/config.rs @@ -222,7 +222,7 @@ pub struct ConfigGrpcFilters { pub transactions_status: ConfigGrpcFiltersTransactions, pub blocks: ConfigGrpcFiltersBlocks, pub blocks_meta: ConfigGrpcFiltersBlocksMeta, - pub entry: ConfigGrpcFiltersEntry, + pub entries: ConfigGrpcFiltersEntries, } impl ConfigGrpcFilters { @@ -376,12 +376,12 @@ impl Default for ConfigGrpcFiltersBlocksMeta { #[derive(Debug, Clone, Deserialize)] #[serde(default, deny_unknown_fields)] -pub struct ConfigGrpcFiltersEntry { +pub struct ConfigGrpcFiltersEntries { #[serde(deserialize_with = "deserialize_usize_str")] pub max: usize, } -impl Default for ConfigGrpcFiltersEntry { +impl Default for ConfigGrpcFiltersEntries { fn default() -> Self { Self { max: usize::MAX } } diff --git a/yellowstone-grpc-geyser/src/filters.rs b/yellowstone-grpc-geyser/src/filters.rs index f6d361c3..dd74b333 100644 --- a/yellowstone-grpc-geyser/src/filters.rs +++ b/yellowstone-grpc-geyser/src/filters.rs @@ -1,14 +1,8 @@ use { - crate::{ - config::{ - ConfigGrpcFilters, ConfigGrpcFiltersAccounts, ConfigGrpcFiltersBlocks, - ConfigGrpcFiltersBlocksMeta, ConfigGrpcFiltersEntry, ConfigGrpcFiltersSlots, - ConfigGrpcFiltersTransactions, - }, - message::{ - Message, MessageAccount, MessageAccountInfo, MessageBlock, MessageBlockMeta, - MessageEntry, MessageSlot, MessageTransaction, MessageTransactionInfo, - }, + crate::config::{ + ConfigGrpcFilters, ConfigGrpcFiltersAccounts, ConfigGrpcFiltersBlocks, + ConfigGrpcFiltersBlocksMeta, ConfigGrpcFiltersEntries, ConfigGrpcFiltersSlots, + ConfigGrpcFiltersTransactions, }, base64::{engine::general_purpose::STANDARD as base64_engine, Engine}, solana_sdk::{pubkey::Pubkey, signature::Signature}, @@ -21,13 +15,20 @@ use { }, yellowstone_grpc_proto::{ convert_to, - plugin::filter::{FilterAccountsDataSlice, FilterName, FilterNames}, + plugin::{ + filter::{FilterAccountsDataSlice, FilterName, FilterNames}, + message::{ + CommitmentLevel, Message, MessageAccount, MessageAccountInfo, MessageBlock, + MessageBlockMeta, MessageEntry, MessageSlot, MessageTransaction, + MessageTransactionInfo, + }, + }, prelude::{ subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof, subscribe_request_filter_accounts_filter_lamports::Cmp as AccountsFilterLamports, subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof, - subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest, - SubscribeRequestAccountsDataSlice, SubscribeRequestFilterAccounts, + subscribe_update::UpdateOneof, CommitmentLevel as CommitmentLevelProto, + SubscribeRequest, SubscribeRequestAccountsDataSlice, SubscribeRequestFilterAccounts, SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterAccountsFilterLamports, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterEntry, SubscribeRequestFilterSlots, @@ -193,7 +194,7 @@ pub struct Filter { slots: FilterSlots, transactions: FilterTransactions, transactions_status: FilterTransactions, - entry: FilterEntry, + entries: FilterEntries, blocks: FilterBlocks, blocks_meta: FilterBlocksMeta, commitment: CommitmentLevel, @@ -214,7 +215,7 @@ impl Default for Filter { filter_type: FilterTransactionsType::TransactionStatus, filters: HashMap::new(), }, - entry: FilterEntry::default(), + entries: FilterEntries::default(), blocks: FilterBlocks::default(), blocks_meta: FilterBlocksMeta::default(), commitment: CommitmentLevel::Processed, @@ -245,7 +246,7 @@ impl Filter { FilterTransactionsType::TransactionStatus, names, )?, - entry: FilterEntry::new(&config.entry, &limit.entry, names)?, + entries: FilterEntries::new(&config.entry, &limit.entries, names)?, blocks: FilterBlocks::new(&config.blocks, &limit.blocks, names)?, blocks_meta: FilterBlocksMeta::new(&config.blocks_meta, &limit.blocks_meta, names)?, commitment: Self::decode_commitment(config.commitment)?, @@ -258,10 +259,12 @@ impl Filter { } fn decode_commitment(commitment: Option) -> anyhow::Result { - let commitment = commitment.unwrap_or(CommitmentLevel::Processed as i32); - CommitmentLevel::try_from(commitment).map_err(|_error| { - anyhow::anyhow!("failed to create CommitmentLevel from {commitment:?}") - }) + let commitment = commitment.unwrap_or(CommitmentLevelProto::Processed as i32); + CommitmentLevelProto::try_from(commitment) + .map(Into::into) + .map_err(|_error| { + anyhow::anyhow!("failed to create CommitmentLevel from {commitment:?}") + }) } fn decode_pubkeys<'a>( @@ -296,7 +299,7 @@ impl Filter { "transactions_status", self.transactions_status.filters.len(), ), - ("entry", self.entry.filters.len()), + ("entries", self.entries.filters.len()), ("blocks", self.blocks.filters.len()), ("blocks_meta", self.blocks_meta.filters.len()), ( @@ -305,7 +308,7 @@ impl Filter { + self.slots.filters.len() + self.transactions.filters.len() + self.transactions_status.filters.len() - + self.entry.filters.len() + + self.entries.filters.len() + self.blocks.filters.len() + self.blocks_meta.filters.len(), ), @@ -329,7 +332,7 @@ impl Filter { .get_filters(message) .chain(self.transactions_status.get_filters(message)), ), - Message::Entry(message) => self.entry.get_filters(message), + Message::Entry(message) => self.entries.get_filters(message), Message::Block(message) => self.blocks.get_filters(message), Message::BlockMeta(message) => self.blocks_meta.get_filters(message), } @@ -912,14 +915,14 @@ impl FilterTransactions { } #[derive(Debug, Default, Clone)] -struct FilterEntry { +struct FilterEntries { filters: Vec, } -impl FilterEntry { +impl FilterEntries { fn new( configs: &HashMap, - limit: &ConfigGrpcFiltersEntry, + limit: &ConfigGrpcFiltersEntries, names: &mut FilterNames, ) -> anyhow::Result { ConfigGrpcFilters::check_max(configs.len(), limit.max)?; @@ -1143,11 +1146,7 @@ pub fn parse_accounts_data_slice_create( mod tests { use { super::{FilterName, FilterNames, FilteredMessage}, - crate::{ - config::ConfigGrpcFilters, - filters::Filter, - message::{Message, MessageTransaction, MessageTransactionInfo}, - }, + crate::{config::ConfigGrpcFilters, filters::Filter}, solana_sdk::{ hash::Hash, message::{v0::LoadedAddresses, Message as SolMessage, MessageHeader}, @@ -1157,8 +1156,12 @@ mod tests { }, solana_transaction_status::TransactionStatusMeta, std::{collections::HashMap, sync::Arc, time::Duration}, - yellowstone_grpc_proto::geyser::{ - SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterTransactions, + yellowstone_grpc_proto::{ + geyser::{ + SubscribeRequest, SubscribeRequestFilterAccounts, + SubscribeRequestFilterTransactions, + }, + plugin::message::{Message, MessageTransaction, MessageTransactionInfo}, }, }; diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index c81613e1..b398e0a1 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -2,7 +2,6 @@ use { crate::{ config::{ConfigBlockFailAction, ConfigGrpc, ConfigGrpcFilters}, filters::Filter, - message::{Message, MessageBlockMeta, MessageEntry, MessageSlot, MessageTransactionInfo}, metrics::{self, DebugClientMessage}, version::GrpcVersionInfo, }, @@ -37,11 +36,17 @@ use { }, tonic_health::server::health_reporter, yellowstone_grpc_proto::{ - plugin::filter::FilterNames, + plugin::{ + filter::FilterNames, + message::{ + CommitmentLevel, Message, MessageBlockMeta, MessageEntry, MessageSlot, + MessageTransactionInfo, + }, + }, prelude::{ geyser_server::{Geyser, GeyserServer}, subscribe_update::UpdateOneof, - CommitmentLevel, GetBlockHeightRequest, GetBlockHeightResponse, + CommitmentLevel as CommitmentLevelProto, GetBlockHeightRequest, GetBlockHeightResponse, GetLatestBlockhashRequest, GetLatestBlockhashResponse, GetSlotRequest, GetSlotResponse, GetVersionRequest, GetVersionResponse, IsBlockhashValidRequest, IsBlockhashValidResponse, PingRequest, PongResponse, SubscribeRequest, SubscribeUpdate, @@ -156,11 +161,13 @@ impl BlockMetaStorage { } fn parse_commitment(commitment: Option) -> Result { - let commitment = commitment.unwrap_or(CommitmentLevel::Processed as i32); - CommitmentLevel::try_from(commitment).map_err(|_error| { - let msg = format!("failed to create CommitmentLevel from {commitment:?}"); - Status::unknown(msg) - }) + let commitment = commitment.unwrap_or(CommitmentLevelProto::Processed as i32); + CommitmentLevelProto::try_from(commitment) + .map(Into::into) + .map_err(|_error| { + let msg = format!("failed to create CommitmentLevel from {commitment:?}"); + Status::unknown(msg) + }) } async fn get_block( @@ -533,13 +540,21 @@ impl GrpcService { // If we already build Block message, new message will be a problem if slot_messages.sealed && !(matches!(&message, Message::Entry(_)) && slot_messages.entries_count == 0) { - metrics::update_invalid_blocks(format!("unexpected message {}", message.kind())); + let kind = match &message { + Message::Slot(_) => "Slot", + Message::Account(_) => "Account", + Message::Transaction(_) => "Transaction", + Message::Entry(_) => "Entry", + Message::BlockMeta(_) => "BlockMeta", + Message::Block(_) => "Block", + }; + metrics::update_invalid_blocks(format!("unexpected message {kind}")); match block_fail_action { ConfigBlockFailAction::Log => { - error!("unexpected message #{} -- {} (invalid order)", message.get_slot(), message.kind()); + error!("unexpected message #{} -- {kind} (invalid order)", message.get_slot()); } ConfigBlockFailAction::Panic => { - panic!("unexpected message #{} -- {} (invalid order)", message.get_slot(), message.kind()); + panic!("unexpected message #{} -- {kind} (invalid order)", message.get_slot()); } } } diff --git a/yellowstone-grpc-geyser/src/lib.rs b/yellowstone-grpc-geyser/src/lib.rs index c043be3c..f5f4d3f2 100644 --- a/yellowstone-grpc-geyser/src/lib.rs +++ b/yellowstone-grpc-geyser/src/lib.rs @@ -1,7 +1,6 @@ pub mod config; pub mod filters; pub mod grpc; -pub mod message; pub mod metrics; pub mod plugin; pub mod version; diff --git a/yellowstone-grpc-geyser/src/metrics.rs b/yellowstone-grpc-geyser/src/metrics.rs index 447002a7..16f50164 100644 --- a/yellowstone-grpc-geyser/src/metrics.rs +++ b/yellowstone-grpc-geyser/src/metrics.rs @@ -24,7 +24,7 @@ use { sync::{mpsc, oneshot, Notify}, task::JoinHandle, }, - yellowstone_grpc_proto::prelude::CommitmentLevel, + yellowstone_grpc_proto::plugin::message::CommitmentLevel, }; lazy_static::lazy_static! { diff --git a/yellowstone-grpc-geyser/src/plugin.rs b/yellowstone-grpc-geyser/src/plugin.rs index 59f76ea2..6927ec7d 100644 --- a/yellowstone-grpc-geyser/src/plugin.rs +++ b/yellowstone-grpc-geyser/src/plugin.rs @@ -2,7 +2,6 @@ use { crate::{ config::Config, grpc::GrpcService, - message::Message, metrics::{self, PrometheusService}, }, agave_geyser_plugin_interface::geyser_plugin_interface::{ @@ -22,6 +21,7 @@ use { runtime::{Builder, Runtime}, sync::{mpsc, Notify}, }, + yellowstone_grpc_proto::plugin::message::Message, }; #[derive(Debug)] diff --git a/yellowstone-grpc-proto/Cargo.toml b/yellowstone-grpc-proto/Cargo.toml index 04814dc6..7cc95a80 100644 --- a/yellowstone-grpc-proto/Cargo.toml +++ b/yellowstone-grpc-proto/Cargo.toml @@ -11,6 +11,7 @@ keywords = { workspace = true } publish = true [dependencies] +agave-geyser-plugin-interface = { workspace = true, optional = true } bincode = { workspace = true, optional = true } prost = { workspace = true } solana-account-decoder = { workspace = true, optional = true } @@ -26,8 +27,17 @@ tonic-build = { workspace = true } [features] default = ["convert", "tonic-compression"] -convert = ["dep:bincode", "dep:solana-account-decoder", "dep:solana-sdk", "dep:solana-transaction-status"] -plugin = ["dep:thiserror"] +convert = [ + "dep:bincode", + "dep:solana-account-decoder", + "dep:solana-sdk", + "dep:solana-transaction-status" +] +plugin = [ + "convert", + "dep:agave-geyser-plugin-interface", + "dep:thiserror" +] tonic-compression = ["tonic/gzip", "tonic/zstd"] [lints] diff --git a/yellowstone-grpc-geyser/src/message.rs b/yellowstone-grpc-proto/src/plugin/message.rs similarity index 78% rename from yellowstone-grpc-geyser/src/message.rs rename to yellowstone-grpc-proto/src/plugin/message.rs index 6438a2cb..3eaa9cee 100644 --- a/yellowstone-grpc-geyser/src/message.rs +++ b/yellowstone-grpc-proto/src/plugin/message.rs @@ -1,4 +1,5 @@ use { + crate::geyser::CommitmentLevel as CommitmentLevelProto, agave_geyser_plugin_interface::geyser_plugin_interface::{ ReplicaAccountInfoV3, ReplicaBlockInfoV4, ReplicaEntryInfoV2, ReplicaTransactionInfoV2, SlotStatus, @@ -9,9 +10,73 @@ use { }, solana_transaction_status::{Reward, TransactionStatusMeta}, std::sync::Arc, - yellowstone_grpc_proto::prelude::CommitmentLevel, }; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CommitmentLevel { + Processed, + Confirmed, + Finalized, +} + +impl From for CommitmentLevel { + fn from(status: SlotStatus) -> Self { + match status { + SlotStatus::Processed => Self::Processed, + SlotStatus::Confirmed => Self::Confirmed, + SlotStatus::Rooted => Self::Finalized, + } + } +} + +impl From for CommitmentLevelProto { + fn from(commitment: CommitmentLevel) -> Self { + match commitment { + CommitmentLevel::Processed => Self::Processed, + CommitmentLevel::Confirmed => Self::Confirmed, + CommitmentLevel::Finalized => Self::Finalized, + } + } +} + +impl From for CommitmentLevel { + fn from(status: CommitmentLevelProto) -> Self { + match status { + CommitmentLevelProto::Processed => Self::Processed, + CommitmentLevelProto::Confirmed => Self::Confirmed, + CommitmentLevelProto::Finalized => Self::Finalized, + } + } +} + +impl PartialEq for CommitmentLevel { + fn eq(&self, other: &CommitmentLevelProto) -> bool { + matches!( + (self, other), + (Self::Processed, CommitmentLevelProto::Processed) + | (Self::Confirmed, CommitmentLevelProto::Confirmed) + | (Self::Finalized, CommitmentLevelProto::Finalized) + ) + } +} + +#[derive(Debug, Clone, Copy)] +pub struct MessageSlot { + pub slot: u64, + pub parent: Option, + pub status: CommitmentLevel, +} + +impl From<(u64, Option, SlotStatus)> for MessageSlot { + fn from((slot, parent, status): (u64, Option, SlotStatus)) -> Self { + Self { + slot, + parent, + status: status.into(), + } + } +} + #[derive(Debug, Clone)] pub struct MessageAccountInfo { pub pubkey: Pubkey, @@ -50,27 +115,6 @@ impl<'a> From<(&'a ReplicaAccountInfoV3<'a>, u64, bool)> for MessageAccount { } } -#[derive(Debug, Clone, Copy)] -pub struct MessageSlot { - pub slot: u64, - pub parent: Option, - pub status: CommitmentLevel, -} - -impl From<(u64, Option, SlotStatus)> for MessageSlot { - fn from((slot, parent, status): (u64, Option, SlotStatus)) -> Self { - Self { - slot, - parent, - status: match status { - SlotStatus::Processed => CommitmentLevel::Processed, - SlotStatus::Confirmed => CommitmentLevel::Confirmed, - SlotStatus::Rooted => CommitmentLevel::Finalized, - }, - } - } -} - #[derive(Debug, Clone)] pub struct MessageTransactionInfo { pub signature: Signature, @@ -117,7 +161,9 @@ impl From<&ReplicaEntryInfoV2<'_>> for MessageEntry { slot: entry.slot, index: entry.index, num_hashes: entry.num_hashes, - hash: entry.hash[0..32].try_into().expect("failed to create hash"), + hash: entry.hash[0..HASH_BYTES] + .try_into() + .expect("failed to create hash"), executed_transaction_count: entry.executed_transaction_count, starting_transaction_index: entry .starting_transaction_index @@ -127,6 +173,37 @@ impl From<&ReplicaEntryInfoV2<'_>> for MessageEntry { } } +#[derive(Debug, Clone)] +pub struct MessageBlockMeta { + pub parent_slot: u64, + pub slot: u64, + pub parent_blockhash: String, + pub blockhash: String, + pub rewards: Vec, + pub num_partitions: Option, + pub block_time: Option, + pub block_height: Option, + pub executed_transaction_count: u64, + pub entries_count: u64, +} + +impl<'a> From<&'a ReplicaBlockInfoV4<'a>> for MessageBlockMeta { + fn from(blockinfo: &'a ReplicaBlockInfoV4<'a>) -> Self { + Self { + parent_slot: blockinfo.parent_slot, + slot: blockinfo.slot, + parent_blockhash: blockinfo.parent_blockhash.to_string(), + blockhash: blockinfo.blockhash.to_string(), + rewards: blockinfo.rewards.rewards.clone(), + num_partitions: blockinfo.rewards.num_partitions, + block_time: blockinfo.block_time, + block_height: blockinfo.block_height, + executed_transaction_count: blockinfo.executed_transaction_count, + entries_count: blockinfo.entry_count, + } + } +} + #[derive(Debug, Clone)] pub struct MessageBlock { pub meta: Arc, @@ -162,45 +239,14 @@ impl } } -#[derive(Debug, Clone)] -pub struct MessageBlockMeta { - pub parent_slot: u64, - pub slot: u64, - pub parent_blockhash: String, - pub blockhash: String, - pub rewards: Vec, - pub num_partitions: Option, - pub block_time: Option, - pub block_height: Option, - pub executed_transaction_count: u64, - pub entries_count: u64, -} - -impl<'a> From<&'a ReplicaBlockInfoV4<'a>> for MessageBlockMeta { - fn from(blockinfo: &'a ReplicaBlockInfoV4<'a>) -> Self { - Self { - parent_slot: blockinfo.parent_slot, - slot: blockinfo.slot, - parent_blockhash: blockinfo.parent_blockhash.to_string(), - blockhash: blockinfo.blockhash.to_string(), - rewards: blockinfo.rewards.rewards.clone(), - num_partitions: blockinfo.rewards.num_partitions, - block_time: blockinfo.block_time, - block_height: blockinfo.block_height, - executed_transaction_count: blockinfo.executed_transaction_count, - entries_count: blockinfo.entry_count, - } - } -} - #[derive(Debug, Clone)] pub enum Message { Slot(MessageSlot), Account(MessageAccount), Transaction(MessageTransaction), Entry(Arc), - Block(Arc), BlockMeta(Arc), + Block(Arc), } impl Message { @@ -210,19 +256,8 @@ impl Message { Self::Account(msg) => msg.slot, Self::Transaction(msg) => msg.slot, Self::Entry(msg) => msg.slot, - Self::Block(msg) => msg.meta.slot, Self::BlockMeta(msg) => msg.slot, - } - } - - pub const fn kind(&self) -> &'static str { - match self { - Self::Slot(_) => "Slot", - Self::Account(_) => "Account", - Self::Transaction(_) => "Transaction", - Self::Entry(_) => "Entry", - Self::Block(_) => "Block", - Self::BlockMeta(_) => "BlockMeta", + Self::Block(msg) => msg.meta.slot, } } } diff --git a/yellowstone-grpc-proto/src/plugin/mod.rs b/yellowstone-grpc-proto/src/plugin/mod.rs index 34d102d7..39a8a487 100644 --- a/yellowstone-grpc-proto/src/plugin/mod.rs +++ b/yellowstone-grpc-proto/src/plugin/mod.rs @@ -1 +1,2 @@ pub mod filter; +pub mod message;