Skip to content

Commit

Permalink
Add json feature for pushing events with JSON payloads (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
hpeebles authored Feb 23, 2024
1 parent 6ddc1d1 commit c0cefe7
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 30 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ rand = "0.8.5"
rmp-serde = "1.1.2"
serde = "1.0.196"
serde_bytes = "0.11.14"
serde_json = "1.0.114"
test-case = "3.3.1"
tokio = "1.36.0"
tokio-util = "0.7.10"
Expand Down
10 changes: 0 additions & 10 deletions rs/canister/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,6 @@ pub use updates::*;

pub type TimestampMillis = u64;

#[derive(CandidType, Serialize, Deserialize, Clone, Debug)]
pub struct Event {
pub name: String,
pub timestamp: TimestampMillis,
pub user: Option<String>,
pub source: Option<String>,
#[serde(with = "serde_bytes")]
pub payload: Vec<u8>,
}

#[derive(CandidType, Serialize, Deserialize, Clone, Debug)]
pub struct IdempotentEvent {
pub idempotency_key: u128,
Expand Down
5 changes: 5 additions & 0 deletions rs/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ edition = "2021"
ic_principal.workspace = true
event_sink_canister.path = "../canister/api"
serde.workspace = true
serde_json.workspace = true
serde_json.optional = true

[dev-dependencies]
test-case.workspace = true

[features]
json = ["serde_json"]
70 changes: 68 additions & 2 deletions rs/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,73 @@ struct ClientInner<R> {
total_events_flushed: u64,
}

pub use event_sink_canister::Event;
pub struct Event {
name: String,
timestamp: TimestampMillis,
user: Option<String>,
source: Option<String>,
payload: Vec<u8>,
}

pub struct EventBuilder {
name: String,
timestamp: TimestampMillis,
user: Option<String>,
source: Option<String>,
payload: Vec<u8>,
}

impl EventBuilder {
pub fn new(name: impl Into<String>, timestamp: TimestampMillis) -> Self {
Self {
name: name.into(),
timestamp,
user: None,
source: None,
payload: Vec::new(),
}
}

pub fn with_user(mut self, user: impl Into<String>) -> Self {
self.user = Some(user.into());
self
}

pub fn with_maybe_user(mut self, user: Option<impl Into<String>>) -> Self {
self.user = user.map(|u| u.into());
self
}

pub fn with_source(mut self, source: impl Into<String>) -> Self {
self.source = Some(source.into());
self
}

pub fn with_maybe_source(mut self, source: Option<impl Into<String>>) -> Self {
self.source = source.map(|u| u.into());
self
}

pub fn with_payload(mut self, payload: Vec<u8>) -> Self {
self.payload = payload;
self
}

#[cfg(feature = "json")]
pub fn with_json_payload<P: Serialize>(self, payload: &P) -> Self {
self.with_payload(serde_json::to_vec(payload).unwrap())
}

pub fn build(self) -> Event {
Event {
name: self.name,
timestamp: self.timestamp,
user: self.user,
source: self.source,
payload: self.payload,
}
}
}

pub trait Runtime {
fn schedule_flush<F: FnOnce() + Send + 'static>(&mut self, delay: Duration, callback: F);
Expand Down Expand Up @@ -177,7 +243,7 @@ impl<R: Runtime + Send + 'static> Client<R> {
self.process_events(guard, true);
}

pub fn flush_batch(&self) {
fn flush_batch(&self) {
let guard = self.inner.try_lock().unwrap();
self.flush_batch_within_lock(guard);
}
Expand Down
20 changes: 4 additions & 16 deletions rs/client/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{ClientBuilder, FlushOutcome, Runtime};
use event_sink_canister::{Event, IdempotentEvent, TimestampMillis};
use crate::{ClientBuilder, EventBuilder, FlushOutcome, Runtime};
use event_sink_canister::{IdempotentEvent, TimestampMillis};
use ic_principal::Principal;
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread;
Expand All @@ -16,13 +16,7 @@ fn batch_flushed_when_max_batch_size_reached(flush_synchronously: bool) {

for i in 0..10 {
for _ in 0..5 {
client.push(Event {
name: i.to_string(),
timestamp: 0,
user: None,
source: None,
payload: Vec::new(),
});
client.push(EventBuilder::new(i.to_string(), 0).build());
}
thread::sleep(Duration::from_millis(10));
assert_eq!(client.info().events_pending, 0);
Expand All @@ -41,13 +35,7 @@ fn batch_flushed_when_flush_delay_reached(flush_synchronously: bool) {

for i in 0..10 {
for _ in 0..5 {
client.push(Event {
name: i.to_string(),
timestamp: 0,
user: None,
source: None,
payload: Vec::new(),
})
client.push(EventBuilder::new(i.to_string(), 0).build());
}
runtime.inner().timestamp += 4999;
runtime.tick();
Expand Down

0 comments on commit c0cefe7

Please sign in to comment.