Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: integration test #102

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/hstreamdb/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ use tonic::transport;

use crate::producer;

pub type SubscriptionId = String;

#[derive(Debug)]
pub struct Subscription {
pub subscription_id: String,
pub subscription_id: SubscriptionId,
pub stream_name: String,
pub ack_timeout_seconds: i32,
pub max_unacked_records: i32,
Expand Down
16 changes: 13 additions & 3 deletions src/hstreamdb/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,28 @@ use prost_types::Struct;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::StreamExt;
use tonic::{Request, Streaming};

use crate::client::Client;
use crate::common::{self, Payload};
use crate::utils::decode_received_records;
use crate::SubscriptionId;

pub struct ConsumerStream(UnboundedReceiverStream<(Payload, Responder)>);

impl ConsumerStream {
pub async fn next(&mut self) -> Option<(Payload, Responder)> {
self.0.next().await
}
}

impl Client {
pub async fn streaming_fetch(
&self,
consumer_name: String,
subscription_id: String,
) -> common::Result<UnboundedReceiverStream<(Payload, Responder)>> {
subscription_id: SubscriptionId,
) -> common::Result<ConsumerStream> {
let url = self.lookup_subscription(subscription_id.clone()).await?;
log::debug!("lookup subscription for {subscription_id}, url = {url}");
let mut channel = self.channels.channel_at(url).await?;
Expand Down Expand Up @@ -47,7 +57,7 @@ impl Client {
sender,
));

Ok(UnboundedReceiverStream::new(receiver))
Ok(ConsumerStream(UnboundedReceiverStream::new(receiver)))
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/hstreamdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ pub use channel_provider::ChannelProviderSettings;
pub use client::Client;
pub use common::{
CompressionType, Consumer, Error, ListValue, Payload, Record, RecordId, Result, ShardId,
SpecialOffset, Stream, StreamShardOffset, Struct, Subscription, Timestamp,
SpecialOffset, Stream, StreamShardOffset, Struct, Subscription, SubscriptionId, Timestamp,
};
pub use consumer::ConsumerStream;
8 changes: 8 additions & 0 deletions src/hstreamdb/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ impl FlushSettings {
pub fn builder() -> FlushSettingsBuilder {
default()
}

pub fn sync() -> Self {
Self {
len: 0,
size: 0,
deadline: None,
}
}
}

#[derive(Default)]
Expand Down
140 changes: 140 additions & 0 deletions src/hstreamdb/tests/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
use std::env;

use hstreamdb::appender::Appender;
use hstreamdb::common::{CompressionType, SpecialOffset, Stream};
use hstreamdb::producer::{FlushSettings, Producer};
use hstreamdb::{ChannelProviderSettings, ConsumerStream, Record, Subscription, SubscriptionId};
use hstreamdb_test_utils::rand_alphanumeric;

pub async fn init_client() -> anyhow::Result<Client> {
let server_url = env::var("TEST_SERVER_ADDR").unwrap();
let channel_provider_settings = ChannelProviderSettings::default();
let client = hstreamdb::Client::new(server_url, channel_provider_settings).await?;
Ok(Client(client))
}

#[tokio::test(flavor = "multi_thread")]
async fn make_ci_happy() {
let client = init_client().await.unwrap();
let (stream, sub) = client.new_stream_subscription().await.unwrap();
let mut consumer = client.new_consumer(sub.subscription_id).await.unwrap();
let (appender, producer) = client.new_sync_producer(stream.stream_name).await.unwrap();
appender.append(rand_raw_record(4500)).await.unwrap();
producer.start().await;
while let Some(x) = consumer.next().await {
x.1.ack().unwrap();
}
}

pub struct Client(pub hstreamdb::Client);

impl Client {
pub async fn new_stream(&self) -> anyhow::Result<Stream> {
let stream_name = rand_alphanumeric(20);
let stream = self
.0
.create_stream(Stream {
stream_name,
replication_factor: 3,
backlog_duration: 60 * 60 * 24,
shard_count: 20,
creation_time: None,
})
.await?;
Ok(stream)
}

pub async fn new_subscription<T: Into<String>>(
&self,
stream_name: T,
) -> anyhow::Result<Subscription> {
let subscription = self
.0
.create_subscription(Subscription {
subscription_id: rand_alphanumeric(20),
stream_name: stream_name.into(),
ack_timeout_seconds: 60 * 15,
max_unacked_records: 1000,
offset: SpecialOffset::Earliest,
creation_time: None,
})
.await?;
Ok(subscription)
}

pub async fn new_sync_producer<T: Into<String>>(
&self,
stream_name: T,
) -> anyhow::Result<(Appender, Producer)> {
let producer = self
.0
.new_producer(
stream_name.into(),
CompressionType::None,
None,
FlushSettings::sync(),
ChannelProviderSettings::default(),
None,
)
.await?;
Ok(producer)
}

pub async fn write_rand<T: Into<String>>(
&self,
stream_name: T,
appender_num: usize,
record_num: usize,
payload_size: usize,
) -> anyhow::Result<()> {
let (appender, producer) = self.new_sync_producer(stream_name).await?;

for _ in 0..appender_num {
let appender = appender.clone();
tokio::spawn(async move {
for _ in 0..record_num {
let payload = rand_alphanumeric(payload_size);
match appender
.append(Record {
partition_key: "".to_string(),
payload: hstreamdb::Payload::RawRecord(payload.into_bytes()),
})
.await
{
Ok(_) => (),
Err(err) => log::error!("{}", err),
};
}
});
}
drop(appender);
producer.start().await;
Ok(())
}

pub async fn new_stream_subscription(&self) -> anyhow::Result<(Stream, Subscription)> {
let stream = self.new_stream().await?;
let stream_name = stream.stream_name.clone();
let subscription = self.new_subscription(stream_name).await?;
Ok((stream, subscription))
}

pub async fn new_consumer(
&self,
subscription_id: SubscriptionId,
) -> anyhow::Result<ConsumerStream> {
let fetching_stream = self
.0
.streaming_fetch(rand_alphanumeric(20), subscription_id)
.await
.unwrap();
Ok(fetching_stream)
}
}

pub fn rand_raw_record(len: usize) -> Record {
Record {
partition_key: rand_alphanumeric(10),
payload: hstreamdb::Payload::RawRecord(rand_alphanumeric(len).into_bytes()),
}
}
1 change: 0 additions & 1 deletion src/hstreamdb/tests/consumer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use hstreamdb::producer::FlushSettings;
use hstreamdb::{ChannelProviderSettings, Subscription};
use hstreamdb_pb::{SpecialOffset, Stream};
use hstreamdb_test_utils::rand_alphanumeric;
use tokio_stream::StreamExt;

#[tokio::test(flavor = "multi_thread")]
async fn test_consumer() {
Expand Down
41 changes: 41 additions & 0 deletions src/hstreamdb/tests/integration_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
mod common;

use common::{init_client, rand_raw_record};

#[tokio::test(flavor = "multi_thread")]
async fn utils_base_test() {
let client = init_client().await.unwrap();

let stream = client.new_stream().await.unwrap();
let stream_name = &stream.stream_name;
let _subscription = client.new_subscription(stream_name).await.unwrap();
let _producer = client.new_sync_producer(stream_name).await.unwrap();

let appender_num = 5;
let record_num = 50;
let payload_size = 2000;
client
.write_rand(stream_name, appender_num, record_num, payload_size)
.await
.unwrap();

client.new_stream_subscription().await.unwrap();
}

#[tokio::test(flavor = "multi_thread")]
async fn sync_producer_should_be_sync() {
let client = init_client().await.unwrap();

let (stream, sub) = client.new_stream_subscription().await.unwrap();
let stream_name = &stream.stream_name;
let (appender, producer) = client.new_sync_producer(stream_name).await.unwrap();
let mut fetching_stream = client.new_consumer(sub.subscription_id).await.unwrap();

tokio::spawn(producer.start());

for _ in 0..50 {
appender.append(rand_raw_record(200)).await.unwrap();
let (_, responder) = fetching_stream.next().await.unwrap();
responder.ack().unwrap()
}
}
81 changes: 49 additions & 32 deletions src/hstreamdb/tests/tls_test.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,49 @@
// use std::{env, fs};

// use hstreamdb::tls::{Certificate, ClientTlsConfig, Identity};
// use hstreamdb::{ChannelProviderSettings, Client};

// async fn _test_tls_impl() {
// env::set_var("RUST_LOG", "DEBUG");
// env_logger::init();

// let server_url: &str = todo!();
// let tls_dir: &str = todo!();

// let ca_certificate =
// Certificate::from_pem(fs::read(format!("{tls_dir}/root_ca.crt")).unwrap());
// let cert = fs::read(format!("{tls_dir}/client.crt")).unwrap();
// let key = fs::read(format!("{tls_dir}/client.key")).unwrap();

// let client = Client::new(
// server_url,
// ChannelProviderSettings::builder()
// .set_tls_config(
// ClientTlsConfig::new()
// .ca_certificate(ca_certificate)
// .identity(Identity::from_pem(cert, key)),
// )
// .build(),
// )
// .await
// .unwrap();

// log::info!("{:?}", client.list_streams().await.unwrap());
// }
mod common;

use std::env;

use hstreamdb::tls::{Certificate, ClientTlsConfig, Identity};
use hstreamdb::{ChannelProviderSettings, Client};

#[tokio::test(flavor = "multi_thread")]
async fn test_tls() {
if env::var("ENDPOINT").is_ok() {
test_tls_impl().await
} else {
log::warn!("cloud endpoint is not presented");
log::warn!("ignore tls tests");
}
}

async fn test_tls_impl() {
env::set_var("RUST_LOG", "DEBUG");
env_logger::init();

let server_url: String = env::var("ENDPOINT").unwrap();
let ca_certificate: String = env::var("ROOT_CA").unwrap();
let cert = env::var("CLIENT_CRT").unwrap();
let key = env::var("CLIENT_KEY").unwrap();

let client = Client::new(
server_url,
ChannelProviderSettings::builder()
.set_tls_config(
ClientTlsConfig::new()
.ca_certificate(Certificate::from_pem(ca_certificate))
.identity(Identity::from_pem(cert, key)),
)
.build(),
)
.await
.unwrap();

log::info!("{:?}", client.list_streams().await.unwrap());

let client = common::Client(client);

let (stream, _sub) = client.new_stream_subscription().await.unwrap();
client
.write_rand(stream.stream_name, 10, 2000, 1000)
.await
.unwrap();
}
1 change: 0 additions & 1 deletion src/x/hstreamdb-erl-nifs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use rustler::{
ResourceArc, Term,
};
use tokio::sync::{oneshot, Mutex, MutexGuard};
use tokio_stream::StreamExt;
use tonic::transport::{Certificate, ClientTlsConfig, Identity};

mod runtime;
Expand Down