Skip to content

Commit

Permalink
Fix case where flush completes synchronously (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
hpeebles authored Feb 12, 2024
1 parent 10b2203 commit 57ddf07
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 10 deletions.
34 changes: 34 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ rand = "0.8.5"
rmp-serde = "1.1.2"
serde = "1.0.196"
serde_bytes = "0.11.14"
test-case = "3.3.1"
tokio = "1.36.0"
tokio-util = "0.7.10"
tracing = "0.1.40"
Expand Down
3 changes: 3 additions & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@ edition = "2021"
ic_principal.workspace = true
event_sink_canister.path = "../canister/api"
serde.workspace = true

[dev-dependencies]
test-case.workspace = true
20 changes: 18 additions & 2 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use event_sink_canister::{IdempotentEvent, TimestampMillis};
use ic_principal::Principal;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::mem;
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;
use std::{mem, thread};

#[cfg(test)]
mod tests;
Expand Down Expand Up @@ -214,7 +214,23 @@ impl<R: Runtime + Send + 'static> Client<R> {
}

fn on_flush_complete(&mut self, outcome: FlushOutcome, events: Vec<IdempotentEvent>) {
let mut guard = self.inner.try_lock().unwrap();
if let Ok(guard) = self.inner.try_lock() {
self.on_flush_within_lock(guard, outcome, events);
} else {
let clone = self.clone();
thread::spawn(move || {
let guard = clone.inner.lock().unwrap();
clone.on_flush_within_lock(guard, outcome, events);
});
}
}

fn on_flush_within_lock(
&self,
mut guard: MutexGuard<ClientInner<R>>,
outcome: FlushOutcome,
events: Vec<IdempotentEvent>,
) {
guard.flush_in_progress = false;

match outcome {
Expand Down
39 changes: 31 additions & 8 deletions client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ use crate::{ClientBuilder, FlushOutcome, Runtime};
use event_sink_canister::{Event, IdempotentEvent, TimestampMillis};
use ic_principal::Principal;
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread;
use std::time::Duration;
use test_case::test_case;

#[test]
fn batch_flushed_when_max_batch_size_reached() {
let runtime = TestRuntime::default();
#[test_case(true)]
#[test_case(false)]
fn batch_flushed_when_max_batch_size_reached(flush_synchronously: bool) {
let runtime = TestRuntime::new(flush_synchronously);
let mut client = ClientBuilder::new(Principal::anonymous(), runtime.clone())
.with_max_batch_size(5)
.build();
Expand All @@ -21,15 +24,17 @@ fn batch_flushed_when_max_batch_size_reached() {
payload: Vec::new(),
});
}
thread::sleep(Duration::from_millis(10));
assert_eq!(client.info().events_pending, 0);
assert_eq!(runtime.inner().flush_invocations, i + 1);
runtime.tick();
}
}

#[test]
fn batch_flushed_when_flush_delay_reached() {
let runtime = TestRuntime::default();
#[test_case(true)]
#[test_case(false)]
fn batch_flushed_when_flush_delay_reached(flush_synchronously: bool) {
let runtime = TestRuntime::new(flush_synchronously);
let mut client = ClientBuilder::new(Principal::anonymous(), runtime.clone())
.with_flush_delay(Duration::from_secs(5))
.build();
Expand All @@ -47,11 +52,13 @@ fn batch_flushed_when_flush_delay_reached() {
runtime.inner().timestamp += 4999;
runtime.tick();
runtime.tick();
thread::sleep(Duration::from_millis(10));
assert_eq!(client.info().events_pending, 5);
assert_eq!(runtime.inner().flush_invocations, i);
runtime.inner().timestamp += 1;
runtime.tick();
runtime.tick();
thread::sleep(Duration::from_millis(10));
assert_eq!(client.info().events_pending, 0);
assert_eq!(runtime.inner().flush_invocations, i + 1);
}
Expand All @@ -60,9 +67,17 @@ fn batch_flushed_when_flush_delay_reached() {
#[derive(Default, Clone)]
struct TestRuntime {
inner: Arc<Mutex<TestRuntimeInner>>,
flush_synchronously: bool,
}

impl TestRuntime {
fn new(flush_synchronously: bool) -> TestRuntime {
TestRuntime {
flush_synchronously,
..Default::default()
}
}

fn inner(&self) -> MutexGuard<TestRuntimeInner> {
self.inner.try_lock().unwrap()
}
Expand Down Expand Up @@ -97,9 +112,16 @@ impl Runtime for TestRuntime {
) {
let mut guard = self.inner();
guard.flush_invocations += 1;
guard.callback_due_at = Some(guard.timestamp);
let outcome = guard.flush_outcome;
guard.callback = Some(Box::new(move || on_complete(outcome)));

if self.flush_synchronously {
guard.callback_due_at = None;
guard.callback = None;
on_complete(outcome);
} else {
guard.callback_due_at = Some(guard.timestamp);
guard.callback = Some(Box::new(move || on_complete(outcome)));
}
}

fn rng(&mut self) -> u128 {
Expand Down Expand Up @@ -127,6 +149,7 @@ impl TestRuntime {
guard
.callback_due_at
.filter(|ts| *ts <= guard.timestamp)
.take()
.and_then(|_| guard.callback.take())
}
}

0 comments on commit 57ddf07

Please sign in to comment.