diff --git a/src/hstreamdb/src/producer.rs b/src/hstreamdb/src/producer.rs index 9efeed1..c5dc79f 100644 --- a/src/hstreamdb/src/producer.rs +++ b/src/hstreamdb/src/producer.rs @@ -71,6 +71,14 @@ impl FlushSettings { pub fn builder() -> FlushSettingsBuilder { default() } + + pub fn sync() -> Self { + Self { + len: 0, + size: 0, + deadline: None, + } + } } #[derive(Default)] diff --git a/src/hstreamdb/tests/tls_test.rs b/src/hstreamdb/tests/tls_test.rs index 3177294..5c708ed 100644 --- a/src/hstreamdb/tests/tls_test.rs +++ b/src/hstreamdb/tests/tls_test.rs @@ -1,32 +1,39 @@ -// use std::{env, fs}; +use std::env; -// use hstreamdb::tls::{Certificate, ClientTlsConfig, Identity}; -// use hstreamdb::{ChannelProviderSettings, Client}; +use hstreamdb::tls::{Certificate, ClientTlsConfig, Identity}; +use hstreamdb::{ChannelProviderSettings, Client}; -// async fn _test_tls_impl() { -// env::set_var("RUST_LOG", "DEBUG"); -// env_logger::init(); +#[tokio::test(flavor = "multi_thread")] +async fn test_tls() { + if let Ok(_) = env::var("ENDPOINT") { + test_tls_impl().await + } else { + log::warn!("cloud endpoint is not presented"); + log::warn!("ignore tls tests"); + } +} -// let server_url: &str = todo!(); -// let tls_dir: &str = todo!(); +async fn test_tls_impl() { + env::set_var("RUST_LOG", "DEBUG"); + env_logger::init(); -// let ca_certificate = -// Certificate::from_pem(fs::read(format!("{tls_dir}/root_ca.crt")).unwrap()); -// let cert = fs::read(format!("{tls_dir}/client.crt")).unwrap(); -// let key = fs::read(format!("{tls_dir}/client.key")).unwrap(); + let server_url: String = env::var("ENDPOINT").unwrap(); + let ca_certificate: String = env::var("ROOT_CA").unwrap(); + let cert = env::var("CLIENT_CRT").unwrap(); + let key = env::var("CLIENT_KEY").unwrap(); -// let client = Client::new( -// server_url, -// ChannelProviderSettings::builder() -// .set_tls_config( -// ClientTlsConfig::new() -// .ca_certificate(ca_certificate) -// .identity(Identity::from_pem(cert, key)), -// ) -// .build(), -// ) -// .await -// .unwrap(); + let client = Client::new( + server_url, + ChannelProviderSettings::builder() + .set_tls_config( + ClientTlsConfig::new() + .ca_certificate(Certificate::from_pem(ca_certificate)) + .identity(Identity::from_pem(cert, key)), + ) + .build(), + ) + .await + .unwrap(); -// log::info!("{:?}", client.list_streams().await.unwrap()); -// } + log::info!("{:?}", client.list_streams().await.unwrap()); +}