Skip to content

Commit

Permalink
proto: add field created_at to update message (#479)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Dec 12, 2024
1 parent 895e1db commit 103b54a
Show file tree
Hide file tree
Showing 14 changed files with 229 additions and 87 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ The minor version will be incremented upon a breaking change and the patch versi

### Fixes

- nodejs: fix connector for custom port ([#488](https://github.com/rpcpool/yellowstone-grpc/pull/48*))
- nodejs: fix connector for custom port ([#488](https://github.com/rpcpool/yellowstone-grpc/pull/488))

### Features

- proto: add tonic feature ([#474](https://github.com/rpcpool/yellowstone-grpc/pull/474))
- proto: add field `created_at` to update message ([#479](https://github.com/rpcpool/yellowstone-grpc/pull/479))
- nodejs: add parse err function ([#483](https://github.com/rpcpool/yellowstone-grpc/pull/483))

### Breaking
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ log = "0.4.17"
maplit = "1.0.2"
prometheus = "0.13.2"
prost = "0.13.1"
prost-types = "0.13.3"
prost_011 = { package = "prost", version = "0.11.9" }
protobuf-src = "1.1.0"
serde = "1.0.145"
Expand Down
24 changes: 19 additions & 5 deletions examples/rust/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,11 @@ async fn geyser_subscribe(
}

let filters = msg.filters;
let created_at: SystemTime = msg
.created_at
.ok_or(anyhow::anyhow!("no created_at in the message"))?
.try_into()
.context("failed to parse created_at")?;
match msg.update_oneof {
Some(UpdateOneof::Account(msg)) => {
let account = msg
Expand All @@ -749,13 +754,14 @@ async fn geyser_subscribe(
let mut value = create_pretty_account(account)?;
value["isStartup"] = json!(msg.is_startup);
value["slot"] = json!(msg.slot);
print_update("account", &filters, value);
print_update("account", created_at, &filters, value);
}
Some(UpdateOneof::Slot(msg)) => {
let status = CommitmentLevel::try_from(msg.status)
.context("failed to decode commitment")?;
print_update(
"slot",
created_at,
&filters,
json!({
"slot": msg.slot,
Expand All @@ -771,11 +777,12 @@ async fn geyser_subscribe(
.ok_or(anyhow::anyhow!("no transaction in the message"))?;
let mut value = create_pretty_transaction(tx)?;
value["slot"] = json!(msg.slot);
print_update("transaction", &filters, value);
print_update("transaction", created_at, &filters, value);
}
Some(UpdateOneof::TransactionStatus(msg)) => {
print_update(
"transactionStatus",
created_at,
&filters,
json!({
"slot": msg.slot,
Expand All @@ -789,11 +796,12 @@ async fn geyser_subscribe(
);
}
Some(UpdateOneof::Entry(msg)) => {
print_update("entry", &filters, create_pretty_entry(msg)?);
print_update("entry", created_at, &filters, create_pretty_entry(msg)?);
}
Some(UpdateOneof::BlockMeta(msg)) => {
print_update(
"blockmeta",
created_at,
&filters,
json!({
"slot": msg.slot,
Expand All @@ -815,6 +823,7 @@ async fn geyser_subscribe(
Some(UpdateOneof::Block(msg)) => {
print_update(
"block",
created_at,
&filters,
json!({
"slot": msg.slot,
Expand Down Expand Up @@ -962,10 +971,15 @@ fn create_pretty_entry(msg: SubscribeUpdateEntry) -> anyhow::Result<Value> {
}))
}

fn print_update(kind: &str, filters: &[String], value: Value) {
fn print_update(kind: &str, created_at: SystemTime, filters: &[String], value: Value) {
let unix_since = created_at
.duration_since(UNIX_EPOCH)
.expect("valid system time");
info!(
"{kind} ({}): {}",
"{kind} ({}) at {}.{:0>6}: {}",
filters.join(","),
unix_since.as_secs(),
unix_since.subsec_micros(),
serde_json::to_string(&value).expect("json serialization failed")
);
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions yellowstone-grpc-geyser/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ hyper-util = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
prometheus = { workspace = true }
prost-types = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
solana-logger = { workspace = true }
Expand Down
3 changes: 3 additions & 0 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use {
},
anyhow::Context,
log::{error, info},
prost_types::Timestamp,
solana_sdk::{
clock::{Slot, MAX_RECENT_BLOCKHASHES},
pubkey::Pubkey,
Expand All @@ -16,6 +17,7 @@ use {
atomic::{AtomicUsize, Ordering},
Arc,
},
time::SystemTime,
},
tokio::{
fs,
Expand Down Expand Up @@ -637,6 +639,7 @@ impl GrpcService {
parent: entry.parent_slot,
status,
dead_error: None,
created_at: Timestamp::from(SystemTime::now())
}));
metrics::missed_status_message_inc(status);
}
Expand Down
1 change: 1 addition & 0 deletions yellowstone-grpc-proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ bincode = { workspace = true, optional = true }
bs58 = { workspace = true, optional = true }
bytes = { workspace = true, optional = true }
prost = { workspace = true }
prost-types = { workspace = true }
prost_011 = { workspace = true, optional = true }
serde = { workspace = true, optional = true }
solana-account-decoder = { workspace = true, optional = true }
Expand Down
7 changes: 6 additions & 1 deletion yellowstone-grpc-proto/benches/encode.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use {
criterion::{criterion_group, criterion_main, BenchmarkId, Criterion},
prost::Message as _,
std::time::Duration,
prost_types::Timestamp,
std::time::{Duration, SystemTime},
yellowstone_grpc_proto::plugin::{
filter::message::{
tests::{
Expand Down Expand Up @@ -41,6 +42,7 @@ fn bench_account(c: &mut Criterion) {
.map(|(msg, data_slice)| FilteredUpdate {
filters: filters.clone(),
message: FilteredUpdateOneof::account(&msg, data_slice),
created_at: Timestamp::from(SystemTime::now()),
})
.collect::<Vec<_>>();
bench!(&updates, "accounts");
Expand All @@ -52,7 +54,9 @@ fn bench_account(c: &mut Criterion) {
message: FilteredUpdateOneof::transaction(&MessageTransaction {
transaction,
slot: 42,
created_at: Timestamp::from(SystemTime::now()),
}),
created_at: Timestamp::from(SystemTime::now()),
})
.collect::<Vec<_>>();
bench!(&updates, "transactions");
Expand All @@ -62,6 +66,7 @@ fn bench_account(c: &mut Criterion) {
.map(|block| FilteredUpdate {
filters: filters.clone(),
message: FilteredUpdateOneof::block(Box::new(block)),
created_at: Timestamp::from(SystemTime::now()),
})
.collect::<Vec<_>>();
bench!(&updates, "blocks");
Expand Down
2 changes: 2 additions & 0 deletions yellowstone-grpc-proto/proto/geyser.proto
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
syntax = "proto3";

import "google/protobuf/timestamp.proto";
import public "solana-storage.proto";

option go_package = "github.com/rpcpool/yellowstone-grpc/examples/golang/proto";
Expand Down Expand Up @@ -119,6 +120,7 @@ message SubscribeUpdate {
SubscribeUpdateBlockMeta block_meta = 7;
SubscribeUpdateEntry entry = 8;
}
google.protobuf.Timestamp created_at = 11;
}

message SubscribeUpdateAccount {
Expand Down
2 changes: 1 addition & 1 deletion yellowstone-grpc-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ pub mod prelude {
pub use super::{geyser::*, solana::storage::confirmed_block::*};
}

pub use prost;
#[cfg(feature = "tonic")]
pub use tonic;
pub use {prost, prost_types};

#[cfg(feature = "plugin")]
pub mod plugin;
Expand Down
38 changes: 28 additions & 10 deletions yellowstone-grpc-proto/src/plugin/filter/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,24 +75,24 @@ pub enum FilterError {
pub type FilterResult<T> = Result<T, FilterError>;

macro_rules! filtered_updates_once_owned {
($filters:ident, $message:expr) => {{
($filters:ident, $message:expr, $created_at:expr) => {{
let mut messages = FilteredUpdates::new();
if !$filters.is_empty() {
messages.push(FilteredUpdate::new($filters, $message));
messages.push(FilteredUpdate::new($filters, $message, $created_at));
}
messages
}};
}

macro_rules! filtered_updates_once_ref {
($filters:ident, $message:expr) => {{
($filters:ident, $message:expr, $created_at:expr) => {{
let mut messages = FilteredUpdates::new();
if !$filters.is_empty() {
let mut message_filters = FilteredUpdateFilters::new();
for filter in $filters {
message_filters.push(filter.clone());
}
messages.push(FilteredUpdate::new(message_filters, $message));
messages.push(FilteredUpdate::new(message_filters, $message, $created_at));
}
messages
}};
Expand Down Expand Up @@ -339,7 +339,8 @@ impl FilterAccounts {
let filters = filter.get_filters();
filtered_updates_once_owned!(
filters,
FilteredUpdateOneof::account(message, accounts_data_slice.clone())
FilteredUpdateOneof::account(message, accounts_data_slice.clone()),
message.created_at
)
}
}
Expand Down Expand Up @@ -631,7 +632,11 @@ impl FilterSlots {
}
})
.collect::<FilteredUpdateFilters>();
filtered_updates_once_owned!(filters, FilteredUpdateOneof::slot(message.clone()))
filtered_updates_once_owned!(
filters,
FilteredUpdateOneof::slot(message.clone()),
message.created_at
)
}
}

Expand Down Expand Up @@ -785,7 +790,8 @@ impl FilterTransactions {
FilterTransactionsType::TransactionStatus => {
FilteredUpdateOneof::transaction_status(message)
}
}
},
message.created_at
)
}
}
Expand Down Expand Up @@ -813,7 +819,11 @@ impl FilterEntries {

fn get_updates(&self, message: &Arc<MessageEntry>) -> FilteredUpdates {
let filters = self.filters.as_slice();
filtered_updates_once_ref!(filters, FilteredUpdateOneof::entry(Arc::clone(message)))
filtered_updates_once_ref!(
filters,
FilteredUpdateOneof::entry(Arc::clone(message)),
message.created_at
)
}
}

Expand Down Expand Up @@ -941,6 +951,7 @@ impl FilterBlocks {
accounts,
entries,
})),
message.created_at,
));
}
updates
Expand Down Expand Up @@ -972,7 +983,8 @@ impl FilterBlocksMeta {
let filters = self.filters.as_slice();
filtered_updates_once_ref!(
filters,
FilteredUpdateOneof::block_meta(Arc::clone(message))
FilteredUpdateOneof::block_meta(Arc::clone(message)),
message.created_at
)
}
}
Expand Down Expand Up @@ -1088,6 +1100,7 @@ mod tests {
message::{Message, MessageTransaction, MessageTransactionInfo},
},
},
prost_types::Timestamp,
solana_sdk::{
hash::Hash,
message::{v0::LoadedAddresses, Message as SolMessage, MessageHeader},
Expand All @@ -1096,7 +1109,11 @@ mod tests {
transaction::{SanitizedTransaction, Transaction},
},
solana_transaction_status::TransactionStatusMeta,
std::{collections::HashMap, sync::Arc, time::Duration},
std::{
collections::HashMap,
sync::Arc,
time::{Duration, SystemTime},
},
};

fn create_filter_names() -> FilterNames {
Expand Down Expand Up @@ -1150,6 +1167,7 @@ mod tests {
account_keys,
}),
slot: 100,
created_at: Timestamp::from(SystemTime::now()),
}
}

Expand Down
Loading

0 comments on commit 103b54a

Please sign in to comment.