Skip to content

Commit

Permalink
more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alissa-tung committed Feb 17, 2023
1 parent 9388ae7 commit a42c542
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 9 deletions.
4 changes: 3 additions & 1 deletion src/hstreamdb/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ use tonic::transport;

use crate::producer;

pub type SubscriptionId = String;

#[derive(Debug)]
pub struct Subscription {
pub subscription_id: String,
pub subscription_id: SubscriptionId,
pub stream_name: String,
pub ack_timeout_seconds: i32,
pub max_unacked_records: i32,
Expand Down
13 changes: 11 additions & 2 deletions src/hstreamdb/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,27 @@ use prost_types::Struct;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::StreamExt;
use tonic::{Request, Streaming};

use crate::client::Client;
use crate::common::{self, Payload};
use crate::utils::decode_received_records;

pub struct ConsumerStream(UnboundedReceiverStream<(Payload, Responder)>);

impl ConsumerStream {
pub async fn next(&mut self) -> Option<(Payload, Responder)> {
self.0.next().await
}
}

impl Client {
pub async fn streaming_fetch(
&self,
consumer_name: String,
subscription_id: String,
) -> common::Result<UnboundedReceiverStream<(Payload, Responder)>> {
) -> common::Result<ConsumerStream> {
let url = self.lookup_subscription(subscription_id.clone()).await?;
log::debug!("lookup subscription for {subscription_id}, url = {url}");
let mut channel = self.channels.channel_at(url).await?;
Expand Down Expand Up @@ -47,7 +56,7 @@ impl Client {
sender,
));

Ok(UnboundedReceiverStream::new(receiver))
Ok(ConsumerStream(UnboundedReceiverStream::new(receiver)))
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/hstreamdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ pub use channel_provider::ChannelProviderSettings;
pub use client::Client;
pub use common::{
CompressionType, Consumer, Error, ListValue, Payload, Record, RecordId, Result, ShardId,
SpecialOffset, Stream, StreamShardOffset, Struct, Subscription, Timestamp,
SpecialOffset, Stream, StreamShardOffset, Struct, Subscription, SubscriptionId, Timestamp,
};
pub use consumer::ConsumerStream;
31 changes: 29 additions & 2 deletions src/hstreamdb/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::env;
use hstreamdb::appender::Appender;
use hstreamdb::common::{CompressionType, SpecialOffset, Stream};
use hstreamdb::producer::{FlushSettings, Producer};
use hstreamdb::{ChannelProviderSettings, Record, Subscription};
use hstreamdb::{ChannelProviderSettings, ConsumerStream, Record, Subscription, SubscriptionId};
use hstreamdb_test_utils::rand_alphanumeric;

pub async fn init_client() -> anyhow::Result<Client> {
Expand All @@ -15,7 +15,15 @@ pub async fn init_client() -> anyhow::Result<Client> {

#[tokio::test(flavor = "multi_thread")]
async fn make_ci_happy() {
init_client().await.unwrap().0.list_streams().await.unwrap();
let client = init_client().await.unwrap();
let (stream, sub) = client.new_stream_subscription().await.unwrap();
let mut consumer = client.new_consumer(sub.subscription_id).await.unwrap();
let (appender, producer) = client.new_sync_producer(stream.stream_name).await.unwrap();
appender.append(rand_raw_record(4500)).await.unwrap();
producer.start().await;
while let Some(x) = consumer.next().await {
x.1.ack().unwrap();
}
}

pub struct Client(pub hstreamdb::Client);
Expand Down Expand Up @@ -110,4 +118,23 @@ impl Client {
let subscription = self.new_subscription(stream_name).await?;
Ok((stream, subscription))
}

pub async fn new_consumer(
&self,
subscription_id: SubscriptionId,
) -> anyhow::Result<ConsumerStream> {
let fetching_stream = self
.0
.streaming_fetch(rand_alphanumeric(20), subscription_id)
.await
.unwrap();
Ok(fetching_stream)
}
}

pub fn rand_raw_record(len: usize) -> Record {
Record {
partition_key: rand_alphanumeric(10),
payload: hstreamdb::Payload::RawRecord(rand_alphanumeric(len).into_bytes()),
}
}
1 change: 0 additions & 1 deletion src/hstreamdb/tests/consumer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use hstreamdb::producer::FlushSettings;
use hstreamdb::{ChannelProviderSettings, Subscription};
use hstreamdb_pb::{SpecialOffset, Stream};
use hstreamdb_test_utils::rand_alphanumeric;
use tokio_stream::StreamExt;

#[tokio::test(flavor = "multi_thread")]
async fn test_consumer() {
Expand Down
20 changes: 19 additions & 1 deletion src/hstreamdb/tests/integration_test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod common;

use common::init_client;
use common::{init_client, rand_raw_record};

#[tokio::test(flavor = "multi_thread")]
async fn utils_base_test() {
Expand All @@ -21,3 +21,21 @@ async fn utils_base_test() {

client.new_stream_subscription().await.unwrap();
}

#[tokio::test(flavor = "multi_thread")]
async fn sync_producer_should_be_sync() {
let client = init_client().await.unwrap();

let (stream, sub) = client.new_stream_subscription().await.unwrap();
let stream_name = &stream.stream_name;
let (appender, producer) = client.new_sync_producer(stream_name).await.unwrap();
let mut fetching_stream = client.new_consumer(sub.subscription_id).await.unwrap();

tokio::spawn(producer.start());

for _ in 0..50 {
appender.append(rand_raw_record(200)).await.unwrap();
let (_, responder) = fetching_stream.next().await.unwrap();
responder.ack().unwrap()
}
}
1 change: 0 additions & 1 deletion src/x/hstreamdb-erl-nifs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use rustler::{
ResourceArc, Term,
};
use tokio::sync::{oneshot, Mutex, MutexGuard};
use tokio_stream::StreamExt;
use tonic::transport::{Certificate, ClientTlsConfig, Identity};

mod runtime;
Expand Down

0 comments on commit a42c542

Please sign in to comment.