Skip to content

Commit

Permalink
remove GeyserGrpcBuilder::build
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Mar 30, 2024
1 parent b187007 commit 014b04a
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 41 deletions.
3 changes: 1 addition & 2 deletions examples/rust/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ impl Args {
.connect_timeout(Duration::from_secs(10))
.timeout(Duration::from_secs(10))
.connect()
.await?
.build()
.await
.map_err(Into::into)
}
}
Expand Down
3 changes: 1 addition & 2 deletions examples/rust/src/bin/subscribe-ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ async fn main() -> anyhow::Result<()> {
let mut client = GeyserGrpcClient::build_from_shared(args.endpoint)?
.x_token(args.x_token)?
.connect()
.await?
.build()?;
.await?;
let (mut subscribe_tx, mut stream) = client.subscribe().await?;

futures::try_join!(
Expand Down
3 changes: 1 addition & 2 deletions examples/rust/src/bin/tx-blocktime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ async fn main() -> anyhow::Result<()> {
let mut client = GeyserGrpcClient::build_from_shared(args.endpoint)?
.x_token(args.x_token)?
.connect()
.await?
.build()?;
.await?;
let (mut subscribe_tx, mut stream) = client.subscribe().await?;

let commitment: CommitmentLevel = args.commitment.unwrap_or_default().into();
Expand Down
55 changes: 24 additions & 31 deletions yellowstone-grpc-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ pub struct InterceptorXToken {
pub x_token: Option<AsciiMetadataValue>,
}

impl From<Option<AsciiMetadataValue>> for InterceptorXToken {
fn from(x_token: Option<AsciiMetadataValue>) -> Self {
Self { x_token }
}
}

impl Interceptor for InterceptorXToken {
fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
if let Some(x_token) = self.x_token.clone() {
Expand Down Expand Up @@ -209,10 +215,7 @@ pub type GeyserGrpcBuilderResult<T> = Result<T, GeyserGrpcBuilderError>;
#[derive(Debug)]
pub struct GeyserGrpcBuilder {
pub endpoint: Endpoint,
pub channel: Option<Channel>,

pub x_token: Option<AsciiMetadataValue>,

pub send_compressed: Option<CompressionEncoding>,
pub accept_compressed: Option<CompressionEncoding>,
pub max_decoding_message_size: Option<usize>,
Expand All @@ -224,10 +227,7 @@ impl GeyserGrpcBuilder {
fn new(endpoint: Endpoint) -> Self {
Self {
endpoint,
channel: None,

x_token: None,

send_compressed: None,
accept_compressed: None,
max_decoding_message_size: None,
Expand All @@ -244,14 +244,11 @@ impl GeyserGrpcBuilder {
}

// Create client
pub fn build(mut self) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor>> {
let channel = self
.channel
.take()
.ok_or(GeyserGrpcBuilderError::EmptyChannel)?;
let interceptor = InterceptorXToken {
x_token: self.x_token,
};
fn build(
self,
channel: Channel,
) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor>> {
let interceptor: InterceptorXToken = self.x_token.into();

let mut geyser = GeyserClient::with_interceptor(channel.clone(), interceptor.clone());
if let Some(encoding) = self.send_compressed {
Expand All @@ -273,6 +270,16 @@ impl GeyserGrpcBuilder {
))
}

pub async fn connect(self) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor>> {
let channel = self.endpoint.connect().await?;
self.build(channel)
}

pub fn connect_lazy(self) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor>> {
let channel = self.endpoint.connect_lazy();
self.build(channel)
}

// Set x-token
pub fn x_token<T>(self, x_token: Option<T>) -> GeyserGrpcBuilderResult<Self>
where
Expand All @@ -294,20 +301,6 @@ impl GeyserGrpcBuilder {
}

// Endpoint options
pub async fn connect(self) -> GeyserGrpcBuilderResult<Self> {
Ok(Self {
channel: Some(self.endpoint.connect().await?),
..self
})
}

pub fn connect_lazy(self) -> Self {
Self {
channel: Some(self.endpoint.connect_lazy()),
..self
}
}

pub fn connect_timeout(self, dur: Duration) -> Self {
Self {
endpoint: self.endpoint.connect_timeout(dur),
Expand Down Expand Up @@ -437,7 +430,7 @@ mod tests {
let res = res.unwrap().x_token(Some(x_token));
assert!(res.is_ok());

let res = res.unwrap().connect_lazy().build();
let res = res.unwrap().connect_lazy();
assert!(res.is_ok());
}

Expand All @@ -452,7 +445,7 @@ mod tests {
let res = res.unwrap().x_token(Some(x_token));
assert!(res.is_ok());

let res = res.unwrap().connect_lazy().build();
let res = res.unwrap().connect_lazy();
assert!(res.is_ok());
}

Expand Down Expand Up @@ -481,7 +474,7 @@ mod tests {
let res = res.unwrap().x_token::<String>(None);
assert!(res.is_ok());

let res = res.unwrap().connect_lazy().build();
let res = res.unwrap().connect_lazy();
assert!(res.is_ok());
}

Expand Down
3 changes: 1 addition & 2 deletions yellowstone-grpc-tools/src/bin/grpc-google-pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,7 @@ impl ArgsAction {
.timeout(Duration::from_secs(5))
.max_decoding_message_size(config.max_message_size)
.connect()
.await?
.build()?;
.await?;
let mut geyser = client.subscribe_once(config.request.to_proto()).await?;

// Receive-send loop
Expand Down
3 changes: 1 addition & 2 deletions yellowstone-grpc-tools/src/bin/grpc-kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,7 @@ impl ArgsAction {
.connect_timeout(Duration::from_secs(10))
.timeout(Duration::from_secs(5))
.connect()
.await?
.build()?;
.await?;
let mut geyser = client.subscribe_once(config.request.to_proto()).await?;

// Receive-send loop
Expand Down

0 comments on commit 014b04a

Please sign in to comment.