Skip to content

Commit

Permalink
chore: Remove excess boxing in query stream
Browse files Browse the repository at this point in the history
During refactoring to pass 'buf lint' checks, I introduced a query stream
trait object, which turned out to not be necessary in the final version of
the updated code. This commit removes the indirection.

Signed-off-by: Andrew Lilley Brinker <[email protected]>
  • Loading branch information
alilleybrinker committed Aug 29, 2024
1 parent 9cdc659 commit 1594100
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 16 deletions.
13 changes: 5 additions & 8 deletions plugins/dummy_rand_data/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{
pin::Pin,
};
use tokio::sync::mpsc::{self, error::TrySendError};
use tonic::Status;
use tonic::{Status, Streaming};

#[derive(Debug)]
pub struct Query {
Expand Down Expand Up @@ -230,12 +230,9 @@ impl Drop for QuerySession {
}
}

type PluginQueryStream =
Box<dyn Stream<Item = Result<InitiateQueryProtocolRequest, Status>> + Send + Unpin + 'static>;

pub struct HcSessionSocket {
tx: mpsc::Sender<Result<InitiateQueryProtocolResponse, Status>>,
rx: PluginQueryStream,
rx: Streaming<InitiateQueryProtocolRequest>,
drop_tx: mpsc::Sender<i32>,
drop_rx: mpsc::Receiver<i32>,
sessions: SessionTracker,
Expand All @@ -258,15 +255,15 @@ impl std::fmt::Debug for HcSessionSocket {
impl HcSessionSocket {
pub fn new(
tx: mpsc::Sender<Result<InitiateQueryProtocolResponse, Status>>,
rx: impl Stream<Item = Result<InitiateQueryProtocolRequest, Status>> + Send + Unpin + 'static,
rx: Streaming<InitiateQueryProtocolRequest>,
) -> Self {
// channel for QuerySession objects to notify us they dropped
// @Todo - make this configurable
let (drop_tx, drop_rx) = mpsc::channel(10);

Self {
tx,
rx: Box::new(rx),
rx,
drop_tx,
drop_rx,
sessions: HashMap::new(),
Expand All @@ -286,7 +283,7 @@ impl HcSessionSocket {
}

async fn message(&mut self) -> Result<Option<PluginQuery>, Status> {
let fut = poll_fn(|cx| Pin::new(&mut *self.rx).poll_next(cx));
let fut = poll_fn(|cx| Pin::new(&mut self.rx).poll_next(cx));

match fut.await {
Some(Ok(m)) => Ok(m.query),
Expand Down
13 changes: 5 additions & 8 deletions plugins/dummy_sha256/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{
pin::Pin,
};
use tokio::sync::mpsc::{self, error::TrySendError};
use tonic::Status;
use tonic::{Status, Streaming};

#[derive(Debug)]
pub struct Query {
Expand Down Expand Up @@ -230,12 +230,9 @@ impl Drop for QuerySession {
}
}

type PluginQueryStream =
Box<dyn Stream<Item = Result<InitiateQueryProtocolRequest, Status>> + Send + Unpin + 'static>;

pub struct HcSessionSocket {
tx: mpsc::Sender<Result<InitiateQueryProtocolResponse, Status>>,
rx: PluginQueryStream,
rx: Streaming<InitiateQueryProtocolRequest>,
drop_tx: mpsc::Sender<i32>,
drop_rx: mpsc::Receiver<i32>,
sessions: SessionTracker,
Expand All @@ -258,15 +255,15 @@ impl std::fmt::Debug for HcSessionSocket {
impl HcSessionSocket {
pub fn new(
tx: mpsc::Sender<Result<InitiateQueryProtocolResponse, Status>>,
rx: impl Stream<Item = Result<InitiateQueryProtocolRequest, Status>> + Send + Unpin + 'static,
rx: Streaming<InitiateQueryProtocolRequest>,
) -> Self {
// channel for QuerySession objects to notify us they dropped
// @Todo - make this configurable
let (drop_tx, drop_rx) = mpsc::channel(10);

Self {
tx,
rx: Box::new(rx),
rx,
drop_tx,
drop_rx,
sessions: HashMap::new(),
Expand All @@ -286,7 +283,7 @@ impl HcSessionSocket {
}

async fn message(&mut self) -> Result<Option<PluginQuery>, Status> {
let fut = poll_fn(|cx| Pin::new(&mut *self.rx).poll_next(cx));
let fut = poll_fn(|cx| Pin::new(&mut self.rx).poll_next(cx));

match fut.await {
Some(Ok(m)) => Ok(m.query),
Expand Down

0 comments on commit 1594100

Please sign in to comment.