Skip to content

Commit

Permalink
geyser: wrap messages to Arc (#315)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Apr 4, 2024
1 parent a2bb340 commit e3e3f61
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The minor version will be incremented upon a breaking change and the patch versi
- geyser: add `transactions_status` filter ([#310](https://github.com/rpcpool/yellowstone-grpc/pull/310))
- geyser: add metric `slot_status_plugin` ([#312](https://github.com/rpcpool/yellowstone-grpc/pull/312))
- geyser: wrap `geyser_loop` with `unconstrained` ([#313](https://github.com/rpcpool/yellowstone-grpc/pull/313))
- geyser: wrap messages to `Arc` ([#315](https://github.com/rpcpool/yellowstone-grpc/pull/315))

### Breaking

Expand Down
46 changes: 23 additions & 23 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ impl BlockMetaStorage {

#[derive(Debug, Default)]
struct SlotMessages {
messages: Vec<Option<Message>>, // Option is used for accounts with low write_version
messages: Vec<Option<Arc<Message>>>, // Option is used for accounts with low write_version
block_meta: Option<MessageBlockMeta>,
transactions: Vec<MessageTransactionInfo>,
accounts_dedup: HashMap<Pubkey, (u64, usize)>, // (write_version, message_index)
Expand All @@ -667,7 +667,7 @@ struct SlotMessages {
}

impl SlotMessages {
pub fn try_seal(&mut self) -> Option<Message> {
pub fn try_seal(&mut self) -> Option<Arc<Message>> {
if !self.sealed {
if let Some(block_meta) = &self.block_meta {
let executed_transaction_count = block_meta.executed_transaction_count as usize;
Expand All @@ -686,15 +686,15 @@ impl SlotMessages {

let mut accounts = Vec::with_capacity(self.messages.len());
for item in self.messages.iter().flatten() {
if let Message::Account(account) = item {
if let Message::Account(account) = item.as_ref() {
accounts.push(account.account.clone());
}
}

let message = Message::Block(
let message = Arc::new(Message::Block(
(block_meta.clone(), transactions, accounts, entries).into(),
);
self.messages.push(Some(message.clone()));
));
self.messages.push(Some(Arc::clone(&message)));

self.sealed = true;
self.entries_count = entries_count;
Expand All @@ -713,7 +713,7 @@ pub struct GrpcService {
blocks_meta: Option<BlockMetaStorage>,
subscribe_id: AtomicUsize,
snapshot_rx: Mutex<Option<crossbeam_channel::Receiver<Option<Message>>>>,
broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc<Vec<Message>>)>,
broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc<Vec<Arc<Message>>>)>,
}

impl GrpcService {
Expand All @@ -724,7 +724,7 @@ impl GrpcService {
is_reload: bool,
) -> anyhow::Result<(
Option<crossbeam_channel::Sender<Option<Message>>>,
mpsc::UnboundedSender<Message>,
mpsc::UnboundedSender<Arc<Message>>,
Arc<Notify>,
)> {
// Bind service address
Expand Down Expand Up @@ -811,9 +811,9 @@ impl GrpcService {
}

async fn geyser_loop(
mut messages_rx: mpsc::UnboundedReceiver<Message>,
mut messages_rx: mpsc::UnboundedReceiver<Arc<Message>>,
blocks_meta_tx: Option<mpsc::UnboundedSender<Message>>,
broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc<Vec<Message>>)>,
broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc<Vec<Arc<Message>>>)>,
block_fail_action: ConfigBlockFailAction,
) {
const PROCESSED_MESSAGES_MAX: usize = 31;
Expand All @@ -831,19 +831,19 @@ impl GrpcService {
MESSAGE_QUEUE_SIZE.dec();

// Update metrics
if let Message::Slot(slot_message) = &message {
if let Message::Slot(slot_message) = message.as_ref() {
prom::update_slot_plugin_status(slot_message.status, slot_message.slot);
}

// Update blocks info
if let Some(blocks_meta_tx) = &blocks_meta_tx {
if matches!(message, Message::Slot(_) | Message::BlockMeta(_)) {
let _ = blocks_meta_tx.send(message.clone());
if matches!(message.as_ref(), Message::Slot(_) | Message::BlockMeta(_)) {
let _ = blocks_meta_tx.send(message.as_ref().clone());
}
}

// Remove outdated block reconstruction info
match &message {
match message.as_ref() {
// On startup we can receive few Confirmed/Finalized slots without BlockMeta message
// With saved first Processed slot we can ignore errors caused by startup process
Message::Slot(msg) if processed_first_slot.is_none() && msg.status == CommitmentLevel::Processed => {
Expand Down Expand Up @@ -904,11 +904,11 @@ impl GrpcService {

// Update block reconstruction info
let slot_messages = messages.entry(message.get_slot()).or_default();
if !matches!(message, Message::Slot(_)) {
slot_messages.messages.push(Some(message.clone()));
if !matches!(message.as_ref(), Message::Slot(_)) {
slot_messages.messages.push(Some(Arc::clone(&message)));

// If we already build Block message, new message will be a problem
if slot_messages.sealed && !(matches!(message, Message::Entry(_)) && slot_messages.entries_count == 0) {
if slot_messages.sealed && !(matches!(message.as_ref(), Message::Entry(_)) && slot_messages.entries_count == 0) {
prom::update_invalid_blocks(format!("unexpected message {}", message.kind()));
match block_fail_action {
ConfigBlockFailAction::Log => {
Expand All @@ -921,7 +921,7 @@ impl GrpcService {
}
}
let mut sealed_block_msg = None;
match &message {
match message.as_ref() {
Message::BlockMeta(msg) => {
if slot_messages.block_meta.is_some() {
prom::update_invalid_blocks("unexpected message: BlockMeta (duplicate)");
Expand Down Expand Up @@ -969,7 +969,7 @@ impl GrpcService {
}

for message in messages_vec {
if let Message::Slot(slot) = message {
if let Message::Slot(slot) = message.as_ref() {
let (mut confirmed_messages, mut finalized_messages) = match slot.status {
CommitmentLevel::Processed => {
(Vec::with_capacity(1), Vec::with_capacity(1))
Expand Down Expand Up @@ -1003,7 +1003,7 @@ impl GrpcService {
};

// processed
processed_messages.push(message.clone());
processed_messages.push(Arc::clone(&message));
let _ =
broadcast_tx.send((CommitmentLevel::Processed, processed_messages.into()));
processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX);
Expand All @@ -1012,7 +1012,7 @@ impl GrpcService {
.reset(Instant::now() + PROCESSED_MESSAGES_SLEEP);

// confirmed
confirmed_messages.push(message.clone());
confirmed_messages.push(Arc::clone(&message));
let _ =
broadcast_tx.send((CommitmentLevel::Confirmed, confirmed_messages.into()));

Expand All @@ -1023,7 +1023,7 @@ impl GrpcService {
} else {
let mut confirmed_messages = vec![];
let mut finalized_messages = vec![];
if matches!(message, Message::Block(_)) {
if matches!(message.as_ref(), Message::Block(_)) {
if let Some(slot_messages) = messages.get(&message.get_slot()) {
if let Some(confirmed_at) = slot_messages.confirmed_at {
confirmed_messages.extend(
Expand Down Expand Up @@ -1081,7 +1081,7 @@ impl GrpcService {
stream_tx: mpsc::Sender<TonicResult<SubscribeUpdate>>,
mut client_rx: mpsc::UnboundedReceiver<Option<Filter>>,
mut snapshot_rx: Option<crossbeam_channel::Receiver<Option<Message>>>,
mut messages_rx: broadcast::Receiver<(CommitmentLevel, Arc<Vec<Message>>)>,
mut messages_rx: broadcast::Receiver<(CommitmentLevel, Arc<Vec<Arc<Message>>>)>,
drop_client: impl FnOnce(),
) {
CONNECTIONS_TOTAL.inc();
Expand Down
4 changes: 2 additions & 2 deletions yellowstone-grpc-geyser/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ use {
pub struct PluginInner {
runtime: Runtime,
snapshot_channel: Option<crossbeam_channel::Sender<Option<Message>>>,
grpc_channel: mpsc::UnboundedSender<Message>,
grpc_channel: mpsc::UnboundedSender<Arc<Message>>,
grpc_shutdown: Arc<Notify>,
prometheus: PrometheusService,
}

impl PluginInner {
fn send_message(&self, message: Message) {
if self.grpc_channel.send(message).is_ok() {
if self.grpc_channel.send(Arc::new(message)).is_ok() {
MESSAGE_QUEUE_SIZE.inc();
}
}
Expand Down

0 comments on commit e3e3f61

Please sign in to comment.