Skip to content

Commit

Permalink
Remove canister_id from state of Runtime implementations (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
hpeebles authored Feb 8, 2024
1 parent 8d778f8 commit 1c15baa
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 25 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ candid = "0.10.3"
ic-agent = "0.32.0"
ic-cdk = "=0.12.0"
ic-cdk-timers = "0.6.0"
ic_principal = "0.1.1"
ic-stable-structures = "0.6.2"
pocket-ic = "2.0.1"
rand = "0.8.5"
Expand Down
1 change: 1 addition & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
ic_principal.workspace = true
event_sink_canister.path = "../canister/api"
2 changes: 1 addition & 1 deletion client/agent_runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]

candid.workspace = true
event_sink_canister.path = "../../canister/api"
event_sink_client.path = ".."
ic-agent.workspace = true
ic_principal.workspace = true
rand.workspace = true
tokio.workspace = true
tokio.features = ["full"]
Expand Down
8 changes: 3 additions & 5 deletions client/agent_runtime/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
use candid::Principal;
use event_sink_canister::{IdempotentEvent, PushEventsArgs, TimestampMillis};
use event_sink_client::Runtime;
use ic_agent::Agent;
use ic_principal::Principal;
use rand::random;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio_util::sync::CancellationToken;

pub struct AgentRuntime {
canister_id: Principal,
agent: Agent,
scheduler_task_cancellation_token: Option<CancellationToken>,
}

impl AgentRuntime {
pub fn new(canister_id: Principal, agent: Agent) -> AgentRuntime {
pub fn new(agent: Agent) -> AgentRuntime {
AgentRuntime {
canister_id,
agent,
scheduler_task_cancellation_token: None,
}
Expand Down Expand Up @@ -44,11 +42,11 @@ impl Runtime for AgentRuntime {

fn flush<F: FnOnce() + Send + 'static>(
&mut self,
canister_id: Principal,
events: Vec<IdempotentEvent>,
trigger_retry: F,
) {
self.cancel_scheduler_task();
let canister_id = self.canister_id;
let agent = self.agent.clone();

tokio::spawn(async move { flush_async(canister_id, agent, events, trigger_retry).await });
Expand Down
2 changes: 1 addition & 1 deletion client/cdk_runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
candid.workspace = true
event_sink_canister.path = "../../canister/api"
event_sink_client.path = ".."
ic-cdk.workspace = true
ic-cdk-timers.workspace = true
ic_principal.workspace = true
rand.workspace = true
29 changes: 15 additions & 14 deletions client/cdk_runtime/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,17 @@
use candid::Principal;
use event_sink_canister::{IdempotentEvent, PushEventsArgs, TimestampMillis};
use event_sink_client::Runtime;
use ic_cdk_timers::TimerId;
use ic_principal::Principal;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use std::time::Duration;

pub struct CdkRuntime {
canister_id: Principal,
rng: StdRng,
scheduled_flush_timer: Option<TimerId>,
}

impl CdkRuntime {
pub fn new(canister_id: Principal) -> CdkRuntime {
let mut seed = [0; 32];
seed[..8].copy_from_slice(&ic_cdk::api::time().to_ne_bytes());

CdkRuntime {
canister_id,
rng: StdRng::from_seed(seed),
scheduled_flush_timer: None,
}
}

fn clear_timer(&mut self) {
if let Some(timer_id) = self.scheduled_flush_timer.take() {
ic_cdk_timers::clear_timer(timer_id);
Expand All @@ -39,11 +27,12 @@ impl Runtime for CdkRuntime {

fn flush<F: FnOnce() + Send + 'static>(
&mut self,
canister_id: Principal,
events: Vec<IdempotentEvent>,
trigger_retry: F,
) {
self.clear_timer();
ic_cdk::spawn(flush_async(self.canister_id, events, trigger_retry))
ic_cdk::spawn(flush_async(canister_id, events, trigger_retry))
}

fn rng(&mut self) -> u128 {
Expand All @@ -67,3 +56,15 @@ async fn flush_async<F: FnOnce()>(
trigger_retry();
}
}

impl Default for CdkRuntime {
fn default() -> Self {
let mut seed = [0; 32];
seed[..8].copy_from_slice(&ic_cdk::api::time().to_ne_bytes());

CdkRuntime {
rng: StdRng::from_seed(seed),
scheduled_flush_timer: None,
}
}
}
21 changes: 18 additions & 3 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use event_sink_canister::{IdempotentEvent, TimestampMillis};
use ic_principal::Principal;
use std::mem;
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;
Expand All @@ -11,6 +12,7 @@ pub struct Client<R> {
}

struct ClientInner<R> {
event_sink_canister_id: Principal,
runtime: R,
flush_delay: Duration,
max_batch_size: usize,
Expand All @@ -24,6 +26,7 @@ pub trait Runtime {
fn schedule_flush<F: FnOnce() + Send + 'static>(&mut self, delay: Duration, callback: F);
fn flush<F: FnOnce() + Send + 'static>(
&mut self,
event_sync_canister_id: Principal,
events: Vec<IdempotentEvent>,
trigger_retry: F,
);
Expand All @@ -38,15 +41,17 @@ impl<R> Client<R> {
}

pub struct ClientBuilder<R> {
event_sink_canister_id: Principal,
runtime: R,
flush_delay: Option<Duration>,
max_batch_size: Option<u32>,
events: Vec<IdempotentEvent>,
}

impl<R: Runtime + Send + 'static> ClientBuilder<R> {
pub fn new(runtime: R) -> ClientBuilder<R> {
pub fn new(event_sink_canister_id: Principal, runtime: R) -> ClientBuilder<R> {
ClientBuilder {
event_sink_canister_id,
runtime,
flush_delay: None,
max_batch_size: None,
Expand Down Expand Up @@ -74,6 +79,7 @@ impl<R: Runtime + Send + 'static> ClientBuilder<R> {
let max_batch_size = self.max_batch_size.unwrap_or(DEFAULT_MAX_BATCH_SIZE) as usize;
let client = Client {
inner: Arc::new(Mutex::new(ClientInner::new(
self.event_sink_canister_id,
self.runtime,
flush_delay,
max_batch_size,
Expand Down Expand Up @@ -115,9 +121,12 @@ impl<R: Runtime + Send + 'static> Client<R> {
};

let clone = self.clone();
let event_sink_canister_id = guard.event_sink_canister_id;
guard
.runtime
.flush(events.clone(), move || clone.requeue_events(events));
.flush(event_sink_canister_id, events.clone(), move || {
clone.requeue_events(events)
});
}
}

Expand Down Expand Up @@ -164,8 +173,14 @@ impl<R> Clone for Client<R> {
}

impl<R> ClientInner<R> {
pub fn new(runtime: R, flush_delay: Duration, max_batch_size: usize) -> ClientInner<R> {
pub fn new(
event_sink_canister_id: Principal,
runtime: R,
flush_delay: Duration,
max_batch_size: usize,
) -> ClientInner<R> {
ClientInner {
event_sink_canister_id,
runtime,
flush_delay,
max_batch_size,
Expand Down

0 comments on commit 1c15baa

Please sign in to comment.