From bcc3c37346ea3619931374ba38c13f223b5f7323 Mon Sep 17 00:00:00 2001 From: galactus <96341601+godmodegalactus@users.noreply.github.com> Date: Wed, 14 Feb 2024 19:20:25 +0100 Subject: [PATCH] Accounts support (#327) * Slitting rpc and pubsub services, adding few class and rpc, pubsub callbacks for accounts * Implementing account storage and service writing some tests * Chainging to interface, adding new tests * Implementing accounts related rpc endpoints * Adding tests for account filters * Minor bug fixed and errors * Fixing issue with get program accounts * Implementing subscription * use auto reconnect (#329) * groovie comments part_1 * Fixing tests, fixing groovies comments * Adding workspace to test filters, minor changes --------- Co-authored-by: Groovie | Mango <95291500+grooviegermanikus@users.noreply.github.com> --- Cargo.lock | 41 + Cargo.toml | 4 +- accounts/Cargo.toml | 47 + accounts/src/account_service.rs | 316 +++++++ accounts/src/account_store_interface.rs | 24 + accounts/src/inmemory_account_store.rs | 834 ++++++++++++++++++ accounts/src/lib.rs | 3 + cluster-endpoints/src/endpoint_stremers.rs | 6 +- .../src/grpc/gprc_accounts_streaming.rs | 207 +++++ cluster-endpoints/src/grpc/mod.rs | 1 + cluster-endpoints/src/grpc_subscription.rs | 56 +- .../src/json_rpc_subscription.rs | 3 + cluster-endpoints/src/lib.rs | 1 + core/src/structures/account_data.rs | 33 + core/src/structures/account_filter.rs | 303 +++++++ core/src/structures/mod.rs | 2 + lite-rpc/Cargo.toml | 3 + lite-rpc/src/bridge.rs | 440 +++------ lite-rpc/src/bridge_pubsub.rs | 479 ++++++++++ lite-rpc/src/cli.rs | 3 + lite-rpc/src/lib.rs | 3 + lite-rpc/src/main.rs | 108 ++- lite-rpc/src/rpc.rs | 98 +- lite-rpc/src/rpc_pubsub.rs | 80 ++ lite-rpc/src/start_server.rs | 48 + .../src/account_prio_service.rs | 2 +- prioritization_fees/src/block_priofees.rs | 5 +- 27 files changed, 2710 insertions(+), 440 deletions(-) create mode 100644 accounts/Cargo.toml create mode 100644 accounts/src/account_service.rs create mode 100644 accounts/src/account_store_interface.rs create mode 100644 accounts/src/inmemory_account_store.rs create mode 100644 accounts/src/lib.rs create mode 100644 cluster-endpoints/src/grpc/gprc_accounts_streaming.rs create mode 100644 cluster-endpoints/src/grpc/mod.rs create mode 100644 core/src/structures/account_data.rs create mode 100644 core/src/structures/account_filter.rs create mode 100644 lite-rpc/src/bridge_pubsub.rs create mode 100644 lite-rpc/src/rpc_pubsub.rs create mode 100644 lite-rpc/src/start_server.rs diff --git a/Cargo.lock b/Cargo.lock index f15417ed..4e83dd9b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2519,6 +2519,8 @@ dependencies = [ "quinn", "serde", "serde_json", + "solana-account-decoder", + "solana-lite-rpc-accounts", "solana-lite-rpc-address-lookup-tables", "solana-lite-rpc-blockstore", "solana-lite-rpc-cluster-endpoints", @@ -4312,6 +4314,45 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "solana-lite-rpc-accounts" +version = "0.2.4" +dependencies = [ + "anyhow", + "async-trait", + "base64 0.21.7", + "bincode", + "bs58", + "bytes", + "chrono", + "dashmap 5.5.3", + "futures", + "itertools 0.10.5", + "lazy_static", + "log", + "prometheus", + "quinn", + "rand 0.8.5", + "rand_chacha 0.3.1", + "rustls", + "serde", + "serde_json", + "solana-account-decoder", + "solana-address-lookup-table-program", + "solana-client", + "solana-lite-rpc-core", + "solana-net-utils", + "solana-pubsub-client", + "solana-rpc-client", + "solana-rpc-client-api", + "solana-sdk", + "solana-streamer", + 
"solana-transaction-status", + "solana-version", + "thiserror", + "tokio", +] + [[package]] name = "solana-lite-rpc-address-lookup-tables" version = "0.2.4" diff --git a/Cargo.toml b/Cargo.toml index 4408ec97..ebf7bbd6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,8 @@ members = [ "blockstore", "prioritization_fees", "bench", - "address_lookup_tables" + "address_lookup_tables", + "accounts" ] [workspace.package] @@ -75,6 +76,7 @@ solana-lite-rpc-blockstore = {path = "blockstore", version="0.2.4"} solana-lite-rpc-stakevote = {path = "stake_vote", version="0.2.4"} solana-lite-rpc-prioritization-fees = {path = "prioritization_fees", version="0.2.4"} solana-lite-rpc-address-lookup-tables = {path = "address_lookup_tables", version="0.2.4"} +solana-lite-rpc-accounts = {path = "accounts", version = "0.2.4"} async-trait = "0.1.68" yellowstone-grpc-client = { version = "1.13.0+solana.1.17.15", git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" } diff --git a/accounts/Cargo.toml b/accounts/Cargo.toml new file mode 100644 index 00000000..169114b3 --- /dev/null +++ b/accounts/Cargo.toml @@ -0,0 +1,47 @@ +[package] +name = "solana-lite-rpc-accounts" +version = "0.2.4" +edition = "2021" +description = "Library which implements accounts in lite-rpc" +rust-version = "1.73.0" +repository = "https://github.com/blockworks-foundation/lite-rpc" +license = "AGPL" + +[dependencies] +solana-sdk = { workspace = true } +solana-rpc-client-api = { workspace = true } +solana-transaction-status = { workspace = true } +solana-version = { workspace = true } +solana-client = { workspace = true } +solana-net-utils = { workspace = true } +solana-pubsub-client = { workspace = true } +solana-rpc-client = { workspace = true } +solana-streamer = { workspace = true } +solana-account-decoder = { workspace = true } +solana-address-lookup-table-program = { workspace = true } + +serde = { workspace = true } +serde_json = { workspace = true } +tokio = "1.*" +bincode = { workspace = true } +bs58 = { workspace = true } +base64 = { workspace = true } +thiserror = { workspace = true } +futures = { workspace = true } +bytes = { workspace = true } +anyhow = { workspace = true } +log = { workspace = true } +dashmap = { workspace = true } +quinn = { workspace = true } +chrono = { workspace = true } +rustls = { workspace = true } +async-trait = { workspace = true } +itertools = { workspace = true } +prometheus = { workspace = true } +lazy_static = { workspace = true } + +solana-lite-rpc-core = { workspace = true } + +[dev-dependencies] +rand = "0.8.5" +rand_chacha = "0.3.1" \ No newline at end of file diff --git a/accounts/src/account_service.rs b/accounts/src/account_service.rs new file mode 100644 index 00000000..13795ec9 --- /dev/null +++ b/accounts/src/account_service.rs @@ -0,0 +1,316 @@ +use std::{str::FromStr, sync::Arc}; + +use anyhow::bail; +use itertools::Itertools; +use solana_account_decoder::{UiAccount, UiDataSliceConfig}; +use solana_lite_rpc_core::{ + commitment_utils::Commitment, + structures::{ + account_data::{AccountData, AccountNotificationMessage, AccountStream}, + account_filter::AccountFilters, + }, + types::BlockStream, + AnyhowJoinHandle, +}; +use solana_rpc_client::nonblocking::rpc_client::RpcClient; +use solana_rpc_client_api::{ + config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, + response::RpcKeyedAccount, +}; +use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, slot_history::Slot}; +use tokio::sync::broadcast::Sender; + +use 
crate::account_store_interface::AccountStorageInterface; + +#[derive(Clone)] +pub struct AccountService { + account_store: Arc, + pub account_notification_sender: Sender, +} + +impl AccountService { + pub fn new(account_store: Arc) -> Self { + let (account_notification_sender, _) = tokio::sync::broadcast::channel(256); + Self { + account_store, + account_notification_sender, + } + } + + pub async fn populate_from_rpc( + &self, + rpc_client: Arc, + filters: &AccountFilters, + max_request_in_parallel: usize, + ) -> anyhow::Result<()> { + const NB_ACCOUNTS_IN_GMA: usize = 100; + const NB_RETRY: usize = 10; + let mut accounts = vec![]; + for filter in filters.iter() { + if !filter.accounts.is_empty() { + let mut f_accounts = filter + .accounts + .iter() + .map(|x| Pubkey::from_str(x).expect("Accounts in filters should be valid")) + .collect(); + accounts.append(&mut f_accounts); + } + + if let Some(program_id) = &filter.program_id { + let program_id = + Pubkey::from_str(program_id).expect("Program id in filters should be valid"); + let mut rpc_acc = rpc_client + .get_program_accounts_with_config( + &program_id, + RpcProgramAccountsConfig { + filters: filter.get_rpc_filter(), + account_config: RpcAccountInfoConfig { + encoding: Some(solana_account_decoder::UiAccountEncoding::Base64), + data_slice: Some(UiDataSliceConfig { + offset: 0, + length: 0, + }), + commitment: None, + min_context_slot: None, + }, + with_context: None, + }, + ) + .await? + .iter() + .map(|(pk, _)| *pk) + .collect_vec(); + accounts.append(&mut rpc_acc); + } + } + log::info!("Fetching {} accounts", accounts.len()); + for accounts in accounts.chunks(max_request_in_parallel * NB_ACCOUNTS_IN_GMA) { + for accounts in accounts.chunks(NB_ACCOUNTS_IN_GMA) { + let mut fetch_accounts = vec![]; + let mut updated_slot = 0; + for _ in 0..NB_RETRY { + let accounts = rpc_client + .get_multiple_accounts_with_config( + accounts, + RpcAccountInfoConfig { + encoding: Some(solana_account_decoder::UiAccountEncoding::Base64), + data_slice: None, + commitment: Some(CommitmentConfig::finalized()), + min_context_slot: None, + }, + ) + .await; + match accounts { + Ok(response) => { + fetch_accounts = response.value; + updated_slot = response.context.slot; + break; + } + Err(e) => { + // retry + log::error!("Error fetching all the accounts {e:?}, retrying"); + continue; + } + } + } + for (index, account) in fetch_accounts.iter().enumerate() { + if let Some(account) = account { + self.account_store + .initilize_account(AccountData { + pubkey: accounts[index], + account: account.clone(), + updated_slot, + }) + .await; + } + } + } + } + log::info!("{} accounts successfully fetched", accounts.len()); + Ok(()) + } + + pub fn process_account_stream( + &self, + mut account_stream: AccountStream, + mut block_stream: BlockStream, + ) -> Vec { + let this = self.clone(); + let processed_task = tokio::spawn(async move { + loop { + match account_stream.recv().await { + Ok(account_notification) => { + this.account_store + .update_account( + account_notification.data.clone(), + account_notification.commitment, + ) + .await; + let _ = this.account_notification_sender.send(account_notification); + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(e)) => { + log::error!( + "Account Stream Lagged by {}, we may have missed some account updates", + e + ); + continue; + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + bail!("Account Stream Broken"); + } + } + } + }); + + let this = self.clone(); + let block_processing_task = tokio::spawn(async move 
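// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): how the service defined above
// is meant to be wired up, going by the signatures in this file. The RPC URL,
// the parallelism value of 4 and the caller-provided filter JSON are
// assumptions made for the example; `InmemoryAccountStore` is the storage
// backend added elsewhere in this PR.
use solana_lite_rpc_accounts::{
    account_service::AccountService, inmemory_account_store::InmemoryAccountStore,
};
use solana_lite_rpc_core::structures::account_filter::AccountFilters;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use std::sync::Arc;

async fn wire_account_service(filter_json: &str) -> anyhow::Result<AccountService> {
    // the JSON format is the one `AccountFilters` derives via serde
    let filters: AccountFilters = serde_json::from_str(filter_json)?;
    // any `AccountStorageInterface` implementation works; the in-memory store is the default one
    let service = AccountService::new(Arc::new(InmemoryAccountStore::default()));
    // seed the store with a finalized snapshot before consuming live updates
    let rpc_client = Arc::new(RpcClient::new("http://127.0.0.1:8899".to_string())); // placeholder URL
    service.populate_from_rpc(rpc_client, &filters, 4).await?;
    Ok(service)
}
// ---------------------------------------------------------------------------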
{ + loop { + match block_stream.recv().await { + Ok(block_notification) => { + if block_notification.commitment_config.is_processed() { + // processed commitment is not processed in this loop + continue; + } + let commitment = Commitment::from(block_notification.commitment_config); + let updated_accounts = this + .account_store + .process_slot_data(block_notification.slot, commitment) + .await; + for data in updated_accounts { + let _ = this + .account_notification_sender + .send(AccountNotificationMessage { data, commitment }); + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(e)) => { + log::error!("Account Stream Lagged by {}", e); + continue; + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + log::error!("Account Stream Broken"); + break; + } + } + } + bail!("Account Block Stream Broken"); + }); + + vec![processed_task, block_processing_task] + } + + pub fn convert_account_data_to_ui_account( + account_data: &AccountData, + config: Option, + ) -> UiAccount { + let encoding = config + .as_ref() + .map(|c| c.encoding) + .unwrap_or_default() + .unwrap_or(solana_account_decoder::UiAccountEncoding::Base64); + let data_slice = config.as_ref().map(|c| c.data_slice).unwrap_or_default(); + UiAccount::encode( + &account_data.pubkey, + &account_data.account, + encoding, + None, + data_slice, + ) + } + + pub async fn get_account( + &self, + account: Pubkey, + config: Option, + ) -> anyhow::Result<(Slot, Option)> { + let commitment = config + .as_ref() + .map(|config| config.commitment.unwrap_or_default()) + .unwrap_or_default(); + + let commitment = Commitment::from(commitment); + + if let Some(account_data) = self.account_store.get_account(account, commitment).await { + let ui_account = + Self::convert_account_data_to_ui_account(&account_data, config.clone()); + + // if minimum context slot is not satisfied return Null + let minimum_context_slot = config + .as_ref() + .map(|c| c.min_context_slot.unwrap_or_default()) + .unwrap_or_default(); + if minimum_context_slot <= account_data.updated_slot { + Ok((account_data.updated_slot, Some(ui_account))) + } else { + Ok((account_data.updated_slot, None)) + } + } else { + bail!( + "Account {} does not satisfy any configured filters", + account.to_string() + ) + } + } + + pub async fn get_program_accounts( + &self, + program_id: Pubkey, + config: Option, + ) -> anyhow::Result<(Slot, Vec)> { + let account_filter = config + .as_ref() + .map(|x| x.filters.clone()) + .unwrap_or_default(); + let commitment = config + .as_ref() + .map(|c| c.account_config.commitment) + .unwrap_or_default() + .unwrap_or_default(); + let commitment = Commitment::from(commitment); + + let program_accounts = self + .account_store + .get_program_accounts(program_id, account_filter, commitment) + .await; + if let Some(program_accounts) = program_accounts { + let min_context_slot = config + .as_ref() + .map(|c| { + if c.with_context.unwrap_or_default() { + c.account_config.min_context_slot + } else { + None + } + }) + .unwrap_or_default() + .unwrap_or_default(); + let slot = program_accounts + .iter() + .map(|program_account| program_account.updated_slot) + .max() + .unwrap_or_default(); + let acc_config = config.map(|c| c.account_config); + let rpc_keyed_accounts = program_accounts + .iter() + .filter_map(|account_data| { + if account_data.updated_slot >= min_context_slot { + Some(RpcKeyedAccount { + pubkey: account_data.pubkey.to_string(), + account: Self::convert_account_data_to_ui_account( + account_data, + acc_config.clone(), + ), + }) + } else { + 
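// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): a caller-side view of
// `get_account` above, e.g. from an RPC handler. The pubkey argument and the
// Base64/confirmed config are example values chosen for the sketch.
use solana_account_decoder::UiAccountEncoding;
use solana_lite_rpc_accounts::account_service::AccountService;
use solana_rpc_client_api::config::RpcAccountInfoConfig;
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};

async fn fetch_account(service: &AccountService, account: Pubkey) -> anyhow::Result<()> {
    let config = RpcAccountInfoConfig {
        encoding: Some(UiAccountEncoding::Base64),
        data_slice: None,
        commitment: Some(CommitmentConfig::confirmed()),
        min_context_slot: None,
    };
    let (slot, ui_account) = service.get_account(account, Some(config)).await?;
    // `ui_account` is None when the stored copy is older than `min_context_slot`
    log::info!("{} as of slot {}: found = {}", account, slot, ui_account.is_some());
    Ok(())
}
// ---------------------------------------------------------------------------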
None + } + }) + .collect_vec(); + Ok((slot, rpc_keyed_accounts)) + } else { + bail!( + "Program id {} does not satisfy any configured filters", + program_id.to_string() + ) + } + } +} diff --git a/accounts/src/account_store_interface.rs b/accounts/src/account_store_interface.rs new file mode 100644 index 00000000..19683515 --- /dev/null +++ b/accounts/src/account_store_interface.rs @@ -0,0 +1,24 @@ +use async_trait::async_trait; +use solana_lite_rpc_core::commitment_utils::Commitment; +use solana_lite_rpc_core::structures::account_data::AccountData; +use solana_rpc_client_api::filter::RpcFilterType; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::slot_history::Slot; + +#[async_trait] +pub trait AccountStorageInterface: Send + Sync { + async fn update_account(&self, account_data: AccountData, commitment: Commitment); + + async fn initilize_account(&self, account_data: AccountData); + + async fn get_account(&self, account_pk: Pubkey, commitment: Commitment) -> Option; + + async fn get_program_accounts( + &self, + program_pubkey: Pubkey, + account_filter: Option>, + commitment: Commitment, + ) -> Option>; + + async fn process_slot_data(&self, slot: Slot, commitment: Commitment) -> Vec; +} diff --git a/accounts/src/inmemory_account_store.rs b/accounts/src/inmemory_account_store.rs new file mode 100644 index 00000000..e455c4b2 --- /dev/null +++ b/accounts/src/inmemory_account_store.rs @@ -0,0 +1,834 @@ +use std::{ + collections::{BTreeSet, HashSet}, + sync::Arc, +}; + +use async_trait::async_trait; +use dashmap::DashMap; +use itertools::Itertools; +use solana_lite_rpc_core::{commitment_utils::Commitment, structures::account_data::AccountData}; +use solana_rpc_client_api::filter::RpcFilterType; +use solana_sdk::{pubkey::Pubkey, slot_history::Slot}; +use std::collections::BTreeMap; +use tokio::sync::RwLock; + +use crate::account_store_interface::AccountStorageInterface; + +#[derive(Clone, Default)] +pub struct AccountDataByCommitment { + pub processed_accounts: BTreeMap, + pub confirmed_account: Option, + pub finalized_account: Option, +} + +impl AccountDataByCommitment { + #[allow(deprecated)] + pub fn get_account_data(&self, commitment: Commitment) -> Option { + match commitment { + Commitment::Processed => self + .processed_accounts + .last_key_value() + .map(|(_, v)| v.clone()) + .or(self.confirmed_account.clone()), + Commitment::Confirmed => self.confirmed_account.clone(), + Commitment::Finalized => self.finalized_account.clone(), + } + } + + pub fn new(data: AccountData, commitment: Commitment) -> Self { + let mut processed_accounts = BTreeMap::new(); + processed_accounts.insert(data.updated_slot, data.clone()); + AccountDataByCommitment { + processed_accounts, + confirmed_account: if commitment == Commitment::Confirmed { + Some(data) + } else { + None + }, + finalized_account: None, + } + } + + pub fn initialize(data: AccountData) -> Self { + let mut processed_accounts = BTreeMap::new(); + processed_accounts.insert(data.updated_slot, data.clone()); + AccountDataByCommitment { + processed_accounts, + confirmed_account: Some(data.clone()), + finalized_account: Some(data), + } + } + + pub fn update(&mut self, data: AccountData, commitment: Commitment) -> bool { + // if commitmentment is processed check and update processed + // if commitmentment is confirmed check and update processed and confirmed + // if commitmentment is finalized check and update all + let update_confirmed = self + .confirmed_account + .as_ref() + .map(|x| x.updated_slot < data.updated_slot) + .unwrap_or(true); + let 
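// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): `AccountStorageInterface` above
// is the extension point for alternative storage backends. A do-nothing
// implementation, usable as a test stub, could look like the following; the
// `NoopAccountStore` name is made up for the example.
use async_trait::async_trait;
use solana_lite_rpc_accounts::account_store_interface::AccountStorageInterface;
use solana_lite_rpc_core::{commitment_utils::Commitment, structures::account_data::AccountData};
use solana_rpc_client_api::filter::RpcFilterType;
use solana_sdk::{pubkey::Pubkey, slot_history::Slot};

struct NoopAccountStore;

#[async_trait]
impl AccountStorageInterface for NoopAccountStore {
    async fn update_account(&self, _account_data: AccountData, _commitment: Commitment) {}
    async fn initilize_account(&self, _account_data: AccountData) {}
    async fn get_account(&self, _account_pk: Pubkey, _commitment: Commitment) -> Option<AccountData> {
        None
    }
    async fn get_program_accounts(
        &self,
        _program_pubkey: Pubkey,
        _account_filter: Option<Vec<RpcFilterType>>,
        _commitment: Commitment,
    ) -> Option<Vec<AccountData>> {
        None
    }
    async fn process_slot_data(&self, _slot: Slot, _commitment: Commitment) -> Vec<AccountData> {
        vec![]
    }
}
// ---------------------------------------------------------------------------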
update_finalized = self + .finalized_account + .as_ref() + .map(|x| x.updated_slot < data.updated_slot) + .unwrap_or(true); + + if self.processed_accounts.get(&data.updated_slot).is_none() { + // processed not present for the slot + self.processed_accounts + .insert(data.updated_slot, data.clone()); + } + let mut updated = false; + match commitment { + Commitment::Confirmed => { + if update_confirmed { + self.confirmed_account = Some(data); + updated = true; + } + } + Commitment::Finalized => { + if update_confirmed { + self.confirmed_account = Some(data.clone()); + } + if update_finalized { + self.finalized_account = Some(data); + updated = true; + } + } + Commitment::Processed => { + // processed already treated + } + } + updated + } + // this method will promote processed account to confirmed account to finalized account + // returns promoted account + pub fn promote_slot_commitment( + &mut self, + slot: Slot, + commitment: Commitment, + ) -> Option<(AccountData, Option)> { + if let Some(account_data) = self.processed_accounts.get(&slot).cloned() { + match commitment { + Commitment::Processed => { + // do nothing + None + } + Commitment::Confirmed => { + if self + .confirmed_account + .as_ref() + .map(|acc| acc.updated_slot) + .unwrap_or_default() + < slot + { + let prev_data = self.confirmed_account.clone(); + self.confirmed_account = Some(account_data.clone()); + Some((account_data, prev_data)) + } else { + None + } + } + Commitment::Finalized => { + // slot finalized remove data from processed + while self + .processed_accounts + .first_key_value() + .map(|(s, _)| *s) + .unwrap_or(u64::MAX) + <= slot + { + self.processed_accounts.pop_first(); + } + + // processed map should not be empty + if self.processed_accounts.is_empty() { + log::error!( + "Processed map should not be empty filling it with finalized data" + ); + self.processed_accounts.insert(slot, account_data.clone()); + } + + if self + .finalized_account + .as_ref() + .map(|acc| acc.updated_slot) + .unwrap_or_default() + < slot + { + let prev_data = self.finalized_account.clone(); + self.finalized_account = Some(account_data.clone()); + Some((account_data, prev_data)) + } else { + None + } + } + } + } else { + // remove processed slot data + while self + .processed_accounts + .first_key_value() + .map(|(s, _)| *s) + .unwrap_or(u64::MAX) + <= slot + { + self.processed_accounts.pop_first(); + } + None + } + } +} + +pub struct InmemoryAccountStore { + account_store: Arc>, + confirmed_slots_map: RwLock>, + owner_map_accounts: Arc>>, +} + +impl InmemoryAccountStore { + pub fn new() -> Self { + Self { + account_store: Arc::new(DashMap::new()), + confirmed_slots_map: RwLock::new(BTreeSet::new()), + owner_map_accounts: Arc::new(DashMap::new()), + } + } + + fn add_account_owner(&self, account: Pubkey, owner: Pubkey) { + match self.owner_map_accounts.entry(owner) { + dashmap::mapref::entry::Entry::Occupied(mut occ) => { + occ.get_mut().insert(account); + } + dashmap::mapref::entry::Entry::Vacant(vc) => { + let mut set = HashSet::new(); + set.insert(account); + vc.insert(set); + } + } + } + + // here if the commitment is processed and the account has changed owner from A->B we keep the key for both A and B + // then we remove the key from A for finalized commitment + fn update_owner( + &self, + prev_account_data: &AccountData, + new_account_data: &AccountData, + commitment: Commitment, + ) { + if prev_account_data.pubkey == new_account_data.pubkey + && prev_account_data.account.owner != new_account_data.account.owner + { + if commitment 
== Commitment::Finalized { + if let Some(mut accounts) = self + .owner_map_accounts + .get_mut(&prev_account_data.account.owner) + { + accounts.remove(&prev_account_data.pubkey); + } + } + self.add_account_owner(new_account_data.pubkey, new_account_data.account.owner); + } + } +} + +#[async_trait] +impl AccountStorageInterface for InmemoryAccountStore { + async fn update_account(&self, account_data: AccountData, commitment: Commitment) { + let slot = account_data.updated_slot; + // check if the blockhash and slot is already confirmed + let commitment = if commitment == Commitment::Processed { + let lk = self.confirmed_slots_map.read().await; + if lk.contains(&slot) { + Commitment::Confirmed + } else { + Commitment::Processed + } + } else { + commitment + }; + + if let Some(mut account_by_commitment) = self.account_store.get_mut(&account_data.pubkey) { + let prev_account = account_by_commitment.get_account_data(commitment); + if let Some(prev_account) = prev_account { + self.update_owner(&prev_account, &account_data, commitment); + } + account_by_commitment.update(account_data, commitment); + } else { + self.add_account_owner(account_data.pubkey, account_data.account.owner); + self.account_store.insert( + account_data.pubkey, + AccountDataByCommitment::new(account_data.clone(), commitment), + ); + } + } + + async fn initilize_account(&self, account_data: AccountData) { + match self.account_store.get_mut(&account_data.pubkey) { + Some(_) => { + // account has already been filled by an event + self.update_account(account_data, Commitment::Finalized) + .await; + } + None => { + self.add_account_owner(account_data.pubkey, account_data.account.owner); + self.account_store.insert( + account_data.pubkey, + AccountDataByCommitment::initialize(account_data), + ); + } + } + } + + async fn get_account(&self, account_pk: Pubkey, commitment: Commitment) -> Option { + if let Some(account_by_commitment) = self.account_store.get(&account_pk) { + account_by_commitment.get_account_data(commitment).clone() + } else { + None + } + } + + async fn get_program_accounts( + &self, + program_pubkey: Pubkey, + account_filters: Option>, + commitment: Commitment, + ) -> Option> { + if let Some(program_accounts) = self.owner_map_accounts.get(&program_pubkey) { + let mut return_vec = vec![]; + for program_account in program_accounts.iter() { + let account_data = self.get_account(*program_account, commitment).await; + if let Some(account_data) = account_data { + // recheck program owner and filters + if account_data.account.owner.eq(&program_pubkey) { + match &account_filters { + Some(filters) => { + if filters.iter().all(|filter| account_data.allows(filter)) { + return_vec.push(account_data.clone()); + } + } + None => { + return_vec.push(account_data.clone()); + } + } + } + } + } + Some(return_vec) + } else { + None + } + } + + async fn process_slot_data(&self, slot: Slot, commitment: Commitment) -> Vec { + match commitment { + Commitment::Confirmed => { + // insert slot and blockhash that were confirmed + { + let mut lk = self.confirmed_slots_map.write().await; + lk.insert(slot); + } + } + Commitment::Finalized => { + // remove finalized slots form confirmed map + { + let mut lk = self.confirmed_slots_map.write().await; + if !lk.remove(&slot) { + log::warn!( + "following slot {} were not confirmed by account storage", + slot + ); + } + } + } + Commitment::Processed => { + // processed should not use update_slot_data + log::error!("Processed commitment is not treated by process_slot_data method"); + return vec![]; + } + 
} + + let updated_accounts = self + .account_store + .iter_mut() + .filter_map(|mut acc| acc.promote_slot_commitment(slot, commitment)) + .collect_vec(); + + // update owners + updated_accounts + .iter() + .for_each(|(account_data, prev_account_data)| { + if let Some(prev_account_data) = prev_account_data { + if prev_account_data.account.owner != account_data.account.owner { + self.update_owner(prev_account_data, account_data, commitment); + } + } + }); + + updated_accounts + .iter() + .filter_map(|(account_data, prev_account_data)| { + if let Some(prev_account_data) = prev_account_data { + if prev_account_data != account_data { + Some(account_data) + } else { + None + } + } else { + Some(account_data) + } + }) + .cloned() + .collect_vec() + } +} + +impl Default for InmemoryAccountStore { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use itertools::Itertools; + use rand::{rngs::ThreadRng, Rng}; + use solana_lite_rpc_core::{ + commitment_utils::Commitment, structures::account_data::AccountData, + }; + use solana_sdk::{account::Account, pubkey::Pubkey, slot_history::Slot}; + + use crate::{ + account_store_interface::AccountStorageInterface, + inmemory_account_store::InmemoryAccountStore, + }; + + fn create_random_account( + rng: &mut ThreadRng, + updated_slot: Slot, + pubkey: Pubkey, + program: Pubkey, + ) -> AccountData { + let length: usize = rng.gen_range(100..1000); + AccountData { + pubkey, + account: Account { + lamports: rng.gen(), + data: (0..length).map(|_| rng.gen::()).collect_vec(), + owner: program, + executable: false, + rent_epoch: 0, + }, + updated_slot, + } + } + + #[tokio::test] + pub async fn test_account_store() { + let store = InmemoryAccountStore::default(); + let mut rng = rand::thread_rng(); + let program = Pubkey::new_unique(); + let pk1 = Pubkey::new_unique(); + let pk2 = Pubkey::new_unique(); + + let account_data_0 = create_random_account(&mut rng, 0, pk1, program); + store.initilize_account(account_data_0.clone()).await; + + let account_data_1 = create_random_account(&mut rng, 0, pk2, program); + store.initilize_account(account_data_1.clone()).await; + + assert_eq!( + store.get_account(pk1, Commitment::Processed).await, + Some(account_data_0.clone()) + ); + assert_eq!( + store.get_account(pk1, Commitment::Confirmed).await, + Some(account_data_0.clone()) + ); + assert_eq!( + store.get_account(pk1, Commitment::Finalized).await, + Some(account_data_0.clone()) + ); + + assert_eq!( + store.get_account(pk2, Commitment::Processed).await, + Some(account_data_1.clone()) + ); + assert_eq!( + store.get_account(pk2, Commitment::Confirmed).await, + Some(account_data_1.clone()) + ); + assert_eq!( + store.get_account(pk2, Commitment::Finalized).await, + Some(account_data_1.clone()) + ); + + let account_data_2 = create_random_account(&mut rng, 1, pk1, program); + let account_data_3 = create_random_account(&mut rng, 2, pk1, program); + let account_data_4 = create_random_account(&mut rng, 3, pk1, program); + let account_data_5 = create_random_account(&mut rng, 4, pk1, program); + + store + .update_account(account_data_2.clone(), Commitment::Processed) + .await; + store + .update_account(account_data_3.clone(), Commitment::Processed) + .await; + store + .update_account(account_data_4.clone(), Commitment::Processed) + .await; + store + .update_account(account_data_5.clone(), Commitment::Processed) + .await; + + assert_eq!( + store.get_account(pk1, Commitment::Processed).await, + Some(account_data_5.clone()) + ); + assert_eq!( + store.get_account(pk1, 
Commitment::Confirmed).await, + Some(account_data_0.clone()) + ); + assert_eq!( + store.get_account(pk1, Commitment::Finalized).await, + Some(account_data_0.clone()) + ); + + store.process_slot_data(1, Commitment::Confirmed).await; + + assert_eq!( + store.get_account(pk1, Commitment::Processed).await, + Some(account_data_5.clone()) + ); + assert_eq!( + store.get_account(pk1, Commitment::Confirmed).await, + Some(account_data_2.clone()) + ); + assert_eq!( + store.get_account(pk1, Commitment::Finalized).await, + Some(account_data_0.clone()) + ); + + store.process_slot_data(2, Commitment::Confirmed).await; + + assert_eq!( + store.get_account(pk1, Commitment::Processed).await, + Some(account_data_5.clone()) + ); + assert_eq!( + store.get_account(pk1, Commitment::Confirmed).await, + Some(account_data_3.clone()) + ); + assert_eq!( + store.get_account(pk1, Commitment::Finalized).await, + Some(account_data_0.clone()) + ); + + store.process_slot_data(1, Commitment::Finalized).await; + + assert_eq!( + store.get_account(pk1, Commitment::Processed).await, + Some(account_data_5.clone()) + ); + assert_eq!( + store.get_account(pk1, Commitment::Confirmed).await, + Some(account_data_3.clone()) + ); + assert_eq!( + store.get_account(pk1, Commitment::Finalized).await, + Some(account_data_2.clone()) + ); + } + + #[tokio::test] + pub async fn test_account_store_if_finalized_clears_old_processed_slots() { + let store = InmemoryAccountStore::default(); + + let program = Pubkey::new_unique(); + let pk1 = Pubkey::new_unique(); + + let mut rng = rand::thread_rng(); + + store + .initilize_account(create_random_account(&mut rng, 0, pk1, program)) + .await; + + store + .update_account( + create_random_account(&mut rng, 1, pk1, program), + Commitment::Processed, + ) + .await; + store + .update_account( + create_random_account(&mut rng, 1, pk1, program), + Commitment::Processed, + ) + .await; + store + .update_account( + create_random_account(&mut rng, 2, pk1, program), + Commitment::Processed, + ) + .await; + store + .update_account( + create_random_account(&mut rng, 3, pk1, program), + Commitment::Processed, + ) + .await; + store + .update_account( + create_random_account(&mut rng, 4, pk1, program), + Commitment::Processed, + ) + .await; + store + .update_account( + create_random_account(&mut rng, 5, pk1, program), + Commitment::Processed, + ) + .await; + store + .update_account( + create_random_account(&mut rng, 6, pk1, program), + Commitment::Processed, + ) + .await; + store + .update_account( + create_random_account(&mut rng, 7, pk1, program), + Commitment::Processed, + ) + .await; + + let account_8 = create_random_account(&mut rng, 8, pk1, program); + store + .update_account(account_8.clone(), Commitment::Processed) + .await; + store + .update_account( + create_random_account(&mut rng, 9, pk1, program), + Commitment::Processed, + ) + .await; + store + .update_account( + create_random_account(&mut rng, 10, pk1, program), + Commitment::Processed, + ) + .await; + + let last_account = create_random_account(&mut rng, 11, pk1, program); + store + .update_account(last_account.clone(), Commitment::Processed) + .await; + + assert_eq!( + store + .account_store + .get(&pk1) + .unwrap() + .processed_accounts + .len(), + 12 + ); + store.process_slot_data(11, Commitment::Finalized).await; + assert_eq!( + store + .account_store + .get(&pk1) + .unwrap() + .processed_accounts + .len(), + 1 + ); + + assert_eq!( + store.get_account(pk1, Commitment::Finalized).await, + Some(last_account.clone()), + ); + + // check finalizing previous 
commitment does not affect + store.process_slot_data(8, Commitment::Finalized).await; + + assert_eq!( + store.get_account(pk1, Commitment::Finalized).await, + Some(last_account), + ); + } + + #[tokio::test] + pub async fn test_get_program_account() { + let store = InmemoryAccountStore::default(); + + let prog_1 = Pubkey::new_unique(); + let prog_2 = Pubkey::new_unique(); + + let mut rng = rand::thread_rng(); + + store + .update_account( + create_random_account(&mut rng, 1, Pubkey::new_unique(), prog_1), + Commitment::Confirmed, + ) + .await; + store + .update_account( + create_random_account(&mut rng, 1, Pubkey::new_unique(), prog_1), + Commitment::Confirmed, + ) + .await; + store + .update_account( + create_random_account(&mut rng, 1, Pubkey::new_unique(), prog_1), + Commitment::Confirmed, + ) + .await; + store + .update_account( + create_random_account(&mut rng, 1, Pubkey::new_unique(), prog_1), + Commitment::Confirmed, + ) + .await; + + store + .update_account( + create_random_account(&mut rng, 1, Pubkey::new_unique(), prog_2), + Commitment::Confirmed, + ) + .await; + + let acc_prgram_1 = store + .get_program_accounts(prog_1, None, Commitment::Processed) + .await; + assert!(acc_prgram_1.is_some()); + assert_eq!(acc_prgram_1.unwrap().len(), 4); + let acc_prgram_1 = store + .get_program_accounts(prog_1, None, Commitment::Confirmed) + .await; + assert!(acc_prgram_1.is_some()); + assert_eq!(acc_prgram_1.unwrap().len(), 4); + let acc_prgram_1 = store + .get_program_accounts(prog_1, None, Commitment::Finalized) + .await; + assert!(acc_prgram_1.is_some()); + assert!(acc_prgram_1.unwrap().is_empty()); + + let acc_prgram_2 = store + .get_program_accounts(prog_2, None, Commitment::Processed) + .await; + assert!(acc_prgram_2.is_some()); + assert_eq!(acc_prgram_2.unwrap().len(), 1); + let acc_prgram_2 = store + .get_program_accounts(prog_2, None, Commitment::Confirmed) + .await; + assert!(acc_prgram_2.is_some()); + assert_eq!(acc_prgram_2.unwrap().len(), 1); + let acc_prgram_2 = store + .get_program_accounts(prog_2, None, Commitment::Finalized) + .await; + assert!(acc_prgram_2.is_some()); + assert!(acc_prgram_2.unwrap().is_empty()); + + let acc_prgram_3 = store + .get_program_accounts(Pubkey::new_unique(), None, Commitment::Processed) + .await; + assert!(acc_prgram_3.is_none()); + let acc_prgram_3 = store + .get_program_accounts(Pubkey::new_unique(), None, Commitment::Confirmed) + .await; + assert!(acc_prgram_3.is_none()); + let acc_prgram_3 = store + .get_program_accounts(Pubkey::new_unique(), None, Commitment::Finalized) + .await; + assert!(acc_prgram_3.is_none()); + + store.process_slot_data(1, Commitment::Finalized).await; + + let acc_prgram_1 = store + .get_program_accounts(prog_1, None, Commitment::Finalized) + .await; + assert!(acc_prgram_1.is_some()); + assert_eq!(acc_prgram_1.unwrap().len(), 4); + let acc_prgram_2 = store + .get_program_accounts(prog_2, None, Commitment::Finalized) + .await; + assert!(acc_prgram_2.is_some()); + assert_eq!(acc_prgram_2.unwrap().len(), 1); + + let pk = Pubkey::new_unique(); + let prog_3 = Pubkey::new_unique(); + + let account_finalized = create_random_account(&mut rng, 2, pk, prog_3); + store + .update_account(account_finalized.clone(), Commitment::Finalized) + .await; + store.process_slot_data(2, Commitment::Finalized).await; + + let account_confirmed = create_random_account(&mut rng, 3, pk, prog_3); + store + .update_account(account_confirmed.clone(), Commitment::Confirmed) + .await; + + let prog_4 = Pubkey::new_unique(); + let account_processed = 
create_random_account(&mut rng, 4, pk, prog_4); + store + .update_account(account_processed.clone(), Commitment::Processed) + .await; + + let f = store + .get_program_accounts(prog_3, None, Commitment::Finalized) + .await; + + let c = store + .get_program_accounts(prog_3, None, Commitment::Confirmed) + .await; + + let p_3 = store + .get_program_accounts(prog_3, None, Commitment::Processed) + .await; + + let p_4 = store + .get_program_accounts(prog_4, None, Commitment::Processed) + .await; + + assert_eq!(c, Some(vec![account_confirmed.clone()])); + assert_eq!(p_3, Some(vec![])); + assert_eq!(p_4, Some(vec![account_processed.clone()])); + + assert_eq!(f, Some(vec![account_finalized.clone()])); + + store.process_slot_data(3, Commitment::Finalized).await; + store.process_slot_data(4, Commitment::Confirmed).await; + + let f = store + .get_program_accounts(prog_3, None, Commitment::Finalized) + .await; + + let p_3 = store + .get_program_accounts(prog_3, None, Commitment::Confirmed) + .await; + + let p_4 = store + .get_program_accounts(prog_4, None, Commitment::Confirmed) + .await; + + assert_eq!(f, Some(vec![account_confirmed.clone()])); + assert_eq!(p_3, Some(vec![])); + assert_eq!(p_4, Some(vec![account_processed.clone()])); + + store.process_slot_data(4, Commitment::Finalized).await; + let p_3 = store + .get_program_accounts(prog_3, None, Commitment::Finalized) + .await; + + let p_4 = store + .get_program_accounts(prog_4, None, Commitment::Finalized) + .await; + + assert_eq!(p_3, Some(vec![])); + assert_eq!(p_4, Some(vec![account_processed.clone()])); + } +} diff --git a/accounts/src/lib.rs b/accounts/src/lib.rs new file mode 100644 index 00000000..11d6260f --- /dev/null +++ b/accounts/src/lib.rs @@ -0,0 +1,3 @@ +pub mod account_service; +pub mod account_store_interface; +pub mod inmemory_account_store; diff --git a/cluster-endpoints/src/endpoint_stremers.rs b/cluster-endpoints/src/endpoint_stremers.rs index fda48d1b..aeab980c 100644 --- a/cluster-endpoints/src/endpoint_stremers.rs +++ b/cluster-endpoints/src/endpoint_stremers.rs @@ -1,4 +1,7 @@ -use solana_lite_rpc_core::types::{BlockStream, ClusterInfoStream, SlotStream, VoteAccountStream}; +use solana_lite_rpc_core::{ + structures::account_data::AccountStream, + types::{BlockStream, ClusterInfoStream, SlotStream, VoteAccountStream}, +}; /// subscribers to broadcast channels should assume that channels are not getting closed unless the system is shutting down pub struct EndpointStreaming { @@ -6,4 +9,5 @@ pub struct EndpointStreaming { pub slot_notifier: SlotStream, pub vote_account_notifier: VoteAccountStream, pub cluster_info_notifier: ClusterInfoStream, + pub processed_account_stream: Option, } diff --git a/cluster-endpoints/src/grpc/gprc_accounts_streaming.rs b/cluster-endpoints/src/grpc/gprc_accounts_streaming.rs new file mode 100644 index 00000000..2da09872 --- /dev/null +++ b/cluster-endpoints/src/grpc/gprc_accounts_streaming.rs @@ -0,0 +1,207 @@ +use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task; +use std::{collections::HashMap, time::Duration}; + +use geyser_grpc_connector::GrpcSourceConfig; +use geyser_grpc_connector::Message::GeyserSubscribeUpdate; +use itertools::Itertools; +use solana_lite_rpc_core::{ + commitment_utils::Commitment, + structures::{ + account_data::{AccountData, AccountNotificationMessage, AccountStream}, + account_filter::{AccountFilterType, AccountFilters, MemcmpFilterData}, + }, + AnyhowJoinHandle, +}; +use solana_sdk::{account::Account, pubkey::Pubkey}; +use 
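// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): how the new
// `processed_account_stream` field on `EndpointStreaming` is intended to be
// consumed, based on the types added in this PR. The function and variable
// names here are placeholders for whatever the embedding binary (e.g. main.rs)
// actually uses.
use solana_lite_rpc_accounts::account_service::AccountService;
use solana_lite_rpc_cluster_endpoints::endpoint_stremers::EndpointStreaming;
use solana_lite_rpc_core::AnyhowJoinHandle;

fn start_account_processing(
    streamers: &EndpointStreaming,
    account_service: &AccountService,
) -> Vec<AnyhowJoinHandle> {
    match &streamers.processed_account_stream {
        // only the gRPC subscription fills this in; RPC polling leaves it as None
        Some(account_stream) => account_service.process_account_stream(
            account_stream.resubscribe(),
            streamers.blocks_notifier.resubscribe(),
        ),
        None => vec![],
    }
}
// ---------------------------------------------------------------------------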
tokio::sync::broadcast; +use yellowstone_grpc_proto::geyser::{ + subscribe_request_filter_accounts_filter::Filter, + subscribe_request_filter_accounts_filter_memcmp::Data, subscribe_update::UpdateOneof, + SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterAccountsFilter, + SubscribeRequestFilterAccountsFilterMemcmp, +}; + +pub fn start_account_streaming_tasks( + grpc_config: GrpcSourceConfig, + accounts_filters: AccountFilters, + account_stream_sx: tokio::sync::mpsc::UnboundedSender, +) -> AnyhowJoinHandle { + tokio::spawn(async move { + 'main_loop: loop { + // for now we can only be sure that there is one confirmed block per slot, for processed there can be multiple confirmed blocks + // So setting commitment to confirmed + // To do somehow make it processed, we we could get blockhash with slot it should be ideal + let confirmed_commitment = yellowstone_grpc_proto::geyser::CommitmentLevel::Confirmed; + + let mut subscribe_accounts: HashMap = + HashMap::new(); + + for (index, accounts_filter) in accounts_filters.iter().enumerate() { + if !accounts_filter.accounts.is_empty() { + subscribe_accounts.insert( + format!("accounts_{index:?}"), + SubscribeRequestFilterAccounts { + account: accounts_filter + .accounts + .iter() + .map(|x| x.to_string()) + .collect_vec(), + owner: vec![], + filters: vec![], + }, + ); + } + if let Some(program_id) = &accounts_filter.program_id { + let filters = if let Some(filters) = &accounts_filter.filters { + filters + .iter() + .map(|filter| match filter { + AccountFilterType::Datasize(size) => { + SubscribeRequestFilterAccountsFilter { + filter: Some(Filter::Datasize(*size)), + } + } + AccountFilterType::Memcmp(memcmp) => { + SubscribeRequestFilterAccountsFilter { + filter: Some(Filter::Memcmp( + SubscribeRequestFilterAccountsFilterMemcmp { + offset: memcmp.offset, + data: Some(match &memcmp.data { + MemcmpFilterData::Bytes(bytes) => { + Data::Bytes(bytes.clone()) + } + MemcmpFilterData::Base58(data) => { + Data::Base58(data.clone()) + } + MemcmpFilterData::Base64(data) => { + Data::Base64(data.clone()) + } + }), + }, + )), + } + } + }) + .collect_vec() + } else { + vec![] + }; + subscribe_accounts.insert( + format!("accounts_{}", program_id), + SubscribeRequestFilterAccounts { + account: vec![], + owner: vec![program_id.clone()], + filters, + }, + ); + } + } + + let subscribe_request = SubscribeRequest { + accounts: subscribe_accounts, + slots: Default::default(), + transactions: Default::default(), + blocks: Default::default(), + blocks_meta: Default::default(), + entry: Default::default(), + commitment: Some(confirmed_commitment.into()), + accounts_data_slice: Default::default(), + ping: None, + }; + let (_abort_handler, mut accounts_stream) = + create_geyser_autoconnection_task(grpc_config.clone(), subscribe_request); + + while let Some(GeyserSubscribeUpdate(message)) = accounts_stream.recv().await { + let Some(update) = message.update_oneof else { + continue; + }; + + match update { + UpdateOneof::Account(account) => { + if let Some(account_data) = account.account { + let account_pk_bytes: [u8; 32] = account_data + .pubkey + .try_into() + .expect("Pubkey should be 32 byte long"); + let owner: [u8; 32] = account_data + .owner + .try_into() + .expect("owner pubkey should be deserializable"); + let notification = AccountNotificationMessage { + data: AccountData { + pubkey: Pubkey::new_from_array(account_pk_bytes), + account: Account { + lamports: account_data.lamports, + data: account_data.data, + owner: Pubkey::new_from_array(owner), + 
executable: account_data.executable, + rent_epoch: account_data.rent_epoch, + }, + updated_slot: account.slot, + }, + // TODO update with processed commitment / check above + commitment: Commitment::Confirmed, + }; + if account_stream_sx.send(notification).is_err() { + // non recoverable, i.e the whole stream is being restarted + log::error!("Account stream broken, breaking from main loop"); + break 'main_loop; + } + } + } + UpdateOneof::Ping(_) => { + log::trace!("GRPC Ping accounts stream"); + } + _ => { + log::error!("GRPC accounts steam misconfigured"); + } + }; + } + log::error!("Grpc account subscription broken (resubscribing)"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + Ok(()) + }) +} + +pub fn create_grpc_account_streaming( + grpc_sources: Vec, + accounts_filters: AccountFilters, +) -> (AnyhowJoinHandle, AccountStream) { + let (account_sender, accounts_stream) = broadcast::channel::(128); + + let jh: AnyhowJoinHandle = tokio::spawn(async move { + loop { + let (accounts_sx, mut accounts_rx) = tokio::sync::mpsc::unbounded_channel(); + grpc_sources + .iter() + .map(|grpc_config| { + start_account_streaming_tasks( + grpc_config.clone(), + accounts_filters.clone(), + accounts_sx.clone(), + ) + }) + .collect_vec(); + drop(accounts_sx); + + loop { + match tokio::time::timeout(Duration::from_secs(60), accounts_rx.recv()).await { + Ok(Some(data)) => { + let _ = account_sender.send(data); + } + Ok(None) => { + log::error!("All grpc accounts channels close; restarting subscription"); + break; + } + Err(_elapsed) => { + log::error!("No accounts data for a minute; restarting subscription"); + break; + } + } + } + } + }); + + (jh, accounts_stream) +} diff --git a/cluster-endpoints/src/grpc/mod.rs b/cluster-endpoints/src/grpc/mod.rs new file mode 100644 index 00000000..5809b7e9 --- /dev/null +++ b/cluster-endpoints/src/grpc/mod.rs @@ -0,0 +1 @@ +pub mod gprc_accounts_streaming; diff --git a/cluster-endpoints/src/grpc_subscription.rs b/cluster-endpoints/src/grpc_subscription.rs index e6a1de55..9e126e68 100644 --- a/cluster-endpoints/src/grpc_subscription.rs +++ b/cluster-endpoints/src/grpc_subscription.rs @@ -1,4 +1,5 @@ use crate::endpoint_stremers::EndpointStreaming; +use crate::grpc::gprc_accounts_streaming::create_grpc_account_streaming; use crate::grpc_multiplex::{ create_grpc_multiplex_blocks_subscription, create_grpc_multiplex_processed_slots_subscription, }; @@ -7,6 +8,7 @@ use futures::StreamExt; use geyser_grpc_connector::GrpcSourceConfig; use itertools::Itertools; use solana_client::nonblocking::rpc_client::RpcClient; +use solana_lite_rpc_core::structures::account_filter::AccountFilters; use solana_lite_rpc_core::{ encoding::BASE64, structures::produced_block::{ProducedBlock, TransactionInfo}, @@ -402,6 +404,7 @@ pub fn create_slot_stream_task( pub fn create_grpc_subscription( rpc_client: Arc, grpc_sources: Vec, + accounts_filter: AccountFilters, ) -> anyhow::Result<(EndpointStreaming, Vec)> { let (cluster_info_sx, cluster_info_notifier) = tokio::sync::broadcast::channel(10); let (va_sx, vote_account_notifier) = tokio::sync::broadcast::channel(10); @@ -411,23 +414,46 @@ pub fn create_grpc_subscription( create_grpc_multiplex_processed_slots_subscription(grpc_sources.clone()); let (block_multiplex_channel, jh_multiplex_blockstream) = - create_grpc_multiplex_blocks_subscription(grpc_sources); + create_grpc_multiplex_blocks_subscription(grpc_sources.clone()); let cluster_info_polling = poll_cluster_info(rpc_client.clone(), cluster_info_sx); let 
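// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): a standalone consumer of the
// broadcast stream returned by `create_grpc_account_streaming` above. The
// `grpc_sources` and `filters` arguments are assumed to come from the caller;
// the logging is just one way to handle the lag/close cases of the receiver.
use geyser_grpc_connector::GrpcSourceConfig;
use solana_lite_rpc_cluster_endpoints::grpc::gprc_accounts_streaming::create_grpc_account_streaming;
use solana_lite_rpc_core::structures::account_filter::AccountFilters;
use tokio::sync::broadcast::error::RecvError;

async fn log_account_updates(
    grpc_sources: Vec<GrpcSourceConfig>,
    filters: AccountFilters,
) -> anyhow::Result<()> {
    let (_streaming_task, mut accounts) = create_grpc_account_streaming(grpc_sources, filters);
    loop {
        match accounts.recv().await {
            Ok(update) => log::debug!(
                "account {} updated at slot {}",
                update.data.pubkey,
                update.data.updated_slot
            ),
            Err(RecvError::Lagged(n)) => log::warn!("account consumer lagged by {}", n),
            Err(RecvError::Closed) => anyhow::bail!("account stream closed"),
        }
    }
}
// ---------------------------------------------------------------------------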
vote_accounts_polling = poll_vote_accounts(rpc_client.clone(), va_sx); - let streamers = EndpointStreaming { - blocks_notifier: block_multiplex_channel, - slot_notifier: slot_multiplex_channel, - cluster_info_notifier, - vote_account_notifier, - }; - - let endpoint_tasks = vec![ - jh_multiplex_slotstream, - jh_multiplex_blockstream, - cluster_info_polling, - vote_accounts_polling, - ]; - Ok((streamers, endpoint_tasks)) + // accounts + if !accounts_filter.is_empty() { + let (account_jh, processed_account_stream) = + create_grpc_account_streaming(grpc_sources, accounts_filter); + let streamers = EndpointStreaming { + blocks_notifier: block_multiplex_channel, + slot_notifier: slot_multiplex_channel, + cluster_info_notifier, + vote_account_notifier, + processed_account_stream: Some(processed_account_stream), + }; + + let endpoint_tasks = vec![ + jh_multiplex_slotstream, + jh_multiplex_blockstream, + cluster_info_polling, + vote_accounts_polling, + account_jh, + ]; + Ok((streamers, endpoint_tasks)) + } else { + let streamers = EndpointStreaming { + blocks_notifier: block_multiplex_channel, + slot_notifier: slot_multiplex_channel, + cluster_info_notifier, + vote_account_notifier, + processed_account_stream: None, + }; + + let endpoint_tasks = vec![ + jh_multiplex_slotstream, + jh_multiplex_blockstream, + cluster_info_polling, + vote_accounts_polling, + ]; + Ok((streamers, endpoint_tasks)) + } } diff --git a/cluster-endpoints/src/json_rpc_subscription.rs b/cluster-endpoints/src/json_rpc_subscription.rs index b4a60b1d..ba32c475 100644 --- a/cluster-endpoints/src/json_rpc_subscription.rs +++ b/cluster-endpoints/src/json_rpc_subscription.rs @@ -18,6 +18,7 @@ pub fn create_json_rpc_polling_subscription( let (block_sx, blocks_notifier) = tokio::sync::broadcast::channel(16); let (cluster_info_sx, cluster_info_notifier) = tokio::sync::broadcast::channel(16); let (va_sx, vote_account_notifier) = tokio::sync::broadcast::channel(16); + // does not support accounts support with rpc polling let mut endpoint_tasks = poll_slots(rpc_client.clone(), CommitmentConfig::processed(), slot_sx)?; @@ -41,6 +42,8 @@ pub fn create_json_rpc_polling_subscription( slot_notifier, cluster_info_notifier, vote_account_notifier, + // does not support accounts support with rpc polling + processed_account_stream: None, }; Ok((streamers, endpoint_tasks)) } diff --git a/cluster-endpoints/src/lib.rs b/cluster-endpoints/src/lib.rs index bce281e1..5dd408c4 100644 --- a/cluster-endpoints/src/lib.rs +++ b/cluster-endpoints/src/lib.rs @@ -1,4 +1,5 @@ pub mod endpoint_stremers; +pub mod grpc; pub mod grpc_inspect; pub mod grpc_leaders_getter; pub mod grpc_multiplex; diff --git a/core/src/structures/account_data.rs b/core/src/structures/account_data.rs new file mode 100644 index 00000000..20aa895f --- /dev/null +++ b/core/src/structures/account_data.rs @@ -0,0 +1,33 @@ +use solana_rpc_client_api::filter::RpcFilterType; +use solana_sdk::{account::Account, pubkey::Pubkey, slot_history::Slot}; +use tokio::sync::broadcast::Receiver; + +use crate::commitment_utils::Commitment; + +#[derive(Clone, Eq, PartialEq, Debug)] +pub struct AccountData { + pub pubkey: Pubkey, + pub account: Account, + pub updated_slot: Slot, +} + +impl AccountData { + pub fn allows(&self, filter: &RpcFilterType) -> bool { + match filter { + RpcFilterType::DataSize(size) => self.account.data.len() as u64 == *size, + RpcFilterType::Memcmp(compare) => compare.bytes_match(&self.account.data), + RpcFilterType::TokenAccountState => { + // todo + false + } + } + } +} + 
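// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): what `allows` above accepts.
// The account contents, offsets and byte values are made-up example data.
#[test]
fn allows_matches_datasize_and_memcmp() {
    use solana_rpc_client_api::filter::{Memcmp, MemcmpEncodedBytes, RpcFilterType};
    use solana_sdk::{account::Account, pubkey::Pubkey};

    let account_data = AccountData {
        pubkey: Pubkey::new_unique(),
        account: Account {
            lamports: 1,
            data: vec![1, 2, 3, 4],
            owner: Pubkey::new_unique(),
            executable: false,
            rent_epoch: 0,
        },
        updated_slot: 42,
    };
    // data length must match exactly
    assert!(account_data.allows(&RpcFilterType::DataSize(4)));
    assert!(!account_data.allows(&RpcFilterType::DataSize(5)));
    // memcmp compares raw bytes at the given offset
    let memcmp = RpcFilterType::Memcmp(Memcmp::new(2, MemcmpEncodedBytes::Bytes(vec![3, 4])));
    assert!(account_data.allows(&memcmp));
}
// ---------------------------------------------------------------------------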
+#[derive(Clone)] +pub struct AccountNotificationMessage { + pub data: AccountData, + pub commitment: Commitment, +} + +pub type AccountStream = Receiver; diff --git a/core/src/structures/account_filter.rs b/core/src/structures/account_filter.rs new file mode 100644 index 00000000..10bec0fc --- /dev/null +++ b/core/src/structures/account_filter.rs @@ -0,0 +1,303 @@ +use itertools::Itertools; +use serde::{Deserialize, Serialize}; +use solana_rpc_client_api::filter::{Memcmp as RpcMemcmp, MemcmpEncodedBytes, RpcFilterType}; + +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +#[serde(rename_all = "camelCase")] +pub enum MemcmpFilterData { + Bytes(Vec), + Base58(String), + Base64(String), +} + +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +#[serde(rename_all = "camelCase")] +pub struct MemcmpFilter { + pub offset: u64, + pub data: MemcmpFilterData, +} + +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +#[serde(rename_all = "camelCase")] +pub enum AccountFilterType { + Datasize(u64), + Memcmp(MemcmpFilter), +} + +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +#[serde(rename_all = "camelCase")] +pub struct AccountFilter { + pub accounts: Vec, + pub program_id: Option, + pub filters: Option>, +} + +impl AccountFilter { + pub fn get_rpc_filter(&self) -> Option> { + self.filters.clone().map(|filters| { + filters + .iter() + .map(|filter| match filter { + AccountFilterType::Datasize(size) => RpcFilterType::DataSize(*size), + AccountFilterType::Memcmp(memcpy) => { + let encoded_bytes = match &memcpy.data { + MemcmpFilterData::Bytes(bytes) => { + MemcmpEncodedBytes::Bytes(bytes.clone()) + } + MemcmpFilterData::Base58(data) => { + MemcmpEncodedBytes::Base58(data.clone()) + } + MemcmpFilterData::Base64(data) => { + MemcmpEncodedBytes::Base64(data.clone()) + } + }; + RpcFilterType::Memcmp(RpcMemcmp::new(memcpy.offset as usize, encoded_bytes)) + } + }) + .collect_vec() + }) + } +} + +pub type AccountFilters = Vec; + +#[test] +fn test_accounts_filters_deserialization() { + let str = "[ + { + \"accounts\": [], + \"programId\": \"4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg\", + \"filters\": [ + { + \"datasize\": 200 + } + ] + }, + { + \"accounts\": [], + \"programId\": \"4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg\", + \"filters\": [ + { + \"memcmp\": { + \"offset\": 100, + \"data\": { + \"bytes\": [ + 115, + 101, + 114, + 117, + 109, + 5, + 0, + 0, + 0, + 0, + 0, + 0, + 0 + ] + } + } + } + ] + }, + { + \"accounts\": [], + \"programId\": \"4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg\" + }, + { + \"accounts\": [], + \"programId\": \"4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg\", + \"filters\": [ + { + \"datasize\": 200 + }, + { + \"memcmp\": { + \"offset\": 100, + \"data\": { + \"bytes\": [ + 115, + 101, + 114, + 117, + 109, + 5, + 0, + 0, + 0, + 0, + 0, + 0, + 0 + ] + } + } + } + ] + }, + { + \"accounts\": [\"4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg\", \"srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX\"] + } + ] + "; + + let filters: AccountFilters = serde_json::from_str(str).unwrap(); + + assert_eq!( + filters[0], + AccountFilter { + accounts: vec![], + program_id: Some("4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".to_string()), + filters: Some(vec![AccountFilterType::Datasize(200)]) + } + ); + + assert_eq!( + filters[1], + AccountFilter { + accounts: vec![], + program_id: Some("4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".to_string()), + filters: Some(vec![AccountFilterType::Memcmp(MemcmpFilter { + offset: 100, + data: 
MemcmpFilterData::Bytes(vec![ + 115, 101, 114, 117, 109, 5, 0, 0, 0, 0, 0, 0, 0 + ]) + })]) + } + ); + + assert_eq!( + filters[2], + AccountFilter { + accounts: vec![], + program_id: Some("4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".to_string()), + filters: None + } + ); + + assert_eq!( + filters[3], + AccountFilter { + accounts: vec![], + program_id: Some("4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".to_string()), + filters: Some(vec![ + AccountFilterType::Datasize(200), + AccountFilterType::Memcmp(MemcmpFilter { + offset: 100, + data: MemcmpFilterData::Bytes(vec![ + 115, 101, 114, 117, 109, 5, 0, 0, 0, 0, 0, 0, 0 + ]) + }) + ]) + } + ); + + assert_eq!( + filters[4], + AccountFilter { + accounts: vec![ + "4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".to_string(), + "srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX".to_string() + ], + program_id: None, + filters: None + } + ); +} + +#[test] +fn workspace_write_your_own_filter() { + // update the filters and then run the test using `cargo test -- workspace_write_your_own_filter --nocapture` then use the printed line to update config.json + let mut filters = AccountFilters::new(); + + let mango_program_filter = AccountFilter { + accounts: vec![], + program_id: Some("4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".to_string()), + filters: None, + }; + + filters.push(mango_program_filter); + + let oracles = [ + "9BoFW2JxdCDodsa2zfxAZpyT9yiTgSYEcHdNSuA7s5Sf", + "CYGfrBJB9HgLf9iZyN4aH5HvUAi2htQ4MjPxeXMf4Egn", + "7UYk5yhrQtFbZV2bLX1gtqN7QdU9xpBMyAk7tFgoTatk", + "2PRxDHabumHHv6fgcrcyvHsV8ENkWdEph27vhpbSMLn3", + "CtJ8EkqLmeYyGB8s4jevpeNsvmD4dxVR2krfsDLcvV8Y", + "3pxTFXBJbTAtHLCgSWjeasngGCi4ohV16F4fDnd4Xh81", + "GVXRSBjFk6e6J3NbVPXohDJetcTjaeeuykUpbQF8UoMU", + "2dAsTriwLdgmGt7N6Dkq1iUV6pGhSUUwqqePp4qorzor", + "FnVC5oSSdnCHfN5W7xbu74HbxXF3Kmy63gUKWdaaZwD7", + "6ABgrEZk8urs6kJ1JNdC1sspH5zKXRqxy8sg3ZG2cQps", + "7moA1i5vQUpfDwSpK6Pw9s56ahB7WFGidtbL2ujWrVvm", + "Bt1hEbY62aMriY1SyQqbeZbm8VmSbQVGBFzSzMuVNWzN", + "4ivThkX8uRxBpHsdWSqyXYihzKF3zpRGAUCqyuagnLoV", + "D8UUgr8a3aR3yUeHLu7v8FWK7E8Y5sSU7qrYBXUJXBQ5", + "7fMKXU6AnatycNu1CAMndLkKmDPtjZaPNZSJSfXR92Ez", + "JBu1AL4obBcCMqKBBxhpWCNUt136ijcuMZLFvTP7iWdB", + "AnLf8tVYCM816gmBjiy8n53eXKKEDydT5piYjjQDPgTB", + "4dusJxxxiYrMTLGYS6cCAyu3gPn2xXLBjS7orMToZHi1", + "hnkVVuJTRZvX2SawUsecZz2eHJP2oGMdnhdDJa33KSY", + "AFrYBhb5wKQtxRS9UA9YRS4V3dwFm7SqmS6DHKq6YVgo", + "Ag7RdWj5t3U9avU4XKAY7rBbGDCNz456ckNmcpW1aHoE", + "AwpALBTXcaz2t6BayXvQQu7eZ6h7u2UNRCQNmD9ShY7Z", + "8ihFLu5FimgTQ1Unh4dVyEHUGodJ5gJQCrQf4KUVB9bN", + "AV67ufGVkHrPKXdeupXE2MXdw3puq7xnkPNrTxGP3suU", + "3uZCMHY3vnNJspSVk6TvE9qmb4iYVbrEWFQ71uCE5hFR", + "5wRjzrwWZG3af3FE26ZrRj3s8A3BVNyeJ9Pt9Uf2ogdf", + "2qHkYmAn7HNtAGw45hQQkRthDDNiyVyVfDJDaw6iSoRm", + "2FGoL9PNhNGpduRKLsTa4teRaX3vfarXAc1an2KyXxQm", + "4BA3RcS4zE32WWgp49vvvre2t6nXY1W1kMyKZxeeuUey", + "Bfz5q3cDywSSjnWb9oXeQZqYzHwqFGp75mm34eYCPNEA", + "H6ARHf6YXhGYeQfUzQNGk6rDNnLBQKrenN712K4AQJEG", + "79wm3jjcPr6RaNQ4DGvP5KxG1mNd3gEBsg6FsNVFezK4", + "Gnt27xtC473ZT2Mw5u8wZ68Z3gULkSTb5DuxJy7eJotD", + "g6eRCbboSwK4tSWngn773RCMexr1APQr4uA9bGZBYfo", + "91Sfpm86H7ZgngdGfAiVJTNbg42CXBPiurruf29kinMh", + "nrYkQQQur7z8rYTST3G9GqATviK5SxTDkrqd21MW6Ue", + "E4v1BBgoso9s64TQvmyownAVJbhbEPGyzA3qn4n46qj9", + "EzBoEHzYSx37RULrQCh756kNcA7iLrmGesxqpzSwo4v3", + "3vxLXJqLqF3JG5TCbYycbKWRBbCJQLxQmBGCkyqEEefL", + "7yyaeuJ1GGtVBLT2z2xub5ZWYKaNhF28mj1RdV4VDFVk", + "BeAZ81UvesnJR7VVGNzRQGKFHrnxm77x5ozesC1pTjrY", + "H5hokc8gcKezGcwbqFbss99QrpA3WxsRfqGYCm6F1EBy", + "ELrhqYY3WjLRnLwWt3u7sMykNc87EScEAsyCyrDDSAXv", + 
"FYghp2wYzq36yqXYd8D3Lu6jpMWETHTtxYDZPXdpppyc", + ]; + let oracle_filters = AccountFilter { + accounts: oracles.iter().map(|x| x.to_string()).collect_vec(), + program_id: None, + filters: None, + }; + filters.push(oracle_filters); + + let open_orders = AccountFilter { + accounts: vec![], + program_id: Some("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX".to_string()), + filters: Some(vec![ + AccountFilterType::Datasize(3228), + AccountFilterType::Memcmp(MemcmpFilter { + offset: 0, + data: MemcmpFilterData::Bytes( + [0x73, 0x65, 0x72, 0x75, 0x6d, 5, 0, 0, 0, 0, 0, 0, 0].to_vec(), + ), + }), + AccountFilterType::Memcmp(MemcmpFilter { + offset: 45, + data: MemcmpFilterData::Bytes( + [ + 91, 23, 199, 200, 106, 110, 115, 159, 175, 23, 81, 129, 131, 99, 233, 79, + 144, 139, 243, 112, 4, 206, 109, 63, 188, 241, 151, 189, 210, 245, 31, 28, + ] + .to_vec(), + ), + }), + ]), + }; + filters.push(open_orders); + + let filter_string = serde_json::to_string(&filters).unwrap(); + let filter_string = filter_string.replace('"', "\\\""); + println!("Filter is : \n {} \n", filter_string); +} diff --git a/core/src/structures/mod.rs b/core/src/structures/mod.rs index ba7db841..58e22f53 100644 --- a/core/src/structures/mod.rs +++ b/core/src/structures/mod.rs @@ -1,5 +1,7 @@ // this mod will contain all the core structures that are defined for lite-rpc +pub mod account_data; +pub mod account_filter; pub mod epoch; pub mod identity_stakes; pub mod leader_data; diff --git a/lite-rpc/Cargo.toml b/lite-rpc/Cargo.toml index ab6f5ade..09d55841 100644 --- a/lite-rpc/Cargo.toml +++ b/lite-rpc/Cargo.toml @@ -15,6 +15,8 @@ solana-rpc-client = { workspace = true } solana-rpc-client-api = { workspace = true } solana-transaction-status = { workspace = true } solana-version = { workspace = true } +solana-account-decoder = { workspace = true } + serde = { workspace = true } serde_json = { workspace = true } bincode = { workspace = true } @@ -50,6 +52,7 @@ solana-lite-rpc-cluster-endpoints = { workspace = true } solana-lite-rpc-blockstore = { workspace = true } solana-lite-rpc-prioritization-fees = { workspace = true } solana-lite-rpc-address-lookup-tables = { workspace = true } +solana-lite-rpc-accounts = { workspace = true } [dev-dependencies] bench = { path = "../bench" } diff --git a/lite-rpc/src/bridge.rs b/lite-rpc/src/bridge.rs index 00f167ab..a2716cf5 100644 --- a/lite-rpc/src/bridge.rs +++ b/lite-rpc/src/bridge.rs @@ -1,25 +1,19 @@ use std::collections::HashMap; -use std::{str::FromStr, sync::Arc}; +use std::str::FromStr; -use anyhow::Context; use itertools::Itertools; -use jsonrpsee::core::StringError; -use jsonrpsee::{ - core::SubscriptionResult, server::ServerBuilder, DisconnectError, PendingSubscriptionSink, -}; -use log::{debug, error, warn}; use prometheus::{opts, register_int_counter, IntCounter}; -use solana_lite_rpc_core::types::BlockStream; +use solana_account_decoder::UiAccount; +use solana_lite_rpc_accounts::account_service::AccountService; use solana_lite_rpc_prioritization_fees::account_prio_service::AccountPrioService; use solana_lite_rpc_prioritization_fees::prioritization_fee_calculation_method::PrioritizationFeeCalculationMethod; -use solana_rpc_client::nonblocking::rpc_client::RpcClient; -use solana_rpc_client_api::response::SlotInfo; +use solana_rpc_client_api::config::RpcAccountInfoConfig; +use solana_rpc_client_api::response::{OptionalContext, RpcKeyedAccount}; use solana_rpc_client_api::{ config::{ - RpcBlockSubscribeConfig, RpcBlockSubscribeFilter, RpcBlocksConfigWrapper, RpcContextConfig, - 
RpcGetVoteAccountsConfig, RpcLeaderScheduleConfig, RpcProgramAccountsConfig, - RpcRequestAirdropConfig, RpcSignatureStatusConfig, RpcSignatureSubscribeConfig, - RpcSignaturesForAddressConfig, RpcTransactionLogsConfig, RpcTransactionLogsFilter, + RpcBlocksConfigWrapper, RpcContextConfig, RpcGetVoteAccountsConfig, + RpcLeaderScheduleConfig, RpcProgramAccountsConfig, RpcRequestAirdropConfig, + RpcSignatureStatusConfig, RpcSignaturesForAddressConfig, }, response::{ Response as RpcResponse, RpcBlockhash, RpcConfirmedTransactionStatusWithSignature, @@ -30,14 +24,11 @@ use solana_rpc_client_api::{ use solana_sdk::epoch_info::EpochInfo; use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, slot_history::Slot}; use solana_transaction_status::{TransactionStatus, UiConfirmedBlock}; -use tokio::net::ToSocketAddrs; -use tokio::sync::broadcast::error::RecvError::{Closed, Lagged}; use solana_lite_rpc_blockstore::history::History; use solana_lite_rpc_core::{ encoding, - stores::{block_information_store::BlockInformation, data_cache::DataCache, tx_store::TxProps}, - AnyhowJoinHandle, + stores::{block_information_store::BlockInformation, data_cache::DataCache}, }; use solana_lite_rpc_services::{ transaction_service::TransactionService, tx_sender::TXS_IN_CHANNEL, @@ -45,12 +36,9 @@ use solana_lite_rpc_services::{ use crate::{ configs::{IsBlockHashValidConfig, SendTransactionConfig}, - jsonrpsee_subscrption_handler_sink::JsonRpseeSubscriptionHandlerSink, rpc::LiteRpcServer, }; -use solana_lite_rpc_prioritization_fees::rpc_data::{ - AccountPrioFeesStats, AccountPrioFeesUpdateMessage, PrioFeesStats, PrioFeesUpdateMessage, -}; +use solana_lite_rpc_prioritization_fees::rpc_data::{AccountPrioFeesStats, PrioFeesStats}; use solana_lite_rpc_prioritization_fees::PrioFeesService; lazy_static::lazy_static! { @@ -66,87 +54,35 @@ lazy_static::lazy_static! 
{ register_int_counter!(opts!("literpc_rpc_get_version", "RPC call to version")).unwrap(); static ref RPC_REQUEST_AIRDROP: IntCounter = register_int_counter!(opts!("literpc_rpc_airdrop", "RPC call to request airdrop")).unwrap(); - static ref RPC_SIGNATURE_SUBSCRIBE: IntCounter = - register_int_counter!(opts!("literpc_rpc_signature_subscribe", "RPC call to subscribe to signature")).unwrap(); - static ref RPC_BLOCK_PRIOFEES_SUBSCRIBE: IntCounter = - register_int_counter!(opts!("literpc_rpc_block_priofees_subscribe", "RPC call to subscribe to block prio fees")).unwrap(); - static ref RPC_ACCOUNT_PRIOFEES_SUBSCRIBE: IntCounter = - register_int_counter!(opts!("literpc_rpc_account_priofees_subscribe", "RPC call to subscribe to account prio fees")).unwrap(); } /// A bridge between clients and tpu #[allow(dead_code)] pub struct LiteBridge { data_cache: DataCache, - // should be removed - rpc_client: Arc, transaction_service: TransactionService, history: History, prio_fees_service: PrioFeesService, account_priofees_service: AccountPrioService, - block_stream: BlockStream, + accounts_service: Option, } impl LiteBridge { pub fn new( - rpc_client: Arc, data_cache: DataCache, transaction_service: TransactionService, history: History, prio_fees_service: PrioFeesService, account_priofees_service: AccountPrioService, - block_stream: BlockStream, + accounts_service: Option, ) -> Self { Self { - rpc_client, data_cache, transaction_service, history, prio_fees_service, account_priofees_service, - block_stream, - } - } - - /// List for `JsonRpc` requests - pub async fn start( - self, - http_addr: T, - ws_addr: T, - ) -> anyhow::Result<()> { - let rpc = self.into_rpc(); - - let ws_server_handle = ServerBuilder::default() - .ws_only() - .build(ws_addr.clone()) - .await? - .start(rpc.clone())?; - - let http_server_handle = ServerBuilder::default() - .http_only() - .build(http_addr.clone()) - .await? - .start(rpc)?; - - let ws_server: AnyhowJoinHandle = tokio::spawn(async move { - log::info!("Websocket Server started at {ws_addr:?}"); - ws_server_handle.stopped().await; - anyhow::bail!("Websocket server stopped"); - }); - - let http_server: AnyhowJoinHandle = tokio::spawn(async move { - log::info!("HTTP Server started at {http_addr:?}"); - http_server_handle.stopped().await; - anyhow::bail!("HTTP server stopped"); - }); - - tokio::select! 
{ - res = ws_server => { - anyhow::bail!("WebSocket server {res:?}"); - }, - res = http_server => { - anyhow::bail!("HTTP server {res:?}"); - }, + accounts_service, } } } @@ -454,147 +390,14 @@ impl LiteRpcServer for LiteBridge { async fn request_airdrop( &self, - pubkey_str: String, - lamports: u64, - config: Option, + _pubkey_str: String, + _lamports: u64, + _config: Option, ) -> crate::rpc::Result { RPC_REQUEST_AIRDROP.inc(); - - let pubkey = match Pubkey::from_str(&pubkey_str) { - Ok(pubkey) => pubkey, - Err(err) => { - return Err(jsonrpsee::core::Error::Custom(err.to_string())); - } - }; - - let airdrop_sig = match self - .rpc_client - .request_airdrop_with_config(&pubkey, lamports, config.unwrap_or_default()) - .await - .context("failed to request airdrop") - { - Ok(airdrop_sig) => airdrop_sig.to_string(), - Err(err) => { - return Err(jsonrpsee::core::Error::Custom(err.to_string())); - } - }; - if let Ok((_, block_height)) = self - .rpc_client - .get_latest_blockhash_with_commitment(CommitmentConfig::finalized()) - .await - .context("failed to get latest blockhash") - { - self.data_cache.txs.insert( - airdrop_sig.clone(), - TxProps { - status: None, - last_valid_blockheight: block_height, - sent_by_lite_rpc: true, - }, - ); - } - Ok(airdrop_sig) - } - - async fn program_subscribe( - &self, - _pending: PendingSubscriptionSink, - _pubkey_str: String, - _config: Option, - ) -> SubscriptionResult { - todo!() - } - - async fn slot_subscribe(&self, pending: PendingSubscriptionSink) -> SubscriptionResult { - let sink = pending.accept().await?; - let mut block_stream = self.block_stream.resubscribe(); - tokio::spawn(async move { - loop { - match block_stream.recv().await { - Ok(produced_block) => { - if !produced_block.commitment_config.is_processed() { - continue; - } - let slot_info = SlotInfo { - slot: produced_block.slot, - parent: produced_block.parent_slot, - root: 0, - }; - let result_message = jsonrpsee::SubscriptionMessage::from_json(&slot_info); - - match sink.send(result_message.unwrap()).await { - Ok(()) => { - // success - continue; - } - Err(DisconnectError(_subscription_message)) => { - debug!("Stopping subscription task on disconnect"); - return; - } - }; - } - Err(e) => match e { - Closed => { - break; - } - Lagged(_) => { - log::error!("Slot subscription stream lagged"); - continue; - } - }, - } - } - }); - Ok(()) - } - - async fn block_subscribe( - &self, - _pending: PendingSubscriptionSink, - _filter: RpcBlockSubscribeFilter, - _config: Option, - ) -> SubscriptionResult { - todo!() - } - - async fn logs_subscribe( - &self, - _pending: PendingSubscriptionSink, - _filter: RpcTransactionLogsFilter, - _config: Option, - ) -> SubscriptionResult { - todo!() - } - - // WARN: enable_received_notification: bool is ignored - async fn signature_subscribe( - &self, - pending: PendingSubscriptionSink, - signature: String, - config: RpcSignatureSubscribeConfig, - ) -> SubscriptionResult { - RPC_SIGNATURE_SUBSCRIBE.inc(); - let sink = pending.accept().await?; - - let jsonrpsee_sink = JsonRpseeSubscriptionHandlerSink::new(sink); - self.data_cache.tx_subs.signature_subscribe( - signature, - config.commitment.unwrap_or_default(), - Arc::new(jsonrpsee_sink), - ); - - Ok(()) - } - - async fn slot_updates_subscribe( - &self, - _pending: PendingSubscriptionSink, - ) -> SubscriptionResult { - todo!() - } - - async fn vote_subscribe(&self, _pending: PendingSubscriptionSink) -> SubscriptionResult { - todo!() + Err(jsonrpsee::core::Error::Custom( + "Does not support airdrop".to_string(), + )) } 
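Note: the account endpoints added below (getAccountInfo, getMultipleAccounts, getProgramAccounts) all delegate to the optional AccountService and return an "account filters are not configured" error when it is absent. A minimal client-side sketch of exercising getAccountInfo against a running instance follows; the listen address and the choice of oracle pubkey (taken from the filter test above) are assumptions for illustration and are not part of this patch.

use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::pubkey::Pubkey;
use std::str::FromStr;

async fn fetch_tracked_account() -> anyhow::Result<()> {
    // Assumes lite-rpc was started with an `account_filters` config that tracks this account
    // and is listening on its default HTTP address (assumed here to be port 8890).
    let client = RpcClient::new("http://127.0.0.1:8890".to_string());
    // One of the oracle accounts listed in the workspace filter test above.
    let oracle = Pubkey::from_str("GVXRSBjFk6e6J3NbVPXohDJetcTjaeeuykUpbQF8UoMU")?;
    // Served from the in-memory account store instead of being proxied to an upstream RPC.
    let account = client.get_account(&oracle).await?;
    println!("owner={} lamports={}", account.owner, account.lamports);
    Ok(())
}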
async fn get_leader_schedule( @@ -672,62 +475,6 @@ impl LiteRpcServer for LiteBridge { } } - // use websocket-tungstenite-retry->examples/consume_literpc_priofees.rs to test - async fn latest_block_priofees_subscribe( - &self, - pending: PendingSubscriptionSink, - ) -> SubscriptionResult { - let sink = pending.accept().await?; - - let mut block_fees_stream = self.prio_fees_service.block_fees_stream.subscribe(); - tokio::spawn(async move { - RPC_BLOCK_PRIOFEES_SUBSCRIBE.inc(); - - 'recv_loop: loop { - match block_fees_stream.recv().await { - Ok(PrioFeesUpdateMessage { - slot: confirmation_slot, - priofees_stats, - }) => { - let result_message = - jsonrpsee::SubscriptionMessage::from_json(&RpcResponse { - context: RpcResponseContext { - slot: confirmation_slot, - api_version: None, - }, - value: priofees_stats, - }); - - match sink.send(result_message.unwrap()).await { - Ok(()) => { - // success - continue 'recv_loop; - } - Err(DisconnectError(_subscription_message)) => { - debug!("Stopping subscription task on disconnect"); - return; - } - }; - } - Err(Lagged(lagged)) => { - // this usually happens if there is one "slow receiver", see https://docs.rs/tokio/latest/tokio/sync/broadcast/index.html#lagging - warn!( - "subscriber laggs some({}) priofees update messages - continue", - lagged - ); - continue 'recv_loop; - } - Err(Closed) => { - error!("failed to receive block, sender closed - aborting"); - return; - } - } - } - }); - - Ok(()) - } - async fn get_latest_account_priofees( &self, account: String, @@ -762,66 +509,109 @@ impl LiteRpcServer for LiteBridge { } } - async fn latest_account_priofees_subscribe( + async fn get_account_info( &self, - pending: PendingSubscriptionSink, - account: String, - ) -> SubscriptionResult { - let Ok(account) = Pubkey::from_str(&account) else { - return Err(StringError::from("Invalid account".to_string())); + pubkey_str: String, + config: Option, + ) -> crate::rpc::Result>> { + let Ok(pubkey) = Pubkey::from_str(&pubkey_str) else { + return Err(jsonrpsee::core::Error::Custom( + "invalid account pubkey".to_string(), + )); }; - let sink = pending.accept().await?; - let mut account_fees_stream = self - .account_priofees_service - .priofees_update_sender - .subscribe(); - tokio::spawn(async move { - RPC_BLOCK_PRIOFEES_SUBSCRIBE.inc(); - - 'recv_loop: loop { - match account_fees_stream.recv().await { - Ok(AccountPrioFeesUpdateMessage { + if let Some(account_service) = &self.accounts_service { + match account_service.get_account(pubkey, config).await { + Ok((slot, ui_account)) => Ok(RpcResponse { + context: RpcResponseContext { slot, - accounts_data, - }) => { - if let Some(account_data) = accounts_data.get(&account) { - let result_message = - jsonrpsee::SubscriptionMessage::from_json(&RpcResponse { - context: RpcResponseContext { - slot, - api_version: None, - }, - value: account_data, - }); - - match sink.send(result_message.unwrap()).await { - Ok(()) => { - // success - continue 'recv_loop; - } - Err(DisconnectError(_subscription_message)) => { - debug!("Stopping subscription task on disconnect"); - return; - } - }; + api_version: None, + }, + value: ui_account, + }), + Err(e) => Err(jsonrpsee::core::Error::Custom(e.to_string())), + } + } else { + Err(jsonrpsee::core::Error::Custom( + "account filters are not configured".to_string(), + )) + } + } + + async fn get_multiple_accounts( + &self, + pubkey_strs: Vec, + config: Option, + ) -> crate::rpc::Result>>> { + let pubkeys = pubkey_strs + .iter() + .map(|key| Pubkey::from_str(key)) + .collect_vec(); + if 
pubkeys.iter().any(|res| res.is_err()) { + return Err(jsonrpsee::core::Error::Custom( + "invalid account pubkey".to_string(), + )); + }; + + if let Some(account_service) = &self.accounts_service { + let mut ui_accounts = vec![]; + let mut max_slot = 0; + for pubkey in pubkeys { + match account_service + .get_account(pubkey.unwrap(), config.clone()) + .await + { + Ok((slot, ui_account)) => { + if slot > max_slot { + max_slot = slot; } + ui_accounts.push(ui_account); } - Err(Lagged(lagged)) => { - // this usually happens if there is one "slow receiver", see https://docs.rs/tokio/latest/tokio/sync/broadcast/index.html#lagging - warn!( - "subscriber laggs some({}) priofees update messages - continue", - lagged - ); - continue 'recv_loop; - } - Err(Closed) => { - error!("failed to receive block, sender closed - aborting"); - return; - } + Err(e) => return Err(jsonrpsee::core::Error::Custom(e.to_string())), } } - }); + Ok(RpcResponse { + context: RpcResponseContext { + slot: max_slot, + api_version: None, + }, + value: ui_accounts, + }) + } else { + Err(jsonrpsee::core::Error::Custom( + "account filters are not configured".to_string(), + )) + } + } + + async fn get_program_accounts( + &self, + program_id_str: String, + config: Option, + ) -> crate::rpc::Result>> { + let Ok(program_id) = Pubkey::from_str(&program_id_str) else { + return Err(jsonrpsee::core::Error::Custom( + "invalid program id pubkey".to_string(), + )); + }; - Ok(()) + if let Some(account_service) = &self.accounts_service { + match account_service + .get_program_accounts(program_id, config) + .await + { + Ok((slot, ui_account)) => Ok(OptionalContext::Context(RpcResponse { + context: RpcResponseContext { + slot, + api_version: None, + }, + value: ui_account, + })), + Err(e) => Err(jsonrpsee::core::Error::Custom(e.to_string())), + } + } else { + Err(jsonrpsee::core::Error::Custom( + "account filters are not configured".to_string(), + )) + } } } diff --git a/lite-rpc/src/bridge_pubsub.rs b/lite-rpc/src/bridge_pubsub.rs new file mode 100644 index 00000000..f2c411af --- /dev/null +++ b/lite-rpc/src/bridge_pubsub.rs @@ -0,0 +1,479 @@ +use prometheus::{opts, register_int_counter, IntCounter}; +use solana_lite_rpc_accounts::account_service::AccountService; +use solana_lite_rpc_core::{ + commitment_utils::Commitment, stores::data_cache::DataCache, + structures::account_data::AccountNotificationMessage, types::BlockStream, +}; +use std::{str::FromStr, sync::Arc}; +use tokio::sync::broadcast::error::RecvError::{Closed, Lagged}; + +use crate::{ + jsonrpsee_subscrption_handler_sink::JsonRpseeSubscriptionHandlerSink, + rpc_pubsub::LiteRpcPubSubServer, +}; +use jsonrpsee::{ + core::{StringError, SubscriptionResult}, + DisconnectError, PendingSubscriptionSink, +}; +use solana_lite_rpc_prioritization_fees::{ + account_prio_service::AccountPrioService, + rpc_data::{AccountPrioFeesUpdateMessage, PrioFeesUpdateMessage}, + PrioFeesService, +}; +use solana_rpc_client_api::{ + config::{ + RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter, + RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig, + RpcTransactionLogsFilter, + }, + response::{Response as RpcResponse, RpcKeyedAccount, RpcResponseContext, SlotInfo}, +}; +use solana_sdk::pubkey::Pubkey; + +lazy_static::lazy_static! 
{ + static ref RPC_SIGNATURE_SUBSCRIBE: IntCounter = + register_int_counter!(opts!("literpc_rpc_signature_subscribe", "RPC call to subscribe to signature")).unwrap(); + static ref RPC_BLOCK_PRIOFEES_SUBSCRIBE: IntCounter = + register_int_counter!(opts!("literpc_rpc_block_priofees_subscribe", "RPC call to subscribe to block prio fees")).unwrap(); + static ref RPC_ACCOUNT_PRIOFEES_SUBSCRIBE: IntCounter = + register_int_counter!(opts!("literpc_rpc_account_priofees_subscribe", "RPC call to subscribe to account prio fees")).unwrap(); + static ref RPC_ACCOUNT_SUBSCRIBE: IntCounter = + register_int_counter!(opts!("literpc_rpc_account_subscribe", "RPC call to subscribe to account")).unwrap(); + static ref RPC_PROGRAM_ACCOUNT_SUBSCRIBE: IntCounter = + register_int_counter!(opts!("literpc_rpc_program_account_subscribe", "RPC call to subscribe to program account")).unwrap(); +} + +pub struct LitePubSubBridge { + data_cache: DataCache, + prio_fees_service: PrioFeesService, + account_priofees_service: AccountPrioService, + block_stream: BlockStream, + accounts_service: Option, +} + +impl LitePubSubBridge { + pub fn new( + data_cache: DataCache, + prio_fees_service: PrioFeesService, + account_priofees_service: AccountPrioService, + block_stream: BlockStream, + accounts_service: Option, + ) -> Self { + Self { + data_cache, + prio_fees_service, + account_priofees_service, + block_stream, + accounts_service, + } + } +} + +#[jsonrpsee::core::async_trait] +impl LiteRpcPubSubServer for LitePubSubBridge { + async fn slot_subscribe(&self, pending: PendingSubscriptionSink) -> SubscriptionResult { + let sink = pending.accept().await?; + let mut block_stream = self.block_stream.resubscribe(); + tokio::spawn(async move { + loop { + match block_stream.recv().await { + Ok(produced_block) => { + if !produced_block.commitment_config.is_processed() { + continue; + } + let slot_info = SlotInfo { + slot: produced_block.slot, + parent: produced_block.parent_slot, + root: 0, + }; + let result_message = jsonrpsee::SubscriptionMessage::from_json(&slot_info); + + match sink.send(result_message.unwrap()).await { + Ok(()) => { + // success + continue; + } + Err(DisconnectError(_subscription_message)) => { + log::debug!("Stopping subscription task on disconnect"); + return; + } + }; + } + Err(e) => match e { + Closed => { + break; + } + Lagged(_) => { + log::error!("Slot subscription stream lagged"); + continue; + } + }, + } + } + }); + Ok(()) + } + + async fn block_subscribe( + &self, + _pending: PendingSubscriptionSink, + _filter: RpcBlockSubscribeFilter, + _config: Option, + ) -> SubscriptionResult { + todo!() + } + + async fn logs_subscribe( + &self, + _pending: PendingSubscriptionSink, + _filter: RpcTransactionLogsFilter, + _config: Option, + ) -> SubscriptionResult { + todo!() + } + + // WARN: enable_received_notification: bool is ignored + async fn signature_subscribe( + &self, + pending: PendingSubscriptionSink, + signature: String, + config: RpcSignatureSubscribeConfig, + ) -> SubscriptionResult { + RPC_SIGNATURE_SUBSCRIBE.inc(); + let sink = pending.accept().await?; + + let jsonrpsee_sink = JsonRpseeSubscriptionHandlerSink::new(sink); + self.data_cache.tx_subs.signature_subscribe( + signature, + config.commitment.unwrap_or_default(), + Arc::new(jsonrpsee_sink), + ); + + Ok(()) + } + + async fn slot_updates_subscribe( + &self, + _pending: PendingSubscriptionSink, + ) -> SubscriptionResult { + todo!() + } + + async fn vote_subscribe(&self, _pending: PendingSubscriptionSink) -> SubscriptionResult { + todo!() + } + + // use 
websocket-tungstenite-retry->examples/consume_literpc_priofees.rs to test + async fn latest_block_priofees_subscribe( + &self, + pending: PendingSubscriptionSink, + ) -> SubscriptionResult { + let sink = pending.accept().await?; + + let mut block_fees_stream = self.prio_fees_service.block_fees_stream.subscribe(); + tokio::spawn(async move { + RPC_BLOCK_PRIOFEES_SUBSCRIBE.inc(); + + 'recv_loop: loop { + match block_fees_stream.recv().await { + Ok(PrioFeesUpdateMessage { + slot: confirmation_slot, + priofees_stats, + }) => { + let result_message = + jsonrpsee::SubscriptionMessage::from_json(&RpcResponse { + context: RpcResponseContext { + slot: confirmation_slot, + api_version: None, + }, + value: priofees_stats, + }); + + match sink.send(result_message.unwrap()).await { + Ok(()) => { + // success + continue 'recv_loop; + } + Err(DisconnectError(_subscription_message)) => { + log::debug!("Stopping subscription task on disconnect"); + return; + } + }; + } + Err(Lagged(lagged)) => { + // this usually happens if there is one "slow receiver", see https://docs.rs/tokio/latest/tokio/sync/broadcast/index.html#lagging + log::warn!( + "subscriber laggs some({}) priofees update messages - continue", + lagged + ); + continue 'recv_loop; + } + Err(Closed) => { + log::error!("failed to receive block, sender closed - aborting"); + return; + } + } + } + }); + + Ok(()) + } + + async fn latest_account_priofees_subscribe( + &self, + pending: PendingSubscriptionSink, + account: String, + ) -> SubscriptionResult { + let Ok(account) = Pubkey::from_str(&account) else { + return Err(StringError::from("Invalid account".to_string())); + }; + let sink = pending.accept().await?; + let mut account_fees_stream = self + .account_priofees_service + .priofees_update_sender + .subscribe(); + tokio::spawn(async move { + RPC_BLOCK_PRIOFEES_SUBSCRIBE.inc(); + + 'recv_loop: loop { + match account_fees_stream.recv().await { + Ok(AccountPrioFeesUpdateMessage { + slot, + accounts_data, + }) => { + if let Some(account_data) = accounts_data.get(&account) { + let result_message = + jsonrpsee::SubscriptionMessage::from_json(&RpcResponse { + context: RpcResponseContext { + slot, + api_version: None, + }, + value: account_data, + }); + + match sink.send(result_message.unwrap()).await { + Ok(()) => { + // success + continue 'recv_loop; + } + Err(DisconnectError(_subscription_message)) => { + log::debug!("Stopping subscription task on disconnect"); + return; + } + }; + } + } + Err(Lagged(lagged)) => { + // this usually happens if there is one "slow receiver", see https://docs.rs/tokio/latest/tokio/sync/broadcast/index.html#lagging + log::warn!( + "subscriber laggs some({}) priofees update messages - continue", + lagged + ); + continue 'recv_loop; + } + Err(Closed) => { + log::error!("failed to receive block, sender closed - aborting"); + return; + } + } + } + }); + + Ok(()) + } + + async fn account_subscribe( + &self, + pending: PendingSubscriptionSink, + account: String, + config: Option, + ) -> SubscriptionResult { + let Ok(account) = Pubkey::from_str(&account) else { + return Err(StringError::from("Invalid account".to_string())); + }; + + let Some(accounts_service) = &self.accounts_service else { + return Err(StringError::from( + "Accounts service not configured".to_string(), + )); + }; + let sink = pending.accept().await?; + let mut accounts_stream = accounts_service.account_notification_sender.subscribe(); + + tokio::spawn(async move { + RPC_ACCOUNT_SUBSCRIBE.inc(); + + loop { + match accounts_stream.recv().await { + 
Ok(AccountNotificationMessage { data, commitment }) => { + if sink.is_closed() { + // sink is already closed + return; + } + + if data.pubkey != account { + // notification is different account + continue; + } + let account_config = config.clone().unwrap_or_default(); + let config_commitment = account_config.commitment.unwrap_or_default(); + let min_context_slot = account_config.min_context_slot.unwrap_or_default(); + // check config + // check if commitment match + if Commitment::from(config_commitment) != commitment { + continue; + } + // check for min context slot + if data.updated_slot < min_context_slot { + continue; + } + + let result_message = + jsonrpsee::SubscriptionMessage::from_json(&RpcResponse { + context: RpcResponseContext { + slot: data.updated_slot, + api_version: None, + }, + value: AccountService::convert_account_data_to_ui_account( + &data, + config.clone(), + ), + }); + + match sink.send(result_message.unwrap()).await { + Ok(()) => { + // success + continue; + } + Err(DisconnectError(_subscription_message)) => { + log::debug!("Stopping subscription task on disconnect"); + return; + } + }; + } + Err(Lagged(lagged)) => { + // this usually happens if there is one "slow receiver", see https://docs.rs/tokio/latest/tokio/sync/broadcast/index.html#lagging + log::warn!( + "subscriber laggs some({}) accounts messages - continue", + lagged + ); + continue; + } + Err(Closed) => { + log::error!( + "failed to receive account notifications, sender closed - aborting" + ); + return; + } + } + } + }); + + Ok(()) + } + + async fn program_subscribe( + &self, + pending: PendingSubscriptionSink, + pubkey_str: String, + config: Option, + ) -> SubscriptionResult { + let Ok(program_id) = Pubkey::from_str(&pubkey_str) else { + return Err(StringError::from("Invalid account".to_string())); + }; + + let Some(accounts_service) = &self.accounts_service else { + return Err(StringError::from( + "Accounts service not configured".to_string(), + )); + }; + let sink = pending.accept().await?; + let mut accounts_stream = accounts_service.account_notification_sender.subscribe(); + + tokio::spawn(async move { + RPC_ACCOUNT_SUBSCRIBE.inc(); + + loop { + match accounts_stream.recv().await { + Ok(AccountNotificationMessage { data, commitment }) => { + if sink.is_closed() { + // sink is already closed + return; + } + if data.account.owner != program_id { + // wrong program owner + continue; + } + + let program_config = config.clone().unwrap_or_default(); + let config_commitment = + program_config.account_config.commitment.unwrap_or_default(); + let min_context_slot = program_config + .account_config + .min_context_slot + .unwrap_or_default(); + // check config + // check if commitment match + if Commitment::from(config_commitment) != commitment { + continue; + } + // check for min context slot + if data.updated_slot < min_context_slot { + continue; + } + // check filters + if let Some(filters) = program_config.filters { + if filters.iter().any(|filter| !data.allows(filter)) { + // filters not stasfied + continue; + } + } + + let value = RpcKeyedAccount { + pubkey: data.pubkey.to_string(), + account: AccountService::convert_account_data_to_ui_account( + &data, + config.clone().map(|x| x.account_config), + ), + }; + + let result_message = + jsonrpsee::SubscriptionMessage::from_json(&RpcResponse { + context: RpcResponseContext { + slot: data.updated_slot, + api_version: None, + }, + value, + }); + + match sink.send(result_message.unwrap()).await { + Ok(()) => { + // success + continue; + } + 
Err(DisconnectError(_subscription_message)) => { + log::debug!("Stopping subscription task on disconnect"); + return; + } + }; + } + Err(Lagged(lagged)) => { + // this usually happens if there is one "slow receiver", see https://docs.rs/tokio/latest/tokio/sync/broadcast/index.html#lagging + log::warn!( + "subscriber laggs some({}) program accounts messages - continue", + lagged + ); + continue; + } + Err(Closed) => { + log::error!( + "failed to receive account notifications, sender closed - aborting" + ); + return; + } + } + } + }); + Ok(()) + } +} diff --git a/lite-rpc/src/cli.rs b/lite-rpc/src/cli.rs index b51d3ed1..ed511710 100644 --- a/lite-rpc/src/cli.rs +++ b/lite-rpc/src/cli.rs @@ -82,6 +82,9 @@ pub struct Config { #[serde(default)] pub enable_address_lookup_tables: Option, + + #[serde(default)] + pub account_filters: Option, } impl Config { diff --git a/lite-rpc/src/lib.rs b/lite-rpc/src/lib.rs index a2c24f7f..ae7bafe7 100644 --- a/lite-rpc/src/lib.rs +++ b/lite-rpc/src/lib.rs @@ -2,13 +2,16 @@ use const_env::from_env; use solana_transaction_status::TransactionConfirmationStatus; pub mod bridge; +pub mod bridge_pubsub; pub mod cli; pub mod configs; pub mod errors; pub mod jsonrpsee_subscrption_handler_sink; pub mod postgres_logger; pub mod rpc; +pub mod rpc_pubsub; pub mod service_spawner; +pub mod start_server; #[from_env] pub const DEFAULT_RPC_ADDR: &str = "http://0.0.0.0:8899"; diff --git a/lite-rpc/src/main.rs b/lite-rpc/src/main.rs index cd7235f5..6b9c148b 100644 --- a/lite-rpc/src/main.rs +++ b/lite-rpc/src/main.rs @@ -4,11 +4,16 @@ use crate::rpc_tester::RpcTester; use anyhow::bail; use dashmap::DashMap; use lite_rpc::bridge::LiteBridge; +use lite_rpc::bridge_pubsub::LitePubSubBridge; use lite_rpc::cli::Config; use lite_rpc::postgres_logger::PostgresLogger; use lite_rpc::service_spawner::ServiceSpawner; +use lite_rpc::start_server::start_servers; use lite_rpc::{DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE, MAX_NB_OF_CONNECTIONS_WITH_LEADERS}; use log::{debug, info}; +use solana_lite_rpc_accounts::account_service::AccountService; +use solana_lite_rpc_accounts::account_store_interface::AccountStorageInterface; +use solana_lite_rpc_accounts::inmemory_account_store::InmemoryAccountStore; use solana_lite_rpc_address_lookup_tables::address_lookup_table_store::AddressLookupTableStore; use solana_lite_rpc_blockstore::history::History; use solana_lite_rpc_cluster_endpoints::endpoint_stremers::EndpointStreaming; @@ -27,6 +32,7 @@ use solana_lite_rpc_core::stores::{ subscription_store::SubscriptionStore, tx_store::TxStore, }; +use solana_lite_rpc_core::structures::account_filter::AccountFilters; use solana_lite_rpc_core::structures::leaderschedule::CalculatedSchedule; use solana_lite_rpc_core::structures::{ epoch::EpochCache, identity_stakes::IdentityStakes, notifications::NotificationSender, @@ -124,6 +130,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: enable_grpc_stream_inspection, enable_address_lookup_tables, address_lookup_tables_binary, + account_filters, .. 
} = args; @@ -137,6 +144,13 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: let tpu_connection_path = configure_tpu_connection_path(quic_proxy_addr); + let account_filters = if let Some(account_filters) = account_filters { + serde_json::from_str::(account_filters.as_str()) + .expect("Account filters should be valid") + } else { + vec![] + }; + let (subscriptions, cluster_endpoint_tasks) = if use_grpc { info!("Creating geyser subscription..."); @@ -146,7 +160,6 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: subscribe_timeout: Duration::from_secs(5), receive_timeout: Duration::from_secs(5), }; - create_grpc_subscription( rpc_client.clone(), grpc_sources @@ -155,6 +168,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: GrpcSourceConfig::new(s.addr.clone(), s.x_token.clone(), None, timeouts.clone()) }) .collect(), + account_filters.clone(), )? } else { info!("Creating RPC poll subscription..."); @@ -166,9 +180,37 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: cluster_info_notifier, slot_notifier, vote_account_notifier, + processed_account_stream, } = subscriptions; - setup_grpc_stream_debugging(enable_grpc_stream_inspection, &blocks_notifier); + if enable_grpc_stream_inspection { + setup_grpc_stream_debugging(&blocks_notifier) + } else { + info!("Disabled grpc stream inspection"); + } + + let accounts_service = if let Some(account_stream) = processed_account_stream { + // lets use inmemory storage for now + let inmemory_account_storage: Arc = + Arc::new(InmemoryAccountStore::new()); + const MAX_CONNECTIONS_IN_PARALLEL: usize = 10; + let account_service = AccountService::new(inmemory_account_storage); + + account_service + .populate_from_rpc( + rpc_client.clone(), + &account_filters, + MAX_CONNECTIONS_IN_PARALLEL, + ) + .await?; + + account_service + .process_account_stream(account_stream.resubscribe(), blocks_notifier.resubscribe()); + + Some(account_service) + } else { + None + }; info!("Waiting for first finalized block..."); let finalized_block = @@ -290,20 +332,30 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: let history = History::new(); - let bridge_service = tokio::spawn( - LiteBridge::new( - rpc_client.clone(), - data_cache.clone(), - transaction_service, - history, - block_priofees_service, - account_priofees_service, - blocks_notifier.resubscribe(), - ) - .start(lite_rpc_http_addr, lite_rpc_ws_addr), + let rpc_service = LiteBridge::new( + data_cache.clone(), + transaction_service, + history, + block_priofees_service.clone(), + account_priofees_service.clone(), + accounts_service.clone(), ); + + let pubsub_service = LitePubSubBridge::new( + data_cache.clone(), + block_priofees_service, + account_priofees_service, + blocks_notifier, + accounts_service.clone(), + ); + + let bridge_service = tokio::spawn(start_servers( + rpc_service, + pubsub_service, + lite_rpc_ws_addr, + lite_rpc_http_addr, + )); drop(slot_notifier); - drop(blocks_notifier); tokio::select! 
{ res = tx_service_jh => { @@ -337,22 +389,18 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: } } -fn setup_grpc_stream_debugging(enable_grpc_stream_debugging: bool, blocks_notifier: &BlockStream) { - if enable_grpc_stream_debugging { - info!("Setting up grpc stream inspection"); - // note: check failes for commitment_config processed because sources might disagree on the blocks - debugtask_blockstream_slot_progression( - blocks_notifier.resubscribe(), - CommitmentConfig::confirmed(), - ); - debugtask_blockstream_slot_progression( - blocks_notifier.resubscribe(), - CommitmentConfig::finalized(), - ); - debugtask_blockstream_confirmation_sequence(blocks_notifier.resubscribe()); - } else { - info!("Disabled grpc stream inspection"); - } +fn setup_grpc_stream_debugging(blocks_notifier: &BlockStream) { + info!("Setting up grpc stream inspection"); + // note: check failes for commitment_config processed because sources might disagree on the blocks + debugtask_blockstream_slot_progression( + blocks_notifier.resubscribe(), + CommitmentConfig::confirmed(), + ); + debugtask_blockstream_slot_progression( + blocks_notifier.resubscribe(), + CommitmentConfig::finalized(), + ); + debugtask_blockstream_confirmation_sequence(blocks_notifier.resubscribe()); } #[tokio::main(flavor = "multi_thread", worker_threads = 16)] diff --git a/lite-rpc/src/rpc.rs b/lite-rpc/src/rpc.rs index e46f10c2..1102ac67 100644 --- a/lite-rpc/src/rpc.rs +++ b/lite-rpc/src/rpc.rs @@ -1,17 +1,17 @@ use crate::configs::{IsBlockHashValidConfig, SendTransactionConfig}; -use jsonrpsee::core::SubscriptionResult; use jsonrpsee::proc_macros::rpc; +use solana_account_decoder::UiAccount; use solana_lite_rpc_prioritization_fees::prioritization_fee_calculation_method::PrioritizationFeeCalculationMethod; use solana_lite_rpc_prioritization_fees::rpc_data::{AccountPrioFeesStats, PrioFeesStats}; use solana_rpc_client_api::config::{ - RpcBlockSubscribeConfig, RpcBlockSubscribeFilter, RpcBlocksConfigWrapper, RpcContextConfig, - RpcGetVoteAccountsConfig, RpcLeaderScheduleConfig, RpcProgramAccountsConfig, - RpcRequestAirdropConfig, RpcSignatureStatusConfig, RpcSignatureSubscribeConfig, - RpcSignaturesForAddressConfig, RpcTransactionLogsConfig, RpcTransactionLogsFilter, + RpcAccountInfoConfig, RpcBlocksConfigWrapper, RpcContextConfig, RpcGetVoteAccountsConfig, + RpcLeaderScheduleConfig, RpcProgramAccountsConfig, RpcRequestAirdropConfig, + RpcSignatureStatusConfig, RpcSignaturesForAddressConfig, }; use solana_rpc_client_api::response::{ - Response as RpcResponse, RpcBlockhash, RpcConfirmedTransactionStatusWithSignature, - RpcContactInfo, RpcPerfSample, RpcPrioritizationFee, RpcVersionInfo, RpcVoteAccountStatus, + OptionalContext, Response as RpcResponse, RpcBlockhash, + RpcConfirmedTransactionStatusWithSignature, RpcContactInfo, RpcKeyedAccount, RpcPerfSample, + RpcPrioritizationFee, RpcVersionInfo, RpcVoteAccountStatus, }; use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::epoch_info::EpochInfo; @@ -152,59 +152,7 @@ pub trait LiteRpc { config: Option, ) -> Result; - // *********************** - // Direct Subscription Domain - // *********************** - - #[subscription(name = "programSubscribe" => "programNotification", unsubscribe="programUnsubscribe", item=RpcResponse)] - async fn program_subscribe( - &self, - pubkey_str: String, - config: Option, - ) -> SubscriptionResult; - - #[subscription(name = "slotSubscribe" => "slotNotification", unsubscribe="slotUnsubscribe", item=Slot)] - async fn 
slot_subscribe(&self) -> SubscriptionResult; - - #[subscription(name = "blockSubscribe" => "blockNotification", unsubscribe="blockUnsubscribe", item=RpcResponse)] - async fn block_subscribe( - &self, - filter: RpcBlockSubscribeFilter, - config: Option, - ) -> SubscriptionResult; - - // [transactionSubscribe](https://github.com/solana-foundation/solana-improvement-documents/pull/69) - // - //#[subscription(name = "transactionSubscribe" => "transactionNotification", unsubscribe="transactionUnsubscribe", item=RpcResponse)] - //async fn transaction_subscribe( - // &self, - // commitment_config: CommitmentConfig, - //) -> SubscriptionResult; - - // *********************** - // Indirect Subscription Domain - // *********************** - - #[subscription(name = "logsSubscribe" => "logsNotification", unsubscribe="logsUnsubscribe", item=RpcResponse)] - async fn logs_subscribe( - &self, - filter: RpcTransactionLogsFilter, - config: Option, - ) -> SubscriptionResult; - - // WARN: enable_received_notification: bool is ignored - #[subscription(name = "signatureSubscribe" => "signatureNotification", unsubscribe="signatureUnsubscribe", item=RpcResponse)] - async fn signature_subscribe( - &self, - signature: String, - config: RpcSignatureSubscribeConfig, - ) -> SubscriptionResult; - - #[subscription(name = "slotUpdatesSubscribe" => "slotUpdatesNotification", unsubscribe="slotUpdatesUnsubscribe", item=SlotUpdate)] - async fn slot_updates_subscribe(&self) -> SubscriptionResult; - - #[subscription(name = "voteSubscribe" => "voteNotification", unsubscribe="voteUnsubscribe", item=RpcVote)] - async fn vote_subscribe(&self) -> SubscriptionResult; + // ********************** #[method(name = "getEpochInfo")] async fn get_epoch_info( @@ -243,10 +191,6 @@ pub trait LiteRpc { method: Option, ) -> crate::rpc::Result>; - /// subscribe to prio fees distribution per block; uses confirmation level "confirmed" - #[subscription(name = "blockPrioritizationFeesSubscribe" => "blockPrioritizationFeesNotification", unsubscribe="blockPrioritizationFeesUnsubscribe", item=PrioFeesStats)] - async fn latest_block_priofees_subscribe(&self) -> SubscriptionResult; - #[method(name = "getLatestAccountPrioFees")] async fn get_latest_account_priofees( &self, @@ -254,6 +198,28 @@ pub trait LiteRpc { method: Option, ) -> crate::rpc::Result>; - #[subscription(name = "accountPrioritizationFeesSubscribe" => "accountPrioritizationFeesNotification", unsubscribe="accountPrioritizationFeesUnsubscribe", item=AccountPrioFeesStats)] - async fn latest_account_priofees_subscribe(&self, account: String) -> SubscriptionResult; + // ************************** + // Accounts + // ************************** + + #[method(name = "getAccountInfo")] + async fn get_account_info( + &self, + pubkey_str: String, + config: Option, + ) -> crate::rpc::Result>>; + + #[method(name = "getMultipleAccounts")] + async fn get_multiple_accounts( + &self, + pubkey_strs: Vec, + config: Option, + ) -> crate::rpc::Result>>>; + + #[method(name = "getProgramAccounts")] + async fn get_program_accounts( + &self, + program_id_str: String, + config: Option, + ) -> crate::rpc::Result>>; } diff --git a/lite-rpc/src/rpc_pubsub.rs b/lite-rpc/src/rpc_pubsub.rs new file mode 100644 index 00000000..4650ce24 --- /dev/null +++ b/lite-rpc/src/rpc_pubsub.rs @@ -0,0 +1,80 @@ +use jsonrpsee::core::SubscriptionResult; +use jsonrpsee::proc_macros::rpc; +use solana_rpc_client_api::config::{ + RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter, + RpcProgramAccountsConfig, 
RpcSignatureSubscribeConfig, RpcTransactionLogsConfig, + RpcTransactionLogsFilter, +}; + +pub type Result = std::result::Result; + +#[rpc(server)] +pub trait LiteRpcPubSub { + // *********************** + // Direct Subscription Domain + // *********************** + + #[subscription(name = "slotSubscribe" => "slotNotification", unsubscribe="slotUnsubscribe", item=Slot)] + async fn slot_subscribe(&self) -> SubscriptionResult; + + #[subscription(name = "blockSubscribe" => "blockNotification", unsubscribe="blockUnsubscribe", item=RpcResponse)] + async fn block_subscribe( + &self, + filter: RpcBlockSubscribeFilter, + config: Option, + ) -> SubscriptionResult; + + // [transactionSubscribe](https://github.com/solana-foundation/solana-improvement-documents/pull/69) + // + //#[subscription(name = "transactionSubscribe" => "transactionNotification", unsubscribe="transactionUnsubscribe", item=RpcResponse)] + //async fn transaction_subscribe( + // &self, + // commitment_config: CommitmentConfig, + //) -> SubscriptionResult; + + // *********************** + // Indirect Subscription Domain + // *********************** + + #[subscription(name = "logsSubscribe" => "logsNotification", unsubscribe="logsUnsubscribe", item=RpcResponse)] + async fn logs_subscribe( + &self, + filter: RpcTransactionLogsFilter, + config: Option, + ) -> SubscriptionResult; + + // WARN: enable_received_notification: bool is ignored + #[subscription(name = "signatureSubscribe" => "signatureNotification", unsubscribe="signatureUnsubscribe", item=RpcResponse)] + async fn signature_subscribe( + &self, + signature: String, + config: RpcSignatureSubscribeConfig, + ) -> SubscriptionResult; + + #[subscription(name = "slotUpdatesSubscribe" => "slotUpdatesNotification", unsubscribe="slotUpdatesUnsubscribe", item=SlotUpdate)] + async fn slot_updates_subscribe(&self) -> SubscriptionResult; + + #[subscription(name = "voteSubscribe" => "voteNotification", unsubscribe="voteUnsubscribe", item=RpcVote)] + async fn vote_subscribe(&self) -> SubscriptionResult; + + /// subscribe to prio fees distribution per block; uses confirmation level "confirmed" + #[subscription(name = "blockPrioritizationFeesSubscribe" => "blockPrioritizationFeesNotification", unsubscribe="blockPrioritizationFeesUnsubscribe", item=PrioFeesStats)] + async fn latest_block_priofees_subscribe(&self) -> SubscriptionResult; + + #[subscription(name = "accountPrioritizationFeesSubscribe" => "accountPrioritizationFeesNotification", unsubscribe="accountPrioritizationFeesUnsubscribe", item=AccountPrioFeesStats)] + async fn latest_account_priofees_subscribe(&self, account: String) -> SubscriptionResult; + + #[subscription(name = "accountSubscribe" => "accountNotification", unsubscribe="accountUnsubscribe", item=RpcResponse)] + async fn account_subscribe( + &self, + account: String, + config: Option, + ) -> SubscriptionResult; + + #[subscription(name = "programSubscribe" => "programNotification", unsubscribe="programUnsubscribe", item=RpcResponse)] + async fn program_subscribe( + &self, + pubkey_str: String, + config: Option, + ) -> SubscriptionResult; +} diff --git a/lite-rpc/src/start_server.rs b/lite-rpc/src/start_server.rs new file mode 100644 index 00000000..d415460d --- /dev/null +++ b/lite-rpc/src/start_server.rs @@ -0,0 +1,48 @@ +use crate::{ + bridge::LiteBridge, bridge_pubsub::LitePubSubBridge, rpc::LiteRpcServer, + rpc_pubsub::LiteRpcPubSubServer, +}; +use jsonrpsee::server::ServerBuilder; +use solana_lite_rpc_core::AnyhowJoinHandle; +pub async fn start_servers( + rpc: LiteBridge, 
+ pubsub: LitePubSubBridge, + ws_addr: String, + http_addr: String, +) -> anyhow::Result<()> { + let rpc = rpc.into_rpc(); + let pubsub = pubsub.into_rpc(); + + let ws_server_handle = ServerBuilder::default() + .ws_only() + .build(ws_addr.clone()) + .await? + .start(pubsub)?; + + let http_server_handle = ServerBuilder::default() + .http_only() + .build(http_addr.clone()) + .await? + .start(rpc)?; + + let ws_server: AnyhowJoinHandle = tokio::spawn(async move { + log::info!("Websocket Server started at {ws_addr:?}"); + ws_server_handle.stopped().await; + anyhow::bail!("Websocket server stopped"); + }); + + let http_server: AnyhowJoinHandle = tokio::spawn(async move { + log::info!("HTTP Server started at {http_addr:?}"); + http_server_handle.stopped().await; + anyhow::bail!("HTTP server stopped"); + }); + + tokio::select! { + res = ws_server => { + anyhow::bail!("WebSocket server {res:?}"); + }, + res = http_server => { + anyhow::bail!("HTTP server {res:?}"); + }, + } +} diff --git a/prioritization_fees/src/account_prio_service.rs b/prioritization_fees/src/account_prio_service.rs index cb9fd71f..a894502c 100644 --- a/prioritization_fees/src/account_prio_service.rs +++ b/prioritization_fees/src/account_prio_service.rs @@ -45,7 +45,7 @@ impl AccountPrioService { } Err(Lagged(_lagged)) => { log::warn!( - "channel error receiving block for priofees calculation - continue" + "channel lagged receiving block for account priofees calculation - continue" ); continue 'recv_loop; } diff --git a/prioritization_fees/src/block_priofees.rs b/prioritization_fees/src/block_priofees.rs index 4ecb1b43..7c74e319 100644 --- a/prioritization_fees/src/block_priofees.rs +++ b/prioritization_fees/src/block_priofees.rs @@ -17,6 +17,7 @@ pub struct PrioFeeStore { recent: Arc>>, } +#[derive(Clone)] pub struct PrioFeesService { pub block_fees_store: PrioFeeStore, // use .subscribe() to get a receiver @@ -137,7 +138,9 @@ pub fn start_block_priofees_task( } } Err(Lagged(_lagged)) => { - warn!("channel error receiving block for priofees calculation - continue"); + warn!( + "channel lagged receiving block for block priofees calculation - continue" + ); continue 'recv_loop; } Err(Closed) => {