diff --git a/Cargo.lock b/Cargo.lock index 6f522d1..676af6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1246,6 +1246,7 @@ dependencies = [ "byteorder", "bytes", "dashmap", + "expect-test", "figment", "flatbuffers", "fnv", @@ -2339,6 +2340,12 @@ dependencies = [ "syn 2.0.79", ] +[[package]] +name = "dissimilar" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59f8e79d1fbf76bdfbde321e902714bf6c49df88a7dda6fc682fc2979226962d" + [[package]] name = "dlopen2" version = "0.5.0" @@ -2552,6 +2559,16 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "expect-test" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e0be0a561335815e06dab7c62e50353134c796e7a6155402a64bcff66b6a5e0" +dependencies = [ + "dissimilar", + "once_cell", +] + [[package]] name = "fallible-iterator" version = "0.3.0" diff --git a/node/Cargo.toml b/node/Cargo.toml index eafa26c..6eee1a5 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -82,6 +82,7 @@ yellowstone-grpc-client = { workspace = true } yellowstone-grpc-proto = { workspace = true } [dev-dependencies] +expect-test = "1.5.0" [lints] workspace = true diff --git a/node/src/ingest/dragon.rs b/node/src/ingest/dragon.rs index 1e97c9f..da978a3 100644 --- a/node/src/ingest/dragon.rs +++ b/node/src/ingest/dragon.rs @@ -1,13 +1,21 @@ use std::{collections::HashMap, time::Duration}; -use crate::types::BonsolInstruction; +use { + anyhow::anyhow, + solana_sdk::{message::AccountKeys, transaction::VersionedTransaction}, + solana_transaction_status::TransactionStatusMeta, + yellowstone_grpc_proto::geyser::SubscribeUpdate, +}; + +use crate::types::{filter_bonsol_instructions, BonsolInstruction}; use { super::{Ingester, TxChannel}, anyhow::Result, futures::stream::StreamExt, solana_sdk::{message::VersionedMessage, pubkey::Pubkey}, - yellowstone_grpc_client::GeyserGrpcClient, + tokio::sync::mpsc::UnboundedSender, + yellowstone_grpc_client::{GeyserGrpcBuilder, GeyserGrpcClient}, yellowstone_grpc_proto::{ convert_from::create_tx_with_meta, prelude::{ @@ -15,6 +23,7 @@ use { }, }, }; + pub struct GrpcIngester { url: String, token: String, @@ -38,135 +47,29 @@ impl GrpcIngester { op_handle: None, } } + pub fn url(&self) -> &str { + &self.url + } +} + +impl<'a> TryFrom<&'a mut GrpcIngester> for GeyserGrpcBuilder { + type Error = anyhow::Error; + fn try_from(value: &'a mut GrpcIngester) -> Result { + Ok(GeyserGrpcClient::build_from_shared(value.url.clone())? + .x_token(Some(value.token.clone()))? + .connect_timeout(Duration::from_secs( + value.connection_timeout_secs.unwrap_or(10) as u64, + )) + .timeout(Duration::from_secs(value.timeout_secs.unwrap_or(10) as u64))) + } } impl Ingester for GrpcIngester { fn start(&mut self, program: Pubkey) -> Result { let (txchan, rx) = tokio::sync::mpsc::unbounded_channel(); - let stream_client = GeyserGrpcClient::build_from_shared(self.url.clone())? - .x_token(Some(self.token.clone()))? - .connect_timeout(Duration::from_secs( - self.connection_timeout_secs.unwrap_or(10) as u64, - )) - .timeout(Duration::from_secs(self.timeout_secs.unwrap_or(10) as u64)); + let stream_client = GeyserGrpcBuilder::try_from(&mut *self)?; self.op_handle = Some(tokio::spawn(async move { - let mut client = stream_client.connect().await?; - let mut txmap = HashMap::new(); - txmap.insert( - program.to_string(), - SubscribeRequestFilterTransactions { - vote: Some(false), - failed: Some(false), - account_required: vec![program.to_string()], - ..Default::default() - }, - ); - let (_, mut stream) = client - .subscribe_with_request(Some(SubscribeRequest { - transactions: txmap, - ..Default::default() - })) - .await?; - while let Some(message) = stream.next().await { - match message { - Ok(msg) => { - if let Some(UpdateOneof::Transaction(txw)) = msg.update_oneof { - if let Some(tx) = txw.transaction { - if let Ok(soltxn) = create_tx_with_meta(tx) { - let acc = soltxn.account_keys(); - let txndata = soltxn.get_transaction(); - let meta = soltxn.get_status_meta(); - //unwrap so we can consume - if let VersionedMessage::V0(msg) = txndata.message { - let bonsolixs = msg - .instructions - .into_iter() - .filter(|ix| { - acc.get(ix.program_id_index as usize) - == Some(&program) - }) - .map(|ix| BonsolInstruction { - cpi: false, - accounts: ix - .accounts - .into_iter() - .map(|idx| { - acc.get(idx as usize) - .map(|a| *a) - .unwrap_or_default() - }) - .collect(), - data: ix.data, - last_known_block: txw.slot, - }) - .collect::>(); - if bonsolixs.len() > 0 { - match txchan.send(bonsolixs) { - Ok(_) => {} - Err(e) => { - println!( - "Error sending to txn ingest channel: {:?}", - e - ); - } - } - } - - if let Some(metadata) = meta { - if let Some(inner_ix) = metadata.inner_instructions { - let ixs = inner_ix - .into_iter() - .flat_map(|ix| { - ix.instructions - .into_iter() - .filter(|ix| { - acc.get( - ix.instruction.program_id_index - as usize, - ) == Some(&program) - }) - .map(|ix| BonsolInstruction { - cpi: true, - accounts: ix - .instruction - .accounts - .into_iter() - .map(|a| { - acc.get(a as usize) - .map(|a| *a) - .unwrap_or_default() - }) - .collect(), - data: ix.instruction.data, - last_known_block: txw.slot, - }) - .collect::>() - }) - .collect::>(); - if ixs.len() > 0 { - match txchan.send(ixs) { - Ok(_) => {} - Err(e) => { - println!( - "Error sending to txn ingest channel: {:?}", - e - ); - } - } - } - } - } - } - } - } - } - } - Err(_) => { - println!("Error in stream"); - } - } - } - return Ok(()); + ingest(program, txchan, stream_client).await })); Ok(rx) } @@ -176,3 +79,254 @@ impl Ingester for GrpcIngester { Ok(()) } } + +async fn ingest( + program: Pubkey, + txchan: UnboundedSender>, + stream_client: GeyserGrpcBuilder, +) -> Result<()> { + let mut client = stream_client.connect().await?; + let mut txmap = HashMap::new(); + txmap.insert( + program.to_string(), + SubscribeRequestFilterTransactions { + vote: Some(false), + failed: Some(false), + account_required: vec![program.to_string()], + ..Default::default() + }, + ); + let (_, mut stream) = client + .subscribe_with_request(Some(SubscribeRequest { + transactions: txmap, + ..Default::default() + })) + .await?; + + while let Some(message) = stream.next().await { + match message { + Ok(msg) => { + if let Err(e) = handle_msg(msg, program, &txchan) { + eprintln!("Error in stream: {e:?}") + } + } + Err(e) => eprintln!("Error in stream: {e:?}"), + } + } + Ok(()) +} + +fn handle_msg( + msg: SubscribeUpdate, + program: Pubkey, + txchan: &UnboundedSender>, +) -> Result<()> { + if let Some(UpdateOneof::Transaction(txw)) = msg.update_oneof { + txw.transaction.map(|tx| -> Result<()> { + create_tx_with_meta(tx) + .map(|soltxn| { + try_send_instructions( + program, + txw.slot, + soltxn.account_keys(), + soltxn.get_transaction(), + soltxn.get_status_meta(), + &txchan, + ) + }) + .map_err(|e| anyhow!("error while sending instructions: {e}"))? + }); + } + Ok(()) +} + +fn try_send_instructions( + program: Pubkey, + last_known_block: u64, + acc: AccountKeys, + txndata: VersionedTransaction, + meta: Option, + txchan: &UnboundedSender>, +) -> Result<()> { + let program_filter = |acc: &AccountKeys, program: &Pubkey, index: usize| -> bool { + acc.get(index).is_some_and(|p| p == program) + }; + + if let VersionedMessage::V0(msg) = txndata.message { + let mut bonsolixs: Vec = filter_bonsol_instructions( + msg.instructions, + &acc, + &program, + last_known_block, + program_filter, + ) + .collect(); + if let Some(metadata) = meta { + if let Some(inner_ix) = metadata.inner_instructions { + bonsolixs.extend(inner_ix.into_iter().flat_map(|ix| { + filter_bonsol_instructions( + ix.instructions, + &acc, + &program, + last_known_block, + program_filter, + ) + })); + } + } + if !bonsolixs.is_empty() { + txchan.send(bonsolixs).map_err(|e| { + anyhow!( + "failed to send instructions to txn ingest channel: {:?}", + e.0 + ) + })? + } + } + Ok(()) +} + +#[cfg(test)] +mod dragon_ingester_tests { + use { + expect_test::{expect, Expect}, + solana_sdk::{ + instruction::CompiledInstruction, + message::{v0::Message, AccountKeys, Message as LegacyMessage, VersionedMessage}, + pubkey::Pubkey, + transaction::VersionedTransaction, + }, + solana_transaction_status::{InnerInstruction, InnerInstructions, TransactionStatusMeta}, + }; + + use {super::try_send_instructions, crate::types::BonsolInstruction}; + + fn check_instructions(output: &[BonsolInstruction], expect: Expect) { + expect.assert_eq(&format!("{output:#?}")); + } + + fn create_test_compiled_tx( + instructions: Vec, + legacy: bool, + ) -> (VersionedTransaction, Pubkey) { + let mut t = VersionedTransaction::default(); + let program = Pubkey::new_unique(); + t.message = if legacy { + let mut msg = LegacyMessage::default(); + msg.instructions = instructions; + VersionedMessage::Legacy(msg) + } else { + let mut msg = Message::default(); + msg.instructions = instructions; + VersionedMessage::V0(msg) + }; + (t, program) + } + + fn create_test_inner_tx(instructions: Vec) -> TransactionStatusMeta { + let mut t = TransactionStatusMeta::default(); + t.inner_instructions = Some(instructions); + t + } + + #[tokio::test] + async fn v1_txns_pass() { + let (txchan, mut rx) = tokio::sync::mpsc::unbounded_channel(); + + let (txndata, program) = create_test_compiled_tx( + vec![CompiledInstruction::new_from_raw_parts( + 0, + vec![0, 0, 0, 0], + vec![0, 1], + )], + false, + ); + let meta = create_test_inner_tx(vec![InnerInstructions { + index: 0, + instructions: vec![InnerInstruction { + instruction: CompiledInstruction::new_from_raw_parts( + 0, + vec![0, 0, 0, 0], + vec![0, 1], + ), + stack_height: None, + }], + }]); + let static_keys = vec![program]; + let acc = AccountKeys::new(&static_keys, None); + + try_send_instructions(program, 1, acc, txndata, Some(meta), &txchan) + .expect("failed to send instructions"); + + assert!(!rx.is_empty()); + let bonsol_ixs = rx + .recv() + .await + .expect("expected a non-empty vector of BonsolInstructions"); + + check_instructions( + &bonsol_ixs, + expect![[r#" + [ + BonsolInstruction { + cpi: false, + accounts: [ + 1111111QLbz7JHiBTspS962RLKV8GndWFwiEaqKM, + ], + data: [ + 0, + 0, + 0, + 0, + ], + last_known_block: 1, + }, + BonsolInstruction { + cpi: true, + accounts: [ + 1111111QLbz7JHiBTspS962RLKV8GndWFwiEaqKM, + ], + data: [ + 0, + 0, + 0, + 0, + ], + last_known_block: 1, + }, + ]"#]], + ); + } + + #[tokio::test] + async fn legacy_txns_fail() { + let (txchan, rx) = tokio::sync::mpsc::unbounded_channel(); + + let (txndata, program) = create_test_compiled_tx( + vec![CompiledInstruction::new_from_raw_parts( + 0, + vec![0, 0, 0, 0], + vec![0, 1], + )], + true, + ); + let meta = create_test_inner_tx(vec![InnerInstructions { + index: 0, + instructions: vec![InnerInstruction { + instruction: CompiledInstruction::new_from_raw_parts( + 0, + vec![0, 0, 0, 0], + vec![0, 1], + ), + stack_height: None, + }], + }]); + let static_keys = vec![program]; + let acc = AccountKeys::new(&static_keys, None); + + try_send_instructions(program, 1, acc, txndata, Some(meta), &txchan) + .expect("failed to send instructions"); + + assert!(rx.is_empty()) + } +} diff --git a/node/src/types.rs b/node/src/types.rs index d70764a..2cfe0f4 100644 --- a/node/src/types.rs +++ b/node/src/types.rs @@ -1,13 +1,97 @@ -use solana_sdk::pubkey::Pubkey; +use { + solana_sdk::{instruction::CompiledInstruction, message::AccountKeys, pubkey::Pubkey}, + solana_transaction_status::InnerInstruction, +}; #[derive(Debug)] pub struct BonsolInstruction { + /// Whether we picked up an instruction that was an inner instruction + /// found in the metadata. pub cpi: bool, pub accounts: Vec, pub data: Vec, pub last_known_block: u64, } +impl BonsolInstruction { + pub fn new(cpi: bool, accounts: Vec, data: Vec, last_known_block: u64) -> Self { + Self { + cpi, + accounts, + data, + last_known_block, + } + } + fn inner(accounts: Vec, data: Vec, last_known_block: u64) -> Self { + Self::new(true, accounts, data, last_known_block) + } + fn outer(accounts: Vec, data: Vec, last_known_block: u64) -> Self { + Self::new(false, accounts, data, last_known_block) + } +} + +/// Conversion trait for Inner and Outer instructions to become Bonsol instructions. +/// This does not use From and Into because it requires context other than just +/// the instructions themselves. +pub trait IntoBonsolInstruction { + /// Convert an instruction into a [`BonsolInstruction`]. + fn into_bonsol_ix(self, acc: &AccountKeys, last_known_block: u64) -> BonsolInstruction; + /// Get the index of the `Pubkey` that represents the program in the `AccountKeys` map. + fn program_id_index(&self) -> u8; +} + +impl IntoBonsolInstruction for InnerInstruction { + fn into_bonsol_ix(self, acc: &AccountKeys, last_known_block: u64) -> BonsolInstruction { + BonsolInstruction::inner( + self.instruction + .accounts + .into_iter() + .filter_map(|idx| acc.get(idx as usize).copied()) + .collect(), + self.instruction.data, + last_known_block, + ) + } + fn program_id_index(&self) -> u8 { + self.instruction.program_id_index + } +} + +impl IntoBonsolInstruction for CompiledInstruction { + fn into_bonsol_ix(self, acc: &AccountKeys, last_known_block: u64) -> BonsolInstruction { + BonsolInstruction::outer( + self.accounts + .into_iter() + .filter_map(|idx| acc.get(idx as usize).copied()) + .collect(), + self.data, + last_known_block, + ) + } + fn program_id_index(&self) -> u8 { + self.program_id_index + } +} + +/// Filter instructions that can be converted to `BonsolInstruction`s given +/// a closure which returns a boolean representing some condition that must be met +/// for an instruction to be converted to `Some(BonsolInstruction)`. +pub fn filter_bonsol_instructions<'a, I>( + ixs: Vec, + acc: &'a AccountKeys, + program: &'a Pubkey, + last_known_block: u64, + program_filter: impl Fn(&AccountKeys, &Pubkey, usize) -> bool + 'a, +) -> impl Iterator + 'a +where + I: IntoBonsolInstruction + 'a, +{ + ixs.into_iter().filter_map(move |ix| { + program_filter(acc, program, ix.program_id_index() as usize) + .then(|| ix.into_bonsol_ix(acc, last_known_block)) + }) +} + pub enum CallbackStatus { Completed, Failure,