diff --git a/Cargo.lock b/Cargo.lock index 9ee41fe..54f03a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -523,6 +523,7 @@ dependencies = [ "event_sink_canister", "ic_principal", "serde", + "serde_json", "test-case", ] @@ -1939,9 +1940,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.113" +version = "1.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79" +checksum = "c5f09b1bd632ef549eaa9f60a1f8de742bdbc698e6cee2095fc84dde5f549ae0" dependencies = [ "itoa", "ryu", diff --git a/Cargo.toml b/Cargo.toml index e119413..1aeaf17 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ rand = "0.8.5" rmp-serde = "1.1.2" serde = "1.0.196" serde_bytes = "0.11.14" +serde_json = "1.0.114" test-case = "3.3.1" tokio = "1.36.0" tokio-util = "0.7.10" diff --git a/rs/canister/api/src/lib.rs b/rs/canister/api/src/lib.rs index b12e064..bb0512b 100644 --- a/rs/canister/api/src/lib.rs +++ b/rs/canister/api/src/lib.rs @@ -11,16 +11,6 @@ pub use updates::*; pub type TimestampMillis = u64; -#[derive(CandidType, Serialize, Deserialize, Clone, Debug)] -pub struct Event { - pub name: String, - pub timestamp: TimestampMillis, - pub user: Option, - pub source: Option, - #[serde(with = "serde_bytes")] - pub payload: Vec, -} - #[derive(CandidType, Serialize, Deserialize, Clone, Debug)] pub struct IdempotentEvent { pub idempotency_key: u128, diff --git a/rs/client/Cargo.toml b/rs/client/Cargo.toml index d6dcbb6..588213f 100644 --- a/rs/client/Cargo.toml +++ b/rs/client/Cargo.toml @@ -9,6 +9,11 @@ edition = "2021" ic_principal.workspace = true event_sink_canister.path = "../canister/api" serde.workspace = true +serde_json.workspace = true +serde_json.optional = true [dev-dependencies] test-case.workspace = true + +[features] +json = ["serde_json"] diff --git a/rs/client/src/lib.rs b/rs/client/src/lib.rs index fbef814..258e2d4 100644 --- a/rs/client/src/lib.rs +++ b/rs/client/src/lib.rs @@ -39,7 +39,73 @@ struct ClientInner { total_events_flushed: u64, } -pub use event_sink_canister::Event; +pub struct Event { + name: String, + timestamp: TimestampMillis, + user: Option, + source: Option, + payload: Vec, +} + +pub struct EventBuilder { + name: String, + timestamp: TimestampMillis, + user: Option, + source: Option, + payload: Vec, +} + +impl EventBuilder { + pub fn new(name: impl Into, timestamp: TimestampMillis) -> Self { + Self { + name: name.into(), + timestamp, + user: None, + source: None, + payload: Vec::new(), + } + } + + pub fn with_user(mut self, user: impl Into) -> Self { + self.user = Some(user.into()); + self + } + + pub fn with_maybe_user(mut self, user: Option>) -> Self { + self.user = user.map(|u| u.into()); + self + } + + pub fn with_source(mut self, source: impl Into) -> Self { + self.source = Some(source.into()); + self + } + + pub fn with_maybe_source(mut self, source: Option>) -> Self { + self.source = source.map(|u| u.into()); + self + } + + pub fn with_payload(mut self, payload: Vec) -> Self { + self.payload = payload; + self + } + + #[cfg(feature = "json")] + pub fn with_json_payload(self, payload: &P) -> Self { + self.with_payload(serde_json::to_vec(payload).unwrap()) + } + + pub fn build(self) -> Event { + Event { + name: self.name, + timestamp: self.timestamp, + user: self.user, + source: self.source, + payload: self.payload, + } + } +} pub trait Runtime { fn schedule_flush(&mut self, delay: Duration, callback: F); @@ -177,7 +243,7 @@ impl Client { self.process_events(guard, true); } - pub fn flush_batch(&self) { + fn flush_batch(&self) { let guard = self.inner.try_lock().unwrap(); self.flush_batch_within_lock(guard); } diff --git a/rs/client/src/tests.rs b/rs/client/src/tests.rs index 64f31e4..c73baab 100644 --- a/rs/client/src/tests.rs +++ b/rs/client/src/tests.rs @@ -1,5 +1,5 @@ -use crate::{ClientBuilder, FlushOutcome, Runtime}; -use event_sink_canister::{Event, IdempotentEvent, TimestampMillis}; +use crate::{ClientBuilder, EventBuilder, FlushOutcome, Runtime}; +use event_sink_canister::{IdempotentEvent, TimestampMillis}; use ic_principal::Principal; use std::sync::{Arc, Mutex, MutexGuard}; use std::thread; @@ -16,13 +16,7 @@ fn batch_flushed_when_max_batch_size_reached(flush_synchronously: bool) { for i in 0..10 { for _ in 0..5 { - client.push(Event { - name: i.to_string(), - timestamp: 0, - user: None, - source: None, - payload: Vec::new(), - }); + client.push(EventBuilder::new(i.to_string(), 0).build()); } thread::sleep(Duration::from_millis(10)); assert_eq!(client.info().events_pending, 0); @@ -41,13 +35,7 @@ fn batch_flushed_when_flush_delay_reached(flush_synchronously: bool) { for i in 0..10 { for _ in 0..5 { - client.push(Event { - name: i.to_string(), - timestamp: 0, - user: None, - source: None, - payload: Vec::new(), - }) + client.push(EventBuilder::new(i.to_string(), 0).build()); } runtime.inner().timestamp += 4999; runtime.tick();