From 908092eea755bc81f95de4faee799e640ddcece6 Mon Sep 17 00:00:00 2001 From: Alissa Tung Date: Wed, 1 Feb 2023 16:02:01 +0800 Subject: [PATCH] more tests --- src/hstreamdb/src/common.rs | 4 +++- src/hstreamdb/src/consumer.rs | 16 ++++++++++--- src/hstreamdb/src/lib.rs | 3 ++- src/hstreamdb/tests/common.rs | 31 +++++++++++++++++++++++-- src/hstreamdb/tests/consumer_test.rs | 1 - src/hstreamdb/tests/integration_test.rs | 20 +++++++++++++++- src/x/hstreamdb-erl-nifs/src/lib.rs | 1 - 7 files changed, 66 insertions(+), 10 deletions(-) diff --git a/src/hstreamdb/src/common.rs b/src/hstreamdb/src/common.rs index 3705f26..bddd295 100644 --- a/src/hstreamdb/src/common.rs +++ b/src/hstreamdb/src/common.rs @@ -9,9 +9,11 @@ use tonic::transport; use crate::producer; +pub type SubscriptionId = String; + #[derive(Debug)] pub struct Subscription { - pub subscription_id: String, + pub subscription_id: SubscriptionId, pub stream_name: String, pub ack_timeout_seconds: i32, pub max_unacked_records: i32, diff --git a/src/hstreamdb/src/consumer.rs b/src/hstreamdb/src/consumer.rs index f1ec3c9..59f28bf 100644 --- a/src/hstreamdb/src/consumer.rs +++ b/src/hstreamdb/src/consumer.rs @@ -5,18 +5,28 @@ use prost_types::Struct; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::UnboundedSender; use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_stream::StreamExt; use tonic::{Request, Streaming}; use crate::client::Client; use crate::common::{self, Payload}; use crate::utils::decode_received_records; +use crate::SubscriptionId; + +pub struct ConsumerStream(UnboundedReceiverStream<(Payload, Responder)>); + +impl ConsumerStream { + pub async fn next(&mut self) -> Option<(Payload, Responder)> { + self.0.next().await + } +} impl Client { pub async fn streaming_fetch( &self, consumer_name: String, - subscription_id: String, - ) -> common::Result> { + subscription_id: SubscriptionId, + ) -> common::Result { let channel = self.lookup_subscription(subscription_id.clone()).await?; let mut channel = HStreamApiClient::connect(channel).await?; @@ -46,7 +56,7 @@ impl Client { sender, )); - Ok(UnboundedReceiverStream::new(receiver)) + Ok(ConsumerStream(UnboundedReceiverStream::new(receiver))) } } diff --git a/src/hstreamdb/src/lib.rs b/src/hstreamdb/src/lib.rs index 75c2e71..e669340 100644 --- a/src/hstreamdb/src/lib.rs +++ b/src/hstreamdb/src/lib.rs @@ -16,5 +16,6 @@ pub use channel_provider::ChannelProviderSettings; pub use client::Client; pub use common::{ CompressionType, Consumer, Error, ListValue, Payload, Record, RecordId, Result, ShardId, - SpecialOffset, Stream, StreamShardOffset, Struct, Subscription, Timestamp, + SpecialOffset, Stream, StreamShardOffset, Struct, Subscription, SubscriptionId, Timestamp, }; +pub use consumer::ConsumerStream; diff --git a/src/hstreamdb/tests/common.rs b/src/hstreamdb/tests/common.rs index b5a8a42..fc2c9d6 100644 --- a/src/hstreamdb/tests/common.rs +++ b/src/hstreamdb/tests/common.rs @@ -3,7 +3,7 @@ use std::env; use hstreamdb::appender::Appender; use hstreamdb::common::{CompressionType, SpecialOffset, Stream}; use hstreamdb::producer::{FlushSettings, Producer}; -use hstreamdb::{ChannelProviderSettings, Record, Subscription}; +use hstreamdb::{ChannelProviderSettings, ConsumerStream, Record, Subscription, SubscriptionId}; use hstreamdb_test_utils::rand_alphanumeric; pub async fn init_client() -> anyhow::Result { @@ -15,7 +15,15 @@ pub async fn init_client() -> anyhow::Result { #[tokio::test(flavor = "multi_thread")] async fn make_ci_happy() { - init_client().await.unwrap().0.list_streams().await.unwrap(); + let client = init_client().await.unwrap(); + let (stream, sub) = client.new_stream_subscription().await.unwrap(); + let mut consumer = client.new_consumer(sub.subscription_id).await.unwrap(); + let (appender, producer) = client.new_sync_producer(stream.stream_name).await.unwrap(); + appender.append(rand_raw_record(4500)).await.unwrap(); + producer.start().await; + while let Some(x) = consumer.next().await { + x.1.ack().unwrap(); + } } pub struct Client(pub hstreamdb::Client); @@ -110,4 +118,23 @@ impl Client { let subscription = self.new_subscription(stream_name).await?; Ok((stream, subscription)) } + + pub async fn new_consumer( + &self, + subscription_id: SubscriptionId, + ) -> anyhow::Result { + let fetching_stream = self + .0 + .streaming_fetch(rand_alphanumeric(20), subscription_id) + .await + .unwrap(); + Ok(fetching_stream) + } +} + +pub fn rand_raw_record(len: usize) -> Record { + Record { + partition_key: rand_alphanumeric(10), + payload: hstreamdb::Payload::RawRecord(rand_alphanumeric(len).into_bytes()), + } } diff --git a/src/hstreamdb/tests/consumer_test.rs b/src/hstreamdb/tests/consumer_test.rs index 1c3832e..9c42962 100644 --- a/src/hstreamdb/tests/consumer_test.rs +++ b/src/hstreamdb/tests/consumer_test.rs @@ -6,7 +6,6 @@ use hstreamdb::producer::FlushSettings; use hstreamdb::{ChannelProviderSettings, Subscription}; use hstreamdb_pb::{SpecialOffset, Stream}; use hstreamdb_test_utils::rand_alphanumeric; -use tokio_stream::StreamExt; #[tokio::test(flavor = "multi_thread")] async fn test_consumer() { diff --git a/src/hstreamdb/tests/integration_test.rs b/src/hstreamdb/tests/integration_test.rs index 6701c09..ff79870 100644 --- a/src/hstreamdb/tests/integration_test.rs +++ b/src/hstreamdb/tests/integration_test.rs @@ -1,6 +1,6 @@ mod common; -use common::init_client; +use common::{init_client, rand_raw_record}; #[tokio::test(flavor = "multi_thread")] async fn utils_base_test() { @@ -21,3 +21,21 @@ async fn utils_base_test() { client.new_stream_subscription().await.unwrap(); } + +#[tokio::test(flavor = "multi_thread")] +async fn sync_producer_should_be_sync() { + let client = init_client().await.unwrap(); + + let (stream, sub) = client.new_stream_subscription().await.unwrap(); + let stream_name = &stream.stream_name; + let (appender, producer) = client.new_sync_producer(stream_name).await.unwrap(); + let mut fetching_stream = client.new_consumer(sub.subscription_id).await.unwrap(); + + tokio::spawn(producer.start()); + + for _ in 0..50 { + appender.append(rand_raw_record(200)).await.unwrap(); + let (_, responder) = fetching_stream.next().await.unwrap(); + responder.ack().unwrap() + } +} diff --git a/src/x/hstreamdb-erl-nifs/src/lib.rs b/src/x/hstreamdb-erl-nifs/src/lib.rs index 52cd3dd..e365fbe 100644 --- a/src/x/hstreamdb-erl-nifs/src/lib.rs +++ b/src/x/hstreamdb-erl-nifs/src/lib.rs @@ -20,7 +20,6 @@ use rustler::{ ResourceArc, Term, }; use tokio::sync::{oneshot, Mutex, MutexGuard}; -use tokio_stream::StreamExt; use tonic::transport::{Certificate, ClientTlsConfig, Identity}; mod runtime;