Skip to content

Commit

Permalink
proto: add optional field filter_by_commitment to Slots filter (#223)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Oct 27, 2023
1 parent f88fd64 commit c5f2601
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 27 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ The minor version will be incremented upon a breaking change and the patch versi

### Features

- proto: add optional field `filter_by_commitment` to Slots filter ([#223](https://github.com/rpcpool/yellowstone-grpc/pull/223))

### Breaking

## 2023-10-19
Expand Down
21 changes: 18 additions & 3 deletions examples/golang/proto/geyser.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion examples/golang/proto/solana-storage.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 11 additions & 2 deletions examples/rust/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ struct ActionSubscribe {
#[clap(long)]
slots: bool,

/// Filter slots by commitment
#[clap(long)]
slots_filter_by_commitment: bool,

/// Subscribe on transactions updates
#[clap(long)]
transactions: bool,
Expand Down Expand Up @@ -241,7 +245,12 @@ impl Action {

let mut slots: SlotsFilterMap = HashMap::new();
if args.slots {
slots.insert("client".to_owned(), SubscribeRequestFilterSlots {});
slots.insert(
"client".to_owned(),
SubscribeRequestFilterSlots {
filter_by_commitment: Some(args.slots_filter_by_commitment),
},
);
}

let mut transactions: TransactionsFilterMap = HashMap::new();
Expand Down Expand Up @@ -550,7 +559,7 @@ async fn geyser_subscribe(
counter += 1;
if counter == resub {
let mut new_slots: SlotsFilterMap = HashMap::new();
new_slots.insert("client".to_owned(), SubscribeRequestFilterSlots {});
new_slots.insert("client".to_owned(), SubscribeRequestFilterSlots::default());

subscribe_tx
.send(SubscribeRequest {
Expand Down
9 changes: 8 additions & 1 deletion examples/typescript/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ async function subscribeCommand(client, args) {
}

if (args.slots) {
request.slots.client = {};
request.slots.client = {
filterByCommitment: args.slotsFilterByCommitment,
};
}

if (args.transactions) {
Expand Down Expand Up @@ -272,6 +274,11 @@ function parseCommandLineArgs() {
describe: "subscribe on slots updates",
type: "boolean",
},
"slots-filter-by-commitment": {
default: false,
describe: "filter slot messages by commitment",
type: "boolean",
},
transactions: {
default: false,
describe: "subscribe on transactions updates",
Expand Down
64 changes: 50 additions & 14 deletions yellowstone-grpc-geyser/src/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,19 +82,27 @@ impl Filter {
self.commitment
}

pub fn get_filters<'a>(&self, message: &'a Message) -> Vec<(Vec<String>, MessageRef<'a>)> {
pub fn get_filters<'a>(
&self,
message: &'a Message,
commitment: Option<CommitmentLevel>,
) -> Vec<(Vec<String>, MessageRef<'a>)> {
match message {
Message::Account(message) => self.accounts.get_filters(message),
Message::Slot(message) => self.slots.get_filters(message),
Message::Slot(message) => self.slots.get_filters(message, commitment),
Message::Transaction(message) => self.transactions.get_filters(message),
Message::Entry(message) => self.entry.get_filters(message),
Message::Block(message) => self.blocks.get_filters(message),
Message::BlockMeta(message) => self.blocks_meta.get_filters(message),
}
}

pub fn get_update(&self, message: &Message) -> Vec<SubscribeUpdate> {
self.get_filters(message)
pub fn get_update(
&self,
message: &Message,
commitment: Option<CommitmentLevel>,
) -> Vec<SubscribeUpdate> {
self.get_filters(message, commitment)
.into_iter()
.filter_map(|(filters, message)| {
if filters.is_empty() {
Expand Down Expand Up @@ -336,9 +344,22 @@ impl<'a> FilterAccountsMatch<'a> {
}
}

#[derive(Debug, Default, Clone, Copy)]
struct FilterSlotsInner {
filter_by_commitment: bool,
}

impl FilterSlotsInner {
fn new(filter: &SubscribeRequestFilterSlots) -> Self {
Self {
filter_by_commitment: filter.filter_by_commitment.unwrap_or_default(),
}
}
}

#[derive(Debug, Default, Clone)]
struct FilterSlots {
filters: Vec<String>,
filters: HashMap<String, FilterSlotsInner>,
}

impl FilterSlots {
Expand All @@ -351,14 +372,29 @@ impl FilterSlots {
Ok(Self {
filters: configs
.iter()
// .filter_map(|(name, _filter)| Some(name.clone()))
.map(|(name, _filter)| name.clone())
.map(|(name, filter)| (name.clone(), FilterSlotsInner::new(filter)))
.collect(),
})
}

fn get_filters<'a>(&self, message: &'a MessageSlot) -> Vec<(Vec<String>, MessageRef<'a>)> {
vec![(self.filters.clone(), MessageRef::Slot(message))]
fn get_filters<'a>(
&self,
message: &'a MessageSlot,
commitment: Option<CommitmentLevel>,
) -> Vec<(Vec<String>, MessageRef<'a>)> {
vec![(
self.filters
.iter()
.filter_map(|(name, inner)| {
if !inner.filter_by_commitment || commitment == Some(message.status) {
Some(name.clone())
} else {
None
}
})
.collect(),
MessageRef::Slot(message),
)]
}
}

Expand Down Expand Up @@ -938,7 +974,7 @@ mod tests {
let message_transaction =
create_message_transaction(&keypair_b, vec![account_key_b, account_key_a]);
let message = Message::Transaction(message_transaction);
for (filters, _message) in filter.get_filters(&message) {
for (filters, _message) in filter.get_filters(&message, None) {
assert!(!filters.is_empty());
}
}
Expand Down Expand Up @@ -980,7 +1016,7 @@ mod tests {
let message_transaction =
create_message_transaction(&keypair_b, vec![account_key_b, account_key_a]);
let message = Message::Transaction(message_transaction);
for (filters, _message) in filter.get_filters(&message) {
for (filters, _message) in filter.get_filters(&message, None) {
assert!(!filters.is_empty());
}
}
Expand Down Expand Up @@ -1022,7 +1058,7 @@ mod tests {
let message_transaction =
create_message_transaction(&keypair_b, vec![account_key_b, account_key_a]);
let message = Message::Transaction(message_transaction);
for (filters, _message) in filter.get_filters(&message) {
for (filters, _message) in filter.get_filters(&message, None) {
assert!(filters.is_empty());
}
}
Expand Down Expand Up @@ -1072,7 +1108,7 @@ mod tests {
vec![account_key_x, account_key_y, account_key_z],
);
let message = Message::Transaction(message_transaction);
for (filters, _message) in filter.get_filters(&message) {
for (filters, _message) in filter.get_filters(&message, None) {
assert!(!filters.is_empty());
}
}
Expand Down Expand Up @@ -1120,7 +1156,7 @@ mod tests {
let message_transaction =
create_message_transaction(&keypair_x, vec![account_key_x, account_key_z]);
let message = Message::Transaction(message_transaction);
for (filters, _message) in filter.get_filters(&message) {
for (filters, _message) in filter.get_filters(&message, None) {
assert!(filters.is_empty());
}
}
Expand Down
4 changes: 2 additions & 2 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1082,7 +1082,7 @@ impl GrpcService {
}
};

for message in filter.get_update(&message) {
for message in filter.get_update(&message, None) {
if stream_tx.send(Ok(message)).await.is_err() {
error!("client #{id}: stream closed");
is_alive = false;
Expand Down Expand Up @@ -1126,7 +1126,7 @@ impl GrpcService {

if commitment == filter.get_commitment_level() {
for message in messages.iter() {
for message in filter.get_update(message) {
for message in filter.get_update(message, Some(commitment)) {
match stream_tx.try_send(Ok(message)) {
Ok(()) => {}
Err(mpsc::error::TrySendError::Full(_)) => {
Expand Down
4 changes: 3 additions & 1 deletion yellowstone-grpc-proto/proto/geyser.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ message SubscribeRequestFilterAccountsFilterMemcmp {
}
}

message SubscribeRequestFilterSlots {}
message SubscribeRequestFilterSlots {
optional bool filter_by_commitment = 1;
}

message SubscribeRequestFilterTransactions {
optional bool vote = 1;
Expand Down
20 changes: 17 additions & 3 deletions yellowstone-grpc-tools/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use {
CommitmentLevel, SubscribeRequest, SubscribeRequestAccountsDataSlice,
SubscribeRequestFilterAccounts, SubscribeRequestFilterAccountsFilter,
SubscribeRequestFilterAccountsFilterMemcmp, SubscribeRequestFilterBlocks,
SubscribeRequestFilterTransactions,
SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions,
},
};

Expand Down Expand Up @@ -40,7 +40,7 @@ pub trait GrpcRequestToProto<T> {
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
#[serde(default)]
pub struct ConfigGrpcRequest {
pub slots: HashSet<String>,
pub slots: HashMap<String, ConfigGrpcRequestSlots>,
pub accounts: HashMap<String, ConfigGrpcRequestAccounts>,
pub transactions: HashMap<String, ConfigGrpcRequestTransactions>,
pub entries: HashSet<String>,
Expand All @@ -67,7 +67,7 @@ impl ConfigGrpcRequest {
impl GrpcRequestToProto<SubscribeRequest> for ConfigGrpcRequest {
fn to_proto(self) -> SubscribeRequest {
SubscribeRequest {
slots: ConfigGrpcRequest::set_to_proto(self.slots),
slots: ConfigGrpcRequest::map_to_proto(self.slots),
accounts: ConfigGrpcRequest::map_to_proto(self.accounts),
transactions: ConfigGrpcRequest::map_to_proto(self.transactions),
entry: ConfigGrpcRequest::set_to_proto(self.entries),
Expand All @@ -79,6 +79,20 @@ impl GrpcRequestToProto<SubscribeRequest> for ConfigGrpcRequest {
}
}

#[derive(Debug, Default, Clone, Deserialize, Serialize)]
#[serde(default)]
pub struct ConfigGrpcRequestSlots {
filter_by_commitment: Option<bool>,
}

impl GrpcRequestToProto<SubscribeRequestFilterSlots> for ConfigGrpcRequestSlots {
fn to_proto(self) -> SubscribeRequestFilterSlots {
SubscribeRequestFilterSlots {
filter_by_commitment: self.filter_by_commitment,
}
}
}

#[derive(Debug, Default, Clone, Deserialize, Serialize)]
#[serde(default)]
pub struct ConfigGrpcRequestAccounts {
Expand Down

0 comments on commit c5f2601

Please sign in to comment.