diff --git a/CHANGELOG.md b/CHANGELOG.md index ac61cb2e..a978c2f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,8 @@ The minor version will be incremented upon a breaking change and the patch versi ### Features +- proto: add optional field `filter_by_commitment` to Slots filter ([#223](https://github.com/rpcpool/yellowstone-grpc/pull/223)) + ### Breaking ## 2023-10-19 diff --git a/examples/golang/proto/geyser.pb.go b/examples/golang/proto/geyser.pb.go index 5bc94856..816600a9 100644 --- a/examples/golang/proto/geyser.pb.go +++ b/examples/golang/proto/geyser.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 +// protoc-gen-go v1.31.0 // protoc v3.19.6 // source: geyser.proto @@ -442,6 +442,8 @@ type SubscribeRequestFilterSlots struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields + + FilterByCommitment *bool `protobuf:"varint,1,opt,name=filter_by_commitment,json=filterByCommitment,proto3,oneof" json:"filter_by_commitment,omitempty"` } func (x *SubscribeRequestFilterSlots) Reset() { @@ -476,6 +478,13 @@ func (*SubscribeRequestFilterSlots) Descriptor() ([]byte, []int) { return file_geyser_proto_rawDescGZIP(), []int{4} } +func (x *SubscribeRequestFilterSlots) GetFilterByCommitment() bool { + if x != nil && x.FilterByCommitment != nil { + return *x.FilterByCommitment + } + return false +} + type SubscribeRequestFilterTransactions struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -2349,9 +2358,14 @@ var file_geyser_proto_rawDesc = []byte{ 0x73, 0x12, 0x18, 0x0a, 0x06, 0x62, 0x61, 0x73, 0x65, 0x35, 0x38, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x06, 0x62, 0x61, 0x73, 0x65, 0x35, 0x38, 0x12, 0x18, 0x0a, 0x06, 0x62, 0x61, 0x73, 0x65, 0x36, 0x34, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x06, 0x62, - 0x61, 0x73, 0x65, 0x36, 0x34, 0x42, 0x06, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x1d, 0x0a, + 0x61, 0x73, 0x65, 0x36, 0x34, 0x42, 0x06, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x6d, 0x0a, 0x1b, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x53, 0x6c, 0x6f, 0x74, 0x73, 0x22, 0x9c, 0x02, 0x0a, + 0x74, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x53, 0x6c, 0x6f, 0x74, 0x73, 0x12, 0x35, 0x0a, 0x14, + 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x5f, 0x62, 0x79, 0x5f, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, + 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x12, 0x66, 0x69, + 0x6c, 0x74, 0x65, 0x72, 0x42, 0x79, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, + 0x88, 0x01, 0x01, 0x42, 0x17, 0x0a, 0x15, 0x5f, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x5f, 0x62, + 0x79, 0x5f, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0x9c, 0x02, 0x0a, 0x22, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x17, 0x0a, 0x04, 0x76, 0x6f, 0x74, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, @@ -3196,6 +3210,7 @@ func file_geyser_proto_init() { (*SubscribeRequestFilterAccountsFilterMemcmp_Base58)(nil), (*SubscribeRequestFilterAccountsFilterMemcmp_Base64)(nil), } + file_geyser_proto_msgTypes[4].OneofWrappers = []interface{}{} file_geyser_proto_msgTypes[5].OneofWrappers = []interface{}{} file_geyser_proto_msgTypes[6].OneofWrappers = []interface{}{} file_geyser_proto_msgTypes[10].OneofWrappers = []interface{}{ diff --git a/examples/golang/proto/solana-storage.pb.go b/examples/golang/proto/solana-storage.pb.go index fcbdb1cc..a6f02fff 100644 --- a/examples/golang/proto/solana-storage.pb.go +++ b/examples/golang/proto/solana-storage.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 +// protoc-gen-go v1.31.0 // protoc v3.19.6 // source: solana-storage.proto diff --git a/examples/rust/src/bin/client.rs b/examples/rust/src/bin/client.rs index afb631e3..d03c9635 100644 --- a/examples/rust/src/bin/client.rs +++ b/examples/rust/src/bin/client.rs @@ -129,6 +129,10 @@ struct ActionSubscribe { #[clap(long)] slots: bool, + /// Filter slots by commitment + #[clap(long)] + slots_filter_by_commitment: bool, + /// Subscribe on transactions updates #[clap(long)] transactions: bool, @@ -241,7 +245,12 @@ impl Action { let mut slots: SlotsFilterMap = HashMap::new(); if args.slots { - slots.insert("client".to_owned(), SubscribeRequestFilterSlots {}); + slots.insert( + "client".to_owned(), + SubscribeRequestFilterSlots { + filter_by_commitment: Some(args.slots_filter_by_commitment), + }, + ); } let mut transactions: TransactionsFilterMap = HashMap::new(); @@ -550,7 +559,7 @@ async fn geyser_subscribe( counter += 1; if counter == resub { let mut new_slots: SlotsFilterMap = HashMap::new(); - new_slots.insert("client".to_owned(), SubscribeRequestFilterSlots {}); + new_slots.insert("client".to_owned(), SubscribeRequestFilterSlots::default()); subscribe_tx .send(SubscribeRequest { diff --git a/examples/typescript/src/client.ts b/examples/typescript/src/client.ts index 34f628f0..57953fe6 100644 --- a/examples/typescript/src/client.ts +++ b/examples/typescript/src/client.ts @@ -128,7 +128,9 @@ async function subscribeCommand(client, args) { } if (args.slots) { - request.slots.client = {}; + request.slots.client = { + filterByCommitment: args.slotsFilterByCommitment, + }; } if (args.transactions) { @@ -272,6 +274,11 @@ function parseCommandLineArgs() { describe: "subscribe on slots updates", type: "boolean", }, + "slots-filter-by-commitment": { + default: false, + describe: "filter slot messages by commitment", + type: "boolean", + }, transactions: { default: false, describe: "subscribe on transactions updates", diff --git a/yellowstone-grpc-geyser/src/filters.rs b/yellowstone-grpc-geyser/src/filters.rs index e344eabf..86d4c1d3 100644 --- a/yellowstone-grpc-geyser/src/filters.rs +++ b/yellowstone-grpc-geyser/src/filters.rs @@ -82,10 +82,14 @@ impl Filter { self.commitment } - pub fn get_filters<'a>(&self, message: &'a Message) -> Vec<(Vec, MessageRef<'a>)> { + pub fn get_filters<'a>( + &self, + message: &'a Message, + commitment: Option, + ) -> Vec<(Vec, MessageRef<'a>)> { match message { Message::Account(message) => self.accounts.get_filters(message), - Message::Slot(message) => self.slots.get_filters(message), + Message::Slot(message) => self.slots.get_filters(message, commitment), Message::Transaction(message) => self.transactions.get_filters(message), Message::Entry(message) => self.entry.get_filters(message), Message::Block(message) => self.blocks.get_filters(message), @@ -93,8 +97,12 @@ impl Filter { } } - pub fn get_update(&self, message: &Message) -> Vec { - self.get_filters(message) + pub fn get_update( + &self, + message: &Message, + commitment: Option, + ) -> Vec { + self.get_filters(message, commitment) .into_iter() .filter_map(|(filters, message)| { if filters.is_empty() { @@ -336,9 +344,22 @@ impl<'a> FilterAccountsMatch<'a> { } } +#[derive(Debug, Default, Clone, Copy)] +struct FilterSlotsInner { + filter_by_commitment: bool, +} + +impl FilterSlotsInner { + fn new(filter: &SubscribeRequestFilterSlots) -> Self { + Self { + filter_by_commitment: filter.filter_by_commitment.unwrap_or_default(), + } + } +} + #[derive(Debug, Default, Clone)] struct FilterSlots { - filters: Vec, + filters: HashMap, } impl FilterSlots { @@ -351,14 +372,29 @@ impl FilterSlots { Ok(Self { filters: configs .iter() - // .filter_map(|(name, _filter)| Some(name.clone())) - .map(|(name, _filter)| name.clone()) + .map(|(name, filter)| (name.clone(), FilterSlotsInner::new(filter))) .collect(), }) } - fn get_filters<'a>(&self, message: &'a MessageSlot) -> Vec<(Vec, MessageRef<'a>)> { - vec![(self.filters.clone(), MessageRef::Slot(message))] + fn get_filters<'a>( + &self, + message: &'a MessageSlot, + commitment: Option, + ) -> Vec<(Vec, MessageRef<'a>)> { + vec![( + self.filters + .iter() + .filter_map(|(name, inner)| { + if !inner.filter_by_commitment || commitment == Some(message.status) { + Some(name.clone()) + } else { + None + } + }) + .collect(), + MessageRef::Slot(message), + )] } } @@ -938,7 +974,7 @@ mod tests { let message_transaction = create_message_transaction(&keypair_b, vec![account_key_b, account_key_a]); let message = Message::Transaction(message_transaction); - for (filters, _message) in filter.get_filters(&message) { + for (filters, _message) in filter.get_filters(&message, None) { assert!(!filters.is_empty()); } } @@ -980,7 +1016,7 @@ mod tests { let message_transaction = create_message_transaction(&keypair_b, vec![account_key_b, account_key_a]); let message = Message::Transaction(message_transaction); - for (filters, _message) in filter.get_filters(&message) { + for (filters, _message) in filter.get_filters(&message, None) { assert!(!filters.is_empty()); } } @@ -1022,7 +1058,7 @@ mod tests { let message_transaction = create_message_transaction(&keypair_b, vec![account_key_b, account_key_a]); let message = Message::Transaction(message_transaction); - for (filters, _message) in filter.get_filters(&message) { + for (filters, _message) in filter.get_filters(&message, None) { assert!(filters.is_empty()); } } @@ -1072,7 +1108,7 @@ mod tests { vec![account_key_x, account_key_y, account_key_z], ); let message = Message::Transaction(message_transaction); - for (filters, _message) in filter.get_filters(&message) { + for (filters, _message) in filter.get_filters(&message, None) { assert!(!filters.is_empty()); } } @@ -1120,7 +1156,7 @@ mod tests { let message_transaction = create_message_transaction(&keypair_x, vec![account_key_x, account_key_z]); let message = Message::Transaction(message_transaction); - for (filters, _message) in filter.get_filters(&message) { + for (filters, _message) in filter.get_filters(&message, None) { assert!(filters.is_empty()); } } diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index 64e0ca34..8199231a 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -1082,7 +1082,7 @@ impl GrpcService { } }; - for message in filter.get_update(&message) { + for message in filter.get_update(&message, None) { if stream_tx.send(Ok(message)).await.is_err() { error!("client #{id}: stream closed"); is_alive = false; @@ -1126,7 +1126,7 @@ impl GrpcService { if commitment == filter.get_commitment_level() { for message in messages.iter() { - for message in filter.get_update(message) { + for message in filter.get_update(message, Some(commitment)) { match stream_tx.try_send(Ok(message)) { Ok(()) => {} Err(mpsc::error::TrySendError::Full(_)) => { diff --git a/yellowstone-grpc-proto/proto/geyser.proto b/yellowstone-grpc-proto/proto/geyser.proto index 293831b2..c87cd454 100644 --- a/yellowstone-grpc-proto/proto/geyser.proto +++ b/yellowstone-grpc-proto/proto/geyser.proto @@ -56,7 +56,9 @@ message SubscribeRequestFilterAccountsFilterMemcmp { } } -message SubscribeRequestFilterSlots {} +message SubscribeRequestFilterSlots { + optional bool filter_by_commitment = 1; +} message SubscribeRequestFilterTransactions { optional bool vote = 1; diff --git a/yellowstone-grpc-tools/src/config.rs b/yellowstone-grpc-tools/src/config.rs index 2edd40f5..e105f375 100644 --- a/yellowstone-grpc-tools/src/config.rs +++ b/yellowstone-grpc-tools/src/config.rs @@ -12,7 +12,7 @@ use { CommitmentLevel, SubscribeRequest, SubscribeRequestAccountsDataSlice, SubscribeRequestFilterAccounts, SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterAccountsFilterMemcmp, SubscribeRequestFilterBlocks, - SubscribeRequestFilterTransactions, + SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions, }, }; @@ -40,7 +40,7 @@ pub trait GrpcRequestToProto { #[derive(Debug, Default, Clone, Deserialize, Serialize)] #[serde(default)] pub struct ConfigGrpcRequest { - pub slots: HashSet, + pub slots: HashMap, pub accounts: HashMap, pub transactions: HashMap, pub entries: HashSet, @@ -67,7 +67,7 @@ impl ConfigGrpcRequest { impl GrpcRequestToProto for ConfigGrpcRequest { fn to_proto(self) -> SubscribeRequest { SubscribeRequest { - slots: ConfigGrpcRequest::set_to_proto(self.slots), + slots: ConfigGrpcRequest::map_to_proto(self.slots), accounts: ConfigGrpcRequest::map_to_proto(self.accounts), transactions: ConfigGrpcRequest::map_to_proto(self.transactions), entry: ConfigGrpcRequest::set_to_proto(self.entries), @@ -79,6 +79,20 @@ impl GrpcRequestToProto for ConfigGrpcRequest { } } +#[derive(Debug, Default, Clone, Deserialize, Serialize)] +#[serde(default)] +pub struct ConfigGrpcRequestSlots { + filter_by_commitment: Option, +} + +impl GrpcRequestToProto for ConfigGrpcRequestSlots { + fn to_proto(self) -> SubscribeRequestFilterSlots { + SubscribeRequestFilterSlots { + filter_by_commitment: self.filter_by_commitment, + } + } +} + #[derive(Debug, Default, Clone, Deserialize, Serialize)] #[serde(default)] pub struct ConfigGrpcRequestAccounts {