From a2d953bf2ea91e1d2123c445a19cdb44026e5d51 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Sat, 30 Nov 2024 11:29:56 -0500 Subject: [PATCH 1/6] add created_at to definition --- Cargo.lock | 1 + Cargo.toml | 1 + yellowstone-grpc-proto/Cargo.toml | 1 + yellowstone-grpc-proto/proto/geyser.proto | 2 ++ yellowstone-grpc-proto/src/lib.rs | 2 +- 5 files changed, 6 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 518f601a..f30206d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6025,6 +6025,7 @@ dependencies = [ "criterion", "prost 0.11.9", "prost 0.13.3", + "prost-types 0.13.3", "protobuf-src", "serde", "smallvec", diff --git a/Cargo.toml b/Cargo.toml index af30051c..c240c252 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ log = "0.4.17" maplit = "1.0.2" prometheus = "0.13.2" prost = "0.13.1" +prost-types = "0.13.3" prost_011 = { package = "prost", version = "0.11.9" } protobuf-src = "1.1.0" serde = "1.0.145" diff --git a/yellowstone-grpc-proto/Cargo.toml b/yellowstone-grpc-proto/Cargo.toml index 9d7f7745..9ddf7e56 100644 --- a/yellowstone-grpc-proto/Cargo.toml +++ b/yellowstone-grpc-proto/Cargo.toml @@ -22,6 +22,7 @@ bincode = { workspace = true, optional = true } bs58 = { workspace = true, optional = true } bytes = { workspace = true, optional = true } prost = { workspace = true } +prost-types = { workspace = true } prost_011 = { workspace = true, optional = true } serde = { workspace = true, optional = true } solana-account-decoder = { workspace = true, optional = true } diff --git a/yellowstone-grpc-proto/proto/geyser.proto b/yellowstone-grpc-proto/proto/geyser.proto index 613fcdb9..94f3113b 100644 --- a/yellowstone-grpc-proto/proto/geyser.proto +++ b/yellowstone-grpc-proto/proto/geyser.proto @@ -1,5 +1,6 @@ syntax = "proto3"; +import "google/protobuf/timestamp.proto"; import public "solana-storage.proto"; option go_package = "github.com/rpcpool/yellowstone-grpc/examples/golang/proto"; @@ -119,6 +120,7 @@ message SubscribeUpdate { SubscribeUpdateBlockMeta block_meta = 7; SubscribeUpdateEntry entry = 8; } + google.protobuf.Timestamp created_at = 11; } message SubscribeUpdateAccount { diff --git a/yellowstone-grpc-proto/src/lib.rs b/yellowstone-grpc-proto/src/lib.rs index 80b56af5..a20e989d 100644 --- a/yellowstone-grpc-proto/src/lib.rs +++ b/yellowstone-grpc-proto/src/lib.rs @@ -33,9 +33,9 @@ pub mod prelude { pub use super::{geyser::*, solana::storage::confirmed_block::*}; } -pub use prost; #[cfg(feature = "tonic")] pub use tonic; +pub use {prost, prost_types}; #[cfg(feature = "plugin")] pub mod plugin; From 62d91c3f355f4432f68c267bf2775560be840178 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Sat, 30 Nov 2024 14:24:34 -0500 Subject: [PATCH 2/6] add to structs --- CHANGELOG.md | 1 + Cargo.lock | 1 + examples/rust/src/bin/client.rs | 22 ++- yellowstone-grpc-geyser/Cargo.toml | 1 + yellowstone-grpc-geyser/src/grpc.rs | 3 + yellowstone-grpc-proto/benches/encode.rs | 7 +- .../src/plugin/filter/filter.rs | 38 +++-- .../src/plugin/filter/message.rs | 89 +++++++---- yellowstone-grpc-proto/src/plugin/message.rs | 142 +++++++++++++----- 9 files changed, 219 insertions(+), 85 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 00d8bb67..1af59864 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ The minor version will be incremented upon a breaking change and the patch versi - proto: add tonic feature ([#474](https://github.com/rpcpool/yellowstone-grpc/pull/474)) - geyser: use default compression as gzip and zstd ([#475](https://github.com/rpcpool/yellowstone-grpc/pull/475)) - example: add connection options to Rust client ([#478](https://github.com/rpcpool/yellowstone-grpc/pull/478)) +- proto: add field `created_at` to update message ([#479](https://github.com/rpcpool/yellowstone-grpc/pull/479)) ### Breaking diff --git a/Cargo.lock b/Cargo.lock index f30206d4..a2b8b02c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5997,6 +5997,7 @@ dependencies = [ "lazy_static", "log", "prometheus", + "prost-types 0.13.3", "serde", "serde_json", "solana-logger", diff --git a/examples/rust/src/bin/client.rs b/examples/rust/src/bin/client.rs index b60ffd95..bee198fe 100644 --- a/examples/rust/src/bin/client.rs +++ b/examples/rust/src/bin/client.rs @@ -741,6 +741,11 @@ async fn geyser_subscribe( } let filters = msg.filters; + let created_at: SystemTime = msg + .created_at + .ok_or(anyhow::anyhow!("no created_at in the message"))? + .try_into() + .context("failed to parse created_at")?; match msg.update_oneof { Some(UpdateOneof::Account(msg)) => { let account = msg @@ -749,13 +754,14 @@ async fn geyser_subscribe( let mut value = create_pretty_account(account)?; value["isStartup"] = json!(msg.is_startup); value["slot"] = json!(msg.slot); - print_update("account", &filters, value); + print_update("account", created_at, &filters, value); } Some(UpdateOneof::Slot(msg)) => { let status = CommitmentLevel::try_from(msg.status) .context("failed to decode commitment")?; print_update( "slot", + created_at, &filters, json!({ "slot": msg.slot, @@ -771,11 +777,12 @@ async fn geyser_subscribe( .ok_or(anyhow::anyhow!("no transaction in the message"))?; let mut value = create_pretty_transaction(tx)?; value["slot"] = json!(msg.slot); - print_update("transaction", &filters, value); + print_update("transaction", created_at, &filters, value); } Some(UpdateOneof::TransactionStatus(msg)) => { print_update( "transactionStatus", + created_at, &filters, json!({ "slot": msg.slot, @@ -789,11 +796,12 @@ async fn geyser_subscribe( ); } Some(UpdateOneof::Entry(msg)) => { - print_update("entry", &filters, create_pretty_entry(msg)?); + print_update("entry", created_at, &filters, create_pretty_entry(msg)?); } Some(UpdateOneof::BlockMeta(msg)) => { print_update( "blockmeta", + created_at, &filters, json!({ "slot": msg.slot, @@ -815,6 +823,7 @@ async fn geyser_subscribe( Some(UpdateOneof::Block(msg)) => { print_update( "block", + created_at, &filters, json!({ "slot": msg.slot, @@ -962,10 +971,13 @@ fn create_pretty_entry(msg: SubscribeUpdateEntry) -> anyhow::Result { })) } -fn print_update(kind: &str, filters: &[String], value: Value) { +fn print_update(kind: &str, created_at: SystemTime, filters: &[String], value: Value) { + let unix_since = created_at.duration_since(UNIX_EPOCH).expect("valid system time"); info!( - "{kind} ({}): {}", + "{kind} ({}) at {}.{:0>6}: {}", filters.join(","), + unix_since.as_secs(), + unix_since.subsec_micros(), serde_json::to_string(&value).expect("json serialization failed") ); } diff --git a/yellowstone-grpc-geyser/Cargo.toml b/yellowstone-grpc-geyser/Cargo.toml index d2391a89..0405bd55 100644 --- a/yellowstone-grpc-geyser/Cargo.toml +++ b/yellowstone-grpc-geyser/Cargo.toml @@ -34,6 +34,7 @@ hyper-util = { workspace = true } lazy_static = { workspace = true } log = { workspace = true } prometheus = { workspace = true } +prost-types = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } solana-logger = { workspace = true } diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index 79c8f17e..c9a55d07 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -6,6 +6,7 @@ use { }, anyhow::Context, log::{error, info}, + prost_types::Timestamp, solana_sdk::{ clock::{Slot, MAX_RECENT_BLOCKHASHES}, pubkey::Pubkey, @@ -16,6 +17,7 @@ use { atomic::{AtomicUsize, Ordering}, Arc, }, + time::SystemTime, }, tokio::{ fs, @@ -628,6 +630,7 @@ impl GrpcService { parent: entry.parent_slot, status, dead_error: None, + created_at: Timestamp::from(SystemTime::now()) })); metrics::missed_status_message_inc(status); } diff --git a/yellowstone-grpc-proto/benches/encode.rs b/yellowstone-grpc-proto/benches/encode.rs index 7dd07a26..565564b0 100644 --- a/yellowstone-grpc-proto/benches/encode.rs +++ b/yellowstone-grpc-proto/benches/encode.rs @@ -1,7 +1,8 @@ use { criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}, prost::Message as _, - std::time::Duration, + prost_types::Timestamp, + std::time::{Duration, SystemTime}, yellowstone_grpc_proto::plugin::{ filter::message::{ tests::{ @@ -41,6 +42,7 @@ fn bench_account(c: &mut Criterion) { .map(|(msg, data_slice)| FilteredUpdate { filters: filters.clone(), message: FilteredUpdateOneof::account(&msg, data_slice), + created_at: Timestamp::from(SystemTime::now()), }) .collect::>(); bench!(&updates, "accounts"); @@ -52,7 +54,9 @@ fn bench_account(c: &mut Criterion) { message: FilteredUpdateOneof::transaction(&MessageTransaction { transaction, slot: 42, + created_at: Timestamp::from(SystemTime::now()), }), + created_at: Timestamp::from(SystemTime::now()), }) .collect::>(); bench!(&updates, "transactions"); @@ -62,6 +66,7 @@ fn bench_account(c: &mut Criterion) { .map(|block| FilteredUpdate { filters: filters.clone(), message: FilteredUpdateOneof::block(Box::new(block)), + created_at: Timestamp::from(SystemTime::now()), }) .collect::>(); bench!(&updates, "blocks"); diff --git a/yellowstone-grpc-proto/src/plugin/filter/filter.rs b/yellowstone-grpc-proto/src/plugin/filter/filter.rs index ac64aa9b..a2bbd428 100644 --- a/yellowstone-grpc-proto/src/plugin/filter/filter.rs +++ b/yellowstone-grpc-proto/src/plugin/filter/filter.rs @@ -75,24 +75,24 @@ pub enum FilterError { pub type FilterResult = Result; macro_rules! filtered_updates_once_owned { - ($filters:ident, $message:expr) => {{ + ($filters:ident, $message:expr, $created_at:expr) => {{ let mut messages = FilteredUpdates::new(); if !$filters.is_empty() { - messages.push(FilteredUpdate::new($filters, $message)); + messages.push(FilteredUpdate::new($filters, $message, $created_at)); } messages }}; } macro_rules! filtered_updates_once_ref { - ($filters:ident, $message:expr) => {{ + ($filters:ident, $message:expr, $created_at:expr) => {{ let mut messages = FilteredUpdates::new(); if !$filters.is_empty() { let mut message_filters = FilteredUpdateFilters::new(); for filter in $filters { message_filters.push(filter.clone()); } - messages.push(FilteredUpdate::new(message_filters, $message)); + messages.push(FilteredUpdate::new(message_filters, $message, $created_at)); } messages }}; @@ -339,7 +339,8 @@ impl FilterAccounts { let filters = filter.get_filters(); filtered_updates_once_owned!( filters, - FilteredUpdateOneof::account(message, accounts_data_slice.clone()) + FilteredUpdateOneof::account(message, accounts_data_slice.clone()), + message.created_at ) } } @@ -631,7 +632,11 @@ impl FilterSlots { } }) .collect::(); - filtered_updates_once_owned!(filters, FilteredUpdateOneof::slot(message.clone())) + filtered_updates_once_owned!( + filters, + FilteredUpdateOneof::slot(message.clone()), + message.created_at + ) } } @@ -785,7 +790,8 @@ impl FilterTransactions { FilterTransactionsType::TransactionStatus => { FilteredUpdateOneof::transaction_status(message) } - } + }, + message.created_at ) } } @@ -813,7 +819,11 @@ impl FilterEntries { fn get_updates(&self, message: &Arc) -> FilteredUpdates { let filters = self.filters.as_slice(); - filtered_updates_once_ref!(filters, FilteredUpdateOneof::entry(Arc::clone(message))) + filtered_updates_once_ref!( + filters, + FilteredUpdateOneof::entry(Arc::clone(message)), + message.created_at + ) } } @@ -941,6 +951,7 @@ impl FilterBlocks { accounts, entries, })), + message.created_at, )); } updates @@ -972,7 +983,8 @@ impl FilterBlocksMeta { let filters = self.filters.as_slice(); filtered_updates_once_ref!( filters, - FilteredUpdateOneof::block_meta(Arc::clone(message)) + FilteredUpdateOneof::block_meta(Arc::clone(message)), + message.created_at ) } } @@ -1088,6 +1100,7 @@ mod tests { message::{Message, MessageTransaction, MessageTransactionInfo}, }, }, + prost_types::Timestamp, solana_sdk::{ hash::Hash, message::{v0::LoadedAddresses, Message as SolMessage, MessageHeader}, @@ -1096,7 +1109,11 @@ mod tests { transaction::{SanitizedTransaction, Transaction}, }, solana_transaction_status::TransactionStatusMeta, - std::{collections::HashMap, sync::Arc, time::Duration}, + std::{ + collections::HashMap, + sync::Arc, + time::{Duration, SystemTime}, + }, }; fn create_filter_names() -> FilterNames { @@ -1150,6 +1167,7 @@ mod tests { account_keys, }), slot: 100, + created_at: Timestamp::from(SystemTime::now()), } } diff --git a/yellowstone-grpc-proto/src/plugin/filter/message.rs b/yellowstone-grpc-proto/src/plugin/filter/message.rs index e0aaf5f0..3cf76f0d 100644 --- a/yellowstone-grpc-proto/src/plugin/filter/message.rs +++ b/yellowstone-grpc-proto/src/plugin/filter/message.rs @@ -24,12 +24,14 @@ use { }, DecodeError, }, + prost_types::Timestamp, smallvec::SmallVec, solana_sdk::signature::Signature, std::{ collections::HashSet, ops::{Deref, DerefMut}, sync::Arc, + time::SystemTime, }, }; @@ -67,6 +69,7 @@ pub type FilteredUpdates = SmallVec<[FilteredUpdate; 2]>; pub struct FilteredUpdate { pub filters: FilteredUpdateFilters, pub message: FilteredUpdateOneof, + pub created_at: Timestamp, } impl prost::Message for FilteredUpdate { @@ -76,12 +79,14 @@ impl prost::Message for FilteredUpdate { encode_varint(name.len() as u64, buf); buf.put_slice(name.as_bytes()); } - self.message.encode_raw(buf) + self.message.encode_raw(buf); + message::encode(11u32, &self.created_at, buf); } fn encoded_len(&self) -> usize { prost_repeated_encoded_len_map!(1u32, self.filters, |filter| filter.as_ref().len()) + self.message.encoded_len() + + message::encoded_len(11u32, &self.created_at) } fn merge_field( @@ -100,12 +105,24 @@ impl prost::Message for FilteredUpdate { } impl FilteredUpdate { - pub const fn new(filters: FilteredUpdateFilters, message: FilteredUpdateOneof) -> Self { - Self { filters, message } + pub const fn new( + filters: FilteredUpdateFilters, + message: FilteredUpdateOneof, + created_at: Timestamp, + ) -> Self { + Self { + filters, + message, + created_at, + } } pub fn new_empty(message: FilteredUpdateOneof) -> Self { - Self::new(FilteredUpdateFilters::new(), message) + Self::new( + FilteredUpdateFilters::new(), + message, + Timestamp::from(SystemTime::now()), + ) } fn as_subscribe_update_account( @@ -211,7 +228,7 @@ impl FilteredUpdate { }), FilteredUpdateOneof::Ping => UpdateOneof::Ping(SubscribeUpdatePing {}), FilteredUpdateOneof::Pong(msg) => UpdateOneof::Pong(*msg), - FilteredUpdateOneof::BlockMeta(msg) => UpdateOneof::BlockMeta(msg.0.clone()), + FilteredUpdateOneof::BlockMeta(msg) => UpdateOneof::BlockMeta(msg.block_meta.clone()), FilteredUpdateOneof::Entry(msg) => { UpdateOneof::Entry(Self::as_subscribe_update_entry(&msg.0)) } @@ -224,13 +241,16 @@ impl FilteredUpdate { .map(|name| name.as_ref().to_string()) .collect(), update_oneof: Some(message), + created_at: Some(self.created_at), } } pub fn from_subscribe_update(update: SubscribeUpdate) -> Result { - let message = match update.update_oneof.ok_or("")? { + let created_at = update.created_at.ok_or("create_at should be defined")?; + + let message = match update.update_oneof.ok_or("update should be defined")? { UpdateOneof::Account(msg) => { - let account = MessageAccount::from_update_oneof(msg)?; + let account = MessageAccount::from_update_oneof(msg, created_at)?; FilteredUpdateOneof::Account(FilteredUpdateAccount { account: account.account, slot: account.slot, @@ -239,11 +259,11 @@ impl FilteredUpdate { }) } UpdateOneof::Slot(msg) => { - let slot = MessageSlot::from_update_oneof(&msg)?; + let slot = MessageSlot::from_update_oneof(&msg, created_at)?; FilteredUpdateOneof::Slot(FilteredUpdateSlot(slot)) } UpdateOneof::Transaction(msg) => { - let tx = MessageTransaction::from_update_oneof(msg)?; + let tx = MessageTransaction::from_update_oneof(msg, created_at)?; FilteredUpdateOneof::Transaction(FilteredUpdateTransaction { transaction: tx.transaction, slot: tx.slot, @@ -267,7 +287,7 @@ impl FilteredUpdate { }) } UpdateOneof::Block(msg) => { - let block = MessageBlock::from_update_oneof(msg)?; + let block = MessageBlock::from_update_oneof(msg, created_at)?; FilteredUpdateOneof::Block(Box::new(FilteredUpdateBlock { meta: block.meta, transactions: block.transactions, @@ -280,11 +300,11 @@ impl FilteredUpdate { UpdateOneof::Ping(_) => FilteredUpdateOneof::Ping, UpdateOneof::Pong(msg) => FilteredUpdateOneof::Pong(msg), UpdateOneof::BlockMeta(msg) => { - let block_meta = MessageBlockMeta(msg); + let block_meta = MessageBlockMeta::from_update_oneof(msg, created_at); FilteredUpdateOneof::BlockMeta(Arc::new(block_meta)) } UpdateOneof::Entry(msg) => { - let entry = MessageEntry::from_update_oneof(&msg)?; + let entry = MessageEntry::from_update_oneof(&msg, created_at)?; FilteredUpdateOneof::Entry(FilteredUpdateEntry(Arc::new(entry))) } }; @@ -292,6 +312,7 @@ impl FilteredUpdate { Ok(Self { filters: update.filters.into_iter().map(FilterName::new).collect(), message, + created_at, }) } } @@ -373,7 +394,7 @@ impl prost::Message for FilteredUpdateOneof { encode_varint(0, buf); } Self::Pong(msg) => message::encode(9u32, msg, buf), - Self::BlockMeta(msg) => message::encode(7u32, &msg.0, buf), + Self::BlockMeta(msg) => message::encode(7u32, &msg.block_meta, buf), Self::Entry(msg) => message::encode(8u32, msg, buf), } } @@ -387,7 +408,7 @@ impl prost::Message for FilteredUpdateOneof { Self::Block(msg) => message::encoded_len(5u32, msg), Self::Ping => key_len(6u32) + encoded_len_varint(0), Self::Pong(msg) => message::encoded_len(9u32, msg), - Self::BlockMeta(msg) => message::encoded_len(7u32, &msg.0), + Self::BlockMeta(msg) => message::encoded_len(7u32, &msg.block_meta), Self::Entry(msg) => message::encoded_len(8u32, msg), } } @@ -972,6 +993,7 @@ pub mod tests { }, prost::Message as _, prost_011::Message as _, + prost_types::Timestamp, solana_sdk::{ hash::Hash, message::SimpleAddressLoader, @@ -987,6 +1009,7 @@ pub mod tests { ops::Range, str::FromStr, sync::Arc, + time::SystemTime, }, }; @@ -1057,6 +1080,7 @@ pub mod tests { account: Arc::clone(&account), slot, is_startup, + created_at: Timestamp::from(SystemTime::now()), }; vec.push((msg, data_slice)); } @@ -1075,6 +1099,7 @@ pub mod tests { hash: Hash::new_from_array([98; 32]), executed_transaction_count: 32, starting_transaction_index: 1000, + created_at: Timestamp::from(SystemTime::now()), }, MessageEntry { slot: 299888121, @@ -1083,6 +1108,7 @@ pub mod tests { hash: Hash::new_from_array([42; 32]), executed_transaction_count: 32, starting_transaction_index: 1000, + created_at: Timestamp::from(SystemTime::now()), }, ] .into_iter() @@ -1157,20 +1183,23 @@ pub mod tests { let entries = create_entries(); let slot = block.parent_slot + 1; - let block_meta1 = MessageBlockMeta(SubscribeUpdateBlockMeta { - parent_slot: block.parent_slot, - slot, - parent_blockhash: block.previous_blockhash, - blockhash: block.blockhash, - rewards: Some(convert_to::create_rewards_obj( - &block.rewards, - block.num_partitions, - )), - block_time: block.block_time.map(convert_to::create_timestamp), - block_height: block.block_height.map(convert_to::create_block_height), - executed_transaction_count: transactions.len() as u64, - entries_count: entries.len() as u64, - }); + let block_meta1 = MessageBlockMeta { + block_meta: SubscribeUpdateBlockMeta { + parent_slot: block.parent_slot, + slot, + parent_blockhash: block.previous_blockhash, + blockhash: block.blockhash, + rewards: Some(convert_to::create_rewards_obj( + &block.rewards, + block.num_partitions, + )), + block_time: block.block_time.map(convert_to::create_timestamp), + block_height: block.block_height.map(convert_to::create_block_height), + executed_transaction_count: transactions.len() as u64, + entries_count: entries.len() as u64, + }, + created_at: Timestamp::from(SystemTime::now()), + }; let mut block_meta2 = block_meta1.clone(); block_meta2.rewards = Some(convert_to::create_rewards_obj(&block.rewards, Some(42))); @@ -1209,6 +1238,7 @@ pub mod tests { let msg = FilteredUpdate { filters: create_message_filters(filters), message, + created_at: Timestamp::from(SystemTime::now()), }; let update = msg.as_subscribe_update(); assert_eq!(msg.encoded_len(), update.encoded_len()); @@ -1250,6 +1280,7 @@ pub mod tests { parent, status, dead_error: None, + created_at: Timestamp::from(SystemTime::now()), }), ) } @@ -1260,6 +1291,7 @@ pub mod tests { parent, status: CommitmentLevel::Dead, dead_error: Some("123".to_owned()), + created_at: Timestamp::from(SystemTime::now()), }), ) } @@ -1272,6 +1304,7 @@ pub mod tests { let msg = MessageTransaction { transaction, slot: 42, + created_at: Timestamp::from(SystemTime::now()), }; encode_decode_cmp(&["123"], FilteredUpdateOneof::transaction(&msg)); encode_decode_cmp(&["123"], FilteredUpdateOneof::transaction_status(&msg)); diff --git a/yellowstone-grpc-proto/src/plugin/message.rs b/yellowstone-grpc-proto/src/plugin/message.rs index fe14d4bd..8ad2e5b9 100644 --- a/yellowstone-grpc-proto/src/plugin/message.rs +++ b/yellowstone-grpc-proto/src/plugin/message.rs @@ -13,6 +13,7 @@ use { ReplicaAccountInfoV3, ReplicaBlockInfoV4, ReplicaEntryInfoV2, ReplicaTransactionInfoV2, SlotStatus, }, + prost_types::Timestamp, solana_sdk::{ clock::Slot, hash::{Hash, HASH_BYTES}, @@ -23,6 +24,7 @@ use { collections::HashSet, ops::{Deref, DerefMut}, sync::Arc, + time::SystemTime, }, }; @@ -101,6 +103,7 @@ pub struct MessageSlot { pub parent: Option, pub status: CommitmentLevel, pub dead_error: Option, + pub created_at: Timestamp, } impl MessageSlot { @@ -114,10 +117,14 @@ impl MessageSlot { } else { None }, + created_at: Timestamp::from(SystemTime::now()), } } - pub fn from_update_oneof(msg: &SubscribeUpdateSlot) -> FromUpdateOneofResult { + pub fn from_update_oneof( + msg: &SubscribeUpdateSlot, + created_at: Timestamp, + ) -> FromUpdateOneofResult { Ok(Self { slot: msg.slot, parent: msg.parent, @@ -125,6 +132,7 @@ impl MessageSlot { .map_err(|_| "failed to parse commitment level")? .into(), dead_error: msg.dead_error.clone(), + created_at, }) } } @@ -179,6 +187,7 @@ pub struct MessageAccount { pub account: Arc, pub slot: Slot, pub is_startup: bool, + pub created_at: Timestamp, } impl MessageAccount { @@ -187,16 +196,21 @@ impl MessageAccount { account: Arc::new(MessageAccountInfo::from_geyser(info)), slot, is_startup, + created_at: Timestamp::from(SystemTime::now()), } } - pub fn from_update_oneof(msg: SubscribeUpdateAccount) -> FromUpdateOneofResult { + pub fn from_update_oneof( + msg: SubscribeUpdateAccount, + created_at: Timestamp, + ) -> FromUpdateOneofResult { Ok(Self { account: Arc::new(MessageAccountInfo::from_update_oneof( msg.account.ok_or("account message should be defined")?, )?), slot: msg.slot, is_startup: msg.is_startup, + created_at, }) } } @@ -281,6 +295,7 @@ impl MessageTransactionInfo { pub struct MessageTransaction { pub transaction: Arc, pub slot: u64, + pub created_at: Timestamp, } impl MessageTransaction { @@ -288,16 +303,21 @@ impl MessageTransaction { Self { transaction: Arc::new(MessageTransactionInfo::from_geyser(info)), slot, + created_at: Timestamp::from(SystemTime::now()), } } - pub fn from_update_oneof(msg: SubscribeUpdateTransaction) -> FromUpdateOneofResult { + pub fn from_update_oneof( + msg: SubscribeUpdateTransaction, + created_at: Timestamp, + ) -> FromUpdateOneofResult { Ok(Self { transaction: Arc::new(MessageTransactionInfo::from_update_oneof( msg.transaction .ok_or("transaction message should be defined")?, )?), slot: msg.slot, + created_at, }) } } @@ -310,6 +330,7 @@ pub struct MessageEntry { pub hash: Hash, pub executed_transaction_count: u64, pub starting_transaction_index: u64, + pub created_at: Timestamp, } impl MessageEntry { @@ -324,10 +345,14 @@ impl MessageEntry { .starting_transaction_index .try_into() .expect("failed convert usize to u64"), + created_at: Timestamp::from(SystemTime::now()), } } - pub fn from_update_oneof(msg: &SubscribeUpdateEntry) -> FromUpdateOneofResult { + pub fn from_update_oneof( + msg: &SubscribeUpdateEntry, + created_at: Timestamp, + ) -> FromUpdateOneofResult { Ok(Self { slot: msg.slot, index: msg.index as usize, @@ -338,43 +363,60 @@ impl MessageEntry { ), executed_transaction_count: msg.executed_transaction_count, starting_transaction_index: msg.starting_transaction_index, + created_at, }) } } #[derive(Debug, Clone, PartialEq)] -pub struct MessageBlockMeta(pub SubscribeUpdateBlockMeta); +pub struct MessageBlockMeta { + pub block_meta: SubscribeUpdateBlockMeta, + pub created_at: Timestamp, +} impl Deref for MessageBlockMeta { type Target = SubscribeUpdateBlockMeta; fn deref(&self) -> &Self::Target { - &self.0 + &self.block_meta } } impl DerefMut for MessageBlockMeta { fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 + &mut self.block_meta } } impl MessageBlockMeta { pub fn from_geyser(info: &ReplicaBlockInfoV4<'_>) -> Self { - Self(SubscribeUpdateBlockMeta { - parent_slot: info.parent_slot, - slot: info.slot, - parent_blockhash: info.parent_blockhash.to_string(), - blockhash: info.blockhash.to_string(), - rewards: Some(convert_to::create_rewards_obj( - &info.rewards.rewards, - info.rewards.num_partitions, - )), - block_time: info.block_time.map(convert_to::create_timestamp), - block_height: info.block_height.map(convert_to::create_block_height), - executed_transaction_count: info.executed_transaction_count, - entries_count: info.entry_count, - }) + Self { + block_meta: SubscribeUpdateBlockMeta { + parent_slot: info.parent_slot, + slot: info.slot, + parent_blockhash: info.parent_blockhash.to_string(), + blockhash: info.blockhash.to_string(), + rewards: Some(convert_to::create_rewards_obj( + &info.rewards.rewards, + info.rewards.num_partitions, + )), + block_time: info.block_time.map(convert_to::create_timestamp), + block_height: info.block_height.map(convert_to::create_block_height), + executed_transaction_count: info.executed_transaction_count, + entries_count: info.entry_count, + }, + created_at: Timestamp::from(SystemTime::now()), + } + } + + pub const fn from_update_oneof( + block_meta: SubscribeUpdateBlockMeta, + created_at: Timestamp, + ) -> Self { + Self { + block_meta, + created_at, + } } } @@ -385,6 +427,7 @@ pub struct MessageBlock { pub updated_account_count: u64, pub accounts: Vec>, pub entries: Vec>, + pub created_at: Timestamp, } impl MessageBlock { @@ -400,22 +443,29 @@ impl MessageBlock { updated_account_count: accounts.len() as u64, accounts, entries, + created_at: Timestamp::from(SystemTime::now()), } } - pub fn from_update_oneof(msg: SubscribeUpdateBlock) -> FromUpdateOneofResult { + pub fn from_update_oneof( + msg: SubscribeUpdateBlock, + created_at: Timestamp, + ) -> FromUpdateOneofResult { Ok(Self { - meta: Arc::new(MessageBlockMeta(SubscribeUpdateBlockMeta { - slot: msg.slot, - blockhash: msg.blockhash, - rewards: msg.rewards, - block_time: msg.block_time, - block_height: msg.block_height, - parent_slot: msg.parent_slot, - parent_blockhash: msg.parent_blockhash, - executed_transaction_count: msg.executed_transaction_count, - entries_count: msg.entries_count, - })), + meta: Arc::new(MessageBlockMeta { + block_meta: SubscribeUpdateBlockMeta { + slot: msg.slot, + blockhash: msg.blockhash, + rewards: msg.rewards, + block_time: msg.block_time, + block_height: msg.block_height, + parent_slot: msg.parent_slot, + parent_blockhash: msg.parent_blockhash, + executed_transaction_count: msg.executed_transaction_count, + entries_count: msg.entries_count, + }, + created_at, + }), transactions: msg .transactions .into_iter() @@ -430,8 +480,9 @@ impl MessageBlock { entries: msg .entries .iter() - .map(|entry| MessageEntry::from_update_oneof(entry).map(Arc::new)) + .map(|entry| MessageEntry::from_update_oneof(entry, created_at).map(Arc::new)) .collect::, _>>()?, + created_at, }) } } @@ -458,22 +509,31 @@ impl Message { } } - pub fn from_update_oneof(oneof: UpdateOneof) -> FromUpdateOneofResult { + pub fn from_update_oneof( + oneof: UpdateOneof, + created_at: Timestamp, + ) -> FromUpdateOneofResult { Ok(match oneof { - UpdateOneof::Account(msg) => Self::Account(MessageAccount::from_update_oneof(msg)?), - UpdateOneof::Slot(msg) => Self::Slot(MessageSlot::from_update_oneof(&msg)?), + UpdateOneof::Account(msg) => { + Self::Account(MessageAccount::from_update_oneof(msg, created_at)?) + } + UpdateOneof::Slot(msg) => Self::Slot(MessageSlot::from_update_oneof(&msg, created_at)?), UpdateOneof::Transaction(msg) => { - Self::Transaction(MessageTransaction::from_update_oneof(msg)?) + Self::Transaction(MessageTransaction::from_update_oneof(msg, created_at)?) } UpdateOneof::TransactionStatus(_) => { return Err("TransactionStatus message is not supported") } - UpdateOneof::Block(msg) => Self::Block(Arc::new(MessageBlock::from_update_oneof(msg)?)), + UpdateOneof::Block(msg) => { + Self::Block(Arc::new(MessageBlock::from_update_oneof(msg, created_at)?)) + } UpdateOneof::Ping(_) => return Err("Ping message is not supported"), UpdateOneof::Pong(_) => return Err("Pong message is not supported"), - UpdateOneof::BlockMeta(msg) => Self::BlockMeta(Arc::new(MessageBlockMeta(msg))), + UpdateOneof::BlockMeta(msg) => Self::BlockMeta(Arc::new( + MessageBlockMeta::from_update_oneof(msg, created_at), + )), UpdateOneof::Entry(msg) => { - Self::Entry(Arc::new(MessageEntry::from_update_oneof(&msg)?)) + Self::Entry(Arc::new(MessageEntry::from_update_oneof(&msg, created_at)?)) } }) } From 4ddf5781afc53345e552e040f58ddd0a345ba6cc Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Sat, 30 Nov 2024 17:38:38 -0500 Subject: [PATCH 3/6] fmt --- examples/rust/src/bin/client.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/examples/rust/src/bin/client.rs b/examples/rust/src/bin/client.rs index bee198fe..fcd536b6 100644 --- a/examples/rust/src/bin/client.rs +++ b/examples/rust/src/bin/client.rs @@ -972,7 +972,9 @@ fn create_pretty_entry(msg: SubscribeUpdateEntry) -> anyhow::Result { } fn print_update(kind: &str, created_at: SystemTime, filters: &[String], value: Value) { - let unix_since = created_at.duration_since(UNIX_EPOCH).expect("valid system time"); + let unix_since = created_at + .duration_since(UNIX_EPOCH) + .expect("valid system time"); info!( "{kind} ({}) at {}.{:0>6}: {}", filters.join(","), From 52361541ea1002ae287c27daf846355fcb334334 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Wed, 11 Dec 2024 22:38:54 -0500 Subject: [PATCH 4/6] fix changelog --- CHANGELOG.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 957546cc..5ba41496 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,8 +17,6 @@ The minor version will be incremented upon a breaking change and the patch versi ### Features - proto: add tonic feature ([#474](https://github.com/rpcpool/yellowstone-grpc/pull/474)) -- geyser: use default compression as gzip and zstd ([#475](https://github.com/rpcpool/yellowstone-grpc/pull/475)) -- example: add connection options to Rust client ([#478](https://github.com/rpcpool/yellowstone-grpc/pull/478)) - proto: add field `created_at` to update message ([#479](https://github.com/rpcpool/yellowstone-grpc/pull/479)) - nodejs: add parse err function ([#483](https://github.com/rpcpool/yellowstone-grpc/pull/483)) From 86c91cf7d487969fc0d3ed79a667b417077eb63f Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Thu, 12 Dec 2024 14:55:37 -0500 Subject: [PATCH 5/6] fix lock file --- yellowstone-grpc-client-nodejs/solana-encoding-wasm/Cargo.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/yellowstone-grpc-client-nodejs/solana-encoding-wasm/Cargo.lock b/yellowstone-grpc-client-nodejs/solana-encoding-wasm/Cargo.lock index 271f3ab6..f9fa86ff 100644 --- a/yellowstone-grpc-client-nodejs/solana-encoding-wasm/Cargo.lock +++ b/yellowstone-grpc-client-nodejs/solana-encoding-wasm/Cargo.lock @@ -3264,6 +3264,7 @@ dependencies = [ "anyhow", "bincode", "prost", + "prost-types", "protobuf-src", "solana-account-decoder", "solana-sdk", From e52c12d744f3bb2c454a54cedc075aa52f8dd851 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Thu, 12 Dec 2024 15:21:25 -0500 Subject: [PATCH 6/6] changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ba41496..b0be08b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,7 @@ The minor version will be incremented upon a breaking change and the patch versi ### Fixes -- nodejs: fix connector for custom port ([#488](https://github.com/rpcpool/yellowstone-grpc/pull/48*)) +- nodejs: fix connector for custom port ([#488](https://github.com/rpcpool/yellowstone-grpc/pull/488)) ### Features